This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new a0f0872fc8 HDDS-11444. Make Datanode Command metrics consistent across all commands (#7191)
a0f0872fc8 is described below
commit a0f0872fc88a6eb360c5460830e5ded21a17d8bb
Author: jianghuazhu <[email protected]>
AuthorDate: Thu Oct 3 00:12:28 2024 +0800
HDDS-11444. Make Datanode Command metrics consistent across all commands (#7191)
---
.../common/helpers/CommandHandlerMetrics.java | 3 +
.../CloseContainerCommandHandler.java | 16 +--
.../ClosePipelineCommandHandler.java | 16 +--
.../CreatePipelineCommandHandler.java | 16 +--
.../commandhandler/DeleteBlocksCommandHandler.java | 16 +--
.../DeleteContainerCommandHandler.java | 16 +--
.../FinalizeNewLayoutVersionCommandHandler.java | 16 +--
.../ReconstructECContainersCommandHandler.java | 26 ++--
.../RefreshVolumeUsageCommandHandler.java | 16 +--
.../ReplicateContainerCommandHandler.java | 31 ++---
.../SetNodeOperationalStateCommandHandler.java | 26 ++--
.../replication/ReplicationSupervisor.java | 33 +++++
.../replication/ReplicationSupervisorMetrics.java | 7 +-
.../TestReconstructECContainersCommandHandler.java | 139 +++++++++++++++++++++
.../TestReplicateContainerCommandHandler.java | 118 +++++++++++++++++
.../replication/TestReplicationSupervisor.java | 10 ++
16 files changed, 419 insertions(+), 86 deletions(-)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/CommandHandlerMetrics.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/CommandHandlerMetrics.java
index a6e4d6258d..e52565952a 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/CommandHandlerMetrics.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/CommandHandlerMetrics.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import static
org.apache.hadoop.ozone.container.common.helpers.CommandHandlerMetrics.CommandMetricsMetricsInfo.TotalRunTimeMs;
import static
org.apache.hadoop.ozone.container.common.helpers.CommandHandlerMetrics.CommandMetricsMetricsInfo.QueueWaitingTaskCount;
import static
org.apache.hadoop.ozone.container.common.helpers.CommandHandlerMetrics.CommandMetricsMetricsInfo.InvocationCount;
+import static
org.apache.hadoop.ozone.container.common.helpers.CommandHandlerMetrics.CommandMetricsMetricsInfo.AvgRunTimeMs;
import static
org.apache.hadoop.ozone.container.common.helpers.CommandHandlerMetrics.CommandMetricsMetricsInfo.ThreadPoolActivePoolSize;
import static
org.apache.hadoop.ozone.container.common.helpers.CommandHandlerMetrics.CommandMetricsMetricsInfo.ThreadPoolMaxPoolSize;
import static
org.apache.hadoop.ozone.container.common.helpers.CommandHandlerMetrics.CommandMetricsMetricsInfo.CommandReceivedCount;
@@ -46,6 +47,7 @@ public final class CommandHandlerMetrics implements
MetricsSource {
enum CommandMetricsMetricsInfo implements MetricsInfo {
Command("The type of the SCM command"),
TotalRunTimeMs("The total runtime of the command handler in milliseconds"),
+ AvgRunTimeMs("Average run time of the command handler in milliseconds"),
QueueWaitingTaskCount("The number of queued tasks waiting for execution"),
InvocationCount("The number of times the command handler has been
invoked"),
ThreadPoolActivePoolSize("The number of active threads in the thread
pool"),
@@ -108,6 +110,7 @@ public final class CommandHandlerMetrics implements
MetricsSource {
commandHandler.getCommandType().name());
builder.addGauge(TotalRunTimeMs, commandHandler.getTotalRunTime());
+ builder.addGauge(AvgRunTimeMs, commandHandler.getAverageRunTime());
builder.addGauge(QueueWaitingTaskCount, commandHandler.getQueuedCount());
builder.addGauge(InvocationCount, commandHandler.getInvocationCount());
int activePoolSize = commandHandler.getThreadPoolActivePoolSize();
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
index bc703ac6a5..cd032d4b27 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
@@ -31,6 +31,8 @@ import org.apache.hadoop.hdds.protocol.proto
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.CloseContainerCommandProto;
import org.apache.hadoop.hdds.tracing.TracingUtil;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.statemachine
.SCMConnectionManager;
@@ -58,7 +60,7 @@ public class CloseContainerCommandHandler implements
CommandHandler {
private final AtomicLong invocationCount = new AtomicLong(0);
private final AtomicInteger queuedCount = new AtomicInteger(0);
private final ThreadPoolExecutor executor;
- private long totalTime;
+ private final MutableRate opsLatencyMs;
/**
* Constructs a close container command handler.
@@ -72,6 +74,9 @@ public class CloseContainerCommandHandler implements
CommandHandler {
new ThreadFactoryBuilder()
.setNameFormat(threadNamePrefix + "CloseContainerThread-%d")
.build());
+ MetricsRegistry registry = new MetricsRegistry(
+ CloseContainerCommandHandler.class.getSimpleName());
+ this.opsLatencyMs =
registry.newRate(SCMCommandProto.Type.closeContainerCommand + "Ms");
}
/**
@@ -155,7 +160,7 @@ public class CloseContainerCommandHandler implements
CommandHandler {
LOG.error("Can't close container #{}", containerId, e);
} finally {
long endTime = Time.monotonicNow();
- totalTime += endTime - startTime;
+ this.opsLatencyMs.add(endTime - startTime);
}
}, executor).whenComplete((v, e) -> queuedCount.decrementAndGet());
}
@@ -204,15 +209,12 @@ public class CloseContainerCommandHandler implements
CommandHandler {
*/
@Override
public long getAverageRunTime() {
- if (invocationCount.get() > 0) {
- return totalTime / invocationCount.get();
- }
- return 0;
+ return (long) this.opsLatencyMs.lastStat().mean();
}
@Override
public long getTotalRunTime() {
- return totalTime;
+ return (long) this.opsLatencyMs.lastStat().total();
}
@Override
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
index 241abb6f4a..be39277fdf 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
@@ -24,6 +24,8 @@ import org.apache.hadoop.hdds.protocol.proto.
import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.ozone.container.common.statemachine
.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
@@ -60,9 +62,9 @@ public class ClosePipelineCommandHandler implements
CommandHandler {
private final AtomicLong invocationCount = new AtomicLong(0);
private final AtomicInteger queuedCount = new AtomicInteger(0);
- private long totalTime;
private final Executor executor;
private final BiFunction<RaftPeer, GrpcTlsConfig, RaftClient> newRaftClient;
+ private final MutableRate opsLatencyMs;
/**
* Constructs a closePipelineCommand handler.
@@ -80,6 +82,9 @@ public class ClosePipelineCommandHandler implements
CommandHandler {
Executor executor) {
this.newRaftClient = newRaftClient;
this.executor = executor;
+ MetricsRegistry registry = new MetricsRegistry(
+ ClosePipelineCommandHandler.class.getSimpleName());
+ this.opsLatencyMs =
registry.newRate(SCMCommandProto.Type.closePipelineCommand + "Ms");
}
/**
@@ -155,7 +160,7 @@ public class ClosePipelineCommandHandler implements
CommandHandler {
}
} finally {
long endTime = Time.monotonicNow();
- totalTime += endTime - startTime;
+ this.opsLatencyMs.add(endTime - startTime);
}
}, executor).whenComplete((v, e) -> queuedCount.decrementAndGet());
}
@@ -187,15 +192,12 @@ public class ClosePipelineCommandHandler implements
CommandHandler {
*/
@Override
public long getAverageRunTime() {
- if (invocationCount.get() > 0) {
- return totalTime / invocationCount.get();
- }
- return 0;
+ return (long) this.opsLatencyMs.lastStat().mean();
}
@Override
public long getTotalRunTime() {
- return totalTime;
+ return (long) this.opsLatencyMs.lastStat().total();
}
@Override
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
index 4a36a1987d..62fc8a919d 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableRate;
import
org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import
org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
@@ -59,8 +61,8 @@ public class CreatePipelineCommandHandler implements
CommandHandler {
private final AtomicInteger queuedCount = new AtomicInteger(0);
private final BiFunction<RaftPeer, GrpcTlsConfig, RaftClient> newRaftClient;
- private long totalTime;
private final Executor executor;
+ private final MutableRate opsLatencyMs;
/**
* Constructs a createPipelineCommand handler.
@@ -75,6 +77,9 @@ public class CreatePipelineCommandHandler implements
CommandHandler {
Executor executor) {
this.newRaftClient = newRaftClient;
this.executor = executor;
+ MetricsRegistry registry = new MetricsRegistry(
+ CreatePipelineCommandHandler.class.getSimpleName());
+ this.opsLatencyMs =
registry.newRate(SCMCommandProto.Type.createPipelineCommand + "Ms");
}
/**
@@ -135,7 +140,7 @@ public class CreatePipelineCommandHandler implements
CommandHandler {
}
} finally {
long endTime = Time.monotonicNow();
- totalTime += endTime - startTime;
+ this.opsLatencyMs.add(endTime - startTime);
}
}, executor).whenComplete((v, e) -> queuedCount.decrementAndGet());
}
@@ -167,15 +172,12 @@ public class CreatePipelineCommandHandler implements
CommandHandler {
*/
@Override
public long getAverageRunTime() {
- if (invocationCount.get() > 0) {
- return totalTime / invocationCount.get();
- }
- return 0;
+ return (long) this.opsLatencyMs.lastStat().mean();
}
@Override
public long getTotalRunTime() {
- return totalTime;
+ return (long) this.opsLatencyMs.lastStat().total();
}
@Override
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
index bd7431c614..1a630f8f0b 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
@@ -32,6 +32,8 @@ import org.apache.hadoop.hdds.protocol.proto
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import
org.apache.hadoop.ozone.container.common.helpers.BlockDeletingServiceMetrics;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfoList;
@@ -91,7 +93,6 @@ public class DeleteBlocksCommandHandler implements
CommandHandler {
private final ContainerSet containerSet;
private final ConfigurationSource conf;
private int invocationCount;
- private long totalTime;
private final ThreadPoolExecutor executor;
private final LinkedBlockingQueue<DeleteCmdInfo> deleteCommandQueues;
private final Daemon handlerThread;
@@ -99,6 +100,7 @@ public class DeleteBlocksCommandHandler implements
CommandHandler {
private final BlockDeletingServiceMetrics blockDeleteMetrics;
private final long tryLockTimeoutMs;
private final Map<String, SchemaHandler> schemaHandlers;
+ private final MutableRate opsLatencyMs;
public DeleteBlocksCommandHandler(OzoneContainer container,
ConfigurationSource conf, DatanodeConfiguration dnConf,
@@ -121,6 +123,9 @@ public class DeleteBlocksCommandHandler implements
CommandHandler {
dnConf.getBlockDeleteThreads(), threadFactory);
this.deleteCommandQueues =
new LinkedBlockingQueue<>(dnConf.getBlockDeleteQueueLimit());
+ MetricsRegistry registry = new MetricsRegistry(
+ DeleteBlocksCommandHandler.class.getSimpleName());
+ this.opsLatencyMs =
registry.newRate(SCMCommandProto.Type.deleteBlocksCommand + "Ms");
long interval = dnConf.getBlockDeleteCommandWorkerInterval().toMillis();
handlerThread = new Daemon(new DeleteCmdWorker(interval));
handlerThread.start();
@@ -403,7 +408,7 @@ public class DeleteBlocksCommandHandler implements
CommandHandler {
};
updateCommandStatus(cmd.getContext(), cmd.getCmd(), statusUpdater, LOG);
long endTime = Time.monotonicNow();
- totalTime += endTime - startTime;
+ this.opsLatencyMs.add(endTime - startTime);
invocationCount++;
}
}
@@ -666,15 +671,12 @@ public class DeleteBlocksCommandHandler implements
CommandHandler {
@Override
public long getAverageRunTime() {
- if (invocationCount > 0) {
- return totalTime / invocationCount;
- }
- return 0;
+ return (long) this.opsLatencyMs.lastStat().mean();
}
@Override
public long getTotalRunTime() {
- return totalTime;
+ return (long) this.opsLatencyMs.lastStat().total();
}
@Override
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
index b76e306e1c..59aaacc1c8 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
@@ -22,6 +22,8 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.ozone.container.common.statemachine
.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
@@ -39,7 +41,6 @@ import java.util.OptionalLong;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
/**
* Handler to process the DeleteContainerCommand from SCM.
@@ -51,10 +52,10 @@ public class DeleteContainerCommandHandler implements
CommandHandler {
private final AtomicInteger invocationCount = new AtomicInteger(0);
private final AtomicInteger timeoutCount = new AtomicInteger(0);
- private final AtomicLong totalTime = new AtomicLong(0);
private final ThreadPoolExecutor executor;
private final Clock clock;
private int maxQueueSize;
+ private final MutableRate opsLatencyMs;
public DeleteContainerCommandHandler(
int threadPoolSize, Clock clock, int queueSize, String threadNamePrefix)
{
@@ -73,6 +74,9 @@ public class DeleteContainerCommandHandler implements
CommandHandler {
this.executor = executor;
this.clock = clock;
maxQueueSize = queueSize;
+ MetricsRegistry registry = new MetricsRegistry(
+ DeleteContainerCommandHandler.class.getSimpleName());
+ this.opsLatencyMs =
registry.newRate(SCMCommandProto.Type.deleteContainerCommand + "Ms");
}
@Override
public void handle(final SCMCommand command,
@@ -124,7 +128,7 @@ public class DeleteContainerCommandHandler implements
CommandHandler {
} catch (IOException e) {
LOG.error("Exception occurred while deleting the container.", e);
} finally {
- totalTime.getAndAdd(Time.monotonicNow() - startTime);
+ this.opsLatencyMs.add(Time.monotonicNow() - startTime);
}
}
@@ -149,14 +153,12 @@ public class DeleteContainerCommandHandler implements
CommandHandler {
@Override
public long getAverageRunTime() {
- final int invocations = invocationCount.get();
- return invocations == 0 ?
- 0 : totalTime.get() / invocations;
+ return (long) this.opsLatencyMs.lastStat().mean();
}
@Override
public long getTotalRunTime() {
- return totalTime.get();
+ return (long) this.opsLatencyMs.lastStat().total();
}
@Override
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/FinalizeNewLayoutVersionCommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/FinalizeNewLayoutVersionCommandHandler.java
index bd7ec5710d..77e152447b 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/FinalizeNewLayoutVersionCommandHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/FinalizeNewLayoutVersionCommandHandler.java
@@ -20,6 +20,8 @@ import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATI
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.FinalizeNewLayoutVersionCommandProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableRate;
import
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine
.SCMConnectionManager;
@@ -42,12 +44,15 @@ public class FinalizeNewLayoutVersionCommandHandler
implements CommandHandler {
LoggerFactory.getLogger(FinalizeNewLayoutVersionCommandHandler.class);
private AtomicLong invocationCount = new AtomicLong(0);
- private long totalTime;
+ private final MutableRate opsLatencyMs;
/**
* Constructs a FinalizeNewLayoutVersionCommandHandler.
*/
public FinalizeNewLayoutVersionCommandHandler() {
+ MetricsRegistry registry = new MetricsRegistry(
+ FinalizeNewLayoutVersionCommandHandler.class.getSimpleName());
+ this.opsLatencyMs =
registry.newRate(SCMCommandProto.Type.finalizeNewLayoutVersionCommand + "Ms");
}
/**
@@ -82,7 +87,7 @@ public class FinalizeNewLayoutVersionCommandHandler
implements CommandHandler {
LOG.error("Exception during finalization.", e);
} finally {
long endTime = Time.monotonicNow();
- totalTime += endTime - startTime;
+ this.opsLatencyMs.add(endTime - startTime);
}
}
@@ -113,15 +118,12 @@ public class FinalizeNewLayoutVersionCommandHandler
implements CommandHandler {
*/
@Override
public long getAverageRunTime() {
- if (invocationCount.get() > 0) {
- return totalTime / invocationCount.get();
- }
- return 0;
+ return (long) this.opsLatencyMs.lastStat().mean();
}
@Override
public long getTotalRunTime() {
- return totalTime;
+ return (long) this.opsLatencyMs.lastStat().total();
}
@Override
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java
index 602687d7a0..030d169e9b 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java
@@ -36,6 +36,7 @@ public class ReconstructECContainersCommandHandler implements
CommandHandler {
private final ReplicationSupervisor supervisor;
private final ECReconstructionCoordinator coordinator;
private final ConfigurationSource conf;
+ private String metricsName;
public ReconstructECContainersCommandHandler(ConfigurationSource conf,
ReplicationSupervisor supervisor,
@@ -52,8 +53,16 @@ public class ReconstructECContainersCommandHandler
implements CommandHandler {
(ReconstructECContainersCommand) command;
ECReconstructionCommandInfo reconstructionCommandInfo =
new ECReconstructionCommandInfo(ecContainersCommand);
- this.supervisor.addTask(new ECReconstructionCoordinatorTask(
- coordinator, reconstructionCommandInfo));
+ ECReconstructionCoordinatorTask task = new ECReconstructionCoordinatorTask(
+ coordinator, reconstructionCommandInfo);
+ if (this.metricsName == null) {
+ this.metricsName = task.getMetricName();
+ }
+ this.supervisor.addTask(task);
+ }
+
+ public String getMetricsName() {
+ return this.metricsName;
}
@Override
@@ -63,23 +72,26 @@ public class ReconstructECContainersCommandHandler
implements CommandHandler {
@Override
public int getInvocationCount() {
- return 0;
+ return this.metricsName == null ? 0 : (int) this.supervisor
+ .getReplicationRequestCount(metricsName);
}
@Override
public long getAverageRunTime() {
- return 0;
+ return this.metricsName == null ? 0 : (int) this.supervisor
+ .getReplicationRequestAvgTime(metricsName);
}
@Override
public long getTotalRunTime() {
- return 0;
+ return this.metricsName == null ? 0 : this.supervisor
+ .getReplicationRequestTotalTime(metricsName);
}
@Override
public int getQueuedCount() {
- return supervisor
- .getInFlightReplications(ECReconstructionCoordinatorTask.class);
+ return this.metricsName == null ? 0 : (int) this.supervisor
+ .getReplicationQueuedCount(metricsName);
}
public ConfigurationSource getConf() {
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/RefreshVolumeUsageCommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/RefreshVolumeUsageCommandHandler.java
index 3c14b2fb16..1ab31ba1c4 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/RefreshVolumeUsageCommandHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/RefreshVolumeUsageCommandHandler.java
@@ -18,6 +18,8 @@ package
org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableRate;
import
org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
@@ -27,7 +29,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
/**
* Command handler to refresh usage info of all volumes.
@@ -38,9 +39,12 @@ public class RefreshVolumeUsageCommandHandler implements
CommandHandler {
LoggerFactory.getLogger(RefreshVolumeUsageCommandHandler.class);
private final AtomicInteger invocationCount = new AtomicInteger(0);
- private final AtomicLong totalTime = new AtomicLong(0);
+ private final MutableRate opsLatencyMs;
public RefreshVolumeUsageCommandHandler() {
+ MetricsRegistry registry = new MetricsRegistry(
+ RefreshVolumeUsageCommandHandler.class.getSimpleName());
+ this.opsLatencyMs = registry.newRate(Type.refreshVolumeUsageInfo + "Ms");
}
@Override
@@ -50,7 +54,7 @@ public class RefreshVolumeUsageCommandHandler implements
CommandHandler {
invocationCount.incrementAndGet();
final long startTime = Time.monotonicNow();
container.getVolumeSet().refreshAllVolumeUsage();
- totalTime.getAndAdd(Time.monotonicNow() - startTime);
+ this.opsLatencyMs.add(Time.monotonicNow() - startTime);
}
@Override
@@ -66,14 +70,12 @@ public class RefreshVolumeUsageCommandHandler implements
CommandHandler {
@Override
public long getAverageRunTime() {
- final int invocations = invocationCount.get();
- return invocations == 0 ?
- 0 : totalTime.get() / invocations;
+ return (long) this.opsLatencyMs.lastStat().mean();
}
@Override
public long getTotalRunTime() {
- return totalTime.get();
+ return (long) this.opsLatencyMs.lastStat().total();
}
@Override
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
index 21b26339e2..242a4eb74b 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
@@ -43,29 +43,28 @@ public class ReplicateContainerCommandHandler implements
CommandHandler {
static final Logger LOG =
LoggerFactory.getLogger(ReplicateContainerCommandHandler.class);
- private int invocationCount;
-
- private long totalTime;
-
- private ConfigurationSource conf;
-
private ReplicationSupervisor supervisor;
private ContainerReplicator downloadReplicator;
private ContainerReplicator pushReplicator;
+ private String metricsName;
+
public ReplicateContainerCommandHandler(
ConfigurationSource conf,
ReplicationSupervisor supervisor,
ContainerReplicator downloadReplicator,
ContainerReplicator pushReplicator) {
- this.conf = conf;
this.supervisor = supervisor;
this.downloadReplicator = downloadReplicator;
this.pushReplicator = pushReplicator;
}
+ public String getMetricsName() {
+ return this.metricsName;
+ }
+
@Override
public void handle(SCMCommand command, OzoneContainer container,
StateContext context, SCMConnectionManager connectionManager) {
@@ -86,12 +85,16 @@ public class ReplicateContainerCommandHandler implements
CommandHandler {
downloadReplicator : pushReplicator;
ReplicationTask task = new ReplicationTask(replicateCommand, replicator);
+ if (metricsName == null) {
+ metricsName = task.getMetricName();
+ }
supervisor.addTask(task);
}
@Override
public int getQueuedCount() {
- return supervisor.getInFlightReplications(ReplicationTask.class);
+ return this.metricsName == null ? 0 : (int) this.supervisor
+ .getReplicationQueuedCount(metricsName);
}
@Override
@@ -101,19 +104,19 @@ public class ReplicateContainerCommandHandler implements
CommandHandler {
@Override
public int getInvocationCount() {
- return this.invocationCount;
+ return this.metricsName == null ? 0 : (int) this.supervisor
+ .getReplicationRequestCount(metricsName);
}
@Override
public long getAverageRunTime() {
- if (invocationCount > 0) {
- return totalTime / invocationCount;
- }
- return 0;
+ return this.metricsName == null ? 0 : (int) this.supervisor
+ .getReplicationRequestAvgTime(metricsName);
}
@Override
public long getTotalRunTime() {
- return totalTime;
+ return this.metricsName == null ? 0 : this.supervisor
+ .getReplicationRequestTotalTime(metricsName);
}
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/SetNodeOperationalStateCommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/SetNodeOperationalStateCommandHandler.java
index 6f7f4414ee..3356362479 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/SetNodeOperationalStateCommandHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/SetNodeOperationalStateCommandHandler.java
@@ -21,8 +21,10 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SetNodeOperationalStateCommandProto;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import
org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
@@ -39,7 +41,6 @@ import org.apache.hadoop.hdds.protocol.proto.
import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
@@ -54,7 +55,7 @@ public class SetNodeOperationalStateCommandHandler implements
CommandHandler {
private final ConfigurationSource conf;
private final Consumer<HddsProtos.NodeOperationalState>
replicationSupervisor;
private final AtomicInteger invocationCount = new AtomicInteger(0);
- private final AtomicLong totalTime = new AtomicLong(0);
+ private final MutableRate opsLatencyMs;
/**
* Set Node State command handler.
@@ -65,6 +66,9 @@ public class SetNodeOperationalStateCommandHandler implements
CommandHandler {
Consumer<HddsProtos.NodeOperationalState> replicationSupervisor) {
this.conf = conf;
this.replicationSupervisor = replicationSupervisor;
+ MetricsRegistry registry = new MetricsRegistry(
+ SetNodeOperationalStateCommandHandler.class.getSimpleName());
+ this.opsLatencyMs = registry.newRate(Type.setNodeOperationalStateCommand +
"Ms");
}
/**
@@ -80,9 +84,6 @@ public class SetNodeOperationalStateCommandHandler implements
CommandHandler {
StateContext context, SCMConnectionManager connectionManager) {
long startTime = Time.monotonicNow();
invocationCount.incrementAndGet();
- StorageContainerDatanodeProtocolProtos.SetNodeOperationalStateCommandProto
- setNodeCmdProto = null;
-
if (command.getType() != Type.setNodeOperationalStateCommand) {
LOG.warn("Skipping handling command, expected command "
+ "type {} but found {}",
@@ -91,7 +92,7 @@ public class SetNodeOperationalStateCommandHandler implements
CommandHandler {
}
SetNodeOperationalStateCommand setNodeCmd =
(SetNodeOperationalStateCommand) command;
- setNodeCmdProto = setNodeCmd.getProto();
+ SetNodeOperationalStateCommandProto setNodeCmdProto = setNodeCmd.getProto();
DatanodeDetails dni = context.getParent().getDatanodeDetails();
HddsProtos.NodeOperationalState state =
setNodeCmdProto.getNodeOperationalState();
@@ -106,7 +107,7 @@ public class SetNodeOperationalStateCommandHandler
implements CommandHandler {
// handler interface.
}
replicationSupervisor.accept(state);
- totalTime.addAndGet(Time.monotonicNow() - startTime);
+ this.opsLatencyMs.add(Time.monotonicNow() - startTime);
}
// TODO - this duplicates code in HddsDatanodeService and InitDatanodeState
@@ -125,8 +126,7 @@ public class SetNodeOperationalStateCommandHandler
implements CommandHandler {
* @return Type
*/
@Override
- public StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type
- getCommandType() {
+ public Type getCommandType() {
return Type.setNodeOperationalStateCommand;
}
@@ -147,14 +147,12 @@ public class SetNodeOperationalStateCommandHandler
implements CommandHandler {
*/
@Override
public long getAverageRunTime() {
- final int invocations = invocationCount.get();
- return invocations == 0 ?
- 0 : totalTime.get() / invocations;
+ return (long) this.opsLatencyMs.lastStat().mean();
}
@Override
public long getTotalRunTime() {
- return totalTime.get();
+ return (long) this.opsLatencyMs.lastStat().total();
}
@Override
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
index 92ff4b6d8d..9513cac84e 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
@@ -43,6 +43,8 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig;
@@ -50,6 +52,7 @@ import
org.apache.hadoop.ozone.container.replication.AbstractReplicationTask.Sta
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,6 +80,10 @@ public final class ReplicationSupervisor {
private final Map<String, AtomicLong> failureCounter = new ConcurrentHashMap<>();
private final Map<String, AtomicLong> timeoutCounter = new ConcurrentHashMap<>();
private final Map<String, AtomicLong> skippedCounter = new ConcurrentHashMap<>();
+ private final Map<String, AtomicLong> queuedCounter = new ConcurrentHashMap<>();
+
+ private final MetricsRegistry registry;
+ private final Map<String, MutableRate> opsLatencyMs = new ConcurrentHashMap<>();
private static final Map<String, String> METRICS_MAP;
@@ -218,6 +225,7 @@ public final class ReplicationSupervisor {
nodeStateUpdated(dn.getPersistedOpState());
}
}
+ registry = new MetricsRegistry(ReplicationSupervisor.class.getSimpleName());
}
/**
@@ -240,6 +248,9 @@ public final class ReplicationSupervisor {
failureCounter.put(task.getMetricName(), new AtomicLong(0));
timeoutCounter.put(task.getMetricName(), new AtomicLong(0));
skippedCounter.put(task.getMetricName(), new AtomicLong(0));
+ queuedCounter.put(task.getMetricName(), new AtomicLong(0));
+ opsLatencyMs.put(task.getMetricName(), registry.newRate(
+ task.getClass().getSimpleName() + "Ms"));
METRICS_MAP.put(task.getMetricName(),
task.getMetricDescriptionSegment());
}
}
@@ -253,6 +264,7 @@ public final class ReplicationSupervisor {
taskCounter.computeIfAbsent(task.getClass(),
k -> new AtomicInteger()).incrementAndGet();
}
+ queuedCounter.get(task.getMetricName()).incrementAndGet();
executor.execute(new TaskRunner(task));
}
}
@@ -353,6 +365,7 @@ public final class ReplicationSupervisor {
@Override
public void run() {
+ final long startTime = Time.monotonicNow();
try {
requestCounter.get(task.getMetricName()).incrementAndGet();
@@ -401,6 +414,8 @@ public final class ReplicationSupervisor {
LOG.warn("Failed {}", this, e);
failureCounter.get(task.getMetricName()).incrementAndGet();
} finally {
+ queuedCounter.get(task.getMetricName()).decrementAndGet();
+ opsLatencyMs.get(task.getMetricName()).add(Time.monotonicNow() - startTime);
inFlight.remove(task);
decrementTaskCounter(task);
}
@@ -511,4 +526,22 @@ public final class ReplicationSupervisor {
return counter != null ? counter.get() : 0;
}
+ public long getReplicationQueuedCount() {
+ return getCount(queuedCounter);
+ }
+
+ public long getReplicationQueuedCount(String metricsName) {
+ AtomicLong counter = queuedCounter.get(metricsName);
+ return counter != null ? counter.get() : 0;
+ }
+
+ public long getReplicationRequestAvgTime(String metricsName) {
+ MutableRate rate = opsLatencyMs.get(metricsName);
+ return rate != null ? (long) rate.lastStat().mean() : 0;
+ }
+
+ public long getReplicationRequestTotalTime(String metricsName) {
+ MutableRate rate = opsLatencyMs.get(metricsName);
+ return rate != null ? (long) rate.lastStat().total() : 0;
+ }
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java
index a1763976af..cd1103a0c4 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java
@@ -67,7 +67,7 @@ public class ReplicationSupervisorMetrics implements
MetricsSource {
supervisor.getTotalInFlightReplications())
.addGauge(Interns.info("numQueuedReplications",
"Number of replications in queue"),
- supervisor.getQueueSize())
+ supervisor.getReplicationQueuedCount())
.addGauge(Interns.info("numRequestedReplications",
"Number of requested replications"),
supervisor.getReplicationRequestCount())
@@ -107,7 +107,10 @@ public class ReplicationSupervisorMetrics implements
MetricsSource {
.addGauge(Interns.info("numSkipped" + metricsName,
"Number of " + descriptionSegment + " skipped as the container is "
+ "already present"),
- supervisor.getReplicationSkippedCount(metricsName));
+ supervisor.getReplicationSkippedCount(metricsName))
+ .addGauge(Interns.info("numQueued" + metricsName,
+ "Number of " + descriptionSegment + " in queue"),
+ supervisor.getReplicationQueuedCount(metricsName));
}
});
}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconstructECContainersCommandHandler.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconstructECContainersCommandHandler.java
new file mode 100644
index 0000000000..7e6c760818
--- /dev/null
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconstructECContainersCommandHandler.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Proto2Utils;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
+import org.apache.hadoop.ozone.container.common.helpers.CommandHandlerMetrics;
+import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinator;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor;
+import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test cases to verify {@link ReconstructECContainersCommandHandler}.
+ */
+public class TestReconstructECContainersCommandHandler {
+ private OzoneConfiguration conf;
+ private ReplicationSupervisor supervisor;
+ private ECReconstructionCoordinator coordinator;
+ private OzoneContainer ozoneContainer;
+ private StateContext stateContext;
+ private SCMConnectionManager connectionManager;
+
+ @BeforeEach
+ public void setUp() {
+ supervisor = mock(ReplicationSupervisor.class);
+ coordinator = mock(ECReconstructionCoordinator.class);
+ conf = new OzoneConfiguration();
+ ozoneContainer = mock(OzoneContainer.class);
+ connectionManager = mock(SCMConnectionManager.class);
+ stateContext = mock(StateContext.class);
+ }
+
+ @Test
+ public void testMetrics() {
+ ReconstructECContainersCommandHandler commandHandler =
+ new ReconstructECContainersCommandHandler(conf, supervisor, coordinator);
+ doNothing().when(supervisor).addTask(any());
+ Map<SCMCommandProto.Type, CommandHandler> handlerMap = new HashMap<>();
+ handlerMap.put(commandHandler.getCommandType(), commandHandler);
+ CommandHandlerMetrics metrics = CommandHandlerMetrics.create(handlerMap);
+ try {
+ byte[] missingIndexes = {1, 2};
+ ByteString missingContainerIndexes = Proto2Utils.unsafeByteString(missingIndexes);
+ ECReplicationConfig ecReplicationConfig = new ECReplicationConfig(3, 2);
+ List<DatanodeDetails> dnDetails = getDNDetails(5);
+ List<ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex> sources =
+ dnDetails.stream().map(a -> new ReconstructECContainersCommand
+ .DatanodeDetailsAndReplicaIndex(a, dnDetails.indexOf(a)))
+ .collect(Collectors.toList());
+ List<DatanodeDetails> targets = getDNDetails(2);
+ ReconstructECContainersCommand reconstructECContainersCommand =
+ new ReconstructECContainersCommand(1L, sources, targets,
+ missingContainerIndexes, ecReplicationConfig);
+
+ commandHandler.handle(reconstructECContainersCommand, ozoneContainer,
+ stateContext, connectionManager);
+ String metricsName = "ECReconstructions";
+ assertEquals(commandHandler.getMetricsName(), metricsName);
+ when(supervisor.getReplicationRequestCount(metricsName)).thenReturn(1L);
+ assertEquals(commandHandler.getInvocationCount(), 1);
+
+ commandHandler.handle(new ReconstructECContainersCommand(2L, sources,
+ targets, missingContainerIndexes, ecReplicationConfig), ozoneContainer,
+ stateContext, connectionManager);
+ commandHandler.handle(new ReconstructECContainersCommand(3L, sources,
+ targets, missingContainerIndexes, ecReplicationConfig), ozoneContainer,
+ stateContext, connectionManager);
+ commandHandler.handle(new ReconstructECContainersCommand(4L, sources,
+ targets, missingContainerIndexes, ecReplicationConfig), ozoneContainer,
+ stateContext, connectionManager);
+ commandHandler.handle(new ReconstructECContainersCommand(5L, sources,
+ targets, missingContainerIndexes, ecReplicationConfig), ozoneContainer,
+ stateContext, connectionManager);
+ commandHandler.handle(new ReconstructECContainersCommand(6L, sources,
+ targets, missingContainerIndexes, ecReplicationConfig), ozoneContainer,
+ stateContext, connectionManager);
+
+ when(supervisor.getReplicationRequestCount(metricsName)).thenReturn(5L);
+ when(supervisor.getReplicationRequestTotalTime(metricsName)).thenReturn(10L);
+ when(supervisor.getReplicationRequestAvgTime(metricsName)).thenReturn(2L);
+ when(supervisor.getReplicationQueuedCount(metricsName)).thenReturn(1L);
+ assertEquals(commandHandler.getInvocationCount(), 5);
+ assertEquals(commandHandler.getQueuedCount(), 1);
+ assertEquals(commandHandler.getTotalRunTime(), 10);
+ assertEquals(commandHandler.getAverageRunTime(), 2);
+
+ MetricsCollectorImpl metricsCollector = new MetricsCollectorImpl();
+ metrics.getMetrics(metricsCollector, true);
+ assertEquals(1, metricsCollector.getRecords().size());
+ } finally {
+ metrics.unRegister();
+ }
+ }
+
+ private List<DatanodeDetails> getDNDetails(int numDns) {
+ List<DatanodeDetails> dns = new ArrayList<>();
+ for (int i = 0; i < numDns; i++) {
+ dns.add(MockDatanodeDetails.randomDatanodeDetails());
+ }
+ return dns;
+ }
+}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java
new file mode 100644
index 0000000000..9de00877e5
--- /dev/null
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
+import org.apache.hadoop.ozone.container.common.helpers.CommandHandlerMetrics;
+import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.container.replication.ContainerReplicator;
+import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor;
+import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.doNothing;
+
+/**
+ * Test cases to verify {@link ReplicateContainerCommandHandler}.
+ */
+public class TestReplicateContainerCommandHandler {
+ private OzoneConfiguration conf;
+ private ReplicationSupervisor supervisor;
+ private ContainerReplicator downloadReplicator;
+ private ContainerReplicator pushReplicator;
+ private OzoneContainer ozoneContainer;
+ private StateContext stateContext;
+ private SCMConnectionManager connectionManager;
+
+ @BeforeEach
+ public void setUp() {
+ conf = new OzoneConfiguration();
+ supervisor = mock(ReplicationSupervisor.class);
+ downloadReplicator = mock(ContainerReplicator.class);
+ pushReplicator = mock(ContainerReplicator.class);
+ ozoneContainer = mock(OzoneContainer.class);
+ connectionManager = mock(SCMConnectionManager.class);
+ stateContext = mock(StateContext.class);
+ }
+
+ @Test
+ public void testMetrics() {
+ ReplicateContainerCommandHandler commandHandler =
+ new ReplicateContainerCommandHandler(conf, supervisor,
+ downloadReplicator, pushReplicator);
+ Map<SCMCommandProto.Type, CommandHandler> handlerMap = new HashMap<>();
+ handlerMap.put(commandHandler.getCommandType(), commandHandler);
+ CommandHandlerMetrics metrics = CommandHandlerMetrics.create(handlerMap);
+ try {
+ doNothing().when(supervisor).addTask(any());
+ DatanodeDetails source = MockDatanodeDetails.randomDatanodeDetails();
+ DatanodeDetails target = MockDatanodeDetails.randomDatanodeDetails();
+ List<DatanodeDetails> sourceList = new ArrayList<>();
+ sourceList.add(source);
+
+ ReplicateContainerCommand command = ReplicateContainerCommand.fromSources(
+ 1, sourceList);
+ commandHandler.handle(command, ozoneContainer, stateContext, connectionManager);
+ String metricsName = "ContainerReplications";
+ assertEquals(commandHandler.getMetricsName(), metricsName);
+ when(supervisor.getReplicationRequestCount(metricsName)).thenReturn(1L);
+ assertEquals(commandHandler.getInvocationCount(), 1);
+
+ commandHandler.handle(ReplicateContainerCommand.fromSources(2, sourceList),
+ ozoneContainer, stateContext, connectionManager);
+ commandHandler.handle(ReplicateContainerCommand.fromSources(3, sourceList),
+ ozoneContainer, stateContext, connectionManager);
+ commandHandler.handle(ReplicateContainerCommand.toTarget(4, target),
+ ozoneContainer, stateContext, connectionManager);
+ commandHandler.handle(ReplicateContainerCommand.toTarget(5, target),
+ ozoneContainer, stateContext, connectionManager);
+ commandHandler.handle(ReplicateContainerCommand.fromSources(6, sourceList),
+ ozoneContainer, stateContext, connectionManager);
+
+ when(supervisor.getReplicationRequestCount(metricsName)).thenReturn(5L);
+ when(supervisor.getReplicationRequestTotalTime(metricsName)).thenReturn(10L);
+ when(supervisor.getReplicationRequestAvgTime(metricsName)).thenReturn(3L);
+ when(supervisor.getReplicationQueuedCount(metricsName)).thenReturn(1L);
+ assertEquals(commandHandler.getInvocationCount(), 5);
+ assertEquals(commandHandler.getQueuedCount(), 1);
+ assertEquals(commandHandler.getTotalRunTime(), 10);
+ assertEquals(commandHandler.getAverageRunTime(), 3);
+
+ MetricsCollectorImpl metricsCollector = new MetricsCollectorImpl();
+ metrics.getMetrics(metricsCollector, true);
+ assertEquals(1, metricsCollector.getRecords().size());
+ } finally {
+ metrics.unRegister();
+ }
+ }
+}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
index ef37c22665..315e0c0253 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
@@ -87,6 +87,7 @@ import static
org.apache.hadoop.ozone.protocol.commands.ReplicateContainerComman
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority.LOW;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority.NORMAL;
import static org.mockito.Mockito.any;
@@ -488,6 +489,15 @@ public class TestReplicationSupervisor {
assertEquals(0, ecReconstructionSupervisor.getReplicationRequestCount(
task1.getMetricName()));
+ assertTrue(replicationSupervisor.getReplicationRequestTotalTime(
+ task1.getMetricName()) > 0);
+ assertTrue(ecReconstructionSupervisor.getReplicationRequestTotalTime(
+ task2.getMetricName()) > 0);
+ assertTrue(replicationSupervisor.getReplicationRequestAvgTime(
+ task1.getMetricName()) > 0);
+ assertTrue(ecReconstructionSupervisor.getReplicationRequestAvgTime(
+ task2.getMetricName()) > 0);
+
MetricsCollectorImpl replicationMetricsCollector = new MetricsCollectorImpl();
replicationMetrics.getMetrics(replicationMetricsCollector, true);
assertEquals(1, replicationMetricsCollector.getRecords().size());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]