imbajin commented on code in PR #2997:
URL: https://github.com/apache/hugegraph/pull/2997#discussion_r3142097403


##########
hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraOptions.java:
##########
@@ -130,4 +130,46 @@ public static synchronized CassandraOptions instance() {
                     positiveInt(),
                     12 * 60 * 60
             );
+
+    public static final ConfigOption<Long> CASSANDRA_RECONNECT_BASE_DELAY =
+            new ConfigOption<>(
+                    "cassandra.reconnect_base_delay",
+                    "The base delay in milliseconds used by the driver's " +
+                    "exponential reconnection policy when a Cassandra host " +
+                    "becomes unreachable.",
+                    rangeInt(100L, Long.MAX_VALUE),
+                    1000L
+            );
+
+    public static final ConfigOption<Long> CASSANDRA_RECONNECT_MAX_DELAY =
+            new ConfigOption<>(
+                    "cassandra.reconnect_max_delay",
+                    "The maximum delay in milliseconds used by the driver's " +
+                    "exponential reconnection policy when a Cassandra host " +
+                    "becomes unreachable.",
+                    rangeInt(1000L, Long.MAX_VALUE),
+                    10_000L

Review Comment:
   🧹 Minor: The default values in the PR description table don't match the 
actual code defaults:
   
   | Option | PR Description | Code |
   |--------|---------------|------|
   | `reconnect_max_delay` | 60000 ms | **10000 ms** |
   | `reconnect_max_retries` | 10 | **3** |
   | `reconnect_interval` | 5000 ms | **1000 ms** |
   
   Please update the PR description to match the code.
   



##########
hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraSessionPool.java:
##########
@@ -197,15 +256,92 @@ public ResultSet query(Statement statement) {
         }
 
         public ResultSet execute(Statement statement) {
-            return this.session.execute(statement);
+            return this.executeWithRetry(statement);
         }
 
         public ResultSet execute(String statement) {
-            return this.session.execute(statement);
+            return this.executeWithRetry(new SimpleStatement(statement));
         }
 
         public ResultSet execute(String statement, Object... args) {
-            return this.session.execute(statement, args);
+            return this.executeWithRetry(new SimpleStatement(statement, args));
+        }
+
+        /**
+         * Execute a statement, retrying on transient connectivity failures
+         * (NoHostAvailableException / OperationTimedOutException). The driver
+         * itself keeps retrying connections in the background via the
+         * reconnection policy, so once Cassandra comes back online, a
+         * subsequent attempt here will succeed without restarting the server.
+         *
+         * <p>If the driver session has been discarded (e.g. by
+         * {@link #reconnectIfNeeded()} after a failed health-check) it is
+         * lazily reopened at the start of each attempt. After a transient
+         * failure the session is {@linkplain #reset() reset} so the next
+         * iteration gets a fresh driver session.
+         *
+         * <p><b>Blocking note:</b> retries block the calling thread via
+         * {@link Thread#sleep(long)}. Worst-case a single call blocks for
+         * {@code maxRetries * retryMaxDelay} ms. Under high-throughput
+         * workloads concurrent threads may pile up in {@code sleep()} during
+         * a Cassandra outage. For such deployments lower
+         * {@code cassandra.reconnect_max_retries} (default 3) and
+         * {@code cassandra.reconnect_max_delay} (default 10000ms) so the
+         * request fails fast and pressure is released back to the caller.
+         */
+        private ResultSet executeWithRetry(Statement statement) {
+            int retries = CassandraSessionPool.this.maxRetries;
+            long interval = CassandraSessionPool.this.retryInterval;
+            long maxDelay = CassandraSessionPool.this.retryMaxDelay;
+            DriverException lastError = null;
+            for (int attempt = 0; attempt <= retries; attempt++) {
+                try {
+                    if (this.session == null) {
+                        // Lazy reopen: may itself throw NHAE while
+                        // Cassandra is still unreachable; the catch below
+                        // treats that as a transient failure.
+                        this.open();
+                    }
+                    return this.session.execute(statement);
+                } catch (NoHostAvailableException | OperationTimedOutException e) {
+                    lastError = e;
+                    // Discard the (possibly broken) driver session so the
+                    // next iteration reopens cleanly.
+                    this.reset();

Review Comment:
   ⚠️ Consider not calling `reset()` on each transient failure.
   
   After `NoHostAvailableException`, the DataStax driver `Session` itself is 
still valid: it's the *nodes* that are unreachable, not the session object. 
The `ExponentialReconnectionPolicy` registered on the `Cluster` keeps retrying 
downed nodes in the background, so once Cassandra comes back, the **existing** 
session will route queries to the recovered nodes automatically.
   
   Closing and reopening the session on every retry attempt adds the overhead 
of `cluster.connect(keyspace)` per attempt, which establishes a new connection 
pool. With the default 3 retries, that's up to 3 unnecessary reconnects during 
an outage.
   
   Suggestion: simply retry `session.execute()` without `reset()`, and reserve 
`reset()` as a last-resort fallback only on the final attempt before giving up 
(or remove it from the retry loop entirely and rely on `reconnectIfNeeded()` 
for session-level recovery).
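
   A minimal sketch of the retry loop without `reset()`, so the shape of the suggestion is concrete. It is driver-free so it runs standalone: `TransientException` is a hypothetical stand-in for `NoHostAvailableException` / `OperationTimedOutException`, the `Supplier` stands in for `session.execute(statement)`, and the doubling backoff capped at `maxDelay` is an assumption about how `retryInterval` and `retryMaxDelay` are combined, not something taken from the PR.

```java
import java.util.function.Supplier;

class RetrySketch {

    /** Hypothetical stand-in for the driver's transient connectivity errors. */
    public static class TransientException extends RuntimeException {
        public TransientException(String message) {
            super(message);
        }
    }

    /** Assumed backoff: the interval doubles per attempt, capped at maxDelay. */
    public static long backoffDelay(long interval, long maxDelay, int attempt) {
        long delay = Math.min(interval, maxDelay);
        for (int i = 0; i < attempt && delay < maxDelay; i++) {
            // Jump straight to the cap when doubling would exceed it
            // (this also avoids long overflow).
            delay = delay > maxDelay / 2 ? maxDelay : delay * 2;
        }
        return delay;
    }

    /** Retries the same query on the same session; no reset() in the loop. */
    public static <T> T executeWithRetry(Supplier<T> query, int maxRetries,
                                         long interval, long maxDelay) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        TransientException lastError = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return query.get();
            } catch (TransientException e) {
                lastError = e;
                if (attempt == maxRetries) {
                    break; // out of attempts, do not sleep again
                }
                try {
                    Thread.sleep(backoffDelay(interval, maxDelay, attempt));
                } catch (InterruptedException ie) {
                    // Preserve the interrupt flag and give up early.
                    Thread.currentThread().interrupt();
                    throw e;
                }
            }
        }
        throw lastError;
    }
}
```

   With this shape, session-level recovery stays in `reconnectIfNeeded()`, and the loop only waits for the driver's reconnection policy to bring nodes back.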
   



##########
hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraSessionPool.java:
##########
@@ -255,6 +391,56 @@ public boolean hasChanges() {
             return this.batch.size() > 0;
         }
 
+        /**
+         * Periodic liveness probe invoked by {@link BackendSessionPool} to
+         * recover thread-local sessions after Cassandra has been restarted.
+         * Reopens the driver session if it was closed and pings the cluster
+         * with a lightweight query. On failure the session is discarded via
+         * {@link #reset()} so the next call to
+         * {@link #executeWithRetry(Statement)} reopens it; any exception
+         * here is swallowed so the caller can still issue the real query.
+         */
+        @Override
+        public void reconnectIfNeeded() {
+            if (!this.opened) {
+                return;
+            }
+            try {
+                if (this.session == null || this.session.isClosed()) {
+                    this.session = null;
+                    this.tryOpen();
+                }
+                if (this.session != null) {
+                    this.session.execute(new SimpleStatement(HEALTH_CHECK_CQL));
+                }
+            } catch (DriverException e) {
+                LOG.debug("Cassandra health-check failed, resetting session: {}",

Review Comment:
   ⚠️ `DriverException` is the base class for **all** Cassandra driver 
exceptions, including `AuthenticationException`, `SyntaxError`, 
`InvalidQueryException`, etc. Catching it this broadly means non-transient 
errors (e.g., auth failure, bad CQL in the health check) will be silently 
swallowed and the session nullified.
   
   Consider narrowing the catch to match `executeWithRetry()`:
   ```java
   } catch (NoHostAvailableException | OperationTimedOutException e) {
   ```
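
   To illustrate the effect of the narrower catch, here is a small driver-free sketch. All three exception classes are hypothetical stand-ins (for `DriverException`, the two transient connectivity exceptions, and `AuthenticationException` respectively), and `probe` is an illustrative name, not a method in the PR.

```java
class HealthCheckSketch {

    /** Hypothetical stand-in for the driver's base exception type. */
    public static class DriverLikeException extends RuntimeException {
        public DriverLikeException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for the transient connectivity exceptions. */
    public static class TransientException extends DriverLikeException {
        public TransientException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for a non-transient error such as an auth failure. */
    public static class AuthLikeException extends DriverLikeException {
        public AuthLikeException(String message) {
            super(message);
        }
    }

    /**
     * Runs the health check. Returns true when it succeeds and false when it
     * fails with a transient error (the only case where discarding the session
     * makes sense). Any other exception propagates to the caller.
     */
    public static boolean probe(Runnable healthCheck) {
        try {
            healthCheck.run();
            return true;
        } catch (TransientException e) {
            return false;
        }
    }
}
```

   An auth failure or a bad health-check statement then surfaces immediately instead of being logged at debug level and hidden behind a session reset.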
   



##########
hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraSessionPool.java:
##########
@@ -174,6 +228,11 @@ public void commitAsync() {
             int processors = Math.min(statements.size(), 1023);
             List<ResultSetFuture> results = new ArrayList<>(processors + 1);
             for (Statement s : statements) {
+                // TODO(issue #2740): commitAsync() bypasses executeWithRetry().
+                // During a Cassandra restart, async writes may fail with
+                // NoHostAvailableException even when maxRetries > 0. Callers
+                // must handle CompletableFuture failures. A follow-up will
+                // wrap each future with retry semantics.
                 ResultSetFuture future = this.session.executeAsync(s);

Review Comment:
   ⚠️ Worth tracking this gap explicitly. During a Cassandra restart, async 
writes issued through `commitAsync()` while no host is reachable will fail with 
`NoHostAvailableException`, and the failure is only observable through the 
`ResultSetFuture`. Callers that don't inspect futures will silently lose writes.
   
   Consider opening a follow-up issue to track async retry support, so this 
TODO doesn't get forgotten.
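
   As a rough idea of what the follow-up could look like, here is a driver-free sketch of retrying an async call. It uses `CompletableFuture` purely for illustration (the driver returns `ResultSetFuture`, so a real implementation would need an adapter). `withRetry` and `TransientException` are hypothetical names, and backoff between attempts is left out to keep the sketch short.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

class AsyncRetrySketch {

    /** Hypothetical stand-in for the driver's transient connectivity errors. */
    public static class TransientException extends RuntimeException {
        public TransientException(String message) {
            super(message);
        }
    }

    /** Re-issues the async call on transient failure, up to maxRetries extra times. */
    public static <T> CompletableFuture<T> withRetry(
            Supplier<CompletableFuture<T>> call, int maxRetries) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(call, maxRetries, result);
        return result;
    }

    private static <T> void attempt(Supplier<CompletableFuture<T>> call,
                                    int retriesLeft,
                                    CompletableFuture<T> result) {
        CompletableFuture<T> future;
        try {
            future = call.get();
        } catch (RuntimeException e) {
            // Treat a synchronous throw like a failed future.
            future = new CompletableFuture<>();
            future.completeExceptionally(e);
        }
        future.whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (isTransient(error) && retriesLeft > 0) {
                attempt(call, retriesLeft - 1, result);
            } else {
                // Non-transient errors and exhausted retries reach the caller.
                result.completeExceptionally(error);
            }
        });
    }

    /** Unwraps CompletionException before checking the failure type. */
    static boolean isTransient(Throwable error) {
        Throwable cause = error;
        if (error instanceof CompletionException && error.getCause() != null) {
            cause = error.getCause();
        }
        return cause instanceof TransientException;
    }
}
```

   Even with such a wrapper, callers still have to inspect the returned future, since a write that exhausts its retries fails the same way it does today.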
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

