This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new 58c5aec321 Fix null field accounting
58c5aec321 is described below

commit 58c5aec3218972593bc069798f64ddbefb3419b8
Author: Alex Petrov <[email protected]>
AuthorDate: Wed Feb 5 20:34:44 2025 +0100

    Fix null field accounting
    
    Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-20297
---
 .gitmodules                                        |  4 +-
 modules/accord                                     |  2 +-
 src/java/org/apache/cassandra/journal/Journal.java |  8 ++-
 .../cassandra/service/accord/AccordJournal.java    | 65 +++++++++++-----------
 .../service/accord/AccordJournalTable.java         | 24 ++++++--
 .../service/accord/AccordJournalBurnTest.java      |  2 -
 .../accord/SimulatedAccordCommandStore.java        |  6 +-
 7 files changed, 63 insertions(+), 48 deletions(-)

diff --git a/.gitmodules b/.gitmodules
index 616dacf610..22d4a4fb41 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,4 +1,4 @@
 [submodule "modules/accord"]
        path = modules/accord
-       url = https://github.com/apache/cassandra-accord.git
-       branch = trunk
+       url = https://github.com/ifesdjeen/cassandra-accord.git
+       branch = CASSANDRA-20297
diff --git a/modules/accord b/modules/accord
index 9328b83f62..a2ac02b4d2 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 9328b83f623a7877249b5fb18fe49e2e78e8c3a9
+Subproject commit a2ac02b4d20bb6ab9078e70a0a43c17c00f4b0fc
diff --git a/src/java/org/apache/cassandra/journal/Journal.java 
b/src/java/org/apache/cassandra/journal/Journal.java
index 6f0fdabfd3..08626b441f 100644
--- a/src/java/org/apache/cassandra/journal/Journal.java
+++ b/src/java/org/apache/cassandra/journal/Journal.java
@@ -336,13 +336,15 @@ public class Journal<K, V> implements Shutdownable
         return null;
     }
 
-    public void readAll(K id, RecordConsumer<K> consumer, boolean asc)
+    public void readAll(K id, RecordConsumer<K> consumer)
     {
         EntrySerializer.EntryHolder<K> holder = new 
EntrySerializer.EntryHolder<>();
         try (OpOrder.Group group = readOrder.start())
         {
-            for (Segment<K, V> segment : segments.get().allSorted(asc))
+            for (Segment<K, V> segment : segments.get().allSorted(false))
+            {
                 segment.readAll(id, holder, consumer);
+            }
         }
     }
 
@@ -360,7 +362,7 @@ public class Journal<K, V> implements Shutdownable
                 // can only throw if serializer is buggy
                 throw new RuntimeException(e);
             }
-        }, false);
+        });
         return res;
     }
 
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java 
b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index fd435a90d7..9a266788e2 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -82,7 +82,7 @@ import static accord.impl.CommandChange.isChanged;
 import static accord.impl.CommandChange.isNull;
 import static accord.impl.CommandChange.nextSetField;
 import static accord.impl.CommandChange.setChanged;
-import static accord.impl.CommandChange.setFieldIsNull;
+import static accord.impl.CommandChange.setFieldIsNullAndChanged;
 import static accord.impl.CommandChange.toIterableSetFields;
 import static accord.impl.CommandChange.unsetIterable;
 import static accord.impl.CommandChange.validateFlags;
@@ -254,35 +254,35 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
     }
 
     @Override
-    public RedundantBefore loadRedundantBefore(int store)
+    public RedundantBefore loadRedundantBefore(int commandStoreId)
     {
-        IdentityAccumulator<RedundantBefore> accumulator = readAll(new 
JournalKey(TxnId.NONE, JournalKey.Type.REDUNDANT_BEFORE, store), false);
+        IdentityAccumulator<RedundantBefore> accumulator = readAll(new 
JournalKey(TxnId.NONE, JournalKey.Type.REDUNDANT_BEFORE, commandStoreId));
         return accumulator.get();
     }
 
     @Override
-    public NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int store)
+    public NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int commandStoreId)
     {
-        IdentityAccumulator<NavigableMap<TxnId, Ranges>> accumulator = 
readAll(new JournalKey(TxnId.NONE, JournalKey.Type.BOOTSTRAP_BEGAN_AT, store), 
false);
+        IdentityAccumulator<NavigableMap<TxnId, Ranges>> accumulator = 
readAll(new JournalKey(TxnId.NONE, JournalKey.Type.BOOTSTRAP_BEGAN_AT, 
commandStoreId));
         return accumulator.get();
     }
 
     @Override
