This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new 26ccd43d54 Follow-up to: Do not contact faulty replicas, and support
reporting slow replies for preaccept/read. Do not wait for stale or left nodes
for durability.
26ccd43d54 is described below
commit 26ccd43d540912c75f22d1e762dc2ebc6f8f2b46
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Wed Oct 9 12:51:18 2024 +0100
Follow-up to: Do not contact faulty replicas, and support reporting slow
replies for preaccept/read. Do not wait for stale or left nodes for durability.
---
modules/accord | 2 +-
.../org/apache/cassandra/config/AccordSpec.java | 4 +-
.../apache/cassandra/journal/EntrySerializer.java | 1 +
.../apache/cassandra/journal/InMemoryIndex.java | 13 +-
src/java/org/apache/cassandra/journal/Segment.java | 1 +
.../apache/cassandra/journal/StaticSegment.java | 7 +-
src/java/org/apache/cassandra/net/Verb.java | 4 +-
.../service/accord/AccordMessageSink.java | 51 ++--
.../service/accord/AccordVerbHandler.java | 2 +-
.../service/accord/api/AccordTopologySorter.java | 2 +-
.../accord/api/CompositeTopologySorter.java | 5 +
.../accord/serializers/ReadDataSerializers.java | 2 +-
.../distributed/test/accord/AccordLoadTest.java | 301 ++++++++++++---------
13 files changed, 214 insertions(+), 181 deletions(-)
diff --git a/modules/accord b/modules/accord
index d24d8f5d86..291cbe70ad 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit d24d8f5d866783b71205631944cfecf9a63d4f0c
+Subproject commit 291cbe70ad82b0d5f875101f541d9f841912802f
diff --git a/src/java/org/apache/cassandra/config/AccordSpec.java b/src/java/org/apache/cassandra/config/AccordSpec.java
index d5a53c2636..451bfeaa54 100644
--- a/src/java/org/apache/cassandra/config/AccordSpec.java
+++ b/src/java/org/apache/cassandra/config/AccordSpec.java
@@ -40,8 +40,8 @@ public class AccordSpec
// TODO (expected): we should be able to support lower recover delays, at
least for txns
public volatile DurationSpec.IntMillisecondsBound recover_delay = new
DurationSpec.IntMillisecondsBound(5000);
public volatile DurationSpec.IntMillisecondsBound range_sync_recover_delay
= new DurationSpec.IntMillisecondsBound(10000);
- public String slowPreAccept = "100ms <= p50*2 <= 1000ms";
- public String slowRead = "100ms <= p50*2 <= 1000ms";
+ public String slowPreAccept = "30ms <= p50*2 <= 100ms";
+ public String slowRead = "30ms <= p50*2 <= 100ms";
public long recoveryDelayFor(TxnId txnId, TimeUnit unit)
{
diff --git a/src/java/org/apache/cassandra/journal/EntrySerializer.java b/src/java/org/apache/cassandra/journal/EntrySerializer.java
index 2a707e7d73..48ef59a6e4 100644
--- a/src/java/org/apache/cassandra/journal/EntrySerializer.java
+++ b/src/java/org/apache/cassandra/journal/EntrySerializer.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import java.util.Set;
import java.util.zip.CRC32;
+import accord.utils.Invariants;
import org.agrona.collections.IntHashSet;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.util.DataInputBuffer;
diff --git a/src/java/org/apache/cassandra/journal/InMemoryIndex.java b/src/java/org/apache/cassandra/journal/InMemoryIndex.java
index 1f0da7fd28..8141f338a9 100644
--- a/src/java/org/apache/cassandra/journal/InMemoryIndex.java
+++ b/src/java/org/apache/cassandra/journal/InMemoryIndex.java
@@ -25,8 +25,10 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
+import accord.utils.Invariants;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.io.util.FileOutputStreamPlus;
+import org.apache.cassandra.journal.StaticSegment.SequentialReader;
/**
* An index for a segment that's still being updated by journal writers
concurrently.
@@ -138,17 +140,10 @@ final class InMemoryIndex<K> extends Index<K>
{
InMemoryIndex<K> index = new InMemoryIndex<>(keySupport, new
TreeMap<>(keySupport));
- try (StaticSegment.SequentialReader<K> reader =
StaticSegment.sequentialReader(descriptor, keySupport, fsyncedLimit))
+ try (SequentialReader<K> reader =
StaticSegment.sequentialReader(descriptor, keySupport, fsyncedLimit))
{
- int last = -1;
while (reader.advance())
- {
- int current = reader.offset();
- if (last >= 0)
- index.update(reader.key(), last, current);
- last = current;
- }
-
+ index.update(reader.key(), reader.offset,
reader.buffer.position() - reader.offset);
}
return index;
}
diff --git a/src/java/org/apache/cassandra/journal/Segment.java b/src/java/org/apache/cassandra/journal/Segment.java
index 7f955669cd..77f7c68fea 100644
--- a/src/java/org/apache/cassandra/journal/Segment.java
+++ b/src/java/org/apache/cassandra/journal/Segment.java
@@ -93,6 +93,7 @@ public abstract class Segment<K, V> implements Closeable,
RefCounted<Segment<K,
int size = Index.readSize(all[i]);
Invariants.checkState(offset < prevOffset);
Invariants.checkState(read(offset, size, into), "Read should
always return true");
+ Invariants.checkState(id.equals(into.key), "Index for %s read
incorrect key: expected %s but read %s", descriptor, id, into.key);
onEntry.accept(descriptor.timestamp, offset, into.key, into.value,
into.hosts, into.userVersion);
}
}
diff --git a/src/java/org/apache/cassandra/journal/StaticSegment.java b/src/java/org/apache/cassandra/journal/StaticSegment.java
index 3a8c03bb1a..bf46ca8ecd 100644
--- a/src/java/org/apache/cassandra/journal/StaticSegment.java
+++ b/src/java/org/apache/cassandra/journal/StaticSegment.java
@@ -28,6 +28,7 @@ import java.util.Collection;
import java.util.List;
import java.util.concurrent.locks.LockSupport;
+import accord.utils.Invariants;
import org.agrona.collections.IntHashSet;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.File;
@@ -260,7 +261,11 @@ public final class StaticSegment<K, V> extends Segment<K,
V>
ByteBuffer duplicate =
buffer.duplicate().position(offset).limit(offset + size);
try (DataInputBuffer in = new DataInputBuffer(duplicate, false))
{
- return EntrySerializer.tryRead(into, keySupport, duplicate, in,
syncedOffsets.syncedOffset(), descriptor.userVersion);
+ if (!EntrySerializer.tryRead(into, keySupport, duplicate, in,
syncedOffsets.syncedOffset(), descriptor.userVersion))
+ return false;
+
+ Invariants.checkState(in.available() == 0);
+ return true;
}
catch (IOException e)
{
diff --git a/src/java/org/apache/cassandra/net/Verb.java b/src/java/org/apache/cassandra/net/Verb.java
index 6109a502a8..732f0e5df0 100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@ -313,8 +313,8 @@ public enum Verb
ACCORD_ACCEPT_RSP (122, P2, writeTimeout, IMMEDIATE,
() -> AcceptSerializers.reply,
AccordService::responseHandlerOrNoop
),
ACCORD_ACCEPT_REQ (123, P2, writeTimeout, IMMEDIATE,
() -> AcceptSerializers.request,
AccordService::requestHandlerOrNoop, ACCORD_ACCEPT_RSP
),
ACCORD_ACCEPT_INVALIDATE_REQ (124, P2, writeTimeout, IMMEDIATE,
() -> AcceptSerializers.invalidate,
AccordService::requestHandlerOrNoop, ACCORD_ACCEPT_RSP
),
- ACCORD_READ_RSP (125, P2, writeTimeout, IMMEDIATE,
() -> ReadDataSerializers.reply,
AccordService::responseHandlerOrNoop
),
- ACCORD_READ_REQ (126, P2, writeTimeout, IMMEDIATE,
() -> ReadDataSerializers.readData,
AccordService::requestHandlerOrNoop, ACCORD_READ_RSP
),
+ ACCORD_READ_RSP (125, P2, readTimeout, IMMEDIATE,
() -> ReadDataSerializers.reply,
AccordService::responseHandlerOrNoop
),
+ ACCORD_READ_REQ (126, P2, readTimeout, IMMEDIATE,
() -> ReadDataSerializers.readData,
AccordService::requestHandlerOrNoop, ACCORD_READ_RSP
),
ACCORD_COMMIT_REQ (127, P2, writeTimeout, IMMEDIATE,
() -> CommitSerializers.request,
AccordService::requestHandlerOrNoop, ACCORD_READ_RSP
),
ACCORD_COMMIT_INVALIDATE_REQ (128, P2, writeTimeout, IMMEDIATE,
() -> CommitSerializers.invalidate,
AccordService::requestHandlerOrNoop
),
ACCORD_APPLY_RSP (129, P2, writeTimeout, IMMEDIATE,
() -> ApplySerializers.reply,
AccordService::responseHandlerOrNoop
),
diff --git a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
index 22fe8f5ad2..968068481d 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
@@ -259,42 +259,37 @@ public class AccordMessageSink implements MessageSink
@Override
public void send(Node.Id to, Request request, AgentExecutor executor,
Callback callback)
{
- long nowNanos = Clock.Global.nanoTime();
Verb verb = getVerb(request);
Preconditions.checkNotNull(verb, "Verb is null for type %s",
request.type());
- Message<Request> message;
- if (isRangeBarrier(request))
- {
- message = Message.out(verb, request, nowNanos +
DatabaseDescriptor.getAccordRangeBarrierTimeoutNanos());
- }
- else
- {
- message = Message.out(verb, request);
- }
- InetAddressAndPort endpoint = endpointMapper.mappedEndpoint(to);
- logger.trace("Sending {} {} to {}", verb, message.payload, endpoint);
- long expiresAtNanos = message.expiresAtNanos();
+ long nowNanos = Clock.Global.nanoTime();
+ long expiresAtNanos;
+ if (isRangeBarrier(request)) expiresAtNanos = nowNanos +
DatabaseDescriptor.getAccordRangeBarrierTimeoutNanos();
+ else expiresAtNanos = nowNanos + verb.expiresAfterNanos();
+ long delayedAtNanos = Long.MAX_VALUE;
switch (verb)
{
+ case ACCORD_COMMIT_REQ:
+ if (((Commit)request).readData == null)
+ break;
+
case ACCORD_READ_REQ:
- {
- long delayedAtNanos = Long.MAX_VALUE;
- if (slowRead != null && !isRangeBarrier(request))
delayedAtNanos = nowNanos + slowRead.computeWait(1, NANOSECONDS);
- callbacks.register(message.id(), executor, callback, to,
nowNanos, delayedAtNanos, expiresAtNanos, NANOSECONDS);
+ if (slowRead == null || isRangeBarrier(request))
+ break;
+
+ case ACCORD_CHECK_STATUS_REQ:
+ delayedAtNanos = nowNanos + slowRead.computeWait(1,
NANOSECONDS);
break;
- }
+
case ACCORD_PRE_ACCEPT_REQ:
- {
- long delayedAtNanos = Long.MAX_VALUE;
- if (slowPreaccept != null && !isRangeBarrier(request))
delayedAtNanos = slowPreaccept.computeWait(1, NANOSECONDS);
- callbacks.register(message.id(), executor, callback, to,
delayedAtNanos, expiresAtNanos, NANOSECONDS);
- break;
- }
- default:
- {
- callbacks.register(message.id(), executor, callback, to,
expiresAtNanos, NANOSECONDS);
- }
+ if (slowPreaccept == null || isRangeBarrier(request))
+ break;
+ delayedAtNanos = nowNanos + slowPreaccept.computeWait(1,
NANOSECONDS);
}
+
+ Message<Request> message = Message.out(verb, request, expiresAtNanos);
+ InetAddressAndPort endpoint = endpointMapper.mappedEndpoint(to);
+ logger.trace("Sending {} {} to {}", verb, message.payload, endpoint);
+ callbacks.registerAt(message.id(), executor, callback, to, nowNanos,
delayedAtNanos, expiresAtNanos, NANOSECONDS);
messaging.send(message, endpoint);
}
diff --git a/src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java b/src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java
index 34c7b26bd9..59fc056b90 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java
@@ -60,7 +60,7 @@ public class AccordVerbHandler<T extends Request> implements
IVerbHandler<T>
Node.Id fromNodeId = endpointMapper.mappedId(message.from());
long waitForEpoch = request.waitForEpoch();
- if (node.topology().hasEpoch(waitForEpoch))
+ if (node.topology().hasAtLeastEpoch(waitForEpoch))
request.process(node, fromNodeId, message);
else
node.withEpoch(waitForEpoch, (ignored, withEpochFailure) -> {
diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordTopologySorter.java b/src/java/org/apache/cassandra/service/accord/api/AccordTopologySorter.java
index 2185df91cb..658851ebda 100644
--- a/src/java/org/apache/cassandra/service/accord/api/AccordTopologySorter.java
+++ b/src/java/org/apache/cassandra/service/accord/api/AccordTopologySorter.java
@@ -115,7 +115,7 @@ public class AccordTopologySorter implements TopologySorter
VersionedValue event =
epState.getApplicationState(ApplicationState.SEVERITY);
if (event == null)
- return true;
+ return false;
return Double.parseDouble(event.value) == 0.0;
}
diff --git a/src/java/org/apache/cassandra/service/accord/api/CompositeTopologySorter.java b/src/java/org/apache/cassandra/service/accord/api/CompositeTopologySorter.java
index d7f17e02b9..597e4aad86 100644
--- a/src/java/org/apache/cassandra/service/accord/api/CompositeTopologySorter.java
+++ b/src/java/org/apache/cassandra/service/accord/api/CompositeTopologySorter.java
@@ -85,6 +85,11 @@ public class CompositeTopologySorter implements
TopologySorter
@Override
public boolean isFaulty(Node.Id node)
{
+ for (int i = 0; i < delegates.length; i++)
+ {
+ if (delegates[i].isFaulty(node))
+ return true;
+ }
return false;
}
}
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java
index 6ef51b957d..c537434d95 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java
@@ -145,7 +145,7 @@ public class ReadDataSerializers
}
};
- private static final ReadDataSerializer<ReadEphemeralTxnData>
readEphemeralTxnData = new ReadDataSerializer<ReadEphemeralTxnData>()
+ public static final ReadDataSerializer<ReadEphemeralTxnData>
readEphemeralTxnData = new ReadDataSerializer<>()
{
@Override
public void serialize(ReadEphemeralTxnData read, DataOutputPlus out,
int version) throws IOException
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
index 8fc86c3fdc..bfb8ed8997 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
@@ -27,9 +27,14 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import com.google.common.util.concurrent.RateLimiter;
import org.junit.BeforeClass;
@@ -62,7 +67,8 @@ public class AccordLoadTest extends AccordTestBase
public static void setUp() throws IOException
{
CassandraRelevantProperties.SIMULATOR_STARTED.setString(Long.toString(MILLISECONDS.toSeconds(currentTimeMillis())));
- AccordTestBase.setupCluster(builder -> builder.withConfig(config ->
config.with(Feature.values())), 3);
+ AccordTestBase.setupCluster(builder -> builder, 3);
+// AccordTestBase.setupCluster(builder -> builder.withConfig(config ->
config.with(Feature.values())), 3);
}
@Ignore
@@ -72,162 +78,187 @@ public class AccordLoadTest extends AccordTestBase
test("CREATE TABLE " + qualifiedAccordTableName + " (k int, v int,
PRIMARY KEY(k)) WITH transactional_mode = 'full'",
cluster -> {
- final ConcurrentHashMap<Verb, AtomicInteger> verbs = new
ConcurrentHashMap<>();
- cluster.filters().outbound().messagesMatching(new
IMessageFilters.Matcher()
+ try
{
- @Override
- public boolean matches(int i, int i1, IMessage iMessage)
+
+ final ConcurrentHashMap<Verb, AtomicInteger> verbs = new
ConcurrentHashMap<>();
+ cluster.filters().outbound().messagesMatching(new
IMessageFilters.Matcher()
{
- verbs.computeIfAbsent(Verb.fromId(iMessage.verb()),
ignore -> new AtomicInteger()).incrementAndGet();
- return false;
- }
- }).drop();
-
- cluster.get(1).nodetoolResult("cms", "reconfigure",
"3").asserts().success();
- cluster.forEach(i -> i.runOnInstance(() -> {
- ((AccordService)
AccordService.instance()).journal().compactor().updateCompactionPeriod(1,
SECONDS);
-// ((AccordSpec.JournalSpec)((AccordService)
AccordService.instance()).journal().configuration()).segmentSize = 128 << 10;
- }));
-
- ICoordinator coordinator = cluster.coordinator(1);
- final int repairInterval = 3000;
- final int compactionInterval = 3000;
- final int flushInterval = 1000;
- final int restartInterval = 10_000;
- final int batchSizeLimit = 1000;
- final long batchTime = TimeUnit.SECONDS.toNanos(10);
- final int concurrency = 100;
- final int ratePerSecond = 1000;
- final int keyCount = 1000000;
- final float readChance = 0.33f;
- long nextRepairAt = repairInterval;
- long nextCompactionAt = compactionInterval;
- long nextFlushAt = flushInterval;
- long nextRestartAt = restartInterval;
- final BitSet initialised = new BitSet();
-
- Random random = new Random();
-// CopyOnWriteArrayList<Throwable> exceptions = new
CopyOnWriteArrayList<>();
- final Semaphore inFlight = new Semaphore(concurrency);
- final RateLimiter rateLimiter =
RateLimiter.create(ratePerSecond);
-// long testStart = System.nanoTime();
-// while (NANOSECONDS.toMinutes(System.nanoTime() - testStart)
< 10 && exceptions.size() < 10000)
- while (true)
- {
- final EstimatedHistogram histogram = new
EstimatedHistogram(200);
- long batchStart = System.nanoTime();
- long batchEnd = batchStart + batchTime;
- int batchSize = 0;
- while (batchSize < batchSizeLimit)
+ @Override
+ public boolean matches(int i, int i1, IMessage
iMessage)
+ {
+
verbs.computeIfAbsent(Verb.fromId(iMessage.verb()), ignore -> new
AtomicInteger()).incrementAndGet();
+ return false;
+ }
+ }).drop();
+
+ ICoordinator coordinator = cluster.coordinator(1);
+ final int repairInterval = Integer.MAX_VALUE;
+ // final int repairInterval = 3000;
+ final int compactionInterval = Integer.MAX_VALUE;
+ // final int compactionInterval = 3000;
+ final int flushInterval = Integer.MAX_VALUE;
+ // final int flushInterval = 1000;
+ final int compactionPeriodSeconds = -1;
+ final int restartInterval = 150_000_000;
+ final int batchSizeLimit = 1000;
+ final long batchTime = TimeUnit.SECONDS.toNanos(10);
+ final int concurrency = 100;
+ final int ratePerSecond = 1000;
+ final int keyCount = 1000000;
+ final float readChance = 0.33f;
+ long nextRepairAt = repairInterval;
+ long nextCompactionAt = compactionInterval;
+ long nextFlushAt = flushInterval;
+ long nextRestartAt = restartInterval;
+ final ExecutorService restartExecutor =
Executors.newSingleThreadExecutor();
+ final BitSet initialised = new BitSet();
+
+ cluster.get(1).nodetoolResult("cms", "reconfigure",
"3").asserts().success();
+ cluster.forEach(i -> i.runOnInstance(() -> {
+ if (compactionPeriodSeconds > 0)
+ ((AccordService)
AccordService.instance()).journal().compactor().updateCompactionPeriod(1,
SECONDS);
+ // ((AccordSpec.JournalSpec)((AccordService)
AccordService.instance()).journal().configuration()).segmentSize = 128 << 10;
+ }));
+
+ Random random = new Random();
+ // CopyOnWriteArrayList<Throwable> exceptions = new
CopyOnWriteArrayList<>();
+ final Semaphore inFlight = new Semaphore(concurrency);
+ final RateLimiter rateLimiter =
RateLimiter.create(ratePerSecond);
+ // long testStart = System.nanoTime();
+ // while (NANOSECONDS.toMinutes(System.nanoTime() -
testStart) < 10 && exceptions.size() < 10000)
+ while (true)
{
- inFlight.acquire();
- rateLimiter.acquire();
- long commandStart = System.nanoTime();
- int k = random.nextInt(keyCount);
- if (random.nextFloat() < readChance)
+ final EstimatedHistogram histogram = new
EstimatedHistogram(200);
+ long batchStart = System.nanoTime();
+ long batchEnd = batchStart + batchTime;
+ int batchSize = 0;
+ while (batchSize < batchSizeLimit)
{
- coordinator.executeWithResult((success, fail) -> {
- inFlight.release();
- if (fail == null)
histogram.add(NANOSECONDS.toMicros(System.nanoTime() - commandStart));
- // else
exceptions.add(fail);
- }, "SELECT * FROM " + qualifiedAccordTableName +
" WHERE k = ?;", ConsistencyLevel.SERIAL, k);
+ inFlight.acquire();
+ rateLimiter.acquire();
+ long commandStart = System.nanoTime();
+ int k = random.nextInt(keyCount);
+ if (random.nextFloat() < readChance)
+ {
+ coordinator.executeWithResult((success, fail)
-> {
+ inFlight.release();
+ if (fail == null)
histogram.add(NANOSECONDS.toMicros(System.nanoTime() - commandStart));
+ // else
exceptions.add(fail);
+ }, "SELECT * FROM " +
qualifiedAccordTableName + " WHERE k = ?;", ConsistencyLevel.SERIAL, k);
+ }
+ else if (initialised.get(k))
+ {
+ coordinator.executeWithResult((success, fail)
-> {
+ inFlight.release();
+ if (fail == null)
histogram.add(NANOSECONDS.toMicros(System.nanoTime() - commandStart));
+ // else exceptions.add(fail);
+ }, "UPDATE " + qualifiedAccordTableName + "
SET v += 1 WHERE k = ? IF EXISTS;", ConsistencyLevel.SERIAL,
ConsistencyLevel.QUORUM, k);
+ }
+ else
+ {
+ initialised.set(k);
+ coordinator.executeWithResult((success, fail)
-> {
+ inFlight.release();
+ if (fail == null)
histogram.add(NANOSECONDS.toMicros(System.nanoTime() - commandStart));
+ // else
exceptions.add(fail);
+ }, "UPDATE " + qualifiedAccordTableName + "
SET v = 0 WHERE k = ? IF NOT EXISTS;", ConsistencyLevel.SERIAL,
ConsistencyLevel.QUORUM, k);
+ }
+ batchSize++;
+ if (System.nanoTime() >= batchEnd)
+ break;
}
- else if (initialised.get(k))
+
+ if ((nextRepairAt -= batchSize) <= 0)
{
- coordinator.executeWithResult((success, fail) -> {
- inFlight.release();
- if (fail == null)
histogram.add(NANOSECONDS.toMicros(System.nanoTime() - commandStart));
- // else exceptions.add(fail);
- }, "UPDATE " + qualifiedAccordTableName + " SET v
+= 1 WHERE k = ? IF EXISTS;", ConsistencyLevel.SERIAL, ConsistencyLevel.QUORUM,
k);
+ nextRepairAt += repairInterval;
+ System.out.println("repairing...");
+
cluster.coordinator(1).instance().nodetool("repair", qualifiedAccordTableName);
}
- else
+
+ if ((nextCompactionAt -= batchSize) <= 0)
{
- initialised.set(k);
- coordinator.executeWithResult((success, fail) -> {
- inFlight.release();
- if (fail == null)
histogram.add(NANOSECONDS.toMicros(System.nanoTime() - commandStart));
- // else
exceptions.add(fail);
- }, "UPDATE " + qualifiedAccordTableName + " SET v
= 0 WHERE k = ? IF NOT EXISTS;", ConsistencyLevel.SERIAL,
ConsistencyLevel.QUORUM, k);
+ nextCompactionAt += compactionInterval;
+ System.out.println("compacting accord...");
+ cluster.forEach(i -> {
+ i.nodetool("compact",
"system_accord.journal");
+ i.runOnInstance(() -> {
+ ((AccordService)
AccordService.instance()).journal().checkAllCommands();
+ });
+ });
}
- batchSize++;
- if (System.nanoTime() >= batchEnd)
- break;
- }
- if ((nextRepairAt -= batchSize) <= 0)
- {
- nextRepairAt += repairInterval;
- System.out.println("repairing...");
- cluster.coordinator(1).instance().nodetool("repair",
qualifiedAccordTableName);
- }
-
- if ((nextCompactionAt -= batchSize) <= 0)
- {
- nextCompactionAt += compactionInterval;
- System.out.println("compacting accord...");
- cluster.forEach(i -> {
- i.nodetool("compact", "system_accord.journal");
- i.runOnInstance(() -> {
+ if ((nextFlushAt -= batchSize) <= 0)
+ {
+ nextFlushAt += flushInterval;
+ System.out.println("flushing journal...");
+ cluster.forEach(i -> i.runOnInstance(() -> {
+ ((AccordService)
AccordService.instance()).journal().closeCurrentSegmentForTestingIfNonEmpty();
((AccordService)
AccordService.instance()).journal().checkAllCommands();
- });
- });
- }
+ }));
+ }
- if ((nextFlushAt -= batchSize) <= 0)
- {
- nextFlushAt += flushInterval;
- System.out.println("flushing journal...");
- cluster.forEach(i -> i.runOnInstance(() -> {
- ((AccordService)
AccordService.instance()).journal().closeCurrentSegmentForTestingIfNonEmpty();
- ((AccordService)
AccordService.instance()).journal().checkAllCommands();
- }));
- }
+ if ((nextRestartAt -= batchSize) <= 0)
+ {
+ nextRestartAt += restartInterval;
+ int nodeIdx = random.nextInt(cluster.size());
- if ((nextRestartAt -= batchSize) <= 0)
- {
- nextRestartAt += flushInterval;
- int nodeIdx = random.nextInt(cluster.size());
- System.out.printf("restarting node %d...\n", nodeIdx);
- cluster.get(nodeIdx).shutdown().get();
- cluster.get(nodeIdx).startup();
- }
+ restartExecutor.submit(() -> {
+ System.out.printf("restarting node %d...\n",
nodeIdx);
+ try
+ {
+ cluster.get(nodeIdx).shutdown().get();
+ cluster.get(nodeIdx).startup();
+ return null;
+ }
+ catch (InterruptedException |
ExecutionException e)
+ {
+ throw new RuntimeException(e);
+ }
+ });
+ }
- final Date date = new Date();
- System.out.printf("%tT rate: %.2f/s (%d total)\n", date,
(((float)batchSizeLimit * 1000) / NANOSECONDS.toMillis(System.nanoTime() -
batchStart)), batchSize);
- System.out.printf("%tT percentiles: %d %d %d %d\n", date,
histogram.percentile(.25)/1000, histogram.percentile(.5)/1000,
histogram.percentile(.75)/1000, histogram.percentile(1)/1000);
+ final Date date = new Date();
+ System.out.printf("%tT rate: %.2f/s (%d total)\n",
date, (((float)batchSizeLimit * 1000) / NANOSECONDS.toMillis(System.nanoTime()
- batchStart)), batchSize);
+ System.out.printf("%tT percentiles: %d %d %d %d\n",
date, histogram.percentile(.25)/1000, histogram.percentile(.5)/1000,
histogram.percentile(.75)/1000, histogram.percentile(1)/1000);
- class VerbCount
- {
- final Verb verb;
- final int count;
+ class VerbCount
+ {
+ final Verb verb;
+ final int count;
- VerbCount(Verb verb, int count)
+ VerbCount(Verb verb, int count)
+ {
+ this.verb = verb;
+ this.count = count;
+ }
+ }
+ List<VerbCount> verbCounts = new ArrayList<>();
+ for (Map.Entry<Verb, AtomicInteger> e :
verbs.entrySet())
{
- this.verb = verb;
- this.count = count;
+ int count = e.getValue().getAndSet(0);
+ if (count != 0) verbCounts.add(new
VerbCount(e.getKey(), count));
}
- }
- List<VerbCount> verbCounts = new ArrayList<>();
- for (Map.Entry<Verb, AtomicInteger> e : verbs.entrySet())
- {
- int count = e.getValue().getAndSet(0);
- if (count != 0) verbCounts.add(new
VerbCount(e.getKey(), count));
- }
- verbCounts.sort(Comparator.comparing(v -> -v.count));
+ verbCounts.sort(Comparator.comparing(v -> -v.count));
- StringBuilder verbSummary = new StringBuilder();
- for (VerbCount vs : verbCounts)
- {
+ StringBuilder verbSummary = new StringBuilder();
+ for (VerbCount vs : verbCounts)
{
- if (verbSummary.length() > 0)
- verbSummary.append(", ");
- verbSummary.append(vs.verb);
- verbSummary.append(": ");
- verbSummary.append(vs.count);
+ {
+ if (verbSummary.length() > 0)
+ verbSummary.append(", ");
+ verbSummary.append(vs.verb);
+ verbSummary.append(": ");
+ verbSummary.append(vs.count);
+ }
}
+ System.out.printf("%tT verbs: %s\n", date,
verbSummary);
}
- System.out.printf("%tT verbs: %s\n", date, verbSummary);
- }
+ }
+ catch (Throwable t)
+ {
+ t.printStackTrace();
+ }
}
);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]