http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java new file mode 100644 index 0000000..0a595d5 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.hadoop.hdds.scm.container.placement.algorithms; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedList; +import java.util.List; +import java.util.Random; +import java.util.stream.Collectors; + +/** + * SCM CommonPolicy implements a set of invariants which are common + * for all container placement policies, acts as the repository of helper + * functions which are common to placement policies. + */ +public abstract class SCMCommonPolicy implements ContainerPlacementPolicy { + @VisibleForTesting + static final Logger LOG = + LoggerFactory.getLogger(SCMCommonPolicy.class); + private final NodeManager nodeManager; + private final Random rand; + private final Configuration conf; + + /** + * Constructs SCM Common Policy Class. + * + * @param nodeManager NodeManager + * @param conf Configuration class. + */ + public SCMCommonPolicy(NodeManager nodeManager, Configuration conf) { + this.nodeManager = nodeManager; + this.rand = new Random(); + this.conf = conf; + } + + /** + * Return node manager. + * + * @return node manager + */ + public NodeManager getNodeManager() { + return nodeManager; + } + + /** + * Returns the Random Object. + * + * @return rand + */ + public Random getRand() { + return rand; + } + + /** + * Get Config. + * + * @return Configuration + */ + public Configuration getConf() { + return conf; + } + + /** + * Given the replication factor and size required, return set of datanodes + * that satisfy the nodes and size requirement. + * <p> + * Here are some invariants of container placement. 
+ * <p> + * 1. We place containers only on healthy nodes. + * 2. We place containers on nodes with enough space for that container. + * 3. if a set of containers are requested, we either meet the required + * number of nodes or we fail that request. + * + * @param nodesRequired - number of datanodes required. + * @param sizeRequired - size required for the container or block. + * @return list of datanodes chosen. + * @throws SCMException SCM exception. + */ + + public List<DatanodeDetails> chooseDatanodes(int nodesRequired, final long + sizeRequired) throws SCMException { + List<DatanodeDetails> healthyNodes = + nodeManager.getNodes(HddsProtos.NodeState.HEALTHY); + String msg; + if (healthyNodes.size() == 0) { + msg = "No healthy node found to allocate container."; + LOG.error(msg); + throw new SCMException(msg, SCMException.ResultCodes + .FAILED_TO_FIND_HEALTHY_NODES); + } + + if (healthyNodes.size() < nodesRequired) { + msg = String.format("Not enough healthy nodes to allocate container. %d " + + " datanodes required. Found %d", + nodesRequired, healthyNodes.size()); + LOG.error(msg); + throw new SCMException(msg, + SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE); + } + List<DatanodeDetails> healthyList = healthyNodes.stream().filter(d -> + hasEnoughSpace(d, sizeRequired)).collect(Collectors.toList()); + + if (healthyList.size() < nodesRequired) { + msg = String.format("Unable to find enough nodes that meet the space " + + "requirement of %d bytes in healthy node set." + + " Nodes required: %d Found: %d", + sizeRequired, nodesRequired, healthyList.size()); + LOG.error(msg); + throw new SCMException(msg, + SCMException.ResultCodes.FAILED_TO_FIND_NODES_WITH_SPACE); + } + + return healthyList; + } + + /** + * Returns true if this node has enough space to meet our requirement. + * + * @param datanodeDetails DatanodeDetails + * @return true if we have enough space. 
+ */ + private boolean hasEnoughSpace(DatanodeDetails datanodeDetails, + long sizeRequired) { + SCMNodeMetric nodeMetric = nodeManager.getNodeStat(datanodeDetails); + return (nodeMetric != null) && nodeMetric.get().getRemaining() + .hasResources(sizeRequired); + } + + /** + * This function invokes the derived classes chooseNode Function to build a + * list of nodes. Then it verifies that invoked policy was able to return + * expected number of nodes. + * + * @param nodesRequired - Nodes Required + * @param healthyNodes - List of Nodes in the result set. + * @return List of Datanodes that can be used for placement. + * @throws SCMException + */ + public List<DatanodeDetails> getResultSet( + int nodesRequired, List<DatanodeDetails> healthyNodes) + throws SCMException { + List<DatanodeDetails> results = new LinkedList<>(); + for (int x = 0; x < nodesRequired; x++) { + // invoke the choose function defined in the derived classes. + DatanodeDetails nodeId = chooseNode(healthyNodes); + if (nodeId != null) { + results.add(nodeId); + } + } + + if (results.size() < nodesRequired) { + LOG.error("Unable to find the required number of healthy nodes that " + + "meet the criteria. Required nodes: {}, Found nodes: {}", + nodesRequired, results.size()); + throw new SCMException("Unable to find required number of nodes.", + SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE); + } + return results; + } + + /** + * Choose a datanode according to the policy, this function is implemented + * by the actual policy class. For example, PlacementCapacity or + * PlacementRandom. + * + * @param healthyNodes - Set of healthy nodes we can choose from. + * @return DatanodeDetails + */ + public abstract DatanodeDetails chooseNode( + List<DatanodeDetails> healthyNodes); + + +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java new file mode 100644 index 0000000..85a6b54 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.hadoop.hdds.scm.container.placement.algorithms; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * Container placement policy that randomly choose datanodes with remaining + * space to satisfy the size constraints. + * <p> + * The Algorithm is as follows, Pick 2 random nodes from a given pool of nodes + * and then pick the node which lower utilization. This leads to a higher + * probability of nodes with lower utilization to be picked. + * <p> + * For those wondering why we choose two nodes randomly and choose the node + * with lower utilization. There are links to this original papers in + * HDFS-11564. + * <p> + * A brief summary -- We treat the nodes from a scale of lowest utilized to + * highest utilized, there are (s * ( s + 1)) / 2 possibilities to build + * distinct pairs of nodes. There are s - k pairs of nodes in which the rank + * k node is less than the couple. So probability of a picking a node is + * (2 * (s -k)) / (s * (s - 1)). + * <p> + * In English, There is a much higher probability of picking less utilized nodes + * as compared to nodes with higher utilization since we pick 2 nodes and + * then pick the node with lower utilization. + * <p> + * This avoids the issue of users adding new nodes into the cluster and HDFS + * sending all traffic to those nodes if we only use a capacity based + * allocation scheme. Unless those nodes are part of the set of the first 2 + * nodes then newer nodes will not be in the running to get the container. 
+ * <p> + * This leads to an I/O pattern where the lower utilized nodes are favoured + * more than higher utilized nodes, but part of the I/O will still go to the + * older higher utilized nodes. + * <p> + * With this algorithm in place, our hope is that balancer tool needs to do + * little or no work and the cluster will achieve a balanced distribution + * over time. + */ +public final class SCMContainerPlacementCapacity extends SCMCommonPolicy { + @VisibleForTesting + static final Logger LOG = + LoggerFactory.getLogger(SCMContainerPlacementCapacity.class); + + /** + * Constructs a Container Placement with considering only capacity. + * That is this policy tries to place containers based on node weight. + * + * @param nodeManager Node Manager + * @param conf Configuration + */ + public SCMContainerPlacementCapacity(final NodeManager nodeManager, + final Configuration conf) { + super(nodeManager, conf); + } + + /** + * Called by SCM to choose datanodes. + * + * @param nodesRequired - number of datanodes required. + * @param sizeRequired - size required for the container or block. + * @return List of datanodes. + * @throws SCMException SCMException + */ + @Override + public List<DatanodeDetails> chooseDatanodes( + final int nodesRequired, final long sizeRequired) throws SCMException { + List<DatanodeDetails> healthyNodes = + super.chooseDatanodes(nodesRequired, sizeRequired); + if (healthyNodes.size() == nodesRequired) { + return healthyNodes; + } + return getResultSet(nodesRequired, healthyNodes); + } + + /** + * Find a node from the healthy list and return it after removing it from the + * list that we are operating on. + * + * @param healthyNodes - List of healthy nodes that meet the size + * requirement. + * @return DatanodeDetails that is chosen. 
+ */ + @Override + public DatanodeDetails chooseNode(List<DatanodeDetails> healthyNodes) { + int firstNodeNdx = getRand().nextInt(healthyNodes.size()); + int secondNodeNdx = getRand().nextInt(healthyNodes.size()); + + DatanodeDetails datanodeDetails; + // There is a possibility that both numbers will be same. + // if that is so, we just return the node. + if (firstNodeNdx == secondNodeNdx) { + datanodeDetails = healthyNodes.get(firstNodeNdx); + } else { + DatanodeDetails firstNodeDetails = healthyNodes.get(firstNodeNdx); + DatanodeDetails secondNodeDetails = healthyNodes.get(secondNodeNdx); + SCMNodeMetric firstNodeMetric = + getNodeManager().getNodeStat(firstNodeDetails); + SCMNodeMetric secondNodeMetric = + getNodeManager().getNodeStat(secondNodeDetails); + datanodeDetails = firstNodeMetric.isGreater(secondNodeMetric.get()) + ? firstNodeDetails : secondNodeDetails; + } + healthyNodes.remove(datanodeDetails); + return datanodeDetails; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java new file mode 100644 index 0000000..9903c84 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. 
The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.container.placement.algorithms; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * Container placement policy that randomly chooses healthy datanodes. + * This is very similar to current HDFS placement. That is we + * just randomly place containers without any considerations of utilization. + * <p> + * That means we rely on balancer to achieve even distribution of data. + * Balancer will need to support containers as a feature before this class + * can be practically used. + */ +public final class SCMContainerPlacementRandom extends SCMCommonPolicy + implements ContainerPlacementPolicy { + @VisibleForTesting + static final Logger LOG = + LoggerFactory.getLogger(SCMContainerPlacementRandom.class); + + /** + * Construct a random Block Placement policy. + * + * @param nodeManager nodeManager + * @param conf Config + */ + public SCMContainerPlacementRandom(final NodeManager nodeManager, + final Configuration conf) { + super(nodeManager, conf); + } + + /** + * Choose datanodes called by the SCM to choose the datanode. 
+ * + * @param nodesRequired - number of datanodes required. + * @param sizeRequired - size required for the container or block. + * @return List of Datanodes. + * @throws SCMException SCMException + */ + @Override + public List<DatanodeDetails> chooseDatanodes( + final int nodesRequired, final long sizeRequired) throws SCMException { + List<DatanodeDetails> healthyNodes = + super.chooseDatanodes(nodesRequired, sizeRequired); + + if (healthyNodes.size() == nodesRequired) { + return healthyNodes; + } + return getResultSet(nodesRequired, healthyNodes); + } + + /** + * Just chose a node randomly and remove it from the set of nodes we can + * chose from. + * + * @param healthyNodes - all healthy datanodes. + * @return one randomly chosen datanode that from two randomly chosen datanode + */ + public DatanodeDetails chooseNode(final List<DatanodeDetails> healthyNodes) { + DatanodeDetails selectedNode = + healthyNodes.get(getRand().nextInt(healthyNodes.size())); + healthyNodes.remove(selectedNode); + return selectedNode; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/package-info.java new file mode 100644 index 0000000..1cb810d --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. 
The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm.container.placement.algorithms; +// Various placement algorithms. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/ContainerStat.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/ContainerStat.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/ContainerStat.java new file mode 100644 index 0000000..b8e8998 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/ContainerStat.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.container.placement.metrics; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import org.apache.hadoop.ozone.web.utils.JsonUtils; + +import java.io.IOException; + +/** + * This class represents the SCM container stat. + */ +public class ContainerStat { + /** + * The maximum container size. + */ + @JsonProperty("Size") + private LongMetric size; + + /** + * The number of bytes used by the container. + */ + @JsonProperty("Used") + private LongMetric used; + + /** + * The number of keys in the container. + */ + @JsonProperty("KeyCount") + private LongMetric keyCount; + + /** + * The number of bytes read from the container. + */ + @JsonProperty("ReadBytes") + private LongMetric readBytes; + + /** + * The number of bytes write into the container. + */ + @JsonProperty("WriteBytes") + private LongMetric writeBytes; + + /** + * The number of times the container is read. + */ + @JsonProperty("ReadCount") + private LongMetric readCount; + + /** + * The number of times the container is written into. 
+ */ + @JsonProperty("WriteCount") + private LongMetric writeCount; + + public ContainerStat() { + this(0L, 0L, 0L, 0L, 0L, 0L, 0L); + } + + public ContainerStat(long size, long used, long keyCount, long readBytes, + long writeBytes, long readCount, long writeCount) { + Preconditions.checkArgument(size >= 0, + "Container size cannot be " + "negative."); + Preconditions.checkArgument(used >= 0, + "Used space cannot be " + "negative."); + Preconditions.checkArgument(keyCount >= 0, + "Key count cannot be " + "negative"); + Preconditions.checkArgument(readBytes >= 0, + "Read bytes read cannot be " + "negative."); + Preconditions.checkArgument(readBytes >= 0, + "Write bytes cannot be " + "negative."); + Preconditions.checkArgument(readCount >= 0, + "Read count cannot be " + "negative."); + Preconditions.checkArgument(writeCount >= 0, + "Write count cannot be " + "negative"); + + this.size = new LongMetric(size); + this.used = new LongMetric(used); + this.keyCount = new LongMetric(keyCount); + this.readBytes = new LongMetric(readBytes); + this.writeBytes = new LongMetric(writeBytes); + this.readCount = new LongMetric(readCount); + this.writeCount = new LongMetric(writeCount); + } + + public LongMetric getSize() { + return size; + } + + public LongMetric getUsed() { + return used; + } + + public LongMetric getKeyCount() { + return keyCount; + } + + public LongMetric getReadBytes() { + return readBytes; + } + + public LongMetric getWriteBytes() { + return writeBytes; + } + + public LongMetric getReadCount() { + return readCount; + } + + public LongMetric getWriteCount() { + return writeCount; + } + + public void add(ContainerStat stat) { + if (stat == null) { + return; + } + + this.size.add(stat.getSize().get()); + this.used.add(stat.getUsed().get()); + this.keyCount.add(stat.getKeyCount().get()); + this.readBytes.add(stat.getReadBytes().get()); + this.writeBytes.add(stat.getWriteBytes().get()); + this.readCount.add(stat.getReadCount().get()); + 
this.writeCount.add(stat.getWriteCount().get()); + } + + public void subtract(ContainerStat stat) { + if (stat == null) { + return; + } + + this.size.subtract(stat.getSize().get()); + this.used.subtract(stat.getUsed().get()); + this.keyCount.subtract(stat.getKeyCount().get()); + this.readBytes.subtract(stat.getReadBytes().get()); + this.writeBytes.subtract(stat.getWriteBytes().get()); + this.readCount.subtract(stat.getReadCount().get()); + this.writeCount.subtract(stat.getWriteCount().get()); + } + + public String toJsonString() { + try { + return JsonUtils.toJsonString(this); + } catch (IOException ignored) { + return null; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/DatanodeMetric.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/DatanodeMetric.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/DatanodeMetric.java new file mode 100644 index 0000000..a6e732c --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/DatanodeMetric.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.container.placement.metrics; + +import org.apache.hadoop.hdds.scm.exceptions.SCMException; + +/** + * DatanodeMetric acts as the basis for all the metric that is used in + * comparing 2 datanodes. + */ +public interface DatanodeMetric<T, S> extends Comparable<T> { + + /** + * Some syntactic sugar over Comparable interface. This makes code easier to + * read. + * + * @param o - Other Object + * @return - True if *this* object is greater than argument. + */ + boolean isGreater(T o); + + /** + * Inverse of isGreater. + * + * @param o - other object. + * @return True if *this* object is Lesser than argument. + */ + boolean isLess(T o); + + /** + * Returns true if the object has same values. Because of issues with + * equals, and loss of type information this interface supports isEqual. + * + * @param o object to compare. + * @return True, if the values match. + */ + boolean isEqual(T o); + + /** + * A resourceCheck, defined by resourceNeeded. + * For example, S could be bytes required + * and DatanodeMetric can reply by saying it can be met or not. + * + * @param resourceNeeded - ResourceNeeded in its own metric. + * @return boolean, True if this resource requirement can be met. + */ + boolean hasResources(S resourceNeeded) throws SCMException; + + /** + * Returns the metric. + * + * @return T, the object that represents this metric. + */ + T get(); + + /** + * Sets the value of this metric. + * + * @param value - value of the metric. + */ + void set(T value); + + /** + * Adds a value of to the base. 
+ * @param value - value + */ + void add(T value); + + /** + * subtract a value. + * @param value value + */ + void subtract(T value); + + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/LongMetric.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/LongMetric.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/LongMetric.java new file mode 100644 index 0000000..050d26b --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/LongMetric.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm.container.placement.metrics; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; + +/** + * An helper class for all metrics based on Longs. 
+ */ +@JsonAutoDetect(fieldVisibility = Visibility.ANY) +public class LongMetric implements DatanodeMetric<Long, Long> { + private Long value; + + /** + * Constructs a long Metric. + * + * @param value Value for this metric. + */ + public LongMetric(Long value) { + this.value = value; + } + + /** + * Some syntactic sugar over Comparable interface. This makes code easier to + * read. + * + * @param o - Other Object + * @return - True if *this* object is greater than argument. + */ + @Override + public boolean isGreater(Long o) { + return compareTo(o) > 0; + } + + /** + * Inverse of isGreater. + * + * @param o - other object. + * @return True if *this* object is Lesser than argument. + */ + @Override + public boolean isLess(Long o) { + return compareTo(o) < 0; + } + + /** + * Returns true if the object has same values. Because of issues with + * equals, and loss of type information this interface supports isEqual. + * + * @param o object to compare. + * @return True, if the values match. + */ + @Override + public boolean isEqual(Long o) { + return compareTo(o) == 0; + } + + /** + * A resourceCheck, defined by resourceNeeded. + * For example, S could be bytes required + * and DatanodeMetric can reply by saying it can be met or not. + * + * @param resourceNeeded - ResourceNeeded in its own metric. + * @return boolean, True if this resource requirement can be met. + */ + @Override + public boolean hasResources(Long resourceNeeded) { + return isGreater(resourceNeeded); + } + + /** + * Returns the metric. + * + * @return T, the object that represents this metric. + */ + @Override + public Long get() { + return this.value; + } + + /** + * Sets the value of this metric. + * + * @param setValue - value of the metric. + */ + @Override + public void set(Long setValue) { + this.value = setValue; + + } + + /** + * Adds a value of to the base. 
+ * + * @param addValue - value + */ + @Override + public void add(Long addValue) { + this.value += addValue; + } + + /** + * subtract a value. + * + * @param subValue value + */ + @Override + public void subtract(Long subValue) { + this.value -= subValue; + } + + /** + * Compares this object with the specified object for order. Returns a + * negative integer, zero, or a positive integer as this object is less + * than, equal to, or greater than the specified object. + * + * @param o the object to be compared. + * @return a negative integer, zero, or a positive integer as this object is + * less than, equal to, or greater than the specified object. + * @throws NullPointerException if the specified object is null + * @throws ClassCastException if the specified object's type prevents it + * from being compared to this object. + */ + @Override + public int compareTo(Long o) { + return Long.compare(this.value, o); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + LongMetric that = (LongMetric) o; + + return value != null ? value.equals(that.value) : that.value == null; + } + + @Override + public int hashCode() { + return value != null ? 
value.hashCode() : 0; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/NodeStat.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/NodeStat.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/NodeStat.java new file mode 100644 index 0000000..d6857d3 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/NodeStat.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.container.placement.metrics; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Interface that defines Node Stats. + */ +interface NodeStat { + /** + * Get capacity of the node. + * @return capacity of the node. + */ + LongMetric getCapacity(); + + /** + * Get the used space of the node. + * @return the used space of the node. + */ + LongMetric getScmUsed(); + + /** + * Get the remaining space of the node. 
+ * @return the remaining space of the node. + */ + LongMetric getRemaining(); + + /** + * Set the total/used/remaining space. + * @param capacity - total space. + * @param used - used space. + * @param remain - remaining space. + */ + @VisibleForTesting + void set(long capacity, long used, long remain); + + /** + * Adding of the stat. + * @param stat - stat to be added. + * @return updated node stat. + */ + NodeStat add(NodeStat stat); + + /** + * Subtract of the stat. + * @param stat - stat to be subtracted. + * @return updated nodestat. + */ + NodeStat subtract(NodeStat stat); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMMetrics.java new file mode 100644 index 0000000..e4dd9aa --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMMetrics.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.container.placement.metrics; + +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; + +/** + * This class is for maintaining StorageContainerManager statistics. + */ +@Metrics(about="Storage Container Manager Metrics", context="dfs") +public class SCMMetrics { + public static final String SOURCE_NAME = + SCMMetrics.class.getSimpleName(); + + /** + * Container stat metrics, the meaning of following metrics + * can be found in {@link ContainerStat}. + */ + @Metric private MutableGaugeLong lastContainerReportSize; + @Metric private MutableGaugeLong lastContainerReportUsed; + @Metric private MutableGaugeLong lastContainerReportKeyCount; + @Metric private MutableGaugeLong lastContainerReportReadBytes; + @Metric private MutableGaugeLong lastContainerReportWriteBytes; + @Metric private MutableGaugeLong lastContainerReportReadCount; + @Metric private MutableGaugeLong lastContainerReportWriteCount; + + @Metric private MutableCounterLong containerReportSize; + @Metric private MutableCounterLong containerReportUsed; + @Metric private MutableCounterLong containerReportKeyCount; + @Metric private MutableCounterLong containerReportReadBytes; + @Metric private MutableCounterLong containerReportWriteBytes; + @Metric private MutableCounterLong containerReportReadCount; + @Metric private MutableCounterLong containerReportWriteCount; + + public SCMMetrics() { + } + + public static SCMMetrics create() { + MetricsSystem ms = DefaultMetricsSystem.instance(); + return ms.register(SOURCE_NAME, "Storage Container Manager Metrics", + new SCMMetrics()); + } + + public 
void setLastContainerReportSize(long size) { + this.lastContainerReportSize.set(size); + } + + public void setLastContainerReportUsed(long used) { + this.lastContainerReportUsed.set(used); + } + + public void setLastContainerReportKeyCount(long keyCount) { + this.lastContainerReportKeyCount.set(keyCount); + } + + public void setLastContainerReportReadBytes(long readBytes) { + this.lastContainerReportReadBytes.set(readBytes); + } + + public void setLastContainerReportWriteBytes(long writeBytes) { + this.lastContainerReportWriteBytes.set(writeBytes); + } + + public void setLastContainerReportReadCount(long readCount) { + this.lastContainerReportReadCount.set(readCount); + } + + public void setLastContainerReportWriteCount(long writeCount) { + this.lastContainerReportWriteCount.set(writeCount); + } + + public void incrContainerReportSize(long size) { + this.containerReportSize.incr(size); + } + + public void incrContainerReportUsed(long used) { + this.containerReportUsed.incr(used); + } + + public void incrContainerReportKeyCount(long keyCount) { + this.containerReportKeyCount.incr(keyCount); + } + + public void incrContainerReportReadBytes(long readBytes) { + this.containerReportReadBytes.incr(readBytes); + } + + public void incrContainerReportWriteBytes(long writeBytes) { + this.containerReportWriteBytes.incr(writeBytes); + } + + public void incrContainerReportReadCount(long readCount) { + this.containerReportReadCount.incr(readCount); + } + + public void incrContainerReportWriteCount(long writeCount) { + this.containerReportWriteCount.incr(writeCount); + } + + public void setLastContainerStat(ContainerStat newStat) { + this.lastContainerReportSize.set(newStat.getSize().get()); + this.lastContainerReportUsed.set(newStat.getUsed().get()); + this.lastContainerReportKeyCount.set(newStat.getKeyCount().get()); + this.lastContainerReportReadBytes.set(newStat.getReadBytes().get()); + this.lastContainerReportWriteBytes.set(newStat.getWriteBytes().get()); + 
this.lastContainerReportReadCount.set(newStat.getReadCount().get()); + this.lastContainerReportWriteCount.set(newStat.getWriteCount().get()); + } + + public void incrContainerStat(ContainerStat deltaStat) { + this.containerReportSize.incr(deltaStat.getSize().get()); + this.containerReportUsed.incr(deltaStat.getUsed().get()); + this.containerReportKeyCount.incr(deltaStat.getKeyCount().get()); + this.containerReportReadBytes.incr(deltaStat.getReadBytes().get()); + this.containerReportWriteBytes.incr(deltaStat.getWriteBytes().get()); + this.containerReportReadCount.incr(deltaStat.getReadCount().get()); + this.containerReportWriteCount.incr(deltaStat.getWriteCount().get()); + } + + public void decrContainerStat(ContainerStat deltaStat) { + this.containerReportSize.incr(-1 * deltaStat.getSize().get()); + this.containerReportUsed.incr(-1 * deltaStat.getUsed().get()); + this.containerReportKeyCount.incr(-1 * deltaStat.getKeyCount().get()); + this.containerReportReadBytes.incr(-1 * deltaStat.getReadBytes().get()); + this.containerReportWriteBytes.incr(-1 * deltaStat.getWriteBytes().get()); + this.containerReportReadCount.incr(-1 * deltaStat.getReadCount().get()); + this.containerReportWriteCount.incr(-1 * deltaStat.getWriteCount().get()); + } + + public void unRegister() { + MetricsSystem ms = DefaultMetricsSystem.instance(); + ms.unregisterSource(SOURCE_NAME); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeMetric.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeMetric.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeMetric.java new file mode 100644 index 0000000..b50376d --- /dev/null +++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeMetric.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.container.placement.metrics; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +/** + * SCM Node Metric that is used in the placement classes. + */ +public class SCMNodeMetric implements DatanodeMetric<SCMNodeStat, Long> { + private SCMNodeStat stat; + + /** + * Constructs an SCMNode Metric. + * + * @param stat - SCMNodeStat. + */ + public SCMNodeMetric(SCMNodeStat stat) { + this.stat = stat; + } + + /** + * Set the capacity, used and remaining space on a datanode. + * + * @param capacity in bytes + * @param used in bytes + * @param remaining in bytes + */ + @VisibleForTesting + public SCMNodeMetric(long capacity, long used, long remaining) { + this.stat = new SCMNodeStat(); + this.stat.set(capacity, used, remaining); + } + + /** + * + * @param o - Other Object + * @return - True if *this* object is greater than argument. 
+ */ + @Override + public boolean isGreater(SCMNodeStat o) { + Preconditions.checkNotNull(o, "Argument cannot be null"); + + // if zero, replace with 1 for the division to work. + long thisDenominator = (this.stat.getCapacity().get() == 0) + ? 1 : this.stat.getCapacity().get(); + long otherDenominator = (o.getCapacity().get() == 0) + ? 1 : o.getCapacity().get(); + + float thisNodeWeight = + stat.getScmUsed().get() / (float) thisDenominator; + + float oNodeWeight = + o.getScmUsed().get() / (float) otherDenominator; + + if (Math.abs(thisNodeWeight - oNodeWeight) > 0.000001) { + return thisNodeWeight > oNodeWeight; + } + // if these nodes are have similar weight then return the node with more + // free space as the greater node. + return stat.getRemaining().isGreater(o.getRemaining().get()); + } + + /** + * Inverse of isGreater. + * + * @param o - other object. + * @return True if *this* object is Lesser than argument. + */ + @Override + public boolean isLess(SCMNodeStat o) { + Preconditions.checkNotNull(o, "Argument cannot be null"); + + // if zero, replace with 1 for the division to work. + long thisDenominator = (this.stat.getCapacity().get() == 0) + ? 1 : this.stat.getCapacity().get(); + long otherDenominator = (o.getCapacity().get() == 0) + ? 1 : o.getCapacity().get(); + + float thisNodeWeight = + stat.getScmUsed().get() / (float) thisDenominator; + + float oNodeWeight = + o.getScmUsed().get() / (float) otherDenominator; + + if (Math.abs(thisNodeWeight - oNodeWeight) > 0.000001) { + return thisNodeWeight < oNodeWeight; + } + + // if these nodes are have similar weight then return the node with less + // free space as the lesser node. + return stat.getRemaining().isLess(o.getRemaining().get()); + } + + /** + * Returns true if the object has same values. Because of issues with + * equals, and loss of type information this interface supports isEqual. + * + * @param o object to compare. + * @return True, if the values match. 
+ * TODO : Consider if it makes sense to add remaining to this equation. + */ + @Override + public boolean isEqual(SCMNodeStat o) { + float thisNodeWeight = stat.getScmUsed().get() / (float) + stat.getCapacity().get(); + float oNodeWeight = o.getScmUsed().get() / (float) o.getCapacity().get(); + return Math.abs(thisNodeWeight - oNodeWeight) < 0.000001; + } + + /** + * A resourceCheck, defined by resourceNeeded. + * For example, S could be bytes required + * and DatanodeMetric can reply by saying it can be met or not. + * + * @param resourceNeeded - ResourceNeeded in its own metric. + * @return boolean, True if this resource requirement can be met. + */ + @Override + public boolean hasResources(Long resourceNeeded) { + return false; + } + + /** + * Returns the metric. + * + * @return T, the object that represents this metric. + */ + @Override + public SCMNodeStat get() { + return stat; + } + + /** + * Sets the value of this metric. + * + * @param value - value of the metric. + */ + @Override + public void set(SCMNodeStat value) { + stat.set(value.getCapacity().get(), value.getScmUsed().get(), + value.getRemaining().get()); + } + + /** + * Adds a value of to the base. + * + * @param value - value + */ + @Override + public void add(SCMNodeStat value) { + stat.add(value); + } + + /** + * subtract a value. + * + * @param value value + */ + @Override + public void subtract(SCMNodeStat value) { + stat.subtract(value); + } + + /** + * Compares this object with the specified object for order. Returns a + * negative integer, zero, or a positive integer as this object is less + * than, equal to, or greater than the specified object. + * + * @param o the object to be compared. + * @return a negative integer, zero, or a positive integer as this object is + * less than, equal to, or greater than the specified object. 
+ * @throws NullPointerException if the specified object is null + * @throws ClassCastException if the specified object's type prevents it + * from being compared to this object. + */ + @Override + public int compareTo(SCMNodeStat o) { + if (isEqual(o)) { + return 0; + } + if (isGreater(o)) { + return 1; + } else { + return -1; + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + SCMNodeMetric that = (SCMNodeMetric) o; + + return stat != null ? stat.equals(that.stat) : that.stat == null; + } + + @Override + public int hashCode() { + return stat != null ? stat.hashCode() : 0; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeStat.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeStat.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeStat.java new file mode 100644 index 0000000..3c871d3 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeStat.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.container.placement.metrics; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +/** + * This class represents the SCM node stat. + */ +public class SCMNodeStat implements NodeStat { + private LongMetric capacity; + private LongMetric scmUsed; + private LongMetric remaining; + + public SCMNodeStat() { + this(0L, 0L, 0L); + } + + public SCMNodeStat(SCMNodeStat other) { + this(other.capacity.get(), other.scmUsed.get(), other.remaining.get()); + } + + public SCMNodeStat(long capacity, long used, long remaining) { + Preconditions.checkArgument(capacity >= 0, "Capacity cannot be " + + "negative."); + Preconditions.checkArgument(used >= 0, "used space cannot be " + + "negative."); + Preconditions.checkArgument(remaining >= 0, "remaining cannot be " + + "negative"); + this.capacity = new LongMetric(capacity); + this.scmUsed = new LongMetric(used); + this.remaining = new LongMetric(remaining); + } + + /** + * @return the total configured capacity of the node. + */ + public LongMetric getCapacity() { + return capacity; + } + + /** + * @return the total SCM used space on the node. + */ + public LongMetric getScmUsed() { + return scmUsed; + } + + /** + * @return the total remaining space available on the node. + */ + public LongMetric getRemaining() { + return remaining; + } + + /** + * Set the capacity, used and remaining space on a datanode. 
+ * + * @param newCapacity in bytes + * @param newUsed in bytes + * @param newRemaining in bytes + */ + @VisibleForTesting + public void set(long newCapacity, long newUsed, long newRemaining) { + Preconditions.checkNotNull(newCapacity, "Capacity cannot be null"); + Preconditions.checkNotNull(newUsed, "used cannot be null"); + Preconditions.checkNotNull(newRemaining, "remaining cannot be null"); + + Preconditions.checkArgument(newCapacity >= 0, "Capacity cannot be " + + "negative."); + Preconditions.checkArgument(newUsed >= 0, "used space cannot be " + + "negative."); + Preconditions.checkArgument(newRemaining >= 0, "remaining cannot be " + + "negative"); + + this.capacity = new LongMetric(newCapacity); + this.scmUsed = new LongMetric(newUsed); + this.remaining = new LongMetric(newRemaining); + } + + /** + * Adds a new nodestat to existing values of the node. + * + * @param stat Nodestat. + * @return SCMNodeStat + */ + public SCMNodeStat add(NodeStat stat) { + this.capacity.set(this.getCapacity().get() + stat.getCapacity().get()); + this.scmUsed.set(this.getScmUsed().get() + stat.getScmUsed().get()); + this.remaining.set(this.getRemaining().get() + stat.getRemaining().get()); + return this; + } + + /** + * Subtracts the stat values from the existing NodeStat. + * + * @param stat SCMNodeStat. 
+ * @return Modified SCMNodeStat + */ + public SCMNodeStat subtract(NodeStat stat) { + this.capacity.set(this.getCapacity().get() - stat.getCapacity().get()); + this.scmUsed.set(this.getScmUsed().get() - stat.getScmUsed().get()); + this.remaining.set(this.getRemaining().get() - stat.getRemaining().get()); + return this; + } + + @Override + public boolean equals(Object to) { + if (to instanceof SCMNodeStat) { + SCMNodeStat tempStat = (SCMNodeStat) to; + return capacity.isEqual(tempStat.getCapacity().get()) && + scmUsed.isEqual(tempStat.getScmUsed().get()) && + remaining.isEqual(tempStat.getRemaining().get()); + } + return false; + } + + @Override + public int hashCode() { + return Long.hashCode(capacity.get() ^ scmUsed.get() ^ remaining.get()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/package-info.java new file mode 100644 index 0000000..4a81d69 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/package-info.java @@ -0,0 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm.container.placement.metrics; + +// Various metrics supported by Datanode and used by SCM in the placement +// strategy. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/package-info.java new file mode 100644 index 0000000..dc54d9b --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/package-info.java @@ -0,0 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.container.placement; +// Classes related to container placement. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java new file mode 100644 index 0000000..52321ee --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.hadoop.hdds.scm.container.replication; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.NodePoolManager; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.util.concurrent.HadoopExecutors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static com.google.common.util.concurrent.Uninterruptibles + .sleepUninterruptibly; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_DEFAULT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_MAX_CONTAINER_REPORT_THREADS; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + 
.OZONE_SCM_MAX_CONTAINER_REPORT_THREADS_DEFAULT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS_DEFAULT; + +/** + * This class takes a set of container reports that belong to a pool and then + * computes the replication levels for each container. + */ +public class ContainerSupervisor implements Closeable { + public static final Logger LOG = + LoggerFactory.getLogger(ContainerSupervisor.class); + + private final NodePoolManager poolManager; + private final HashSet<String> poolNames; + private final PriorityQueue<PeriodicPool> poolQueue; + private final NodeManager nodeManager; + private final long containerProcessingLag; + private final AtomicBoolean runnable; + private final ExecutorService executorService; + private final long maxPoolWait; + private long poolProcessCount; + private final List<InProgressPool> inProgressPoolList; + private final AtomicInteger threadFaultCount; + private final int inProgressPoolMaxCount; + + private final ReadWriteLock inProgressPoolListLock; + + /** + * Returns the number of times we have processed pools. + * @return long + */ + public long getPoolProcessCount() { + return poolProcessCount; + } + + + /** + * Constructs a class that computes Replication Levels. 
+ * + * @param conf - OzoneConfiguration + * @param nodeManager - Node Manager + * @param poolManager - Pool Manager + */ + public ContainerSupervisor(Configuration conf, NodeManager nodeManager, + NodePoolManager poolManager) { + Preconditions.checkNotNull(poolManager); + Preconditions.checkNotNull(nodeManager); + this.containerProcessingLag = + conf.getTimeDuration(OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL, + OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_DEFAULT, + TimeUnit.SECONDS + ) * 1000; + int maxContainerReportThreads = + conf.getInt(OZONE_SCM_MAX_CONTAINER_REPORT_THREADS, + OZONE_SCM_MAX_CONTAINER_REPORT_THREADS_DEFAULT + ); + this.maxPoolWait = + conf.getTimeDuration(OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT, + OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT, + TimeUnit.MILLISECONDS); + this.inProgressPoolMaxCount = conf.getInt( + OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS, + OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS_DEFAULT); + this.poolManager = poolManager; + this.nodeManager = nodeManager; + this.poolNames = new HashSet<>(); + this.poolQueue = new PriorityQueue<>(); + this.runnable = new AtomicBoolean(true); + this.threadFaultCount = new AtomicInteger(0); + this.executorService = HadoopExecutors.newCachedThreadPool( + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("Container Reports Processing Thread - %d") + .build(), maxContainerReportThreads); + this.inProgressPoolList = new LinkedList<>(); + this.inProgressPoolListLock = new ReentrantReadWriteLock(); + + initPoolProcessThread(); + } + + /** + * Returns the number of pools that are under process right now. + * @return int - Number of pools that are in process. + */ + public int getInProgressPoolCount() { + return inProgressPoolList.size(); + } + + /** + * Exits the background thread. + */ + public void setExit() { + this.runnable.set(false); + } + + /** + * Adds or removes pools from names that we need to process. + * + * There are two different cases that we need to process. 
+ * The case where some pools are being added and some times we have to + * handle cases where pools are removed. + */ + private void refreshPools() { + List<String> pools = this.poolManager.getNodePools(); + if (pools != null) { + + HashSet<String> removedPools = + computePoolDifference(this.poolNames, new HashSet<>(pools)); + + HashSet<String> addedPools = + computePoolDifference(new HashSet<>(pools), this.poolNames); + // TODO: Support remove pool API in pool manager so that this code + // path can be tested. This never happens in the current code base. + for (String poolName : removedPools) { + for (PeriodicPool periodicPool : poolQueue) { + if (periodicPool.getPoolName().compareTo(poolName) == 0) { + poolQueue.remove(periodicPool); + } + } + } + // Remove the pool names that we have in the list. + this.poolNames.removeAll(removedPools); + + for (String poolName : addedPools) { + poolQueue.add(new PeriodicPool(poolName)); + } + + // Add to the pool names we are tracking. + poolNames.addAll(addedPools); + } + + } + + /** + * Handle the case where pools are added. + * + * @param newPools - New Pools list + * @param oldPool - oldPool List. + */ + private HashSet<String> computePoolDifference(HashSet<String> newPools, + Set<String> oldPool) { + Preconditions.checkNotNull(newPools); + Preconditions.checkNotNull(oldPool); + HashSet<String> newSet = new HashSet<>(newPools); + newSet.removeAll(oldPool); + return newSet; + } + + private void initPoolProcessThread() { + + /* + * Task that runs to check if we need to start a pool processing job. + * if so we create a pool reconciliation job and find out of all the + * expected containers are on the nodes. + */ + Runnable processPools = () -> { + while (runnable.get()) { + // Make sure that we don't have any new pools. 
+ refreshPools(); + while (inProgressPoolList.size() < inProgressPoolMaxCount) { + PeriodicPool pool = poolQueue.poll(); + if (pool != null) { + if (pool.getLastProcessedTime() + this.containerProcessingLag > + Time.monotonicNow()) { + LOG.debug("Not within the time window for processing: {}", + pool.getPoolName()); + // we might over sleep here, not a big deal. + sleepUninterruptibly(this.containerProcessingLag, + TimeUnit.MILLISECONDS); + } + LOG.debug("Adding pool {} to container processing queue", + pool.getPoolName()); + InProgressPool inProgressPool = new InProgressPool(maxPoolWait, + pool, this.nodeManager, this.poolManager, this.executorService); + inProgressPool.startReconciliation(); + inProgressPoolListLock.writeLock().lock(); + try { + inProgressPoolList.add(inProgressPool); + } finally { + inProgressPoolListLock.writeLock().unlock(); + } + poolProcessCount++; + } else { + break; + } + } + sleepUninterruptibly(this.maxPoolWait, TimeUnit.MILLISECONDS); + inProgressPoolListLock.readLock().lock(); + try { + for (InProgressPool inProgressPool : inProgressPoolList) { + inProgressPool.finalizeReconciliation(); + poolQueue.add(inProgressPool.getPool()); + } + } finally { + inProgressPoolListLock.readLock().unlock(); + } + inProgressPoolListLock.writeLock().lock(); + try { + inProgressPoolList.clear(); + } finally { + inProgressPoolListLock.writeLock().unlock(); + } + } + }; + + // We will have only one thread for pool processing. + Thread poolProcessThread = new Thread(processPools); + poolProcessThread.setDaemon(true); + poolProcessThread.setName("Pool replica thread"); + poolProcessThread.setUncaughtExceptionHandler((Thread t, Throwable e) -> { + // Let us just restart this thread after logging a critical error. + // if this thread is not running we cannot handle commands from SCM. + LOG.error("Critical Error : Pool replica thread encountered an " + + "error. 
Thread: {} Error Count : {}", t.toString(), e, + threadFaultCount.incrementAndGet()); + poolProcessThread.start(); + // TODO : Add a config to restrict how many times we will restart this + // thread in a single session. + }); + poolProcessThread.start(); + } + + /** + * Adds a container report to appropriate inProgress Pool. + * @param containerReport -- Container report for a specific container from + * a datanode. + */ + public void handleContainerReport( + ContainerReportsRequestProto containerReport) { + DatanodeDetails datanodeDetails = DatanodeDetails.getFromProtoBuf( + containerReport.getDatanodeDetails()); + inProgressPoolListLock.readLock().lock(); + try { + String poolName = poolManager.getNodePool(datanodeDetails); + for (InProgressPool ppool : inProgressPoolList) { + if (ppool.getPoolName().equalsIgnoreCase(poolName)) { + ppool.handleContainerReport(containerReport); + return; + } + } + // TODO: Decide if we can do anything else with this report. + LOG.debug("Discarding the container report for pool {}. " + + "That pool is not currently in the pool reconciliation process." + + " Container Name: {}", poolName, + containerReport.getDatanodeDetails()); + } catch (SCMException e) { + LOG.warn("Skipping processing container report from datanode {}, " + + "cause: failed to get the corresponding node pool", + datanodeDetails.toString(), e); + } finally { + inProgressPoolListLock.readLock().unlock(); + } + } + + /** + * Get in process pool list, used for testing. + * @return List of InProgressPool + */ + @VisibleForTesting + public List<InProgressPool> getInProcessPoolList() { + return inProgressPoolList; + } + + /** + * Shutdown the Container Replication Manager. 
+ * @throws IOException if an I/O error occurs + */ + @Override + public void close() throws IOException { + setExit(); + HadoopExecutors.shutdown(executorService, LOG, 5, TimeUnit.SECONDS); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java new file mode 100644 index 0000000..ddbd213 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.hadoop.hdds.scm.container.replication; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.NodePoolManager; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerInfo; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; +import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import static com.google.common.util.concurrent.Uninterruptibles + .sleepUninterruptibly; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState + .HEALTHY; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState + .INVALID; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE; + +/** + * These are pools that are actively checking for replication status of the + * containers. 
+ */ +public final class InProgressPool { + public static final Logger LOG = + LoggerFactory.getLogger(InProgressPool.class); + + private final PeriodicPool pool; + private final NodeManager nodeManager; + private final NodePoolManager poolManager; + private final ExecutorService executorService; + private final Map<String, Integer> containerCountMap; + private final Map<UUID, Boolean> processedNodeSet; + private final long startTime; + private ProgressStatus status; + private AtomicInteger nodeCount; + private AtomicInteger nodeProcessed; + private AtomicInteger containerProcessedCount; + private long maxWaitTime; + /** + * Constructs an pool that is being processed. + * @param maxWaitTime - Maximum wait time in milliseconds. + * @param pool - Pool that we are working against + * @param nodeManager - Nodemanager + * @param poolManager - pool manager + * @param executorService - Shared Executor service. + */ + InProgressPool(long maxWaitTime, PeriodicPool pool, + NodeManager nodeManager, NodePoolManager poolManager, + ExecutorService executorService) { + Preconditions.checkNotNull(pool); + Preconditions.checkNotNull(nodeManager); + Preconditions.checkNotNull(poolManager); + Preconditions.checkNotNull(executorService); + Preconditions.checkArgument(maxWaitTime > 0); + this.pool = pool; + this.nodeManager = nodeManager; + this.poolManager = poolManager; + this.executorService = executorService; + this.containerCountMap = new ConcurrentHashMap<>(); + this.processedNodeSet = new ConcurrentHashMap<>(); + this.maxWaitTime = maxWaitTime; + startTime = Time.monotonicNow(); + } + + /** + * Returns periodic pool. + * + * @return PeriodicPool + */ + public PeriodicPool getPool() { + return pool; + } + + /** + * We are done if we have got reports from all nodes or we have + * done waiting for the specified time. + * + * @return true if we are done, false otherwise. 
+ */ + public boolean isDone() { + return (nodeCount.get() == nodeProcessed.get()) || + (this.startTime + this.maxWaitTime) > Time.monotonicNow(); + } + + /** + * Gets the number of containers processed. + * + * @return int + */ + public int getContainerProcessedCount() { + return containerProcessedCount.get(); + } + + /** + * Returns the start time in milliseconds. + * + * @return - Start Time. + */ + public long getStartTime() { + return startTime; + } + + /** + * Get the number of nodes in this pool. + * + * @return - node count + */ + public int getNodeCount() { + return nodeCount.get(); + } + + /** + * Get the number of nodes that we have already processed container reports + * from. + * + * @return - Processed count. + */ + public int getNodeProcessed() { + return nodeProcessed.get(); + } + + /** + * Returns the current status. + * + * @return Status + */ + public ProgressStatus getStatus() { + return status; + } + + /** + * Starts the reconciliation process for all the nodes in the pool. + */ + public void startReconciliation() { + List<DatanodeDetails> datanodeDetailsList = + this.poolManager.getNodes(pool.getPoolName()); + if (datanodeDetailsList.size() == 0) { + LOG.error("Datanode list for {} is Empty. Pool with no nodes ? ", + pool.getPoolName()); + this.status = ProgressStatus.Error; + return; + } + + nodeProcessed = new AtomicInteger(0); + containerProcessedCount = new AtomicInteger(0); + nodeCount = new AtomicInteger(0); + /* + Ask each datanode to send us commands. + */ + SendContainerCommand cmd = SendContainerCommand.newBuilder().build(); + for (DatanodeDetails dd : datanodeDetailsList) { + NodeState currentState = getNodestate(dd); + if (currentState == HEALTHY || currentState == STALE) { + nodeCount.incrementAndGet(); + // Queue commands to all datanodes in this pool to send us container + // report. Since we ignore dead nodes, it is possible that we would have + // over replicated the container if the node comes back. 
+ nodeManager.addDatanodeCommand(dd.getUuid(), cmd); + } + } + this.status = ProgressStatus.InProgress; + this.getPool().setLastProcessedTime(Time.monotonicNow()); + } + + /** + * Gets the node state. + * + * @param datanode - datanode information. + * @return NodeState. + */ + private NodeState getNodestate(DatanodeDetails datanode) { + NodeState currentState = INVALID; + int maxTry = 100; + // We need to loop to make sure that we will retry if we get + // node state unknown. This can lead to infinite loop if we send + // in unknown node ID. So max try count is used to prevent it. + + int currentTry = 0; + while (currentState == INVALID && currentTry < maxTry) { + // Retry to make sure that we deal with the case of node state not + // known. + currentState = nodeManager.getNodeState(datanode); + currentTry++; + if (currentState == INVALID) { + // Sleep to make sure that this is not a tight loop. + sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + } + } + if (currentState == INVALID) { + LOG.error("Not able to determine the state of Node: {}, Exceeded max " + + "try and node manager returns INVALID state. This indicates we " + + "are dealing with a node that we don't know about.", datanode); + } + return currentState; + } + + /** + * Queues a container Report for handling. This is done in a worker thread + * since decoding a container report might be compute intensive . We don't + * want to block since we have asked for bunch of container reports + * from a set of datanodes. 
+ * + * @param containerReport - ContainerReport + */ + public void handleContainerReport( + ContainerReportsRequestProto containerReport) { + if (status == ProgressStatus.InProgress) { + executorService.submit(processContainerReport(containerReport)); + } else { + LOG.debug("Cannot handle container report when the pool is in {} status.", + status); + } + } + + private Runnable processContainerReport( + ContainerReportsRequestProto reports) { + return () -> { + DatanodeDetails datanodeDetails = + DatanodeDetails.getFromProtoBuf(reports.getDatanodeDetails()); + if (processedNodeSet.computeIfAbsent(datanodeDetails.getUuid(), + (k) -> true)) { + nodeProcessed.incrementAndGet(); + LOG.debug("Total Nodes processed : {} Node Name: {} ", nodeProcessed, + datanodeDetails.getUuid()); + for (ContainerInfo info : reports.getReportsList()) { + containerProcessedCount.incrementAndGet(); + LOG.debug("Total Containers processed: {} Container Name: {}", + containerProcessedCount.get(), info.getContainerName()); + + // Update the container map with count + 1 if the key exists or + // update the map with 1. Since this is a concurrentMap the + // computation and update is atomic. + containerCountMap.merge(info.getContainerName(), 1, Integer::sum); + } + } + }; + } + + /** + * Filter the containers based on specific rules. + * + * @param predicate -- Predicate to filter by + * @return A list of map entries. + */ + public List<Map.Entry<String, Integer>> filterContainer( + Predicate<Map.Entry<String, Integer>> predicate) { + return containerCountMap.entrySet().stream() + .filter(predicate).collect(Collectors.toList()); + } + + /** + * Used only for testing, calling this will abort container report + * processing. This is very dangerous call and should not be made by any users + */ + @VisibleForTesting + public void setDoneProcessing() { + nodeProcessed.set(nodeCount.get()); + } + + /** + * Returns the pool name. + * + * @return Name of the pool. 
+ */ + String getPoolName() { + return pool.getPoolName(); + } + + public void finalizeReconciliation() { + status = ProgressStatus.Done; + //TODO: Add finalizing logic. This is where actual reconciliation happens. + } + + /** + * Current status of the computing replication status. + */ + public enum ProgressStatus { + InProgress, Done, Error + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/PeriodicPool.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/PeriodicPool.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/PeriodicPool.java new file mode 100644 index 0000000..ef28aa7 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/PeriodicPool.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

import java.util.concurrent.atomic.AtomicLong;

/**
 * Periodic pool is a pool with a time stamp, this allows us to process pools
 * based on a cyclic clock.
 *
 * Note: ordering ({@link #compareTo}) is by last processed time while
 * equality ({@link #equals}) is by pool name, so the natural ordering is not
 * consistent with equals.
 */
public class PeriodicPool implements Comparable<PeriodicPool> {
  private final String poolName;
  private long lastProcessedTime;
  private final AtomicLong totalProcessedCount;

  /**
   * Constructs a periodic pool that has never been processed.
   *
   * @param poolName - Name of the pool
   * @throws NullPointerException if poolName is null
   */
  public PeriodicPool(String poolName) {
    // Fail here rather than later inside equals() or hashCode().
    if (poolName == null) {
      throw new NullPointerException("poolName");
    }
    this.poolName = poolName;
    lastProcessedTime = 0;
    totalProcessedCount = new AtomicLong(0);
  }

  /**
   * Get pool Name.
   * @return PoolName
   */
  public String getPoolName() {
    return poolName;
  }

  /**
   * Compares this object with the specified object for order, using the
   * last processed time only. Returns a negative integer, zero, or a
   * positive integer as this pool was processed earlier than, at the same
   * time as, or later than the specified pool.
   *
   * @param o the object to be compared.
   * @return a negative integer, zero, or a positive integer as this object is
   * less than, equal to, or greater than the specified object.
   * @throws NullPointerException if the specified object is null
   */
  @Override
  public int compareTo(PeriodicPool o) {
    return Long.compare(this.lastProcessedTime, o.lastProcessedTime);
  }

  /**
   * Two periodic pools are equal when they have the same pool name.
   */
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }

    PeriodicPool that = (PeriodicPool) o;

    return poolName.equals(that.poolName);
  }

  @Override
  public int hashCode() {
    return poolName.hashCode();
  }

  /**
   * Returns the Total Times we have processed this pool.
   *
   * @return processed count.
   */
  public long getTotalProcessedCount() {
    return totalProcessedCount.get();
  }

  /**
   * Gets the last time we processed this pool.
   * @return time in milliseconds
   */
  public long getLastProcessedTime() {
    return this.lastProcessedTime;
  }

  /**
   * Sets the last processed time.
   *
   * @param lastProcessedTime - Long in milliseconds.
   */
  public void setLastProcessedTime(long lastProcessedTime) {
    this.lastProcessedTime = lastProcessedTime;
  }

  /**
   * Increments the total processed count.
   */
  public void incTotalProcessedCount() {
    this.totalProcessedCount.incrementAndGet();
  }
}
