This is an automated email from the ASF dual-hosted git repository.

amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 2b21020aed Core: Retry connections in JDBC catalog with user configured error code list (#10140)
2b21020aed is described below

commit 2b21020aedb63c26295005d150c05f0a5a5f0eb2
Author: Amogh Jahagirdar <[email protected]>
AuthorDate: Fri May 10 11:18:46 2024 -0600

    Core: Retry connections in JDBC catalog with user configured error code list (#10140)
---
 .../java/org/apache/iceberg/ClientPoolImpl.java    |  47 +++++--
 .../java/org/apache/iceberg/jdbc/JdbcCatalog.java  |   6 +
 .../org/apache/iceberg/jdbc/JdbcClientPool.java    |  44 ++++++-
 .../java/org/apache/iceberg/jdbc/JdbcUtil.java     |   2 +
 .../org/apache/iceberg/TestClientPoolImpl.java     | 141 +++++++++++++++++++++
 .../org/apache/iceberg/jdbc/TestJdbcCatalog.java   |  55 ++++++++
 6 files changed, 282 insertions(+), 13 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/ClientPoolImpl.java b/core/src/main/java/org/apache/iceberg/ClientPoolImpl.java
index e8ab57fed3..4c44695448 100644
--- a/core/src/main/java/org/apache/iceberg/ClientPoolImpl.java
+++ b/core/src/main/java/org/apache/iceberg/ClientPoolImpl.java
@@ -34,16 +34,29 @@ public abstract class ClientPoolImpl<C, E extends Exception>
   private final Class<? extends E> reconnectExc;
   private final Object signal = new Object();
   private final boolean retryByDefault;
+  private final int maxRetries;
+
   private volatile int currentSize;
   private boolean closed;
 
+  private int connectionRetryWaitPeriodMs = 1000;
+
   public ClientPoolImpl(int poolSize, Class<? extends E> reconnectExc, boolean 
retryByDefault) {
+    this(poolSize, reconnectExc, retryByDefault, 1);
+  }
+
+  public ClientPoolImpl(
+      int poolSize,
+      Class<? extends E> reconnectExc,
+      boolean retryByDefault,
+      int maxConnectionRetries) {
     this.poolSize = poolSize;
     this.reconnectExc = reconnectExc;
     this.clients = new ArrayDeque<>(poolSize);
     this.currentSize = 0;
     this.closed = false;
     this.retryByDefault = retryByDefault;
+    this.maxRetries = maxConnectionRetries;
   }
 
   @Override
@@ -56,26 +69,38 @@ public abstract class ClientPoolImpl<C, E extends Exception>
     C client = get();
     try {
       return action.run(client);
-
     } catch (Exception exc) {
-      if (retry && isConnectionException(exc)) {
-        try {
-          client = reconnect(client);
-        } catch (Exception ignored) {
-          // if reconnection throws any exception, rethrow the original failure
-          throw reconnectExc.cast(exc);
-        }
-
-        return action.run(client);
+      if (!retry || !isConnectionException(exc)) {
+        throw exc;
       }
 
-      throw exc;
+      return retryAction(action, exc, client);
 
     } finally {
       release(client);
     }
   }
 
+  private <R> R retryAction(Action<R, C, E> action, Exception originalFailure, 
C client)
+      throws E, InterruptedException {
+    int retryAttempts = 0;
+    while (retryAttempts < maxRetries) {
+      try {
+        C reconnectedClient = reconnect(client);
+        return action.run(reconnectedClient);
+      } catch (Exception exc) {
+        if (isConnectionException(exc)) {
+          retryAttempts++;
+          Thread.sleep(connectionRetryWaitPeriodMs);
+        } else {
+          throw reconnectExc.cast(originalFailure);
+        }
+      }
+    }
+
+    throw reconnectExc.cast(originalFailure);
+  }
+
   protected abstract C newClient();
 
   protected abstract C reconnect(C client);
diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
index 71590e7618..4e10ee96d1 100644
--- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
@@ -55,6 +55,7 @@ import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.iceberg.hadoop.Configurable;
 import org.apache.iceberg.io.CloseableGroup;
 import org.apache.iceberg.io.FileIO;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.base.Strings;
@@ -689,6 +690,11 @@ public class JdbcCatalog extends BaseMetastoreViewCatalog
     }
   }
 
+  @VisibleForTesting
+  JdbcClientPool connectionPool() {
+    return connections;
+  }
+
   private int execute(String sql, String... args) {
     return execute(err -> {}, sql, args);
   }
diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java
index 60e5eb49a4..487b8409b1 100644
--- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java
+++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java
@@ -21,17 +21,40 @@ package org.apache.iceberg.jdbc;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
-import java.sql.SQLNonTransientConnectionException;
+import java.sql.SQLTransientException;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.ClientPoolImpl;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 
 public class JdbcClientPool extends ClientPoolImpl<Connection, SQLException> {
 
+  /**
+   * The following are common retryable SQLSTATE error codes which are generic across vendors.
+   *
+   * <ul>
+   *   <li>08000: Generic Connection Exception
+   *   <li>08003: Connection does not exist
+   *   <li>08006: Connection failure
+   *   <li>08007: Transaction resolution unknown
+   *   <li>40001: Serialization failure due to deadlock
+   * </ul>
+   *
+   * See https://en.wikipedia.org/wiki/SQLSTATE for more details.
+   */
+  static final Set<String> COMMON_RETRYABLE_CONNECTION_SQL_STATES =
+      ImmutableSet.of("08000", "08003", "08006", "08007", "40001");
+
   private final String dbUrl;
   private final Map<String, String> properties;
 
+  private Set<String> retryableStatusCodes;
+
   public JdbcClientPool(String dbUrl, Map<String, String> props) {
     this(
         Integer.parseInt(
@@ -43,8 +66,18 @@ public class JdbcClientPool extends ClientPoolImpl<Connection, SQLException> {
   }
 
   public JdbcClientPool(int poolSize, String dbUrl, Map<String, String> props) 
{
-    super(poolSize, SQLNonTransientConnectionException.class, true);
+    super(poolSize, SQLTransientException.class, true);
     properties = props;
+    retryableStatusCodes = Sets.newHashSet();
+    retryableStatusCodes.addAll(COMMON_RETRYABLE_CONNECTION_SQL_STATES);
+    String configuredRetryableStatuses = 
props.get(JdbcUtil.RETRYABLE_STATUS_CODES);
+    if (configuredRetryableStatuses != null) {
+      retryableStatusCodes.addAll(
+          Arrays.stream(configuredRetryableStatuses.split(","))
+              .map(status -> status.replaceAll("\\s+", ""))
+              .collect(Collectors.toSet()));
+    }
+
     this.dbUrl = dbUrl;
   }
 
@@ -72,4 +105,11 @@ public class JdbcClientPool extends ClientPoolImpl<Connection, SQLException> {
       throw new UncheckedSQLException(e, "Failed to close connection");
     }
   }
+
+  @Override
+  protected boolean isConnectionException(Exception e) {
+    return super.isConnectionException(e)
+        || (e instanceof SQLException
+            && retryableStatusCodes.contains(((SQLException) 
e).getSQLState()));
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java
index 749c2d485f..c9bd2b78a6 100644
--- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java
+++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java
@@ -43,6 +43,8 @@ final class JdbcUtil {
   static final String INIT_CATALOG_TABLES_PROPERTY =
       JdbcCatalog.PROPERTY_PREFIX + "init-catalog-tables";
 
+  static final String RETRYABLE_STATUS_CODES = "retryable_status_codes";
+
   enum SchemaVersion {
     V0,
     V1
diff --git a/core/src/test/java/org/apache/iceberg/TestClientPoolImpl.java b/core/src/test/java/org/apache/iceberg/TestClientPoolImpl.java
new file mode 100644
index 0000000000..8d62afa176
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestClientPoolImpl.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+
+import org.junit.jupiter.api.Test;
+
+public class TestClientPoolImpl {
+
+  @Test
+  public void testRetrySucceedsWithinMaxAttempts() throws Exception {
+    int maxRetries = 5;
+    int succeedAfterAttempts = 3;
+    try (MockClientPoolImpl mockClientPool =
+        new MockClientPoolImpl(2, RetryableException.class, true, maxRetries)) 
{
+      int actions = mockClientPool.run(client -> 
client.succeedAfter(succeedAfterAttempts));
+      assertThat(actions)
+          .as("There should be exactly one successful action invocation")
+          .isEqualTo(1);
+      
assertThat(mockClientPool.reconnectionAttempts()).isEqualTo(succeedAfterAttempts
 - 1);
+    }
+  }
+
+  @Test
+  public void testRetriesExhaustedAndSurfacesFailure() {
+    int maxRetries = 3;
+    int succeedAfterAttempts = 5;
+    try (MockClientPoolImpl mockClientPool =
+        new MockClientPoolImpl(2, RetryableException.class, true, maxRetries)) 
{
+      assertThatThrownBy(
+              () -> mockClientPool.run(client -> 
client.succeedAfter(succeedAfterAttempts)))
+          .isInstanceOf(RetryableException.class);
+      assertThat(mockClientPool.reconnectionAttempts()).isEqualTo(maxRetries);
+    }
+  }
+
+  @Test
+  public void testNoRetryingNonRetryableException() {
+    try (MockClientPoolImpl mockClientPool =
+        new MockClientPoolImpl(2, RetryableException.class, true, 3)) {
+      assertThatThrownBy(() -> 
mockClientPool.run(MockClient::failWithNonRetryable, true))
+          .isInstanceOf(NonRetryableException.class);
+      assertThat(mockClientPool.reconnectionAttempts()).isEqualTo(0);
+    }
+  }
+
+  @Test
+  public void testNoRetryingWhenDisabled() {
+    try (MockClientPoolImpl mockClientPool =
+        new MockClientPoolImpl(2, RetryableException.class, false, 3)) {
+      assertThatThrownBy(() -> mockClientPool.run(client -> 
client.succeedAfter(3)))
+          .isInstanceOf(RetryableException.class);
+      assertThat(mockClientPool.reconnectionAttempts()).isEqualTo(0);
+    }
+  }
+
+  static class RetryableException extends RuntimeException {}
+
+  static class NonRetryableException extends RuntimeException {}
+
+  static class MockClient {
+    boolean closed = false;
+
+    int actions = 0;
+
+    int retryableFailures = 0;
+
+    public void close() {
+      closed = true;
+    }
+
+    public int successfulAction() {
+      actions++;
+      return actions;
+    }
+
+    int succeedAfter(int succeedAfterAttempts) {
+      if (retryableFailures == succeedAfterAttempts - 1) {
+        return successfulAction();
+      }
+
+      retryableFailures++;
+      throw new RetryableException();
+    }
+
+    int failWithNonRetryable() {
+      throw new NonRetryableException();
+    }
+  }
+
+  static class MockClientPoolImpl extends ClientPoolImpl<MockClient, 
Exception> {
+
+    private int reconnectionAttempts;
+
+    MockClientPoolImpl(
+        int poolSize,
+        Class<? extends Exception> reconnectExc,
+        boolean retryByDefault,
+        int numRetries) {
+      super(poolSize, reconnectExc, retryByDefault, numRetries);
+    }
+
+    @Override
+    protected MockClient newClient() {
+      return new MockClient();
+    }
+
+    @Override
+    protected MockClient reconnect(MockClient client) {
+      reconnectionAttempts++;
+      return client;
+    }
+
+    @Override
+    protected void close(MockClient client) {
+      client.close();
+    }
+
+    int reconnectionAttempts() {
+      return reconnectionAttempts;
+    }
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java
index 985c84f0dc..90492b5109 100644
--- a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java
+++ b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java
@@ -34,6 +34,7 @@ import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.SQLNonTransientConnectionException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -208,6 +209,60 @@ public class TestJdbcCatalog extends CatalogTests<JdbcCatalog> {
     assertThat(catalogTablesExist(jdbcUrl)).isTrue();
   }
 
+  @Test
+  public void testRetryingErrorCodesProperty() {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(CatalogProperties.WAREHOUSE_LOCATION, 
this.tableDir.toAbsolutePath().toString());
+    properties.put(CatalogProperties.URI, 
"jdbc:sqlite:file::memory:?icebergDB");
+    properties.put(JdbcUtil.RETRYABLE_STATUS_CODES, "57000,57P03,57P04");
+    JdbcCatalog jdbcCatalog = new JdbcCatalog();
+    jdbcCatalog.setConf(conf);
+    jdbcCatalog.initialize("test_catalog_with_retryable_status_codes", 
properties);
+    JdbcClientPool jdbcClientPool = jdbcCatalog.connectionPool();
+    List<SQLException> expectedRetryableExceptions =
+        Lists.newArrayList(
+            new SQLException("operator_intervention", "57000"),
+            new SQLException("cannot_connect_now", "57P03"),
+            new SQLException("database_dropped", "57P04"));
+    JdbcClientPool.COMMON_RETRYABLE_CONNECTION_SQL_STATES.forEach(
+        code -> expectedRetryableExceptions.add(new SQLException("some 
failure", code)));
+
+    expectedRetryableExceptions.forEach(
+        exception -> {
+          assertThat(jdbcClientPool.isConnectionException(exception))
+              .as(String.format("%s status should be retryable", 
exception.getSQLState()))
+              .isTrue();
+        });
+
+    // Test the same retryable status codes but with spaces in the 
configuration
+    properties.put(JdbcUtil.RETRYABLE_STATUS_CODES, "57000, 57P03, 57P04");
+    
jdbcCatalog.initialize("test_catalog_with_retryable_status_codes_with_spaces", 
properties);
+    JdbcClientPool updatedClientPool = jdbcCatalog.connectionPool();
+    expectedRetryableExceptions.forEach(
+        exception -> {
+          assertThat(updatedClientPool.isConnectionException(exception))
+              .as(String.format("%s status should be retryable", 
exception.getSQLState()))
+              .isTrue();
+        });
+  }
+
+  @Test
+  public void testSqlNonTransientExceptionNotRetryable() {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(CatalogProperties.WAREHOUSE_LOCATION, 
this.tableDir.toAbsolutePath().toString());
+    properties.put(CatalogProperties.URI, 
"jdbc:sqlite:file::memory:?icebergDB");
+    properties.put(JdbcUtil.RETRYABLE_STATUS_CODES, "57000,57P03,57P04");
+    JdbcCatalog jdbcCatalog = new JdbcCatalog();
+    jdbcCatalog.setConf(conf);
+    jdbcCatalog.initialize("test_catalog_with_retryable_status_codes", 
properties);
+    JdbcClientPool jdbcClientPool = jdbcCatalog.connectionPool();
+    Assertions.assertThat(
+            jdbcClientPool.isConnectionException(
+                new SQLNonTransientConnectionException("Failed to 
authenticate")))
+        .as("SQL Non Transient exception is not retryable")
+        .isFalse();
+  }
+
   @Test
   public void testInitSchemaV0() {
     Map<String, String> properties = Maps.newHashMap();

Reply via email to