This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ba7ccd15ac HDDS-15522. Make RpcClient close idempotent (#10478)
2ba7ccd15ac is described below

commit 2ba7ccd15ac7a3ebe6fa98674ecac32bdabbf1f3
Author: Luis Pigueiras <[email protected]>
AuthorDate: Sat Jun 13 21:30:51 2026 +0200

    HDDS-15522. Make RpcClient close idempotent (#10478)
    
    Co-authored-by: Peter Lee <[email protected]>
---
 .../hadoop/hdds/scm/ContainerClientMetrics.java    | 28 +++++++++++++
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  | 29 ++++++++-----
 .../hadoop/ozone/client/rpc/TestRpcClient.java     | 49 ++++++++++++++++++++++
 3 files changed, 96 insertions(+), 10 deletions(-)

diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ContainerClientMetrics.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ContainerClientMetrics.java
index 64cfb32ddcc..710a2da5ec8 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ContainerClientMetrics.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ContainerClientMetrics.java
@@ -20,6 +20,7 @@
 import com.google.common.annotations.VisibleForTesting;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.hdds.protocol.DatanodeID;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
@@ -79,6 +80,29 @@ public final class ContainerClientMetrics {
   private final Map<DatanodeID, MutableCounterLong> writeChunksCallsByLeaders;
   private final MetricsRegistry registry;
 
+  /**
+   * Handle for one ContainerClientMetrics acquisition.
+   */
+  public static final class Handle implements AutoCloseable {
+    private final ContainerClientMetrics metrics;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+
+    private Handle(ContainerClientMetrics metrics) {
+      this.metrics = metrics;
+    }
+
+    public ContainerClientMetrics metrics() {
+      return metrics;
+    }
+
+    @Override
+    public void close() {
+      if (closed.compareAndSet(false, true)) {
+        release();
+      }
+    }
+  }
+
   public static synchronized ContainerClientMetrics acquire() {
     if (instance == null) {
       instanceCount++;
@@ -90,6 +114,10 @@ public static synchronized ContainerClientMetrics acquire() 
{
     return instance;
   }
 
+  public static Handle acquireHandle() {
+    return new Handle(acquire());
+  }
+
   public static synchronized void release() {
     if (instance == null) {
       throw new IllegalStateException("This metrics class is not used.");
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index d1f302a5cba..f1611a117dd 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -220,6 +220,7 @@ public class RpcClient implements ClientProtocol {
   private final BlockInputStreamFactory blockInputStreamFactory;
   private final OzoneManagerVersion omVersion;
   private final MemoizedSupplier<ExecutorService> ecReconstructExecutor;
+  private final ContainerClientMetrics.Handle clientMetricsHandle;
   private final ContainerClientMetrics clientMetrics;
   private final MemoizedSupplier<ExecutorService> writeExecutor;
   private volatile OzoneFsServerDefaults serverDefaults;
@@ -328,7 +329,8 @@ public void onRemoval(
     this.byteBufferPool = new BoundedElasticByteBufferPool(maxPoolSize);
     this.blockInputStreamFactory = BlockInputStreamFactoryImpl
         .getInstance(byteBufferPool, ecReconstructExecutor);
-    this.clientMetrics = ContainerClientMetrics.acquire();
+    this.clientMetricsHandle = ContainerClientMetrics.acquireHandle();
+    this.clientMetrics = clientMetricsHandle.metrics();
 
     this.serverDefaultsValidityPeriod = conf.getTimeDuration(
         OZONE_CLIENT_SERVER_DEFAULTS_VALIDITY_PERIOD_MS,
@@ -1948,16 +1950,23 @@ private OmKeyInfo getKeyInfo(OmKeyArgs keyArgs) throws 
IOException {
 
   @Override
   public void close() throws IOException {
-    if (ecReconstructExecutor.isInitialized()) {
-      ecReconstructExecutor.get().shutdownNow();
+    IOUtils.cleanupWithLogger(LOG,
+        () -> shutdownExecutor(ecReconstructExecutor),
+        () -> shutdownExecutor(writeExecutor),
+        ozoneManagerClient,
+        xceiverClientManager,
+        () -> {
+          keyProviderCache.invalidateAll();
+          keyProviderCache.cleanUp();
+        },
+        clientMetricsHandle);
+  }
+
+  private static void shutdownExecutor(
+      MemoizedSupplier<ExecutorService> executor) {
+    if (executor.isInitialized()) {
+      executor.get().shutdownNow();
     }
-    if (writeExecutor.isInitialized()) {
-      writeExecutor.get().shutdownNow();
-    }
-    IOUtils.cleanupWithLogger(LOG, ozoneManagerClient, xceiverClientManager);
-    keyProviderCache.invalidateAll();
-    keyProviderCache.cleanUp();
-    ContainerClientMetrics.release();
   }
 
   @Deprecated
diff --git 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/rpc/TestRpcClient.java
 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/rpc/TestRpcClient.java
index 4e4efef51e1..999b892ff7b 100644
--- 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/rpc/TestRpcClient.java
+++ 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/rpc/TestRpcClient.java
@@ -18,17 +18,29 @@
 package org.apache.hadoop.ozone.client.rpc;
 
 import static org.apache.hadoop.ozone.client.rpc.RpcClient.validateOmVersion;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
+import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.ozone.OzoneManagerVersion;
+import org.apache.hadoop.ozone.client.MockOmTransport;
+import org.apache.hadoop.ozone.client.MockXceiverClientFactory;
 import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
+import org.apache.hadoop.ozone.om.helpers.ServiceInfoEx;
+import org.apache.hadoop.ozone.om.protocolPB.OmTransport;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ozone.test.GenericTestUtils.LogCapturer;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
+import org.slf4j.event.Level;
 
 /**
  * Run RPC Client tests.
@@ -215,4 +227,41 @@ public void 
testFutureVersionShouldNotBeAnExpectedVersion() {
         IllegalArgumentException.class,
         () -> validateOmVersion(OzoneManagerVersion.FUTURE_VERSION, null));
   }
+
+  @Test
+  public void testCloseTwiceDoesNotWarn() throws IOException {
+    RpcClient rpcClient = createRpcClient();
+    GenericTestUtils.setLogLevel(RpcClient.class, Level.DEBUG);
+    LogCapturer logs = LogCapturer.captureLogs(RpcClient.class);
+    logs.clearOutput();
+
+    try {
+      assertDoesNotThrow(() -> {
+        rpcClient.close();
+        rpcClient.close();
+      });
+
+      assertThat(logs.getOutput())
+          .doesNotContain("WARN")
+          .doesNotContain("This metrics class is not used.");
+    } finally {
+      logs.stopCapturing();
+    }
+  }
+
+  private static RpcClient createRpcClient() throws IOException {
+    OzoneConfiguration config = new OzoneConfiguration();
+    return new RpcClient(config, null) {
+      @Override
+      protected OmTransport createOmTransport(String omServiceId) {
+        return new MockOmTransport();
+      }
+
+      @Override
+      protected XceiverClientFactory createXceiverClientFactory(
+          ServiceInfoEx serviceInfo) {
+        return new MockXceiverClientFactory();
+      }
+    };
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to