This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 9d4d358654 HDDS-7778. Add metrics for push replication (#4217)
9d4d358654 is described below
commit 9d4d358654b3b0e9f771f35539cd229c79454fb1
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Mon Jan 30 21:55:02 2023 +0100
HDDS-7778. Add metrics for push replication (#4217)
---
.../java/org/apache/hadoop/hdds/utils/IOUtils.java | 17 +++++++++++++++++
.../common/statemachine/DatanodeStateMachine.java | 16 +++++++---------
.../container/replication/MeasuredReplicator.java | 21 ++++++++++++++++-----
.../ozone/container/replication/PushReplicator.java | 12 +++++++++---
.../replication/TestMeasuredReplicator.java | 2 +-
5 files changed, 50 insertions(+), 18 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/IOUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/IOUtils.java
index 9317675747..a80fb9c952 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/IOUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/IOUtils.java
@@ -51,4 +51,21 @@ public final class IOUtils {
}
}
+ /**
+ * Close each argument, catching exceptions and logging them as error.
+ */
+ public static void close(Logger logger, AutoCloseable... closeables) {
+ for (AutoCloseable c : closeables) {
+ if (c != null) {
+ try {
+ c.close();
+ } catch (Exception e) {
+ if (logger != null) {
+ logger.error("Exception in closing {}", c, e);
+ }
+ }
+ }
+ }
+ }
+
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index f9e4951eee..69b22885e3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
+import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
import org.apache.hadoop.ozone.HddsDatanodeStopService;
import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
@@ -119,7 +120,8 @@ public class DatanodeStateMachine implements Closeable {
* constructor in a non-thread-safe way - see HDDS-3116.
*/
private final ReadWriteLock constructionLock = new ReentrantReadWriteLock();
- private final MeasuredReplicator replicatorMetrics;
+ private final MeasuredReplicator pullReplicatorWithMetrics;
+ private final MeasuredReplicator pushReplicatorWithMetrics;
private final ReplicationSupervisorMetrics replicationSupervisorMetrics;
private final ECReconstructionMetrics ecReconstructionMetrics;
// This is an instance variable as mockito needs to access it in a test
@@ -185,12 +187,12 @@ public class DatanodeStateMachine implements Closeable {
importer,
new SimpleContainerDownloader(conf, dnCertClient));
ContainerReplicator pushReplicator = new PushReplicator(
- // TODO compression, metrics
new OnDemandContainerReplicationSource(container.getController()),
new GrpcContainerUploader(conf, dnCertClient)
);
- replicatorMetrics = new MeasuredReplicator(pullReplicator);
+ pullReplicatorWithMetrics = new MeasuredReplicator(pullReplicator, "pull");
+ pushReplicatorWithMetrics = new MeasuredReplicator(pushReplicator, "push");
ReplicationConfig replicationConfig =
conf.getObject(ReplicationConfig.class);
@@ -218,7 +220,7 @@ public class DatanodeStateMachine implements Closeable {
conf, dnConf.getBlockDeleteThreads(),
dnConf.getBlockDeleteQueueLimit()))
.addHandler(new ReplicateContainerCommandHandler(conf, supervisor,
- replicatorMetrics, pushReplicator))
+ pullReplicatorWithMetrics, pushReplicatorWithMetrics))
.addHandler(reconstructECContainersCommandHandler)
.addHandler(new DeleteContainerCommandHandler(
dnConf.getContainerDeleteThreads(), clock))
@@ -590,11 +592,7 @@ public class DatanodeStateMachine implements Closeable {
*/
public synchronized void stopDaemon() {
try {
- try {
- replicatorMetrics.close();
- } catch (Exception e) {
- LOG.error("Couldn't stop replicator metrics", e);
- }
+ IOUtils.close(LOG, pushReplicatorWithMetrics, pullReplicatorWithMetrics);
supervisor.stop();
context.setShutdownGracefully();
context.setState(DatanodeStates.SHUTDOWN);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/MeasuredReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/MeasuredReplicator.java
index 7a791f61c2..fa3763d880 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/MeasuredReplicator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/MeasuredReplicator.java
@@ -36,9 +36,10 @@ import org.apache.hadoop.util.Time;
@Metrics(about = "Closed container replication metrics", context = "dfs")
public class MeasuredReplicator implements ContainerReplicator, AutoCloseable {
- private static final String NAME = ContainerReplicator.class.toString();
+ private static final String NAME = ContainerReplicator.class.getSimpleName();
private final ContainerReplicator delegate;
+ private final String name;
@Metric(about = "Number of successful replication tasks")
private MutableCounterLong success;
@@ -61,10 +62,15 @@ public class MeasuredReplicator implements ContainerReplicator, AutoCloseable {
@Metric(about = "Bytes transferred for successful replication tasks")
private MutableGaugeLong transferredBytes;
- public MeasuredReplicator(ContainerReplicator delegate) {
+ public MeasuredReplicator(ContainerReplicator delegate, String name) {
this.delegate = delegate;
- DefaultMetricsSystem.instance()
- .register(NAME, "Closed container replication", this);
+ this.name = name;
+ DefaultMetricsSystem.instance().register(metricsName(),
+ "Closed container " + name + " replication metrics", this);
+ }
+
+ private String metricsName() {
+ return NAME + "/" + name;
}
@Override
@@ -89,7 +95,7 @@ public class MeasuredReplicator implements ContainerReplicator, AutoCloseable {
@Override
public void close() throws Exception {
- DefaultMetricsSystem.instance().unregisterSource(NAME);
+ DefaultMetricsSystem.instance().unregisterSource(metricsName());
}
@VisibleForTesting
@@ -127,4 +133,9 @@ public class MeasuredReplicator implements ContainerReplicator, AutoCloseable {
return failureBytes;
}
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "{" + name + "}@"
+ + Integer.toHexString(hashCode());
+ }
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/PushReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/PushReplicator.java
index 27bf856adf..9a0f6f5a44 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/PushReplicator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/PushReplicator.java
@@ -20,10 +20,11 @@ package org.apache.hadoop.ozone.container.replication;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask.Status;
+
+import org.apache.commons.io.output.CountingOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.OutputStream;
import java.util.concurrent.CompletableFuture;
import static org.apache.hadoop.ozone.container.replication.CopyContainerCompression.NO_COMPRESSION;
@@ -53,14 +54,19 @@ public class PushReplicator implements ContainerReplicator {
source.prepare(containerID);
- OutputStream output = null;
+ CountingOutputStream output = null;
try {
- output = uploader.startUpload(containerID, target, fut);
+ output = new CountingOutputStream(
+ uploader.startUpload(containerID, target, fut));
source.copyData(containerID, output, NO_COMPRESSION.name());
fut.get();
+ task.setTransferredBytes(output.getByteCount());
task.setStatus(Status.DONE);
} catch (Exception e) {
LOG.warn("Container {} replication was unsuccessful.", containerID, e);
+ if (output != null) {
+ task.setTransferredBytes(output.getByteCount());
+ }
task.setStatus(Status.FAILED);
} finally {
// output may have already been closed, ignore such errors
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestMeasuredReplicator.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestMeasuredReplicator.java
index 5117130fc9..3ebfa448fd 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestMeasuredReplicator.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestMeasuredReplicator.java
@@ -54,7 +54,7 @@ public class TestMeasuredReplicator {
e.printStackTrace();
}
};
- measuredReplicator = new MeasuredReplicator(replicator);
+ measuredReplicator = new MeasuredReplicator(replicator, "test");
}
@AfterEach
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]