This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1b78bbdc35 Fix: - Do not query local topology when deciding what keys
to fetch to avoid TopologyRetiredException - Ensure we propagate information
back to the requesting CommandStore by using the store id to ensure it is
included - BurnTest topology fetching was broken by earlier patch - Topology
callbacks were not being invoked as we were not calling .begin() - Topology
mismatch failure during notAccept phase was not being reported due to
CoordinatePreAccept already having [...]
1b78bbdc35 is described below
commit 1b78bbdc3557de3ccd3acef2d0eac3e6df9df533
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Wed Jun 11 18:44:05 2025 +0100
Fix:
- Do not query local topology when deciding what keys to fetch to avoid
TopologyRetiredException
- Ensure we propagate information back to the requesting CommandStore by
using the store id to ensure it is included
- BurnTest topology fetching was broken by earlier patch
- Topology callbacks were not being invoked as we were not calling .begin()
- Topology mismatch failure during notAccept phase was not being reported
due to CoordinatePreAccept already having isDone==true
patch by Benedict; reviewed by David Capwell for CASSANDRA-20711
---
modules/accord | 2 +-
.../cassandra/service/accord/AccordConfigurationService.java | 9 +++++----
src/java/org/apache/cassandra/service/accord/AccordService.java | 2 +-
.../cassandra/service/accord/AccordConfigurationServiceTest.java | 5 +++--
.../cassandra/service/accord/AccordSyncPropagatorTest.java | 3 ++-
test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java | 3 ++-
6 files changed, 14 insertions(+), 10 deletions(-)
diff --git a/modules/accord b/modules/accord
index bf85660dcc..c2ef0643ab 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit bf85660dccaece9fcb3dd319b6f82265d780378f
+Subproject commit c2ef0643ab671067d39e7ce688eea493e9ee018d
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
index fade204b1f..bd06cb9764 100644
---
a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
+++
b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
@@ -32,6 +32,7 @@ import javax.annotation.concurrent.GuardedBy;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
+import accord.api.Agent;
import accord.impl.AbstractConfigurationService;
import accord.local.Node;
import accord.primitives.Ranges;
@@ -138,17 +139,17 @@ public class AccordConfigurationService extends
AbstractConfigurationService<Acc
}
}
- public AccordConfigurationService(Node.Id node, MessageDelivery
messagingService, IFailureDetector failureDetector, ScheduledExecutorPlus
scheduledTasks)
+ public AccordConfigurationService(Node.Id node, Agent agent,
MessageDelivery messagingService, IFailureDetector failureDetector,
ScheduledExecutorPlus scheduledTasks)
{
- super(node);
+ super(node, agent);
this.syncPropagator = new AccordSyncPropagator(localId, this,
messagingService, failureDetector, scheduledTasks, this);
this.watermarkCollector = new WatermarkCollector();
listeners.add(watermarkCollector);
}
- public AccordConfigurationService(Node.Id node)
+ public AccordConfigurationService(Node.Id node, Agent agent)
{
- this(node, MessagingService.instance(), FailureDetector.instance,
ScheduledExecutors.scheduledTasks);
+ this(node, agent, MessagingService.instance(),
FailureDetector.instance, ScheduledExecutors.scheduledTasks);
}
@Override
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 4bf89c9b1e..6529db1683 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -300,7 +300,7 @@ public class AccordService implements IAccordService,
Shutdownable
this.scheduler = new AccordScheduler();
this.dataStore = new AccordDataStore();
this.journal = new
AccordJournal(DatabaseDescriptor.getAccord().journal);
- this.configService = new AccordConfigurationService(localId);
+ this.configService = new AccordConfigurationService(localId, agent);
this.fastPathCoordinator = AccordFastPathCoordinator.create(localId,
configService);
this.messageSink = new AccordMessageSink(agent, configService,
callbacks);
this.node = new Node(localId,
diff --git
a/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java
b/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java
index 77d6e687d2..94e07b9a9e 100644
---
a/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java
+++
b/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java
@@ -62,6 +62,7 @@ import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.schema.Tables;
+import org.apache.cassandra.service.accord.api.AccordAgent;
import org.apache.cassandra.service.accord.journal.AccordTopologyUpdate;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.ValidatingClusterMetadataService;
@@ -171,7 +172,7 @@ public class AccordConfigurationServiceTest
try
{
journal = initJournal();
- AccordConfigurationService service = new
AccordConfigurationService(ID1, new Messaging(), new MockFailureDetector(),
ScheduledExecutors.scheduledTasks);
+ AccordConfigurationService service = new
AccordConfigurationService(ID1, new AccordAgent(), new Messaging(), new
MockFailureDetector(), ScheduledExecutors.scheduledTasks);
AccordJournal journal_ = journal;
TestListener listener = new TestListener(service, true)
{
@@ -200,7 +201,7 @@ public class AccordConfigurationServiceTest
Topology topology3 = createTopology(cms);
service.reportTopology(topology3);
- AccordConfigurationService loaded = new
AccordConfigurationService(ID1, new Messaging(), new MockFailureDetector(),
ScheduledExecutors.scheduledTasks);
+ AccordConfigurationService loaded = new
AccordConfigurationService(ID1, new AccordAgent(), new Messaging(), new
MockFailureDetector(), ScheduledExecutors.scheduledTasks);
loaded.updateMapping(mappingForEpoch(cms.metadata().epoch.getEpoch() + 1));
listener = new
AbstractConfigurationServiceTest.TestListener(loaded, true);
loaded.registerListener(listener);
diff --git
a/test/unit/org/apache/cassandra/service/accord/AccordSyncPropagatorTest.java
b/test/unit/org/apache/cassandra/service/accord/AccordSyncPropagatorTest.java
index 948dc80afb..adaff55578 100644
---
a/test/unit/org/apache/cassandra/service/accord/AccordSyncPropagatorTest.java
+++
b/test/unit/org/apache/cassandra/service/accord/AccordSyncPropagatorTest.java
@@ -70,6 +70,7 @@ import org.apache.cassandra.net.ConnectionType;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageDelivery;
import org.apache.cassandra.net.RequestCallback;
+import org.apache.cassandra.service.accord.api.AccordAgent;
import org.apache.cassandra.tcm.ValidatingClusterMetadataService;
import org.apache.cassandra.tcm.serialization.Version;
import org.apache.cassandra.utils.AccordGenerators;
@@ -419,7 +420,7 @@ public class AccordSyncPropagatorTest
private ConfigService(Node.Id node)
{
- super(node);
+ super(node, new AccordAgent());
}
@Override
diff --git a/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java
b/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java
index cc05e86db4..fec84f5d63 100644
--- a/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java
@@ -87,6 +87,7 @@ import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.schema.TableParams;
import org.apache.cassandra.schema.Tables;
import
org.apache.cassandra.service.accord.AccordConfigurationService.EpochSnapshot;
+import org.apache.cassandra.service.accord.api.AccordAgent;
import org.apache.cassandra.service.consensus.TransactionalMode;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.ClusterMetadataService;
@@ -682,7 +683,7 @@ public class EpochSyncTest
// TODO (review): Should there be a real scheduler here? Is it
possible to adapt the Scheduler interface to scheduler used in this test?
TimeService time =
TimeService.ofNonMonotonic(globalExecutor::currentTimeMillis,
TimeUnit.MILLISECONDS);
this.topology = new
TopologyManager(SizeOfIntersectionSorter.SUPPLIER, new
TestAgent.RethrowAgent(), id, time, new DefaultTimeouts(time));
- config = new AccordConfigurationService(node,
messagingService, failureDetector, scheduler);
+ config = new AccordConfigurationService(node, new
AccordAgent(), messagingService, failureDetector, scheduler);
config.registerListener(new ConfigurationService.Listener()
{
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]