This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch cep-45-mutation-tracking
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-45-mutation-tracking by this push:
     new 31f81c0b2c CEP-45: Integrate Mutation Journal with Memtable flush tracking
31f81c0b2c is described below

commit 31f81c0b2cf5939e751b012ce7ebdfd975fb0814
Author: Alex Petrov <[email protected]>
AuthorDate: Thu Sep 18 17:54:52 2025 +0200

    CEP-45: Integrate Mutation Journal with Memtable flush tracking
    
      * Tracking of Memtable -> SSTable flushes in Mutation Journal segments
      * Replay of Mutation Journal segments that were not fully flushed to SSTables
      * Distinguish between CommitLog and MutationJournal offsets
      * Effectively adds support for node bounces
    
    Patch by Alex Petrov; reviewed by Aleksey Yeschenko for CASSANDRA-20919
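
    In rough pseudocode, the flow this patch wires up looks as follows (method
    names are taken from the diff below; trackerFor() is shorthand and the
    surrounding plumbing is elided):

        // On write: journal the mutation, then mark its tables dirty in the
        // owning segment's tracker (see MutationJournal.write below)
        RecordPointer ptr = journal.blockingWrite(id, mutation);
        for (TableId tableId : mutation.getTableIds())
            trackerFor(ptr.segmentId).markDirty(tableId, ptr);

        // On memtable flush: mark the flushed range clean; a static segment
        // whose dirty intervals are fully covered no longer needs replay
        // (see MutationJournal.notifyFlushed below)
        MutationJournal.instance.notifyFlushed(tableId, lowerBound, upperBound);

        // On restart: replay only static segments still marked as needing
        // replay (see MutationJournal.replayStaticSegments below)
        MutationJournal.instance.replayStaticSegments();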
---
 .../org/apache/cassandra/db/ColumnFamilyStore.java |  39 ++-
 src/java/org/apache/cassandra/db/Mutation.java     |   5 +
 .../cassandra/db/commitlog/CommitLogReplayer.java  |   6 +-
 .../db/tracked/TrackedKeyspaceWriteHandler.java    |   6 +-
 .../apache/cassandra/journal/ActiveSegment.java    |  39 ++-
 .../journal/DeserializedRecordConsumer.java        |  51 ++++
 src/java/org/apache/cassandra/journal/Journal.java |  75 +++++-
 .../org/apache/cassandra/journal/Metadata.java     |  40 ++-
 .../apache/cassandra/journal/RecordConsumer.java   |   1 +
 .../apache/cassandra/journal/RecordPointer.java    |  35 +--
 src/java/org/apache/cassandra/journal/Segment.java |  12 +-
 .../org/apache/cassandra/journal/Segments.java     |  32 +++
 .../apache/cassandra/journal/StaticSegment.java    |   8 +
 .../apache/cassandra/replication/MutationId.java   |   6 +
 .../cassandra/replication/MutationJournal.java     | 193 +++++++++++++-
 .../cassandra/replication/SegmentStateTracker.java | 295 +++++++++++++++++++++
 .../apache/cassandra/service/CassandraDaemon.java  |   1 +
 .../cassandra/service/accord/AccordJournal.java    |   2 +-
 src/java/org/apache/cassandra/tcm/Startup.java     |   2 +
 .../apache/cassandra/utils/IntegerInterval.java    |  32 ++-
 .../cassandra/distributed/impl/Instance.java       |   1 +
 .../test/tracking/MutationTrackingBounceTest.java  |  92 +++++++
 .../MutationTrackingBounce_ValidateRunnable.java   |  69 +++++
 .../apache/cassandra/harry/checker/TestHelper.java |  15 ++
 .../org/apache/cassandra/journal/SegmentsTest.java | 135 ++++++++++
 .../replication/MutationJournalReplayTest.java     | 218 +++++++++++++++
 .../cassandra/replication/MutationJournalTest.java |   9 +-
 .../replication/SegmentStateTrackerTest.java       | 237 +++++++++++++++++
 28 files changed, 1551 insertions(+), 105 deletions(-)

diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index dacac3becf..80dfa68cf7 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -134,6 +134,7 @@ import org.apache.cassandra.repair.consistent.admin.CleanupSummary;
 import org.apache.cassandra.repair.consistent.admin.PendingStat;
 import org.apache.cassandra.replication.ImmutableCoordinatorLogOffsets;
 import org.apache.cassandra.replication.MutationId;
+import org.apache.cassandra.replication.MutationJournal;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.CompactionParams;
 import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
@@ -498,9 +499,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
 
         logger.info("Initializing {}.{}", getKeyspaceName(), name);
 
-        Memtable initialMemtable = DatabaseDescriptor.isDaemonInitialized() ?
-                                   createMemtable(new AtomicReference<>(CommitLog.instance.getCurrentPosition())) :
-                                   null;
+        Memtable initialMemtable = null;
+        if (DatabaseDescriptor.isDaemonInitialized())
+        {
+            CommitLogPosition commitLogPosition;
+            if (metadata().replicationType().isTracked())
+                commitLogPosition = MutationJournal.instance.getCurrentPosition();
+            else
+                commitLogPosition = CommitLog.instance.getCurrentPosition();
+            initialMemtable = createMemtable(new AtomicReference<>(commitLogPosition));
+        }
         memtableMetricsReleaser = memtableFactory.createMemtableMetricsReleaser(metadata);
 
         data = new Tracker(this, initialMemtable, loadSSTables);
@@ -1147,8 +1155,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
             // If a flush errored out but the error was ignored, make sure we don't discard the commit log.
             if (flushFailure == null && mainMemtable != null)
             {
+                CommitLogPosition commitLogLowerBound = mainMemtable.getCommitLogLowerBound();
                 commitLogUpperBound = mainMemtable.getFinalCommitLogUpperBound();
-                CommitLog.instance.discardCompletedSegments(metadata.id, mainMemtable.getCommitLogLowerBound(), commitLogUpperBound);
+                TableMetadata metadata = metadata();
+                if (metadata.replicationType().isTracked())
+                    MutationJournal.instance.notifyFlushed(metadata.id, commitLogLowerBound, commitLogUpperBound);
+                else
+                    CommitLog.instance.discardCompletedSegments(metadata.id, commitLogLowerBound, commitLogUpperBound);
             }
 
             metric.pendingFlushes.dec();
@@ -1215,7 +1228,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
 
             // we then ensure an atomic decision is made about the upper bound of the continuous range of commit log
             // records owned by this memtable
-            setCommitLogUpperBound(commitLogUpperBound);
+            setCommitLogUpperBound(commitLogUpperBound, metadata().replicationType().isTracked());
 
             // we then issue the barrier; this lets us wait for all operations started prior to the barrier to complete;
             // since this happens after wiring up the commitLogUpperBound, we also know all operations with earlier
@@ -1416,7 +1429,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
     }
 
     // atomically set the upper bound for the commit log
-    private static void setCommitLogUpperBound(AtomicReference<CommitLogPosition> commitLogUpperBound)
+    private static void setCommitLogUpperBound(AtomicReference<CommitLogPosition> commitLogUpperBound, boolean useMutationJournal)
     {
         // we attempt to set the holder to the current commit log context. at the same time all writes to the memtables are
         // also maintaining this value, so if somebody sneaks ahead of us somehow (should be rare) we simply retry,
@@ -1424,7 +1437,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
         CommitLogPosition lastReplayPosition;
         while (true)
         {
-            lastReplayPosition = new Memtable.LastCommitLogPosition((CommitLog.instance.getCurrentPosition()));
+            CommitLogPosition commitLogPosition;
+            if (useMutationJournal)
+                commitLogPosition = MutationJournal.instance.getCurrentPosition();
+            else
+                commitLogPosition = CommitLog.instance.getCurrentPosition();
+
+            lastReplayPosition = new Memtable.LastCommitLogPosition(commitLogPosition);
             CommitLogPosition currentLast = commitLogUpperBound.get();
             if ((currentLast == null || currentLast.compareTo(lastReplayPosition) <= 0)
                 && commitLogUpperBound.compareAndSet(currentLast, lastReplayPosition))
@@ -3233,7 +3252,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
 
         data.notifyDropped(DatabaseDescriptor.getAutoSnapshotTtl());
 
-        CommitLog.instance.forceRecycleAllSegments(Collections.singleton(metadata.id));
+        // TODO (required): test mutation tracking + table dropping
+        if (metadata().replicationType().isTracked())
+            MutationJournal.instance.notifyFlushed(metadata.id, new CommitLogPosition(0, 0), MutationJournal.instance.getCurrentPosition());
+        else
+            CommitLog.instance.forceRecycleAllSegments(Collections.singleton(metadata.id));
 
         compactionStrategyManager.shutdown();
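
    (Aside: the tracked-vs-untracked position lookup now appears twice in this
    file; a sketch of a helper that could centralize it, hypothetical and not
    part of this patch:

        private CommitLogPosition currentWriteAheadPosition()
        {
            return metadata().replicationType().isTracked()
                   ? MutationJournal.instance.getCurrentPosition()
                   : CommitLog.instance.getCurrentPosition();
        }

    The notifyFlushed/discardCompletedSegments branches above would still
    differ.)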
 
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index f8388bfe89..c81ea47310 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -201,6 +201,11 @@ public class Mutation implements IMutation, Supplier<Mutation>
         return key;
     }
 
+    public ImmutableMap<TableId, PartitionUpdate> modifications()
+    {
+        return modifications;
+    }
+
     public ImmutableCollection<PartitionUpdate> getPartitionUpdates()
     {
         return modifications.values();
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 52173d44fb..f331e0b1a7 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -121,7 +121,7 @@ public class CommitLogReplayer implements CommitLogReadHandler
 
         for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
         {
-            // but, if we've truncated the cf in question, then we need to need to start replay after the truncation
+            // but, if we've truncated the cf in question, then we need to start replay after the truncation
             CommitLogPosition truncatedAt = SystemKeyspace.getTruncatedPosition(cfs.metadata.id);
             if (truncatedAt != null)
             {
@@ -323,7 +323,7 @@ public class CommitLogReplayer implements CommitLogReadHandler
                     {
                         assert !newPUCollector.isEmpty();
 
-                        Keyspace.open(newPUCollector.getKeyspaceName()).apply(newPUCollector.build(), false, true, false);
+                        keyspace.apply(newPUCollector.build(), false, true, false);
                         commitLogReplayer.keyspacesReplayed.add(keyspace);
                     }
                 }
@@ -439,7 +439,7 @@ public class CommitLogReplayer implements CommitLogReadHandler
             if (toReplay.isEmpty())
                 logger.info("All tables will be included in commit log replay.");
             else
-                logger.info("Tables to be replayed: {}", toReplay.asMap().toString());
+                logger.info("Tables to be replayed: {}", toReplay.asMap());
 
             return new CustomReplayFilter(toReplay);
         }
diff --git a/src/java/org/apache/cassandra/db/tracked/TrackedKeyspaceWriteHandler.java b/src/java/org/apache/cassandra/db/tracked/TrackedKeyspaceWriteHandler.java
index bfd6d593c6..8c06e3c308 100644
--- a/src/java/org/apache/cassandra/db/tracked/TrackedKeyspaceWriteHandler.java
+++ b/src/java/org/apache/cassandra/db/tracked/TrackedKeyspaceWriteHandler.java
@@ -24,7 +24,6 @@ import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.WriteContext;
 import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.exceptions.RequestExecutionException;
-import org.apache.cassandra.journal.RecordPointer;
 import org.apache.cassandra.replication.MutationJournal;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -40,10 +39,9 @@ public class TrackedKeyspaceWriteHandler implements KeyspaceWriteHandler
             group = Keyspace.writeOrder.start();
 
             Tracing.trace("Appending to mutation journal");
-            RecordPointer pointer = MutationJournal.instance.write(mutation.id(), mutation);
+            CommitLogPosition pointer = MutationJournal.instance.write(mutation.id(), mutation);
 
-            // TODO (preferred): update journal to return CommitLogPosition or otherwise remove requirement to allocate second object here
-            return new CassandraWriteContext(group, new CommitLogPosition(pointer.segment, pointer.position));
+            return new CassandraWriteContext(group, pointer);
         }
         catch (Throwable t)
         {
diff --git a/src/java/org/apache/cassandra/journal/ActiveSegment.java b/src/java/org/apache/cassandra/journal/ActiveSegment.java
index 6000e2c9d6..9e01f493d7 100644
--- a/src/java/org/apache/cassandra/journal/ActiveSegment.java
+++ b/src/java/org/apache/cassandra/journal/ActiveSegment.java
@@ -30,10 +30,10 @@ import java.util.function.Consumer;
 import accord.utils.Invariants;
 import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.concurrent.Ref;
@@ -98,6 +98,11 @@ public final class ActiveSegment<K, V> extends Segment<K, V>
         }
     }
 
+    public CommitLogPosition currentPosition()
+    {
+        return new CommitLogPosition(id(), (int) allocateOffset);
+    }
+
     static <K, V> ActiveSegment<K, V> create(Descriptor descriptor, Params params, KeySupport<K> keySupport)
     {
         InMemoryIndex<K> index = InMemoryIndex.create(keySupport);
@@ -134,6 +139,12 @@ public final class ActiveSegment<K, V>
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public void persistMetadata()
+    {
+        throw new UnsupportedOperationException("Cannot mutate an active segment's metadata.");
+    }
+
     /**
      * Read the entry and specified offset into the entry holder.
      * Expects the caller to acquire the ref to the segment and the record to exist.
@@ -425,19 +436,21 @@ public final class ActiveSegment<K, V> extends Segment<K, V>
         }
     }
 
-    final class Allocation
+    final class Allocation extends RecordPointer
     {
         private final OpOrder.Group appendOp;
         private final ByteBuffer buffer;
-        private final int start;
-        private final int length;
 
         Allocation(OpOrder.Group appendOp, ByteBuffer buffer, int length)
         {
+            super(descriptor.timestamp, buffer.position(), length);
             this.appendOp = appendOp;
             this.buffer = buffer;
-            this.start = buffer.position();
-            this.length = length;
+        }
+
+        Segment<K, V> segment()
+        {
+            return ActiveSegment.this;
         }
 
         void write(K id, ByteBuffer record)
@@ -446,7 +459,7 @@ public final class ActiveSegment<K, V> extends Segment<K, V>
             {
                 EntrySerializer.write(id, record, keySupport, buffer, descriptor.userVersion);
                 metadata.update();
-                index.update(id, start, length);
+                index.update(id, position, length);
             }
             catch (IOException e)
             {
@@ -472,14 +485,12 @@ public final class ActiveSegment<K, V>
             }
         }
 
-
-        // Variant of write that does not allocate/return a record pointer
         void writeInternal(K id, ByteBuffer record)
         {
             try
             {
                 EntrySerializer.write(id, record, keySupport, buffer, descriptor.userVersion);
-                index.update(id, start, length);
+                index.update(id, position, length);
                 metadata.update();
             }
             catch (IOException e)
@@ -496,13 +507,13 @@ public final class ActiveSegment<K, V>
         {
             try (Timer.Context ignored = waitingOnFlush.time())
             {
-                waitForFlush(start);
+                waitForFlush(position);
             }
         }
 
         boolean isFsynced()
         {
-            return fsyncedTo >= start + length;
+            return fsyncedTo >= position + length;
         }
 
         Descriptor descriptor()
@@ -512,12 +523,12 @@ public final class ActiveSegment<K, V>
 
         int start()
         {
-            return start;
+            return position;
         }
 
         RecordPointer recordPointer()
         {
-            return new RecordPointer(descriptor.timestamp, start, length);
+            return this;
         }
     }
 
diff --git a/src/java/org/apache/cassandra/journal/DeserializedRecordConsumer.java b/src/java/org/apache/cassandra/journal/DeserializedRecordConsumer.java
new file mode 100644
index 0000000000..0f058befc0
--- /dev/null
+++ b/src/java/org/apache/cassandra/journal/DeserializedRecordConsumer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.journal;
+
+import org.apache.cassandra.io.util.DataInputBuffer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public abstract class DeserializedRecordConsumer<K, V> implements RecordConsumer<K>
+{
+    final ValueSerializer<K, V> valueSerializer;
+
+    public DeserializedRecordConsumer(ValueSerializer<K, V> valueSerializer)
+    {
+        this.valueSerializer = valueSerializer;
+    }
+
+    @Override
+    public void accept(long segment, int position, K key, ByteBuffer buffer, int userVersion)
+    {
+        try (DataInputBuffer in = new DataInputBuffer(buffer, false))
+        {
+            V value = valueSerializer.deserialize(key, in, userVersion);
+            accept(segment, position, key, value);
+        }
+        catch (IOException e)
+        {
+            // can only throw if serializer is buggy
+            throw new RuntimeException(e);
+        }
+    }
+
+    protected abstract void accept(long segment, int position, K key, V value);
+}
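
    For illustration, callers subclass this with whatever value handling they
    need, as the replay path later in this patch does; a minimal hypothetical
    sketch (the logger is assumed):

        DeserializedRecordConsumer<ShortMutationId, Mutation> consumer =
            new DeserializedRecordConsumer<>(MutationSerializer.INSTANCE)
        {
            @Override
            protected void accept(long segment, int position, ShortMutationId key, Mutation value)
            {
                logger.debug("Read {} at ({}, {})", key, segment, position);
            }
        };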
diff --git a/src/java/org/apache/cassandra/journal/Journal.java b/src/java/org/apache/cassandra/journal/Journal.java
index eaab157ea9..3176c5b395 100644
--- a/src/java/org/apache/cassandra/journal/Journal.java
+++ b/src/java/org/apache/cassandra/journal/Journal.java
@@ -36,6 +36,7 @@ import java.util.zip.CRC32;
 import javax.annotation.Nullable;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.agrona.collections.Long2ObjectHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -112,7 +113,7 @@ public class Journal<K, V> implements Shutdownable
     // segment that is ready to be used; allocator thread fills this and blocks until consumed
     private volatile ActiveSegment<K, V> availableSegment = null;
 
-    private final AtomicReference<Segments<K, V>> segments = new AtomicReference<>();
+    private final AtomicReference<Segments<K, V>> segments = new AtomicReference<>(new Segments<>(new Long2ObjectHashMap<>()));
 
     final AtomicReference<State> state = new AtomicReference<>(State.UNINITIALIZED);
 
@@ -340,6 +341,15 @@ public class Journal<K, V> implements Shutdownable
         return null;
     }
 
+    public void readLast(K id, long segmentId, DeserializedRecordConsumer<K, V> consumer)
+    {
+        Segment<K, V> segment = segments.get().get(segmentId);
+        try (OpOrder.Group group = readOrder.start())
+        {
+            segment.readLast(id, consumer);
+        }
+    }
+
     public void readAll(K id, RecordConsumer<K> consumer)
     {
         EntrySerializer.EntryHolder<K> holder = new EntrySerializer.EntryHolder<>();
@@ -474,16 +484,16 @@ public class Journal<K, V> implements Shutdownable
 
     public int sizeOfRecord(RecordPointer pointer)
     {
-        Descriptor descriptor = segments.get().descriptor(pointer.segment);
+        Descriptor descriptor = segments.get().descriptor(pointer.segmentId);
         Invariants.nonNull(descriptor);
-        return pointer.size - EntrySerializer.overheadSize(keySupport, descriptor.userVersion);
+        return pointer.length - EntrySerializer.overheadSize(keySupport, descriptor.userVersion);
     }
 
     public boolean read(RecordPointer pointer, RecordConsumer<K> consumer)
     {
         try (OpOrder.Group group = readOrder.start())
         {
-            Segment<K, V> segment = segments.get().get(pointer.segment);
+            Segment<K, V> segment = segments.get().get(pointer.segmentId);
             return segment != null && segment.read(pointer, consumer);
         }
     }
@@ -552,7 +562,6 @@ public class Journal<K, V> implements Shutdownable
 
     private ActiveSegment<K, V>.Allocation allocate(int entrySize)
     {
-
         ActiveSegment<K, V> segment = currentSegment;
         ActiveSegment<K, V>.Allocation alloc;
         while (null == (alloc = segment.allocate(entrySize)))
@@ -807,11 +816,23 @@ public class Journal<K, V> implements Shutdownable
         return oldest;
     }
 
+    public List<Segment<K, V>> getSegments(long lowerBound, long upperBound)
+    {
+        List<Segment<K, V>> res = new ArrayList<>();
+        segments().select(lowerBound, upperBound, res);
+        return res;
+    }
+
     public ActiveSegment<K, V> currentActiveSegment()
     {
         return currentSegment;
     }
 
+    @Nullable protected Segment<K, V> getSegment(long timestamp)
+    {
+        return segments().get(timestamp);
+    }
+
     ActiveSegment<K, V> getActiveSegment(long timestamp)
     {
         // we can race with segment addition to the segments() collection, with a new segment appearing in currentSegment first
@@ -853,10 +874,12 @@ public class Journal<K, V> implements Shutdownable
     private class CloseActiveSegmentRunnable implements Runnable
     {
         private final ActiveSegment<K, V> activeSegment;
+        private final Runnable onDone;
 
-        CloseActiveSegmentRunnable(ActiveSegment<K, V> activeSegment)
+        CloseActiveSegmentRunnable(ActiveSegment<K, V> activeSegment, @Nullable Runnable onDone)
         {
             this.activeSegment = activeSegment;
+            this.onDone = onDone;
         }
 
         @Override
@@ -868,10 +891,16 @@ public class Journal<K, V> implements Shutdownable
             activeSegment.persistComponents();
             replaceCompletedSegment(activeSegment, StaticSegment.open(activeSegment.descriptor, keySupport));
             activeSegment.release(Journal.this);
+            if (onDone != null) onDone.run();
         }
     }
 
-    void closeActiveSegmentAndOpenAsStatic(ActiveSegment<K, V> activeSegment)
+    protected void closeActiveSegmentAndOpenAsStatic(ActiveSegment<K, V> activeSegment)
+    {
+        closeActiveSegmentAndOpenAsStatic(activeSegment, null);
+    }
+
+    protected void closeActiveSegmentAndOpenAsStatic(ActiveSegment<K, V> activeSegment, @Nullable Runnable onDone)
     {
         if (activeSegment.isEmpty())
         {
@@ -880,7 +909,7 @@ public class Journal<K, V> implements Shutdownable
             return;
         }
 
-        closer.execute(new CloseActiveSegmentRunnable(activeSegment));
+        closer.execute(new CloseActiveSegmentRunnable(activeSegment, onDone));
     }
 
     @VisibleForTesting
@@ -981,13 +1010,21 @@ public class Journal<K, V> implements Shutdownable
     }
 
     /**
-     * Static segment iterator iterates all keys in _static_ segments in order.
+     * Static segment iterator iterates all keys in _static_ segments in order for the given key range.
      */
     public StaticSegmentKeyIterator staticSegmentKeyIterator(K min, K max)
     {
         return new StaticSegmentKeyIterator(min, max);
     }
 
+    /**
+     * Static segment iterator iterates all keys in selected segments in order.
+     */
+    public StaticSegmentKeyIterator staticSegmentKeyIterator(Predicate<Segment<K, V>> predicate)
+    {
+        return new StaticSegmentKeyIterator(null, null, predicate);
+    }
+
     /**
      * List of key and a list of segment descriptors referencing this key
      */
@@ -1013,6 +1050,11 @@ public class Journal<K, V> implements Shutdownable
                 consumer.accept(segments[i]);
         }
 
+        public long lastSegment()
+        {
+            return segments[segments.length - 1];
+        }
+
         public long[] copyOfSegments()
         {
             return segments == null ? new long[0] : Arrays.copyOf(segments, size);
@@ -1060,10 +1102,17 @@ public class Journal<K, V> implements Shutdownable
 
         public StaticSegmentKeyIterator(K min, K max)
         {
-            this.segments = selectAndReference(s -> s.isStatic()
-                                                    && s.asStatic().index().entryCount() > 0
-                                                    && (min == null || keySupport.compare(s.index().lastId(), min) >= 0)
-                                                    && (max == null || keySupport.compare(s.index().firstId(), max) <= 0));
+            this(min, max, s -> {
+                return s.isStatic()
+                        && s.asStatic().index().entryCount() > 0
+                        && (min == null || keySupport.compare(s.index().lastId(), min) >= 0)
+                        && (max == null || keySupport.compare(s.index().firstId(), max) <= 0);
+            });
+        }
+
+        public StaticSegmentKeyIterator(K min, K max, Predicate<Segment<K, V>> predicate)
+        {
+            this.segments = selectAndReference(predicate);
             List<Iterator<Head>> iterators = new ArrayList<>(segments.count());
 
             for (Segment<K, V> segment : segments.allSorted(true))
diff --git a/src/java/org/apache/cassandra/journal/Metadata.java b/src/java/org/apache/cassandra/journal/Metadata.java
index 13198fa77a..6f9552a229 100644
--- a/src/java/org/apache/cassandra/journal/Metadata.java
+++ b/src/java/org/apache/cassandra/journal/Metadata.java
@@ -33,23 +33,25 @@ import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
  * - total count of records in this segment file
  *       used for compaction prioritisation
  */
-final class Metadata
+public final class Metadata
 {
     private int fsyncLimit;
-
+    // Indicates whether a segment needs to be replayed or not.
+    private volatile boolean needsReplay;
     private volatile int recordsCount;
     private static final AtomicIntegerFieldUpdater<Metadata> recordsCountUpdater =
         AtomicIntegerFieldUpdater.newUpdater(Metadata.class, "recordsCount");
 
     static Metadata empty()
     {
-        return new Metadata(0, 0);
+        return new Metadata(0, 0, true);
     }
 
-    private Metadata(int recordsCount, int fsyncLimit)
+    private Metadata(int recordsCount, int fsyncLimit, boolean needsReplay)
     {
         this.recordsCount = recordsCount;
         this.fsyncLimit = fsyncLimit;
+        this.needsReplay = needsReplay;
     }
 
     void update()
@@ -62,17 +64,27 @@ final class Metadata
         this.fsyncLimit = fsyncLimit;
     }
 
+    public void clearNeedsReplay()
+    {
+        this.needsReplay = false;
+    }
+
     int fsyncLimit()
     {
         return fsyncLimit;
     }
 
+    public boolean needsReplay()
+    {
+        return needsReplay;
+    }
+
     private void incrementRecordsCount()
     {
         recordsCountUpdater.incrementAndGet(this);
     }
 
-    int totalCount()
+    public int totalCount()
     {
         return recordsCount;
     }
@@ -82,8 +94,10 @@ final class Metadata
         CRC32 crc = Crc.crc32();
         out.writeInt(recordsCount);
         out.writeInt(fsyncLimit);
+        out.writeBoolean(needsReplay);
         updateChecksumInt(crc, recordsCount);
         updateChecksumInt(crc, fsyncLimit);
+        updateChecksumInt(crc, needsReplay ? 1 : 0);
         out.writeInt((int) crc.getValue());
     }
 
@@ -92,10 +106,12 @@ final class Metadata
         CRC32 crc = Crc.crc32();
         int recordsCount = in.readInt();
         int fsyncLimit = in.readInt();
+        boolean needsReplay = in.readBoolean();
         updateChecksumInt(crc, recordsCount);
         updateChecksumInt(crc, fsyncLimit);
+        updateChecksumInt(crc, needsReplay ? 1 : 0);
         validateCRC(crc, in.readInt());
-        return new Metadata(recordsCount, fsyncLimit);
+        return new Metadata(recordsCount, fsyncLimit, needsReplay);
     }
 
     void persist(Descriptor descriptor)
@@ -145,7 +161,7 @@ final class Metadata
                 throw e;
         }
 
-        return new Metadata(recordsCount, fsyncLimit);
+        return new Metadata(recordsCount, fsyncLimit, true);
     }
 
     static <K> Metadata rebuildAndPersist(Descriptor descriptor, KeySupport<K> keySupport)
@@ -156,11 +172,12 @@ final class Metadata
     }
 
     @Override
     public String toString()
     {
         return "Metadata{" +
                "fsyncLimit=" + fsyncLimit +
+               ", needsReplay=" + needsReplay +
                ", recordsCount=" + recordsCount +
                '}';
     }
 }
diff --git a/src/java/org/apache/cassandra/journal/RecordConsumer.java b/src/java/org/apache/cassandra/journal/RecordConsumer.java
index 22d2bc4e9f..4d2b28b419 100644
--- a/src/java/org/apache/cassandra/journal/RecordConsumer.java
+++ b/src/java/org/apache/cassandra/journal/RecordConsumer.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.journal;
 
 import java.nio.ByteBuffer;
 
+// TODO (desired): rename to SerializedRecordConsumer
 @FunctionalInterface
 public interface RecordConsumer<K>
 {
diff --git a/src/java/org/apache/cassandra/journal/RecordPointer.java b/src/java/org/apache/cassandra/journal/RecordPointer.java
index 22d874712d..8e5c08ca51 100644
--- a/src/java/org/apache/cassandra/journal/RecordPointer.java
+++ b/src/java/org/apache/cassandra/journal/RecordPointer.java
@@ -18,33 +18,29 @@
 
 package org.apache.cassandra.journal;
 
-import com.google.common.primitives.Ints;
-import com.google.common.primitives.Longs;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
 
 // TODO: make this available in the accord table as an ID
-public class RecordPointer implements Comparable<RecordPointer>
+public class RecordPointer extends CommitLogPosition
 {
-    public final long segment;   // unique segment id
-    public final int position;   // record start position within the segment
-    public final int size;       // full size of the record
+    public final int length;     // full size of the record
     public final long writtenAt; // only set for periodic mode
 
-    public RecordPointer(long segment, int position, int size)
+    public RecordPointer(long segment, int position, int length)
     {
-        this(segment, position, size, 0);
+        this(segment, position, length, 0);
     }
 
-    public RecordPointer(long segment, int position, int size, long writtenAt)
+    public RecordPointer(long segment, int position, int length, long writtenAt)
     {
-        this.segment = segment;
-        this.position = position;
-        this.size = size;
+        super(segment, position);
+        this.length = length;
         this.writtenAt = writtenAt;
     }
 
     public RecordPointer(RecordPointer pointer)
     {
-        this(pointer.segment, pointer.position, pointer.size, pointer.writtenAt);
+        this(pointer.segmentId, pointer.position, pointer.length, pointer.writtenAt);
     }
 
     @Override
@@ -55,26 +51,19 @@ public class RecordPointer implements Comparable<RecordPointer>
         if (!(other instanceof RecordPointer))
             return false;
         RecordPointer that = (RecordPointer) other;
-        return this.segment == that.segment
+        return this.segmentId == that.segmentId
                && this.position == that.position;
     }
 
     @Override
     public int hashCode()
     {
-        return Long.hashCode(segment) + position * 31;
+        return Long.hashCode(segmentId) + position * 31;
     }
 
     @Override
     public String toString()
     {
-        return "(" + segment + ", " + position + ')';
-    }
-
-    @Override
-    public int compareTo(RecordPointer that)
-    {
-        int cmp = Longs.compare(this.segment, that.segment);
-        return cmp != 0 ? cmp : Ints.compare(this.position, that.position);
+        return "(" + segmentId + ", " + position + ')';
     }
 }
\ No newline at end of file
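
    Note that equals/hashCode above intentionally compare only (segmentId,
    position); length and writtenAt are excluded. Illustrative, with
    hypothetical values:

        RecordPointer a = new RecordPointer(42L, 100, 64);
        RecordPointer b = new RecordPointer(42L, 100, 128, 1234L);
        assert a.equals(b); // same segment and position; length/writtenAt ignored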
diff --git a/src/java/org/apache/cassandra/journal/Segment.java b/src/java/org/apache/cassandra/journal/Segment.java
index a80bbb4f4c..0b34f14516 100644
--- a/src/java/org/apache/cassandra/journal/Segment.java
+++ b/src/java/org/apache/cassandra/journal/Segment.java
@@ -66,16 +66,24 @@ public abstract class Segment<K, V> implements SelfRefCounted<Segment<K, V>>, Co
 
     abstract boolean isActive();
     abstract boolean isFlushed(long position);
-    boolean isStatic() { return !isActive(); }
 
     abstract ActiveSegment<K, V> asActive();
     abstract StaticSegment<K, V> asStatic();
 
+    public boolean isStatic() { return !isActive(); }
+
+    public Metadata metadata()
+    {
+        return metadata;
+    }
+
     public long id()
     {
         return descriptor.timestamp;
     }
 
+    public abstract void persistMetadata();
+
     /*
      * Reading entries (by id, by offset, iterate)
      */
@@ -110,7 +118,7 @@ public abstract class Segment<K, V> implements SelfRefCounted<Segment<K, V>>, Co
     boolean read(RecordPointer pointer, RecordConsumer<K> consumer)
     {
         EntrySerializer.EntryHolder<K> into = new EntrySerializer.EntryHolder<>();
-        if (read(pointer.position, pointer.size, into))
+        if (read(pointer.position, pointer.length, into))
         {
             consumer.accept(descriptor.timestamp, pointer.position, into.key, into.value, descriptor.userVersion);
             return true;
diff --git a/src/java/org/apache/cassandra/journal/Segments.java b/src/java/org/apache/cassandra/journal/Segments.java
index 50c32ed8b6..4e01bd47b4 100644
--- a/src/java/org/apache/cassandra/journal/Segments.java
+++ b/src/java/org/apache/cassandra/journal/Segments.java
@@ -128,6 +128,37 @@ class Segments<K, V>
                 into.add(segment.asActive());
     }
 
+    void select(long minTimestamp, long maxTimestamp, Collection<Segment<K, V>> into)
+    {
+        List<Segment<K, V>> sorted = allSorted(true);
+        int idx = minTimestamp == 0 ? 0 : findIdxFor(minTimestamp);
+        while (idx < sorted.size())
+        {
+            Segment<K, V> segment = sorted.get(idx++);
+            if (segment.descriptor.timestamp > maxTimestamp)
+                break;
+            into.add(segment);
+        }
+    }
+
+    int findIdxFor(long timestamp)
+    {
+        List<Segment<K, V>> sorted = allSorted(true);
+        int low = 0, mid = sorted.size(), high = mid - 1, res = -1;
+        while (low <= high)
+        {
+            mid = (low + high) >>> 1;
+            res = Long.compare(timestamp, sorted.get(mid).descriptor.timestamp);
+            if (res > 0)
+                low = mid + 1;
+            else if (res == 0)
+                return mid;
+            else
+                high = mid - 1;
+        }
+        throw new IllegalStateException(String.format("Could not find a segment with timestamp %d among %s", timestamp, sorted));
+    }
+
     boolean isSwitched(ActiveSegment<K, V> active)
     {
         for (Segment<K, V> segment : segments.values())
@@ -232,6 +263,7 @@ class Segments<K, V>
     @Override
     public String toString()
     {
+        List<Segment<K, V>> sorted = allSorted(true);
         return sorted.toString();
     }
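
    A hedged usage note on select()/findIdxFor() above: findIdxFor expects an
    exact match and throws otherwise, so callers are expected to pass segment
    ids known to exist (as MutationJournal.notifyFlushed does below).
    Illustrative, with hypothetical segment timestamps [10, 20, 30]:

        select(20, 30, into); // adds segments 20 and 30
        select(0, 30, into);  // minTimestamp == 0 starts at index 0: adds all three
        select(15, 30, into); // throws: 15 is not an existing segment timestamp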
 
diff --git a/src/java/org/apache/cassandra/journal/StaticSegment.java b/src/java/org/apache/cassandra/journal/StaticSegment.java
index 35c987c8a4..e78187b129 100644
--- a/src/java/org/apache/cassandra/journal/StaticSegment.java
+++ b/src/java/org/apache/cassandra/journal/StaticSegment.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.journal;
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.Closeable;
+import org.apache.cassandra.utils.SyncUtil;
 import org.apache.cassandra.utils.Throwables;
 import org.apache.cassandra.utils.concurrent.Ref;
 import org.apache.cassandra.utils.memory.MemoryUtil;
@@ -270,6 +271,13 @@ public final class StaticSegment<K, V>
         return this;
     }
 
+    @Override
+    public void persistMetadata()
+    {
+        metadata.persist(descriptor);
+        SyncUtil.trySyncDir(descriptor.directory);
+    }
+
     /**
      * Read the entry and specified offset into the entry holder.
      * Expects the record to have been written at this offset, but potentially not flushed and lost.
diff --git a/src/java/org/apache/cassandra/replication/MutationId.java b/src/java/org/apache/cassandra/replication/MutationId.java
index 5faf4db7d5..259b950b33 100644
--- a/src/java/org/apache/cassandra/replication/MutationId.java
+++ b/src/java/org/apache/cassandra/replication/MutationId.java
@@ -46,6 +46,12 @@ public class MutationId extends ShortMutationId
      */
     protected final int timestamp;
 
+    public MutationId(long logId, int offset, int timestamp)
+    {
+        super(logId, offset);
+        this.timestamp = timestamp;
+    }
+
     public MutationId(long logId, long sequenceId)
     {
         super(logId, offset(sequenceId));
diff --git a/src/java/org/apache/cassandra/replication/MutationJournal.java b/src/java/org/apache/cassandra/replication/MutationJournal.java
index aeccc3f052..2de55de424 100644
--- a/src/java/org/apache/cassandra/replication/MutationJournal.java
+++ b/src/java/org/apache/cassandra/replication/MutationJournal.java
@@ -20,35 +20,54 @@ package org.apache.cassandra.replication;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
 import java.util.zip.Checksum;
-
 import javax.annotation.Nullable;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
+import org.apache.cassandra.journal.*;
+
+import org.cliffc.high_scale_lib.NonBlockingHashMapLong;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.utils.Invariants;
+import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.File;
-import org.apache.cassandra.journal.Journal;
-import org.apache.cassandra.journal.KeySupport;
-import org.apache.cassandra.journal.Params;
-import org.apache.cassandra.journal.RecordConsumer;
-import org.apache.cassandra.journal.RecordPointer;
-import org.apache.cassandra.journal.SegmentCompactor;
-import org.apache.cassandra.journal.ValueSerializer;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.Semaphore;
 
+import static org.apache.cassandra.utils.FBUtilities.getAvailableProcessors;
+
+// TODO (required): handle table truncations
 public class MutationJournal
 {
     public static final MutationJournal instance = new MutationJournal();
+    private static final Logger log = LoggerFactory.getLogger(MutationJournal.class);
 
     private final Journal<ShortMutationId, Mutation> journal;
+    private final Map<Long, SegmentStateTracker> segmentStateTrackers;
+
+    // Most writes go to the most recent segment, so we optimistically cache the last segment's tracker
+    // without imposing any visibility guarantees. If this field does not hold the right segment's tracker,
+    // we look it up in the NBHM.
+    private SegmentStateTracker lastSegmentTracker;
 
     private MutationJournal()
     {
@@ -58,7 +77,38 @@ public class MutationJournal
     @VisibleForTesting
     MutationJournal(File directory, Params params)
     {
-        journal = new Journal<>("MutationJournal", directory, params, new MutationIdSupport(), new MutationSerializer(), SegmentCompactor.noop());
+        journal = new Journal<>("MutationJournal", directory, params, MutationIdSupport.INSTANCE, MutationSerializer.INSTANCE, SegmentCompactor.noop()) {
+            @Override
+            protected void closeActiveSegmentAndOpenAsStatic(ActiveSegment<ShortMutationId, Mutation> activeSegment, Runnable onDone)
+            {
+                super.closeActiveSegmentAndOpenAsStatic(activeSegment,
+                                                        () -> {
+                                                            maybeCleanupStaticSegment(Invariants.nonNull(getSegment(activeSegment.id())));
+                                                            if (onDone != null) onDone.run();
+                                                        });
+            }
+        };
+        segmentStateTrackers = new NonBlockingHashMapLong<>();
+    }
+
+    public CommitLogPosition getCurrentPosition()
+    {
+        return journal.currentActiveSegment().currentPosition();
+    }
+
+    // If all Memtables associated with a given segment were flushed by the time we closed the active
+    // segment and reopened it as static, mark its metadata to indicate it does not need replay. We may
+    // crash before persisting this metadata, in which case we will unnecessarily replay the segment,
+    // which has no correctness implications.
+    private void maybeCleanupStaticSegment(Segment<ShortMutationId, Mutation> segment)
+    {
+        Invariants.require(segment.isStatic());
+        SegmentStateTracker tracker = segmentStateTrackers.get(segment.id());
+        if (tracker != null && tracker.removeCleanFromDirty())
+        {
+            segment.metadata().clearNeedsReplay();
+            segment.persistMetadata();
+        }
     }
 
     public void start()
@@ -73,7 +123,40 @@ public class MutationJournal
 
     public RecordPointer write(ShortMutationId id, Mutation mutation)
     {
-        return journal.blockingWrite(id, mutation);
+        // TODO (required): why are we using blocking write here? We can/should wait for completion on `close` of WriteContext.
+        RecordPointer ptr = journal.blockingWrite(id, mutation);
+
+        // IMPORTANT: there should be no way for the mutation to be applied to the memtable before we mark
+        // it as dirty here, since that would introduce a race between marking as dirty and marking as clean.
+        for (TableId tableId : mutation.getTableIds())
+        {
+            SegmentStateTracker tracker = lastSegmentTracker;
+            if (tracker == null || tracker.segmentId() != ptr.segmentId)
+            {
+                tracker = segmentStateTrackers.computeIfAbsent(ptr.segmentId, SegmentStateTracker::new);
+                lastSegmentTracker = tracker;
+            }
+
+            tracker.markDirty(tableId, ptr);
+        }
+
+        return ptr;
+    }
+
+    /**
+     * Called by the post-flush callback once the Memtable has been fully flushed to SSTables.
+     */
+    public void notifyFlushed(TableId tableId, CommitLogPosition lowerBound, CommitLogPosition upperBound)
+    {
+        for (Segment<ShortMutationId, Mutation> segment : journal.getSegments(lowerBound.segmentId, upperBound.segmentId))
+        {
+            Invariants.nonNull(segmentStateTrackers.get(segment.id())).markClean(tableId, lowerBound, upperBound);
+
+            // We can only safely mark static segments as non-replayable. An active segment can still be
+            // written to, so we only persist this metadata on flush.
+            if (segment.isStatic())
+                maybeCleanupStaticSegment(segment);
+        }
     }
 
     @Nullable
@@ -116,6 +199,89 @@ public class MutationJournal
         }
     }
 
+    public void replayStaticSegments()
+    {
+        replay(new DeserializedRecordConsumer<>(MutationSerializer.INSTANCE)
+        {
+            @Override
+            protected void accept(long segmentId, int position, ShortMutationId key, Mutation value)
+            {
+                if (Schema.instance.getKeyspaceMetadata(value.getKeyspaceName()) == null)
+                    return;
+                // TODO: if (commitLogReplayer.pointInTimeExceeded(mutation))
+                final Keyspace keyspace = Keyspace.open(value.getKeyspaceName());
+
+                Mutation.PartitionUpdateCollector newPUCollector = null;
+                // TODO (required): replayFilter
+                for (Map.Entry<TableId, PartitionUpdate> e : value.modifications().entrySet())
+                {
+                    PartitionUpdate update = e.getValue();
+                    update.validate();
+                    if (Schema.instance.getTableMetadata(update.metadata().id) == null)
+                        continue; // dropped
+                    TableId tableId = e.getKey();
+
+                    // Start segment state tracking
+                    segmentStateTrackers.computeIfAbsent(segmentId, SegmentStateTracker::new)
+                                        .markDirty(tableId, segmentId, position);
+                    // TODO (required): shouldReplay
+                    if (newPUCollector == null)
+                        newPUCollector = new Mutation.PartitionUpdateCollector(value.id(), value.getKeyspaceName(), value.key());
+                    newPUCollector.add(update);
+                    // TODO (required): replayedCount
+                }
+                if (newPUCollector != null)
+                {
+                    assert !newPUCollector.isEmpty();
+                    keyspace.apply(newPUCollector.build(), false, true, false);
+                }
+            }
+        }, getAvailableProcessors());
+    }
+
+    @VisibleForTesting
+    public void replay(DeserializedRecordConsumer<ShortMutationId, Mutation> replayOne, int parallelism)
+    {
+        try (Journal<ShortMutationId, Mutation>.StaticSegmentKeyIterator iter =
+                     journal.staticSegmentKeyIterator(s -> s.isStatic()
+                                                        && s.metadata().totalCount() > 0
+                                                        && s.metadata().needsReplay()))
+        {
+            final Semaphore replayParallelism = Semaphore.newSemaphore(parallelism);
+            final AtomicBoolean abort = new AtomicBoolean();
+
+            while (iter.hasNext() && !abort.get())
+            {
+                Journal.KeyRefs<ShortMutationId> v = iter.next();
+                v = v; // self-assignment makes v not effectively final, so it cannot accidentally be captured by the async lambda
+                ShortMutationId key = v.key();
+                long lastSegment = v.lastSegment();
+                // TODO: respect SystemKeyspace.getTruncatedPosition(cfs.metadata.id);
+                replayParallelism.acquireThrowUncheckedOnInterrupt(1);
+                Stage.MUTATION.submit(() -> journal.readLast(key, lastSegment, replayOne))
+                              .addCallback(new BiConsumer<Object, Throwable>()
+                {
+                    @Override
+                    public void accept(Object o, Throwable fail)
+                    {
+                        if (fail != null && !journal.handleError("Could not replay mutation " + key, fail))
+                            abort.set(true);
+                        replayParallelism.release(1);
+                    }
+                });
+            }
+
+            // Wait for all mutations to be applied before returning
+            replayParallelism.acquireThrowUncheckedOnInterrupt(parallelism);
+        }
+    }
+
+    @VisibleForTesting
+    public void closeCurrentSegmentForTestingIfNonEmpty()
+    {
+        journal.closeCurrentSegmentForTestingIfNonEmpty();
+    }
+
     static class JournalParams implements Params
     {
         @Override
@@ -175,6 +341,8 @@ public class MutationJournal
 
     static class MutationIdSupport implements KeySupport<ShortMutationId>
     {
+        static final MutationIdSupport INSTANCE = new MutationIdSupport();
+
         static final int LOG_ID_OFFSET = 0;
         static final int OFFSET_OFFSET = LOG_ID_OFFSET + TypeSizes.LONG_SIZE;
 
@@ -245,17 +413,20 @@ public class MutationJournal
         }
     }
 
-    static class MutationSerializer implements ValueSerializer<ShortMutationId, Mutation>
+    public static class MutationSerializer implements ValueSerializer<ShortMutationId, Mutation>
     {
+        public static final MutationSerializer INSTANCE = new MutationSerializer();
         @Override
         public void serialize(ShortMutationId id, Mutation mutation, DataOutputPlus out, int userVersion) throws IOException
         {
+            Invariants.require(id.hostId != Integer.MIN_VALUE);
             Mutation.serializer.serialize(mutation, out, userVersion);
         }
 
         @Override
         public Mutation deserialize(ShortMutationId id, DataInputPlus in, int userVersion) throws IOException
         {
+            Invariants.require(id.hostId != Integer.MIN_VALUE);
             return Mutation.serializer.deserialize(in, userVersion);
         }
     }
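
    Presumably (the CassandraDaemon and Startup hunks are only visible in the
    diffstat above), replay is wired into node startup roughly as follows; this
    is a hypothetical sketch, not copied from the patch:

        MutationJournal.instance.start();
        // Replay static segments whose metadata still says needsReplay, i.e.
        // those with mutations not known to be fully flushed to SSTables
        // before the bounce.
        MutationJournal.instance.replayStaticSegments();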
diff --git a/src/java/org/apache/cassandra/replication/SegmentStateTracker.java b/src/java/org/apache/cassandra/replication/SegmentStateTracker.java
new file mode 100644
index 0000000000..739af6d48d
--- /dev/null
+++ b/src/java/org/apache/cassandra/replication/SegmentStateTracker.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.replication;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import accord.utils.Invariants;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.utils.IntegerInterval;
+
+/**
+ * Tracks offsets of clean (i.e. memtable->sstable flushed) and dirty (i.e. not yet durably persisted in an sstable)
+ * allocations.
+ *
+ * Mutations in segments marked as clean do not need to be replayed.
+ *
+ * Tracks which parts of a commit log segment contain unflushed data for each table, and determines when all
+ * mutations associated with a segment are fully memtable->sstable flushed.
+ *
+ * Maintains per-table state:
+ * - a "dirty" high mark (bumped when a new allocation is made in the segment)
+ * - "clean" intervals (min/max bounds reported via memtable flushes)
+ *
+ * A segment is considered clean when all dirty intervals are covered by clean intervals for every table.
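+ *
+ * Worked example (illustrative): a write to table T at position 100 initializes T's dirty interval to
+ * [100, 100]; later writes extend it, e.g. to [100, 250]. Memtable flushes report clean bounds via
+ * markClean, and once the clean set covers [100, 250] for T (and likewise for every other table), the
+ * segment is clean and no longer needs replay.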
+ */
+public class SegmentStateTracker
+{
+    final long segmentId;
+
+    private final Map<TableId, IntervalState> states = new HashMap<>(32);
+    private final Lock lock = new ReentrantLock();
+
+    public SegmentStateTracker(long segmentId)
+    {
+        this.segmentId = segmentId;
+    }
+
+    public long segmentId()
+    {
+        return segmentId;
+    }
+
+    @VisibleForTesting
+    public boolean isClean()
+    {
+        removeCleanFromDirty();
+        lock.lock();
+        try
+        {
+            return states.isEmpty();
+        }
+        finally
+        {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Should be called _only_ for a static segment, to ensure there is no way for interval state
+     * to go back from clean to dirty.
+     *
+     * Removes all clean (i.e. memtable -> sstable flushed) ranges from the dirty intervals. Returns true
+     * if the intervals of all tables are clean, false otherwise.
+     */
+    public boolean removeCleanFromDirty()
+    {
+        List<Map.Entry<TableId, IntervalState>> states;
+        // Take a "snapshot" of states, while holding a lock
+        lock.lock();
+        try
+        {
+            states = new ArrayList<>(this.states.entrySet());
+        }
+        finally
+        {
+            lock.unlock();
+        }
+
+        int[] remove = new int[states.size()];
+        int removeCount = 0;
+
+        // Check if any of the remaining items can be cleaned up, without holding a lock
+        for (int i = 0; i < states.size(); i++)
+        {
+            IntervalState state = states.get(i).getValue();
+            if (!state.isDirty())
+                remove[removeCount++] = i;
+        }
+
+        // Remove all fully covered items, while holding a lock
+        if (removeCount > 0)
+        {
+            lock.lock();
+            try
+            {
+                if (this.states.size() == removeCount)
+                {
+                    this.states.clear();
+                    return true;
+                }
+
+                for (int i = 0; i < removeCount; i++)
+                {
+                    Map.Entry<TableId, IntervalState> e = states.get(remove[i]);
+                    this.states.remove(e.getKey());
+                }
+            }
+            finally
+            {
+                lock.unlock();
+            }
+        }
+
+        return false;
+    }
+
+    public void markDirty(TableId tableId, CommitLogPosition ptr)
+    {
+        markDirty(tableId, ptr.segmentId, ptr.position);
+    }
+
+    public void markDirty(TableId tableId, long segmentId, int position)
+    {
+        Invariants.require(segmentId == this.segmentId);
+        IntervalState state;
+        lock.lock();
+        try
+        {
+            state = states.computeIfAbsent(tableId, (k) -> {
+                // Initialize with the given position as both low and high bound, to ensure we correctly
+                // set the lower bound when marking as clean
+                return new IntervalState(position, position);
+            });
+        }
+        finally
+        {
+            lock.unlock();
+        }
+        state.markDirty(position);
+    }
+
+    public void markClean(TableId tableId, CommitLogPosition lowerBound, CommitLogPosition upperBound)
+    {
+        Invariants.require(lowerBound.compareTo(upperBound) <= 0, "%s should not be greater than %s", lowerBound, upperBound);
+        if (lowerBound.segmentId > segmentId || upperBound.segmentId < segmentId)
+            return;
+
+        IntervalState state;
+        lock.lock();
+        try
+        {
+            state = states.get(tableId);
+        }
+        finally
+        {
+            lock.unlock();
+        }
+
+        if (state != null)
+        {
+            // TODO (required): test this logic
+            // Only mark clean ranges for _this_ segment
+            int lower = lowerBound.segmentId == segmentId ? lowerBound.position : 0;
+            int upper = upperBound.segmentId == segmentId ? upperBound.position : Integer.MAX_VALUE;
+            state.markClean(lower, upper);
+        }
+    }
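+
+    // Example of the clamping in markClean above: a flush spanning segment 5 position 100 through
+    // segment 7 position 50, reported to the tracker for segment 6, clamps to [0, Integer.MAX_VALUE]
+    // (neither bound falls inside this segment), marking all of segment 6's dirty positions clean;
+    // the tracker for segment 5 cleans [100, Integer.MAX_VALUE], and segment 7 cleans [0, 50].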
+
+    private static class IntervalState
+    {
+        static final long[] EMPTY = new long[0];
+
+        // dirty interval in this segment; if interval is not covered by the clean set, the log contains unflushed data
+        volatile long dirty;
+        // clean intervals; tracked separately from the dirty interval to permit marking tables clean whilst the log is still in use
+        volatile long[] clean = EMPTY;
+
+        private static final AtomicLongFieldUpdater<IntervalState> dirtyUpdater =
+            AtomicLongFieldUpdater.newUpdater(IntervalState.class, "dirty");
+        private static final AtomicReferenceFieldUpdater<IntervalState, long[]> cleanUpdater =
+            AtomicReferenceFieldUpdater.newUpdater(IntervalState.class, long[].class, "clean");
+
+        public IntervalState(int lower, int upper)
+        {
+            this(make(lower, upper));
+        }
+
+        private IntervalState(long dirty)
+        {
+            this.dirty = dirty;
+        }
+
+        public void markClean(int start, int end)
+        {
+            long[] prev;
+            long[] next;
+            do
+            {
+                prev = this.clean;
+                next = IntegerInterval.Set.add(prev, start, end);
+            }
+            while (!cleanUpdater.compareAndSet(this, prev, next));
+        }
+
+        public boolean isDirty()
+        {
+            long[] clean = this.clean;
+            long dirty = this.dirty;
+            return !IntegerInterval.Set.covers(clean, lower(dirty), upper(dirty));
+        }
+
+        /**
+         * Expands the interval to cover the given value by extending one of its sides if necessary.
+         * Mutates this. Thread-safe.
+         */
+        public void markDirty(int value)
+        {
+            long prev;
+            int lower;
+            int upper;
+            do
+            {
+                prev = dirty;
+                upper = upper(prev);
+                lower = lower(prev);
+                if (value > upper) // common case
+                    upper = value;
+                else if (value < lower)
+                    lower = value;
+            }
+            while (!dirtyUpdater.compareAndSet(this, prev, make(lower, upper)));
+        }
+
+        public String toString()
+        {
+            long dirty = this.dirty;
+            long[] clean = this.clean;
+            StringBuilder sb = new StringBuilder();
+            for (int i = 0; i < clean.length; i++)
+            {
+                long l = clean[i];
+                if (i > 0)
+                    sb.append(',');
+                sb.append('[').append(lower(l)).append(',').append(upper(l)).append(']');
+            }
+            return "dirty:[" + lower(dirty) + ',' + upper(dirty) + "], clean:[" + sb + ']';
+        }
+
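+        // Pack the closed interval [lower, upper] into a single long, lower in the high 32 bits and
+        // upper in the low 32 bits, so one volatile read of 'dirty' observes both bounds atomically.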
+        private static long make(int lower, int upper)
+        {
+            assert lower <= upper;
+            return ((lower & 0xFFFFFFFFL) << 32) | upper & 0xFFFFFFFFL;
+        }
+
+        private static int lower(long interval)
+        {
+            return (int) (interval >>> 32);
+        }
+
+        private static int upper(long interval)
+        {
+            return (int) interval;
+        }
+    }
+
+    @Override
+    public String toString()
+    {
+        return "SegmentStateTracker{" +
+               "segmentId=" + segmentId +
+               ", states=" + states +
+               '}';
+    }
+}
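
A minimal usage sketch of the tracker lifecycle (an illustration, not part of the patch; it uses
only the constructor and methods defined above, plus TableId.generate() and
CommitLogPosition(segmentId, position) as they appear in the tests below):

    SegmentStateTracker tracker = new SegmentStateTracker(42L);
    TableId tbl = TableId.generate();

    // each write appended to journal segment 42 marks its position dirty for its table
    tracker.markDirty(tbl, 42L, 100);
    tracker.markDirty(tbl, 42L, 250);

    // a memtable flush reports the commit log interval it covered; bounds outside
    // segment 42 would be clamped, so flushes spanning several segments also work
    tracker.markClean(tbl, new CommitLogPosition(42L, 0), new CommitLogPosition(42L, 300));

    // true once every table's dirty interval is covered by its clean set, i.e. the
    // segment holds no unflushed data and can be skipped on replay
    boolean clean = tracker.removeCleanFromDirty();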
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index b66611d8f9..11647410e2 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -340,6 +340,7 @@ public class CassandraDaemon
         try
         {
             CommitLog.instance.recoverSegmentsOnDisk();
+            MutationJournal.instance.replayStaticSegments();
         }
         catch (IOException e)
         {
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index 08819934d0..aa568104a2 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -343,7 +343,7 @@ public class AccordJournal implements accord.api.Journal, RangeSearcher.Supplier
             && diff.after.route() != null)
             journal.onDurable(pointer, () ->
                                        journalTable.safeNotify(index ->
-                                                               index.update(pointer.segment, key.commandStoreId, key.id, diff.after.route())));
+                                                               index.update(pointer.segmentId, key.commandStoreId, key.id, diff.after.route())));
         if (onFlush != null)
             journal.onDurable(pointer, onFlush);
     }
diff --git a/src/java/org/apache/cassandra/tcm/Startup.java b/src/java/org/apache/cassandra/tcm/Startup.java
index 7fc336809c..7333bc468a 100644
--- a/src/java/org/apache/cassandra/tcm/Startup.java
+++ b/src/java/org/apache/cassandra/tcm/Startup.java
@@ -31,6 +31,7 @@ import java.util.function.Function;
 import java.util.function.Supplier;
 
 import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.cassandra.replication.MutationJournal;
 import org.apache.cassandra.replication.MutationTrackingService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -283,6 +284,7 @@ import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
         try
         {
             CommitLog.instance.recoverSegmentsOnDisk();
+            MutationJournal.instance.replayStaticSegments();
         }
         catch (IOException e)
         {
diff --git a/src/java/org/apache/cassandra/utils/IntegerInterval.java b/src/java/org/apache/cassandra/utils/IntegerInterval.java
index b26ac45f06..d5e43289e6 100644
--- a/src/java/org/apache/cassandra/utils/IntegerInterval.java
+++ b/src/java/org/apache/cassandra/utils/IntegerInterval.java
@@ -22,9 +22,12 @@ import java.util.*;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.stream.Collectors;
 
+import accord.utils.Invariants;
 import com.google.common.collect.Lists;
 import com.google.common.primitives.Longs;
 
+import net.nicoulaj.compilecommand.annotations.Inline;
+
 /**
  * Mutable integer interval class, thread-safe.
  * Represents the interval [lower,upper].
@@ -32,7 +35,7 @@ import com.google.common.primitives.Longs;
 public class IntegerInterval
 {
     volatile long interval;
-    private static AtomicLongFieldUpdater<IntegerInterval> intervalUpdater =
+    private static final AtomicLongFieldUpdater<IntegerInterval> intervalUpdater =
             AtomicLongFieldUpdater.newUpdater(IntegerInterval.class, "interval");
 
     private IntegerInterval(long interval)
@@ -126,7 +129,7 @@ public class IntegerInterval
      */
     public static class Set
     {
-        static long[] EMPTY = new long[0];
+        static final long[] EMPTY = new long[0];
 
         private volatile long[] ranges = EMPTY;
 
@@ -135,11 +138,15 @@ public class IntegerInterval
          */
         public synchronized void add(int start, int end)
         {
-            assert start <= end;
-            long[] ranges, newRanges;
-            {
-                ranges = this.ranges; // take local copy to avoid risk of it changing in the midst of operation
+            this.ranges = add(this.ranges, start, end);
+        }
 
+        @Inline
+        public static long[] add(long[] ranges, int start, int end)
+        {
+            Invariants.require(start <= end, "Start (%d) should be less than or equal to end (%d)", start, end);
+            long[] newRanges;
+            {
                 // extend ourselves to cover any ranges we overlap
                 // record directly preceding our end may extend past us, so take the max of our end and its
                 int rpos = Arrays.binarySearch(ranges, ((end & 0xFFFFFFFFL) << 32) | 0xFFFFFFFFL); // floor (i.e. greatest <=) of the end position
@@ -174,7 +181,7 @@ public class IntegerInterval
                 for (int i = rpos + 1; i < ranges.length; ++i)
                     newRanges[dest++] = ranges[i];
             }
-            this.ranges = newRanges;
+            return newRanges;
         }
 
         /**
@@ -191,8 +198,13 @@ public class IntegerInterval
          */
         public boolean covers(int start, int end)
         {
-            long[] ranges = this.ranges; // take local copy to avoid risk of it changing in the midst of operation
-            int rpos = Arrays.binarySearch(ranges, ((start & 0xFFFFFFFFL) << 32) | 0xFFFFFFFFL);        // floor (i.e. greatest <=) of the end position
+            return covers(this.ranges, start, end);
+        }
+
+        @Inline
+        public static boolean covers(long[] ranges, int start, int end)
+        {
+            int rpos = Arrays.binarySearch(ranges, ((start & 0xFFFFFFFFL) << 32) | 0xFFFFFFFFL); // floor (i.e. greatest <=) of the end position
             if (rpos < 0)
                 rpos = (-1 - rpos) - 1;
             if (rpos == -1)
@@ -219,7 +231,7 @@ public class IntegerInterval
 
         public Collection<IntegerInterval> intervals()
         {
-            return Lists.transform(Longs.asList(ranges), iv -> new IntegerInterval(iv));
+            return Lists.transform(Longs.asList(ranges), IntegerInterval::new);
         }
 
         @Override
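
The static add/covers variants above operate on a caller-supplied long[] snapshot, which is what
IntervalState in SegmentStateTracker relies on for its compare-and-swap loop. A sketch of the
intended usage (an illustration only; one packed long per [start, end] range, starting from an
empty array in place of the package-private EMPTY constant):

    long[] clean = new long[0];
    clean = IntegerInterval.Set.add(clean, 0, 100);   // returns a new array, input left untouched
    clean = IntegerInterval.Set.add(clean, 100, 250); // overlapping ranges are merged
    boolean covered = IntegerInterval.Set.covers(clean, 50, 200); // true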
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index bd9f69451a..dfa676b074 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -825,6 +825,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
         try
         {
             CommitLog.instance.recoverSegmentsOnDisk();
+            MutationJournal.instance.replayStaticSegments();
         }
         catch (IOException e)
         {
diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingBounceTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingBounceTest.java
new file mode 100644
index 0000000000..3e0a1ad260
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingBounceTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.tracking;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.distributed.test.log.FuzzTestBase;
+import org.apache.cassandra.harry.SchemaSpec;
+import org.apache.cassandra.harry.dsl.HistoryBuilder;
+import org.apache.cassandra.harry.dsl.ReplayingHistoryBuilder;
+import org.apache.cassandra.harry.execution.InJvmDTestVisitExecutor;
+import org.apache.cassandra.harry.gen.Generator;
+import org.apache.cassandra.harry.gen.SchemaGenerators;
+import org.apache.cassandra.replication.MutationJournal;
+import org.junit.Test;
+
+
+import static org.apache.cassandra.harry.checker.TestHelper.withRandom;
+
+public class MutationTrackingBounceTest extends FuzzTestBase
+{
+    private static final int POPULATION = 1000;
+
+    @Test
+    public void bounceTest() throws Throwable
+    {
+        try (Cluster cluster = builder().withNodes(1).start())
+        {
+            int tables = 10;
+            int writesPerKey = 2;
+            int pks = 100;
+            withRandom(rng -> {
+                cluster.schemaChange(String.format("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1} " +
+                                                   "AND replication_type='tracked'",
+                                                   KEYSPACE));
+
+                List<HistoryBuilder> builders = new ArrayList<>();
+                for (int i = 0; i < tables; i++)
+                {
+                    Generator<SchemaSpec> schemaGen = SchemaGenerators.trivialSchema(KEYSPACE, () -> "mutation_tracking_bounce_" + (builders.size() + 1), POPULATION,
+                                                                                     SchemaSpec.optionsBuilder());
+
+                    SchemaSpec schema = schemaGen.generate(rng);
+                    cluster.schemaChange(schema.compile());
+                    builders.add(new ReplayingHistoryBuilder(schema.valueGenerators,
+                                                             hb -> InJvmDTestVisitExecutor.builder()
+                                                                                          .consistencyLevel(ConsistencyLevel.QUORUM)
+                                                                                          .build(schema, hb, cluster)));
+                }
+
+                int counter = 0;
+                for (int pk = 0; pk < pks; pk++)
+                {
+                    for (HistoryBuilder history : builders)
+                        for (int i = 0; i < writesPerKey; i++)
+                            history.insert(pk);
+
+                    if (++counter % 10 == 0)
+                        cluster.get(1).runOnInstance(() -> MutationJournal.instance.closeCurrentSegmentForTestingIfNonEmpty());
+                }
+
+                ClusterUtils.stopUnchecked(cluster.get(1));
+                cluster.get(1).startup();
+
+                for (int pk = 0; pk < pks; pk++)
+                    for (HistoryBuilder history : builders)
+                        for (int i = 0; i < 10; i++)
+                            history.selectPartition(pk);
+
+                cluster.get(1).runOnInstance(new MutationTrackingBounce_ValidateRunnable(tables * pks * writesPerKey));
+            });
+        }
+    }
+}
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingBounce_ValidateRunnable.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingBounce_ValidateRunnable.java
new file mode 100644
index 0000000000..0afb402b1d
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingBounce_ValidateRunnable.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.tracking;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.distributed.api.IIsolatedExecutor;
+import org.apache.cassandra.journal.DeserializedRecordConsumer;
+import org.apache.cassandra.replication.MutationJournal;
+import org.apache.cassandra.replication.MutationTrackingService;
+import org.apache.cassandra.replication.ShortMutationId;
+import org.junit.Assert;
+
+// Separate class for MutationTrackingBounceTest, since without it we were getting non-serializable class exceptions, likely due to static fields
+public class MutationTrackingBounce_ValidateRunnable implements IIsolatedExecutor.SerializableRunnable
+{
+    private final int count;
+
+    public MutationTrackingBounce_ValidateRunnable(int expectedMutations)
+    {
+        this.count = expectedMutations;
+    }
+
+    @Override
+    public void run()
+    {
+        AtomicInteger counter = new AtomicInteger();
+        MutationJournal.instance.replay(new DeserializedRecordConsumer<>(MutationJournal.MutationSerializer.INSTANCE)
+        {
+            Set<ShortMutationId> seen = new HashSet<>();
+            @Override
+            protected void accept(long segment, int position, ShortMutationId key, Mutation mutation)
+            {
+                if (!seen.add(key))
+                    throw new AssertionError(String.format("Should have witnessed each key just once, but saw %s already", key));
+
+                for (PartitionUpdate partitionUpdate : mutation.getPartitionUpdates())
+                {
+                    if (!MutationTrackingService.instance.createSummaryForKey(partitionUpdate.partitionKey(), partitionUpdate.metadata().id, false)
+                                                         .contains(key))
+                    {
+                        throw new AssertionError(String.format("Mutation %s should have been witnessed (%s)", mutation, key));
+                    }
+                }
+                counter.incrementAndGet();
+            }
+        }, 1);
+        Assert.assertEquals(count, counter.get());
+    }
+}
diff --git a/test/harry/main/org/apache/cassandra/harry/checker/TestHelper.java b/test/harry/main/org/apache/cassandra/harry/checker/TestHelper.java
index f44b6e5fa2..ee930bbd3e 100644
--- a/test/harry/main/org/apache/cassandra/harry/checker/TestHelper.java
+++ b/test/harry/main/org/apache/cassandra/harry/checker/TestHelper.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.harry.checker;
 
+import java.util.function.Consumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,6 +48,20 @@ public class TestHelper
         }
     }
 
+    public static void withRandom(long seed, ModelChecker.ThrowingConsumer<EntropySource> rng, Consumer<Throwable> onError)
+    {
+        try
+        {
+            logger.info("Seed: {}", seed);
+            rng.accept(new JdkRandomEntropySource(seed));
+        }
+        catch (Throwable t)
+        {
+            onError.accept(t);
+            throw new AssertionError(String.format("Caught an exception at seed:%dL", seed), t);
+        }
+    }
+
     public static void repeat(int num, ExecUtil.ThrowingSerializableRunnable<?> r)
     {
         for (int i = 0; i < num; i++)
diff --git a/test/unit/org/apache/cassandra/journal/SegmentsTest.java b/test/unit/org/apache/cassandra/journal/SegmentsTest.java
new file mode 100644
index 0000000000..7e5922237a
--- /dev/null
+++ b/test/unit/org/apache/cassandra/journal/SegmentsTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.journal;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import org.apache.cassandra.utils.concurrent.Ref;
+import org.junit.Test;
+
+
+import static org.apache.cassandra.harry.checker.TestHelper.withRandom;
+
+public class SegmentsTest
+{
+    @Test
+    public void testSelect()
+    {
+        withRandom(0L, rng -> {
+            // Create mock segments with different timestamps
+            java.io.File file = File.createTempFile("segments", "test");
+            List<Segment<String, String>> segmentList = new ArrayList<>();
+            Set<Long> taken = new HashSet<>();
+            for (int i = 0; i < 100; i++)
+            {
+                while (true)
+                {
+                    long id = rng.nextLong(0, 10_000);
+                    if (taken.add(id))
+                    {
+                        segmentList.add(new TestSegment<>(file, id));
+                        break;
+                    }
+                }
+            }
+            segmentList.sort(Comparator.comparing(s -> s.descriptor.timestamp));
+
+            Segments<String, String> segments = Segments.of(segmentList);
+            for (int i = 0; i < 10_000; i++)
+            {
+                // Generate two distinct segment idxs
+                int i1 = rng.nextInt(segmentList.size());
+                int i2;
+                do
+                {
+                    i2 = rng.nextInt(segmentList.size());
+                }
+                while (i2 == i1);
+                int min = Math.min(i1, i2);
+                int max = Math.max(i1, i2);
+                List<Segment<String, String>> selected = new ArrayList<>();
+                segments.select(segmentList.get(min).id(),
+                                segmentList.get(max).id(),
+                                selected);
+                List<Segment<String, String>> expected = segmentList.subList(min, max + 1);
+                if (!Objects.equals(expected, selected))
+                {
+                    throw new AssertionError(String.format("\nExpected: %s\n" +
+                                                           "Selected: %s",
+                                                           expected,
+                                                           selected));
+                }
+            }
+        });
+    }
+
+    private static class TestSegment<K, V> extends Segment<K, V>
+    {
+        TestSegment(File dir, long timestamp)
+        {
+            super(Descriptor.create(new org.apache.cassandra.io.util.File(dir), timestamp, 1), null, null);
+        }
+
+        @Override
+        void close(Journal<K, V> journal)
+        {
+
+        }
+
+        @Override
+        public boolean isActive()
+        {
+            return false;
+        }
+
+        @Override
+        public boolean isStatic()
+        {
+            return false;
+        }
+
+        @Override Index<K> index() { throw new UnsupportedOperationException(); }
+        @Override boolean isFlushed(long position) { throw new UnsupportedOperationException(); }
+        @Override public void persistMetadata() { throw new UnsupportedOperationException(); }
+        @Override boolean read(int offset, int size, EntrySerializer.EntryHolder<K> into) { throw new UnsupportedOperationException(); }
+        @Override public ActiveSegment<K, V> asActive() { throw new UnsupportedOperationException(); }
+        @Override public StaticSegment<K, V> asStatic() { throw new UnsupportedOperationException(); }
+        @Override public Ref<Segment<K, V>> selfRef() { throw new UnsupportedOperationException(); }
+        @Override public Ref<Segment<K, V>> tryRef() { throw new UnsupportedOperationException(); }
+        @Override public Ref<Segment<K, V>> ref() { throw new UnsupportedOperationException(); }
+
+        @Override
+        public String toString()
+        {
+            return "TestSegment{" +
+                   "id=" + descriptor.timestamp +
+                   '}';
+        }
+
+        @Override
+        public boolean equals(Object obj)
+        {
+            TestSegment<K, V> other = (TestSegment<K, V>) obj;
+            return descriptor.equals(other.descriptor);
+        }
+    }
+}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/replication/MutationJournalReplayTest.java b/test/unit/org/apache/cassandra/replication/MutationJournalReplayTest.java
new file mode 100644
index 0000000000..a834d9ff8a
--- /dev/null
+++ b/test/unit/org/apache/cassandra/replication/MutationJournalReplayTest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.replication;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.harry.checker.TestHelper;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.journal.DeserializedRecordConsumer;
+import org.apache.cassandra.journal.TestParams;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.ReplicationType;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.tools.FieldUtil;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+import static org.apache.cassandra.replication.MutationJournalTest.assertMutationsEqual;
+
+public class MutationJournalReplayTest
+{
+    private static final String KEYSPACE = "ks";
+    private static final String TABLE_PREFIX = "tbl";
+    private static final TableMetadata[] TABLES = new TableMetadata[10];
+    private static MutationJournal journal;
+
+    @BeforeClass
+    public static void setUp() throws IOException
+    {
+        SchemaLoader.prepareServer();
+
+        File directory = new File(Files.createTempDirectory("mutation-journal-replay-test"));
+        directory.deleteRecursiveOnExit();
+
+        journal = new MutationJournal(directory, new TestParams(MessagingService.current_version)
+        {
+            @Override
+            public long flushPeriod(TimeUnit units)
+            {
+                return 1;
+            }
+
+            @Override
+            public FlushMode flushMode()
+            {
+                return FlushMode.PERIODIC;
+            }
+        });
+        FieldUtil.setInstanceUnsafe(MutationJournal.class, journal, "instance");
+        journal.start();
+
+        for (int i = 0; i < TABLES.length; i++)
+        {
+            TABLES[i] = TableMetadata.builder(KEYSPACE, String.format("%s_%d", TABLE_PREFIX, i))
+                                     .keyspaceReplicationType(ReplicationType.tracked)
+                                     .addPartitionKeyColumn("pk", UTF8Type.instance)
+                                     .addClusteringColumn("ck", UTF8Type.instance)
+                                     .addRegularColumn("value", UTF8Type.instance)
+                                     .build();
+        }
+        SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1, ReplicationType.tracked), TABLES);
+    }
+
+    @AfterClass
+    public static void tearDown()
+    {
+        journal.shutdownBlocking();
+    }
+
+    @Test
+    public void testReplay() throws Throwable
+    {
+        long seed = 0L;
+        TestHelper.withRandom(seed,
+                              rng -> {
+                                  List<Mutation> original = new ArrayList<>();
+                                  for (int i = 1; i <= 10_000; i++)
+                                  {
+                                      MutationId id = new MutationId(100L, i, i);
+                                      Mutation mutation = mutation(i % 10, i).withMutationId(id);
+                                      journal.write(id, mutation);
+                                      original.add(mutation);
+                                      if (i % rng.nextInt(1, 100) > 90)
+                                          journal.closeCurrentSegmentForTestingIfNonEmpty();
+                                  }
+
+                                  journal.closeCurrentSegmentForTestingIfNonEmpty();
+
+                                  List<Mutation> replayed = new ArrayList<>();
+                                  journal.replay(new DeserializedRecordConsumer<ShortMutationId, Mutation>(MutationJournal.MutationSerializer.INSTANCE)
+                                  {
+                                      @Override
+                                      protected void accept(long segment, int position, ShortMutationId key, Mutation mutation)
+                                      {
+                                          replayed.add(mutation);
+                                      }
+                                  }, 1);
+
+                                  assertMutationsEqual(original, replayed);
+                              });
+    }
+
+    @Test
+    public void testReplayFlushed() throws Throwable
+    {
+        long seed = 0L;
+        class Bounds
+        {
+            final CommitLogPosition start;
+            final CommitLogPosition end;
+            final int count;
+
+            Bounds(CommitLogPosition start, CommitLogPosition end, int count)
+            {
+                this.start = start;
+                this.end = end;
+                this.count = count;
+            }
+        }
+        TestHelper.withRandom(seed,
+                              rng -> {
+                                  List<Mutation> original = new ArrayList<>();
+
+                                  List<Bounds> testFlushBounds = new ArrayList<>();
+                                  CommitLogPosition prevPos = journal.getCurrentPosition();
+                                  int count = 0;
+                                  for (int i = 1; i <= 1000; i++)
+                                  {
+                                      MutationId id = new MutationId(100L, i, i);
+                                      Mutation mutation = mutation(i % TABLES.length, i).withMutationId(id);
+                                      journal.write(id, mutation);
+                                      count++;
+                                      original.add(mutation);
+                                      if (i % rng.nextInt(1, 100) > 90)
+                                      {
+                                          CommitLogPosition curPos = journal.getCurrentPosition();
+                                          journal.closeCurrentSegmentForTestingIfNonEmpty();
+                                          testFlushBounds.add(new Bounds(prevPos, curPos, count));
+                                          count = 0;
+                                          prevPos = curPos;
+                                      }
+                                  }
+
+                                  journal.closeCurrentSegmentForTestingIfNonEmpty();
+
+                                  int flushed = 0;
+                                  for (Bounds bounds : testFlushBounds)
+                                  {
+                                      if (rng.nextBoolean())
+                                      {
+                                          for (TableMetadata table : TABLES)
+                                              journal.notifyFlushed(table.id, bounds.start, bounds.end);
+                                          flushed += bounds.count;
+                                      }
+                                  }
+
+                                  List<Mutation> replayed = new ArrayList<>();
+                                  journal.replay(new DeserializedRecordConsumer<ShortMutationId, Mutation>(MutationJournal.MutationSerializer.INSTANCE)
+                                  {
+                                      @Override
+                                      protected void accept(long segment, int position, ShortMutationId key, Mutation mutation)
+                                      {
+                                          replayed.add(mutation);
+                                      }
+                                  }, 1);
+
+                                  Assert.assertEquals(original.size() - flushed, replayed.size());
+                              });
+    }
+
+
+    private static String CACHED_STRING = null;
+    private static Mutation mutation(int table, int value)
+    {
+        if (CACHED_STRING == null)
+        {
+            StringBuilder sb = new StringBuilder();
+            for (int i = 0; i < 512; i++)
+            {
+                sb.append('.');
+            }
+            CACHED_STRING = sb.toString();
+        }
+        return new RowUpdateBuilder(TABLES[table], 0, "key_" + value)
+                .clustering("ck")
+                .add("value", CACHED_STRING)
+                .build();
+    }
+}
diff --git a/test/unit/org/apache/cassandra/replication/MutationJournalTest.java b/test/unit/org/apache/cassandra/replication/MutationJournalTest.java
index a0c778a3e4..ba741799aa 100644
--- a/test/unit/org/apache/cassandra/replication/MutationJournalTest.java
+++ b/test/unit/org/apache/cassandra/replication/MutationJournalTest.java
@@ -127,19 +127,20 @@ public class MutationJournalTest
         assertMutationsEqual(expected, actual);
     }
 
-    private static void assertMutationEquals(Mutation expected, Mutation actual)
+    public static void assertMutationEquals(Mutation expected, Mutation actual)
     {
-        assertEquals(serialize(expected), serialize(actual));
+        if (!serialize(expected).equals(serialize(actual)))
+            throw new AssertionError(String.format("Expected %s but got %s", expected, actual));
     }
 
-    private static void assertMutationsEqual(List<Mutation> expected, List<Mutation> actual)
+    public static void assertMutationsEqual(List<Mutation> expected, List<Mutation> actual)
     {
         assertEquals(expected.size(), actual.size());
         for (int i = 0; i < expected.size(); i++)
             assertMutationEquals(expected.get(i), actual.get(i));
     }
 
-    private static ByteBuffer serialize(Mutation mutation)
+    public static ByteBuffer serialize(Mutation mutation)
     {
         try (DataOutputBuffer out = DataOutputBuffer.scratchBuffer.get())
         {
diff --git a/test/unit/org/apache/cassandra/replication/SegmentStateTrackerTest.java b/test/unit/org/apache/cassandra/replication/SegmentStateTrackerTest.java
new file mode 100644
index 0000000000..928deca008
--- /dev/null
+++ b/test/unit/org/apache/cassandra/replication/SegmentStateTrackerTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.replication;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+import org.apache.cassandra.harry.checker.TestHelper;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.utils.Pair;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class SegmentStateTrackerTest
+{
+    @Test
+    public void trivialTest()
+    {
+        long segment = 123L;
+        SegmentStateTracker tracker = new SegmentStateTracker(segment);
+        TableId tbl = TableId.generate();
+
+        assertTrue(tracker.isClean());
+        for (int i = 1; i <= 10; i++)
+        {
+            tracker.markDirty(tbl, segment, i);
+            assertFalse(tracker.isClean());
+            if (i % 5 == 0)
+            {
+                tracker.markClean(tbl, pos(segment, i - 5), pos(segment, i));
+                assertTrue(tracker.isClean());
+            }
+        }
+        assertTrue(tracker.isClean());
+    }
+
+    @Test
+    public void memtableSpanningMultipleSegmentsTest()
+    {
+        SegmentStateTracker segment1 = new SegmentStateTracker(1);
+        SegmentStateTracker segment2 = new SegmentStateTracker(2);
+        TableId tbl = TableId.fromLong(1);
+
+        segment1.markDirty(tbl, segment1.segmentId(), 10);
+        segment2.markDirty(tbl, segment2.segmentId(), 10);
+        segment2.markDirty(tbl, segment2.segmentId(), 20);
+
+        segment1.markClean(tbl, pos(segment1.segmentId(), 10), pos(segment2.segmentId(), 10)); // first flush
+        segment2.markClean(tbl, pos(segment1.segmentId(), 10), pos(segment2.segmentId(), 10)); // first flush
+        segment2.markClean(tbl, pos(segment2.segmentId(), 10), pos(segment2.segmentId(), 30)); // second flush
+        Assert.assertTrue(segment1.isClean());
+        Assert.assertTrue(segment2.isClean());
+    }
+
+    @Test
+    public void fuzzTest()
+    {
+        int allocationCount = 10_000;
+        long seed = 0;
+        List<String> ops = null; // new ArrayList<>(); // to enable operation logging
+        AtomicLong tableIdGen = new AtomicLong();
+        TestHelper.withRandom(seed,
+                              rng -> {
+                                  // This test tracks state from the perspective of the state tracker:
+                                  // we make allocations by bumping the pointer, reporting allocations to different tables.
+                                  // Tables can flush subsequent allocations and report the information back to the segment.
+                                  class Table
+                                  {
+                                      final TableId tableId = TableId.fromLong(tableIdGen.getAndIncrement());
+
+                                      // Track all allocations to a table id in a Table, and perform contiguous "flushes"
+                                      // that report position bounds back to the tracker
+                                      ArrayList<CommitLogPosition> unflushedAllocations = new ArrayList<>();
+
+                                      // Memtables _always_ report contiguous chunks of metadata, see CFS$Flush: a new memtable
+                                      // will always be created with the commitLogUpperBound of the previous one.
+                                      CommitLogPosition lastFlushMax = null;
+
+                                      void addAllocation(CommitLogPosition pos)
+                                      {
+                                          if (ops != null) ops.add("Allocate " + pos + " in " + tableId);
+                                          unflushedAllocations.add(pos);
+                                      }
+
+                                      void flush(Collection<SegmentStateTracker> trackers)
+                                      {
+                                          if (unflushedAllocations.isEmpty())
+                                              return;
+
+                                          CommitLogPosition min = lastFlushMax == null ? unflushedAllocations.get(0) : lastFlushMax;
+                                          CommitLogPosition max = unflushedAllocations.get(rng.nextInt(unflushedAllocations.size()));
+                                          lastFlushMax = max;
+
+                                          if (ops != null) ops.add(String.format("Flush %s [%s, %s]", tableId, min, max));
+                                          for (SegmentStateTracker tracker : trackers)
+                                              reportFlushed(tracker, min, max);
+
+                                          // TODO (required): use an array and copying instead
+                                          unflushedAllocations.removeIf(alloc -> alloc.compareTo(min) >= 0 && alloc.compareTo(max) <= 0);
+                                      }
+
+                                      boolean hasUnflushed()
+                                      {
+                                          return !unflushedAllocations.isEmpty();
+                                      }
+
+                                      boolean hasUnflushedFor(long segment)
+                                      {
+                                          for (CommitLogPosition alloc : unflushedAllocations)
+                                          {
+                                              if (alloc.segmentId == segment)
+                                                  return true;
+                                          }
+                                          return false;
+                                      }
+
+                                      List<CommitLogPosition> getUnflushedFor(long segment)
+                                      {
+                                          List<CommitLogPosition> unflushed = new ArrayList<>();
+                                          for (CommitLogPosition alloc : unflushedAllocations)
+                                          {
+                                              if (alloc.segmentId == segment)
+                                                  unflushed.add(alloc);
+                                          }
+                                          return unflushed;
+                                      }
+
+                                      void reportFlushed(SegmentStateTracker tracker, CommitLogPosition minBound, CommitLogPosition maxBound)
+                                      {
+                                          if (tracker.segmentId() >= minBound.segmentId && tracker.segmentId() <= maxBound.segmentId)
+                                              tracker.markClean(tableId, minBound, maxBound);
+                                      }
+                                  }
+
+                                  int tableCount = 10;
+                                  List<Table> tables = new ArrayList<>(tableCount);
+                                  for (int i = 0; i < tableCount; i++)
+                                      tables.add(new Table());
+
+                                  Map<Long, SegmentStateTracker> segments = new HashMap<>();
+                                  Runnable validateAllSegments = () -> {
+                                      for (SegmentStateTracker segment : segments.values())
+                                      {
+                                          boolean segmentIsClean = segment.isClean();
+                                          boolean allTablesFlushed = tables.stream().noneMatch(t -> t.hasUnflushedFor(segment.segmentId()));
+                                          if (segmentIsClean != allTablesFlushed)
+                                              throw new IllegalArgumentException(String.format("Segment is %sclean, but table has %sunflushed allocations:\n%s\n%s",
+                                                                                               segmentIsClean ? "" : "not ",
+                                                                                               allTablesFlushed ? "" : "no ",
+                                                                                               segment,
+                                                                                               tables.stream().map(t -> Pair.create(t.tableId, t.getUnflushedFor(segment.segmentId())))
+                                                                                                     .filter(t -> !t.right.isEmpty())
+                                                                                                     .collect(Collectors.toList())));
+                                      }
+                                  };
+
+                                  SegmentStateTracker currentSegment = null;
+                                  int currentSegmentOffset = 0;
+                                  for (int i = 0; i < allocationCount; i++)
+                                  {
+                                      if (i > 0 && i % 50 == 0)
+                                      {
+                                          for (int j = 0; j < 3; j++)
+                                          {
+                                              Table table = tables.get(rng.nextInt(tableCount));
+                                              table.flush(segments.values());
+                                              validateAllSegments.run();
+                                          }
+                                      }
+
+                                      if (i % 100 == 0)
+                                      {
+                                          currentSegment = new SegmentStateTracker(segments.size());
+                                          currentSegmentOffset = 0;
+                                          segments.put(currentSegment.segmentId(), currentSegment);
+                                      }
+
+                                      int size = rng.nextInt(100);
+                                      Table table = tables.get(rng.nextInt(tableCount));
+                                      CommitLogPosition pos = pos(currentSegment.segmentId(), currentSegmentOffset);
+                                      table.addAllocation(pos);
+                                      currentSegment.markDirty(table.tableId, pos);
+                                      currentSegmentOffset += size;
+                                      validateAllSegments.run();
+                                  }
+
+                                  while (tables.stream().anyMatch(Table::hasUnflushed))
+                                  {
+                                      Table table = tables.get(rng.nextInt(tableCount));
+                                      if (table.hasUnflushed())
+                                          table.flush(segments.values());
+
+                                      validateAllSegments.run();
+                                  }
+                              },
+                              e -> {
+                                  if (ops != null)
+                                  {
+                                      System.out.println("History: ");
+                                      for (String op : ops)
+                                          System.out.println(op);
+                                  }
+                              });
+    }
+
+
+    public static CommitLogPosition pos(long segmentId, int pos)
+    {
+        return new CommitLogPosition(segmentId, pos);
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
