This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 9ac60369a4cdee14851000c72bdd27dce1cb8a37 Author: Alex Petrov <[email protected]> AuthorDate: Wed Jan 15 12:01:59 2025 +0100 Fix serialization order for topology updates Patch by Alex Petrov; reviewed by David Capwell for CASSANDRA-20219. --- .../cassandra/service/accord/AccordJournal.java | 4 +- .../service/accord/AccordSegmentCompactor.java | 17 ++++- .../cassandra/service/accord/JournalKey.java | 1 + .../apache/cassandra/harry/gen/EntropySource.java | 10 +++ .../service/accord/AccordJournalTest.java | 2 +- .../serializers/JournalKeySerializerTest.java | 72 ++++++++++++++++++++++ 6 files changed, 102 insertions(+), 4 deletions(-) diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java index 7f1446907f..b3a05ed011 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java +++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java @@ -287,14 +287,14 @@ public class AccordJournal implements accord.api.Journal, Shutdownable @Override public Iterator<TopologyUpdate> replayTopologies() { - AccordJournalValueSerializers.MapAccumulator<Long, TopologyUpdate> accumulator = readAll(new JournalKey(TxnId.NONE, JournalKey.Type.TOPOLOGY_UPDATE, -1), false); + AccordJournalValueSerializers.MapAccumulator<Long, TopologyUpdate> accumulator = readAll(new JournalKey(TxnId.NONE, JournalKey.Type.TOPOLOGY_UPDATE, 0), false); return accumulator.get().values().iterator(); } @Override public void saveTopology(TopologyUpdate topologyUpdate, Runnable onFlush) { - RecordPointer pointer = appendInternal(new JournalKey(TxnId.NONE, JournalKey.Type.TOPOLOGY_UPDATE, -1), topologyUpdate); + RecordPointer pointer = appendInternal(new JournalKey(TxnId.NONE, JournalKey.Type.TOPOLOGY_UPDATE, 0), topologyUpdate); if (onFlush != null) journal.onDurable(pointer, onFlush); } diff --git a/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java 
b/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java index 930afee54f..c581f59d34 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java +++ b/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java @@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory; import accord.utils.Invariants; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.partitions.PartitionUpdate.SimpleBuilder; @@ -144,11 +145,25 @@ public class AccordSegmentCompactor<V> implements SegmentCompactor<JournalKey, V } } + private JournalKey prevKey; + private DecoratedKey prevDecoratedKey; + private void maybeWritePartition(SSTableTxnWriter writer, JournalKey key, Object builder, FlyweightSerializer<Object, Object> serializer, long descriptor, int offset) throws IOException { if (builder != null) { - SimpleBuilder partitionBuilder = PartitionUpdate.simpleBuilder(cfs.metadata(), AccordKeyspace.JournalColumns.decorate(key)); + DecoratedKey decoratedKey = AccordKeyspace.JournalColumns.decorate(key); + + if (prevKey != null) + { + Invariants.checkArgument((decoratedKey.compareTo(prevDecoratedKey) >= 0 ? 1 : -1) == (JournalKey.SUPPORT.compare(key, prevKey) >= 0 ? 
1 : -1), + String.format("Partition key and JournalKey didn't have matching order, which may imply a serialization issue.\n%s (%s)\n%s (%s)", + key, decoratedKey, prevKey, prevDecoratedKey)); + } + prevKey = key; + prevDecoratedKey = decoratedKey; + + SimpleBuilder partitionBuilder = PartitionUpdate.simpleBuilder(cfs.metadata(), decoratedKey); try (DataOutputBuffer out = DataOutputBuffer.scratchBuffer.get()) { serializer.reserialize(key, builder, out, userVersion); diff --git a/src/java/org/apache/cassandra/service/accord/JournalKey.java b/src/java/org/apache/cassandra/service/accord/JournalKey.java index 68ab0a1c04..f11154ff83 100644 --- a/src/java/org/apache/cassandra/service/accord/JournalKey.java +++ b/src/java/org/apache/cassandra/service/accord/JournalKey.java @@ -52,6 +52,7 @@ public final class JournalKey public JournalKey(TxnId id, Type type, int commandStoreId) { + Invariants.checkArgument(commandStoreId >= 0); Invariants.checkState((id.lsb & (0xffff & ~TxnId.IDENTITY_FLAGS)) == 0); Invariants.nonNull(type); Invariants.nonNull(id); diff --git a/test/harry/main/org/apache/cassandra/harry/gen/EntropySource.java b/test/harry/main/org/apache/cassandra/harry/gen/EntropySource.java index 0f9308516f..f72b611937 100644 --- a/test/harry/main/org/apache/cassandra/harry/gen/EntropySource.java +++ b/test/harry/main/org/apache/cassandra/harry/gen/EntropySource.java @@ -39,13 +39,23 @@ public interface EntropySource EntropySource derive(); int nextInt(); + + /** + * Generates an int in range [0, max). + */ int nextInt(int max); + + /** + * Generates an int in range [min, max). + */ int nextInt(int min, int max); float nextFloat(); double nextDouble(); /** * Code is adopted from a similar method in JDK 17, and has to be removed as soon as we migrate to JDK 17. + * + * Generates a long in range [min, max). 
*/ default long nextLong(long min, long max) { long ret = next(); diff --git a/test/unit/org/apache/cassandra/service/accord/AccordJournalTest.java b/test/unit/org/apache/cassandra/service/accord/AccordJournalTest.java index c7b9a05c8c..58f4e1b1c8 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordJournalTest.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordJournalTest.java @@ -140,6 +140,6 @@ public class AccordJournalTest private Gen<JournalKey> keyGen() { Gen<TxnId> txnIdGen = AccordGens.txnIds(); - return rs -> new JournalKey(txnIdGen.next(rs), JournalKey.Type.COMMAND_DIFF, -1); + return rs -> new JournalKey(txnIdGen.next(rs), JournalKey.Type.COMMAND_DIFF, 0); } } diff --git a/test/unit/org/apache/cassandra/service/accord/serializers/JournalKeySerializerTest.java b/test/unit/org/apache/cassandra/service/accord/serializers/JournalKeySerializerTest.java new file mode 100644 index 0000000000..b36a5c74d5 --- /dev/null +++ b/test/unit/org/apache/cassandra/service/accord/serializers/JournalKeySerializerTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service.accord.serializers; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import accord.local.Node; +import accord.primitives.Routable; +import accord.primitives.Timestamp; +import accord.primitives.Txn; +import accord.primitives.TxnId; +import org.apache.cassandra.ServerTestUtils; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.harry.dsl.TestRunner; +import org.apache.cassandra.harry.gen.Generator; +import org.apache.cassandra.harry.gen.Generators; +import org.apache.cassandra.service.accord.AccordKeyspace; +import org.apache.cassandra.service.accord.JournalKey; + +public class JournalKeySerializerTest +{ + @BeforeClass + public static void setUp() + { + DatabaseDescriptor.daemonInitialization(); + ServerTestUtils.prepareServer(); + } + + @Test + public void testOrder() + { + Node.Id node = new Node.Id(1); + Generator<Txn.Kind> kindGen = Generators.enumValues(Txn.Kind.class); + Generator<Routable.Domain> domainGen = Generators.enumValues(Routable.Domain.class); + Generator<JournalKey.Type> keyTypeGen = Generators.enumValues(JournalKey.Type.class); + + Generator<JournalKey> keyGen = rng -> { + TxnId txnId = new TxnId(rng.nextLong(0, Timestamp.MAX_EPOCH + 1), + rng.nextLong(0, Long.MAX_VALUE), + kindGen.generate(rng), + domainGen.generate(rng), + node); + return new JournalKey(txnId, keyTypeGen.generate(rng), rng.nextInt(100)); + }; + TestRunner.test(keyGen, keyGen, (key1, key2) -> { + DecoratedKey dk1 = AccordKeyspace.JournalColumns.decorate(key1); + DecoratedKey dk2 = AccordKeyspace.JournalColumns.decorate(key2); + Assert.assertEquals(String.format("Sort mismatch for\n%s (%s) \n%s (%s) ", key1, dk1, key2, dk2), + dk1.compareTo(dk2) >= 0 ? 1 : -1, + JournalKey.SUPPORT.compare(key1, key2) >= 0 ? 
1 : -1); + }); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
