Repository: hadoop Updated Branches: refs/heads/HDFS-7240 5dc4dfabb -> 6dca9fcb0
HDFS-12787. Ozone: SCM: Aggregate the metrics from all the container reports. Contributed by Yiqun Lin. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6dca9fcb Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6dca9fcb Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6dca9fcb Branch: refs/heads/HDFS-7240 Commit: 6dca9fcb0380da7de7587b368af557a1c3311120 Parents: 5dc4dfa Author: Xiaoyu Yao <x...@apache.org> Authored: Mon Nov 20 11:03:17 2017 -0800 Committer: Xiaoyu Yao <x...@apache.org> Committed: Mon Nov 20 11:03:17 2017 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/ozone/scm/SCMMXBean.java | 8 + .../ozone/scm/StorageContainerManager.java | 140 +++++++++++++++--- .../placement/metrics/ContainerStat.java | 42 +++++- .../container/placement/metrics/LongMetric.java | 4 + .../container/placement/metrics/SCMMetrics.java | 67 +++++++++ .../hadoop/ozone/scm/node/SCMNodeManager.java | 20 ++- .../src/site/markdown/OzoneMetrics.md | 21 ++- .../apache/hadoop/ozone/scm/TestSCMMXBean.java | 20 +++ .../apache/hadoop/ozone/scm/TestSCMMetrics.java | 146 ++++++++++++++++++- .../ozone/scm/node/TestContainerPlacement.java | 2 +- .../hadoop/ozone/scm/node/TestNodeManager.java | 2 +- 11 files changed, 433 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/6dca9fcb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/SCMMXBean.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/SCMMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/SCMMXBean.java index 9f50f77..336ed44 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/SCMMXBean.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/SCMMXBean.java @@ -18,6 +18,8 @@ package org.apache.hadoop.ozone.scm; +import java.util.Map; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.jmx.ServiceRuntimeInfo; @@ -39,4 +41,10 @@ public interface SCMMXBean extends ServiceRuntimeInfo { * @return SCM client RPC server port */ String getClientRpcPort(); + + /** + * Get container report info that includes container IO stats of nodes. + * @return The datanodeUUid to report json string mapping + */ + Map<String, String> getContainerReport(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6dca9fcb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java index a9f9360..0f1aae8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java @@ -19,6 +19,10 @@ package org.apache.hadoop.ozone.scm; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; import com.google.protobuf.BlockingService; import com.google.protobuf.InvalidProtocolBufferException; import org.apache.hadoop.classification.InterfaceAudience; @@ -104,6 +108,8 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.ConcurrentMap; +import 
java.util.concurrent.TimeUnit; import java.util.Collections; import java.util.stream.Collectors; @@ -204,6 +210,9 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl /** SCM metrics. */ private static SCMMetrics metrics; + /** Key = DatanodeUuid, value = ContainerStat. */ + private Cache<String, ContainerStat> containerReportCache; + private static final String USAGE = "Usage: \n hdfs scm [ " + StartupOption.INIT.getName() + " [ " @@ -225,13 +234,15 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl OZONE_SCM_DB_CACHE_SIZE_DEFAULT); StorageContainerManager.initMetrics(); + initContainerReportCache(conf); + scmStorage = new SCMStorage(conf); String clusterId = scmStorage.getClusterID(); if (clusterId == null) { throw new SCMException("clusterId not found", ResultCodes.SCM_NOT_INITIALIZED); } - scmNodeManager = new SCMNodeManager(conf, scmStorage.getClusterID()); + scmNodeManager = new SCMNodeManager(conf, scmStorage.getClusterID(), this); scmContainerManager = new ContainerMapping(conf, scmNodeManager, cacheSize); scmBlockManager = new BlockManagerImpl(conf, scmNodeManager, scmContainerManager, cacheSize); @@ -298,6 +309,31 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl } /** + * Initialize container reports cache that sent from datanodes. 
+ * + * @param conf + */ + private void initContainerReportCache(OzoneConfiguration conf) { + containerReportCache = CacheBuilder.newBuilder() + .expireAfterAccess(Long.MAX_VALUE, TimeUnit.MILLISECONDS) + .maximumSize(Integer.MAX_VALUE) + .removalListener(new RemovalListener<String, ContainerStat>() { + @Override + public void onRemoval( + RemovalNotification<String, ContainerStat> removalNotification) { + synchronized (containerReportCache) { + ContainerStat stat = removalNotification.getValue(); + // remove invalid container report + metrics.decrContainerStat(stat); + LOG.debug( + "Remove expired container stat entry for datanode: {}.", + removalNotification.getKey()); + } + } + }).build(); + } + + /** * Builds a message for logging startup information about an RPC server. * * @param description RPC server description @@ -836,7 +872,15 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl LOG.error("SCM block manager service stop failed.", ex); } - metrics.unRegister(); + if (containerReportCache != null) { + containerReportCache.invalidateAll(); + containerReportCache.cleanUp(); + } + + if (metrics != null) { + metrics.unRegister(); + } + unregisterMXBean(); IOUtils.cleanupWithLogger(LOG, scmContainerManager); IOUtils.cleanupWithLogger(LOG, scmBlockManager); @@ -917,33 +961,44 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl @Override public ContainerReportsResponseProto sendContainerReport( ContainerReportsRequestProto reports) throws IOException { + updateContainerReportMetrics(reports); + + // should we process container reports async? 
+ scmContainerManager.processContainerReports( + DatanodeID.getFromProtoBuf(reports.getDatanodeID()), + reports.getType(), reports.getReportsList()); + return ContainerReportsResponseProto.newBuilder().build(); + } + + private void updateContainerReportMetrics( + ContainerReportsRequestProto reports) { + ContainerStat newStat = null; // TODO: We should update the logic once incremental container report // type is supported. - if (reports.getType() == - ContainerReportsRequestProto.reportType.fullReport) { - ContainerStat stat = new ContainerStat(); + if (reports + .getType() == ContainerReportsRequestProto.reportType.fullReport) { + newStat = new ContainerStat(); for (StorageContainerDatanodeProtocolProtos.ContainerInfo info : reports .getReportsList()) { - stat.add(new ContainerStat(info.getSize(), info.getUsed(), + newStat.add(new ContainerStat(info.getSize(), info.getUsed(), info.getKeyCount(), info.getReadBytes(), info.getWriteBytes(), info.getReadCount(), info.getWriteCount())); } // update container metrics - metrics.setLastContainerReportSize(stat.getSize().get()); - metrics.setLastContainerReportUsed(stat.getUsed().get()); - metrics.setLastContainerReportKeyCount(stat.getKeyCount().get()); - metrics.setLastContainerReportReadBytes(stat.getReadBytes().get()); - metrics.setLastContainerReportWriteBytes(stat.getWriteBytes().get()); - metrics.setLastContainerReportReadCount(stat.getReadCount().get()); - metrics.setLastContainerReportWriteCount(stat.getWriteCount().get()); + metrics.setLastContainerStat(newStat); } - // should we process container reports async? - scmContainerManager.processContainerReports( - DatanodeID.getFromProtoBuf(reports.getDatanodeID()), - reports.getType(), reports.getReportsList()); - return ContainerReportsResponseProto.newBuilder().build(); + // Update container stat entry, this will trigger a removal operation if it + // exists in cache. 
+ synchronized (containerReportCache) { + String datanodeUuid = reports.getDatanodeID().getDatanodeUuid(); + if (datanodeUuid != null && newStat != null) { + containerReportCache.put(datanodeUuid, newStat); + // update global view container metrics + metrics.incrContainerStat(newStat); + } + } } /** @@ -1124,4 +1179,53 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl public static SCMMetrics getMetrics() { return metrics == null ? SCMMetrics.create() : metrics; } + + /** + * Invalidate container stat entry for given datanode. + * + * @param datanodeUuid + */ + public void removeContainerReport(String datanodeUuid) { + synchronized (containerReportCache) { + containerReportCache.invalidate(datanodeUuid); + } + } + + /** + * Get container stat of specified datanode. + * + * @param datanodeUuid + * @return + */ + public ContainerStat getContainerReport(String datanodeUuid) { + ContainerStat stat = null; + synchronized (containerReportCache) { + stat = containerReportCache.getIfPresent(datanodeUuid); + } + + return stat; + } + + /** + * Returns a view of the container stat entries. Modifications made to the + * map will directly affect the cache. 
+ * + * @return + */ + public ConcurrentMap<String, ContainerStat> getContainerReportCache() { + return containerReportCache.asMap(); + } + + @Override + public Map<String, String> getContainerReport() { + Map<String, String> id2StatMap = new HashMap<>(); + synchronized (containerReportCache) { + ConcurrentMap<String, ContainerStat> map = containerReportCache.asMap(); + for (Map.Entry<String, ContainerStat> entry : map.entrySet()) { + id2StatMap.put(entry.getKey(), entry.getValue().toJsonString()); + } + } + + return id2StatMap; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6dca9fcb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/ContainerStat.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/ContainerStat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/ContainerStat.java index 65e96c3..810b8fd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/ContainerStat.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/ContainerStat.java @@ -17,6 +17,11 @@ */ package org.apache.hadoop.ozone.scm.container.placement.metrics; +import java.io.IOException; + +import org.apache.hadoop.ozone.web.utils.JsonUtils; + +import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; /** @@ -26,36 +31,43 @@ public class ContainerStat { /** * The maximum container size. */ + @JsonProperty("Size") private LongMetric size; /** * The number of bytes used by the container. */ + @JsonProperty("Used") private LongMetric used; /** * The number of keys in the container. 
*/ + @JsonProperty("KeyCount") private LongMetric keyCount; /** * The number of bytes read from the container. */ + @JsonProperty("ReadBytes") private LongMetric readBytes; /** * The number of bytes write into the container. */ + @JsonProperty("WriteBytes") private LongMetric writeBytes; /** * The number of times the container is read. */ + @JsonProperty("ReadCount") private LongMetric readCount; /** - * The number of times the container is written into . + * The number of times the container is written into. */ + @JsonProperty("WriteCount") private LongMetric writeCount; public ContainerStat() { @@ -117,6 +129,10 @@ public class ContainerStat { } public void add(ContainerStat stat) { + if (stat == null) { + return; + } + this.size.add(stat.getSize().get()); this.used.add(stat.getUsed().get()); this.keyCount.add(stat.getKeyCount().get()); @@ -125,4 +141,26 @@ public class ContainerStat { this.readCount.add(stat.getReadCount().get()); this.writeCount.add(stat.getWriteCount().get()); } -} + + public void subtract(ContainerStat stat) { + if (stat == null) { + return; + } + + this.size.subtract(stat.getSize().get()); + this.used.subtract(stat.getUsed().get()); + this.keyCount.subtract(stat.getKeyCount().get()); + this.readBytes.subtract(stat.getReadBytes().get()); + this.writeBytes.subtract(stat.getWriteBytes().get()); + this.readCount.subtract(stat.getReadCount().get()); + this.writeCount.subtract(stat.getWriteCount().get()); + } + + public String toJsonString() { + try { + return JsonUtils.toJsonString(this); + } catch (IOException ignored) { + return null; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/6dca9fcb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/LongMetric.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/LongMetric.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/LongMetric.java index df3f462..dbcd9f4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/LongMetric.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/LongMetric.java @@ -16,9 +16,13 @@ */ package org.apache.hadoop.ozone.scm.container.placement.metrics; +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; + /** * An helper class for all metrics based on Longs. */ +@JsonAutoDetect(fieldVisibility = Visibility.ANY) public class LongMetric implements DatanodeMetric<Long, Long> { private Long value; http://git-wip-us.apache.org/repos/asf/hadoop/blob/6dca9fcb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/SCMMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/SCMMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/SCMMetrics.java index 17892dd..bdfec2d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/SCMMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/SCMMetrics.java @@ -21,6 +21,7 @@ import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.metrics2.annotation.Metric; import org.apache.hadoop.metrics2.annotation.Metrics; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; 
import org.apache.hadoop.metrics2.lib.MutableGaugeLong; /** @@ -43,6 +44,14 @@ public class SCMMetrics { @Metric private MutableGaugeLong lastContainerReportReadCount; @Metric private MutableGaugeLong lastContainerReportWriteCount; + @Metric private MutableCounterLong containerReportSize; + @Metric private MutableCounterLong containerReportUsed; + @Metric private MutableCounterLong containerReportKeyCount; + @Metric private MutableCounterLong containerReportReadBytes; + @Metric private MutableCounterLong containerReportWriteBytes; + @Metric private MutableCounterLong containerReportReadCount; + @Metric private MutableCounterLong containerReportWriteCount; + public SCMMetrics() { } @@ -80,6 +89,64 @@ public class SCMMetrics { this.lastContainerReportWriteCount.set(writeCount); } + public void incrContainerReportSize(long size) { + this.containerReportSize.incr(size); + } + + public void incrContainerReportUsed(long used) { + this.containerReportUsed.incr(used); + } + + public void incrContainerReportKeyCount(long keyCount) { + this.containerReportKeyCount.incr(keyCount); + } + + public void incrContainerReportReadBytes(long readBytes) { + this.containerReportReadBytes.incr(readBytes); + } + + public void incrContainerReportWriteBytes(long writeBytes) { + this.containerReportWriteBytes.incr(writeBytes); + } + + public void incrContainerReportReadCount(long readCount) { + this.containerReportReadCount.incr(readCount); + } + + public void incrContainerReportWriteCount(long writeCount) { + this.containerReportWriteCount.incr(writeCount); + } + + public void setLastContainerStat(ContainerStat newStat) { + this.lastContainerReportSize.set(newStat.getSize().get()); + this.lastContainerReportUsed.set(newStat.getUsed().get()); + this.lastContainerReportKeyCount.set(newStat.getKeyCount().get()); + this.lastContainerReportReadBytes.set(newStat.getReadBytes().get()); + this.lastContainerReportWriteBytes.set(newStat.getWriteBytes().get()); + 
this.lastContainerReportReadCount.set(newStat.getReadCount().get()); + this.lastContainerReportWriteCount.set(newStat.getWriteCount().get()); + } + + public void incrContainerStat(ContainerStat deltaStat) { + this.containerReportSize.incr(deltaStat.getSize().get()); + this.containerReportUsed.incr(deltaStat.getUsed().get()); + this.containerReportKeyCount.incr(deltaStat.getKeyCount().get()); + this.containerReportReadBytes.incr(deltaStat.getReadBytes().get()); + this.containerReportWriteBytes.incr(deltaStat.getWriteBytes().get()); + this.containerReportReadCount.incr(deltaStat.getReadCount().get()); + this.containerReportWriteCount.incr(deltaStat.getWriteCount().get()); + } + + public void decrContainerStat(ContainerStat deltaStat) { + this.containerReportSize.incr(-1 * deltaStat.getSize().get()); + this.containerReportUsed.incr(-1 * deltaStat.getUsed().get()); + this.containerReportKeyCount.incr(-1 * deltaStat.getKeyCount().get()); + this.containerReportReadBytes.incr(-1 * deltaStat.getReadBytes().get()); + this.containerReportWriteBytes.incr(-1 * deltaStat.getWriteBytes().get()); + this.containerReportReadCount.incr(-1 * deltaStat.getReadCount().get()); + this.containerReportWriteCount.incr(-1 * deltaStat.getWriteCount().get()); + } + public void unRegister() { MetricsSystem ms = DefaultMetricsSystem.instance(); ms.unregisterSource(SOURCE_NAME); http://git-wip-us.apache.org/repos/asf/hadoop/blob/6dca9fcb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java index 60148ba..ed894cb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java @@ -43,7 +43,7 @@ import org.apache.hadoop.ozone.protocol .proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport; import org.apache.hadoop.ozone.protocol .proto.StorageContainerDatanodeProtocolProtos.SCMStorageReport; - +import org.apache.hadoop.ozone.scm.StorageContainerManager; import org.apache.hadoop.ozone.scm.VersionInfo; import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeStat; @@ -121,7 +121,7 @@ public class SCMNodeManager private final AtomicInteger staleNodeCount; private final AtomicInteger deadNodeCount; private final AtomicInteger totalNodes; - private final long staleNodeIntervalMs; + private long staleNodeIntervalMs; private final long deadNodeIntervalMs; private final long heartbeatCheckerIntervalMs; private final long datanodeHBIntervalSeconds; @@ -150,12 +150,13 @@ public class SCMNodeManager // Node pool manager. private final SCMNodePoolManager nodePoolManager; + private final StorageContainerManager scmManager; /** * Constructs SCM machine Manager. 
*/ - public SCMNodeManager(OzoneConfiguration conf, String clusterID) - throws IOException { + public SCMNodeManager(OzoneConfiguration conf, String clusterID, + StorageContainerManager scmManager) throws IOException { heartbeatQueue = new ConcurrentLinkedQueue<>(); healthyNodes = new ConcurrentHashMap<>(); deadNodes = new ConcurrentHashMap<>(); @@ -197,6 +198,7 @@ public class SCMNodeManager registerMXBean(); this.nodePoolManager = new SCMNodePoolManager(conf); + this.scmManager = scmManager; } private void registerMXBean() { @@ -551,6 +553,11 @@ public class SCMNodeManager healthyNodeCount.decrementAndGet(); staleNodes.put(entry.getKey(), entry.getValue()); staleNodeCount.incrementAndGet(); + + if (scmManager != null) { + // remove stale node's container report + scmManager.removeContainerReport(entry.getKey()); + } } /** @@ -863,4 +870,9 @@ public class SCMNodeManager public void addDatanodeCommand(DatanodeID id, SCMCommand command) { this.commandQueue.addCommand(id, command); } + + @VisibleForTesting + public void setStaleNodeIntervalMs(long interval) { + this.staleNodeIntervalMs = interval; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6dca9fcb/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/OzoneMetrics.md ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/OzoneMetrics.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/OzoneMetrics.md index cd153ee..f5eccf6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/OzoneMetrics.md +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/OzoneMetrics.md @@ -110,13 +110,20 @@ Following are the counters for containers: | Name | Description | |:---- |:---- | -| `LastContainerReportSize` | Total size in bytes of all containers | -| `LastContainerReportUsed` | Total number of bytes used by all containers | -| `LastContainerReportKeyCount` | Total number of keys in all containers | -| `LastContainerReportReadBytes` 
| Total number of bytes have been read from all containers | -| `LastContainerReportWriteBytes` | Total number of bytes have been written into all containers | -| `LastContainerReportReadCount` | Total number of times containers have been read from | -| `LastContainerReportWriteCount` | Total number of times containers have been written to | +| `LastContainerReportSize` | Total size in bytes of all containers in the latest container report that SCM received from a datanode | +| `LastContainerReportUsed` | Total number of bytes used by all containers in the latest container report that SCM received from a datanode | +| `LastContainerReportKeyCount` | Total number of keys in all containers in the latest container report that SCM received from a datanode | +| `LastContainerReportReadBytes` | Total number of bytes that have been read from all containers in the latest container report that SCM received from a datanode | +| `LastContainerReportWriteBytes` | Total number of bytes that have been written into all containers in the latest container report that SCM received from a datanode | +| `LastContainerReportReadCount` | Total number of times the containers in the latest container report that SCM received from a datanode have been read from | +| `LastContainerReportWriteCount` | Total number of times the containers in the latest container report that SCM received from a datanode have been written to | +| `ContainerReportSize` | Total size in bytes of all containers over the whole cluster | +| `ContainerReportUsed` | Total number of bytes used by all containers over the whole cluster | +| `ContainerReportKeyCount` | Total number of keys in all containers over the whole cluster | +| `ContainerReportReadBytes` | Total number of bytes that have been read from all containers over the whole cluster | +| `ContainerReportWriteBytes` | Total number of bytes that have been written into all containers over the whole cluster | +| `ContainerReportReadCount` | Total number of times containers have been read from over the whole cluster | +| `ContainerReportWriteCount` | Total
number of times containers have been written to over whole cluster | ### Key Space Metrics http://git-wip-us.apache.org/repos/asf/hadoop/blob/6dca9fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMXBean.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMXBean.java index 3808e60..2ce751b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMXBean.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMXBean.java @@ -25,6 +25,7 @@ import org.apache.hadoop.ozone.MiniOzoneClassicCluster; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.conf.OzoneConfiguration; import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.scm.container.placement.metrics.ContainerStat; import org.apache.hadoop.ozone.scm.node.NodeManager; import org.junit.BeforeClass; import org.junit.AfterClass; @@ -40,6 +41,7 @@ import java.io.IOException; import java.lang.management.ManagementFactory; import java.util.Map; import java.util.Iterator; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeoutException; import javax.management.openmbean.CompositeData; @@ -91,6 +93,24 @@ public class TestSCMMXBean { String clientRpcPort = (String)mbs.getAttribute(bean, "ClientRpcPort"); assertEquals(scm.getClientRpcPort(), clientRpcPort); + + ConcurrentMap<String, ContainerStat> map = scm.getContainerReportCache(); + ContainerStat stat = new ContainerStat(1, 2, 3, 4, 5, 6, 7); + map.put("nodeID", stat); + TabularData data = (TabularData) mbs.getAttribute( + bean, "ContainerReport"); + + // verify report info + assertEquals(1, data.values().size()); + for (Object obj : data.values()) { + assertTrue(obj instanceof 
CompositeData); + CompositeData d = (CompositeData) obj; + Iterator<?> it = d.values().iterator(); + String key = it.next().toString(); + String value = it.next().toString(); + assertEquals("nodeID", key); + assertEquals(stat.toJsonString(), value); + } } @Test http://git-wip-us.apache.org/repos/asf/hadoop/blob/6dca9fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java index 762852b..5eeb0aa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.ozone.scm; +import static org.apache.hadoop.test.MetricsAsserts.getLongCounter; import static org.apache.hadoop.test.MetricsAsserts.getLongGauge; import static org.apache.hadoop.test.MetricsAsserts.getMetrics; import static org.junit.Assert.assertEquals; @@ -25,9 +26,10 @@ import java.util.UUID; import org.apache.commons.codec.digest.DigestUtils; import org.apache.hadoop.conf.OzoneConfiguration; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.ozone.MiniOzoneClassicCluster; -import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.container.common.SCMTestUtils; import org.apache.hadoop.ozone.container.common.helpers.ContainerReport; @@ -35,13 +37,23 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolPr import 
org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; import org.apache.hadoop.ozone.scm.container.placement.metrics.ContainerStat; import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMMetrics; +import org.apache.hadoop.ozone.scm.node.SCMNodeManager; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.Timeout; /** * This class tests the metrics of Storage Container Manager. */ public class TestSCMMetrics { - private static MiniOzoneCluster cluster = null; + /** + * Set the timeout for each test. + */ + @Rule + public Timeout testTimeout = new Timeout(90000); + + private static MiniOzoneClassicCluster cluster = null; @Test public void testContainerMetrics() throws Exception { @@ -64,7 +76,11 @@ public class TestSCMMetrics { ContainerStat stat = new ContainerStat(size, used, keyCount, readBytes, writeBytes, readCount, writeCount); StorageContainerManager scmManager = cluster.getStorageContainerManager(); - scmManager.sendContainerReport(createContainerReport(numReport, stat)); + + ContainerReportsRequestProto request = createContainerReport(numReport, + stat, null); + String fstDatanodeID = request.getDatanodeID().getDatanodeUuid(); + scmManager.sendContainerReport(request); // verify container stat metrics MetricsRecordBuilder scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME); @@ -83,6 +99,117 @@ public class TestSCMMetrics { getLongGauge("LastContainerReportReadCount", scmMetrics)); assertEquals(writeCount * numReport, getLongGauge("LastContainerReportWriteCount", scmMetrics)); + + // add one new report + request = createContainerReport(1, stat, null); + String sndDatanodeID = request.getDatanodeID().getDatanodeUuid(); + scmManager.sendContainerReport(request); + + scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME); + assertEquals(size * (numReport + 1), + getLongCounter("ContainerReportSize", scmMetrics)); + assertEquals(used * 
(numReport + 1), + getLongCounter("ContainerReportUsed", scmMetrics)); + assertEquals(readBytes * (numReport + 1), + getLongCounter("ContainerReportReadBytes", scmMetrics)); + assertEquals(writeBytes * (numReport + 1), + getLongCounter("ContainerReportWriteBytes", scmMetrics)); + + assertEquals(keyCount * (numReport + 1), + getLongCounter("ContainerReportKeyCount", scmMetrics)); + assertEquals(readCount * (numReport + 1), + getLongCounter("ContainerReportReadCount", scmMetrics)); + assertEquals(writeCount * (numReport + 1), + getLongCounter("ContainerReportWriteCount", scmMetrics)); + + // Re-send reports but with different value for validating + // the aggregation. + stat = new ContainerStat(100, 50, 3, 50, 60, 5, 6); + scmManager.sendContainerReport(createContainerReport(1, stat, + fstDatanodeID)); + + stat = new ContainerStat(1, 1, 1, 1, 1, 1, 1); + scmManager.sendContainerReport(createContainerReport(1, stat, + sndDatanodeID)); + + // the global container metrics value should be updated + scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME); + assertEquals(101, getLongCounter("ContainerReportSize", scmMetrics)); + assertEquals(51, getLongCounter("ContainerReportUsed", scmMetrics)); + assertEquals(51, getLongCounter("ContainerReportReadBytes", scmMetrics)); + assertEquals(61, getLongCounter("ContainerReportWriteBytes", scmMetrics)); + + assertEquals(4, getLongCounter("ContainerReportKeyCount", scmMetrics)); + assertEquals(6, getLongCounter("ContainerReportReadCount", scmMetrics)); + assertEquals(7, getLongCounter("ContainerReportWriteCount", scmMetrics)); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + @Test + public void testStaleNodeContainerReport() throws Exception { + int nodeCount = 2; + int numReport = 2; + long size = OzoneConsts.GB * 5; + long used = OzoneConsts.GB * 2; + long readBytes = OzoneConsts.GB * 1; + long writeBytes = OzoneConsts.GB * 2; + int keyCount = 1000; + int readCount = 100; + int writeCount = 50; + 
OzoneConfiguration conf = new OzoneConfiguration(); + + try { + cluster = new MiniOzoneClassicCluster.Builder(conf) + .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED) + .numDataNodes(nodeCount).build(); + + ContainerStat stat = new ContainerStat(size, used, keyCount, readBytes, + writeBytes, readCount, writeCount); + StorageContainerManager scmManager = cluster.getStorageContainerManager(); + + DataNode dataNode = cluster.getDataNodes().get(0); + String datanodeUuid = dataNode.getDatanodeId().getDatanodeUuid(); + ContainerReportsRequestProto request = createContainerReport(numReport, + stat, datanodeUuid); + scmManager.sendContainerReport(request); + + MetricsRecordBuilder scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME); + assertEquals(size * numReport, + getLongCounter("ContainerReportSize", scmMetrics)); + assertEquals(used * numReport, + getLongCounter("ContainerReportUsed", scmMetrics)); + assertEquals(readBytes * numReport, + getLongCounter("ContainerReportReadBytes", scmMetrics)); + assertEquals(writeBytes * numReport, + getLongCounter("ContainerReportWriteBytes", scmMetrics)); + + assertEquals(keyCount * numReport, + getLongCounter("ContainerReportKeyCount", scmMetrics)); + assertEquals(readCount * numReport, + getLongCounter("ContainerReportReadCount", scmMetrics)); + assertEquals(writeCount * numReport, + getLongCounter("ContainerReportWriteCount", scmMetrics)); + + // reset stale interval time to move node from healthy to stale + SCMNodeManager nodeManager = (SCMNodeManager) cluster + .getStorageContainerManager().getScmNodeManager(); + nodeManager.setStaleNodeIntervalMs(100); + + // verify the metrics when node becomes stale + GenericTestUtils.waitFor(() -> { + MetricsRecordBuilder metrics = getMetrics(SCMMetrics.SOURCE_NAME); + return 0 == getLongCounter("ContainerReportSize", metrics) + && 0 == getLongCounter("ContainerReportUsed", metrics) + && 0 == getLongCounter("ContainerReportReadBytes", metrics) + && 0 == 
getLongCounter("ContainerReportWriteBytes", metrics) + && 0 == getLongCounter("ContainerReportKeyCount", metrics) + && 0 == getLongCounter("ContainerReportReadCount", metrics) + && 0 == getLongCounter("ContainerReportWriteCount", metrics); + }, 1000, 60000); } finally { if (cluster != null) { cluster.shutdown(); @@ -91,7 +218,7 @@ public class TestSCMMetrics { } private ContainerReportsRequestProto createContainerReport(int numReport, - ContainerStat stat) { + ContainerStat stat, String datanodeUuid) { StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto.Builder reportsBuilder = StorageContainerDatanodeProtocolProtos .ContainerReportsRequestProto.newBuilder(); @@ -108,8 +235,15 @@ public class TestSCMMetrics { report.setWriteBytes(stat.getWriteBytes().get()); reportsBuilder.addReports(report.getProtoBufMessage()); } - reportsBuilder.setDatanodeID(SCMTestUtils.getDatanodeID() - .getProtoBufMessage()); + + DatanodeID datanodeID; + if (datanodeUuid == null) { + datanodeID = SCMTestUtils.getDatanodeID(); + } else { + datanodeID = new DatanodeID("null", "null", datanodeUuid, 0, 0, 0, 0); + } + + reportsBuilder.setDatanodeID(datanodeID.getProtoBufMessage()); reportsBuilder.setType(StorageContainerDatanodeProtocolProtos .ContainerReportsRequestProto.reportType.fullReport); return reportsBuilder.build(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/6dca9fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java index 42a5627..e08cf27 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java @@ -93,7 +93,7 @@ public class TestContainerPlacement { SCMNodeManager createNodeManager(OzoneConfiguration config) throws IOException { SCMNodeManager nodeManager = new SCMNodeManager(config, - UUID.randomUUID().toString()); + UUID.randomUUID().toString(), null); assertFalse("Node manager should be in chill mode", nodeManager.isOutOfChillMode()); return nodeManager; http://git-wip-us.apache.org/repos/asf/hadoop/blob/6dca9fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java index f81e6e6..58a6af3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java @@ -125,7 +125,7 @@ public class TestNodeManager { SCMNodeManager createNodeManager(OzoneConfiguration config) throws IOException { SCMNodeManager nodeManager = new SCMNodeManager(config, - UUID.randomUUID().toString()); + UUID.randomUUID().toString(), null); assertFalse("Node manager should be in chill mode", nodeManager.isOutOfChillMode()); return nodeManager; --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org