This is an automated email from the ASF dual-hosted git repository.
yufei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 2cd6d0d475 Avoid adding a closed client to the pool (#10337)
2cd6d0d475 is described below
commit 2cd6d0d475a782b23bf53077703873d1c593f2c4
Author: Yufei Gu <[email protected]>
AuthorDate: Wed May 15 14:11:53 2024 -0700
Avoid adding a closed client to the pool (#10337)
---
.../java/org/apache/iceberg/ClientPoolImpl.java | 46 +++++++++++-----------
.../org/apache/iceberg/TestClientPoolImpl.java | 15 +++++--
2 files changed, 34 insertions(+), 27 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/ClientPoolImpl.java
b/core/src/main/java/org/apache/iceberg/ClientPoolImpl.java
index 4c44695448..c3534fa22a 100644
--- a/core/src/main/java/org/apache/iceberg/ClientPoolImpl.java
+++ b/core/src/main/java/org/apache/iceberg/ClientPoolImpl.java
@@ -21,6 +21,7 @@ package org.apache.iceberg;
import java.io.Closeable;
import java.util.ArrayDeque;
import java.util.Deque;
+import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,37 +71,29 @@ public abstract class ClientPoolImpl<C, E extends Exception>
try {
return action.run(client);
} catch (Exception exc) {
- if (!retry || !isConnectionException(exc)) {
- throw exc;
+ if (retry && isConnectionException(exc)) {
+ int retryAttempts = 0;
+ while (retryAttempts < maxRetries) {
+ try {
+ client = reconnect(client);
+ return action.run(client);
+ } catch (Exception e) {
+ if (isConnectionException(e)) {
+ retryAttempts++;
+ Thread.sleep(connectionRetryWaitPeriodMs);
+ } else {
+ throw reconnectExc.cast(exc);
+ }
+ }
+ }
}
- return retryAction(action, exc, client);
-
+ throw exc;
} finally {
release(client);
}
}
- private <R> R retryAction(Action<R, C, E> action, Exception originalFailure,
C client)
- throws E, InterruptedException {
- int retryAttempts = 0;
- while (retryAttempts < maxRetries) {
- try {
- C reconnectedClient = reconnect(client);
- return action.run(reconnectedClient);
- } catch (Exception exc) {
- if (isConnectionException(exc)) {
- retryAttempts++;
- Thread.sleep(connectionRetryWaitPeriodMs);
- } else {
- throw reconnectExc.cast(originalFailure);
- }
- }
- }
-
- throw reconnectExc.cast(originalFailure);
- }
-
protected abstract C newClient();
protected abstract C reconnect(C client);
@@ -169,6 +162,11 @@ public abstract class ClientPoolImpl<C, E extends
Exception>
}
}
+ @VisibleForTesting
+ Deque<C> clients() {
+ return clients;
+ }
+
public int poolSize() {
return poolSize;
}
diff --git a/core/src/test/java/org/apache/iceberg/TestClientPoolImpl.java
b/core/src/test/java/org/apache/iceberg/TestClientPoolImpl.java
index 8d62afa176..3a6666bac3 100644
--- a/core/src/test/java/org/apache/iceberg/TestClientPoolImpl.java
+++ b/core/src/test/java/org/apache/iceberg/TestClientPoolImpl.java
@@ -31,11 +31,16 @@ public class TestClientPoolImpl {
int succeedAfterAttempts = 3;
try (MockClientPoolImpl mockClientPool =
new MockClientPoolImpl(2, RetryableException.class, true, maxRetries))
{
+ // initial the client pool with a client, so that we can verify the
client is replaced
+ MockClient firstClient = mockClientPool.newClient();
+ mockClientPool.clients().add(firstClient);
+
int actions = mockClientPool.run(client ->
client.succeedAfter(succeedAfterAttempts));
assertThat(actions)
.as("There should be exactly one successful action invocation")
.isEqualTo(1);
assertThat(mockClientPool.reconnectionAttempts()).isEqualTo(succeedAfterAttempts
- 1);
+
assertThat(mockClientPool.clients().peekFirst().equals(firstClient)).isFalse();
}
}
@@ -78,11 +83,15 @@ public class TestClientPoolImpl {
static class MockClient {
boolean closed = false;
-
int actions = 0;
-
int retryableFailures = 0;
+ MockClient() {}
+
+ MockClient(int retryableFailures) {
+ this.retryableFailures = retryableFailures;
+ }
+
public void close() {
closed = true;
}
@@ -126,7 +135,7 @@ public class TestClientPoolImpl {
@Override
protected MockClient reconnect(MockClient client) {
reconnectionAttempts++;
- return client;
+ return new MockClient(reconnectionAttempts);
}
@Override