This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new 8d112e8bf5 Accord: test fixes and stability improvements
       * Fix short accord simulation test (seed 0x6bea128ae851724b), ConcurrentModificationException
       * Increase wait time during closing to avoid Unterminated threads
       * Increase timeouts, improve test stability
       * More descriptive output from CQL test
       * Shorten max CMS delay
       * Improve future handling in config service
8d112e8bf5 is described below

commit 8d112e8bf5375e57692009728ba15b63d564bd1b
Author: Alex Petrov <[email protected]>
AuthorDate: Tue Mar 11 16:22:24 2025 +0100

    Accord: test fixes and stability improvements
      * Fix short accord simulation test (seed 0x6bea128ae851724b), ConcurrentModificationException
      * Increase wait time during closing to avoid Unterminated threads
      * Increase timeouts, improve test stability
      * More descriptive output from CQL test
      * Shorten max CMS delay
      * Improve future handling in config service
    
    Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-20440
---
 src/java/org/apache/cassandra/config/Config.java   |  2 +-
 .../service/accord/AccordConfigurationService.java | 45 ++++++++--------
 .../service/accord/AccordVerbHandler.java          | 35 +++---------
 .../service/accord/CommandsForRanges.java          | 63 +++++++++++++++-------
 .../distributed/impl/AbstractCluster.java          |  2 +-
 .../test/accord/AccordHostReplacementTest.java     |  2 +
 .../accord/AccordMigrationReadRaceTestBase.java    |  5 +-
 .../sstable/CQLSSTableWriterConcurrencyTest.java   | 13 ++++-
 8 files changed, 91 insertions(+), 76 deletions(-)

diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index 0e7189a385..5d0256795e 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -185,7 +185,7 @@ public class Config
     public volatile DurationSpec.IntMillisecondsBound 
cms_default_retry_backoff = null;
     @Deprecated(since="5.1")
     public volatile DurationSpec.IntMillisecondsBound 
cms_default_max_retry_backoff = null;
-    public String cms_retry_delay = "0 <= 50ms*1*attempts <= 10s,retries=10";
+    public String cms_retry_delay = "0 <= 50ms*1*attempts <= 1s,retries=10";
 
     /**
      * How often we should snapshot the cluster metadata.
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java 
b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
index 0fbeb7d190..7b1e36f8f1 100644
--- 
a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
+++ 
b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
@@ -61,7 +61,6 @@ import org.apache.cassandra.utils.Simulate;
 import org.apache.cassandra.utils.concurrent.AsyncPromise;
 import org.apache.cassandra.utils.concurrent.Future;
 
-import static accord.topology.TopologyManager.TopologyRange;
 import static org.apache.cassandra.service.accord.AccordTopology.tcmIdToAccord;
 import static org.apache.cassandra.utils.Simulate.With.MONITORS;
 
@@ -399,32 +398,30 @@ public class AccordConfigurationService extends 
AbstractConfigurationService<Acc
                 return;
             }
 
-            try
+            Set<InetAddressAndPort> peers = new 
HashSet<>(metadata.directory.allJoinedEndpoints());
+            peers.remove(FBUtilities.getBroadcastAddressAndPort());
+            if (peers.isEmpty())
             {
-                Set<InetAddressAndPort> peers = new 
HashSet<>(metadata.directory.allJoinedEndpoints());
-                peers.remove(FBUtilities.getBroadcastAddressAndPort());
-                if (peers.isEmpty())
-                {
-                    onResult.accept(Success, null);
-                    return;
-                }
-
-                // Fetching only one epoch here since later epochs might have 
already been requested concurrently
-                TopologyRange result = 
FetchTopologies.fetch(SharedContext.Global.instance, peers, epoch, epoch).get();
-                result.forEach(this::reportTopology, epoch, 1);
                 onResult.accept(Success, null);
+                return;
             }
-            catch (Throwable e)
-            {
-                if (currentEpoch() >= epoch)
-                {
-                    onResult.accept(Success, null);
-                    return;
-                }
-                if (e instanceof InterruptedException)
-                    Thread.currentThread().interrupt();
-                onResult.accept(null, e);
-            }
+
+            // Fetching only one epoch here since later epochs might have 
already been requested concurrently
+            FetchTopologies.fetch(SharedContext.Global.instance, peers, epoch, 
epoch)
+                           .addCallback((topologyRange, t) -> {
+                               if (t != null)
+                               {
+                                   if (currentEpoch() >= epoch)
+                                       onResult.accept(Success, null);
+                                   else
+                                       onResult.accept(null, t);
+                               }
+                               else
+                               {
+                                   topologyRange.forEach(this::reportTopology, 
epoch, 1);
+                                   onResult.accept(Success, null);
+                               }
+                           });
         });
     }
 
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java 
b/src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java
index 61f18867da..12fd39a135 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java
@@ -27,9 +27,6 @@ import accord.local.Node;
 import accord.messages.Request;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
-import org.apache.cassandra.tcm.ClusterMetadata;
-import org.apache.cassandra.tcm.ClusterMetadataService;
-import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.utils.NoSpamLogger;
 
 public class AccordVerbHandler<T extends Request> implements IVerbHandler<T>
@@ -66,35 +63,15 @@ public class AccordVerbHandler<T extends Request> 
implements IVerbHandler<T>
          */
         Node.Id fromNodeId = endpointMapper.mappedId(message.from());
         long waitForEpoch = request.waitForEpoch();
