This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit 5b5556a9fe8da33a382830b5e5d73dee27cf8831
Author: Alex Petrov <[email protected]>
AuthorDate: Fri Jun 6 11:43:34 2025 +0200

    Fix StackOverflow exception on topology replay with a large number of 
active epochs/topologies
    
    Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-20704
---
 .../main/java/accord/impl/AbstractConfigurationService.java | 13 ++++++++++---
 .../src/main/java/accord/utils/async/AsyncChain.java        |  8 ++++++++
 2 files changed, 18 insertions(+), 3 deletions(-)

diff --git 
a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java 
b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
index 0b5e45b2..f426a6d5 100644
--- a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
+++ b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
@@ -21,6 +21,7 @@ package accord.impl;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
 import javax.annotation.concurrent.GuardedBy;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -348,6 +349,11 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
 
     protected void topologyUpdatePostListenerNotify(Topology topology) {}
 
+    protected Executor executor()
+    {
+        return Runnable::run;
+    }
+
     public void reportTopology(Topology topology, boolean isLoad, boolean 
startSync)
     {
         long lastReceived = epochs.lastReceived();
@@ -357,7 +363,8 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
         if (lastReceived > 0 && topology.epoch() > lastReceived + 1)
         {
             logger.debug("Epoch {} received; waiting to receive {} before 
reporting", topology.epoch(), lastReceived + 1);
-            epochs.receiveFuture(lastReceived + 1).invokeIfSuccess(() -> 
reportTopology(topology, isLoad, startSync));
+            epochs.receiveFuture(lastReceived + 1).invokeIfSuccess(() -> 
reportTopology(topology, isLoad, startSync), executor());
+
             fetchTopologyForEpoch(lastReceived + 1);
             return;
         }
@@ -365,7 +372,7 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
         long lastAcked = epochs.lastAcknowledged();
         if (lastAcked == 0 && lastReceived > 0)
         {
-            logger.debug("Epoch {} received; waiting for {} to ack before 
reporting", topology.epoch(), epochs.minEpoch());
+            logger.debug("Epoch {} received; waiting for {} to ack before 
reporting", topology.epoch(), epochs.minEpoch(), executor());
             epochs.acknowledgeFuture(epochs.minEpoch()).invokeIfSuccess(() -> 
reportTopology(topology, isLoad, startSync));
             return;
         }
@@ -373,7 +380,7 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
         if (lastAcked > 0 && topology.epoch() > lastAcked + 1)
         {
             logger.debug("Epoch {} received; waiting for {} to ack before 
reporting", topology.epoch(), lastAcked + 1);
-            epochs.acknowledgeFuture(lastAcked + 1).invokeIfSuccess(() -> 
reportTopology(topology, isLoad, startSync));
+            epochs.acknowledgeFuture(lastAcked + 1).invokeIfSuccess(() -> 
reportTopology(topology, isLoad, startSync), executor());
             return;
         }
 
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncChain.java 
b/accord-core/src/main/java/accord/utils/async/AsyncChain.java
index b2f6070f..09633765 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncChain.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncChain.java
@@ -78,6 +78,14 @@ public interface AsyncChain<V>
         });
     }
 
+    default AsyncChain<V> invokeIfSuccess(Runnable runnable, Executor executor)
+    {
+        return map(r -> {
+            runnable.run();
+            return r;
+        }, executor);
+    }
+
     default AsyncChain<V> invokeIfSuccess(Consumer<? super V> action, Executor 
executor)
     {
         return map(r -> {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to