This is an automated email from the ASF dual-hosted git repository.
ifesdjeen pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new 58c5aec321 Fix null field accounting
58c5aec321 is described below
commit 58c5aec3218972593bc069798f64ddbefb3419b8
Author: Alex Petrov <[email protected]>
AuthorDate: Wed Feb 5 20:34:44 2025 +0100
Fix null field accounting
Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-20297
---
.gitmodules | 4 +-
modules/accord | 2 +-
src/java/org/apache/cassandra/journal/Journal.java | 8 ++-
.../cassandra/service/accord/AccordJournal.java | 65 +++++++++++-----------
.../service/accord/AccordJournalTable.java | 24 ++++++--
.../service/accord/AccordJournalBurnTest.java | 2 -
.../accord/SimulatedAccordCommandStore.java | 6 +-
7 files changed, 63 insertions(+), 48 deletions(-)
diff --git a/.gitmodules b/.gitmodules
index 616dacf610..22d4a4fb41 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,4 +1,4 @@
[submodule "modules/accord"]
path = modules/accord
- url = https://github.com/apache/cassandra-accord.git
- branch = trunk
+ url = https://github.com/ifesdjeen/cassandra-accord.git
+ branch = CASSANDRA-20297
diff --git a/modules/accord b/modules/accord
index 9328b83f62..a2ac02b4d2 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 9328b83f623a7877249b5fb18fe49e2e78e8c3a9
+Subproject commit a2ac02b4d20bb6ab9078e70a0a43c17c00f4b0fc
diff --git a/src/java/org/apache/cassandra/journal/Journal.java
b/src/java/org/apache/cassandra/journal/Journal.java
index 6f0fdabfd3..08626b441f 100644
--- a/src/java/org/apache/cassandra/journal/Journal.java
+++ b/src/java/org/apache/cassandra/journal/Journal.java
@@ -336,13 +336,15 @@ public class Journal<K, V> implements Shutdownable
return null;
}
- public void readAll(K id, RecordConsumer<K> consumer, boolean asc)
+ public void readAll(K id, RecordConsumer<K> consumer)
{
EntrySerializer.EntryHolder<K> holder = new
EntrySerializer.EntryHolder<>();
try (OpOrder.Group group = readOrder.start())
{
- for (Segment<K, V> segment : segments.get().allSorted(asc))
+ for (Segment<K, V> segment : segments.get().allSorted(false))
+ {
segment.readAll(id, holder, consumer);
+ }
}
}
@@ -360,7 +362,7 @@ public class Journal<K, V> implements Shutdownable
// can only throw if serializer is buggy
throw new RuntimeException(e);
}
- }, false);
+ });
return res;
}
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index fd435a90d7..9a266788e2 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -82,7 +82,7 @@ import static accord.impl.CommandChange.isChanged;
import static accord.impl.CommandChange.isNull;
import static accord.impl.CommandChange.nextSetField;
import static accord.impl.CommandChange.setChanged;
-import static accord.impl.CommandChange.setFieldIsNull;
+import static accord.impl.CommandChange.setFieldIsNullAndChanged;
import static accord.impl.CommandChange.toIterableSetFields;
import static accord.impl.CommandChange.unsetIterable;
import static accord.impl.CommandChange.validateFlags;
@@ -254,35 +254,35 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
}
@Override
- public RedundantBefore loadRedundantBefore(int store)
+ public RedundantBefore loadRedundantBefore(int commandStoreId)
{
- IdentityAccumulator<RedundantBefore> accumulator = readAll(new
JournalKey(TxnId.NONE, JournalKey.Type.REDUNDANT_BEFORE, store), false);
+ IdentityAccumulator<RedundantBefore> accumulator = readAll(new
JournalKey(TxnId.NONE, JournalKey.Type.REDUNDANT_BEFORE, commandStoreId));
return accumulator.get();
}
@Override
- public NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int store)
+ public NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int commandStoreId)
{
- IdentityAccumulator<NavigableMap<TxnId, Ranges>> accumulator =
readAll(new JournalKey(TxnId.NONE, JournalKey.Type.BOOTSTRAP_BEGAN_AT, store),
false);
+ IdentityAccumulator<NavigableMap<TxnId, Ranges>> accumulator =
readAll(new JournalKey(TxnId.NONE, JournalKey.Type.BOOTSTRAP_BEGAN_AT,
commandStoreId));
return accumulator.get();
}
@Override
- public NavigableMap<Timestamp, Ranges> loadSafeToRead(int store)
+ public NavigableMap<Timestamp, Ranges> loadSafeToRead(int commandStoreId)
{
- IdentityAccumulator<NavigableMap<Timestamp, Ranges>> accumulator =
readAll(new JournalKey(TxnId.NONE, JournalKey.Type.SAFE_TO_READ, store), false);
+ IdentityAccumulator<NavigableMap<Timestamp, Ranges>> accumulator =
readAll(new JournalKey(TxnId.NONE, JournalKey.Type.SAFE_TO_READ,
commandStoreId));
return accumulator.get();
}
@Override
- public CommandStores.RangesForEpoch loadRangesForEpoch(int store)
+ public CommandStores.RangesForEpoch loadRangesForEpoch(int commandStoreId)
{
- IdentityAccumulator<RangesForEpoch> accumulator = readAll(new
JournalKey(TxnId.NONE, JournalKey.Type.RANGES_FOR_EPOCH, store), false);
+ IdentityAccumulator<RangesForEpoch> accumulator = readAll(new
JournalKey(TxnId.NONE, JournalKey.Type.RANGES_FOR_EPOCH, commandStoreId));
return accumulator.get();
}
@Override
- public void saveCommand(int store, CommandUpdate update, @Nullable
Runnable onFlush)
+ public void saveCommand(int commandStoreId, CommandUpdate update,
@Nullable Runnable onFlush)
{
Writer diff = Writer.make(update.before, update.after);
if (diff == null)
@@ -292,7 +292,7 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
return;
}
- JournalKey key = new JournalKey(update.txnId,
JournalKey.Type.COMMAND_DIFF, store);
+ JournalKey key = new JournalKey(update.txnId,
JournalKey.Type.COMMAND_DIFF, commandStoreId);
RecordPointer pointer = journal.asyncWrite(key, diff);
if (journalTable.shouldIndex(key)
&& diff.hasParticipants()
@@ -307,7 +307,7 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
@Override
public Iterator<TopologyUpdate> replayTopologies()
{
- AccordJournalValueSerializers.MapAccumulator<Long, TopologyUpdate>
accumulator = readAll(new JournalKey(TxnId.NONE,
JournalKey.Type.TOPOLOGY_UPDATE, 0), false);
+ AccordJournalValueSerializers.MapAccumulator<Long, TopologyUpdate>
accumulator = readAll(new JournalKey(TxnId.NONE,
JournalKey.Type.TOPOLOGY_UPDATE, 0));
return accumulator.get().values().iterator();
}
@@ -338,25 +338,25 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
@Override
public DurableBefore load()
{
- DurableBeforeAccumulator accumulator = readAll(new
JournalKey(TxnId.NONE, JournalKey.Type.DURABLE_BEFORE, 0), false);
+ DurableBeforeAccumulator accumulator = readAll(new
JournalKey(TxnId.NONE, JournalKey.Type.DURABLE_BEFORE, 0));
return accumulator.get();
}
};
}
@Override
- public void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable
onFlush)
+ public void saveStoreState(int commandStoreId, FieldUpdates fieldUpdates,
Runnable onFlush)
{
RecordPointer pointer = null;
// TODO: avoid allocating keys
if (fieldUpdates.newRedundantBefore != null)
- pointer = appendInternal(new JournalKey(TxnId.NONE,
JournalKey.Type.REDUNDANT_BEFORE, store), fieldUpdates.newRedundantBefore);
+ pointer = appendInternal(new JournalKey(TxnId.NONE,
JournalKey.Type.REDUNDANT_BEFORE, commandStoreId),
fieldUpdates.newRedundantBefore);
if (fieldUpdates.newBootstrapBeganAt != null)
- pointer = appendInternal(new JournalKey(TxnId.NONE,
JournalKey.Type.BOOTSTRAP_BEGAN_AT, store), fieldUpdates.newBootstrapBeganAt);
+ pointer = appendInternal(new JournalKey(TxnId.NONE,
JournalKey.Type.BOOTSTRAP_BEGAN_AT, commandStoreId),
fieldUpdates.newBootstrapBeganAt);
if (fieldUpdates.newSafeToRead != null)
- pointer = appendInternal(new JournalKey(TxnId.NONE,
JournalKey.Type.SAFE_TO_READ, store), fieldUpdates.newSafeToRead);
+ pointer = appendInternal(new JournalKey(TxnId.NONE,
JournalKey.Type.SAFE_TO_READ, commandStoreId), fieldUpdates.newSafeToRead);
if (fieldUpdates.newRangesForEpoch != null)
- pointer = appendInternal(new JournalKey(TxnId.NONE,
JournalKey.Type.RANGES_FOR_EPOCH, store), fieldUpdates.newRangesForEpoch);
+ pointer = appendInternal(new JournalKey(TxnId.NONE,
JournalKey.Type.RANGES_FOR_EPOCH, commandStoreId),
fieldUpdates.newRangesForEpoch);
if (onFlush == null)
return;
@@ -371,7 +371,7 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
{
JournalKey key = new JournalKey(txnId, JournalKey.Type.COMMAND_DIFF,
commandStoreId);
Builder builder = new Builder(txnId, load);
- journalTable.readAll(key, builder::deserializeNext, false);
+ journalTable.readAll(key, builder::deserializeNext);
return builder;
}
@@ -381,13 +381,13 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
return loadDiffs(commandStoreId, txnId, Load.ALL);
}
- private <BUILDER> BUILDER readAll(JournalKey key, boolean asc)
+ private <BUILDER> BUILDER readAll(JournalKey key)
{
BUILDER builder = (BUILDER) key.type.serializer.mergerFor(key);
// TODO: this can be further improved to avoid allocating lambdas
AccordJournalValueSerializers.FlyweightSerializer<?, BUILDER>
serializer = (AccordJournalValueSerializers.FlyweightSerializer<?, BUILDER>)
key.type.serializer;
// TODO (expected): for those where we store an image, read only the
first entry we find in DESC order
- journalTable.readAll(key, (in, userVersion) ->
serializer.deserialize(key, builder, in, userVersion), asc);
+ journalTable.readAll(key, (in, userVersion) ->
serializer.deserialize(key, builder, in, userVersion));
return builder;
}
@@ -544,6 +544,7 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
continue;
}
+
switch (field)
{
case EXECUTE_AT:
@@ -648,31 +649,33 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
public void deserializeNext(DataInputPlus in, int userVersion) throws
IOException
{
Invariants.require(txnId != null);
- int flags = in.readInt();
- Invariants.require(flags != 0);
+ int readFlags = in.readInt();
+ Invariants.require(readFlags != 0);
nextCalled = true;
count++;
- int iterable = toIterableSetFields(flags);
+ int iterable = toIterableSetFields(readFlags);
while (iterable != 0)
{
Field field = nextSetField(iterable);
+ // Since we are iterating in reverse order, we skip the fields
that were
+ // set by entries written later (i.e. already read ones).
if (isChanged(field, this.flags) || isNull(field, mask))
{
- if (!isNull(field, flags))
- skip(field, in, userVersion);
+ if (!isNull(field, readFlags))
+ skip(txnId, field, in, userVersion);
iterable = unsetIterable(field, iterable);
continue;
}
- this.flags = setChanged(field, this.flags);
- if (isNull(field, flags))
+ if (isNull(field, readFlags))
{
- this.flags = setFieldIsNull(field, this.flags);
+ this.flags = setFieldIsNullAndChanged(field, this.flags);
}
else
{
+ this.flags = setChanged(field, this.flags);
deserialize(field, in, userVersion);
}
@@ -731,7 +734,7 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
}
}
- private void skip(Field field, DataInputPlus in, int userVersion)
throws IOException
+ private static void skip(TxnId txnId, Field field, DataInputPlus in,
int userVersion) throws IOException
{
switch (field)
{
@@ -777,7 +780,7 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
break;
case RESULT:
// TODO (expected): skip
- result = ResultSerializers.result.deserialize(in,
userVersion);
+ ResultSerializers.result.deserialize(in, userVersion);
break;
}
}
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
b/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
index f4c96fa122..5b16752e32 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
@@ -162,10 +162,21 @@ public class AccordJournalTable<K extends JournalKey, V>
implements RangeSearche
this.reader = reader;
}
+ private long prevSegment = -1;
+ private long prevPosition = -1;
+
@Override
public void accept(long segment, int position, K key, ByteBuffer
buffer, int userVersion)
{
+ Invariants.require(prevSegment == -1 || segment <= prevSegment,
+ "Records should always be iterated over in a
reverse order, but %s was seen after %s", segment, prevSegment);
+ if (prevSegment != segment)
+ prevPosition = -1;
+ Invariants.require(prevPosition == -1 || position < prevPosition,
+ "Records should always be iterated over in a
reverse order, but %s was seen after %s", position, prevPosition);
readBuffer(buffer, reader, userVersion);
+ prevSegment = segment;
+ prevPosition = position;
}
}
@@ -173,6 +184,7 @@ public class AccordJournalTable<K extends JournalKey, V>
implements RangeSearche
{
protected LongHashSet visited = null;
protected RecordConsumer<K> delegate;
+
TableRecordConsumer(RecordConsumer<K> delegate)
{
this.delegate = delegate;
@@ -203,6 +215,7 @@ public class AccordJournalTable<K extends JournalKey, V>
implements RangeSearche
private final K key;
private final TableRecordConsumer tableRecordConsumer;
private final RecordConsumer<K> delegate;
+
JournalAndTableRecordConsumer(K key, RecordConsumer<K> reader)
{
this.key = key;
@@ -218,7 +231,7 @@ public class AccordJournalTable<K extends JournalKey, V>
implements RangeSearche
@Override
public void accept(long segment, int position, K key, ByteBuffer
buffer, int userVersion)
{
- if (!tableRecordConsumer.visited(segment))
+ if (!tableRecordConsumer.visited(segment)) //TODO: don't need this
anymore
delegate.accept(segment, position, key, buffer, userVersion);
}
}
@@ -339,15 +352,15 @@ public class AccordJournalTable<K extends JournalKey, V>
implements RangeSearche
* <p>
* When reading from journal segments, skip descriptors that were read
from the table.
*/
- public void readAll(K key, Reader reader, boolean asc)
+ public void readAll(K key, Reader reader)
{
- readAll(key, new RecordConsumerAdapter(reader), asc);
+ readAll(key, new RecordConsumerAdapter(reader));
}
- public void readAll(K key, RecordConsumer<K> reader, boolean asc)
+ public void readAll(K key, RecordConsumer<K> reader)
{
JournalAndTableRecordConsumer consumer = new
JournalAndTableRecordConsumer(key, reader);
- journal.readAll(key, consumer, asc);
+ journal.readAll(key, consumer);
consumer.readTable();
}
@@ -382,7 +395,6 @@ public class AccordJournalTable<K extends JournalKey, V>
implements RangeSearche
long descriptor = LongType.instance.compose(ByteBuffer.wrap((byte[])
row.clustering().get(0)));
int position = Int32Type.instance.compose(ByteBuffer.wrap((byte[])
row.clustering().get(1)));
-
into.key = key;
into.value = row.getCell(recordColumn).buffer();
into.userVersion =
Int32Type.instance.compose(row.getCell(versionColumn).buffer());
diff --git
a/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
b/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
index 7ed6a8cec0..75fdd8fea8 100644
---
a/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
+++
b/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
@@ -23,7 +23,6 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -99,7 +98,6 @@ public class AccordJournalBurnTest extends BurnTestBase
}
- @Ignore
@Test
public void testOne()
{
diff --git
a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
index abaed8349e..e0c1d126bf 100644
---
a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
+++
b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
@@ -452,15 +452,15 @@ public class SimulatedAccordCommandStore implements
AutoCloseable
}
@Override
- public void saveCommand(int store, CommandUpdate update, Runnable
onFlush)
+ public void saveCommand(int commandStoreId, CommandUpdate update,
Runnable onFlush)
{
- super.saveCommand(store, update, onFlush);
+ super.saveCommand(commandStoreId, update, onFlush);
if (!update.after.txnId().domain().isRange())
return;
Command after = update.after;
Route<?> route = after.participants().route();
if (route != null)
- index.update(0, store, after.txnId(), route);
+ index.update(0, commandStoreId, after.txnId(), route);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]