This is an automated email from the ASF dual-hosted git repository.
ifesdjeen pushed a commit to branch cep-45-mutation-tracking
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-45-mutation-tracking by this push:
     new 31f81c0b2c  CEP-45: Integrate Mutation Journal with Memtable flush tracking
31f81c0b2c is described below
commit 31f81c0b2cf5939e751b012ce7ebdfd975fb0814
Author: Alex Petrov <[email protected]>
AuthorDate: Thu Sep 18 17:54:52 2025 +0200
CEP-45: Integrate Mutation Journal with Memtable flush tracking
* Tracking of Memtable -> SSTable flushes in Mutation Journal segments
* Replay of Mutation Journal segments that were not fully flushed to SSTables
* Distinguish between CommitLog and MutationJournal offsets
* Effectively adds support for node bounces
Patch by Alex Petrov; reviewed by Aleksey Yeschenko for CASSANDRA-20919
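For orientation, a minimal standalone sketch (not the committed code) of the dispatch this patch threads through flush bookkeeping: tracked tables take their replay positions from the mutation journal, untracked ones from the commit log. PositionSource and both stubs are hypothetical stand-ins, not the real CommitLog/MutationJournal APIs.

    import java.util.concurrent.atomic.AtomicReference;

    final class PositionDispatchSketch
    {
        interface PositionSource { long currentPosition(); }

        static final PositionSource COMMIT_LOG = () -> 1L;       // stand-in for CommitLog.instance
        static final PositionSource MUTATION_JOURNAL = () -> 2L; // stand-in for MutationJournal.instance

        // Tracked tables read replay positions from the mutation journal;
        // everything else keeps using the commit log.
        static long currentPosition(boolean tracked)
        {
            return (tracked ? MUTATION_JOURNAL : COMMIT_LOG).currentPosition();
        }

        public static void main(String[] args)
        {
            AtomicReference<Long> upperBound = new AtomicReference<>(currentPosition(true));
            System.out.println("initial upper bound: " + upperBound.get());
        }
    }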
---
.../org/apache/cassandra/db/ColumnFamilyStore.java | 39 ++-
src/java/org/apache/cassandra/db/Mutation.java | 5 +
.../cassandra/db/commitlog/CommitLogReplayer.java | 6 +-
.../db/tracked/TrackedKeyspaceWriteHandler.java | 6 +-
.../apache/cassandra/journal/ActiveSegment.java | 39 ++-
.../journal/DeserializedRecordConsumer.java | 51 ++++
src/java/org/apache/cassandra/journal/Journal.java | 75 +++++-
.../org/apache/cassandra/journal/Metadata.java | 40 ++-
.../apache/cassandra/journal/RecordConsumer.java | 1 +
.../apache/cassandra/journal/RecordPointer.java | 35 +--
src/java/org/apache/cassandra/journal/Segment.java | 12 +-
.../org/apache/cassandra/journal/Segments.java | 32 +++
.../apache/cassandra/journal/StaticSegment.java | 8 +
.../apache/cassandra/replication/MutationId.java | 6 +
.../cassandra/replication/MutationJournal.java | 193 +++++++++++++-
.../cassandra/replication/SegmentStateTracker.java | 295 +++++++++++++++++++++
.../apache/cassandra/service/CassandraDaemon.java | 1 +
.../cassandra/service/accord/AccordJournal.java | 2 +-
src/java/org/apache/cassandra/tcm/Startup.java | 2 +
.../apache/cassandra/utils/IntegerInterval.java | 32 ++-
.../cassandra/distributed/impl/Instance.java | 1 +
.../test/tracking/MutationTrackingBounceTest.java | 92 +++++++
.../MutationTrackingBounce_ValidateRunnable.java | 69 +++++
.../apache/cassandra/harry/checker/TestHelper.java | 15 ++
.../org/apache/cassandra/journal/SegmentsTest.java | 135 ++++++++++
.../replication/MutationJournalReplayTest.java | 218 +++++++++++++++
.../cassandra/replication/MutationJournalTest.java | 9 +-
.../replication/SegmentStateTrackerTest.java | 237 +++++++++++++++++
28 files changed, 1551 insertions(+), 105 deletions(-)
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index dacac3becf..80dfa68cf7 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -134,6 +134,7 @@ import org.apache.cassandra.repair.consistent.admin.CleanupSummary;
import org.apache.cassandra.repair.consistent.admin.PendingStat;
import org.apache.cassandra.replication.ImmutableCoordinatorLogOffsets;
import org.apache.cassandra.replication.MutationId;
+import org.apache.cassandra.replication.MutationJournal;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.CompactionParams;
import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
@@ -498,9 +499,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
         logger.info("Initializing {}.{}", getKeyspaceName(), name);

-        Memtable initialMemtable = DatabaseDescriptor.isDaemonInitialized() ?
-                                   createMemtable(new AtomicReference<>(CommitLog.instance.getCurrentPosition())) :
-                                   null;
+        Memtable initialMemtable = null;
+        if (DatabaseDescriptor.isDaemonInitialized())
+        {
+            CommitLogPosition commitLogPosition;
+            if (metadata().replicationType().isTracked())
+                commitLogPosition = MutationJournal.instance.getCurrentPosition();
+            else
+                commitLogPosition = CommitLog.instance.getCurrentPosition();
+            initialMemtable = createMemtable(new AtomicReference<>(commitLogPosition));
+        }
         memtableMetricsReleaser = memtableFactory.createMemtableMetricsReleaser(metadata);
         data = new Tracker(this, initialMemtable, loadSSTables);
@@ -1147,8 +1155,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
             // If a flush errored out but the error was ignored, make sure we don't discard the commit log.
             if (flushFailure == null && mainMemtable != null)
             {
+                CommitLogPosition commitLogLowerBound = mainMemtable.getCommitLogLowerBound();
                 commitLogUpperBound = mainMemtable.getFinalCommitLogUpperBound();
-                CommitLog.instance.discardCompletedSegments(metadata.id, mainMemtable.getCommitLogLowerBound(), commitLogUpperBound);
+                TableMetadata metadata = metadata();
+                if (metadata.replicationType().isTracked())
+                    MutationJournal.instance.notifyFlushed(metadata.id, commitLogLowerBound, commitLogUpperBound);
+                else
+                    CommitLog.instance.discardCompletedSegments(metadata.id, commitLogLowerBound, commitLogUpperBound);
             }

             metric.pendingFlushes.dec();
@@ -1215,7 +1228,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
         // we then ensure an atomic decision is made about the upper bound of the continuous range of commit log
         // records owned by this memtable
-        setCommitLogUpperBound(commitLogUpperBound);
+        setCommitLogUpperBound(commitLogUpperBound, metadata().replicationType().isTracked());

         // we then issue the barrier; this lets us wait for all operations started prior to the barrier to complete;
         // since this happens after wiring up the commitLogUpperBound, we also know all operations with earlier
@@ -1416,7 +1429,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
     }

     // atomically set the upper bound for the commit log
-    private static void setCommitLogUpperBound(AtomicReference<CommitLogPosition> commitLogUpperBound)
+    private static void setCommitLogUpperBound(AtomicReference<CommitLogPosition> commitLogUpperBound, boolean useMutationJournal)
     {
         // we attempt to set the holder to the current commit log context. at the same time all writes to the memtables are
         // also maintaining this value, so if somebody sneaks ahead of us somehow (should be rare) we simply retry,
@@ -1424,7 +1437,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
         CommitLogPosition lastReplayPosition;
         while (true)
         {
-            lastReplayPosition = new Memtable.LastCommitLogPosition((CommitLog.instance.getCurrentPosition()));
+            CommitLogPosition commitLogPosition;
+            if (useMutationJournal)
+                commitLogPosition = MutationJournal.instance.getCurrentPosition();
+            else
+                commitLogPosition = CommitLog.instance.getCurrentPosition();
+
+            lastReplayPosition = new Memtable.LastCommitLogPosition(commitLogPosition);
             CommitLogPosition currentLast = commitLogUpperBound.get();
             if ((currentLast == null || currentLast.compareTo(lastReplayPosition) <= 0)
                 && commitLogUpperBound.compareAndSet(currentLast, lastReplayPosition))
@@ -3233,7 +3252,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
             data.notifyDropped(DatabaseDescriptor.getAutoSnapshotTtl());

-            CommitLog.instance.forceRecycleAllSegments(Collections.singleton(metadata.id));
+            // TODO (required): test mutation tracking + table dropping
+            if (metadata().replicationType().isTracked())
+                MutationJournal.instance.notifyFlushed(metadata.id, new CommitLogPosition(0, 0), MutationJournal.instance.getCurrentPosition());
+            else
+                CommitLog.instance.forceRecycleAllSegments(Collections.singleton(metadata.id));

             compactionStrategyManager.shutdown();
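The setCommitLogUpperBound change above keeps the pre-existing monotonic CAS loop and only swaps the position source. A condensed, self-contained sketch of that loop, with hypothetical types (the real code re-samples a Memtable.LastCommitLogPosition on each retry):

    import java.util.concurrent.atomic.AtomicReference;

    final class MonotonicBoundSketch
    {
        // Install `sampled` as the upper bound unless a concurrent writer
        // already advanced the bound past it; retry on lost CAS races.
        static void advance(AtomicReference<Long> upperBound, long sampled)
        {
            while (true)
            {
                Long current = upperBound.get();
                if (current != null && current.compareTo(sampled) > 0)
                    return; // somebody sneaked ahead; their bound covers ours
                if (upperBound.compareAndSet(current, sampled))
                    return;
                // lost the race to a concurrent writer; re-read and retry
            }
        }
    }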
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index f8388bfe89..c81ea47310 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -201,6 +201,11 @@ public class Mutation implements IMutation, Supplier<Mutation>
return key;
}
+ public ImmutableMap<TableId, PartitionUpdate> modifications()
+ {
+ return modifications;
+ }
+
public ImmutableCollection<PartitionUpdate> getPartitionUpdates()
{
return modifications.values();
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 52173d44fb..f331e0b1a7 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -121,7 +121,7 @@ public class CommitLogReplayer implements CommitLogReadHandler
         for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
         {
-            // but, if we've truncated the cf in question, then we need to need to start replay after the truncation
+            // but, if we've truncated the cf in question, then we need to start replay after the truncation
             CommitLogPosition truncatedAt = SystemKeyspace.getTruncatedPosition(cfs.metadata.id);
             if (truncatedAt != null)
             {
@@ -323,7 +323,7 @@ public class CommitLogReplayer implements CommitLogReadHandler
                 {
                     assert !newPUCollector.isEmpty();

-                    Keyspace.open(newPUCollector.getKeyspaceName()).apply(newPUCollector.build(), false, true, false);
+                    keyspace.apply(newPUCollector.build(), false, true, false);
                     commitLogReplayer.keyspacesReplayed.add(keyspace);
                 }
             }
@@ -439,7 +439,7 @@ public class CommitLogReplayer implements CommitLogReadHandler
         if (toReplay.isEmpty())
             logger.info("All tables will be included in commit log replay.");
         else
-            logger.info("Tables to be replayed: {}", toReplay.asMap().toString());
+            logger.info("Tables to be replayed: {}", toReplay.asMap());

         return new CustomReplayFilter(toReplay);
     }
diff --git a/src/java/org/apache/cassandra/db/tracked/TrackedKeyspaceWriteHandler.java b/src/java/org/apache/cassandra/db/tracked/TrackedKeyspaceWriteHandler.java
index bfd6d593c6..8c06e3c308 100644
--- a/src/java/org/apache/cassandra/db/tracked/TrackedKeyspaceWriteHandler.java
+++ b/src/java/org/apache/cassandra/db/tracked/TrackedKeyspaceWriteHandler.java
@@ -24,7 +24,6 @@ import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.WriteContext;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.exceptions.RequestExecutionException;
-import org.apache.cassandra.journal.RecordPointer;
import org.apache.cassandra.replication.MutationJournal;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -40,10 +39,9 @@ public class TrackedKeyspaceWriteHandler implements KeyspaceWriteHandler
             group = Keyspace.writeOrder.start();

             Tracing.trace("Appending to mutation journal");
-            RecordPointer pointer = MutationJournal.instance.write(mutation.id(), mutation);
+            CommitLogPosition pointer = MutationJournal.instance.write(mutation.id(), mutation);

-            // TODO (preferred): update journal to return CommitLogPosition or otherwise remove requirement to allocate second object here
-            return new CassandraWriteContext(group, new CommitLogPosition(pointer.segment, pointer.position));
+            return new CassandraWriteContext(group, pointer);
         }
catch (Throwable t)
{
diff --git a/src/java/org/apache/cassandra/journal/ActiveSegment.java b/src/java/org/apache/cassandra/journal/ActiveSegment.java
index 6000e2c9d6..9e01f493d7 100644
--- a/src/java/org/apache/cassandra/journal/ActiveSegment.java
+++ b/src/java/org/apache/cassandra/journal/ActiveSegment.java
@@ -30,10 +30,10 @@ import java.util.function.Consumer;
import accord.utils.Invariants;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.io.util.*;
import org.apache.cassandra.utils.*;
import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.concurrent.Ref;
@@ -98,6 +98,11 @@ public final class ActiveSegment<K, V> extends Segment<K, V>
         }
     }

+    public CommitLogPosition currentPosition()
+    {
+        return new CommitLogPosition(id(), (int) allocateOffset);
+    }
+
     static <K, V> ActiveSegment<K, V> create(Descriptor descriptor, Params params, KeySupport<K> keySupport)
     {
         InMemoryIndex<K> index = InMemoryIndex.create(keySupport);
@@ -134,6 +139,12 @@ public final class ActiveSegment<K, V>
         throw new UnsupportedOperationException();
     }
+    @Override
+    public void persistMetadata()
+    {
+        throw new UnsupportedOperationException("Cannot mutate an active segment's metadata.");
+    }
+
    /**
     * Read the entry and specified offset into the entry holder.
     * Expects the caller to acquire the ref to the segment and the record to exist.
@@ -425,19 +436,21 @@ public final class ActiveSegment<K, V>
         }
     }
-    final class Allocation
+    final class Allocation extends RecordPointer
     {
         private final OpOrder.Group appendOp;
         private final ByteBuffer buffer;
-        private final int start;
-        private final int length;

         Allocation(OpOrder.Group appendOp, ByteBuffer buffer, int length)
         {
+            super(descriptor.timestamp, buffer.position(), length);
             this.appendOp = appendOp;
             this.buffer = buffer;
-            this.start = buffer.position();
-            this.length = length;
+        }
+
+        Segment<K, V> segment()
+        {
+            return ActiveSegment.this;
         }
        void write(K id, ByteBuffer record)
@@ -446,7 +459,7 @@ public final class ActiveSegment<K, V>
            {
                EntrySerializer.write(id, record, keySupport, buffer, descriptor.userVersion);
                metadata.update();
-                index.update(id, start, length);
+                index.update(id, position, length);
            }
            catch (IOException e)
            {
@@ -472,14 +485,12 @@ public final class ActiveSegment<K, V>
             }
         }

-
-        // Variant of write that does not allocate/return a record pointer
         void writeInternal(K id, ByteBuffer record)
         {
             try
             {
                 EntrySerializer.write(id, record, keySupport, buffer, descriptor.userVersion);
-                index.update(id, start, length);
+                index.update(id, position, length);
                 metadata.update();
             }
             catch (IOException e)
@@ -496,13 +507,13 @@ public final class ActiveSegment<K, V>
         {
             try (Timer.Context ignored = waitingOnFlush.time())
             {
-                waitForFlush(start);
+                waitForFlush(position);
             }
         }

         boolean isFsynced()
         {
-            return fsyncedTo >= start + length;
+            return fsyncedTo >= position + length;
         }

         Descriptor descriptor()
@@ -512,12 +523,12 @@ public final class ActiveSegment<K, V>
         int start()
         {
-            return start;
+            return position;
         }

         RecordPointer recordPointer()
         {
-            return new RecordPointer(descriptor.timestamp, start, length);
+            return this;
         }
     }
diff --git a/src/java/org/apache/cassandra/journal/DeserializedRecordConsumer.java b/src/java/org/apache/cassandra/journal/DeserializedRecordConsumer.java
new file mode 100644
index 0000000000..0f058befc0
--- /dev/null
+++ b/src/java/org/apache/cassandra/journal/DeserializedRecordConsumer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.journal;
+
+import org.apache.cassandra.io.util.DataInputBuffer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public abstract class DeserializedRecordConsumer<K, V> implements RecordConsumer<K>
+{
+    final ValueSerializer<K, V> valueSerializer;
+
+    public DeserializedRecordConsumer(ValueSerializer<K, V> valueSerializer)
+    {
+        this.valueSerializer = valueSerializer;
+    }
+
+    @Override
+    public void accept(long segment, int position, K key, ByteBuffer buffer, int userVersion)
+ {
+ try (DataInputBuffer in = new DataInputBuffer(buffer, false))
+ {
+ V value = valueSerializer.deserialize(key, in, userVersion);
+ accept(segment, position, key, value);
+ }
+ catch (IOException e)
+ {
+ // can only throw if serializer is buggy
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected abstract void accept(long segment, int position, K key, V value);
+}
diff --git a/src/java/org/apache/cassandra/journal/Journal.java b/src/java/org/apache/cassandra/journal/Journal.java
index eaab157ea9..3176c5b395 100644
--- a/src/java/org/apache/cassandra/journal/Journal.java
+++ b/src/java/org/apache/cassandra/journal/Journal.java
@@ -36,6 +36,7 @@ import java.util.zip.CRC32;
import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
+import org.agrona.collections.Long2ObjectHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -112,7 +113,7 @@ public class Journal<K, V> implements Shutdownable
     // segment that is ready to be used; allocator thread fills this and blocks until consumed
     private volatile ActiveSegment<K, V> availableSegment = null;

-    private final AtomicReference<Segments<K, V>> segments = new AtomicReference<>();
+    private final AtomicReference<Segments<K, V>> segments = new AtomicReference<>(new Segments<>(new Long2ObjectHashMap<>()));

     final AtomicReference<State> state = new AtomicReference<>(State.UNINITIALIZED);
@@ -340,6 +341,15 @@ public class Journal<K, V> implements Shutdownable
         return null;
     }

+    public void readLast(K id, long segmentId, DeserializedRecordConsumer<K, V> consumer)
+    {
+        Segment<K, V> segment = segments.get().get(segmentId);
+        try (OpOrder.Group group = readOrder.start())
+        {
+            segment.readLast(id, consumer);
+        }
+    }
+
     public void readAll(K id, RecordConsumer<K> consumer)
     {
         EntrySerializer.EntryHolder<K> holder = new EntrySerializer.EntryHolder<>();
@@ -474,16 +484,16 @@ public class Journal<K, V> implements Shutdownable
     public int sizeOfRecord(RecordPointer pointer)
     {
-        Descriptor descriptor = segments.get().descriptor(pointer.segment);
+        Descriptor descriptor = segments.get().descriptor(pointer.segmentId);
         Invariants.nonNull(descriptor);
-        return pointer.size - EntrySerializer.overheadSize(keySupport, descriptor.userVersion);
+        return pointer.length - EntrySerializer.overheadSize(keySupport, descriptor.userVersion);
     }

     public boolean read(RecordPointer pointer, RecordConsumer<K> consumer)
     {
         try (OpOrder.Group group = readOrder.start())
         {
-            Segment<K, V> segment = segments.get().get(pointer.segment);
+            Segment<K, V> segment = segments.get().get(pointer.segmentId);
             return segment != null && segment.read(pointer, consumer);
         }
     }
@@ -552,7 +562,6 @@ public class Journal<K, V> implements Shutdownable
private ActiveSegment<K, V>.Allocation allocate(int entrySize)
{
-
ActiveSegment<K, V> segment = currentSegment;
ActiveSegment<K, V>.Allocation alloc;
while (null == (alloc = segment.allocate(entrySize)))
@@ -807,11 +816,23 @@ public class Journal<K, V> implements Shutdownable
         return oldest;
     }

+    public List<Segment<K, V>> getSegments(long lowerBound, long upperBound)
+    {
+        List<Segment<K, V>> res = new ArrayList<>();
+        segments().select(lowerBound, upperBound, res);
+        return res;
+    }
+
     public ActiveSegment<K, V> currentActiveSegment()
     {
         return currentSegment;
     }

+    @Nullable protected Segment<K, V> getSegment(long timestamp)
+    {
+        return segments().get(timestamp);
+    }
+
     ActiveSegment<K, V> getActiveSegment(long timestamp)
     {
         // we can race with segment addition to the segments() collection, with a new segment appearing in currentSegment first
@@ -853,10 +874,12 @@ public class Journal<K, V> implements Shutdownable
     private class CloseActiveSegmentRunnable implements Runnable
     {
         private final ActiveSegment<K, V> activeSegment;
+        private final Runnable onDone;

-        CloseActiveSegmentRunnable(ActiveSegment<K, V> activeSegment)
+        CloseActiveSegmentRunnable(ActiveSegment<K, V> activeSegment, @Nullable Runnable onDone)
         {
             this.activeSegment = activeSegment;
+            this.onDone = onDone;
         }

         @Override
@@ -868,10 +891,16 @@ public class Journal<K, V> implements Shutdownable
             activeSegment.persistComponents();
             replaceCompletedSegment(activeSegment, StaticSegment.open(activeSegment.descriptor, keySupport));
             activeSegment.release(Journal.this);
+            if (onDone != null) onDone.run();
         }
     }
-    void closeActiveSegmentAndOpenAsStatic(ActiveSegment<K, V> activeSegment)
+    protected void closeActiveSegmentAndOpenAsStatic(ActiveSegment<K, V> activeSegment)
+    {
+        closeActiveSegmentAndOpenAsStatic(activeSegment, null);
+    }
+
+    protected void closeActiveSegmentAndOpenAsStatic(ActiveSegment<K, V> activeSegment, @Nullable Runnable onDone)
     {
         if (activeSegment.isEmpty())
         {
@@ -880,7 +909,7 @@ public class Journal<K, V> implements Shutdownable
             return;
         }

-        closer.execute(new CloseActiveSegmentRunnable(activeSegment));
+        closer.execute(new CloseActiveSegmentRunnable(activeSegment, onDone));
     }
    @VisibleForTesting
@@ -981,13 +1010,21 @@ public class Journal<K, V> implements Shutdownable
     }

     /**
-     * Static segment iterator iterates all keys in _static_ segments in order.
+     * Iterates all keys in _static_ segments, in order, for the given key range.
      */
     public StaticSegmentKeyIterator staticSegmentKeyIterator(K min, K max)
     {
         return new StaticSegmentKeyIterator(min, max);
     }

+    /**
+     * Iterates all keys in the selected segments, in order.
+     */
+    public StaticSegmentKeyIterator staticSegmentKeyIterator(Predicate<Segment<K, V>> predicate)
+    {
+        return new StaticSegmentKeyIterator(null, null, predicate);
+    }
+
     /**
      * List of key and a list of segment descriptors referencing this key
      */
@@ -1013,6 +1050,11 @@ public class Journal<K, V> implements Shutdownable
                 consumer.accept(segments[i]);
         }

+        public long lastSegment()
+        {
+            return segments[segments.length - 1];
+        }
+
         public long[] copyOfSegments()
         {
             return segments == null ? new long[0] : Arrays.copyOf(segments, size);
@@ -1060,10 +1102,17 @@ public class Journal<K, V> implements Shutdownable
         public StaticSegmentKeyIterator(K min, K max)
         {
-            this.segments = selectAndReference(s -> s.isStatic()
-                                                    && s.asStatic().index().entryCount() > 0
-                                                    && (min == null || keySupport.compare(s.index().lastId(), min) >= 0)
-                                                    && (max == null || keySupport.compare(s.index().firstId(), max) <= 0));
+            this(min, max, s -> {
+                return s.isStatic()
+                       && s.asStatic().index().entryCount() > 0
+                       && (min == null || keySupport.compare(s.index().lastId(), min) >= 0)
+                       && (max == null || keySupport.compare(s.index().firstId(), max) <= 0);
+            });
+        }
+
+        public StaticSegmentKeyIterator(K min, K max, Predicate<Segment<K, V>> predicate)
+        {
+            this.segments = selectAndReference(predicate);

             List<Iterator<Head>> iterators = new ArrayList<>(segments.count());
             for (Segment<K, V> segment : segments.allSorted(true))
diff --git a/src/java/org/apache/cassandra/journal/Metadata.java b/src/java/org/apache/cassandra/journal/Metadata.java
index 13198fa77a..6f9552a229 100644
--- a/src/java/org/apache/cassandra/journal/Metadata.java
+++ b/src/java/org/apache/cassandra/journal/Metadata.java
@@ -33,23 +33,25 @@ import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
 * - total count of records in this segment file
 *   used for compaction prioritisation
 */
-final class Metadata
+public final class Metadata
 {
     private int fsyncLimit;
-
+    // Indicates whether a segment needs to be replayed or not.
+    private volatile boolean needsReplay;
     private volatile int recordsCount;
     private static final AtomicIntegerFieldUpdater<Metadata> recordsCountUpdater =
         AtomicIntegerFieldUpdater.newUpdater(Metadata.class, "recordsCount");

     static Metadata empty()
     {
-        return new Metadata(0, 0);
+        return new Metadata(0, 0, true);
     }

-    private Metadata(int recordsCount, int fsyncLimit)
+    private Metadata(int recordsCount, int fsyncLimit, boolean needsReplay)
     {
         this.recordsCount = recordsCount;
         this.fsyncLimit = fsyncLimit;
+        this.needsReplay = needsReplay;
     }
void update()
@@ -62,17 +64,27 @@ final class Metadata
this.fsyncLimit = fsyncLimit;
}
+ public void clearNeedsReplay()
+ {
+ this.needsReplay = false;
+ }
+
int fsyncLimit()
{
return fsyncLimit;
}
+ public boolean needsReplay()
+ {
+ return needsReplay;
+ }
+
private void incrementRecordsCount()
{
recordsCountUpdater.incrementAndGet(this);
}
- int totalCount()
+ public int totalCount()
{
return recordsCount;
}
@@ -82,8 +94,10 @@ final class Metadata
CRC32 crc = Crc.crc32();
out.writeInt(recordsCount);
out.writeInt(fsyncLimit);
+ out.writeBoolean(needsReplay);
updateChecksumInt(crc, recordsCount);
updateChecksumInt(crc, fsyncLimit);
+ updateChecksumInt(crc, needsReplay ? 1 : 0);
out.writeInt((int) crc.getValue());
}
@@ -92,10 +106,12 @@ final class Metadata
CRC32 crc = Crc.crc32();
int recordsCount = in.readInt();
int fsyncLimit = in.readInt();
+ boolean needsReplay = in.readBoolean();
updateChecksumInt(crc, recordsCount);
updateChecksumInt(crc, fsyncLimit);
+ updateChecksumInt(crc, needsReplay ? 1 : 0);
validateCRC(crc, in.readInt());
- return new Metadata(recordsCount, fsyncLimit);
+ return new Metadata(recordsCount, fsyncLimit, needsReplay);
}
void persist(Descriptor descriptor)
@@ -145,7 +161,7 @@ final class Metadata
throw e;
}
- return new Metadata(recordsCount, fsyncLimit);
+ return new Metadata(recordsCount, fsyncLimit, true);
}
    static <K> Metadata rebuildAndPersist(Descriptor descriptor, KeySupport<K> keySupport)
@@ -156,11 +172,11 @@ final class Metadata
     }

     @Override
-    public String toString()
-    {
+    public String toString() {
         return "Metadata{" +
-               "fsyncLimit=" + fsyncLimit +
-               ", recordsCount=" + recordsCount +
-               '}';
+               "fsyncLimit=" + fsyncLimit +
+               ", needsReplay=" + needsReplay +
+               ", recordsCount=" + recordsCount +
+               '}';
     }
}
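The write()/read() hunks above fold every persisted field, including the new needsReplay flag, into the CRC, with the boolean contributed as 0/1 so reader and writer agree. A self-contained illustration of that pattern using plain JDK types (not the patch's DataOutputPlus/FBUtilities helpers):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.zip.CRC32;

    final class ChecksummedFlagSketch
    {
        static byte[] write(int recordsCount, int fsyncLimit, boolean needsReplay) throws IOException
        {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            CRC32 crc = new CRC32();
            out.writeInt(recordsCount);
            out.writeInt(fsyncLimit);
            out.writeBoolean(needsReplay);
            updateInt(crc, recordsCount);
            updateInt(crc, fsyncLimit);
            updateInt(crc, needsReplay ? 1 : 0); // boolean folded in as 0/1, same as the hunk above
            out.writeInt((int) crc.getValue());  // trailing checksum validated on read
            return bytes.toByteArray();
        }

        // feed a big-endian int into the checksum, mirroring FBUtilities.updateChecksumInt
        static void updateInt(CRC32 crc, int v)
        {
            crc.update(new byte[] { (byte) (v >>> 24), (byte) (v >>> 16), (byte) (v >>> 8), (byte) v });
        }
    }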
diff --git a/src/java/org/apache/cassandra/journal/RecordConsumer.java b/src/java/org/apache/cassandra/journal/RecordConsumer.java
index 22d2bc4e9f..4d2b28b419 100644
--- a/src/java/org/apache/cassandra/journal/RecordConsumer.java
+++ b/src/java/org/apache/cassandra/journal/RecordConsumer.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.journal;
import java.nio.ByteBuffer;
+// TODO (desired): rename to SerializedRecordConsumer
@FunctionalInterface
public interface RecordConsumer<K>
{
diff --git a/src/java/org/apache/cassandra/journal/RecordPointer.java b/src/java/org/apache/cassandra/journal/RecordPointer.java
index 22d874712d..8e5c08ca51 100644
--- a/src/java/org/apache/cassandra/journal/RecordPointer.java
+++ b/src/java/org/apache/cassandra/journal/RecordPointer.java
@@ -18,33 +18,29 @@
package org.apache.cassandra.journal;
-import com.google.common.primitives.Ints;
-import com.google.common.primitives.Longs;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
// TODO: make this available in the accord table as an ID
-public class RecordPointer implements Comparable<RecordPointer>
+public class RecordPointer extends CommitLogPosition
 {
-    public final long segment;   // unique segment id
-    public final int position;   // record start position within the segment
-    public final int size;       // full size of the record
+    public final int length;     // full size of the record
     public final long writtenAt; // only set for periodic mode

-    public RecordPointer(long segment, int position, int size)
+    public RecordPointer(long segment, int position, int length)
     {
-        this(segment, position, size, 0);
+        this(segment, position, length, 0);
     }

-    public RecordPointer(long segment, int position, int size, long writtenAt)
+    public RecordPointer(long segment, int position, int length, long writtenAt)
     {
-        this.segment = segment;
-        this.position = position;
-        this.size = size;
+        super(segment, position);
+        this.length = length;
         this.writtenAt = writtenAt;
     }

     public RecordPointer(RecordPointer pointer)
     {
-        this(pointer.segment, pointer.position, pointer.size, pointer.writtenAt);
+        this(pointer.segmentId, pointer.position, pointer.length, pointer.writtenAt);
     }

     @Override
@Override
@@ -55,26 +51,19 @@ public class RecordPointer implements Comparable<RecordPointer>
         if (!(other instanceof RecordPointer))
             return false;
         RecordPointer that = (RecordPointer) other;
-        return this.segment == that.segment
+        return this.segmentId == that.segmentId
             && this.position == that.position;
     }

     @Override
     public int hashCode()
     {
-        return Long.hashCode(segment) + position * 31;
+        return Long.hashCode(segmentId) + position * 31;
     }

     @Override
     public String toString()
     {
-        return "(" + segment + ", " + position + ')';
-    }
-
-    @Override
-    public int compareTo(RecordPointer that)
-    {
-        int cmp = Longs.compare(this.segment, that.segment);
-        return cmp != 0 ? cmp : Ints.compare(this.position, that.position);
+        return "(" + segmentId + ", " + position + ')';
     }
 }
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/journal/Segment.java b/src/java/org/apache/cassandra/journal/Segment.java
index a80bbb4f4c..0b34f14516 100644
--- a/src/java/org/apache/cassandra/journal/Segment.java
+++ b/src/java/org/apache/cassandra/journal/Segment.java
@@ -66,16 +66,24 @@ public abstract class Segment<K, V> implements SelfRefCounted<Segment<K, V>>, Closeable
     abstract boolean isActive();
     abstract boolean isFlushed(long position);

-    boolean isStatic() { return !isActive(); }
     abstract ActiveSegment<K, V> asActive();
     abstract StaticSegment<K, V> asStatic();

+    public boolean isStatic() { return !isActive(); }
+
+    public Metadata metadata()
+    {
+        return metadata;
+    }
+
     public long id()
     {
         return descriptor.timestamp;
     }

+    public abstract void persistMetadata();
+
     /*
      * Reading entries (by id, by offset, iterate)
      */
@@ -110,7 +118,7 @@ public abstract class Segment<K, V> implements SelfRefCounted<Segment<K, V>>, Closeable
     boolean read(RecordPointer pointer, RecordConsumer<K> consumer)
     {
         EntrySerializer.EntryHolder<K> into = new EntrySerializer.EntryHolder<>();
-        if (read(pointer.position, pointer.size, into))
+        if (read(pointer.position, pointer.length, into))
         {
             consumer.accept(descriptor.timestamp, pointer.position, into.key, into.value, descriptor.userVersion);
             return true;
diff --git a/src/java/org/apache/cassandra/journal/Segments.java b/src/java/org/apache/cassandra/journal/Segments.java
index 50c32ed8b6..4e01bd47b4 100644
--- a/src/java/org/apache/cassandra/journal/Segments.java
+++ b/src/java/org/apache/cassandra/journal/Segments.java
@@ -128,6 +128,37 @@ class Segments<K, V>
into.add(segment.asActive());
}
+    void select(long minTimestamp, long maxTimestamp, Collection<Segment<K, V>> into)
+    {
+        List<Segment<K, V>> sorted = allSorted(true);
+        int idx = minTimestamp == 0 ? 0 : findIdxFor(minTimestamp);
+        while (idx < sorted.size())
+        {
+            Segment<K, V> segment = sorted.get(idx++);
+            if (segment.descriptor.timestamp > maxTimestamp)
+                break;
+            into.add(segment);
+        }
+    }
+
+    int findIdxFor(long timestamp)
+    {
+        List<Segment<K, V>> sorted = allSorted(true);
+        int low = 0, mid = sorted.size(), high = mid - 1, res = -1;
+        while (low <= high)
+        {
+            mid = (low + high) >>> 1;
+            res = Long.compare(timestamp, sorted.get(mid).descriptor.timestamp);
+            if (res > 0)
+                low = mid + 1;
+            else if (res == 0)
+                return mid;
+            else
+                high = mid - 1;
+        }
+        throw new IllegalStateException(String.format("Could not find a segment with timestamp %d among %s", timestamp, sorted));
+    }
+
boolean isSwitched(ActiveSegment<K, V> active)
{
for (Segment<K, V> segment : segments.values())
@@ -232,6 +263,7 @@ class Segments<K, V>
@Override
public String toString()
{
+ List<Segment<K, V>> sorted = allSorted(true);
return sorted.toString();
}
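Segments.select/findIdxFor above amount to a binary search over sorted segment timestamps followed by a linear sweep up to the upper bound. An equivalent standalone sketch over a plain long[] of timestamps (the real code walks Segment objects):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    final class SegmentSelectSketch
    {
        static List<Long> select(long[] sortedTimestamps, long min, long max)
        {
            List<Long> out = new ArrayList<>();
            // binary search for the starting index; 0 means "from the beginning"
            int idx = min == 0 ? 0 : Arrays.binarySearch(sortedTimestamps, min);
            if (idx < 0)
                throw new IllegalStateException("Could not find a segment with timestamp " + min);
            while (idx < sortedTimestamps.length && sortedTimestamps[idx] <= max)
                out.add(sortedTimestamps[idx++]); // collect until we pass the upper bound
            return out;
        }
    }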
diff --git a/src/java/org/apache/cassandra/journal/StaticSegment.java b/src/java/org/apache/cassandra/journal/StaticSegment.java
index 35c987c8a4..e78187b129 100644
--- a/src/java/org/apache/cassandra/journal/StaticSegment.java
+++ b/src/java/org/apache/cassandra/journal/StaticSegment.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.journal;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.Closeable;
+import org.apache.cassandra.utils.SyncUtil;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.concurrent.Ref;
import org.apache.cassandra.utils.memory.MemoryUtil;
@@ -270,6 +271,13 @@ public final class StaticSegment<K, V> extends Segment<K, V>
         return this;
     }

+    @Override
+    public void persistMetadata()
+    {
+        metadata.persist(descriptor);
+        SyncUtil.trySyncDir(descriptor.directory);
+    }
+
     /**
      * Read the entry and specified offset into the entry holder.
      * Expects the record to have been written at this offset, but potentially not flushed and lost.
diff --git a/src/java/org/apache/cassandra/replication/MutationId.java b/src/java/org/apache/cassandra/replication/MutationId.java
index 5faf4db7d5..259b950b33 100644
--- a/src/java/org/apache/cassandra/replication/MutationId.java
+++ b/src/java/org/apache/cassandra/replication/MutationId.java
@@ -46,6 +46,12 @@ public class MutationId extends ShortMutationId
*/
protected final int timestamp;
+ public MutationId(long logId, int offset, int timestamp)
+ {
+ super(logId, offset);
+ this.timestamp = timestamp;
+ }
+
public MutationId(long logId, long sequenceId)
{
super(logId, offset(sequenceId));
diff --git a/src/java/org/apache/cassandra/replication/MutationJournal.java b/src/java/org/apache/cassandra/replication/MutationJournal.java
index aeccc3f052..2de55de424 100644
--- a/src/java/org/apache/cassandra/replication/MutationJournal.java
+++ b/src/java/org/apache/cassandra/replication/MutationJournal.java
@@ -20,35 +20,54 @@ package org.apache.cassandra.replication;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
import java.util.zip.Checksum;
-
import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import org.apache.cassandra.journal.*;
+
+import org.cliffc.high_scale_lib.NonBlockingHashMapLong;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.utils.Invariants;
+import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.io.util.File;
-import org.apache.cassandra.journal.Journal;
-import org.apache.cassandra.journal.KeySupport;
-import org.apache.cassandra.journal.Params;
-import org.apache.cassandra.journal.RecordConsumer;
-import org.apache.cassandra.journal.RecordPointer;
-import org.apache.cassandra.journal.SegmentCompactor;
-import org.apache.cassandra.journal.ValueSerializer;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.Semaphore;
+import static org.apache.cassandra.utils.FBUtilities.getAvailableProcessors;
+
+// TODO (required): handle table truncations
 public class MutationJournal
 {
     public static final MutationJournal instance = new MutationJournal();
+    private static final Logger log = LoggerFactory.getLogger(MutationJournal.class);

     private final Journal<ShortMutationId, Mutation> journal;
+    private final Map<Long, SegmentStateTracker> segmentStateTrackers;
+
+    // Most of the time during a write we will notify the last known segment, so we optimistically
+    // cache the last segment's tracker, without imposing any visibility guarantees. If we do not see
+    // the right segment in this field, we look it up in the NBHM.
+    private SegmentStateTracker lastSegmentTracker;
private MutationJournal()
{
@@ -58,7 +77,38 @@ public class MutationJournal
     @VisibleForTesting
     MutationJournal(File directory, Params params)
     {
-        journal = new Journal<>("MutationJournal", directory, params, new MutationIdSupport(), new MutationSerializer(), SegmentCompactor.noop());
+        journal = new Journal<>("MutationJournal", directory, params, MutationIdSupport.INSTANCE, MutationSerializer.INSTANCE, SegmentCompactor.noop()) {
+            @Override
+            protected void closeActiveSegmentAndOpenAsStatic(ActiveSegment<ShortMutationId, Mutation> activeSegment, Runnable onDone)
+            {
+                super.closeActiveSegmentAndOpenAsStatic(activeSegment, () -> {
+                    maybeCleanupStaticSegment(Invariants.nonNull(getSegment(activeSegment.id())));
+                    if (onDone != null) onDone.run();
+                });
+            }
+        };
+        segmentStateTrackers = new NonBlockingHashMapLong<>();
+    }
+
+    public CommitLogPosition getCurrentPosition()
+    {
+        return journal.currentActiveSegment().currentPosition();
+    }
+
+    // If all Memtables associated with a given segment were flushed by the time we closed the active
+    // segment and opened it as static, mark its metadata to indicate it does not need replay. We may
+    // crash before persisting this metadata, in which case we will unnecessarily replay the segment,
+    // which has no correctness implications.
+    private void maybeCleanupStaticSegment(Segment<ShortMutationId, Mutation> segment)
+    {
+        Invariants.require(segment.isStatic());
+        SegmentStateTracker tracker = segmentStateTrackers.get(segment.id());
+        if (tracker != null && tracker.removeCleanFromDirty())
+        {
+            segment.metadata().clearNeedsReplay();
+            segment.persistMetadata();
+        }
+    }
public void start()
@@ -73,7 +123,40 @@ public class MutationJournal
     public RecordPointer write(ShortMutationId id, Mutation mutation)
     {
-        return journal.blockingWrite(id, mutation);
+        // TODO (required): why are we using a blocking write here? We can/should wait for completion on `close` of the WriteContext.
+        RecordPointer ptr = journal.blockingWrite(id, mutation);
+
+        // IMPORTANT: there should be no way for a mutation to be applied to a memtable before we mark it
+        // as dirty here, since that would introduce a race between marking as dirty and marking as clean.
+        for (TableId tableId : mutation.getTableIds())
+        {
+            SegmentStateTracker tracker = lastSegmentTracker;
+            if (tracker == null || tracker.segmentId() != ptr.segmentId)
+            {
+                tracker = segmentStateTrackers.computeIfAbsent(ptr.segmentId, SegmentStateTracker::new);
+                lastSegmentTracker = tracker;
+            }
+
+            tracker.markDirty(tableId, ptr);
+        }
+
+        return ptr;
+    }
+
+    /**
+     * Called by the post-flush callback once a Memtable is fully flushed to an SSTable.
+     */
+    public void notifyFlushed(TableId tableId, CommitLogPosition lowerBound, CommitLogPosition upperBound)
+    {
+        for (Segment<ShortMutationId, Mutation> segment : journal.getSegments(lowerBound.segmentId, upperBound.segmentId))
+        {
+            Invariants.nonNull(segmentStateTrackers.get(segment.id())).markClean(tableId, lowerBound, upperBound);
+
+            // We can only safely mark static segments as non-replayable. An active segment can still be
+            // written to, so we only persist this metadata on flush.
+            if (segment.isStatic())
+                maybeCleanupStaticSegment(segment);
+        }
}
@Nullable
@@ -116,6 +199,89 @@ public class MutationJournal
}
}
+    public void replayStaticSegments()
+    {
+        replay(new DeserializedRecordConsumer<>(MutationSerializer.INSTANCE)
+        {
+            @Override
+            protected void accept(long segmentId, int position, ShortMutationId key, Mutation value)
+            {
+                if (Schema.instance.getKeyspaceMetadata(value.getKeyspaceName()) == null)
+                    return;
+                // TODO: if (commitLogReplayer.pointInTimeExceeded(mutation))
+                final Keyspace keyspace = Keyspace.open(value.getKeyspaceName());
+
+                Mutation.PartitionUpdateCollector newPUCollector = null;
+                // TODO (required): replayFilter
+                for (Map.Entry<TableId, PartitionUpdate> e : value.modifications().entrySet())
+                {
+                    PartitionUpdate update = e.getValue();
+                    update.validate();
+                    if (Schema.instance.getTableMetadata(update.metadata().id) == null)
+                        continue; // dropped
+                    TableId tableId = e.getKey();
+
+                    // Start segment state tracking
+                    segmentStateTrackers.computeIfAbsent(segmentId, SegmentStateTracker::new)
+                                        .markDirty(tableId, segmentId, position);
+                    // TODO (required): shouldReplay
+                    if (newPUCollector == null)
+                        newPUCollector = new Mutation.PartitionUpdateCollector(value.id(), value.getKeyspaceName(), value.key());
+                    newPUCollector.add(update);
+                    // TODO (required): replayedCount
+                }
+                if (newPUCollector != null)
+                {
+                    assert !newPUCollector.isEmpty();
+                    keyspace.apply(newPUCollector.build(), false, true, false);
+                }
+            }
+        }, getAvailableProcessors());
+    }
+
+    @VisibleForTesting
+    public void replay(DeserializedRecordConsumer<ShortMutationId, Mutation> replayOne, int parallelism)
+    {
+        try (Journal<ShortMutationId, Mutation>.StaticSegmentKeyIterator iter =
+                 journal.staticSegmentKeyIterator(s -> s.isStatic()
+                                                       && s.metadata().totalCount() > 0
+                                                       && s.metadata().needsReplay()))
+        {
+            final Semaphore replayParallelism = Semaphore.newSemaphore(parallelism);
+            final AtomicBoolean abort = new AtomicBoolean();
+
+            while (iter.hasNext() && !abort.get())
+            {
+                Journal.KeyRefs<ShortMutationId> v = iter.next();
+                v = v; // self-assignment keeps v from being effectively final, so it cannot be captured by the async lambda by accident
+                ShortMutationId key = v.key();
+                long lastSegment = v.lastSegment();
+                // TODO: respect SystemKeyspace.getTruncatedPosition(cfs.metadata.id);
+                replayParallelism.acquireThrowUncheckedOnInterrupt(1);
+                Stage.MUTATION.submit(() -> journal.readLast(key, lastSegment, replayOne))
+                              .addCallback(new BiConsumer<Object, Throwable>()
+                              {
+                                  @Override
+                                  public void accept(Object o, Throwable fail)
+                                  {
+                                      if (fail != null && !journal.handleError("Could not replay mutation " + key, fail))
+                                          abort.set(true);
+                                      replayParallelism.release(1);
+                                  }
+                              });
+            }
+
+            // Wait for all mutations to be applied before returning
+            replayParallelism.acquireThrowUncheckedOnInterrupt(parallelism);
+        }
+    }
+
+ @VisibleForTesting
+ public void closeCurrentSegmentForTestingIfNonEmpty()
+ {
+ journal.closeCurrentSegmentForTestingIfNonEmpty();
+ }
+
static class JournalParams implements Params
{
@Override
@@ -175,6 +341,8 @@ public class MutationJournal
static class MutationIdSupport implements KeySupport<ShortMutationId>
{
+ static final MutationIdSupport INSTANCE = new MutationIdSupport();
+
static final int LOG_ID_OFFSET = 0;
static final int OFFSET_OFFSET = LOG_ID_OFFSET + TypeSizes.LONG_SIZE;
@@ -245,17 +413,20 @@ public class MutationJournal
}
}
-    static class MutationSerializer implements ValueSerializer<ShortMutationId, Mutation>
+    public static class MutationSerializer implements ValueSerializer<ShortMutationId, Mutation>
     {
+        public static MutationSerializer INSTANCE = new MutationSerializer();
         @Override
         public void serialize(ShortMutationId id, Mutation mutation, DataOutputPlus out, int userVersion) throws IOException
         {
+            Invariants.require(id.hostId != Integer.MIN_VALUE);
             Mutation.serializer.serialize(mutation, out, userVersion);
         }
        @Override
        public Mutation deserialize(ShortMutationId id, DataInputPlus in, int userVersion) throws IOException
        {
+            Invariants.require(id.hostId != Integer.MIN_VALUE);
            return Mutation.serializer.deserialize(in, userVersion);
}
}
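The replay loop above bounds in-flight mutations with a semaphore: one permit per submitted task, released in the completion callback, then a full drain to wait for the tail. A condensed, self-contained version of that pattern with plain JDK executors (the real code uses Stage.MUTATION and Cassandra's own Semaphore wrapper):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;

    final class BoundedReplaySketch
    {
        static void replayAll(Runnable[] tasks, int parallelism) throws InterruptedException
        {
            ExecutorService pool = Executors.newFixedThreadPool(parallelism);
            Semaphore permits = new Semaphore(parallelism);
            for (Runnable task : tasks)
            {
                permits.acquire();                  // back-pressure: at most `parallelism` in flight
                pool.execute(() -> {
                    try { task.run(); }
                    finally { permits.release(); }  // mirrors the addCallback release above
                });
            }
            permits.acquire(parallelism);           // drain: wait for every submitted task to finish
            pool.shutdown();
        }
    }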
diff --git a/src/java/org/apache/cassandra/replication/SegmentStateTracker.java b/src/java/org/apache/cassandra/replication/SegmentStateTracker.java
new file mode 100644
index 0000000000..739af6d48d
--- /dev/null
+++ b/src/java/org/apache/cassandra/replication/SegmentStateTracker.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.replication;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import accord.utils.Invariants;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.utils.IntegerInterval;
+
+/**
+ * Tracks offsets of clean (i.e. memtable -> sstable flushed) and dirty (i.e. not yet durably
+ * persisted in an sstable) allocations.
+ *
+ * Mutations in segments marked as clean do not need to be replayed.
+ *
+ * Tracks which parts of a commit log segment contain unflushed data for each table, and determines
+ * when all mutations associated with a segment are fully memtable -> sstable flushed.
+ *
+ * Maintains per-table state:
+ * - a "dirty" high mark (bumped when a new allocation is made in the segment)
+ * - "clean" intervals (min/max bounds reported via memtable flushes)
+ *
+ * A segment is considered clean when, for every table, all dirty intervals are covered by clean intervals.
+ */
+public class SegmentStateTracker
+{
+ final long segmentId;
+
+ private final Map<TableId, IntervalState> states = new HashMap<>(32);
+ private final Lock lock = new ReentrantLock();
+
+ public SegmentStateTracker(long segmentId)
+ {
+ this.segmentId = segmentId;
+ }
+
+ public long segmentId()
+ {
+ return segmentId;
+ }
+
+ @VisibleForTesting
+ public boolean isClean()
+ {
+ removeCleanFromDirty();
+ lock.lock();
+ try
+ {
+ return states.isEmpty();
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+    /**
+     * Should be called _only_ for a static segment, to ensure there is no way for interval state
+     * to go back from clean to dirty.
+     *
+     * Removes all clean (i.e. memtable -> sstable flushed) intervals from the dirty interval.
+     * Returns true if the tracked intervals of all tables are clean, false otherwise.
+     */
+ public boolean removeCleanFromDirty()
+ {
+ List<Map.Entry<TableId, IntervalState>> states;
+ // Take a "snapshot" of states, while holding a lock
+ lock.lock();
+ try
+ {
+ states = new ArrayList<>(this.states.entrySet());
+ }
+ finally
+ {
+ lock.unlock();
+ }
+
+ int[] remove = new int[states.size()];
+ int removeCount = 0;
+
+        // Check if any of the remaining items can be cleaned up, without holding a lock
+ for (int i = 0; i < states.size(); i++)
+ {
+ IntervalState state = states.get(i).getValue();
+ if (!state.isDirty())
+ remove[removeCount++] = i;
+ }
+
+ // Remove all fully covered items, while holding a lock
+ if (removeCount > 0)
+ {
+ lock.lock();
+ try
+ {
+ if (this.states.size() == removeCount)
+ {
+ this.states.clear();
+ return true;
+ }
+
+ for (int i = 0; i < removeCount; i++)
+ {
+                    Map.Entry<TableId, IntervalState> e = states.get(remove[i]);
+ this.states.remove(e.getKey());
+ }
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ return false;
+ }
+
+ public void markDirty(TableId tableId, CommitLogPosition ptr)
+ {
+ markDirty(tableId, ptr.segmentId, ptr.position);
+ }
+
+ public void markDirty(TableId tableId, long segmentId, int position)
+ {
+ Invariants.require(segmentId == this.segmentId);
+ IntervalState state;
+ lock.lock();
+ try
+ {
+ state = states.computeIfAbsent(tableId, (k) -> {
+                // Initialize with the given position as both the low and high bound, to ensure
+                // we correctly set the lower bound when marking as clean
+ return new IntervalState(position, position);
+ });
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ state.markDirty(position);
+ }
+
+    public void markClean(TableId tableId, CommitLogPosition lowerBound, CommitLogPosition upperBound)
+    {
+        Invariants.require(lowerBound.compareTo(upperBound) <= 0, "%s should be smaller than %s", lowerBound, upperBound);
+        if (lowerBound.segmentId > segmentId || upperBound.segmentId < segmentId)
+            return;
+
+ IntervalState state;
+ lock.lock();
+ try
+ {
+ state = states.get(tableId);
+ }
+ finally
+ {
+ lock.unlock();
+ }
+
+ if (state != null)
+ {
+ // TODO (required): test this logic
+ // Only mark clean ranges for _this_ segment
+            int lower = lowerBound.segmentId == segmentId ? lowerBound.position : 0;
+            int upper = upperBound.segmentId == segmentId ? upperBound.position : Integer.MAX_VALUE;
+ state.markClean(lower, upper);
+ }
+ }
+
+ private static class IntervalState
+ {
+ static final long[] EMPTY = new long[0];
+
+        // dirty interval in this segment; if the interval is not covered by the clean set, the segment contains unflushed data
+        volatile long dirty;
+        // clean intervals; kept separately from the dirty interval to permit marking tables clean whilst the segment is still in use
+        volatile long[] clean = EMPTY;
+
+        private static final AtomicLongFieldUpdater<IntervalState> dirtyUpdater = AtomicLongFieldUpdater.newUpdater(IntervalState.class, "dirty");
+        private static final AtomicReferenceFieldUpdater<IntervalState, long[]> cleanUpdater = AtomicReferenceFieldUpdater.newUpdater(IntervalState.class, long[].class, "clean");
+
+ public IntervalState(int lower, int upper)
+ {
+ this(make(lower, upper));
+ }
+
+ private IntervalState(long dirty)
+ {
+ this.dirty = dirty;
+ }
+
+ public void markClean(int start, int end)
+ {
+ long[] prev;
+ long[] next;
+ do
+ {
+ prev = this.clean;
+ next = IntegerInterval.Set.add(prev, start, end);
+ }
+ while (!cleanUpdater.compareAndSet(this, prev, next));
+ }
+
+ public boolean isDirty()
+ {
+ long[] clean = this.clean;
+ long dirty = this.dirty;
+            return !IntegerInterval.Set.covers(clean, lower(dirty), upper(dirty));
+ }
+
+        /**
+         * Expands the interval to cover the given value by extending one of its sides if necessary.
+         * Mutates this. Thread-safe.
+         */
+ public void markDirty(int value)
+ {
+ long prev;
+ int lower;
+ int upper;
+ do
+ {
+ prev = dirty;
+ upper = upper(prev);
+ lower = lower(prev);
+ if (value > upper) // common case
+ upper = value;
+ else if (value < lower)
+ lower = value;
+ }
+            while (!dirtyUpdater.compareAndSet(this, prev, make(lower, upper)));
+ }
+
+ public String toString()
+ {
+ long dirty = this.dirty;
+ long[] clean = this.clean;
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < clean.length; i++)
+ {
+ long l = clean[i];
+ if (i > 0)
+ sb.append(',');
+                sb.append('[').append(lower(l)).append(',').append(upper(l)).append("]");
+            }
+            return "dirty:[" + lower(dirty) + ',' + upper(dirty) + "], clean:[" + sb + "]";
+ }
+
+ private static long make(int lower, int upper)
+ {
+ assert lower <= upper;
+ return ((lower & 0xFFFFFFFFL) << 32) | upper & 0xFFFFFFFFL;
+ }
+
+ private static int lower(long interval)
+ {
+ return (int) (interval >>> 32);
+ }
+
+ private static int upper(long interval)
+ {
+ return (int) interval;
+ }
+ }
+
+    @Override
+    public String toString()
+    {
+        return "SegmentStateTracker{" +
+               "segmentId=" + segmentId +
+               ", states=" + states +
+               '}';
+    }
+}
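The dirty interval above packs two ints into one long so a single CAS can update both bounds atomically. A standalone snippet reproducing that encoding and showing it round-trips:

    final class PackedIntervalSketch
    {
        // pack [lower, upper] into one long: lower in the high 32 bits, upper in the low 32
        static long make(int lower, int upper)
        {
            assert lower <= upper;
            return ((lower & 0xFFFFFFFFL) << 32) | (upper & 0xFFFFFFFFL);
        }

        static int lower(long interval) { return (int) (interval >>> 32); }
        static int upper(long interval) { return (int) interval; }

        public static void main(String[] args)
        {
            long iv = make(17, 4096);
            System.out.println("[" + lower(iv) + "," + upper(iv) + "]"); // prints [17,4096]
        }
    }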
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index b66611d8f9..11647410e2 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -340,6 +340,7 @@ public class CassandraDaemon
try
{
CommitLog.instance.recoverSegmentsOnDisk();
+ MutationJournal.instance.replayStaticSegments();
}
catch (IOException e)
{
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index 08819934d0..aa568104a2 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -343,7 +343,7 @@ public class AccordJournal implements accord.api.Journal, RangeSearcher.Supplier
             && diff.after.route() != null)
             journal.onDurable(pointer, () -> journalTable.safeNotify(index ->
-                index.update(pointer.segment, key.commandStoreId, key.id, diff.after.route())));
+                index.update(pointer.segmentId, key.commandStoreId, key.id, diff.after.route())));
         if (onFlush != null)
             journal.onDurable(pointer, onFlush);
     }
diff --git a/src/java/org/apache/cassandra/tcm/Startup.java b/src/java/org/apache/cassandra/tcm/Startup.java
index 7fc336809c..7333bc468a 100644
--- a/src/java/org/apache/cassandra/tcm/Startup.java
+++ b/src/java/org/apache/cassandra/tcm/Startup.java
@@ -31,6 +31,7 @@ import java.util.function.Function;
import java.util.function.Supplier;
import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.cassandra.replication.MutationJournal;
import org.apache.cassandra.replication.MutationTrackingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -283,6 +284,7 @@ import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
try
{
CommitLog.instance.recoverSegmentsOnDisk();
+ MutationJournal.instance.replayStaticSegments();
}
catch (IOException e)
{
diff --git a/src/java/org/apache/cassandra/utils/IntegerInterval.java b/src/java/org/apache/cassandra/utils/IntegerInterval.java
index b26ac45f06..d5e43289e6 100644
--- a/src/java/org/apache/cassandra/utils/IntegerInterval.java
+++ b/src/java/org/apache/cassandra/utils/IntegerInterval.java
@@ -22,9 +22,12 @@ import java.util.*;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.stream.Collectors;
+import accord.utils.Invariants;
import com.google.common.collect.Lists;
import com.google.common.primitives.Longs;
+import net.nicoulaj.compilecommand.annotations.Inline;
+
/**
* Mutable integer interval class, thread-safe.
* Represents the interval [lower,upper].
@@ -32,7 +35,7 @@ import com.google.common.primitives.Longs;
 public class IntegerInterval
 {
     volatile long interval;
-    private static AtomicLongFieldUpdater<IntegerInterval> intervalUpdater =
+    private static final AtomicLongFieldUpdater<IntegerInterval> intervalUpdater =
             AtomicLongFieldUpdater.newUpdater(IntegerInterval.class, "interval");

     private IntegerInterval(long interval)
@@ -126,7 +129,7 @@ public class IntegerInterval
*/
public static class Set
{
- static long[] EMPTY = new long[0];
+ static final long[] EMPTY = new long[0];
private volatile long[] ranges = EMPTY;
@@ -135,11 +138,15 @@ public class IntegerInterval
          */
         public synchronized void add(int start, int end)
         {
-            assert start <= end;
-            long[] ranges, newRanges;
-            {
-                ranges = this.ranges; // take local copy to avoid risk of it changing in the midst of operation
+            this.ranges = add(this.ranges, start, end);
+        }
+        @Inline
+        public static long[] add(long[] ranges, int start, int end)
+        {
+            Invariants.require(start <= end, "Start (%d) should be less than or equal to end (%d)", start, end);
+            long[] newRanges;
+            {
                 // extend ourselves to cover any ranges we overlap
                 // record directly preceding our end may extend past us, so take the max of our end and its
                 int rpos = Arrays.binarySearch(ranges, ((end & 0xFFFFFFFFL) << 32) | 0xFFFFFFFFL); // floor (i.e. greatest <=) of the end position
@@ -174,7 +181,7 @@ public class IntegerInterval
                 for (int i = rpos + 1; i < ranges.length; ++i)
                     newRanges[dest++] = ranges[i];
             }
-            this.ranges = newRanges;
+            return newRanges;
         }
        /**
@@ -191,8 +198,13 @@
         */
        public boolean covers(int start, int end)
        {
-            long[] ranges = this.ranges; // take local copy to avoid risk of it changing in the midst of operation
-            int rpos = Arrays.binarySearch(ranges, ((start & 0xFFFFFFFFL) << 32) | 0xFFFFFFFFL); // floor (i.e. greatest <=) of the end position
+            return covers(this.ranges, start, end);
+        }
+
+        @Inline
+        public static boolean covers(long[] ranges, int start, int end)
+        {
+            int rpos = Arrays.binarySearch(ranges, ((start & 0xFFFFFFFFL) << 32) | 0xFFFFFFFFL); // floor (i.e. greatest <=) of the end position
            if (rpos < 0)
                rpos = (-1 - rpos) - 1;
            if (rpos == -1)
@@ -219,7 +231,7 @@ public class IntegerInterval
        public Collection<IntegerInterval> intervals()
        {
-            return Lists.transform(Longs.asList(ranges), iv -> new IntegerInterval(iv));
+            return Lists.transform(Longs.asList(ranges), IntegerInterval::new);
        }
@Override
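The refactor above exposes add/covers as static functions over a raw long[] so SegmentStateTracker can CAS the array itself instead of synchronizing on a Set instance. Usage, assuming the new signatures (intervals are inclusive):

    long[] clean = new long[0];
    clean = IntegerInterval.Set.add(clean, 0, 100);
    clean = IntegerInterval.Set.add(clean, 150, 200);
    boolean flushed = IntegerInterval.Set.covers(clean, 0, 100);  // true
    boolean gap = IntegerInterval.Set.covers(clean, 0, 200);      // false: [101,149] is uncovered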
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index bd9f69451a..dfa676b074 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -825,6 +825,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
try
{
CommitLog.instance.recoverSegmentsOnDisk();
+ MutationJournal.instance.replayStaticSegments();
}
catch (IOException e)
{
diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingBounceTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingBounceTest.java
new file mode 100644
index 0000000000..3e0a1ad260
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingBounceTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.tracking;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.distributed.test.log.FuzzTestBase;
+import org.apache.cassandra.harry.SchemaSpec;
+import org.apache.cassandra.harry.dsl.HistoryBuilder;
+import org.apache.cassandra.harry.dsl.ReplayingHistoryBuilder;
+import org.apache.cassandra.harry.execution.InJvmDTestVisitExecutor;
+import org.apache.cassandra.harry.gen.Generator;
+import org.apache.cassandra.harry.gen.SchemaGenerators;
+import org.apache.cassandra.replication.MutationJournal;
+import org.junit.Test;
+
+
+import static org.apache.cassandra.harry.checker.TestHelper.withRandom;
+
+public class MutationTrackingBounceTest extends FuzzTestBase
+{
+ private static final int POPULATION = 1000;
+
+ @Test
+ public void bounceTest() throws Throwable
+ {
+ try (Cluster cluster = builder().withNodes(1).start())
+ {
+ int tables = 10;
+ int writesPerKey = 2;
+ int pks = 100;
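+ // Test plan: write to tracked tables while periodically closing journal segments,
+ // bounce the node, read everything back, then validate the journal contents.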
+ withRandom(rng -> {
+ cluster.schemaChange(String.format("CREATE KEYSPACE %s WITH
replication = {'class': 'SimpleStrategy', 'replication_factor': 1} " +
+ "AND
replication_type='tracked'",
+ KEYSPACE));
+
+ List<HistoryBuilder> builders = new ArrayList<>();
+ for (int i = 0; i < tables; i++)
+ {
+ Generator<SchemaSpec> schemaGen = SchemaGenerators.trivialSchema(KEYSPACE, () -> "mutation_tracking_bounce_" + (builders.size() + 1), POPULATION,
+ SchemaSpec.optionsBuilder());
+
+ SchemaSpec schema = schemaGen.generate(rng);
+ cluster.schemaChange(schema.compile());
+ builders.add(new ReplayingHistoryBuilder(schema.valueGenerators,
+ hb -> InJvmDTestVisitExecutor.builder()
+ .consistencyLevel(ConsistencyLevel.QUORUM)
+ .build(schema, hb, cluster)));
+ }
+
+ int counter = 0;
+ for (int pk = 0; pk < pks; pk++)
+ {
+ for (HistoryBuilder history : builders)
+ for (int i = 0; i < writesPerKey; i++)
+ history.insert(pk);
+
+ if (++counter % 10 == 0)
+ cluster.get(1).runOnInstance(() -> MutationJournal.instance.closeCurrentSegmentForTestingIfNonEmpty());
+ }
+
+ ClusterUtils.stopUnchecked(cluster.get(1));
+ cluster.get(1).startup();
+
+ for (int pk = 0; pk < pks; pk++)
+ for (HistoryBuilder history : builders)
+ for (int i = 0; i < 10; i++)
+ history.selectPartition(pk);
+
+ cluster.get(1).runOnInstance(new MutationTrackingBounce_ValidateRunnable(tables * pks * writesPerKey));
+ });
+ }
+ }
+}
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingBounce_ValidateRunnable.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingBounce_ValidateRunnable.java
new file mode 100644
index 0000000000..0afb402b1d
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingBounce_ValidateRunnable.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.tracking;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.distributed.api.IIsolatedExecutor;
+import org.apache.cassandra.journal.DeserializedRecordConsumer;
+import org.apache.cassandra.replication.MutationJournal;
+import org.apache.cassandra.replication.MutationTrackingService;
+import org.apache.cassandra.replication.ShortMutationId;
+import org.junit.Assert;
+
+// Separate class for MutationTrackingBounceTest, since without it we were getting non-serializable class exceptions, likely due to static fields
+public class MutationTrackingBounce_ValidateRunnable implements IIsolatedExecutor.SerializableRunnable
+{
+ private final int count;
+
+ public MutationTrackingBounce_ValidateRunnable(int expectedMutations)
+ {
+ this.count = expectedMutations;
+ }
+
+ @Override
+ public void run()
+ {
+ AtomicInteger counter = new AtomicInteger();
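+ // Replay every journal record; DeserializedRecordConsumer deserializes each value before
+ // handing it to accept(segment, position, key, mutation).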
+ MutationJournal.instance.replay(new DeserializedRecordConsumer<>(MutationJournal.MutationSerializer.INSTANCE)
+ {
+ Set<ShortMutationId> seen = new HashSet<>();
+ @Override
+ protected void accept(long segment, int position, ShortMutationId key, Mutation mutation)
+ {
+ if (!seen.add(key))
+ throw new AssertionError(String.format("Should have
witnessed each key just once, but seen %s already", key));
+
+ for (PartitionUpdate partitionUpdate : mutation.getPartitionUpdates())
+ {
+ if (!MutationTrackingService.instance.createSummaryForKey(partitionUpdate.partitionKey(), partitionUpdate.metadata().id, false)
+ .contains(key))
+ {
+ throw new AssertionError(String.format("Mutation %s should have been witnessed (%s)", mutation, key));
+ }
+ }
+ counter.incrementAndGet();
+ }
+ }, 1);
+ Assert.assertEquals(count, counter.get());
+ }
+}
diff --git a/test/harry/main/org/apache/cassandra/harry/checker/TestHelper.java b/test/harry/main/org/apache/cassandra/harry/checker/TestHelper.java
index f44b6e5fa2..ee930bbd3e 100644
--- a/test/harry/main/org/apache/cassandra/harry/checker/TestHelper.java
+++ b/test/harry/main/org/apache/cassandra/harry/checker/TestHelper.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.harry.checker;
+import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,6 +48,20 @@ public class TestHelper
}
}
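+ /**
+ * Variant of withRandom that invokes onError before rethrowing, e.g. to dump an operation
+ * log accumulated during a fuzz run (see SegmentStateTrackerTest#fuzzTest for an example).
+ */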
+ public static void withRandom(long seed, ModelChecker.ThrowingConsumer<EntropySource> rng, Consumer<Throwable> onError)
+ {
+ try
+ {
+ logger.info("Seed: {}", seed);
+ rng.accept(new JdkRandomEntropySource(seed));
+ }
+ catch (Throwable t)
+ {
+ onError.accept(t);
+ throw new AssertionError(String.format("Caught an exception at
seed:%dL", seed), t);
+ }
+ }
+
public static void repeat(int num, ExecUtil.ThrowingSerializableRunnable<?> r)
{
for (int i = 0; i < num; i++)
diff --git a/test/unit/org/apache/cassandra/journal/SegmentsTest.java b/test/unit/org/apache/cassandra/journal/SegmentsTest.java
new file mode 100644
index 0000000000..7e5922237a
--- /dev/null
+++ b/test/unit/org/apache/cassandra/journal/SegmentsTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.journal;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import org.apache.cassandra.utils.concurrent.Ref;
+import org.junit.Test;
+
+
+import static org.apache.cassandra.harry.checker.TestHelper.withRandom;
+
+public class SegmentsTest
+{
+ @Test
+ public void testSelect()
+ {
+ withRandom(0L, rng -> {
+ // Create mock segments with different timestamps
+ java.io.File file = File.createTempFile("segments", "test");
+ List<Segment<String, String>> segmentList = new ArrayList<>();
+ Set<Long> taken = new HashSet<>();
+ for (int i = 0; i < 100; i++)
+ {
+ while (true)
+ {
+ long id = rng.nextLong(0, 10_000);
+ if (taken.add(id))
+ {
+ segmentList.add(new TestSegment<>(file, id));
+ break;
+ }
+ }
+ }
+ segmentList.sort(Comparator.comparing(s -> s.descriptor.timestamp));
+
+ Segments<String, String> segments = Segments.of(segmentList);
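+ // select(minId, maxId, into) should collect exactly the segments whose ids fall in
+ // [minId, maxId], in timestamp order -- verified against a sorted sublist below.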
+ for (int i = 0; i < 10_000; i++)
+ {
+ // Generate two distinct segment idxs
+ int i1 = rng.nextInt(segmentList.size());
+ int i2;
+ do
+ {
+ i2 = rng.nextInt(segmentList.size());
+ }
+ while (i2 == i1);
+ int min = Math.min(i1, i2);
+ int max = Math.max(i1, i2);
+ List<Segment<String, String>> selected = new ArrayList<>();
+ segments.select(segmentList.get(min).id(),
+ segmentList.get(max).id(),
+ selected);
+ List<Segment<String, String>> expected = segmentList.subList(min, max + 1);
+ if (!Objects.equals(expected, selected))
+ {
+ throw new AssertionError(String.format("\nExpected: %s\n" +
+ "Selected: %s",
+ expected,
+ selected));
+ }
+ }
+ });
+ }
+
+ private static class TestSegment<K, V> extends Segment<K, V>
+ {
+ TestSegment(File dir, long timestamp)
+ {
+ super(Descriptor.create(new org.apache.cassandra.io.util.File(dir), timestamp, 1), null, null);
+ }
+
+ @Override
+ void close(Journal<K, V> journal)
+ {
+
+ }
+
+ @Override
+ public boolean isActive()
+ {
+ return false;
+ }
+
+ @Override public boolean isStatic()
+ {
+ return false;
+ }
+
+ @Override Index<K> index() { throw new UnsupportedOperationException(); }
+ @Override boolean isFlushed(long position) { throw new UnsupportedOperationException(); }
+ @Override public void persistMetadata() { throw new UnsupportedOperationException(); }
+ @Override boolean read(int offset, int size, EntrySerializer.EntryHolder<K> into) { throw new UnsupportedOperationException(); }
+ @Override public ActiveSegment<K, V> asActive() { throw new UnsupportedOperationException(); }
+ @Override public StaticSegment<K, V> asStatic() { throw new UnsupportedOperationException(); }
+ @Override public Ref<Segment<K, V>> selfRef() { throw new UnsupportedOperationException(); }
+ @Override public Ref<Segment<K, V>> tryRef() { throw new UnsupportedOperationException(); }
+ @Override public Ref<Segment<K, V>> ref() { throw new UnsupportedOperationException(); }
+
+ @Override
+ public String toString()
+ {
+ return "TestSegment{" +
+ "id=" + descriptor.timestamp +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (!(obj instanceof TestSegment))
+ return false;
+ TestSegment<?, ?> other = (TestSegment<?, ?>) obj;
+ return descriptor.equals(other.descriptor);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return descriptor.hashCode();
+ }
+ }
+}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/replication/MutationJournalReplayTest.java b/test/unit/org/apache/cassandra/replication/MutationJournalReplayTest.java
new file mode 100644
index 0000000000..a834d9ff8a
--- /dev/null
+++ b/test/unit/org/apache/cassandra/replication/MutationJournalReplayTest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.replication;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.harry.checker.TestHelper;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.journal.DeserializedRecordConsumer;
+import org.apache.cassandra.journal.TestParams;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.ReplicationType;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.tools.FieldUtil;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+import static org.apache.cassandra.replication.MutationJournalTest.assertMutationsEqual;
+
+public class MutationJournalReplayTest
+{
+ private static final String KEYSPACE = "ks";
+ private static final String TABLE_PREFIX = "tbl";
+ private static final TableMetadata[] TABLES = new TableMetadata[10];
+ private static MutationJournal journal;
+
+ @BeforeClass
+ public static void setUp() throws IOException
+ {
+ SchemaLoader.prepareServer();
+
+ File directory = new File(Files.createTempDirectory("mutation-journal-replay-test"));
+ directory.deleteRecursiveOnExit();
+
+ journal = new MutationJournal(directory, new TestParams(MessagingService.current_version)
+ {
+ @Override
+ public long flushPeriod(TimeUnit units)
+ {
+ return 1;
+ }
+
+ @Override
+ public FlushMode flushMode()
+ {
+ return FlushMode.PERIODIC;
+ }
+ });
+ FieldUtil.setInstanceUnsafe(MutationJournal.class, journal, "instance");
+ journal.start();
+
+ for (int i = 0; i < TABLES.length; i++)
+ {
+ TABLES[i] = TableMetadata.builder(KEYSPACE, String.format("%s_%d",
TABLE_PREFIX, i))
+
.keyspaceReplicationType(ReplicationType.tracked)
+ .addPartitionKeyColumn("pk",
UTF8Type.instance)
+ .addClusteringColumn("ck",
UTF8Type.instance)
+ .addRegularColumn("value",
UTF8Type.instance)
+ .build();
+ }
+ SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1, ReplicationType.tracked), TABLES);
+ }
+
+ @AfterClass
+ public static void tearDown()
+ {
+ journal.shutdownBlocking();
+ }
+
+ @Test
+ public void testReplay() throws Throwable
+ {
+ long seed = 0L;
+ TestHelper.withRandom(seed,
+ rng -> {
+ List<Mutation> original = new ArrayList<>();
+ for (int i = 1; i <= 10_000; i++)
+ {
+ MutationId id = new MutationId(100L, i, i);
+ Mutation mutation = mutation(i % TABLES.length, i).withMutationId(id);
+ journal.write(id, mutation);
+ original.add(mutation);
+ if (i % rng.nextInt(1, 100) > 90)
+ journal.closeCurrentSegmentForTestingIfNonEmpty();
+ }
+
+ journal.closeCurrentSegmentForTestingIfNonEmpty();
+
+ List<Mutation> replayed = new ArrayList<>();
+ journal.replay(new DeserializedRecordConsumer<ShortMutationId, Mutation>(MutationJournal.MutationSerializer.INSTANCE)
+ {
+ @Override
+ protected void accept(long segment, int position, ShortMutationId key, Mutation mutation)
+ {
+ replayed.add(mutation);
+ }
+ }, 1);
+
+ assertMutationsEqual(original, replayed);
+ });
+ }
+
+ @Test
+ public void testReplayFlushed() throws Throwable
+ {
+ long seed = 0L;
+ class Bounds
+ {
+ final CommitLogPosition start;
+ final CommitLogPosition end;
+ final int count;
+
+ Bounds(CommitLogPosition start, CommitLogPosition end, int count)
+ {
+ this.start = start;
+ this.end = end;
+ this.count = count;
+ }
+ }
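+ // Simulate memtable flushes: remember the journal position bounds and mutation count of each
+ // closed segment, randomly report some of those bounds as flushed for every table, and expect
+ // replay to skip exactly the mutations covered by the flushed bounds.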
+ TestHelper.withRandom(seed,
+ rng -> {
+ List<Mutation> original = new ArrayList<>();
+
+ List<Bounds> testFlushBounds = new ArrayList<>();
+ CommitLogPosition prevPos = journal.getCurrentPosition();
+ int count = 0;
+ for (int i = 1; i <= 1000; i++)
+ {
+ MutationId id = new MutationId(100L, i, i);
+ Mutation mutation = mutation(i % TABLES.length, i).withMutationId(id);
+ journal.write(id, mutation);
+ count++;
+ original.add(mutation);
+ if (i % rng.nextInt(1, 100) > 90)
+ {
+ CommitLogPosition curPos = journal.getCurrentPosition();
+ journal.closeCurrentSegmentForTestingIfNonEmpty();
+ testFlushBounds.add(new Bounds(prevPos, curPos, count));
+ count = 0;
+ prevPos = curPos;
+ }
+ }
+
+ journal.closeCurrentSegmentForTestingIfNonEmpty();
+
+ int flushed = 0;
+ for (Bounds bounds : testFlushBounds)
+ {
+ if (rng.nextBoolean())
+ {
+ for (TableMetadata table : TABLES)
+ journal.notifyFlushed(table.id, bounds.start, bounds.end);
+ flushed += bounds.count;
+ }
+ }
+
+ List<Mutation> replayed = new ArrayList<>();
+ journal.replay(new DeserializedRecordConsumer<ShortMutationId, Mutation>(MutationJournal.MutationSerializer.INSTANCE)
+ {
+ @Override
+ protected void accept(long segment, int position, ShortMutationId key, Mutation mutation)
+ {
+ replayed.add(mutation);
+ }
+ }, 1);
+
+ Assert.assertEquals(original.size() - flushed, replayed.size());
+ });
+ }
+
+
+ private static final String CACHED_STRING = ".".repeat(512);
+
+ private static Mutation mutation(int table, int value)
+ {
+ return new RowUpdateBuilder(TABLES[table], 0, "key_" + value)
+ .clustering("ck")
+ .add("value", CACHED_STRING)
+ .build();
+ }
+}
diff --git a/test/unit/org/apache/cassandra/replication/MutationJournalTest.java b/test/unit/org/apache/cassandra/replication/MutationJournalTest.java
index a0c778a3e4..ba741799aa 100644
--- a/test/unit/org/apache/cassandra/replication/MutationJournalTest.java
+++ b/test/unit/org/apache/cassandra/replication/MutationJournalTest.java
@@ -127,19 +127,20 @@ public class MutationJournalTest
assertMutationsEqual(expected, actual);
}
- private static void assertMutationEquals(Mutation expected, Mutation actual)
+ public static void assertMutationEquals(Mutation expected, Mutation actual)
{
- assertEquals(serialize(expected), serialize(actual));
+ if (!serialize(expected).equals(serialize(actual)))
+ throw new AssertionError(String.format("Expected %s but got %s",
expected, actual));
}
- private static void assertMutationsEqual(List<Mutation> expected, List<Mutation> actual)
+ public static void assertMutationsEqual(List<Mutation> expected, List<Mutation> actual)
{
assertEquals(expected.size(), actual.size());
for (int i = 0; i < expected.size(); i++)
assertMutationEquals(expected.get(i), actual.get(i));
}
- private static ByteBuffer serialize(Mutation mutation)
+ public static ByteBuffer serialize(Mutation mutation)
{
try (DataOutputBuffer out = DataOutputBuffer.scratchBuffer.get())
{
diff --git a/test/unit/org/apache/cassandra/replication/SegmentStateTrackerTest.java b/test/unit/org/apache/cassandra/replication/SegmentStateTrackerTest.java
new file mode 100644
index 0000000000..928deca008
--- /dev/null
+++ b/test/unit/org/apache/cassandra/replication/SegmentStateTrackerTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.replication;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+import org.apache.cassandra.harry.checker.TestHelper;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.utils.Pair;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class SegmentStateTrackerTest
+{
+ @Test
+ public void trivialTest()
+ {
+ long segment = 123L;
+ SegmentStateTracker tracker = new SegmentStateTracker(segment);
+ TableId tbl = TableId.generate();
+
+ assertTrue(tracker.isClean());
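+ // Invariant under test: a segment is clean iff every position marked dirty for a table has
+ // since been covered by a markClean(table, min, max) bound.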
+ for (int i = 1; i <= 10; i++)
+ {
+ tracker.markDirty(tbl, segment, i);
+ assertFalse(tracker.isClean());
+ if (i % 5 == 0)
+ {
+ tracker.markClean(tbl, pos(segment, i - 5), pos(segment, i));
+ assertTrue(tracker.isClean());
+ }
+ }
+ assertTrue(tracker.isClean());
+ }
+
+ @Test
+ public void memtableSpanningMultipleSegmentsTest()
+ {
+ SegmentStateTracker segment1 = new SegmentStateTracker(1);
+ SegmentStateTracker segment2 = new SegmentStateTracker(2);
+ TableId tbl = TableId.fromLong(1);
+
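+ // A memtable may span two journal segments; each flush reports a single [min, max] bound
+ // that every overlapping segment's tracker interprets relative to its own segment id.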
+ segment1.markDirty(tbl, segment1.segmentId(), 10);
+ segment2.markDirty(tbl, segment2.segmentId(), 10);
+ segment2.markDirty(tbl, segment2.segmentId(), 20);
+
+ segment1.markClean(tbl, pos(segment1.segmentId(), 10), pos(segment2.segmentId(), 10)); // first flush
+ segment2.markClean(tbl, pos(segment1.segmentId(), 10), pos(segment2.segmentId(), 10)); // first flush
+ segment2.markClean(tbl, pos(segment2.segmentId(), 10), pos(segment2.segmentId(), 30)); // second flush
+ Assert.assertTrue(segment1.isClean());
+ Assert.assertTrue(segment2.isClean());
+ }
+
+ @Test
+ public void fuzzTest()
+ {
+ int allocationCount = 10_000;
+ long seed = 0;
+ List<String> ops = null; // new ArrayList<>(); // to enable operation logging
+ AtomicLong tableIdGen = new AtomicLong();
+ TestHelper.withRandom(seed,
+ rng -> {
+ // This test tracks state from the perspective of the state tracker:
+ // we make allocations by bumping a pointer and reporting them to different tables.
+ // A table can flush subsequent allocations and report the bounds back to the segment.
+ class Table
+ {
+ final TableId tableId = TableId.fromLong(tableIdGen.getAndIncrement());
+
+ // Track all allocations to a cfid in a Table, and perform contiguous "flushes"
+ // that report position bounds back to the tracker
+ ArrayList<CommitLogPosition> unflushedAllocations = new ArrayList<>();
+
+ // Memtables _always_ report contiguous chunks of metadata (see CFS$Flush): a new memtable
+ // is always created with the commitLogUpperBound of the previous one.
+ CommitLogPosition lastFlushMax = null;
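+ // e.g. if the previous flush ended at (segment1, 30), the next flush reports
+ // [(segment1, 30), max] even when all of its own allocations landed in segment2.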
+
+ void addAllocation(CommitLogPosition pos)
+ {
+ if (ops != null) ops.add("Allocate "
+ pos + " in " + tableId);
+ unflushedAllocations.add(pos);
+ }
+
+ void flush(Collection<SegmentStateTracker> trackers)
+ {
+ if (unflushedAllocations.isEmpty())
+ return;
+
+ CommitLogPosition min = lastFlushMax == null ? unflushedAllocations.get(0) : lastFlushMax;
+ CommitLogPosition max = unflushedAllocations.get(rng.nextInt(unflushedAllocations.size()));
+ lastFlushMax = max;
+
+ if (ops != null) ops.add(String.format("Flush %s [%s, %s]", tableId, min, max));
+ for (SegmentStateTracker tracker : trackers)
+ reportFlushed(tracker, min, max);
+
+ // TODO (required): use an array and copying instead
+ unflushedAllocations.removeIf(alloc -> alloc.compareTo(min) >= 0 && alloc.compareTo(max) <= 0);
+ }
+
+ boolean hasUnflushed()
+ {
+ return !unflushedAllocations.isEmpty();
+ }
+
+ boolean hasUnflushedFor(long segment)
+ {
+ for (CommitLogPosition alloc : unflushedAllocations)
+ {
+ if (alloc.segmentId == segment)
+ return true;
+ }
+ return false;
+ }
+
+ List<CommitLogPosition> getUnflushedFor(long segment)
+ {
+ List<CommitLogPosition> unflushed = new ArrayList<>();
+ for (CommitLogPosition alloc : unflushedAllocations)
+ {
+ if (alloc.segmentId == segment)
+ unflushed.add(alloc);
+ }
+ return unflushed;
+ }
+
+ void reportFlushed(SegmentStateTracker tracker, CommitLogPosition minBound, CommitLogPosition maxBound)
+ {
+ if (tracker.segmentId() >= minBound.segmentId && tracker.segmentId() <= maxBound.segmentId)
+ tracker.markClean(tableId, minBound, maxBound);
+ }
+ }
+
+ int tableCount = 10;
+ List<Table> tables = new ArrayList<>(tableCount);
+ for (int i = 0; i < tableCount; i++)
+ tables.add(new Table());
+
+ Map<Long, SegmentStateTracker> segments = new HashMap<>();
+ Runnable validateAllSegments = () -> {
+ for (SegmentStateTracker segment : segments.values())
+ {
+ boolean segmentIsClean = segment.isClean();
+ boolean allTablesFlushed = tables.stream().noneMatch(t -> t.hasUnflushedFor(segment.segmentId()));
+ if (segmentIsClean != allTablesFlushed)
+ throw new AssertionError(String.format("Segment is %sclean, but tables have %sunflushed allocations:\n%s\n%s",
+ segmentIsClean ? "" : "not ",
+ allTablesFlushed ? "no " : "",
+ segment,
+ tables.stream().map(t -> Pair.create(t.tableId, t.getUnflushedFor(segment.segmentId())))
+ .filter(t -> !t.right.isEmpty())
+ .collect(Collectors.toList())));
+ }
+ };
+
+ SegmentStateTracker currentSegment = null;
+ int currentSegmentOffset = 0;
+ for (int i = 0; i < allocationCount; i++)
+ {
+ if (i > 0 && i % 50 == 0)
+ {
+ for (int j = 0; j < 3; j++)
+ {
+ Table table = tables.get(rng.nextInt(tableCount));
+ table.flush(segments.values());
+ validateAllSegments.run();
+ }
+ }
+
+ if (i % 100 == 0)
+ {
+ currentSegment = new SegmentStateTracker(segments.size());
+ currentSegmentOffset = 0;
+ segments.put(currentSegment.segmentId(), currentSegment);
+ }
+
+ int size = rng.nextInt(100);
+ Table table = tables.get(rng.nextInt(tableCount));
+ CommitLogPosition pos = pos(currentSegment.segmentId(), currentSegmentOffset);
+ table.addAllocation(pos);
+ currentSegment.markDirty(table.tableId, pos);
+ currentSegmentOffset += size;
+ validateAllSegments.run();
+ }
+
+ while (tables.stream().anyMatch(Table::hasUnflushed))
+ {
+ Table table = tables.get(rng.nextInt(tableCount));
+ if (table.hasUnflushed())
+ table.flush(segments.values());
+
+ validateAllSegments.run();
+ }
+ },
+ e -> {
+ if (ops != null)
+ {
+ System.out.println("History: ");
+ for (String op : ops)
+ System.out.println(op);
+ }
+ });
+ }
+
+
+ public static CommitLogPosition pos(long segmentId, int pos)
+ {
+ return new CommitLogPosition(segmentId, pos);
+ }
+}
\ No newline at end of file