This is an automated email from the ASF dual-hosted git repository.

yufei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 2cd6d0d475 Avoid adding a closed client to the pool (#10337)
2cd6d0d475 is described below

commit 2cd6d0d475a782b23bf53077703873d1c593f2c4
Author: Yufei Gu <[email protected]>
AuthorDate: Wed May 15 14:11:53 2024 -0700

    Avoid adding a closed client to the pool (#10337)
---
 .../java/org/apache/iceberg/ClientPoolImpl.java    | 46 +++++++++++-----------
 .../org/apache/iceberg/TestClientPoolImpl.java     | 15 +++++--
 2 files changed, 34 insertions(+), 27 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/ClientPoolImpl.java 
b/core/src/main/java/org/apache/iceberg/ClientPoolImpl.java
index 4c44695448..c3534fa22a 100644
--- a/core/src/main/java/org/apache/iceberg/ClientPoolImpl.java
+++ b/core/src/main/java/org/apache/iceberg/ClientPoolImpl.java
@@ -21,6 +21,7 @@ package org.apache.iceberg;
 import java.io.Closeable;
 import java.util.ArrayDeque;
 import java.util.Deque;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,37 +71,29 @@ public abstract class ClientPoolImpl<C, E extends Exception>
     try {
       return action.run(client);
     } catch (Exception exc) {
-      if (!retry || !isConnectionException(exc)) {
-        throw exc;
+      if (retry && isConnectionException(exc)) {
+        int retryAttempts = 0;
+        while (retryAttempts < maxRetries) {
+          try {
+            client = reconnect(client);
+            return action.run(client);
+          } catch (Exception e) {
+            if (isConnectionException(e)) {
+              retryAttempts++;
+              Thread.sleep(connectionRetryWaitPeriodMs);
+            } else {
+              throw reconnectExc.cast(exc);
+            }
+          }
+        }
       }
 
-      return retryAction(action, exc, client);
-
+      throw exc;
     } finally {
       release(client);
     }
   }
 
-  private <R> R retryAction(Action<R, C, E> action, Exception originalFailure, 
C client)
-      throws E, InterruptedException {
-    int retryAttempts = 0;
-    while (retryAttempts < maxRetries) {
-      try {
-        C reconnectedClient = reconnect(client);
-        return action.run(reconnectedClient);
-      } catch (Exception exc) {
-        if (isConnectionException(exc)) {
-          retryAttempts++;
-          Thread.sleep(connectionRetryWaitPeriodMs);
-        } else {
-          throw reconnectExc.cast(originalFailure);
-        }
-      }
-    }
-
-    throw reconnectExc.cast(originalFailure);
-  }
-
   protected abstract C newClient();
 
   protected abstract C reconnect(C client);
@@ -169,6 +162,11 @@ public abstract class ClientPoolImpl<C, E extends 
Exception>
     }
   }
 
+  @VisibleForTesting
+  Deque<C> clients() {
+    return clients;
+  }
+
   public int poolSize() {
     return poolSize;
   }
diff --git a/core/src/test/java/org/apache/iceberg/TestClientPoolImpl.java 
b/core/src/test/java/org/apache/iceberg/TestClientPoolImpl.java
index 8d62afa176..3a6666bac3 100644
--- a/core/src/test/java/org/apache/iceberg/TestClientPoolImpl.java
+++ b/core/src/test/java/org/apache/iceberg/TestClientPoolImpl.java
@@ -31,11 +31,16 @@ public class TestClientPoolImpl {
     int succeedAfterAttempts = 3;
     try (MockClientPoolImpl mockClientPool =
         new MockClientPoolImpl(2, RetryableException.class, true, maxRetries)) 
{
+      // initial the client pool with a client, so that we can verify the 
client is replaced
+      MockClient firstClient = mockClientPool.newClient();
+      mockClientPool.clients().add(firstClient);
+
       int actions = mockClientPool.run(client -> 
client.succeedAfter(succeedAfterAttempts));
       assertThat(actions)
           .as("There should be exactly one successful action invocation")
           .isEqualTo(1);
       
assertThat(mockClientPool.reconnectionAttempts()).isEqualTo(succeedAfterAttempts
 - 1);
+      
assertThat(mockClientPool.clients().peekFirst().equals(firstClient)).isFalse();
     }
   }
 
@@ -78,11 +83,15 @@ public class TestClientPoolImpl {
 
   static class MockClient {
     boolean closed = false;
-
     int actions = 0;
-
     int retryableFailures = 0;
 
+    MockClient() {}
+
+    MockClient(int retryableFailures) {
+      this.retryableFailures = retryableFailures;
+    }
+
     public void close() {
       closed = true;
     }
@@ -126,7 +135,7 @@ public class TestClientPoolImpl {
     @Override
     protected MockClient reconnect(MockClient client) {
       reconnectionAttempts++;
-      return client;
+      return new MockClient(reconnectionAttempts);
     }
 
     @Override

Reply via email to