-    public NavigableMap<Timestamp, Ranges> loadSafeToRead(int store)
+    public NavigableMap<Timestamp, Ranges> loadSafeToRead(int commandStoreId)
     {
-        IdentityAccumulator<NavigableMap<Timestamp, Ranges>> accumulator = 
readAll(new JournalKey(TxnId.NONE, JournalKey.Type.SAFE_TO_READ, store), false);
+        IdentityAccumulator<NavigableMap<Timestamp, Ranges>> accumulator = 
readAll(new JournalKey(TxnId.NONE, JournalKey.Type.SAFE_TO_READ, 
commandStoreId));
         return accumulator.get();
     }
 
     @Override
-    public CommandStores.RangesForEpoch loadRangesForEpoch(int store)
+    public CommandStores.RangesForEpoch loadRangesForEpoch(int commandStoreId)
     {
-        IdentityAccumulator<RangesForEpoch> accumulator = readAll(new 
JournalKey(TxnId.NONE, JournalKey.Type.RANGES_FOR_EPOCH, store), false);
+        IdentityAccumulator<RangesForEpoch> accumulator = readAll(new 
JournalKey(TxnId.NONE, JournalKey.Type.RANGES_FOR_EPOCH, commandStoreId));
         return accumulator.get();
     }
 
     @Override
-    public void saveCommand(int store, CommandUpdate update, @Nullable 
Runnable onFlush)
+    public void saveCommand(int commandStoreId, CommandUpdate update, 
@Nullable Runnable onFlush)
     {
         Writer diff = Writer.make(update.before, update.after);
         if (diff == null)
@@ -292,7 +292,7 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
             return;
         }
 
-        JournalKey key = new JournalKey(update.txnId, 
JournalKey.Type.COMMAND_DIFF, store);
+        JournalKey key = new JournalKey(update.txnId, 
JournalKey.Type.COMMAND_DIFF, commandStoreId);
         RecordPointer pointer = journal.asyncWrite(key, diff);
         if (journalTable.shouldIndex(key)
             && diff.hasParticipants()
@@ -307,7 +307,7 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
     @Override
     public Iterator<TopologyUpdate> replayTopologies()
     {
-        AccordJournalValueSerializers.MapAccumulator<Long, TopologyUpdate> 
accumulator = readAll(new JournalKey(TxnId.NONE, 
JournalKey.Type.TOPOLOGY_UPDATE, 0), false);
+        AccordJournalValueSerializers.MapAccumulator<Long, TopologyUpdate> 
accumulator = readAll(new JournalKey(TxnId.NONE, 
JournalKey.Type.TOPOLOGY_UPDATE, 0));
         return accumulator.get().values().iterator();
     }
 
@@ -338,25 +338,25 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
             @Override
             public DurableBefore load()
             {
-                DurableBeforeAccumulator accumulator = readAll(new 
JournalKey(TxnId.NONE, JournalKey.Type.DURABLE_BEFORE, 0), false);
+                DurableBeforeAccumulator accumulator = readAll(new 
JournalKey(TxnId.NONE, JournalKey.Type.DURABLE_BEFORE, 0));
                 return accumulator.get();
             }
         };
     }
 
     @Override
