This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 381cda94ad23982a5bce6b3524cb11a3078f8792
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Wed Feb 5 09:06:26 2025 +0000

    Fix:
     - Even if we can decide autonomously that we took the fast path, we must 
still wait for earlier transactions to decide themselves
     - We must update a command that is in CFK.loadingPruned whether or not it 
is outOfRange
     - We must visit pruned commands that are transitive dependencies of RX to 
ensure dependencies are propagated
     - flagsWithoutDomainAndKind() -> flagsWithoutDomainOrKindOrCardinality() 
[to fix integration regression]
     - Don't invoke uniqueNow() twice when allocate nextTxnId
     - BTreeReducingRangeMap edge case that corrupts map when merge function 
does not return one of its inputs
    
    patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20292
---
 modules/accord                                     |  2 +-
 .../serializers/CommandsForKeySerializerTest.java  | 24 +++++++++++++---------
 2 files changed, 15 insertions(+), 11 deletions(-)

diff --git a/modules/accord b/modules/accord
index 96ec342423..344c20b6d2 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 96ec3424230bedf40b834785014a366716df8f42
+Subproject commit 344c20b6d22898ea2bf272c8f20a720bb9c35979
diff --git 
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
 
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
index f8c85ad544..e6dfb59634 100644
--- 
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
+++ 
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
@@ -50,6 +50,7 @@ import accord.api.DataStore;
 import accord.api.Journal;
 import accord.api.Key;
 import accord.api.ProgressLog;
+import accord.api.ProtocolModifiers;
 import accord.api.Result;
 import accord.api.RoutingKey;
 import accord.api.Timeouts;
@@ -85,6 +86,7 @@ import accord.primitives.SaveStatus;
 import accord.primitives.Status;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn;
+import accord.primitives.Txn.Kind;
 import accord.primitives.TxnId;
 import accord.primitives.Unseekables;
 import accord.primitives.Writes;
@@ -111,6 +113,7 @@ import org.apache.cassandra.simulator.RandomSource.Choices;
 import org.apache.cassandra.utils.AccordGenerators;
 import org.apache.cassandra.utils.CassandraGenerators;
 
+import static 
accord.api.ProtocolModifiers.Toggles.setTransitiveDependenciesAreVisible;
 import static accord.local.cfk.CommandsForKey.NO_BOUNDS_INFO;
 import static accord.primitives.Known.KnownExecuteAt.ExecuteAtErased;
 import static accord.primitives.Known.KnownExecuteAt.ExecuteAtUnknown;
@@ -131,6 +134,7 @@ public class CommandsForKeySerializerTest
         SchemaLoader.prepareServer();
         SchemaLoader.createKeyspace("ks", KeyspaceParams.simple(1),
                                     parse("CREATE TABLE tbl (k int, c int, v 
int, primary key (k, c)) WITH transactional_mode='full'", "ks"));
+        setTransitiveDependenciesAreVisible(Kind.values());
         StorageService.instance.initServer();
     }
 
@@ -195,7 +199,7 @@ public class CommandsForKeySerializerTest
 
             if (saveStatus.known.outcome() == Known.Outcome.Apply)
             {
-                if (txnId.is(Txn.Kind.Write))
+                if (txnId.is(Kind.Write))
                     builder.writes(new Writes(txnId, executeAt, txn.keys(), 
new TxnWrite(Collections.emptyList(), true)));
                 builder.result(new TxnData());
             }
@@ -386,7 +390,7 @@ public class CommandsForKeySerializerTest
         return new ObjectGraph(cmds);
     }
 