-        ClusterMetadata cm = ClusterMetadata.current();
-        boolean cmUpToDate = ClusterMetadata.current().epoch.getEpoch() >= 
waitForEpoch;
-        if (node.topology().hasAtLeastEpoch(waitForEpoch) && cmUpToDate)
+        if (node.topology().hasAtLeastEpoch(waitForEpoch))
             request.process(node, fromNodeId, message.header);
         else
         {
-            // TODO (required): review this claim. Downstream from 
`withEpoch`, we do call fetch log, albeit from _CMS_, since we
-            //                  do not know the peer there.
-            // withEpoch does not reliably ensure that TCM is up to date, if 
Accord has the topology it won't
-            // wait for TCM to come up to date, so do it here in the verb 
handler
-            if (!cmUpToDate)
-            {
-                
ClusterMetadataService.instance().fetchLogFromPeerOrCMSAsync(cm, 
message.from(), Epoch.create(waitForEpoch))
-                                      .addCallback((success, failure) -> {
-                                          node.withEpoch(waitForEpoch, 
(ignored, withEpochFailure) -> {
-                                              if (withEpochFailure != null)
-                                                  throw new 
RuntimeException("Timed out waiting for epoch when processing message from " + 
fromNodeId + " to " + node + " message " + message, withEpochFailure);
-                                              request.process(node, 
fromNodeId, message.header);
-                                          });
-                                      });
-            }
-            else
-            {
-                node.withEpoch(waitForEpoch, (ignored, withEpochFailure) -> {
-                    if (withEpochFailure != null)
-                        throw new RuntimeException("Timed out waiting for 
epoch when processing message from " + fromNodeId + " to " + node + " message " 
+ message, withEpochFailure);
-                    request.process(node, fromNodeId, message.header);
-                });
-            }
+            node.withEpoch(waitForEpoch, (ignored, withEpochFailure) -> {
+                if (withEpochFailure != null)
+                    throw new RuntimeException("Timed out waiting for epoch 
when processing message from " + fromNodeId + " to " + node + " message " + 
message, withEpochFailure);
+                request.process(node, fromNodeId, message.header);
+            });
         }
     }
 }
diff --git 
a/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java 
b/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java
index 9e80429875..ffd1754d2c 100644
--- a/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java
+++ b/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java
@@ -22,8 +22,10 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
+import java.util.function.UnaryOperator;
 import javax.annotation.Nullable;
 
 import accord.local.Command;
