This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new a0f7da9b23 Introduce Periodic mode to Accord Journal
a0f7da9b23 is described below
commit a0f7da9b2355d9cf7fddea59c882a76f4acbe73f
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Thu May 23 14:20:09 2024 +0100
Introduce Periodic mode to Accord Journal
patch by Benedict; reviewed by Aleksey Yeschenko, Alex Petrov and David
Capwell for CASSANDRA-19720
---
.../org/apache/cassandra/config/AccordSpec.java | 4 +-
.../cassandra/config/DatabaseDescriptor.java | 3 -
.../apache/cassandra/journal/ActiveSegment.java | 37 ++-
src/java/org/apache/cassandra/journal/Flusher.java | 312 +++++++++++++++------
src/java/org/apache/cassandra/journal/Journal.java | 53 +++-
.../apache/cassandra/journal/SegmentWriter.java | 4 +-
.../org/apache/cassandra/journal/Segments.java | 15 +
.../apache/cassandra/journal/SyncedOffsets.java | 36 ++-
.../service/accord/AccordCommandStore.java | 1 +
.../cassandra/service/accord/AccordJournal.java | 8 +-
.../cassandra/service/accord/AccordKeyspace.java | 68 +++--
.../service/accord/AccordObjectSizes.java | 2 +-
.../cassandra/service/accord/AccordService.java | 2 +-
.../accord/serializers/CheckStatusSerializers.java | 3 +-
.../accord/serializers/CommandSerializers.java | 32 +++
.../accord/serializers/FetchSerializers.java | 13 +-
.../accord/serializers/TopologySerializers.java | 15 +-
.../accord/serializers/WaitingOnSerializer.java | 81 ++++--
.../org/apache/cassandra/utils/ByteBufferUtil.java | 2 +-
test/conf/logback-simulator.xml | 2 +-
.../cassandra/distributed/test/TestBaseImpl.java | 4 +
.../distributed/test/accord/AccordLoadTest.java | 97 ++++++-
.../cassandra/simulator/ClusterSimulation.java | 14 +-
.../cassandra/simulator/SimulationRunner.java | 2 +-
.../simulator/paxos/AccordClusterSimulation.java | 10 +-
.../paxos/PairOfSequencesAccordSimulation.java | 6 +-
.../simulator/paxos/PaxosClusterSimulation.java | 5 -
.../simulator/paxos/PaxosSimulationRunner.java | 2 +-
.../cassandra/journal/SyncedOffsetsTest.java | 4 +-
.../serializers/WaitingOnSerializerTest.java | 2 +-
30 files changed, 620 insertions(+), 219 deletions(-)
diff --git a/src/java/org/apache/cassandra/config/AccordSpec.java
b/src/java/org/apache/cassandra/config/AccordSpec.java
index b035b0b9b5..2e0d614957 100644
--- a/src/java/org/apache/cassandra/config/AccordSpec.java
+++ b/src/java/org/apache/cassandra/config/AccordSpec.java
@@ -30,7 +30,7 @@ public class AccordSpec
public volatile OptionaldPositiveInt shard_count =
OptionaldPositiveInt.UNDEFINED;
- public volatile DurationSpec.IntSecondsBound progress_log_schedule_delay =
new DurationSpec.IntSecondsBound(1);
+ public volatile DurationSpec.IntMillisecondsBound
progress_log_schedule_delay = new DurationSpec.IntMillisecondsBound(100);
/**
* When a barrier transaction is requested how many times to repeat
attempting the barrier before giving up
@@ -79,7 +79,7 @@ public class AccordSpec
{
public int segmentSize = 32 << 20;
public FailurePolicy failurePolicy = FailurePolicy.STOP;
- public FlushMode flushMode = FlushMode.BATCH;
+ public FlushMode flushMode = FlushMode.PERIODIC;
public DurationSpec.IntMillisecondsBound flushPeriod; // pulls default
from 'commitlog_sync_period'
public DurationSpec.IntMillisecondsBound periodicFlushLagBlock = new
DurationSpec.IntMillisecondsBound("1500ms");
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 285d7308b7..c98182cdd3 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -154,9 +154,6 @@ import static
org.apache.cassandra.utils.Clock.Global.logInitializationOutcome;
public class DatabaseDescriptor
{
- public static final String
NO_ACCORD_PAXOS_STRATEGY_WITH_ACCORD_DISABLED_MESSAGE =
- "Cannot use lwt_strategy \"accord\" while Accord transactions are
disabled.";
-
static
{
// This static block covers most usages
diff --git a/src/java/org/apache/cassandra/journal/ActiveSegment.java
b/src/java/org/apache/cassandra/journal/ActiveSegment.java
index f16126c157..1fd2e4dd1a 100644
--- a/src/java/org/apache/cassandra/journal/ActiveSegment.java
+++ b/src/java/org/apache/cassandra/journal/ActiveSegment.java
@@ -24,6 +24,7 @@ import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.locks.LockSupport;
import com.codahale.metrics.Timer;
@@ -50,6 +51,9 @@ final class ActiveSegment<K, V> extends Segment<K, V>
* Everything before this offset has been written and flushed.
*/
private volatile int lastFlushedOffset = 0;
+ private volatile int lastFsyncOffset = 0;
+ @SuppressWarnings("rawtypes")
+ private static final AtomicIntegerFieldUpdater<ActiveSegment>
lastFsyncOffsetUpdater =
AtomicIntegerFieldUpdater.newUpdater(ActiveSegment.class, "lastFsyncOffset");
/*
* End position of the buffer; initially set to its capacity and
@@ -86,7 +90,7 @@ final class ActiveSegment<K, V> extends Segment<K, V>
@SuppressWarnings("resource")
static <K, V> ActiveSegment<K, V> create(Descriptor descriptor, Params
params, KeySupport<K> keySupport)
{
- SyncedOffsets syncedOffsets = SyncedOffsets.active(descriptor, true);
+ SyncedOffsets syncedOffsets = SyncedOffsets.active(descriptor);
InMemoryIndex<K> index = InMemoryIndex.create(keySupport);
Metadata metadata = Metadata.create();
return new ActiveSegment<>(descriptor, params, syncedOffsets, index,
metadata, keySupport);
@@ -152,7 +156,7 @@ final class ActiveSegment<K, V> extends Segment<K, V>
boolean isEmpty = discardUnusedTail();
if (!isEmpty)
{
- flush();
+ flush(true);
if (persistComponents) persistComponents();
}
release();
@@ -261,21 +265,37 @@ final class ActiveSegment<K, V> extends Segment<K, V>
* TODO FIXME: calls from outside Flusher + callbacks
* @return last synced offset
*/
- synchronized int flush()
+ synchronized int flush(boolean fsync)
{
int allocatePosition = this.allocatePosition.get();
if (lastFlushedOffset >= allocatePosition)
return lastFlushedOffset;
waitForModifications();
- flushInternal();
+ if (fsync)
+ {
+ fsyncInternal();
+ lastFsyncOffsetUpdater.accumulateAndGet(this, allocatePosition,
Math::max);
+ }
lastFlushedOffset = allocatePosition;
int syncedOffset = Math.min(allocatePosition, endOfBuffer);
- syncedOffsets.mark(syncedOffset);
+ syncedOffsets.mark(syncedOffset, fsync);
flushComplete.signalAll();
return syncedOffset;
}
+ // provides no ordering guarantees
+ void fsync()
+ {
+ int lastFlushed = lastFlushedOffset;
+ if (lastFsyncOffset >= lastFlushed)
+ return;
+
+ fsyncInternal();
+ syncedOffsets.fsync();
+ lastFsyncOffsetUpdater.accumulateAndGet(this, lastFlushed, Math::max);
+ }
+
private void waitForFlush(int position)
{
while (lastFlushedOffset < position)
@@ -297,7 +317,7 @@ final class ActiveSegment<K, V> extends Segment<K, V>
appendOrder.awaitNewBarrier();
}
- private void flushInternal()
+ private void fsyncInternal()
{
try
{
@@ -314,6 +334,11 @@ final class ActiveSegment<K, V> extends Segment<K, V>
return syncedOffset >= endOfBuffer;
}
+ boolean isCompletedAndFullyFsynced()
+ {
+ return lastFsyncOffset >= endOfBuffer;
+ }
+
/**
* Ensures no more of this segment is writeable, by allocating any unused
section at the end
* and marking it discarded void discardUnusedTail()
diff --git a/src/java/org/apache/cassandra/journal/Flusher.java
b/src/java/org/apache/cassandra/journal/Flusher.java
index c4c6d75348..607943b635 100644
--- a/src/java/org/apache/cassandra/journal/Flusher.java
+++ b/src/java/org/apache/cassandra/journal/Flusher.java
@@ -17,12 +17,15 @@
*/
package org.apache.cassandra.journal;
-import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
+
+import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import accord.utils.Invariants;
import com.codahale.metrics.Timer;
import org.apache.cassandra.concurrent.Interruptible;
import org.apache.cassandra.concurrent.Interruptible.TerminateException;
@@ -33,7 +36,6 @@ import org.apache.cassandra.utils.concurrent.Semaphore;
import org.apache.cassandra.utils.concurrent.WaitQueue;
import static java.lang.String.format;
-import static java.util.Comparator.comparing;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
@@ -42,7 +44,7 @@ import static
org.apache.cassandra.concurrent.InfiniteLoopExecutor.Interrupts.SY
import static
org.apache.cassandra.concurrent.InfiniteLoopExecutor.SimulatorSafe.SAFE;
import static org.apache.cassandra.concurrent.Interruptible.State.NORMAL;
import static
org.apache.cassandra.concurrent.Interruptible.State.SHUTTING_DOWN;
-import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
+import static org.apache.cassandra.journal.Params.FlushMode.PERIODIC;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
import static org.apache.cassandra.utils.MonotonicClock.Global.preciseTime;
import static org.apache.cassandra.utils.Simulate.With.GLOBAL_CLOCK;
@@ -60,16 +62,19 @@ final class Flusher<K, V>
private final AsyncCallbacks<K, V> callbacks;
private volatile Interruptible flushExecutor;
+ private volatile Interruptible fsyncExecutor;
// counts of total pending write and written entries
private final AtomicLong pending = new AtomicLong(0);
private final AtomicLong written = new AtomicLong(0);
- // all Allocations written before this time will be flushed
- volatile long lastFlushedAt = currentTimeMillis();
+ // the time of the last initiated flush
+ volatile long flushStartedAt = nanoTime();
+ // the time of the earliest flush that has completed an fsync; all
Allocations written before this time are durable
+ volatile long fsyncFinishedFor = flushStartedAt;
// a signal that writers can wait on to be notified of a completed flush
in PERIODIC FlushMode
- private final WaitQueue flushComplete = newWaitQueue();
+ private final WaitQueue fsyncComplete = newWaitQueue(); // TODO
(expected): this is only used for testing, can we remove this?
// a signal and flag that callers outside the flusher thread can use
// to signal they want the journal segments to be flushed to disk
@@ -97,20 +102,144 @@ final class Flusher<K, V>
void shutdown()
{
flushExecutor.shutdown();
+ if (fsyncExecutor != null)
+ fsyncExecutor.shutdown();
}
@Simulate(with={MONITORS,GLOBAL_CLOCK,LOCK_SUPPORT})
private class FlushRunnable implements Interruptible.Task
{
- private final MonotonicClock clock;
+ @Simulate(with={MONITORS,GLOBAL_CLOCK,LOCK_SUPPORT})
+ private class FSyncRunnable implements Interruptible.Task
+ {
+ // this is written only by the Flusher thread, and read only by
the Fsync thread
+ ActiveSegment<K, V> fsyncUpTo;
+ ActiveSegment<K, V> fsyncing;
+
+ private volatile Thread awaitingWork;
+
+ // all Allocations written before this time will be written to at
least the OS page cache;
+ volatile long fsyncWaitingSince = 0;
+ // the time of the earliest flush that has begun participating in
an fsync
+ volatile long fsyncStartedFor = 0;
+
+ @Override
+ public void run(Interruptible.State state) throws
InterruptedException
+ {
+ try
+ {
+ doRun(state);
+ }
+ catch (Throwable t)
+ {
+ if (!journal.handleError("Failed to flush segments to
disk", t))
+ throw new TerminateException();
+ }
+ }
+
+ private void awaitWork() throws InterruptedException
+ {
+ long lastStartedAt = fsyncStartedFor;
+ if (fsyncWaitingSince != lastStartedAt)
+ return;
+
+ awaitingWork = Thread.currentThread();
+ do
+ {
+ if (Thread.interrupted())
+ {
+ awaitingWork = null;
+ throw new InterruptedException();
+ }
+
+ LockSupport.park();
+ }
+ while (fsyncWaitingSince == lastStartedAt);
+
+ awaitingWork = null;
+ }
+
+ void notify(Thread notify)
+ {
+ if (notify != null)
+ LockSupport.unpark(notify);
+ }
+
+ public void doRun(Interruptible.State state) throws
InterruptedException
+ {
+ awaitWork();
+ if (fsyncing == null)
+ fsyncing = journal.oldestActiveSegment();
+
+ // invert order of access; we might see a future fsyncTo, but
at worst this means redundantly invoking fsync before updating fsyncStartedFor
+ long startedAt = fsyncWaitingSince;
+ ActiveSegment<K, V> fsyncTo = this.fsyncUpTo;
+ fsyncStartedFor = startedAt;
+ // synchronized to prevent thread interrupts while performing
IO operations and also
+ // clear interrupted status to prevent
ClosedByInterruptException in ActiveSegment::flush
+ synchronized (this)
+ {
+ boolean ignore = Thread.interrupted();
+ while (fsyncing != fsyncTo)
+ {
+ fsyncing.fsync();
+ journal.closeActiveSegmentAndOpenAsStatic(fsyncing);
+ fsyncing =
journal.getActiveSegment(fsyncing.descriptor.timestamp + 1);
+ }
+ fsyncing.fsync();
+ }
+ fsyncFinishedFor = startedAt;
+ fsyncComplete.signalAll();
+ long finishedAt = clock.now();
+ processDuration(startedAt, finishedAt);
+ }
+
+ void afterFlush(long startedAt, ActiveSegment<K, V> segment, int
syncedOffset)
+ {
+ long requireFsyncTo = startedAt - periodicFlushLagBlockNanos();
+
+ fsyncUpTo = segment;
+ fsyncWaitingSince = startedAt;
+
+ notify(awaitingWork);
+
+ if (requireFsyncTo > fsyncFinishedFor)
+ awaitFsyncAt(requireFsyncTo,
journal.metrics.waitingOnFlush.time());
+ callbacks.onFlush(segment.descriptor.timestamp, syncedOffset);
+ }
+
+ private void doNoOpFlush(long startedAt)
+ {
+ if (fsyncFinishedFor >= fsyncWaitingSince)
+ {
+ fsyncFinishedFor = startedAt;
+ }
+ else
+ {
+ // if the flusher is still running, update the
waitingSince register
+ fsyncWaitingSince = startedAt;
+ notify(awaitingWork);
+ }
+ }
+ }
+
private final NoSpamLogger noSpamLogger;
+ private final MonotonicClock clock;
+ private final @Nullable FSyncRunnable fSyncRunnable;
+
+ private ActiveSegment<K, V> current = null;
- private final ArrayList<ActiveSegment<K, V>> segmentsToFlush = new
ArrayList<>();
+ private long firstLaggedAt = Long.MIN_VALUE; // first lag ever or
since last logged warning
+ private int fsyncCount = 0; // flush count since
firstLaggedAt
+ private int lagCount = 0; // lag count since
firstLaggedAt
+ private long duration = 0; // time spent flushing since
firstLaggedAt
+ private long lagDuration = 0; // cumulative lag since
firstLaggedAt
FlushRunnable(MonotonicClock clock)
{
- this.clock = clock;
this.noSpamLogger = NoSpamLogger.wrap(logger, 5, MINUTES);
+ this.clock = clock;
+ this.fSyncRunnable = params.flushMode() == PERIODIC ?
newFsyncRunnable() : null;
}
@Override
@@ -131,8 +260,9 @@ final class Flusher<K, V>
public void doRun(Interruptible.State state) throws
InterruptedException
{
- long startedRunAt = clock.now();
- boolean flushToDisk = lastFlushedAt + flushPeriodNanos() <=
startedRunAt || state != NORMAL || flushRequested;
+ long startedAt = clock.now();
+ long flushPeriodNanos = flushPeriodNanos();
+ boolean flushToDisk = flushStartedAt + flushPeriodNanos <=
startedAt || state != NORMAL || flushRequested;
// synchronized to prevent thread interrupts while performing IO
operations and also
// clear interrupted status to prevent ClosedByInterruptException
in ActiveSegment::flush
@@ -142,83 +272,71 @@ final class Flusher<K, V>
if (flushToDisk)
{
flushRequested = false;
- doFlush();
- lastFlushedAt = startedRunAt;
- flushComplete.signalAll();
+ flushStartedAt = startedAt;
+ doFlush(startedAt);
}
}
- long now = clock.now();
- if (flushToDisk)
- processFlushDuration(startedRunAt, now);
-
if (state == SHUTTING_DOWN)
return;
- long flushPeriodNanos = flushPeriodNanos();
if (flushPeriodNanos <= 0)
{
haveWork.acquire(1);
}
else
{
- long wakeUpAt = startedRunAt + flushPeriodNanos;
- if (wakeUpAt > now)
- haveWork.tryAcquireUntil(1, wakeUpAt);
+ long wakeUpAt = startedAt + flushPeriodNanos;
+ haveWork.tryAcquireUntil(1, wakeUpAt);
}
}
- private void doFlush()
+ private void doFlush(long startedAt) throws InterruptedException
{
- journal.selectSegmentToFlush(segmentsToFlush);
- segmentsToFlush.sort(comparing(s -> s.descriptor));
+ boolean synchronousFsync = fSyncRunnable == null;
- try
- {
- long syncedSegment = -1;
- int syncedOffset = -1;
+ if (current == null)
+ current = journal.oldestActiveSegment();
+ ActiveSegment<K, V> newCurrent = journal.currentActiveSegment();
- for (ActiveSegment<K, V> segment : segmentsToFlush)
- {
- if (!segment.shouldFlush())
- break;
+ if (newCurrent == current && (newCurrent == null ||
!newCurrent.shouldFlush()))
+ {
+ if (synchronousFsync) fsyncFinishedFor = startedAt;
+ else fSyncRunnable.doNoOpFlush(startedAt);
+ return;
+ }
- syncedSegment = segment.descriptor.timestamp;
- syncedOffset = segment.flush();
+ Invariants.checkState(newCurrent != null);
- // if an older segment isn't fully complete + flushed yet,
don't attempt to flush any younger ones
- if (!segment.isCompletedAndFullyFlushed(syncedOffset))
- break;
+ try
+ {
+ while (current != newCurrent)
+ {
+ current.discardUnusedTail();
+ current.flush(synchronousFsync);
+ if (synchronousFsync)
+ journal.closeActiveSegmentAndOpenAsStatic(current);
+ current =
journal.getActiveSegment(current.descriptor.timestamp + 1);
}
+ int syncedOffset = current.flush(synchronousFsync);
- // invoke the onFlush() callback once, covering entire flushed
range across all flushed segments
- if (syncedSegment != -1 && syncedOffset != -1)
- callbacks.onFlush(syncedSegment, syncedOffset);
+ if (synchronousFsync) afterFSync(startedAt,
current.descriptor.timestamp, syncedOffset);
+ else fSyncRunnable.afterFlush(startedAt, current,
syncedOffset);
}
catch (Throwable t)
{
callbacks.onFlushFailed(t);
throw t;
}
- finally
- {
- segmentsToFlush.clear();
- }
}
- private long firstLaggedAt = Long.MIN_VALUE; // first lag ever or
since last logged warning
- private int flushCount = 0; // flush count since
firstLaggedAt
- private int lagCount = 0; // lag count since
firstLaggedAt
- private long flushDuration = 0; // time spent flushing
since firstLaggedAt
- private long lagDuration = 0; // cumulative lag since
firstLaggedAt
-
- private void processFlushDuration(long startedFlushAt, long
finishedFlushAt)
+ private void processDuration(long startedFlushAt, long finishedFsyncAt)
{
- flushCount++;
- flushDuration += (finishedFlushAt - startedFlushAt);
+ fsyncCount++;
+ duration += (finishedFsyncAt - startedFlushAt);
long flushPeriodNanos = flushPeriodNanos();
- long lag = finishedFlushAt - (startedFlushAt + flushPeriodNanos);
+ long lag = finishedFsyncAt - (startedFlushAt + flushPeriodNanos);
if (flushPeriodNanos <= 0 || lag <= 0)
return;
@@ -226,26 +344,42 @@ final class Flusher<K, V>
lagDuration += lag;
if (firstLaggedAt == Long.MIN_VALUE)
- firstLaggedAt = finishedFlushAt;
+ firstLaggedAt = finishedFsyncAt;
boolean logged =
- noSpamLogger.warn(finishedFlushAt,
- "Out of {} {} journal flushes over the past
{}s with average duration of {}ms, " +
- "{} have exceeded the configured flush
period by an average of {}ms",
- flushCount,
- journal.name,
- format("%.2f", (finishedFlushAt -
firstLaggedAt) * 1e-9d),
- format("%.2f", flushDuration * 1e-6d /
flushCount),
- lagCount,
- format("%.2f", lagDuration * 1e-6d /
lagCount));
+ noSpamLogger.warn(finishedFsyncAt,
+ "Out of {} {} journal flushes over the past {}s
with average duration of {}ms, " +
+ "{} have exceeded the configured flush period by
an average of {}ms",
+ fsyncCount,
+ journal.name,
+ format("%.2f", (finishedFsyncAt - firstLaggedAt)
* 1e-9d),
+ format("%.2f", duration * 1e-6d / fsyncCount),
+ lagCount,
+ format("%.2f", lagDuration * 1e-6d / lagCount));
if (logged) // reset metrics for next log statement
{
firstLaggedAt = Long.MIN_VALUE;
- flushCount = lagCount = 0;
- flushDuration = lagDuration = 0;
+ fsyncCount = lagCount = 0;
+ duration = lagDuration = 0;
}
}
+
+ private void afterFSync(long startedAt, long syncedSegment, int
syncedOffset)
+ {
+ fsyncFinishedFor = startedAt;
+ callbacks.onFlush(syncedSegment, syncedOffset);
+ fsyncComplete.signalAll();
+ long finishedAt = clock.now();
+ processDuration(startedAt, finishedAt);
+ }
+
+ private FSyncRunnable newFsyncRunnable()
+ {
+ final FSyncRunnable fSyncRunnable = new FSyncRunnable();
+ fsyncExecutor = executorFactory().infiniteLoop(journal.name +
"-fsync", fSyncRunnable, SAFE, NON_DAEMON, SYNCHRONIZED);
+ return fSyncRunnable;
+ }
}
@FunctionalInterface
@@ -295,48 +429,40 @@ final class Flusher<K, V>
written.incrementAndGet();
}
- private void asyncFlushBatch(ActiveSegment<K, V>.Allocation alloc)
+ private void waitForFlushGroup(ActiveSegment<K, V>.Allocation alloc)
{
pending.incrementAndGet();
- requestExtraFlush();
- // alloc.awaitFlush(journal.metrics.waitingOnFlush); // TODO
(expected): collect async flush metrics
+ alloc.awaitFlush(journal.metrics.waitingOnFlush);
pending.decrementAndGet();
written.incrementAndGet();
}
- private void waitForFlushGroup(ActiveSegment<K, V>.Allocation alloc)
+ private void waitForFlushPeriodic(ActiveSegment<K, V>.Allocation ignore)
{
- pending.incrementAndGet();
- alloc.awaitFlush(journal.metrics.waitingOnFlush);
- pending.decrementAndGet();
+ long expectedFlushTime = nanoTime() - periodicFlushLagBlockNanos();
+ if (fsyncFinishedFor < expectedFlushTime)
+ {
+ pending.incrementAndGet();
+ awaitFsyncAt(expectedFlushTime,
journal.metrics.waitingOnFlush.time());
+ pending.decrementAndGet();
+ }
written.incrementAndGet();
}
- private void asyncFlushGroup(ActiveSegment<K, V>.Allocation alloc)
+ private void asyncFlushBatch(ActiveSegment<K, V>.Allocation alloc)
{
- pending.incrementAndGet();
- // alloc.awaitFlush(journal.metrics.waitingOnFlush); // TODO
(expected): collect async flush metrics
- pending.decrementAndGet();
+ requestExtraFlush();
written.incrementAndGet();
}
- private void waitForFlushPeriodic(ActiveSegment<K, V>.Allocation alloc)
+ private void asyncFlushGroup(ActiveSegment<K, V>.Allocation alloc)
{
- long expectedFlushTime = nanoTime() - periodicFlushLagBlockNanos();
- if (lastFlushedAt < expectedFlushTime)
- {
- pending.incrementAndGet();
- awaitFlushAt(expectedFlushTime,
journal.metrics.waitingOnFlush.time());
- pending.decrementAndGet();
- }
written.incrementAndGet();
}
private void asyncFlushPeriodic(ActiveSegment<K, V>.Allocation ignore)
{
- pending.incrementAndGet();
- // awaitFlushAt(expectedFlushTime,
journal.metrics.waitingOnFlush.time()); // TODO (expected): collect async flush
metrics
- pending.decrementAndGet();
+ requestExtraFlush();
written.incrementAndGet();
}
@@ -350,17 +476,17 @@ final class Flusher<K, V>
haveWork.release(1);
}
- private void awaitFlushAt(long flushTime, Timer.Context context)
+ private void awaitFsyncAt(long flushTime, Timer.Context context)
{
do
{
- WaitQueue.Signal signal = flushComplete.register(context,
Timer.Context::stop);
- if (lastFlushedAt < flushTime)
+ WaitQueue.Signal signal = fsyncComplete.register(context,
Timer.Context::stop);
+ if (fsyncFinishedFor < flushTime)
signal.awaitUninterruptibly();
else
signal.cancel();
}
- while (lastFlushedAt < flushTime);
+ while (fsyncFinishedFor < flushTime);
}
private long flushPeriodNanos()
diff --git a/src/java/org/apache/cassandra/journal/Journal.java
b/src/java/org/apache/cassandra/journal/Journal.java
index eae190a15e..aa61e5aca5 100644
--- a/src/java/org/apache/cassandra/journal/Journal.java
+++ b/src/java/org/apache/cassandra/journal/Journal.java
@@ -35,6 +35,7 @@ import java.util.zip.CRC32;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import accord.utils.Invariants;
import com.codahale.metrics.Timer.Context;
import org.agrona.collections.ObjectHashSet;
import org.apache.cassandra.concurrent.Interruptible;
@@ -456,9 +457,6 @@ public class Journal<K, V> implements Shutdownable
// signal the allocator thread to prepare a new segment
wakeAllocator();
- if (null != oldSegment)
- closeActiveSegmentAndOpenAsStatic(oldSegment);
-
// request that the journal be flushed out-of-band, as we've finished
a segment
flusher.requestExtraFlush();
}
@@ -659,6 +657,53 @@ public class Journal<K, V> implements Shutdownable
segments().selectActive(currentSegment.descriptor.timestamp, into);
}
+ ActiveSegment<K, V> oldestActiveSegment()
+ {
+ ActiveSegment<K, V> current = currentSegment;
+ if (current == null)
+ return null;
+
+ ActiveSegment<K, V> oldest = segments().oldestActive();
+ if (oldest == null || oldest.descriptor.timestamp >
current.descriptor.timestamp)
+ return current;
+
+ return oldest;
+ }
+
+ ActiveSegment<K, V> currentActiveSegment()
+ {
+ return currentSegment;
+ }
+
+ ActiveSegment<K, V> getActiveSegment(long timestamp)
+ {
+ // we can race with segment addition to the segments() collection,
with a new segment appearing in currentSegment first
+ // since we are most likely to be requesting the currentSegment
anyway, we resolve this case by checking currentSegment first
+ // and resort to the segments() collection only if we do not match
+ ActiveSegment<K, V> currentSegment = this.currentSegment;
+ if (currentSegment == null)
+ throw new IllegalArgumentException("Requested an active segment
with timestamp " + timestamp + " but there is no currently active segment");
+ long currentSegmentTimestamp = currentSegment.descriptor.timestamp;
+ if (timestamp == currentSegmentTimestamp)
+ {
+ return currentSegment;
+ }
+ else if (timestamp > currentSegmentTimestamp)
+ {
+ throw new IllegalArgumentException("Requested a newer timestamp "
+ timestamp + " than the current active segment " + currentSegmentTimestamp);
+ }
+ else
+ {
+ Segment<K, V> segment = segments().get(timestamp);
+ Invariants.checkState(segment != null, "Segment %d expected to be
found, but neither current segment %d nor in active segments", timestamp,
currentSegmentTimestamp);
+ if (segment == null)
+ throw new IllegalArgumentException("Request the active segment
" + timestamp + " but this segment does not exist");
+ if (!segment.isActive())
+ throw new IllegalArgumentException("Request the active segment
" + timestamp + " but this segment is not active");
+ return segment.asActive();
+ }
+ }
+
/**
* Take care of a finished active segment:
* 1. discard tail
@@ -681,7 +726,7 @@ public class Journal<K, V> implements Shutdownable
public void run()
{
activeSegment.discardUnusedTail();
- activeSegment.flush();
+ activeSegment.flush(true);
activeSegment.persistComponents();
replaceCompletedSegment(activeSegment,
StaticSegment.open(activeSegment.descriptor, keySupport));
activeSegment.release();
diff --git a/src/java/org/apache/cassandra/journal/SegmentWriter.java
b/src/java/org/apache/cassandra/journal/SegmentWriter.java
index 852e955b21..b8436aed66 100644
--- a/src/java/org/apache/cassandra/journal/SegmentWriter.java
+++ b/src/java/org/apache/cassandra/journal/SegmentWriter.java
@@ -101,9 +101,9 @@ final class SegmentWriter<K> implements Closeable
throw new JournalWriteError(descriptor, file, e);
}
- try (SyncedOffsets syncedOffsets = SyncedOffsets.active(descriptor,
true))
+ try (SyncedOffsets syncedOffsets = SyncedOffsets.active(descriptor))
{
- syncedOffsets.mark(position());
+ syncedOffsets.mark(position(), true);
}
index.persist(descriptor);
diff --git a/src/java/org/apache/cassandra/journal/Segments.java
b/src/java/org/apache/cassandra/journal/Segments.java
index 0693997ef3..ca5ca47b2b 100644
--- a/src/java/org/apache/cassandra/journal/Segments.java
+++ b/src/java/org/apache/cassandra/journal/Segments.java
@@ -98,6 +98,21 @@ class Segments<K, V>
into.add(segment.asActive());
}
+ ActiveSegment<K, V> oldestActive()
+ {
+ Segment<K, V> oldest = null;
+ for (Segment<K, V> segment : segments.values())
+ if (segment.isActive() && (oldest == null ||
segment.descriptor.timestamp <= oldest.descriptor.timestamp))
+ oldest = segment;
+
+ return oldest == null ? null : oldest.asActive();
+ }
+
+ Segment<K, V> get(long timestamp)
+ {
+ return segments.get(timestamp);
+ }
+
void selectStatic(Collection<StaticSegment<K, V>> into)
{
for (Segment<K, V> segment : segments.values())
diff --git a/src/java/org/apache/cassandra/journal/SyncedOffsets.java
b/src/java/org/apache/cassandra/journal/SyncedOffsets.java
index bee302d6d8..cd05e6f8ac 100644
--- a/src/java/org/apache/cassandra/journal/SyncedOffsets.java
+++ b/src/java/org/apache/cassandra/journal/SyncedOffsets.java
@@ -50,7 +50,9 @@ interface SyncedOffsets extends Closeable
*
* @param offset the offset into datafile, up to which contents have been
fsynced (exclusive)
*/
- void mark(int offset);
+ void mark(int offset, boolean fsync);
+
+ void fsync();
@Override
default void close()
@@ -60,9 +62,9 @@ interface SyncedOffsets extends Closeable
/**
* @return a disk-backed synced offset tracker for a new {@link
ActiveSegment}
*/
- static Active active(Descriptor descriptor, boolean syncOnMark)
+ static Active active(Descriptor descriptor)
{
- return new Active(descriptor, syncOnMark);
+ return new Active(descriptor);
}
/**
@@ -87,15 +89,13 @@ interface SyncedOffsets extends Closeable
final class Active implements SyncedOffsets
{
private final Descriptor descriptor;
- private final boolean syncOnMark;
private final FileOutputStreamPlus output;
private volatile int syncedOffset;
- private Active(Descriptor descriptor, boolean syncOnMark)
+ private Active(Descriptor descriptor)
{
this.descriptor = descriptor;
- this.syncOnMark = syncOnMark;
File file = descriptor.fileFor(Component.SYNCED_OFFSETS);
if (file.exists())
@@ -123,7 +123,7 @@ interface SyncedOffsets extends Closeable
}
@Override
- public void mark(int offset)
+ public void mark(int offset, boolean fsync)
{
if (offset < syncedOffset)
throw new IllegalArgumentException("offset " + offset + " is
smaller than previous mark " + offset);
@@ -142,10 +142,10 @@ interface SyncedOffsets extends Closeable
}
syncedOffset = offset;
- if (syncOnMark) sync();
+ if (fsync) fsync();
}
- private void sync()
+ public void fsync()
{
try
{
@@ -160,7 +160,7 @@ interface SyncedOffsets extends Closeable
@Override
public void close()
{
- if (!syncOnMark) sync();
+ fsync();
try
{
@@ -218,7 +218,13 @@ interface SyncedOffsets extends Closeable
}
@Override
- public void mark(int offset)
+ public void mark(int offset, boolean fsync)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void fsync()
{
throw new UnsupportedOperationException();
}
@@ -235,7 +241,13 @@ interface SyncedOffsets extends Closeable
}
@Override
- public void mark(int offset)
+ public void mark(int offset, boolean fsync)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void fsync()
{
throw new UnsupportedOperationException();
}
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index c846038fd8..0f33f04d92 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -342,6 +342,7 @@ public class AccordCommandStore extends CommandStore
implements CacheSize
Runnable saveCommand(Command before, Command after)
{
Mutation mutation = AccordKeyspace.getCommandMutation(id, before,
after, nextSystemTimestampMicros());
+ // TODO (required): make sure we test recovering when this has failed
to be persisted
return null != mutation ? mutation::applyUnsafe : null;
}
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index 80cfdf31ea..f9daf2354d 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -297,6 +297,10 @@ public class AccordJournal implements IJournal,
Shutdownable
// TODO (alexp): tests for objects that go through AccordJournal
private class JournalCallbacks implements AsyncCallbacks<Key, Object>
{
+ private JournalCallbacks()
+ {
+ }
+
/**
* Queue up the record for either frame aggregation (if a protocol
message) or frame application (if a frame).
*/
@@ -352,7 +356,7 @@ public class AccordJournal implements IJournal, Shutdownable
private void onFrameWriteFailed(FrameRecord frame, FrameContext
context, Throwable cause)
{
- // TODO: panic
+ // TODO (required): panic
}
@Override
@@ -364,7 +368,7 @@ public class AccordJournal implements IJournal, Shutdownable
@Override
public void onFlushFailed(Throwable cause)
{
- // TODO: panic
+ // TODO (required): panic
}
}
diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
index 2dd2b5d28d..b96ffa9bd0 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
@@ -97,9 +97,8 @@ import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.ReadExecutionController;
import org.apache.cassandra.db.RegularAndStaticColumns;
import org.apache.cassandra.db.SinglePartitionReadCommand;
-import org.apache.cassandra.db.Slices;
import org.apache.cassandra.db.filter.ClusteringIndexFilter;
-import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
+import org.apache.cassandra.db.filter.ClusteringIndexNamesFilter;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.filter.RowFilter;
@@ -161,6 +160,7 @@ import
org.apache.cassandra.service.accord.serializers.WaitingOnSerializer;
import org.apache.cassandra.utils.Clock.Global;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.btree.BTree;
+import org.apache.cassandra.utils.btree.BTreeSet;
import org.apache.cassandra.utils.bytecomparable.ByteComparable;
import static accord.utils.Invariants.checkArgument;
@@ -201,7 +201,7 @@ public class AccordKeyspace
private static final LocalPartitioner FOR_KEYS_LOCAL_PARTITIONER =
new LocalPartitioner(CompositeType.getInstance(Int32Type.instance,
BytesType.instance, KEY_TYPE));
- private static final ClusteringIndexFilter FULL_PARTITION = new
ClusteringIndexSliceFilter(Slices.ALL, false);
+ private static final ClusteringIndexFilter FULL_PARTITION = new
ClusteringIndexNamesFilter(BTreeSet.of(new ClusteringComparator(),
Clustering.EMPTY), false);
//TODO (now, performance): should this be partitioner rather than TableId?
As of this patch distributed tables should only have 1 partitioner...
private static final ConcurrentMap<TableId,
AccordRoutingKeyByteSource.Serializer> TABLE_SERIALIZERS = new
ConcurrentHashMap<>();
@@ -571,6 +571,8 @@ public class AccordKeyspace
+ "data blob, "
+ "PRIMARY KEY((store_id, key_token, key))"
+ ')')
+ // TODO (expected): make this uncompressed, as not very
compressable (except perhaps the primary key, but could switch to operating on
tokens directly)
+// + " WITH compression = {'enabled':'false'};")
.partitioner(FOR_KEYS_LOCAL_PARTITIONER)
.build();
}
@@ -992,12 +994,12 @@ public class AccordKeyspace
public static UntypedResultSet loadCommandRow(CommandStore commandStore,
TxnId txnId)
{
- String cql = "SELECT * FROM %s.%s " +
+ String cql = "SELECT * FROM " + ACCORD_KEYSPACE_NAME + '.' + COMMANDS
+ ' ' +
"WHERE store_id = ? " +
"AND domain = ? " +
"AND txn_id=(?, ?, ?)";
- return executeInternal(format(cql, ACCORD_KEYSPACE_NAME, COMMANDS),
+ return executeInternal(cql,
commandStore.id(),
txnId.domain().ordinal(),
txnId.msb, txnId.lsb, txnId.node.id);
@@ -1404,12 +1406,12 @@ public class AccordKeyspace
public static UntypedResultSet loadTimestampsForKeyRow(CommandStore
commandStore, PartitionKey key)
{
- String cql = "SELECT * FROM %s.%s " +
+ String cql = "SELECT * FROM " + ACCORD_KEYSPACE_NAME + '.' +
TIMESTAMPS_FOR_KEY + ' ' +
"WHERE store_id = ? " +
"AND key_token = ? " +
"AND key=(?, ?)";
- return executeInternal(format(cql, ACCORD_KEYSPACE_NAME,
TIMESTAMPS_FOR_KEY),
+ return executeInternal(cql,
commandStore.id(),
serializeToken(key.token()),
key.table().asUUID(),
key.partitionKey().getKey());
@@ -1624,9 +1626,9 @@ public class AccordKeyspace
private static EpochDiskState saveEpochDiskState(EpochDiskState diskState)
{
- String cql = "INSERT INTO %s.%s (key, min_epoch, max_epoch) VALUES (0,
?, ?);";
- executeInternal(format(cql, ACCORD_KEYSPACE_NAME, EPOCH_METADATA),
- diskState.minEpoch, diskState.maxEpoch);
+ String cql = "INSERT INTO " + ACCORD_KEYSPACE_NAME + '.' +
EPOCH_METADATA + ' ' +
+ "(key, min_epoch, max_epoch) VALUES (0, ?, ?);";
+ executeInternal(cql, diskState.minEpoch, diskState.maxEpoch);
return diskState;
}
@@ -1634,7 +1636,8 @@ public class AccordKeyspace
@VisibleForTesting
public static EpochDiskState loadEpochDiskState()
{
- String cql = "SELECT * FROM %s.%s WHERE key=0";
+ String cql = "SELECT * FROM " + ACCORD_KEYSPACE_NAME + '.' +
EPOCH_METADATA + ' ' +
+ "WHERE key=0";
-        UntypedResultSet result = executeInternal(format(cql,
-        ACCORD_KEYSPACE_NAME, EPOCH_METADATA));
+        UntypedResultSet result = executeInternal(cql);
if (result.isEmpty())
return null;
@@ -1668,8 +1671,9 @@ public class AccordKeyspace
try
{
- String cql = "UPDATE %s.%s SET topology=? WHERE epoch=?";
- executeInternal(format(cql, ACCORD_KEYSPACE_NAME, TOPOLOGIES),
+ String cql = "UPDATE " + ACCORD_KEYSPACE_NAME + '.' + TOPOLOGIES +
' ' +
+ "SET topology=? WHERE epoch=?";
+ executeInternal(cql,
serialize(topology,
LocalVersionedSerializers.topology), topology.epoch());
flush(Topologies);
}
@@ -1684,8 +1688,9 @@ public class AccordKeyspace
public static EpochDiskState markRemoteTopologySync(Node.Id node, long
epoch, EpochDiskState diskState)
{
diskState = maybeUpdateMaxEpoch(diskState, epoch);
- String cql = "UPDATE %s.%s SET remote_sync_complete =
remote_sync_complete + ? WHERE epoch = ?";
- executeInternal(format(cql, ACCORD_KEYSPACE_NAME, TOPOLOGIES),
+ String cql = "UPDATE " + ACCORD_KEYSPACE_NAME + '.' + TOPOLOGIES + ' '
+
+ "SET remote_sync_complete = remote_sync_complete + ?
WHERE epoch = ?";
+ executeInternal(cql,
Collections.singleton(node.id), epoch);
flush(Topologies);
return diskState;
@@ -1694,8 +1699,9 @@ public class AccordKeyspace
public static EpochDiskState markClosed(Ranges ranges, long epoch,
EpochDiskState diskState)
{
diskState = maybeUpdateMaxEpoch(diskState, epoch);
- String cql = "UPDATE %s.%s SET closed = closed + ? WHERE epoch = ?";
- executeInternal(String.format(cql, ACCORD_KEYSPACE_NAME, TOPOLOGIES),
+ String cql = "UPDATE " + ACCORD_KEYSPACE_NAME + '.' + TOPOLOGIES + ' '
+
+ "SET closed = closed + ? WHERE epoch = ?";
+ executeInternal(cql,
KeySerializers.rangesToBlobMap(ranges), epoch);
flush(Topologies);
return diskState;
@@ -1704,8 +1710,9 @@ public class AccordKeyspace
public static EpochDiskState markRedundant(Ranges ranges, long epoch,
EpochDiskState diskState)
{
diskState = maybeUpdateMaxEpoch(diskState, epoch);
- String cql = "UPDATE %s.%s SET redundant = redundant + ? WHERE epoch =
?";
- executeInternal(String.format(cql, ACCORD_KEYSPACE_NAME, TOPOLOGIES),
+ String cql = "UPDATE " + ACCORD_KEYSPACE_NAME + '.' + TOPOLOGIES + ' '
+
+ "SET redundant = redundant + ? WHERE epoch = ?";
+ executeInternal(cql,
KeySerializers.rangesToBlobMap(ranges), epoch);
flush(Topologies);
return diskState;
@@ -1714,8 +1721,9 @@ public class AccordKeyspace
public static EpochDiskState setNotifyingLocalSync(long epoch,
Set<Node.Id> pending, EpochDiskState diskState)
{
diskState = maybeUpdateMaxEpoch(diskState, epoch);
- String cql = "UPDATE %s.%s SET sync_state = ?, pending_sync_notify = ?
WHERE epoch = ?";
- executeInternal(format(cql, ACCORD_KEYSPACE_NAME, TOPOLOGIES),
+ String cql = "UPDATE " + ACCORD_KEYSPACE_NAME + '.' + TOPOLOGIES + ' '
+
+ "SET sync_state = ?, pending_sync_notify = ? WHERE epoch
= ?";
+ executeInternal(cql,
SyncStatus.NOTIFYING.ordinal(),
pending.stream().map(i ->
i.id).collect(Collectors.toSet()),
epoch);
@@ -1725,8 +1733,9 @@ public class AccordKeyspace
public static EpochDiskState markLocalSyncAck(Node.Id node, long epoch,
EpochDiskState diskState)
{
diskState = maybeUpdateMaxEpoch(diskState, epoch);
- String cql = "UPDATE %s.%s SET pending_sync_notify =
pending_sync_notify - ? WHERE epoch = ?";
- executeInternal(format(cql, ACCORD_KEYSPACE_NAME, TOPOLOGIES),
+ String cql = "UPDATE " + ACCORD_KEYSPACE_NAME + '.' + TOPOLOGIES + ' '
+
+ "SET pending_sync_notify = pending_sync_notify - ? WHERE
epoch = ?";
+ executeInternal(cql,
Collections.singleton(node.id), epoch);
return diskState;
}
@@ -1734,8 +1743,9 @@ public class AccordKeyspace
public static EpochDiskState setCompletedLocalSync(long epoch,
EpochDiskState diskState)
{
diskState = maybeUpdateMaxEpoch(diskState, epoch);
- String cql = "UPDATE %s.%s SET sync_state = ?, pending_sync_notify =
{} WHERE epoch = ?";
- executeInternal(format(cql, ACCORD_KEYSPACE_NAME, TOPOLOGIES),
+ String cql = "UPDATE " + ACCORD_KEYSPACE_NAME + '.' + TOPOLOGIES + ' '
+
+ "SET sync_state = ?, pending_sync_notify = {} WHERE epoch
= ?";
+ executeInternal(cql,
SyncStatus.COMPLETED.ordinal(),
epoch);
return diskState;
@@ -1748,8 +1758,9 @@ public class AccordKeyspace
long delete = diskState.minEpoch;
diskState = diskState.withNewMinEpoch(delete + 1);
saveEpochDiskState(diskState);
- String cql = "DELETE FROM %s.%s WHERE epoch = ?";
- executeInternal(format(cql, ACCORD_KEYSPACE_NAME, TOPOLOGIES),
delete);
+ String cql = "DELETE FROM " + ACCORD_KEYSPACE_NAME + '.' +
TOPOLOGIES + ' ' +
+ "WHERE epoch = ?";
+ executeInternal(cql, delete);
}
return diskState;
}
@@ -1762,7 +1773,8 @@ public class AccordKeyspace
@VisibleForTesting
public static void loadEpoch(long epoch, TopologyLoadConsumer consumer)
throws IOException
{
- String cql = format("SELECT * FROM %s.%s WHERE epoch=?",
ACCORD_KEYSPACE_NAME, TOPOLOGIES);
+ String cql = "SELECT * FROM " + ACCORD_KEYSPACE_NAME + '.' +
TOPOLOGIES + ' ' +
+ "WHERE epoch=?";
UntypedResultSet result = executeInternal(cql, epoch);
checkState(!result.isEmpty(), "Nothing found for epoch %d", epoch);
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
index 7346a6eebf..160813d722 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
@@ -347,7 +347,7 @@ public class AccordObjectSizes
return size;
Command.Committed committed = command.asCommitted();
- size += WaitingOnSerializer.serializedSize(committed.waitingOn);
+ size += WaitingOnSerializer.serializedSize(command.txnId(),
committed.waitingOn);
return size;
}
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index d3954a208a..36f001162d 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -296,7 +296,7 @@ public class AccordService implements IAccordService,
Shutdownable
public static long uniqueNow()
{
- // TODO (correctness, now): This is not unique it's just
currentTimeMillis as microseconds
+ // TODO (now, correctness): This is not unique it's just
currentTimeMillis as microseconds
return
TimeUnit.MILLISECONDS.toMicros(Clock.Global.currentTimeMillis());
}
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java
b/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java
index 070fcfa0e6..e506bbf85c 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java
@@ -223,8 +223,7 @@ public class CheckStatusSerializers
Writes writes =
CommandSerializers.nullableWrites.deserialize(in, version);
Result result = null;
- if (maxKnowledgeStatus == SaveStatus.PreApplied ||
maxKnowledgeStatus == SaveStatus.Applied
- || maxKnowledgeStatus == SaveStatus.TruncatedApply ||
maxKnowledgeStatus == SaveStatus.TruncatedApplyWithOutcome ||
maxKnowledgeStatus == SaveStatus.TruncatedApplyWithDeps)
+ if (maxKnowledgeStatus.known.outcome.isOrWasApply())
result = CommandSerializers.APPLIED;
return createOk(map, maxKnowledgeStatus, maxStatus,
maxPromised, maxAcceptedOrCommitted, acceptedOrCommitted, executeAt,
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
index fbc3aeb22f..fe16d3033e 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.service.accord.serializers;
import java.io.IOException;
+import java.nio.ByteBuffer;
import com.google.common.base.Preconditions;
@@ -99,6 +100,13 @@ public class CommandSerializers
TopologySerializers.nodeId.serialize(ts.node, out, version);
}
+ public void serialize(T ts, DataOutputPlus out) throws IOException
+ {
+ out.writeLong(ts.msb);
+ out.writeLong(ts.lsb);
+ TopologySerializers.NodeIdSerializer.serialize(ts.node, out);
+ }
+
public <V> int serialize(T ts, V dst, ValueAccessor<V> accessor, int
offset)
{
int position = offset;
@@ -110,6 +118,13 @@ public class CommandSerializers
return size;
}
+ public void serialize(T ts, ByteBuffer out)
+ {
+ out.putLong(ts.msb);
+ out.putLong(ts.lsb);
+ TopologySerializers.nodeId.serialize(ts.node, out);
+ }
+
@Override
public T deserialize(DataInputPlus in, int version) throws IOException
{
@@ -118,6 +133,13 @@ public class CommandSerializers
TopologySerializers.nodeId.deserialize(in,
version));
}
+ public T deserialize(DataInputPlus in) throws IOException
+ {
+ return factory.create(in.readLong(),
+ in.readLong(),
+
TopologySerializers.NodeIdSerializer.deserialize(in));
+ }
+
public <V> T deserialize(V src, ValueAccessor<V> accessor, int offset)
{
long msb = accessor.getLong(src, offset);
@@ -128,6 +150,16 @@ public class CommandSerializers
return factory.create(msb, lsb, node);
}
+ public T deserialize(ByteBuffer buffer, int position)
+ {
+ long msb = buffer.getLong(position);
+ position += TypeSizes.LONG_SIZE;
+ long lsb = buffer.getLong(position);
+ position += TypeSizes.LONG_SIZE;
+ Node.Id node = TopologySerializers.nodeId.deserialize(buffer,
position);
+ return factory.create(msb, lsb, node);
+ }
+
@Override
public long serializedSize(T ts, int version)
{
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/FetchSerializers.java
b/src/java/org/apache/cassandra/service/accord/serializers/FetchSerializers.java
index 370f88b73d..4512776154 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/FetchSerializers.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/FetchSerializers.java
@@ -178,17 +178,8 @@ public class FetchSerializers
Writes writes = CommandSerializers.nullableWrites.deserialize(in,
version);
Result result = null;
- switch (maxSaveStatus)
- {
- case PreApplied:
- case Applying:
- case Applied:
- case TruncatedApply:
- case TruncatedApplyWithOutcome:
- case TruncatedApplyWithDeps:
- result = CommandSerializers.APPLIED;
- break;
- }
+ if (achieved.outcome.isOrWasApply())
+ result = CommandSerializers.APPLIED;
return Propagate.SerializerSupport.create(txnId,
route,
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java
b/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java
index c5c2f9a382..4693c03c5c 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.service.accord.serializers;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.Set;
@@ -46,7 +47,7 @@ public class TopologySerializers
{
private NodeIdSerializer() {}
- private static void serialize(Node.Id id, DataOutputPlus out) throws
IOException
+ public static void serialize(Node.Id id, DataOutputPlus out) throws
IOException
{
out.writeInt(id.id);
}
@@ -68,7 +69,12 @@ public class TopologySerializers
return accessor.putInt(dst, offset, id.id);
}
- private static Node.Id deserialize(DataInputPlus in) throws IOException
+ public void serialize(Node.Id id, ByteBuffer out)
+ {
+ out.putInt(id.id);
+ }
+
+ public static Node.Id deserialize(DataInputPlus in) throws IOException
{
return new Node.Id(in.readInt());
}
@@ -90,6 +96,11 @@ public class TopologySerializers
return new Node.Id(accessor.getInt(src, offset));
}
+ public Node.Id deserialize(ByteBuffer src, int position)
+ {
+ return new Node.Id(src.getInt(position));
+ }
+
public int serializedSize()
{
return TypeSizes.INT_SIZE; // id.id
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/WaitingOnSerializer.java
b/src/java/org/apache/cassandra/service/accord/serializers/WaitingOnSerializer.java
index 3efb9e2c6c..6c22d28440 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/WaitingOnSerializer.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/WaitingOnSerializer.java
@@ -21,9 +21,11 @@ package org.apache.cassandra.service.accord.serializers;
import java.io.IOException;
import java.nio.ByteBuffer;
+import accord.local.Command;
import accord.local.Command.WaitingOn;
import accord.primitives.Keys;
import accord.primitives.Routable;
+import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import accord.utils.ImmutableBitSet;
import accord.utils.Invariants;
@@ -32,14 +34,18 @@ import accord.utils.SortedArrays.SortedArrayList;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.utils.vint.VIntCoding;
public class WaitingOnSerializer
{
public static void serialize(TxnId txnId, WaitingOn waitingOn,
DataOutputPlus out) throws IOException
{
- out.writeUnsignedVInt32(waitingOn.keys.size());
- out.writeUnsignedVInt32(waitingOn.txnIds.size());
+ if (txnId.kind().awaitsOnlyDeps())
+ {
+ Timestamp executeAtLeast = waitingOn.executeAtLeast();
+ out.writeBoolean(executeAtLeast != null);
+ if (executeAtLeast != null)
+ CommandSerializers.timestamp.serialize(executeAtLeast, out);
+ }
int keyCount = waitingOn.keys.size();
int txnIdCount = waitingOn.txnIds.size();
int waitingOnLength = (txnIdCount + keyCount + 63) / 64;
@@ -53,8 +59,12 @@ public class WaitingOnSerializer
public static WaitingOn deserialize(TxnId txnId, Keys keys,
SortedArrayList<TxnId> txnIds, DataInputPlus in) throws IOException
{
- int a = in.readUnsignedVInt32();
- int b = in.readUnsignedVInt32();
+ Timestamp executeAtLeast = null;
+ if (txnId.kind().awaitsOnlyDeps())
+ {
+ if (in.readBoolean())
+ executeAtLeast = CommandSerializers.timestamp.deserialize(in);
+ }
int waitingOnLength = (txnIds.size() + keys.size() + 63) / 64;
ImmutableBitSet waitingOn = deserialize(waitingOnLength, in);
ImmutableBitSet appliedOrInvalidated = null;
@@ -63,17 +73,26 @@ public class WaitingOnSerializer
int appliedOrInvalidatedLength = (txnIds.size() + 63) / 64;
appliedOrInvalidated = deserialize(appliedOrInvalidatedLength, in);
}
- return new WaitingOn(keys, txnIds, waitingOn, appliedOrInvalidated);
+
+ WaitingOn result = new WaitingOn(keys, txnIds, waitingOn,
appliedOrInvalidated);
+ if (executeAtLeast != null)
+ result = new Command.WaitingOnWithExecuteAt(result,
executeAtLeast);
+ return result;
}
- public static long serializedSize(WaitingOn waitingOn)
+ public static long serializedSize(TxnId txnId, WaitingOn waitingOn)
{
int keyCount = waitingOn.keys.size();
int txnIdCount = waitingOn.txnIds.size();
int waitingOnLength = (txnIdCount + keyCount + 63) / 64;
long size = serializedSize(waitingOnLength, waitingOn.waitingOn);
- size += TypeSizes.sizeofUnsignedVInt(keyCount);
- size += TypeSizes.sizeofUnsignedVInt(txnIdCount);
+ if (txnId.kind().awaitsOnlyDeps())
+ {
+ Timestamp executeAtLeast = waitingOn.executeAtLeast();
+ size += 1;
+ if (executeAtLeast != null)
+ size += CommandSerializers.timestamp.serializedSize();
+ }
if (waitingOn.appliedOrInvalidated == null)
return size;
@@ -113,10 +132,24 @@ public class WaitingOnSerializer
if (txnId.domain() == Routable.Domain.Range)
appliedOrInvalidatedLength = (txnIdCount + 63) / 64;
- ByteBuffer out =
ByteBuffer.allocate(TypeSizes.sizeofUnsignedVInt(keyCount) +
TypeSizes.sizeofUnsignedVInt(txnIdCount)
- + TypeSizes.LONG_SIZE *
(waitingOnLength + appliedOrInvalidatedLength));
- VIntCoding.writeUnsignedVInt32(keyCount, out);
- VIntCoding.writeUnsignedVInt32(txnIdCount, out);
+ int size = TypeSizes.LONG_SIZE * (waitingOnLength +
appliedOrInvalidatedLength);
+ Timestamp executeAtLeast = null;
+ if (txnId.kind().awaitsOnlyDeps())
+ {
+ executeAtLeast = waitingOn.executeAtLeast();
+ size += 1;
+ if (executeAtLeast != null)
+ size += CommandSerializers.timestamp.serializedSize();
+ }
+
+ ByteBuffer out = ByteBuffer.allocate(size);
+ if (txnId.kind().awaitsOnlyDeps())
+ {
+ out.put((byte)(executeAtLeast != null ? 1 : 0));
+ if (executeAtLeast != null)
+ CommandSerializers.timestamp.serialize(executeAtLeast, out);
+ }
+
serialize(waitingOnLength, waitingOn.waitingOn, out);
if (appliedOrInvalidatedLength > 0)
serialize(appliedOrInvalidatedLength,
waitingOn.appliedOrInvalidated, out);
@@ -133,12 +166,18 @@ public class WaitingOnSerializer
public static WaitingOn deserialize(TxnId txnId, Keys keys,
SortedArrayList<TxnId> txnIds, ByteBuffer in) throws IOException
{
- int waitingOnLength = (txnIds.size() + keys.size() + 63) / 64;
int position = in.position();
- int a = VIntCoding.readUnsignedVInt32(in, position);
- position += TypeSizes.sizeofUnsignedVInt(a);
- int b = VIntCoding.readUnsignedVInt32(in, position);
- position += TypeSizes.sizeofUnsignedVInt(a);
+ Timestamp executeAtLeast = null;
+ if (txnId.kind().awaitsOnlyDeps())
+ {
+ if (in.get(position++) != 0)
+ {
+ executeAtLeast = CommandSerializers.timestamp.deserialize(in,
position);
+ position += CommandSerializers.timestamp.serializedSize();
+ }
+ }
+
+ int waitingOnLength = (txnIds.size() + keys.size() + 63) / 64;
ImmutableBitSet waitingOn = deserialize(position, waitingOnLength, in);
ImmutableBitSet appliedOrInvalidated = null;
if (txnId.domain() == Routable.Domain.Range)
@@ -147,7 +186,11 @@ public class WaitingOnSerializer
int appliedOrInvalidatedLength = (txnIds.size() + 63) / 64;
appliedOrInvalidated = deserialize(position,
appliedOrInvalidatedLength, in);
}
- return new WaitingOn(keys, txnIds, waitingOn, appliedOrInvalidated);
+
+ WaitingOn result = new WaitingOn(keys, txnIds, waitingOn,
appliedOrInvalidated);
+ if (executeAtLeast != null)
+ result = new Command.WaitingOnWithExecuteAt(result,
executeAtLeast);
+ return result;
}
private static ImmutableBitSet deserialize(int position, int length,
ByteBuffer in)
diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
index 7965decddb..009930955b 100644
--- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
+++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
@@ -1000,7 +1000,7 @@ public class ByteBufferUtil
public static void writeLeastSignificantBytes(long register, int bytes,
ByteBuffer out)
{
- writeMostSignificantBytesSlow(register << ((8 - bytes)*8), bytes, out);
+ writeMostSignificantBytes(register << ((8 - bytes)*8), bytes, out);
}
public static void writeMostSignificantBytes(long register, int bytes,
ByteBuffer out)
diff --git a/test/conf/logback-simulator.xml b/test/conf/logback-simulator.xml
index a4c24aab8d..fe823383ee 100644
--- a/test/conf/logback-simulator.xml
+++ b/test/conf/logback-simulator.xml
@@ -16,7 +16,7 @@
~ limitations under the License.
-->
-<configuration debug="false" scan="true" scanPeriod="60 seconds">
+<configuration debug="true" scan="true" scanPeriod="60 seconds">
<define name="run_start"
class="org.apache.cassandra.simulator.logging.RunStartDefiner" scope="system"/>
<define name="run_seed"
class="org.apache.cassandra.simulator.logging.SeedDefiner" scope="system"/>
<define name="cluster_id"
class="org.apache.cassandra.distributed.impl.ClusterIDDefiner"/>
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
index ce0e95db03..fdc276de0b 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
@@ -36,6 +36,7 @@ import com.google.common.collect.ImmutableSet;
import org.junit.After;
import org.junit.BeforeClass;
+import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.cql3.Duration;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.BooleanType;
@@ -62,6 +63,8 @@ import
org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.shared.DistributedTestBase;
import org.apache.cassandra.service.accord.AccordStateCache;
+import static java.lang.System.currentTimeMillis;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static
org.apache.cassandra.config.CassandraRelevantProperties.JOIN_RING;
import static
org.apache.cassandra.config.CassandraRelevantProperties.RESET_BOOTSTRAP_PROGRESS;
import static
org.apache.cassandra.config.CassandraRelevantProperties.SKIP_GC_INSPECTOR;
@@ -81,6 +84,7 @@ public class TestBaseImpl extends DistributedTestBase
@BeforeClass
public static void beforeClass() throws Throwable
{
+
CassandraRelevantProperties.SIMULATOR_STARTED.setString(Long.toString(MILLISECONDS.toSeconds(currentTimeMillis())));
ICluster.setup();
SKIP_GC_INSPECTOR.setBoolean(true);
AccordStateCache.validateLoadOnEvict(true);
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
index 42e7fbf34a..8e663ab966 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
@@ -19,9 +19,15 @@
package org.apache.cassandra.distributed.test.accord;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
import java.util.Date;
+import java.util.List;
+import java.util.Map;
import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.util.concurrent.RateLimiter;
import org.junit.BeforeClass;
@@ -30,10 +36,16 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.IMessage;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.net.Verb;
import org.apache.cassandra.utils.EstimatedHistogram;
+import static java.lang.System.currentTimeMillis;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
public class AccordLoadTest extends AccordTestBase
@@ -43,20 +55,35 @@ public class AccordLoadTest extends AccordTestBase
@BeforeClass
public static void setUp() throws IOException
{
- AccordTestBase.setupCluster(builder -> builder.withConfig(config ->
config.set("lwt_strategy", "accord").set("non_serial_write_strategy",
"accord")), 2);
+
CassandraRelevantProperties.SIMULATOR_STARTED.setString(Long.toString(MILLISECONDS.toSeconds(currentTimeMillis())));
+ AccordTestBase.setupCluster(builder -> builder, 2);
}
@Ignore
@Test
public void testLoad() throws Exception
{
- test("CREATE TABLE " + qualifiedTableName + " (k int, v int, PRIMARY
KEY(k))",
+ test("CREATE TABLE " + qualifiedTableName + " (k int, v int, PRIMARY
KEY(k)) WITH transactional_mode = 'full'",
cluster -> {
+
+ final ConcurrentHashMap<Verb, AtomicInteger> verbs = new
ConcurrentHashMap<>();
+ cluster.filters().outbound().messagesMatching(new
IMessageFilters.Matcher()
+ {
+ @Override
+ public boolean matches(int i, int i1, IMessage iMessage)
+ {
+ verbs.computeIfAbsent(Verb.fromId(iMessage.verb()),
ignore -> new AtomicInteger()).incrementAndGet();
+ return false;
+ }
+ }).drop();
ICoordinator coordinator = cluster.coordinator(1);
+ final int repairInterval = 3000;
final int batchSize = 1000;
final int concurrency = 100;
final int ratePerSecond = 1000;
final int keyCount = 10;
+ final float readChance = 0.33f;
+ long nextRepairAt = repairInterval;
for (int i = 1; i <= keyCount; i++)
coordinator.execute("INSERT INTO " + qualifiedTableName +
" (k, v) VALUES (0, 0) USING TIMESTAMP 0;", ConsistencyLevel.ALL, i);
@@ -75,14 +102,66 @@ public class AccordLoadTest extends AccordTestBase
inFlight.acquire();
rateLimiter.acquire();
long commandStart = System.nanoTime();
- coordinator.executeWithResult((success, fail) -> {
- inFlight.release();
- if (fail == null)
histogram.add(NANOSECONDS.toMicros(System.nanoTime() - commandStart));
-// else exceptions.add(fail);
- }, "UPDATE " + qualifiedTableName + " SET v += 1
WHERE k = ? IF EXISTS;", ConsistencyLevel.SERIAL, ConsistencyLevel.QUORUM,
random.nextInt(keyCount));
+ if (random.nextFloat() < readChance)
+ {
+ coordinator.executeWithResult((success, fail) -> {
+ inFlight.release();
+ if (fail == null)
histogram.add(NANOSECONDS.toMicros(System.nanoTime() - commandStart));
+ // else
exceptions.add(fail);
+ }, "SELECT * FROM " + qualifiedTableName + "
WHERE k = ?;", ConsistencyLevel.SERIAL, random.nextInt(keyCount));
+ }
+ else
+ {
+ coordinator.executeWithResult((success, fail) -> {
+ inFlight.release();
+ if (fail == null)
histogram.add(NANOSECONDS.toMicros(System.nanoTime() - commandStart));
+ // else exceptions.add(fail);
+ }, "UPDATE " + qualifiedTableName + " SET v += 1
WHERE k = ? IF EXISTS;", ConsistencyLevel.SERIAL, ConsistencyLevel.QUORUM,
random.nextInt(keyCount));
+ }
+ }
+
+ if ((nextRepairAt -= batchSize) <= 0)
+ {
+ nextRepairAt += repairInterval;
+ System.out.println("repairing...");
+ cluster.coordinator(1).instance().nodetool("repair",
qualifiedTableName);
+ }
+
+ final Date date = new Date();
+ System.out.printf("%tT rate: %.2f/s\n", date,
(((float)batchSize * 1000) / NANOSECONDS.toMillis(System.nanoTime() -
batchStart)));
+ System.out.printf("%tT percentiles: %d %d %d %d\n", date,
histogram.percentile(.25)/1000, histogram.percentile(.5)/1000,
histogram.percentile(.75)/1000, histogram.percentile(1)/1000);
+
+ class VerbCount
+ {
+ final Verb verb;
+ final int count;
+
+ VerbCount(Verb verb, int count)
+ {
+ this.verb = verb;
+ this.count = count;
+ }
+ }
+ List<VerbCount> verbCounts = new ArrayList<>();
+ for (Map.Entry<Verb, AtomicInteger> e : verbs.entrySet())
+ {
+ int count = e.getValue().getAndSet(0);
+ if (count != 0) verbCounts.add(new
VerbCount(e.getKey(), count));
+ }
+ verbCounts.sort(Comparator.comparingInt((VerbCount v) -> v.count).reversed());
+
+ StringBuilder verbSummary = new StringBuilder();
+ for (VerbCount vs : verbCounts)
+ {
+ {
+ if (verbSummary.length() > 0)
+ verbSummary.append(", ");
+ verbSummary.append(vs.verb);
+ verbSummary.append(": ");
+ verbSummary.append(vs.count);
+ }
}
- System.out.printf("%tT rate: %.2f/s\n", new Date(),
(((float)batchSize * 1000) / NANOSECONDS.toMillis(System.nanoTime() -
batchStart)));
- System.out.printf("%tT percentiles: %d %d %d %d\n", new
Date(), histogram.percentile(.25)/1000, histogram.percentile(.5)/1000,
histogram.percentile(.75)/1000, histogram.percentile(1)/1000);
+ System.out.printf("%tT verbs: %s\n", date, verbSummary);
}
}
);
diff --git
a/test/simulator/main/org/apache/cassandra/simulator/ClusterSimulation.java
b/test/simulator/main/org/apache/cassandra/simulator/ClusterSimulation.java
index 60a71b4b3b..8ff2aefe74 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/ClusterSimulation.java
+++ b/test/simulator/main/org/apache/cassandra/simulator/ClusterSimulation.java
@@ -60,6 +60,7 @@ import org.apache.cassandra.io.compress.LZ4Compressor;
import org.apache.cassandra.io.filesystem.ListenableFileSystem;
import org.apache.cassandra.io.util.FileSystems;
import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.service.consensus.TransactionalMode;
import org.apache.cassandra.service.paxos.BallotGenerator;
import org.apache.cassandra.service.paxos.PaxosPrepare;
import org.apache.cassandra.simulator.RandomSource.Choices;
@@ -198,7 +199,7 @@ public class ClusterSimulation<S extends Simulation>
implements AutoCloseable
protected HeapPool.Logged.Listener memoryListener;
protected SimulatedTime.Listener timeListener = (i1, i2) -> {};
protected LongConsumer onThreadLocalRandomCheck;
- protected String lwtStrategy = "migration";
+ protected String transactionalMode = "full";
public Builder<S> failures(Failures failures)
{
@@ -575,12 +576,17 @@ public class ClusterSimulation<S extends Simulation>
implements AutoCloseable
return this;
}
- public Builder<S> lwtStrategy(String strategy)
+ public Builder<S> transactionalMode(String mode)
{
- this.lwtStrategy = strategy;
+ this.transactionalMode = mode;
return this;
}
+ public TransactionalMode transactionalMode()
+ {
+ return TransactionalMode.fromString(transactionalMode);
+ }
+
public abstract ClusterSimulation<S> create(long seed) throws
IOException;
}
@@ -774,7 +780,6 @@ public class ClusterSimulation<S extends Simulation>
implements AutoCloseable
.set("use_deterministic_table_id", true)
.set("disk_access_mode", "standard")
.set("failure_detector",
SimulatedFailureDetector.Instance.class.getName())
- .set("lwt_strategy", builder.lwtStrategy)
.set("commitlog_compression", new
ParameterizedClass(LZ4Compressor.class.getName(), emptyMap()));
;
configUpdater.accept(threadAllocator.update(config));
@@ -866,6 +871,7 @@ public class ClusterSimulation<S extends Simulation>
implements AutoCloseable
simulated.register((SimulatedFutureActionScheduler)
futureActionScheduler);
scheduler = builder.schedulerFactory.create(random);
+ // TODO (required): we aren't passing paxos variant change parameter
anymore
options = new ClusterActions.Options(builder.topologyChangeLimit,
Choices.uniform(KindOfSequence.values()).choose(random).period(builder.topologyChangeIntervalNanos,
random),
Choices.random(random,
builder.topologyChanges),
builder.consensusChangeLimit,
Choices.uniform(KindOfSequence.values()).choose(random).period(builder.consensusChangeIntervalNanos,
random),
diff --git
a/test/simulator/main/org/apache/cassandra/simulator/SimulationRunner.java
b/test/simulator/main/org/apache/cassandra/simulator/SimulationRunner.java
index 05f3588911..e5c875cb6e 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/SimulationRunner.java
+++ b/test/simulator/main/org/apache/cassandra/simulator/SimulationRunner.java
@@ -344,7 +344,7 @@ public class SimulationRunner
builder.debug(debugLevels, debugPrimaryKeys);
}
- Optional.ofNullable(lwtStrategy).ifPresent(builder::lwtStrategy);
+
Optional.ofNullable(lwtStrategy).ifPresent(builder::transactionalMode);
}
public void run(B builder) throws IOException
diff --git
a/test/simulator/main/org/apache/cassandra/simulator/paxos/AccordClusterSimulation.java
b/test/simulator/main/org/apache/cassandra/simulator/paxos/AccordClusterSimulation.java
index a75a1ef461..ee8fd0ca49 100644
---
a/test/simulator/main/org/apache/cassandra/simulator/paxos/AccordClusterSimulation.java
+++
b/test/simulator/main/org/apache/cassandra/simulator/paxos/AccordClusterSimulation.java
@@ -71,11 +71,11 @@ class AccordClusterSimulation extends
ClusterSimulation<PaxosSimulation> impleme
int[] primaryKeys = primaryKeys(seed,
builder.primaryKeyCount());
KindOfSequence.Period jitter =
RandomSource.Choices.uniform(KindOfSequence.values()).choose(random)
.period(builder.schedulerJitterNanos(), random);
- return new PairOfSequencesAccordSimulation(simulated,
cluster, options,
-
builder.readChance().select(random), builder.concurrency(),
builder.primaryKeySeconds(), builder.withinKeyConcurrency(),
- SERIAL,
schedulers, builder.debug(), seed,
- primaryKeys,
builder.secondsToSimulate() >= 0 ? SECONDS.toNanos(builder.secondsToSimulate())
: -1,
- () ->
jitter.get(random));
+ return new PairOfSequencesAccordSimulation(simulated,
cluster, options, builder.transactionalMode(),
+
builder.readChance().select(random), builder.concurrency(),
builder.primaryKeySeconds(), builder.withinKeyConcurrency(),
+ SERIAL,
schedulers, builder.debug(), seed,
+ primaryKeys,
builder.secondsToSimulate() >= 0 ? SECONDS.toNanos(builder.secondsToSimulate())
: -1,
+ () ->
jitter.get(random));
});
}
diff --git
a/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesAccordSimulation.java
b/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesAccordSimulation.java
index 8d6c8a0dcc..7965c29fc1 100644
---
a/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesAccordSimulation.java
+++
b/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesAccordSimulation.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.distributed.api.SimpleQueryResult;
import org.apache.cassandra.distributed.impl.Query;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.consensus.TransactionalMode;
import org.apache.cassandra.simulator.Action;
import org.apache.cassandra.simulator.Debug;
import org.apache.cassandra.simulator.RunnableActionScheduler;
@@ -123,10 +124,12 @@ public class PairOfSequencesAccordSimulation extends
AbstractPairOfSequencesPaxo
private final float writeRatio;
private final HistoryValidator validator;
+ private final TransactionalMode transactionalMode;
public PairOfSequencesAccordSimulation(SimulatedSystems simulated,
Cluster cluster,
ClusterActions.Options
clusterOptions,
+ TransactionalMode transactionalMode,
float readRatio,
int concurrency, IntRange
simulateKeyForSeconds, IntRange withinKeyConcurrency,
ConsistencyLevel serialConsistency,
RunnableActionScheduler scheduler, Debug debug,
@@ -139,6 +142,7 @@ public class PairOfSequencesAccordSimulation extends
AbstractPairOfSequencesPaxo
scheduler, debug,
seed, primaryKeys,
runForNanos, jitter);
+ this.transactionalMode = transactionalMode;
this.writeRatio = 1F - readRatio;
HistoryValidator validator = new
StrictSerializabilityValidator(primaryKeys);
if
(CassandraRelevantProperties.TEST_HISTORY_VALIDATOR_LOGGING_ENABLED.getBoolean())
@@ -149,7 +153,7 @@ public class PairOfSequencesAccordSimulation extends
AbstractPairOfSequencesPaxo
@Override
protected String createTableStmt()
{
- return "CREATE TABLE " + KEYSPACE + ".tbl (pk int, count int, seq
text, PRIMARY KEY (pk))";
+ return String.format("CREATE TABLE " + KEYSPACE + ".tbl (pk int, count
int, seq text, PRIMARY KEY (pk)) WITH transactional_mode = '%s'",
transactionalMode);
}
@Override
diff --git
a/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosClusterSimulation.java
b/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosClusterSimulation.java
index 6e7d058bea..c54ba1c26d 100644
---
a/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosClusterSimulation.java
+++
b/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosClusterSimulation.java
@@ -69,11 +69,6 @@ class PaxosClusterSimulation extends
ClusterSimulation<PaxosSimulation> implemen
random.reset(seed);
return new PaxosClusterSimulation(random, seed, uniqueNum, this);
}
-
- public String lwtStrategy()
- {
- return lwtStrategy;
- }
}
PaxosClusterSimulation(RandomSource random, long seed, int uniqueNum,
Builder builder) throws IOException
diff --git
a/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulationRunner.java
b/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulationRunner.java
index 71734c6e68..095c79769a 100644
---
a/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulationRunner.java
+++
b/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulationRunner.java
@@ -69,7 +69,7 @@ public class PaxosSimulationRunner extends SimulationRunner
@Override
protected void run( long seed, PaxosClusterSimulation.Builder builder)
throws IOException
{
- if (Objects.equals(builder.lwtStrategy(), "accord"))
+ if (Objects.equals(builder.transactionalMode(), "accord"))
{
// Apply handicaps
builder.dcs(new IntRange(1, 1));
diff --git a/test/unit/org/apache/cassandra/journal/SyncedOffsetsTest.java
b/test/unit/org/apache/cassandra/journal/SyncedOffsetsTest.java
index 5b83ee88f1..b5df2b6b22 100644
--- a/test/unit/org/apache/cassandra/journal/SyncedOffsetsTest.java
+++ b/test/unit/org/apache/cassandra/journal/SyncedOffsetsTest.java
@@ -57,9 +57,9 @@ public class SyncedOffsetsTest
Descriptor descriptor = Descriptor.create(directory,
System.currentTimeMillis(), 1);
- SyncedOffsets active = SyncedOffsets.active(descriptor, syncOnMark);
+ SyncedOffsets active = SyncedOffsets.active(descriptor);
for (int i = 0; i < n; i++)
- active.mark(i);
+ active.mark(i, syncOnMark);
assertEquals(n - 1, active.syncedOffset());
active.close();
diff --git
a/test/unit/org/apache/cassandra/service/accord/serializers/WaitingOnSerializerTest.java
b/test/unit/org/apache/cassandra/service/accord/serializers/WaitingOnSerializerTest.java
index 3df7d87e0d..07b626346a 100644
---
a/test/unit/org/apache/cassandra/service/accord/serializers/WaitingOnSerializerTest.java
+++
b/test/unit/org/apache/cassandra/service/accord/serializers/WaitingOnSerializerTest.java
@@ -57,7 +57,7 @@ public class WaitingOnSerializerTest
TxnId txnId = TxnId.NONE;
if (waitingOn.appliedOrInvalidated != null) txnId = new
TxnId(txnId.epoch(), txnId.hlc(), txnId.kind(), Routable.Domain.Range,
txnId.node);
buffer.clear();
- long expectedSize = WaitingOnSerializer.serializedSize(waitingOn);
+ long expectedSize = WaitingOnSerializer.serializedSize(txnId,
waitingOn);
WaitingOnSerializer.serialize(txnId, waitingOn, buffer);
Assertions.assertThat(buffer.getLength()).isEqualTo(expectedSize);
Command.WaitingOn read = WaitingOnSerializer.deserialize(txnId,
waitingOn.keys, waitingOn.txnIds, new
DataInputBuffer(buffer.unsafeGetBufferAndFlip(), false));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]