This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push: new 101a6ceafa Accord: Test fixes 101a6ceafa is described below commit 101a6ceafaab0345352994cbdc583627fd321a52 Author: Alex Petrov <oleksandr.pet...@gmail.com> AuthorDate: Mon Apr 14 16:24:09 2025 -0700 Accord: Test fixes patch by Alex Petrov; reviewed by Benedict Elliott Smith, David Capwell for CASSANDRA-20552 --- src/java/org/apache/cassandra/net/Verb.java | 4 +-- .../cassandra/service/accord/AccordService.java | 3 +- .../service/reads/AbstractReadExecutor.java | 6 ++-- .../test/log/FetchLogFromPeers2Test.java | 33 ++++++++++------------ 4 files changed, 22 insertions(+), 24 deletions(-) diff --git a/src/java/org/apache/cassandra/net/Verb.java b/src/java/org/apache/cassandra/net/Verb.java index e9258b5f9a..5253124ffe 100644 --- a/src/java/org/apache/cassandra/net/Verb.java +++ b/src/java/org/apache/cassandra/net/Verb.java @@ -293,7 +293,7 @@ public enum Verb // transactional cluster metadata TCM_COMMIT_RSP (801, P0, rpcTimeout, INTERNAL_METADATA, MessageSerializers::commitResultSerializer, RESPONSE_HANDLER ), TCM_COMMIT_REQ (802, P0, rpcTimeout, INTERNAL_METADATA, MessageSerializers::commitSerializer, () -> commitRequestHandler(), TCM_COMMIT_RSP ), - TCM_FETCH_CMS_LOG_RSP (803, P0, rpcTimeout, FETCH_METADATA, MessageSerializers::logStateSerializer, RESPONSE_HANDLER ), + TCM_FETCH_CMS_LOG_RSP (803, P0, shortTimeout, FETCH_METADATA, MessageSerializers::logStateSerializer, RESPONSE_HANDLER ), TCM_FETCH_CMS_LOG_REQ (804, P0, rpcTimeout, FETCH_METADATA, () -> FetchCMSLog.serializer, () -> fetchLogRequestHandler(), TCM_FETCH_CMS_LOG_RSP ), TCM_REPLICATION (805, P0, rpcTimeout, INTERNAL_METADATA, MessageSerializers::logStateSerializer, () -> replicationHandler() ), TCM_NOTIFY_RSP (806, P0, rpcTimeout, INTERNAL_METADATA, () -> Epoch.messageSerializer, RESPONSE_HANDLER ), @@ -304,7 +304,7 @@ public enum Verb TCM_ABORT_MIG (811, P0, rpcTimeout, INTERNAL_METADATA, () -> CMSInitializationRequest.Initiator.serializer,() -> Election.instance.abortHandler, TCM_INIT_MIG_RSP ), TCM_DISCOVER_RSP (812, P0, rpcTimeout, INTERNAL_METADATA, () -> Discovery.serializer, RESPONSE_HANDLER ), TCM_DISCOVER_REQ (813, P0, rpcTimeout, INTERNAL_METADATA, () -> NoPayload.serializer, () -> Discovery.instance.requestHandler, TCM_DISCOVER_RSP ), - TCM_FETCH_PEER_LOG_RSP (818, P0, rpcTimeout, FETCH_METADATA, MessageSerializers::logStateSerializer, RESPONSE_HANDLER ), + TCM_FETCH_PEER_LOG_RSP (818, P0, shortTimeout, FETCH_METADATA, MessageSerializers::logStateSerializer, RESPONSE_HANDLER ), TCM_FETCH_PEER_LOG_REQ (819, P0, rpcTimeout, FETCH_METADATA, () -> FetchPeerLog.serializer, () -> FetchPeerLog.Handler.instance, TCM_FETCH_PEER_LOG_RSP ), TCM_RECONSTRUCT_EPOCH_RSP (820, P0, rpcTimeout, FETCH_METADATA, MessageSerializers::logStateSerializer, () -> ResponseVerbHandler.instance ), TCM_RECONSTRUCT_EPOCH_REQ (821, P0, rpcTimeout, FETCH_METADATA, () -> ReconstructLogState.serializer, () -> ReconstructLogState.Handler.instance, TCM_FETCH_PEER_LOG_RSP ), diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java index bed4b1396d..d2682950af 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordService.java @@ -399,9 +399,10 @@ public class AccordService implements IAccordService, Shutdownable int waitSeconds = 5; while (true) { + Epoch await = Epoch.max(Epoch.create(configService.currentEpoch()), metadata.epoch); try { - epochReady(metadata.epoch).get(waitSeconds, SECONDS); + epochReady(await).get(waitSeconds, SECONDS); break; } catch (TimeoutException e) diff --git a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java index ae76c02d38..1774164fb9 100644 --- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java +++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java @@ -216,6 +216,9 @@ public abstract class AbstractReadExecutor if (retry.equals(NeverSpeculativeRetryPolicy.INSTANCE) || consistencyLevel == ConsistencyLevel.EACH_QUORUM) return new NeverSpeculatingReadExecutor(coordinator, cfs, command, replicaPlan, requestTime, false); + if (retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE)) + return new AlwaysSpeculatingReadExecutor(coordinator, cfs, command, replicaPlan, requestTime); + // There are simply no extra replicas to speculate. // Handle this separately so it can record failed attempts to speculate due to lack of replicas if (replicaPlan.contacts().size() == replicaPlan.readCandidates().size()) @@ -223,9 +226,6 @@ public abstract class AbstractReadExecutor boolean recordFailedSpeculation = consistencyLevel != ConsistencyLevel.ALL; return new NeverSpeculatingReadExecutor(coordinator, cfs, command, replicaPlan, requestTime, recordFailedSpeculation); } - - if (retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE)) - return new AlwaysSpeculatingReadExecutor(coordinator, cfs, command, replicaPlan, requestTime); else // PERCENTILE or CUSTOM. return new SpeculatingReadExecutor(coordinator, cfs, command, replicaPlan, requestTime); } diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeers2Test.java b/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeers2Test.java index 783583a4fb..d42c379683 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeers2Test.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeers2Test.java @@ -19,7 +19,6 @@ package org.apache.cassandra.distributed.test.log; import java.util.UUID; -import java.util.concurrent.ExecutionException; import org.junit.Test; @@ -31,35 +30,39 @@ import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.tcm.Epoch; -import static org.apache.cassandra.distributed.test.log.FetchLogFromPeersTest.*; +import static org.apache.cassandra.distributed.test.log.FetchLogFromPeersTest.ClusterState; +import static org.apache.cassandra.distributed.test.log.FetchLogFromPeersTest.Operation; +import static org.apache.cassandra.distributed.test.log.FetchLogFromPeersTest.coordinator; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class FetchLogFromPeers2Test extends TestBaseImpl { @Test - public void testSchema() throws Exception + public void testSchema() throws Throwable { - try (Cluster cluster = init(builder().withNodes(3) - .start())) + try (Cluster cluster = init(builder().withNodes(3).start())) { - cluster.schemaChange(withKeyspace("alter keyspace %s with replication = {'class':'SimpleStrategy', 'replication_factor':3}")); - cluster.schemaChange(withKeyspace("create table %s.tbl (id int primary key)")); - cluster.schemaChange(withKeyspace("create table %s.tbl2 (id int primary key)")); + cluster.schemaChange(withKeyspace("alter keyspace %s with replication = {'class':'SimpleStrategy', 'replication_factor':3} ")); + cluster.schemaChange(withKeyspace("create table %s.tbl (id int primary key) WITH speculative_retry = 'ALWAYS';")); for (ClusterState clusterState : ClusterState.values()) + { for (Operation operation : Operation.values()) { + cluster.filters().inbound().from(1, 2).to(1, 2).drop(); setupSchemaBehind(cluster); + cluster.filters().inbound().to(1).to(1).drop(); runQuery(cluster, clusterState, operation); + cluster.filters().reset(); } + } + } } - public void runQuery(Cluster cluster, ClusterState clusterState, Operation operation) throws ExecutionException, InterruptedException + public void runQuery(Cluster cluster, ClusterState clusterState, Operation operation) throws Throwable { - cluster.get(1).shutdown().get(); - // node2 is behind String query; switch (operation) @@ -78,7 +81,7 @@ public class FetchLogFromPeers2Test extends TestBaseImpl long metricsBefore = cluster.get(2).callOnInstance(() -> TCMMetrics.instance.fetchedPeerLogEntries.getCount()); if (clusterState == ClusterState.COORDINATOR_BEHIND) { - long [] coordinatorBehindMetricsBefore = new long[cluster.size()]; + long[] coordinatorBehindMetricsBefore = new long[cluster.size()]; try { for (int i = 1; i <= cluster.size(); i++) @@ -102,20 +105,15 @@ public class FetchLogFromPeers2Test extends TestBaseImpl } } assertTrue("Metric CoordinatorBehindSchema should have been bumped for at least one replica", metricBumped); - } cluster.coordinator(coordinator).execute(withKeyspace(query), ConsistencyLevel.QUORUM); assertTrue(cluster.get(2).logs().grep(mark, "Fetching log from /127.0.0.3:7012").getResult().size() > 0); long metricsAfter = cluster.get(2).callOnInstance(() -> TCMMetrics.instance.fetchedPeerLogEntries.getCount()); assertTrue(metricsAfter > metricsBefore); - - cluster.get(1).startup(); } public void setupSchemaBehind(Cluster cluster) { - cluster.filters().reset(); - cluster.filters().inbound().from(1).to(2).drop(); long epochBefore = cluster.get(3).callOnInstance(() -> ClusterMetadata.current().epoch.getEpoch()); cluster.coordinator(1).execute(withKeyspace("alter table %s.tbl with comment='test " + UUID.randomUUID() + "'"), ConsistencyLevel.ONE); cluster.get(3).runOnInstance(() -> { @@ -128,6 +126,5 @@ public class FetchLogFromPeers2Test extends TestBaseImpl throw new RuntimeException(e); } }); - cluster.filters().reset(); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org