@@ -62,7 +64,7 @@ public class CommandsForRanges extends TreeMap<Timestamp, 
Summary> implements Co
     {
         private final AccordCommandStore commandStore;
         private final RangeSearcher searcher;
-        private final NavigableMap<TxnId, Ranges> transitive = new TreeMap<>();
+        private AtomicReference<NavigableMap<TxnId, Ranges>> transitive = new 
AtomicReference<>(new TreeMap<>());
         private final ObjectHashSet<TxnId> cachedRangeTxns = new 
ObjectHashSet<>();
 
         public Manager(AccordCommandStore commandStore)
@@ -102,22 +104,46 @@ public class CommandsForRanges extends TreeMap<Timestamp, 
Summary> implements Co
             return new Loader(this, searchKeysOrRanges, redundantBefore, 
testKind, minTxnId, maxTxnId, findAsDep);
         }
 
+        private void updateTransitive(UnaryOperator<NavigableMap<TxnId, 
Ranges>> update)
+        {
+            while (true)
+            {
+                NavigableMap<TxnId, Ranges> prev = transitive.get();
+                NavigableMap<TxnId, Ranges> next = update.apply(prev);
+                if (next == null || prev == next)
+                    return;
+                if (transitive.compareAndSet(prev, next))
+                    return;
+            }
+        }
+
         public void mergeTransitive(TxnId txnId, Ranges ranges, BiFunction<? 
super Ranges, ? super Ranges, ? extends Ranges> remappingFunction)
         {
-            transitive.merge(txnId, ranges, remappingFunction);
+            updateTransitive(transitive -> {
+                NavigableMap<TxnId, Ranges> next = new TreeMap<>(transitive);
+                next.merge(txnId, ranges, remappingFunction);
+                return next;
+            });
         }
 
         public void gcBefore(TxnId gcBefore, Ranges ranges)
         {
-            Iterator<Map.Entry<TxnId, Ranges>> iterator = 
transitive.headMap(gcBefore).entrySet().iterator();
-            while (iterator.hasNext())
-            {
-                Map.Entry<TxnId, Ranges> e = iterator.next();
-                Ranges newRanges = e.getValue().without(ranges);
-                if (newRanges.isEmpty())
-                    iterator.remove();
-                e.setValue(newRanges);
-            }
+            updateTransitive(transitive -> {
+                NavigableMap<TxnId, Ranges> next = null;
+                Iterator<Map.Entry<TxnId, Ranges>> iterator = 
transitive.headMap(gcBefore).entrySet().iterator();
+                while (iterator.hasNext())
+                {
+                    Map.Entry<TxnId, Ranges> e = iterator.next();
+                    Ranges newRanges = e.getValue().without(ranges);
+                    if (!newRanges.isEmpty())
+                    {
+                        if (next == null)
+                            next = new TreeMap<>();
+                        next.put(e.getKey(), newRanges);
+                    }
+                }
+                return next;
+            });
         }
     }
 
@@ -144,13 +170,14 @@ public class CommandsForRanges extends TreeMap<Timestamp, 
Summary> implements Co
                         manager.searcher.search(manager.commandStore.id(), 
(TokenKey) key, minTxnId, maxTxnId).consume(forEach);
             }
 
-            if (!manager.transitive.isEmpty())
+            NavigableMap<TxnId, Ranges> transitive = manager.transitive.get();
+            if (!transitive.isEmpty())
             {
-                for (Map.Entry<TxnId, Ranges> e : 
manager.transitive.tailMap(minTxnId, true).entrySet())
-                {
-                    if (e.getValue().intersects(searchKeysOrRanges))
-                        forEach.accept(e.getKey());
-                }
+                    for (Map.Entry<TxnId, Ranges> e : 
transitive.tailMap(minTxnId, true).entrySet())
+                    {
+                        if (e.getValue().intersects(searchKeysOrRanges))
+                            forEach.accept(e.getKey());
+                    }
             }
         }
 
@@ -180,7 +207,7 @@ public class CommandsForRanges extends TreeMap<Timestamp, 
Summary> implements Co
                     return ifRelevant(cmd);
             }
 
-            Ranges ranges = manager.transitive.get(txnId);
+            Ranges ranges = manager.transitive.get().get(txnId);
             if (ranges == null)
                 return null;
 
