This is an automated email from the ASF dual-hosted git repository.
amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 2b21020aed Core: Retry connections in JDBC catalog with user
configured error code list (#10140)
2b21020aed is described below
commit 2b21020aedb63c26295005d150c05f0a5a5f0eb2
Author: Amogh Jahagirdar <[email protected]>
AuthorDate: Fri May 10 11:18:46 2024 -0600
Core: Retry connections in JDBC catalog with user configured error code
list (#10140)
---
.../java/org/apache/iceberg/ClientPoolImpl.java | 47 +++++--
.../java/org/apache/iceberg/jdbc/JdbcCatalog.java | 6 +
.../org/apache/iceberg/jdbc/JdbcClientPool.java | 44 ++++++-
.../java/org/apache/iceberg/jdbc/JdbcUtil.java | 2 +
.../org/apache/iceberg/TestClientPoolImpl.java | 141 +++++++++++++++++++++
.../org/apache/iceberg/jdbc/TestJdbcCatalog.java | 55 ++++++++
6 files changed, 282 insertions(+), 13 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/ClientPoolImpl.java
b/core/src/main/java/org/apache/iceberg/ClientPoolImpl.java
index e8ab57fed3..4c44695448 100644
--- a/core/src/main/java/org/apache/iceberg/ClientPoolImpl.java
+++ b/core/src/main/java/org/apache/iceberg/ClientPoolImpl.java
@@ -34,16 +34,29 @@ public abstract class ClientPoolImpl<C, E extends Exception>
private final Class<? extends E> reconnectExc;
private final Object signal = new Object();
private final boolean retryByDefault;
+ private final int maxRetries;
+
private volatile int currentSize;
private boolean closed;
+ private int connectionRetryWaitPeriodMs = 1000;
+
public ClientPoolImpl(int poolSize, Class<? extends E> reconnectExc, boolean
retryByDefault) {
+ this(poolSize, reconnectExc, retryByDefault, 1);
+ }
+
+ public ClientPoolImpl(
+ int poolSize,
+ Class<? extends E> reconnectExc,
+ boolean retryByDefault,
+ int maxConnectionRetries) {
this.poolSize = poolSize;
this.reconnectExc = reconnectExc;
this.clients = new ArrayDeque<>(poolSize);
this.currentSize = 0;
this.closed = false;
this.retryByDefault = retryByDefault;
+ this.maxRetries = maxConnectionRetries;
}
@Override
@@ -56,26 +69,38 @@ public abstract class ClientPoolImpl<C, E extends Exception>
C client = get();
try {
return action.run(client);
-
} catch (Exception exc) {
- if (retry && isConnectionException(exc)) {
- try {
- client = reconnect(client);
- } catch (Exception ignored) {
- // if reconnection throws any exception, rethrow the original failure
- throw reconnectExc.cast(exc);
- }
-
- return action.run(client);
+ if (!retry || !isConnectionException(exc)) {
+ throw exc;
}
- throw exc;
+ return retryAction(action, exc, client);
} finally {
release(client);
}
}
+ private <R> R retryAction(Action<R, C, E> action, Exception originalFailure,
C client)
+ throws E, InterruptedException {
+ int retryAttempts = 0;
+ while (retryAttempts < maxRetries) {
+ try {
+ C reconnectedClient = reconnect(client);
+ return action.run(reconnectedClient);
+ } catch (Exception exc) {
+ if (isConnectionException(exc)) {
+ retryAttempts++;
+ Thread.sleep(connectionRetryWaitPeriodMs);
+ } else {
+ throw reconnectExc.cast(originalFailure);
+ }
+ }
+ }
+
+ throw reconnectExc.cast(originalFailure);
+ }
+
protected abstract C newClient();
protected abstract C reconnect(C client);
diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
index 71590e7618..4e10ee96d1 100644
--- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
@@ -55,6 +55,7 @@ import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.hadoop.Configurable;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.FileIO;
+import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
@@ -689,6 +690,11 @@ public class JdbcCatalog extends BaseMetastoreViewCatalog
}
}
+ @VisibleForTesting
+ JdbcClientPool connectionPool() {
+ return connections;
+ }
+
private int execute(String sql, String... args) {
return execute(err -> {}, sql, args);
}
diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java
b/core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java
index 60e5eb49a4..487b8409b1 100644
--- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java
+++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java
@@ -21,17 +21,40 @@ package org.apache.iceberg.jdbc;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
-import java.sql.SQLNonTransientConnectionException;
+import java.sql.SQLTransientException;
+import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.ClientPoolImpl;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
public class JdbcClientPool extends ClientPoolImpl<Connection, SQLException> {
+ /**
+ * The following are common retryable SQLSTATE error codes that are
generic across vendors.
+ *
+ * <ul>
+ * <li>08000: Generic Connection Exception
+ * <li>08003: Connection does not exist
+ * <li>08006: Connection failure
+ * <li>08007: Transaction resolution unknown
+ * <li>40001: Serialization failure due to deadlock
+ * </ul>
+ *
+ * See https://en.wikipedia.org/wiki/SQLSTATE for more details.
+ */
+ static final Set<String> COMMON_RETRYABLE_CONNECTION_SQL_STATES =
+ ImmutableSet.of("08000", "08003", "08006", "08007", "40001");
+
private final String dbUrl;
private final Map<String, String> properties;
+ private Set<String> retryableStatusCodes;
+
public JdbcClientPool(String dbUrl, Map<String, String> props) {
this(
Integer.parseInt(
@@ -43,8 +66,18 @@ public class JdbcClientPool extends
ClientPoolImpl<Connection, SQLException> {
}
public JdbcClientPool(int poolSize, String dbUrl, Map<String, String> props)
{
- super(poolSize, SQLNonTransientConnectionException.class, true);
+ super(poolSize, SQLTransientException.class, true);
properties = props;
+ retryableStatusCodes = Sets.newHashSet();
+ retryableStatusCodes.addAll(COMMON_RETRYABLE_CONNECTION_SQL_STATES);
+ String configuredRetryableStatuses =
props.get(JdbcUtil.RETRYABLE_STATUS_CODES);
+ if (configuredRetryableStatuses != null) {
+ retryableStatusCodes.addAll(
+ Arrays.stream(configuredRetryableStatuses.split(","))
+ .map(status -> status.replaceAll("\\s+", ""))
+ .collect(Collectors.toSet()));
+ }
+
this.dbUrl = dbUrl;
}
@@ -72,4 +105,11 @@ public class JdbcClientPool extends
ClientPoolImpl<Connection, SQLException> {
throw new UncheckedSQLException(e, "Failed to close connection");
}
}
+
+ @Override
+ protected boolean isConnectionException(Exception e) {
+ return super.isConnectionException(e)
+ || (e instanceof SQLException
+ && retryableStatusCodes.contains(((SQLException)
e).getSQLState()));
+ }
}
diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java
b/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java
index 749c2d485f..c9bd2b78a6 100644
--- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java
+++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java
@@ -43,6 +43,8 @@ final class JdbcUtil {
static final String INIT_CATALOG_TABLES_PROPERTY =
JdbcCatalog.PROPERTY_PREFIX + "init-catalog-tables";
+ static final String RETRYABLE_STATUS_CODES = "retryable_status_codes";
+
enum SchemaVersion {
V0,
V1
diff --git a/core/src/test/java/org/apache/iceberg/TestClientPoolImpl.java
b/core/src/test/java/org/apache/iceberg/TestClientPoolImpl.java
new file mode 100644
index 0000000000..8d62afa176
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestClientPoolImpl.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+
+import org.junit.jupiter.api.Test;
+
+public class TestClientPoolImpl {
+
+ @Test
+ public void testRetrySucceedsWithinMaxAttempts() throws Exception {
+ int maxRetries = 5;
+ int succeedAfterAttempts = 3;
+ try (MockClientPoolImpl mockClientPool =
+ new MockClientPoolImpl(2, RetryableException.class, true, maxRetries))
{
+ int actions = mockClientPool.run(client ->
client.succeedAfter(succeedAfterAttempts));
+ assertThat(actions)
+ .as("There should be exactly one successful action invocation")
+ .isEqualTo(1);
+
assertThat(mockClientPool.reconnectionAttempts()).isEqualTo(succeedAfterAttempts
- 1);
+ }
+ }
+
+ @Test
+ public void testRetriesExhaustedAndSurfacesFailure() {
+ int maxRetries = 3;
+ int succeedAfterAttempts = 5;
+ try (MockClientPoolImpl mockClientPool =
+ new MockClientPoolImpl(2, RetryableException.class, true, maxRetries))
{
+ assertThatThrownBy(
+ () -> mockClientPool.run(client ->
client.succeedAfter(succeedAfterAttempts)))
+ .isInstanceOf(RetryableException.class);
+ assertThat(mockClientPool.reconnectionAttempts()).isEqualTo(maxRetries);
+ }
+ }
+
+ @Test
+ public void testNoRetryingNonRetryableException() {
+ try (MockClientPoolImpl mockClientPool =
+ new MockClientPoolImpl(2, RetryableException.class, true, 3)) {
+ assertThatThrownBy(() ->
mockClientPool.run(MockClient::failWithNonRetryable, true))
+ .isInstanceOf(NonRetryableException.class);
+ assertThat(mockClientPool.reconnectionAttempts()).isEqualTo(0);
+ }
+ }
+
+ @Test
+ public void testNoRetryingWhenDisabled() {
+ try (MockClientPoolImpl mockClientPool =
+ new MockClientPoolImpl(2, RetryableException.class, false, 3)) {
+ assertThatThrownBy(() -> mockClientPool.run(client ->
client.succeedAfter(3)))
+ .isInstanceOf(RetryableException.class);
+ assertThat(mockClientPool.reconnectionAttempts()).isEqualTo(0);
+ }
+ }
+
+ static class RetryableException extends RuntimeException {}
+
+ static class NonRetryableException extends RuntimeException {}
+
+ static class MockClient {
+ boolean closed = false;
+
+ int actions = 0;
+
+ int retryableFailures = 0;
+
+ public void close() {
+ closed = true;
+ }
+
+ public int successfulAction() {
+ actions++;
+ return actions;
+ }
+
+ int succeedAfter(int succeedAfterAttempts) {
+ if (retryableFailures == succeedAfterAttempts - 1) {
+ return successfulAction();
+ }
+
+ retryableFailures++;
+ throw new RetryableException();
+ }
+
+ int failWithNonRetryable() {
+ throw new NonRetryableException();
+ }
+ }
+
+ static class MockClientPoolImpl extends ClientPoolImpl<MockClient,
Exception> {
+
+ private int reconnectionAttempts;
+
+ MockClientPoolImpl(
+ int poolSize,
+ Class<? extends Exception> reconnectExc,
+ boolean retryByDefault,
+ int numRetries) {
+ super(poolSize, reconnectExc, retryByDefault, numRetries);
+ }
+
+ @Override
+ protected MockClient newClient() {
+ return new MockClient();
+ }
+
+ @Override
+ protected MockClient reconnect(MockClient client) {
+ reconnectionAttempts++;
+ return client;
+ }
+
+ @Override
+ protected void close(MockClient client) {
+ client.close();
+ }
+
+ int reconnectionAttempts() {
+ return reconnectionAttempts;
+ }
+ }
+}
diff --git a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java
b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java
index 985c84f0dc..90492b5109 100644
--- a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java
+++ b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java
@@ -34,6 +34,7 @@ import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.SQLNonTransientConnectionException;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -208,6 +209,60 @@ public class TestJdbcCatalog extends
CatalogTests<JdbcCatalog> {
assertThat(catalogTablesExist(jdbcUrl)).isTrue();
}
+ @Test
+ public void testRetryingErrorCodesProperty() {
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(CatalogProperties.WAREHOUSE_LOCATION,
this.tableDir.toAbsolutePath().toString());
+ properties.put(CatalogProperties.URI,
"jdbc:sqlite:file::memory:?icebergDB");
+ properties.put(JdbcUtil.RETRYABLE_STATUS_CODES, "57000,57P03,57P04");
+ JdbcCatalog jdbcCatalog = new JdbcCatalog();
+ jdbcCatalog.setConf(conf);
+ jdbcCatalog.initialize("test_catalog_with_retryable_status_codes",
properties);
+ JdbcClientPool jdbcClientPool = jdbcCatalog.connectionPool();
+ List<SQLException> expectedRetryableExceptions =
+ Lists.newArrayList(
+ new SQLException("operator_intervention", "57000"),
+ new SQLException("cannot_connect_now", "57P03"),
+ new SQLException("database_dropped", "57P04"));
+ JdbcClientPool.COMMON_RETRYABLE_CONNECTION_SQL_STATES.forEach(
+ code -> expectedRetryableExceptions.add(new SQLException("some
failure", code)));
+
+ expectedRetryableExceptions.forEach(
+ exception -> {
+ assertThat(jdbcClientPool.isConnectionException(exception))
+ .as(String.format("%s status should be retryable",
exception.getSQLState()))
+ .isTrue();
+ });
+
+ // Test the same retryable status codes but with spaces in the
configuration
+ properties.put(JdbcUtil.RETRYABLE_STATUS_CODES, "57000, 57P03, 57P04");
+
jdbcCatalog.initialize("test_catalog_with_retryable_status_codes_with_spaces",
properties);
+ JdbcClientPool updatedClientPool = jdbcCatalog.connectionPool();
+ expectedRetryableExceptions.forEach(
+ exception -> {
+ assertThat(updatedClientPool.isConnectionException(exception))
+ .as(String.format("%s status should be retryable",
exception.getSQLState()))
+ .isTrue();
+ });
+ }
+
+ @Test
+ public void testSqlNonTransientExceptionNotRetryable() {
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(CatalogProperties.WAREHOUSE_LOCATION,
this.tableDir.toAbsolutePath().toString());
+ properties.put(CatalogProperties.URI,
"jdbc:sqlite:file::memory:?icebergDB");
+ properties.put(JdbcUtil.RETRYABLE_STATUS_CODES, "57000,57P03,57P04");
+ JdbcCatalog jdbcCatalog = new JdbcCatalog();
+ jdbcCatalog.setConf(conf);
+ jdbcCatalog.initialize("test_catalog_with_retryable_status_codes",
properties);
+ JdbcClientPool jdbcClientPool = jdbcCatalog.connectionPool();
+ Assertions.assertThat(
+ jdbcClientPool.isConnectionException(
+ new SQLNonTransientConnectionException("Failed to
authenticate")))
+ .as("SQL Non Transient exception is not retryable")
+ .isFalse();
+ }
+
@Test
public void testInitSchemaV0() {
Map<String, String> properties = Maps.newHashMap();