This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new 101a6ceafa Accord: Test fixes
101a6ceafa is described below

commit 101a6ceafaab0345352994cbdc583627fd321a52
Author: Alex Petrov <oleksandr.pet...@gmail.com>
AuthorDate: Mon Apr 14 16:24:09 2025 -0700

    Accord: Test fixes
    
    patch by Alex Petrov; reviewed by Benedict Elliott Smith, David Capwell for CASSANDRA-20552
---
 src/java/org/apache/cassandra/net/Verb.java        |  4 +--
 .../cassandra/service/accord/AccordService.java    |  3 +-
 .../service/reads/AbstractReadExecutor.java        |  6 ++--
 .../test/log/FetchLogFromPeers2Test.java           | 33 ++++++++++------------
 4 files changed, 22 insertions(+), 24 deletions(-)

diff --git a/src/java/org/apache/cassandra/net/Verb.java b/src/java/org/apache/cassandra/net/Verb.java
index e9258b5f9a..5253124ffe 100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@ -293,7 +293,7 @@ public enum Verb
     // transactional cluster metadata
     TCM_COMMIT_RSP         (801, P0, rpcTimeout,      INTERNAL_METADATA,    MessageSerializers::commitResultSerializer,         RESPONSE_HANDLER                                 ),
     TCM_COMMIT_REQ         (802, P0, rpcTimeout,      INTERNAL_METADATA,    MessageSerializers::commitSerializer,               () -> commitRequestHandler(),               TCM_COMMIT_RSP         ),
-    TCM_FETCH_CMS_LOG_RSP  (803, P0, rpcTimeout,      FETCH_METADATA,       MessageSerializers::logStateSerializer,             RESPONSE_HANDLER                                 ),
+    TCM_FETCH_CMS_LOG_RSP  (803, P0, shortTimeout,    FETCH_METADATA,       MessageSerializers::logStateSerializer,             RESPONSE_HANDLER                                 ),
     TCM_FETCH_CMS_LOG_REQ  (804, P0, rpcTimeout,      FETCH_METADATA,       () -> FetchCMSLog.serializer,                       () -> fetchLogRequestHandler(),             TCM_FETCH_CMS_LOG_RSP  ),
     TCM_REPLICATION        (805, P0, rpcTimeout,      INTERNAL_METADATA,    MessageSerializers::logStateSerializer,             () -> replicationHandler()                                    ),
     TCM_NOTIFY_RSP         (806, P0, rpcTimeout,      INTERNAL_METADATA,    () -> Epoch.messageSerializer,                      RESPONSE_HANDLER                                 ),
@@ -304,7 +304,7 @@ public enum Verb
     TCM_ABORT_MIG          (811, P0, rpcTimeout,      INTERNAL_METADATA,    () -> CMSInitializationRequest.Initiator.serializer,() -> Election.instance.abortHandler,       TCM_INIT_MIG_RSP       ),
     TCM_DISCOVER_RSP       (812, P0, rpcTimeout,      INTERNAL_METADATA,    () -> Discovery.serializer,                         RESPONSE_HANDLER                                 ),
     TCM_DISCOVER_REQ       (813, P0, rpcTimeout,      INTERNAL_METADATA,    () -> NoPayload.serializer,                         () -> Discovery.instance.requestHandler,    TCM_DISCOVER_RSP       ),
-    TCM_FETCH_PEER_LOG_RSP (818, P0, rpcTimeout,      FETCH_METADATA,       MessageSerializers::logStateSerializer,             RESPONSE_HANDLER                                 ),
+    TCM_FETCH_PEER_LOG_RSP (818, P0, shortTimeout,    FETCH_METADATA,       MessageSerializers::logStateSerializer,             RESPONSE_HANDLER                                 ),
     TCM_FETCH_PEER_LOG_REQ (819, P0, rpcTimeout,      FETCH_METADATA,       () -> FetchPeerLog.serializer,                      () -> FetchPeerLog.Handler.instance,        TCM_FETCH_PEER_LOG_RSP ),
     TCM_RECONSTRUCT_EPOCH_RSP (820, P0, rpcTimeout,   FETCH_METADATA,       MessageSerializers::logStateSerializer,             () -> ResponseVerbHandler.instance                                 ),
     TCM_RECONSTRUCT_EPOCH_REQ (821, P0, rpcTimeout,   FETCH_METADATA,       () -> ReconstructLogState.serializer,               () -> ReconstructLogState.Handler.instance, TCM_FETCH_PEER_LOG_RSP ),
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java
index bed4b1396d..d2682950af 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -399,9 +399,10 @@ public class AccordService implements IAccordService, Shutdownable
             int waitSeconds = 5;
             while (true)
             {
+                Epoch await = Epoch.max(Epoch.create(configService.currentEpoch()), metadata.epoch);
                 try
                 {
-                    epochReady(metadata.epoch).get(waitSeconds, SECONDS);
+                    epochReady(await).get(waitSeconds, SECONDS);
                     break;
                 }
                 catch (TimeoutException e)
diff --git a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
index ae76c02d38..1774164fb9 100644
--- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
@@ -216,6 +216,9 @@ public abstract class AbstractReadExecutor
         if (retry.equals(NeverSpeculativeRetryPolicy.INSTANCE) || consistencyLevel == ConsistencyLevel.EACH_QUORUM)
             return new NeverSpeculatingReadExecutor(coordinator, cfs, command, replicaPlan, requestTime, false);
 
+        if (retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE))
+            return new AlwaysSpeculatingReadExecutor(coordinator, cfs, command, replicaPlan, requestTime);
+
         // There are simply no extra replicas to speculate.
         // Handle this separately so it can record failed attempts to speculate due to lack of replicas
         if (replicaPlan.contacts().size() == replicaPlan.readCandidates().size())
