This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new b63d952856 More follow-up to CASSANDRA-19967 and CASSANDRA-19869
b63d952856 is described below
commit b63d95285616a385402224ce1ef45b4a5d8a98d1
Author: Alex Petrov <[email protected]>
AuthorDate: Wed Oct 2 13:35:57 2024 +0200
More follow-up to CASSANDRA-19967 and CASSANDRA-19869
---
modules/accord | 2 +-
.../db/compaction/CompactionIterator.java | 42 +++++++++--
src/java/org/apache/cassandra/journal/Journal.java | 16 +++-
.../org/apache/cassandra/journal/Segments.java | 8 ++
.../apache/cassandra/journal/StaticSegment.java | 9 ++-
.../service/accord/AccordCommandStore.java | 11 ++-
.../cassandra/service/accord/AccordJournal.java | 82 ++++++++++++++++++---
.../service/accord/AccordSafeCommandStore.java | 86 ++++++++++++++++++----
.../service/accord/AccordSegmentCompactor.java | 10 +++
.../cassandra/service/accord/AccordService.java | 11 ++-
.../service/accord/CommandsForRangesLoader.java | 2 +-
.../cassandra/service/accord/IAccordService.java | 8 +-
.../cassandra/service/accord/SavedCommand.java | 64 ++++++++++++----
.../service/accord/async/AsyncOperation.java | 23 +++---
.../test/accord/AccordBootstrapTest.java | 28 +++----
.../distributed/test/accord/AccordLoadTest.java | 16 +++-
.../accord/AccordJournalCompactionTest.java | 22 +++---
.../compaction/CompactionAccordIteratorsTest.java | 5 +-
.../service/accord/AccordCommandStoreTest.java | 5 +-
.../cassandra/service/accord/MockJournal.java | 24 +++---
.../cassandra/service/accord/SavedCommandTest.java | 1 +
21 files changed, 350 insertions(+), 125 deletions(-)
diff --git a/modules/accord b/modules/accord
index 4cf0070d60..f3782e2a98 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 4cf0070d604abd2db460a5f1c3f8cd8dc7d26696
+Subproject commit f3782e2a98004843cc3384a6983478c1128a1d6a
diff --git
a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index 3e9a1de462..fc993e98da 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -61,6 +61,7 @@ import org.apache.cassandra.db.RegularAndStaticColumns;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.partitions.PurgeFunction;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
@@ -109,6 +110,7 @@ import
org.apache.cassandra.service.paxos.uncommitted.PaxosRows;
import org.apache.cassandra.utils.TimeUUID;
import static accord.local.Cleanup.ERASE;
+import static accord.local.Cleanup.TRUNCATE;
import static accord.local.Cleanup.TRUNCATE_WITH_OUTCOME;
import static accord.local.Cleanup.shouldCleanupPartial;
import static com.google.common.base.Preconditions.checkState;
@@ -148,6 +150,7 @@ public class CompactionIterator extends
CompactionInfo.Holder implements Unfilte
{
private static final Logger logger =
LoggerFactory.getLogger(CompactionIterator.class);
private static final long UNFILTERED_TO_UPDATE_PROGRESS = 100;
+ private static Object[] TRUNCATE_CLUSTERING_VALUE = new Object[] {
Long.MAX_VALUE, Integer.MAX_VALUE };
private final OperationType type;
private final AbstractCompactionController controller;
@@ -806,8 +809,8 @@ public class CompactionIterator extends
CompactionInfo.Holder implements Unfilte
class AccordCommandsPurger extends AbstractPurger
{
final Int2ObjectHashMap<RedundantBefore> redundantBefores;
+ final Int2ObjectHashMap<DurableBefore> durableBefores;
final Int2ObjectHashMap<RangesForEpoch> ranges;
- final DurableBefore durableBefore;
int storeId;
TxnId txnId;
@@ -817,7 +820,7 @@ public class CompactionIterator extends
CompactionInfo.Holder implements Unfilte
IAccordService.CompactionInfo compactionInfo =
accordService.get().getCompactionInfo();
this.redundantBefores = compactionInfo.redundantBefores;
this.ranges = compactionInfo.ranges;
- this.durableBefore = compactionInfo.durableBefore;
+ this.durableBefores = compactionInfo.durableBefores;
}
protected void beginPartition(UnfilteredRowIterator partition)
@@ -833,6 +836,7 @@ public class CompactionIterator extends
CompactionInfo.Holder implements Unfilte
updateProgress();
RedundantBefore redundantBefore = redundantBefores.get(storeId);
+ DurableBefore durableBefore = durableBefores.get(storeId);
// TODO (expected): if the store has been retired, this should
return null
if (redundantBefore == null)
return row;
@@ -1013,8 +1017,8 @@ public class CompactionIterator extends
CompactionInfo.Holder implements Unfilte
class AccordJournalPurger extends AbstractPurger
{
final Int2ObjectHashMap<RedundantBefore> redundantBefores;
+ final Int2ObjectHashMap<DurableBefore> durableBefores;
final Int2ObjectHashMap<CommandStores.RangesForEpoch> ranges;
- final DurableBefore durableBefore;
final ColumnMetadata recordColumn;
final ColumnMetadata versionColumn;
final KeySupport<JournalKey> keySupport = JournalKey.SUPPORT;
@@ -1026,6 +1030,8 @@ public class CompactionIterator extends
CompactionInfo.Holder implements Unfilte
Object[] lastClustering = null;
long maxSeenTimestamp = -1;
final int userVersion;
+ long lastDescriptor = -1;
+ int lastOffset = -1;
public AccordJournalPurger(Supplier<IAccordService> serviceSupplier)
{
@@ -1036,7 +1042,7 @@ public class CompactionIterator extends
CompactionInfo.Holder implements Unfilte
this.redundantBefores = compactionInfo.redundantBefores;
this.ranges = compactionInfo.ranges;
- this.durableBefore = compactionInfo.durableBefore;
+ this.durableBefores = compactionInfo.durableBefores;
ColumnFamilyStore cfs =
Keyspace.open(AccordKeyspace.metadata().name).getColumnFamilyStore(AccordKeyspace.JOURNAL);
this.recordColumn =
cfs.metadata().getColumn(ColumnIdentifier.getInterned("record", false));
this.versionColumn =
cfs.metadata().getColumn(ColumnIdentifier.getInterned("user_version", false));
@@ -1050,6 +1056,8 @@ public class CompactionIterator extends
CompactionInfo.Holder implements Unfilte
serializer =
(AccordJournalValueSerializers.FlyweightSerializer<Object, Object>)
key.type.serializer;
builder = serializer.mergerFor(key);
maxSeenTimestamp = -1;
+ lastDescriptor = -1;
+ lastOffset = -1;
}
@Override
@@ -1096,6 +1104,7 @@ public class CompactionIterator extends
CompactionInfo.Holder implements Unfilte
}
RedundantBefore redundantBefore =
redundantBefores.get(key.commandStoreId);
+ DurableBefore durableBefore =
durableBefores.get(key.commandStoreId);
Cleanup cleanup =
commandBuilder.shouldCleanup(redundantBefore, durableBefore);
if (cleanup == ERASE)
return PartitionUpdate.fullPartitionDelete(metadata(),
partition.partitionKey(), maxSeenTimestamp, nowInSec).unfilteredIterator();
@@ -1107,9 +1116,16 @@ public class CompactionIterator extends
CompactionInfo.Holder implements Unfilte
return null;
PartitionUpdate.SimpleBuilder newVersion =
PartitionUpdate.simpleBuilder(AccordKeyspace.Journal, partition.partitionKey());
- newVersion.row(lastClustering)
- .add("record",
commandBuilder.asByteBuffer(userVersion))
+
+ Row.SimpleBuilder rowBuilder;
+ if (cleanup == TRUNCATE || cleanup ==
TRUNCATE_WITH_OUTCOME)
+ rowBuilder = newVersion.row(TRUNCATE_CLUSTERING_VALUE);
+ else
+ rowBuilder = newVersion.row(lastClustering);
+
+ rowBuilder.add("record",
commandBuilder.asByteBuffer(userVersion))
.add("user_version", userVersion);
+
return newVersion.build().unfilteredIterator();
}
@@ -1133,6 +1149,20 @@ public class CompactionIterator extends
CompactionInfo.Holder implements Unfilte
updateProgress();
maxSeenTimestamp = row.primaryKeyLivenessInfo().timestamp();
ByteBuffer record = row.getCell(recordColumn).buffer();
+ long descriptor =
LongType.instance.compose(row.clustering().getBufferArray()[0]);
+ int offset =
Int32Type.instance.compose(row.clustering().getBufferArray()[1]);
+
+ if (lastOffset != -1)
+ {
+ Invariants.checkState(descriptor >= lastDescriptor,
+ "Descriptors were accessed out of order:
%d was accessed after %d", descriptor, lastDescriptor);
+ Invariants.checkState(descriptor != lastDescriptor ||
+ offset > lastOffset,
+ "Offsets within %s were accessed out of
order: %d was accessed after %s", offset, lastOffset);
+ }
+ lastDescriptor = descriptor;
+ lastOffset = offset;
+
try (DataInputBuffer in = new DataInputBuffer(record, false))
{
int userVersion =
Int32Type.instance.compose(row.getCell(versionColumn).buffer());
diff --git a/src/java/org/apache/cassandra/journal/Journal.java
b/src/java/org/apache/cassandra/journal/Journal.java
index d47d801d0e..5e91c7d3d3 100644
--- a/src/java/org/apache/cassandra/journal/Journal.java
+++ b/src/java/org/apache/cassandra/journal/Journal.java
@@ -525,6 +525,8 @@ public class Journal<K, V> implements Shutdownable
ActiveSegment<K, V>.Allocation alloc;
while (null == (alloc = segment.allocate(entrySize, hosts)))
{
+ if (entrySize >= (params.segmentSize() * 3) / 4)
+ throw new IllegalStateException("entrySize " + entrySize + "
too large for a segmentSize of " + params.segmentSize());
// failed to allocate; move to a new segment with enough room
advanceSegment(segment);
segment = currentSegment;
@@ -776,6 +778,11 @@ public class Journal<K, V> implements Shutdownable
swapSegments(current -> current.withNewActiveSegment(activeSegment));
}
+ // Swaps the current segment set for the result of current.withoutEmptySegment(activeSegment).
+ private void removeEmptySegment(ActiveSegment<K, V> activeSegment)
+ {
+ swapSegments(current -> current.withoutEmptySegment(activeSegment));
+ }
+
private void replaceCompletedSegment(ActiveSegment<K, V> activeSegment,
StaticSegment<K, V> staticSegment)
{
swapSegments(current -> current.withCompletedSegment(activeSegment,
staticSegment));
@@ -869,6 +876,13 @@ public class Journal<K, V> implements Shutdownable
void closeActiveSegmentAndOpenAsStatic(ActiveSegment<K, V> activeSegment)
{
+ if (activeSegment.isEmpty())
+ {
+ removeEmptySegment(activeSegment);
+ activeSegment.closeAndDiscard();
+ return;
+ }
+
closer.execute(new CloseActiveSegmentRunnable(activeSegment));
}
@@ -973,7 +987,7 @@ public class Journal<K, V> implements Shutdownable
private StaticSegmentIterator()
{
this.segments = selectAndReference(Segment::isStatic);
- this.readers = new PriorityQueue<>((o1, o2) ->
keySupport.compare(o1.key(), o2.key()));
+ this.readers = new PriorityQueue<>();
for (Segment<K, V> segment : this.segments.all())
{
StaticSegment<K, V> staticSegment = (StaticSegment<K,
V>)segment;
diff --git a/src/java/org/apache/cassandra/journal/Segments.java
b/src/java/org/apache/cassandra/journal/Segments.java
index a779aebf23..94282e9d87 100644
--- a/src/java/org/apache/cassandra/journal/Segments.java
+++ b/src/java/org/apache/cassandra/journal/Segments.java
@@ -63,6 +63,14 @@ class Segments<K, V>
return new Segments<>(newSegments);
}
+ /**
+  * Returns a new segment set equal to this one minus the given active segment.
+  * The segment is removed from the copied map only; this instance's map is left untouched.
+  *
+  * @param activeSegment the active segment to drop; it must be present and empty
+  * @return a new {@code Segments} that no longer contains {@code activeSegment}
+  */
+ Segments<K, V> withoutEmptySegment(ActiveSegment<K, V> activeSegment)
+ {
+ Long2ObjectHashMap<Segment<K, V>> newSegments = new Long2ObjectHashMap<>(segments);
+ // remove from the copy: removing from `segments` would mutate this instance
+ // and leave the segment present in the set that is returned
+ Segment<K, V> oldValue = newSegments.remove(activeSegment.descriptor.timestamp);
+ Invariants.checkState(oldValue != null && oldValue.asActive().isEmpty());
+ return new Segments<>(newSegments);
+ }
+ }
+
Segments<K, V> withCompletedSegment(ActiveSegment<K, V> activeSegment,
StaticSegment<K, V> staticSegment)
{
Invariants.checkArgument(activeSegment.descriptor.equals(staticSegment.descriptor));
diff --git a/src/java/org/apache/cassandra/journal/StaticSegment.java
b/src/java/org/apache/cassandra/journal/StaticSegment.java
index c7ac7ce410..f5f15ee13c 100644
--- a/src/java/org/apache/cassandra/journal/StaticSegment.java
+++ b/src/java/org/apache/cassandra/journal/StaticSegment.java
@@ -473,9 +473,12 @@ public final class StaticSegment<K, V> extends Segment<K,
V>
that.ensureHasAdvanced();
int cmp = keySupport.compare(this.key(), that.key());
- return cmp != 0
- ? cmp
- : this.descriptor.compareTo(that.descriptor);
+ if (cmp != 0)
+ return cmp;
+ cmp = Long.compare(this.descriptor.timestamp,
that.descriptor.timestamp);
+ if (cmp != 0)
+ return cmp;
+ return Integer.compare(this.offset, that.offset);
}
}
}
\ No newline at end of file
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index a2367e5768..418c5cd687 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -502,9 +502,8 @@ public class AccordCommandStore extends CommandStore
// We find a set of dependencies for a range then update CommandsFor
to know about them
Ranges allRanges = safeStore.ranges().all();
deps.keyDeps.keys().forEach(allRanges, key -> {
- // TODO (now): batch register to minimise GC
+ // TODO (desired): batch register to minimise GC
deps.keyDeps.forEach(key, (txnId, txnIdx) -> {
- // TODO (desired, efficiency): this can be made more efficient
by batching by epoch
if (ranges.coordinates(txnId).contains(key))
return; // already coordinates, no need to replicate
if (!ranges.allBefore(txnId.epoch()).contains(key))
@@ -525,13 +524,13 @@ public class AccordCommandStore extends CommandStore
if (!ranges.allBefore(txnId.epoch()).intersects(range))
return;
+ // TODO (required): this is potentially not safe - it should
not be persisted until we save in journal
+ // but, preferable to retire historical transactions as a
concept entirely, and rely on ExclusiveSyncPoints instead
diskCommandsForRanges().mergeHistoricalTransaction(txnId,
Ranges.single(range).slice(allRanges), Ranges::with);
});
}
}
- public NavigableMap<Timestamp, Ranges> safeToRead() { return
super.safeToRead(); }
-
public void appendCommands(List<SavedCommand.DiffWriter> diffs, Runnable
onFlush)
{
for (int i = 0; i < diffs.size(); i++)
@@ -545,7 +544,7 @@ public class AccordCommandStore extends CommandStore
@VisibleForTesting
public Command loadCommand(TxnId txnId)
{
- return journal.loadCommand(id, txnId, redundantBefore(),
durableBefore());
+ return journal.loadCommand(id, txnId, unsafeGetRedundantBefore(),
unsafeGetDurableBefore());
}
public interface Loader
@@ -592,7 +591,7 @@ public class AccordCommandStore extends CommandStore
Command local = command;
if (local.status() != Truncated && local.status()
!= Invalidated)
{
- Cleanup cleanup =
Cleanup.shouldCleanup(AccordCommandStore.this, local, local.participants());
+ Cleanup cleanup = Cleanup.shouldCleanup(local,
unsafeGetRedundantBefore(), unsafeGetDurableBefore());
switch (cleanup)
{
case NO:
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index c1292d5322..eb8c0007f2 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -253,18 +253,18 @@ public class AccordJournal implements IJournal,
Shutdownable
{
RecordPointer pointer = null;
// TODO: avoid allocating keys
- if (fieldUpdates.redundantBefore != null)
- pointer = appendInternal(new JournalKey(TxnId.NONE,
JournalKey.Type.REDUNDANT_BEFORE, store), fieldUpdates.redundantBefore);
- if (fieldUpdates.durableBefore != null)
- pointer = appendInternal(new JournalKey(TxnId.NONE,
JournalKey.Type.DURABLE_BEFORE, store), fieldUpdates.durableBefore);
- if (fieldUpdates.bootstrapBeganAt != null)
- pointer = appendInternal(new JournalKey(TxnId.NONE,
JournalKey.Type.BOOTSTRAP_BEGAN_AT, store), fieldUpdates.bootstrapBeganAt);
- if (fieldUpdates.safeToRead != null)
- pointer = appendInternal(new JournalKey(TxnId.NONE,
JournalKey.Type.SAFE_TO_READ, store), fieldUpdates.safeToRead);
- if (fieldUpdates.rangesForEpoch != null)
- pointer = appendInternal(new JournalKey(TxnId.NONE,
JournalKey.Type.RANGES_FOR_EPOCH, store), fieldUpdates.rangesForEpoch);
- if (fieldUpdates.historicalTransactions != null)
- pointer = appendInternal(new JournalKey(TxnId.NONE,
JournalKey.Type.HISTORICAL_TRANSACTIONS, store),
fieldUpdates.historicalTransactions);
+ if (fieldUpdates.addRedundantBefore != null)
+ pointer = appendInternal(new JournalKey(TxnId.NONE,
JournalKey.Type.REDUNDANT_BEFORE, store), fieldUpdates.addRedundantBefore);
+ if (fieldUpdates.addDurableBefore != null)
+ pointer = appendInternal(new JournalKey(TxnId.NONE,
JournalKey.Type.DURABLE_BEFORE, store), fieldUpdates.addDurableBefore);
+ if (fieldUpdates.newBootstrapBeganAt != null)
+ pointer = appendInternal(new JournalKey(TxnId.NONE,
JournalKey.Type.BOOTSTRAP_BEGAN_AT, store), fieldUpdates.newBootstrapBeganAt);
+ if (fieldUpdates.newSafeToRead != null)
+ pointer = appendInternal(new JournalKey(TxnId.NONE,
JournalKey.Type.SAFE_TO_READ, store), fieldUpdates.newSafeToRead);
+ if (fieldUpdates.newRangesForEpoch != null)
+ pointer = appendInternal(new JournalKey(TxnId.NONE,
JournalKey.Type.RANGES_FOR_EPOCH, store), fieldUpdates.newRangesForEpoch);
+ if (fieldUpdates.addHistoricalTransactions != null)
+ pointer = appendInternal(new JournalKey(TxnId.NONE,
JournalKey.Type.HISTORICAL_TRANSACTIONS, store),
fieldUpdates.addHistoricalTransactions);
if (onFlush == null)
return;
@@ -414,4 +414,62 @@ public class AccordJournal implements IJournal,
Shutdownable
isReplay.set(false);
}
}
+
+ // TODO: this is here temporarily; for debugging purposes
+ /**
+  * Walks every key returned by {@code journalTable.readAll()}. For keys of type
+  * {@code COMMAND_DIFF}, all records of the key are deserialized into a single
+  * {@code SavedCommand.Builder} and {@code construct()} is called, unless the cleanup
+  * computed from the compaction info is ERASE, EXPUNGE, EXPUNGE_PARTIAL or VESTIGIAL.
+  * Any {@code Throwable} raised while handling a key is rethrown as a
+  * {@code RuntimeException} whose message lists the record pointers read for that key.
+  */
+ @VisibleForTesting
+ public void checkAllCommands()
+ {
+ try (AccordJournalTable.KeyOrderIterator<JournalKey> iter =
journalTable.readAll())
+ {
+ // compaction info is fetched once, before iterating
+ IAccordService.CompactionInfo compactionInfo =
AccordService.instance().getCompactionInfo();
+ JournalKey key;
+ SavedCommand.Builder builder = new SavedCommand.Builder();
+ while ((key = iter.key()) != null)
+ {
+ builder.reset(key.id);
+ if (key.type != JournalKey.Type.COMMAND_DIFF)
+ {
+ // TODO (required): add "skip" for the key to avoid getting stuck
+ // records of other key types are read with a no-op consumer
+ iter.readAllForKey(key, (segment, position, key1, buffer,
hosts, userVersion) -> {});
+ continue;
+ }
+
+ JournalKey finalKey = key;
+ // pointers of the records read for this key; reported if processing fails
+ List<RecordPointer> pointers = new ArrayList<>();
+ try
+ {
+ iter.readAllForKey(key, (segment, position, local, buffer,
hosts, userVersion) -> {
+ pointers.add(new RecordPointer(segment, position));
+ Invariants.checkState(finalKey.equals(local));
+ try (DataInputBuffer in = new DataInputBuffer(buffer,
false))
+ {
+ builder.deserializeNext(in, userVersion);
+ }
+ catch (IOException e)
+ {
+ // can only throw if serializer is buggy
+ throw new RuntimeException(e);
+ }
+ });
+
+ Cleanup cleanup =
builder.shouldCleanup(compactionInfo.redundantBefores.get(key.commandStoreId),
compactionInfo.durableBefores.get(key.commandStoreId));
+ // for these cleanup outcomes construct() is skipped and the loop moves on
+ switch (cleanup)
+ {
+ case ERASE:
+ case EXPUNGE:
+ case EXPUNGE_PARTIAL:
+ case VESTIGIAL:
+ continue;
+ }
+ builder.construct();
+ }
+ catch (Throwable t)
+ {
+ throw new RuntimeException(String.format("Caught an
exception after iterating over: %s", pointers),
+ t);
+ }
+ }
+
+ }
+ }
}
\ No newline at end of file
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
index fae5e4634f..3451899ac0 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
@@ -284,45 +284,82 @@ public class AccordSafeCommandStore extends
AbstractSafeCommandStore<AccordSafeC
@Override
public void upsertRedundantBefore(RedundantBefore addRedundantBefore)
{
- // TODO (now): this is a temporary measure, see comment on
AccordJournalValueSerializers; upsert instead
+ // TODO (required): this is a temporary measure, see comment on
AccordJournalValueSerializers; upsert instead
// when modifying, only modify together with
AccordJournalValueSerializers
- ensureFieldUpdates().redundantBefore =
RedundantBefore.merge(commandStore.redundantBefore(), addRedundantBefore);
- super.upsertRedundantBefore(addRedundantBefore);
+ ensureFieldUpdates().newRedundantBefore =
ensureFieldUpdates().addRedundantBefore =
RedundantBefore.merge(redundantBefore(), addRedundantBefore);
}
@Override
public void setBootstrapBeganAt(NavigableMap<TxnId, Ranges>
newBootstrapBeganAt)
{
- ensureFieldUpdates().bootstrapBeganAt = newBootstrapBeganAt;
+ ensureFieldUpdates().newBootstrapBeganAt = newBootstrapBeganAt;
super.setBootstrapBeganAt(newBootstrapBeganAt);
}
@Override
public void upsertDurableBefore(DurableBefore addDurableBefore)
{
- ensureFieldUpdates().durableBefore = addDurableBefore;
+ ensureFieldUpdates().addDurableBefore = addDurableBefore;
super.upsertDurableBefore(addDurableBefore);
}
@Override
public void setSafeToRead(NavigableMap<Timestamp, Ranges> newSafeToRead)
{
- ensureFieldUpdates().safeToRead = newSafeToRead;
+ ensureFieldUpdates().newSafeToRead = newSafeToRead;
super.setSafeToRead(newSafeToRead);
}
@Override
public void setRangesForEpoch(CommandStores.RangesForEpoch rangesForEpoch)
{
- ensureFieldUpdates().rangesForEpoch = rangesForEpoch.snapshot();
+ ensureFieldUpdates().newRangesForEpoch = rangesForEpoch.snapshot();
super.setRangesForEpoch(rangesForEpoch);
ranges = rangesForEpoch;
}
+ /** Returns the bootstrap-began-at map staged in {@code fieldUpdates} if one is set, otherwise the superclass value. */
+ @Override
+ public NavigableMap<TxnId, Ranges> bootstrapBeganAt()
+ {
+ boolean staged = fieldUpdates != null && fieldUpdates.newBootstrapBeganAt != null;
+ return staged ? fieldUpdates.newBootstrapBeganAt : super.bootstrapBeganAt();
+ }
+
+ /** Returns the safe-to-read map staged in {@code fieldUpdates} if one is set, otherwise the superclass value. */
+ @Override
+ public NavigableMap<Timestamp, Ranges> safeToReadAt()
+ {
+ boolean staged = fieldUpdates != null && fieldUpdates.newSafeToRead != null;
+ return staged ? fieldUpdates.newSafeToRead : super.safeToReadAt();
+ }
+
+ /** Returns the {@code RedundantBefore} staged in {@code fieldUpdates} if one is set, otherwise the superclass value. */
+ @Override
+ public RedundantBefore redundantBefore()
+ {
+ boolean staged = fieldUpdates != null && fieldUpdates.newRedundantBefore != null;
+ return staged ? fieldUpdates.newRedundantBefore : super.redundantBefore();
+ }
+
+ /** Returns the {@code DurableBefore} staged in {@code fieldUpdates} if one is set, otherwise the superclass value. */
+ @Override
+ public DurableBefore durableBefore()
+ {
+ boolean staged = fieldUpdates != null && fieldUpdates.newDurableBefore != null;
+ return staged ? fieldUpdates.newDurableBefore : super.durableBefore();
+ }
+
@Override
protected void registerHistoricalTransactions(Deps deps)
{
- ensureFieldUpdates().historicalTransactions = deps;
+ ensureFieldUpdates().addHistoricalTransactions = deps;
+ // TODO (required): it is potentially unsafe to propagate this
synchronously, as if we fail to write to the journal we may be in an
inconsistent state
+ // however, we can and should retire the concept of historical
transactions in favour of ExclusiveSyncPoints ensuring their deps are known
super.registerHistoricalTransactions(deps);
}
@@ -337,13 +374,34 @@ public class AccordSafeCommandStore extends
AbstractSafeCommandStore<AccordSafeC
return fieldUpdates;
}
+ /**
+  * Hands the values staged in {@code fieldUpdates} to the superclass.
+  * Returns immediately when no field updates were recorded; otherwise each
+  * non-null staged value is passed to its corresponding superclass setter.
+  */
+ public void postExecute()
+ {
+ if (fieldUpdates == null)
+ return;
+
+ if (fieldUpdates.newRedundantBefore != null)
+ super.unsafeSetRedundantBefore(fieldUpdates.newRedundantBefore);
+
+ if (fieldUpdates.newDurableBefore != null)
+ super.unsafeSetDurableBefore(fieldUpdates.newDurableBefore);
+
+ if (fieldUpdates.newBootstrapBeganAt != null)
+ super.setBootstrapBeganAt(fieldUpdates.newBootstrapBeganAt);
+
+ if (fieldUpdates.newSafeToRead != null)
+ super.setSafeToRead(fieldUpdates.newSafeToRead);
+
+ // NOTE(review): the `ranges` field is passed here, not the staged newRangesForEpoch snapshot - confirm this is intended
+ if (fieldUpdates.newRangesForEpoch != null)
+ super.setRangesForEpoch(ranges);
+ }
+
public static class FieldUpdates
{
- public RedundantBefore redundantBefore;
- public DurableBefore durableBefore;
- public NavigableMap<TxnId, Ranges> bootstrapBeganAt;
- public NavigableMap<Timestamp, Ranges> safeToRead;
- public RangesForEpoch.Snapshot rangesForEpoch;
- public Deps historicalTransactions;
+ public RedundantBefore addRedundantBefore, newRedundantBefore;
+ public DurableBefore addDurableBefore, newDurableBefore;
+ public NavigableMap<TxnId, Ranges> newBootstrapBeganAt;
+ public NavigableMap<Timestamp, Ranges> newSafeToRead;
+ public RangesForEpoch.Snapshot newRangesForEpoch;
+ public Deps addHistoricalTransactions;
}
}
\ No newline at end of file
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java
b/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java
index f0c7b38c37..f94510b8b8 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java
@@ -99,6 +99,8 @@ public class AccordSegmentCompactor<V> implements
SegmentCompactor<JournalKey, V
key = reader.key();
serializer = (FlyweightSerializer<Object, Object>)
key.type.serializer;
builder = serializer.mergerFor(key);
+ lastOffset = -1;
+ lastDescriptor = -1;
}
boolean advanced;
@@ -106,6 +108,14 @@ public class AccordSegmentCompactor<V> implements
SegmentCompactor<JournalKey, V
{
try (DataInputBuffer in = new
DataInputBuffer(reader.record(), false))
{
+ if (lastDescriptor != -1)
+ {
+
Invariants.checkState(reader.descriptor.timestamp >= lastDescriptor,
+ "Descriptors were
accessed out of order: %d was accessed after %d", reader.descriptor.timestamp,
lastDescriptor);
+
Invariants.checkState(reader.descriptor.timestamp != lastDescriptor ||
+ reader.offset() >
lastOffset,
+ "Offsets within %s were
accessed out of order: %d was accessed after %s", reader.offset(), lastOffset);
+ }
serializer.deserialize(key, builder, in,
reader.descriptor.userVersion);
lastDescriptor = reader.descriptor.timestamp;
lastOffset = reader.offset();
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index bce8a198da..e051a53803 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -31,7 +31,6 @@ import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -298,7 +297,7 @@ public class AccordService implements IAccordService,
Shutdownable
@Override
public CompactionInfo getCompactionInfo()
{
- return new CompactionInfo(new Int2ObjectHashMap<>(), new
Int2ObjectHashMap<>(), DurableBefore.EMPTY);
+ return new CompactionInfo(new Int2ObjectHashMap<>(), new
Int2ObjectHashMap<>(), new Int2ObjectHashMap<>());
}
@Override
@@ -1261,17 +1260,17 @@ public class AccordService implements IAccordService,
Shutdownable
public CompactionInfo getCompactionInfo()
{
Int2ObjectHashMap<RedundantBefore> redundantBefores = new
Int2ObjectHashMap<>();
+ Int2ObjectHashMap<DurableBefore> durableBefores = new
Int2ObjectHashMap<>();
Int2ObjectHashMap<RangesForEpoch> ranges = new Int2ObjectHashMap<>();
- AtomicReference<DurableBefore> durableBefore = new
AtomicReference<>(DurableBefore.EMPTY);
AsyncChains.getBlockingAndRethrow(node.commandStores().forEach(safeStore -> {
synchronized (redundantBefores)
{
- redundantBefores.put(safeStore.commandStore().id(),
safeStore.commandStore().redundantBefore());
+ redundantBefores.put(safeStore.commandStore().id(),
safeStore.redundantBefore());
ranges.put(safeStore.commandStore().id(), safeStore.ranges());
+ durableBefores.put(safeStore.commandStore().id(),
safeStore.durableBefore());
}
- durableBefore.set(DurableBefore.merge(durableBefore.get(),
safeStore.commandStore().durableBefore()));
}));
- return new CompactionInfo(redundantBefores, ranges,
durableBefore.get());
+ return new CompactionInfo(redundantBefores, ranges, durableBefores);
}
@Override
diff --git
a/src/java/org/apache/cassandra/service/accord/CommandsForRangesLoader.java
b/src/java/org/apache/cassandra/service/accord/CommandsForRangesLoader.java
index 6324735883..e7e1461054 100644
--- a/src/java/org/apache/cassandra/service/accord/CommandsForRangesLoader.java
+++ b/src/java/org/apache/cassandra/service/accord/CommandsForRangesLoader.java
@@ -187,7 +187,7 @@ public class CommandsForRangesLoader
{
//TODO (now): this logic is kinda duplicate of
org.apache.cassandra.service.accord.CommandsForRange.mapReduce
// should figure out if this can be improved... also what is correct?
- var durableBefore = store.durableBefore();
+ var durableBefore = store.unsafeGetDurableBefore();
NavigableMap<TxnId, Summary> map = new TreeMap<>();
for (TxnId txnId : possibleTxns)
{
diff --git a/src/java/org/apache/cassandra/service/accord/IAccordService.java
b/src/java/org/apache/cassandra/service/accord/IAccordService.java
index 1be920bc1b..e5e2d125f1 100644
--- a/src/java/org/apache/cassandra/service/accord/IAccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/IAccordService.java
@@ -134,17 +134,17 @@ public interface IAccordService
class CompactionInfo
{
- static final Supplier<CompactionInfo> NO_OP = () -> new
CompactionInfo(new Int2ObjectHashMap<>(), new Int2ObjectHashMap<>(),
DurableBefore.EMPTY);
+ static final Supplier<CompactionInfo> NO_OP = () -> new
CompactionInfo(new Int2ObjectHashMap<>(), new Int2ObjectHashMap<>(), new
Int2ObjectHashMap<>());
public final Int2ObjectHashMap<RedundantBefore> redundantBefores;
+ public final Int2ObjectHashMap<DurableBefore> durableBefores;
public final Int2ObjectHashMap<RangesForEpoch> ranges;
- public final DurableBefore durableBefore;
- public CompactionInfo(Int2ObjectHashMap<RedundantBefore>
redundantBefores, Int2ObjectHashMap<RangesForEpoch> ranges, DurableBefore
durableBefore)
+ public CompactionInfo(Int2ObjectHashMap<RedundantBefore>
redundantBefores, Int2ObjectHashMap<RangesForEpoch> ranges,
Int2ObjectHashMap<DurableBefore> durableBefores)
{
this.redundantBefores = redundantBefores;
this.ranges = ranges;
- this.durableBefore = durableBefore;
+ this.durableBefores = durableBefores;
}
}
diff --git a/src/java/org/apache/cassandra/service/accord/SavedCommand.java
b/src/java/org/apache/cassandra/service/accord/SavedCommand.java
index 1f0086b484..209208989f 100644
--- a/src/java/org/apache/cassandra/service/accord/SavedCommand.java
+++ b/src/java/org/apache/cassandra/service/accord/SavedCommand.java
@@ -51,6 +51,7 @@ import
org.apache.cassandra.service.accord.serializers.WaitingOnSerializer;
import org.apache.cassandra.utils.Throwables;
import static accord.local.Cleanup.NO;
+import static accord.local.Cleanup.TRUNCATE_WITH_OUTCOME;
import static accord.primitives.Known.KnownDeps.DepsErased;
import static accord.primitives.Known.KnownDeps.DepsUnknown;
import static accord.primitives.Known.KnownDeps.NoDeps;
@@ -73,9 +74,10 @@ public class SavedCommand
PARTIAL_DEPS,
WAITING_ON,
WRITES,
+ CLEANUP
;
- static final Fields[] FIELDS = values();
+ public static final Fields[] FIELDS = values();
}
// TODO: maybe rename this and enclosing classes?
@@ -123,7 +125,6 @@ public class SavedCommand
}
}
-
public static ByteBuffer asSerializedDiff(Command after, int userVersion)
throws IOException
{
try (DataOutputBuffer out = new DataOutputBuffer())
@@ -304,6 +305,7 @@ public class SavedCommand
SavedCommand.WaitingOnProvider waitingOn;
Writes writes;
Result result;
+ Cleanup cleanup;
boolean nextCalled;
int count;
@@ -385,14 +387,26 @@ public class SavedCommand
public void clear()
{
flags = 0;
+ txnId = null;
+
executeAt = null;
+ executeAtLeast = null;
saveStatus = null;
durability = null;
+
+ acceptedOrCommitted = null;
promised = null;
+
participants = null;
partialTxn = null;
partialDeps = null;
+
+ waitingOnBytes = null;
+ waitingOn = null;
writes = null;
+ result = null;
+ cleanup = null;
+
nextCalled = false;
count = 0;
}
@@ -428,17 +442,19 @@ public class SavedCommand
return NO;
if (saveStatus == null || participants == null)
- return Cleanup.EXPUNGE_PARTIAL;
+ return Cleanup.NO;
- return Cleanup.shouldCleanup(txnId, saveStatus, durability,
participants, redundantBefore, durableBefore);
+ Cleanup cleanup = Cleanup.shouldCleanup(txnId, saveStatus,
durability, participants, redundantBefore, durableBefore);
+ if (this.cleanup != null && this.cleanup.compareTo(cleanup) > 0)
+ cleanup = this.cleanup;
+ return cleanup;
}
// TODO (expected): avoid allocating new builder
public Builder maybeCleanup(Cleanup cleanup)
{
- // Do not have txnId in selected SSTables; remove
if (saveStatus() == null)
- return null;
+ return this;
switch (cleanup)
{
@@ -447,19 +463,23 @@ public class SavedCommand
return null;
case EXPUNGE_PARTIAL:
- return expungePartial();
+ return expungePartial(cleanup, saveStatus, true);
+
case VESTIGIAL:
case INVALIDATE:
+ return saveStatusOnly();
+
case TRUNCATE_WITH_OUTCOME:
case TRUNCATE:
- return saveStatusOnly();
+ return expungePartial(cleanup, cleanup.appliesIfNot,
cleanup == TRUNCATE_WITH_OUTCOME);
+
case NO:
return this;
default:
throw new IllegalStateException("Unknown cleanup: " +
cleanup);}
}
- public Builder expungePartial()
+ public Builder expungePartial(Cleanup cleanup, SaveStatus saveStatus,
boolean includeOutcome)
{
Invariants.checkState(txnId != null);
Builder builder = new Builder(txnId);
@@ -467,12 +487,11 @@ public class SavedCommand
builder.count++;
builder.nextCalled = true;
- // TODO: these accesses can be abstracted away
- if (saveStatus != null)
- {
- builder.flags = setFieldChanged(Fields.SAVE_STATUS,
builder.flags);
- builder.saveStatus = saveStatus;
- }
+ Invariants.checkState(saveStatus != null);
+ builder.flags = setFieldChanged(Fields.SAVE_STATUS, builder.flags);
+ builder.saveStatus = saveStatus;
+ builder.flags = setFieldChanged(Fields.CLEANUP, builder.flags);
+ builder.cleanup = cleanup;
if (executeAt != null)
{
builder.flags = setFieldChanged(Fields.EXECUTE_AT,
builder.flags);
@@ -488,6 +507,11 @@ public class SavedCommand
builder.flags = setFieldChanged(Fields.PARTICIPANTS,
builder.flags);
builder.participants = participants;
}
+ if (includeOutcome && writes != null) // test the source's writes: nothing above assigns builder.writes
+ {
+ builder.flags = setFieldChanged(Fields.WRITES, builder.flags);
+ builder.writes = writes;
+ }
return builder;
}
@@ -554,6 +578,9 @@ public class SavedCommand
if (getFieldChanged(Fields.WRITES, flags) &&
!getFieldIsNull(Fields.WRITES, flags))
CommandSerializers.writes.serialize(writes(), out,
userVersion);
+
+ if (getFieldChanged(Fields.CLEANUP, flags))
+ out.writeByte(cleanup.ordinal());
}
@@ -682,6 +709,13 @@ public class SavedCommand
else
writes = CommandSerializers.writes.deserialize(in,
userVersion);
}
+
+ if (getFieldChanged(Fields.CLEANUP, flags))
+ {
+ Cleanup newCleanup = Cleanup.forOrdinal(in.readByte());
+ if (cleanup == null || newCleanup.compareTo(cleanup) > 0)
+ cleanup = newCleanup;
+ }
}
public void forceResult(Result newValue)
diff --git
a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
index 62cda848a6..2c2867aa65 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
@@ -269,24 +269,23 @@ public abstract class AsyncOperation<R> extends
AsyncChains.Head<R> implements R
}
}
- commandStore.completeOperation(safeStore);
- context.releaseResources(commandStore);
- state(COMPLETING);
+ boolean flushed = false;
if (diffs != null || safeStore.fieldUpdates() != null)
{
Runnable onFlush = () -> finish(result, null);
if (safeStore.fieldUpdates() != null)
- {
- if (diffs != null)
- appendCommands(diffs, null);
-
commandStore.persistFieldUpdates(safeStore.fieldUpdates(), onFlush);
- }
- else
- {
+
commandStore.persistFieldUpdates(safeStore.fieldUpdates(), diffs == null ?
onFlush : null);
+ if (diffs != null)
appendCommands(diffs, onFlush);
- }
- return false;
+ flushed = true;
}
+
+ commandStore.completeOperation(safeStore);
+ context.releaseResources(commandStore);
+ state(COMPLETING);
+ if (flushed)
+ return false;
+
case COMPLETING:
finish(result, null);
case FINISHED:
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java
index 483f7e4f37..fa93afa2ae 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java
@@ -47,9 +47,9 @@ import org.apache.cassandra.distributed.test.TestBaseImpl;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.accord.AccordCommandStore;
import org.apache.cassandra.service.accord.AccordConfigurationService;
import
org.apache.cassandra.service.accord.AccordConfigurationService.EpochSnapshot;
+import org.apache.cassandra.service.accord.AccordSafeCommandStore;
import org.apache.cassandra.service.accord.AccordService;
import org.apache.cassandra.service.accord.api.PartitionKey;
import org.apache.cassandra.streaming.StreamManager;
@@ -271,9 +271,9 @@ public class AccordBootstrapTest extends TestBaseImpl
});
awaitUninterruptiblyAndRethrow(service().node().commandStores().forEach(safeStore
-> {
- AccordCommandStore commandStore = (AccordCommandStore)
safeStore.commandStore();
- Assert.assertEquals(Timestamp.NONE,
getOnlyElement(commandStore.bootstrapBeganAt().keySet()));
- Assert.assertEquals(Timestamp.NONE,
getOnlyElement(commandStore.safeToRead().keySet()));
+ AccordSafeCommandStore ss = (AccordSafeCommandStore)
safeStore;
+ Assert.assertEquals(Timestamp.NONE,
getOnlyElement(ss.bootstrapBeganAt().keySet()));
+ Assert.assertEquals(Timestamp.NONE,
getOnlyElement(ss.safeToReadAt().keySet()));
//
// Assert.assertTrue(commandStore.maxBootstrapEpoch() >
0);
//
Assert.assertTrue(commandStore.bootstrapBeganAt().isEmpty());
@@ -316,17 +316,17 @@ public class AccordBootstrapTest extends TestBaseImpl
awaitUninterruptiblyAndRethrow(service().node().commandStores().forEach(safeStore
-> {
if
(safeStore.ranges().currentRanges().contains(partitionKey))
{
- AccordCommandStore commandStore =
(AccordCommandStore) safeStore.commandStore();
-
Assert.assertFalse(commandStore.bootstrapBeganAt().isEmpty());
-
Assert.assertFalse(commandStore.safeToRead().isEmpty());
+ AccordSafeCommandStore ss =
(AccordSafeCommandStore) safeStore;
+
Assert.assertFalse(ss.bootstrapBeganAt().isEmpty());
+
Assert.assertFalse(ss.safeToReadAt().isEmpty());
- Assert.assertEquals(1,
commandStore.bootstrapBeganAt().entrySet().stream()
+ Assert.assertEquals(1,
ss.bootstrapBeganAt().entrySet().stream()
.filter(entry -> entry.getValue().contains(partitionKey))
.map(entry
-> {
Assert.assertTrue(entry.getKey().compareTo(Timestamp.NONE) > 0);
return
entry;
}).count());
- Assert.assertEquals(1,
commandStore.safeToRead().entrySet().stream()
+ Assert.assertEquals(1,
ss.safeToReadAt().entrySet().stream()
.filter(entry -> entry.getValue().contains(partitionKey))
.map(entry
-> {
Assert.assertTrue(entry.getKey().compareTo(Timestamp.NONE) > 0);
@@ -458,17 +458,17 @@ public class AccordBootstrapTest extends TestBaseImpl
safeStore -> {
if
(!safeStore.ranges().allAt(preMove).contains(partitionKey))
{
- AccordCommandStore commandStore =
(AccordCommandStore) safeStore.commandStore();
-
Assert.assertFalse(commandStore.bootstrapBeganAt().isEmpty());
-
Assert.assertFalse(commandStore.safeToRead().isEmpty());
+ AccordSafeCommandStore ss =
(AccordSafeCommandStore) safeStore;
+
Assert.assertFalse(ss.bootstrapBeganAt().isEmpty());
+
Assert.assertFalse(ss.safeToReadAt().isEmpty());
- Assert.assertEquals(1,
commandStore.bootstrapBeganAt().entrySet().stream()
+ Assert.assertEquals(1,
ss.bootstrapBeganAt().entrySet().stream()
.filter(entry -> entry.getValue().contains(partitionKey))
.map(entry -> {
Assert.assertTrue(entry.getKey().compareTo(Timestamp.NONE) > 0);
return entry;
}).count());
- Assert.assertEquals(1,
commandStore.safeToRead().entrySet().stream()
+ Assert.assertEquals(1,
ss.safeToReadAt().entrySet().stream()
.filter(entry -> entry.getValue().contains(partitionKey))
.map(entry -> {
Assert.assertTrue(entry.getKey().compareTo(Timestamp.NONE) > 0);
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
index 48f8903577..d9315cf2c7 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
@@ -38,7 +38,6 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.config.AccordSpec;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.ICoordinator;
@@ -85,7 +84,7 @@ public class AccordLoadTest extends AccordTestBase
cluster.forEach(i -> i.runOnInstance(() -> {
((AccordService)
AccordService.instance()).journal().compactor().updateCompactionPeriod(1,
SECONDS);
- ((AccordSpec.JournalSpec)((AccordService)
AccordService.instance()).journal().configuration()).segmentSize = 128 << 10;
+// ((AccordSpec.JournalSpec)((AccordService)
AccordService.instance()).journal().configuration()).segmentSize = 128 << 10;
}));
ICoordinator coordinator = cluster.coordinator(1);
@@ -162,14 +161,23 @@ public class AccordLoadTest extends AccordTestBase
{
nextCompactionAt += compactionInterval;
System.out.println("compacting accord...");
- cluster.forEach(i -> i.nodetool("compact",
"system_accord.journal"));
+ cluster.forEach(i -> {
+ i.nodetool("compact", "system_accord.journal");
+ i.runOnInstance(() -> {
+ ((AccordService)
AccordService.instance()).journal().checkAllCommands();
+ });
+ });
+
}
if ((nextFlushAt -= batchSize) <= 0)
{
nextFlushAt += flushInterval;
System.out.println("flushing journal...");
- cluster.forEach(i -> i.runOnInstance(() ->
((AccordService)
AccordService.instance()).journal().closeCurrentSegmentForTestingIfNonEmpty()));
+ cluster.forEach(i -> i.runOnInstance(() -> {
+ ((AccordService)
AccordService.instance()).journal().closeCurrentSegmentForTestingIfNonEmpty();
+ ((AccordService)
AccordService.instance()).journal().checkAllCommands();
+ }));
}
final Date date = new Date();
diff --git
a/test/distributed/org/apache/cassandra/service/accord/AccordJournalCompactionTest.java
b/test/distributed/org/apache/cassandra/service/accord/AccordJournalCompactionTest.java
index da1b9cea10..6229e8148f 100644
---
a/test/distributed/org/apache/cassandra/service/accord/AccordJournalCompactionTest.java
+++
b/test/distributed/org/apache/cassandra/service/accord/AccordJournalCompactionTest.java
@@ -141,22 +141,22 @@ public class AccordJournalCompactionTest
{
timestamp = timestamp.next();
AccordSafeCommandStore.FieldUpdates updates = new
AccordSafeCommandStore.FieldUpdates();
- updates.durableBefore = durableBeforeGen.next(rs);
+ updates.addDurableBefore = durableBeforeGen.next(rs);
// TODO: improve redundant before generator and re-enable
// updates.redundantBefore = redundantBeforeGen.next(rs);
- updates.safeToRead = safeToReadGen.next(rs);
- updates.rangesForEpoch = rangesForEpochGen.next(rs);
- updates.historicalTransactions =
historicalTransactionsGen.next(rs);
+ updates.newSafeToRead = safeToReadGen.next(rs);
+ updates.newRangesForEpoch = rangesForEpochGen.next(rs);
+ updates.addHistoricalTransactions =
historicalTransactionsGen.next(rs);
journal.persistStoreState(1, updates, null);
- redundantBeforeAccumulator.update(updates.redundantBefore);
- durableBeforeAccumulator.update(updates.durableBefore);
- if (updates.bootstrapBeganAt != null)
-
bootstrapBeganAtAccumulator.update(updates.bootstrapBeganAt);
- safeToReadAccumulator.update(updates.safeToRead);
- rangesForEpochAccumulator.update(updates.rangesForEpoch);
-
historicalTransactionsAccumulator.update(updates.historicalTransactions);
+ redundantBeforeAccumulator.update(updates.addRedundantBefore);
+ durableBeforeAccumulator.update(updates.addDurableBefore);
+ if (updates.newBootstrapBeganAt != null)
+
bootstrapBeganAtAccumulator.update(updates.newBootstrapBeganAt);
+ safeToReadAccumulator.update(updates.newSafeToRead);
+ rangesForEpochAccumulator.update(updates.newRangesForEpoch);
+
historicalTransactionsAccumulator.update(updates.addHistoricalTransactions);
if (i % 100 == 0)
journal.closeCurrentSegmentForTestingIfNonEmpty();
diff --git
a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
index 57ef025061..a329be5864 100644
---
a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
+++
b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
@@ -420,9 +420,12 @@ public class CompactionAccordIteratorsTest
Int2ObjectHashMap<RedundantBefore> redundantBefores = new
Int2ObjectHashMap<>();
if (redundantBefore != null)
redundantBefores.put(commandStore.id(), redundantBefore);
+ Int2ObjectHashMap<DurableBefore> durableBefores = new
Int2ObjectHashMap<>();
+ if (durableBefore != null)
+ durableBefores.put(commandStore.id(), durableBefore);
Int2ObjectHashMap<CommandStores.RangesForEpoch> rangesForEpochs = new
Int2ObjectHashMap<>();
rangesForEpochs.put(commandStore.id(),
commandStore.unsafeRangesForEpoch());
- when(mockAccordService.getCompactionInfo()).thenReturn(new
IAccordService.CompactionInfo(redundantBefores, rangesForEpochs,
durableBefore));
+ when(mockAccordService.getCompactionInfo()).thenReturn(new
IAccordService.CompactionInfo(redundantBefores, rangesForEpochs,
durableBefores));
return mockAccordService;
}
diff --git
a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
index b6fca2e9cc..e41e4ca579 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
@@ -159,6 +159,7 @@ public class AccordCommandStoreTest
{
AtomicLong clock = new AtomicLong(0);
AccordCommandStore commandStore =
createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
+// SafeCommandStore safeStore =
Timestamp maxTimestamp = timestamp(1, clock.incrementAndGet(), 1);
PartialTxn txn = createPartialTxn(1);
@@ -172,10 +173,10 @@ public class AccordCommandStoreTest
AccordSafeTimestampsForKey tfk = new
AccordSafeTimestampsForKey(loaded(key, null));
tfk.initialize();
- TimestampsForKeys.updateLastExecutionTimestamps(commandStore, tfk,
txnId1, txnId1, true);
+ TimestampsForKeys.updateLastExecutionTimestamps(null, tfk, txnId1,
txnId1, true);
Assert.assertEquals(txnId1.hlc(),
AccordSafeTimestampsForKey.timestampMicrosFor(tfk.current(), txnId1, true));
- TimestampsForKeys.updateLastExecutionTimestamps(commandStore, tfk,
txnId2, txnId2, true);
+ TimestampsForKeys.updateLastExecutionTimestamps(null, tfk, txnId2,
txnId2, true);
Assert.assertEquals(txnId2.hlc(),
AccordSafeTimestampsForKey.timestampMicrosFor(tfk.current(), txnId2, true));
Assert.assertEquals(txnId2, tfk.current().lastExecutedTimestamp());
diff --git a/test/unit/org/apache/cassandra/service/accord/MockJournal.java
b/test/unit/org/apache/cassandra/service/accord/MockJournal.java
index 24b911a3ff..dd7377ab58 100644
--- a/test/unit/org/apache/cassandra/service/accord/MockJournal.java
+++ b/test/unit/org/apache/cassandra/service/accord/MockJournal.java
@@ -137,18 +137,18 @@ public class MockJournal implements IJournal
public void persistStoreState(int store,
AccordSafeCommandStore.FieldUpdates fieldUpdates, Runnable onFlush)
{
FieldUpdates updates = fieldUpdates(store);
- if (fieldUpdates.redundantBefore != null)
-
updates.redundantBeforeAccumulator.update(fieldUpdates.redundantBefore);
- if (fieldUpdates.durableBefore != null)
-
updates.durableBeforeAccumulator.update(fieldUpdates.durableBefore);
- if (fieldUpdates.bootstrapBeganAt != null)
-
updates.bootstrapBeganAtAccumulator.update(fieldUpdates.bootstrapBeganAt);
- if (fieldUpdates.safeToRead != null)
- updates.safeToReadAccumulator.update(fieldUpdates.safeToRead);
- if (fieldUpdates.rangesForEpoch != null)
-
updates.rangesForEpochAccumulator.update(fieldUpdates.rangesForEpoch);
- if (fieldUpdates.historicalTransactions != null)
-
updates.historicalTransactionsAccumulator.update(fieldUpdates.historicalTransactions);
+ if (fieldUpdates.addRedundantBefore != null)
+
updates.redundantBeforeAccumulator.update(fieldUpdates.addRedundantBefore);
+ if (fieldUpdates.addDurableBefore != null)
+
updates.durableBeforeAccumulator.update(fieldUpdates.addDurableBefore);
+ if (fieldUpdates.newBootstrapBeganAt != null)
+
updates.bootstrapBeganAtAccumulator.update(fieldUpdates.newBootstrapBeganAt);
+ if (fieldUpdates.newSafeToRead != null)
+ updates.safeToReadAccumulator.update(fieldUpdates.newSafeToRead);
+ if (fieldUpdates.newRangesForEpoch != null)
+
updates.rangesForEpochAccumulator.update(fieldUpdates.newRangesForEpoch);
+ if (fieldUpdates.addHistoricalTransactions != null)
+
updates.historicalTransactionsAccumulator.update(fieldUpdates.addHistoricalTransactions);
onFlush.run();
}
diff --git
a/test/unit/org/apache/cassandra/service/accord/SavedCommandTest.java
b/test/unit/org/apache/cassandra/service/accord/SavedCommandTest.java
index 5627029644..99963c3af0 100644
--- a/test/unit/org/apache/cassandra/service/accord/SavedCommandTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/SavedCommandTest.java
@@ -132,6 +132,7 @@ public class SavedCommandTest
SoftAssertions checks = new SoftAssertions();
for (Fields field : missing)
{
+ if (field == Fields.CLEANUP) continue;
checks.assertThat(SavedCommand.getFieldChanged(field, flags))
.describedAs("field %s changed", field)
.isFalse();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]