-    private static Function<Timestamp, TxnId> txnIdSupplier(LongUnaryOperator 
epochSupplier, LongUnaryOperator hlcSupplier, Supplier<Txn.Kind> kindSupplier, 
Supplier<Node.Id> idSupplier)
+    private static Function<Timestamp, TxnId> txnIdSupplier(LongUnaryOperator 
epochSupplier, LongUnaryOperator hlcSupplier, Supplier<Kind> kindSupplier, 
Supplier<Node.Id> idSupplier)
     {
         return min -> new TxnId(epochSupplier.applyAsLong(min == null ? 1 : 
min.epoch()), hlcSupplier.applyAsLong(min == null ? 1 : min.hlc() + 1), 
kindSupplier.get(), Routable.Domain.Key, idSupplier.get());
     }
@@ -449,12 +453,12 @@ public class CommandsForKeySerializerTest
                 idSupplier = () -> lookup.computeIfAbsent(maxId == 1 ? 1 : 
source.nextInt(1, maxId), Node.Id::new);
             }
             final IntSupplier flagSupplier = () -> 0;
-            final Supplier<Txn.Kind> kindSupplier = () -> {
+            final Supplier<Kind> kindSupplier = () -> {
                 float v = source.nextFloat();
-                if (v < 0.5) return Txn.Kind.Read;
-                if (v < 0.95) return Txn.Kind.Write;
-                if (v < 0.97) return Txn.Kind.SyncPoint;
-                return Txn.Kind.ExclusiveSyncPoint;
+                if (v < 0.5) return Kind.Read;
+                if (v < 0.95) return Kind.Write;
+                if (v < 0.97) return Kind.SyncPoint;
+                return Kind.ExclusiveSyncPoint;
             };
 
             boolean permitMissing = source.decide(0.75f);
@@ -663,7 +667,7 @@ public class CommandsForKeySerializerTest
         @Override public int cfkPruneInterval() { return 0; }
         @Override public long maxConflictsHlcPruneDelta() { return 0; }
         @Override public long maxConflictsPruneInterval() { return 0; }
-        @Override public Txn emptySystemTxn(Txn.Kind kind, Routable.Domain 
domain) { throw new UnsupportedOperationException(); }
+        @Override public Txn emptySystemTxn(Kind kind, Routable.Domain domain) 
{ throw new UnsupportedOperationException(); }
         @Override public long attemptCoordinationDelay(Node node, 
SafeCommandStore safeStore, TxnId txnId, TimeUnit units, int retryCount) { 
return 0; }
         @Override public long seekProgressDelay(Node node, SafeCommandStore 
safeStore, TxnId txnId, int retryCount, ProgressLog.BlockedUntil blockedUntil, 
TimeUnit units) { return 0; }
         @Override public long retryAwaitTimeout(Node node, SafeCommandStore 
safeStore, TxnId txnId, int retryCount, ProgressLog.BlockedUntil retrying, 
TimeUnit units) { return 0; }
@@ -698,8 +702,8 @@ public class CommandsForKeySerializerTest
             @Override public long now() { return 0; }
             @Override public long elapsed(TimeUnit unit) { return 0; }
         }; }
-        @Override public boolean visit(Unseekables<?> keysOrRanges, TxnId 
testTxnId, Txn.Kind.Kinds testKind, TestStartedAt testStartedAt, Timestamp 
testStartAtTimestamp, ComputeIsDep computeIsDep, AllCommandVisitor visit) { 
return false; }
-        @Override public <P1, P2> void visit(Unseekables<?> keysOrRanges, 
Timestamp startedBefore, Txn.Kind.Kinds testKind, ActiveCommandVisitor<P1, P2> 
visit, P1 p1, P2 p2) { }
+        @Override public boolean visit(Unseekables<?> keysOrRanges, TxnId 
testTxnId, Kind.Kinds testKind, TestStartedAt testStartedAt, Timestamp 
testStartAtTimestamp, ComputeIsDep computeIsDep, AllCommandVisitor visit) { 
return false; }
+        @Override public <P1, P2> void visit(Unseekables<?> keysOrRanges, 
Timestamp startedBefore, Kind.Kinds testKind, ActiveCommandVisitor<P1, P2> 
visit, P1 p1, P2 p2) { }
     }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to