This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 138cbe3157 HDDS-11998. BlockDataStreamOutput should 
decrPendingContainerOpsMetrics (#7636)
138cbe3157 is described below

commit 138cbe31570da5876a682b54a5322e99cb9e4dc0
Author: Ivan Andika <[email protected]>
AuthorDate: Sat Jan 4 19:28:20 2025 +0800

    HDDS-11998. BlockDataStreamOutput should decrPendingContainerOpsMetrics 
(#7636)
---
 .../hdds/scm/storage/BlockDataStreamOutput.java    |  5 +-
 .../hadoop/hdds/scm/storage/StreamBuffer.java      |  4 +-
 .../client/rpc/TestBlockDataStreamOutput.java      | 77 ++++++++++++++--------
 3 files changed, 56 insertions(+), 30 deletions(-)

diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index 48c77f2c86..8c2883a437 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -253,7 +253,7 @@ public class BlockDataStreamOutput implements 
ByteBufferStreamOutput {
     }
     while (len > 0) {
       allocateNewBufferIfNeeded();
-      int writeLen = Math.min(len, currentBuffer.length());
+      int writeLen = Math.min(len, currentBuffer.remaining());
       final StreamBuffer buf = new StreamBuffer(b, off, writeLen);
       currentBuffer.put(buf);
       writeChunkIfNeeded();
@@ -265,7 +265,7 @@ public class BlockDataStreamOutput implements 
ByteBufferStreamOutput {
   }
 
   private void writeChunkIfNeeded() throws IOException {
-    if (currentBuffer.length() == 0) {
+    if (currentBuffer.remaining() == 0) {
       writeChunk(currentBuffer);
       currentBuffer = null;
     }
@@ -672,6 +672,7 @@ public class BlockDataStreamOutput implements 
ByteBufferStreamOutput {
             out.writeAsync(buf, StandardWriteOption.SYNC) :
             out.writeAsync(buf))
             .whenCompleteAsync((r, e) -> {
+              
metrics.decrPendingContainerOpsMetrics(ContainerProtos.Type.WriteChunk);
               if (e != null || !r.isSuccess()) {
                 if (e == null) {
                   e = new IOException("result is not success");
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java
index b889aa35e2..5bf6dcee82 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java
@@ -39,8 +39,8 @@ public class StreamBuffer {
     return buffer.duplicate();
   }
 
-  public int length() {
-    return buffer.limit() - buffer.position();
+  public int remaining() {
+    return buffer.remaining();
   }
 
   public int position() {
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
index 5bcf708405..46c3c71627 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
@@ -47,13 +46,14 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type.PutBlock;
+import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type.WriteChunk;
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -77,13 +77,6 @@ public class TestBlockDataStreamOutput {
   private static String keyString;
   private static final DatanodeVersion DN_OLD_VERSION = 
DatanodeVersion.SEPARATE_RATIS_PORTS_AVAILABLE;
 
-  /**
-   * Create a MiniDFSCluster for testing.
-   * <p>
-   * Ozone is made active by setting OZONE_ENABLED = true
-   *
-   * @throws IOException
-   */
   @BeforeAll
   public static void init() throws Exception {
     chunkSize = 100;
@@ -120,7 +113,7 @@ public class TestBlockDataStreamOutput {
     client = OzoneClientFactory.getRpcClient(conf);
     objectStore = client.getObjectStore();
     keyString = UUID.randomUUID().toString();
-    volumeName = "testblockoutputstream";
+    volumeName = "testblockdatastreamoutput";
     bucketName = volumeName;
     objectStore.createVolume(volumeName);
     objectStore.getVolume(volumeName).createBucket(bucketName);
@@ -130,9 +123,6 @@ public class TestBlockDataStreamOutput {
     return UUID.randomUUID().toString();
   }
 
-  /**
-   * Shutdown MiniDFSCluster.
-   */
   @AfterAll
   public static void shutdown() {
     IOUtils.closeQuietly(client);
@@ -166,6 +156,11 @@ public class TestBlockDataStreamOutput {
   }
 
   static void testWrite(int dataLength) throws Exception {
+    XceiverClientMetrics metrics =
+        XceiverClientManager.getXceiverClientMetrics();
+    long pendingWriteChunkCount = 
metrics.getPendingContainerOpCountMetrics(WriteChunk);
+    long pendingPutBlockCount = 
metrics.getPendingContainerOpCountMetrics(PutBlock);
+
     String keyName = getKeyName();
     OzoneDataStreamOutput key = createKey(
         keyName, ReplicationType.RATIS, dataLength);
@@ -174,9 +169,19 @@ public class TestBlockDataStreamOutput {
     // now close the stream, It will update the key length.
     key.close();
     validateData(keyName, data);
+
+    assertEquals(pendingPutBlockCount,
+        metrics.getPendingContainerOpCountMetrics(PutBlock));
+    assertEquals(pendingWriteChunkCount,
+        metrics.getPendingContainerOpCountMetrics(WriteChunk));
   }
 
   private void testWriteWithFailure(int dataLength) throws Exception {
+    XceiverClientMetrics metrics =
+        XceiverClientManager.getXceiverClientMetrics();
+    long pendingWriteChunkCount = 
metrics.getPendingContainerOpCountMetrics(WriteChunk);
+    long pendingPutBlockCount = 
metrics.getPendingContainerOpCountMetrics(PutBlock);
+
     String keyName = getKeyName();
     OzoneDataStreamOutput key = createKey(
         keyName, ReplicationType.RATIS, dataLength);
@@ -195,17 +200,24 @@ public class TestBlockDataStreamOutput {
     key.close();
     String dataString = new String(data, UTF_8);
     validateData(keyName, dataString.concat(dataString).getBytes(UTF_8));
+
+    assertEquals(pendingPutBlockCount,
+        metrics.getPendingContainerOpCountMetrics(PutBlock));
+    assertEquals(pendingWriteChunkCount,
+        metrics.getPendingContainerOpCountMetrics(WriteChunk));
   }
 
   @Test
   public void testPutBlockAtBoundary() throws Exception {
-    int dataLength = 500;
+    int dataLength = maxFlushSize + 100;
     XceiverClientMetrics metrics =
         XceiverClientManager.getXceiverClientMetrics();
-    long putBlockCount = metrics.getContainerOpCountMetrics(
-        ContainerProtos.Type.PutBlock);
-    long pendingPutBlockCount = metrics.getPendingContainerOpCountMetrics(
-        ContainerProtos.Type.PutBlock);
+    long writeChunkCount = metrics.getContainerOpCountMetrics(WriteChunk);
+    long putBlockCount = metrics.getContainerOpCountMetrics(PutBlock);
+    long pendingWriteChunkCount = 
metrics.getPendingContainerOpCountMetrics(WriteChunk);
+    long pendingPutBlockCount = 
metrics.getPendingContainerOpCountMetrics(PutBlock);
+    long totalOpCount = metrics.getTotalOpCount();
+
     String keyName = getKeyName();
     OzoneDataStreamOutput key = createKey(
         keyName, ReplicationType.RATIS, 0);
@@ -213,14 +225,25 @@ public class TestBlockDataStreamOutput {
         ContainerTestHelper.getFixedLengthString(keyString, dataLength)
             .getBytes(UTF_8);
     key.write(ByteBuffer.wrap(data));
-    
assertThat(metrics.getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock))
+    assertThat(metrics.getPendingContainerOpCountMetrics(PutBlock))
         .isLessThanOrEqualTo(pendingPutBlockCount + 1);
+    assertThat(metrics.getPendingContainerOpCountMetrics(WriteChunk))
+        .isLessThanOrEqualTo(pendingWriteChunkCount + 5);
     key.close();
     // Since data length is 500 , first putBlock will be at 400(flush boundary)
     // and the other at 500
-    assertEquals(
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock),
-        putBlockCount + 2);
+    assertEquals(putBlockCount + 2,
+        metrics.getContainerOpCountMetrics(PutBlock));
+    // Each chunk is 100 so there will be 500 / 100 = 5 chunks.
+    assertEquals(writeChunkCount + 5,
+        metrics.getContainerOpCountMetrics(WriteChunk));
+    assertEquals(totalOpCount + 7,
+        metrics.getTotalOpCount());
+    assertEquals(pendingPutBlockCount,
+        metrics.getPendingContainerOpCountMetrics(PutBlock));
+    assertEquals(pendingWriteChunkCount,
+        metrics.getPendingContainerOpCountMetrics(WriteChunk));
+
     validateData(keyName, data);
   }
 
@@ -242,20 +265,22 @@ public class TestBlockDataStreamOutput {
     XceiverClientMetrics metrics =
         XceiverClientManager.getXceiverClientMetrics();
     OzoneDataStreamOutput key = createKey(keyName, ReplicationType.RATIS, 0);
-    long writeChunkCount =
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk);
+    long writeChunkCount = metrics.getContainerOpCountMetrics(WriteChunk);
+    long pendingWriteChunkCount = 
metrics.getPendingContainerOpCountMetrics(WriteChunk);
     byte[] data =
         ContainerTestHelper.getFixedLengthString(keyString, chunkSize / 2)
             .getBytes(UTF_8);
     key.write(ByteBuffer.wrap(data));
     // minPacketSize= 100, so first write of 50 wont trigger a writeChunk
     assertEquals(writeChunkCount,
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+        metrics.getContainerOpCountMetrics(WriteChunk));
     key.write(ByteBuffer.wrap(data));
     assertEquals(writeChunkCount + 1,
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+        metrics.getContainerOpCountMetrics(WriteChunk));
     // now close the stream, It will update the key length.
     key.close();
+    assertEquals(pendingWriteChunkCount,
+        metrics.getPendingContainerOpCountMetrics(WriteChunk));
     String dataString = new String(data, UTF_8);
     validateData(keyName, dataString.concat(dataString).getBytes(UTF_8));
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to