http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java new file mode 100644 index 0000000..0174c17 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -0,0 +1,904 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.node; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.hdds.scm.HddsServerUtil; +import org.apache.hadoop.hdds.scm.StorageContainerManager; +import org.apache.hadoop.hdds.scm.VersionInfo; +import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; +import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; +import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ReportState; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMNodeReport; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto + .ErrorCode; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMStorageReport; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.metrics2.util.MBeans; +import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol; +import org.apache.hadoop.ozone.protocol.VersionResponse; +import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; +import org.apache.hadoop.ozone.protocol.commands.ReregisterCommand; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.util.concurrent.HadoopExecutors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.management.ObjectName; +import java.io.IOException; +import java.net.InetAddress; +import 
java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState + .HEALTHY; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState + .INVALID; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE; +import static org.apache.hadoop.util.Time.monotonicNow; + +/** + * Maintains information about the Datanodes on the SCM side. + * <p> + * Heartbeat handling in SCM is very simple compared to the HDFS HeartbeatManager. + * <p> + * Here we maintain 3 maps, and we propagate a node from healthyNodesMap to + * staleNodesMap to deadNodesMap. This moving of a node from one map to another + * is controlled by 4 configuration variables. These variables define how many + * heartbeats must go missing for the node to move from one map to another. + * <p> + * Each heartbeat that SCMNodeManager receives is put into heartbeatQueue. The + * worker thread wakes up and grabs that heartbeat from the queue. The worker + * thread looks up the healthyNodes map and sets the timestamp if the entry + * is there; if not, it looks up the stale and dead node maps. + * <p> + * The getNodes(byState) functions make a copy of the node maps and then create + * a list based on that copy. It should be assumed that these get functions + * always report *stale* information. For example, getting the deadNodeCount + * followed by getNodes(DEAD) could very well produce a totally different count. + * Also getNodeCount(HEALTHY) + getNodeCount(DEAD) + getNodeCount(STALE) is not + * guaranteed to add up to the total number of nodes that we know of. Please + * treat all get functions in this file as a snapshot of information that is + * inconsistent as soon as you read it. + */ +public class SCMNodeManager + implements NodeManager, StorageContainerNodeProtocol { + + @VisibleForTesting + static final Logger LOG = + LoggerFactory.getLogger(SCMNodeManager.class); + + /** + * Key = NodeID, value = timestamp.
+ */ + private final ConcurrentHashMap<UUID, Long> healthyNodes; + private final ConcurrentHashMap<UUID, Long> staleNodes; + private final ConcurrentHashMap<UUID, Long> deadNodes; + private final Queue<HeartbeatQueueItem> heartbeatQueue; + private final ConcurrentHashMap<UUID, DatanodeDetails> nodes; + // Individual live node stats + private final ConcurrentHashMap<UUID, SCMNodeStat> nodeStats; + // Aggregated node stats + private SCMNodeStat scmStat; + // TODO: expose nodeStats and scmStat as metrics + private final AtomicInteger healthyNodeCount; + private final AtomicInteger staleNodeCount; + private final AtomicInteger deadNodeCount; + private final AtomicInteger totalNodes; + private long staleNodeIntervalMs; + private final long deadNodeIntervalMs; + private final long heartbeatCheckerIntervalMs; + private final long datanodeHBIntervalSeconds; + private final ScheduledExecutorService executorService; + private long lastHBcheckStart; + private long lastHBcheckFinished = 0; + private long lastHBProcessedCount; + private int chillModeNodeCount; + private final int maxHBToProcessPerLoop; + private final String clusterID; + private final VersionInfo version; + /** + * During start up of SCM, it will enter into chill mode and will be there + * until number of Datanodes registered reaches {@code chillModeNodeCount}. + * This flag is for tracking startup chill mode. + */ + private AtomicBoolean inStartupChillMode; + /** + * Administrator can put SCM into chill mode manually. + * This flag is for tracking manual chill mode. + */ + private AtomicBoolean inManualChillMode; + private final CommandQueue commandQueue; + // Node manager MXBean + private ObjectName nmInfoBean; + + // Node pool manager. + private final SCMNodePoolManager nodePoolManager; + private final StorageContainerManager scmManager; + + /** + * Constructs SCM machine Manager. + */ + public SCMNodeManager(OzoneConfiguration conf, String clusterID, + StorageContainerManager scmManager) throws IOException { + heartbeatQueue = new ConcurrentLinkedQueue<>(); + healthyNodes = new ConcurrentHashMap<>(); + deadNodes = new ConcurrentHashMap<>(); + staleNodes = new ConcurrentHashMap<>(); + nodes = new ConcurrentHashMap<>(); + nodeStats = new ConcurrentHashMap<>(); + scmStat = new SCMNodeStat(); + + healthyNodeCount = new AtomicInteger(0); + staleNodeCount = new AtomicInteger(0); + deadNodeCount = new AtomicInteger(0); + totalNodes = new AtomicInteger(0); + this.clusterID = clusterID; + this.version = VersionInfo.getLatestVersion(); + commandQueue = new CommandQueue(); + + // TODO: Support this value as a Percentage of known machines. 
+ chillModeNodeCount = 1; + + staleNodeIntervalMs = HddsServerUtil.getStaleNodeInterval(conf); + deadNodeIntervalMs = HddsServerUtil.getDeadNodeInterval(conf); + heartbeatCheckerIntervalMs = + HddsServerUtil.getScmheartbeatCheckerInterval(conf); + datanodeHBIntervalSeconds = HddsServerUtil.getScmHeartbeatInterval(conf); + maxHBToProcessPerLoop = HddsServerUtil.getMaxHBToProcessPerLoop(conf); + + executorService = HadoopExecutors.newScheduledThreadPool(1, + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("SCM Heartbeat Processing Thread - %d").build()); + + LOG.info("Entering startup chill mode."); + this.inStartupChillMode = new AtomicBoolean(true); + this.inManualChillMode = new AtomicBoolean(false); + + Preconditions.checkState(heartbeatCheckerIntervalMs > 0); + executorService.schedule(this, heartbeatCheckerIntervalMs, + TimeUnit.MILLISECONDS); + + registerMXBean(); + + this.nodePoolManager = new SCMNodePoolManager(conf); + this.scmManager = scmManager; + } + + private void registerMXBean() { + this.nmInfoBean = MBeans.register("SCMNodeManager", + "SCMNodeManagerInfo", this); + } + + private void unregisterMXBean() { + if(this.nmInfoBean != null) { + MBeans.unregister(this.nmInfoBean); + this.nmInfoBean = null; + } + } + + /** + * Removes a data node from the management of this Node Manager. + * + * @param node - DataNode. + * @throws UnregisteredNodeException + */ + @Override + public void removeNode(DatanodeDetails node) { + // TODO : Fix me when adding the SCM CLI. + + } + + /** + * Gets all datanodes that are in a certain state. This function works by + * taking a snapshot of the current collection and then returning the list + * from that collection. This means that real map might have changed by the + * time we return this list. + * + * @return List of Datanodes that are known to SCM in the requested state. + */ + @Override + public List<DatanodeDetails> getNodes(NodeState nodestate) + throws IllegalArgumentException { + Map<UUID, Long> set; + switch (nodestate) { + case HEALTHY: + synchronized (this) { + set = Collections.unmodifiableMap(new HashMap<>(healthyNodes)); + } + break; + case STALE: + synchronized (this) { + set = Collections.unmodifiableMap(new HashMap<>(staleNodes)); + } + break; + case DEAD: + synchronized (this) { + set = Collections.unmodifiableMap(new HashMap<>(deadNodes)); + } + break; + default: + throw new IllegalArgumentException("Unknown node state requested."); + } + + return set.entrySet().stream().map(entry -> nodes.get(entry.getKey())) + .collect(Collectors.toList()); + } + + /** + * Returns all datanodes that are known to SCM. + * + * @return List of DatanodeDetails + */ + @Override + public List<DatanodeDetails> getAllNodes() { + Map<UUID, DatanodeDetails> set; + synchronized (this) { + set = Collections.unmodifiableMap(new HashMap<>(nodes)); + } + return set.entrySet().stream().map(entry -> nodes.get(entry.getKey())) + .collect(Collectors.toList()); + } + + /** + * Get the minimum number of nodes to get out of Chill mode. + * + * @return int + */ + @Override + public int getMinimumChillModeNodes() { + return chillModeNodeCount; + } + + /** + * Sets the Minimum chill mode nodes count, used only in testing. + * + * @param count - Number of nodes. + */ + @VisibleForTesting + public void setMinimumChillModeNodes(int count) { + chillModeNodeCount = count; + } + + /** + * Returns chill mode Status string. 
+ * @return String + */ + @Override + public String getChillModeStatus() { + if (inStartupChillMode.get()) { + return "Still in chill mode, waiting on nodes to report in." + + String.format(" %d nodes reported, minimal %d nodes required.", + totalNodes.get(), getMinimumChillModeNodes()); + } + if (inManualChillMode.get()) { + return "Out of startup chill mode, but in manual chill mode." + + String.format(" %d nodes have reported in.", totalNodes.get()); + } + return "Out of chill mode." + + String.format(" %d nodes have reported in.", totalNodes.get()); + } + + /** + * Forcefully exits the chill mode even if we have not met the minimum + * criteria of exiting the chill mode. This will exit from both startup + * and manual chill mode. + */ + @Override + public void forceExitChillMode() { + if(inStartupChillMode.get()) { + LOG.info("Leaving startup chill mode."); + inStartupChillMode.set(false); + } + if(inManualChillMode.get()) { + LOG.info("Leaving manual chill mode."); + inManualChillMode.set(false); + } + } + + /** + * Puts the node manager into manual chill mode. + */ + @Override + public void enterChillMode() { + LOG.info("Entering manual chill mode."); + inManualChillMode.set(true); + } + + /** + * Brings node manager out of manual chill mode. + */ + @Override + public void exitChillMode() { + LOG.info("Leaving manual chill mode."); + inManualChillMode.set(false); + } + + /** + * Returns true if node manager is out of chill mode, else false. + * @return true if out of chill mode, else false + */ + @Override + public boolean isOutOfChillMode() { + return !(inStartupChillMode.get() || inManualChillMode.get()); + } + + /** + * Returns the Number of Datanodes by State they are in. + * + * @return int -- count + */ + @Override + public int getNodeCount(NodeState nodestate) { + switch (nodestate) { + case HEALTHY: + return healthyNodeCount.get(); + case STALE: + return staleNodeCount.get(); + case DEAD: + return deadNodeCount.get(); + case INVALID: + // This is unknown due to the fact that some nodes can be in + // transit between the other states. Returning a count for that is not + // possible. The fact that we have such state is to deal with the fact + // that this information might not be consistent always. + return 0; + default: + return 0; + } + } + + /** + * Used for testing. + * + * @return true if the HB check is done. + */ + @VisibleForTesting + @Override + public boolean waitForHeartbeatProcessed() { + return lastHBcheckFinished != 0; + } + + /** + * Returns the node state of a specific node. + * + * @param datanodeDetails - Datanode Details + * @return Healthy/Stale/Dead/Unknown. + */ + @Override + public NodeState getNodeState(DatanodeDetails datanodeDetails) { + // There is a subtle race condition here, hence we also support + // the NODEState.UNKNOWN. It is possible that just before we check the + // healthyNodes, we have removed the node from the healthy list but stil + // not added it to Stale Nodes list. + // We can fix that by adding the node to stale list before we remove, but + // then the node is in 2 states to avoid this race condition. Instead we + // just deal with the possibilty of getting a state called unknown. + + UUID id = datanodeDetails.getUuid(); + if(healthyNodes.containsKey(id)) { + return HEALTHY; + } + + if(staleNodes.containsKey(id)) { + return STALE; + } + + if(deadNodes.containsKey(id)) { + return DEAD; + } + + return INVALID; + } + + /** + * This is the real worker thread that processes the HB queue. We do the + * following things in this thread. 
+ * <p> + * Process the Heartbeats that are in the HB Queue. Move Stale or Dead node to + * healthy if we got a heartbeat from them. Move Stales Node to dead node + * table if it is needed. Move healthy nodes to stale nodes if it is needed. + * <p> + * if it is a new node, we call register node and add it to the list of nodes. + * This will be replaced when we support registration of a node in SCM. + * + * @see Thread#run() + */ + @Override + public void run() { + lastHBcheckStart = monotonicNow(); + lastHBProcessedCount = 0; + + // Process the whole queue. + while (!heartbeatQueue.isEmpty() && + (lastHBProcessedCount < maxHBToProcessPerLoop)) { + HeartbeatQueueItem hbItem = heartbeatQueue.poll(); + synchronized (this) { + handleHeartbeat(hbItem); + } + // we are shutting down or something give up processing the rest of + // HBs. This will terminate the HB processing thread. + if (Thread.currentThread().isInterrupted()) { + LOG.info("Current Thread is isInterrupted, shutting down HB " + + "processing thread for Node Manager."); + return; + } + } + + if (lastHBProcessedCount >= maxHBToProcessPerLoop) { + LOG.error("SCM is being flooded by heartbeats. Not able to keep up with" + + " the heartbeat counts. Processed {} heartbeats. Breaking out of" + + " loop. Leaving rest to be processed later. ", lastHBProcessedCount); + } + + // Iterate over the Stale nodes and decide if we need to move any node to + // dead State. + long currentTime = monotonicNow(); + for (Map.Entry<UUID, Long> entry : staleNodes.entrySet()) { + if (currentTime - entry.getValue() > deadNodeIntervalMs) { + synchronized (this) { + moveStaleNodeToDead(entry); + } + } + } + + // Iterate over the healthy nodes and decide if we need to move any node to + // Stale State. + currentTime = monotonicNow(); + for (Map.Entry<UUID, Long> entry : healthyNodes.entrySet()) { + if (currentTime - entry.getValue() > staleNodeIntervalMs) { + synchronized (this) { + moveHealthyNodeToStale(entry); + } + } + } + lastHBcheckFinished = monotonicNow(); + + monitorHBProcessingTime(); + + // we purposefully make this non-deterministic. Instead of using a + // scheduleAtFixedFrequency we will just go to sleep + // and wake up at the next rendezvous point, which is currentTime + + // heartbeatCheckerIntervalMs. This leads to the issue that we are now + // heart beating not at a fixed cadence, but clock tick + time taken to + // work. + // + // This time taken to work can skew the heartbeat processor thread. + // The reason why we don't care is because of the following reasons. + // + // 1. checkerInterval is general many magnitudes faster than datanode HB + // frequency. + // + // 2. if we have too much nodes, the SCM would be doing only HB + // processing, this could lead to SCM's CPU starvation. With this + // approach we always guarantee that HB thread sleeps for a little while. + // + // 3. It is possible that we will never finish processing the HB's in the + // thread. But that means we have a mis-configured system. We will warn + // the users by logging that information. + // + // 4. And the most important reason, heartbeats are not blocked even if + // this thread does not run, they will go into the processing queue. 
+ + if (!Thread.currentThread().isInterrupted() && + !executorService.isShutdown()) { + executorService.schedule(this, heartbeatCheckerIntervalMs, TimeUnit + .MILLISECONDS); + } else { + LOG.info("Current Thread is interrupted, shutting down HB processing " + + "thread for Node Manager."); + } + } + + /** + * If we have taken too much time for HB processing, log that information. + */ + private void monitorHBProcessingTime() { + if (TimeUnit.MILLISECONDS.toSeconds(lastHBcheckFinished - + lastHBcheckStart) > datanodeHBIntervalSeconds) { + LOG.error("Total time spend processing datanode HB's is greater than " + + "configured values for datanode heartbeats. Please adjust the" + + " heartbeat configs. Time Spend on HB processing: {} seconds " + + "Datanode heartbeat Interval: {} seconds , heartbeats " + + "processed: {}", + TimeUnit.MILLISECONDS + .toSeconds(lastHBcheckFinished - lastHBcheckStart), + datanodeHBIntervalSeconds, lastHBProcessedCount); + } + } + + /** + * Moves a Healthy node to a Stale node state. + * + * @param entry - Map Entry + */ + private void moveHealthyNodeToStale(Map.Entry<UUID, Long> entry) { + LOG.trace("Moving healthy node to stale: {}", entry.getKey()); + healthyNodes.remove(entry.getKey()); + healthyNodeCount.decrementAndGet(); + staleNodes.put(entry.getKey(), entry.getValue()); + staleNodeCount.incrementAndGet(); + + if (scmManager != null) { + // remove stale node's container report + scmManager.removeContainerReport(entry.getKey().toString()); + } + } + + /** + * Moves a Stale node to a dead node state. + * + * @param entry - Map Entry + */ + private void moveStaleNodeToDead(Map.Entry<UUID, Long> entry) { + LOG.trace("Moving stale node to dead: {}", entry.getKey()); + staleNodes.remove(entry.getKey()); + staleNodeCount.decrementAndGet(); + deadNodes.put(entry.getKey(), entry.getValue()); + deadNodeCount.incrementAndGet(); + + // Update SCM node stats + SCMNodeStat deadNodeStat = nodeStats.get(entry.getKey()); + scmStat.subtract(deadNodeStat); + nodeStats.remove(entry.getKey()); + } + + /** + * Handles a single heartbeat from a datanode. + * + * @param hbItem - heartbeat item from a datanode. + */ + private void handleHeartbeat(HeartbeatQueueItem hbItem) { + lastHBProcessedCount++; + + DatanodeDetails datanodeDetails = hbItem.getDatanodeDetails(); + UUID datanodeUuid = datanodeDetails.getUuid(); + SCMNodeReport nodeReport = hbItem.getNodeReport(); + long recvTimestamp = hbItem.getRecvTimestamp(); + long processTimestamp = Time.monotonicNow(); + if (LOG.isTraceEnabled()) { + //TODO: add average queue time of heartbeat request as metrics + LOG.trace("Processing Heartbeat from datanode {}: queueing time {}", + datanodeUuid, processTimestamp - recvTimestamp); + } + + // If this node is already in the list of known and healthy nodes + // just set the last timestamp and return. + if (healthyNodes.containsKey(datanodeUuid)) { + healthyNodes.put(datanodeUuid, processTimestamp); + updateNodeStat(datanodeUuid, nodeReport); + updateCommandQueue(datanodeUuid, + hbItem.getContainerReportState().getState()); + return; + } + + // A stale node has heartbeat us we need to remove the node from stale + // list and move to healthy list. 
+ if (staleNodes.containsKey(datanodeUuid)) { + staleNodes.remove(datanodeUuid); + healthyNodes.put(datanodeUuid, processTimestamp); + healthyNodeCount.incrementAndGet(); + staleNodeCount.decrementAndGet(); + updateNodeStat(datanodeUuid, nodeReport); + updateCommandQueue(datanodeUuid, + hbItem.getContainerReportState().getState()); + return; + } + + // A dead node has heartbeat us, we need to remove that node from dead + // node list and move it to the healthy list. + if (deadNodes.containsKey(datanodeUuid)) { + deadNodes.remove(datanodeUuid); + healthyNodes.put(datanodeUuid, processTimestamp); + deadNodeCount.decrementAndGet(); + healthyNodeCount.incrementAndGet(); + updateNodeStat(datanodeUuid, nodeReport); + updateCommandQueue(datanodeUuid, + hbItem.getContainerReportState().getState()); + return; + } + + LOG.warn("SCM receive heartbeat from unregistered datanode {}", + datanodeUuid); + this.commandQueue.addCommand(datanodeUuid, + new ReregisterCommand()); + } + + private void updateNodeStat(UUID dnId, SCMNodeReport nodeReport) { + SCMNodeStat stat = nodeStats.get(dnId); + if (stat == null) { + LOG.debug("SCM updateNodeStat based on heartbeat from previous" + + "dead datanode {}", dnId); + stat = new SCMNodeStat(); + } + + if (nodeReport != null && nodeReport.getStorageReportCount() > 0) { + long totalCapacity = 0; + long totalRemaining = 0; + long totalScmUsed = 0; + List<SCMStorageReport> storageReports = nodeReport.getStorageReportList(); + for (SCMStorageReport report : storageReports) { + totalCapacity += report.getCapacity(); + totalRemaining += report.getRemaining(); + totalScmUsed+= report.getScmUsed(); + } + scmStat.subtract(stat); + stat.set(totalCapacity, totalScmUsed, totalRemaining); + nodeStats.put(dnId, stat); + scmStat.add(stat); + } + } + + private void updateCommandQueue(UUID dnId, + ReportState.states containerReportState) { + if (containerReportState != null) { + switch (containerReportState) { + case completeContinerReport: + commandQueue.addCommand(dnId, + SendContainerCommand.newBuilder().build()); + return; + case deltaContainerReport: + case noContainerReports: + default: + // do nothing + } + } + } + + /** + * Closes this stream and releases any system resources associated with it. If + * the stream is already closed then invoking this method has no effect. + * + * @throws IOException if an I/O error occurs + */ + @Override + public void close() throws IOException { + unregisterMXBean(); + nodePoolManager.close(); + executorService.shutdown(); + try { + if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { + executorService.shutdownNow(); + } + + if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { + LOG.error("Unable to shutdown NodeManager properly."); + } + } catch (InterruptedException e) { + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + + @VisibleForTesting + long getLastHBProcessedCount() { + return lastHBProcessedCount; + } + + /** + * Gets the version info from SCM. + * + * @param versionRequest - version Request. + * @return - returns SCM version info and other required information needed by + * datanode. + */ + @Override + public VersionResponse getVersion(SCMVersionRequestProto versionRequest) { + return VersionResponse.newBuilder() + .setVersion(this.version.getVersion()) + .build(); + } + + /** + * Register the node if the node finds that it is not registered with any + * SCM. + * + * @param datanodeDetailsProto - Send datanodeDetails with Node info. 
+ * This function generates and assigns new datanode ID + * for the datanode. This allows SCM to be run independent + * of Namenode if required. + * + * @return SCMHeartbeatResponseProto + */ + @Override + public SCMCommand register(DatanodeDetailsProto datanodeDetailsProto) { + + DatanodeDetails datanodeDetails = DatanodeDetails.getFromProtoBuf( + datanodeDetailsProto); + InetAddress dnAddress = Server.getRemoteIp(); + if (dnAddress != null) { + // Mostly called inside an RPC, update ip and peer hostname + String hostname = dnAddress.getHostName(); + String ip = dnAddress.getHostAddress(); + datanodeDetails.setHostName(hostname); + datanodeDetails.setIpAddress(ip); + } + SCMCommand responseCommand = verifyDatanodeUUID(datanodeDetails); + if (responseCommand != null) { + return responseCommand; + } + UUID dnId = datanodeDetails.getUuid(); + nodes.put(dnId, datanodeDetails); + totalNodes.incrementAndGet(); + healthyNodes.put(dnId, monotonicNow()); + healthyNodeCount.incrementAndGet(); + nodeStats.put(dnId, new SCMNodeStat()); + + if(inStartupChillMode.get() && + totalNodes.get() >= getMinimumChillModeNodes()) { + inStartupChillMode.getAndSet(false); + LOG.info("Leaving startup chill mode."); + } + + // TODO: define node pool policy for non-default node pool. + // For now, all nodes are added to the "DefaultNodePool" upon registration + // if it has not been added to any node pool yet. + try { + if (nodePoolManager.getNodePool(datanodeDetails) == null) { + nodePoolManager.addNode(SCMNodePoolManager.DEFAULT_NODEPOOL, + datanodeDetails); + } + } catch (IOException e) { + // TODO: make sure registration failure is handled correctly. + return RegisteredCommand.newBuilder() + .setErrorCode(ErrorCode.errorNodeNotPermitted) + .build(); + } + LOG.info("Data node with ID: {} Registered.", + datanodeDetails.getUuid()); + return RegisteredCommand.newBuilder() + .setErrorCode(ErrorCode.success) + .setDatanodeUUID(datanodeDetails.getUuidString()) + .setClusterID(this.clusterID) + .build(); + } + + /** + * Verifies the datanode does not have a valid UUID already. + * + * @param datanodeDetails - Datanode Details. + * @return SCMCommand + */ + private SCMCommand verifyDatanodeUUID(DatanodeDetails datanodeDetails) { + if (datanodeDetails.getUuid() != null && + nodes.containsKey(datanodeDetails.getUuid())) { + LOG.trace("Datanode is already registered. Datanode: {}", + datanodeDetails.toString()); + return RegisteredCommand.newBuilder() + .setErrorCode(ErrorCode.success) + .setClusterID(this.clusterID) + .setDatanodeUUID(datanodeDetails.getUuidString()) + .build(); + } + return null; + } + + /** + * Send heartbeat to indicate the datanode is alive and doing well. + * + * @param datanodeDetailsProto - DatanodeDetailsProto. + * @param nodeReport - node report. + * @param containerReportState - container report state. + * @return SCMheartbeat response. + * @throws IOException + */ + @Override + public List<SCMCommand> sendHeartbeat( + DatanodeDetailsProto datanodeDetailsProto, SCMNodeReport nodeReport, + ReportState containerReportState) { + + DatanodeDetails datanodeDetails = DatanodeDetails + .getFromProtoBuf(datanodeDetailsProto); + + // Checking for NULL to make sure that we don't get + // an exception from ConcurrentList. + // This could be a problem in tests, if this function is invoked via + // protobuf, transport layer will guarantee that this is not null. 
+ if (datanodeDetails != null) { + heartbeatQueue.add( + new HeartbeatQueueItem.Builder() + .setDatanodeDetails(datanodeDetails) + .setNodeReport(nodeReport) + .setContainerReportState(containerReportState) + .build()); + } else { + LOG.error("Datanode ID in heartbeat is null"); + } + + return commandQueue.getCommand(datanodeDetails.getUuid()); + } + + /** + * Returns the aggregated node stats. + * @return the aggregated node stats. + */ + @Override + public SCMNodeStat getStats() { + return new SCMNodeStat(this.scmStat); + } + + /** + * Return a map of node stats. + * @return a map of individual node stats (live/stale but not dead). + */ + @Override + public Map<UUID, SCMNodeStat> getNodeStats() { + return Collections.unmodifiableMap(nodeStats); + } + + /** + * Return the node stat of the specified datanode. + * @param datanodeDetails - datanode ID. + * @return node stat if it is live/stale, null if it is dead or does't exist. + */ + @Override + public SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails) { + return new SCMNodeMetric(nodeStats.get(datanodeDetails)); + } + + @Override + public NodePoolManager getNodePoolManager() { + return nodePoolManager; + } + + @Override + public Map<String, Integer> getNodeCount() { + Map<String, Integer> nodeCountMap = new HashMap<String, Integer>(); + for(NodeState state : NodeState.values()) { + nodeCountMap.put(state.toString(), getNodeCount(state)); + } + return nodeCountMap; + } + + @Override + public void addDatanodeCommand(UUID dnId, SCMCommand command) { + this.commandQueue.addCommand(dnId, command); + } + + @VisibleForTesting + public void setStaleNodeIntervalMs(long interval) { + this.staleNodeIntervalMs = interval; + } +}
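As an aside on the healthy/stale/dead tracking that the SCMNodeManager class javadoc describes above, the following is a minimal, self-contained sketch of the same idea. It is an illustration only, not code from this patch: the class name HeartbeatTracker and its single-map design are hypothetical, and it omits the heartbeat queue, chill mode, node stats, and command queue that the real manager handles.

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Illustration only: records the last heartbeat time per node and classifies
 * nodes using the same two thresholds that SCMNodeManager uses to move nodes
 * between its healthy, stale and dead maps.
 */
final class HeartbeatTracker {
  enum State { HEALTHY, STALE, DEAD }

  private final Map<UUID, Long> lastHeartbeatMs = new ConcurrentHashMap<>();
  private final long staleIntervalMs;
  private final long deadIntervalMs;

  HeartbeatTracker(long staleIntervalMs, long deadIntervalMs) {
    this.staleIntervalMs = staleIntervalMs;
    this.deadIntervalMs = deadIntervalMs;
  }

  /** Record a heartbeat; a stale or dead node becomes healthy again. */
  void onHeartbeat(UUID nodeId, long nowMs) {
    lastHeartbeatMs.put(nodeId, nowMs);
  }

  /** Classify a node by how long ago its last heartbeat arrived. */
  State stateOf(UUID nodeId, long nowMs) {
    Long last = lastHeartbeatMs.get(nodeId);
    if (last == null) {
      return State.DEAD;   // never heard from; the real manager reports INVALID here
    }
    long silence = nowMs - last;
    if (silence > deadIntervalMs) {
      return State.DEAD;   // missed enough heartbeats to be declared dead
    }
    if (silence > staleIntervalMs) {
      return State.STALE;  // missed a few heartbeats
    }
    return State.HEALTHY;
  }
}

The real SCMNodeManager instead keeps three separate maps and has a scheduled checker thread move entries between them, which is why its per-state counters are only a loosely consistent snapshot, as the javadoc warns.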
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodePoolManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodePoolManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodePoolManager.java new file mode 100644 index 0000000..a4a6c51 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodePoolManager.java @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.node; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.utils.MetadataStore; +import org.apache.hadoop.utils.MetadataStoreBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_DB_CACHE_SIZE_DEFAULT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_DB_CACHE_SIZE_MB; +import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes + .FAILED_TO_FIND_NODE_IN_POOL; +import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes + .FAILED_TO_LOAD_NODEPOOL; +import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath; +import static org.apache.hadoop.ozone.OzoneConsts.NODEPOOL_DB; + +/** + * SCM node pool manager that manges node pools. + */ +public final class SCMNodePoolManager implements NodePoolManager { + + private static final Logger LOG = + LoggerFactory.getLogger(SCMNodePoolManager.class); + private static final List<DatanodeDetails> EMPTY_NODE_LIST = + new ArrayList<>(); + private static final List<String> EMPTY_NODEPOOL_LIST = new ArrayList<>(); + public static final String DEFAULT_NODEPOOL = "DefaultNodePool"; + + // DB that saves the node to node pool mapping. 
+ private MetadataStore nodePoolStore; + + // In-memory node pool to nodes mapping + private HashMap<String, Set<DatanodeDetails>> nodePools; + + // Read-write lock for nodepool operations + private ReadWriteLock lock; + + /** + * Construct SCMNodePoolManager class that manages node to node pool mapping. + * @param conf - configuration. + * @throws IOException + */ + public SCMNodePoolManager(final OzoneConfiguration conf) + throws IOException { + final int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB, + OZONE_SCM_DB_CACHE_SIZE_DEFAULT); + File metaDir = getOzoneMetaDirPath(conf); + String scmMetaDataDir = metaDir.getPath(); + File nodePoolDBPath = new File(scmMetaDataDir, NODEPOOL_DB); + nodePoolStore = MetadataStoreBuilder.newBuilder() + .setConf(conf) + .setDbFile(nodePoolDBPath) + .setCacheSize(cacheSize * OzoneConsts.MB) + .build(); + nodePools = new HashMap<>(); + lock = new ReentrantReadWriteLock(); + init(); + } + + /** + * Initialize the in-memory store based on persist store from level db. + * No lock is needed as init() is only invoked by constructor. + * @throws SCMException + */ + private void init() throws SCMException { + try { + nodePoolStore.iterate(null, (key, value) -> { + try { + DatanodeDetails nodeId = DatanodeDetails.getFromProtoBuf( + HddsProtos.DatanodeDetailsProto.PARSER.parseFrom(key)); + String poolName = DFSUtil.bytes2String(value); + + Set<DatanodeDetails> nodePool = null; + if (nodePools.containsKey(poolName)) { + nodePool = nodePools.get(poolName); + } else { + nodePool = new HashSet<>(); + nodePools.put(poolName, nodePool); + } + nodePool.add(nodeId); + if (LOG.isDebugEnabled()) { + LOG.debug("Adding node: {} to node pool: {}", + nodeId, poolName); + } + } catch (IOException e) { + LOG.warn("Can't add a datanode to node pool, continue next..."); + } + return true; + }); + } catch (IOException e) { + LOG.error("Loading node pool error " + e); + throw new SCMException("Failed to load node pool", + FAILED_TO_LOAD_NODEPOOL); + } + } + + /** + * Add a datanode to a node pool. + * @param pool - name of the node pool. + * @param node - name of the datanode. + */ + @Override + public void addNode(final String pool, final DatanodeDetails node) + throws IOException { + Preconditions.checkNotNull(pool, "pool name is null"); + Preconditions.checkNotNull(node, "node is null"); + lock.writeLock().lock(); + try { + // add to the persistent store + nodePoolStore.put(node.getProtoBufMessage().toByteArray(), + DFSUtil.string2Bytes(pool)); + + // add to the in-memory store + Set<DatanodeDetails> nodePool = null; + if (nodePools.containsKey(pool)) { + nodePool = nodePools.get(pool); + } else { + nodePool = new HashSet<DatanodeDetails>(); + nodePools.put(pool, nodePool); + } + nodePool.add(node); + } finally { + lock.writeLock().unlock(); + } + } + + /** + * Remove a datanode from a node pool. + * @param pool - name of the node pool. + * @param node - datanode id. 
+ * @throws SCMException + */ + @Override + public void removeNode(final String pool, final DatanodeDetails node) + throws SCMException { + Preconditions.checkNotNull(pool, "pool name is null"); + Preconditions.checkNotNull(node, "node is null"); + lock.writeLock().lock(); + try { + // Remove from the persistent store + byte[] kName = node.getProtoBufMessage().toByteArray(); + byte[] kData = nodePoolStore.get(kName); + if (kData == null) { + throw new SCMException(String.format("Unable to find node %s from" + + " pool %s in DB.", DFSUtil.bytes2String(kName), pool), + FAILED_TO_FIND_NODE_IN_POOL); + } + nodePoolStore.delete(kName); + + // Remove from the in-memory store + if (nodePools.containsKey(pool)) { + Set<DatanodeDetails> nodePool = nodePools.get(pool); + nodePool.remove(node); + } else { + throw new SCMException(String.format("Unable to find node %s from" + + " pool %s in MAP.", DFSUtil.bytes2String(kName), pool), + FAILED_TO_FIND_NODE_IN_POOL); + } + } catch (IOException e) { + throw new SCMException("Failed to remove node " + node.toString() + + " from node pool " + pool, e, + SCMException.ResultCodes.IO_EXCEPTION); + } finally { + lock.writeLock().unlock(); + } + } + + /** + * Get all the node pools. + * @return all the node pools. + */ + @Override + public List<String> getNodePools() { + lock.readLock().lock(); + try { + if (!nodePools.isEmpty()) { + return nodePools.keySet().stream().collect(Collectors.toList()); + } else { + return EMPTY_NODEPOOL_LIST; + } + } finally { + lock.readLock().unlock(); + } + } + + /** + * Get all datanodes of a specific node pool. + * @param pool - name of the node pool. + * @return all datanodes of the specified node pool. + */ + @Override + public List<DatanodeDetails> getNodes(final String pool) { + Preconditions.checkNotNull(pool, "pool name is null"); + if (nodePools.containsKey(pool)) { + return nodePools.get(pool).stream().collect(Collectors.toList()); + } else { + return EMPTY_NODE_LIST; + } + } + + /** + * Get the node pool name if the node has been added to a node pool. + * @param datanodeDetails - datanode ID. + * @return node pool name if it has been assigned. + * null if the node has not been assigned to any node pool yet. + * TODO: Put this in a in-memory map if performance is an issue. + */ + @Override + public String getNodePool(final DatanodeDetails datanodeDetails) + throws SCMException { + Preconditions.checkNotNull(datanodeDetails, "node is null"); + try { + byte[] result = nodePoolStore.get( + datanodeDetails.getProtoBufMessage().toByteArray()); + return result == null ? null : DFSUtil.bytes2String(result); + } catch (IOException e) { + throw new SCMException("Failed to get node pool for node " + + datanodeDetails.toString(), e, + SCMException.ResultCodes.IO_EXCEPTION); + } + } + + /** + * Close node pool level db store. 
+ * @throws IOException + */ + @Override + public void close() throws IOException { + nodePoolStore.close(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/package-info.java new file mode 100644 index 0000000..d6a8ad0 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/package-info.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm.node; + +/** + * The node package deals with node management. + * <p/> + * The node manager takes care of node registrations, removal of nodes and + * handling of heartbeats. + * <p/> + * The node manager maintains statistics that get sent as part of + * heartbeats. + * <p/> + * The container manager polls the node manager to learn the state of + * datanodes that it is interested in. + * <p/> + */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/package-info.java new file mode 100644 index 0000000..4669e74 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm; + +/* + * This package contains StorageContainerManager classes.
+ */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java new file mode 100644 index 0000000..8e43528 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm.pipelines; + +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineChannel; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Manage Ozone pipelines. + */ +public abstract class PipelineManager { + private static final Logger LOG = + LoggerFactory.getLogger(PipelineManager.class); + private final List<PipelineChannel> activePipelineChannels; + private final AtomicInteger conduitsIndex; + + public PipelineManager() { + activePipelineChannels = new LinkedList<>(); + conduitsIndex = new AtomicInteger(0); + } + + /** + * This function is called by the Container Manager while allocating a new + * container. The client specifies what kind of replication pipeline is + * needed and based on the replication type in the request appropriate + * Interface is invoked. + * + * @param containerName Name of the container + * @param replicationFactor - Replication Factor + * @return a Pipeline. + */ + public synchronized final Pipeline getPipeline(String containerName, + ReplicationFactor replicationFactor, ReplicationType replicationType) + throws IOException { + /** + * In the Ozone world, we have a very simple policy. + * + * 1. Try to create a pipelineChannel if there are enough free nodes. + * + * 2. This allows all nodes to part of a pipelineChannel quickly. + * + * 3. if there are not enough free nodes, return conduits in a + * round-robin fashion. + * + * TODO: Might have to come up with a better algorithm than this. + * Create a new placement policy that returns conduits in round robin + * fashion. 
+ */ + PipelineChannel pipelineChannel = + allocatePipelineChannel(replicationFactor); + if (pipelineChannel != null) { + LOG.debug("created new pipelineChannel:{} for container:{}", + pipelineChannel.getName(), containerName); + activePipelineChannels.add(pipelineChannel); + } else { + pipelineChannel = + findOpenPipelineChannel(replicationType, replicationFactor); + if (pipelineChannel != null) { + LOG.debug("re-used pipelineChannel:{} for container:{}", + pipelineChannel.getName(), containerName); + } + } + if (pipelineChannel == null) { + LOG.error("Get pipelineChannel call failed. We are not able to find" + + "free nodes or operational pipelineChannel."); + return null; + } else { + return new Pipeline(containerName, pipelineChannel); + } + } + + protected int getReplicationCount(ReplicationFactor factor) { + switch (factor) { + case ONE: + return 1; + case THREE: + return 3; + default: + throw new IllegalArgumentException("Unexpected replication count"); + } + } + + public abstract PipelineChannel allocatePipelineChannel( + ReplicationFactor replicationFactor) throws IOException; + + /** + * Find a PipelineChannel that is operational. + * + * @return - Pipeline or null + */ + private PipelineChannel findOpenPipelineChannel( + ReplicationType type, ReplicationFactor factor) { + PipelineChannel pipelineChannel = null; + final int sentinal = -1; + if (activePipelineChannels.size() == 0) { + LOG.error("No Operational conduits found. Returning null."); + return null; + } + int startIndex = getNextIndex(); + int nextIndex = sentinal; + for (; startIndex != nextIndex; nextIndex = getNextIndex()) { + // Just walk the list in a circular way. + PipelineChannel temp = + activePipelineChannels + .get(nextIndex != sentinal ? nextIndex : startIndex); + // if we find an operational pipelineChannel just return that. + if ((temp.getLifeCycleState() == LifeCycleState.OPEN) && + (temp.getFactor() == factor) && (temp.getType() == type)) { + pipelineChannel = temp; + break; + } + } + return pipelineChannel; + } + + /** + * gets the next index of the PipelineChannel to get. + * + * @return index in the link list to get. + */ + private int getNextIndex() { + return conduitsIndex.incrementAndGet() % activePipelineChannels.size(); + } + + /** + * Creates a pipeline from a specified set of Nodes. + * @param pipelineID - Name of the pipeline + * @param datanodes - The list of datanodes that make this pipeline. + */ + public abstract void createPipeline(String pipelineID, + List<DatanodeDetails> datanodes) throws IOException; + + /** + * Close the pipeline with the given clusterId. + */ + public abstract void closePipeline(String pipelineID) throws IOException; + + /** + * list members in the pipeline . + * @return the datanode + */ + public abstract List<DatanodeDetails> getMembers(String pipelineID) + throws IOException; + + /** + * Update the datanode list of the pipeline. 
+ */ + public abstract void updatePipeline(String pipelineID, + List<DatanodeDetails> newDatanodes) throws IOException; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java new file mode 100644 index 0000000..f0c9eea --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java @@ -0,0 +1,230 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm.pipelines; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineChannel; +import org.apache.hadoop.hdds.scm.container.placement.algorithms + .ContainerPlacementPolicy; +import org.apache.hadoop.hdds.scm.container.placement.algorithms + .SCMContainerPlacementRandom; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.pipelines.ratis.RatisManagerImpl; +import org.apache.hadoop.hdds.scm.pipelines.standalone.StandaloneManagerImpl; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.ozone.OzoneConsts; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Sends the request to the right pipeline manager. + */ +public class PipelineSelector { + private static final Logger LOG = + LoggerFactory.getLogger(PipelineSelector.class); + private final ContainerPlacementPolicy placementPolicy; + private final NodeManager nodeManager; + private final Configuration conf; + private final RatisManagerImpl ratisManager; + private final StandaloneManagerImpl standaloneManager; + private final long containerSize; + + /** + * Constructs a pipeline Selector. 
+ * + * @param nodeManager - node manager + * @param conf - Ozone Config + */ + public PipelineSelector(NodeManager nodeManager, Configuration conf) { + this.nodeManager = nodeManager; + this.conf = conf; + this.placementPolicy = createContainerPlacementPolicy(nodeManager, conf); + this.containerSize = OzoneConsts.GB * this.conf.getInt( + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB, + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT); + this.standaloneManager = + new StandaloneManagerImpl(this.nodeManager, placementPolicy, + containerSize); + this.ratisManager = + new RatisManagerImpl(this.nodeManager, placementPolicy, containerSize, + conf); + } + + /** + * Translates a list of nodes, ordered such that the first is the leader, into + * a corresponding {@link Pipeline} object. + * + * @param nodes - list of datanodes on which we will allocate the container. + * The first of the list will be the leader node. + * @return pipeline corresponding to nodes + */ + public static PipelineChannel newPipelineFromNodes( + List<DatanodeDetails> nodes, LifeCycleState state, + ReplicationType replicationType, ReplicationFactor replicationFactor, + String name) { + Preconditions.checkNotNull(nodes); + Preconditions.checkArgument(nodes.size() > 0); + String leaderId = nodes.get(0).getUuidString(); + PipelineChannel + pipelineChannel = new PipelineChannel(leaderId, state, replicationType, + replicationFactor, name); + for (DatanodeDetails node : nodes) { + pipelineChannel.addMember(node); + } + return pipelineChannel; + } + + /** + * Create pluggable container placement policy implementation instance. + * + * @param nodeManager - SCM node manager. + * @param conf - configuration. + * @return SCM container placement policy implementation instance. + */ + @SuppressWarnings("unchecked") + private static ContainerPlacementPolicy createContainerPlacementPolicy( + final NodeManager nodeManager, final Configuration conf) { + Class<? extends ContainerPlacementPolicy> implClass = + (Class<? extends ContainerPlacementPolicy>) conf.getClass( + ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY, + SCMContainerPlacementRandom.class); + + try { + Constructor<? extends ContainerPlacementPolicy> ctor = + implClass.getDeclaredConstructor(NodeManager.class, + Configuration.class); + return ctor.newInstance(nodeManager, conf); + } catch (RuntimeException e) { + throw e; + } catch (InvocationTargetException e) { + throw new RuntimeException(implClass.getName() + + " could not be constructed.", e.getCause()); + } catch (Exception e) { + LOG.error("Unhandled exception occurred, Placement policy will not be " + + "functional."); + throw new IllegalArgumentException("Unable to load " + + "ContainerPlacementPolicy", e); + } + } + + /** + * Return the pipeline manager from the replication type. + * + * @param replicationType - Replication Type Enum. + * @return pipeline Manager. + * @throws IllegalArgumentException If an pipeline type gets added + * and this function is not modified we will throw. + */ + private PipelineManager getPipelineManager(ReplicationType replicationType) + throws IllegalArgumentException { + switch (replicationType) { + case RATIS: + return this.ratisManager; + case STAND_ALONE: + return this.standaloneManager; + case CHAINED: + throw new IllegalArgumentException("Not implemented yet"); + default: + throw new IllegalArgumentException("Unexpected enum found. 
Does not" + + " know how to handle " + replicationType.toString()); + } + + } + + /** + * This function is called by the Container Manager while allocating a new + * container. The client specifies what kind of replication pipeline is needed + * and based on the replication type in the request appropriate Interface is + * invoked. + */ + + public Pipeline getReplicationPipeline(ReplicationType replicationType, + HddsProtos.ReplicationFactor replicationFactor, String containerName) + throws IOException { + PipelineManager manager = getPipelineManager(replicationType); + Preconditions.checkNotNull(manager, "Found invalid pipeline manager"); + LOG.debug("Getting replication pipeline for {} : Replication {}", + containerName, replicationFactor.toString()); + return manager. + getPipeline(containerName, replicationFactor, replicationType); + } + + /** + * Creates a pipeline from a specified set of Nodes. + */ + + public void createPipeline(ReplicationType replicationType, String + pipelineID, List<DatanodeDetails> datanodes) throws IOException { + PipelineManager manager = getPipelineManager(replicationType); + Preconditions.checkNotNull(manager, "Found invalid pipeline manager"); + LOG.debug("Creating a pipeline: {} with nodes:{}", pipelineID, + datanodes.stream().map(DatanodeDetails::toString) + .collect(Collectors.joining(","))); + manager.createPipeline(pipelineID, datanodes); + } + + /** + * Close the pipeline with the given clusterId. + */ + + public void closePipeline(ReplicationType replicationType, String + pipelineID) throws IOException { + PipelineManager manager = getPipelineManager(replicationType); + Preconditions.checkNotNull(manager, "Found invalid pipeline manager"); + LOG.debug("Closing pipeline. pipelineID: {}", pipelineID); + manager.closePipeline(pipelineID); + } + + /** + * list members in the pipeline . + */ + + public List<DatanodeDetails> getDatanodes(ReplicationType replicationType, + String pipelineID) throws IOException { + PipelineManager manager = getPipelineManager(replicationType); + Preconditions.checkNotNull(manager, "Found invalid pipeline manager"); + LOG.debug("Getting data nodes from pipeline : {}", pipelineID); + return manager.getMembers(pipelineID); + } + + /** + * Update the datanodes in the list of the pipeline. + */ + + public void updateDatanodes(ReplicationType replicationType, String + pipelineID, List<DatanodeDetails> newDatanodes) throws IOException { + PipelineManager manager = getPipelineManager(replicationType); + Preconditions.checkNotNull(manager, "Found invalid pipeline manager"); + LOG.debug("Updating pipeline: {} with new nodes:{}", pipelineID, + newDatanodes.stream().map(DatanodeDetails::toString) + .collect(Collectors.joining(","))); + manager.updatePipeline(pipelineID, newDatanodes); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/package-info.java new file mode 100644 index 0000000..ea24c58 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/package-info.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/package-info.java
new file mode 100644
index 0000000..ea24c58
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/package-info.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.pipelines;
+/**
+ Ozone supports the notion of different kinds of pipelines. That means we
+ can have a replication pipeline built on Ratis, Standalone or some other
+ protocol. All pipeline managers, the entities in charge of pipelines,
+ reside in this package.
+
+ Here is the high-level architecture.
+
+ 1. A pipeline selector class is instantiated in the container manager class.
+
+ 2. When creating a container, a client specifies what replication type it
+ wants to use. We support two types now: Ratis and StandAlone.
+
+ 3. Based on the replication type, the pipeline selector class asks the
+ corresponding pipeline manager for a pipeline.
+
+ 4. Clients may either specify the set of nodes in the pipeline, or rely on
+ the pipeline manager to select the datanodes if none are specified.
+ */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
new file mode 100644
index 0000000..089a137
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
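Before moving into the Ratis manager below, a quick illustration of steps 2 and 3 from the pipelines package description above: a caller names a replication type, the selector maps it to a manager, and the manager hands back a pipeline. The ReplType enum, PipelineProvider interface and the toy lambdas in this sketch are simplified stand-ins for illustration, not the Ozone classes.

    import java.util.EnumMap;
    import java.util.Map;

    // Simplified stand-ins for ReplicationType, PipelineManager and Pipeline.
    enum ReplType { STAND_ALONE, RATIS, CHAINED }

    interface PipelineProvider {
      String getPipeline(String containerName, int replicationFactor);
    }

    public final class SelectorSketch {
      private final Map<ReplType, PipelineProvider> managers = new EnumMap<>(ReplType.class);

      SelectorSketch() {
        // One manager per supported replication type, as in the PipelineSelector constructor.
        managers.put(ReplType.STAND_ALONE,
            (name, factor) -> "standalone-pipeline-for-" + name);
        managers.put(ReplType.RATIS,
            (name, factor) -> "ratis-pipeline-of-" + factor + "-for-" + name);
      }

      String getReplicationPipeline(ReplType type, int factor, String containerName) {
        PipelineProvider manager = managers.get(type);
        if (manager == null) {
          // CHAINED (or any future type) is rejected until a manager is registered,
          // just as getPipelineManager throws for unhandled enum values.
          throw new IllegalArgumentException("No pipeline manager for " + type);
        }
        return manager.getPipeline(containerName, factor);
      }

      public static void main(String[] args) {
        SelectorSketch selector = new SelectorSketch();
        System.out.println(selector.getReplicationPipeline(ReplType.RATIS, 3, "container-1"));
      }
    }

Unsupported types fail fast with an IllegalArgumentException, mirroring getPipelineManager, so adding a new replication type forces a conscious decision about which manager serves it.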
+ */ +package org.apache.hadoop.hdds.scm.pipelines.ratis; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.scm.XceiverClientRatis; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineChannel; +import org.apache.hadoop.hdds.scm.container.placement.algorithms + .ContainerPlacementPolicy; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.pipelines.PipelineManager; +import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +/** + * Implementation of {@link PipelineManager}. + * + * TODO : Introduce a state machine. + */ +public class RatisManagerImpl extends PipelineManager { + private static final Logger LOG = + LoggerFactory.getLogger(RatisManagerImpl.class); + private static final String PREFIX = "Ratis-"; + private final Configuration conf; + private final NodeManager nodeManager; + private final Set<DatanodeDetails> ratisMembers; + + /** + * Constructs a Ratis Pipeline Manager. + * + * @param nodeManager + */ + public RatisManagerImpl(NodeManager nodeManager, + ContainerPlacementPolicy placementPolicy, long size, Configuration conf) { + super(); + this.conf = conf; + this.nodeManager = nodeManager; + ratisMembers = new HashSet<>(); + } + + /** + * Allocates a new ratis PipelineChannel from the free nodes. + * + * @param factor - One or Three + * @return PipelineChannel. + */ + public PipelineChannel allocatePipelineChannel(ReplicationFactor factor) { + List<DatanodeDetails> newNodesList = new LinkedList<>(); + List<DatanodeDetails> datanodes = nodeManager.getNodes(NodeState.HEALTHY); + int count = getReplicationCount(factor); + //TODO: Add Raft State to the Nodes, so we can query and skip nodes from + // data from datanode instead of maintaining a set. + for (DatanodeDetails datanode : datanodes) { + Preconditions.checkNotNull(datanode); + if (!ratisMembers.contains(datanode)) { + newNodesList.add(datanode); + if (newNodesList.size() == count) { + // once a datanode has been added to a pipeline, exclude it from + // further allocations + ratisMembers.addAll(newNodesList); + LOG.info("Allocating a new ratis pipelineChannel of size: {}", count); + // Start all channel names with "Ratis", easy to grep the logs. 
+ String conduitName = PREFIX + + UUID.randomUUID().toString().substring(PREFIX.length()); + PipelineChannel pipelineChannel = + PipelineSelector.newPipelineFromNodes(newNodesList, + LifeCycleState.OPEN, ReplicationType.RATIS, factor, conduitName); + Pipeline pipeline = + new Pipeline("setup", pipelineChannel); + try (XceiverClientRatis client = + XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) { + client.createPipeline(pipeline.getPipelineName(), newNodesList); + } catch (IOException e) { + return null; + } + return pipelineChannel; + } + } + } + return null; + } + + /** + * Creates a pipeline from a specified set of Nodes. + * + * @param pipelineID - Name of the pipeline + * @param datanodes - The list of datanodes that make this pipeline. + */ + @Override + public void createPipeline(String pipelineID, + List<DatanodeDetails> datanodes) { + + } + + /** + * Close the pipeline with the given clusterId. + * + * @param pipelineID + */ + @Override + public void closePipeline(String pipelineID) throws IOException { + + } + + /** + * list members in the pipeline . + * + * @param pipelineID + * @return the datanode + */ + @Override + public List<DatanodeDetails> getMembers(String pipelineID) + throws IOException { + return null; + } + + /** + * Update the datanode list of the pipeline. + * + * @param pipelineID + * @param newDatanodes + */ + @Override + public void updatePipeline(String pipelineID, + List<DatanodeDetails> newDatanodes) + throws IOException { + + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/package-info.java new file mode 100644 index 0000000..2970fb3 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
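The allocation loop in RatisManagerImpl.allocatePipelineChannel above (and its standalone twin further down) follows one simple rule: walk the healthy datanodes, skip any node already committed to a pipeline, stop as soon as the replication count is reached, and otherwise give up. A self-contained sketch of that selection logic, with plain strings standing in for DatanodeDetails and made-up node names:

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Optional;
    import java.util.Set;

    public final class PipelineAllocationSketch {
      // Nodes already committed to a pipeline; mirrors ratisMembers / standAloneMembers.
      private final Set<String> members = new HashSet<>();

      /** Picks {@code count} healthy nodes that are not yet part of any pipeline. */
      Optional<List<String>> allocate(List<String> healthyNodes, int count) {
        List<String> picked = new ArrayList<>();
        for (String node : healthyNodes) {
          if (members.contains(node)) {
            continue;                   // already serving a pipeline, skip it
          }
          picked.add(node);
          if (picked.size() == count) {
            members.addAll(picked);     // exclude these nodes from future allocations
            return Optional.of(picked);
          }
        }
        return Optional.empty();        // not enough free healthy nodes
      }

      public static void main(String[] args) {
        PipelineAllocationSketch sketch = new PipelineAllocationSketch();
        List<String> healthy = List.of("dn-1", "dn-2", "dn-3", "dn-4");
        System.out.println(sketch.allocate(healthy, 3)); // Optional[[dn-1, dn-2, dn-3]]
        System.out.println(sketch.allocate(healthy, 3)); // Optional.empty, only dn-4 is left
      }
    }

The TODO in RatisManagerImpl points at the longer-term fix: record Raft membership in the datanode state reported to SCM, so the selector can query it instead of keeping this in-memory set.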
+ */ +package org.apache.hadoop.hdds.scm.pipelines.ratis; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java new file mode 100644 index 0000000..8268329 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm.pipelines.standalone; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineChannel; +import org.apache.hadoop.hdds.scm.container.placement.algorithms + .ContainerPlacementPolicy; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.pipelines.PipelineManager; +import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +/** + * Standalone Manager Impl to prove that pluggable interface + * works with current tests. + */ +public class StandaloneManagerImpl extends PipelineManager { + private static final Logger LOG = + LoggerFactory.getLogger(StandaloneManagerImpl.class); + private final NodeManager nodeManager; + private final ContainerPlacementPolicy placementPolicy; + private final long containerSize; + private final Set<DatanodeDetails> standAloneMembers; + + /** + * Constructor for Standalone Node Manager Impl. + * @param nodeManager - Node Manager. + * @param placementPolicy - Placement Policy + * @param containerSize - Container Size. 
+ */ + public StandaloneManagerImpl(NodeManager nodeManager, + ContainerPlacementPolicy placementPolicy, long containerSize) { + super(); + this.nodeManager = nodeManager; + this.placementPolicy = placementPolicy; + this.containerSize = containerSize; + this.standAloneMembers = new HashSet<>(); + } + + + /** + * Allocates a new standalone PipelineChannel from the free nodes. + * + * @param factor - One + * @return PipelineChannel. + */ + public PipelineChannel allocatePipelineChannel(ReplicationFactor factor) { + List<DatanodeDetails> newNodesList = new LinkedList<>(); + List<DatanodeDetails> datanodes = nodeManager.getNodes(NodeState.HEALTHY); + int count = getReplicationCount(factor); + for (DatanodeDetails datanode : datanodes) { + Preconditions.checkNotNull(datanode); + if (!standAloneMembers.contains(datanode)) { + newNodesList.add(datanode); + if (newNodesList.size() == count) { + // once a datanode has been added to a pipeline, exclude it from + // further allocations + standAloneMembers.addAll(newNodesList); + LOG.info("Allocating a new standalone pipeline channel of size: {}", + count); + String channelName = + "SA-" + UUID.randomUUID().toString().substring(3); + return PipelineSelector.newPipelineFromNodes(newNodesList, + LifeCycleState.OPEN, ReplicationType.STAND_ALONE, + ReplicationFactor.ONE, channelName); + } + } + } + return null; + } + + /** + * Creates a pipeline from a specified set of Nodes. + * + * @param pipelineID - Name of the pipeline + * @param datanodes - The list of datanodes that make this pipeline. + */ + @Override + public void createPipeline(String pipelineID, + List<DatanodeDetails> datanodes) { + //return newPipelineFromNodes(datanodes, pipelineID); + } + + /** + * Close the pipeline with the given clusterId. + * + * @param pipelineID + */ + @Override + public void closePipeline(String pipelineID) throws IOException { + + } + + /** + * list members in the pipeline . + * + * @param pipelineID + * @return the datanode + */ + @Override + public List<DatanodeDetails> getMembers(String pipelineID) + throws IOException { + return null; + } + + /** + * Update the datanode list of the pipeline. + * + * @param pipelineID + * @param newDatanodes + */ + @Override + public void updatePipeline(String pipelineID, List<DatanodeDetails> + newDatanodes) throws IOException { + + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/package-info.java new file mode 100644 index 0000000..b2c3ca40 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
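One small convention shared by both managers above is the channel naming: a short, human-readable prefix replaces the first characters of a random UUID string ("Ratis-" via substring(PREFIX.length()) in the Ratis manager, "SA-" via substring(3) in the standalone manager), so the names are easy to grep in the logs while staying exactly as long as a UUID. A tiny sketch of that trick; the helper name below is made up for illustration and is not an Ozone API.

    import java.util.UUID;

    public final class ChannelNames {
      /** Replaces the first prefix.length() characters of a random UUID with the prefix. */
      static String newChannelName(String prefix) {
        String uuid = UUID.randomUUID().toString();      // e.g. "3f2504e0-4f89-11d3-9a0c-0305e82c3301"
        return prefix + uuid.substring(prefix.length()); // same length, recognisable prefix
      }

      public static void main(String[] args) {
        System.out.println(newChannelName("Ratis-"));    // e.g. "Ratis-e0-4f89-11d3-9a0c-0305e82c3301"
        System.out.println(newChannelName("SA-"));       // e.g. "SA-504e0-4f89-11d3-9a0c-0305e82c3301"
      }
    }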
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.pipelines.standalone; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/package-info.java new file mode 100644 index 0000000..4944017 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.ratis; + +/** + * This package contains classes related to Apache Ratis for SCM. + */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/webapps/scm/index.html ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/webapps/scm/index.html b/hadoop-hdds/server-scm/src/main/webapps/scm/index.html new file mode 100644 index 0000000..3407f51 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/webapps/scm/index.html @@ -0,0 +1,76 @@ +<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" + "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<html lang="en"> +<head> + <meta charset="utf-8"> + <meta http-equiv="X-UA-Compatible" content="IE=edge"> + <meta name="viewport" content="width=device-width, initial-scale=1"> + <!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags --> + <meta name="description" content="HDFS Storage Container Manager"> + + <title>HDFS Storage Container Manager</title> + + <link href="static/bootstrap-3.0.2/css/bootstrap.min.css" rel="stylesheet"> + <link href="static/hadoop.css" rel="stylesheet"> + <link href="static/nvd3-1.8.5.min.css" rel="stylesheet"> + + <link href="static/ozone.css" rel="stylesheet"> + +</head> + +<body ng-app="scm"> + +<header class="navbar navbar-inverse navbar-fixed-top bs-docs-nav"> + <div class="container-fluid"> + <div class="navbar-header"> + <button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#navbar" + aria-expanded="false" aria-controls="navbar"> + <span class="sr-only">Toggle navigation</span> + <span class="icon-bar"></span> + <span class="icon-bar"></span> + <span class="icon-bar"></span> + </button> + <a class="navbar-brand" href="#">HDFS SCM</a> + </div> + + + <navmenu + metrics="{ 'Rpc metrics' : '#!/metrics/rpc'}"></navmenu> + + + </div> +</header> + +<div class="container-fluid" style="margin: 12pt"> + + <ng-view></ng-view> + +</div><!-- /.container --> + +<script src="static/jquery-1.10.2.min.js"></script> +<script src="static/angular-1.6.4.min.js"></script> +<script src="static/angular-route-1.6.4.min.js"></script> +<script src="static/d3-3.5.17.min.js"></script> +<script src="static/nvd3-1.8.5.min.js"></script> +<script src="static/angular-nvd3-1.0.9.min.js"></script> +<script src="static/ozone.js"></script> +<script src="scm.js"></script> +<script src="static/bootstrap-3.0.2/js/bootstrap.min.js"></script> +</body> +</html> http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/webapps/scm/main.html ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/webapps/scm/main.html b/hadoop-hdds/server-scm/src/main/webapps/scm/main.html new file mode 100644 index 0000000..2666f81 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/webapps/scm/main.html @@ -0,0 +1,20 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> +<overview> + <scm-overview> + </scm-overview> +</overview> --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org