http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java index a5e1bac..31915b1 100644 --- a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java +++ b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java @@ -20,8 +20,10 @@ package org.apache.hadoop.ozone.scm.node; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; +import org.apache.hadoop.hdsl.protocol.DatanodeDetails; +import org.apache.hadoop.hdsl.protocol.proto.HdslProtos.DatanodeDetailsProto; +import org.apache.hadoop.ipc.Server; import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.hdsl.conf.OzoneConfiguration; import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol; @@ -62,11 +64,13 @@ import org.slf4j.LoggerFactory; import javax.management.ObjectName; import java.io.IOException; +import java.net.InetAddress; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ScheduledExecutorService; @@ -115,13 +119,13 @@ public class SCMNodeManager /** * Key = NodeID, value = timestamp. 
*/ - private final ConcurrentHashMap<String, Long> healthyNodes; - private final ConcurrentHashMap<String, Long> staleNodes; - private final ConcurrentHashMap<String, Long> deadNodes; + private final ConcurrentHashMap<UUID, Long> healthyNodes; + private final ConcurrentHashMap<UUID, Long> staleNodes; + private final ConcurrentHashMap<UUID, Long> deadNodes; private final Queue<HeartbeatQueueItem> heartbeatQueue; - private final ConcurrentHashMap<String, DatanodeID> nodes; + private final ConcurrentHashMap<UUID, DatanodeDetails> nodes; // Individual live node stats - private final ConcurrentHashMap<String, SCMNodeStat> nodeStats; + private final ConcurrentHashMap<UUID, SCMNodeStat> nodeStats; // Aggregated node stats private SCMNodeStat scmStat; // TODO: expose nodeStats and scmStat as metrics @@ -170,7 +174,7 @@ public class SCMNodeManager deadNodes = new ConcurrentHashMap<>(); staleNodes = new ConcurrentHashMap<>(); nodes = new ConcurrentHashMap<>(); - nodeStats = new ConcurrentHashMap(); + nodeStats = new ConcurrentHashMap<>(); scmStat = new SCMNodeStat(); healthyNodeCount = new AtomicInteger(0); @@ -228,7 +232,7 @@ public class SCMNodeManager * @throws UnregisteredNodeException */ @Override - public void removeNode(DatanodeID node) throws UnregisteredNodeException { + public void removeNode(DatanodeDetails node) { // TODO : Fix me when adding the SCM CLI. } @@ -242,9 +246,9 @@ public class SCMNodeManager * @return List of Datanodes that are known to SCM in the requested state. */ @Override - public List<DatanodeID> getNodes(NodeState nodestate) + public List<DatanodeDetails> getNodes(NodeState nodestate) throws IllegalArgumentException { - Map<String, Long> set; + Map<UUID, Long> set; switch (nodestate) { case HEALTHY: synchronized (this) { @@ -272,11 +276,11 @@ public class SCMNodeManager /** * Returns all datanodes that are known to SCM. 
* - * @return List of DatanodeIDs + * @return List of DatanodeDetails */ @Override - public List<DatanodeID> getAllNodes() { - Map<String, DatanodeID> set; + public List<DatanodeDetails> getAllNodes() { + Map<UUID, DatanodeDetails> set; synchronized (this) { set = Collections.unmodifiableMap(new HashMap<>(nodes)); } @@ -406,11 +410,11 @@ public class SCMNodeManager /** * Returns the node state of a specific node. * - * @param id - DatanodeID + * @param datanodeDetails - Datanode Details * @return Healthy/Stale/Dead/Unknown. */ @Override - public NodeState getNodeState(DatanodeID id) { + public NodeState getNodeState(DatanodeDetails datanodeDetails) { // There is a subtle race condition here, hence we also support // the NODEState.UNKNOWN. It is possible that just before we check the // healthyNodes, we have removed the node from the healthy list but stil @@ -419,15 +423,16 @@ public class SCMNodeManager // then the node is in 2 states to avoid this race condition. Instead we // just deal with the possibilty of getting a state called unknown. - if(healthyNodes.containsKey(id.getDatanodeUuid())) { + UUID id = datanodeDetails.getUuid(); + if(healthyNodes.containsKey(id)) { return HEALTHY; } - if(staleNodes.containsKey(id.getDatanodeUuid())) { + if(staleNodes.containsKey(id)) { return STALE; } - if(deadNodes.containsKey(id.getDatanodeUuid())) { + if(deadNodes.containsKey(id)) { return DEAD; } @@ -477,7 +482,7 @@ public class SCMNodeManager // Iterate over the Stale nodes and decide if we need to move any node to // dead State. long currentTime = monotonicNow(); - for (Map.Entry<String, Long> entry : staleNodes.entrySet()) { + for (Map.Entry<UUID, Long> entry : staleNodes.entrySet()) { if (currentTime - entry.getValue() > deadNodeIntervalMs) { synchronized (this) { moveStaleNodeToDead(entry); @@ -488,7 +493,7 @@ public class SCMNodeManager // Iterate over the healthy nodes and decide if we need to move any node to // Stale State. 
currentTime = monotonicNow(); - for (Map.Entry<String, Long> entry : healthyNodes.entrySet()) { + for (Map.Entry<UUID, Long> entry : healthyNodes.entrySet()) { if (currentTime - entry.getValue() > staleNodeIntervalMs) { synchronized (this) { moveHealthyNodeToStale(entry); @@ -555,7 +560,7 @@ public class SCMNodeManager * * @param entry - Map Entry */ - private void moveHealthyNodeToStale(Map.Entry<String, Long> entry) { + private void moveHealthyNodeToStale(Map.Entry<UUID, Long> entry) { LOG.trace("Moving healthy node to stale: {}", entry.getKey()); healthyNodes.remove(entry.getKey()); healthyNodeCount.decrementAndGet(); @@ -564,7 +569,7 @@ public class SCMNodeManager if (scmManager != null) { // remove stale node's container report - scmManager.removeContainerReport(entry.getKey()); + scmManager.removeContainerReport(entry.getKey().toString()); } } @@ -573,7 +578,7 @@ public class SCMNodeManager * * @param entry - Map Entry */ - private void moveStaleNodeToDead(Map.Entry<String, Long> entry) { + private void moveStaleNodeToDead(Map.Entry<UUID, Long> entry) { LOG.trace("Moving stale node to dead: {}", entry.getKey()); staleNodes.remove(entry.getKey()); staleNodeCount.decrementAndGet(); @@ -594,8 +599,8 @@ public class SCMNodeManager private void handleHeartbeat(HeartbeatQueueItem hbItem) { lastHBProcessedCount++; - DatanodeID datanodeID = hbItem.getDatanodeID(); - String datanodeUuid = datanodeID.getDatanodeUuid(); + DatanodeDetails datanodeDetails = hbItem.getDatanodeDetails(); + UUID datanodeUuid = datanodeDetails.getUuid(); SCMNodeReport nodeReport = hbItem.getNodeReport(); long recvTimestamp = hbItem.getRecvTimestamp(); long processTimestamp = Time.monotonicNow(); @@ -610,7 +615,7 @@ public class SCMNodeManager if (healthyNodes.containsKey(datanodeUuid)) { healthyNodes.put(datanodeUuid, processTimestamp); updateNodeStat(datanodeUuid, nodeReport); - updateCommandQueue(datanodeID, + updateCommandQueue(datanodeUuid, hbItem.getContainerReportState().getState()); 
return; } @@ -623,7 +628,7 @@ public class SCMNodeManager healthyNodeCount.incrementAndGet(); staleNodeCount.decrementAndGet(); updateNodeStat(datanodeUuid, nodeReport); - updateCommandQueue(datanodeID, + updateCommandQueue(datanodeUuid, hbItem.getContainerReportState().getState()); return; } @@ -636,22 +641,22 @@ public class SCMNodeManager deadNodeCount.decrementAndGet(); healthyNodeCount.incrementAndGet(); updateNodeStat(datanodeUuid, nodeReport); - updateCommandQueue(datanodeID, + updateCommandQueue(datanodeUuid, hbItem.getContainerReportState().getState()); return; } LOG.warn("SCM receive heartbeat from unregistered datanode {}", datanodeUuid); - this.commandQueue.addCommand(hbItem.getDatanodeID(), + this.commandQueue.addCommand(datanodeUuid, new ReregisterCommand()); } - private void updateNodeStat(String datanodeUuid, SCMNodeReport nodeReport) { - SCMNodeStat stat = nodeStats.get(datanodeUuid); + private void updateNodeStat(UUID dnId, SCMNodeReport nodeReport) { + SCMNodeStat stat = nodeStats.get(dnId); if (stat == null) { LOG.debug("SCM updateNodeStat based on heartbeat from previous" + - "dead datanode {}", datanodeUuid); + "dead datanode {}", dnId); stat = new SCMNodeStat(); } @@ -667,17 +672,17 @@ public class SCMNodeManager } scmStat.subtract(stat); stat.set(totalCapacity, totalScmUsed, totalRemaining); - nodeStats.put(datanodeUuid, stat); + nodeStats.put(dnId, stat); scmStat.add(stat); } } - private void updateCommandQueue(DatanodeID datanodeID, + private void updateCommandQueue(UUID dnId, ReportState.states containerReportState) { if (containerReportState != null) { switch (containerReportState) { case completeContinerReport: - commandQueue.addCommand(datanodeID, + commandQueue.addCommand(dnId, SendContainerCommand.newBuilder().build()); return; case deltaContainerReport: @@ -736,26 +741,36 @@ public class SCMNodeManager * Register the node if the node finds that it is not registered with any * SCM. 
* - * @param datanodeID - Send datanodeID with Node info. This function - * generates and assigns new datanode ID for the datanode. - * This allows SCM to be run independent of Namenode if - * required. + * @param datanodeDetailsProto - Send datanodeDetails with Node info. + * This function generates and assigns new datanode ID + * for the datanode. This allows SCM to be run independent + * of Namenode if required. * * @return SCMHeartbeatResponseProto */ @Override - public SCMCommand register(DatanodeID datanodeID) { - - SCMCommand responseCommand = verifyDatanodeUUID(datanodeID); + public SCMCommand register(DatanodeDetailsProto datanodeDetailsProto) { + + DatanodeDetails datanodeDetails = DatanodeDetails.getFromProtoBuf( + datanodeDetailsProto); + InetAddress dnAddress = Server.getRemoteIp(); + if (dnAddress != null) { + // Mostly called inside an RPC, update ip and peer hostname + String hostname = dnAddress.getHostName(); + String ip = dnAddress.getHostAddress(); + datanodeDetails.setHostName(hostname); + datanodeDetails.setIpAddress(ip); + } + SCMCommand responseCommand = verifyDatanodeUUID(datanodeDetails); if (responseCommand != null) { return responseCommand; } - - nodes.put(datanodeID.getDatanodeUuid(), datanodeID); + UUID dnId = datanodeDetails.getUuid(); + nodes.put(dnId, datanodeDetails); totalNodes.incrementAndGet(); - healthyNodes.put(datanodeID.getDatanodeUuid(), monotonicNow()); + healthyNodes.put(dnId, monotonicNow()); healthyNodeCount.incrementAndGet(); - nodeStats.put(datanodeID.getDatanodeUuid(), new SCMNodeStat()); + nodeStats.put(dnId, new SCMNodeStat()); if(inStartupChillMode.get() && totalNodes.get() >= getMinimumChillModeNodes()) { @@ -767,9 +782,9 @@ public class SCMNodeManager // For now, all nodes are added to the "DefaultNodePool" upon registration // if it has not been added to any node pool yet. 
try { - if (nodePoolManager.getNodePool(datanodeID) == null) { + if (nodePoolManager.getNodePool(datanodeDetails) == null) { nodePoolManager.addNode(SCMNodePoolManager.DEFAULT_NODEPOOL, - datanodeID); + datanodeDetails); } } catch (IOException e) { // TODO: make sure registration failure is handled correctly. @@ -778,10 +793,10 @@ public class SCMNodeManager .build(); } LOG.info("Data node with ID: {} Registered.", - datanodeID.getDatanodeUuid()); + datanodeDetails.getUuid()); return RegisteredCommand.newBuilder() .setErrorCode(ErrorCode.success) - .setDatanodeUUID(datanodeID.getDatanodeUuid()) + .setDatanodeUUID(datanodeDetails.getUuidString()) .setClusterID(this.clusterID) .build(); } @@ -789,18 +804,18 @@ public class SCMNodeManager /** * Verifies the datanode does not have a valid UUID already. * - * @param datanodeID - Datanode UUID. + * @param datanodeDetails - Datanode Details. * @return SCMCommand */ - private SCMCommand verifyDatanodeUUID(DatanodeID datanodeID) { - if (datanodeID.getDatanodeUuid() != null && - nodes.containsKey(datanodeID.getDatanodeUuid())) { + private SCMCommand verifyDatanodeUUID(DatanodeDetails datanodeDetails) { + if (datanodeDetails.getUuid() != null && + nodes.containsKey(datanodeDetails.getUuid())) { LOG.trace("Datanode is already registered. Datanode: {}", - datanodeID.toString()); + datanodeDetails.toString()); return RegisteredCommand.newBuilder() .setErrorCode(ErrorCode.success) .setClusterID(this.clusterID) - .setDatanodeUUID(datanodeID.getDatanodeUuid()) + .setDatanodeUUID(datanodeDetails.getUuidString()) .build(); } return null; @@ -809,24 +824,28 @@ public class SCMNodeManager /** * Send heartbeat to indicate the datanode is alive and doing well. * - * @param datanodeID - Datanode ID. + * @param datanodeDetailsProto - DatanodeDetailsProto. * @param nodeReport - node report. * @param containerReportState - container report state. * @return SCMheartbeat response. 
* @throws IOException */ @Override - public List<SCMCommand> sendHeartbeat(DatanodeID datanodeID, - SCMNodeReport nodeReport, ReportState containerReportState) { + public List<SCMCommand> sendHeartbeat( + DatanodeDetailsProto datanodeDetailsProto, SCMNodeReport nodeReport, + ReportState containerReportState) { + + DatanodeDetails datanodeDetails = DatanodeDetails + .getFromProtoBuf(datanodeDetailsProto); // Checking for NULL to make sure that we don't get // an exception from ConcurrentList. // This could be a problem in tests, if this function is invoked via // protobuf, transport layer will guarantee that this is not null. - if (datanodeID != null) { + if (datanodeDetails != null) { heartbeatQueue.add( new HeartbeatQueueItem.Builder() - .setDatanodeID(datanodeID) + .setDatanodeDetails(datanodeDetails) .setNodeReport(nodeReport) .setContainerReportState(containerReportState) .build()); @@ -834,7 +853,7 @@ public class SCMNodeManager LOG.error("Datanode ID in heartbeat is null"); } - return commandQueue.getCommand(datanodeID); + return datanodeDetails == null ? Collections.<SCMCommand>emptyList() : commandQueue.getCommand(datanodeDetails.getUuid()); } /** @@ -851,18 +870,18 @@ public class SCMNodeManager * @return a map of individual node stats (live/stale but not dead). */ @Override - public Map<String, SCMNodeStat> getNodeStats() { + public Map<UUID, SCMNodeStat> getNodeStats() { return Collections.unmodifiableMap(nodeStats); } /** * Return the node stat of the specified datanode. - * @param datanodeID - datanode ID. + * @param datanodeDetails - datanode ID. * @return node stat if it is live/stale, null if it is dead or does't exist. 
*/ @Override - public SCMNodeMetric getNodeStat(DatanodeID datanodeID) { - return new SCMNodeMetric(nodeStats.get(datanodeID.getDatanodeUuid())); + public SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails) { + return new SCMNodeMetric(nodeStats.get(datanodeDetails.getUuid())); } @Override @@ -880,8 +899,8 @@ public class SCMNodeManager } @Override - public void addDatanodeCommand(DatanodeID id, SCMCommand command) { - this.commandQueue.addCommand(id, command); + public void addDatanodeCommand(UUID dnId, SCMCommand command) { + this.commandQueue.addCommand(dnId, command); } @VisibleForTesting
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodePoolManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodePoolManager.java b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodePoolManager.java index db54b97..a8b9c35 100644 --- a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodePoolManager.java +++ b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodePoolManager.java @@ -20,9 +20,9 @@ package org.apache.hadoop.ozone.scm.node; import com.google.common.base.Preconditions; import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; import org.apache.hadoop.hdsl.conf.OzoneConfiguration; +import org.apache.hadoop.hdsl.protocol.DatanodeDetails; +import org.apache.hadoop.hdsl.protocol.proto.HdslProtos; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.scm.exceptions.SCMException; import org.apache.hadoop.utils.MetadataStore; @@ -60,7 +60,8 @@ public final class SCMNodePoolManager implements NodePoolManager { private static final Logger LOG = LoggerFactory.getLogger(SCMNodePoolManager.class); - private static final List<DatanodeID> EMPTY_NODE_LIST = new ArrayList<>(); + private static final List<DatanodeDetails> EMPTY_NODE_LIST = + new ArrayList<>(); private static final List<String> EMPTY_NODEPOOL_LIST = new ArrayList<>(); public static final String DEFAULT_NODEPOOL = "DefaultNodePool"; @@ -68,7 +69,7 @@ public final class SCMNodePoolManager implements NodePoolManager { private MetadataStore nodePoolStore; // In-memory node pool to nodes mapping - private HashMap<String, Set<DatanodeID>> nodePools; + private HashMap<String, Set<DatanodeDetails>> nodePools; // Read-write lock 
for nodepool operations private ReadWriteLock lock; @@ -104,11 +105,11 @@ public final class SCMNodePoolManager implements NodePoolManager { try { nodePoolStore.iterate(null, (key, value) -> { try { - DatanodeID nodeId = DatanodeID.getFromProtoBuf( - HdfsProtos.DatanodeIDProto.PARSER.parseFrom(key)); + DatanodeDetails nodeId = DatanodeDetails.getFromProtoBuf( + HdslProtos.DatanodeDetailsProto.PARSER.parseFrom(key)); String poolName = DFSUtil.bytes2String(value); - Set<DatanodeID> nodePool = null; + Set<DatanodeDetails> nodePool = null; if (nodePools.containsKey(poolName)) { nodePool = nodePools.get(poolName); } else { @@ -138,7 +139,7 @@ public final class SCMNodePoolManager implements NodePoolManager { * @param node - name of the datanode. */ @Override - public void addNode(final String pool, final DatanodeID node) + public void addNode(final String pool, final DatanodeDetails node) throws IOException { Preconditions.checkNotNull(pool, "pool name is null"); Preconditions.checkNotNull(node, "node is null"); @@ -149,11 +150,11 @@ public final class SCMNodePoolManager implements NodePoolManager { DFSUtil.string2Bytes(pool)); // add to the in-memory store - Set<DatanodeID> nodePool = null; + Set<DatanodeDetails> nodePool = null; if (nodePools.containsKey(pool)) { nodePool = nodePools.get(pool); } else { - nodePool = new HashSet<DatanodeID>(); + nodePool = new HashSet<DatanodeDetails>(); nodePools.put(pool, nodePool); } nodePool.add(node); @@ -169,7 +170,7 @@ public final class SCMNodePoolManager implements NodePoolManager { * @throws SCMException */ @Override - public void removeNode(final String pool, final DatanodeID node) + public void removeNode(final String pool, final DatanodeDetails node) throws SCMException { Preconditions.checkNotNull(pool, "pool name is null"); Preconditions.checkNotNull(node, "node is null"); @@ -187,12 +188,13 @@ public final class SCMNodePoolManager implements NodePoolManager { // Remove from the in-memory store if 
(nodePools.containsKey(pool)) { - Set<DatanodeID> nodePool = nodePools.get(pool); + Set<DatanodeDetails> nodePool = nodePools.get(pool); nodePool.remove(node); } else { throw new SCMException(String.format("Unable to find node %s from" + " pool %s in MAP.", DFSUtil.bytes2String(kName), pool), - FAILED_TO_FIND_NODE_IN_POOL); } + FAILED_TO_FIND_NODE_IN_POOL); + } } catch (IOException e) { throw new SCMException("Failed to remove node " + node.toString() + " from node pool " + pool, e, @@ -226,7 +228,7 @@ public final class SCMNodePoolManager implements NodePoolManager { * @return all datanodes of the specified node pool. */ @Override - public List<DatanodeID> getNodes(final String pool) { + public List<DatanodeDetails> getNodes(final String pool) { Preconditions.checkNotNull(pool, "pool name is null"); if (nodePools.containsKey(pool)) { return nodePools.get(pool).stream().collect(Collectors.toList()); @@ -237,21 +239,22 @@ public final class SCMNodePoolManager implements NodePoolManager { /** * Get the node pool name if the node has been added to a node pool. - * @param datanodeID - datanode ID. + * @param datanodeDetails - datanode ID. * @return node pool name if it has been assigned. * null if the node has not been assigned to any node pool yet. * TODO: Put this in a in-memory map if performance is an issue. */ @Override - public String getNodePool(final DatanodeID datanodeID) throws SCMException { - Preconditions.checkNotNull(datanodeID, "node is null"); + public String getNodePool(final DatanodeDetails datanodeDetails) + throws SCMException { + Preconditions.checkNotNull(datanodeDetails, "node is null"); try { byte[] result = nodePoolStore.get( - datanodeID.getProtoBufMessage().toByteArray()); + datanodeDetails.getProtoBufMessage().toByteArray()); return result == null ? 
null : DFSUtil.bytes2String(result); } catch (IOException e) { throw new SCMException("Failed to get node pool for node " - + datanodeID.toString(), e, + + datanodeDetails.toString(), e, SCMException.ResultCodes.IO_EXCEPTION); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineManager.java b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineManager.java index 988f631..35e1bc1 100644 --- a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineManager.java +++ b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineManager.java @@ -17,7 +17,7 @@ package org.apache.hadoop.ozone.scm.pipelines; -import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdsl.protocol.DatanodeDetails; import org.apache.hadoop.hdsl.protocol.proto.HdslProtos.ReplicationFactor; import org.apache.hadoop.hdsl.protocol.proto.HdslProtos.ReplicationType; import org.apache.hadoop.hdsl.protocol.proto.HdslProtos.LifeCycleState; @@ -154,7 +154,7 @@ public abstract class PipelineManager { * @param datanodes - The list of datanodes that make this pipeline. */ public abstract void createPipeline(String pipelineID, - List<DatanodeID> datanodes) throws IOException; + List<DatanodeDetails> datanodes) throws IOException; /** * Close the pipeline with the given clusterId. @@ -165,12 +165,12 @@ public abstract class PipelineManager { * list members in the pipeline . * @return the datanode */ - public abstract List<DatanodeID> getMembers(String pipelineID) + public abstract List<DatanodeDetails> getMembers(String pipelineID) throws IOException; /** * Update the datanode list of the pipeline. 
*/ public abstract void updatePipeline(String pipelineID, - List<DatanodeID> newDatanodes) throws IOException; + List<DatanodeDetails> newDatanodes) throws IOException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineSelector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineSelector.java b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineSelector.java index 5ec81a6..0eac7f6 100644 --- a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineSelector.java +++ b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineSelector.java @@ -18,7 +18,7 @@ package org.apache.hadoop.ozone.scm.pipelines; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdsl.protocol.DatanodeDetails; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.hdsl.protocol.proto.HdslProtos; import org.apache.hadoop.hdsl.protocol.proto.HdslProtos.ReplicationType; @@ -83,16 +83,17 @@ public class PipelineSelector { * The first of the list will be the leader node. 
* @return pipeline corresponding to nodes */ - public static PipelineChannel newPipelineFromNodes(List<DatanodeID> nodes, - LifeCycleState state, ReplicationType replicationType, - ReplicationFactor replicationFactor, String name) { + public static PipelineChannel newPipelineFromNodes( + List<DatanodeDetails> nodes, LifeCycleState state, + ReplicationType replicationType, ReplicationFactor replicationFactor, + String name) { Preconditions.checkNotNull(nodes); Preconditions.checkArgument(nodes.size() > 0); - String leaderId = nodes.get(0).getDatanodeUuid(); + String leaderId = nodes.get(0).getUuidString(); PipelineChannel pipelineChannel = new PipelineChannel(leaderId, state, replicationType, replicationFactor, name); - for (DatanodeID node : nodes) { + for (DatanodeDetails node : nodes) { pipelineChannel.addMember(node); } return pipelineChannel; @@ -178,11 +179,11 @@ public class PipelineSelector { */ public void createPipeline(ReplicationType replicationType, String - pipelineID, List<DatanodeID> datanodes) throws IOException { + pipelineID, List<DatanodeDetails> datanodes) throws IOException { PipelineManager manager = getPipelineManager(replicationType); Preconditions.checkNotNull(manager, "Found invalid pipeline manager"); LOG.debug("Creating a pipeline: {} with nodes:{}", pipelineID, - datanodes.stream().map(DatanodeID::toString) + datanodes.stream().map(DatanodeDetails::toString) .collect(Collectors.joining(","))); manager.createPipeline(pipelineID, datanodes); } @@ -203,7 +204,7 @@ public class PipelineSelector { * list members in the pipeline . 
*/ - public List<DatanodeID> getDatanodes(ReplicationType replicationType, + public List<DatanodeDetails> getDatanodes(ReplicationType replicationType, String pipelineID) throws IOException { PipelineManager manager = getPipelineManager(replicationType); Preconditions.checkNotNull(manager, "Found invalid pipeline manager"); @@ -216,11 +217,11 @@ public class PipelineSelector { */ public void updateDatanodes(ReplicationType replicationType, String - pipelineID, List<DatanodeID> newDatanodes) throws IOException { + pipelineID, List<DatanodeDetails> newDatanodes) throws IOException { PipelineManager manager = getPipelineManager(replicationType); Preconditions.checkNotNull(manager, "Found invalid pipeline manager"); LOG.debug("Updating pipeline: {} with new nodes:{}", pipelineID, - newDatanodes.stream().map(DatanodeID::toString) + newDatanodes.stream().map(DatanodeDetails::toString) .collect(Collectors.joining(","))); manager.updatePipeline(pipelineID, newDatanodes); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/RatisManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/RatisManagerImpl.java b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/RatisManagerImpl.java index 6744caf..c98573e 100644 --- a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/RatisManagerImpl.java +++ b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/RatisManagerImpl.java @@ -18,7 +18,7 @@ package org.apache.hadoop.ozone.scm.pipelines.ratis; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdsl.protocol.DatanodeDetails; import 
org.apache.hadoop.hdsl.protocol.proto.HdslProtos.LifeCycleState; import org.apache.hadoop.hdsl.protocol.proto.HdslProtos.ReplicationFactor; import org.apache.hadoop.hdsl.protocol.proto.HdslProtos.ReplicationType; @@ -52,7 +52,7 @@ public class RatisManagerImpl extends PipelineManager { private static final String PREFIX = "Ratis-"; private final Configuration conf; private final NodeManager nodeManager; - private final Set<DatanodeID> ratisMembers; + private final Set<DatanodeDetails> ratisMembers; /** * Constructs a Ratis Pipeline Manager. @@ -74,12 +74,12 @@ public class RatisManagerImpl extends PipelineManager { * @return PipelineChannel. */ public PipelineChannel allocatePipelineChannel(ReplicationFactor factor) { - List<DatanodeID> newNodesList = new LinkedList<>(); - List<DatanodeID> datanodes = nodeManager.getNodes(NodeState.HEALTHY); + List<DatanodeDetails> newNodesList = new LinkedList<>(); + List<DatanodeDetails> datanodes = nodeManager.getNodes(NodeState.HEALTHY); int count = getReplicationCount(factor); //TODO: Add Raft State to the Nodes, so we can query and skip nodes from // data from datanode instead of maintaining a set. - for (DatanodeID datanode : datanodes) { + for (DatanodeDetails datanode : datanodes) { Preconditions.checkNotNull(datanode); if (!ratisMembers.contains(datanode)) { newNodesList.add(datanode); @@ -116,7 +116,8 @@ public class RatisManagerImpl extends PipelineManager { * @param datanodes - The list of datanodes that make this pipeline. 
*/ @Override - public void createPipeline(String pipelineID, List<DatanodeID> datanodes) { + public void createPipeline(String pipelineID, + List<DatanodeDetails> datanodes) { } @@ -137,7 +138,8 @@ public class RatisManagerImpl extends PipelineManager { * @return the datanode */ @Override - public List<DatanodeID> getMembers(String pipelineID) throws IOException { + public List<DatanodeDetails> getMembers(String pipelineID) + throws IOException { return null; } @@ -148,7 +150,8 @@ public class RatisManagerImpl extends PipelineManager { * @param newDatanodes */ @Override - public void updatePipeline(String pipelineID, List<DatanodeID> newDatanodes) + public void updatePipeline(String pipelineID, + List<DatanodeDetails> newDatanodes) throws IOException { } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/pipelines/standalone/StandaloneManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/pipelines/standalone/StandaloneManagerImpl.java b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/pipelines/standalone/StandaloneManagerImpl.java index 96bdaf9..023baea 100644 --- a/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/pipelines/standalone/StandaloneManagerImpl.java +++ b/hadoop-hdsl/server-scm/src/main/java/org/apache/hadoop/ozone/scm/pipelines/standalone/StandaloneManagerImpl.java @@ -17,7 +17,7 @@ package org.apache.hadoop.ozone.scm.pipelines.standalone; import com.google.common.base.Preconditions; -import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdsl.protocol.DatanodeDetails; import org.apache.hadoop.hdsl.protocol.proto.HdslProtos.ReplicationFactor; import org.apache.hadoop.hdsl.protocol.proto.HdslProtos.ReplicationType; import org.apache.hadoop.hdsl.protocol.proto.HdslProtos.LifeCycleState; @@ -47,7 +47,7 @@ public class 
StandaloneManagerImpl extends PipelineManager { private final NodeManager nodeManager; private final ContainerPlacementPolicy placementPolicy; private final long containerSize; - private final Set<DatanodeID> standAloneMembers; + private final Set<DatanodeDetails> standAloneMembers; /** * Constructor for Standalone Node Manager Impl. @@ -72,10 +72,10 @@ public class StandaloneManagerImpl extends PipelineManager { * @return PipelineChannel. */ public PipelineChannel allocatePipelineChannel(ReplicationFactor factor) { - List<DatanodeID> newNodesList = new LinkedList<>(); - List<DatanodeID> datanodes = nodeManager.getNodes(NodeState.HEALTHY); + List<DatanodeDetails> newNodesList = new LinkedList<>(); + List<DatanodeDetails> datanodes = nodeManager.getNodes(NodeState.HEALTHY); int count = getReplicationCount(factor); - for (DatanodeID datanode : datanodes) { + for (DatanodeDetails datanode : datanodes) { Preconditions.checkNotNull(datanode); if (!standAloneMembers.contains(datanode)) { newNodesList.add(datanode); @@ -103,7 +103,8 @@ public class StandaloneManagerImpl extends PipelineManager { * @param datanodes - The list of datanodes that make this pipeline. 
*/ @Override - public void createPipeline(String pipelineID, List<DatanodeID> datanodes) { + public void createPipeline(String pipelineID, + List<DatanodeDetails> datanodes) { //return newPipelineFromNodes(datanodes, pipelineID); } @@ -124,7 +125,8 @@ public class StandaloneManagerImpl extends PipelineManager { * @return the datanode */ @Override - public List<DatanodeID> getMembers(String pipelineID) throws IOException { + public List<DatanodeDetails> getMembers(String pipelineID) + throws IOException { return null; } @@ -135,7 +137,7 @@ public class StandaloneManagerImpl extends PipelineManager { * @param newDatanodes */ @Override - public void updatePipeline(String pipelineID, List<DatanodeID> + public void updatePipeline(String pipelineID, List<DatanodeDetails> newDatanodes) throws IOException { } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java index 6724c23..825efaa 100644 --- a/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java +++ b/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java @@ -19,8 +19,8 @@ package org.apache.hadoop.ozone.container.common; import org.apache.commons.codec.digest.DigestUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.hdfs.DFSTestUtil; -import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdsl.protocol.DatanodeDetails; +import org.apache.hadoop.hdsl.protocol.proto.HdslProtos; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ozone.OzoneConfigKeys; import 
org.apache.hadoop.ozone.OzoneConsts; @@ -39,8 +39,6 @@ import org.apache.hadoop.ozone.container.common.states.endpoint import org.apache.hadoop.hdsl.protocol.proto .StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdsl.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto; -import org.apache.hadoop.hdsl.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; import org.apache.hadoop.hdsl.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto; @@ -54,13 +52,14 @@ import org.apache.hadoop.hdsl.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMStorageReport; import org.apache.hadoop.hdsl.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; +import org.apache.hadoop.ozone.scm.TestUtils; import org.apache.hadoop.ozone.scm.VersionInfo; import org.apache.hadoop.test.PathUtils; import org.apache.hadoop.util.Time; import static org.apache.hadoop.ozone.container.common.ContainerTestUtils .createEndpoint; -import static org.apache.hadoop.ozone.scm.TestUtils.getDatanodeID; +import static org.apache.hadoop.ozone.scm.TestUtils.getDatanodeDetails; import static org.hamcrest.Matchers.lessThanOrEqualTo; import org.junit.AfterClass; import org.junit.Assert; @@ -208,21 +207,21 @@ public class TestEndPoint { public void testRegister() throws Exception { String[] scmAddressArray = new String[1]; scmAddressArray[0] = serverAddress.toString(); - DatanodeID nodeToRegister = getDatanodeID(); + DatanodeDetails nodeToRegister = getDatanodeDetails(); try (EndpointStateMachine rpcEndPoint = createEndpoint( SCMTestUtils.getConf(), serverAddress, 1000)) { SCMRegisteredCmdResponseProto responseProto = rpcEndPoint.getEndPoint() - .register(nodeToRegister, scmAddressArray); + .register(nodeToRegister.getProtoBufMessage(), scmAddressArray); Assert.assertNotNull(responseProto); - Assert.assertEquals(nodeToRegister.getDatanodeUuid(), + 
Assert.assertEquals(nodeToRegister.getUuid(), responseProto.getDatanodeUUID()); Assert.assertNotNull(responseProto.getClusterID()); } } private EndpointStateMachine registerTaskHelper(InetSocketAddress scmAddress, - int rpcTimeout, boolean clearContainerID) throws Exception { + int rpcTimeout, boolean clearDatanodeDetails) throws Exception { Configuration conf = SCMTestUtils.getConf(); EndpointStateMachine rpcEndPoint = createEndpoint(conf, @@ -230,12 +229,12 @@ public class TestEndPoint { rpcEndPoint.setState(EndpointStateMachine.EndPointStates.REGISTER); RegisterEndpointTask endpointTask = new RegisterEndpointTask(rpcEndPoint, conf); - if (!clearContainerID) { - ContainerNodeIDProto containerNodeID = ContainerNodeIDProto.newBuilder() - .setClusterID(UUID.randomUUID().toString()) - .setDatanodeID(getDatanodeID().getProtoBufMessage()) - .build(); - endpointTask.setContainerNodeIDProto(containerNodeID); + if (!clearDatanodeDetails) { + HdslProtos.DatanodeDetailsProto datanodeDetails = + HdslProtos.DatanodeDetailsProto.newBuilder() + .setUuid(UUID.randomUUID().toString()) + .build(); + endpointTask.setDatanodeDetailsProto(datanodeDetails); } endpointTask.call(); return rpcEndPoint; @@ -287,7 +286,7 @@ public class TestEndPoint { @Test public void testHeartbeat() throws Exception { - DatanodeID dataNode = getDatanodeID(); + DatanodeDetails dataNode = getDatanodeDetails(); try (EndpointStateMachine rpcEndPoint = createEndpoint(SCMTestUtils.getConf(), serverAddress, 1000)) { @@ -297,7 +296,8 @@ public class TestEndPoint { srb.setCapacity(2000).setScmUsed(500).setRemaining(1500).build(); nrb.addStorageReport(srb); SCMHeartbeatResponseProto responseProto = rpcEndPoint.getEndPoint() - .sendHeartbeat(dataNode, nrb.build(), defaultReportState); + .sendHeartbeat( + dataNode.getProtoBufMessage(), nrb.build(), defaultReportState); Assert.assertNotNull(responseProto); Assert.assertEquals(0, responseProto.getCommandsCount()); } @@ -316,12 +316,11 @@ public class TestEndPoint { // 
Create a datanode state machine for stateConext used by endpoint task try (DatanodeStateMachine stateMachine = new DatanodeStateMachine( - DFSTestUtil.getLocalDatanodeID(), conf); + TestUtils.getDatanodeDetails(), conf); EndpointStateMachine rpcEndPoint = createEndpoint(conf, scmAddress, rpcTimeout)) { - ContainerNodeIDProto containerNodeID = ContainerNodeIDProto.newBuilder() - .setClusterID(UUID.randomUUID().toString()) - .setDatanodeID(getDatanodeID().getProtoBufMessage()).build(); + HdslProtos.DatanodeDetailsProto datanodeDetailsProto = + getDatanodeDetails().getProtoBufMessage(); rpcEndPoint.setState(EndpointStateMachine.EndPointStates.HEARTBEAT); final StateContext stateContext = @@ -330,9 +329,9 @@ public class TestEndPoint { HeartbeatEndpointTask endpointTask = new HeartbeatEndpointTask(rpcEndPoint, conf, stateContext); - endpointTask.setContainerNodeIDProto(containerNodeID); + endpointTask.setDatanodeDetailsProto(datanodeDetailsProto); endpointTask.call(); - Assert.assertNotNull(endpointTask.getContainerNodeIDProto()); + Assert.assertNotNull(endpointTask.getDatanodeDetailsProto()); Assert.assertEquals(EndpointStateMachine.EndPointStates.HEARTBEAT, rpcEndPoint.getState()); @@ -387,7 +386,7 @@ public class TestEndPoint { reportsBuilder.addReports(getRandomContainerReport() .getProtoBufMessage()); } - reportsBuilder.setDatanodeID(getDatanodeID() + reportsBuilder.setDatanodeDetails(getDatanodeDetails() .getProtoBufMessage()); reportsBuilder.setType(StorageContainerDatanodeProtocolProtos .ContainerReportsRequestProto.reportType.fullReport); @@ -456,7 +455,7 @@ public class TestEndPoint { reportsBuilder.addReports(report.getProtoBufMessage()); } - reportsBuilder.setDatanodeID(getDatanodeID() + reportsBuilder.setDatanodeDetails(getDatanodeDetails() .getProtoBufMessage()); reportsBuilder.setType(StorageContainerDatanodeProtocolProtos .ContainerReportsRequestProto.reportType.fullReport); 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java b/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java index 041b0d8..2947789 100644 --- a/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java +++ b/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java @@ -18,7 +18,7 @@ package org.apache.hadoop.ozone.container.placement; import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdsl.protocol.DatanodeDetails; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.scm.container.MockNodeManager; import org.apache.hadoop.ozone.scm.container.placement.algorithms.SCMContainerPlacementCapacity; @@ -41,10 +41,10 @@ public class TestContainerPlacement { private DescriptiveStatistics computeStatistics(NodeManager nodeManager) { DescriptiveStatistics descriptiveStatistics = new DescriptiveStatistics(); - for (DatanodeID id : nodeManager.getNodes(HEALTHY)) { + for (DatanodeDetails dd : nodeManager.getNodes(HEALTHY)) { float weightedValue = - nodeManager.getNodeStat(id).get().getScmUsed().get() / (float) - nodeManager.getNodeStat(id).get().getCapacity().get(); + nodeManager.getNodeStat(dd).get().getScmUsed().get() / (float) + nodeManager.getNodeStat(dd).get().getCapacity().get(); descriptiveStatistics.addValue(weightedValue); } return descriptiveStatistics; @@ -82,11 +82,11 @@ public class TestContainerPlacement { for (int x = 0; x < opsCount; x++) { 
long containerSize = random.nextInt(100) * OzoneConsts.GB; - List<DatanodeID> nodesCapacity = + List<DatanodeDetails> nodesCapacity = capacityPlacer.chooseDatanodes(nodesRequired, containerSize); assertEquals(nodesRequired, nodesCapacity.size()); - List<DatanodeID> nodesRandom = randomPlacer.chooseDatanodes(nodesRequired, + List<DatanodeDetails> nodesRandom = randomPlacer.chooseDatanodes(nodesRequired, containerSize); // One fifth of all calls are delete @@ -116,16 +116,16 @@ public class TestContainerPlacement { } private void deleteContainer(MockNodeManager nodeManager, - List<DatanodeID> nodes, long containerSize) { - for (DatanodeID id : nodes) { - nodeManager.delContainer(id, containerSize); + List<DatanodeDetails> nodes, long containerSize) { + for (DatanodeDetails dd : nodes) { + nodeManager.delContainer(dd, containerSize); } } private void createContainer(MockNodeManager nodeManager, - List<DatanodeID> nodes, long containerSize) { - for (DatanodeID id : nodes) { - nodeManager.addContainer(id, containerSize); + List<DatanodeDetails> nodes, long containerSize) { + for (DatanodeDetails dd : nodes) { + nodeManager.addContainer(dd, containerSize); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerSupervisor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerSupervisor.java b/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerSupervisor.java index 17a72e6..a35d4d4 100644 --- a/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerSupervisor.java +++ b/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerSupervisor.java @@ -17,8 +17,8 @@ package org.apache.hadoop.ozone.container.replication; 
import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdsl.conf.OzoneConfiguration; +import org.apache.hadoop.hdsl.protocol.DatanodeDetails; import org.apache.hadoop.ozone.container.testutils.ReplicationDatanodeStateManager; import org.apache.hadoop.ozone.container.testutils.ReplicationNodeManagerMock; import org.apache.hadoop.ozone.container.testutils.ReplicationNodePoolManagerMock; @@ -66,7 +66,7 @@ public class TestContainerSupervisor { static final int POOL_COUNT = 3; private LogCapturer logCapturer = LogCapturer.captureLogs( LogFactory.getLog(ContainerSupervisor.class)); - private List<DatanodeID> datanodes = new LinkedList<>(); + private List<DatanodeDetails> datanodes = new LinkedList<>(); private NodeManager nodeManager; private NodePoolManager poolManager; private CommandQueue commandQueue; @@ -82,11 +82,11 @@ public class TestContainerSupervisor { @Before public void setUp() throws Exception { GenericTestUtils.setLogLevel(ContainerSupervisor.LOG, Level.DEBUG); - Map<DatanodeID, NodeState> nodeStateMap = new HashMap<>(); + Map<DatanodeDetails, NodeState> nodeStateMap = new HashMap<>(); // We are setting up 3 pools with 24 nodes each in this cluster. // First we create 72 Datanodes. 
for (int x = 0; x < MAX_DATANODES; x++) { - DatanodeID datanode = TestUtils.getDatanodeID(); + DatanodeDetails datanode = TestUtils.getDatanodeDetails(); datanodes.add(datanode); nodeStateMap.put(datanode, HEALTHY); } @@ -105,7 +105,7 @@ public class TestContainerSupervisor { for (int y = 1; y <= POOL_COUNT; y++) { String poolName = String.format(POOL_NAME_TEMPLATE, y); for (int z = 0; z < POOL_SIZE; z++) { - DatanodeID id = datanodes.get(y * z); + DatanodeDetails id = datanodes.get(y * z); poolManager.addNode(poolName, id); } } @@ -245,7 +245,7 @@ public class TestContainerSupervisor { LogFactory.getLog(InProgressPool.class)); GenericTestUtils.setLogLevel(InProgressPool.LOG, Level.DEBUG); try { - DatanodeID id = TestUtils.getDatanodeID(); + DatanodeDetails id = TestUtils.getDatanodeDetails(); ((ReplicationNodeManagerMock) (nodeManager)).addNode(id, HEALTHY); poolManager.addNode("PoolNew", id); GenericTestUtils.waitFor(() -> @@ -260,7 +260,8 @@ public class TestContainerSupervisor { containerSupervisor.handleContainerReport(clist.get(0)); GenericTestUtils.waitFor(() -> inProgressLog.getOutput().contains("NewContainer1") && inProgressLog - .getOutput().contains(id.getDatanodeUuid()), 200, 10 * 1000); + .getOutput().contains(id.getUuidString()), + 200, 10 * 1000); } finally { inProgressLog.stopCapturing(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationDatanodeStateManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationDatanodeStateManager.java b/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationDatanodeStateManager.java index a122167..55b8e6d 100644 --- a/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationDatanodeStateManager.java +++ 
b/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationDatanodeStateManager.java @@ -17,7 +17,7 @@ package org.apache.hadoop.ozone.container.testutils; import org.apache.commons.codec.digest.DigestUtils; -import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdsl.protocol.DatanodeDetails; import org.apache.hadoop.hdsl.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerInfo; import org.apache.hadoop.hdsl.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; import org.apache.hadoop.ozone.scm.node.NodeManager; @@ -61,7 +61,7 @@ public class ReplicationDatanodeStateManager { public List<ContainerReportsRequestProto> getContainerReport( String containerName, String poolName, int dataNodeCount) { List<ContainerReportsRequestProto> containerList = new LinkedList<>(); - List<DatanodeID> nodesInPool = poolManager.getNodes(poolName); + List<DatanodeDetails> nodesInPool = poolManager.getNodes(poolName); if (nodesInPool == null) { return containerList; @@ -74,7 +74,7 @@ public class ReplicationDatanodeStateManager { int containerID = 1; while (containerList.size() < dataNodeCount && nodesInPool.size() > 0) { - DatanodeID id = nodesInPool.get(r.nextInt(nodesInPool.size())); + DatanodeDetails id = nodesInPool.get(r.nextInt(nodesInPool.size())); nodesInPool.remove(id); containerID++; // We return container reports only for nodes that are healthy. 
@@ -86,7 +86,7 @@ public class ReplicationDatanodeStateManager { .build(); ContainerReportsRequestProto containerReport = ContainerReportsRequestProto.newBuilder().addReports(info) - .setDatanodeID(id.getProtoBufMessage()) + .setDatanodeDetails(id.getProtoBufMessage()) .setType(ContainerReportsRequestProto.reportType.fullReport) .build(); containerList.add(containerReport); http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java index 34904e0..f8a6432 100644 --- a/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java +++ b/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java @@ -17,8 +17,9 @@ package org.apache.hadoop.ozone.container.testutils; import com.google.common.base.Preconditions; -import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; +import org.apache.hadoop.hdsl.protocol.DatanodeDetails; +import org.apache.hadoop.hdsl.protocol.proto.HdslProtos; import org.apache.hadoop.ozone.protocol.VersionResponse; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.hdsl.protocol.proto.HdslProtos.NodeState; @@ -37,20 +38,22 @@ import org.apache.hadoop.ozone.scm.node.NodeManager; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.UUID; + import org.mockito.Mockito; /** * A Node Manager to test replication. 
*/ public class ReplicationNodeManagerMock implements NodeManager { - private final Map<DatanodeID, NodeState> nodeStateMap; + private final Map<DatanodeDetails, NodeState> nodeStateMap; private final CommandQueue commandQueue; /** * A list of Datanodes and current states. * @param nodeState A node state map. */ - public ReplicationNodeManagerMock(Map<DatanodeID, NodeState> nodeState, + public ReplicationNodeManagerMock(Map<DatanodeDetails, NodeState> nodeState, CommandQueue commandQueue) { Preconditions.checkNotNull(nodeState); this.nodeStateMap = nodeState; @@ -94,7 +97,8 @@ public class ReplicationNodeManagerMock implements NodeManager { * @throws UnregisteredNodeException */ @Override - public void removeNode(DatanodeID node) throws UnregisteredNodeException { + public void removeNode(DatanodeDetails node) + throws UnregisteredNodeException { nodeStateMap.remove(node); } @@ -106,7 +110,7 @@ public class ReplicationNodeManagerMock implements NodeManager { * @return List of Datanodes that are Heartbeating SCM. */ @Override - public List<DatanodeID> getNodes(NodeState nodestate) { + public List<DatanodeDetails> getNodes(NodeState nodestate) { return null; } @@ -124,10 +128,10 @@ public class ReplicationNodeManagerMock implements NodeManager { /** * Get all datanodes known to SCM. * - * @return List of DatanodeIDs known to SCM. + * @return List of DatanodeDetails known to SCM. */ @Override - public List<DatanodeID> getAllNodes() { + public List<DatanodeDetails> getAllNodes() { return null; } @@ -185,18 +189,18 @@ public class ReplicationNodeManagerMock implements NodeManager { * @return a map of individual node stats (live/stale but not dead). */ @Override - public Map<String, SCMNodeStat> getNodeStats() { + public Map<UUID, SCMNodeStat> getNodeStats() { return null; } /** * Return the node stat of the specified datanode. * - * @param datanodeID - datanode ID. + * @param dd - datanode details. 
* @return node stat if it is live/stale, null if it is dead or does't exist. */ @Override - public SCMNodeMetric getNodeStat(DatanodeID datanodeID) { + public SCMNodeMetric getNodeStat(DatanodeDetails dd) { return null; } @@ -218,12 +222,12 @@ public class ReplicationNodeManagerMock implements NodeManager { /** * Returns the node state of a specific node. * - * @param id - DatanodeID + * @param dd - DatanodeDetails * @return Healthy/Stale/Dead. */ @Override - public NodeState getNodeState(DatanodeID id) { - return nodeStateMap.get(id); + public NodeState getNodeState(DatanodeDetails dd) { + return nodeStateMap.get(dd); } /** @@ -275,25 +279,25 @@ public class ReplicationNodeManagerMock implements NodeManager { /** * Register the node if the node finds that it is not registered with any SCM. * - * @param datanodeID - Send datanodeID with Node info, but datanode UUID is - * empty. Server returns a datanodeID for the given node. + * @param dd DatanodeDetailsProto + * * @return SCMHeartbeatResponseProto */ @Override - public SCMCommand register(DatanodeID datanodeID) { + public SCMCommand register(HdslProtos.DatanodeDetailsProto dd) { return null; } /** * Send heartbeat to indicate the datanode is alive and doing well. * - * @param datanodeID - Datanode ID. + * @param dd - Datanode Details. * @param nodeReport - node report. * @param containerReportState - container report state. * @return SCMheartbeat response list */ @Override - public List<SCMCommand> sendHeartbeat(DatanodeID datanodeID, + public List<SCMCommand> sendHeartbeat(HdslProtos.DatanodeDetailsProto dd, SCMNodeReport nodeReport, ReportState containerReportState) { return null; } @@ -308,16 +312,16 @@ public class ReplicationNodeManagerMock implements NodeManager { /** * Adds a node to the existing Node manager. This is used only for test * purposes. - * @param id - DatanodeID + * @param id DatanodeDetails * @param state State you want to put that node to. 
*/ - public void addNode(DatanodeID id, NodeState state) { + public void addNode(DatanodeDetails id, NodeState state) { nodeStateMap.put(id, state); } @Override - public void addDatanodeCommand(DatanodeID id, SCMCommand command) { - this.commandQueue.addCommand(id, command); + public void addDatanodeCommand(UUID dnId, SCMCommand command) { + this.commandQueue.addCommand(dnId, command); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodePoolManagerMock.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodePoolManagerMock.java b/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodePoolManagerMock.java index 4b43237..766a882 100644 --- a/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodePoolManagerMock.java +++ b/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodePoolManagerMock.java @@ -16,7 +16,7 @@ */ package org.apache.hadoop.ozone.container.testutils; -import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdsl.protocol.DatanodeDetails; import org.apache.hadoop.ozone.scm.exceptions.SCMException; import org.apache.hadoop.ozone.scm.node.NodePoolManager; @@ -33,7 +33,7 @@ import java.util.Set; */ public class ReplicationNodePoolManagerMock implements NodePoolManager { - private final Map<DatanodeID, String> nodeMemberShip; + private final Map<DatanodeDetails, String> nodeMemberShip; /** * A node pool manager for testing. @@ -49,7 +49,7 @@ public class ReplicationNodePoolManagerMock implements NodePoolManager { * @param node - data node. 
*/ @Override - public void addNode(String pool, DatanodeID node) { + public void addNode(String pool, DatanodeDetails node) { nodeMemberShip.put(node, pool); } @@ -61,7 +61,8 @@ public class ReplicationNodePoolManagerMock implements NodePoolManager { * @throws SCMException */ @Override - public void removeNode(String pool, DatanodeID node) throws SCMException { + public void removeNode(String pool, DatanodeDetails node) + throws SCMException { nodeMemberShip.remove(node); } @@ -75,7 +76,7 @@ public class ReplicationNodePoolManagerMock implements NodePoolManager { @Override public List<String> getNodePools() { Set<String> poolSet = new HashSet<>(); - for (Map.Entry<DatanodeID, String> entry : nodeMemberShip.entrySet()) { + for (Map.Entry<DatanodeDetails, String> entry : nodeMemberShip.entrySet()) { poolSet.add(entry.getValue()); } return new ArrayList<>(poolSet); @@ -90,9 +91,9 @@ public class ReplicationNodePoolManagerMock implements NodePoolManager { * found. */ @Override - public List<DatanodeID> getNodes(String pool) { - Set<DatanodeID> datanodeSet = new HashSet<>(); - for (Map.Entry<DatanodeID, String> entry : nodeMemberShip.entrySet()) { + public List<DatanodeDetails> getNodes(String pool) { + Set<DatanodeDetails> datanodeSet = new HashSet<>(); + for (Map.Entry<DatanodeDetails, String> entry : nodeMemberShip.entrySet()) { if (entry.getValue().equals(pool)) { datanodeSet.add(entry.getKey()); } @@ -103,13 +104,13 @@ public class ReplicationNodePoolManagerMock implements NodePoolManager { /** * Get the node pool name if the node has been added to a node pool. * - * @param datanodeID - datanode ID. + * @param datanodeDetails DatanodeDetails. * @return node pool name if it has been assigned. null if the node has not * been assigned to any node pool yet. 
*/ @Override - public String getNodePool(DatanodeID datanodeID) { - return nodeMemberShip.get(datanodeID); + public String getNodePool(DatanodeDetails datanodeDetails) { + return nodeMemberShip.get(datanodeDetails); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/scm/TestUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/scm/TestUtils.java b/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/scm/TestUtils.java index 93fd750..3385fd6 100644 --- a/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/scm/TestUtils.java +++ b/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/scm/TestUtils.java @@ -21,8 +21,7 @@ import java.util.List; import java.util.Random; import java.util.UUID; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; +import org.apache.hadoop.hdsl.protocol.DatanodeDetails; import org.apache.hadoop.ozone.scm.node.SCMNodeManager; /** @@ -33,70 +32,79 @@ public class TestUtils { private TestUtils() { } - public static DatanodeID getDatanodeID(SCMNodeManager nodeManager) { + public static DatanodeDetails getDatanodeDetails(SCMNodeManager nodeManager) { - return getDatanodeID(nodeManager, UUID.randomUUID().toString()); + return getDatanodeDetails(nodeManager, UUID.randomUUID().toString()); } /** - * Create a new DatanodeID with NodeID set to the string. + * Create a new DatanodeDetails with NodeID set to the string. * * @param uuid - node ID, it is generally UUID. * @return DatanodeID. 
*/ - public static DatanodeID getDatanodeID(SCMNodeManager nodeManager, + public static DatanodeDetails getDatanodeDetails(SCMNodeManager nodeManager, String uuid) { - DatanodeID tempDataNode = getDatanodeID(uuid); - RegisteredCommand command = - (RegisteredCommand) nodeManager.register(tempDataNode); - return new DatanodeID(command.getDatanodeUUID(), tempDataNode); + DatanodeDetails datanodeDetails = getDatanodeDetails(uuid); + nodeManager.register(datanodeDetails.getProtoBufMessage()); + return datanodeDetails; } /** - * Get specified number of datanode IDs and registered them with node manager. + * Get specified number of DatanodeDetails and registered them with node + * manager. * * @param nodeManager - node manager to register the datanode ids. - * @param count - number of datanode IDs needed. + * @param count - number of DatanodeDetails needed. * @return */ - public static List<DatanodeID> getRegisteredDatanodeIDs( + public static List<DatanodeDetails> getListOfRegisteredDatanodeDetails( SCMNodeManager nodeManager, int count) { - ArrayList<DatanodeID> datanodes = new ArrayList<>(); + ArrayList<DatanodeDetails> datanodes = new ArrayList<>(); for (int i = 0; i < count; i++) { - datanodes.add(getDatanodeID(nodeManager)); + datanodes.add(getDatanodeDetails(nodeManager)); } return datanodes; } /** - * Get a datanode ID. + * Get a datanode details. * - * @return DatanodeID + * @return DatanodeDetails */ - public static DatanodeID getDatanodeID() { - return getDatanodeID(UUID.randomUUID().toString()); + public static DatanodeDetails getDatanodeDetails() { + return getDatanodeDetails(UUID.randomUUID().toString()); } - private static DatanodeID getDatanodeID(String uuid) { + private static DatanodeDetails getDatanodeDetails(String uuid) { Random random = new Random(); String ipAddress = random.nextInt(256) + "." + random.nextInt(256) + "." + random .nextInt(256) + "." 
+ random.nextInt(256); String hostName = uuid; - return new DatanodeID(ipAddress, hostName, uuid, 0, 0, 0, 0); + DatanodeDetails.Builder builder = DatanodeDetails.newBuilder(); + builder.setUuid(uuid) + .setHostName("localhost") + .setIpAddress(ipAddress) + .setInfoPort(0) + .setInfoSecurePort(0) + .setContainerPort(0) + .setRatisPort(0) + .setOzoneRestPort(0); + return builder.build(); } /** - * Get specified number of datanode IDs. + * Get specified number of list of DatanodeDetails. * * @param count - number of datanode IDs needed. * @return */ - public static List<DatanodeID> getDatanodeIDs(int count) { - ArrayList<DatanodeID> datanodes = new ArrayList<>(); + public static List<DatanodeDetails> getListOfDatanodeDetails(int count) { + ArrayList<DatanodeDetails> datanodes = new ArrayList<>(); for (int i = 0; i < count; i++) { - datanodes.add(getDatanodeID()); + datanodes.add(getDatanodeDetails()); } return datanodes; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/scm/block/TestBlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/scm/block/TestBlockManager.java b/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/scm/block/TestBlockManager.java index 47aa8dc..c28f835 100644 --- a/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/scm/block/TestBlockManager.java +++ b/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/scm/block/TestBlockManager.java @@ -112,8 +112,8 @@ public class TestBlockManager { type, factor, containerOwner); Assert.assertNotNull(block); Pipeline pipeline = blockManager.getBlock(block.getKey()); - Assert.assertEquals(pipeline.getLeader().getDatanodeUuid(), - block.getPipeline().getLeader().getDatanodeUuid()); + Assert.assertEquals(pipeline.getLeader().getUuid(), + block.getPipeline().getLeader().getUuid()); } @Test @@ -131,8 +131,8 
@@ public class TestBlockManager { // cleaned yet. String deletedKeyName = blockManager.getDeletedKeyName(block.getKey()); Pipeline pipeline = blockManager.getBlock(deletedKeyName); - Assert.assertEquals(pipeline.getLeader().getDatanodeUuid(), - block.getPipeline().getLeader().getDatanodeUuid()); + Assert.assertEquals(pipeline.getLeader().getUuid(), + block.getPipeline().getLeader().getUuid()); } @Test http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/scm/block/TestDeletedBlockLog.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/scm/block/TestDeletedBlockLog.java b/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/scm/block/TestDeletedBlockLog.java index 3872cea..6e88339 100644 --- a/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/scm/block/TestDeletedBlockLog.java +++ b/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/scm/block/TestDeletedBlockLog.java @@ -19,8 +19,8 @@ package org.apache.hadoop.ozone.scm.block; import org.apache.commons.io.FileUtils; import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdsl.conf.OzoneConfiguration; +import org.apache.hadoop.hdsl.protocol.DatanodeDetails; import org.apache.hadoop.hdsl.protocol.proto.HdslProtos.LifeCycleState; import org.apache.hadoop.hdsl.protocol.proto.HdslProtos.ReplicationType; import org.apache.hadoop.hdsl.protocol.proto.HdslProtos.ReplicationFactor; @@ -260,8 +260,26 @@ public class TestDeletedBlockLog { int count = 0; String containerName = null; - DatanodeID dnID1 = new DatanodeID(null, null, "node1", 0, 0, 0, 0); - DatanodeID dnID2 = new DatanodeID(null, null, "node2", 0, 0, 0, 0); + DatanodeDetails dnDd1 = DatanodeDetails.newBuilder() + .setUuid("node1") + .setIpAddress("127.0.0.1") + .setHostName("localhost") + .setInfoPort(0) + 
.setInfoSecurePort(0) + .setContainerPort(0) + .setRatisPort(0) + .setOzoneRestPort(0) + .build(); + DatanodeDetails dnId2 = DatanodeDetails.newBuilder() + .setUuid("node2") + .setIpAddress("127.0.0.1") + .setHostName("localhost") + .setInfoPort(0) + .setInfoSecurePort(0) + .setContainerPort(0) + .setRatisPort(0) + .setOzoneRestPort(0) + .build(); Mapping mappingService = mock(ContainerMapping.class); // Creates {TXNum} TX in the log. for (Map.Entry<String, List<String>> entry : generateData(txNum) @@ -273,9 +291,9 @@ public class TestDeletedBlockLog { // make TX[1-6] for datanode1; TX[7-10] for datanode2 if (count <= (maximumAllowedTXNum + 1)) { - mockContainerInfo(mappingService, containerName, dnID1); + mockContainerInfo(mappingService, containerName, dnDd1); } else { - mockContainerInfo(mappingService, containerName, dnID2); + mockContainerInfo(mappingService, containerName, dnId2); } } @@ -285,9 +303,9 @@ public class TestDeletedBlockLog { deletedBlockLog.getTransactions(transactions); List<Long> txIDs = new LinkedList<>(); - for (DatanodeID dnID : transactions.getDatanodes()) { + for (UUID id : transactions.getDatanodeIDs()) { List<DeletedBlocksTransaction> txs = transactions - .getDatanodeTransactions(dnID); + .getDatanodeTransactions(id); for (DeletedBlocksTransaction tx : txs) { txIDs.add(tx.getTxID()); } @@ -303,9 +321,9 @@ public class TestDeletedBlockLog { Assert.assertFalse(transactions.isFull()); // The number of TX in dnID1 won't more than maximum value. Assert.assertEquals(maximumAllowedTXNum, - transactions.getDatanodeTransactions(dnID1).size()); + transactions.getDatanodeTransactions(dnDd1.getUuid()).size()); - int size = transactions.getDatanodeTransactions(dnID2).size(); + int size = transactions.getDatanodeTransactions(dnId2.getUuid()).size(); // add duplicated container in dnID2, this should be failed. 
DeletedBlocksTransaction.Builder builder = DeletedBlocksTransaction.newBuilder(); @@ -316,7 +334,7 @@ public class TestDeletedBlockLog { // The number of TX in dnID2 should not be changed. Assert.assertEquals(size, - transactions.getDatanodeTransactions(dnID2).size()); + transactions.getDatanodeTransactions(dnId2.getUuid()).size()); // Add new TX in dnID2, then dnID2 will reach maximum value. containerName = "newContainer"; @@ -324,18 +342,18 @@ public class TestDeletedBlockLog { builder.setTxID(12); builder.setContainerName(containerName); builder.setCount(0); - mockContainerInfo(mappingService, containerName, dnID2); + mockContainerInfo(mappingService, containerName, dnId2); transactions.addTransaction(builder.build()); // Since all node are full, then transactions is full. Assert.assertTrue(transactions.isFull()); } private void mockContainerInfo(Mapping mappingService, String containerName, - DatanodeID dnID) throws IOException { + DatanodeDetails dd) throws IOException { PipelineChannel pipelineChannel = new PipelineChannel("fake", LifeCycleState.OPEN, ReplicationType.STAND_ALONE, ReplicationFactor.ONE, "fake"); - pipelineChannel.addMember(dnID); + pipelineChannel.addMember(dd); Pipeline pipeline = new Pipeline(containerName, pipelineChannel); ContainerInfo.Builder builder = new ContainerInfo.Builder(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/3440ca6e/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java b/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java index fda8190..587e60e 100644 --- a/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java +++ b/hadoop-hdsl/server-scm/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java 
@@ -16,8 +16,8 @@ */ package org.apache.hadoop.ozone.scm.container; -import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; +import org.apache.hadoop.hdsl.protocol.DatanodeDetails; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.protocol.VersionResponse; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; @@ -36,7 +36,7 @@ import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.ozone.scm.node.NodeManager; import org.apache.hadoop.ozone.scm.node.NodePoolManager; -import static org.apache.hadoop.ozone.scm.TestUtils.getDatanodeID; +import static org.apache.hadoop.ozone.scm.TestUtils.getDatanodeDetails; import org.mockito.Mockito; import org.assertj.core.util.Preconditions; @@ -45,6 +45,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.UUID; import static org.apache.hadoop.hdsl.protocol.proto.HdslProtos.NodeState.DEAD; import static org.apache.hadoop.hdsl.protocol.proto.HdslProtos.NodeState @@ -69,13 +70,13 @@ public class MockNodeManager implements NodeManager { new NodeData(OzoneConsts.TB, 200L * OzoneConsts.GB, NodeData.STALE), new NodeData(OzoneConsts.TB, 200L * OzoneConsts.GB, NodeData.DEAD) }; - private final List<DatanodeID> healthyNodes; - private final List<DatanodeID> staleNodes; - private final List<DatanodeID> deadNodes; - private final Map<String, SCMNodeStat> nodeMetricMap; + private final List<DatanodeDetails> healthyNodes; + private final List<DatanodeDetails> staleNodes; + private final List<DatanodeDetails> deadNodes; + private final Map<UUID, SCMNodeStat> nodeMetricMap; private final SCMNodeStat aggregateStat; private boolean chillmode; - private final Map<DatanodeID, List<SCMCommand>> commandMap; + private final Map<UUID, List<SCMCommand>> commandMap; public MockNodeManager(boolean initializeFakeNodes, int nodeCount) { this.healthyNodes 
= new LinkedList<>(); @@ -85,8 +86,8 @@ public class MockNodeManager implements NodeManager { aggregateStat = new SCMNodeStat(); if (initializeFakeNodes) { for (int x = 0; x < nodeCount; x++) { - DatanodeID id = getDatanodeID(); - populateNodeMetric(id, x); + DatanodeDetails dd = getDatanodeDetails(); + populateNodeMetric(dd, x); } } chillmode = false; @@ -96,28 +97,28 @@ public class MockNodeManager implements NodeManager { /** * Invoked from ctor to create some node Metrics. * - * @param datanodeID - Datanode ID + * @param datanodeDetails - Datanode details */ - private void populateNodeMetric(DatanodeID datanodeID, int x) { + private void populateNodeMetric(DatanodeDetails datanodeDetails, int x) { SCMNodeStat newStat = new SCMNodeStat(); long remaining = NODES[x % NODES.length].capacity - NODES[x % NODES.length].used; newStat.set( (NODES[x % NODES.length].capacity), (NODES[x % NODES.length].used), remaining); - this.nodeMetricMap.put(datanodeID.toString(), newStat); + this.nodeMetricMap.put(datanodeDetails.getUuid(), newStat); aggregateStat.add(newStat); if (NODES[x % NODES.length].getCurrentState() == NodeData.HEALTHY) { - healthyNodes.add(datanodeID); + healthyNodes.add(datanodeDetails); } if (NODES[x % NODES.length].getCurrentState() == NodeData.STALE) { - staleNodes.add(datanodeID); + staleNodes.add(datanodeDetails); } if (NODES[x % NODES.length].getCurrentState() == NodeData.DEAD) { - deadNodes.add(datanodeID); + deadNodes.add(datanodeDetails); } } @@ -137,7 +138,8 @@ public class MockNodeManager implements NodeManager { * @throws UnregisteredNodeException */ @Override - public void removeNode(DatanodeID node) throws UnregisteredNodeException { + public void removeNode(DatanodeDetails node) + throws UnregisteredNodeException { } @@ -148,7 +150,7 @@ public class MockNodeManager implements NodeManager { * @return List of Datanodes that are Heartbeating SCM. 
*/ @Override - public List<DatanodeID> getNodes(HdslProtos.NodeState nodestate) { + public List<DatanodeDetails> getNodes(HdslProtos.NodeState nodestate) { if (nodestate == HEALTHY) { return healthyNodes; } @@ -172,7 +174,7 @@ public class MockNodeManager implements NodeManager { */ @Override public int getNodeCount(HdslProtos.NodeState nodestate) { - List<DatanodeID> nodes = getNodes(nodestate); + List<DatanodeDetails> nodes = getNodes(nodestate); if (nodes != null) { return nodes.size(); } @@ -182,10 +184,10 @@ public class MockNodeManager implements NodeManager { /** * Get all datanodes known to SCM. * - * @return List of DatanodeIDs known to SCM. + * @return List of DatanodeDetails known to SCM. */ @Override - public List<DatanodeID> getAllNodes() { + public List<DatanodeDetails> getAllNodes() { return null; } @@ -261,18 +263,18 @@ public class MockNodeManager implements NodeManager { * @return a list of individual node stats (live/stale but not dead). */ @Override - public Map<String, SCMNodeStat> getNodeStats() { + public Map<UUID, SCMNodeStat> getNodeStats() { return nodeMetricMap; } /** * Return the node stat of the specified datanode. - * @param datanodeID - datanode ID. + * @param datanodeDetails - datanode details. * @return node stat if it is live/stale, null if it is dead or does't exist. */ @Override - public SCMNodeMetric getNodeStat(DatanodeID datanodeID) { - return new SCMNodeMetric(nodeMetricMap.get(datanodeID.toString())); + public SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails) { + return new SCMNodeMetric(nodeMetricMap.get(datanodeDetails.getUuid())); } @Override @@ -293,36 +295,36 @@ public class MockNodeManager implements NodeManager { /** * Returns the node state of a specific node. * - * @param id - DatanodeID + * @param dd - DatanodeDetails * @return Healthy/Stale/Dead. 
*/ @Override - public HdslProtos.NodeState getNodeState(DatanodeID id) { + public HdslProtos.NodeState getNodeState(DatanodeDetails dd) { return null; } @Override - public void addDatanodeCommand(DatanodeID id, SCMCommand command) { - if(commandMap.containsKey(id)) { - List<SCMCommand> commandList = commandMap.get(id); + public void addDatanodeCommand(UUID dnId, SCMCommand command) { + if(commandMap.containsKey(dnId)) { + List<SCMCommand> commandList = commandMap.get(dnId); Preconditions.checkNotNull(commandList); commandList.add(command); } else { List<SCMCommand> commandList = new LinkedList<>(); commandList.add(command); - commandMap.put(id, commandList); + commandMap.put(dnId, commandList); } } // Returns the number of commands that is queued to this node manager. - public int getCommandCount(DatanodeID id) { - List<SCMCommand> list = commandMap.get(id); + public int getCommandCount(DatanodeDetails dd) { + List<SCMCommand> list = commandMap.get(dd); return (list == null) ? 0 : list.size(); } - public void clearCommandQueue(DatanodeID id) { - if(commandMap.containsKey(id)) { - commandMap.put(id, new LinkedList<>()); + public void clearCommandQueue(UUID dnId) { + if(commandMap.containsKey(dnId)) { + commandMap.put(dnId, new LinkedList<>()); } } @@ -373,29 +375,29 @@ public class MockNodeManager implements NodeManager { * Register the node if the node finds that it is not registered with any * SCM. * - * @param datanodeID - Send datanodeID with Node info, but datanode UUID is - * empty. Server returns a datanodeID for the given node. + * @param datanodeDetails DatanodeDetailsProto * @return SCMHeartbeatResponseProto */ @Override - public SCMCommand register(DatanodeID datanodeID) { + public SCMCommand register(HdslProtos.DatanodeDetailsProto datanodeDetails) { return null; } /** * Send heartbeat to indicate the datanode is alive and doing well. * - * @param datanodeID - Datanode ID. + * @param datanodeDetails - Datanode ID. * @param nodeReport - node report. 
* @param containerReportState - container report state. * @return SCMheartbeat response list */ @Override - public List<SCMCommand> sendHeartbeat(DatanodeID datanodeID, + public List<SCMCommand> sendHeartbeat( + HdslProtos.DatanodeDetailsProto datanodeDetails, SCMNodeReport nodeReport, ReportState containerReportState) { - if ((datanodeID != null) && (nodeReport != null) && (nodeReport + if ((datanodeDetails != null) && (nodeReport != null) && (nodeReport .getStorageReportCount() > 0)) { - SCMNodeStat stat = this.nodeMetricMap.get(datanodeID.toString()); + SCMNodeStat stat = this.nodeMetricMap.get(datanodeDetails.getUuid()); long totalCapacity = 0L; long totalRemaining = 0L; @@ -409,7 +411,8 @@ public class MockNodeManager implements NodeManager { aggregateStat.subtract(stat); stat.set(totalCapacity, totalScmUsed, totalRemaining); aggregateStat.add(stat); - nodeMetricMap.put(datanodeID.toString(), stat); + nodeMetricMap.put(DatanodeDetails + .getFromProtoBuf(datanodeDetails).getUuid(), stat); } return null; @@ -427,32 +430,32 @@ public class MockNodeManager implements NodeManager { /** * Makes it easy to add a container. * - * @param datanodeID datanode ID + * @param datanodeDetails datanode details * @param size number of bytes. */ - public void addContainer(DatanodeID datanodeID, long size) { - SCMNodeStat stat = this.nodeMetricMap.get(datanodeID.toString()); + public void addContainer(DatanodeDetails datanodeDetails, long size) { + SCMNodeStat stat = this.nodeMetricMap.get(datanodeDetails.getUuid()); if (stat != null) { aggregateStat.subtract(stat); stat.getCapacity().add(size); aggregateStat.add(stat); - nodeMetricMap.put(datanodeID.toString(), stat); + nodeMetricMap.put(datanodeDetails.getUuid(), stat); } } /** * Makes it easy to simulate a delete of a container. * - * @param datanodeID datanode ID + * @param datanodeDetails datanode Details * @param size number of bytes. 
*/ - public void delContainer(DatanodeID datanodeID, long size) { - SCMNodeStat stat = this.nodeMetricMap.get(datanodeID.toString()); + public void delContainer(DatanodeDetails datanodeDetails, long size) { + SCMNodeStat stat = this.nodeMetricMap.get(datanodeDetails.getUuid()); if (stat != null) { aggregateStat.subtract(stat); stat.getCapacity().subtract(size); aggregateStat.add(stat); - nodeMetricMap.put(datanodeID.toString(), stat); + nodeMetricMap.put(datanodeDetails.getUuid(), stat); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
