http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java deleted file mode 100644 index 88f984b..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java +++ /dev/null @@ -1,725 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdds.scm.node; - -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; -import org.apache.hadoop.hdds.scm.HddsServerUtil; -import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; -import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; -import org.apache.hadoop.hdds.scm.events.SCMEvents; -import org.apache.hadoop.hdds.scm.exceptions.SCMException; -import org.apache.hadoop.hdds.scm.node.states.*; -import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap; -import org.apache.hadoop.hdds.server.events.Event; -import org.apache.hadoop.hdds.server.events.EventPublisher; -import org.apache.hadoop.ozone.common.statemachine - .InvalidStateTransitionException; -import org.apache.hadoop.ozone.common.statemachine.StateMachine; -import org.apache.hadoop.util.Time; -import org.apache.hadoop.util.concurrent.HadoopExecutors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Closeable; -import java.util.*; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.function.Predicate; - -import static org.apache.hadoop.hdds.scm.ScmConfigKeys - .OZONE_SCM_DEADNODE_INTERVAL; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys - .OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys - .OZONE_SCM_STALENODE_INTERVAL; - -/** - * NodeStateManager maintains the state of all the datanodes in the cluster. All - * the node state change should happen only via NodeStateManager. It also - * runs a heartbeat thread which periodically updates the node state. - * <p> - * The getNode(byState) functions make copy of node maps and then creates a list - * based on that. 
It should be assumed that these get functions always report - * *stale* information. For example, getting the deadNodeCount followed by - * getNodes(DEAD) could very well produce a totally different count. Also - * getNodeCount(HEALTHY) + getNodeCount(DEAD) + getNodeCount(STALE) is not - * guaranteed to add up to the total nodes that we know of. Please treat all - * get functions in this file as a snapshot of information that is inconsistent - * as soon as you read it. - */ -public class NodeStateManager implements Runnable, Closeable { - - /** - * Node's life cycle events. - */ - private enum NodeLifeCycleEvent { - TIMEOUT, RESTORE, RESURRECT, DECOMMISSION, DECOMMISSIONED - } - - private static final Logger LOG = LoggerFactory - .getLogger(NodeStateManager.class); - - /** - * StateMachine for node lifecycle. - */ - private final StateMachine<NodeState, NodeLifeCycleEvent> stateMachine; - /** - * This is the map which maintains the current state of all datanodes. - */ - private final NodeStateMap nodeStateMap; - /** - * Maintains the mapping from node to pipelines a node is part of. - */ - private final Node2PipelineMap node2PipelineMap; - /** - * Maintains the map from node to ContainerIDs for the containers - * available on the node. - */ - private final Node2ContainerMap node2ContainerMap; - /** - * Used for publishing node state change events. - */ - private final EventPublisher eventPublisher; - /** - * Maps the event to be triggered when a node state is updated. - */ - private final Map<NodeState, Event<DatanodeDetails>> state2EventMap; - /** - * ExecutorService used for scheduling heartbeat processing thread. - */ - private final ScheduledExecutorService executorService; - /** - * The frequency at which we run the heartbeat processing thread. - */ - private final long heartbeatCheckerIntervalMs; - /** - * The timeout value which will be used for marking a datanode as stale. - */ - private final long staleNodeIntervalMs; - /** - * The timeout value which will be used for marking a datanode as dead. - */ - private final long deadNodeIntervalMs; - - /** - * Constructs a NodeStateManager instance with the given configuration. - * - * @param conf Configuration - * @param eventPublisher EventPublisher - */ - public NodeStateManager(Configuration conf, EventPublisher eventPublisher) { - this.nodeStateMap = new NodeStateMap(); - this.node2PipelineMap = new Node2PipelineMap(); - this.node2ContainerMap = new Node2ContainerMap(); - this.eventPublisher = eventPublisher; - this.state2EventMap = new HashMap<>(); - initialiseState2EventMap(); - Set<NodeState> finalStates = new HashSet<>(); - finalStates.add(NodeState.DECOMMISSIONED); - this.stateMachine = new StateMachine<>(NodeState.HEALTHY, finalStates); - initializeStateMachine(); - heartbeatCheckerIntervalMs = HddsServerUtil - .getScmheartbeatCheckerInterval(conf); - staleNodeIntervalMs = HddsServerUtil.getStaleNodeInterval(conf); - deadNodeIntervalMs = HddsServerUtil.getDeadNodeInterval(conf); - Preconditions.checkState(heartbeatCheckerIntervalMs > 0, - OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL + " should be greater than 0."); - Preconditions.checkState(staleNodeIntervalMs < deadNodeIntervalMs, - OZONE_SCM_STALENODE_INTERVAL + " should be less than " + - OZONE_SCM_DEADNODE_INTERVAL); - executorService = HadoopExecutors.newScheduledThreadPool(1, - new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("SCM Heartbeat Processing Thread - %d").build()); - executorService.schedule(this, heartbeatCheckerIntervalMs, - TimeUnit.MILLISECONDS); - } - - /** - * Populates the state2event map.
- */ - private void initialiseState2EventMap() { - state2EventMap.put(NodeState.STALE, SCMEvents.STALE_NODE); - state2EventMap.put(NodeState.DEAD, SCMEvents.DEAD_NODE); - } - - /* - * - * Node and State Transition Mapping: - * - * State: HEALTHY -------------------> STALE - * Event: TIMEOUT - * - * State: STALE -------------------> DEAD - * Event: TIMEOUT - * - * State: STALE -------------------> HEALTHY - * Event: RESTORE - * - * State: DEAD -------------------> HEALTHY - * Event: RESURRECT - * - * State: HEALTHY -------------------> DECOMMISSIONING - * Event: DECOMMISSION - * - * State: STALE -------------------> DECOMMISSIONING - * Event: DECOMMISSION - * - * State: DEAD -------------------> DECOMMISSIONING - * Event: DECOMMISSION - * - * State: DECOMMISSIONING -------------------> DECOMMISSIONED - * Event: DECOMMISSIONED - * - * Node State Flow - * - * +--------------------------------------------------------+ - * | (RESURRECT) | - * | +--------------------------+ | - * | | (RESTORE) | | - * | | | | - * V V | | - * [HEALTHY]------------------->[STALE]------------------->[DEAD] - * | (TIMEOUT) | (TIMEOUT) | - * | | | - * | | | - * | | | - * | | | - * | (DECOMMISSION) | (DECOMMISSION) | (DECOMMISSION) - * | V | - * +------------------->[DECOMMISSIONING]<----------------+ - * | - * | (DECOMMISSIONED) - * | - * V - * [DECOMMISSIONED] - * - */ - - /** - * Initializes the lifecycle of node state machine. - */ - private void initializeStateMachine() { - stateMachine.addTransition( - NodeState.HEALTHY, NodeState.STALE, NodeLifeCycleEvent.TIMEOUT); - stateMachine.addTransition( - NodeState.STALE, NodeState.DEAD, NodeLifeCycleEvent.TIMEOUT); - stateMachine.addTransition( - NodeState.STALE, NodeState.HEALTHY, NodeLifeCycleEvent.RESTORE); - stateMachine.addTransition( - NodeState.DEAD, NodeState.HEALTHY, NodeLifeCycleEvent.RESURRECT); - stateMachine.addTransition( - NodeState.HEALTHY, NodeState.DECOMMISSIONING, - NodeLifeCycleEvent.DECOMMISSION); - stateMachine.addTransition( - NodeState.STALE, NodeState.DECOMMISSIONING, - NodeLifeCycleEvent.DECOMMISSION); - stateMachine.addTransition( - NodeState.DEAD, NodeState.DECOMMISSIONING, - NodeLifeCycleEvent.DECOMMISSION); - stateMachine.addTransition( - NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONED, - NodeLifeCycleEvent.DECOMMISSIONED); - - } - - /** - * Adds a new node to the state manager. - * - * @param datanodeDetails DatanodeDetails - * - * @throws NodeAlreadyExistsException if the node is already present - */ - public void addNode(DatanodeDetails datanodeDetails) - throws NodeAlreadyExistsException { - nodeStateMap.addNode(datanodeDetails, stateMachine.getInitialState()); - eventPublisher.fireEvent(SCMEvents.NEW_NODE, datanodeDetails); - } - - /** - * Adds a pipeline in the node2PipelineMap. - * @param pipeline - Pipeline to be added - */ - public void addPipeline(Pipeline pipeline) { - node2PipelineMap.addPipeline(pipeline); - } - - /** - * Get information about the node. - * - * @param datanodeDetails DatanodeDetails - * - * @return DatanodeInfo - * - * @throws NodeNotFoundException if the node is not present - */ - public DatanodeInfo getNode(DatanodeDetails datanodeDetails) - throws NodeNotFoundException { - return nodeStateMap.getNodeInfo(datanodeDetails.getUuid()); - } - - /** - * Updates the last heartbeat time of the node. 
- * - * @throws NodeNotFoundException if the node is not present - */ - public void updateLastHeartbeatTime(DatanodeDetails datanodeDetails) - throws NodeNotFoundException { - nodeStateMap.getNodeInfo(datanodeDetails.getUuid()) - .updateLastHeartbeatTime(); - } - - /** - * Returns the current state of the node. - * - * @param datanodeDetails DatanodeDetails - * - * @return NodeState - * - * @throws NodeNotFoundException if the node is not present - */ - public NodeState getNodeState(DatanodeDetails datanodeDetails) - throws NodeNotFoundException { - return nodeStateMap.getNodeState(datanodeDetails.getUuid()); - } - - /** - * Returns all the node which are in healthy state. - * - * @return list of healthy nodes - */ - public List<DatanodeDetails> getHealthyNodes() { - return getNodes(NodeState.HEALTHY); - } - - /** - * Returns all the node which are in stale state. - * - * @return list of stale nodes - */ - public List<DatanodeDetails> getStaleNodes() { - return getNodes(NodeState.STALE); - } - - /** - * Returns all the node which are in dead state. - * - * @return list of dead nodes - */ - public List<DatanodeDetails> getDeadNodes() { - return getNodes(NodeState.DEAD); - } - - /** - * Returns all the node which are in the specified state. - * - * @param state NodeState - * - * @return list of nodes - */ - public List<DatanodeDetails> getNodes(NodeState state) { - List<DatanodeDetails> nodes = new LinkedList<>(); - nodeStateMap.getNodes(state).forEach( - uuid -> { - try { - nodes.add(nodeStateMap.getNodeDetails(uuid)); - } catch (NodeNotFoundException e) { - // This should not happen unless someone else other than - // NodeStateManager is directly modifying NodeStateMap and removed - // the node entry after we got the list of UUIDs. - LOG.error("Inconsistent NodeStateMap! " + nodeStateMap); - } - }); - return nodes; - } - - /** - * Returns all the nodes which have registered to NodeStateManager. - * - * @return all the managed nodes - */ - public List<DatanodeDetails> getAllNodes() { - List<DatanodeDetails> nodes = new LinkedList<>(); - nodeStateMap.getAllNodes().forEach( - uuid -> { - try { - nodes.add(nodeStateMap.getNodeDetails(uuid)); - } catch (NodeNotFoundException e) { - // This should not happen unless someone else other than - // NodeStateManager is directly modifying NodeStateMap and removed - // the node entry after we got the list of UUIDs. - LOG.error("Inconsistent NodeStateMap! " + nodeStateMap); - } - }); - return nodes; - } - - /** - * Gets set of pipelineID a datanode belongs to. - * @param dnId - Datanode ID - * @return Set of PipelineID - */ - public Set<PipelineID> getPipelineByDnID(UUID dnId) { - return node2PipelineMap.getPipelines(dnId); - } - - /** - * Returns the count of healthy nodes. - * - * @return healthy node count - */ - public int getHealthyNodeCount() { - return getNodeCount(NodeState.HEALTHY); - } - - /** - * Returns the count of stale nodes. - * - * @return stale node count - */ - public int getStaleNodeCount() { - return getNodeCount(NodeState.STALE); - } - - /** - * Returns the count of dead nodes. - * - * @return dead node count - */ - public int getDeadNodeCount() { - return getNodeCount(NodeState.DEAD); - } - - /** - * Returns the count of nodes in specified state. - * - * @param state NodeState - * - * @return node count - */ - public int getNodeCount(NodeState state) { - return nodeStateMap.getNodeCount(state); - } - - /** - * Returns the count of all nodes managed by NodeStateManager. 
- * - * @return node count - */ - public int getTotalNodeCount() { - return nodeStateMap.getTotalNodeCount(); - } - - /** - * Removes a node from NodeStateManager. - * - * @param datanodeDetails DatanodeDetails - * - * @throws NodeNotFoundException if the node is not present - */ - public void removeNode(DatanodeDetails datanodeDetails) - throws NodeNotFoundException { - nodeStateMap.removeNode(datanodeDetails.getUuid()); - } - - /** - * Returns the current stats of the node. - * - * @param uuid node id - * - * @return SCMNodeStat - * - * @throws NodeNotFoundException if the node is not present - */ - public SCMNodeStat getNodeStat(UUID uuid) throws NodeNotFoundException { - return nodeStateMap.getNodeStat(uuid); - } - - /** - * Returns an unmodifiable copy of nodeStats. - * @return map with node stats. - */ - public Map<UUID, SCMNodeStat> getNodeStatsMap() { - return nodeStateMap.getNodeStats(); - } - - /** - * Sets the stat for the node. - * - * @param uuid node id. - * - * @param newstat new stat that will be set for the specified node. - */ - public void setNodeStat(UUID uuid, SCMNodeStat newstat) { - nodeStateMap.setNodeStat(uuid, newstat); - } - - /** - * Removes the current stats of the specified node. - * - * @param uuid node id - * - * @return SCMNodeStat the stat removed from the node. - * - * @throws NodeNotFoundException if the node is not present. - */ - public SCMNodeStat removeNodeStat(UUID uuid) throws NodeNotFoundException { - return nodeStateMap.removeNodeStat(uuid); - } - - /** - * Removes a pipeline from the node2PipelineMap. - * @param pipeline - Pipeline to be removed - */ - public void removePipeline(Pipeline pipeline) { - node2PipelineMap.removePipeline(pipeline); - } - - /** - * Updates the set of containers available on a datanode. - * @param uuid - DatanodeID - * @param containerIds - Set of containerIDs - * @throws SCMException - if datanode is not known. For new datanode use - * addDatanodeInContainerMap call. - */ - public void setContainersForDatanode(UUID uuid, Set<ContainerID> containerIds) - throws SCMException { - node2ContainerMap.setContainersForDatanode(uuid, containerIds); - } - - /** - * Processes a containerReport received from a datanode. - * @param uuid - DatanodeID - * @param containerIds - Set of containerIDs - * @return The result after processing the containerReport - */ - public ReportResult<ContainerID> processContainerReport(UUID uuid, - Set<ContainerID> containerIds) { - return node2ContainerMap.processReport(uuid, containerIds); - } - - /** - * Returns the set of containerIDs available on a datanode. - * @param uuid - DatanodeID - * @return - set of containerIDs - */ - public Set<ContainerID> getContainers(UUID uuid) { - return node2ContainerMap.getContainers(uuid); - } - - /** - * Inserts a new datanode with the set of containerIDs for containers - * available on it. - * @param uuid - DatanodeID - * @param containerIDs - Set of ContainerIDs - * @throws SCMException - if datanode already exists - */ - public void addDatanodeInContainerMap(UUID uuid, - Set<ContainerID> containerIDs) throws SCMException { - node2ContainerMap.insertNewDatanode(uuid, containerIDs); - } - - /** - * Moves stale or dead nodes to healthy if we get a heartbeat from them. - * Moves healthy nodes to stale if needed. - * Moves stale nodes to dead if needed.
- * - * @see Thread#run() - */ - @Override - public void run() { - - /* - * - * staleNodeDeadline healthyNodeDeadline - * | | - * Dead | Stale | Healthy - * Node | Node | Node - * Window | Window | Window - * ----------------+----------------------------------+-------------------> - * >>-->> time-line >>-->> - * - * Here is the logic of computing the health of a node. - * - * 1. We get the current time and look back at the time when we got a - * heartbeat from a node. - * - * 2. If the last heartbeat was within the window of healthy node we mark - * it as healthy. - * - * 3. If the last HB timestamp is older and falls within the window of - * Stale Node time, we will mark it as Stale. - * - * 4. If the last HB time is older than the Stale Window, then the node is - * marked as dead. - * - * The processing starts from current time and looks backwards in time. - */ - long processingStartTime = Time.monotonicNow(); - // After this time node is considered to be stale. - long healthyNodeDeadline = processingStartTime - staleNodeIntervalMs; - // After this time node is considered to be dead. - long staleNodeDeadline = processingStartTime - deadNodeIntervalMs; - - Predicate<Long> healthyNodeCondition = - (lastHbTime) -> lastHbTime >= healthyNodeDeadline; - // staleNodeCondition is a superset of the stale and dead node conditions - Predicate<Long> staleNodeCondition = - (lastHbTime) -> lastHbTime < healthyNodeDeadline; - Predicate<Long> deadNodeCondition = - (lastHbTime) -> lastHbTime < staleNodeDeadline; - try { - for (NodeState state : NodeState.values()) { - List<UUID> nodes = nodeStateMap.getNodes(state); - for (UUID id : nodes) { - DatanodeInfo node = nodeStateMap.getNodeInfo(id); - switch (state) { - case HEALTHY: - // Move the node to STALE if the last heartbeat is older than the - // configured stale-node interval. - updateNodeState(node, staleNodeCondition, state, - NodeLifeCycleEvent.TIMEOUT); - break; - case STALE: - // Move the node to DEAD if the last heartbeat is older than the - // configured dead-node interval. - updateNodeState(node, deadNodeCondition, state, - NodeLifeCycleEvent.TIMEOUT); - // Restore the node if we have received a heartbeat within the - // configured stale-node interval. - updateNodeState(node, healthyNodeCondition, state, - NodeLifeCycleEvent.RESTORE); - break; - case DEAD: - // Resurrect the node if we have received a heartbeat within the - // configured stale-node interval. - updateNodeState(node, healthyNodeCondition, state, - NodeLifeCycleEvent.RESURRECT); - break; - // We don't do anything for DECOMMISSIONING and DECOMMISSIONED in - // heartbeat processing. - case DECOMMISSIONING: - case DECOMMISSIONED: - default: - } - } - } - } catch (NodeNotFoundException e) { - // This should not happen unless someone else other than - // NodeStateManager is directly modifying NodeStateMap and removed - // the node entry after we got the list of UUIDs. - LOG.error("Inconsistent NodeStateMap! " + nodeStateMap); - } - long processingEndTime = Time.monotonicNow(); - // If we have taken too much time for HB processing, log that information. - if ((processingEndTime - processingStartTime) > - heartbeatCheckerIntervalMs) { - LOG.error("Total time spent processing datanode HB's is greater than " + - "configured values for datanode heartbeats. Please adjust the" + - " heartbeat configs. 
Time spent on HB processing: {} seconds " + - "Heartbeat processing interval: {} seconds.", - TimeUnit.MILLISECONDS - .toSeconds(processingEndTime - processingStartTime), - TimeUnit.MILLISECONDS.toSeconds(heartbeatCheckerIntervalMs)); - } - - // We purposefully make this non-deterministic. Instead of using - // scheduleAtFixedRate we will just go to sleep - // and wake up at the next rendezvous point, which is currentTime + - // heartbeatCheckerIntervalMs. This leads to the issue that we are now - // heartbeating not at a fixed cadence, but at clock tick + time taken to - // work. - // - // This time taken to work can skew the heartbeat processor thread. - // We don't care, for the following reasons. - // - // 1. checkerInterval is generally many magnitudes faster than the - // datanode HB frequency. - // - // 2. If we have too many nodes, the SCM would be doing only HB - // processing, which could lead to SCM's CPU starvation. With this - // approach we always guarantee that the HB thread sleeps for a little - // while. - // - // 3. It is possible that we will never finish processing the HB's in the - // thread. But that means we have a misconfigured system. We will warn - // the users by logging that information. - // - // 4. And most importantly, heartbeats are not blocked even if - // this thread does not run; they will go into the processing queue. - - if (!Thread.currentThread().isInterrupted() && - !executorService.isShutdown()) { - executorService.schedule(this, heartbeatCheckerIntervalMs, - TimeUnit.MILLISECONDS); - } else { - LOG.info("Current thread is interrupted, shutting down HB processing " + - "thread for Node Manager."); - } - - } - - /** - * Updates the node state if the condition is satisfied. - * - * @param node DatanodeInfo - * @param condition condition to check - * @param state current state of node - * @param lifeCycleEvent NodeLifeCycleEvent to be applied if condition - * matches - * - * @throws NodeNotFoundException if the node is not present - */ - private void updateNodeState(DatanodeInfo node, Predicate<Long> condition, - NodeState state, NodeLifeCycleEvent lifeCycleEvent) - throws NodeNotFoundException { - try { - if (condition.test(node.getLastHeartbeatTime())) { - NodeState newState = stateMachine.getNextState(state, lifeCycleEvent); - nodeStateMap.updateNodeState(node.getUuid(), state, newState); - if (state2EventMap.containsKey(newState)) { - eventPublisher.fireEvent(state2EventMap.get(newState), node); - } - } - } catch (InvalidStateTransitionException e) { - LOG.warn("Invalid state transition of node {}." + - " Current state: {}, life cycle event: {}", - node, state, lifeCycleEvent); - } - } - - @Override - public void close() { - executorService.shutdown(); - try { - if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { - executorService.shutdownNow(); - } - - if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { - LOG.error("Unable to shutdown NodeStateManager properly."); - } - } catch (InterruptedException e) { - executorService.shutdownNow(); - Thread.currentThread().interrupt(); - } - } -}
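For reference, the run() method above classifies every node by comparing its last heartbeat timestamp against two deadlines derived from the configured stale and dead intervals. The following is a minimal, self-contained sketch of that deadline-window logic; the class name, method names, and interval values are illustrative assumptions, not part of the SCM code base:

import java.util.function.Predicate;

public class HeartbeatWindowSketch {

  enum Health { HEALTHY, STALE, DEAD }

  // Assumes staleIntervalMs < deadIntervalMs, as the NodeStateManager
  // constructor enforces. Timestamps come from a monotonic clock.
  static Health classify(long now, long lastHbTime,
      long staleIntervalMs, long deadIntervalMs) {
    long healthyNodeDeadline = now - staleIntervalMs; // older => not healthy
    long staleNodeDeadline = now - deadIntervalMs;    // older => dead

    Predicate<Long> deadCondition = t -> t < staleNodeDeadline;
    Predicate<Long> staleCondition = t -> t < healthyNodeDeadline;

    if (deadCondition.test(lastHbTime)) {
      return Health.DEAD;
    }
    if (staleCondition.test(lastHbTime)) {
      return Health.STALE;
    }
    return Health.HEALTHY;
  }

  public static void main(String[] args) {
    long now = System.nanoTime() / 1_000_000; // stand-in for Time.monotonicNow()
    long staleMs = 90_000, deadMs = 600_000;  // example values, not the defaults
    System.out.println(classify(now, now - 5_000, staleMs, deadMs));   // HEALTHY
    System.out.println(classify(now, now - 120_000, staleMs, deadMs)); // STALE
    System.out.println(classify(now, now - 700_000, staleMs, deadMs)); // DEAD
  }
}

Because the stale interval is checked to be strictly smaller than the dead interval, the three windows partition the time-line, so every last-heartbeat timestamp maps to exactly one target state.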
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java deleted file mode 100644 index 36a6f15..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ /dev/null @@ -1,599 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdds.scm.node; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; -import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; -import org.apache.hadoop.hdds.scm.exceptions.SCMException; -import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException; -import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; -import org.apache.hadoop.hdds.scm.node.states.ReportResult; -import org.apache.hadoop.hdds.scm.server.StorageContainerManager; -import org.apache.hadoop.hdds.scm.VersionInfo; -import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; -import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; -import org.apache.hadoop.hdds.server.events.EventPublisher; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.NodeReportProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto - .ErrorCode; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.StorageReportProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; -import org.apache.hadoop.ipc.Server; -import org.apache.hadoop.metrics2.util.MBeans; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol; -import org.apache.hadoop.ozone.protocol.VersionResponse; -import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; -import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; -import 
org.apache.hadoop.ozone.protocol.commands.ReregisterCommand; -import org.apache.hadoop.ozone.protocol.commands.SCMCommand; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.management.ObjectName; -import java.io.IOException; -import java.net.InetAddress; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * Maintains information about the Datanodes on the SCM side. - * <p> - * Heartbeat processing under SCM is very simple compared to the HDFS - * HeartbeatManager. - * <p> - * The getNode(byState) functions make a copy of the node maps and then create - * a list based on that. It should be assumed that these get functions always - * report *stale* information. For example, getting the deadNodeCount followed - * by getNodes(DEAD) could very well produce a totally different count. Also - * getNodeCount(HEALTHY) + getNodeCount(DEAD) + getNodeCount(STALE) is not - * guaranteed to add up to the total nodes that we know of. Please treat all - * get functions in this file as a snapshot of information that is inconsistent - * as soon as you read it. - */ -public class SCMNodeManager - implements NodeManager, StorageContainerNodeProtocol { - - @VisibleForTesting - static final Logger LOG = - LoggerFactory.getLogger(SCMNodeManager.class); - - private final NodeStateManager nodeStateManager; - // Should we maintain aggregated stats? If this is not frequently used, we - // can always calculate it from nodeStats whenever required. - // Aggregated node stats - private SCMNodeStat scmStat; - // Should we create a ChillModeManager and extract all the chill mode logic - // to a new class? - private int chillModeNodeCount; - private final String clusterID; - private final VersionInfo version; - /** - * During start up of SCM, it will enter chill mode and stay there - * until the number of Datanodes registered reaches - * {@code chillModeNodeCount}. This flag is for tracking startup chill mode. - */ - private AtomicBoolean inStartupChillMode; - /** - * Administrator can put SCM into chill mode manually. - * This flag is for tracking manual chill mode. - */ - private AtomicBoolean inManualChillMode; - private final CommandQueue commandQueue; - // Node manager MXBean - private ObjectName nmInfoBean; - - // Node pool manager. - private final StorageContainerManager scmManager; - - /** - * Constructs the SCM Node Manager. - */ - public SCMNodeManager(OzoneConfiguration conf, String clusterID, - StorageContainerManager scmManager, EventPublisher eventPublisher) - throws IOException { - this.nodeStateManager = new NodeStateManager(conf, eventPublisher); - this.scmStat = new SCMNodeStat(); - this.clusterID = clusterID; - this.version = VersionInfo.getLatestVersion(); - this.commandQueue = new CommandQueue(); - // TODO: Support this value as a Percentage of known machines. - this.chillModeNodeCount = 1; - this.inStartupChillMode = new AtomicBoolean(true); - this.inManualChillMode = new AtomicBoolean(false); - this.scmManager = scmManager; - LOG.info("Entering startup chill mode."); - registerMXBean(); - } - - private void registerMXBean() { - this.nmInfoBean = MBeans.register("SCMNodeManager", - "SCMNodeManagerInfo", this); - } - - private void unregisterMXBean() { - if (this.nmInfoBean != null) { - MBeans.unregister(this.nmInfoBean); - this.nmInfoBean = null; - } - } - - /** - * Removes a datanode from the management of this Node Manager. - * - * @param node - DataNode.
- * @throws NodeNotFoundException - */ - @Override - public void removeNode(DatanodeDetails node) throws NodeNotFoundException { - nodeStateManager.removeNode(node); - } - - /** - * Gets all datanodes that are in a certain state. This function works by - * taking a snapshot of the current collection and then returning the list - * from that collection. This means that real map might have changed by the - * time we return this list. - * - * @return List of Datanodes that are known to SCM in the requested state. - */ - @Override - public List<DatanodeDetails> getNodes(NodeState nodestate) { - return nodeStateManager.getNodes(nodestate); - } - - /** - * Returns all datanodes that are known to SCM. - * - * @return List of DatanodeDetails - */ - @Override - public List<DatanodeDetails> getAllNodes() { - return nodeStateManager.getAllNodes(); - } - - /** - * Get the minimum number of nodes to get out of Chill mode. - * - * @return int - */ - @Override - public int getMinimumChillModeNodes() { - return chillModeNodeCount; - } - - /** - * Sets the Minimum chill mode nodes count, used only in testing. - * - * @param count - Number of nodes. - */ - @VisibleForTesting - public void setMinimumChillModeNodes(int count) { - chillModeNodeCount = count; - } - - /** - * Returns chill mode Status string. - * @return String - */ - @Override - public String getChillModeStatus() { - if (inStartupChillMode.get()) { - return "Still in chill mode, waiting on nodes to report in." + - String.format(" %d nodes reported, minimal %d nodes required.", - nodeStateManager.getTotalNodeCount(), getMinimumChillModeNodes()); - } - if (inManualChillMode.get()) { - return "Out of startup chill mode, but in manual chill mode." + - String.format(" %d nodes have reported in.", - nodeStateManager.getTotalNodeCount()); - } - return "Out of chill mode." + - String.format(" %d nodes have reported in.", - nodeStateManager.getTotalNodeCount()); - } - - /** - * Forcefully exits the chill mode even if we have not met the minimum - * criteria of exiting the chill mode. This will exit from both startup - * and manual chill mode. - */ - @Override - public void forceExitChillMode() { - if(inStartupChillMode.get()) { - LOG.info("Leaving startup chill mode."); - inStartupChillMode.set(false); - } - if(inManualChillMode.get()) { - LOG.info("Leaving manual chill mode."); - inManualChillMode.set(false); - } - } - - /** - * Puts the node manager into manual chill mode. - */ - @Override - public void enterChillMode() { - LOG.info("Entering manual chill mode."); - inManualChillMode.set(true); - } - - /** - * Brings node manager out of manual chill mode. - */ - @Override - public void exitChillMode() { - LOG.info("Leaving manual chill mode."); - inManualChillMode.set(false); - } - - /** - * Returns true if node manager is out of chill mode, else false. - * @return true if out of chill mode, else false - */ - @Override - public boolean isOutOfChillMode() { - return !(inStartupChillMode.get() || inManualChillMode.get()); - } - - /** - * Returns the Number of Datanodes by State they are in. - * - * @return int -- count - */ - @Override - public int getNodeCount(NodeState nodestate) { - return nodeStateManager.getNodeCount(nodestate); - } - - /** - * Returns the node state of a specific node. - * - * @param datanodeDetails - Datanode Details - * @return Healthy/Stale/Dead/Unknown. 
- */ - @Override - public NodeState getNodeState(DatanodeDetails datanodeDetails) { - try { - return nodeStateManager.getNodeState(datanodeDetails); - } catch (NodeNotFoundException e) { - // TODO: should we throw NodeNotFoundException? - return null; - } - } - - - private void updateNodeStat(UUID dnId, NodeReportProto nodeReport) { - SCMNodeStat stat; - try { - stat = nodeStateManager.getNodeStat(dnId); - } catch (NodeNotFoundException e) { - LOG.debug("SCM updateNodeStat based on heartbeat from previously " + - "dead datanode {}", dnId); - stat = new SCMNodeStat(); - } - - if (nodeReport != null && nodeReport.getStorageReportCount() > 0) { - long totalCapacity = 0; - long totalRemaining = 0; - long totalScmUsed = 0; - List<StorageReportProto> storageReports = nodeReport - .getStorageReportList(); - for (StorageReportProto report : storageReports) { - totalCapacity += report.getCapacity(); - totalRemaining += report.getRemaining(); - totalScmUsed += report.getScmUsed(); - } - scmStat.subtract(stat); - stat.set(totalCapacity, totalScmUsed, totalRemaining); - scmStat.add(stat); - } - nodeStateManager.setNodeStat(dnId, stat); - } - - /** - * Closes this node manager and releases any system resources associated - * with it. If it is already closed then invoking this method has no effect. - * - * @throws IOException if an I/O error occurs - */ - @Override - public void close() throws IOException { - unregisterMXBean(); - } - - /** - * Gets the version info from SCM. - * - * @param versionRequest - version Request. - * @return - returns SCM version info and other required information needed - * by the datanode. - */ - @Override - public VersionResponse getVersion(SCMVersionRequestProto versionRequest) { - return VersionResponse.newBuilder() - .setVersion(this.version.getVersion()) - .addValue(OzoneConsts.SCM_ID, - this.scmManager.getScmStorage().getScmId()) - .addValue(OzoneConsts.CLUSTER_ID, this.scmManager.getScmStorage() - .getClusterID()) - .build(); - } - - /** - * Registers the node if the node finds that it is not registered with any - * SCM. - * - * @param datanodeDetails - Send datanodeDetails with Node info. - * This function generates and assigns a new datanode ID - * for the datanode. This allows SCM to be run independently - * of the Namenode if required. - * @param nodeReport NodeReport. - * - * @return RegisteredCommand - */ - @Override - public RegisteredCommand register( - DatanodeDetails datanodeDetails, NodeReportProto nodeReport, - PipelineReportsProto pipelineReportsProto) { - - InetAddress dnAddress = Server.getRemoteIp(); - if (dnAddress != null) { - // Mostly called inside an RPC, update ip and peer hostname - datanodeDetails.setHostName(dnAddress.getHostName()); - datanodeDetails.setIpAddress(dnAddress.getHostAddress()); - } - UUID dnId = datanodeDetails.getUuid(); - try { - nodeStateManager.addNode(datanodeDetails); - nodeStateManager.setNodeStat(dnId, new SCMNodeStat()); - if (inStartupChillMode.get() && - nodeStateManager.getTotalNodeCount() >= getMinimumChillModeNodes()) { - inStartupChillMode.getAndSet(false); - LOG.info("Leaving startup chill mode."); - } - // Updating Node Report, as registration is successful - updateNodeStat(datanodeDetails.getUuid(), nodeReport); - LOG.info("Data node with ID: {} Registered.", datanodeDetails.getUuid()); - } catch (NodeAlreadyExistsException e) { - LOG.trace("Datanode is already registered. 
Datanode: {}", - datanodeDetails.toString()); - } - return RegisteredCommand.newBuilder().setErrorCode(ErrorCode.success) - .setDatanodeUUID(datanodeDetails.getUuidString()) - .setClusterID(this.clusterID) - .setHostname(datanodeDetails.getHostName()) - .setIpAddress(datanodeDetails.getIpAddress()) - .build(); - } - - /** - * Send heartbeat to indicate the datanode is alive and doing well. - * - * @param datanodeDetails - DatanodeDetailsProto. - * @return SCMheartbeat response. - * @throws IOException - */ - @Override - public List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails) { - Preconditions.checkNotNull(datanodeDetails, "Heartbeat is missing " + - "DatanodeDetails."); - try { - nodeStateManager.updateLastHeartbeatTime(datanodeDetails); - } catch (NodeNotFoundException e) { - LOG.warn("SCM receive heartbeat from unregistered datanode {}", - datanodeDetails); - commandQueue.addCommand(datanodeDetails.getUuid(), - new ReregisterCommand()); - } - return commandQueue.getCommand(datanodeDetails.getUuid()); - } - - /** - * Process node report. - * - * @param dnUuid - * @param nodeReport - */ - @Override - public void processNodeReport(UUID dnUuid, NodeReportProto nodeReport) { - this.updateNodeStat(dnUuid, nodeReport); - } - - /** - * Returns the aggregated node stats. - * @return the aggregated node stats. - */ - @Override - public SCMNodeStat getStats() { - return new SCMNodeStat(this.scmStat); - } - - /** - * Return a map of node stats. - * @return a map of individual node stats (live/stale but not dead). - */ - @Override - public Map<UUID, SCMNodeStat> getNodeStats() { - return nodeStateManager.getNodeStatsMap(); - } - - /** - * Return the node stat of the specified datanode. - * @param datanodeDetails - datanode ID. - * @return node stat if it is live/stale, null if it is decommissioned or - * doesn't exist. - */ - @Override - public SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails) { - try { - return new SCMNodeMetric( - nodeStateManager.getNodeStat(datanodeDetails.getUuid())); - } catch (NodeNotFoundException e) { - LOG.info("SCM getNodeStat from a decommissioned or removed datanode {}", - datanodeDetails.getUuid()); - return null; - } - } - - @Override - public Map<String, Integer> getNodeCount() { - Map<String, Integer> nodeCountMap = new HashMap<String, Integer>(); - for(NodeState state : NodeState.values()) { - nodeCountMap.put(state.toString(), getNodeCount(state)); - } - return nodeCountMap; - } - - /** - * Get set of pipelines a datanode is part of. - * @param dnId - datanodeID - * @return Set of PipelineID - */ - @Override - public Set<PipelineID> getPipelineByDnID(UUID dnId) { - return nodeStateManager.getPipelineByDnID(dnId); - } - - - /** - * Add pipeline information in the NodeManager. - * @param pipeline - Pipeline to be added - */ - @Override - public void addPipeline(Pipeline pipeline) { - nodeStateManager.addPipeline(pipeline); - } - - /** - * Remove a pipeline information from the NodeManager. - * @param pipeline - Pipeline to be removed - */ - @Override - public void removePipeline(Pipeline pipeline) { - nodeStateManager.removePipeline(pipeline); - } - - /** - * Update set of containers available on a datanode. - * @param uuid - DatanodeID - * @param containerIds - Set of containerIDs - * @throws SCMException - if datanode is not known. For new datanode use - * addDatanodeInContainerMap call. 
- */ - @Override - public void setContainersForDatanode(UUID uuid, - Set<ContainerID> containerIds) throws SCMException { - nodeStateManager.setContainersForDatanode(uuid, containerIds); - } - - /** - * Process containerReport received from datanode. - * @param uuid - DataonodeID - * @param containerIds - Set of containerIDs - * @return The result after processing containerReport - */ - @Override - public ReportResult<ContainerID> processContainerReport(UUID uuid, - Set<ContainerID> containerIds) { - return nodeStateManager.processContainerReport(uuid, containerIds); - } - - /** - * Return set of containerIDs available on a datanode. - * @param uuid - DatanodeID - * @return - set of containerIDs - */ - @Override - public Set<ContainerID> getContainers(UUID uuid) { - return nodeStateManager.getContainers(uuid); - } - - /** - * Insert a new datanode with set of containerIDs for containers available - * on it. - * @param uuid - DatanodeID - * @param containerIDs - Set of ContainerIDs - * @throws SCMException - if datanode already exists - */ - @Override - public void addDatanodeInContainerMap(UUID uuid, - Set<ContainerID> containerIDs) throws SCMException { - nodeStateManager.addDatanodeInContainerMap(uuid, containerIDs); - } - - // TODO: - // Since datanode commands are added through event queue, onMessage method - // should take care of adding commands to command queue. - // Refactor and remove all the usage of this method and delete this method. - @Override - public void addDatanodeCommand(UUID dnId, SCMCommand command) { - this.commandQueue.addCommand(dnId, command); - } - - /** - * This method is called by EventQueue whenever someone adds a new - * DATANODE_COMMAND to the Queue. - * - * @param commandForDatanode DatanodeCommand - * @param ignored publisher - */ - @Override - public void onMessage(CommandForDatanode commandForDatanode, - EventPublisher ignored) { - addDatanodeCommand(commandForDatanode.getDatanodeId(), - commandForDatanode.getCommand()); - } - - /** - * Update the node stats and cluster storage stats in this SCM Node Manager. - * - * @param dnUuid datanode uuid. - */ - @Override - public void processDeadNode(UUID dnUuid) { - try { - SCMNodeStat stat = nodeStateManager.getNodeStat(dnUuid); - if (stat != null) { - LOG.trace("Update stat values as Datanode {} is dead.", dnUuid); - scmStat.subtract(stat); - stat.set(0, 0, 0); - } - } catch (NodeNotFoundException e) { - LOG.warn("Can't update stats based on message of dead Datanode {}, it" - + " doesn't exist or decommissioned already.", dnUuid); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMXBean.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMXBean.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMXBean.java deleted file mode 100644 index 32ecbad..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMXBean.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdds.scm.node; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport; - -import java.util.Set; -import java.util.UUID; - -/** - * - * This is the JMX management interface for node manager information. - */ [email protected] -public interface SCMNodeStorageStatMXBean { - /** - * Get the capacity of the dataNode. - * @param datanodeID Datanode Id - * @return long - */ - long getCapacity(UUID datanodeID); - - /** - * Returns the remaining space of a Datanode. - * @param datanodeId Datanode Id - * @return long - */ - long getRemainingSpace(UUID datanodeId); - - - /** - * Returns used space in bytes of a Datanode. - * @return long - */ - long getUsedSpace(UUID datanodeId); - - /** - * Returns the total capacity of all dataNodes. - * @return long - */ - long getTotalCapacity(); - - /** - * Returns the total Used Space in all Datanodes. - * @return long - */ - long getTotalSpaceUsed(); - - /** - * Returns the total Remaining Space in all Datanodes. - * @return long - */ - long getTotalFreeSpace(); - - /** - * Returns the set of disks for a given Datanode. - * @return set of storage volumes - */ - Set<StorageLocationReport> getStorageVolumes(UUID datanodeId); -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java deleted file mode 100644 index 1b0e5b5..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java +++ /dev/null @@ -1,368 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- * - */ - -package org.apache.hadoop.hdds.scm.node; - - -import com.google.common.base.Preconditions; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; -import org.apache.hadoop.hdds.protocol.proto. - StorageContainerDatanodeProtocolProtos.StorageReportProto; -import org.apache.hadoop.hdds.scm.exceptions.SCMException; -import org.apache.hadoop.metrics2.util.MBeans; -import org.apache.hadoop.ozone.OzoneConfigKeys; -import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.management.ObjectName; -import java.io.IOException; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; - -import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.DUPLICATE_DATANODE; -import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.NO_SUCH_DATANODE; - -/** - * This data structure maintains the disk space capacity, disk usage and free - * space availability per Datanode. - * This information is built from the DN node reports. - */ -public class SCMNodeStorageStatMap implements SCMNodeStorageStatMXBean { - static final Logger LOG = - LoggerFactory.getLogger(SCMNodeStorageStatMap.class); - - private final double warningUtilizationThreshold; - private final double criticalUtilizationThreshold; - - private final Map<UUID, Set<StorageLocationReport>> scmNodeStorageReportMap; - // NodeStorageInfo MXBean - private ObjectName scmNodeStorageInfoBean; - /** - * Constructs the SCMNodeStorageStatMap object. - */ - public SCMNodeStorageStatMap(OzoneConfiguration conf) { - scmNodeStorageReportMap = new ConcurrentHashMap<>(); - warningUtilizationThreshold = conf.getDouble( - OzoneConfigKeys. - HDDS_DATANODE_STORAGE_UTILIZATION_WARNING_THRESHOLD, - OzoneConfigKeys. - HDDS_DATANODE_STORAGE_UTILIZATION_WARNING_THRESHOLD_DEFAULT); - criticalUtilizationThreshold = conf.getDouble( - OzoneConfigKeys. - HDDS_DATANODE_STORAGE_UTILIZATION_CRITICAL_THRESHOLD, - OzoneConfigKeys. - HDDS_DATANODE_STORAGE_UTILIZATION_CRITICAL_THRESHOLD_DEFAULT); - } - - /** - * Enum that describes what we should do at various thresholds. - */ - public enum UtilizationThreshold { - NORMAL, WARN, CRITICAL; - } - - /** - * Returns true if this is a datanode that is already tracked by - * scmNodeStorageReportMap. - * - * @param datanodeID - UUID of the Datanode. - * @return True if this is tracked, false if this map does not know about it. - */ - public boolean isKnownDatanode(UUID datanodeID) { - Preconditions.checkNotNull(datanodeID); - return scmNodeStorageReportMap.containsKey(datanodeID); - } - - public List<UUID> getDatanodeList( - UtilizationThreshold threshold) { - return scmNodeStorageReportMap.entrySet().stream().filter( - entry -> (isThresholdReached(threshold, - getScmUsedratio(getUsedSpace(entry.getKey()), - getCapacity(entry.getKey()))))) - .map(Map.Entry::getKey) - .collect(Collectors.toList()); - } - - /** - * Inserts a new datanode into the scmNodeStorageReportMap. - * - * @param datanodeID -- Datanode UUID - * @param report - set of StorageLocationReports.
- */ - public void insertNewDatanode(UUID datanodeID, - Set<StorageLocationReport> report) throws SCMException { - Preconditions.checkNotNull(report); - Preconditions.checkState(report.size() != 0); - Preconditions.checkNotNull(datanodeID); - synchronized (scmNodeStorageReportMap) { - if (isKnownDatanode(datanodeID)) { - throw new SCMException("Node already exists in the map", - DUPLICATE_DATANODE); - } - scmNodeStorageReportMap.putIfAbsent(datanodeID, report); - } - } - - //TODO: This should be called once SCMNodeManager gets Started. - private void registerMXBean() { - this.scmNodeStorageInfoBean = MBeans.register("StorageContainerManager", - "scmNodeStorageInfo", this); - } - - //TODO: Unregister call should happen as a part of SCMNodeManager shutdown. - private void unregisterMXBean() { - if(this.scmNodeStorageInfoBean != null) { - MBeans.unregister(this.scmNodeStorageInfoBean); - this.scmNodeStorageInfoBean = null; - } - } - /** - * Updates the Container list of an existing DN. - * - * @param datanodeID - UUID of DN. - * @param report - set of Storage Reports for the Datanode. - * @throws SCMException - if we don't know about this datanode, for new DN - * use addDatanodeInContainerMap. - */ - public void updateDatanodeMap(UUID datanodeID, - Set<StorageLocationReport> report) throws SCMException { - Preconditions.checkNotNull(datanodeID); - Preconditions.checkNotNull(report); - Preconditions.checkState(report.size() != 0); - synchronized (scmNodeStorageReportMap) { - if (!scmNodeStorageReportMap.containsKey(datanodeID)) { - throw new SCMException("No such datanode", NO_SUCH_DATANODE); - } - scmNodeStorageReportMap.put(datanodeID, report); - } - } - - public StorageReportResult processNodeReport(UUID datanodeID, - StorageContainerDatanodeProtocolProtos.NodeReportProto nodeReport) - throws IOException { - Preconditions.checkNotNull(datanodeID); - Preconditions.checkNotNull(nodeReport); - - long totalCapacity = 0; - long totalRemaining = 0; - long totalScmUsed = 0; - Set<StorageLocationReport> storagReportSet = new HashSet<>(); - Set<StorageLocationReport> fullVolumeSet = new HashSet<>(); - Set<StorageLocationReport> failedVolumeSet = new HashSet<>(); - List<StorageReportProto> - storageReports = nodeReport.getStorageReportList(); - for (StorageReportProto report : storageReports) { - StorageLocationReport storageReport = - StorageLocationReport.getFromProtobuf(report); - storagReportSet.add(storageReport); - if (report.hasFailed() && report.getFailed()) { - failedVolumeSet.add(storageReport); - } else if (isThresholdReached(UtilizationThreshold.CRITICAL, - getScmUsedratio(report.getScmUsed(), report.getCapacity()))) { - fullVolumeSet.add(storageReport); - } - totalCapacity += report.getCapacity(); - totalRemaining += report.getRemaining(); - totalScmUsed += report.getScmUsed(); - } - - if (!isKnownDatanode(datanodeID)) { - insertNewDatanode(datanodeID, storagReportSet); - } else { - updateDatanodeMap(datanodeID, storagReportSet); - } - if (isThresholdReached(UtilizationThreshold.CRITICAL, - getScmUsedratio(totalScmUsed, totalCapacity))) { - LOG.warn("Datanode {} is out of storage space. 
Capacity: {}, Used: {}", - datanodeID, totalCapacity, totalScmUsed); - return StorageReportResult.ReportResultBuilder.newBuilder() - .setStatus(ReportStatus.DATANODE_OUT_OF_SPACE) - .setFullVolumeSet(fullVolumeSet).setFailedVolumeSet(failedVolumeSet) - .build(); - } - if (isThresholdReached(UtilizationThreshold.WARN, - getScmUsedratio(totalScmUsed, totalCapacity))) { - LOG.warn("Datanode {} is low on storage space. Capacity: {}, Used: {}", - datanodeID, totalCapacity, totalScmUsed); - } - - if (failedVolumeSet.isEmpty() && !fullVolumeSet.isEmpty()) { - return StorageReportResult.ReportResultBuilder.newBuilder() - .setStatus(ReportStatus.STORAGE_OUT_OF_SPACE) - .setFullVolumeSet(fullVolumeSet).build(); - } - - if (!failedVolumeSet.isEmpty() && fullVolumeSet.isEmpty()) { - return StorageReportResult.ReportResultBuilder.newBuilder() - .setStatus(ReportStatus.FAILED_STORAGE) - .setFailedVolumeSet(failedVolumeSet).build(); - } - if (!failedVolumeSet.isEmpty() && !fullVolumeSet.isEmpty()) { - return StorageReportResult.ReportResultBuilder.newBuilder() - .setStatus(ReportStatus.FAILED_AND_OUT_OF_SPACE_STORAGE) - .setFailedVolumeSet(failedVolumeSet).setFullVolumeSet(fullVolumeSet) - .build(); - } - return StorageReportResult.ReportResultBuilder.newBuilder() - .setStatus(ReportStatus.ALL_IS_WELL).build(); - } - - private boolean isThresholdReached(UtilizationThreshold threshold, - double scmUsedratio) { - switch (threshold) { - case NORMAL: - return scmUsedratio < warningUtilizationThreshold; - case WARN: - return scmUsedratio >= warningUtilizationThreshold - && scmUsedratio < criticalUtilizationThreshold; - case CRITICAL: - return scmUsedratio >= criticalUtilizationThreshold; - default: - throw new RuntimeException("Unknown UtilizationThreshold value"); - } - } - - @Override - public long getCapacity(UUID dnId) { - long capacity = 0; - Set<StorageLocationReport> reportSet = scmNodeStorageReportMap.get(dnId); - for (StorageLocationReport report : reportSet) { - capacity += report.getCapacity(); - } - return capacity; - } - - @Override - public long getRemainingSpace(UUID dnId) { - long remaining = 0; - Set<StorageLocationReport> reportSet = scmNodeStorageReportMap.get(dnId); - for (StorageLocationReport report : reportSet) { - remaining += report.getRemaining(); - } - return remaining; - } - - @Override - public long getUsedSpace(UUID dnId) { - long scmUsed = 0; - Set<StorageLocationReport> reportSet = scmNodeStorageReportMap.get(dnId); - for (StorageLocationReport report : reportSet) { - scmUsed += report.getScmUsed(); - } - return scmUsed; - } - - @Override - public long getTotalCapacity() { - long capacity = 0; - Set<UUID> dnIdSet = scmNodeStorageReportMap.keySet(); - for (UUID id : dnIdSet) { - capacity += getCapacity(id); - } - return capacity; - } - - @Override - public long getTotalSpaceUsed() { - long scmUsed = 0; - Set<UUID> dnIdSet = scmNodeStorageReportMap.keySet(); - for (UUID id : dnIdSet) { - scmUsed += getUsedSpace(id); - } - return scmUsed; - } - - @Override - public long getTotalFreeSpace() { - long remaining = 0; - Set<UUID> dnIdSet = scmNodeStorageReportMap.keySet(); - for (UUID id : dnIdSet) { - remaining += getRemainingSpace(id); - } - return remaining; - } - - /** - * removes the dataNode from scmNodeStorageReportMap. - * @param datanodeID - * @throws SCMException in case the dataNode is not found in the map. 
-
-  /**
-   * Removes the datanode from scmNodeStorageReportMap.
-   * @param datanodeID
-   * @throws SCMException in case the datanode is not found in the map.
-   */
-  public void removeDatanode(UUID datanodeID) throws SCMException {
-    Preconditions.checkNotNull(datanodeID);
-    synchronized (scmNodeStorageReportMap) {
-      if (!scmNodeStorageReportMap.containsKey(datanodeID)) {
-        throw new SCMException("No such datanode", NO_SUCH_DATANODE);
-      }
-      scmNodeStorageReportMap.remove(datanodeID);
-    }
-  }
-
-  /**
-   * Returns the set of storage volumes for a Datanode.
-   * @param datanodeID
-   * @return set of storage volumes.
-   */
-  @Override
-  public Set<StorageLocationReport> getStorageVolumes(UUID datanodeID) {
-    return scmNodeStorageReportMap.get(datanodeID);
-  }
-
-  /**
-   * Truncates to 4 decimal digits, since uncontrolled precision is sometimes
-   * counterintuitive to what users expect.
-   * @param value - double.
-   * @return double.
-   */
-  private double truncateDecimals(double value) {
-    final int multiplier = 10000;
-    return (double) ((long) (value * multiplier)) / multiplier;
-  }
-
-  /**
-   * Gets the scmUsed ratio.
-   */
-  public double getScmUsedratio(long scmUsed, long capacity) {
-    double scmUsedRatio =
-        truncateDecimals(scmUsed / (double) capacity);
-    return scmUsedRatio;
-  }
-
-  /**
-   * Results possible from processing a node report by
-   * SCMNodeStorageStatMap.
-   */
-  public enum ReportStatus {
-    ALL_IS_WELL,
-    DATANODE_OUT_OF_SPACE,
-    STORAGE_OUT_OF_SPACE,
-    FAILED_STORAGE,
-    FAILED_AND_OUT_OF_SPACE_STORAGE
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
deleted file mode 100644
index 48939f1..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdds.scm.node;
-
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
-import org.apache.hadoop.hdds.server.events.EventHandler;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-
-/**
- * Handles the stale node event.
- */
-public class StaleNodeHandler implements EventHandler<DatanodeDetails> {
-
-  private final PipelineSelector pipelineSelector;
-
-  public StaleNodeHandler(PipelineSelector pipelineSelector) {
-    this.pipelineSelector = pipelineSelector;
-  }
-
-  @Override
-  public void onMessage(DatanodeDetails datanodeDetails,
-      EventPublisher publisher) {
-    pipelineSelector.handleStaleNode(datanodeDetails);
-  }
-}
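The handler is meant to be subscribed to the stale-node event during SCM wiring. A hedged sketch of that subscription (assuming the EventQueue and SCMEvents.STALE_NODE types used elsewhere in this branch; pipelineSelector is a placeholder):

    // Hypothetical wiring: route STALE_NODE events to the handler.
    EventQueue eventQueue = new EventQueue();
    eventQueue.addHandler(SCMEvents.STALE_NODE,
        new StaleNodeHandler(pipelineSelector));
    // When NodeStateManager later publishes STALE_NODE for a datanode,
    // onMessage() fires and the node's pipelines are handled.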
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StorageReportResult.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StorageReportResult.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StorageReportResult.java
deleted file mode 100644
index 0b63ceb..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StorageReportResult.java
+++ /dev/null
@@ -1,87 +0,0 @@
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.hadoop.hdds.scm.node;
-
-import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;
-
-import java.util.Set;
-
-/**
- * A node report gets processed by SCMNodeStorageStatMap and the outcome is
- * returned as this report result class.
- */
-public class StorageReportResult {
-  private SCMNodeStorageStatMap.ReportStatus status;
-  private Set<StorageLocationReport> fullVolumes;
-  private Set<StorageLocationReport> failedVolumes;
-
-  StorageReportResult(SCMNodeStorageStatMap.ReportStatus status,
-      Set<StorageLocationReport> fullVolumes,
-      Set<StorageLocationReport> failedVolumes) {
-    this.status = status;
-    this.fullVolumes = fullVolumes;
-    this.failedVolumes = failedVolumes;
-  }
-
-  public SCMNodeStorageStatMap.ReportStatus getStatus() {
-    return status;
-  }
-
-  public Set<StorageLocationReport> getFullVolumes() {
-    return fullVolumes;
-  }
-
-  public Set<StorageLocationReport> getFailedVolumes() {
-    return failedVolumes;
-  }
-
-  static class ReportResultBuilder {
-    private SCMNodeStorageStatMap.ReportStatus status;
-    private Set<StorageLocationReport> fullVolumes;
-    private Set<StorageLocationReport> failedVolumes;
-
-    static ReportResultBuilder newBuilder() {
-      return new ReportResultBuilder();
-    }
-
-    public ReportResultBuilder setStatus(
-        SCMNodeStorageStatMap.ReportStatus newStatus) {
-      this.status = newStatus;
-      return this;
-    }
-
-    public ReportResultBuilder setFullVolumeSet(
-        Set<StorageLocationReport> fullVolumesSet) {
-      this.fullVolumes = fullVolumesSet;
-      return this;
-    }
-
-    public ReportResultBuilder setFailedVolumeSet(
-        Set<StorageLocationReport> failedVolumesSet) {
-      this.failedVolumes = failedVolumesSet;
-      return this;
-    }
-
-    StorageReportResult build() {
-      return new StorageReportResult(status, fullVolumes, failedVolumes);
-    }
-  }
-}
\ No newline at end of file
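The nested builder is how SCMNodeStorageStatMap assembles these results in processNodeReport. A minimal same-package sketch (failedVolumes is a placeholder Set<StorageLocationReport>):

    // Hypothetical: report one or more failed volumes, no full volumes.
    StorageReportResult result = StorageReportResult.ReportResultBuilder
        .newBuilder()
        .setStatus(SCMNodeStorageStatMap.ReportStatus.FAILED_STORAGE)
        .setFailedVolumeSet(failedVolumes)
        .build();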
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/package-info.java
deleted file mode 100644
index d6a8ad0..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/package-info.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.node;
-
-/**
- * The node package deals with node management.
- * <p/>
- * The node manager takes care of node registration, removal of nodes and
- * handling of heartbeats.
- * <p/>
- * The node manager maintains statistics that get sent as part of
- * heartbeats.
- * <p/>
- * The container manager polls the node manager to learn the state of
- * datanodes that it is interested in.
- * <p/>
- */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java
deleted file mode 100644
index 9625f81..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- *
- */
-
-package org.apache.hadoop.hdds.scm.node.states;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
-    .NO_SUCH_DATANODE;
-
-/**
- * This data structure maintains the set of containers that are on a datanode.
- * This information is built from the DN container reports.
- */
-public class Node2ContainerMap extends Node2ObjectsMap<ContainerID> {
-
-  /**
-   * Constructs a Node2ContainerMap Object.
-   */
-  public Node2ContainerMap() {
-    super();
-  }
-
-  /**
-   * Returns an empty set if there are no containers associated with this
-   * datanode ID.
-   *
-   * @param datanode - UUID
-   * @return Set of containers; empty if none are known.
-   */
-  public Set<ContainerID> getContainers(UUID datanode) {
-    return getObjects(datanode);
-  }
-
-  /**
-   * Inserts a new datanode into the Node2Container Map.
-   *
-   * @param datanodeID -- Datanode UUID
-   * @param containerIDs - Set of ContainerIDs.
-   */
-  public void insertNewDatanode(UUID datanodeID, Set<ContainerID> containerIDs)
-      throws SCMException {
-    super.insertNewDatanode(datanodeID, containerIDs);
-  }
-
-  /**
-   * Updates the container list of an existing DN.
-   *
-   * @param datanodeID - UUID of DN.
-   * @param containers - Set of containers that are present on the DN.
-   * @throws SCMException - if we don't know about this datanode; for a new
-   * DN use insertNewDatanode.
- */
-  public void setContainersForDatanode(UUID datanodeID,
-      Set<ContainerID> containers) throws SCMException {
-    Preconditions.checkNotNull(datanodeID);
-    Preconditions.checkNotNull(containers);
-    if (dn2ObjectMap
-        .computeIfPresent(datanodeID, (k, v) -> new HashSet<>(containers))
-        == null) {
-      throw new SCMException("No such datanode", NO_SUCH_DATANODE);
-    }
-  }
-
-  @VisibleForTesting
-  public int size() {
-    return dn2ObjectMap.size();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java
deleted file mode 100644
index e49a79c..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- *
- */
-
-package org.apache.hadoop.hdds.scm.node.states;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-
-import java.util.UUID;
-import java.util.Set;
-import java.util.Map;
-import java.util.TreeSet;
-import java.util.HashSet;
-import java.util.Collections;
-
-import java.util.concurrent.ConcurrentHashMap;
-
-import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.DUPLICATE_DATANODE;
-
-/**
- * This data structure maintains the set of objects (for example containers
- * or pipelines) that are on a datanode. This information is built from the
- * DN reports.
- */
-public class Node2ObjectsMap<T> {
-  protected final Map<UUID, Set<T>> dn2ObjectMap;
-
-  /**
-   * Constructs a Node2ObjectsMap Object.
-   */
-  public Node2ObjectsMap() {
-    dn2ObjectMap = new ConcurrentHashMap<>();
-  }
-
-  /**
-   * Returns true if this is a datanode that is already tracked by this map.
-   *
-   * @param datanodeID - UUID of the Datanode.
-   * @return True if this is tracked, false if this map does not know about it.
-   */
-  public boolean isKnownDatanode(UUID datanodeID) {
-    Preconditions.checkNotNull(datanodeID);
-    return dn2ObjectMap.containsKey(datanodeID);
-  }
-
-  /**
-   * Inserts a new datanode into the map.
-   *
-   * @param datanodeID -- Datanode UUID
-   * @param containerIDs - Set of objects to track for this datanode.
- */
-  public void insertNewDatanode(UUID datanodeID, Set<T> containerIDs)
-      throws SCMException {
-    Preconditions.checkNotNull(containerIDs);
-    Preconditions.checkNotNull(datanodeID);
-    if (dn2ObjectMap.putIfAbsent(datanodeID, new HashSet<>(containerIDs))
-        != null) {
-      throw new SCMException("Node already exists in the map",
-          DUPLICATE_DATANODE);
-    }
-  }
-
-  /**
-   * Removes the datanode entry from the map.
-   *
-   * @param datanodeID - Datanode ID.
-   */
-  void removeDatanode(UUID datanodeID) {
-    Preconditions.checkNotNull(datanodeID);
-    dn2ObjectMap.computeIfPresent(datanodeID, (k, v) -> null);
-  }
-
-  /**
-   * Returns an empty set if there are no objects associated with this
-   * datanode ID.
-   *
-   * @param datanode - UUID
-   * @return Set of objects; empty if none are known.
-   */
-  Set<T> getObjects(UUID datanode) {
-    Preconditions.checkNotNull(datanode);
-    final Set<T> s = dn2ObjectMap.get(datanode);
-    return s != null ? Collections.unmodifiableSet(s) : Collections.emptySet();
-  }
-
-  public ReportResult.ReportResultBuilder<T> newBuilder() {
-    return new ReportResult.ReportResultBuilder<>();
-  }
-
-  public ReportResult<T> processReport(UUID datanodeID, Set<T> objects) {
-    Preconditions.checkNotNull(datanodeID);
-    Preconditions.checkNotNull(objects);
-
-    if (!isKnownDatanode(datanodeID)) {
-      return newBuilder()
-          .setStatus(ReportResult.ReportStatus.NEW_DATANODE_FOUND)
-          .setNewEntries(objects)
-          .build();
-    }
-
-    // Conditions like zero-length object sets are handled by removeAll.
-    Set<T> currentSet = dn2ObjectMap.get(datanodeID);
-    TreeSet<T> newObjects = new TreeSet<>(objects);
-    newObjects.removeAll(currentSet);
-
-    TreeSet<T> missingObjects = new TreeSet<>(currentSet);
-    missingObjects.removeAll(objects);
-
-    if (newObjects.isEmpty() && missingObjects.isEmpty()) {
-      return newBuilder()
-          .setStatus(ReportResult.ReportStatus.ALL_IS_WELL)
-          .build();
-    }
-
-    if (newObjects.isEmpty() && !missingObjects.isEmpty()) {
-      return newBuilder()
-          .setStatus(ReportResult.ReportStatus.MISSING_ENTRIES)
-          .setMissingEntries(missingObjects)
-          .build();
-    }
-
-    if (!newObjects.isEmpty() && missingObjects.isEmpty()) {
-      return newBuilder()
-          .setStatus(ReportResult.ReportStatus.NEW_ENTRIES_FOUND)
-          .setNewEntries(newObjects)
-          .build();
-    }
-
-    if (!newObjects.isEmpty() && !missingObjects.isEmpty()) {
-      return newBuilder()
-          .setStatus(ReportResult.ReportStatus.MISSING_AND_NEW_ENTRIES_FOUND)
-          .setNewEntries(newObjects)
-          .setMissingEntries(missingObjects)
-          .build();
-    }
-
-    // Default status; unreachable, but it keeps the compiler happy.
-    return newBuilder()
-        .setStatus(ReportResult.ReportStatus.ALL_IS_WELL)
-        .build();
-  }
-
-  @VisibleForTesting
-  public int size() {
-    return dn2ObjectMap.size();
-  }
-}
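processReport is a pure set difference: entries present only in the report are "new", entries present only in the map are "missing", and the four statuses cover the combinations. A small sketch using Integer as the (Comparable) object type, since ContainerID construction is outside this diff:

    // Hypothetical: the map holds {1, 2}; the datanode then reports {2, 3}.
    Node2ObjectsMap<Integer> map = new Node2ObjectsMap<>();
    UUID dnId = UUID.randomUUID();
    map.insertNewDatanode(dnId, new HashSet<>(Arrays.asList(1, 2)));
    ReportResult<Integer> result =
        map.processReport(dnId, new HashSet<>(Arrays.asList(2, 3)));
    // Expected status: MISSING_AND_NEW_ENTRIES_FOUND,
    // with new entries {3} and missing entries {1}.

Note that the TreeSet-based diff above requires T to be Comparable at runtime, which holds for the ContainerID and PipelineID subclasses.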
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
deleted file mode 100644
index 87f2222..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- *
- */
-
-package org.apache.hadoop.hdds.scm.node.states;
-
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.UUID;
-
-/**
- * This data structure maintains the list of pipelines which the given
- * datanode is a part of. This information will be added whenever a new
- * pipeline allocation happens.
- *
- * <p>TODO: this information needs to be regenerated from pipeline reports
- * on SCM restart
- */
-public class Node2PipelineMap extends Node2ObjectsMap<PipelineID> {
-
-  /** Constructs a Node2PipelineMap Object. */
-  public Node2PipelineMap() {
-    super();
-  }
-
-  /**
-   * Returns an empty set if there are no pipelines associated with this
-   * datanode ID.
-   *
-   * @param datanode - UUID
-   * @return Set of pipelines; empty if none are known.
-   */
-  public Set<PipelineID> getPipelines(UUID datanode) {
-    return getObjects(datanode);
-  }
-
-  /**
-   * Adds a pipeline entry to a given datanode in the map.
-   *
-   * @param pipeline Pipeline to be added
-   */
-  public synchronized void addPipeline(Pipeline pipeline) {
-    for (DatanodeDetails details : pipeline.getDatanodes().values()) {
-      UUID dnId = details.getUuid();
-      dn2ObjectMap.computeIfAbsent(dnId, k -> new HashSet<>())
-          .add(pipeline.getId());
-    }
-  }
-
-  public synchronized void removePipeline(Pipeline pipeline) {
-    for (DatanodeDetails details : pipeline.getDatanodes().values()) {
-      UUID dnId = details.getUuid();
-      dn2ObjectMap.computeIfPresent(dnId,
-          (k, v) -> {
-            v.remove(pipeline.getId());
-            return v;
-          });
-    }
-  }
-}
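Pipeline membership is indexed per member datanode, so lookups by UUID stay constant-time. A hedged usage sketch (pipeline construction is elided since the Pipeline builder is not part of this diff; pipeline and dnUuid are placeholders):

    // Hypothetical: track and query pipeline membership per datanode.
    Node2PipelineMap n2p = new Node2PipelineMap();
    n2p.addPipeline(pipeline);   // indexes pipeline.getId() under each member's UUID
    Set<PipelineID> ids = n2p.getPipelines(dnUuid);
    n2p.removePipeline(pipeline); // removes the id from each member again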
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeAlreadyExistsException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeAlreadyExistsException.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeAlreadyExistsException.java
deleted file mode 100644
index aa5c382..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeAlreadyExistsException.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.hdds.scm.node.states;
-
-/**
- * This exception represents that there is already a node added to
- * NodeStateMap with the same UUID.
- */
-public class NodeAlreadyExistsException extends NodeException {
-
-  /**
-   * Constructs a {@code NodeAlreadyExistsException} with {@code null}
-   * as its error detail message.
-   */
-  public NodeAlreadyExistsException() {
-    super();
-  }
-
-  /**
-   * Constructs a {@code NodeAlreadyExistsException} with the specified
-   * detail message.
-   *
-   * @param message
-   *        The detail message (which is saved for later retrieval
-   *        by the {@link #getMessage()} method)
-   */
-  public NodeAlreadyExistsException(String message) {
-    super(message);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeException.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeException.java
deleted file mode 100644
index c67b55d..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeException.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.hdds.scm.node.states;
-
-/**
- * This exception represents all node related exceptions in NodeStateMap.
- */
-public class NodeException extends Exception {
-
-  /**
-   * Constructs a {@code NodeException} with {@code null}
-   * as its error detail message.
-   */
-  public NodeException() {
-    super();
-  }
-
-  /**
-   * Constructs a {@code NodeException} with the specified
-   * detail message.
-   *
-   * @param message
-   *        The detail message (which is saved for later retrieval
-   *        by the {@link #getMessage()} method)
-   */
-  public NodeException(String message) {
-    super(message);
-  }
-}
