Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java Fri Jul 25 20:33:09 2014 @@ -23,6 +23,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; @@ -71,11 +72,20 @@ public interface DataTransferProtocol { /** * Write a block to a datanode pipeline. - * + * The receiver datanode of this call is the next datanode in the pipeline. + * The other downstream datanodes are specified by the targets parameter. + * Note that the receiver {@link DatanodeInfo} is not required in the + * parameter list since the receiver datanode knows its info. However, the + * {@link StorageType} for storing the replica in the receiver datanode is a + * parameter since the receiver datanode may support multiple storage types. + * * @param blk the block being written. + * @param storageType for storing the replica in the receiver datanode. * @param blockToken security token for accessing the block. 
* @param clientName client's name. - * @param targets target datanodes in the pipeline. + * @param targets other downstream datanodes in the pipeline. + * @param targetStorageTypes target {@link StorageType}s corresponding + * to the target datanodes. * @param source source datanode. * @param stage pipeline stage. * @param pipelineSize the size of the pipeline. @@ -84,9 +94,11 @@ public interface DataTransferProtocol { * @param latestGenerationStamp the latest generation stamp of the block. */ public void writeBlock(final ExtendedBlock blk, + final StorageType storageType, final Token<BlockTokenIdentifier> blockToken, final String clientName, final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes, final DatanodeInfo source, final BlockConstructionStage stage, final int pipelineSize, @@ -110,7 +122,8 @@ public interface DataTransferProtocol { public void transferBlock(final ExtendedBlock blk, final Token<BlockTokenIdentifier> blockToken, final String clientName, - final DatanodeInfo[] targets) throws IOException; + final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes) throws IOException; /** * Request short circuit access file descriptors from a DataNode. @@ -148,11 +161,13 @@ public interface DataTransferProtocol { * It is used for balancing purpose. * * @param blk the block being replaced. + * @param storageType the {@link StorageType} for storing the block. * @param blockToken security token for accessing the block. * @param delHint the hint for deleting the block in the original datanode. * @param source the source datanode for receiving the block. */ public void replaceBlock(final ExtendedBlock blk, + final StorageType storageType, final Token<BlockTokenIdentifier> blockToken, final String delHint, final DatanodeInfo source) throws IOException;
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java Fri Jul 25 20:33:09 2014 @@ -27,7 +27,7 @@ import java.nio.channels.ReadableByteCha import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdfs.util.DirectBufferPool; +import org.apache.hadoop.util.DirectBufferPool; import org.apache.hadoop.io.IOUtils; import com.google.common.base.Preconditions; Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java Fri Jul 25 20:33:09 2014 @@ -25,6 +25,7 @@ import java.io.IOException; import 
org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto; @@ -121,10 +122,13 @@ public abstract class Receiver implement /** Receive OP_WRITE_BLOCK */ private void opWriteBlock(DataInputStream in) throws IOException { final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in)); + final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList()); writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()), + PBHelper.convertStorageType(proto.getStorageType()), PBHelper.convert(proto.getHeader().getBaseHeader().getToken()), proto.getHeader().getClientName(), - PBHelper.convert(proto.getTargetsList()), + targets, + PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length), PBHelper.convert(proto.getSource()), fromProto(proto.getStage()), proto.getPipelineSize(), @@ -140,10 +144,12 @@ public abstract class Receiver implement private void opTransferBlock(DataInputStream in) throws IOException { final OpTransferBlockProto proto = OpTransferBlockProto.parseFrom(vintPrefixed(in)); + final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList()); transferBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()), PBHelper.convert(proto.getHeader().getBaseHeader().getToken()), proto.getHeader().getClientName(), - PBHelper.convert(proto.getTargetsList())); + targets, + PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length)); } /** Receive {@link Op#REQUEST_SHORT_CIRCUIT_FDS} */ @@ -176,6 +182,7 @@ public abstract class Receiver implement private void opReplaceBlock(DataInputStream in) throws IOException { 
OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(vintPrefixed(in)); replaceBlock(PBHelper.convert(proto.getHeader().getBlock()), + PBHelper.convertStorageType(proto.getStorageType()), PBHelper.convert(proto.getHeader().getToken()), proto.getDelHint(), PBHelper.convert(proto.getSource())); Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java Fri Jul 25 20:33:09 2014 @@ -25,6 +25,7 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto; @@ -111,9 +112,11 @@ public class Sender implements DataTrans @Override public void writeBlock(final ExtendedBlock blk, + final StorageType storageType, final Token<BlockTokenIdentifier> blockToken, final String clientName, final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes, final DatanodeInfo source, final BlockConstructionStage stage, final int pipelineSize, @@ -130,7 +133,9 @@ public class Sender implements DataTrans OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder() .setHeader(header) + 
.setStorageType(PBHelper.convertStorageType(storageType)) .addAllTargets(PBHelper.convert(targets, 1)) + .addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes, 1)) .setStage(toProto(stage)) .setPipelineSize(pipelineSize) .setMinBytesRcvd(minBytesRcvd) @@ -150,12 +155,14 @@ public class Sender implements DataTrans public void transferBlock(final ExtendedBlock blk, final Token<BlockTokenIdentifier> blockToken, final String clientName, - final DatanodeInfo[] targets) throws IOException { + final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes) throws IOException { OpTransferBlockProto proto = OpTransferBlockProto.newBuilder() .setHeader(DataTransferProtoUtil.buildClientHeader( blk, clientName, blockToken)) .addAllTargets(PBHelper.convert(targets)) + .addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes)) .build(); send(out, Op.TRANSFER_BLOCK, proto); @@ -196,11 +203,13 @@ public class Sender implements DataTrans @Override public void replaceBlock(final ExtendedBlock blk, + final StorageType storageType, final Token<BlockTokenIdentifier> blockToken, final String delHint, final DatanodeInfo source) throws IOException { OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder() .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken)) + .setStorageType(PBHelper.convertStorageType(storageType)) .setDelHint(delHint) .setSource(PBHelper.convertDatanodeInfo(source)) .build(); Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- 
hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java Fri Jul 25 20:33:09 2014 @@ -97,7 +97,7 @@ public class DatanodeProtocolClientSideT RPC.setProtocolEngine(conf, DatanodeProtocolPB.class, ProtobufRpcEngine.class); UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - rpcProxy = createNamenodeWithRetry(createNamenode(nameNodeAddr, conf, ugi)); + rpcProxy = createNamenode(nameNodeAddr, conf, ugi); } private static DatanodeProtocolPB createNamenode( @@ -109,33 +109,6 @@ public class DatanodeProtocolClientSideT org.apache.hadoop.ipc.Client.getPingInterval(conf), null).getProxy(); } - /** Create a {@link NameNode} proxy */ - static DatanodeProtocolPB createNamenodeWithRetry( - DatanodeProtocolPB rpcNamenode) { - RetryPolicy createPolicy = RetryPolicies - .retryUpToMaximumCountWithFixedSleep(5, - HdfsConstants.LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS); - - Map<Class<? extends Exception>, RetryPolicy> remoteExceptionToPolicyMap = - new HashMap<Class<? extends Exception>, RetryPolicy>(); - remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class, - createPolicy); - - Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap = - new HashMap<Class<? 
extends Exception>, RetryPolicy>(); - exceptionToPolicyMap.put(RemoteException.class, RetryPolicies - .retryByRemoteException(RetryPolicies.TRY_ONCE_THEN_FAIL, - remoteExceptionToPolicyMap)); - RetryPolicy methodPolicy = RetryPolicies.retryByException( - RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap); - Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<String, RetryPolicy>(); - - methodNameToPolicyMap.put("create", methodPolicy); - - return (DatanodeProtocolPB) RetryProxy.create(DatanodeProtocolPB.class, - rpcNamenode, methodNameToPolicyMap); - } - @Override public void close() throws IOException { RPC.stopProxy(rpcProxy); Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java Fri Jul 25 20:33:09 2014 @@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.server.pro import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; import org.apache.hadoop.ipc.ProtobufHelper; import org.apache.hadoop.ipc.ProtocolMetaInterface; +import org.apache.hadoop.ipc.ProtocolTranslator; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RpcClientUtil; @@ -61,7 +62,7 @@ import com.google.protobuf.ServiceExcept @InterfaceAudience.Private @InterfaceStability.Stable public class NamenodeProtocolTranslatorPB implements NamenodeProtocol, - 
ProtocolMetaInterface, Closeable { + ProtocolMetaInterface, Closeable, ProtocolTranslator { /** RpcController is not used and hence is set to null */ private final static RpcController NULL_CONTROLLER = null; @@ -89,6 +90,11 @@ public class NamenodeProtocolTranslatorP } @Override + public Object getUnderlyingProxyObject() { + return rpcProxy; + } + + @Override public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size) throws IOException { GetBlocksRequestProto req = GetBlocksRequestProto.newBuilder() Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Fri Jul 25 20:33:09 2014 @@ -150,6 +150,7 @@ import org.apache.hadoop.hdfs.protocol.p import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.SnapshottableDirectoryStatusProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypesProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto; import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto; import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsResponseProto; @@ -674,14 +675,8 @@ public class PBHelper { targets[i] = PBHelper.convert(locs.get(i)); } - final int storageTypesCount = 
proto.getStorageTypesCount(); - final StorageType[] storageTypes; - if (storageTypesCount == 0) { - storageTypes = null; - } else { - Preconditions.checkState(storageTypesCount == locs.size()); - storageTypes = convertStorageTypeProtos(proto.getStorageTypesList()); - } + final StorageType[] storageTypes = convertStorageTypes( + proto.getStorageTypesList(), locs.size()); final int storageIDsCount = proto.getStorageIDsCount(); final String[] storageIDs; @@ -969,6 +964,20 @@ public class PBHelper { targets[i] = PBHelper.convert(targetList.get(i)); } + StorageType[][] targetStorageTypes = new StorageType[targetList.size()][]; + List<StorageTypesProto> targetStorageTypesList = blkCmd.getTargetStorageTypesList(); + if (targetStorageTypesList.isEmpty()) { // missing storage types + for(int i = 0; i < targetStorageTypes.length; i++) { + targetStorageTypes[i] = new StorageType[targets[i].length]; + Arrays.fill(targetStorageTypes[i], StorageType.DEFAULT); + } + } else { + for(int i = 0; i < targetStorageTypes.length; i++) { + List<StorageTypeProto> p = targetStorageTypesList.get(i).getStorageTypesList(); + targetStorageTypes[i] = p.toArray(new StorageType[p.size()]); + } + } + List<StorageUuidsProto> targetStorageUuidsList = blkCmd.getTargetStorageUuidsList(); String[][] targetStorageIDs = new String[targetStorageUuidsList.size()][]; for(int i = 0; i < targetStorageIDs.length; i++) { @@ -991,7 +1000,7 @@ public class PBHelper { throw new AssertionError("Unknown action type: " + blkCmd.getAction()); } return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets, - targetStorageIDs); + targetStorageTypes, targetStorageIDs); } public static BlockIdCommand convert(BlockIdCommandProto blkIdCmd) { @@ -1605,8 +1614,25 @@ public class PBHelper { } } - private static StorageTypeProto convertStorageType( - StorageType type) { + public static List<StorageTypeProto> convertStorageTypes( + StorageType[] types) { + return convertStorageTypes(types, 0); + } + + public static 
List<StorageTypeProto> convertStorageTypes( + StorageType[] types, int startIdx) { + if (types == null) { + return null; + } + final List<StorageTypeProto> protos = new ArrayList<StorageTypeProto>( + types.length); + for (int i = startIdx; i < types.length; ++i) { + protos.add(convertStorageType(types[i])); + } + return protos; + } + + public static StorageTypeProto convertStorageType(StorageType type) { switch(type) { case DISK: return StorageTypeProto.DISK; @@ -1621,7 +1647,7 @@ public class PBHelper { public static DatanodeStorage convert(DatanodeStorageProto s) { return new DatanodeStorage(s.getStorageUuid(), PBHelper.convertState(s.getState()), - PBHelper.convertType(s.getStorageType())); + PBHelper.convertStorageType(s.getStorageType())); } private static State convertState(StorageState state) { @@ -1634,7 +1660,7 @@ public class PBHelper { } } - private static StorageType convertType(StorageTypeProto type) { + public static StorageType convertStorageType(StorageTypeProto type) { switch(type) { case DISK: return StorageType.DISK; @@ -1646,11 +1672,16 @@ public class PBHelper { } } - private static StorageType[] convertStorageTypeProtos( - List<StorageTypeProto> storageTypesList) { - final StorageType[] storageTypes = new StorageType[storageTypesList.size()]; - for (int i = 0; i < storageTypes.length; ++i) { - storageTypes[i] = PBHelper.convertType(storageTypesList.get(i)); + public static StorageType[] convertStorageTypes( + List<StorageTypeProto> storageTypesList, int expectedSize) { + final StorageType[] storageTypes = new StorageType[expectedSize]; + if (storageTypesList.size() != expectedSize) { // missing storage types + Preconditions.checkState(storageTypesList.isEmpty()); + Arrays.fill(storageTypes, StorageType.DEFAULT); + } else { + for (int i = 0; i < storageTypes.length; ++i) { + storageTypes[i] = convertStorageType(storageTypesList.get(i)); + } } return storageTypes; } Modified: 
hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Fri Jul 25 20:33:09 2014 @@ -18,6 +18,8 @@ package org.apache.hadoop.hdfs.server.balancer; import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT; +import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY; import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed; import java.io.BufferedInputStream; @@ -57,14 +59,17 @@ import org.apache.hadoop.conf.Configured import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; -import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; +import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; +import 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; @@ -202,6 +207,7 @@ public class Balancer { private final NameNodeConnector nnc; private final BalancingPolicy policy; + private final SaslDataTransferClient saslClient; private final double threshold; // all data node lists @@ -352,19 +358,18 @@ public class Balancer { OutputStream unbufOut = sock.getOutputStream(); InputStream unbufIn = sock.getInputStream(); - if (nnc.getDataEncryptionKey() != null) { - IOStreamPair encryptedStreams = - DataTransferEncryptor.getEncryptedStreams( - unbufOut, unbufIn, nnc.getDataEncryptionKey()); - unbufOut = encryptedStreams.out; - unbufIn = encryptedStreams.in; - } + ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock()); + Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb); + IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut, + unbufIn, nnc, accessToken, target.datanode); + unbufOut = saslStreams.out; + unbufIn = saslStreams.in; out = new DataOutputStream(new BufferedOutputStream(unbufOut, HdfsConstants.IO_FILE_BUFFER_SIZE)); in = new DataInputStream(new BufferedInputStream(unbufIn, HdfsConstants.IO_FILE_BUFFER_SIZE)); - sendRequest(out); + sendRequest(out, eb, StorageType.DEFAULT, accessToken); receiveResponse(in); bytesMoved.addAndGet(block.getNumBytes()); LOG.info("Successfully moved " + this); @@ -395,10 +400,10 @@ public class Balancer { } /* Send a block replace request to the output stream*/ - private void sendRequest(DataOutputStream out) throws IOException { - final ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock()); - final Token<BlockTokenIdentifier> accessToken = 
nnc.getAccessToken(eb); - new Sender(out).replaceBlock(eb, accessToken, + private void sendRequest(DataOutputStream out, ExtendedBlock eb, + StorageType storageType, + Token<BlockTokenIdentifier> accessToken) throws IOException { + new Sender(out).replaceBlock(eb, storageType, accessToken, source.getStorageID(), proxySource.getDatanode()); } @@ -876,6 +881,12 @@ public class Balancer { this.maxConcurrentMovesPerNode = conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT); + this.saslClient = new SaslDataTransferClient( + DataTransferSaslUtil.getSaslPropertiesResolver(conf), + TrustedChannelResolver.getInstance(conf), + conf.getBoolean( + IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, + IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT)); } /* Given a data node set, build a network topology and decide Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java Fri Jul 25 20:33:09 2014 @@ -34,8 +34,8 @@ import org.apache.hadoop.hdfs.NameNodePr import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; -import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; @@ -50,7 +50,7 @@ import org.apache.hadoop.util.Daemon; * The class provides utilities for {@link Balancer} to access a NameNode */ @InterfaceAudience.Private -class NameNodeConnector { +class NameNodeConnector implements DataEncryptionKeyFactory { private static final Log LOG = Balancer.LOG; private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id"); private static final int MAX_NOT_CHANGED_ITERATIONS = 5; @@ -72,7 +72,6 @@ class NameNodeConnector { private BlockTokenSecretManager blockTokenSecretManager; private Daemon keyupdaterthread; // AccessKeyUpdater thread private DataEncryptionKey encryptionKey; - private final TrustedChannelResolver trustedChannelResolver; NameNodeConnector(URI nameNodeUri, Configuration conf) throws IOException { @@ -122,7 +121,6 @@ class NameNodeConnector { if (out == null) { throw new IOException("Another balancer is running"); } - this.trustedChannelResolver = TrustedChannelResolver.getInstance(conf); } boolean shouldContinue(long dispatchBlockMoveBytes) { @@ -154,10 +152,10 @@ class NameNodeConnector { BlockTokenSecretManager.AccessMode.COPY)); } } - - DataEncryptionKey getDataEncryptionKey() - throws IOException { - if (encryptDataTransfer && !this.trustedChannelResolver.isTrusted()) { + + @Override + public DataEncryptionKey newDataEncryptionKey() { + if (encryptDataTransfer) { synchronized (this) { if (encryptionKey == null) { encryptionKey = blockTokenSecretManager.generateDataEncryptionKey(); Modified: 
hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Fri Jul 25 20:33:09 2014 @@ -725,7 +725,6 @@ public class BlockManager { final List<DatanodeStorageInfo> locations = new ArrayList<DatanodeStorageInfo>(blocksMap.numNodes(block)); for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) { - final String storageID = storage.getStorageID(); // filter invalidate replicas if(!invalidateBlocks.contains(storage.getDatanodeDescriptor(), block)) { locations.add(storage); @@ -2637,7 +2636,7 @@ public class BlockManager { if (addedNode == delNodeHint) { delNodeHint = null; } - Collection<DatanodeDescriptor> nonExcess = new ArrayList<DatanodeDescriptor>(); + Collection<DatanodeStorageInfo> nonExcess = new ArrayList<DatanodeStorageInfo>(); Collection<DatanodeDescriptor> corruptNodes = corruptReplicas .getNodes(block); for(DatanodeStorageInfo storage : blocksMap.getStorages(block, State.NORMAL)) { @@ -2657,7 +2656,7 @@ public class BlockManager { if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) { // exclude corrupt replicas if (corruptNodes == null || !corruptNodes.contains(cur)) { - nonExcess.add(cur); + nonExcess.add(storage); } } } @@ -2681,7 +2680,7 @@ public class BlockManager { * If no such a node is available, * then pick a node with least free space */ - private 
void chooseExcessReplicates(Collection<DatanodeDescriptor> nonExcess, + private void chooseExcessReplicates(final Collection<DatanodeStorageInfo> nonExcess, Block b, short replication, DatanodeDescriptor addedNode, DatanodeDescriptor delNodeHint, @@ -2689,28 +2688,33 @@ public class BlockManager { assert namesystem.hasWriteLock(); // first form a rack to datanodes map and BlockCollection bc = getBlockCollection(b); - final Map<String, List<DatanodeDescriptor>> rackMap - = new HashMap<String, List<DatanodeDescriptor>>(); - final List<DatanodeDescriptor> moreThanOne = new ArrayList<DatanodeDescriptor>(); - final List<DatanodeDescriptor> exactlyOne = new ArrayList<DatanodeDescriptor>(); + + final Map<String, List<DatanodeStorageInfo>> rackMap + = new HashMap<String, List<DatanodeStorageInfo>>(); + final List<DatanodeStorageInfo> moreThanOne = new ArrayList<DatanodeStorageInfo>(); + final List<DatanodeStorageInfo> exactlyOne = new ArrayList<DatanodeStorageInfo>(); // split nodes into two sets // moreThanOne contains nodes on rack with more than one replica // exactlyOne contains the remaining nodes - replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne, - exactlyOne); + replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne, exactlyOne); // pick one node to delete that favors the delete hint // otherwise pick one with least space from priSet if it is not empty // otherwise one node with least space from remains boolean firstOne = true; + final DatanodeStorageInfo delNodeHintStorage + = DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, delNodeHint); + final DatanodeStorageInfo addedNodeStorage + = DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, addedNode); while (nonExcess.size() - replication > 0) { // check if we can delete delNodeHint - final DatanodeInfo cur; - if (firstOne && delNodeHint !=null && nonExcess.contains(delNodeHint) - && (moreThanOne.contains(delNodeHint) - || (addedNode != null && !moreThanOne.contains(addedNode))) ) { - cur 
= delNodeHint; + final DatanodeStorageInfo cur; + if (firstOne && delNodeHintStorage != null + && (moreThanOne.contains(delNodeHintStorage) + || (addedNodeStorage != null + && !moreThanOne.contains(addedNodeStorage)))) { + cur = delNodeHintStorage; } else { // regular excessive replica removal cur = replicator.chooseReplicaToDelete(bc, b, replication, moreThanOne, exactlyOne); @@ -2722,7 +2726,7 @@ public class BlockManager { exactlyOne, cur); nonExcess.remove(cur); - addToExcessReplicate(cur, b); + addToExcessReplicate(cur.getDatanodeDescriptor(), b); // // The 'excessblocks' tracks blocks until we get confirmation @@ -2733,7 +2737,7 @@ public class BlockManager { // should be deleted. Items are removed from the invalidate list // upon giving instructions to the namenode. // - addToInvalidates(b, cur); + addToInvalidates(b, cur.getDatanodeDescriptor()); blockLog.info("BLOCK* chooseExcessReplicates: " +"("+cur+", "+b+") is added to invalidated blocks set"); } Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java Fri Jul 25 20:33:09 2014 @@ -124,11 +124,12 @@ public abstract class BlockPlacementPoli listed in the previous parameter. 
* @return the replica that is the best candidate for deletion */ - abstract public DatanodeDescriptor chooseReplicaToDelete(BlockCollection srcBC, - Block block, - short replicationFactor, - Collection<DatanodeDescriptor> existingReplicas, - Collection<DatanodeDescriptor> moreExistingReplicas); + abstract public DatanodeStorageInfo chooseReplicaToDelete( + BlockCollection srcBC, + Block block, + short replicationFactor, + Collection<DatanodeStorageInfo> existingReplicas, + Collection<DatanodeStorageInfo> moreExistingReplicas); /** * Used to setup a BlockPlacementPolicy object. This should be defined by @@ -175,21 +176,23 @@ public abstract class BlockPlacementPoli * @param exactlyOne The List of replica nodes on rack with only one replica * @param cur current replica to remove */ - public void adjustSetsWithChosenReplica(final Map<String, - List<DatanodeDescriptor>> rackMap, - final List<DatanodeDescriptor> moreThanOne, - final List<DatanodeDescriptor> exactlyOne, final DatanodeInfo cur) { + public void adjustSetsWithChosenReplica( + final Map<String, List<DatanodeStorageInfo>> rackMap, + final List<DatanodeStorageInfo> moreThanOne, + final List<DatanodeStorageInfo> exactlyOne, + final DatanodeStorageInfo cur) { - String rack = getRack(cur); - final List<DatanodeDescriptor> datanodes = rackMap.get(rack); - datanodes.remove(cur); - if (datanodes.isEmpty()) { + final String rack = getRack(cur.getDatanodeDescriptor()); + final List<DatanodeStorageInfo> storages = rackMap.get(rack); + storages.remove(cur); + if (storages.isEmpty()) { rackMap.remove(rack); } if (moreThanOne.remove(cur)) { - if (datanodes.size() == 1) { - moreThanOne.remove(datanodes.get(0)); - exactlyOne.add(datanodes.get(0)); + if (storages.size() == 1) { + final DatanodeStorageInfo remaining = storages.get(0); + moreThanOne.remove(remaining); + exactlyOne.add(remaining); } } else { exactlyOne.remove(cur); @@ -214,28 +217,28 @@ public abstract class BlockPlacementPoli * @param exactlyOne remains 
contains the remaining nodes */ public void splitNodesWithRack( - Collection<DatanodeDescriptor> dataNodes, - final Map<String, List<DatanodeDescriptor>> rackMap, - final List<DatanodeDescriptor> moreThanOne, - final List<DatanodeDescriptor> exactlyOne) { - for(DatanodeDescriptor node : dataNodes) { - final String rackName = getRack(node); - List<DatanodeDescriptor> datanodeList = rackMap.get(rackName); - if (datanodeList == null) { - datanodeList = new ArrayList<DatanodeDescriptor>(); - rackMap.put(rackName, datanodeList); + final Iterable<DatanodeStorageInfo> storages, + final Map<String, List<DatanodeStorageInfo>> rackMap, + final List<DatanodeStorageInfo> moreThanOne, + final List<DatanodeStorageInfo> exactlyOne) { + for(DatanodeStorageInfo s: storages) { + final String rackName = getRack(s.getDatanodeDescriptor()); + List<DatanodeStorageInfo> storageList = rackMap.get(rackName); + if (storageList == null) { + storageList = new ArrayList<DatanodeStorageInfo>(); + rackMap.put(rackName, storageList); } - datanodeList.add(node); + storageList.add(s); } // split nodes into two sets - for(List<DatanodeDescriptor> datanodeList : rackMap.values()) { - if (datanodeList.size() == 1) { + for(List<DatanodeStorageInfo> storageList : rackMap.values()) { + if (storageList.size() == 1) { // exactlyOne contains nodes on rack with only one replica - exactlyOne.add(datanodeList.get(0)); + exactlyOne.add(storageList.get(0)); } else { // moreThanOne contains nodes on rack with more than one replica - moreThanOne.addAll(datanodeList); + moreThanOne.addAll(storageList); } } } Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1613514&r1=1613513&r2=1613514&view=diff 
============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Fri Jul 25 20:33:09 2014 @@ -145,14 +145,14 @@ public class BlockPlacementPolicyDefault List<DatanodeStorageInfo> results = new ArrayList<DatanodeStorageInfo>(); boolean avoidStaleNodes = stats != null && stats.isAvoidingStaleDataNodesForWrite(); - for (int i = 0; i < Math.min(favoredNodes.size(), numOfReplicas); i++) { + for (int i = 0; i < favoredNodes.size() && results.size() < numOfReplicas; i++) { DatanodeDescriptor favoredNode = favoredNodes.get(i); // Choose a single node which is local to favoredNode. // 'results' is updated within chooseLocalNode final DatanodeStorageInfo target = chooseLocalStorage(favoredNode, favoriteAndExcludedNodes, blocksize, getMaxNodesPerRack(results.size(), numOfReplicas)[1], - results, avoidStaleNodes, storageType); + results, avoidStaleNodes, storageType, false); if (target == null) { LOG.warn("Could not find a target for file " + src + " with favored node " + favoredNode); @@ -271,7 +271,7 @@ public class BlockPlacementPolicyDefault try { if (numOfResults == 0) { writer = chooseLocalStorage(writer, excludedNodes, blocksize, - maxNodesPerRack, results, avoidStaleNodes, storageType) + maxNodesPerRack, results, avoidStaleNodes, storageType, true) .getDatanodeDescriptor(); if (--numOfReplicas == 0) { return writer; @@ -345,12 +345,14 @@ public class BlockPlacementPolicyDefault int maxNodesPerRack, List<DatanodeStorageInfo> results, boolean avoidStaleNodes, - StorageType storageType) + StorageType storageType, + boolean fallbackToLocalRack) throws NotEnoughReplicasException { // if no local machine, randomly choose one node - 
if (localMachine == null) + if (localMachine == null) { return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType); + } if (preferLocalNode && localMachine instanceof DatanodeDescriptor) { DatanodeDescriptor localDatanode = (DatanodeDescriptor) localMachine; // otherwise try local machine first @@ -363,7 +365,11 @@ public class BlockPlacementPolicyDefault } } } - } + } + + if (!fallbackToLocalRack) { + return null; + } // try a node on local rack return chooseLocalRack(localMachine, excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType); @@ -636,15 +642,11 @@ public class BlockPlacementPolicyDefault // check the communication traffic of the target machine if (considerLoad) { - double avgLoad = 0; - if (stats != null) { - int size = stats.getNumDatanodesInService(); - if (size != 0) { - avgLoad = (double)stats.getTotalLoad()/size; - } - } - if (node.getXceiverCount() > (2.0 * avgLoad)) { - logNodeIsNotChosen(storage, "the node is too busy "); + final double maxLoad = 2.0 * stats.getInServiceXceiverAverage(); + final int nodeLoad = node.getXceiverCount(); + if (nodeLoad > maxLoad) { + logNodeIsNotChosen(storage, + "the node is too busy (load:"+nodeLoad+" > "+maxLoad+") "); return false; } } @@ -727,31 +729,34 @@ public class BlockPlacementPolicyDefault } @Override - public DatanodeDescriptor chooseReplicaToDelete(BlockCollection bc, + public DatanodeStorageInfo chooseReplicaToDelete(BlockCollection bc, Block block, short replicationFactor, - Collection<DatanodeDescriptor> first, - Collection<DatanodeDescriptor> second) { + Collection<DatanodeStorageInfo> first, + Collection<DatanodeStorageInfo> second) { long oldestHeartbeat = now() - heartbeatInterval * tolerateHeartbeatMultiplier; - DatanodeDescriptor oldestHeartbeatNode = null; + DatanodeStorageInfo oldestHeartbeatStorage = null; long minSpace = Long.MAX_VALUE; - DatanodeDescriptor minSpaceNode = null; + DatanodeStorageInfo 
minSpaceStorage = null; // Pick the node with the oldest heartbeat or with the least free space, // if all hearbeats are within the tolerable heartbeat interval - for(DatanodeDescriptor node : pickupReplicaSet(first, second)) { + for(DatanodeStorageInfo storage : pickupReplicaSet(first, second)) { + final DatanodeDescriptor node = storage.getDatanodeDescriptor(); long free = node.getRemaining(); long lastHeartbeat = node.getLastUpdate(); if(lastHeartbeat < oldestHeartbeat) { oldestHeartbeat = lastHeartbeat; - oldestHeartbeatNode = node; + oldestHeartbeatStorage = storage; } if (minSpace > free) { minSpace = free; - minSpaceNode = node; + minSpaceStorage = storage; } } - return oldestHeartbeatNode != null ? oldestHeartbeatNode : minSpaceNode; + + return oldestHeartbeatStorage != null? oldestHeartbeatStorage + : minSpaceStorage; } /** @@ -760,9 +765,9 @@ public class BlockPlacementPolicyDefault * replica while second set contains remaining replica nodes. * So pick up first set if not empty. If first is empty, then pick second. */ - protected Collection<DatanodeDescriptor> pickupReplicaSet( - Collection<DatanodeDescriptor> first, - Collection<DatanodeDescriptor> second) { + protected Collection<DatanodeStorageInfo> pickupReplicaSet( + Collection<DatanodeStorageInfo> first, + Collection<DatanodeStorageInfo> second) { return first.isEmpty() ? 
second : first; } Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java Fri Jul 25 20:33:09 2014 @@ -70,7 +70,8 @@ public class BlockPlacementPolicyWithNod protected DatanodeStorageInfo chooseLocalStorage(Node localMachine, Set<Node> excludedNodes, long blocksize, int maxNodesPerRack, List<DatanodeStorageInfo> results, boolean avoidStaleNodes, - StorageType storageType) throws NotEnoughReplicasException { + StorageType storageType, boolean fallbackToLocalRack + ) throws NotEnoughReplicasException { // if no local machine, randomly choose one node if (localMachine == null) return chooseRandom(NodeBase.ROOT, excludedNodes, @@ -97,6 +98,10 @@ public class BlockPlacementPolicyWithNod if (chosenStorage != null) { return chosenStorage; } + + if (!fallbackToLocalRack) { + return null; + } // try a node on local rack return chooseLocalRack(localMachine, excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType); @@ -286,9 +291,9 @@ public class BlockPlacementPolicyWithNod * If first is empty, then pick second. 
*/ @Override - public Collection<DatanodeDescriptor> pickupReplicaSet( - Collection<DatanodeDescriptor> first, - Collection<DatanodeDescriptor> second) { + public Collection<DatanodeStorageInfo> pickupReplicaSet( + Collection<DatanodeStorageInfo> first, + Collection<DatanodeStorageInfo> second) { // If no replica within same rack, return directly. if (first.isEmpty()) { return second; @@ -296,25 +301,24 @@ public class BlockPlacementPolicyWithNod // Split data nodes in the first set into two sets, // moreThanOne contains nodes on nodegroup with more than one replica // exactlyOne contains the remaining nodes - Map<String, List<DatanodeDescriptor>> nodeGroupMap = - new HashMap<String, List<DatanodeDescriptor>>(); + Map<String, List<DatanodeStorageInfo>> nodeGroupMap = + new HashMap<String, List<DatanodeStorageInfo>>(); - for(DatanodeDescriptor node : first) { - final String nodeGroupName = - NetworkTopology.getLastHalf(node.getNetworkLocation()); - List<DatanodeDescriptor> datanodeList = - nodeGroupMap.get(nodeGroupName); - if (datanodeList == null) { - datanodeList = new ArrayList<DatanodeDescriptor>(); - nodeGroupMap.put(nodeGroupName, datanodeList); + for(DatanodeStorageInfo storage : first) { + final String nodeGroupName = NetworkTopology.getLastHalf( + storage.getDatanodeDescriptor().getNetworkLocation()); + List<DatanodeStorageInfo> storageList = nodeGroupMap.get(nodeGroupName); + if (storageList == null) { + storageList = new ArrayList<DatanodeStorageInfo>(); + nodeGroupMap.put(nodeGroupName, storageList); } - datanodeList.add(node); + storageList.add(storage); } - final List<DatanodeDescriptor> moreThanOne = new ArrayList<DatanodeDescriptor>(); - final List<DatanodeDescriptor> exactlyOne = new ArrayList<DatanodeDescriptor>(); + final List<DatanodeStorageInfo> moreThanOne = new ArrayList<DatanodeStorageInfo>(); + final List<DatanodeStorageInfo> exactlyOne = new ArrayList<DatanodeStorageInfo>(); // split nodes into two sets - for(List<DatanodeDescriptor> 
datanodeList : nodeGroupMap.values()) { + for(List<DatanodeStorageInfo> datanodeList : nodeGroupMap.values()) { if (datanodeList.size() == 1 ) { // exactlyOne contains nodes on nodegroup with exactly one replica exactlyOne.add(datanodeList.get(0)); Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Fri Jul 25 20:33:09 2014 @@ -345,7 +345,8 @@ public class DatanodeManager { /** Sort the located blocks by the distance to the target host. */ public void sortLocatedBlocks(final String targethost, - final List<LocatedBlock> locatedblocks) { + final List<LocatedBlock> locatedblocks, + boolean randomizeBlockLocationsPerBlock) { //sort the blocks // As it is possible for the separation of node manager and datanode, // here we should get node but not datanode only . @@ -372,8 +373,8 @@ public class DatanodeManager { --lastActiveIndex; } int activeLen = lastActiveIndex + 1; - networktopology.sortByDistance(client, b.getLocations(), activeLen, - b.getBlock().getBlockId()); + networktopology.sortByDistance(client, b.getLocations(), activeLen, b + .getBlock().getBlockId(), randomizeBlockLocationsPerBlock); } } @@ -820,7 +821,9 @@ public class DatanodeManager { } /** Start decommissioning the specified datanode. 
*/ - private void startDecommission(DatanodeDescriptor node) { + @InterfaceAudience.Private + @VisibleForTesting + public void startDecommission(DatanodeDescriptor node) { if (!node.isDecommissionInProgress() && !node.isDecommissioned()) { for (DatanodeStorageInfo storage : node.getStorageInfos()) { LOG.info("Start Decommissioning " + node + " " + storage Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java Fri Jul 25 20:33:09 2014 @@ -52,6 +52,12 @@ public interface DatanodeStatistics { /** @return the xceiver count */ public int getXceiverCount(); + /** @return total xceiver count for non-decommission(ing|ed) nodes */ + public int getInServiceXceiverCount(); + + /** @return number of non-decommission(ing|ed) nodes */ + public int getNumDatanodesInService(); + /** * @return the total used space by data nodes for non-DFS purposes * such as storing temporary files on the local file system Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java Fri Jul 25 20:33:09 2014 @@ -22,6 +22,7 @@ import java.util.Iterator; import java.util.List; import com.google.common.annotations.VisibleForTesting; + import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; @@ -290,4 +291,21 @@ public class DatanodeStorageInfo { public String toString() { return "[" + storageType + "]" + storageID + ":" + state; } + + /** @return the first {@link DatanodeStorageInfo} corresponding to + * the given datanode + */ + static DatanodeStorageInfo getDatanodeStorageInfo( + final Iterable<DatanodeStorageInfo> infos, + final DatanodeDescriptor datanode) { + if (datanode == null) { + return null; + } + for(DatanodeStorageInfo storage : infos) { + if (storage.getDatanodeDescriptor() == datanode) { + return storage; + } + } + return null; + } } Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- 
hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java Fri Jul 25 20:33:09 2014 @@ -151,6 +151,16 @@ class HeartbeatManager implements Datano } @Override + public synchronized int getInServiceXceiverCount() { + return stats.nodesInServiceXceiverCount; + } + + @Override + public synchronized int getNumDatanodesInService() { + return stats.nodesInService; + } + + @Override public synchronized long getCacheCapacity() { return stats.cacheCapacity; } @@ -178,7 +188,7 @@ class HeartbeatManager implements Datano } synchronized void register(final DatanodeDescriptor d) { - if (!datanodes.contains(d)) { + if (!d.isAlive) { addDatanode(d); //update its timestamp @@ -191,6 +201,8 @@ class HeartbeatManager implements Datano } synchronized void addDatanode(final DatanodeDescriptor d) { + // update in-service node count + stats.add(d); datanodes.add(d); d.isAlive = true; } @@ -323,6 +335,9 @@ class HeartbeatManager implements Datano private long cacheCapacity = 0L; private long cacheUsed = 0L; + private int nodesInService = 0; + private int nodesInServiceXceiverCount = 0; + private int expiredHeartbeats = 0; private void add(final DatanodeDescriptor node) { @@ -330,6 +345,8 @@ class HeartbeatManager implements Datano blockPoolUsed += node.getBlockPoolUsed(); xceiverCount += node.getXceiverCount(); if (!(node.isDecommissionInProgress() || node.isDecommissioned())) { + nodesInService++; + nodesInServiceXceiverCount += node.getXceiverCount(); capacityTotal += node.getCapacity(); capacityRemaining += node.getRemaining(); } else { @@ -344,6 +361,8 @@ class HeartbeatManager implements Datano blockPoolUsed -= node.getBlockPoolUsed(); xceiverCount -= node.getXceiverCount(); if (!(node.isDecommissionInProgress() || 
node.isDecommissioned())) { + nodesInService--; + nodesInServiceXceiverCount -= node.getXceiverCount(); capacityTotal -= node.getCapacity(); capacityRemaining -= node.getRemaining(); } else { Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java Fri Jul 25 20:33:09 2014 @@ -93,7 +93,8 @@ public final class HdfsServerConstants { FORCE("-force"), NONINTERACTIVE("-nonInteractive"), RENAMERESERVED("-renameReserved"), - METADATAVERSION("-metadataVersion"); + METADATAVERSION("-metadataVersion"), + UPGRADEONLY("-upgradeOnly"); private static final Pattern ENUM_WITH_ROLLING_UPGRADE_OPTION = Pattern.compile( "(\\w+)\\((\\w+)\\)"); Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original) +++ 
hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Fri Jul 25 20:33:09 2014 @@ -575,7 +575,8 @@ class BPOfferService { switch(cmd.getAction()) { case DatanodeProtocol.DNA_TRANSFER: // Send a copy of a block to another datanode - dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), bcmd.getTargets()); + dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), + bcmd.getTargets(), bcmd.getTargetStorageTypes()); dn.metrics.incrBlocksReplicated(bcmd.getBlocks().length); break; case DatanodeProtocol.DNA_INVALIDATE: Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java Fri Jul 25 20:33:09 2014 @@ -84,6 +84,10 @@ class BlockPoolSliceScanner { private final SortedSet<BlockScanInfo> blockInfoSet = new TreeSet<BlockScanInfo>(BlockScanInfo.LAST_SCAN_TIME_COMPARATOR); + + private final SortedSet<BlockScanInfo> newBlockInfoSet = + new TreeSet<BlockScanInfo>(BlockScanInfo.LAST_SCAN_TIME_COMPARATOR); + private final GSet<Block, BlockScanInfo> blockMap = new LightWeightGSet<Block, BlockScanInfo>( LightWeightGSet.computeCapacity(0.5, "BlockMap")); @@ -195,7 +199,7 @@ class BlockPoolSliceScanner { BlockScanInfo info = new BlockScanInfo( block ); info.lastScanTime = scanTime--; //still keep 
'info.lastScanType' to NONE. - addBlockInfo(info); + addBlockInfo(info, false); } RollingLogs rollingLogs = null; @@ -221,25 +225,42 @@ class BlockPoolSliceScanner { // Should we change throttler bandwidth every time bytesLeft changes? // not really required. } - - private synchronized void addBlockInfo(BlockScanInfo info) { - boolean added = blockInfoSet.add(info); + + /** + * Add the BlockScanInfo to sorted set of blockScanInfo + * @param info BlockScanInfo to be added + * @param isNewBlock true if the block is a new block, false if + * BlockScanInfo is being updated with new scanTime + */ + private synchronized void addBlockInfo(BlockScanInfo info, + boolean isNewBlock) { + boolean added = false; + if (isNewBlock) { + // check whether the block is already present + boolean exists = blockInfoSet.contains(info); + added = !exists && newBlockInfoSet.add(info); + } else { + added = blockInfoSet.add(info); + } blockMap.put(info); if (added) { updateBytesToScan(info.getNumBytes(), info.lastScanTime); } } - + private synchronized void delBlockInfo(BlockScanInfo info) { boolean exists = blockInfoSet.remove(info); + if (!exists){ + exists = newBlockInfoSet.remove(info); + } blockMap.remove(info); if (exists) { updateBytesToScan(-info.getNumBytes(), info.lastScanTime); } } - + /** Update blockMap by the given LogEntry */ private synchronized void updateBlockInfo(LogEntry e) { BlockScanInfo info = blockMap.get(new Block(e.blockId, 0, e.genStamp)); @@ -249,7 +270,7 @@ class BlockPoolSliceScanner { delBlockInfo(info); info.lastScanTime = e.verificationTime; info.lastScanType = ScanType.VERIFICATION_SCAN; - addBlockInfo(info); + addBlockInfo(info, false); } } @@ -275,14 +296,14 @@ class BlockPoolSliceScanner { info = new BlockScanInfo(block.getLocalBlock()); info.lastScanTime = getNewBlockScanTime(); - addBlockInfo(info); + addBlockInfo(info, true); adjustThrottler(); } /** Deletes the block from internal structures */ synchronized void deleteBlock(Block block) { BlockScanInfo 
info = blockMap.get(block); - if ( info != null ) { + if (info != null) { delBlockInfo(info); } } @@ -310,23 +331,16 @@ class BlockPoolSliceScanner { } } - private synchronized void updateScanStatus(Block block, + private synchronized void updateScanStatus(BlockScanInfo info, ScanType type, boolean scanOk) { - BlockScanInfo info = blockMap.get(block); - - if ( info != null ) { - delBlockInfo(info); - } else { - // It might already be removed. Thats ok, it will be caught next time. - info = new BlockScanInfo(block); - } - + delBlockInfo(info); + long now = Time.monotonicNow(); info.lastScanType = type; info.lastScanTime = now; info.lastScanOk = scanOk; - addBlockInfo(info); + addBlockInfo(info, false); // Don't update meta data if the verification failed. if (!scanOk) { @@ -334,8 +348,8 @@ class BlockPoolSliceScanner { } if (verificationLog != null) { - verificationLog.append(now, block.getGenerationStamp(), - block.getBlockId()); + verificationLog.append(now, info.getGenerationStamp(), + info.getBlockId()); } } @@ -434,11 +448,13 @@ class BlockPoolSliceScanner { totalTransientErrors++; } - updateScanStatus(block.getLocalBlock(), ScanType.VERIFICATION_SCAN, true); + updateScanStatus((BlockScanInfo)block.getLocalBlock(), + ScanType.VERIFICATION_SCAN, true); return; } catch (IOException e) { - updateScanStatus(block.getLocalBlock(), ScanType.VERIFICATION_SCAN, false); + updateScanStatus((BlockScanInfo)block.getLocalBlock(), + ScanType.VERIFICATION_SCAN, false); // If the block does not exists anymore, then its not an error if (!dataset.contains(block)) { @@ -497,7 +513,7 @@ class BlockPoolSliceScanner { // Picks one block and verifies it private void verifyFirstBlock() { - Block block = null; + BlockScanInfo block = null; synchronized (this) { if (!blockInfoSet.isEmpty()) { block = blockInfoSet.first(); @@ -583,7 +599,7 @@ class BlockPoolSliceScanner { delBlockInfo(info); info.lastScanTime = lastScanTime; lastScanTime += verifyInterval; - addBlockInfo(info); + 
addBlockInfo(info, false); } } } @@ -679,12 +695,21 @@ class BlockPoolSliceScanner { throw e; } finally { rollVerificationLogs(); + rollNewBlocksInfo(); if (LOG.isDebugEnabled()) { LOG.debug("Done scanning block pool: " + blockPoolId); } } } - + + // add new blocks to scan in next iteration + private synchronized void rollNewBlocksInfo() { + for (BlockScanInfo newBlock : newBlockInfoSet) { + blockInfoSet.add(newBlock); + } + newBlockInfoSet.clear(); + } + private synchronized void rollVerificationLogs() { if (verificationLog != null) { try { Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Fri Jul 25 20:33:09 2014 @@ -37,6 +37,7 @@ import java.util.zip.Checksum; import org.apache.commons.logging.Log; import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.FSOutputSummer; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; @@ -122,7 +123,8 @@ class BlockReceiver implements Closeable private boolean syncOnClose; private long restartBudget; - BlockReceiver(final ExtendedBlock block, final DataInputStream in, + BlockReceiver(final ExtendedBlock block, final StorageType storageType, + final DataInputStream in, final String 
inAddr, final String myAddr, final BlockConstructionStage stage, final long newGs, final long minBytesRcvd, final long maxBytesRcvd, @@ -162,11 +164,11 @@ class BlockReceiver implements Closeable // Open local disk out // if (isDatanode) { //replication or move - replicaInfo = datanode.data.createTemporary(block); + replicaInfo = datanode.data.createTemporary(storageType, block); } else { switch (stage) { case PIPELINE_SETUP_CREATE: - replicaInfo = datanode.data.createRbw(block); + replicaInfo = datanode.data.createRbw(storageType, block); datanode.notifyNamenodeReceivingBlock( block, replicaInfo.getStorageUuid()); break; @@ -198,7 +200,7 @@ class BlockReceiver implements Closeable case TRANSFER_RBW: case TRANSFER_FINALIZED: // this is a transfer destination - replicaInfo = datanode.data.createTemporary(block); + replicaInfo = datanode.data.createTemporary(storageType, block); break; default: throw new IOException("Unsupported stage " + stage + " while receiving block " + block + " from " + inAddr); Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java Fri Jul 25 20:33:09 2014 @@ -52,7 +52,9 @@ import static org.apache.hadoop.hdfs.DFS import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; +import 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; +import org.apache.hadoop.security.SaslPropertiesResolver; /** * Simple class encapsulating all of the configuration that the DataNode @@ -86,6 +88,7 @@ public class DNConf { final String minimumNameNodeVersion; final String encryptionAlgorithm; + final SaslPropertiesResolver saslPropsResolver; final TrustedChannelResolver trustedChannelResolver; final long xceiverStopTimeout; @@ -168,6 +171,8 @@ public class DNConf { DFS_ENCRYPT_DATA_TRANSFER_DEFAULT); this.encryptionAlgorithm = conf.get(DFS_DATA_ENCRYPTION_ALGORITHM_KEY); this.trustedChannelResolver = TrustedChannelResolver.getInstance(conf); + this.saslPropsResolver = DataTransferSaslUtil.getSaslPropertiesResolver( + conf); this.xceiverStopTimeout = conf.getLong( DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, @@ -186,7 +191,26 @@ public class DNConf { String getMinimumNameNodeVersion() { return this.minimumNameNodeVersion; } - + + /** + * Returns true if encryption enabled for DataTransferProtocol. + * + * @return boolean true if encryption enabled for DataTransferProtocol + */ + public boolean getEncryptDataTransfer() { + return encryptDataTransfer; + } + + /** + * Returns encryption algorithm configured for DataTransferProtocol, or null + * if not configured. + * + * @return encryption algorithm configured for DataTransferProtocol + */ + public String getEncryptionAlgorithm() { + return encryptionAlgorithm; + } + public long getXceiverStopTimeout() { return xceiverStopTimeout; } @@ -194,4 +218,24 @@ public class DNConf { public long getMaxLockedMemory() { return maxLockedMemory; } + + /** + * Returns the SaslPropertiesResolver configured for use with + * DataTransferProtocol, or null if not configured. 
+ * + * @return SaslPropertiesResolver configured for use with DataTransferProtocol + */ + public SaslPropertiesResolver getSaslPropsResolver() { + return saslPropsResolver; + } + + /** + * Returns the TrustedChannelResolver configured for use with + * DataTransferProtocol, or null if not configured. + * + * @return TrustedChannelResolver configured for use with DataTransferProtocol + */ + public TrustedChannelResolver getTrustedChannelResolver() { + return trustedChannelResolver; + } }