This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 9ac60369a4cdee14851000c72bdd27dce1cb8a37
Author: Alex Petrov <[email protected]>
AuthorDate: Wed Jan 15 12:01:59 2025 +0100

    Fix serialization order for topology updates
    
    Patch by Alex Petrov; reviewed by David Capwell for CASSANDRA-20219.
---
 .../cassandra/service/accord/AccordJournal.java    |  4 +-
 .../service/accord/AccordSegmentCompactor.java     | 17 ++++-
 .../cassandra/service/accord/JournalKey.java       |  1 +
 .../apache/cassandra/harry/gen/EntropySource.java  | 10 +++
 .../service/accord/AccordJournalTest.java          |  2 +-
 .../serializers/JournalKeySerializerTest.java      | 72 ++++++++++++++++++++++
 6 files changed, 102 insertions(+), 4 deletions(-)

diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java 
b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index 7f1446907f..b3a05ed011 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -287,14 +287,14 @@ public class AccordJournal implements accord.api.Journal, 
Shutdownable
     @Override
     public Iterator<TopologyUpdate> replayTopologies()
     {
-        AccordJournalValueSerializers.MapAccumulator<Long, TopologyUpdate> 
accumulator = readAll(new JournalKey(TxnId.NONE, 
JournalKey.Type.TOPOLOGY_UPDATE, -1), false);
+        AccordJournalValueSerializers.MapAccumulator<Long, TopologyUpdate> 
accumulator = readAll(new JournalKey(TxnId.NONE, 
JournalKey.Type.TOPOLOGY_UPDATE, 0), false);
         return accumulator.get().values().iterator();
     }
 
     @Override
     public void saveTopology(TopologyUpdate topologyUpdate, Runnable onFlush)
     {
-        RecordPointer pointer = appendInternal(new JournalKey(TxnId.NONE, 
JournalKey.Type.TOPOLOGY_UPDATE, -1), topologyUpdate);
+        RecordPointer pointer = appendInternal(new JournalKey(TxnId.NONE, 
JournalKey.Type.TOPOLOGY_UPDATE, 0), topologyUpdate);
         if (onFlush != null)
             journal.onDurable(pointer, onFlush);
     }
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java 
b/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java
index 930afee54f..c581f59d34 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java
@@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory;
 
 import accord.utils.Invariants;
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.partitions.PartitionUpdate.SimpleBuilder;
@@ -144,11 +145,25 @@ public class AccordSegmentCompactor<V> implements 
SegmentCompactor<JournalKey, V
         }
     }
 
