This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d4d358654 HDDS-7778. Add metrics for push replication (#4217)
9d4d358654 is described below

commit 9d4d358654b3b0e9f771f35539cd229c79454fb1
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Mon Jan 30 21:55:02 2023 +0100

    HDDS-7778. Add metrics for push replication (#4217)
---
 .../java/org/apache/hadoop/hdds/utils/IOUtils.java  | 17 +++++++++++++++++
 .../common/statemachine/DatanodeStateMachine.java   | 16 +++++++---------
 .../container/replication/MeasuredReplicator.java   | 21 ++++++++++++++++-----
 .../ozone/container/replication/PushReplicator.java | 12 +++++++++---
 .../replication/TestMeasuredReplicator.java         |  2 +-
 5 files changed, 50 insertions(+), 18 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/IOUtils.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/IOUtils.java
index 9317675747..a80fb9c952 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/IOUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/IOUtils.java
@@ -51,4 +51,21 @@ public final class IOUtils {
     }
   }
 
+  /**
+   * Close each argument, catching exceptions and logging them as error.
+   */
+  public static void close(Logger logger, AutoCloseable... closeables) {
+    for (AutoCloseable c : closeables) {
+      if (c != null) {
+        try {
+          c.close();
+        } catch (Exception e) {
+          if (logger != null) {
+            logger.error("Exception in closing {}", c, e);
+          }
+        }
+      }
+    }
+  }
+
 }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index f9e4951eee..69b22885e3 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -40,6 +40,7 @@ import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import 
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
+import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
 import org.apache.hadoop.ozone.HddsDatanodeStopService;
 import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
@@ -119,7 +120,8 @@ public class DatanodeStateMachine implements Closeable {
    * constructor in a non-thread-safe way - see HDDS-3116.
    */
   private final ReadWriteLock constructionLock = new ReentrantReadWriteLock();
-  private final MeasuredReplicator replicatorMetrics;
+  private final MeasuredReplicator pullReplicatorWithMetrics;
+  private final MeasuredReplicator pushReplicatorWithMetrics;
   private final ReplicationSupervisorMetrics replicationSupervisorMetrics;
   private final ECReconstructionMetrics ecReconstructionMetrics;
   // This is an instance variable as mockito needs to access it in a test
@@ -185,12 +187,12 @@ public class DatanodeStateMachine implements Closeable {
         importer,
         new SimpleContainerDownloader(conf, dnCertClient));
     ContainerReplicator pushReplicator = new PushReplicator(
-        // TODO compression, metrics
         new OnDemandContainerReplicationSource(container.getController()),
         new GrpcContainerUploader(conf, dnCertClient)
     );
 
-    replicatorMetrics = new MeasuredReplicator(pullReplicator);
+    pullReplicatorWithMetrics = new MeasuredReplicator(pullReplicator, "pull");
+    pushReplicatorWithMetrics = new MeasuredReplicator(pushReplicator, "push");
 
     ReplicationConfig replicationConfig =
         conf.getObject(ReplicationConfig.class);
@@ -218,7 +220,7 @@ public class DatanodeStateMachine implements Closeable {
             conf, dnConf.getBlockDeleteThreads(),
             dnConf.getBlockDeleteQueueLimit()))
         .addHandler(new ReplicateContainerCommandHandler(conf, supervisor,
-            replicatorMetrics, pushReplicator))
+            pullReplicatorWithMetrics, pushReplicatorWithMetrics))
         .addHandler(reconstructECContainersCommandHandler)
         .addHandler(new DeleteContainerCommandHandler(
             dnConf.getContainerDeleteThreads(), clock))
