dimas-b commented on code in PR #1517: URL: https://github.com/apache/polaris/pull/1517#discussion_r2075697018
########## extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java: ########## @@ -173,23 +190,86 @@ public int executeUpdate(String query) throws SQLException { * @throws SQLException : Exception caught during transaction execution. */ public void runWithinTransaction(TransactionCallback callback) throws SQLException { - try (Connection connection = borrowConnection()) { - boolean autoCommit = connection.getAutoCommit(); - connection.setAutoCommit(false); - boolean success = false; + withRetries( + () -> { + try (Connection connection = borrowConnection()) { + boolean autoCommit = connection.getAutoCommit(); + boolean success = false; + connection.setAutoCommit(false); + try { + try (Statement statement = connection.createStatement()) { + success = callback.execute(statement); + } + } finally { + if (success) { + connection.commit(); + } else { + connection.rollback(); + } + connection.setAutoCommit(autoCommit); + } + } + return null; + }); + } + + private boolean isRetryable(SQLException e) { + String sqlState = e.getSQLState(); + + if (sqlState != null) { + return sqlState.equals(DEADLOCK_SQL_CODE) + || // Deadlock detected + sqlState.equals(SERIALIZATION_FAILURE_SQL_CODE); // Serialization failure + } + + // Additionally, one might check for specific error messages or other conditions + return e.getMessage().contains("connection refused") + || e.getMessage().contains("connection reset"); + } + + public <T> T withRetries(Operation<T> operation) throws SQLException { + int attempts = 0; + // maximum number of retries. + int maxAttempts = relationalJdbcConfiguration.maxRetries().orElse(1); + // How long we should try, since the first attempt. + long maxDuration = relationalJdbcConfiguration.maxDurationInMs().orElse(5000L); + // How long to wait before first failure. + long delay = relationalJdbcConfiguration.initialDelayInMs().orElse(100L); + + // maximum time we will retry till. 
+ long maxRetryTime = Clock.systemUTC().millis() + maxDuration; + + while (attempts < maxAttempts) { try { - try (Statement statement = connection.createStatement()) { - success = callback.execute(statement); + return operation.execute(); + } catch (SQLException e) { + attempts++; + long timeLeft = Math.max((maxRetryTime - Clock.systemUTC().millis()), 0L); Review Comment: What I meant in my previous comment on clocks is that `Clock` should probably be a field in this class. Whenever this class is created or injected, a `Clock` implementation will have to be provided. By default, `Clock.systemUTC()` can be used. ########## extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java: ########## @@ -173,23 +190,86 @@ public int executeUpdate(String query) throws SQLException { * @throws SQLException : Exception caught during transaction execution. */ public void runWithinTransaction(TransactionCallback callback) throws SQLException { - try (Connection connection = borrowConnection()) { - boolean autoCommit = connection.getAutoCommit(); - connection.setAutoCommit(false); - boolean success = false; + withRetries( + () -> { + try (Connection connection = borrowConnection()) { + boolean autoCommit = connection.getAutoCommit(); + boolean success = false; + connection.setAutoCommit(false); + try { + try (Statement statement = connection.createStatement()) { + success = callback.execute(statement); + } + } finally { + if (success) { + connection.commit(); + } else { + connection.rollback(); + } + connection.setAutoCommit(autoCommit); + } + } + return null; + }); + } + + private boolean isRetryable(SQLException e) { + String sqlState = e.getSQLState(); + + if (sqlState != null) { + return sqlState.equals(DEADLOCK_SQL_CODE) + || // Deadlock detected + sqlState.equals(SERIALIZATION_FAILURE_SQL_CODE); // Serialization failure + } + + // Additionally, one might check for specific error messages or other conditions + return 
e.getMessage().contains("connection refused") + || e.getMessage().contains("connection reset"); + } + + public <T> T withRetries(Operation<T> operation) throws SQLException { Review Comment: Why `public`? ########## extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java: ########## @@ -173,23 +190,86 @@ public int executeUpdate(String query) throws SQLException { * @throws SQLException : Exception caught during transaction execution. */ public void runWithinTransaction(TransactionCallback callback) throws SQLException { - try (Connection connection = borrowConnection()) { - boolean autoCommit = connection.getAutoCommit(); - connection.setAutoCommit(false); - boolean success = false; + withRetries( + () -> { + try (Connection connection = borrowConnection()) { + boolean autoCommit = connection.getAutoCommit(); + boolean success = false; + connection.setAutoCommit(false); + try { + try (Statement statement = connection.createStatement()) { + success = callback.execute(statement); + } + } finally { + if (success) { + connection.commit(); + } else { + connection.rollback(); + } + connection.setAutoCommit(autoCommit); + } + } + return null; + }); + } + + private boolean isRetryable(SQLException e) { + String sqlState = e.getSQLState(); + + if (sqlState != null) { + return sqlState.equals(DEADLOCK_SQL_CODE) + || // Deadlock detected + sqlState.equals(SERIALIZATION_FAILURE_SQL_CODE); // Serialization failure + } + + // Additionally, one might check for specific error messages or other conditions + return e.getMessage().contains("connection refused") + || e.getMessage().contains("connection reset"); + } + + public <T> T withRetries(Operation<T> operation) throws SQLException { + int attempts = 0; + // maximum number of retries. + int maxAttempts = relationalJdbcConfiguration.maxRetries().orElse(1); + // How long we should try, since the first attempt. 
+ long maxDuration = relationalJdbcConfiguration.maxDurationInMs().orElse(5000L); + // How long to wait before first failure. + long delay = relationalJdbcConfiguration.initialDelayInMs().orElse(100L); + + // maximum time we will retry till. + long maxRetryTime = Clock.systemUTC().millis() + maxDuration; + + while (attempts < maxAttempts) { try { - try (Statement statement = connection.createStatement()) { - success = callback.execute(statement); + return operation.execute(); + } catch (SQLException e) { + attempts++; + long timeLeft = Math.max((maxRetryTime - Clock.systemUTC().millis()), 0L); + if (attempts >= maxAttempts || !isRetryable(e) || timeLeft == 0) { + String exceptionMessage = + String.format( + "Failed due to %s, after , %s attempts and %s milliseconds", + e.getMessage(), attempts, maxDuration); + throw new SQLException(exceptionMessage, e.getSQLState(), e.getErrorCode()); } - } finally { - if (success) { - connection.commit(); - } else { - connection.rollback(); + // Add jitter + long timeToSleep = Math.min(timeLeft, delay + (long) (random.nextFloat() * 0.2 * delay)); Review Comment: Suggestion: use two bounds, `minDelay` (starting at 10 ms) and `maxDelay` (starting at 100 ms). Pick the jittered delay between the two bounds, and grow both bounds exponentially. I believe this approach would make the behaviour easier for users to understand and predict, and would also offer more tuning flexibility. ########## extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java: ########## @@ -173,23 +190,86 @@ public int executeUpdate(String query) throws SQLException { * @throws SQLException : Exception caught during transaction execution. 
*/ public void runWithinTransaction(TransactionCallback callback) throws SQLException { - try (Connection connection = borrowConnection()) { - boolean autoCommit = connection.getAutoCommit(); - connection.setAutoCommit(false); - boolean success = false; + withRetries( + () -> { + try (Connection connection = borrowConnection()) { + boolean autoCommit = connection.getAutoCommit(); + boolean success = false; + connection.setAutoCommit(false); + try { + try (Statement statement = connection.createStatement()) { + success = callback.execute(statement); + } + } finally { + if (success) { + connection.commit(); + } else { + connection.rollback(); + } + connection.setAutoCommit(autoCommit); + } + } + return null; + }); + } + + private boolean isRetryable(SQLException e) { + String sqlState = e.getSQLState(); + + if (sqlState != null) { + return sqlState.equals(DEADLOCK_SQL_CODE) + || // Deadlock detected + sqlState.equals(SERIALIZATION_FAILURE_SQL_CODE); // Serialization failure + } + + // Additionally, one might check for specific error messages or other conditions + return e.getMessage().contains("connection refused") + || e.getMessage().contains("connection reset"); + } + + public <T> T withRetries(Operation<T> operation) throws SQLException { + int attempts = 0; + // maximum number of retries. + int maxAttempts = relationalJdbcConfiguration.maxRetries().orElse(1); + // How long we should try, since the first attempt. + long maxDuration = relationalJdbcConfiguration.maxDurationInMs().orElse(5000L); + // How long to wait before first failure. + long delay = relationalJdbcConfiguration.initialDelayInMs().orElse(100L); + + // maximum time we will retry till. 
+ long maxRetryTime = Clock.systemUTC().millis() + maxDuration; + + while (attempts < maxAttempts) { try { - try (Statement statement = connection.createStatement()) { - success = callback.execute(statement); + return operation.execute(); + } catch (SQLException e) { + attempts++; + long timeLeft = Math.max((maxRetryTime - Clock.systemUTC().millis()), 0L); + if (attempts >= maxAttempts || !isRetryable(e) || timeLeft == 0) { + String exceptionMessage = + String.format( + "Failed due to %s, after , %s attempts and %s milliseconds", + e.getMessage(), attempts, maxDuration); + throw new SQLException(exceptionMessage, e.getSQLState(), e.getErrorCode()); Review Comment: Please pass `e` as the cause of the new `SQLException`, so that the root-cause exception is preserved. ########## extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java: ########## @@ -128,22 +143,21 @@ public <T> void executeSelectOverStream( @Nonnull Converter<T> converterInstance, @Nonnull Consumer<Stream<T>> consumer) throws SQLException { - try (Connection connection = borrowConnection(); - Statement statement = connection.createStatement(); - ResultSet resultSet = statement.executeQuery(query)) { - ResultSetIterator<T> iterator = new ResultSetIterator<>(resultSet, converterInstance); - consumer.accept(iterator.toStream()); - } catch (SQLException e) { - throw e; - } catch (RuntimeException e) { - if (e.getCause() instanceof SQLException) { - throw (SQLException) e.getCause(); - } else { - throw e; - } - } catch (Exception e) { - throw new RuntimeException(e); - } + withRetries( + () -> { + try (Connection connection = borrowConnection(); + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(query)) { + ResultSetIterator<T> iterator = new ResultSetIterator<>(resultSet, converterInstance); + consumer.accept(iterator.toStream()); + return null; + } catch (RuntimeException e) { + if (e.getCause() instanceof SQLException ex) { + throw 
ex; Review Comment: Why is it necessary to unwrap `SQLException` here? Who wraps it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@polaris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org