@@ -223,9 +226,6 @@ public abstract class AbstractReadExecutor
             boolean recordFailedSpeculation = consistencyLevel != ConsistencyLevel.ALL;
             return new NeverSpeculatingReadExecutor(coordinator, cfs, command, replicaPlan, requestTime, recordFailedSpeculation);
         }
-
-        if (retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE))
-            return new AlwaysSpeculatingReadExecutor(coordinator, cfs, command, replicaPlan, requestTime);
         else // PERCENTILE or CUSTOM.
             return new SpeculatingReadExecutor(coordinator, cfs, command, replicaPlan, requestTime);
     }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeers2Test.java b/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeers2Test.java
index 783583a4fb..d42c379683 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeers2Test.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeers2Test.java
@@ -19,7 +19,6 @@
 package org.apache.cassandra.distributed.test.log;
 
 import java.util.UUID;
-import java.util.concurrent.ExecutionException;
 
 import org.junit.Test;
 
@@ -31,35 +30,39 @@ import org.apache.cassandra.tcm.ClusterMetadata
 import org.apache.cassandra.tcm.ClusterMetadataService;
 import org.apache.cassandra.tcm.Epoch;
 
-import static org.apache.cassandra.distributed.test.log.FetchLogFromPeersTest.*;
+import static org.apache.cassandra.distributed.test.log.FetchLogFromPeersTest.ClusterState;
+import static org.apache.cassandra.distributed.test.log.FetchLogFromPeersTest.Operation;
+import static org.apache.cassandra.distributed.test.log.FetchLogFromPeersTest.coordinator;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class FetchLogFromPeers2Test extends TestBaseImpl
 {
     @Test
-    public void testSchema() throws Exception
+    public void testSchema() throws Throwable
     {
-        try (Cluster cluster = init(builder().withNodes(3)
-                                             .start()))
+        try (Cluster cluster = init(builder().withNodes(3).start()))
         {
-            cluster.schemaChange(withKeyspace("alter keyspace %s with replication = {'class':'SimpleStrategy', 'replication_factor':3}"));
-            cluster.schemaChange(withKeyspace("create table %s.tbl (id int primary key)"));
-            cluster.schemaChange(withKeyspace("create table %s.tbl2 (id int primary key)"));
+            cluster.schemaChange(withKeyspace("alter keyspace %s with replication = {'class':'SimpleStrategy', 'replication_factor':3} "));
+            cluster.schemaChange(withKeyspace("create table %s.tbl (id int primary key) WITH speculative_retry = 'ALWAYS';"));
 
             for (ClusterState clusterState : ClusterState.values())
+            {
                 for (Operation operation : Operation.values())
                 {
+                    cluster.filters().inbound().from(1, 2).to(1, 2).drop();
                     setupSchemaBehind(cluster);
+                    cluster.filters().inbound().to(1).to(1).drop();
                     runQuery(cluster, clusterState, operation);
+                    cluster.filters().reset();
                 }
+            }
+
         }
     }
 
-    public void runQuery(Cluster cluster, ClusterState clusterState, Operation operation) throws ExecutionException, InterruptedException
+    public void runQuery(Cluster cluster, ClusterState clusterState, Operation operation) throws Throwable
     {
-        cluster.get(1).shutdown().get();
-
         // node2 is behind
         String query;
         switch (operation)
@@ -78,7 +81,7 @@ public class FetchLogFromPeers2Test extends TestBaseImpl
         long metricsBefore = cluster.get(2).callOnInstance(() -> TCMMetrics.instance.fetchedPeerLogEntries.getCount());
         if (clusterState == ClusterState.COORDINATOR_BEHIND)
         {
-            long [] coordinatorBehindMetricsBefore = new long[cluster.size()];
+            long[] coordinatorBehindMetricsBefore = new long[cluster.size()];
             try
             {
                 for (int i = 1; i <= cluster.size(); i++)
@@ -102,20 +105,15 @@ public class FetchLogFromPeers2Test extends TestBaseImpl
                 }
             }
             assertTrue("Metric CoordinatorBehindSchema should have been bumped for at least one replica", metricBumped);
-
         }
         cluster.coordinator(coordinator).execute(withKeyspace(query), ConsistencyLevel.QUORUM);
         assertTrue(cluster.get(2).logs().grep(mark, "Fetching log from /127.0.0.3:7012").getResult().size() > 0);
         long metricsAfter = cluster.get(2).callOnInstance(() -> TCMMetrics.instance.fetchedPeerLogEntries.getCount());
         assertTrue(metricsAfter > metricsBefore);
-
-        cluster.get(1).startup();
     }
 
     public void setupSchemaBehind(Cluster cluster)
     {
-        cluster.filters().reset();
-        cluster.filters().inbound().from(1).to(2).drop();
         long epochBefore = cluster.get(3).callOnInstance(() -> ClusterMetadata.current().epoch.getEpoch());
         cluster.coordinator(1).execute(withKeyspace("alter table %s.tbl with comment='test " + UUID.randomUUID() + "'"), ConsistencyLevel.ONE);
         cluster.get(3).runOnInstance(() -> {
@@ -128,6 +126,5 @@ public class FetchLogFromPeers2Test extends TestBaseImpl
                 throw new RuntimeException(e);
             }
         });
-        cluster.filters().reset();
     }
 }