@@ -590,11 +592,7 @@ public class DatanodeStateMachine implements Closeable {
    */
   public synchronized void stopDaemon() {
     try {
-      try {
-        replicatorMetrics.close();
-      } catch (Exception e) {
-        LOG.error("Couldn't stop replicator metrics", e);
-      }
+      IOUtils.close(LOG, pushReplicatorWithMetrics, pullReplicatorWithMetrics);
       supervisor.stop();
       context.setShutdownGracefully();
       context.setState(DatanodeStates.SHUTDOWN);
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/MeasuredReplicator.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/MeasuredReplicator.java
index 7a791f61c2..fa3763d880 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/MeasuredReplicator.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/MeasuredReplicator.java
@@ -36,9 +36,10 @@ import org.apache.hadoop.util.Time;
 @Metrics(about = "Closed container replication metrics", context = "dfs")
 public class MeasuredReplicator implements ContainerReplicator, AutoCloseable {
 
-  private static final String NAME = ContainerReplicator.class.toString();
+  private static final String NAME = ContainerReplicator.class.getSimpleName();
 
   private final ContainerReplicator delegate;
+  private final String name;
 
   @Metric(about = "Number of successful replication tasks")
   private MutableCounterLong success;
@@ -61,10 +62,15 @@ public class MeasuredReplicator implements 
ContainerReplicator, AutoCloseable {
   @Metric(about = "Bytes transferred for successful replication tasks")
   private MutableGaugeLong transferredBytes;
 
-  public MeasuredReplicator(ContainerReplicator delegate) {
+  public MeasuredReplicator(ContainerReplicator delegate, String name) {
     this.delegate = delegate;
-    DefaultMetricsSystem.instance()
-        .register(NAME, "Closed container replication", this);
+    this.name = name;
+    DefaultMetricsSystem.instance().register(metricsName(),
+        "Closed container " + name + " replication metrics", this);
+  }
+
+  private String metricsName() {
+    return NAME + "/" + name;
   }
 
   @Override
@@ -89,7 +95,7 @@ public class MeasuredReplicator implements 
ContainerReplicator, AutoCloseable {
 
   @Override
   public void close() throws Exception {
-    DefaultMetricsSystem.instance().unregisterSource(NAME);
+    DefaultMetricsSystem.instance().unregisterSource(metricsName());
   }
 
   @VisibleForTesting
@@ -127,4 +133,9 @@ public class MeasuredReplicator implements 
ContainerReplicator, AutoCloseable {
     return failureBytes;
   }
 
+  @Override
+  public String toString() {
+    return getClass().getSimpleName() + "{" + name + "}@"
+        + Integer.toHexString(hashCode());
+  }
 }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/PushReplicator.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/PushReplicator.java
index 27bf856adf..9a0f6f5a44 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/PushReplicator.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/PushReplicator.java
@@ -20,10 +20,11 @@ package org.apache.hadoop.ozone.container.replication;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import 
org.apache.hadoop.ozone.container.replication.AbstractReplicationTask.Status;
+
+import org.apache.commons.io.output.CountingOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.OutputStream;
 import java.util.concurrent.CompletableFuture;
 
 import static 
org.apache.hadoop.ozone.container.replication.CopyContainerCompression.NO_COMPRESSION;
@@ -53,14 +54,19 @@ public class PushReplicator implements ContainerReplicator {
 
     source.prepare(containerID);
 
-    OutputStream output = null;
+    CountingOutputStream output = null;
     try {
-      output = uploader.startUpload(containerID, target, fut);
+      output = new CountingOutputStream(
+          uploader.startUpload(containerID, target, fut));
       source.copyData(containerID, output, NO_COMPRESSION.name());
       fut.get();
+      task.setTransferredBytes(output.getByteCount());
       task.setStatus(Status.DONE);
     } catch (Exception e) {
       LOG.warn("Container {} replication was unsuccessful.", containerID, e);
+      if (output != null) {
+        task.setTransferredBytes(output.getByteCount());
+      }
       task.setStatus(Status.FAILED);
     } finally {
       // output may have already been closed, ignore such errors
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestMeasuredReplicator.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestMeasuredReplicator.java
index 5117130fc9..3ebfa448fd 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestMeasuredReplicator.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestMeasuredReplicator.java
@@ -54,7 +54,7 @@ public class TestMeasuredReplicator {
         e.printStackTrace();
       }
     };
-    measuredReplicator = new MeasuredReplicator(replicator);
+    measuredReplicator = new MeasuredReplicator(replicator, "test");
   }
 
   @AfterEach


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to