This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new 4abca1c39e Fix: - Accord Journal purging was disabled - remove
unique_id from schema keyspace - avoid String.format in Compactor hot path -
avoid string concatenation on hot path; improve segment compactor partition
build efficiency - Partial compaction should update records in place to ensure
truncation of discontiguous compactions do not lead to an incorrect field
version being used - StoreParticipants.touches behaviour for RX was
erroneously modified; should touch all no [...]
4abca1c39e is described below
commit 4abca1c39e7be92405bb24ac308dd937f2645139
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Thu Mar 13 11:55:29 2025 +0000
Fix:
- Accord Journal purging was disabled
- remove unique_id from schema keyspace
- avoid String.format in Compactor hot path
- avoid string concatenation on hot path; improve segment compactor
partition build efficiency
- Partial compaction should update records in place to ensure truncation
of discontiguous compactions do not lead to an incorrect field version being
used
- StoreParticipants.touches behaviour for RX was erroneously modified;
should touch all non-redundant ranges including those no longer owned
- SetShardDurable should correctly set DurableBefore Majority/Universal
based on the Durability parameter
- fix erroneous prunedBefore invariant
- Journal compaction should not rewrite fields shadowed by a newer record
- Don't save updates to ERASED commands
- Simplify CommandChange.getFlags
- fix handling of Durability for Invalidated
- Don't use ApplyAt for GC_BEFORE with partial input, as might be a
saveStatus >= ApplyAtKnown but with executeAt < ApplyAtKnown
patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20441
---
modules/accord | 2 +-
.../org/apache/cassandra/config/AccordSpec.java | 6 +-
.../db/compaction/CompactionIterator.java | 377 ++++++++++++++-------
.../apache/cassandra/schema/SchemaKeyspace.java | 10 +-
.../accord/AbstractAccordSegmentCompactor.java | 56 +--
.../service/accord/AccordCommandStore.java | 6 +-
.../cassandra/service/accord/AccordJournal.java | 37 +-
.../accord/AccordJournalValueSerializers.java | 41 ++-
.../cassandra/service/accord/AccordKeyspace.java | 2 +
.../cassandra/service/accord/AccordService.java | 2 +-
.../accord/journal/AccordTopologyUpdate.java | 11 +-
.../cassandra/utils/NativeSSTableLoaderClient.java | 6 +-
.../service/accord/AccordJournalBurnTest.java | 99 +++---
.../accord/AccordJournalCompactionTest.java | 5 +-
.../compaction/CompactionAccordIteratorsTest.java | 6 +-
.../accord/AccordConfigurationServiceTest.java | 2 +-
.../service/accord/AccordJournalOrderTest.java | 4 +-
.../service/accord/AccordJournalTestParams.java | 77 -----
.../cassandra/service/accord/AccordTestUtils.java | 2 +-
.../service/accord/CommandChangeTest.java | 10 +-
.../accord/SimulatedAccordCommandStore.java | 7 +-
21 files changed, 451 insertions(+), 317 deletions(-)
diff --git a/modules/accord b/modules/accord
index c7379e12bd..0a5446a365 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit c7379e12bd8f8732004cb77264801fe157af1dbe
+Subproject commit 0a5446a365fe7eb1a15b6a2d0d72f9475c51bc47
diff --git a/src/java/org/apache/cassandra/config/AccordSpec.java
b/src/java/org/apache/cassandra/config/AccordSpec.java
index b81792df0c..a85437624f 100644
--- a/src/java/org/apache/cassandra/config/AccordSpec.java
+++ b/src/java/org/apache/cassandra/config/AccordSpec.java
@@ -158,10 +158,10 @@ public class AccordSpec
public volatile DurationSpec.IntSecondsBound fast_path_update_delay = null;
public volatile DurationSpec.IntSecondsBound gc_delay = new
DurationSpec.IntSecondsBound("5m");
- public volatile int shard_durability_target_splits = 128;
+ public volatile int shard_durability_target_splits = 16;
public volatile DurationSpec.IntSecondsBound durability_txnid_lag = new
DurationSpec.IntSecondsBound(5);
- public volatile DurationSpec.IntSecondsBound shard_durability_cycle = new
DurationSpec.IntSecondsBound(15, TimeUnit.MINUTES);
- public volatile DurationSpec.IntSecondsBound global_durability_cycle = new
DurationSpec.IntSecondsBound(10, TimeUnit.MINUTES);
+ public volatile DurationSpec.IntSecondsBound shard_durability_cycle = new
DurationSpec.IntSecondsBound(5, TimeUnit.MINUTES);
+ public volatile DurationSpec.IntSecondsBound global_durability_cycle = new
DurationSpec.IntSecondsBound(5, TimeUnit.MINUTES);
public enum TransactionalRangeMigration
{
diff --git
a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index ee236082ac..eeb9f9c340 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.compaction;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -27,8 +28,8 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.LongPredicate;
import java.util.function.Supplier;
-import javax.annotation.Nonnull;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Ordering;
import org.slf4j.Logger;
@@ -39,6 +40,9 @@ import accord.local.DurableBefore;
import accord.local.RedundantBefore;
import accord.utils.Invariants;
import accord.utils.UnhandledEnum;
+import accord.utils.btree.BTree;
+import accord.utils.btree.BulkIterator;
+import accord.utils.btree.UpdateFunction;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.db.AbstractCompactionController;
@@ -57,6 +61,9 @@ import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.partitions.PurgeFunction;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.BufferCell;
+import org.apache.cassandra.db.rows.ColumnData;
import org.apache.cassandra.db.rows.RangeTombstoneBoundMarker;
import org.apache.cassandra.db.rows.RangeTombstoneMarker;
import org.apache.cassandra.db.rows.Row;
@@ -82,8 +89,9 @@ import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.accord.AccordJournal;
-import org.apache.cassandra.service.accord.AccordJournalValueSerializers;
+import
org.apache.cassandra.service.accord.AccordJournalValueSerializers.FlyweightImage;
import
org.apache.cassandra.service.accord.AccordJournalValueSerializers.FlyweightSerializer;
import org.apache.cassandra.service.accord.AccordKeyspace;
import
org.apache.cassandra.service.accord.AccordKeyspace.CommandsForKeyAccessor;
@@ -92,7 +100,6 @@ import org.apache.cassandra.service.accord.IAccordService;
import org.apache.cassandra.service.accord.IAccordService.AccordCompactionInfo;
import
org.apache.cassandra.service.accord.IAccordService.AccordCompactionInfos;
import org.apache.cassandra.service.accord.JournalKey;
-import org.apache.cassandra.service.accord.api.AccordAgent;
import org.apache.cassandra.service.accord.api.TokenKey;
import org.apache.cassandra.service.accord.journal.AccordTopologyUpdate;
import org.apache.cassandra.service.accord.serializers.Version;
@@ -158,7 +165,7 @@ public class CompactionIterator extends
CompactionInfo.Holder implements Unfilte
public CompactionIterator(OperationType type, List<ISSTableScanner>
scanners, AbstractCompactionController controller, long nowInSec, TimeUUID
compactionId)
{
- this(type, scanners, controller, nowInSec, compactionId,
ActiveCompactionsTracker.NOOP, null, AccordService::instance);
+ this(type, scanners, controller, nowInSec, compactionId,
ActiveCompactionsTracker.NOOP, null);
}
public CompactionIterator(OperationType type,
@@ -170,7 +177,7 @@ public class CompactionIterator extends
CompactionInfo.Holder implements Unfilte
TopPartitionTracker.Collector
topPartitionCollector)
{
this(type, scanners, controller, nowInSec, compactionId,
activeCompactions, topPartitionCollector,
- AccordService::instance);
+ AccordService.isSetup() ? AccordService.instance() : null);
}
public CompactionIterator(OperationType type,
@@ -180,7 +187,23 @@ public class CompactionIterator extends
CompactionInfo.Holder implements Unfilte
TimeUUID compactionId,
ActiveCompactionsTracker activeCompactions,
TopPartitionTracker.Collector
topPartitionCollector,
- @Nonnull Supplier<IAccordService> accordService)
+ IAccordService accord)
+ {
+ this(type, scanners, controller, nowInSec, compactionId,
activeCompactions, topPartitionCollector,
+ () -> accord.getCompactionInfo(),
+ () ->
Version.fromVersion(accord.journalConfiguration().userVersion()));
+ }
+
+ @VisibleForTesting
+ public CompactionIterator(OperationType type,
+ List<ISSTableScanner> scanners,
+ AbstractCompactionController controller,
+ long nowInSec,
+ TimeUUID compactionId,
+ ActiveCompactionsTracker activeCompactions,
+ TopPartitionTracker.Collector
topPartitionCollector,
+ Supplier<AccordCompactionInfos> compactionInfos,
+ Supplier<Version> accordVersion)
{
this.controller = controller;
this.type = type;
@@ -206,13 +229,13 @@ public class CompactionIterator extends
CompactionInfo.Holder implements Unfilte
if (topPartitionCollector != null) // need to count tombstones before
they are purged
merged = Transformation.apply(merged, new
TopPartitionTracker.TombstoneCounter(topPartitionCollector, nowInSec));
merged = Transformation.apply(merged, new GarbageSkipper(controller));
- Transformation<UnfilteredRowIterator> purger = purger(controller.cfs,
accordService);
+ Transformation<UnfilteredRowIterator> purger = purger(controller.cfs,
compactionInfos, accordVersion);
merged = Transformation.apply(merged, purger);
merged = DuplicateRowChecker.duringCompaction(merged, type);
compacted = Transformation.apply(merged, new
AbortableUnfilteredPartitionTransformation(this));
}
- private Transformation<UnfilteredRowIterator> purger(ColumnFamilyStore
cfs, Supplier<IAccordService> accordService)
+ private Transformation<UnfilteredRowIterator> purger(ColumnFamilyStore
cfs, Supplier<AccordCompactionInfos> compactionInfos, Supplier<Version> version)
{
if (isPaxos(cfs) && paxosStatePurging() != legacy)
return new PaxosPurger();
@@ -222,9 +245,9 @@ public class CompactionIterator extends
CompactionInfo.Holder implements Unfilte
return new Purger(controller, nowInSec);
if (isAccordJournal(cfs))
- return new AccordJournalPurger(accordService, cfs);
+ return new AccordJournalPurger(compactionInfos.get(),
version.get(), cfs);
if (isAccordCommandsForKey(cfs))
- return new AccordCommandsForKeyPurger(AccordKeyspace.CFKAccessor,
accordService);
+ return new AccordCommandsForKeyPurger(AccordKeyspace.CFKAccessor,
compactionInfos);
throw new IllegalArgumentException("Unhandled accord table: " +
cfs.keyspace.getName() + '.' + cfs.name);
}
@@ -792,10 +815,10 @@ public class CompactionIterator extends
CompactionInfo.Holder implements Unfilte
int storeId;
TokenKey tokenKey;
- AccordCommandsForKeyPurger(CommandsForKeyAccessor accessor,
Supplier<IAccordService> accordService)
+ AccordCommandsForKeyPurger(CommandsForKeyAccessor accessor,
Supplier<AccordCompactionInfos> compactionInfos)
{
this.accessor = accessor;
- this.compactionInfos = accordService.get().getCompactionInfo();
+ this.compactionInfos = compactionInfos.get();
}
protected void beginPartition(UnfilteredRowIterator partition)
@@ -836,31 +859,21 @@ public class CompactionIterator extends
CompactionInfo.Holder implements Unfilte
final AccordCompactionInfos infos;
final ColumnMetadata recordColumn;
final ColumnMetadata versionColumn;
- final AccordService service;
- final AccordAgent agent;
- AccordCompactionInfo info;
- JournalKey key = null;
- Object builder = null;
- FlyweightSerializer<Object, Object> serializer = null;
+ JournalKey key;
+ AccordRowCompactor<?> compactor;
// Initialize topology serializer during compaction to avoid
deserializing redundant epochs
- FlyweightSerializer<AccordTopologyUpdate, Object> topologySerializer;
- Object[] highestClustering = null;
+ FlyweightSerializer<AccordTopologyUpdate, FlyweightImage>
topologySerializer;
final Version userVersion;
- long lastDescriptor = -1;
- int lastOffset = -1;
- public AccordJournalPurger(Supplier<IAccordService> serviceSupplier,
ColumnFamilyStore cfs)
+ public AccordJournalPurger(AccordCompactionInfos compactionInfos,
Version version, ColumnFamilyStore cfs)
{
- service = (AccordService) serviceSupplier.get();
- // TODO: test serialization version logic
- userVersion =
Version.fromVersion(service.journalConfiguration().userVersion());
+ this.userVersion = version;
- this.agent = service.agent();
- this.infos = service.getCompactionInfo();
+ this.infos = compactionInfos;
this.recordColumn =
cfs.metadata().getColumn(ColumnIdentifier.getInterned("record", false));
this.versionColumn =
cfs.metadata().getColumn(ColumnIdentifier.getInterned("user_version", false));
- this.topologySerializer =
(FlyweightSerializer<AccordTopologyUpdate, Object>) (FlyweightSerializer) new
AccordTopologyUpdate.AccumulatingSerializer(() -> infos.minEpoch);
+ this.topologySerializer =
(FlyweightSerializer<AccordTopologyUpdate, FlyweightImage>)
(FlyweightSerializer) new AccordTopologyUpdate.AccumulatingSerializer(() ->
infos.minEpoch);
}
@SuppressWarnings("unchecked")
@@ -868,88 +881,36 @@ public class CompactionIterator extends
CompactionInfo.Holder implements Unfilte
protected void beginPartition(UnfilteredRowIterator partition)
{
key =
AccordKeyspace.JournalColumns.getJournalKey(partition.partitionKey());
- serializer =
(AccordJournalValueSerializers.FlyweightSerializer<Object, Object>)
key.type.serializer;
- builder = serializer.mergerFor(key);
- lastDescriptor = -1;
- lastOffset = -1;
- highestClustering = null;
+ if (compactor == null || compactor.serializer !=
key.type.serializer)
+ {
+ switch (key.type)
+ {
+ case COMMAND_DIFF:
+ compactor = new AccordCommandRowCompactor(infos,
userVersion, nowInSec);
+ break;
+ case TOPOLOGY_UPDATE:
+ compactor = new
AccordMergingCompactor(topologySerializer, userVersion);
+ break;
+ default:
+ compactor = new
AccordMergingCompactor(key.type.serializer, userVersion);
+ }
+ }
+ compactor.reset(key);
}
@Override
protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator
partition)
{
- beginPartition(partition);
-
- if (partition.isEmpty())
- return null;
+ if (!partition.hasNext())
+ return partition;
try
{
- List<Row> rows = new ArrayList<>();
+ beginPartition(partition);
while (partition.hasNext())
- {
- Row row = (Row) partition.next();
- rows.add(row);
- collect(row);
- }
-
- if (key.type != JournalKey.Type.COMMAND_DIFF)
- {
- PartitionUpdate.SimpleBuilder newVersion =
PartitionUpdate.simpleBuilder(AccordKeyspace.Journal, partition.partitionKey());
- try (DataOutputBuffer out =
DataOutputBuffer.scratchBuffer.get())
- {
- serializer.reserialize(key, builder, out, userVersion);
- newVersion.row(highestClustering)
- .add("record", out.asNewBuffer())
- .add("user_version", userVersion.version);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
+ collect((Row)partition.next());
- return newVersion.build().unfilteredIterator();
- }
-
- AccordJournal.Builder commandBuilder = (AccordJournal.Builder)
builder;
- if (commandBuilder.isEmpty())
- {
- Invariants.require(rows.isEmpty());
- return partition;
- }
-
- if (info != null && info.commandStoreId != key.commandStoreId)
info = null;
- if (info == null) info = infos.get(key.commandStoreId);
- // TODO (required): should return null only if commandStore
has been removed
- if (info == null)
- return partition;
- DurableBefore durableBefore = infos.durableBefore;
- Cleanup cleanup = commandBuilder.maybeCleanup(PARTIAL, agent,
info.redundantBefore, durableBefore);
- if (cleanup != NO)
- {
- switch (cleanup)
- {
- default: throw new UnhandledEnum(cleanup);
- case EXPUNGE:
- return null;
- case ERASE:
- return
PartitionUpdate.fullPartitionDelete(metadata(), partition.partitionKey(),
Long.MAX_VALUE, nowInSec).unfilteredIterator();
- case TRUNCATE:
- case INVALIDATE:
- case TRUNCATE_WITH_OUTCOME:
- case VESTIGIAL:
- PartitionUpdate.SimpleBuilder newVersion =
PartitionUpdate.simpleBuilder(AccordKeyspace.Journal, partition.partitionKey());
-
- Row.SimpleBuilder rowBuilder =
newVersion.row(rows.get(rows.size() - 1).clustering().getBufferArray());
- rowBuilder.add("record",
commandBuilder.asByteBuffer(userVersion))
- .add("user_version",
userVersion.version);
-
- return newVersion.build().unfilteredIterator();
- }
- }
-
- return PartitionUpdate.multiRowUpdate(AccordKeyspace.Journal,
partition.partitionKey(), rows)
- .unfilteredIterator();
+ return compactor.result(key, partition.partitionKey());
}
catch (UnknownTableException e)
{
@@ -962,47 +923,218 @@ public class CompactionIterator extends
CompactionInfo.Holder implements Unfilte
}
}
+ protected void collect(Row row) throws IOException
+ {
+ updateProgress();
+ ByteBuffer bytes = row.getCell(recordColumn).buffer();
+ Version userVersion =
Version.fromVersion(Int32Type.instance.compose(row.getCell(versionColumn).buffer()));
+ compactor.collect(key, row, bytes, userVersion);
+ }
+ }
+
+ static abstract class AccordRowCompactor<T extends FlyweightImage>
+ {
+ final FlyweightSerializer<Object, T> serializer;
+
+ AccordRowCompactor(FlyweightSerializer<Object, T> serializer)
+ {
+ this.serializer = serializer;
+ }
+
+ abstract void reset(JournalKey key);
+ abstract void collect(JournalKey key, Row row, ByteBuffer bytes,
Version userVersion) throws IOException;
+ abstract UnfilteredRowIterator result(JournalKey journalKey,
DecoratedKey partitionKey) throws IOException;
+ }
+
+ static class AccordMergingCompactor<T extends FlyweightImage> extends
AccordRowCompactor<T>
+ {
+ final T builder;
+ final Version userVersion;
+ Object[] highestClustering;
+ long lastDescriptor;
+ int lastOffset;
+
+ AccordMergingCompactor(FlyweightSerializer<Object, T> serializer,
Version userVersion)
+ {
+ super(serializer);
+ this.builder = serializer.mergerFor();
+ this.userVersion = userVersion;
+ }
+
@Override
- protected Row applyToRow(Row row)
+ void reset(JournalKey key)
{
- return row;
+ builder.reset(key);
+ lastDescriptor = -1;
+ lastOffset = -1;
+ highestClustering = null;
}
- protected void collect(Row row) throws IOException
+ @Override
+ protected void collect(JournalKey key, Row row, ByteBuffer bytes,
Version userVersion) throws IOException
{
- updateProgress();
- ByteBuffer record = row.getCell(recordColumn).buffer();
+ if (highestClustering == null)
+ highestClustering = row.clustering().getBufferArray();
+
long descriptor =
LongType.instance.compose(row.clustering().bufferAt(0));
int offset =
Int32Type.instance.compose(row.clustering().bufferAt(1));
if (lastOffset != -1)
{
Invariants.require(descriptor <= lastDescriptor,
- "Descriptors were accessed out of order:
%d was accessed after %d", descriptor, lastDescriptor);
+ "Descriptors were accessed out of order: %d
was accessed after %d", descriptor, lastDescriptor);
Invariants.require(descriptor != lastDescriptor ||
- offset < lastOffset,
- "Offsets within %s were accessed out of
order: %d was accessed after %s", offset, lastOffset);
+ offset < lastOffset,
+ "Offsets within %d were accessed out of
order: %d was accessed after %s", descriptor, offset, lastOffset);
}
lastDescriptor = descriptor;
lastOffset = offset;
- try (DataInputBuffer in = new DataInputBuffer(record, false))
+ try (DataInputBuffer in = new DataInputBuffer(bytes, false))
{
- Version userVersion =
Version.fromVersion(Int32Type.instance.compose(row.getCell(versionColumn).buffer()));
- if (key.type == JournalKey.Type.TOPOLOGY_UPDATE)
- topologySerializer.deserialize(key, builder, in,
userVersion);
- else
- serializer.deserialize(key, builder, in, userVersion);
- if (highestClustering == null) // we iterate highest to lowest
- highestClustering = row.clustering().getBufferArray();
+ serializer.deserialize(key, builder, in, userVersion);
}
}
@Override
- protected Row applyToStatic(Row row)
+ UnfilteredRowIterator result(JournalKey journalKey, DecoratedKey
partitionKey) throws IOException
{
- checkState(row.isStatic() && row.isEmpty());
- return row;
+ PartitionUpdate.SimpleBuilder newVersion =
PartitionUpdate.simpleBuilder(AccordKeyspace.Journal, partitionKey);
+ try (DataOutputBuffer out = DataOutputBuffer.scratchBuffer.get())
+ {
+ serializer.reserialize(journalKey, builder, out, userVersion);
+ newVersion.row(highestClustering)
+ .add("record", out.asNewBuffer())
+ .add("user_version", userVersion.version);
+ }
+
+ return newVersion.build().unfilteredIterator();
+ }
+ }
+
+ static class AccordCommandRowEntry
+ {
+ final AccordJournal.Builder builder = new AccordJournal.Builder();
+ Row row;
+ boolean modified;
+
+ void init(JournalKey key, Row row, ByteBuffer bytes, Version
userVersion) throws IOException
+ {
+ this.row = row;
+ this.builder.reset(key);
+ try (DataInputBuffer in = new DataInputBuffer(bytes, false))
+ {
+ builder.deserializeNext(in, userVersion);
+ }
+ }
+
+ void clear()
+ {
+ row = null;
+ modified = false;
+ builder.clear();
+ }
+ }
+
+ static class AccordCommandRowCompactor extends
AccordRowCompactor<AccordJournal.Builder>
+ {
+ static final Object[] rowTemplate = BTree.build(BulkIterator.of(new
Object[2]), 2, UpdateFunction.noOp);
+ final long timestamp = ClientState.getTimestamp();
+ final AccordCompactionInfos infos;
+ final Version userVersion;
+ final ColumnData userVersionCell;
+ final long nowInSec;
+
+ final AccordJournal.Builder mainBuilder = new AccordJournal.Builder();
+ final List<AccordCommandRowEntry> entries = new ArrayList<>();
+ final ArrayDeque<AccordCommandRowEntry> reuseEntries = new
ArrayDeque<>();
+ AccordCompactionInfo info;
+
+ AccordCommandRowCompactor(AccordCompactionInfos infos, Version
userVersion, long nowInSec)
+ {
+ super((FlyweightSerializer<Object, AccordJournal.Builder>)
JournalKey.Type.COMMAND_DIFF.serializer);
+ this.infos = infos;
+ this.userVersion = userVersion;
+ this.userVersionCell =
BufferCell.live(AccordKeyspace.JournalColumns.user_version, timestamp,
Int32Type.instance.decompose(userVersion.version));
+ this.nowInSec = nowInSec;
+ }
+
+ @Override
+ void reset(JournalKey key)
+ {
+ mainBuilder.reset(key);
+ reuseEntries.addAll(entries);
+ for (int i = 0; i < entries.size() ; ++i)
+ entries.get(i).clear();
+ entries.clear();
+ }
+
+ @Override
+ void collect(JournalKey key, Row row, ByteBuffer bytes, Version
userVersion) throws IOException
+ {
+ AccordCommandRowEntry e = reuseEntries.pollLast();
+ if (e == null)
+ e = new AccordCommandRowEntry();
+ entries.add(e);
+ e.init(key, row, bytes, userVersion);
+ e.modified |= e.builder.clearSuperseded(false, mainBuilder);
+ mainBuilder.fillInMissingOrCleanup(false, e.builder);
+ }
+
+ @Override
+ UnfilteredRowIterator result(JournalKey journalKey, DecoratedKey
partitionKey) throws IOException
+ {
+ if (mainBuilder.isEmpty())
+ return null;
+
+ if (info != null && info.commandStoreId !=
journalKey.commandStoreId) info = null;
+ if (info == null) info = infos.get(journalKey.commandStoreId);
+ // TODO (required): should return null only if commandStore has
been removed
+ if (info == null)
+ return null;
+
+ DurableBefore durableBefore = infos.durableBefore;
+ Cleanup cleanup = mainBuilder.maybeCleanup(false, PARTIAL,
info.redundantBefore, durableBefore);
+ if (cleanup != NO)
+ {
+ switch (cleanup)
+ {
+ default: throw new UnhandledEnum(cleanup);
+ case EXPUNGE:
+ return null;
+ case ERASE:
+ return
PartitionUpdate.fullPartitionDelete(AccordKeyspace.Journal, partitionKey,
Long.MAX_VALUE, nowInSec).unfilteredIterator();
+
+ case TRUNCATE:
+ case TRUNCATE_WITH_OUTCOME:
+ case INVALIDATE:
+ case VESTIGIAL:
+ for (int i = 0, size = entries.size(); i < size ; i++)
+ {
+ AccordCommandRowEntry entry = entries.get(i);
+ if (i == 0) entry.modified |=
entry.builder.addCleanup(false, cleanup);
+ else entry.modified |=
entry.builder.cleanup(false, cleanup);
+ }
+ }
+ }
+
+ PartitionUpdate.Builder newVersion = new
PartitionUpdate.Builder(AccordKeyspace.Journal, partitionKey,
AccordKeyspace.JournalColumns.regular, entries.size());
+ for (int i = 0, size = entries.size() ; i < size ; ++i)
+ {
+ AccordCommandRowEntry entry = entries.get(i);
+ if (!entry.modified)
+ {
+ newVersion.add(entry.row);
+ }
+ else if (entry.builder.flags() != 0)
+ {
+ Object[] newRow = rowTemplate.clone();
+ newRow[0] =
BufferCell.live(AccordKeyspace.JournalColumns.record, timestamp,
entry.builder.asByteBuffer(userVersion));
+ newRow[1] = userVersionCell;
+ newVersion.add(BTreeRow.create(entry.row.clustering(),
entry.row.primaryKeyLivenessInfo(), entry.row.deletion(), newRow));
+ }
+ }
+ return newVersion.build().unfilteredIterator();
}
}
@@ -1049,9 +1181,8 @@ public class CompactionIterator extends
CompactionInfo.Holder implements Unfilte
private static boolean requiresAccordSpecificPurger(ColumnFamilyStore cfs)
{
return
cfs.getKeyspaceName().equals(SchemaConstants.ACCORD_KEYSPACE_NAME) &&
- ImmutableSet.of(AccordKeyspace.JOURNAL,
- AccordKeyspace.COMMANDS_FOR_KEY)
- .contains(cfs.getTableName());
+ (cfs.getTableName().contains(AccordKeyspace.JOURNAL) ||
+ AccordKeyspace.COMMANDS_FOR_KEY.equals(cfs.getTableName()));
}
private static boolean isAccordTable(ColumnFamilyStore cfs, String name)
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index 40fdf31861..6e597726f6 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -61,6 +61,7 @@ import static
org.apache.cassandra.config.CassandraRelevantProperties.IGNORE_COR
import static
org.apache.cassandra.config.CassandraRelevantProperties.TEST_FLUSH_LOCAL_SCHEMA_CHANGES;
import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal;
+import static org.apache.cassandra.schema.ColumnMetadata.NO_UNIQUE_ID;
import static org.apache.cassandra.schema.SchemaKeyspaceTables.*;
import static org.apache.cassandra.utils.Simulate.With.GLOBAL_CLOCK;
@@ -140,7 +141,6 @@ public final class SchemaKeyspace
+ "clustering_order text,"
+ "column_name_bytes blob,"
+ "kind text,"
- + "unique_id int,"
+ "position int,"
+ "type text,"
+ "PRIMARY KEY ((keyspace_name), table_name, column_name))");
@@ -169,7 +169,6 @@ public final class SchemaKeyspace
+ "dropped_time timestamp,"
+ "kind text,"
+ "type text,"
- + "unique_id int,"
+ "PRIMARY KEY ((keyspace_name), table_name, column_name))");
private static final TableMetadata Triggers =
@@ -704,7 +703,6 @@ public final class SchemaKeyspace
.add("column_name_bytes", column.name.bytes)
.add("kind", column.kind.toString().toLowerCase())
.add("position", column.position())
- .add("unique_id", column.uniqueId)
.add("clustering_order",
column.clusteringOrder().toString().toLowerCase())
.add("type", type.asCQL3Type().toString());
@@ -1089,7 +1087,6 @@ public final class SchemaKeyspace
ColumnMetadata.Kind kind =
ColumnMetadata.Kind.valueOf(row.getString("kind").toUpperCase());
- int uniqueId = row.getInt("unique_id", ColumnMetadata.NO_UNIQUE_ID);
int position = row.getInt("position");
ClusteringOrder order =
ClusteringOrder.valueOf(row.getString("clustering_order").toUpperCase());
@@ -1145,7 +1142,7 @@ public final class SchemaKeyspace
mask = new ColumnMask((ScalarFunction) function, values);
}
- return new ColumnMetadata(keyspace, table, name, type, uniqueId,
position, kind, mask);
+ return new ColumnMetadata(keyspace, table, name, type, NO_UNIQUE_ID,
position, kind, mask);
}
private static Map<ByteBuffer, DroppedColumn> fetchDroppedColumns(String
keyspace, String table)
@@ -1165,7 +1162,6 @@ public final class SchemaKeyspace
String keyspace = row.getString("keyspace_name");
String table = row.getString("table_name");
String name = row.getString("column_name");
- int uniqueId = row.getInt("unique_id", ColumnMetadata.NO_UNIQUE_ID);
/*
* we never store actual UDT names in dropped column types (so that we
can safely drop types if nothing refers to
* them anymore), so before storing dropped columns in schema we
expand UDTs to tuples. See expandUserTypes method.
@@ -1178,7 +1174,7 @@ public final class SchemaKeyspace
assert kind == ColumnMetadata.Kind.REGULAR || kind ==
ColumnMetadata.Kind.STATIC
: "Unexpected dropped column kind: " + kind;
- ColumnMetadata column = new ColumnMetadata(keyspace, table,
ColumnIdentifier.getInterned(name, true), type, uniqueId,
ColumnMetadata.NO_POSITION, kind, null);
+ ColumnMetadata column = new ColumnMetadata(keyspace, table,
ColumnIdentifier.getInterned(name, true), type, NO_UNIQUE_ID,
ColumnMetadata.NO_POSITION, kind, null);
long droppedTime =
TimeUnit.MILLISECONDS.toMicros(row.getLong("dropped_time"));
return new DroppedColumn(column, droppedTime);
}
diff --git
a/src/java/org/apache/cassandra/service/accord/AbstractAccordSegmentCompactor.java
b/src/java/org/apache/cassandra/service/accord/AbstractAccordSegmentCompactor.java
index aa00a3d43d..668736c842 100644
---
a/src/java/org/apache/cassandra/service/accord/AbstractAccordSegmentCompactor.java
+++
b/src/java/org/apache/cassandra/service/accord/AbstractAccordSegmentCompactor.java
@@ -26,10 +26,20 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import accord.utils.Invariants;
+import accord.utils.btree.BTree;
+import accord.utils.btree.BulkIterator;
+import accord.utils.btree.UpdateFunction;
+import org.apache.cassandra.db.BufferClustering;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.db.partitions.PartitionUpdate.SimpleBuilder;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.BufferCell;
+import org.apache.cassandra.db.rows.ColumnData;
+import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.exceptions.UnknownTableException;
import org.apache.cassandra.io.sstable.SSTableTxnWriter;
import org.apache.cassandra.io.util.DataInputBuffer;
@@ -37,6 +47,8 @@ import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.journal.SegmentCompactor;
import org.apache.cassandra.journal.StaticSegment;
import org.apache.cassandra.journal.StaticSegment.KeyOrderReader;
+import org.apache.cassandra.service.ClientState;
+import
org.apache.cassandra.service.accord.AccordJournalValueSerializers.FlyweightImage;
import
org.apache.cassandra.service.accord.AccordJournalValueSerializers.FlyweightSerializer;
import org.apache.cassandra.utils.NoSpamLogger;
@@ -51,12 +63,17 @@ public abstract class AbstractAccordSegmentCompactor<V>
implements SegmentCompac
protected static final Logger logger =
LoggerFactory.getLogger(AbstractAccordSegmentCompactor.class);
private static final NoSpamLogger.NoSpamLogStatement unknownTable =
NoSpamLogger.getStatement(logger, "Unknown (probably dropped) TableId {}
reading {}; skipping record", 1L, MINUTES);
+ static final Object[] rowTemplate = BTree.build(BulkIterator.of(new
Object[2]), 2, UpdateFunction.noOp);
+
protected final Version userVersion;
+ protected final ColumnData userVersionCell;
protected final ColumnFamilyStore cfs;
+ protected final long timestamp = ClientState.getTimestamp();
public AbstractAccordSegmentCompactor(Version userVersion,
ColumnFamilyStore cfs)
{
this.userVersion = userVersion;
+ this.userVersionCell =
BufferCell.live(AccordKeyspace.JournalColumns.user_version, timestamp,
Int32Type.instance.decompose(userVersion.version));
this.cfs = cfs;
}
@@ -96,8 +113,8 @@ public abstract class AbstractAccordSegmentCompactor<V>
implements SegmentCompac
initializeWriter();
JournalKey key = null;
- Object builder = null;
- FlyweightSerializer<Object, Object> serializer = null;
+ FlyweightImage builder = null;
+ FlyweightSerializer<Object, FlyweightImage> serializer = null;
long firstDescriptor = -1, lastDescriptor = -1;
int firstOffset = -1, lastOffset = -1;
try
@@ -110,8 +127,9 @@ public abstract class AbstractAccordSegmentCompactor<V>
implements SegmentCompac
maybeWritePartition(key, builder, serializer,
firstDescriptor, firstOffset);
switchPartitions();
key = reader.key();
- serializer = (FlyweightSerializer<Object, Object>)
key.type.serializer;
- builder = serializer.mergerFor(key);
+ serializer = (FlyweightSerializer<Object, FlyweightImage>)
key.type.serializer;
+ builder = serializer.mergerFor();
+ builder.reset(key);
firstDescriptor = lastDescriptor = -1;
firstOffset = lastOffset = -1;
}
@@ -122,7 +140,10 @@ public abstract class AbstractAccordSegmentCompactor<V>
implements SegmentCompac
do
{
if (builder == null)
- builder = serializer.mergerFor(key);
+ {
+ builder = serializer.mergerFor();
+ builder.reset(key);
+ }
try (DataInputBuffer in = new
DataInputBuffer(reader.record(), false))
{
@@ -178,30 +199,27 @@ public abstract class AbstractAccordSegmentCompactor<V>
implements SegmentCompac
private JournalKey prevKey;
private DecoratedKey prevDecoratedKey;
- private void maybeWritePartition(JournalKey key, Object builder,
FlyweightSerializer<Object, Object> serializer, long descriptor, int offset)
throws IOException
+ private void maybeWritePartition(JournalKey key, FlyweightImage builder,
FlyweightSerializer<Object, FlyweightImage> serializer, long descriptor, int
offset) throws IOException
{
if (builder != null)
{
DecoratedKey decoratedKey =
AccordKeyspace.JournalColumns.decorate(key);
-
- if (prevKey != null)
- {
-
Invariants.requireArgument((decoratedKey.compareTo(prevDecoratedKey) >= 0 ? 1 :
-1) == (JournalKey.SUPPORT.compare(key, prevKey) >= 0 ? 1 : -1),
- String.format("Partition key and
JournalKey didn't have matching order, which may imply a serialization
issue.\n%s (%s)\n%s (%s)",
- key, decoratedKey,
prevKey, prevDecoratedKey));
- }
+ Invariants.requireArgument(prevKey == null ||
((decoratedKey.compareTo(prevDecoratedKey) >= 0 ? 1 : -1) ==
(JournalKey.SUPPORT.compare(key, prevKey) >= 0 ? 1 : -1)),
+ "Partition key and JournalKey didn't
have matching order, which may imply a serialization issue.\n%s (%s)\n%s (%s)",
+ key, decoratedKey, prevKey,
prevDecoratedKey);
prevKey = key;
prevDecoratedKey = decoratedKey;
- SimpleBuilder partitionBuilder =
PartitionUpdate.simpleBuilder(cfs.metadata(), decoratedKey);
+ Object[] rowData = rowTemplate.clone();
try (DataOutputBuffer out = DataOutputBuffer.scratchBuffer.get())
{
serializer.reserialize(key, builder, out, userVersion);
- partitionBuilder.row(descriptor, offset)
- .add("record", out.asNewBuffer())
- .add("user_version", userVersion.version);
+ rowData[0] =
BufferCell.live(AccordKeyspace.JournalColumns.record, timestamp,
out.asNewBuffer());
}
- writer().append(partitionBuilder.build().unfilteredIterator());
+ rowData[1] = userVersionCell;
+ Row row =
BTreeRow.create(BufferClustering.make(LongType.instance.decompose(descriptor),
Int32Type.instance.decompose(offset)), LivenessInfo.EMPTY, Row.Deletion.LIVE,
rowData);
+ PartitionUpdate update =
PartitionUpdate.singleRowUpdate(AccordKeyspace.Journal, decoratedKey, row);
+ writer().append(update.unfilteredIterator());
}
}
}
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index dad05aa7a5..4e8629696a 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -220,9 +220,11 @@ public class AccordCommandStore extends CommandStore
@Override
public void markShardDurable(SafeCommandStore safeStore, TxnId
globalSyncId, Ranges ranges, Status.Durability durability)
{
- store.snapshot(ranges, globalSyncId);
+ if (durability.compareTo(Status.Durability.UniversalOrInvalidated) >=
0)
+ store.snapshot(ranges, globalSyncId);
super.markShardDurable(safeStore, globalSyncId, ranges, durability);
- commandsForRanges.gcBefore(globalSyncId, ranges);
+ if (durability.compareTo(Status.Durability.UniversalOrInvalidated) >=
0)
+ commandsForRanges.gcBefore(globalSyncId, ranges);
}
@Override
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index 0ac5c87807..f4159f5731 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -67,9 +67,9 @@ import org.apache.cassandra.journal.SegmentCompactor;
import org.apache.cassandra.journal.StaticSegment;
import org.apache.cassandra.journal.ValueSerializer;
import org.apache.cassandra.net.MessagingService;
+import
org.apache.cassandra.service.accord.AccordJournalValueSerializers.FlyweightImage;
import
org.apache.cassandra.service.accord.AccordJournalValueSerializers.IdentityAccumulator;
import org.apache.cassandra.service.accord.JournalKey.JournalKeySupport;
-import org.apache.cassandra.service.accord.api.AccordAgent;
import org.apache.cassandra.service.accord.journal.AccordTopologyUpdate;
import org.apache.cassandra.service.accord.serializers.CommandSerializers;
import
org.apache.cassandra.service.accord.serializers.CommandSerializers.ExecuteAtSerializer;
@@ -109,21 +109,19 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
@VisibleForTesting
protected final AccordJournalTable<JournalKey, Object> journalTable;
private final Params params;
- private final AccordAgent agent;
Node node;
enum Status { INITIALIZED, STARTING, REPLAY, STARTED, TERMINATING,
TERMINATED }
private volatile Status status = Status.INITIALIZED;
- public AccordJournal(Params params, AccordAgent agent)
+ public AccordJournal(Params params)
{
- this(params, agent, new
File(DatabaseDescriptor.getAccordJournalDirectory()),
Keyspace.open(AccordKeyspace.metadata().name).getColumnFamilyStore(AccordKeyspace.JOURNAL));
+ this(params, new File(DatabaseDescriptor.getAccordJournalDirectory()),
Keyspace.open(AccordKeyspace.metadata().name).getColumnFamilyStore(AccordKeyspace.JOURNAL));
}
@VisibleForTesting
- public AccordJournal(Params params, AccordAgent agent, File directory,
ColumnFamilyStore cfs)
+ public AccordJournal(Params params, File directory, ColumnFamilyStore cfs)
{
- this.agent = agent;
Version userVersion = Version.fromVersion(params.userVersion());
this.journal = new Journal<>("AccordJournal", directory, params,
JournalKey.SUPPORT,
// In Accord, we are using streaming
serialization, i.e. Reader/Writer interfaces instead of materializing objects
@@ -232,7 +230,7 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
public Command loadCommand(int commandStoreId, TxnId txnId,
RedundantBefore redundantBefore, DurableBefore durableBefore)
{
Builder builder = load(commandStoreId, txnId);
- builder.maybeCleanup(FULL, agent, redundantBefore, durableBefore);
+ builder.maybeCleanup(true, FULL, redundantBefore, durableBefore);
return builder.construct(redundantBefore);
}
@@ -243,7 +241,7 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
if (builder.isEmpty())
return null;
- Cleanup cleanup = builder.shouldCleanup(FULL, node.agent(),
redundantBefore, durableBefore);
+ Cleanup cleanup = builder.shouldCleanup(FULL, redundantBefore,
durableBefore);
switch (cleanup)
{
case VESTIGIAL:
@@ -384,9 +382,9 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
return loadDiffs(commandStoreId, txnId, Load.ALL);
}
- private <BUILDER> BUILDER readAll(JournalKey key)
+ private <BUILDER extends FlyweightImage> BUILDER readAll(JournalKey key)
{
- BUILDER builder = (BUILDER) key.type.serializer.mergerFor(key);
+ BUILDER builder = (BUILDER) key.type.serializer.mergerFor();
// TODO: this can be further improved to avoid allocating lambdas
AccordJournalValueSerializers.FlyweightSerializer<?, BUILDER>
serializer = (AccordJournalValueSerializers.FlyweightSerializer<?, BUILDER>)
key.type.serializer;
// TODO (expected): for those where we store an image, read only the
first entry we find in DESC order
@@ -618,11 +616,16 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
}
}
- public static class Builder extends CommandChange.Builder
+ public static class Builder extends CommandChange.Builder implements
FlyweightImage
{
public Builder()
{
- super(null, Load.ALL);
+ this(Load.ALL);
+ }
+
+ public Builder(Load load)
+ {
+ super(null, load);
}
public Builder(TxnId txnId)
@@ -634,6 +637,12 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
{
super(txnId, load);
}
+
+ public void reset(JournalKey key)
+ {
+ reset(key.id);
+ }
+
public ByteBuffer asByteBuffer(Version userVersion) throws IOException
{
try (DataOutputBuffer out = new DataOutputBuffer())
@@ -728,11 +737,11 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
Invariants.require(txnId != null);
int readFlags = in.readInt();
Invariants.require(readFlags != 0);
- nextCalled = true;
+ hasUpdate = true;
count++;
// batch-apply any new nulls
- setNulls(readFlags);
+ setNulls(false, readFlags);
// iterator sets low 16 bits; low readFlag bits are nulls, so
masking with ~readFlags restricts to non-null changed fields
int iterable = toIterableSetFields(readFlags) & ~readFlags;
for (Field field = nextSetField(iterable) ; field != null; field =
nextSetField(iterable = unsetIterable(field, iterable)))
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java
b/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java
index 25e37e0b0b..e72beaab2c 100644
---
a/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java
+++
b/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java
@@ -34,7 +34,6 @@ import
org.apache.cassandra.service.accord.journal.AccordTopologyUpdate;
import org.apache.cassandra.service.accord.serializers.CommandStoreSerializers;
import org.apache.cassandra.service.accord.serializers.Version;
-import static accord.api.Journal.Load.ALL;
import static accord.local.CommandStores.RangesForEpoch;
// TODO (required): test with large collection values, and perhaps split out
some fields if they have a tendency to grow larger
@@ -42,9 +41,14 @@ import static accord.local.CommandStores.RangesForEpoch;
// TODO (required): versioning
public class AccordJournalValueSerializers
{
- public interface FlyweightSerializer<ENTRY, IMAGE>
+ public interface FlyweightImage
{
- IMAGE mergerFor(JournalKey key);
+ void reset(JournalKey key);
+ }
+
+ public interface FlyweightSerializer<ENTRY, IMAGE extends FlyweightImage>
+ {
+ IMAGE mergerFor();
void serialize(JournalKey key, ENTRY from, DataOutputPlus out, Version
userVersion) throws IOException;
@@ -57,9 +61,9 @@ public class AccordJournalValueSerializers
implements FlyweightSerializer<AccordJournal.Writer, AccordJournal.Builder>
{
@Override
- public AccordJournal.Builder mergerFor(JournalKey journalKey)
+ public AccordJournal.Builder mergerFor()
{
- return new AccordJournal.Builder(journalKey.id, ALL);
+ return new AccordJournal.Builder();
}
@Override
@@ -91,7 +95,7 @@ public class AccordJournalValueSerializers
}
}
- public abstract static class Accumulator<A, V>
+ public abstract static class Accumulator<A, V> implements FlyweightImage
{
protected A accumulated;
@@ -115,10 +119,19 @@ public class AccordJournalValueSerializers
public static class IdentityAccumulator<T> extends Accumulator<T, T>
{
+ final T initial;
boolean hasRead;
public IdentityAccumulator(T initial)
{
super(initial);
+ this.initial = initial;
+ }
+
+ @Override
+ public void reset(JournalKey key)
+ {
+ hasRead = false;
+ accumulated = initial;
}
@Override
@@ -135,7 +148,7 @@ public class AccordJournalValueSerializers
implements FlyweightSerializer<RedundantBefore,
IdentityAccumulator<RedundantBefore>>
{
@Override
- public IdentityAccumulator<RedundantBefore> mergerFor(JournalKey
journalKey)
+ public IdentityAccumulator<RedundantBefore> mergerFor()
{
return new IdentityAccumulator<>(RedundantBefore.EMPTY);
}
@@ -184,6 +197,12 @@ public class AccordJournalValueSerializers
super(DurableBefore.EMPTY);
}
+ @Override
+ public void reset(JournalKey key)
+ {
+ accumulated = DurableBefore.EMPTY;
+ }
+
@Override
protected DurableBefore accumulate(DurableBefore oldValue,
DurableBefore newValue)
{
@@ -194,7 +213,7 @@ public class AccordJournalValueSerializers
public static class DurableBeforeSerializer
implements FlyweightSerializer<DurableBefore, DurableBeforeAccumulator>
{
- public DurableBeforeAccumulator mergerFor(JournalKey journalKey)
+ public DurableBeforeAccumulator mergerFor()
{
return new DurableBeforeAccumulator();
}
@@ -231,7 +250,7 @@ public class AccordJournalValueSerializers
implements FlyweightSerializer<NavigableMap<TxnId, Ranges>,
IdentityAccumulator<NavigableMap<TxnId, Ranges>>>
{
@Override
- public IdentityAccumulator<NavigableMap<TxnId, Ranges>>
mergerFor(JournalKey key)
+ public IdentityAccumulator<NavigableMap<TxnId, Ranges>> mergerFor()
{
return new IdentityAccumulator<>(ImmutableSortedMap.of(TxnId.NONE,
Ranges.EMPTY));
}
@@ -259,7 +278,7 @@ public class AccordJournalValueSerializers
implements FlyweightSerializer<NavigableMap<Timestamp, Ranges>,
IdentityAccumulator<NavigableMap<Timestamp, Ranges>>>
{
@Override
- public IdentityAccumulator<NavigableMap<Timestamp, Ranges>>
mergerFor(JournalKey key)
+ public IdentityAccumulator<NavigableMap<Timestamp, Ranges>> mergerFor()
{
return new
IdentityAccumulator<>(ImmutableSortedMap.of(Timestamp.NONE, Ranges.EMPTY));
}
@@ -287,7 +306,7 @@ public class AccordJournalValueSerializers
implements FlyweightSerializer<RangesForEpoch, Accumulator<RangesForEpoch,
RangesForEpoch>>
{
public static final RangesForEpochSerializer instance = new
RangesForEpochSerializer();
- public IdentityAccumulator<RangesForEpoch> mergerFor(JournalKey key)
+ public IdentityAccumulator<RangesForEpoch> mergerFor()
{
return new IdentityAccumulator<>(null);
}
diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
index fed2683e32..efe9f39b25 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.service.accord;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -552,6 +553,7 @@ public class AccordKeyspace
public static final ColumnMetadata id = getColumn(Journal, "id");
public static final ColumnMetadata record = getColumn(Journal,
"record");
public static final ColumnMetadata user_version = getColumn(Journal,
"user_version");
+ public static final RegularAndStaticColumns regular = new
RegularAndStaticColumns(Columns.NONE, Columns.from(Arrays.asList(record,
user_version)));
public static DecoratedKey decorate(JournalKey key)
{
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 2d2a024e58..aca45679a7 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -296,7 +296,7 @@ public class AccordService implements IAccordService,
Shutdownable
final RequestCallbacks callbacks = new RequestCallbacks(time);
this.scheduler = new AccordScheduler();
this.dataStore = new AccordDataStore();
- this.journal = new
AccordJournal(DatabaseDescriptor.getAccord().journal, agent);
+ this.journal = new
AccordJournal(DatabaseDescriptor.getAccord().journal);
this.configService = new AccordConfigurationService(localId);
this.fastPathCoordinator = AccordFastPathCoordinator.create(localId,
configService);
this.messageSink = new AccordMessageSink(agent, configService,
callbacks);
diff --git
a/src/java/org/apache/cassandra/service/accord/journal/AccordTopologyUpdate.java
b/src/java/org/apache/cassandra/service/accord/journal/AccordTopologyUpdate.java
index 86eff7369e..ff4a6e6ecb 100644
---
a/src/java/org/apache/cassandra/service/accord/journal/AccordTopologyUpdate.java
+++
b/src/java/org/apache/cassandra/service/accord/journal/AccordTopologyUpdate.java
@@ -45,8 +45,6 @@ import
org.apache.cassandra.service.accord.serializers.KeySerializers;
import org.apache.cassandra.service.accord.serializers.TopologySerializers;
import org.apache.cassandra.service.accord.serializers.Version;
-import static
org.apache.cassandra.service.accord.JournalKey.Type.TOPOLOGY_UPDATE;
-
public interface AccordTopologyUpdate
{
Kind kind();
@@ -367,6 +365,12 @@ public interface AccordTopologyUpdate
super(new TreeMap<>());
}
+ @Override
+ public void reset(JournalKey key)
+ {
+ accumulated = new TreeMap<>();
+ }
+
@Override
public void update(AccordTopologyUpdate newValue)
{
@@ -414,9 +418,8 @@ public interface AccordTopologyUpdate
}
@Override
- public Accumulator mergerFor(JournalKey key)
+ public Accumulator mergerFor()
{
- Invariants.require(key.type == TOPOLOGY_UPDATE);
return new Accumulator();
}
diff --git a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
index bc82017d2e..a226861f8f 100644
--- a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
+++ b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
@@ -211,9 +211,8 @@ public class NativeSSTableLoaderClient extends
SSTableLoader.Client
ColumnIdentifier name = new
ColumnIdentifier(row.getBytes("column_name_bytes"),
row.getString("column_name"));
int position = row.getInt("position");
- int uniqueId = row.isNull("unique_id") ? ColumnMetadata.NO_UNIQUE_ID :
row.getInt("unique_id");
org.apache.cassandra.schema.ColumnMetadata.Kind kind =
ColumnMetadata.Kind.valueOf(row.getString("kind").toUpperCase());
- return new ColumnMetadata(keyspace, table, name, type, uniqueId,
position, kind, null);
+ return new ColumnMetadata(keyspace, table, name, type,
ColumnMetadata.NO_UNIQUE_ID, position, kind, null);
}
private static DroppedColumn createDroppedColumnFromRow(Row row, String
keyspace, String table)
@@ -221,8 +220,7 @@ public class NativeSSTableLoaderClient extends
SSTableLoader.Client
String name = row.getString("column_name");
AbstractType<?> type = CQLTypeParser.parse(keyspace,
row.getString("type"), Types.none());
ColumnMetadata.Kind kind =
ColumnMetadata.Kind.valueOf(row.getString("kind").toUpperCase());
- int uniqueId = row.isNull("unique_id") ? ColumnMetadata.NO_UNIQUE_ID :
row.getInt("unique_id");
- ColumnMetadata column = new ColumnMetadata(keyspace, table,
ColumnIdentifier.getInterned(name, true), type, uniqueId,
ColumnMetadata.NO_POSITION, kind, null);
+ ColumnMetadata column = new ColumnMetadata(keyspace, table,
ColumnIdentifier.getInterned(name, true), type, ColumnMetadata.NO_UNIQUE_ID,
ColumnMetadata.NO_POSITION, kind, null);
long droppedTime = row.getTimestamp("dropped_time").getTime();
return new DroppedColumn(column, droppedTime);
}
diff --git
a/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
b/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
index f60026b892..fbf2a9c874 100644
---
a/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
+++
b/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
@@ -43,14 +43,15 @@ import accord.local.CommandStores;
import accord.local.Node;
import accord.primitives.EpochSupplier;
import accord.utils.DefaultRandom;
+import accord.utils.Invariants;
import accord.utils.RandomSource;
import org.apache.cassandra.ServerTestUtils;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.ActiveCompactionsTracker;
import org.apache.cassandra.db.compaction.CompactionController;
import org.apache.cassandra.db.compaction.CompactionIterator;
-import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
import org.apache.cassandra.db.compaction.writers.DefaultCompactionWriter;
@@ -61,11 +62,12 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.journal.SegmentCompactor;
import org.apache.cassandra.journal.StaticSegment;
+import org.apache.cassandra.journal.TestParams;
import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.schema.Tables;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.accord.api.AccordAgent;
import org.apache.cassandra.service.accord.serializers.CommandSerializers;
import org.apache.cassandra.service.accord.serializers.DepsSerializers;
import org.apache.cassandra.service.accord.serializers.KeySerializers;
@@ -163,22 +165,30 @@ public class AccordJournalBurnTest extends BurnTestBase
operations,
10 + random.nextInt(30),
new RandomDelayQueue.Factory(random).get(),
- (node, agent, randomSource) -> {
+ (nodeId, randomSource) -> {
try
{
File directory = new
File(Files.createTempDirectory(Integer.toString(counter.incrementAndGet())));
directory.deleteRecursiveOnExit();
- ColumnFamilyStore cfs =
ks.getColumnFamilyStore("journal_" + node);
+ ColumnFamilyStore cfs =
ks.getColumnFamilyStore("journal_" + nodeId);
cfs.disableAutoCompaction();
- AccordJournal journal = new AccordJournal(new
AccordJournalTestParams()
+ AccordJournal journal = new AccordJournal(new
TestParams()
{
@Override
public int segmentSize()
{
- return 32 * 1024 * 1024;
+ return 1 * 1024 * 1024;
}
- }, new AccordAgent(), directory, cfs)
+ }, directory, cfs)
{
+ @Override
+ public AccordJournal start(Node node)
+ {
+ super.start(node);
+ unsafeSetStarted();
+ return this;
+ }
+
@Override
public void saveCommand(int store, CommandUpdate
update, @Nullable Runnable onFlush)
{
@@ -236,45 +246,48 @@ public class AccordJournalBurnTest extends BurnTestBase
this.journal.closeCurrentSegmentForTestingIfNonEmpty();
this.journal.runCompactorForTesting();
- List<SSTableReader> all = new
ArrayList<>(cfs.getLiveSSTables());
+ Set<SSTableReader> orig =
cfs.getLiveSSTables();
+ List<SSTableReader> all = new
ArrayList<>(orig);
if (all.size() <= 1)
return;
- Set<SSTableReader> sstables = new HashSet<>();
-
- int min, max;
- while (true)
+ Set<SSTableReader> selected = new HashSet<>();
+ int count = all.size();
+ int removeCount = random.nextInt(1, count);
+ while (removeCount-- > 0)
{
- int tmp1 = randomSource.nextInt(0,
all.size());
- int tmp2 = randomSource.nextInt(0,
all.size());
- if (tmp1 != tmp2 && Math.abs(tmp1 - tmp2)
>= 1)
- {
- min = Math.min(tmp1, tmp2);
- max = Math.max(tmp1, tmp2);
- break;
- }
+ int removeIndex = random.nextInt(count);
+ SSTableReader reader =
all.get(removeIndex);
+ if (reader == null)
+ continue;
+ all.set(removeIndex, null);
+ selected.add(reader);
+ --count;
}
- // Random subset
- for (int i = min; i < max; i++)
- sstables.add(all.get(i));
- List<ISSTableScanner> scanners =
sstables.stream().map(SSTableReader::getScanner).collect(Collectors.toList());
+ if (selected.isEmpty())
+ return;
+ List<ISSTableScanner> scanners =
selected.stream().map(SSTableReader::getScanner).collect(Collectors.toList());
Collection<SSTableReader> newSStables;
- try (LifecycleTransaction txn =
cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
- CompactionController controller = new
CompactionController(cfs, sstables, 0);
- CompactionIterator ci = new
CompactionIterator(OperationType.COMPACTION, scanners, controller, 0,
nextTimeUUID()))
+ try (LifecycleTransaction txn =
cfs.getTracker().tryModify(selected, OperationType.COMPACTION);
+ CompactionController controller = new
CompactionController(cfs, selected, 0);
+ CompactionIterator ci = new
CompactionIterator(OperationType.COMPACTION,
+
scanners,
+
controller,
+
0,
+
nextTimeUUID(),
+
ActiveCompactionsTracker.NOOP, null,
+
() -> getCompactionInfo(node, cfs.getTableId()),
+
() -> Version.V1))
{
-
CompactionManager.instance.active.beginCompaction(ci);
- try (CompactionAwareWriter writer =
getCompactionAwareWriter(cfs, cfs.getDirectories(), txn, sstables))
+ try (CompactionAwareWriter writer =
getCompactionAwareWriter(cfs, cfs.getDirectories(), txn, selected))
{
while (ci.hasNext())
- {
writer.append(ci.next());
-
ci.setTargetDirectory(writer.getSStableDirectory().path());
- }
+
ci.setTargetDirectory(writer.getSStableDirectory().path());
// point of no return
newSStables = writer.finish();
}
@@ -282,11 +295,9 @@ public class AccordJournalBurnTest extends BurnTestBase
{
throw new RuntimeException(e);
}
- finally
- {
-
CompactionManager.instance.active.finishCompaction(ci);
- }
}
+
+
Invariants.require(!orig.equals(cfs.getLiveSSTables()));
}
@@ -299,8 +310,6 @@ public class AccordJournalBurnTest extends BurnTestBase
}
};
- journal.start(null);
- journal.unsafeSetStarted();
return journal;
}
catch (Throwable t)
@@ -316,4 +325,18 @@ public class AccordJournalBurnTest extends BurnTestBase
throw SimulationException.wrap(seed, t);
}
}
+
+ public static IAccordService.AccordCompactionInfos getCompactionInfo(Node
node, TableId tableId)
+ {
+ IAccordService.AccordCompactionInfos compactionInfos = new
IAccordService.AccordCompactionInfos(node.durableBefore(),
node.topology().minEpoch());
+ node.commandStores().forEachCommandStore(commandStore -> {
+ compactionInfos.put(commandStore.id(), new
IAccordService.AccordCompactionInfo(commandStore.id(),
+
commandStore.unsafeGetRedundantBefore(),
+
commandStore.unsafeGetRangesForEpoch(),
+
tableId));
+ });
+ return compactionInfos;
+ }
+
+
}
diff --git
a/test/distributed/org/apache/cassandra/service/accord/AccordJournalCompactionTest.java
b/test/distributed/org/apache/cassandra/service/accord/AccordJournalCompactionTest.java
index 6844f58827..241f5062cb 100644
---
a/test/distributed/org/apache/cassandra/service/accord/AccordJournalCompactionTest.java
+++
b/test/distributed/org/apache/cassandra/service/accord/AccordJournalCompactionTest.java
@@ -45,6 +45,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.journal.TestParams;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.accord.api.AccordAgent;
@@ -94,7 +95,7 @@ public class AccordJournalCompactionTest
Gen<NavigableMap<Timestamp, Ranges>> safeToReadGen =
AccordGenerators.safeToReadGen(DatabaseDescriptor.getPartitioner());
Gen<RangesForEpoch> rangesForEpochGen =
AccordGenerators.rangesForEpoch(DatabaseDescriptor.getPartitioner());
- AccordJournal journal = new AccordJournal(new AccordJournalTestParams()
+ AccordJournal journal = new AccordJournal(new TestParams()
{
@Override
public int segmentSize()
@@ -107,7 +108,7 @@ public class AccordJournalCompactionTest
{
return false;
}
- }, new AccordAgent());
+ });
try
{
journal.start(null);
diff --git
a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
index 9691756cc3..0d6d11216f 100644
---
a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
+++
b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
@@ -36,6 +36,7 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import accord.api.Agent;
import accord.api.Key;
import accord.api.Result;
import accord.local.CheckedCommands;
@@ -75,6 +76,7 @@ import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.distributed.shared.WithProperties;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.journal.TestParams;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.TableMetadata;
@@ -265,7 +267,9 @@ public class CompactionAccordIteratorsTest
IAccordService.AccordCompactionInfo compactionInfo = new
IAccordService.AccordCompactionInfo(commandStore.id(), redundantBefore,
commandStore.unsafeGetRangesForEpoch(),
((AccordCommandStore)commandStore).tableId());
IAccordService.AccordCompactionInfos compactionInfos = new
IAccordService.AccordCompactionInfos(durableBefore, 0);
compactionInfos.put(commandStore.id(), compactionInfo);
+ when(mockAccordService.agent()).thenReturn(mock(Agent.class));
when(mockAccordService.getCompactionInfo()).thenReturn(compactionInfos);
+ when(mockAccordService.journalConfiguration()).thenReturn(new
TestParams());
return mockAccordService;
}
@@ -374,7 +378,7 @@ public class CompactionAccordIteratorsTest
nextInputScanners.add(scanners.remove(random.nextInt(scanners.size())));
}
try (CompactionController controller = new
CompactionController(ColumnFamilyStore.getIfExists(ACCORD_KEYSPACE_NAME,
cfs.name), Collections.emptySet(), 0);
- CompactionIterator compactionIterator = new
CompactionIterator(OperationType.COMPACTION, nextInputScanners, controller,
FBUtilities.nowInSeconds(), null, ActiveCompactionsTracker.NOOP, null, () ->
mockAccordService))
+ CompactionIterator compactionIterator = new
CompactionIterator(OperationType.COMPACTION, nextInputScanners, controller,
FBUtilities.nowInSeconds(), null, ActiveCompactionsTracker.NOOP, null,
mockAccordService))
{
while (compactionIterator.hasNext())
{
diff --git
a/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java
b/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java
index b2d9c07ad2..0fabc00b52 100644
---
a/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java
+++
b/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java
@@ -229,7 +229,7 @@ public class AccordConfigurationServiceTest
directory.deleteRecursiveOnExit();
Keyspace ks = Schema.instance.getKeyspaceInstance("system_accord");
ColumnFamilyStore cfs =
ks.getColumnFamilyStore(AccordKeyspace.JOURNAL);
- AccordJournal journal = new AccordJournal(new TestParams(), new
AccordAgent(), directory, cfs);
+ AccordJournal journal = new AccordJournal(new TestParams(), directory,
cfs);
journal.start(null);
journal.unsafeSetStarted();
return journal;
diff --git
a/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java
b/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java
index 0445e3d75e..03666a777b 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java
@@ -43,12 +43,12 @@ import
org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.ByteOrderedPartitioner;
import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.journal.TestParams;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.accord.api.AccordAgent;
import org.apache.cassandra.service.accord.api.TokenKey;
import org.apache.cassandra.service.consensus.TransactionalMode;
import org.apache.cassandra.utils.StorageCompatibilityMode;
@@ -74,7 +74,7 @@ public class AccordJournalOrderTest
{
if (new File(DatabaseDescriptor.getAccordJournalDirectory()).exists())
ServerTestUtils.cleanupDirectory(DatabaseDescriptor.getAccordJournalDirectory());
- AccordJournal accordJournal = new
AccordJournal(AccordJournalTestParams.INSTANCE, new AccordAgent());
+ AccordJournal accordJournal = new AccordJournal(TestParams.INSTANCE);
accordJournal.start(null);
RandomSource randomSource = RandomSource.wrap(new Random(0));
TxnId id1 = AccordGens.txnIds().next(randomSource);
diff --git
a/test/unit/org/apache/cassandra/service/accord/AccordJournalTestParams.java
b/test/unit/org/apache/cassandra/service/accord/AccordJournalTestParams.java
deleted file mode 100644
index c9a14efd7f..0000000000
--- a/test/unit/org/apache/cassandra/service/accord/AccordJournalTestParams.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service.accord;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.cassandra.journal.Params;
-import org.apache.cassandra.service.accord.serializers.Version;
-
-public class AccordJournalTestParams implements Params
-{
- public static final AccordJournalTestParams INSTANCE = new
AccordJournalTestParams();
-
- @Override
- public int segmentSize()
- {
- return 32 << 20;
- }
-
- @Override
- public FailurePolicy failurePolicy()
- {
- return FailurePolicy.STOP;
- }
-
- @Override
- public FlushMode flushMode()
- {
- return FlushMode.GROUP;
- }
-
- @Override
- public boolean enableCompaction()
- {
- return false;
- }
-
- @Override
- public long compactionPeriod(TimeUnit units)
- {
- return units.convert(60, TimeUnit.SECONDS);
- }
-
- @Override
- public long flushPeriod(TimeUnit units)
- {
- return units.convert(1, TimeUnit.SECONDS);
- }
-
- @Override
- public long periodicBlockPeriod(TimeUnit units)
- {
- return units.convert(2, TimeUnit.SECONDS);
- }
-
- @Override
- public int userVersion()
- {
- return Version.LATEST.version;
- }
-}
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
index 0cb57ad9c1..1bb8614ca4 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
@@ -353,7 +353,7 @@ public class AccordTestUtils
ServerTestUtils.cleanupDirectory(DatabaseDescriptor.getAccordJournalDirectory());
AccordSpec.JournalSpec spec = new AccordSpec.JournalSpec();
spec.flushPeriod = new DurationSpec.IntSecondsBound(1);
- AccordJournal journal = new AccordJournal(spec, agent);
+ AccordJournal journal = new AccordJournal(spec);
journal.start(null);
CommandStore.EpochUpdateHolder holder = new
CommandStore.EpochUpdateHolder();
diff --git
a/test/unit/org/apache/cassandra/service/accord/CommandChangeTest.java
b/test/unit/org/apache/cassandra/service/accord/CommandChangeTest.java
index d87ca5292f..6991290021 100644
--- a/test/unit/org/apache/cassandra/service/accord/CommandChangeTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/CommandChangeTest.java
@@ -69,8 +69,14 @@ public class CommandChangeTest
@Test
public void allNull()
{
- int flags = getFlags(null, null);
- assertMissing(flags, ALL);
+ int flags = getFlags(null,
Command.NotDefined.uninitialised(TxnId.NONE));
+ EnumSet<Field> missing = EnumSet.allOf(Field.class);
+ missing.remove(Field.SAVE_STATUS);
+ missing.remove(Field.PARTICIPANTS);
+ missing.remove(Field.PROMISED);
+ missing.remove(Field.ACCEPTED);
+ missing.remove(Field.DURABILITY);
+ assertMissing(flags, missing);
}
@Test
diff --git
a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
index 9a68d127ce..ef7e92c6ea 100644
---
a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
+++
b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
@@ -31,7 +31,6 @@ import java.util.function.Predicate;
import java.util.function.ToLongFunction;
import javax.annotation.Nullable;
-import accord.api.Agent;
import accord.api.Journal;
import accord.api.LocalListeners;
import accord.api.ProgressLog;
@@ -234,7 +233,7 @@ public class SimulatedAccordCommandStore implements
AutoCloseable
}
};
- this.journal = new DefaultJournal(nodeId, agent, rs.fork());
+ this.journal = new DefaultJournal(nodeId, rs.fork());
this.commandStore = new AccordCommandStore(0,
storeService,
agent,
@@ -465,9 +464,9 @@ public class SimulatedAccordCommandStore implements
AutoCloseable
private static class DefaultJournal extends InMemoryJournal implements
RangeSearcher.Supplier
{
private final RouteInMemoryIndex<?> index = new RouteInMemoryIndex<>();
- private DefaultJournal(Node.Id id, Agent agent, RandomSource rs)
+ private DefaultJournal(Node.Id id, RandomSource rs)
{
- super(id, agent, rs);
+ super(id, rs);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]