-    public void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable 
onFlush)
+    public void saveStoreState(int commandStoreId, FieldUpdates fieldUpdates, 
Runnable onFlush)
     {
         RecordPointer pointer = null;
         // TODO: avoid allocating keys
         if (fieldUpdates.newRedundantBefore != null)
-            pointer = appendInternal(new JournalKey(TxnId.NONE, 
JournalKey.Type.REDUNDANT_BEFORE, store), fieldUpdates.newRedundantBefore);
+            pointer = appendInternal(new JournalKey(TxnId.NONE, 
JournalKey.Type.REDUNDANT_BEFORE, commandStoreId), 
fieldUpdates.newRedundantBefore);
         if (fieldUpdates.newBootstrapBeganAt != null)
-            pointer = appendInternal(new JournalKey(TxnId.NONE, 
JournalKey.Type.BOOTSTRAP_BEGAN_AT, store), fieldUpdates.newBootstrapBeganAt);
+            pointer = appendInternal(new JournalKey(TxnId.NONE, 
JournalKey.Type.BOOTSTRAP_BEGAN_AT, commandStoreId), 
fieldUpdates.newBootstrapBeganAt);
         if (fieldUpdates.newSafeToRead != null)
-            pointer = appendInternal(new JournalKey(TxnId.NONE, 
JournalKey.Type.SAFE_TO_READ, store), fieldUpdates.newSafeToRead);
+            pointer = appendInternal(new JournalKey(TxnId.NONE, 
JournalKey.Type.SAFE_TO_READ, commandStoreId), fieldUpdates.newSafeToRead);
         if (fieldUpdates.newRangesForEpoch != null)
-            pointer = appendInternal(new JournalKey(TxnId.NONE, 
JournalKey.Type.RANGES_FOR_EPOCH, store), fieldUpdates.newRangesForEpoch);
+            pointer = appendInternal(new JournalKey(TxnId.NONE, 
JournalKey.Type.RANGES_FOR_EPOCH, commandStoreId), 
fieldUpdates.newRangesForEpoch);
 
         if (onFlush == null)
             return;
@@ -371,7 +371,7 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
     {
         JournalKey key = new JournalKey(txnId, JournalKey.Type.COMMAND_DIFF, 
commandStoreId);
         Builder builder = new Builder(txnId, load);
-        journalTable.readAll(key, builder::deserializeNext, false);
+        journalTable.readAll(key, builder::deserializeNext);
         return builder;
     }
 
@@ -381,13 +381,13 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
         return loadDiffs(commandStoreId, txnId, Load.ALL);
     }
 
-    private <BUILDER> BUILDER readAll(JournalKey key, boolean asc)
+    private <BUILDER> BUILDER readAll(JournalKey key)
     {
         BUILDER builder = (BUILDER) key.type.serializer.mergerFor(key);
         // TODO: this can be further improved to avoid allocating lambdas
         AccordJournalValueSerializers.FlyweightSerializer<?, BUILDER> 
serializer = (AccordJournalValueSerializers.FlyweightSerializer<?, BUILDER>) 
key.type.serializer;
         // TODO (expected): for those where we store an image, read only the 
first entry we find in DESC order
-        journalTable.readAll(key, (in, userVersion) -> 
serializer.deserialize(key, builder, in, userVersion), asc);
+        journalTable.readAll(key, (in, userVersion) -> 
serializer.deserialize(key, builder, in, userVersion));
         return builder;
     }
 
@@ -544,6 +544,7 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
                     continue;
                 }
 
+
                 switch (field)
                 {
                     case EXECUTE_AT:
@@ -648,31 +649,33 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
         public void deserializeNext(DataInputPlus in, int userVersion) throws 
IOException
         {
             Invariants.require(txnId != null);
-            int flags = in.readInt();
-            Invariants.require(flags != 0);
+            int readFlags = in.readInt();
+            Invariants.require(readFlags != 0);
             nextCalled = true;
             count++;
 
-            int iterable = toIterableSetFields(flags);
+            int iterable = toIterableSetFields(readFlags);
             while (iterable != 0)
             {
                 Field field = nextSetField(iterable);
+                // Since we are iterating in reverse order, we skip the fields 
that were
+                // set by entries writter later (i.e. already read ones).
                 if (isChanged(field, this.flags) || isNull(field, mask))
                 {
-                    if (!isNull(field, flags))
-                        skip(field, in, userVersion);
+                    if (!isNull(field, readFlags))
+                        skip(txnId, field, in, userVersion);
 
                     iterable = unsetIterable(field, iterable);
                     continue;
                 }
-                this.flags = setChanged(field, this.flags);
 
-                if (isNull(field, flags))
+                if (isNull(field, readFlags))
                 {
-                    this.flags = setFieldIsNull(field, this.flags);
+                    this.flags = setFieldIsNullAndChanged(field, this.flags);
                 }
                 else
                 {
+                    this.flags = setChanged(field, this.flags);
                     deserialize(field, in, userVersion);
                 }
 
@@ -731,7 +734,7 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
             }
         }
 
-        private void skip(Field field, DataInputPlus in, int userVersion) 
throws IOException
+        private static void skip(TxnId txnId, Field field, DataInputPlus in, 
int userVersion) throws IOException
         {
             switch (field)
             {
@@ -777,7 +780,7 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
                     break;
                 case RESULT:
                     // TODO (expected): skip
-                    result = ResultSerializers.result.deserialize(in, 
userVersion);
+                    ResultSerializers.result.deserialize(in, userVersion);
                     break;
             }
         }
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java 
b/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
index f4c96fa122..5b16752e32 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
@@ -162,10 +162,21 @@ public class AccordJournalTable<K extends JournalKey, V> 
implements RangeSearche
             this.reader = reader;
         }
 
