http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java index 542161f..bfe6a28 100644 --- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java +++ b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java @@ -18,12 +18,11 @@ package org.apache.hadoop.ozone.container.common.states.endpoint; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdsl.protocol.DatanodeDetails; +import org.apache.hadoop.hdsl.protocol.proto.HdslProtos.DatanodeDetailsProto; import org.apache.hadoop.ozone.container.common.statemachine .EndpointStateMachine; -import org.apache.hadoop.hdsl.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto; import org.apache.hadoop.scm.ScmConfigKeys; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,7 +41,7 @@ public final class RegisterEndpointTask implements private final EndpointStateMachine rpcEndPoint; private final Configuration conf; private Future<EndpointStateMachine.EndPointStates> result; - private ContainerNodeIDProto containerNodeIDProto; + private DatanodeDetailsProto datanodeDetailsProto; /** * Creates a register endpoint task. @@ -59,22 +58,22 @@ public final class RegisterEndpointTask implements } /** - * Get the ContainerNodeID Proto. + * Get the DatanodeDetailsProto Proto. * - * @return ContainerNodeIDProto + * @return DatanodeDetailsProto */ - public ContainerNodeIDProto getContainerNodeIDProto() { - return containerNodeIDProto; + public DatanodeDetailsProto getDatanodeDetailsProto() { + return datanodeDetailsProto; } /** * Set the contiainerNodeID Proto. * - * @param containerNodeIDProto - Container Node ID. + * @param datanodeDetailsProto - Container Node ID. */ - public void setContainerNodeIDProto(ContainerNodeIDProto - containerNodeIDProto) { - this.containerNodeIDProto = containerNodeIDProto; + public void setDatanodeDetailsProto( + DatanodeDetailsProto datanodeDetailsProto) { + this.datanodeDetailsProto = datanodeDetailsProto; } /** @@ -86,7 +85,7 @@ public final class RegisterEndpointTask implements @Override public EndpointStateMachine.EndPointStates call() throws Exception { - if (getContainerNodeIDProto() == null) { + if (getDatanodeDetailsProto() == null) { LOG.error("Container ID proto cannot be null in RegisterEndpoint task, " + "shutting down the endpoint."); return rpcEndPoint.setState(EndpointStateMachine.EndPointStates.SHUTDOWN); @@ -94,11 +93,9 @@ public final class RegisterEndpointTask implements rpcEndPoint.lock(); try { - DatanodeID dnNodeID = DatanodeID.getFromProtoBuf( - getContainerNodeIDProto().getDatanodeID()); // TODO : Add responses to the command Queue. - rpcEndPoint.getEndPoint().register(dnNodeID, + rpcEndPoint.getEndPoint().register(datanodeDetailsProto, conf.getStrings(ScmConfigKeys.OZONE_SCM_NAMES)); EndpointStateMachine.EndPointStates nextState = rpcEndPoint.getState().getNextState(); @@ -129,7 +126,7 @@ public final class RegisterEndpointTask implements public static class Builder { private EndpointStateMachine endPointStateMachine; private Configuration conf; - private ContainerNodeIDProto containerNodeIDProto; + private DatanodeDetails datanodeDetails; /** * Constructs the builder class. @@ -162,11 +159,11 @@ public final class RegisterEndpointTask implements /** * Sets the NodeID. * - * @param nodeID - NodeID proto + * @param dnDetails - NodeID proto * @return Builder */ - public Builder setNodeID(ContainerNodeIDProto nodeID) { - this.containerNodeIDProto = nodeID; + public Builder setDatanodeDetails(DatanodeDetails dnDetails) { + this.datanodeDetails = dnDetails; return this; } @@ -183,15 +180,15 @@ public final class RegisterEndpointTask implements " construct RegisterEndpoint task"); } - if (containerNodeIDProto == null) { - LOG.error("No nodeID specified."); + if (datanodeDetails == null) { + LOG.error("No datanode specified."); throw new IllegalArgumentException("A vaild Node ID is needed to " + "construct RegisterEndpoint task"); } RegisterEndpointTask task = new RegisterEndpointTask(this .endPointStateMachine, this.conf); - task.setContainerNodeIDProto(containerNodeIDProto); + task.setDatanodeDetailsProto(datanodeDetails.getProtoBufMessage()); return task; } }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java index 6e654bc..bd180ef 100644 --- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java +++ b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java @@ -27,6 +27,7 @@ import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdsl.protocol.DatanodeDetails; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; import org.apache.hadoop.hdsl.protocol.proto.HdslProtos; @@ -57,7 +58,7 @@ public final class XceiverServer implements XceiverServerSpi { * * @param conf - Configuration */ - public XceiverServer(Configuration conf, + public XceiverServer(DatanodeDetails datanodeDetails, Configuration conf, ContainerDispatcher dispatcher) { Preconditions.checkNotNull(conf); @@ -78,6 +79,7 @@ public final class XceiverServer implements XceiverServerSpi { + "fallback to use default port {}", this.port, e); } } + datanodeDetails.setContainerPort(port); this.storageContainer = dispatcher; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index c72a0e2..7aee5bb 100644 --- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -21,7 +21,7 @@ package org.apache.hadoop.ozone.container.common.transport.server.ratis; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdsl.protocol.DatanodeDetails; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; import org.apache.hadoop.ozone.container.common.transport.server @@ -62,7 +62,7 @@ public final class XceiverServerRatis implements XceiverServerSpi { private final RaftServer server; private ThreadPoolExecutor writeChunkExecutor; - private XceiverServerRatis(DatanodeID id, int port, String storageDir, + private XceiverServerRatis(DatanodeDetails dd, int port, String storageDir, ContainerDispatcher dispatcher, Configuration conf) throws IOException { final String rpcType = conf.get( @@ -80,7 +80,7 @@ public final class XceiverServerRatis implements XceiverServerSpi { OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY, OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT); - Objects.requireNonNull(id, "id == null"); + Objects.requireNonNull(dd, "id == null"); this.port = port; RaftProperties serverProperties = newRaftProperties(rpc, port, storageDir, maxChunkSize, raftSegmentSize, raftSegmentPreallocatedSize); @@ -93,7 +93,7 @@ public final class XceiverServerRatis implements XceiverServerSpi { ContainerStateMachine stateMachine = new ContainerStateMachine(dispatcher, writeChunkExecutor); this.server = RaftServer.newBuilder() - .setServerId(RatisHelper.toRaftPeerId(id)) + .setServerId(RatisHelper.toRaftPeerId(dd)) .setGroup(RatisHelper.emptyRaftGroup()) .setProperties(serverProperties) .setStateMachine(stateMachine) @@ -131,9 +131,9 @@ public final class XceiverServerRatis implements XceiverServerSpi { return properties; } - public static XceiverServerRatis newXceiverServerRatis(DatanodeID datanodeID, - Configuration ozoneConf, ContainerDispatcher dispatcher) - throws IOException { + public static XceiverServerRatis newXceiverServerRatis( + DatanodeDetails datanodeDetails, Configuration ozoneConf, + ContainerDispatcher dispatcher) throws IOException { final String ratisDir = File.separator + "ratis"; int localPort = ozoneConf.getInt( OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT, @@ -168,14 +168,15 @@ public final class XceiverServerRatis implements XceiverServerSpi { // directories, so we need to pass different local directory for each // local instance. So we map ratis directories under datanode ID. storageDir = - storageDir.concat(File.separator + datanodeID.getDatanodeUuid()); + storageDir.concat(File.separator + + datanodeDetails.getUuidString()); } catch (IOException e) { LOG.error("Unable find a random free port for the server, " + "fallback to use default port {}", localPort, e); } } - datanodeID.setRatisPort(localPort); - return new XceiverServerRatis(datanodeID, localPort, storageDir, + datanodeDetails.setRatisPort(localPort); + return new XceiverServerRatis(datanodeDetails, localPort, storageDir, dispatcher, ozoneConf); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index d507b50..0ef9406 100644 --- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -19,8 +19,8 @@ package org.apache.hadoop.ozone.container.ozoneimpl; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; +import org.apache.hadoop.hdsl.protocol.DatanodeDetails; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.common.helpers.ContainerData; import org.apache.hadoop.ozone.container.common.impl.ChunkManagerImpl; @@ -81,8 +81,9 @@ public class OzoneContainer { * @param ozoneConfig - Config * @throws IOException */ - public OzoneContainer(DatanodeID datanodeID, Configuration ozoneConfig) throws - IOException { + public OzoneContainer( + DatanodeDetails datanodeDetails, Configuration ozoneConfig) + throws IOException { this.ozoneConfig = ozoneConfig; List<StorageLocation> locations = new LinkedList<>(); String[] paths = ozoneConfig.getStrings( @@ -97,7 +98,7 @@ public class OzoneContainer { } manager = new ContainerManagerImpl(); - manager.init(this.ozoneConfig, locations, datanodeID); + manager.init(this.ozoneConfig, locations, datanodeDetails); this.chunkManager = new ChunkManagerImpl(manager); manager.setChunkManager(this.chunkManager); @@ -116,9 +117,9 @@ public class OzoneContainer { this.dispatcher = new Dispatcher(manager, this.ozoneConfig); server = new XceiverServerSpi[]{ - new XceiverServer(this.ozoneConfig, this.dispatcher), + new XceiverServer(datanodeDetails, this.ozoneConfig, this.dispatcher), XceiverServerRatis - .newXceiverServerRatis(datanodeID, ozoneConfig, dispatcher) + .newXceiverServerRatis(datanodeDetails, this.ozoneConfig, dispatcher) }; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java index ea00f76..fbb8426 100644 --- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java +++ b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java @@ -17,7 +17,7 @@ package org.apache.hadoop.ozone.protocol; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdsl.protocol.proto.HdslProtos.DatanodeDetailsProto; import org.apache.hadoop.hdsl.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; import org.apache.hadoop.hdsl.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto; import org.apache.hadoop.hdsl.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState; @@ -45,23 +45,23 @@ public interface StorageContainerDatanodeProtocol { /** * Used by data node to send a Heartbeat. - * @param datanodeID - Datanode ID. + * @param datanodeDetails - Datanode Details. * @param nodeReport - node report state * @param reportState - container report state. * @return - SCMHeartbeatResponseProto * @throws IOException */ - SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID, + SCMHeartbeatResponseProto sendHeartbeat(DatanodeDetailsProto datanodeDetails, SCMNodeReport nodeReport, ReportState reportState) throws IOException; /** * Register Datanode. - * @param datanodeID - DatanodID. + * @param datanodeDetails - Datanode Details. * @param scmAddresses - List of SCMs this datanode is configured to * communicate. * @return SCM Command. */ - SCMRegisteredCmdResponseProto register(DatanodeID datanodeID, + SCMRegisteredCmdResponseProto register(DatanodeDetailsProto datanodeDetails, String[] scmAddresses) throws IOException; /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java index bf52d6b..fffbfd1 100644 --- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java +++ b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java @@ -18,7 +18,7 @@ package org.apache.hadoop.ozone.protocol; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdsl.protocol.proto.HdslProtos.DatanodeDetailsProto; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.hdsl.protocol.proto .StorageContainerDatanodeProtocolProtos.ReportState; @@ -50,20 +50,19 @@ public interface StorageContainerNodeProtocol { /** * Register the node if the node finds that it is not registered with any SCM. - * @param datanodeID - Send datanodeID with Node info, but datanode UUID is - * empty. Server returns a datanodeID for the given node. + * @param datanodeDetails DatanodeDetails * @return SCMHeartbeatResponseProto */ - SCMCommand register(DatanodeID datanodeID); + SCMCommand register(DatanodeDetailsProto datanodeDetails); /** * Send heartbeat to indicate the datanode is alive and doing well. - * @param datanodeID - Datanode ID. + * @param datanodeDetails - Datanode ID. * @param nodeReport - node report. * @param reportState - container report. * @return SCMheartbeat response list */ - List<SCMCommand> sendHeartbeat(DatanodeID datanodeID, + List<SCMCommand> sendHeartbeat(DatanodeDetailsProto datanodeDetails, SCMNodeReport nodeReport, ReportState reportState); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java index 9885ced..4abd8a6 100644 --- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java +++ b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java @@ -18,7 +18,7 @@ package org.apache.hadoop.ozone.protocolPB; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; -import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdsl.protocol.proto.HdslProtos.DatanodeDetailsProto; import org.apache.hadoop.ipc.ProtobufHelper; import org.apache.hadoop.ipc.ProtocolTranslator; import org.apache.hadoop.ipc.RPC; @@ -118,17 +118,18 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB /** * Send by datanode to SCM. * - * @param datanodeID - DatanodeID + * @param datanodeDetailsProto - Datanode Details * @param nodeReport - node report * @throws IOException */ @Override - public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID, + public SCMHeartbeatResponseProto sendHeartbeat( + DatanodeDetailsProto datanodeDetailsProto, SCMNodeReport nodeReport, ReportState reportState) throws IOException { SCMHeartbeatRequestProto.Builder req = SCMHeartbeatRequestProto .newBuilder(); - req.setDatanodeID(datanodeID.getProtoBufMessage()); + req.setDatanodeDetails(datanodeDetailsProto); req.setNodeReport(nodeReport); req.setContainerReportState(reportState); final SCMHeartbeatResponseProto resp; @@ -143,15 +144,16 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB /** * Register Datanode. * - * @param datanodeID - DatanodID. + * @param datanodeDetailsProto - Datanode Details * @return SCM Command. */ @Override - public SCMRegisteredCmdResponseProto register(DatanodeID datanodeID, + public SCMRegisteredCmdResponseProto register( + DatanodeDetailsProto datanodeDetailsProto, String[] scmAddresses) throws IOException { SCMRegisterRequestProto.Builder req = SCMRegisterRequestProto.newBuilder(); - req.setDatanodeID(datanodeID.getProtoBufMessage()); + req.setDatanodeDetails(datanodeDetailsProto); final SCMRegisteredCmdResponseProto response; try { response = rpcProxy.register(NULL_RPC_CONTROLLER, req.build()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java index 9ee5edc..cd2fb59 100644 --- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java @@ -18,7 +18,6 @@ package org.apache.hadoop.ozone.protocolPB; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; -import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol; import org.apache.hadoop.hdsl.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdsl.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; @@ -69,8 +68,7 @@ public class StorageContainerDatanodeProtocolServerSideTranslatorPB } try { - return impl.register(DatanodeID.getFromProtoBuf(request - .getDatanodeID()), addressArray); + return impl.register(request.getDatanodeDetails(), addressArray); } catch (IOException e) { throw new ServiceException(e); } @@ -81,8 +79,8 @@ public class StorageContainerDatanodeProtocolServerSideTranslatorPB sendHeartbeat(RpcController controller, SCMHeartbeatRequestProto request) throws ServiceException { try { - return impl.sendHeartbeat(DatanodeID.getFromProtoBuf(request - .getDatanodeID()), request.getNodeReport(), + return impl.sendHeartbeat(request.getDatanodeDetails(), + request.getNodeReport(), request.getContainerReportState()); } catch (IOException e) { throw new ServiceException(e); http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdsl/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto index cdca62e..187ecda 100644 --- a/hadoop-hdsl/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto +++ b/hadoop-hdsl/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto @@ -32,12 +32,6 @@ option java_generate_equals_and_hash = true; package hadoop.hdsl; -import "hdfs.proto"; - -import "HdfsServer.proto"; - -import "DatanodeProtocol.proto"; - import "hdsl.proto"; @@ -46,7 +40,7 @@ import "hdsl.proto"; * registering with the node manager. */ message SCMHeartbeatRequestProto { - required hadoop.hdfs.DatanodeIDProto datanodeID = 1; + required DatanodeDetailsProto datanodeDetails = 1; optional SCMNodeReport nodeReport = 2; optional ReportState containerReportState = 3; } @@ -125,7 +119,7 @@ message ContainerReportsRequestProto { fullReport = 0; deltaReport = 1; } - required hadoop.hdfs.DatanodeIDProto datanodeID = 1; + required DatanodeDetailsProto datanodeDetails = 1; repeated ContainerInfo reports = 2; required reportType type = 3; } @@ -146,11 +140,11 @@ message SCMStorageReport { optional uint64 capacity = 2 [default = 0]; optional uint64 scmUsed = 3 [default = 0]; optional uint64 remaining = 4 [default = 0]; - optional hadoop.hdfs.StorageTypeProto storageType = 5 [default = DISK]; + //optional hadoop.hdfs.StorageTypeProto storageType = 5 [default = DISK]; } message SCMRegisterRequestProto { - required hadoop.hdfs.DatanodeIDProto datanodeID = 1; + required DatanodeDetailsProto datanodeDetails = 1; optional SCMNodeAddressList addressList = 2; } @@ -196,17 +190,6 @@ message SCMRegisteredCmdResponseProto { message SCMReregisterCmdResponseProto {} /** - * Container ID maintains the container's Identity along with cluster ID - * after the registration is done. - */ -message ContainerNodeIDProto { - required hadoop.hdfs.DatanodeIDProto datanodeID = 1; - optional string clusterID = 2; -} - - - -/** This command tells the data node to send in the container report when possible */ message SendContainerReportProto { @@ -326,7 +309,8 @@ message ContainerBlocksDeletionACKProto { * it needs to do a registration. * * If registration is need datanode moves into REGISTER state. It will - * send a register call with datanodeID data structure and presist that info. + * send a register call with DatanodeDetailsProto data structure and presist + * that info. * * The response to the command contains clusterID. This information is * also persisted by the datanode and moves into heartbeat state. http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java b/hadoop-hdsl/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java index d15c9e6..4349b1a 100644 --- a/hadoop-hdsl/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java +++ b/hadoop-hdsl/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java @@ -17,7 +17,8 @@ package org.apache.hadoop.ozone.container.common; import com.google.common.base.Preconditions; -import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdsl.protocol.DatanodeDetails; +import org.apache.hadoop.hdsl.protocol.proto.HdslProtos.DatanodeDetailsProto; import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol; import org.apache.hadoop.ozone.protocol.VersionResponse; import org.apache.hadoop.hdsl.protocol.proto.StorageContainerDatanodeProtocolProtos; @@ -50,7 +51,7 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol { private AtomicInteger containerReportsCount = new AtomicInteger(0); // Map of datanode to containers - private Map<DatanodeID, Map<String, ContainerInfo>> nodeContainers = + private Map<DatanodeDetails, Map<String, ContainerInfo>> nodeContainers = new HashMap(); /** * Returns the number of heartbeats made to this class. @@ -161,15 +162,16 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol { /** * Used by data node to send a Heartbeat. * - * @param datanodeID - Datanode ID. + * @param datanodeDetailsProto - DatanodeDetailsProto. * @param nodeReport - node report. * @return - SCMHeartbeatResponseProto * @throws IOException */ @Override public StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto - sendHeartbeat(DatanodeID datanodeID, SCMNodeReport nodeReport, - ReportState scmReportState) throws IOException { + sendHeartbeat(DatanodeDetailsProto datanodeDetailsProto, + SCMNodeReport nodeReport, ReportState scmReportState) + throws IOException { rpcCount.incrementAndGet(); heartbeatCount.incrementAndGet(); this.reportState = scmReportState; @@ -183,21 +185,22 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol { /** * Register Datanode. * - * @param datanodeID - DatanodID. + * @param datanodeDetailsProto DatanodDetailsProto. * @param scmAddresses - List of SCMs this datanode is configured to * communicate. * @return SCM Command. */ @Override public StorageContainerDatanodeProtocolProtos - .SCMRegisteredCmdResponseProto register(DatanodeID datanodeID, - String[] scmAddresses) throws IOException { + .SCMRegisteredCmdResponseProto register( + DatanodeDetailsProto datanodeDetailsProto, String[] scmAddresses) + throws IOException { rpcCount.incrementAndGet(); sleepIfNeeded(); return StorageContainerDatanodeProtocolProtos .SCMRegisteredCmdResponseProto .newBuilder().setClusterID(UUID.randomUUID().toString()) - .setDatanodeUUID(datanodeID.getDatanodeUuid()).setErrorCode( + .setDatanodeUUID(datanodeDetailsProto.getUuid()).setErrorCode( StorageContainerDatanodeProtocolProtos .SCMRegisteredCmdResponseProto.ErrorCode.success).build(); } @@ -216,7 +219,8 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol { Preconditions.checkNotNull(reports); containerReportsCount.incrementAndGet(); - DatanodeID datanode = DatanodeID.getFromProtoBuf(reports.getDatanodeID()); + DatanodeDetails datanode = DatanodeDetails.getFromProtoBuf( + reports.getDatanodeDetails()); if (reports.getReportsCount() > 0) { Map containers = nodeContainers.get(datanode); if (containers == null) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java b/hadoop-hdsl/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java index 7e664ff..f05ba49 100644 --- a/hadoop-hdsl/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java +++ b/hadoop-hdsl/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java @@ -19,8 +19,7 @@ package org.apache.hadoop.ozone.container.common; import com.google.common.collect.Maps; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSTestUtil; -import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdsl.protocol.DatanodeDetails; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; @@ -48,6 +47,7 @@ import java.nio.file.Paths; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -154,7 +154,7 @@ public class TestDatanodeStateMachine { public void testStartStopDatanodeStateMachine() throws IOException, InterruptedException, TimeoutException { try (DatanodeStateMachine stateMachine = - new DatanodeStateMachine(DFSTestUtil.getLocalDatanodeID(), conf)) { + new DatanodeStateMachine(getNewDatanodeDetails(), conf)) { stateMachine.startDaemon(); SCMConnectionManager connectionManager = stateMachine.getConnectionManager(); @@ -204,12 +204,13 @@ public class TestDatanodeStateMachine { File idPath = new File( conf.get(ScmConfigKeys.OZONE_SCM_DATANODE_ID)); idPath.delete(); - DatanodeID dnID = DFSTestUtil.getLocalDatanodeID(); - dnID.setContainerPort(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT); - ContainerUtils.writeDatanodeIDTo(dnID, idPath); + DatanodeDetails datanodeDetails = getNewDatanodeDetails(); + datanodeDetails.setContainerPort( + OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT); + ContainerUtils.writeDatanodeDetailsTo(datanodeDetails, idPath); - try (DatanodeStateMachine stateMachine = new DatanodeStateMachine( - DFSTestUtil.getLocalDatanodeID(), conf)) { + try (DatanodeStateMachine stateMachine = + new DatanodeStateMachine(datanodeDetails, conf)) { DatanodeStateMachine.DatanodeStates currentState = stateMachine.getContext().getState(); Assert.assertEquals(DatanodeStateMachine.DatanodeStates.INIT, @@ -341,7 +342,7 @@ public class TestDatanodeStateMachine { perTestConf.setStrings(entry.getKey(), entry.getValue()); LOG.info("Test with {} = {}", entry.getKey(), entry.getValue()); try (DatanodeStateMachine stateMachine = new DatanodeStateMachine( - DFSTestUtil.getLocalDatanodeID(), perTestConf)) { + getNewDatanodeDetails(), perTestConf)) { DatanodeStateMachine.DatanodeStates currentState = stateMachine.getContext().getState(); Assert.assertEquals(DatanodeStateMachine.DatanodeStates.INIT, @@ -358,4 +359,17 @@ public class TestDatanodeStateMachine { } }); } + + private DatanodeDetails getNewDatanodeDetails() { + return DatanodeDetails.newBuilder() + .setUuid(UUID.randomUUID().toString()) + .setHostName("localhost") + .setIpAddress("127.0.0.1") + .setInfoPort(0) + .setInfoSecurePort(0) + .setContainerPort(0) + .setRatisPort(0) + .setOzoneRestPort(0) + .build(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java index 99e16de..88391e2 100644 --- a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java +++ b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java @@ -28,8 +28,8 @@ import com.google.protobuf.InvalidProtocolBufferException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdsl.conf.OzoneConfiguration; import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdsl.HdslUtils; +import org.apache.hadoop.hdsl.protocol.DatanodeDetails; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; @@ -48,6 +48,7 @@ import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.hdsl.protocol.proto.HdslProtos; +import org.apache.hadoop.hdsl.protocol.proto.HdslProtos.DatanodeDetailsProto; import org.apache.hadoop.hdsl.protocol.proto.HdslProtos.NodeState; import org.apache.hadoop.hdsl.protocol.proto.ScmBlockLocationProtocolProtos; import org.apache.hadoop.hdsl.protocol.proto.StorageContainerDatanodeProtocolProtos; @@ -665,11 +666,11 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl throw new IllegalArgumentException("Not Supported yet"); } - List<DatanodeID> datanodes = queryNode(nodeStatuses); + List<DatanodeDetails> datanodes = queryNode(nodeStatuses); HdslProtos.NodePool.Builder poolBuilder = HdslProtos.NodePool.newBuilder(); - for (DatanodeID datanode : datanodes) { + for (DatanodeDetails datanode : datanodes) { HdslProtos.Node node = HdslProtos.Node.newBuilder() .setNodeID(datanode.getProtoBufMessage()) .addAllNodeStates(nodeStatuses) @@ -746,15 +747,15 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl * @return List of Datanodes. */ - public List<DatanodeID> queryNode(EnumSet<NodeState> nodeStatuses) { + public List<DatanodeDetails> queryNode(EnumSet<NodeState> nodeStatuses) { Preconditions.checkNotNull(nodeStatuses, "Node Query set cannot be null"); Preconditions.checkState(nodeStatuses.size() > 0, "No valid arguments " + "in the query set"); - List<DatanodeID> resultList = new LinkedList<>(); - Set<DatanodeID> currentSet = new TreeSet<>(); + List<DatanodeDetails> resultList = new LinkedList<>(); + Set<DatanodeDetails> currentSet = new TreeSet<>(); for (NodeState nodeState : nodeStatuses) { - Set<DatanodeID> nextSet = queryNodeState(nodeState); + Set<DatanodeDetails> nextSet = queryNodeState(nodeState); if ((nextSet == null) || (nextSet.size() == 0)) { // Right now we only support AND operation. So intersect with // any empty set is null. @@ -779,13 +780,13 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl * @param nodeState - NodeState that we are interested in matching. * @return Set of Datanodes that match the NodeState. */ - private Set<DatanodeID> queryNodeState(NodeState nodeState) { + private Set<DatanodeDetails> queryNodeState(NodeState nodeState) { if (nodeState == NodeState.RAFT_MEMBER || nodeState == NodeState.FREE_NODE) { throw new IllegalStateException("Not implemented yet"); } - Set<DatanodeID> returnSet = new TreeSet<>(); - List<DatanodeID> tmp = getScmNodeManager().getNodes(nodeState); + Set<DatanodeDetails> returnSet = new TreeSet<>(); + List<DatanodeDetails> tmp = getScmNodeManager().getNodes(nodeState); if ((tmp != null) && (tmp.size() > 0)) { returnSet.addAll(tmp); } @@ -945,20 +946,22 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl /** * Used by data node to send a Heartbeat. * - * @param datanodeID - Datanode ID. + * @param datanodeDetails - Datanode Details. * @param nodeReport - Node Report * @param reportState - Container report ready info. * @return - SCMHeartbeatResponseProto * @throws IOException */ @Override - public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID, - SCMNodeReport nodeReport, ReportState reportState) throws IOException { + public SCMHeartbeatResponseProto sendHeartbeat( + DatanodeDetailsProto datanodeDetails, SCMNodeReport nodeReport, + ReportState reportState) throws IOException { List<SCMCommand> commands = - getScmNodeManager().sendHeartbeat(datanodeID, nodeReport, reportState); + getScmNodeManager().sendHeartbeat(datanodeDetails, nodeReport, + reportState); List<SCMCommandResponseProto> cmdResponses = new LinkedList<>(); for (SCMCommand cmd : commands) { - cmdResponses.add(getCommandResponse(cmd, datanodeID.getDatanodeUuid() + cmdResponses.add(getCommandResponse(cmd, datanodeDetails.getUuid() .toString())); } return SCMHeartbeatResponseProto.newBuilder().addAllCommands(cmdResponses) @@ -968,17 +971,17 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl /** * Register Datanode. * - * @param datanodeID - DatanodID. + * @param datanodeDetails - DatanodID. * @param scmAddresses - List of SCMs this datanode is configured to * communicate. * @return SCM Command. */ @Override public StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto - register(DatanodeID datanodeID, String[] scmAddresses) - throws IOException { + register(DatanodeDetailsProto datanodeDetails, String[] scmAddresses) { // TODO : Return the list of Nodes that forms the SCM HA. - return getRegisteredResponse(scmNodeManager.register(datanodeID), null); + return getRegisteredResponse( + scmNodeManager.register(datanodeDetails), null); } /** @@ -1020,7 +1023,7 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl // Update container stat entry, this will trigger a removal operation if it // exists in cache. synchronized (containerReportCache) { - String datanodeUuid = reports.getDatanodeID().getDatanodeUuid(); + String datanodeUuid = reports.getDatanodeDetails().getUuid(); if (datanodeUuid != null && newStat != null) { containerReportCache.put(datanodeUuid, newStat); // update global view container metrics http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/block/DatanodeDeletedBlockTransactions.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/block/DatanodeDeletedBlockTransactions.java b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/block/DatanodeDeletedBlockTransactions.java index 88a661c..9a208d5 100644 --- a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/block/DatanodeDeletedBlockTransactions.java +++ b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/block/DatanodeDeletedBlockTransactions.java @@ -20,9 +20,10 @@ import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.UUID; import java.util.stream.Collectors; -import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdsl.protocol.DatanodeDetails; import org.apache.hadoop.hdsl.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; import org.apache.hadoop.ozone.scm.container.Mapping; import org.apache.hadoop.scm.container.common.helpers.ContainerInfo; @@ -41,7 +42,7 @@ public class DatanodeDeletedBlockTransactions { private int currentTXNum; private Mapping mappingService; // A list of TXs mapped to a certain datanode ID. - private final ArrayListMultimap<DatanodeID, DeletedBlocksTransaction> + private final ArrayListMultimap<UUID, DeletedBlocksTransaction> transactions; DatanodeDeletedBlockTransactions(Mapping mappingService, @@ -67,7 +68,8 @@ public class DatanodeDeletedBlockTransactions { return; } - for (DatanodeID dnID : info.getPipeline().getMachines()) { + for (DatanodeDetails dd : info.getPipeline().getMachines()) { + UUID dnID = dd.getUuid(); if (transactions.containsKey(dnID)) { List<DeletedBlocksTransaction> txs = transactions.get(dnID); if (txs != null && txs.size() < maximumAllowedTXNum) { @@ -93,7 +95,7 @@ public class DatanodeDeletedBlockTransactions { } } - Set<DatanodeID> getDatanodes() { + Set<UUID> getDatanodeIDs() { return transactions.keySet(); } @@ -101,18 +103,18 @@ public class DatanodeDeletedBlockTransactions { return transactions.isEmpty(); } - boolean hasTransactions(DatanodeID dnID) { - return transactions.containsKey(dnID) && !transactions.get(dnID).isEmpty(); + boolean hasTransactions(UUID dnId) { + return transactions.containsKey(dnId) && + !transactions.get(dnId).isEmpty(); } - List<DeletedBlocksTransaction> getDatanodeTransactions( - DatanodeID dnID) { - return transactions.get(dnID); + List<DeletedBlocksTransaction> getDatanodeTransactions(UUID dnId) { + return transactions.get(dnId); } - List<String> getTransactionIDList(DatanodeID dnID) { - if (hasTransactions(dnID)) { - return transactions.get(dnID).stream() + List<String> getTransactionIDList(UUID dnId) { + if (hasTransactions(dnId)) { + return transactions.get(dnId).stream() .map(DeletedBlocksTransaction::getTxID).map(String::valueOf) .collect(Collectors.toList()); } else { http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/block/SCMBlockDeletingService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/block/SCMBlockDeletingService.java b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/block/SCMBlockDeletingService.java index e45bd1c..5d7c2d5 100644 --- a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/block/SCMBlockDeletingService.java +++ b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/block/SCMBlockDeletingService.java @@ -20,7 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdsl.protocol.DatanodeDetails; import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; import org.apache.hadoop.hdsl.protocol.proto.HdslProtos.NodeState; import org.apache.hadoop.hdsl.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; @@ -39,6 +39,7 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_CONTA import java.io.IOException; import java.util.List; +import java.util.UUID; import java.util.concurrent.TimeUnit; /** @@ -114,7 +115,7 @@ public class SCMBlockDeletingService extends BackgroundService { // to delete blocks. LOG.debug("Running DeletedBlockTransactionScanner"); DatanodeDeletedBlockTransactions transactions = null; - List<DatanodeID> datanodes = nodeManager.getNodes(NodeState.HEALTHY); + List<DatanodeDetails> datanodes = nodeManager.getNodes(NodeState.HEALTHY); if (datanodes != null) { transactions = new DatanodeDeletedBlockTransactions(mappingService, blockDeleteLimitSize, datanodes.size()); @@ -133,22 +134,22 @@ public class SCMBlockDeletingService extends BackgroundService { } if (transactions != null && !transactions.isEmpty()) { - for (DatanodeID datanodeID : transactions.getDatanodes()) { + for (UUID dnId : transactions.getDatanodeIDs()) { List<DeletedBlocksTransaction> dnTXs = transactions - .getDatanodeTransactions(datanodeID); + .getDatanodeTransactions(dnId); if (dnTXs != null && !dnTXs.isEmpty()) { dnTxCount += dnTXs.size(); // TODO commandQueue needs a cap. // We should stop caching new commands if num of un-processed // command is bigger than a limit, e.g 50. In case datanode goes // offline for sometime, the cached commands be flooded. - nodeManager.addDatanodeCommand(datanodeID, + nodeManager.addDatanodeCommand(dnId, new DeleteBlocksCommand(dnTXs)); LOG.debug( "Added delete block command for datanode {} in the queue," + " number of delete block transactions: {}, TxID list: {}", - datanodeID, dnTXs.size(), String.join(",", - transactions.getTransactionIDList(datanodeID))); + dnId, dnTXs.size(), String.join(",", + transactions.getTransactionIDList(dnId))); } } } @@ -157,7 +158,7 @@ public class SCMBlockDeletingService extends BackgroundService { LOG.info( "Totally added {} delete blocks command for" + " {} datanodes, task elapsed time: {}ms", - dnTxCount, transactions.getDatanodes().size(), + dnTxCount, transactions.getDatanodeIDs().size(), Time.monotonicNow() - startTime); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java index 66eda00..112e58e 100644 --- a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java +++ b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java @@ -415,7 +415,7 @@ public class ContainerMapping implements Mapping { // Container not found in our container db. LOG.error("Error while processing container report from datanode :" + " {}, for container: {}, reason: container doesn't exist in" + - "container database.", reports.getDatanodeID(), + "container database.", reports.getDatanodeDetails(), datanodeState.getContainerName()); } } finally { http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/closer/ContainerCloser.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/closer/ContainerCloser.java b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/closer/ContainerCloser.java index 029d015..c925695 100644 --- a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/closer/ContainerCloser.java +++ b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/closer/ContainerCloser.java @@ -22,8 +22,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; +import org.apache.hadoop.hdsl.protocol.DatanodeDetails; import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; import org.apache.hadoop.hdsl.protocol.proto.HdslProtos; import org.apache.hadoop.ozone.scm.node.NodeManager; @@ -128,9 +127,10 @@ public class ContainerCloser { // to SCM. In that case also, data node will ignore this command. HdslProtos.Pipeline pipeline = info.getPipeline(); - for (HdfsProtos.DatanodeIDProto datanodeID : + for (HdslProtos.DatanodeDetailsProto datanodeDetails : pipeline.getPipelineChannel().getMembersList()) { - nodeManager.addDatanodeCommand(DatanodeID.getFromProtoBuf(datanodeID), + nodeManager.addDatanodeCommand( + DatanodeDetails.getFromProtoBuf(datanodeDetails).getUuid(), new CloseContainerCommand(info.getContainerName())); } if (!commandIssued.containsKey(info.getContainerName())) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/placement/algorithms/ContainerPlacementPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/placement/algorithms/ContainerPlacementPolicy.java b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/placement/algorithms/ContainerPlacementPolicy.java index 0cf1fde..1129d93 100644 --- a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/placement/algorithms/ContainerPlacementPolicy.java +++ b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/placement/algorithms/ContainerPlacementPolicy.java @@ -17,7 +17,7 @@ package org.apache.hadoop.ozone.scm.container.placement.algorithms; -import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdsl.protocol.DatanodeDetails; import java.io.IOException; import java.util.List; @@ -36,6 +36,6 @@ public interface ContainerPlacementPolicy { * @return list of datanodes chosen. * @throws IOException */ - List<DatanodeID> chooseDatanodes(int nodesRequired, long sizeRequired) + List<DatanodeDetails> chooseDatanodes(int nodesRequired, long sizeRequired) throws IOException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/placement/algorithms/SCMCommonPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/placement/algorithms/SCMCommonPolicy.java b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/placement/algorithms/SCMCommonPolicy.java index 351d41c..71e5ebb 100644 --- a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/placement/algorithms/SCMCommonPolicy.java +++ b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/placement/algorithms/SCMCommonPolicy.java @@ -19,7 +19,7 @@ package org.apache.hadoop.ozone.scm.container.placement.algorithms; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdsl.protocol.DatanodeDetails; import org.apache.hadoop.hdsl.protocol.proto.HdslProtos; import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.ozone.scm.exceptions.SCMException; @@ -101,9 +101,9 @@ public abstract class SCMCommonPolicy implements ContainerPlacementPolicy { * @throws SCMException SCM exception. */ - public List<DatanodeID> chooseDatanodes(int nodesRequired, final long + public List<DatanodeDetails> chooseDatanodes(int nodesRequired, final long sizeRequired) throws SCMException { - List<DatanodeID> healthyNodes = + List<DatanodeDetails> healthyNodes = nodeManager.getNodes(HdslProtos.NodeState.HEALTHY); String msg; if (healthyNodes.size() == 0) { @@ -121,7 +121,7 @@ public abstract class SCMCommonPolicy implements ContainerPlacementPolicy { throw new SCMException(msg, SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE); } - List<DatanodeID> healthyList = healthyNodes.stream().filter(d -> + List<DatanodeDetails> healthyList = healthyNodes.stream().filter(d -> hasEnoughSpace(d, sizeRequired)).collect(Collectors.toList()); if (healthyList.size() < nodesRequired) { @@ -140,11 +140,12 @@ public abstract class SCMCommonPolicy implements ContainerPlacementPolicy { /** * Returns true if this node has enough space to meet our requirement. * - * @param datanodeID DatanodeID + * @param datanodeDetails DatanodeDetails * @return true if we have enough space. */ - private boolean hasEnoughSpace(DatanodeID datanodeID, long sizeRequired) { - SCMNodeMetric nodeMetric = nodeManager.getNodeStat(datanodeID); + private boolean hasEnoughSpace(DatanodeDetails datanodeDetails, + long sizeRequired) { + SCMNodeMetric nodeMetric = nodeManager.getNodeStat(datanodeDetails); return (nodeMetric != null) && nodeMetric.get().getRemaining() .hasResources(sizeRequired); } @@ -159,12 +160,13 @@ public abstract class SCMCommonPolicy implements ContainerPlacementPolicy { * @return List of Datanodes that can be used for placement. * @throws SCMException */ - public List<DatanodeID> getResultSet(int nodesRequired, List<DatanodeID> - healthyNodes) throws SCMException { - List<DatanodeID> results = new LinkedList<>(); + public List<DatanodeDetails> getResultSet( + int nodesRequired, List<DatanodeDetails> healthyNodes) + throws SCMException { + List<DatanodeDetails> results = new LinkedList<>(); for (int x = 0; x < nodesRequired; x++) { // invoke the choose function defined in the derived classes. - DatanodeID nodeId = chooseNode(healthyNodes); + DatanodeDetails nodeId = chooseNode(healthyNodes); if (nodeId != null) { results.add(nodeId); } @@ -186,9 +188,10 @@ public abstract class SCMCommonPolicy implements ContainerPlacementPolicy { * PlacementRandom. * * @param healthyNodes - Set of healthy nodes we can choose from. - * @return DatanodeID + * @return DatanodeDetails */ - public abstract DatanodeID chooseNode(List<DatanodeID> healthyNodes); + public abstract DatanodeDetails chooseNode( + List<DatanodeDetails> healthyNodes); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java index de9bb74..90d301c 100644 --- a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java +++ b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java @@ -19,7 +19,7 @@ package org.apache.hadoop.ozone.scm.container.placement.algorithms; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdsl.protocol.DatanodeDetails; import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.ozone.scm.exceptions.SCMException; import org.apache.hadoop.ozone.scm.node.NodeManager; @@ -89,9 +89,9 @@ public final class SCMContainerPlacementCapacity extends SCMCommonPolicy { * @throws SCMException SCMException */ @Override - public List<DatanodeID> chooseDatanodes(final int nodesRequired, - final long sizeRequired) throws SCMException { - List<DatanodeID> healthyNodes = + public List<DatanodeDetails> chooseDatanodes( + final int nodesRequired, final long sizeRequired) throws SCMException { + List<DatanodeDetails> healthyNodes = super.chooseDatanodes(nodesRequired, sizeRequired); if (healthyNodes.size() == nodesRequired) { return healthyNodes; @@ -105,29 +105,29 @@ public final class SCMContainerPlacementCapacity extends SCMCommonPolicy { * * @param healthyNodes - List of healthy nodes that meet the size * requirement. - * @return DatanodeID that is chosen. + * @return DatanodeDetails that is chosen. */ @Override - public DatanodeID chooseNode(List<DatanodeID> healthyNodes) { + public DatanodeDetails chooseNode(List<DatanodeDetails> healthyNodes) { int firstNodeNdx = getRand().nextInt(healthyNodes.size()); int secondNodeNdx = getRand().nextInt(healthyNodes.size()); - DatanodeID chosenID; + DatanodeDetails datanodeDetails; // There is a possibility that both numbers will be same. // if that is so, we just return the node. if (firstNodeNdx == secondNodeNdx) { - chosenID = healthyNodes.get(firstNodeNdx); + datanodeDetails = healthyNodes.get(firstNodeNdx); } else { - DatanodeID firstNodeID = healthyNodes.get(firstNodeNdx); - DatanodeID secondNodeID = healthyNodes.get(secondNodeNdx); + DatanodeDetails firstNodeDetails = healthyNodes.get(firstNodeNdx); + DatanodeDetails secondNodeDetails = healthyNodes.get(secondNodeNdx); SCMNodeMetric firstNodeMetric = - getNodeManager().getNodeStat(firstNodeID); + getNodeManager().getNodeStat(firstNodeDetails); SCMNodeMetric secondNodeMetric = - getNodeManager().getNodeStat(secondNodeID); - chosenID = firstNodeMetric.isGreater(secondNodeMetric.get()) - ? firstNodeID : secondNodeID; + getNodeManager().getNodeStat(secondNodeDetails); + datanodeDetails = firstNodeMetric.isGreater(secondNodeMetric.get()) + ? firstNodeDetails : secondNodeDetails; } - healthyNodes.remove(chosenID); - return chosenID; + healthyNodes.remove(datanodeDetails); + return datanodeDetails; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/placement/algorithms/SCMContainerPlacementRandom.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/placement/algorithms/SCMContainerPlacementRandom.java b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/placement/algorithms/SCMContainerPlacementRandom.java index b145b14..f469762 100644 --- a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/placement/algorithms/SCMContainerPlacementRandom.java +++ b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/placement/algorithms/SCMContainerPlacementRandom.java @@ -19,7 +19,7 @@ package org.apache.hadoop.ozone.scm.container.placement.algorithms; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdsl.protocol.DatanodeDetails; import org.apache.hadoop.ozone.scm.exceptions.SCMException; import org.apache.hadoop.ozone.scm.node.NodeManager; import org.slf4j.Logger; @@ -62,9 +62,9 @@ public final class SCMContainerPlacementRandom extends SCMCommonPolicy * @throws SCMException SCMException */ @Override - public List<DatanodeID> chooseDatanodes(final int nodesRequired, - final long sizeRequired) throws SCMException { - List<DatanodeID> healthyNodes = + public List<DatanodeDetails> chooseDatanodes( + final int nodesRequired, final long sizeRequired) throws SCMException { + List<DatanodeDetails> healthyNodes = super.chooseDatanodes(nodesRequired, sizeRequired); if (healthyNodes.size() == nodesRequired) { @@ -80,8 +80,8 @@ public final class SCMContainerPlacementRandom extends SCMCommonPolicy * @param healthyNodes - all healthy datanodes. * @return one randomly chosen datanode that from two randomly chosen datanode */ - public DatanodeID chooseNode(final List<DatanodeID> healthyNodes) { - DatanodeID selectedNode = + public DatanodeDetails chooseNode(final List<DatanodeDetails> healthyNodes) { + DatanodeDetails selectedNode = healthyNodes.get(getRand().nextInt(healthyNodes.size())); healthyNodes.remove(selectedNode); return selectedNode; http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerSupervisor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerSupervisor.java b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerSupervisor.java index 62bf5d7..e51ad79 100644 --- a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerSupervisor.java +++ b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerSupervisor.java @@ -20,7 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdsl.protocol.DatanodeDetails; import org.apache.hadoop.hdsl.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; import org.apache.hadoop.ozone.scm.exceptions.SCMException; @@ -288,11 +288,11 @@ public class ContainerSupervisor implements Closeable { */ public void handleContainerReport( ContainerReportsRequestProto containerReport) { - DatanodeID datanodeID = DatanodeID.getFromProtoBuf( - containerReport.getDatanodeID()); + DatanodeDetails datanodeDetails = DatanodeDetails.getFromProtoBuf( + containerReport.getDatanodeDetails()); inProgressPoolListLock.readLock().lock(); try { - String poolName = poolManager.getNodePool(datanodeID); + String poolName = poolManager.getNodePool(datanodeDetails); for (InProgressPool ppool : inProgressPoolList) { if (ppool.getPoolName().equalsIgnoreCase(poolName)) { ppool.handleContainerReport(containerReport); @@ -302,11 +302,12 @@ public class ContainerSupervisor implements Closeable { // TODO: Decide if we can do anything else with this report. LOG.debug("Discarding the container report for pool {}. " + "That pool is not currently in the pool reconciliation process." + - " Container Name: {}", poolName, containerReport.getDatanodeID()); + " Container Name: {}", poolName, + containerReport.getDatanodeDetails()); } catch (SCMException e) { LOG.warn("Skipping processing container report from datanode {}, " + "cause: failed to get the corresponding node pool", - datanodeID.toString(), e); + datanodeDetails.toString(), e); } finally { inProgressPoolListLock.readLock().unlock(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/replication/InProgressPool.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/replication/InProgressPool.java b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/replication/InProgressPool.java index 24d7bd7..5b2dd0f 100644 --- a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/replication/InProgressPool.java +++ b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/container/replication/InProgressPool.java @@ -18,7 +18,7 @@ package org.apache.hadoop.ozone.scm.container.replication; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdsl.protocol.DatanodeDetails; import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand; import org.apache.hadoop.hdsl.protocol.proto.HdslProtos.NodeState; import org.apache.hadoop.hdsl.protocol.proto @@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -62,7 +63,7 @@ public final class InProgressPool { private final NodePoolManager poolManager; private final ExecutorService executorService; private final Map<String, Integer> containerCountMap; - private final Map<String, Boolean> processedNodeSet; + private final Map<UUID, Boolean> processedNodeSet; private final long startTime; private ProgressStatus status; private AtomicInteger nodeCount; @@ -165,9 +166,9 @@ public final class InProgressPool { * Starts the reconciliation process for all the nodes in the pool. */ public void startReconciliation() { - List<DatanodeID> datanodeIDList = + List<DatanodeDetails> datanodeDetailsList = this.poolManager.getNodes(pool.getPoolName()); - if (datanodeIDList.size() == 0) { + if (datanodeDetailsList.size() == 0) { LOG.error("Datanode list for {} is Empty. Pool with no nodes ? ", pool.getPoolName()); this.status = ProgressStatus.Error; @@ -181,14 +182,14 @@ public final class InProgressPool { Ask each datanode to send us commands. */ SendContainerCommand cmd = SendContainerCommand.newBuilder().build(); - for (DatanodeID id : datanodeIDList) { - NodeState currentState = getNodestate(id); + for (DatanodeDetails dd : datanodeDetailsList) { + NodeState currentState = getNodestate(dd); if (currentState == HEALTHY || currentState == STALE) { nodeCount.incrementAndGet(); // Queue commands to all datanodes in this pool to send us container // report. Since we ignore dead nodes, it is possible that we would have // over replicated the container if the node comes back. - nodeManager.addDatanodeCommand(id, cmd); + nodeManager.addDatanodeCommand(dd.getUuid(), cmd); } } this.status = ProgressStatus.InProgress; @@ -198,10 +199,10 @@ public final class InProgressPool { /** * Gets the node state. * - * @param id - datanode ID. + * @param datanode - datanode information. * @return NodeState. */ - private NodeState getNodestate(DatanodeID id) { + private NodeState getNodestate(DatanodeDetails datanode) { NodeState currentState = INVALID; int maxTry = 100; // We need to loop to make sure that we will retry if we get @@ -212,7 +213,7 @@ public final class InProgressPool { while (currentState == INVALID && currentTry < maxTry) { // Retry to make sure that we deal with the case of node state not // known. - currentState = nodeManager.getNodeState(id); + currentState = nodeManager.getNodeState(datanode); currentTry++; if (currentState == INVALID) { // Sleep to make sure that this is not a tight loop. @@ -222,7 +223,7 @@ public final class InProgressPool { if (currentState == INVALID) { LOG.error("Not able to determine the state of Node: {}, Exceeded max " + "try and node manager returns INVALID state. This indicates we " + - "are dealing with a node that we don't know about.", id); + "are dealing with a node that we don't know about.", datanode); } return currentState; } @@ -248,13 +249,13 @@ public final class InProgressPool { private Runnable processContainerReport( ContainerReportsRequestProto reports) { return () -> { - DatanodeID datanodeID = - DatanodeID.getFromProtoBuf(reports.getDatanodeID()); - if (processedNodeSet.computeIfAbsent(datanodeID.getDatanodeUuid(), + DatanodeDetails datanodeDetails = + DatanodeDetails.getFromProtoBuf(reports.getDatanodeDetails()); + if (processedNodeSet.computeIfAbsent(datanodeDetails.getUuid(), (k) -> true)) { nodeProcessed.incrementAndGet(); LOG.debug("Total Nodes processed : {} Node Name: {} ", nodeProcessed, - datanodeID.getDatanodeUuid()); + datanodeDetails.getUuid()); for (ContainerInfo info : reports.getReportsList()) { containerProcessedCount.incrementAndGet(); LOG.debug("Total Containers processed: {} Container Name: {}", http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/node/CommandQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/node/CommandQueue.java b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/node/CommandQueue.java index bbf319b..c376efa 100644 --- a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/node/CommandQueue.java +++ b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/node/CommandQueue.java @@ -19,7 +19,6 @@ package org.apache.hadoop.ozone.scm.node; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.util.Time; @@ -27,6 +26,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -40,7 +40,7 @@ import java.util.concurrent.locks.ReentrantLock; public class CommandQueue { // This list is used as default return value. private static final List<SCMCommand> DEFAULT_LIST = new LinkedList<>(); - private final Map<DatanodeID, Commands> commandMap; + private final Map<UUID, Commands> commandMap; private final Lock lock; private long commandsInQueue; @@ -82,14 +82,14 @@ public class CommandQueue { * commands returns a empty list otherwise the current set of * commands are returned and command map set to empty list again. * - * @param datanodeID DatanodeID + * @param datanodeUuid Datanode UUID * @return List of SCM Commands. */ @SuppressWarnings("unchecked") - List<SCMCommand> getCommand(final DatanodeID datanodeID) { + List<SCMCommand> getCommand(final UUID datanodeUuid) { lock.lock(); try { - Commands cmds = commandMap.remove(datanodeID); + Commands cmds = commandMap.remove(datanodeUuid); List<SCMCommand> cmdList = null; if(cmds != null) { cmdList = cmds.getCommands(); @@ -106,17 +106,17 @@ public class CommandQueue { /** * Adds a Command to the SCM Queue to send the command to container. * - * @param datanodeID DatanodeID + * @param datanodeUuid DatanodeDetails.Uuid * @param command - Command */ - public void addCommand(final DatanodeID datanodeID, final SCMCommand + public void addCommand(final UUID datanodeUuid, final SCMCommand command) { lock.lock(); try { - if (commandMap.containsKey(datanodeID)) { - commandMap.get(datanodeID).add(command); + if (commandMap.containsKey(datanodeUuid)) { + commandMap.get(datanodeUuid).add(command); } else { - commandMap.put(datanodeID, new Commands(command)); + commandMap.put(datanodeUuid, new Commands(command)); } commandsInQueue++; } finally { http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/node/HeartbeatQueueItem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/node/HeartbeatQueueItem.java b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/node/HeartbeatQueueItem.java index 3bdb95e..fe7ff14 100644 --- a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/node/HeartbeatQueueItem.java +++ b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/node/HeartbeatQueueItem.java @@ -19,7 +19,7 @@ package org.apache.hadoop.ozone.scm.node; import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdsl.protocol.DatanodeDetails; import org.apache.hadoop.hdsl.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState; import org.apache.hadoop.hdsl.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMNodeReport; @@ -30,21 +30,21 @@ import static org.apache.hadoop.util.Time.monotonicNow; * This class represents the item in SCM heartbeat queue. */ public class HeartbeatQueueItem { - private DatanodeID datanodeID; + private DatanodeDetails datanodeDetails; private long recvTimestamp; private SCMNodeReport nodeReport; private ReportState containerReportState; /** * - * @param datanodeID - datanode ID of the heartbeat. + * @param datanodeDetails - datanode ID of the heartbeat. * @param recvTimestamp - heartbeat receive timestamp. * @param nodeReport - node report associated with the heartbeat if any. * @param containerReportState - container report state. */ - HeartbeatQueueItem(DatanodeID datanodeID, long recvTimestamp, + HeartbeatQueueItem(DatanodeDetails datanodeDetails, long recvTimestamp, SCMNodeReport nodeReport, ReportState containerReportState) { - this.datanodeID = datanodeID; + this.datanodeDetails = datanodeDetails; this.recvTimestamp = recvTimestamp; this.nodeReport = nodeReport; this.containerReportState = containerReportState; @@ -53,8 +53,8 @@ public class HeartbeatQueueItem { /** * @return datanode ID. */ - public DatanodeID getDatanodeID() { - return datanodeID; + public DatanodeDetails getDatanodeDetails() { + return datanodeDetails; } /** @@ -82,13 +82,13 @@ public class HeartbeatQueueItem { * Builder for HeartbeatQueueItem. */ public static class Builder { - private DatanodeID datanodeID; + private DatanodeDetails datanodeDetails; private SCMNodeReport nodeReport; private ReportState containerReportState; private long recvTimestamp = monotonicNow(); - public Builder setDatanodeID(DatanodeID datanodeId) { - this.datanodeID = datanodeId; + public Builder setDatanodeDetails(DatanodeDetails dnDetails) { + this.datanodeDetails = dnDetails; return this; } @@ -109,7 +109,7 @@ public class HeartbeatQueueItem { } public HeartbeatQueueItem build() { - return new HeartbeatQueueItem(datanodeID, recvTimestamp, nodeReport, + return new HeartbeatQueueItem(datanodeDetails, recvTimestamp, nodeReport, containerReportState); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java index 12c94da..c1b2aca 100644 --- a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java +++ b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java @@ -18,8 +18,8 @@ package org.apache.hadoop.ozone.scm.node; import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; +import org.apache.hadoop.hdsl.protocol.DatanodeDetails; import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.hdsl.protocol.proto.HdslProtos.NodeState; @@ -29,6 +29,7 @@ import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeStat; import java.io.Closeable; import java.util.List; import java.util.Map; +import java.util.UUID; /** * A node manager supports a simple interface for managing a datanode. @@ -60,14 +61,14 @@ public interface NodeManager extends StorageContainerNodeProtocol, * @param node - DataNode. * @throws UnregisteredNodeException */ - void removeNode(DatanodeID node) throws UnregisteredNodeException; + void removeNode(DatanodeDetails node) throws UnregisteredNodeException; /** * Gets all Live Datanodes that is currently communicating with SCM. * @param nodeState - State of the node * @return List of Datanodes that are Heartbeating SCM. */ - List<DatanodeID> getNodes(NodeState nodeState); + List<DatanodeDetails> getNodes(NodeState nodeState); /** * Returns the Number of Datanodes that are communicating with SCM. @@ -79,9 +80,9 @@ public interface NodeManager extends StorageContainerNodeProtocol, /** * Get all datanodes known to SCM. * - * @return List of DatanodeIDs known to SCM. + * @return List of DatanodeDetails known to SCM. */ - List<DatanodeID> getAllNodes(); + List<DatanodeDetails> getAllNodes(); /** * Chill mode is the period when node manager waits for a minimum @@ -113,14 +114,14 @@ public interface NodeManager extends StorageContainerNodeProtocol, * Return a map of node stats. * @return a map of individual node stats (live/stale but not dead). */ - Map<String, SCMNodeStat> getNodeStats(); + Map<UUID, SCMNodeStat> getNodeStats(); /** * Return the node stat of the specified datanode. - * @param datanodeID - datanode ID. + * @param datanodeDetails DatanodeDetails. * @return node stat if it is live/stale, null if it is dead or does't exist. */ - SCMNodeMetric getNodeStat(DatanodeID datanodeID); + SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails); /** * Returns the NodePoolManager associated with the NodeManager. @@ -137,16 +138,16 @@ public interface NodeManager extends StorageContainerNodeProtocol, /** * Returns the node state of a specific node. - * @param id - DatanodeID + * @param datanodeDetails DatanodeDetails * @return Healthy/Stale/Dead. */ - NodeState getNodeState(DatanodeID id); + NodeState getNodeState(DatanodeDetails datanodeDetails); /** * Add a {@link SCMCommand} to the command queue, which are * handled by HB thread asynchronously. - * @param id + * @param dnId datanode uuid * @param command */ - void addDatanodeCommand(DatanodeID id, SCMCommand command); + void addDatanodeCommand(UUID dnId, SCMCommand command); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/node/NodePoolManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/node/NodePoolManager.java b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/node/NodePoolManager.java index d3218b7..c330526 100644 --- a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/node/NodePoolManager.java +++ b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/node/NodePoolManager.java @@ -19,7 +19,7 @@ package org.apache.hadoop.ozone.scm.node; -import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdsl.protocol.DatanodeDetails; import org.apache.hadoop.ozone.scm.exceptions.SCMException; import java.io.Closeable; @@ -36,7 +36,7 @@ public interface NodePoolManager extends Closeable { * @param pool - name of the node pool. * @param node - data node. */ - void addNode(String pool, DatanodeID node) throws IOException; + void addNode(String pool, DatanodeDetails node) throws IOException; /** * Remove a node from a node pool. @@ -44,7 +44,7 @@ public interface NodePoolManager extends Closeable { * @param node - data node. * @throws SCMException */ - void removeNode(String pool, DatanodeID node) + void removeNode(String pool, DatanodeDetails node) throws SCMException; /** @@ -60,13 +60,13 @@ public interface NodePoolManager extends Closeable { * @return a list of datanode ids or an empty list if the node pool was not * found. */ - List<DatanodeID> getNodes(String pool); + List<DatanodeDetails> getNodes(String pool); /** * Get the node pool name if the node has been added to a node pool. - * @param datanodeID - datanode ID. + * @param datanodeDetails - datanode ID. * @return node pool name if it has been assigned. * null if the node has not been assigned to any node pool yet. */ - String getNodePool(DatanodeID datanodeID) throws SCMException; + String getNodePool(DatanodeDetails datanodeDetails) throws SCMException; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
