This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 8a5c4e86753 HDDS-12749. Use EnumCounters instead Map<Type, Integer>
for command counts (#9211)
8a5c4e86753 is described below
commit 8a5c4e86753d258c073b26bc12a99c393ed58557
Author: Sarveksha Yeshavantha Raju
<[email protected]>
AuthorDate: Wed Oct 29 22:59:05 2025 +0530
HDDS-12749. Use EnumCounters instead Map<Type, Integer> for command counts
(#9211)
---
.../common/statemachine/DatanodeQueueMetrics.java | 9 ++--
.../common/statemachine/DatanodeStateMachine.java | 22 ++++----
.../common/statemachine/StateContext.java | 7 +--
.../commandhandler/CommandDispatcher.java | 11 ++--
.../states/endpoint/HeartbeatEndpointTask.java | 14 ++---
.../common/statemachine/TestStateContext.java | 11 ++--
.../states/endpoint/TestHeartbeatEndpointTask.java | 60 ++++++++++++++++------
7 files changed, 84 insertions(+), 50 deletions(-)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeQueueMetrics.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeQueueMetrics.java
index d47e0c0936a..c0ed734da69 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeQueueMetrics.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeQueueMetrics.java
@@ -25,6 +25,7 @@
import java.util.Map;
import org.apache.commons.text.WordUtils;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdfs.util.EnumCounters;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
@@ -114,20 +115,20 @@ private void initializeQueues() {
public void getMetrics(MetricsCollector collector, boolean b) {
MetricsRecordBuilder builder = collector.addRecord(METRICS_SOURCE_NAME);
- Map<SCMCommandProto.Type, Integer> tmpMap =
+ EnumCounters<SCMCommandProto.Type> tmpEnum =
datanodeStateMachine.getContext().getCommandQueueSummary();
for (Map.Entry<SCMCommandProto.Type, MetricsInfo> entry:
stateContextCommandQueueMap.entrySet()) {
builder.addGauge(entry.getValue(),
- (long) tmpMap.getOrDefault(entry.getKey(), 0));
+ tmpEnum.get(entry.getKey()));
}
- tmpMap = datanodeStateMachine.getCommandDispatcher()
+ tmpEnum = datanodeStateMachine.getCommandDispatcher()
.getQueuedCommandCount();
for (Map.Entry<SCMCommandProto.Type, MetricsInfo> entry:
commandDispatcherQueueMap.entrySet()) {
builder.addGauge(entry.getValue(),
- (long) tmpMap.getOrDefault(entry.getKey(), 0));
+ tmpEnum.get(entry.getKey()));
}
for (Map.Entry<InetSocketAddress, MetricsInfo> entry:
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 1bd888c84a6..3b61050c4af 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -23,7 +23,6 @@
import java.io.IOException;
import java.time.Clock;
import java.time.ZoneId;
-import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
@@ -48,6 +47,7 @@
import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.hdds.utils.NettyMetrics;
+import org.apache.hadoop.hdfs.util.EnumCounters;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.HddsDatanodeStopService;
import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
@@ -620,23 +620,21 @@ public void join() throws InterruptedException {
* (single) thread, or queues it in the handler where a thread pool executor
* will process it. The total commands queued in the datanode is therefore
* the sum those in the CommandQueue and the dispatcher queues.
- * @return A map containing a count for each known command.
+ * @return EnumCounters containing a count for each known command.
*/
- public Map<SCMCommandProto.Type, Integer> getQueuedCommandCount() {
- // This is a "sparse map" - there is not guaranteed to be an entry for
- // every command type
- Map<SCMCommandProto.Type, Integer> commandQSummary =
+ public EnumCounters<SCMCommandProto.Type> getQueuedCommandCount() {
+ // Get command counts from StateContext command queue
+ EnumCounters<SCMCommandProto.Type> commandQSummary =
context.getCommandQueueSummary();
- // This map will contain an entry for every command type which is registered
+ // This EnumCounters will contain an entry for every command type which is registered
// with the dispatcher, and that should be all command types the DN knows
- // about. Any commands with nothing in the queue will return a count of
+ // about. Any commands with nothing in the queue will have a count of
// zero.
- Map<SCMCommandProto.Type, Integer> dispatcherQSummary =
+ EnumCounters<SCMCommandProto.Type> dispatcherQSummary =
commandDispatcher.getQueuedCommandCount();
- // Merge the "sparse" map into the fully populated one returning a count
+ // Merge the two EnumCounters into the fully populated one having a count
// for all known command types.
- commandQSummary.forEach((k, v)
- -> dispatcherQSummary.merge(k, v, Integer::sum));
+ dispatcherQSummary.add(commandQSummary);
return dispatcherQSummary;
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 305b7b55a22..a7ea469f0c8 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -71,6 +71,7 @@
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdfs.util.EnumCounters;
import
org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ClosePipelineCommandHandler;
import org.apache.hadoop.ozone.container.common.states.DatanodeState;
import
org.apache.hadoop.ozone.container.common.states.datanode.InitDatanodeState;
@@ -796,12 +797,12 @@ public void addCommand(SCMCommand<?> command) {
this.addCmdStatus(command);
}
- public Map<SCMCommandProto.Type, Integer> getCommandQueueSummary() {
- Map<SCMCommandProto.Type, Integer> summary = new HashMap<>();
+ public EnumCounters<SCMCommandProto.Type> getCommandQueueSummary() {
+ EnumCounters<SCMCommandProto.Type> summary = new EnumCounters<>(SCMCommandProto.Type.class);
lock.lock();
try {
for (SCMCommand<?> cmd : commandQueue) {
- summary.put(cmd.getType(), summary.getOrDefault(cmd.getType(), 0) + 1);
+ summary.add(cmd.getType(), 1);
}
} finally {
lock.unlock();
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
index ece91ffdd1c..482878e6f58 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
+import org.apache.hadoop.hdfs.util.EnumCounters;
import org.apache.hadoop.ozone.container.common.helpers.CommandHandlerMetrics;
import
org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
@@ -115,15 +116,15 @@ public void stop() {
/**
* For each registered handler, call its getQueuedCount method to retrieve the
- * number of queued commands. The returned map will contain an entry for every
+ * number of queued commands. The returned EnumCounters will contain an entry for every
* registered command in the dispatcher, with a value of zero if there are no
* queued commands.
- * @return A Map of CommandType where the value is the queued command count.
+ * @return EnumCounters of CommandType with the queued command count.
*/
- public Map<Type, Integer> getQueuedCommandCount() {
- Map<Type, Integer> counts = new HashMap<>();
+ public EnumCounters<Type> getQueuedCommandCount() {
+ EnumCounters<Type> counts = new EnumCounters<>(Type.class);
for (Map.Entry<Type, CommandHandler> entry : handlerMap.entrySet()) {
- counts.put(entry.getKey(), entry.getValue().getQueuedCount());
+ counts.set(entry.getKey(), entry.getValue().getQueuedCount());
}
return counts;
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index 2681cdf90d5..0959d78bdb2 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -30,7 +30,6 @@
import java.time.ZonedDateTime;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.Callable;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -45,6 +44,7 @@
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
+import org.apache.hadoop.hdfs.util.EnumCounters;
import
org.apache.hadoop.ozone.container.common.helpers.DeletedContainerBlocksSummary;
import
org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
import
org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine.EndPointStates;
@@ -242,14 +242,16 @@ private void addPipelineActions(
*/
private void addQueuedCommandCounts(
SCMHeartbeatRequestProto.Builder requestBuilder) {
- Map<SCMCommandProto.Type, Integer> commandCount =
+ EnumCounters<SCMCommandProto.Type> commandCount =
context.getParent().getQueuedCommandCount();
CommandQueueReportProto.Builder reportProto =
CommandQueueReportProto.newBuilder();
- for (Map.Entry<SCMCommandProto.Type, Integer> entry
- : commandCount.entrySet()) {
- reportProto.addCommand(entry.getKey())
- .addCount(entry.getValue());
+ for (SCMCommandProto.Type type : SCMCommandProto.Type.values()) {
+ long count = commandCount.get(type);
+ if (count > 0) {
+ reportProto.addCommand(type)
+ .addCount((int) count);
+ }
}
requestBuilder.setCommandQueueReport(reportProto.build());
}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
index 73e4b8f4368..8d79335591b 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
@@ -59,6 +59,7 @@
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdfs.util.EnumCounters;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine.DatanodeStates;
import org.apache.hadoop.ozone.container.common.states.DatanodeState;
@@ -709,15 +710,15 @@ public void testCommandQueueSummary() throws IOException {
ctx.addCommand(new CloseContainerCommand(1, PipelineID.randomId()));
ctx.addCommand(new ReconcileContainerCommand(4, Collections.emptySet()));
- Map<SCMCommandProto.Type, Integer> summary = ctx.getCommandQueueSummary();
+ EnumCounters<SCMCommandProto.Type> summary = ctx.getCommandQueueSummary();
assertEquals(3,
- summary.get(SCMCommandProto.Type.replicateContainerCommand).intValue());
+ summary.get(SCMCommandProto.Type.replicateContainerCommand));
assertEquals(2,
- summary.get(SCMCommandProto.Type.closePipelineCommand).intValue());
+ summary.get(SCMCommandProto.Type.closePipelineCommand));
assertEquals(1,
- summary.get(SCMCommandProto.Type.closeContainerCommand).intValue());
+ summary.get(SCMCommandProto.Type.closeContainerCommand));
assertEquals(1,
- summary.get(SCMCommandProto.Type.reconcileContainerCommand).intValue());
+ summary.get(SCMCommandProto.Type.reconcileContainerCommand));
}
@Test
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
index 11c145ee38a..c04d2c75884 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
@@ -32,10 +32,8 @@
import com.google.protobuf.Proto2Utils;
import java.net.InetSocketAddress;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.UUID;
@@ -53,6 +51,7 @@
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
+import org.apache.hadoop.hdfs.util.EnumCounters;
import
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine.DatanodeStates;
import
org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
@@ -102,13 +101,16 @@ public void handlesReconstructContainerCommand() throws
Exception {
StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
datanodeStateMachine, "");
+ when(datanodeStateMachine.getQueuedCommandCount())
+ .thenReturn(new EnumCounters<>(SCMCommandProto.Type.class));
+
// WHEN
HeartbeatEndpointTask task = getHeartbeatEndpointTask(conf, context, scm);
task.call();
// THEN
assertEquals(1, context.getCommandQueueSummary()
- .get(reconstructECContainersCommand).intValue());
+ .get(reconstructECContainersCommand));
}
@Test
@@ -138,13 +140,16 @@ public void testHandlesReconcileContainerCommand() throws
Exception {
StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
datanodeStateMachine, "");
+ when(datanodeStateMachine.getQueuedCommandCount())
+ .thenReturn(new EnumCounters<>(SCMCommandProto.Type.class));
+
// WHEN
HeartbeatEndpointTask task = getHeartbeatEndpointTask(conf, context, scm);
task.call();
// THEN
assertEquals(1, context.getCommandQueueSummary()
- .get(reconcileContainerCommand).intValue());
+ .get(reconcileContainerCommand));
}
@Test
@@ -165,8 +170,12 @@ public void testheartbeatWithoutReports() throws Exception
{
.build());
OzoneConfiguration conf = new OzoneConfiguration();
+ DatanodeStateMachine datanodeStateMachine = mock(DatanodeStateMachine.class);
StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
- mock(DatanodeStateMachine.class), "");
+ datanodeStateMachine, "");
+
+ when(datanodeStateMachine.getQueuedCommandCount())
+ .thenReturn(new EnumCounters<>(SCMCommandProto.Type.class));
context.setTermOfLeaderSCM(1);
HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
conf, context, scm);
@@ -185,9 +194,12 @@ public void testheartbeatWithoutReports() throws Exception
{
@Test
public void testheartbeatWithNodeReports() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
+ DatanodeStateMachine datanodeStateMachine = mock(DatanodeStateMachine.class);
StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
- mock(DatanodeStateMachine.class), "");
+ datanodeStateMachine, "");
+ when(datanodeStateMachine.getQueuedCommandCount())
+ .thenReturn(new EnumCounters<>(SCMCommandProto.Type.class));
StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
mock(
StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
@@ -217,8 +229,12 @@ public void testheartbeatWithNodeReports() throws
Exception {
@Test
public void testheartbeatWithContainerReports() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
+ DatanodeStateMachine datanodeStateMachine = mock(DatanodeStateMachine.class);
StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
- mock(DatanodeStateMachine.class), "");
+ datanodeStateMachine, "");
+
+ when(datanodeStateMachine.getQueuedCommandCount())
+ .thenReturn(new EnumCounters<>(SCMCommandProto.Type.class));
StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
mock(
@@ -249,8 +265,12 @@ public void testheartbeatWithContainerReports() throws
Exception {
@Test
public void testheartbeatWithCommandStatusReports() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
+ DatanodeStateMachine datanodeStateMachine = mock(DatanodeStateMachine.class);
StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
- mock(DatanodeStateMachine.class), "");
+ datanodeStateMachine, "");
+
+ when(datanodeStateMachine.getQueuedCommandCount())
+ .thenReturn(new EnumCounters<>(SCMCommandProto.Type.class));
StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
mock(
@@ -282,8 +302,12 @@ public void testheartbeatWithCommandStatusReports() throws
Exception {
@Test
public void testheartbeatWithContainerActions() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
+ DatanodeStateMachine datanodeStateMachine = mock(DatanodeStateMachine.class);
StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
- mock(DatanodeStateMachine.class), "");
+ datanodeStateMachine, "");
+
+ when(datanodeStateMachine.getQueuedCommandCount())
+ .thenReturn(new EnumCounters<>(SCMCommandProto.Type.class));
StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
mock(
@@ -320,10 +344,10 @@ public void testheartbeatWithAllReports() throws
Exception {
datanodeStateMachine, "");
// Return a Map of command counts when the heartbeat logic requests it
- final Map<SCMCommandProto.Type, Integer> commands = new HashMap<>();
+ final EnumCounters<SCMCommandProto.Type> commands = new EnumCounters<>(SCMCommandProto.Type.class);
int count = 1;
for (SCMCommandProto.Type cmd : SCMCommandProto.Type.values()) {
- commands.put(cmd, count++);
+ commands.set(cmd, count++);
}
when(datanodeStateMachine.getQueuedCommandCount())
.thenReturn(commands);
@@ -358,10 +382,16 @@ public void testheartbeatWithAllReports() throws
Exception {
assertTrue(heartbeat.hasContainerActions());
assertTrue(heartbeat.hasCommandQueueReport());
CommandQueueReportProto queueCount = heartbeat.getCommandQueueReport();
- assertEquals(queueCount.getCommandCount(), commands.size());
- assertEquals(queueCount.getCountCount(), commands.size());
- for (int i = 0; i < commands.size(); i++) {
- assertEquals(commands.get(queueCount.getCommand(i)).intValue(),
+ int commandCount = 0;
+ for (SCMCommandProto.Type type : SCMCommandProto.Type.values()) {
+ if (commands.get(type) > 0) {
+ commandCount++;
+ }
+ }
+ assertEquals(queueCount.getCommandCount(), commandCount);
+ assertEquals(queueCount.getCountCount(), commandCount);
+ for (int i = 0; i < commandCount; i++) {
+ assertEquals(commands.get(queueCount.getCommand(i)),
queueCount.getCount(i));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]