diff --git 
a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java 
b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index d10a75e4b6..20efdf88e7 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -1149,7 +1149,7 @@ public abstract class AbstractCluster<I extends 
IInstance> implements ICluster<I
                            .collect(Collectors.toList());
         try
         {
-            FBUtilities.waitOnFutures(futures, 1L, TimeUnit.MINUTES);
+            FBUtilities.waitOnFutures(futures, instances.size(), 
TimeUnit.MINUTES);
         }
         catch (Throwable t)
         {
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordHostReplacementTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordHostReplacementTest.java
index 702d5f0707..86c9a72ecb 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordHostReplacementTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordHostReplacementTest.java
@@ -55,6 +55,8 @@ public class AccordHostReplacementTest extends TestBaseImpl
         Cluster.Builder clusterBuilder = Cluster.build(3)
                                                 .withConfig(c -> 
c.with(Feature.values())
                                                                   
.set("accord.command_store_shard_count", "1")
+                                                                  
.set("write_request_timeout", "10s")
+                                                                  
.set("read_request_timeout", "10s")
                                                                   
.set("accord.queue_shard_count", "1")
                                                 );
         TokenSupplier tokenRing = TokenSupplier.evenlyDistributedTokens(3, 
clusterBuilder.getTokenCount());
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationReadRaceTestBase.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationReadRaceTestBase.java
index 42e5bb9b8d..a4931c6ab5 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationReadRaceTestBase.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationReadRaceTestBase.java
@@ -169,8 +169,9 @@ public abstract class AccordMigrationReadRaceTestBase 
extends AccordTestBase
         // Otherwise repair complains if you don't specify a keyspace
         CassandraRelevantProperties.SYSTEM_TRACES_DEFAULT_RF.setInt(3);
         AccordTestBase.setupCluster(builder -> builder.appendConfig(config -> 
config.set("paxos_variant", PaxosVariant.v2.name())
-                                                                               
     .set("read_request_timeout", "2s")
-                                                                               
     .set("range_request_timeout", "2s")
+                                                                               
     .set("read_request_timeout", "10s")
+                                                                               
     .set("range_request_timeout", "10s")
+                                                                               
     .set("write_request_timeout", "10s")
                                                                                
     .set("accord.range_migration", "explicit")), 3);
         partitioner = 
FBUtilities.newPartitioner(SHARED_CLUSTER.get(1).callsOnInstance(() -> 
DatabaseDescriptor.getPartitioner().getClass().getSimpleName()).call());
         StorageService.instance.setPartitionerUnsafe(partitioner);
diff --git 
a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterConcurrencyTest.java
 
b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterConcurrencyTest.java
index 3d69cdad5a..54d8cec32c 100644
--- 
a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterConcurrencyTest.java
+++ 
b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterConcurrencyTest.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.junit.Rule;
 import org.junit.Test;
@@ -36,6 +37,7 @@ import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.schema.Schema;
+import org.assertj.core.description.Description;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -70,6 +72,7 @@ public class CQLSSTableWriterConcurrencyTest extends CQLTester
         File[] dataDirs = new File[nThreads];
         String baseDataDir = tempFolder.newFolder().getAbsolutePath();
 
+        AtomicReference<String> errors = new AtomicReference<>("");
         for (int i = 0; i < nThreads; i++)
         {
             tableNames[i] = String.format("table_%02d", i);
@@ -113,6 +116,7 @@ public class CQLSSTableWriterConcurrencyTest extends 
CQLTester
                 catch (Throwable throwable)
                 {
                     LOGGER.error("Error while processing element number {}", 
finalI, throwable);
+                    errors.updateAndGet(s -> s + "\n" + 
throwable.getMessage());
                     errorCount.incrementAndGet();
                 }
             });
@@ -123,6 +127,13 @@ public class CQLSSTableWriterConcurrencyTest extends 
CQLTester
         {
             LOGGER.warn("Unable to close executor pool after 1 minute");
         }
-        assertThat(errorCount.get()).isEqualTo(0);
+        int count = errorCount.get();
+        assertThat(count).isEqualTo(0).describedAs(new Description()
+        {
+            public String value()
+            {
+                return String.format("Caught %d errors: %s", count, 
errors.get());
+            }
+        });
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to