This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new 4abca1c39e Fix:  - Accord Journal purging was disabled  - remove unique_id from schema keyspace  - avoid String.format in Compactor hot path  - avoid string concatenation on hot path; improve segment compactor partition build efficiency  - Partial compaction should update records in place to ensure truncation of discontiguous compactions do not lead to an incorrect field version being used  - StoreParticipants.touches behaviour for RX was erroneously modified; should touch all no [...]
4abca1c39e is described below

commit 4abca1c39e7be92405bb24ac308dd937f2645139
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Thu Mar 13 11:55:29 2025 +0000

    Fix:
     - Accord Journal purging was disabled
     - remove unique_id from schema keyspace
     - avoid String.format in Compactor hot path
     - avoid string concatenation on hot path; improve segment compactor partition build efficiency
     - Partial compaction should update records in place to ensure truncation of discontiguous compactions do not lead to an incorrect field version being used
     - StoreParticipants.touches behaviour for RX was erroneously modified; should touch all non-redundant ranges including those no longer owned
     - SetShardDurable should correctly set DurableBefore Majority/Universal based on the Durability parameter
     - fix erroneous prunedBefore invariant
     - Journal compaction should not rewrite fields shadowed by a newer record
     - Don't save updates to ERASED commands
     - Simplify CommandChange.getFlags
     - fix handling of Durability for Invalidated
     - Don't use ApplyAt for GC_BEFORE with partial input, as might be a saveStatus >= ApplyAtKnown but with executeAt < ApplyAtKnown
    
    patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20441
---
 modules/accord                                     |   2 +-
 .../org/apache/cassandra/config/AccordSpec.java    |   6 +-
 .../db/compaction/CompactionIterator.java          | 377 ++++++++++++++-------
 .../apache/cassandra/schema/SchemaKeyspace.java    |  10 +-
 .../accord/AbstractAccordSegmentCompactor.java     |  56 +--
 .../service/accord/AccordCommandStore.java         |   6 +-
 .../cassandra/service/accord/AccordJournal.java    |  37 +-
 .../accord/AccordJournalValueSerializers.java      |  41 ++-
 .../cassandra/service/accord/AccordKeyspace.java   |   2 +
 .../cassandra/service/accord/AccordService.java    |   2 +-
 .../accord/journal/AccordTopologyUpdate.java       |  11 +-
 .../cassandra/utils/NativeSSTableLoaderClient.java |   6 +-
 .../service/accord/AccordJournalBurnTest.java      |  99 +++---
 .../accord/AccordJournalCompactionTest.java        |   5 +-
 .../compaction/CompactionAccordIteratorsTest.java  |   6 +-
 .../accord/AccordConfigurationServiceTest.java     |   2 +-
 .../service/accord/AccordJournalOrderTest.java     |   4 +-
 .../service/accord/AccordJournalTestParams.java    |  77 -----
 .../cassandra/service/accord/AccordTestUtils.java  |   2 +-
 .../service/accord/CommandChangeTest.java          |  10 +-
 .../accord/SimulatedAccordCommandStore.java        |   7 +-
 21 files changed, 451 insertions(+), 317 deletions(-)
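
Several of the fixes above target allocation on hot paths: eager String.format calls and string concatenation are replaced by invariant checks that accept format arguments and only render the message on failure. A minimal, self-contained sketch of that pattern; the require helper below is hypothetical (the patch itself uses accord.utils.Invariants):

    final class HotPathChecks
    {
        // Format lazily: the happy path pays for a branch, not for String.format.
        static void require(boolean condition, String fmt, Object... args)
        {
            if (!condition)
                throw new IllegalStateException(String.format(fmt, args));
        }

        public static void main(String[] args)
        {
            long lastDescriptor = 5, descriptor = 3;
            require(descriptor <= lastDescriptor,
                    "Descriptors were accessed out of order: %d was accessed after %d", descriptor, lastDescriptor);
        }
    }
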

diff --git a/modules/accord b/modules/accord
index c7379e12bd..0a5446a365 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit c7379e12bd8f8732004cb77264801fe157af1dbe
+Subproject commit 0a5446a365fe7eb1a15b6a2d0d72f9475c51bc47
diff --git a/src/java/org/apache/cassandra/config/AccordSpec.java b/src/java/org/apache/cassandra/config/AccordSpec.java
index b81792df0c..a85437624f 100644
--- a/src/java/org/apache/cassandra/config/AccordSpec.java
+++ b/src/java/org/apache/cassandra/config/AccordSpec.java
@@ -158,10 +158,10 @@ public class AccordSpec
     public volatile DurationSpec.IntSecondsBound fast_path_update_delay = null;
 
     public volatile DurationSpec.IntSecondsBound gc_delay = new DurationSpec.IntSecondsBound("5m");
-    public volatile int shard_durability_target_splits = 128;
+    public volatile int shard_durability_target_splits = 16;
     public volatile DurationSpec.IntSecondsBound durability_txnid_lag = new DurationSpec.IntSecondsBound(5);
-    public volatile DurationSpec.IntSecondsBound shard_durability_cycle = new DurationSpec.IntSecondsBound(15, TimeUnit.MINUTES);
-    public volatile DurationSpec.IntSecondsBound global_durability_cycle = new DurationSpec.IntSecondsBound(10, TimeUnit.MINUTES);
+    public volatile DurationSpec.IntSecondsBound shard_durability_cycle = new DurationSpec.IntSecondsBound(5, TimeUnit.MINUTES);
+    public volatile DurationSpec.IntSecondsBound global_durability_cycle = new DurationSpec.IntSecondsBound(5, TimeUnit.MINUTES);
 
     public enum TransactionalRangeMigration
     {
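
For scale: with the new defaults, a 5-minute shard durability cycle spread over 16 splits works out to roughly 18.75 seconds per split, versus about 7 seconds per split under the old 15-minute/128-split defaults. A small sketch of that arithmetic; the even spread across splits is an assumption, not something this diff states:

    import java.util.concurrent.TimeUnit;

    final class DurabilityCycleMath
    {
        public static void main(String[] args)
        {
            // Hypothetical even spread of the cycle across splits.
            long newCycle = TimeUnit.MINUTES.toSeconds(5);
            long oldCycle = TimeUnit.MINUTES.toSeconds(15);
            System.out.printf("new: %.2fs per split%n", newCycle / 16.0);  // ~18.75s
            System.out.printf("old: %.2fs per split%n", oldCycle / 128.0); // ~7.03s
        }
    }
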
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index ee236082ac..eeb9f9c340 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.compaction;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -27,8 +28,8 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.function.LongPredicate;
 import java.util.function.Supplier;
-import javax.annotation.Nonnull;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Ordering;
 import org.slf4j.Logger;
@@ -39,6 +40,9 @@ import accord.local.DurableBefore;
 import accord.local.RedundantBefore;
 import accord.utils.Invariants;
 import accord.utils.UnhandledEnum;
+import accord.utils.btree.BTree;
+import accord.utils.btree.BulkIterator;
+import accord.utils.btree.UpdateFunction;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.db.AbstractCompactionController;
@@ -57,6 +61,9 @@ import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.partitions.PurgeFunction;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.BufferCell;
+import org.apache.cassandra.db.rows.ColumnData;
 import org.apache.cassandra.db.rows.RangeTombstoneBoundMarker;
 import org.apache.cassandra.db.rows.RangeTombstoneMarker;
 import org.apache.cassandra.db.rows.Row;
@@ -82,8 +89,9 @@ import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.accord.AccordJournal;
-import org.apache.cassandra.service.accord.AccordJournalValueSerializers;
+import org.apache.cassandra.service.accord.AccordJournalValueSerializers.FlyweightImage;
 import org.apache.cassandra.service.accord.AccordJournalValueSerializers.FlyweightSerializer;
 import org.apache.cassandra.service.accord.AccordKeyspace;
 import org.apache.cassandra.service.accord.AccordKeyspace.CommandsForKeyAccessor;
@@ -92,7 +100,6 @@ import org.apache.cassandra.service.accord.IAccordService;
 import org.apache.cassandra.service.accord.IAccordService.AccordCompactionInfo;
 import org.apache.cassandra.service.accord.IAccordService.AccordCompactionInfos;
 import org.apache.cassandra.service.accord.JournalKey;
-import org.apache.cassandra.service.accord.api.AccordAgent;
 import org.apache.cassandra.service.accord.api.TokenKey;
 import org.apache.cassandra.service.accord.journal.AccordTopologyUpdate;
 import org.apache.cassandra.service.accord.serializers.Version;
@@ -158,7 +165,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
 
     public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, AbstractCompactionController controller, long nowInSec, TimeUUID compactionId)
     {
-        this(type, scanners, controller, nowInSec, compactionId, ActiveCompactionsTracker.NOOP, null, AccordService::instance);
+        this(type, scanners, controller, nowInSec, compactionId, ActiveCompactionsTracker.NOOP, null);
     }
 
     public CompactionIterator(OperationType type,
@@ -170,7 +177,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
                               TopPartitionTracker.Collector topPartitionCollector)
     {
         this(type, scanners, controller, nowInSec, compactionId, activeCompactions, topPartitionCollector,
-             AccordService::instance);
+             AccordService.isSetup() ? AccordService.instance() : null);
     }
 
     public CompactionIterator(OperationType type,
@@ -180,7 +187,23 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
                               TimeUUID compactionId,
                               ActiveCompactionsTracker activeCompactions,
                               TopPartitionTracker.Collector topPartitionCollector,
-                              @Nonnull Supplier<IAccordService> accordService)
+                              IAccordService accord)
+    {
+        this(type, scanners, controller, nowInSec, compactionId, activeCompactions, topPartitionCollector,
+             () -> accord.getCompactionInfo(),
+             () -> Version.fromVersion(accord.journalConfiguration().userVersion()));
+    }
+
+    @VisibleForTesting
+    public CompactionIterator(OperationType type,
+                              List<ISSTableScanner> scanners,
+                              AbstractCompactionController controller,
+                              long nowInSec,
+                              TimeUUID compactionId,
+                              ActiveCompactionsTracker activeCompactions,
+                              TopPartitionTracker.Collector topPartitionCollector,
+                              Supplier<AccordCompactionInfos> compactionInfos,
+                              Supplier<Version> accordVersion)
     {
         this.controller = controller;
         this.type = type;
@@ -206,13 +229,13 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
         if (topPartitionCollector != null) // need to count tombstones before they are purged
             merged = Transformation.apply(merged, new TopPartitionTracker.TombstoneCounter(topPartitionCollector, nowInSec));
         merged = Transformation.apply(merged, new GarbageSkipper(controller));
-        Transformation<UnfilteredRowIterator> purger = purger(controller.cfs, accordService);
+        Transformation<UnfilteredRowIterator> purger = purger(controller.cfs, compactionInfos, accordVersion);
         merged = Transformation.apply(merged, purger);
         merged = DuplicateRowChecker.duringCompaction(merged, type);
         compacted = Transformation.apply(merged, new AbortableUnfilteredPartitionTransformation(this));
     }
 
-    private Transformation<UnfilteredRowIterator> purger(ColumnFamilyStore cfs, Supplier<IAccordService> accordService)
+    private Transformation<UnfilteredRowIterator> purger(ColumnFamilyStore cfs, Supplier<AccordCompactionInfos> compactionInfos, Supplier<Version> version)
     {
         if (isPaxos(cfs) && paxosStatePurging() != legacy)
             return new PaxosPurger();
@@ -222,9 +245,9 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
             return new Purger(controller, nowInSec);
 
         if (isAccordJournal(cfs))
-            return new AccordJournalPurger(accordService, cfs);
+            return new AccordJournalPurger(compactionInfos.get(), version.get(), cfs);
         if (isAccordCommandsForKey(cfs))
-            return new AccordCommandsForKeyPurger(AccordKeyspace.CFKAccessor, accordService);
+            return new AccordCommandsForKeyPurger(AccordKeyspace.CFKAccessor, compactionInfos);

         throw new IllegalArgumentException("Unhandled accord table: " + cfs.keyspace.getName() + '.' + cfs.name);
     }
@@ -792,10 +815,10 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
         int storeId;
         TokenKey tokenKey;
 
-        AccordCommandsForKeyPurger(CommandsForKeyAccessor accessor, Supplier<IAccordService> accordService)
+        AccordCommandsForKeyPurger(CommandsForKeyAccessor accessor, Supplier<AccordCompactionInfos> compactionInfos)
         {
             this.accessor = accessor;
-            this.compactionInfos = accordService.get().getCompactionInfo();
+            this.compactionInfos = compactionInfos.get();
         }
 
         protected void beginPartition(UnfilteredRowIterator partition)
@@ -836,31 +859,21 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
         final AccordCompactionInfos infos;
         final ColumnMetadata recordColumn;
         final ColumnMetadata versionColumn;
-        final AccordService service;
-        final AccordAgent agent;
 
-        AccordCompactionInfo info;
-        JournalKey key = null;
-        Object builder = null;
-        FlyweightSerializer<Object, Object> serializer = null;
+        JournalKey key;
+        AccordRowCompactor<?> compactor;
         // Initialize topology serializer during compaction to avoid deserializing redundant epochs
-        FlyweightSerializer<AccordTopologyUpdate, Object> topologySerializer;
-        Object[] highestClustering = null;
+        FlyweightSerializer<AccordTopologyUpdate, FlyweightImage> topologySerializer;
         final Version userVersion;
-        long lastDescriptor = -1;
-        int lastOffset = -1;
 
-        public AccordJournalPurger(Supplier<IAccordService> serviceSupplier, ColumnFamilyStore cfs)
+        public AccordJournalPurger(AccordCompactionInfos compactionInfos, Version version, ColumnFamilyStore cfs)
         {
-            service = (AccordService) serviceSupplier.get();
-            // TODO: test serialization version logic
-            userVersion = Version.fromVersion(service.journalConfiguration().userVersion());
+            this.userVersion = version;
 
-            this.agent = service.agent();
-            this.infos = service.getCompactionInfo();
+            this.infos = compactionInfos;
            this.recordColumn = cfs.metadata().getColumn(ColumnIdentifier.getInterned("record", false));
             this.versionColumn = cfs.metadata().getColumn(ColumnIdentifier.getInterned("user_version", false));
-            this.topologySerializer = (FlyweightSerializer<AccordTopologyUpdate, Object>) (FlyweightSerializer) new AccordTopologyUpdate.AccumulatingSerializer(() -> infos.minEpoch);
+            this.topologySerializer = (FlyweightSerializer<AccordTopologyUpdate, FlyweightImage>) (FlyweightSerializer) new AccordTopologyUpdate.AccumulatingSerializer(() -> infos.minEpoch);
         }
 
         @SuppressWarnings("unchecked")
@@ -868,88 +881,36 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
         protected void beginPartition(UnfilteredRowIterator partition)
         {
            key = AccordKeyspace.JournalColumns.getJournalKey(partition.partitionKey());
-            serializer = (AccordJournalValueSerializers.FlyweightSerializer<Object, Object>) key.type.serializer;
-            builder = serializer.mergerFor(key);
-            lastDescriptor = -1;
-            lastOffset = -1;
-            highestClustering = null;
+            if (compactor == null || compactor.serializer != key.type.serializer)
+            {
+                switch (key.type)
+                {
+                    case COMMAND_DIFF:
+                        compactor = new AccordCommandRowCompactor(infos, userVersion, nowInSec);
+                        break;
+                    case TOPOLOGY_UPDATE:
+                        compactor = new AccordMergingCompactor(topologySerializer, userVersion);
+                        break;
+                    default:
+                        compactor = new AccordMergingCompactor(key.type.serializer, userVersion);
+                }
+            }
+            compactor.reset(key);
         }
 
         @Override
        protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
         {
-            beginPartition(partition);
-
-            if (partition.isEmpty())
-                return null;
+            if (!partition.hasNext())
+                return partition;
 
             try
             {
-                List<Row> rows = new ArrayList<>();
+                beginPartition(partition);
                 while (partition.hasNext())
-                {
-                    Row row = (Row) partition.next();
-                    rows.add(row);
-                    collect(row);
-                }
-
-                if (key.type != JournalKey.Type.COMMAND_DIFF)
-                {
-                    PartitionUpdate.SimpleBuilder newVersion = PartitionUpdate.simpleBuilder(AccordKeyspace.Journal, partition.partitionKey());
-                    try (DataOutputBuffer out = DataOutputBuffer.scratchBuffer.get())
-                    {
-                        serializer.reserialize(key, builder, out, userVersion);
-                        newVersion.row(highestClustering)
-                                  .add("record", out.asNewBuffer())
-                                  .add("user_version", userVersion.version);
-                    }
-                    catch (IOException e)
-                    {
-                        throw new RuntimeException(e);
-                    }
+                    collect((Row)partition.next());
 
-                    return newVersion.build().unfilteredIterator();
-                }
-
-                AccordJournal.Builder commandBuilder = (AccordJournal.Builder) builder;
-                if (commandBuilder.isEmpty())
-                {
-                    Invariants.require(rows.isEmpty());
-                    return partition;
-                }
-
-                if (info != null && info.commandStoreId != key.commandStoreId) info = null;
-                if (info == null) info = infos.get(key.commandStoreId);
-                // TODO (required): should return null only if commandStore has been removed
-                if (info == null)
-                    return partition;
-                DurableBefore durableBefore = infos.durableBefore;
-                Cleanup cleanup = commandBuilder.maybeCleanup(PARTIAL, agent, info.redundantBefore, durableBefore);
-                if (cleanup != NO)
-                {
-                    switch (cleanup)
-                    {
-                        default: throw new UnhandledEnum(cleanup);
-                        case EXPUNGE:
-                            return null;
-                        case ERASE:
-                            return PartitionUpdate.fullPartitionDelete(metadata(), partition.partitionKey(), Long.MAX_VALUE, nowInSec).unfilteredIterator();
-                        case TRUNCATE:
-                        case INVALIDATE:
-                        case TRUNCATE_WITH_OUTCOME:
-                        case VESTIGIAL:
-                            PartitionUpdate.SimpleBuilder newVersion = PartitionUpdate.simpleBuilder(AccordKeyspace.Journal, partition.partitionKey());
-
-                            Row.SimpleBuilder rowBuilder = newVersion.row(rows.get(rows.size() - 1).clustering().getBufferArray());
-                            rowBuilder.add("record", commandBuilder.asByteBuffer(userVersion))
-                                      .add("user_version", userVersion.version);
-
-                            return newVersion.build().unfilteredIterator();
-                    }
-                }
-
-                return PartitionUpdate.multiRowUpdate(AccordKeyspace.Journal, partition.partitionKey(), rows)
-                                      .unfilteredIterator();
+                return compactor.result(key, partition.partitionKey());
             }
             catch (UnknownTableException e)
             {
@@ -962,47 +923,218 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
             }
         }
 
+        protected void collect(Row row) throws IOException
+        {
+            updateProgress();
+            ByteBuffer bytes = row.getCell(recordColumn).buffer();
+            Version userVersion = Version.fromVersion(Int32Type.instance.compose(row.getCell(versionColumn).buffer()));
+            compactor.collect(key, row, bytes, userVersion);
+        }
+    }
+
+    static abstract class AccordRowCompactor<T extends FlyweightImage>
+    {
+        final FlyweightSerializer<Object, T> serializer;
+
+        AccordRowCompactor(FlyweightSerializer<Object, T> serializer)
+        {
+            this.serializer = serializer;
+        }
+
+        abstract void reset(JournalKey key);
+        abstract void collect(JournalKey key, Row row, ByteBuffer bytes, Version userVersion) throws IOException;
+        abstract UnfilteredRowIterator result(JournalKey journalKey, DecoratedKey partitionKey) throws IOException;
+    }
+
+    static class AccordMergingCompactor<T extends FlyweightImage> extends AccordRowCompactor<T>
+    {
+        final T builder;
+        final Version userVersion;
+        Object[] highestClustering;
+        long lastDescriptor;
+        int lastOffset;
+
+        AccordMergingCompactor(FlyweightSerializer<Object, T> serializer, Version userVersion)
+        {
+            super(serializer);
+            this.builder = serializer.mergerFor();
+            this.userVersion = userVersion;
+        }
+
         @Override
-        protected Row applyToRow(Row row)
+        void reset(JournalKey key)
         {
-            return row;
+            builder.reset(key);
+            lastDescriptor = -1;
+            lastOffset = -1;
+            highestClustering = null;
         }
 
-        protected void collect(Row row) throws IOException
+        @Override
+        protected void collect(JournalKey key, Row row, ByteBuffer bytes, Version userVersion) throws IOException
         {
-            updateProgress();
-            ByteBuffer record = row.getCell(recordColumn).buffer();
+            if (highestClustering == null)
+                highestClustering = row.clustering().getBufferArray();
+
            long descriptor = LongType.instance.compose(row.clustering().bufferAt(0));
             int offset = Int32Type.instance.compose(row.clustering().bufferAt(1));
 
             if (lastOffset != -1)
             {
                Invariants.require(descriptor <= lastDescriptor,
-                                      "Descriptors were accessed out of order: %d was accessed after %d", descriptor, lastDescriptor);
+                                   "Descriptors were accessed out of order: %d was accessed after %d", descriptor, lastDescriptor);
                 Invariants.require(descriptor != lastDescriptor ||
-                                      offset < lastOffset,
-                                      "Offsets within %s were accessed out of order: %d was accessed after %s", offset, lastOffset);
+                                   offset < lastOffset,
+                                   "Offsets within %d were accessed out of order: %d was accessed after %s", descriptor, offset, lastOffset);
             }
             lastDescriptor = descriptor;
             lastOffset = offset;
 
-            try (DataInputBuffer in = new DataInputBuffer(record, false))
+            try (DataInputBuffer in = new DataInputBuffer(bytes, false))
             {
-                Version userVersion = Version.fromVersion(Int32Type.instance.compose(row.getCell(versionColumn).buffer()));
-                if (key.type == JournalKey.Type.TOPOLOGY_UPDATE)
-                    topologySerializer.deserialize(key, builder, in, userVersion);
-                else
-                    serializer.deserialize(key, builder, in, userVersion);
-                if (highestClustering == null) // we iterate highest to lowest
-                    highestClustering = row.clustering().getBufferArray();
+                serializer.deserialize(key, builder, in, userVersion);
             }
         }
 
         @Override
-        protected Row applyToStatic(Row row)
+        UnfilteredRowIterator result(JournalKey journalKey, DecoratedKey partitionKey) throws IOException
         {
-            checkState(row.isStatic() && row.isEmpty());
-            return row;
+            PartitionUpdate.SimpleBuilder newVersion = PartitionUpdate.simpleBuilder(AccordKeyspace.Journal, partitionKey);
+            try (DataOutputBuffer out = DataOutputBuffer.scratchBuffer.get())
+            {
+                serializer.reserialize(journalKey, builder, out, userVersion);
+                newVersion.row(highestClustering)
+                          .add("record", out.asNewBuffer())
+                          .add("user_version", userVersion.version);
+            }
+
+            return newVersion.build().unfilteredIterator();
+        }
+    }
+
+    static class AccordCommandRowEntry
+    {
+        final AccordJournal.Builder builder = new AccordJournal.Builder();
+        Row row;
+        boolean modified;
+
+        void init(JournalKey key, Row row, ByteBuffer bytes, Version userVersion) throws IOException
+        {
+            this.row = row;
+            this.builder.reset(key);
+            try (DataInputBuffer in = new DataInputBuffer(bytes, false))
+            {
+                builder.deserializeNext(in, userVersion);
+            }
+        }
+
+        void clear()
+        {
+            row = null;
+            modified = false;
+            builder.clear();
+        }
+    }
+
+    static class AccordCommandRowCompactor extends AccordRowCompactor<AccordJournal.Builder>
+    {
+        static final Object[] rowTemplate = BTree.build(BulkIterator.of(new Object[2]), 2, UpdateFunction.noOp);
+        final long timestamp = ClientState.getTimestamp();
+        final AccordCompactionInfos infos;
+        final Version userVersion;
+        final ColumnData userVersionCell;
+        final long nowInSec;
+
+        final AccordJournal.Builder mainBuilder = new AccordJournal.Builder();
+        final List<AccordCommandRowEntry> entries = new ArrayList<>();
+        final ArrayDeque<AccordCommandRowEntry> reuseEntries = new ArrayDeque<>();
+        AccordCompactionInfo info;
+
+        AccordCommandRowCompactor(AccordCompactionInfos infos, Version userVersion, long nowInSec)
+        {
+            super((FlyweightSerializer<Object, AccordJournal.Builder>) JournalKey.Type.COMMAND_DIFF.serializer);
+            this.infos = infos;
+            this.userVersion = userVersion;
+            this.userVersionCell = BufferCell.live(AccordKeyspace.JournalColumns.user_version, timestamp, Int32Type.instance.decompose(userVersion.version));
+            this.nowInSec = nowInSec;
+        }
+
+        @Override
+        void reset(JournalKey key)
+        {
+            mainBuilder.reset(key);
+            reuseEntries.addAll(entries);
+            for (int i = 0; i < entries.size() ; ++i)
+                entries.get(i).clear();
+            entries.clear();
+        }
+
+        @Override
+        void collect(JournalKey key, Row row, ByteBuffer bytes, Version userVersion) throws IOException
+        {
+            AccordCommandRowEntry e = reuseEntries.pollLast();
+            if (e == null)
+                e = new AccordCommandRowEntry();
+            entries.add(e);
+            e.init(key, row, bytes, userVersion);
+            e.modified |= e.builder.clearSuperseded(false, mainBuilder);
+            mainBuilder.fillInMissingOrCleanup(false, e.builder);
+        }
+
+        @Override
+        UnfilteredRowIterator result(JournalKey journalKey, DecoratedKey partitionKey) throws IOException
+        {
+            if (mainBuilder.isEmpty())
+                return null;
+
+            if (info != null && info.commandStoreId != journalKey.commandStoreId) info = null;
+            if (info == null) info = infos.get(journalKey.commandStoreId);
+            // TODO (required): should return null only if commandStore has been removed
+            if (info == null)
+                return null;
+
+            DurableBefore durableBefore = infos.durableBefore;
+            Cleanup cleanup = mainBuilder.maybeCleanup(false, PARTIAL, info.redundantBefore, durableBefore);
+            if (cleanup != NO)
+            {
+                switch (cleanup)
+                {
+                    default: throw new UnhandledEnum(cleanup);
+                    case EXPUNGE:
+                        return null;
+                    case ERASE:
+                        return PartitionUpdate.fullPartitionDelete(AccordKeyspace.Journal, partitionKey, Long.MAX_VALUE, nowInSec).unfilteredIterator();
+
+                    case TRUNCATE:
+                    case TRUNCATE_WITH_OUTCOME:
+                    case INVALIDATE:
+                    case VESTIGIAL:
+                        for (int i = 0, size = entries.size(); i < size ; i++)
+                        {
+                            AccordCommandRowEntry entry = entries.get(i);
+                            if (i == 0) entry.modified |= entry.builder.addCleanup(false, cleanup);
+                            else        entry.modified |= entry.builder.cleanup(false, cleanup);
+                        }
+                }
+            }
+
+            PartitionUpdate.Builder newVersion = new PartitionUpdate.Builder(AccordKeyspace.Journal, partitionKey, AccordKeyspace.JournalColumns.regular, entries.size());
+            for (int i = 0, size = entries.size() ; i < size ; ++i)
+            {
+                AccordCommandRowEntry entry = entries.get(i);
+                if (!entry.modified)
+                {
+                    newVersion.add(entry.row);
+                }
+                else if (entry.builder.flags() != 0)
+                {
+                    Object[] newRow = rowTemplate.clone();
+                    newRow[0] = BufferCell.live(AccordKeyspace.JournalColumns.record, timestamp, entry.builder.asByteBuffer(userVersion));
+                    newRow[1] = userVersionCell;
+                    newVersion.add(BTreeRow.create(entry.row.clustering(), entry.row.primaryKeyLivenessInfo(), entry.row.deletion(), newRow));
+                }
+            }
+            return newVersion.build().unfilteredIterator();
         }
     }
 
@@ -1049,9 +1181,8 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
     private static boolean requiresAccordSpecificPurger(ColumnFamilyStore cfs)
     {
        return cfs.getKeyspaceName().equals(SchemaConstants.ACCORD_KEYSPACE_NAME) &&
-               ImmutableSet.of(AccordKeyspace.JOURNAL,
-                               AccordKeyspace.COMMANDS_FOR_KEY)
-                           .contains(cfs.getTableName());
+               (cfs.getTableName().contains(AccordKeyspace.JOURNAL) ||
+                AccordKeyspace.COMMANDS_FOR_KEY.equals(cfs.getTableName()));
     }
 
     private static boolean isAccordTable(ColumnFamilyStore cfs, String name)
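
The new AccordCommandRowCompactor above recycles its per-row entries through an ArrayDeque across partitions rather than reallocating them. A simplified sketch of that pooling idiom, with stand-in types (Entry below is not the patch's AccordCommandRowEntry):

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.List;

    final class EntryPool
    {
        static final class Entry { Object payload; void clear() { payload = null; } }

        final List<Entry> entries = new ArrayList<>();
        final ArrayDeque<Entry> reuse = new ArrayDeque<>();

        Entry acquire()
        {
            Entry e = reuse.pollLast();   // reuse a cleared entry if available
            if (e == null)
                e = new Entry();          // allocate only when the pool is empty
            entries.add(e);
            return e;
        }

        void reset()                      // invoked at each partition boundary
        {
            reuse.addAll(entries);
            for (Entry e : entries)
                e.clear();
            entries.clear();
        }
    }
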
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index 40fdf31861..6e597726f6 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -61,6 +61,7 @@ import static org.apache.cassandra.config.CassandraRelevantProperties.IGNORE_COR
 import static org.apache.cassandra.config.CassandraRelevantProperties.TEST_FLUSH_LOCAL_SCHEMA_CHANGES;
 import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
 import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal;
+import static org.apache.cassandra.schema.ColumnMetadata.NO_UNIQUE_ID;
 import static org.apache.cassandra.schema.SchemaKeyspaceTables.*;
 import static org.apache.cassandra.utils.Simulate.With.GLOBAL_CLOCK;
 
@@ -140,7 +141,6 @@ public final class SchemaKeyspace
               + "clustering_order text,"
               + "column_name_bytes blob,"
               + "kind text,"
-              + "unique_id int,"
               + "position int,"
               + "type text,"
               + "PRIMARY KEY ((keyspace_name), table_name, column_name))");
@@ -169,7 +169,6 @@ public final class SchemaKeyspace
               + "dropped_time timestamp,"
               + "kind text,"
               + "type text,"
-              + "unique_id int,"
               + "PRIMARY KEY ((keyspace_name), table_name, column_name))");
 
     private static final TableMetadata Triggers =
@@ -704,7 +703,6 @@ public final class SchemaKeyspace
                .add("column_name_bytes", column.name.bytes)
                .add("kind", column.kind.toString().toLowerCase())
                .add("position", column.position())
-               .add("unique_id", column.uniqueId)
                .add("clustering_order", 
column.clusteringOrder().toString().toLowerCase())
                .add("type", type.asCQL3Type().toString());
 
@@ -1089,7 +1087,6 @@ public final class SchemaKeyspace
 
        ColumnMetadata.Kind kind = ColumnMetadata.Kind.valueOf(row.getString("kind").toUpperCase());
 
-        int uniqueId = row.getInt("unique_id", ColumnMetadata.NO_UNIQUE_ID);
         int position = row.getInt("position");
        ClusteringOrder order = ClusteringOrder.valueOf(row.getString("clustering_order").toUpperCase());
 
@@ -1145,7 +1142,7 @@ public final class SchemaKeyspace
             mask = new ColumnMask((ScalarFunction) function, values);
         }
 
-        return new ColumnMetadata(keyspace, table, name, type, uniqueId, position, kind, mask);
+        return new ColumnMetadata(keyspace, table, name, type, NO_UNIQUE_ID, position, kind, mask);
     }
 
    private static Map<ByteBuffer, DroppedColumn> fetchDroppedColumns(String keyspace, String table)
@@ -1165,7 +1162,6 @@ public final class SchemaKeyspace
         String keyspace = row.getString("keyspace_name");
         String table = row.getString("table_name");
         String name = row.getString("column_name");
-        int uniqueId = row.getInt("unique_id", ColumnMetadata.NO_UNIQUE_ID);
         /*
         * we never store actual UDT names in dropped column types (so that we can safely drop types if nothing refers to
         * them anymore), so before storing dropped columns in schema we expand UDTs to tuples. See expandUserTypes method.
@@ -1178,7 +1174,7 @@ public final class SchemaKeyspace
        assert kind == ColumnMetadata.Kind.REGULAR || kind == ColumnMetadata.Kind.STATIC
             : "Unexpected dropped column kind: " + kind;
 
-        ColumnMetadata column = new ColumnMetadata(keyspace, table, ColumnIdentifier.getInterned(name, true), type, uniqueId, ColumnMetadata.NO_POSITION, kind, null);
+        ColumnMetadata column = new ColumnMetadata(keyspace, table, ColumnIdentifier.getInterned(name, true), type, NO_UNIQUE_ID, ColumnMetadata.NO_POSITION, kind, null);
         long droppedTime = TimeUnit.MILLISECONDS.toMicros(row.getLong("dropped_time"));
         return new DroppedColumn(column, droppedTime);
     }
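
Dropping unique_id from the schema tables means readers must tolerate rows written by older versions that still carry the column, as well as rows that lack it entirely. A hypothetical sketch of that default-tolerant read; Row here is a stand-in, not Cassandra's UntypedResultSet.Row:

    final class UniqueIdRead
    {
        interface Row { boolean has(String name); int getInt(String name); }

        static final int NO_UNIQUE_ID = -1; // sentinel, mirroring ColumnMetadata.NO_UNIQUE_ID

        static int uniqueIdOrDefault(Row row)
        {
            return row.has("unique_id") ? row.getInt("unique_id") : NO_UNIQUE_ID;
        }
    }
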
diff --git a/src/java/org/apache/cassandra/service/accord/AbstractAccordSegmentCompactor.java b/src/java/org/apache/cassandra/service/accord/AbstractAccordSegmentCompactor.java
index aa00a3d43d..668736c842 100644
--- a/src/java/org/apache/cassandra/service/accord/AbstractAccordSegmentCompactor.java
+++ b/src/java/org/apache/cassandra/service/accord/AbstractAccordSegmentCompactor.java
@@ -26,10 +26,20 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import accord.utils.Invariants;
+import accord.utils.btree.BTree;
+import accord.utils.btree.BulkIterator;
+import accord.utils.btree.UpdateFunction;
+import org.apache.cassandra.db.BufferClustering;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.db.partitions.PartitionUpdate.SimpleBuilder;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.BufferCell;
+import org.apache.cassandra.db.rows.ColumnData;
+import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.exceptions.UnknownTableException;
 import org.apache.cassandra.io.sstable.SSTableTxnWriter;
 import org.apache.cassandra.io.util.DataInputBuffer;
@@ -37,6 +47,8 @@ import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.journal.SegmentCompactor;
 import org.apache.cassandra.journal.StaticSegment;
 import org.apache.cassandra.journal.StaticSegment.KeyOrderReader;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.accord.AccordJournalValueSerializers.FlyweightImage;
 import org.apache.cassandra.service.accord.AccordJournalValueSerializers.FlyweightSerializer;
 import org.apache.cassandra.utils.NoSpamLogger;
 
@@ -51,12 +63,17 @@ public abstract class AbstractAccordSegmentCompactor<V> implements SegmentCompac
     protected static final Logger logger = LoggerFactory.getLogger(AbstractAccordSegmentCompactor.class);
     private static final NoSpamLogger.NoSpamLogStatement unknownTable = NoSpamLogger.getStatement(logger, "Unknown (probably dropped) TableId {} reading {}; skipping record", 1L, MINUTES);
 
+    static final Object[] rowTemplate = BTree.build(BulkIterator.of(new Object[2]), 2, UpdateFunction.noOp);
+
     protected final Version userVersion;
+    protected final ColumnData userVersionCell;
     protected final ColumnFamilyStore cfs;
+    protected final long timestamp = ClientState.getTimestamp();
 
    public AbstractAccordSegmentCompactor(Version userVersion, ColumnFamilyStore cfs)
     {
         this.userVersion = userVersion;
+        this.userVersionCell = BufferCell.live(AccordKeyspace.JournalColumns.user_version, timestamp, Int32Type.instance.decompose(userVersion.version));
         this.cfs = cfs;
     }
 
@@ -96,8 +113,8 @@ public abstract class AbstractAccordSegmentCompactor<V> implements SegmentCompac
         initializeWriter();
 
         JournalKey key = null;
-        Object builder = null;
-        FlyweightSerializer<Object, Object> serializer = null;
+        FlyweightImage builder = null;
+        FlyweightSerializer<Object, FlyweightImage> serializer = null;
         long firstDescriptor = -1, lastDescriptor = -1;
         int firstOffset = -1, lastOffset = -1;
         try
@@ -110,8 +127,9 @@ public abstract class AbstractAccordSegmentCompactor<V> implements SegmentCompac
                     maybeWritePartition(key, builder, serializer, firstDescriptor, firstOffset);
                     switchPartitions();
                     key = reader.key();
-                    serializer = (FlyweightSerializer<Object, Object>) key.type.serializer;
-                    builder = serializer.mergerFor(key);
+                    serializer = (FlyweightSerializer<Object, FlyweightImage>) key.type.serializer;
+                    builder = serializer.mergerFor();
+                    builder.reset(key);
                     firstDescriptor = lastDescriptor = -1;
                     firstOffset = lastOffset = -1;
                 }
@@ -122,7 +140,10 @@ public abstract class AbstractAccordSegmentCompactor<V> implements SegmentCompac
                 do
                 {
                     if (builder == null)
-                        builder = serializer.mergerFor(key);
+                    {
+                        builder = serializer.mergerFor();
+                        builder.reset(key);
+                    }
 
                    try (DataInputBuffer in = new DataInputBuffer(reader.record(), false))
                     {
@@ -178,30 +199,27 @@ public abstract class AbstractAccordSegmentCompactor<V> implements SegmentCompac
     private JournalKey prevKey;
     private DecoratedKey prevDecoratedKey;
 
-    private void maybeWritePartition(JournalKey key, Object builder, FlyweightSerializer<Object, Object> serializer, long descriptor, int offset) throws IOException
+    private void maybeWritePartition(JournalKey key, FlyweightImage builder, FlyweightSerializer<Object, FlyweightImage> serializer, long descriptor, int offset) throws IOException
     {
         if (builder != null)
         {
            DecoratedKey decoratedKey = AccordKeyspace.JournalColumns.decorate(key);
-
-            if (prevKey != null)
-            {
-                Invariants.requireArgument((decoratedKey.compareTo(prevDecoratedKey) >= 0 ? 1 : -1) == (JournalKey.SUPPORT.compare(key, prevKey) >= 0 ? 1 : -1),
-                                           String.format("Partition key and JournalKey didn't have matching order, which may imply a serialization issue.\n%s (%s)\n%s (%s)",
-                                                         key, decoratedKey, prevKey, prevDecoratedKey));
-            }
+            Invariants.requireArgument(prevKey == null || ((decoratedKey.compareTo(prevDecoratedKey) >= 0 ? 1 : -1) == (JournalKey.SUPPORT.compare(key, prevKey) >= 0 ? 1 : -1)),
+                                       "Partition key and JournalKey didn't have matching order, which may imply a serialization issue.\n%s (%s)\n%s (%s)",
+                                       key, decoratedKey, prevKey, prevDecoratedKey);
             prevKey = key;
             prevDecoratedKey = decoratedKey;
 
-            SimpleBuilder partitionBuilder = PartitionUpdate.simpleBuilder(cfs.metadata(), decoratedKey);
+            Object[] rowData = rowTemplate.clone();
             try (DataOutputBuffer out = DataOutputBuffer.scratchBuffer.get())
             {
                 serializer.reserialize(key, builder, out, userVersion);
-                partitionBuilder.row(descriptor, offset)
-                                .add("record", out.asNewBuffer())
-                                .add("user_version", userVersion.version);
+                rowData[0] = BufferCell.live(AccordKeyspace.JournalColumns.record, timestamp, out.asNewBuffer());
             }
-            writer().append(partitionBuilder.build().unfilteredIterator());
+            rowData[1] = userVersionCell;
+            Row row = BTreeRow.create(BufferClustering.make(LongType.instance.decompose(descriptor), Int32Type.instance.decompose(offset)), LivenessInfo.EMPTY, Row.Deletion.LIVE, rowData);
+            PartitionUpdate update = PartitionUpdate.singleRowUpdate(AccordKeyspace.Journal, decoratedKey, row);
+            writer().append(update.unfilteredIterator());
         }
     }
 }
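
The segment compactor above swaps PartitionUpdate.simpleBuilder for a pre-built two-cell row template that is cloned and filled per row. A simplified illustration of the template trick with stand-in types (Cell below is not Cassandra's ColumnData):

    final class RowTemplate
    {
        static final class Cell
        {
            final String column; final Object value;
            Cell(String column, Object value) { this.column = column; this.value = value; }
        }

        // Build the fixed two-slot layout once...
        static final Object[] TEMPLATE = new Object[2];

        // ...then clone and fill it per row, skipping builder machinery entirely.
        static Object[] fill(Object record, int userVersion)
        {
            Object[] cells = TEMPLATE.clone();
            cells[0] = new Cell("record", record);
            cells[1] = new Cell("user_version", userVersion);
            return cells;
        }
    }
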
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index dad05aa7a5..4e8629696a 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -220,9 +220,11 @@ public class AccordCommandStore extends CommandStore
     @Override
    public void markShardDurable(SafeCommandStore safeStore, TxnId globalSyncId, Ranges ranges, Status.Durability durability)
     {
-        store.snapshot(ranges, globalSyncId);
+        if (durability.compareTo(Status.Durability.UniversalOrInvalidated) >= 0)
+            store.snapshot(ranges, globalSyncId);
         super.markShardDurable(safeStore, globalSyncId, ranges, durability);
-        commandsForRanges.gcBefore(globalSyncId, ranges);
+        if (durability.compareTo(Status.Durability.UniversalOrInvalidated) >= 0)
+            commandsForRanges.gcBefore(globalSyncId, ranges);
     }
 
     @Override
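
markShardDurable now gates the snapshot and gcBefore on the durability level via enum ordering. A minimal sketch of that comparison; the Durability constants below are a simplified stand-in for accord's Status.Durability, and their exact order here is an assumption:

    final class DurabilityGate
    {
        enum Durability { NotDurable, Majority, UniversalOrInvalidated, Universal }

        // Snapshot/GC only once durability is at least UniversalOrInvalidated.
        static boolean shouldSnapshot(Durability durability)
        {
            return durability.compareTo(Durability.UniversalOrInvalidated) >= 0;
        }

        public static void main(String[] args)
        {
            assert !shouldSnapshot(Durability.Majority);
            assert shouldSnapshot(Durability.Universal);
        }
    }
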
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index 0ac5c87807..f4159f5731 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -67,9 +67,9 @@ import org.apache.cassandra.journal.SegmentCompactor;
 import org.apache.cassandra.journal.StaticSegment;
 import org.apache.cassandra.journal.ValueSerializer;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.accord.AccordJournalValueSerializers.FlyweightImage;
 import org.apache.cassandra.service.accord.AccordJournalValueSerializers.IdentityAccumulator;
 import org.apache.cassandra.service.accord.JournalKey.JournalKeySupport;
-import org.apache.cassandra.service.accord.api.AccordAgent;
 import org.apache.cassandra.service.accord.journal.AccordTopologyUpdate;
 import org.apache.cassandra.service.accord.serializers.CommandSerializers;
 import org.apache.cassandra.service.accord.serializers.CommandSerializers.ExecuteAtSerializer;
@@ -109,21 +109,19 @@ public class AccordJournal implements accord.api.Journal, RangeSearcher.Supplier
     @VisibleForTesting
     protected final AccordJournalTable<JournalKey, Object> journalTable;
     private final Params params;
-    private final AccordAgent agent;
     Node node;
 
    enum Status { INITIALIZED, STARTING, REPLAY, STARTED, TERMINATING, TERMINATED }
     private volatile Status status = Status.INITIALIZED;
 
-    public AccordJournal(Params params, AccordAgent agent)
+    public AccordJournal(Params params)
     {
-        this(params, agent, new File(DatabaseDescriptor.getAccordJournalDirectory()), Keyspace.open(AccordKeyspace.metadata().name).getColumnFamilyStore(AccordKeyspace.JOURNAL));
+        this(params, new File(DatabaseDescriptor.getAccordJournalDirectory()), Keyspace.open(AccordKeyspace.metadata().name).getColumnFamilyStore(AccordKeyspace.JOURNAL));
     }
 
     @VisibleForTesting
-    public AccordJournal(Params params, AccordAgent agent, File directory, ColumnFamilyStore cfs)
+    public AccordJournal(Params params, File directory, ColumnFamilyStore cfs)
     {
-        this.agent = agent;
         Version userVersion = Version.fromVersion(params.userVersion());
        this.journal = new Journal<>("AccordJournal", directory, params, JournalKey.SUPPORT,
                                     // In Accord, we are using streaming serialization, i.e. Reader/Writer interfaces instead of materializing objects
@@ -232,7 +230,7 @@ public class AccordJournal implements accord.api.Journal, RangeSearcher.Supplier
     public Command loadCommand(int commandStoreId, TxnId txnId, RedundantBefore redundantBefore, DurableBefore durableBefore)
     {
         Builder builder = load(commandStoreId, txnId);
-        builder.maybeCleanup(FULL, agent, redundantBefore, durableBefore);
+        builder.maybeCleanup(true, FULL, redundantBefore, durableBefore);
         return builder.construct(redundantBefore);
     }
 
@@ -243,7 +241,7 @@ public class AccordJournal implements accord.api.Journal, RangeSearcher.Supplier
         if (builder.isEmpty())
             return null;
 
-        Cleanup cleanup = builder.shouldCleanup(FULL, node.agent(), redundantBefore, durableBefore);
+        Cleanup cleanup = builder.shouldCleanup(FULL, redundantBefore, durableBefore);
         switch (cleanup)
         {
             case VESTIGIAL:
@@ -384,9 +382,9 @@ public class AccordJournal implements accord.api.Journal, RangeSearcher.Supplier
         return loadDiffs(commandStoreId, txnId, Load.ALL);
     }
 
-    private <BUILDER> BUILDER readAll(JournalKey key)
+    private <BUILDER extends FlyweightImage> BUILDER readAll(JournalKey key)
     {
-        BUILDER builder = (BUILDER) key.type.serializer.mergerFor(key);
+        BUILDER builder = (BUILDER) key.type.serializer.mergerFor();
         // TODO: this can be further improved to avoid allocating lambdas
        AccordJournalValueSerializers.FlyweightSerializer<?, BUILDER> serializer = (AccordJournalValueSerializers.FlyweightSerializer<?, BUILDER>) key.type.serializer;
        // TODO (expected): for those where we store an image, read only the first entry we find in DESC order
@@ -618,11 +616,16 @@ public class AccordJournal implements accord.api.Journal, RangeSearcher.Supplier
         }
     }
 
-    public static class Builder extends CommandChange.Builder
+    public static class Builder extends CommandChange.Builder implements FlyweightImage
     {
         public Builder()
         {
-            super(null, Load.ALL);
+            this(Load.ALL);
+        }
+
+        public Builder(Load load)
+        {
+            super(null, load);
         }
 
         public Builder(TxnId txnId)
@@ -634,6 +637,12 @@ public class AccordJournal implements accord.api.Journal, RangeSearcher.Supplier
         {
             super(txnId, load);
         }
+
+        public void reset(JournalKey key)
+        {
+            reset(key.id);
+        }
+
         public ByteBuffer asByteBuffer(Version userVersion) throws IOException
         {
             try (DataOutputBuffer out = new DataOutputBuffer())
@@ -728,11 +737,11 @@ public class AccordJournal implements accord.api.Journal, RangeSearcher.Supplier
             Invariants.require(txnId != null);
             int readFlags = in.readInt();
             Invariants.require(readFlags != 0);
-            nextCalled = true;
+            hasUpdate = true;
             count++;
 
             // batch-apply any new nulls
-            setNulls(readFlags);
+            setNulls(false, readFlags);
            // iterator sets low 16 bits; low readFlag bits are nulls, so masking with ~readFlags restricts to non-null changed fields
             int iterable = toIterableSetFields(readFlags) & ~readFlags;
             for (Field field = nextSetField(iterable) ; field != null; field = nextSetField(iterable = unsetIterable(field, iterable)))
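
The deserializeNext comment above describes visiting the non-null changed fields of a flags int by masking with ~readFlags. A self-contained sketch of walking set bits that way; the field numbering is illustrative only:

    final class FlagIteration
    {
        public static void main(String[] args)
        {
            int readFlags = 0b0101;               // low bits mark fields that became null
            int changed   = 0b1111;               // bits for fields present in this record
            int iterable  = changed & ~readFlags; // restrict to non-null changed fields
            for (int bits = iterable; bits != 0; bits &= bits - 1)
            {
                int field = Integer.numberOfTrailingZeros(bits);
                System.out.println("apply field " + field); // prints 1, then 3
            }
        }
    }
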
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java b/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java
index 25e37e0b0b..e72beaab2c 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java
@@ -34,7 +34,6 @@ import org.apache.cassandra.service.accord.journal.AccordTopologyUpdate;
 import org.apache.cassandra.service.accord.serializers.CommandStoreSerializers;
 import org.apache.cassandra.service.accord.serializers.Version;
 
-import static accord.api.Journal.Load.ALL;
 import static accord.local.CommandStores.RangesForEpoch;
 
 // TODO (required): test with large collection values, and perhaps split out some fields if they have a tendency to grow larger
@@ -42,9 +41,14 @@ import static accord.local.CommandStores.RangesForEpoch;
 // TODO (required): versioning
 public class AccordJournalValueSerializers
 {
-    public interface FlyweightSerializer<ENTRY, IMAGE>
+    public interface FlyweightImage
     {
-        IMAGE mergerFor(JournalKey key);
+        void reset(JournalKey key);
+    }
+
+    public interface FlyweightSerializer<ENTRY, IMAGE extends FlyweightImage>
+    {
+        IMAGE mergerFor();
 
        void serialize(JournalKey key, ENTRY from, DataOutputPlus out, Version userVersion) throws IOException;
 
@@ -57,9 +61,9 @@ public class AccordJournalValueSerializers
     implements FlyweightSerializer<AccordJournal.Writer, AccordJournal.Builder>
     {
         @Override
-        public AccordJournal.Builder mergerFor(JournalKey journalKey)
+        public AccordJournal.Builder mergerFor()
         {
-            return new AccordJournal.Builder(journalKey.id, ALL);
+            return new AccordJournal.Builder();
         }
 
         @Override
@@ -91,7 +95,7 @@ public class AccordJournalValueSerializers
         }
     }
 
-    public abstract static class Accumulator<A, V>
+    public abstract static class Accumulator<A, V> implements FlyweightImage
     {
         protected A accumulated;
 
@@ -115,10 +119,19 @@ public class AccordJournalValueSerializers
 
     public static class IdentityAccumulator<T> extends Accumulator<T, T>
     {
+        final T initial;
         boolean hasRead;
         public IdentityAccumulator(T initial)
         {
             super(initial);
+            this.initial = initial;
+        }
+
+        @Override
+        public void reset(JournalKey key)
+        {
+            hasRead = false;
+            accumulated = initial;
         }
 
         @Override
@@ -135,7 +148,7 @@ public class AccordJournalValueSerializers
    implements FlyweightSerializer<RedundantBefore, IdentityAccumulator<RedundantBefore>>
     {
         @Override
-        public IdentityAccumulator<RedundantBefore> mergerFor(JournalKey journalKey)
+        public IdentityAccumulator<RedundantBefore> mergerFor()
         {
             return new IdentityAccumulator<>(RedundantBefore.EMPTY);
         }
@@ -184,6 +197,12 @@ public class AccordJournalValueSerializers
             super(DurableBefore.EMPTY);
         }
 
+        @Override
+        public void reset(JournalKey key)
+        {
+            accumulated = DurableBefore.EMPTY;
+        }
+
         @Override
        protected DurableBefore accumulate(DurableBefore oldValue, DurableBefore newValue)
         {
@@ -194,7 +213,7 @@ public class AccordJournalValueSerializers
     public static class DurableBeforeSerializer
     implements FlyweightSerializer<DurableBefore, DurableBeforeAccumulator>
     {
-        public DurableBeforeAccumulator mergerFor(JournalKey journalKey)
+        public DurableBeforeAccumulator mergerFor()
         {
             return new DurableBeforeAccumulator();
         }
@@ -231,7 +250,7 @@ public class AccordJournalValueSerializers
    implements FlyweightSerializer<NavigableMap<TxnId, Ranges>, IdentityAccumulator<NavigableMap<TxnId, Ranges>>>
     {
         @Override
-        public IdentityAccumulator<NavigableMap<TxnId, Ranges>> mergerFor(JournalKey key)
+        public IdentityAccumulator<NavigableMap<TxnId, Ranges>> mergerFor()
         {
            return new IdentityAccumulator<>(ImmutableSortedMap.of(TxnId.NONE, Ranges.EMPTY));
         }
@@ -259,7 +278,7 @@ public class AccordJournalValueSerializers
    implements FlyweightSerializer<NavigableMap<Timestamp, Ranges>, IdentityAccumulator<NavigableMap<Timestamp, Ranges>>>
     {
         @Override
-        public IdentityAccumulator<NavigableMap<Timestamp, Ranges>> mergerFor(JournalKey key)
+        public IdentityAccumulator<NavigableMap<Timestamp, Ranges>> mergerFor()
         {
            return new IdentityAccumulator<>(ImmutableSortedMap.of(Timestamp.NONE, Ranges.EMPTY));
         }
@@ -287,7 +306,7 @@ public class AccordJournalValueSerializers
    implements FlyweightSerializer<RangesForEpoch, Accumulator<RangesForEpoch, RangesForEpoch>>
     {
        public static final RangesForEpochSerializer instance = new RangesForEpochSerializer();
-        public IdentityAccumulator<RangesForEpoch> mergerFor(JournalKey key)
+        public IdentityAccumulator<RangesForEpoch> mergerFor()
         {
             return new IdentityAccumulator<>(null);
         }
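
The mergerFor(JournalKey) -> mergerFor() + reset(JournalKey) refactor above lets a single merger instance be rebound to successive keys instead of allocated per key. A minimal sketch of the pattern with simplified types:

    final class FlyweightReuse
    {
        interface FlyweightImage { void reset(Object key); }

        static final class SumImage implements FlyweightImage
        {
            long total;
            @Override public void reset(Object key) { total = 0; } // rebind to a new key
            void accumulate(long value) { total += value; }
        }

        public static void main(String[] args)
        {
            SumImage merger = new SumImage(); // allocated once
            for (Object key : new Object[]{ "k1", "k2" })
            {
                merger.reset(key);
                merger.accumulate(42);
            }
        }
    }
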
diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
index fed2683e32..efe9f39b25 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.service.accord;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
@@ -552,6 +553,7 @@ public class AccordKeyspace
         public static final ColumnMetadata id = getColumn(Journal, "id");
         public static final ColumnMetadata record = getColumn(Journal, 
"record");
         public static final ColumnMetadata user_version = getColumn(Journal, 
"user_version");
+        public static final RegularAndStaticColumns regular = new RegularAndStaticColumns(Columns.NONE, Columns.from(Arrays.asList(record, user_version)));
 
         public static DecoratedKey decorate(JournalKey key)
         {
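
The new static RegularAndStaticColumns above means the journal table's column set is computed once instead of being rebuilt for every partition written on the hot path. A hypothetical call site (the PartitionUpdate.Builder constructor is standard Cassandra API, but the variable names here are illustrative only):

    // Illustrative only: reuse the precomputed column set when building a
    // journal partition update, instead of deriving it on every write.
    PartitionUpdate.Builder builder =
        new PartitionUpdate.Builder(metadata, partitionKey, regular, 1);
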
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java 
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 2d2a024e58..aca45679a7 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -296,7 +296,7 @@ public class AccordService implements IAccordService, 
Shutdownable
         final RequestCallbacks callbacks = new RequestCallbacks(time);
         this.scheduler = new AccordScheduler();
         this.dataStore = new AccordDataStore();
-        this.journal = new 
AccordJournal(DatabaseDescriptor.getAccord().journal, agent);
+        this.journal = new 
AccordJournal(DatabaseDescriptor.getAccord().journal);
         this.configService = new AccordConfigurationService(localId);
         this.fastPathCoordinator = AccordFastPathCoordinator.create(localId, 
configService);
         this.messageSink = new AccordMessageSink(agent, configService, 
callbacks);
diff --git 
a/src/java/org/apache/cassandra/service/accord/journal/AccordTopologyUpdate.java
 
b/src/java/org/apache/cassandra/service/accord/journal/AccordTopologyUpdate.java
index 86eff7369e..ff4a6e6ecb 100644
--- 
a/src/java/org/apache/cassandra/service/accord/journal/AccordTopologyUpdate.java
+++ 
b/src/java/org/apache/cassandra/service/accord/journal/AccordTopologyUpdate.java
@@ -45,8 +45,6 @@ import 
org.apache.cassandra.service.accord.serializers.KeySerializers;
 import org.apache.cassandra.service.accord.serializers.TopologySerializers;
 import org.apache.cassandra.service.accord.serializers.Version;
 
-import static 
org.apache.cassandra.service.accord.JournalKey.Type.TOPOLOGY_UPDATE;
-
 public interface AccordTopologyUpdate
 {
     Kind kind();
@@ -367,6 +365,12 @@ public interface AccordTopologyUpdate
             super(new TreeMap<>());
         }
 
+        @Override
+        public void reset(JournalKey key)
+        {
+            accumulated = new TreeMap<>();
+        }
+
         @Override
         public void update(AccordTopologyUpdate newValue)
         {
@@ -414,9 +418,8 @@ public interface AccordTopologyUpdate
         }
 
         @Override
-        public Accumulator mergerFor(JournalKey key)
+        public Accumulator mergerFor()
         {
-            Invariants.require(key.type == TOPOLOGY_UPDATE);
             return new Accumulator();
         }
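
With mergerFor() now keyless, the key-type invariant it used to assert has no natural home there. If that validation is still wanted, reset(JournalKey) would be the obvious place for it; a sketch of what that could look like (not part of this commit):

    @Override
    public void reset(JournalKey key)
    {
        Invariants.require(key.type == JournalKey.Type.TOPOLOGY_UPDATE);
        accumulated = new TreeMap<>();
    }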
 
diff --git a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java 
b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
index bc82017d2e..a226861f8f 100644
--- a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
+++ b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
@@ -211,9 +211,8 @@ public class NativeSSTableLoaderClient extends 
SSTableLoader.Client
         ColumnIdentifier name = new 
ColumnIdentifier(row.getBytes("column_name_bytes"), 
row.getString("column_name"));
 
         int position = row.getInt("position");
-        int uniqueId = row.isNull("unique_id") ? ColumnMetadata.NO_UNIQUE_ID : 
row.getInt("unique_id");
         org.apache.cassandra.schema.ColumnMetadata.Kind kind = 
ColumnMetadata.Kind.valueOf(row.getString("kind").toUpperCase());
-        return new ColumnMetadata(keyspace, table, name, type, uniqueId, 
position, kind, null);
+        return new ColumnMetadata(keyspace, table, name, type, 
ColumnMetadata.NO_UNIQUE_ID, position, kind, null);
     }
 
     private static DroppedColumn createDroppedColumnFromRow(Row row, String 
keyspace, String table)
@@ -221,8 +220,7 @@ public class NativeSSTableLoaderClient extends 
SSTableLoader.Client
         String name = row.getString("column_name");
         AbstractType<?> type = CQLTypeParser.parse(keyspace, 
row.getString("type"), Types.none());
         ColumnMetadata.Kind kind = 
ColumnMetadata.Kind.valueOf(row.getString("kind").toUpperCase());
-        int uniqueId = row.isNull("unique_id") ? ColumnMetadata.NO_UNIQUE_ID : 
row.getInt("unique_id");
-        ColumnMetadata column = new ColumnMetadata(keyspace, table, 
ColumnIdentifier.getInterned(name, true), type, uniqueId, 
ColumnMetadata.NO_POSITION, kind, null);
+        ColumnMetadata column = new ColumnMetadata(keyspace, table, 
ColumnIdentifier.getInterned(name, true), type, ColumnMetadata.NO_UNIQUE_ID, 
ColumnMetadata.NO_POSITION, kind, null);
         long droppedTime = row.getTimestamp("dropped_time").getTime();
         return new DroppedColumn(column, droppedTime);
     }
diff --git 
a/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
 
b/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
index f60026b892..fbf2a9c874 100644
--- 
a/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
+++ 
b/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
@@ -43,14 +43,15 @@ import accord.local.CommandStores;
 import accord.local.Node;
 import accord.primitives.EpochSupplier;
 import accord.utils.DefaultRandom;
+import accord.utils.Invariants;
 import accord.utils.RandomSource;
 import org.apache.cassandra.ServerTestUtils;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.ActiveCompactionsTracker;
 import org.apache.cassandra.db.compaction.CompactionController;
 import org.apache.cassandra.db.compaction.CompactionIterator;
-import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
 import org.apache.cassandra.db.compaction.writers.DefaultCompactionWriter;
@@ -61,11 +62,12 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.journal.SegmentCompactor;
 import org.apache.cassandra.journal.StaticSegment;
+import org.apache.cassandra.journal.TestParams;
 import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.Tables;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.accord.api.AccordAgent;
 import org.apache.cassandra.service.accord.serializers.CommandSerializers;
 import org.apache.cassandra.service.accord.serializers.DepsSerializers;
 import org.apache.cassandra.service.accord.serializers.KeySerializers;
@@ -163,22 +165,30 @@ public class AccordJournalBurnTest extends BurnTestBase
                  operations,
                  10 + random.nextInt(30),
                  new RandomDelayQueue.Factory(random).get(),
-                 (node, agent, randomSource) -> {
+                 (nodeId, randomSource) -> {
                      try
                      {
                          File directory = new 
File(Files.createTempDirectory(Integer.toString(counter.incrementAndGet())));
                          directory.deleteRecursiveOnExit();
-                         ColumnFamilyStore cfs = 
ks.getColumnFamilyStore("journal_" + node);
+                         ColumnFamilyStore cfs = 
ks.getColumnFamilyStore("journal_" + nodeId);
                          cfs.disableAutoCompaction();
-                         AccordJournal journal = new AccordJournal(new 
AccordJournalTestParams()
+                         AccordJournal journal = new AccordJournal(new 
TestParams()
                          {
                              @Override
                              public int segmentSize()
                              {
-                                 return 32 * 1024 * 1024;
+                                 return 1 * 1024 * 1024;
                              }
-                         }, new AccordAgent(), directory, cfs)
+                         }, directory, cfs)
                          {
+                             @Override
+                             public AccordJournal start(Node node)
+                             {
+                                 super.start(node);
+                                 unsafeSetStarted();
+                                 return this;
+                             }
+
                              @Override
                              public void saveCommand(int store, CommandUpdate 
update, @Nullable Runnable onFlush)
                              {
@@ -236,45 +246,48 @@ public class AccordJournalBurnTest
                                 this.journal.closeCurrentSegmentForTestingIfNonEmpty();
                                  this.journal.runCompactorForTesting();
 
-                                 List<SSTableReader> all = new 
ArrayList<>(cfs.getLiveSSTables());
+                                 Set<SSTableReader> orig = 
cfs.getLiveSSTables();
+                                 List<SSTableReader> all = new 
ArrayList<>(orig);
                                  if (all.size() <= 1)
                                      return;
 
-                                 Set<SSTableReader> sstables = new HashSet<>();
-
-                                 int min, max;
-                                 while (true)
+                                 Set<SSTableReader> selected = new HashSet<>();
+                                 int count = all.size();
+                                 int removeCount = random.nextInt(1, count);
+                                 while (removeCount-- > 0)
                                  {
-                                     int tmp1 = randomSource.nextInt(0, 
all.size());
-                                     int tmp2 = randomSource.nextInt(0, 
all.size());
-                                     if (tmp1 != tmp2 && Math.abs(tmp1 - tmp2) 
>= 1)
-                                     {
-                                         min = Math.min(tmp1, tmp2);
-                                         max = Math.max(tmp1, tmp2);
-                                         break;
-                                     }
+                                     int removeIndex = random.nextInt(count);
+                                     SSTableReader reader = 
all.get(removeIndex);
+                                     if (reader == null)
+                                         continue;
+                                     all.set(removeIndex, null);
+                                     selected.add(reader);
+                                     --count;
                                  }
-                                 // Random subset
-                                 for (int i = min; i < max; i++)
-                                     sstables.add(all.get(i));
 
-                                 List<ISSTableScanner> scanners = 
sstables.stream().map(SSTableReader::getScanner).collect(Collectors.toList());
+                                 if (selected.isEmpty())
+                                     return;
+                                 List<ISSTableScanner> scanners = 
selected.stream().map(SSTableReader::getScanner).collect(Collectors.toList());
 
                                  Collection<SSTableReader> newSStables;
 
-                                 try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
-                                      CompactionController controller = new CompactionController(cfs, sstables, 0);
-                                      CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, scanners, controller, 0, nextTimeUUID()))
+                                 try (LifecycleTransaction txn = cfs.getTracker().tryModify(selected, OperationType.COMPACTION);
+                                      CompactionController controller = new CompactionController(cfs, selected, 0);
+                                      CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION,
+                                                                                     scanners,
+                                                                                     controller,
+                                                                                     0,
+                                                                                     nextTimeUUID(),
+                                                                                     ActiveCompactionsTracker.NOOP, null,
+                                                                                     () -> getCompactionInfo(node, cfs.getTableId()),
+                                                                                     () -> Version.V1))
                                  {
-                                     CompactionManager.instance.active.beginCompaction(ci);
-                                     try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, cfs.getDirectories(), txn, sstables))
+                                     try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, cfs.getDirectories(), txn, selected))
                                      {
                                          while (ci.hasNext())
-                                         {
                                              writer.append(ci.next());
-                                             ci.setTargetDirectory(writer.getSStableDirectory().path());
-                                         }
 
+                                         ci.setTargetDirectory(writer.getSStableDirectory().path());
                                          // point of no return
                                          newSStables = writer.finish();
                                      }
@@ -282,11 +295,9 @@ public class AccordJournalBurnTest extends BurnTestBase
                                      {
                                          throw new RuntimeException(e);
                                      }
-                                     finally
-                                     {
-                                         CompactionManager.instance.active.finishCompaction(ci);
-                                     }
                                  }
+
+                                 Invariants.require(!orig.equals(cfs.getLiveSSTables()));
                              }
 
 
@@ -299,8 +310,6 @@ public class AccordJournalBurnTest extends BurnTestBase
                              }
                          };
 
-                         journal.start(null);
-                         journal.unsafeSetStarted();
                          return journal;
                      }
                      catch (Throwable t)
@@ -316,4 +325,18 @@ public class AccordJournalBurnTest extends BurnTestBase
             throw SimulationException.wrap(seed, t);
         }
     }
+
+    public static IAccordService.AccordCompactionInfos getCompactionInfo(Node node, TableId tableId)
+    {
+        IAccordService.AccordCompactionInfos compactionInfos = new IAccordService.AccordCompactionInfos(node.durableBefore(), node.topology().minEpoch());
+        node.commandStores().forEachCommandStore(commandStore -> {
+            compactionInfos.put(commandStore.id(), new IAccordService.AccordCompactionInfo(commandStore.id(),
+                                                                                           commandStore.unsafeGetRedundantBefore(),
+                                                                                           commandStore.unsafeGetRangesForEpoch(),
+                                                                                           tableId));
+        });
+        return compactionInfos;
+    }
+        });
+        return compactionInfos;
+    }
+
+
 }
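
The rewritten selection loop above draws a random subset by nulling out chosen slots, and a retry that lands on an already-cleared index still consumes one of the removeCount attempts, so the loop can select fewer sstables than requested. For comparison only (plain java.util, not the committed code), picking exactly k distinct elements can be done with a partial Fisher-Yates shuffle:

    // Sketch: pick a uniformly random proper subset of size k (1 <= k < n)
    // from a list of n >= 2 elements, via a partial Fisher-Yates shuffle.
    // Requires java.util.{ArrayList, Collections, List, Random}.
    static <T> List<T> randomProperSubset(List<T> all, Random random)
    {
        List<T> copy = new ArrayList<>(all);
        int k = 1 + random.nextInt(copy.size() - 1);
        for (int i = 0; i < k; i++)
            Collections.swap(copy, i, i + random.nextInt(copy.size() - i));
        return copy.subList(0, k);
    }
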
diff --git 
a/test/distributed/org/apache/cassandra/service/accord/AccordJournalCompactionTest.java
 
b/test/distributed/org/apache/cassandra/service/accord/AccordJournalCompactionTest.java
index 6844f58827..241f5062cb 100644
--- 
a/test/distributed/org/apache/cassandra/service/accord/AccordJournalCompactionTest.java
+++ 
b/test/distributed/org/apache/cassandra/service/accord/AccordJournalCompactionTest.java
@@ -45,6 +45,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.journal.TestParams;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.accord.api.AccordAgent;
@@ -94,7 +95,7 @@ public class AccordJournalCompactionTest
         Gen<NavigableMap<Timestamp, Ranges>> safeToReadGen = 
AccordGenerators.safeToReadGen(DatabaseDescriptor.getPartitioner());
         Gen<RangesForEpoch> rangesForEpochGen = 
AccordGenerators.rangesForEpoch(DatabaseDescriptor.getPartitioner());
 
-        AccordJournal journal = new AccordJournal(new AccordJournalTestParams()
+        AccordJournal journal = new AccordJournal(new TestParams()
         {
             @Override
             public int segmentSize()
@@ -107,7 +108,7 @@ public class AccordJournalCompactionTest
             {
                 return false;
             }
-        }, new AccordAgent());
+        });
         try
         {
             journal.start(null);
diff --git 
a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
 
b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
index 9691756cc3..0d6d11216f 100644
--- 
a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
+++ 
b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
@@ -36,6 +36,7 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import accord.api.Agent;
 import accord.api.Key;
 import accord.api.Result;
 import accord.local.CheckedCommands;
@@ -75,6 +76,7 @@ import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.distributed.shared.WithProperties;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.journal.TestParams;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.TableMetadata;
@@ -265,7 +267,9 @@ public class CompactionAccordIteratorsTest
         IAccordService.AccordCompactionInfo compactionInfo = new 
IAccordService.AccordCompactionInfo(commandStore.id(), redundantBefore, 
commandStore.unsafeGetRangesForEpoch(), 
((AccordCommandStore)commandStore).tableId());
         IAccordService.AccordCompactionInfos compactionInfos = new 
IAccordService.AccordCompactionInfos(durableBefore, 0);
         compactionInfos.put(commandStore.id(), compactionInfo);
+        when(mockAccordService.agent()).thenReturn(mock(Agent.class));
         when(mockAccordService.getCompactionInfo()).thenReturn(compactionInfos);
+        when(mockAccordService.journalConfiguration()).thenReturn(new TestParams());
         return mockAccordService;
     }
 
@@ -374,7 +378,7 @@ public class CompactionAccordIteratorsTest
                 nextInputScanners.add(scanners.remove(random.nextInt(scanners.size())));
             }
            try (CompactionController controller = new CompactionController(ColumnFamilyStore.getIfExists(ACCORD_KEYSPACE_NAME, cfs.name), Collections.emptySet(), 0);
-                 CompactionIterator compactionIterator = new CompactionIterator(OperationType.COMPACTION, nextInputScanners, controller, FBUtilities.nowInSeconds(), null, ActiveCompactionsTracker.NOOP, null, () -> mockAccordService))
+                 CompactionIterator compactionIterator = new CompactionIterator(OperationType.COMPACTION, nextInputScanners, controller, FBUtilities.nowInSeconds(), null, ActiveCompactionsTracker.NOOP, null, mockAccordService))
             {
                 while (compactionIterator.hasNext())
                 {
diff --git 
a/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java
 
b/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java
index b2d9c07ad2..0fabc00b52 100644
--- 
a/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java
+++ 
b/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java
@@ -229,7 +229,7 @@ public class AccordConfigurationServiceTest
         directory.deleteRecursiveOnExit();
         Keyspace ks = Schema.instance.getKeyspaceInstance("system_accord");
         ColumnFamilyStore cfs = 
ks.getColumnFamilyStore(AccordKeyspace.JOURNAL);
-        AccordJournal journal = new AccordJournal(new TestParams(), new 
AccordAgent(), directory, cfs);
+        AccordJournal journal = new AccordJournal(new TestParams(), directory, 
cfs);
         journal.start(null);
         journal.unsafeSetStarted();
         return journal;
diff --git 
a/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java 
b/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java
index 0445e3d75e..03666a777b 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java
@@ -43,12 +43,12 @@ import 
org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.ByteOrderedPartitioner;
 import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.journal.TestParams;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.accord.api.AccordAgent;
 import org.apache.cassandra.service.accord.api.TokenKey;
 import org.apache.cassandra.service.consensus.TransactionalMode;
 import org.apache.cassandra.utils.StorageCompatibilityMode;
@@ -74,7 +74,7 @@ public class AccordJournalOrderTest
     {
         if (new File(DatabaseDescriptor.getAccordJournalDirectory()).exists())
             ServerTestUtils.cleanupDirectory(DatabaseDescriptor.getAccordJournalDirectory());
-        AccordJournal accordJournal = new 
AccordJournal(AccordJournalTestParams.INSTANCE, new AccordAgent());
+        AccordJournal accordJournal = new AccordJournal(TestParams.INSTANCE);
         accordJournal.start(null);
         RandomSource randomSource = RandomSource.wrap(new Random(0));
         TxnId id1 = AccordGens.txnIds().next(randomSource);
diff --git 
a/test/unit/org/apache/cassandra/service/accord/AccordJournalTestParams.java 
b/test/unit/org/apache/cassandra/service/accord/AccordJournalTestParams.java
deleted file mode 100644
index c9a14efd7f..0000000000
--- a/test/unit/org/apache/cassandra/service/accord/AccordJournalTestParams.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service.accord;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.cassandra.journal.Params;
-import org.apache.cassandra.service.accord.serializers.Version;
-
-public class AccordJournalTestParams implements Params
-{
-    public static final AccordJournalTestParams INSTANCE = new 
AccordJournalTestParams();
-
-    @Override
-    public int segmentSize()
-    {
-        return 32 << 20;
-    }
-
-    @Override
-    public FailurePolicy failurePolicy()
-    {
-        return FailurePolicy.STOP;
-    }
-
-    @Override
-    public FlushMode flushMode()
-    {
-        return FlushMode.GROUP;
-    }
-
-    @Override
-    public boolean enableCompaction()
-    {
-        return false;
-    }
-
-    @Override
-    public long compactionPeriod(TimeUnit units)
-    {
-        return units.convert(60, TimeUnit.SECONDS);
-    }
-
-    @Override
-    public long flushPeriod(TimeUnit units)
-    {
-        return units.convert(1, TimeUnit.SECONDS);
-    }
-
-    @Override
-    public long periodicBlockPeriod(TimeUnit units)
-    {
-        return units.convert(2, TimeUnit.SECONDS);
-    }
-
-    @Override
-    public int userVersion()
-    {
-        return Version.LATEST.version;
-    }
-}
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java 
b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
index 0cb57ad9c1..1bb8614ca4 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
@@ -353,7 +353,7 @@ public class AccordTestUtils
             ServerTestUtils.cleanupDirectory(DatabaseDescriptor.getAccordJournalDirectory());
         AccordSpec.JournalSpec spec = new AccordSpec.JournalSpec();
         spec.flushPeriod = new DurationSpec.IntSecondsBound(1);
-        AccordJournal journal = new AccordJournal(spec, agent);
+        AccordJournal journal = new AccordJournal(spec);
         journal.start(null);
 
         CommandStore.EpochUpdateHolder holder = new 
CommandStore.EpochUpdateHolder();
diff --git 
a/test/unit/org/apache/cassandra/service/accord/CommandChangeTest.java 
b/test/unit/org/apache/cassandra/service/accord/CommandChangeTest.java
index d87ca5292f..6991290021 100644
--- a/test/unit/org/apache/cassandra/service/accord/CommandChangeTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/CommandChangeTest.java
@@ -69,8 +69,14 @@ public class CommandChangeTest
     @Test
     public void allNull()
     {
-        int flags = getFlags(null, null);
-        assertMissing(flags, ALL);
+        int flags = getFlags(null, 
Command.NotDefined.uninitialised(TxnId.NONE));
+        EnumSet<Field> missing = EnumSet.allOf(Field.class);
+        missing.remove(Field.SAVE_STATUS);
+        missing.remove(Field.PARTICIPANTS);
+        missing.remove(Field.PROMISED);
+        missing.remove(Field.ACCEPTED);
+        missing.remove(Field.DURABILITY);
+        assertMissing(flags, missing);
     }
 
     @Test
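
The expected-missing set in allNull above can equivalently be built as the complement of the fields an uninitialised command always reports; purely as an illustration:

    // Equivalent construction of the expected set (illustration only):
    EnumSet<Field> present = EnumSet.of(Field.SAVE_STATUS, Field.PARTICIPANTS,
                                        Field.PROMISED, Field.ACCEPTED, Field.DURABILITY);
    EnumSet<Field> missing = EnumSet.complementOf(present);
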
diff --git 
a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
 
b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
index 9a68d127ce..ef7e92c6ea 100644
--- 
a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
+++ 
b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
@@ -31,7 +31,6 @@ import java.util.function.Predicate;
 import java.util.function.ToLongFunction;
 import javax.annotation.Nullable;
 
-import accord.api.Agent;
 import accord.api.Journal;
 import accord.api.LocalListeners;
 import accord.api.ProgressLog;
@@ -234,7 +233,7 @@ public class SimulatedAccordCommandStore implements 
AutoCloseable
             }
         };
 
-        this.journal = new DefaultJournal(nodeId, agent, rs.fork());
+        this.journal = new DefaultJournal(nodeId, rs.fork());
         this.commandStore = new AccordCommandStore(0,
                                                    storeService,
                                                    agent,
@@ -465,9 +464,9 @@ public class SimulatedAccordCommandStore implements 
AutoCloseable
     private static class DefaultJournal extends InMemoryJournal implements 
RangeSearcher.Supplier
     {
         private final RouteInMemoryIndex<?> index = new RouteInMemoryIndex<>();
-        private DefaultJournal(Node.Id id, Agent agent, RandomSource rs)
+        private DefaultJournal(Node.Id id, RandomSource rs)
         {
-            super(id, agent, rs);
+            super(id, rs);
         }
 
         @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
