This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit 5b5556a9fe8da33a382830b5e5d73dee27cf8831
Author: Alex Petrov <[email protected]>
AuthorDate: Fri Jun 6 11:43:34 2025 +0200

    Fix StackOverflow exception on topology replay with a large number of active epochs/topologies

    Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-20704
---
 .../main/java/accord/impl/AbstractConfigurationService.java | 13 ++++++++++---
 .../src/main/java/accord/utils/async/AsyncChain.java        |  8 ++++++++
 2 files changed, 18 insertions(+), 3 deletions(-)

diff --git a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
index 0b5e45b2..f426a6d5 100644
--- a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
+++ b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
@@ -21,6 +21,7 @@ package accord.impl;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
 import javax.annotation.concurrent.GuardedBy;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -348,6 +349,11 @@ public abstract class AbstractConfigurationService<EpochState extends AbstractCo
 
     protected void topologyUpdatePostListenerNotify(Topology topology) {}
 
+    protected Executor executor()
+    {
+        return Runnable::run;
+    }
+
     public void reportTopology(Topology topology, boolean isLoad, boolean startSync)
     {
         long lastReceived = epochs.lastReceived();
@@ -357,7 +363,8 @@ public abstract class AbstractConfigurationService<EpochState extends AbstractCo
         if (lastReceived > 0 && topology.epoch() > lastReceived + 1)
         {
             logger.debug("Epoch {} received; waiting to receive {} before reporting", topology.epoch(), lastReceived + 1);
-            epochs.receiveFuture(lastReceived + 1).invokeIfSuccess(() -> reportTopology(topology, isLoad, startSync));
+            epochs.receiveFuture(lastReceived + 1).invokeIfSuccess(() -> reportTopology(topology, isLoad, startSync), executor());
+
             fetchTopologyForEpoch(lastReceived + 1);
             return;
         }
@@ -365,7 +372,7 @@ public abstract class AbstractConfigurationService<EpochState extends AbstractCo
         long lastAcked = epochs.lastAcknowledged();
         if (lastAcked == 0 && lastReceived > 0)
         {
-            logger.debug("Epoch {} received; waiting for {} to ack before reporting", topology.epoch(), epochs.minEpoch());
+            logger.debug("Epoch {} received; waiting for {} to ack before reporting", topology.epoch(), epochs.minEpoch(), executor());
             epochs.acknowledgeFuture(epochs.minEpoch()).invokeIfSuccess(() -> reportTopology(topology, isLoad, startSync));
             return;
         }
@@ -373,7 +380,7 @@ public abstract class AbstractConfigurationService<EpochState extends AbstractCo
         if (lastAcked > 0 && topology.epoch() > lastAcked + 1)
         {
             logger.debug("Epoch {} received; waiting for {} to ack before reporting", topology.epoch(), lastAcked + 1);
-            epochs.acknowledgeFuture(lastAcked + 1).invokeIfSuccess(() -> reportTopology(topology, isLoad, startSync));
+            epochs.acknowledgeFuture(lastAcked + 1).invokeIfSuccess(() -> reportTopology(topology, isLoad, startSync), executor());
             return;
         }
 
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncChain.java b/accord-core/src/main/java/accord/utils/async/AsyncChain.java
index b2f6070f..09633765 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncChain.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncChain.java
@@ -78,6 +78,14 @@ public interface AsyncChain<V>
         });
     }
 
+    default AsyncChain<V> invokeIfSuccess(Runnable runnable, Executor executor)
+    {
+        return map(r -> {
+            runnable.run();
+            return r;
+        }, executor);
+    }
+
     default AsyncChain<V> invokeIfSuccess(Consumer<? super V> action, Executor executor)
     {
         return map(r -> {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
