This is an automated email from the ASF dual-hosted git repository.

duong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f9db61895 HDDS-11200. Hsync client-side metrics (#7371)
6f9db61895 is described below

commit 6f9db61895bb6b1379c3e9efedec041e251cff71
Author: Duong Nguyen <[email protected]>
AuthorDate: Mon Nov 4 11:07:29 2024 -0800

    HDDS-11200. Hsync client-side metrics (#7371)
---
 .../hadoop/hdds/scm/ContainerClientMetrics.java    | 40 ++++++++++++++++++++++
 .../hadoop/hdds/scm/storage/BlockOutputStream.java | 17 +++++++--
 2 files changed, 55 insertions(+), 2 deletions(-)
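
The patch below introduces MutableRate timers for the four phases of a client-side hsync/flush (the synchronized flush work, sending the write chunk, waiting on the flush future, and the watchForCommit round trip), plus counters for write chunks and flushes triggered from write(). For context, here is a minimal, self-contained sketch of the Hadoop metrics2 pattern the change relies on; the class and metric names are illustrative only and do not appear in this commit.

import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.util.Time;

// Illustrative sketch only; not part of this commit.
@Metrics(about = "Example hsync-style client metrics", context = "dfsclient")
public final class ExampleHsyncMetrics {

  // Instantiated and injected by the metrics system on register().
  @Metric private MutableRate exampleSyncWorkNs;        // count + average latency (ns)
  @Metric private MutableCounterLong exampleFlushCount; // monotonically increasing counter

  public static ExampleHsyncMetrics create() {
    return DefaultMetricsSystem.instance().register(
        "ExampleHsyncMetrics", "Example client-side metrics",
        new ExampleHsyncMetrics());
  }

  // Run a task and record how long it took, the same bracketing pattern
  // BlockOutputStream uses around its flush and watchForCommit work.
  public void timed(Runnable task) {
    final long start = Time.monotonicNowNanos();
    try {
      task.run();
    } finally {
      exampleSyncWorkNs.add(Time.monotonicNowNanos() - start);
      exampleFlushCount.incr();
    }
  }
}

In the commit itself the metric declarations live in ContainerClientMetrics, and BlockOutputStream only brackets the relevant sections with Time.monotonicNowNanos() and calls incr(), as shown in the hunks below.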

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ContainerClientMetrics.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ContainerClientMetrics.java
index 19a5a9cad5..0333f4758c 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ContainerClientMetrics.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ContainerClientMetrics.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.metrics2.lib.Interns;
 import org.apache.hadoop.metrics2.lib.MetricsRegistry;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.apache.hadoop.metrics2.lib.MutableQuantiles;
+import org.apache.hadoop.metrics2.lib.MutableRate;
 import org.apache.hadoop.ozone.OzoneConsts;
 
 import java.util.Map;
@@ -52,6 +53,21 @@ public final class ContainerClientMetrics {
   private MutableCounterLong totalWriteChunkCalls;
   @Metric
   private MutableCounterLong totalWriteChunkBytes;
+
+  @Metric
+  private MutableRate hsyncSynchronizedWorkNs;
+  @Metric
+  private MutableRate hsyncSendWriteChunkNs;
+  @Metric
+  private MutableRate hsyncWaitForFlushNs;
+  @Metric
+  private MutableRate hsyncWatchForCommitNs;
+  @Metric
+  private MutableCounterLong writeChunksDuringWrite;
+  @Metric
+  private MutableCounterLong flushesDuringWrite;
+
+
   private MutableQuantiles[] listBlockLatency;
   private MutableQuantiles[] getBlockLatency;
   private MutableQuantiles[] getCommittedBlockLengthLatency;
@@ -249,4 +265,28 @@ public final class ContainerClientMetrics {
   Map<UUID, MutableCounterLong> getWriteChunksCallsByLeaders() {
     return writeChunksCallsByLeaders;
   }
+
+  public MutableRate getHsyncSynchronizedWorkNs() {
+    return hsyncSynchronizedWorkNs;
+  }
+
+  public MutableRate getHsyncSendWriteChunkNs() {
+    return hsyncSendWriteChunkNs;
+  }
+
+  public MutableRate getHsyncWaitForFlushNs() {
+    return hsyncWaitForFlushNs;
+  }
+
+  public MutableRate getHsyncWatchForCommitNs() {
+    return hsyncWatchForCommitNs;
+  }
+
+  public MutableCounterLong getWriteChunksDuringWrite() {
+    return writeChunksDuringWrite;
+  }
+
+  public MutableCounterLong getFlushesDuringWrite() {
+    return flushesDuringWrite;
+  }
 }
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index 59795dd0f0..b3a1dc6089 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -62,7 +62,9 @@ import static org.apache.hadoop.hdds.DatanodeVersion.COMBINED_PUTBLOCK_WRITECHUN
 import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync;
 import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.writeChunkAsync;
 import static org.apache.hadoop.ozone.OzoneConsts.INCREMENTAL_CHUNK_LIST;
+import static org.apache.hadoop.ozone.util.MetricUtil.captureLatencyNs;
 
+import org.apache.hadoop.util.Time;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -360,6 +362,7 @@ public class BlockOutputStream extends OutputStream {
   private void writeChunkIfNeeded() throws IOException {
     if (currentBufferRemaining == 0) {
       LOG.debug("WriteChunk from write(), buffer = {}", currentBuffer);
+      clientMetrics.getWriteChunksDuringWrite().incr();
       writeChunk(currentBuffer);
       updateWriteChunkLength();
     }
@@ -404,6 +407,7 @@ public class BlockOutputStream extends OutputStream {
         updatePutBlockLength();
         CompletableFuture<PutBlockResult> putBlockFuture = executePutBlock(false, false);
         recordWatchForCommitAsync(putBlockFuture);
+        clientMetrics.getFlushesDuringWrite().incr();
       }
 
       if (bufferPool.isAtCapacity()) {
@@ -532,12 +536,16 @@ public class BlockOutputStream extends OutputStream {
     }
 
     LOG.debug("Entering watchForCommit commitIndex = {}", commitIndex);
+    final long start = Time.monotonicNowNanos();
     return sendWatchForCommit(commitIndex)
         .thenAccept(this::checkReply)
         .exceptionally(e -> {
           throw new FlushRuntimeException(setIoException(e));
         })
-        .whenComplete((r, e) -> LOG.debug("Leaving watchForCommit commitIndex = {}", commitIndex));
+        .whenComplete((r, e) -> {
+          LOG.debug("Leaving watchForCommit commitIndex = {}", commitIndex);
+          clientMetrics.getHsyncWatchForCommitNs().add(Time.monotonicNowNanos() - start);
+        });
   }
 
   private void checkReply(XceiverClientReply reply) {
@@ -693,12 +701,15 @@ public class BlockOutputStream extends OutputStream {
       throws IOException, InterruptedException, ExecutionException {
     checkOpen();
     LOG.debug("Start handleFlushInternal close={}", close);
-    CompletableFuture<Void> toWaitFor = handleFlushInternalSynchronized(close);
+    CompletableFuture<Void> toWaitFor = captureLatencyNs(clientMetrics.getHsyncSynchronizedWorkNs(),
+        () -> handleFlushInternalSynchronized(close));
 
     if (toWaitFor != null) {
       LOG.debug("Waiting for flush");
       try {
+        long startWaiting = Time.monotonicNowNanos();
         toWaitFor.get();
+        clientMetrics.getHsyncWaitForFlushNs().add(Time.monotonicNowNanos() - startWaiting);
       } catch (ExecutionException ex) {
         if (ex.getCause() instanceof FlushRuntimeException) {
           throw ((FlushRuntimeException) ex.getCause()).cause;
@@ -727,6 +738,7 @@ public class BlockOutputStream extends OutputStream {
   }
 
   private synchronized CompletableFuture<Void> handleFlushInternalSynchronized(boolean close) throws IOException {
+    long start = Time.monotonicNowNanos();
     CompletableFuture<PutBlockResult> putBlockResultFuture = null;
     // flush the last chunk data residing on the currentBuffer
     if (totalWriteChunkLength < writtenDataLength) {
@@ -768,6 +780,7 @@ public class BlockOutputStream extends OutputStream {
     if (putBlockResultFuture != null) {
       recordWatchForCommitAsync(putBlockResultFuture);
     }
+    clientMetrics.getHsyncSendWriteChunkNs().add(Time.monotonicNowNanos() - start);
     return lastFlushFuture;
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
