HDFS-3702. Add an option for NOT writing the blocks locally if there is a datanode on the same box as the client. (Contributed by Lei (Eddy) Xu)
(cherry picked from commit 0a152103f19a3e8e1b7f33aeb9dd115ba231d7b7) (cherry picked from commit 4289cb8b36bcb96510b9e63e3e966e306c6e3893) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5477c31c Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5477c31c Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5477c31c Branch: refs/heads/branch-2.8 Commit: 5477c31c5c02bf51da27051e2f4e43edd0c8e5ee Parents: ef9e7a1 Author: Lei Xu <[email protected]> Authored: Wed Apr 27 14:22:51 2016 -0700 Committer: Lei Xu <[email protected]> Committed: Mon May 2 10:21:59 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/hadoop/fs/CreateFlag.java | 9 ++- .../org/apache/hadoop/hdfs/AddBlockFlag.java | 59 +++++++++++++++++++ .../org/apache/hadoop/hdfs/DFSOutputStream.java | 19 ++++-- .../org/apache/hadoop/hdfs/DataStreamer.java | 18 ++++-- .../hadoop/hdfs/protocol/ClientProtocol.java | 5 +- .../ClientNamenodeProtocolTranslatorPB.java | 8 ++- .../hadoop/hdfs/protocolPB/PBHelperClient.java | 27 +++++++++ .../src/main/proto/ClientNamenodeProtocol.proto | 5 ++ ...tNamenodeProtocolServerSideTranslatorPB.java | 6 +- .../server/blockmanagement/BlockManager.java | 13 ++-- .../blockmanagement/BlockPlacementPolicy.java | 11 +++- .../BlockPlacementPolicyDefault.java | 51 ++++++++++++---- .../server/blockmanagement/ReplicationWork.java | 2 +- .../hdfs/server/namenode/FSDirWriteFileOp.java | 9 +-- .../hdfs/server/namenode/FSNamesystem.java | 6 +- .../hdfs/server/namenode/NameNodeRpcServer.java | 7 ++- .../org/apache/hadoop/hdfs/DFSTestUtil.java | 2 +- .../hadoop/hdfs/TestBlockStoragePolicy.java | 6 +- .../hadoop/hdfs/TestDFSClientRetries.java | 12 +++- .../apache/hadoop/hdfs/TestDFSOutputStream.java | 62 ++++++++++++++++++-- .../apache/hadoop/hdfs/TestFileCreation.java | 4 +- .../BaseReplicationPolicyTest.java | 2 +- .../TestAvailableSpaceBlockPlacementPolicy.java | 2 
+- .../blockmanagement/TestReplicationPolicy.java | 47 +++++++++++++-- .../TestReplicationPolicyConsiderLoad.java | 4 +- .../TestReplicationPolicyWithNodeGroup.java | 8 ++- .../TestReplicationPolicyWithUpgradeDomain.java | 2 +- .../server/namenode/NNThroughputBenchmark.java | 2 +- .../hdfs/server/namenode/TestAddBlockRetry.java | 8 +-- ...stBlockPlacementPolicyRackFaultTolerant.java | 4 +- .../hdfs/server/namenode/TestDeadDatanode.java | 3 +- .../TestDefaultBlockPlacementPolicy.java | 2 +- .../hdfs/server/namenode/TestDeleteRace.java | 7 ++- .../hdfs/server/namenode/ha/TestHASafeMode.java | 2 +- .../snapshot/TestOpenFilesWithSnapshot.java | 2 +- .../snapshot/TestSnapshotBlocksMap.java | 6 +- 36 files changed, 358 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/5477c31c/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java index 539b511..d480fc9 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java @@ -103,7 +103,14 @@ public enum CreateFlag { * Append data to a new block instead of the end of the last partial block. * This is only useful for APPEND. */ - NEW_BLOCK((short) 0x20); + NEW_BLOCK((short) 0x20), + + /** + * Advise that a block replica NOT be written to the local DataNode where + * 'local' means the same host as the client is being run on. 
+ */ + @InterfaceAudience.LimitedPrivate({"HBase"}) + NO_LOCAL_WRITE((short) 0x40); private final short mode; http://git-wip-us.apache.org/repos/asf/hadoop/blob/5477c31c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AddBlockFlag.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AddBlockFlag.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AddBlockFlag.java new file mode 100644 index 0000000..6a0805b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AddBlockFlag.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.CreateFlag; + +/** + * AddBlockFlag provides hints for new block allocation and placement. + * Users can use this flag to control <em>per DFSOutputStream</em> + * {@see ClientProtocol#addBlock()} behavior. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public enum AddBlockFlag { + + /** + * Advise that a block replica NOT be written to the local DataNode where + * 'local' means the same host as the client is being run on. + * + * @see CreateFlag#NO_LOCAL_WRITE + */ + NO_LOCAL_WRITE((short) 0x01); + + private final short mode; + + AddBlockFlag(short mode) { + this.mode = mode; + } + + public static AddBlockFlag valueOf(short mode) { + for (AddBlockFlag flag : AddBlockFlag.values()) { + if (flag.getMode() == mode) { + return flag; + } + } + return null; + } + + public short getMode() { + return mode; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5477c31c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 18509f8..472c41f 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -116,6 +116,7 @@ public class DFSOutputStream extends FSOutputSummer private long initialFileSize = 0; // at time of file open private final short blockReplication; // replication factor of file protected boolean shouldSyncBlock = false; // force blocks to disk upon close + private final EnumSet<AddBlockFlag> addBlockFlags; protected final AtomicReference<CachingStrategy> cachingStrategy; private FileEncryptionInfo fileEncryptionInfo; @@ -178,6 +179,7 @@ public class DFSOutputStream extends FSOutputSummer } private DFSOutputStream(DFSClient dfsClient, String src, + EnumSet<CreateFlag> flag, Progressable progress, HdfsFileStatus stat, DataChecksum checksum) { super(getChecksum4Compute(checksum, stat)); 
this.dfsClient = dfsClient; @@ -188,6 +190,10 @@ public class DFSOutputStream extends FSOutputSummer this.fileEncryptionInfo = stat.getFileEncryptionInfo(); this.cachingStrategy = new AtomicReference<>( dfsClient.getDefaultWriteCachingStrategy()); + this.addBlockFlags = EnumSet.noneOf(AddBlockFlag.class); + if (flag.contains(CreateFlag.NO_LOCAL_WRITE)) { + this.addBlockFlags.add(AddBlockFlag.NO_LOCAL_WRITE); + } if (progress != null) { DFSClient.LOG.debug("Set non-null progress callback on DFSOutputStream " +"{}", src); @@ -211,14 +217,14 @@ public class DFSOutputStream extends FSOutputSummer protected DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat, EnumSet<CreateFlag> flag, Progressable progress, DataChecksum checksum, String[] favoredNodes) throws IOException { - this(dfsClient, src, progress, stat, checksum); + this(dfsClient, src, flag, progress, stat, checksum); this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK); computePacketChunkSize(dfsClient.getConf().getWritePacketSize(), bytesPerChecksum); streamer = new DataStreamer(stat, null, dfsClient, src, progress, checksum, - cachingStrategy, byteArrayManager, favoredNodes); + cachingStrategy, byteArrayManager, favoredNodes, addBlockFlags); } static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, @@ -280,7 +286,7 @@ public class DFSOutputStream extends FSOutputSummer EnumSet<CreateFlag> flags, Progressable progress, LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum, String[] favoredNodes) throws IOException { - this(dfsClient, src, progress, stat, checksum); + this(dfsClient, src, flags, progress, stat, checksum); initialFileSize = stat.getLen(); // length of file when opened this.shouldSyncBlock = flags.contains(CreateFlag.SYNC_BLOCK); @@ -301,7 +307,8 @@ public class DFSOutputStream extends FSOutputSummer bytesPerChecksum); streamer = new DataStreamer(stat, lastBlock != null ? 
lastBlock.getBlock() : null, dfsClient, src, - progress, checksum, cachingStrategy, byteArrayManager, favoredNodes); + progress, checksum, cachingStrategy, byteArrayManager, favoredNodes, + addBlockFlags); } } @@ -848,6 +855,10 @@ public class DFSOutputStream extends FSOutputSummer return initialFileSize; } + protected EnumSet<AddBlockFlag> getAddBlockFlags() { + return addBlockFlags; + } + /** * @return the FileEncryptionInfo for this stream, or null if not encrypted. */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/5477c31c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index 9ccb89a..0d8cc66 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -33,6 +33,7 @@ import java.net.Socket; import java.nio.channels.ClosedChannelException; import java.util.ArrayList; import java.util.Arrays; +import java.util.EnumSet; import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -392,12 +393,15 @@ class DataStreamer extends Daemon { private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes; private final String[] favoredNodes; + private final EnumSet<AddBlockFlag> addBlockFlags; private DataStreamer(HdfsFileStatus stat, DFSClient dfsClient, String src, Progressable progress, DataChecksum checksum, AtomicReference<CachingStrategy> cachingStrategy, ByteArrayManager byteArrayManage, - boolean isAppend, String[] favoredNodes) { + boolean isAppend, String[] favoredNodes, + EnumSet<AddBlockFlag> flags) { + this.block = block; this.dfsClient = dfsClient; this.src = src; 
this.progress = progress; @@ -408,11 +412,11 @@ class DataStreamer extends Daemon { this.isLazyPersistFile = isLazyPersist(stat); this.isAppend = isAppend; this.favoredNodes = favoredNodes; - final DfsClientConf conf = dfsClient.getConf(); this.dfsclientSlowLogThresholdMs = conf.getSlowIoWarningThresholdMs(); this.excludedNodes = initExcludedNodes(conf.getExcludedNodesCacheExpiry()); this.errorState = new ErrorState(conf.getDatanodeRestartTimeout()); + this.addBlockFlags = flags; } /** @@ -421,9 +425,10 @@ class DataStreamer extends Daemon { DataStreamer(HdfsFileStatus stat, ExtendedBlock block, DFSClient dfsClient, String src, Progressable progress, DataChecksum checksum, AtomicReference<CachingStrategy> cachingStrategy, - ByteArrayManager byteArrayManage, String[] favoredNodes) { + ByteArrayManager byteArrayManage, String[] favoredNodes, + EnumSet<AddBlockFlag> flags) { this(stat, dfsClient, src, progress, checksum, cachingStrategy, - byteArrayManage, false, favoredNodes); + byteArrayManage, false, favoredNodes, flags); this.block = block; stage = BlockConstructionStage.PIPELINE_SETUP_CREATE; } @@ -438,7 +443,7 @@ class DataStreamer extends Daemon { AtomicReference<CachingStrategy> cachingStrategy, ByteArrayManager byteArrayManage) throws IOException { this(stat, dfsClient, src, progress, checksum, cachingStrategy, - byteArrayManage, true, null); + byteArrayManage, true, null, null); stage = BlockConstructionStage.PIPELINE_SETUP_APPEND; block = lastBlock.getBlock(); bytesSent = block.getNumBytes(); @@ -1643,7 +1648,8 @@ class DataStreamer extends Daemon { while (true) { try { return dfsClient.namenode.addBlock(src, dfsClient.clientName, - block, excludedNodes, stat.getFileId(), favoredNodes); + block, excludedNodes, stat.getFileId(), favoredNodes, + addBlockFlags); } catch (RemoteException e) { IOException ue = e.unwrapRemoteException(FileNotFoundException.class, 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5477c31c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index 5fa8025..01660db 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -25,6 +25,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.crypto.CryptoProtocolVersion; import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; +import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.fs.CacheFlag; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.CreateFlag; @@ -377,6 +378,8 @@ public interface ClientProtocol { * @param fileId the id uniquely identifying a file * @param favoredNodes the list of nodes where the client wants the blocks. * Nodes are identified by either host name or address. + * @param addBlockFlags flags to advise the behavior of allocating and placing + * a new block. * * @return LocatedBlock allocated block information. 
* @@ -395,7 +398,7 @@ public interface ClientProtocol { @Idempotent LocatedBlock addBlock(String src, String clientName, ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, - String[] favoredNodes) + String[] favoredNodes, EnumSet<AddBlockFlag> addBlockFlags) throws IOException; /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/5477c31c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index c94d515..6aeed28 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -42,6 +42,7 @@ import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.hdfs.inotify.EventBatchList; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; @@ -384,7 +385,8 @@ public class ClientNamenodeProtocolTranslatorPB implements @Override public LocatedBlock addBlock(String src, String clientName, ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, - String[] favoredNodes) throws IOException { + String[] favoredNodes, EnumSet<AddBlockFlag> addBlockFlags) + throws IOException { AddBlockRequestProto.Builder req = AddBlockRequestProto.newBuilder() 
.setSrc(src).setClientName(clientName).setFileId(fileId); if (previous != null) @@ -394,6 +396,10 @@ public class ClientNamenodeProtocolTranslatorPB implements if (favoredNodes != null) { req.addAllFavoredNodes(Arrays.asList(favoredNodes)); } + if (addBlockFlags != null) { + req.addAllFlags(PBHelperClient.convertAddBlockFlags( + addBlockFlags)); + } try { return PBHelperClient.convert(rpcProxy.addBlock(null, req.build()).getBlock()); } catch (ServiceException e) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/5477c31c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java index 5d81c1a..80f0d4f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java @@ -36,6 +36,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import org.apache.hadoop.crypto.CipherOption; import org.apache.hadoop.crypto.CipherSuite; import org.apache.hadoop.crypto.CryptoProtocolVersion; +import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.fs.CacheFlag; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.CreateFlag; @@ -94,6 +95,7 @@ import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEntryProto.AclEntryTyp import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEntryProto.FsActionProto; import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclStatusProto; import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusResponseProto; +import 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockFlagProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveEntryProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoExpirationProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto; @@ -2394,4 +2396,29 @@ public class PBHelperClient { } return Arrays.asList(ret); } + + public static EnumSet<AddBlockFlag> convertAddBlockFlags( + List<AddBlockFlagProto> addBlockFlags) { + EnumSet<AddBlockFlag> flags = + EnumSet.noneOf(AddBlockFlag.class); + for (AddBlockFlagProto af : addBlockFlags) { + AddBlockFlag flag = AddBlockFlag.valueOf((short)af.getNumber()); + if (flag != null) { + flags.add(flag); + } + } + return flags; + } + + public static List<AddBlockFlagProto> convertAddBlockFlags( + EnumSet<AddBlockFlag> flags) { + List<AddBlockFlagProto> ret = new ArrayList<>(); + for (AddBlockFlag flag : flags) { + AddBlockFlagProto abfp = AddBlockFlagProto.valueOf(flag.getMode()); + if (abfp != null) { + ret.add(abfp); + } + } + return ret; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5477c31c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto index 9925d1d..67365c3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto @@ -154,6 +154,10 @@ message AbandonBlockRequestProto { message AbandonBlockResponseProto { // void response } +enum AddBlockFlagProto { + NO_LOCAL_WRITE = 1; // avoid writing to local node. 
+} + message AddBlockRequestProto { required string src = 1; required string clientName = 2; @@ -161,6 +165,7 @@ message AddBlockRequestProto { repeated DatanodeInfoProto excludeNodes = 4; optional uint64 fileId = 5 [default = 0]; // default as a bogus id repeated string favoredNodes = 6; //the set of datanodes to use for the block + repeated AddBlockFlagProto flags = 7; // default to empty. } message AddBlockResponseProto { http://git-wip-us.apache.org/repos/asf/hadoop/blob/5477c31c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index a88307f..ca16c12 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; +import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FsServerDefaults; @@ -495,6 +496,8 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements try { List<DatanodeInfoProto> excl = req.getExcludeNodesList(); List<String> favor = req.getFavoredNodesList(); + EnumSet<AddBlockFlag> flags = + PBHelperClient.convertAddBlockFlags(req.getFlagsList()); LocatedBlock result = server.addBlock( 
req.getSrc(), req.getClientName(), @@ -502,7 +505,8 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements (excl == null || excl.size() == 0) ? null : PBHelperClient.convert(excl .toArray(new DatanodeInfoProto[excl.size()])), req.getFileId(), (favor == null || favor.size() == 0) ? null : favor - .toArray(new String[favor.size()])); + .toArray(new String[favor.size()]), + flags); return AddBlockResponseProto.newBuilder() .setBlock(PBHelperClient.convert(result)).build(); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/5477c31c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 7bb2edc..d02e9ea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -47,6 +47,8 @@ import javax.management.ObjectName; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.AddBlockFlag; +import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -1624,7 +1626,7 @@ public class BlockManager implements BlockStatsMXBean { DatanodeDescriptor clientnode, Set<Node> excludes, long blocksize) { return blockplacement.chooseTarget(src, 1, clientnode, Collections.<DatanodeStorageInfo>emptyList(), false, excludes, - blocksize, 
storagePolicySuite.getDefaultPolicy()); + blocksize, storagePolicySuite.getDefaultPolicy(), null); } /** Choose target for getting additional datanodes for an existing pipeline. */ @@ -1638,7 +1640,7 @@ public class BlockManager implements BlockStatsMXBean { final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(storagePolicyID); return blockplacement.chooseTarget(src, numAdditionalNodes, clientnode, - chosen, true, excludes, blocksize, storagePolicy); + chosen, true, excludes, blocksize, storagePolicy, null); } /** @@ -1654,13 +1656,14 @@ public class BlockManager implements BlockStatsMXBean { final Set<Node> excludedNodes, final long blocksize, final List<String> favoredNodes, - final byte storagePolicyID) throws IOException { - List<DatanodeDescriptor> favoredDatanodeDescriptors = + final byte storagePolicyID, + final EnumSet<AddBlockFlag> flags) throws IOException { + List<DatanodeDescriptor> favoredDatanodeDescriptors = getDatanodeDescriptors(favoredNodes); final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(storagePolicyID); final DatanodeStorageInfo[] targets = blockplacement.chooseTarget(src, numOfReplicas, client, excludedNodes, blocksize, - favoredDatanodeDescriptors, storagePolicy); + favoredDatanodeDescriptors, storagePolicy, flags); if (targets.length < minReplication) { throw new IOException("File " + src + " could only be replicated to " + targets.length + " nodes instead of minReplication (=" http://git-wip-us.apache.org/repos/asf/hadoop/blob/5477c31c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java index d35b246..1b614f4 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import java.util.ArrayList; import java.util.Collection; +import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -26,6 +27,7 @@ import java.util.Set; import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -66,6 +68,7 @@ public abstract class BlockPlacementPolicy { * @param returnChosenNodes decide if the chosenNodes are returned. * @param excludedNodes datanodes that should not be considered as targets. * @param blocksize size of the data to be written. + * @param flags Block placement flags. * @return array of DatanodeDescriptor instances chosen as target * and sorted as a pipeline. */ @@ -76,7 +79,8 @@ public abstract class BlockPlacementPolicy { boolean returnChosenNodes, Set<Node> excludedNodes, long blocksize, - BlockStoragePolicy storagePolicy); + BlockStoragePolicy storagePolicy, + EnumSet<AddBlockFlag> flags); /** * Same as {@link #chooseTarget(String, int, Node, Set, long, List, StorageType)} @@ -90,14 +94,15 @@ public abstract class BlockPlacementPolicy { Set<Node> excludedNodes, long blocksize, List<DatanodeDescriptor> favoredNodes, - BlockStoragePolicy storagePolicy) { + BlockStoragePolicy storagePolicy, + EnumSet<AddBlockFlag> flags) { // This class does not provide the functionality of placing // a block in favored datanodes. 
The implementations of this class // are expected to provide this functionality return chooseTarget(src, numOfReplicas, writer, new ArrayList<DatanodeStorageInfo>(numOfReplicas), false, - excludedNodes, blocksize, storagePolicy); + excludedNodes, blocksize, storagePolicy, flags); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/5477c31c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java index 63e96c5..e936abd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java @@ -23,6 +23,7 @@ import java.util.*; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -111,9 +112,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { boolean returnChosenNodes, Set<Node> excludedNodes, long blocksize, - final BlockStoragePolicy storagePolicy) { + final BlockStoragePolicy storagePolicy, + EnumSet<AddBlockFlag> flags) { return chooseTarget(numOfReplicas, writer, chosenNodes, returnChosenNodes, - excludedNodes, blocksize, storagePolicy); + excludedNodes, blocksize, storagePolicy, flags); } @Override @@ -123,13 +125,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { Set<Node> 
excludedNodes, long blocksize, List<DatanodeDescriptor> favoredNodes, - BlockStoragePolicy storagePolicy) { + BlockStoragePolicy storagePolicy, + EnumSet<AddBlockFlag> flags) { try { if (favoredNodes == null || favoredNodes.size() == 0) { // Favored nodes not specified, fall back to regular block placement. return chooseTarget(src, numOfReplicas, writer, new ArrayList<DatanodeStorageInfo>(numOfReplicas), false, - excludedNodes, blocksize, storagePolicy); + excludedNodes, blocksize, storagePolicy, flags); } Set<Node> favoriteAndExcludedNodes = excludedNodes == null ? @@ -164,7 +167,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { DatanodeStorageInfo[] remainingTargets = chooseTarget(src, numOfReplicas, writer, new ArrayList<DatanodeStorageInfo>(numOfReplicas), false, - favoriteAndExcludedNodes, blocksize, storagePolicy); + favoriteAndExcludedNodes, blocksize, storagePolicy, flags); for (int i = 0; i < remainingTargets.length; i++) { results.add(remainingTargets[i]); } @@ -179,7 +182,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { // Fall back to regular block placement disregarding favored nodes hint return chooseTarget(src, numOfReplicas, writer, new ArrayList<DatanodeStorageInfo>(numOfReplicas), false, - excludedNodes, blocksize, storagePolicy); + excludedNodes, blocksize, storagePolicy, flags); } } @@ -213,7 +216,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { boolean returnChosenNodes, Set<Node> excludedNodes, long blocksize, - final BlockStoragePolicy storagePolicy) { + final BlockStoragePolicy storagePolicy, + EnumSet<AddBlockFlag> addBlockFlags) { if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) { return DatanodeStorageInfo.EMPTY_ARRAY; } @@ -226,17 +230,42 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { numOfReplicas = result[0]; int maxNodesPerRack = result[1]; - final List<DatanodeStorageInfo> results = new ArrayList<>(chosenStorage); 
for (DatanodeStorageInfo storage : chosenStorage) { // add localMachine and related nodes to excludedNodes addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes); } + List<DatanodeStorageInfo> results = null; + Node localNode = null; boolean avoidStaleNodes = (stats != null && stats.isAvoidingStaleDataNodesForWrite()); - final Node localNode = chooseTarget(numOfReplicas, writer, excludedNodes, - blocksize, maxNodesPerRack, results, avoidStaleNodes, storagePolicy, - EnumSet.noneOf(StorageType.class), results.isEmpty()); + boolean avoidLocalNode = (addBlockFlags != null + && addBlockFlags.contains(AddBlockFlag.NO_LOCAL_WRITE) + && writer != null + && !excludedNodes.contains(writer)); + // Attempt to exclude local node if the client suggests so. If no enough + // nodes can be obtained, it falls back to the default block placement + // policy. + if (avoidLocalNode) { + results = new ArrayList<>(chosenStorage); + Set<Node> excludedNodeCopy = new HashSet<>(excludedNodes); + excludedNodeCopy.add(writer); + localNode = chooseTarget(numOfReplicas, writer, + excludedNodeCopy, blocksize, maxNodesPerRack, results, + avoidStaleNodes, storagePolicy, + EnumSet.noneOf(StorageType.class), results.isEmpty()); + if (results.size() < numOfReplicas) { + // not enough nodes; discard results and fall back + results = null; + } + } + if (results == null) { + results = new ArrayList<>(chosenStorage); + localNode = chooseTarget(numOfReplicas, writer, excludedNodes, + blocksize, maxNodesPerRack, results, avoidStaleNodes, + storagePolicy, EnumSet.noneOf(StorageType.class), results.isEmpty()); + } + if (!returnChosenNodes) { results.removeAll(chosenStorage); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5477c31c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java index f8a6dad..258dfdd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java @@ -55,7 +55,7 @@ class ReplicationWork { targets = blockplacement.chooseTarget(bc.getName(), additionalReplRequired, srcNode, liveReplicaStorages, false, excludedNodes, block.getNumBytes(), - storagePolicySuite.getPolicy(bc.getStoragePolicyID())); + storagePolicySuite.getPolicy(bc.getStoragePolicyID()), null); } finally { srcNode.decrementPendingReplicationWithoutTargets(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5477c31c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java index 10cb555..683d3b5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java @@ -23,6 +23,7 @@ import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.crypto.CipherSuite; import org.apache.hadoop.crypto.CryptoProtocolVersion; import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; +import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FileAlreadyExistsException; import 
org.apache.hadoop.fs.FileEncryptionInfo; @@ -265,8 +266,9 @@ class FSDirWriteFileOp { } static DatanodeStorageInfo[] chooseTargetForNewBlock( - BlockManager bm, String src, DatanodeInfo[] excludedNodes, String[] - favoredNodes, ValidateAddBlockResult r) throws IOException { + BlockManager bm, String src, DatanodeInfo[] excludedNodes, + String[] favoredNodes, EnumSet<AddBlockFlag> flags, + ValidateAddBlockResult r) throws IOException { Node clientNode = bm.getDatanodeManager() .getDatanodeByHost(r.clientMachine); if (clientNode == null) { @@ -280,11 +282,10 @@ class FSDirWriteFileOp { } List<String> favoredNodesList = (favoredNodes == null) ? null : Arrays.asList(favoredNodes); - // choose targets for the new block to be allocated. return bm.chooseTarget4NewBlock(src, r.replication, clientNode, excludedNodesSet, r.blockSize, - favoredNodesList, r.storagePolicyID); + favoredNodesList, r.storagePolicyID, flags); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/5477c31c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 31eed7e..88ff62e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -147,6 +147,7 @@ import org.apache.hadoop.crypto.key.KeyProvider; import org.apache.hadoop.crypto.CryptoCodec; import org.apache.hadoop.crypto.key.KeyProvider.Metadata; import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; +import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries; import 
org.apache.hadoop.fs.CacheFlag; import org.apache.hadoop.fs.ContentSummary; @@ -2424,7 +2425,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, */ LocatedBlock getAdditionalBlock( String src, long fileId, String clientName, ExtendedBlock previous, - DatanodeInfo[] excludedNodes, String[] favoredNodes) throws IOException { + DatanodeInfo[] excludedNodes, String[] favoredNodes, + EnumSet<AddBlockFlag> flags) throws IOException { NameNode.stateChangeLog.debug("BLOCK* getAdditionalBlock: {} inodeId {}" + " for {}", src, fileId, clientName); @@ -2449,7 +2451,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, } DatanodeStorageInfo[] targets = FSDirWriteFileOp.chooseTargetForNewBlock( - blockManager, src, excludedNodes, favoredNodes, r); + blockManager, src, excludedNodes, favoredNodes, flags, r); checkOperation(OperationCategory.WRITE); writeLock(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5477c31c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index ed34817..ed9b4e1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -47,6 +47,7 @@ import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.CryptoProtocolVersion; import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; +import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.fs.CacheFlag; import 
org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.ContentSummary; @@ -812,11 +813,11 @@ class NameNodeRpcServer implements NamenodeProtocols { @Override public LocatedBlock addBlock(String src, String clientName, ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId, - String[] favoredNodes) + String[] favoredNodes, EnumSet<AddBlockFlag> addBlockFlags) throws IOException { checkNNStartup(); LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId, - clientName, previous, excludedNodes, favoredNodes); + clientName, previous, excludedNodes, favoredNodes, addBlockFlags); if (locatedBlock != null) { metrics.incrAddBlockOps(); } @@ -1123,7 +1124,7 @@ class NameNodeRpcServer implements NamenodeProtocols { DatanodeInfo results[] = namesystem.datanodeReport(type); return results; } - + @Override // ClientProtocol public DatanodeStorageReport[] getDatanodeStorageReport( DatanodeReportType type) throws IOException { http://git-wip-us.apache.org/repos/asf/hadoop/blob/5477c31c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index d6dcf0c..781dcf4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -1929,7 +1929,7 @@ public class DFSTestUtil { String clientName, ExtendedBlock previous, int len) throws Exception { fs.getClient().namenode.addBlock(file, clientName, previous, null, - fileNode.getId(), null); + fileNode.getId(), null, null); final BlockInfo lastBlock = fileNode.getLastBlock(); final int groupSize = fileNode.getPreferredBlockReplication(); 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5477c31c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java index 53b8038..861bb2c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java @@ -1237,12 +1237,12 @@ public class TestBlockStoragePolicy { DatanodeStorageInfo[] targets = replicator.chooseTarget("/foo", 3, dataNodes[0], Collections.<DatanodeStorageInfo>emptyList(), false, - new HashSet<Node>(), 0, policy1); + new HashSet<Node>(), 0, policy1, null); System.out.println(Arrays.asList(targets)); Assert.assertEquals(3, targets.length); targets = replicator.chooseTarget("/foo", 3, dataNodes[0], Collections.<DatanodeStorageInfo>emptyList(), false, - new HashSet<Node>(), 0, policy2); + new HashSet<Node>(), 0, policy2, null); System.out.println(Arrays.asList(targets)); Assert.assertEquals(3, targets.length); } @@ -1308,7 +1308,7 @@ public class TestBlockStoragePolicy { DatanodeStorageInfo[] targets = replicator.chooseTarget("/foo", 3, dataNodes[0], Collections.<DatanodeStorageInfo>emptyList(), false, - new HashSet<Node>(), 0, policy); + new HashSet<Node>(), 0, policy, null); System.out.println(policy.getName() + ": " + Arrays.asList(targets)); Assert.assertEquals(2, targets.length); Assert.assertEquals(StorageType.SSD, targets[0].getStorageType()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5477c31c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java index d7b7b41..6325957 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java @@ -44,6 +44,7 @@ import java.net.URI; import java.security.MessageDigest; import java.util.ArrayList; import java.util.Arrays; +import java.util.EnumSet; import java.util.List; import java.util.Random; import java.util.concurrent.TimeUnit; @@ -96,6 +97,7 @@ import org.apache.log4j.Level; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Matchers; import org.mockito.Mockito; import org.mockito.internal.stubbing.answers.ThrowsException; import org.mockito.invocation.InvocationOnMock; @@ -252,7 +254,9 @@ public class TestDFSClientRetries { anyString(), any(ExtendedBlock.class), any(DatanodeInfo[].class), - anyLong(), any(String[].class))).thenAnswer(answer); + anyLong(), any(String[].class), + Matchers.<EnumSet<AddBlockFlag>>any())) + .thenAnswer(answer); Mockito.doReturn( new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission( @@ -471,7 +475,8 @@ public class TestDFSClientRetries { } }).when(spyNN).addBlock(Mockito.anyString(), Mockito.anyString(), Mockito.<ExtendedBlock> any(), Mockito.<DatanodeInfo[]> any(), - Mockito.anyLong(), Mockito.<String[]> any()); + Mockito.anyLong(), Mockito.<String[]> any(), + Mockito.<EnumSet<AddBlockFlag>> any()); doAnswer(new Answer<Boolean>() { @@ -513,7 +518,8 @@ public class TestDFSClientRetries { Mockito.verify(spyNN, Mockito.atLeastOnce()).addBlock( Mockito.anyString(), Mockito.anyString(), Mockito.<ExtendedBlock> any(), Mockito.<DatanodeInfo[]> any(), - Mockito.anyLong(), Mockito.<String[]> any()); + Mockito.anyLong(), Mockito.<String[]> any(), + 
Mockito.<EnumSet<AddBlockFlag>> any()); Mockito.verify(spyNN, Mockito.atLeastOnce()).complete( Mockito.anyString(), Mockito.anyString(), Mockito.<ExtendedBlock>any(), anyLong()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5477c31c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java index a404ac8..d9df1ff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java @@ -22,19 +22,29 @@ import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.util.ArrayList; +import java.util.EnumSet; import java.util.LinkedList; -import java.util.concurrent.atomic.AtomicReference; +import java.util.Map; +import java.util.Random; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FsTracer; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DataStreamer.LastExceptionInStreamer; import org.apache.hadoop.hdfs.client.impl.DfsClientConf; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import 
org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.htrace.core.SpanId; import org.junit.AfterClass; import org.junit.Assert; @@ -42,8 +52,11 @@ import org.junit.BeforeClass; import org.junit.Test; import org.mockito.internal.util.reflection.Whitebox; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; public class TestDFSOutputStream { @@ -52,7 +65,7 @@ public class TestDFSOutputStream { @BeforeClass public static void setup() throws IOException { Configuration conf = new Configuration(); - cluster = new MiniDFSCluster.Builder(conf).build(); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); } /** @@ -80,7 +93,7 @@ public class TestDFSOutputStream { try { dos.close(); } catch (IOException e) { - Assert.assertEquals(e, dummy); + assertEquals(e, dummy); } thrown = (Throwable) Whitebox.getInternalState(ex, "thrown"); Assert.assertNull(thrown); @@ -127,7 +140,7 @@ public class TestDFSOutputStream { mock(HdfsFileStatus.class), mock(ExtendedBlock.class), client, - "foo", null, null, null, null, null); + "foo", null, null, null, null, null, null); DataOutputStream blockStream = mock(DataOutputStream.class); doThrow(new IOException()).when(blockStream).flush(); @@ -148,6 +161,47 @@ public class TestDFSOutputStream { Assert.assertTrue(congestedNodes.isEmpty()); } + @Test + public void testNoLocalWriteFlag() throws IOException { + DistributedFileSystem fs = cluster.getFileSystem(); + EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.NO_LOCAL_WRITE, + CreateFlag.CREATE); + BlockManager bm = cluster.getNameNode().getNamesystem().getBlockManager(); + DatanodeManager dm = bm.getDatanodeManager(); 
+ try(FSDataOutputStream os = fs.create(new Path("/test-no-local"), + FsPermission.getDefault(), + flags, 512, (short)2, 512, null)) { + // Inject a DatanodeManager that returns one DataNode as local node for + // the client. + DatanodeManager spyDm = spy(dm); + DatanodeDescriptor dn1 = dm.getDatanodeListForReport + (HdfsConstants.DatanodeReportType.LIVE).get(0); + doReturn(dn1).when(spyDm).getDatanodeByHost("127.0.0.1"); + Whitebox.setInternalState(bm, "datanodeManager", spyDm); + byte[] buf = new byte[512 * 16]; + new Random().nextBytes(buf); + os.write(buf); + } finally { + Whitebox.setInternalState(bm, "datanodeManager", dm); + } + cluster.triggerBlockReports(); + final String bpid = cluster.getNamesystem().getBlockPoolId(); + // Total number of DataNodes is 3. + assertEquals(3, cluster.getAllBlockReports(bpid).size()); + int numDataNodesWithData = 0; + for (Map<DatanodeStorage, BlockListAsLongs> dnBlocks : + cluster.getAllBlockReports(bpid)) { + for (BlockListAsLongs blocks : dnBlocks.values()) { + if (blocks.getNumberOfBlocks() > 0) { + numDataNodesWithData++; + break; + } + } + } + // Verify that only one DN has no data. 
+ assertEquals(1, 3 - numDataNodesWithData); + } + @AfterClass public static void tearDown() { if (cluster != null) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/5477c31c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java index 05c98ac..e47c8b1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java @@ -531,7 +531,7 @@ public class TestFileCreation { // add one block to the file LocatedBlock location = client.getNamenode().addBlock(file1.toString(), - client.clientName, null, null, HdfsConstants.GRANDFATHER_INODE_ID, null); + client.clientName, null, null, HdfsConstants.GRANDFATHER_INODE_ID, null, null); System.out.println("testFileCreationError2: " + "Added block " + location.getBlock()); @@ -582,7 +582,7 @@ public class TestFileCreation { createFile(dfs, f, 3); try { cluster.getNameNodeRpc().addBlock(f.toString(), client.clientName, - null, null, HdfsConstants.GRANDFATHER_INODE_ID, null); + null, null, HdfsConstants.GRANDFATHER_INODE_ID, null, null); fail(); } catch(IOException ioe) { FileSystem.LOG.info("GOOD!", ioe); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5477c31c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java index 7dc52fa..99986e6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java @@ -156,6 +156,6 @@ abstract public class BaseReplicationPolicyTest { Set<Node> excludedNodes) { return replicator.chooseTarget(filename, numOfReplicas, writer, chosenNodes, false, excludedNodes, BLOCK_SIZE, - TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY); + TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/5477c31c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestAvailableSpaceBlockPlacementPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestAvailableSpaceBlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestAvailableSpaceBlockPlacementPolicy.java index f1e4e1c..a5090cc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestAvailableSpaceBlockPlacementPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestAvailableSpaceBlockPlacementPolicy.java @@ -142,7 +142,7 @@ public class TestAvailableSpaceBlockPlacementPolicy { .getBlockManager() .getBlockPlacementPolicy() .chooseTarget(file, replica, null, new ArrayList<DatanodeStorageInfo>(), false, null, - blockSize, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY); + blockSize, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null); Assert.assertTrue(targets.length == 
replica); for (int j = 0; j < replica; j++) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/5477c31c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java index 04fff9f..1d4a464 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -30,6 +31,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -38,6 +40,7 @@ import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -290,7 +293,8 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest { excludedNodes.add(dataNodes[1]); chosenNodes.add(storages[2]); targets = replicator.chooseTarget(filename, 1, dataNodes[0], chosenNodes, true, - excludedNodes, BLOCK_SIZE, 
TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY); + excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, + null); System.out.println("targets=" + Arrays.asList(targets)); assertEquals(2, targets.length); //make sure that the chosen node is in the target. @@ -667,7 +671,8 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest { .getNamesystem().getBlockManager().getBlockPlacementPolicy(); DatanodeStorageInfo[] targets = replicator.chooseTarget(filename, 3, staleNodeInfo, new ArrayList<DatanodeStorageInfo>(), false, null, - BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY); + BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, + null); assertEquals(targets.length, 3); assertFalse(isOnSameRack(targets[0], staleNodeInfo)); @@ -693,7 +698,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest { // Call chooseTarget targets = replicator.chooseTarget(filename, 3, staleNodeInfo, new ArrayList<DatanodeStorageInfo>(), false, null, BLOCK_SIZE, - TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY); + TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null); assertEquals(targets.length, 3); assertTrue(isOnSameRack(targets[0], staleNodeInfo)); @@ -1490,8 +1495,42 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest { private DatanodeStorageInfo[] chooseTarget(int numOfReplicas, DatanodeDescriptor writer, Set<Node> excludedNodes, List<DatanodeDescriptor> favoredNodes) { + return chooseTarget(numOfReplicas, writer, excludedNodes, + favoredNodes, null); + } + + private DatanodeStorageInfo[] chooseTarget(int numOfReplicas, + DatanodeDescriptor writer, Set<Node> excludedNodes, + List<DatanodeDescriptor> favoredNodes, EnumSet<AddBlockFlag> flags) { return replicator.chooseTarget(filename, numOfReplicas, writer, excludedNodes, BLOCK_SIZE, favoredNodes, - TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY); + TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, flags); + } + + @Test + public void testAvoidLocalWrite() 
throws IOException { + DatanodeDescriptor writer = dataNodes[2]; + EnumSet<AddBlockFlag> flags = EnumSet.of(AddBlockFlag.NO_LOCAL_WRITE); + DatanodeStorageInfo[] targets; + targets = chooseTarget(5, writer, null, null, flags); + for (DatanodeStorageInfo info : targets) { + assertNotEquals(info.getDatanodeDescriptor(), writer); + } + } + + @Test + public void testAvoidLocalWriteNoEnoughNodes() throws IOException { + DatanodeDescriptor writer = dataNodes[2]; + EnumSet<AddBlockFlag> flags = EnumSet.of(AddBlockFlag.NO_LOCAL_WRITE); + DatanodeStorageInfo[] targets; + targets = chooseTarget(6, writer, null, null, flags); + assertEquals(6, targets.length); + boolean found = false; + for (DatanodeStorageInfo info : targets) { + if (info.getDatanodeDescriptor().equals(writer)) { + found = true; + } + } + assertTrue(found); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5477c31c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java index 123e635..1992fcb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java @@ -112,7 +112,7 @@ public class TestReplicationPolicyConsiderLoad DatanodeStorageInfo[] targets = namenode.getNamesystem().getBlockManager() .getBlockPlacementPolicy().chooseTarget("testFile.txt", 3, writerDn, new ArrayList<DatanodeStorageInfo>(), false, null, - 1024, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY); + 1024, 
TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null); assertEquals(3, targets.length); Set<DatanodeStorageInfo> targetSet = new HashSet<>( @@ -170,7 +170,7 @@ public class TestReplicationPolicyConsiderLoad DatanodeStorageInfo[] targets = namenode.getNamesystem().getBlockManager() .getBlockPlacementPolicy().chooseTarget("testFile.txt", 3, writerDn, new ArrayList<DatanodeStorageInfo>(), false, null, - 1024, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY); + 1024, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null); for(DatanodeStorageInfo info : targets) { assertTrue("The node "+info.getDatanodeDescriptor().getName()+ " has higher load and should not have been picked!", http://git-wip-us.apache.org/repos/asf/hadoop/blob/5477c31c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java index edcab10..1fb46f9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java @@ -269,7 +269,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes List<DatanodeDescriptor> favoredNodes) { return replicator.chooseTarget(filename, numOfReplicas, writer, excludedNodes, BLOCK_SIZE, favoredNodes, - TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY); + TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null); } /** @@ -351,7 +351,8 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes Set<Node> 
excludedNodes = new HashSet<>(); excludedNodes.add(dataNodes[1]); targets = repl.chooseTarget(filename, 4, dataNodes[0], chosenNodes, false, - excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY); + excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, + null); assertEquals(targets.length, 4); assertEquals(storages[0], targets[0]); @@ -369,7 +370,8 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes excludedNodes.add(dataNodes[1]); chosenNodes.add(storages[2]); targets = repl.chooseTarget(filename, 1, dataNodes[0], chosenNodes, true, - excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY); + excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, + null); System.out.println("targets=" + Arrays.asList(targets)); assertEquals(2, targets.length); //make sure that the chosen node is in the target. http://git-wip-us.apache.org/repos/asf/hadoop/blob/5477c31c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java index c939220..c157ed1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java @@ -187,7 +187,7 @@ public class TestReplicationPolicyWithUpgradeDomain chosenNodes.add(storages[2]); targets = replicator.chooseTarget(filename, 1, dataNodes[0], chosenNodes, true, excludedNodes, BLOCK_SIZE, - 
TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY); + TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null); System.out.println("targets=" + Arrays.asList(targets)); assertEquals(2, targets.length); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5477c31c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java index c93fd89..6744571 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java @@ -1204,7 +1204,7 @@ public class NNThroughputBenchmark implements Tool { for (int i = 0; i < 30; i++) { try { return clientProto.addBlock(src, clientName, - previous, excludeNodes, fileId, favoredNodes); + previous, excludeNodes, fileId, favoredNodes, null); } catch (NotReplicatedYetException|RemoteException e) { if (e instanceof RemoteException) { String className = ((RemoteException) e).getClassName(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5477c31c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java index a8cd5b9..94abe3e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java @@ -101,13 +101,13 @@ public class TestAddBlockRetry { ns.readUnlock();; } DatanodeStorageInfo targets[] = FSDirWriteFileOp.chooseTargetForNewBlock( - ns.getBlockManager(), src, null, null, r); + ns.getBlockManager(), src, null, null, null, r); assertNotNull("Targets must be generated", targets); // run second addBlock() LOG.info("Starting second addBlock for " + src); nn.addBlock(src, "clientName", null, null, - HdfsConstants.GRANDFATHER_INODE_ID, null); + HdfsConstants.GRANDFATHER_INODE_ID, null, null); assertTrue("Penultimate block must be complete", checkFileProgress(src, false)); LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE); @@ -161,14 +161,14 @@ public class TestAddBlockRetry { // start first addBlock() LOG.info("Starting first addBlock for " + src); LocatedBlock lb1 = nameNodeRpc.addBlock(src, "clientName", null, null, - HdfsConstants.GRANDFATHER_INODE_ID, null); + HdfsConstants.GRANDFATHER_INODE_ID, null, null); assertTrue("Block locations should be present", lb1.getLocations().length > 0); cluster.restartNameNode(); nameNodeRpc = cluster.getNameNodeRpc(); LocatedBlock lb2 = nameNodeRpc.addBlock(src, "clientName", null, null, - HdfsConstants.GRANDFATHER_INODE_ID, null); + HdfsConstants.GRANDFATHER_INODE_ID, null, null); assertEquals("Blocks are not equal", lb1.getBlock(), lb2.getBlock()); assertTrue("Wrong locations with retry", lb2.getLocations().length > 0); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5477c31c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerant.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerant.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerant.java index 9f844d7..f40c464 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerant.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerant.java @@ -113,7 +113,7 @@ public class TestBlockPlacementPolicyRackFaultTolerant { //test chooseTarget for new file LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, - null, null, fileStatus.getFileId(), null); + null, null, fileStatus.getFileId(), null, null); doTestLocatedBlock(replication, locatedBlock); //test chooseTarget for existing file. @@ -143,7 +143,7 @@ public class TestBlockPlacementPolicyRackFaultTolerant { //test chooseTarget for new file LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, - null, null, fileStatus.getFileId(), null); + null, null, fileStatus.getFileId(), null, null); doTestLocatedBlock(20, locatedBlock); DatanodeInfo[] locs = locatedBlock.getLocations(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5477c31c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java index 442873d..1455dae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java @@ -165,7 +165,8 @@ public class TestDeadDatanode { // choose the targets, but local node should not get 
selected as this is not // part of the cluster anymore DatanodeStorageInfo[] results = bm.chooseTarget4NewBlock("/hello", 3, - clientNode, new HashSet<Node>(), 256 * 1024 * 1024L, null, (byte) 7); + clientNode, new HashSet<Node>(), 256 * 1024 * 1024L, null, (byte) 7, + null); for (DatanodeStorageInfo datanodeStorageInfo : results) { assertFalse("Dead node should not be choosen", datanodeStorageInfo .getDatanodeDescriptor().equals(clientNode)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5477c31c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java index a54040b..1a10b7a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java @@ -140,7 +140,7 @@ public class TestDefaultBlockPlacementPolicy { clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true, REPLICATION_FACTOR, DEFAULT_BLOCK_SIZE, null, false); LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, - null, null, fileStatus.getFileId(), null); + null, null, fileStatus.getFileId(), null, null); assertEquals("Block should be allocated sufficient locations", REPLICATION_FACTOR, locatedBlock.getLocations().length); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5477c31c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeleteRace.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeleteRace.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeleteRace.java index 7d4eb31..15f697a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeleteRace.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeleteRace.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode; import java.io.FileNotFoundException; import java.util.AbstractMap; import java.util.ArrayList; +import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -28,6 +29,7 @@ import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -123,10 +125,11 @@ public class TestDeleteRace { boolean returnChosenNodes, Set<Node> excludedNodes, long blocksize, - final BlockStoragePolicy storagePolicy) { + final BlockStoragePolicy storagePolicy, + EnumSet<AddBlockFlag> flags) { DatanodeStorageInfo[] results = super.chooseTarget(srcPath, numOfReplicas, writer, chosenNodes, returnChosenNodes, excludedNodes, - blocksize, storagePolicy); + blocksize, storagePolicy, flags); try { Thread.sleep(3000); } catch (InterruptedException e) {} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5477c31c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java index 3530ff2..d58284b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java @@ -836,7 +836,7 @@ public class TestHASafeMode { new ExtendedBlock(previousBlock), new DatanodeInfo[0], DFSClientAdapter.getFileId((DFSOutputStream) create - .getWrappedStream()), null); + .getWrappedStream()), null, null); cluster.restartNameNode(0, true); cluster.restartDataNode(0); cluster.transitionToActive(0); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5477c31c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java index 694d15e..812bcc5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java @@ -198,7 +198,7 @@ public class TestOpenFilesWithSnapshot { String clientName = fs.getClient().getClientName(); // create one empty block nameNodeRpc.addBlock(fileWithEmptyBlock.toString(), clientName, null, null, - HdfsConstants.GRANDFATHER_INODE_ID, null); + HdfsConstants.GRANDFATHER_INODE_ID, null, null); fs.createSnapshot(path, "s2"); fs.rename(new Path("/test/test"), new Path("/test/test-renamed")); 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5477c31c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotBlocksMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotBlocksMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotBlocksMap.java index 3f06d3d..44e8b35 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotBlocksMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotBlocksMap.java @@ -282,7 +282,7 @@ public class TestSnapshotBlocksMap { ExtendedBlock previous = new ExtendedBlock(fsn.getBlockPoolId(), blks[0]); cluster.getNameNodeRpc() .addBlock(bar.toString(), hdfs.getClient().getClientName(), previous, - null, barNode.getId(), null); + null, barNode.getId(), null, null); SnapshotTestHelper.createSnapshot(hdfs, foo, "s1"); @@ -319,7 +319,7 @@ public class TestSnapshotBlocksMap { ExtendedBlock previous = new ExtendedBlock(fsn.getBlockPoolId(), blks[0]); cluster.getNameNodeRpc() .addBlock(bar.toString(), hdfs.getClient().getClientName(), previous, - null, barNode.getId(), null); + null, barNode.getId(), null, null); SnapshotTestHelper.createSnapshot(hdfs, foo, "s1"); @@ -358,7 +358,7 @@ public class TestSnapshotBlocksMap { ExtendedBlock previous = new ExtendedBlock(fsn.getBlockPoolId(), blks[0]); cluster.getNameNodeRpc() .addBlock(bar.toString(), hdfs.getClient().getClientName(), previous, - null, barNode.getId(), null); + null, barNode.getId(), null, null); SnapshotTestHelper.createSnapshot(hdfs, foo, "s1"); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email 
protected]