+    private JournalKey prevKey;
+    private DecoratedKey prevDecoratedKey;
+
     private void maybeWritePartition(SSTableTxnWriter writer, JournalKey key, 
Object builder, FlyweightSerializer<Object, Object> serializer, long 
descriptor, int offset) throws IOException
     {
         if (builder != null)
         {
-            SimpleBuilder partitionBuilder = 
PartitionUpdate.simpleBuilder(cfs.metadata(), 
AccordKeyspace.JournalColumns.decorate(key));
+            DecoratedKey decoratedKey = 
AccordKeyspace.JournalColumns.decorate(key);
+
+            if (prevKey != null)
+            {
+                
Invariants.checkArgument((decoratedKey.compareTo(prevDecoratedKey) >= 0 ? 1 : 
-1) == (JournalKey.SUPPORT.compare(key, prevKey) >= 0 ? 1 : -1),
+                                         String.format("Partition key and 
JournalKey didn't have matching order, which may imply a serialization 
issue.\n%s (%s)\n%s (%s)",
+                                                       key, decoratedKey, 
prevKey, prevDecoratedKey));
+            }
+            prevKey = key;
+            prevDecoratedKey = decoratedKey;
+
+            SimpleBuilder partitionBuilder = 
PartitionUpdate.simpleBuilder(cfs.metadata(), decoratedKey);
             try (DataOutputBuffer out = DataOutputBuffer.scratchBuffer.get())
             {
                 serializer.reserialize(key, builder, out, userVersion);
diff --git a/src/java/org/apache/cassandra/service/accord/JournalKey.java 
b/src/java/org/apache/cassandra/service/accord/JournalKey.java
index 68ab0a1c04..f11154ff83 100644
--- a/src/java/org/apache/cassandra/service/accord/JournalKey.java
+++ b/src/java/org/apache/cassandra/service/accord/JournalKey.java
@@ -52,6 +52,7 @@ public final class JournalKey
 
     public JournalKey(TxnId id, Type type, int commandStoreId)
     {
+        Invariants.checkArgument(commandStoreId >= 0);
         Invariants.checkState((id.lsb & (0xffff & ~TxnId.IDENTITY_FLAGS)) == 
0);
         Invariants.nonNull(type);
         Invariants.nonNull(id);
diff --git a/test/harry/main/org/apache/cassandra/harry/gen/EntropySource.java 
b/test/harry/main/org/apache/cassandra/harry/gen/EntropySource.java
index 0f9308516f..f72b611937 100644
--- a/test/harry/main/org/apache/cassandra/harry/gen/EntropySource.java
+++ b/test/harry/main/org/apache/cassandra/harry/gen/EntropySource.java
@@ -39,13 +39,23 @@ public interface EntropySource
     EntropySource derive();
 
     int nextInt();
+
+    /**
+     * Generates an int in range [0, max).
+     */
     int nextInt(int max);
+
+    /**
+     * Generates an int in range [min, max).
+     */
     int nextInt(int min, int max);
     float nextFloat();
     double nextDouble();
 
     /**
      * Code is adopted from a similar method in JDK 17, and has to be removed 
as soon as we migrate to JDK 17.
+     *
+     * Generates a long in range [min, max).
      */
     default long nextLong(long min, long max) {
         long ret = next();
diff --git 
a/test/unit/org/apache/cassandra/service/accord/AccordJournalTest.java 
b/test/unit/org/apache/cassandra/service/accord/AccordJournalTest.java
index c7b9a05c8c..58f4e1b1c8 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordJournalTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordJournalTest.java
@@ -140,6 +140,6 @@ public class AccordJournalTest
     private Gen<JournalKey> keyGen()
     {
         Gen<TxnId> txnIdGen = AccordGens.txnIds();
-        return rs -> new JournalKey(txnIdGen.next(rs), 
JournalKey.Type.COMMAND_DIFF, -1);
+        return rs -> new JournalKey(txnIdGen.next(rs), 
JournalKey.Type.COMMAND_DIFF, 0);
     }
 }
diff --git 
a/test/unit/org/apache/cassandra/service/accord/serializers/JournalKeySerializerTest.java
 
b/test/unit/org/apache/cassandra/service/accord/serializers/JournalKeySerializerTest.java
new file mode 100644
index 0000000000..b36a5c74d5
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/service/accord/serializers/JournalKeySerializerTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.serializers;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import accord.local.Node;
+import accord.primitives.Routable;
+import accord.primitives.Timestamp;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
+import org.apache.cassandra.ServerTestUtils;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.harry.dsl.TestRunner;
+import org.apache.cassandra.harry.gen.Generator;
+import org.apache.cassandra.harry.gen.Generators;
+import org.apache.cassandra.service.accord.AccordKeyspace;
+import org.apache.cassandra.service.accord.JournalKey;
+
+public class JournalKeySerializerTest
+{
+    @BeforeClass
+    public static void setUp()
+    {
+        DatabaseDescriptor.daemonInitialization();
+        ServerTestUtils.prepareServer();
+    }
+
+    @Test
+    public void testOrder()
+    {
+        Node.Id node = new Node.Id(1);
+        Generator<Txn.Kind> kindGen = Generators.enumValues(Txn.Kind.class);
+        Generator<Routable.Domain> domainGen = 
Generators.enumValues(Routable.Domain.class);
+        Generator<JournalKey.Type> keyTypeGen = 
Generators.enumValues(JournalKey.Type.class);
+
+        Generator<JournalKey> keyGen = rng -> {
+            TxnId txnId = new TxnId(rng.nextLong(0, Timestamp.MAX_EPOCH + 1),
+                                    rng.nextLong(0, Long.MAX_VALUE),
+                                    kindGen.generate(rng),
+                                    domainGen.generate(rng),
+                                    node);
+            return new JournalKey(txnId, keyTypeGen.generate(rng), 
rng.nextInt(100));
+        };
+        TestRunner.test(keyGen, keyGen, (key1, key2) -> {
+            DecoratedKey dk1 = AccordKeyspace.JournalColumns.decorate(key1);
+            DecoratedKey dk2 = AccordKeyspace.JournalColumns.decorate(key2);
+            Assert.assertEquals(String.format("Sort mismatch for\n%s (%s) \n%s 
(%s) ", key1, dk1, key2, dk2),
+                                dk1.compareTo(dk2) >= 0 ? 1 : -1,
+                                JournalKey.SUPPORT.compare(key1, key2) >= 0 ? 
1 : -1);
+        });
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to