+        private long prevSegment = -1;
+        private long prevPosition = -1;
+
         @Override
         public void accept(long segment, int position, K key, ByteBuffer 
buffer, int userVersion)
         {
+            Invariants.require(prevSegment == -1 || segment <= prevSegment,
+                               "Records should always be iterated over in a 
reverse order, but %s was seen after %s", segment, prevSegment);
+            if (prevSegment != segment)
+                prevPosition = -1;
+            Invariants.require(prevPosition == -1 || position < prevPosition,
+                               "Records should always be iterated over in a 
reverse order, but %s was seen after %s", position, prevPosition);
             readBuffer(buffer, reader, userVersion);
+            prevSegment = segment;
+            prevPosition = position;
         }
     }
 
@@ -173,6 +184,7 @@ public class AccordJournalTable<K extends JournalKey, V> 
implements RangeSearche
     {
         protected LongHashSet visited = null;
         protected RecordConsumer<K> delegate;
+
         TableRecordConsumer(RecordConsumer<K> delegate)
         {
             this.delegate = delegate;
@@ -203,6 +215,7 @@ public class AccordJournalTable<K extends JournalKey, V> 
implements RangeSearche
         private final K key;
         private final TableRecordConsumer tableRecordConsumer;
         private final RecordConsumer<K> delegate;
+
         JournalAndTableRecordConsumer(K key, RecordConsumer<K> reader)
         {
             this.key = key;
@@ -218,7 +231,7 @@ public class AccordJournalTable<K extends JournalKey, V> 
implements RangeSearche
         @Override
         public void accept(long segment, int position, K key, ByteBuffer 
buffer, int userVersion)
         {
-            if (!tableRecordConsumer.visited(segment))
+            if (!tableRecordConsumer.visited(segment)) //TODO: don't need this 
anymore
                 delegate.accept(segment, position, key, buffer, userVersion);
         }
     }
@@ -339,15 +352,15 @@ public class AccordJournalTable<K extends JournalKey, V> 
implements RangeSearche
      * <p>
      * When reading from journal segments, skip descriptors that were read 
from the table.
      */
-    public void readAll(K key, Reader reader, boolean asc)
+    public void readAll(K key, Reader reader)
     {
-        readAll(key, new RecordConsumerAdapter(reader), asc);
+        readAll(key, new RecordConsumerAdapter(reader));
     }
 
-    public void readAll(K key, RecordConsumer<K> reader, boolean asc)
+    public void readAll(K key, RecordConsumer<K> reader)
     {
         JournalAndTableRecordConsumer consumer = new 
JournalAndTableRecordConsumer(key, reader);
-        journal.readAll(key, consumer, asc);
+        journal.readAll(key, consumer);
         consumer.readTable();
     }
 
@@ -382,7 +395,6 @@ public class AccordJournalTable<K extends JournalKey, V> 
implements RangeSearche
 
         long descriptor = LongType.instance.compose(ByteBuffer.wrap((byte[]) 
row.clustering().get(0)));
         int position = Int32Type.instance.compose(ByteBuffer.wrap((byte[]) 
row.clustering().get(1)));
-
         into.key = key;
         into.value = row.getCell(recordColumn).buffer();
         into.userVersion = 
Int32Type.instance.compose(row.getCell(versionColumn).buffer());
diff --git 
a/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
 
b/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
index 7ed6a8cec0..75fdd8fea8 100644
--- 
a/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
+++ 
b/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
@@ -23,7 +23,6 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -99,7 +98,6 @@ public class AccordJournalBurnTest extends BurnTestBase
 
     }
 
-    @Ignore
     @Test
     public void testOne()
     {
diff --git 
a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
 
b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
index abaed8349e..e0c1d126bf 100644
--- 
a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
+++ 
b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
@@ -452,15 +452,15 @@ public class SimulatedAccordCommandStore implements 
AutoCloseable
         }
 
         @Override
-        public void saveCommand(int store, CommandUpdate update, Runnable 
onFlush)
+        public void saveCommand(int commandStoreId, CommandUpdate update, 
Runnable onFlush)
         {
-            super.saveCommand(store, update, onFlush);
+            super.saveCommand(commandStoreId, update, onFlush);
             if (!update.after.txnId().domain().isRange())
                 return;
             Command after = update.after;
             Route<?> route = after.participants().route();
             if (route != null)
-                index.update(0, store, after.txnId(), route);
+                index.update(0, commandStoreId, after.txnId(), route);
         }
 
         @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to