This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 63a232b798 HDDS-10907. DataNode StorageContainerMetrics numWriteChunk is counted multiple times (#6835)
63a232b798 is described below

commit 63a232b798c2eba71f19bafc607c20c8144a41af
Author: Chung En Lee <[email protected]>
AuthorDate: Tue Jul 16 14:17:06 2024 +0800

    HDDS-10907. DataNode StorageContainerMetrics numWriteChunk is counted multiple times (#6835)
    
    
    Co-authored-by: Doroszlai, Attila <[email protected]>
---
 .../container/common/impl/HddsDispatcher.java      |  11 +-
 .../container/metrics/TestContainerMetrics.java    | 193 +++++++++++++++------
 2 files changed, 147 insertions(+), 57 deletions(-)
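
In short: a WriteChunk sent through Ratis passes through the dispatcher once
per replication stage (WRITE_DATA and then COMMIT_DATA), so the unconditional
metrics.incContainerOpsMetrics(cmdType) at the top of the dispatch method
bumped numWriteChunk more than once per client write. The patch below counts
WriteChunk only in its write stage. Condensed, the new rule is equivalent to
the following sketch (variables as in the dispatcher hunk below; a sketch, not
the literal patch):

    // Count an op exactly once:
    // - dispatcherContext == null: the request did not come through Ratis
    // - isWriteStage: WriteChunk is counted only in its WRITE_DATA stage
    // - every other command type is counted on each dispatch, as before
    boolean countOp = dispatcherContext == null
        || isWriteStage
        || cmdType != Type.WriteChunk;
    if (countOp) {
      metrics.incContainerOpsMetrics(cmdType);
    }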

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index e494243ccc..b47116950c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -216,7 +216,6 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
     long startTime = Time.monotonicNow();
     Type cmdType = msg.getCmdType();
     long containerID = msg.getContainerID();
-    metrics.incContainerOpsMetrics(cmdType);
     Container container = getContainer(containerID);
     boolean isWriteStage =
         (cmdType == Type.WriteChunk && dispatcherContext != null
@@ -228,6 +227,16 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
             && dispatcherContext.getStage()
             == DispatcherContext.WriteChunkStage.COMMIT_DATA);
 
+    if (dispatcherContext == null) {
+      // count ops that do not go through Ratis
+      metrics.incContainerOpsMetrics(cmdType);
+    } else if (isWriteStage) {
+      // count WriteChunk only in the WRITE_DATA stage
+      metrics.incContainerOpsMetrics(cmdType);
+    } else if (cmdType != Type.WriteChunk) {
+      metrics.incContainerOpsMetrics(cmdType);
+    }
+
     try {
       if (DispatcherContext.op(dispatcherContext).validateToken()) {
         validateToken(msg);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
index a4a5701f54..068cb01a96 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
@@ -18,11 +18,13 @@
 package org.apache.hadoop.ozone.container.metrics;
 
 import java.io.File;
+import java.io.IOException;
 import java.nio.file.Path;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.DFSConfigKeysLegacy;
 import org.apache.hadoop.hdds.client.BlockID;
@@ -32,32 +34,47 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
+import org.apache.hadoop.hdds.scm.XceiverClientRatis;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.RatisTestHelper;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc;
+import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
 import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.ozone.test.GenericTestUtils;
 
 import com.google.common.collect.Maps;
-import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
 import static org.apache.ozone.test.MetricsAsserts.assertCounter;
 import static org.apache.ozone.test.MetricsAsserts.assertQuantileGauges;
 import static org.apache.ozone.test.MetricsAsserts.getMetrics;
+import static org.apache.ratis.rpc.SupportedRpcType.GRPC;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
+import org.apache.ratis.util.function.CheckedBiConsumer;
+import org.apache.ratis.util.function.CheckedBiFunction;
+import org.apache.ratis.util.function.CheckedConsumer;
+import org.apache.ratis.util.function.CheckedFunction;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.io.TempDir;
@@ -67,72 +84,126 @@ import org.junit.jupiter.api.io.TempDir;
  */
 @Timeout(300)
 public class TestContainerMetrics {
+  static final String TEST_DIR = GenericTestUtils.getRandomizedTempPath() + File.separator;
   @TempDir
   private Path tempDir;
+  private static final OzoneConfiguration CONF = new OzoneConfiguration();
+  private static final int DFS_METRICS_PERCENTILES_INTERVALS = 1;
+
+  @BeforeAll
+  public static void setup() {
+    DefaultMetricsSystem.setMiniClusterMode(true);
+    CONF.setInt(DFSConfigKeysLegacy.DFS_METRICS_PERCENTILES_INTERVALS_KEY,
+        DFS_METRICS_PERCENTILES_INTERVALS);
+    CONF.setBoolean(OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATASTREAM_ENABLED, false);
+    CONF.set(OzoneConfigKeys.OZONE_METADATA_DIRS, TEST_DIR);
+
+  }
+
+  @AfterAll
+  public static void cleanup() {
+    // clean up volume dir
+    File file = new File(TEST_DIR);
+    if (file.exists()) {
+      FileUtil.fullyDelete(file);
+    }
+  }
+
+  @AfterEach
+  public void cleanUp() throws IOException {
+    FileUtils.deleteQuietly(new File(CONF.get(ScmConfigKeys.HDDS_DATANODE_DIR_KEY)));
+    FileUtils.deleteQuietly(CONF.get(OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATANODE_STORAGE_DIR) == null ?
+        null : new File(CONF.get(OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATANODE_STORAGE_DIR)));
+  }
 
   @Test
   public void testContainerMetrics() throws Exception {
-    XceiverServerGrpc server = null;
-    XceiverClientGrpc client = null;
+    runTestClientServer(pipeline -> CONF
+            .setInt(OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT,
+                pipeline.getFirstNode()
+                    .getPort(DatanodeDetails.Port.Name.STANDALONE).getValue()),
+        pipeline -> new XceiverClientGrpc(pipeline, CONF),
+        (dn, volumeSet) -> new XceiverServerGrpc(dn, CONF,
+            createDispatcher(dn, volumeSet), null), (dn, p) -> {
+        });
+  }
+
+  @Test
+  public void testContainerMetricsRatis() throws Exception {
+    runTestClientServer(
+        pipeline -> RatisTestHelper.initRatisConf(GRPC, CONF),
+        pipeline -> XceiverClientRatis.newXceiverClientRatis(pipeline, CONF),
+        this::newXceiverServerRatis, (dn, p) ->
+            RatisTestHelper.initXceiverServerRatis(GRPC, dn, p));
+  }
+
+  private static MutableVolumeSet createVolumeSet(DatanodeDetails dn, String path) throws IOException {
+    CONF.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, path);
+    return new MutableVolumeSet(
+        dn.getUuidString(), CONF,
+        null, StorageVolume.VolumeType.DATA_VOLUME, null);
+  }
+
+  private HddsDispatcher createDispatcher(DatanodeDetails dd, VolumeSet volumeSet) {
+    ContainerSet containerSet = new ContainerSet(1000);
+    StateContext context = ContainerTestUtils.getMockContext(
+        dd, CONF);
+    ContainerMetrics metrics = ContainerMetrics.create(CONF);
+    Map<ContainerProtos.ContainerType, Handler> handlers = Maps.newHashMap();
+    for (ContainerProtos.ContainerType containerType :
+        ContainerProtos.ContainerType.values()) {
+      handlers.put(containerType,
+          Handler.getHandlerForContainerType(containerType, CONF,
+              context.getParent().getDatanodeDetails().getUuidString(),
+              containerSet, volumeSet, metrics,
+              c -> { }));
+    }
+    HddsDispatcher dispatcher = new HddsDispatcher(CONF, containerSet,
+        volumeSet, handlers, context, metrics, null);
+    StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())
+        .forEach(hddsVolume -> hddsVolume.setDbParentDir(tempDir.toFile()));
+    dispatcher.setClusterId(UUID.randomUUID().toString());
+    return dispatcher;
+  }
+
+  static void runTestClientServer(
+      CheckedConsumer<Pipeline, IOException> initConf,
+      CheckedFunction<Pipeline, XceiverClientSpi,
+                IOException> createClient,
+      CheckedBiFunction<DatanodeDetails, MutableVolumeSet, XceiverServerSpi,
+          IOException> createServer,
+      CheckedBiConsumer<DatanodeDetails, Pipeline, IOException> initServer)
+      throws Exception {
+    XceiverServerSpi server = null;
+    XceiverClientSpi client = null;
     long containerID = ContainerTestHelper.getTestContainerID();
-    String path = GenericTestUtils.getRandomizedTempPath();
+    MutableVolumeSet volumeSet = null;
 
     try {
-      final int interval = 1;
-      Pipeline pipeline = MockPipeline
-          .createSingleNodePipeline();
-      OzoneConfiguration conf = new OzoneConfiguration();
-      conf.setInt(OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT,
-          pipeline.getFirstNode()
-              .getPort(DatanodeDetails.Port.Name.STANDALONE).getValue());
-      conf.setInt(DFSConfigKeysLegacy.DFS_METRICS_PERCENTILES_INTERVALS_KEY,
-          interval);
-
-      DatanodeDetails datanodeDetails = randomDatanodeDetails();
-      conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, path);
-      conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, path);
-      VolumeSet volumeSet = new MutableVolumeSet(
-          datanodeDetails.getUuidString(), conf,
-          null, StorageVolume.VolumeType.DATA_VOLUME, null);
-      ContainerSet containerSet = new ContainerSet(1000);
-      StateContext context = ContainerTestUtils.getMockContext(
-          datanodeDetails, conf);
-      ContainerMetrics metrics = ContainerMetrics.create(conf);
-      Map<ContainerProtos.ContainerType, Handler> handlers = Maps.newHashMap();
-      for (ContainerProtos.ContainerType containerType :
-          ContainerProtos.ContainerType.values()) {
-        handlers.put(containerType,
-            Handler.getHandlerForContainerType(containerType, conf,
-                context.getParent().getDatanodeDetails().getUuidString(),
-                containerSet, volumeSet, metrics,
-                c -> { }));
-      }
-      HddsDispatcher dispatcher = new HddsDispatcher(conf, containerSet,
-          volumeSet, handlers, context, metrics, null);
-      StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())
-          .forEach(hddsVolume -> hddsVolume.setDbParentDir(tempDir.toFile()));
-      dispatcher.setClusterId(UUID.randomUUID().toString());
-
-      server = new XceiverServerGrpc(datanodeDetails, conf, dispatcher, null);
-      client = new XceiverClientGrpc(pipeline, conf);
+      final Pipeline pipeline =
+          MockPipeline.createSingleNodePipeline();
+      initConf.accept(pipeline);
 
+      DatanodeDetails dn = pipeline.getFirstNode();
+      volumeSet = createVolumeSet(dn, TEST_DIR + dn.getUuidString());
+      server = createServer.apply(dn, volumeSet);
       server.start();
+      initServer.accept(dn, pipeline);
+
+      client = createClient.apply(pipeline);
       client.connect();
 
       // Write Chunk
       BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
-      ContainerTestHelper.getWriteChunkRequest(
-          pipeline, blockID, 1024);
-      ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
+      final ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
           ContainerTestHelper.getWriteChunkRequest(
               pipeline, blockID, 1024);
-      ContainerCommandResponseProto response =
-              client.sendCommand(writeChunkRequest);
+      ContainerCommandResponseProto response = client.sendCommand(writeChunkRequest);
       assertEquals(ContainerProtos.Result.SUCCESS,
           response.getResult());
 
       //Read Chunk
-      ContainerProtos.ContainerCommandRequestProto readChunkRequest =
+      final ContainerProtos.ContainerCommandRequestProto readChunkRequest =
           ContainerTestHelper.getReadChunkRequest(pipeline, writeChunkRequest
               .getWriteChunk());
       response = client.sendCommand(readChunkRequest);
@@ -147,11 +218,10 @@ public class TestContainerMetrics {
       assertCounter("bytesWriteChunk", 1024L, containerMetrics);
       assertCounter("bytesReadChunk", 1024L, containerMetrics);
 
-      String sec = interval + "s";
-      Thread.sleep((interval + 1) * 1000);
+      String sec = DFS_METRICS_PERCENTILES_INTERVALS + "s";
+      Thread.sleep((DFS_METRICS_PERCENTILES_INTERVALS + 1) * 1000);
       assertQuantileGauges("WriteChunkNanos" + sec, containerMetrics);
 
-      // Check VolumeIOStats metrics
       List<HddsVolume> volumes =
           StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList());
       HddsVolume hddsVolume = volumes.get(0);
@@ -161,19 +231,30 @@ public class TestContainerMetrics {
       assertCounter("ReadOpCount", 1L, volumeIOMetrics);
       assertCounter("WriteBytes", 1024L, volumeIOMetrics);
       assertCounter("WriteOpCount", 1L, volumeIOMetrics);
-
     } finally {
+      ContainerMetrics.remove();
+      if (volumeSet != null) {
+        volumeSet.shutdown();
+      }
       if (client != null) {
         client.close();
       }
       if (server != null) {
         server.stop();
       }
-      // clean up volume dir
-      File file = new File(path);
-      if (file.exists()) {
-        FileUtil.fullyDelete(file);
-      }
     }
   }
+
+  private XceiverServerSpi newXceiverServerRatis(DatanodeDetails dn, MutableVolumeSet volumeSet)
+      throws IOException {
+    CONF.setInt(OzoneConfigKeys.HDDS_CONTAINER_RATIS_IPC_PORT,
+        dn.getPort(DatanodeDetails.Port.Name.RATIS).getValue());
+    final String dir = TEST_DIR + dn.getUuid();
+    CONF.set(OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir);
+    final ContainerDispatcher dispatcher = createDispatcher(dn,
+        volumeSet);
+    return XceiverServerRatis.newXceiverServerRatis(dn, CONF, dispatcher,
+        new ContainerController(new ContainerSet(1000), Maps.newHashMap()),
+        null, null);
+  }
 }
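
For reference, a minimal sketch of what the fix means for the assertions in
the test above (assuming, per the subject line, that ContainerMetrics
registers under the source name "StorageContainerMetrics"; illustrative only,
not part of this commit):

    // After one successful WriteChunk over a Ratis pipeline the op counter
    // should read 1; before this fix the WRITE_DATA and COMMIT_DATA stages
    // each incremented it, yielding 2.
    MetricsRecordBuilder containerMetrics = getMetrics("StorageContainerMetrics");
    assertCounter("numWriteChunk", 1L, containerMetrics);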


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
