HDFS-11170. Add builder-based create API to FileSystem. Contributed by SammiChen and Wei Zhou.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/332a997e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/332a997e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/332a997e

Branch: refs/heads/HDFS-9806
Commit: 332a997e10cca88d9ab3aa8252102366b628eaec
Parents: 52b0060
Author: Andrew Wang <w...@apache.org>
Authored: Fri Mar 24 12:56:46 2017 -0700
Committer: Andrew Wang <w...@apache.org>
Committed: Fri Mar 24 12:56:46 2017 -0700

----------------------------------------------------------------------
 .../hadoop/fs/FSDataOutputStreamBuilder.java    | 142 +++++++++++++++++++
 .../java/org/apache/hadoop/fs/FileSystem.java   |   9 ++
 .../org/apache/hadoop/fs/FilterFileSystem.java  |   5 +
 .../org/apache/hadoop/fs/HarFileSystem.java     |   5 +
 .../apache/hadoop/fs/TestLocalFileSystem.java   |  54 +++++++
 .../hadoop/hdfs/DistributedFileSystem.java      |  81 +++++++++++
 .../hadoop/hdfs/TestDistributedFileSystem.java  |  35 ++++-
 .../namenode/TestFavoredNodesEndToEnd.java      |  23 +++
 8 files changed, 353 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/332a997e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java
new file mode 100644
index 0000000..2e885f3
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.IOException;
+import java.util.EnumSet;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
+
+/** Base class for file-system-specific FSDataOutputStreamBuilder implementations. */
+public class FSDataOutputStreamBuilder {
+  private Path path = null;
+  private FsPermission permission = null;
+  private Integer bufferSize;
+  private Short replication;
+  private Long blockSize;
+  private Progressable progress = null;
+  private EnumSet<CreateFlag> flags = null;
+  private ChecksumOpt checksumOpt = null;
+
+  private final FileSystem fs;
+
+  public FSDataOutputStreamBuilder(FileSystem fileSystem, Path p) {
+    fs = fileSystem;
+    path = p;
+  }
+
+  protected Path getPath() {
+    return path;
+  }
+
+  protected FsPermission getPermission() {
+    if (permission == null) {
+      return FsPermission.getFileDefault();
+    }
+    return permission;
+  }
+
+  public FSDataOutputStreamBuilder setPermission(final FsPermission perm) {
+    Preconditions.checkNotNull(perm);
+    permission = perm;
+    return this;
+  }
+
+  protected int getBufferSize() {
+    if (bufferSize == null) {
+      return fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
+          IO_FILE_BUFFER_SIZE_DEFAULT);
+    }
+    return bufferSize;
+  }
+
+  public FSDataOutputStreamBuilder setBufferSize(int bufSize) {
+    bufferSize = bufSize;
+    return this;
+  }
+
+  protected short getReplication() {
+    if (replication == null) {
+      return fs.getDefaultReplication(getPath());
+    }
+    return replication;
+  }
+
+  public FSDataOutputStreamBuilder setReplication(short replica) {
+    replication = replica;
+    return this;
+  }
+
+  protected long getBlockSize() {
+    if (blockSize == null) {
+      return fs.getDefaultBlockSize(getPath());
+    }
+    return blockSize;
+  }
+
+  public FSDataOutputStreamBuilder setBlockSize(long blkSize) {
+    blockSize = blkSize;
+    return this;
+  }
+
+  protected Progressable getProgress() {
+    return progress;
+  }
+
+  public FSDataOutputStreamBuilder setProgress(final Progressable prog) {
+    Preconditions.checkNotNull(prog);
+    progress = prog;
+    return this;
+  }
+
+  protected EnumSet<CreateFlag> getFlags() {
+    if (flags == null) {
+      return EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
+    }
+    return flags;
+  }
+
+  public FSDataOutputStreamBuilder setFlags(
+      final EnumSet<CreateFlag> enumFlags) {
+    Preconditions.checkNotNull(enumFlags);
+    flags = enumFlags;
+    return this;
+  }
+
+  protected ChecksumOpt getChecksumOpt() {
+    return checksumOpt;
+  }
+
+  public FSDataOutputStreamBuilder setChecksumOpt(
+      final ChecksumOpt chksumOpt) {
+    Preconditions.checkNotNull(chksumOpt);
+    checksumOpt = chksumOpt;
+    return this;
+  }
+
+  public FSDataOutputStream build() throws IOException {
+    return fs.create(getPath(), getPermission(), getFlags(), getBufferSize(),
+        getReplication(), getBlockSize(), getProgress(), getChecksumOpt());
+  }
+}
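
For reviewers, a minimal usage sketch of the new builder (the path, the
Configuration, and the chosen values are illustrative, not part of the
patch). Fields left unset fall back to the file system defaults through
the protected getters above:

    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Inside a method that throws IOException:
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path("/tmp/builder-example.txt");  // hypothetical path

    FSDataOutputStream out = fs.newFSDataOutputStreamBuilder(path)
        .setReplication((short) 2)        // unset: fs.getDefaultReplication(path)
        .setBlockSize(128 * 1024 * 1024)  // unset: fs.getDefaultBlockSize(path)
        .build();                         // delegates to fs.create(...)
    try {
      out.write("hello".getBytes(StandardCharsets.UTF_8));
    } finally {
      out.close();
    }

Note that the setters return the base FSDataOutputStreamBuilder type, so
subclass-specific setters (such as setFavoredNodes further down) have to
come before any base setter in a chained call.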

http://git-wip-us.apache.org/repos/asf/hadoop/blob/332a997e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index ededfa9..c3282e5 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -4138,4 +4138,13 @@ public abstract class FileSystem extends Configured implements Closeable {
   public static GlobalStorageStatistics getGlobalStorageStatistics() {
     return GlobalStorageStatistics.INSTANCE;
   }
+
+  /**
+   * Create a new FSDataOutputStreamBuilder for the given path.
+   * @param path file path
+   * @return an FSDataOutputStreamBuilder object to build the file
+   */
+  public FSDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) {
+    return new FSDataOutputStreamBuilder(this, path);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/332a997e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
index 41429ac..ef09458 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
@@ -665,4 +665,9 @@ public class FilterFileSystem extends FileSystem {
   public Collection<FileStatus> getTrashRoots(boolean allUsers) {
     return fs.getTrashRoots(allUsers);
   }
+
+  @Override
+  public FSDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) {
+    return fs.newFSDataOutputStreamBuilder(path);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/332a997e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java
index ce1cf45..7e50ab1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java
@@ -1268,4 +1268,9 @@ public class HarFileSystem extends FileSystem {
   public short getDefaultReplication(Path f) {
     return fs.getDefaultReplication(f);
   }
+
+  @Override
+  public FSDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) {
+    return fs.newFSDataOutputStreamBuilder(path);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/332a997e/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java
index 2311337..5da5a4a 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java
@@ -19,10 +19,13 @@ package org.apache.hadoop.fs;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.StringUtils;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
 import static org.apache.hadoop.fs.FileSystemTestHelper.*;
 
 import java.io.*;
@@ -636,4 +639,55 @@ public class TestLocalFileSystem {
     FileStatus[] stats = fs.listStatus(path);
     assertTrue(stats != null && stats.length == 1 && stats[0] == stat);
   }
+
+  @Test
+  public void testFSOutputStreamBuilder() throws Exception {
+    Path path = new Path(TEST_ROOT_DIR, "testBuilder");
+
+    try {
+      FSDataOutputStreamBuilder builder =
+          fileSys.newFSDataOutputStreamBuilder(path);
+      FSDataOutputStream out = builder.build();
+      String content = "Create with a generic type of createBuilder!";
+      byte[] contentOrigin = content.getBytes("UTF8");
+      out.write(contentOrigin);
+      out.close();
+
+      FSDataInputStream input = fileSys.open(path);
+      byte[] buffer =
+          new byte[(int) (fileSys.getFileStatus(path).getLen())];
+      input.readFully(0, buffer);
+      input.close();
+      Assert.assertArrayEquals("The data read should equal the "
+          + "data written.", contentOrigin, buffer);
+    } catch (IOException e) {
+      throw e;
+    }
+
+    // Test value not being set for replication, block size, buffer size
+    // and permission
+    FSDataOutputStreamBuilder builder =
+        fileSys.newFSDataOutputStreamBuilder(path);
+    builder.build();
+    Assert.assertEquals("Should be default block size",
+        builder.getBlockSize(), fileSys.getDefaultBlockSize());
+    Assert.assertEquals("Should be default replication factor",
+        builder.getReplication(), fileSys.getDefaultReplication());
+    Assert.assertEquals("Should be default buffer size",
+        builder.getBufferSize(),
+        fileSys.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
+            IO_FILE_BUFFER_SIZE_DEFAULT));
+    Assert.assertEquals("Should be default permission",
+        builder.getPermission(), FsPermission.getFileDefault());
+
+    // Test set 0 to replication, block size and buffer size
+    builder = fileSys.newFSDataOutputStreamBuilder(path);
+    builder.setBufferSize(0).setBlockSize(0).setReplication((short) 0);
+    Assert.assertEquals("Block size should be 0",
+        builder.getBlockSize(), 0);
+    Assert.assertEquals("Replication factor should be 0",
+        builder.getReplication(), 0);
+    Assert.assertEquals("Buffer size should be 0",
+        builder.getBufferSize(), 0);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/332a997e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 1eef560..5cd956c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
 import org.apache.hadoop.fs.FSLinkResolver;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileEncryptionInfo;
@@ -446,6 +447,48 @@ public class DistributedFileSystem extends FileSystem {
     }.resolve(this, absF);
   }
 
+  /**
+   * Same as
+   * {@link #create(Path, FsPermission, EnumSet, int, short, long,
+   * Progressable, ChecksumOpt)} with the addition of favoredNodes, a hint
+   * to the namenode about where to place the file blocks.
+   * The favored nodes hint is not persisted in HDFS, so it may be honored
+   * only at creation time. With favored nodes, blocks are pinned on the
+   * datanodes to prevent the balancer from moving them, although HDFS may
+   * still move blocks away from the favored nodes during re-replication.
+   * A value of null means no favored nodes for this create.
+   */
+  private HdfsDataOutputStream create(final Path f,
+      final FsPermission permission, EnumSet<CreateFlag> flag,
+      final int bufferSize, final short replication, final long blockSize,
+      final Progressable progress, final ChecksumOpt checksumOpt,
+      final InetSocketAddress[] favoredNodes) throws IOException {
+    statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.CREATE);
+    Path absF = fixRelativePart(f);
+    return new FileSystemLinkResolver<HdfsDataOutputStream>() {
+      @Override
+      public HdfsDataOutputStream doCall(final Path p) throws IOException {
+        final DFSOutputStream out = dfs.create(getPathName(f), permission,
+            flag, true, replication, blockSize, progress, bufferSize,
+            checksumOpt, favoredNodes);
+        return dfs.createWrappedOutputStream(out, statistics);
+      }
+      @Override
+      public HdfsDataOutputStream next(final FileSystem fs, final Path p)
+          throws IOException {
+        if (fs instanceof DistributedFileSystem) {
+          DistributedFileSystem myDfs = (DistributedFileSystem)fs;
+          return myDfs.create(p, permission, flag, bufferSize, replication,
+              blockSize, progress, checksumOpt, favoredNodes);
+        }
+        throw new UnsupportedOperationException("Cannot create with" +
+            " favoredNodes through a symlink to a non-DistributedFileSystem: "
+            + f + " -> " + p);
+      }
+    }.resolve(this, absF);
+  }
+
   @Override
   protected HdfsDataOutputStream primitiveCreate(Path f,
       FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
@@ -2584,4 +2627,42 @@ public class DistributedFileSystem extends FileSystem {
   DFSOpsCountStatistics getDFSOpsCountStatistics() {
     return storageStatistics;
   }
+
+  /**
+   * Extends FSDataOutputStreamBuilder to support special requirements
+   * of DistributedFileSystem.
+   */
+  public static class HdfsDataOutputStreamBuilder
+      extends FSDataOutputStreamBuilder {
+    private final DistributedFileSystem dfs;
+    private InetSocketAddress[] favoredNodes = null;
+
+    public HdfsDataOutputStreamBuilder(DistributedFileSystem dfs, Path path) {
+      super(dfs, path);
+      this.dfs = dfs;
+    }
+
+    protected InetSocketAddress[] getFavoredNodes() {
+      return favoredNodes;
+    }
+
+    public HdfsDataOutputStreamBuilder setFavoredNodes(
+        final InetSocketAddress[] nodes) {
+      Preconditions.checkNotNull(nodes);
+      favoredNodes = nodes.clone();
+      return this;
+    }
+
+    @Override
+    public HdfsDataOutputStream build() throws IOException {
+      return dfs.create(getPath(), getPermission(), getFlags(),
+          getBufferSize(), getReplication(), getBlockSize(),
+          getProgress(), getChecksumOpt(), getFavoredNodes());
+    }
+  }
+
+  @Override
+  public HdfsDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) {
+    return new HdfsDataOutputStreamBuilder(this, path);
+  }
 }
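
A hedged sketch of the HDFS-specific builder with favored nodes (host
names and the datanode port are illustrative). setFavoredNodes comes
first in the chain because the base-class setters return the base
FSDataOutputStreamBuilder type:

    import java.net.InetSocketAddress;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    // Inside a method that throws IOException:
    Configuration conf = new Configuration();
    DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
    InetSocketAddress[] favored = {
        new InetSocketAddress("dn1.example.com", 50010),  // hypothetical datanodes
        new InetSocketAddress("dn2.example.com", 50010)
    };

    FSDataOutputStream out = dfs.newFSDataOutputStreamBuilder(new Path("/tmp/favored"))
        .setFavoredNodes(favored)    // HDFS-specific; array is cloned defensively
        .setReplication((short) 3)
        .build();
    out.close();

As the javadoc above notes, the hint is best-effort: it is honored only
at creation time, and blocks may later be moved off the favored nodes.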

http://git-wip-us.apache.org/repos/asf/hadoop/blob/332a997e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
index 7654531..e9af594 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
@@ -69,6 +69,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.StorageStatistics.LongStatistic;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
@@ -81,7 +82,6 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.Op;
 import org.apache.hadoop.hdfs.web.WebHdfsConstants;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetUtils;
@@ -1410,4 +1410,37 @@ public class TestDistributedFileSystem {
       }
     }
   }
+
+  @Test
+  public void testDFSDataOutputStreamBuilder() throws Exception {
+    Configuration conf = getTestConfiguration();
+    MiniDFSCluster cluster = null;
+    String testFile = "/testDFSDataOutputStreamBuilder";
+    Path testFilePath = new Path(testFile);
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      DistributedFileSystem fs = cluster.getFileSystem();
+
+      // Test create an empty file
+      FSDataOutputStream out =
+          fs.newFSDataOutputStreamBuilder(testFilePath).build();
+      out.close();
+
+      // Test create a file with content, and verify the content
+      String content = "This is a test!";
+      out = fs.newFSDataOutputStreamBuilder(testFilePath)
+          .setBufferSize(4096).setReplication((short) 1)
+          .setBlockSize(4096).build();
+      byte[] contentOrigin = content.getBytes("UTF8");
+      out.write(contentOrigin);
+      out.close();
+
+      ContractTestUtils.verifyFileContents(fs, testFilePath,
+          content.getBytes());
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/332a997e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java
index b78b6cc..50e56cc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java
@@ -189,6 +189,29 @@ public class TestFavoredNodesEndToEnd {
     }
   }
 
+  @Test(timeout = 180000)
+  public void testCreateStreamBuilderFavoredNodesEndToEnd() throws Exception {
+    //create 10 files with random preferred nodes
+    for (int i = 0; i < NUM_FILES; i++) {
+      Random rand = new Random(System.currentTimeMillis() + i);
+      //pass a newly created rand to get a uniform distribution each time
+      //without too many collisions (see the do-while loop in getDatanodes)
+      InetSocketAddress[] dns = getDatanodes(rand);
+      Path p = new Path("/filename"+i);
+      FSDataOutputStream out =
+          dfs.newFSDataOutputStreamBuilder(p).setFavoredNodes(dns).build();
+      out.write(SOME_BYTES);
+      out.close();
+      BlockLocation[] locations = getBlockLocations(p);
+      //verify the files got created in the right nodes
+      for (BlockLocation loc : locations) {
+        String[] hosts = loc.getNames();
+        String[] hosts1 = getStringForInetSocketAddrs(dns);
+        assertTrue(compareNodes(hosts, hosts1));
+      }
+    }
+  }
+
   private BlockLocation[] getBlockLocations(Path p) throws Exception {
     DFSTestUtil.waitReplication(dfs, p, (short)3);
     BlockLocation[] locations = dfs.getClient().getBlockLocations(

