This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 2ba7ccd15ac HDDS-15522. Make RpcClient close idempotent (#10478)
2ba7ccd15ac is described below
commit 2ba7ccd15ac7a3ebe6fa98674ecac32bdabbf1f3
Author: Luis Pigueiras <[email protected]>
AuthorDate: Sat Jun 13 21:30:51 2026 +0200
HDDS-15522. Make RpcClient close idempotent (#10478)
Co-authored-by: Peter Lee <[email protected]>
---
.../hadoop/hdds/scm/ContainerClientMetrics.java | 28 +++++++++++++
.../apache/hadoop/ozone/client/rpc/RpcClient.java | 29 ++++++++-----
.../hadoop/ozone/client/rpc/TestRpcClient.java | 49 ++++++++++++++++++++++
3 files changed, 96 insertions(+), 10 deletions(-)
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ContainerClientMetrics.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ContainerClientMetrics.java
index 64cfb32ddcc..710a2da5ec8 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ContainerClientMetrics.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ContainerClientMetrics.java
@@ -20,6 +20,7 @@
import com.google.common.annotations.VisibleForTesting;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
@@ -79,6 +80,29 @@ public final class ContainerClientMetrics {
private final Map<DatanodeID, MutableCounterLong> writeChunksCallsByLeaders;
private final MetricsRegistry registry;
+ /**
+ * Handle for one ContainerClientMetrics acquisition.
+ */
+ public static final class Handle implements AutoCloseable {
+ private final ContainerClientMetrics metrics;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ private Handle(ContainerClientMetrics metrics) {
+ this.metrics = metrics;
+ }
+
+ public ContainerClientMetrics metrics() {
+ return metrics;
+ }
+
+ @Override
+ public void close() {
+ if (closed.compareAndSet(false, true)) {
+ release();
+ }
+ }
+ }
+
public static synchronized ContainerClientMetrics acquire() {
if (instance == null) {
instanceCount++;
@@ -90,6 +114,10 @@ public static synchronized ContainerClientMetrics acquire()
{
return instance;
}
+ public static Handle acquireHandle() {
+ return new Handle(acquire());
+ }
+
public static synchronized void release() {
if (instance == null) {
throw new IllegalStateException("This metrics class is not used.");
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index d1f302a5cba..f1611a117dd 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -220,6 +220,7 @@ public class RpcClient implements ClientProtocol {
private final BlockInputStreamFactory blockInputStreamFactory;
private final OzoneManagerVersion omVersion;
private final MemoizedSupplier<ExecutorService> ecReconstructExecutor;
+ private final ContainerClientMetrics.Handle clientMetricsHandle;
private final ContainerClientMetrics clientMetrics;
private final MemoizedSupplier<ExecutorService> writeExecutor;
private volatile OzoneFsServerDefaults serverDefaults;
@@ -328,7 +329,8 @@ public void onRemoval(
this.byteBufferPool = new BoundedElasticByteBufferPool(maxPoolSize);
this.blockInputStreamFactory = BlockInputStreamFactoryImpl
.getInstance(byteBufferPool, ecReconstructExecutor);
- this.clientMetrics = ContainerClientMetrics.acquire();
+ this.clientMetricsHandle = ContainerClientMetrics.acquireHandle();
+ this.clientMetrics = clientMetricsHandle.metrics();
this.serverDefaultsValidityPeriod = conf.getTimeDuration(
OZONE_CLIENT_SERVER_DEFAULTS_VALIDITY_PERIOD_MS,
@@ -1948,16 +1950,23 @@ private OmKeyInfo getKeyInfo(OmKeyArgs keyArgs) throws
IOException {
@Override
public void close() throws IOException {
- if (ecReconstructExecutor.isInitialized()) {
- ecReconstructExecutor.get().shutdownNow();
+ IOUtils.cleanupWithLogger(LOG,
+ () -> shutdownExecutor(ecReconstructExecutor),
+ () -> shutdownExecutor(writeExecutor),
+ ozoneManagerClient,
+ xceiverClientManager,
+ () -> {
+ keyProviderCache.invalidateAll();
+ keyProviderCache.cleanUp();
+ },
+ clientMetricsHandle);
+ }
+
+ private static void shutdownExecutor(
+ MemoizedSupplier<ExecutorService> executor) {
+ if (executor.isInitialized()) {
+ executor.get().shutdownNow();
}
- if (writeExecutor.isInitialized()) {
- writeExecutor.get().shutdownNow();
- }
- IOUtils.cleanupWithLogger(LOG, ozoneManagerClient, xceiverClientManager);
- keyProviderCache.invalidateAll();
- keyProviderCache.cleanUp();
- ContainerClientMetrics.release();
}
@Deprecated
diff --git
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/rpc/TestRpcClient.java
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/rpc/TestRpcClient.java
index 4e4efef51e1..999b892ff7b 100644
---
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/rpc/TestRpcClient.java
+++
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/rpc/TestRpcClient.java
@@ -18,17 +18,29 @@
package org.apache.hadoop.ozone.client.rpc;
import static org.apache.hadoop.ozone.client.rpc.RpcClient.validateOmVersion;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.ozone.OzoneManagerVersion;
+import org.apache.hadoop.ozone.client.MockOmTransport;
+import org.apache.hadoop.ozone.client.MockXceiverClientFactory;
import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
+import org.apache.hadoop.ozone.om.helpers.ServiceInfoEx;
+import org.apache.hadoop.ozone.om.protocolPB.OmTransport;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ozone.test.GenericTestUtils.LogCapturer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
+import org.slf4j.event.Level;
/**
* Run RPC Client tests.
@@ -215,4 +227,41 @@ public void
testFutureVersionShouldNotBeAnExpectedVersion() {
IllegalArgumentException.class,
() -> validateOmVersion(OzoneManagerVersion.FUTURE_VERSION, null));
}
+
+ @Test
+ public void testCloseTwiceDoesNotWarn() throws IOException {
+ RpcClient rpcClient = createRpcClient();
+ GenericTestUtils.setLogLevel(RpcClient.class, Level.DEBUG);
+ LogCapturer logs = LogCapturer.captureLogs(RpcClient.class);
+ logs.clearOutput();
+
+ try {
+ assertDoesNotThrow(() -> {
+ rpcClient.close();
+ rpcClient.close();
+ });
+
+ assertThat(logs.getOutput())
+ .doesNotContain("WARN")
+ .doesNotContain("This metrics class is not used.");
+ } finally {
+ logs.stopCapturing();
+ }
+ }
+
+ private static RpcClient createRpcClient() throws IOException {
+ OzoneConfiguration config = new OzoneConfiguration();
+ return new RpcClient(config, null) {
+ @Override
+ protected OmTransport createOmTransport(String omServiceId) {
+ return new MockOmTransport();
+ }
+
+ @Override
+ protected XceiverClientFactory createXceiverClientFactory(
+ ServiceInfoEx serviceInfo) {
+ return new MockXceiverClientFactory();
+ }
+ };
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]