This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new 26ccd43d54 Follow-up to: Do not contact faulty replicas, and support reporting slow replies for preaccept/read. Do not wait for stale or left nodes for durability.
26ccd43d54 is described below

commit 26ccd43d540912c75f22d1e762dc2ebc6f8f2b46
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Wed Oct 9 12:51:18 2024 +0100

    Follow-up to: Do not contact faulty replicas, and support reporting slow replies for preaccept/read. Do not wait for stale or left nodes for durability.
---
 modules/accord                                     |   2 +-
 .../org/apache/cassandra/config/AccordSpec.java    |   4 +-
 .../apache/cassandra/journal/EntrySerializer.java  |   1 +
 .../apache/cassandra/journal/InMemoryIndex.java    |  13 +-
 src/java/org/apache/cassandra/journal/Segment.java |   1 +
 .../apache/cassandra/journal/StaticSegment.java    |   7 +-
 src/java/org/apache/cassandra/net/Verb.java        |   4 +-
 .../service/accord/AccordMessageSink.java          |  51 ++--
 .../service/accord/AccordVerbHandler.java          |   2 +-
 .../service/accord/api/AccordTopologySorter.java   |   2 +-
 .../accord/api/CompositeTopologySorter.java        |   5 +
 .../accord/serializers/ReadDataSerializers.java    |   2 +-
 .../distributed/test/accord/AccordLoadTest.java    | 301 ++++++++++++---------
 13 files changed, 214 insertions(+), 181 deletions(-)

diff --git a/modules/accord b/modules/accord
index d24d8f5d86..291cbe70ad 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit d24d8f5d866783b71205631944cfecf9a63d4f0c
+Subproject commit 291cbe70ad82b0d5f875101f541d9f841912802f
diff --git a/src/java/org/apache/cassandra/config/AccordSpec.java b/src/java/org/apache/cassandra/config/AccordSpec.java
index d5a53c2636..451bfeaa54 100644
--- a/src/java/org/apache/cassandra/config/AccordSpec.java
+++ b/src/java/org/apache/cassandra/config/AccordSpec.java
@@ -40,8 +40,8 @@ public class AccordSpec
     // TODO (expected): we should be able to support lower recover delays, at 
least for txns
     public volatile DurationSpec.IntMillisecondsBound recover_delay = new 
DurationSpec.IntMillisecondsBound(5000);
     public volatile DurationSpec.IntMillisecondsBound range_sync_recover_delay 
= new DurationSpec.IntMillisecondsBound(10000);
-    public String slowPreAccept = "100ms <= p50*2 <= 1000ms";
-    public String slowRead = "100ms <= p50*2 <= 1000ms";
+    public String slowPreAccept = "30ms <= p50*2 <= 100ms";
+    public String slowRead = "30ms <= p50*2 <= 100ms";
 
     public long recoveryDelayFor(TxnId txnId, TimeUnit unit)
     {
diff --git a/src/java/org/apache/cassandra/journal/EntrySerializer.java b/src/java/org/apache/cassandra/journal/EntrySerializer.java
index 2a707e7d73..48ef59a6e4 100644
--- a/src/java/org/apache/cassandra/journal/EntrySerializer.java
+++ b/src/java/org/apache/cassandra/journal/EntrySerializer.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import java.util.Set;
 import java.util.zip.CRC32;
 
+import accord.utils.Invariants;
 import org.agrona.collections.IntHashSet;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.util.DataInputBuffer;
diff --git a/src/java/org/apache/cassandra/journal/InMemoryIndex.java b/src/java/org/apache/cassandra/journal/InMemoryIndex.java
index 1f0da7fd28..8141f338a9 100644
--- a/src/java/org/apache/cassandra/journal/InMemoryIndex.java
+++ b/src/java/org/apache/cassandra/journal/InMemoryIndex.java
@@ -25,8 +25,10 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import javax.annotation.Nullable;
 
+import accord.utils.Invariants;
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.io.util.FileOutputStreamPlus;
+import org.apache.cassandra.journal.StaticSegment.SequentialReader;
 
 /**
  * An index for a segment that's still being updated by journal writers 
concurrently.
@@ -138,17 +140,10 @@ final class InMemoryIndex<K> extends Index<K>
     {
         InMemoryIndex<K> index = new InMemoryIndex<>(keySupport, new 
TreeMap<>(keySupport));
 
-        try (StaticSegment.SequentialReader<K> reader = 
StaticSegment.sequentialReader(descriptor, keySupport, fsyncedLimit))
+        try (SequentialReader<K> reader = 
StaticSegment.sequentialReader(descriptor, keySupport, fsyncedLimit))
         {
-            int last = -1;
             while (reader.advance())
-            {
-                int current = reader.offset();
-                if (last >= 0)
-                    index.update(reader.key(), last, current);
-                last = current;
-            }
-
+                index.update(reader.key(), reader.offset, 
reader.buffer.position() - reader.offset);
         }
         return index;
     }
diff --git a/src/java/org/apache/cassandra/journal/Segment.java b/src/java/org/apache/cassandra/journal/Segment.java
index 7f955669cd..77f7c68fea 100644
--- a/src/java/org/apache/cassandra/journal/Segment.java
+++ b/src/java/org/apache/cassandra/journal/Segment.java
@@ -93,6 +93,7 @@ public abstract class Segment<K, V> implements Closeable, RefCounted<Segment<K,
             int size = Index.readSize(all[i]);
             Invariants.checkState(offset < prevOffset);
             Invariants.checkState(read(offset, size, into), "Read should 
always return true");
+            Invariants.checkState(id.equals(into.key), "Index for %s read 
incorrect key: expected %s but read %s", descriptor, id, into.key);
             onEntry.accept(descriptor.timestamp, offset, into.key, into.value, 
into.hosts, into.userVersion);
         }
     }
diff --git a/src/java/org/apache/cassandra/journal/StaticSegment.java b/src/java/org/apache/cassandra/journal/StaticSegment.java
index 3a8c03bb1a..bf46ca8ecd 100644
--- a/src/java/org/apache/cassandra/journal/StaticSegment.java
+++ b/src/java/org/apache/cassandra/journal/StaticSegment.java
@@ -28,6 +28,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.locks.LockSupport;
 
+import accord.utils.Invariants;
 import org.agrona.collections.IntHashSet;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.File;
@@ -260,7 +261,11 @@ public final class StaticSegment<K, V> extends Segment<K, V>
         ByteBuffer duplicate = 
buffer.duplicate().position(offset).limit(offset + size);
         try (DataInputBuffer in = new DataInputBuffer(duplicate, false))
         {
-            return EntrySerializer.tryRead(into, keySupport, duplicate, in, 
syncedOffsets.syncedOffset(), descriptor.userVersion);
+            if (!EntrySerializer.tryRead(into, keySupport, duplicate, in, 
syncedOffsets.syncedOffset(), descriptor.userVersion))
+                return false;
+
+            Invariants.checkState(in.available() == 0);
+            return true;
         }
         catch (IOException e)
         {
diff --git a/src/java/org/apache/cassandra/net/Verb.java b/src/java/org/apache/cassandra/net/Verb.java
index 6109a502a8..732f0e5df0 100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@ -313,8 +313,8 @@ public enum Verb
     ACCORD_ACCEPT_RSP               (122, P2, writeTimeout, IMMEDIATE,         
 () -> AcceptSerializers.reply,              
AccordService::responseHandlerOrNoop                                           
),
     ACCORD_ACCEPT_REQ               (123, P2, writeTimeout, IMMEDIATE,         
 () -> AcceptSerializers.request,            
AccordService::requestHandlerOrNoop, ACCORD_ACCEPT_RSP                         
),
     ACCORD_ACCEPT_INVALIDATE_REQ    (124, P2, writeTimeout, IMMEDIATE,         
 () -> AcceptSerializers.invalidate,         
AccordService::requestHandlerOrNoop, ACCORD_ACCEPT_RSP                         
),
-    ACCORD_READ_RSP                 (125, P2, writeTimeout, IMMEDIATE,         
 () -> ReadDataSerializers.reply,            
AccordService::responseHandlerOrNoop                                           
),
-    ACCORD_READ_REQ                 (126, P2, writeTimeout, IMMEDIATE,         
 () -> ReadDataSerializers.readData,         
AccordService::requestHandlerOrNoop, ACCORD_READ_RSP                           
),
+    ACCORD_READ_RSP                 (125, P2, readTimeout,  IMMEDIATE,         
 () -> ReadDataSerializers.reply,            
AccordService::responseHandlerOrNoop                                           
),
+    ACCORD_READ_REQ                 (126, P2, readTimeout,  IMMEDIATE,         
 () -> ReadDataSerializers.readData,         
AccordService::requestHandlerOrNoop, ACCORD_READ_RSP                           
),
     ACCORD_COMMIT_REQ               (127, P2, writeTimeout, IMMEDIATE,         
 () -> CommitSerializers.request,            
AccordService::requestHandlerOrNoop, ACCORD_READ_RSP                           
),
     ACCORD_COMMIT_INVALIDATE_REQ    (128, P2, writeTimeout, IMMEDIATE,         
 () -> CommitSerializers.invalidate,         
AccordService::requestHandlerOrNoop                                            
),
     ACCORD_APPLY_RSP                (129, P2, writeTimeout, IMMEDIATE,         
 () -> ApplySerializers.reply,               
AccordService::responseHandlerOrNoop                                           
),
diff --git a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
index 22fe8f5ad2..968068481d 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
@@ -259,42 +259,37 @@ public class AccordMessageSink implements MessageSink
     @Override
     public void send(Node.Id to, Request request, AgentExecutor executor, 
Callback callback)
     {
-        long nowNanos = Clock.Global.nanoTime();
         Verb verb = getVerb(request);
         Preconditions.checkNotNull(verb, "Verb is null for type %s", 
request.type());
-        Message<Request> message;
-        if (isRangeBarrier(request))
-        {
-            message = Message.out(verb, request, nowNanos + 
DatabaseDescriptor.getAccordRangeBarrierTimeoutNanos());
-        }
-        else
-        {
-            message = Message.out(verb, request);
-        }
-        InetAddressAndPort endpoint = endpointMapper.mappedEndpoint(to);
-        logger.trace("Sending {} {} to {}", verb, message.payload, endpoint);
-        long expiresAtNanos = message.expiresAtNanos();
+        long nowNanos = Clock.Global.nanoTime();
+        long expiresAtNanos;
+        if (isRangeBarrier(request)) expiresAtNanos = nowNanos + 
DatabaseDescriptor.getAccordRangeBarrierTimeoutNanos();
+        else expiresAtNanos = nowNanos + verb.expiresAfterNanos();
+        long delayedAtNanos = Long.MAX_VALUE;
         switch (verb)
         {
+            case ACCORD_COMMIT_REQ:
+                if (((Commit)request).readData == null)
+                    break;
+
             case ACCORD_READ_REQ:
-            {
-                long delayedAtNanos = Long.MAX_VALUE;
-                if (slowRead != null && !isRangeBarrier(request)) 
delayedAtNanos = nowNanos + slowRead.computeWait(1, NANOSECONDS);
-                callbacks.register(message.id(), executor, callback, to, 
nowNanos, delayedAtNanos, expiresAtNanos, NANOSECONDS);
+                if (slowRead == null || isRangeBarrier(request))
+                    break;
+
+            case ACCORD_CHECK_STATUS_REQ:
+                delayedAtNanos = nowNanos + slowRead.computeWait(1, 
NANOSECONDS);
                 break;
-            }
+
             case ACCORD_PRE_ACCEPT_REQ:
-            {
-                long delayedAtNanos = Long.MAX_VALUE;
-                if (slowPreaccept != null && !isRangeBarrier(request)) 
delayedAtNanos = slowPreaccept.computeWait(1, NANOSECONDS);
-                callbacks.register(message.id(), executor, callback, to, 
delayedAtNanos, expiresAtNanos, NANOSECONDS);
-                break;
-            }
-            default:
-            {
-                callbacks.register(message.id(), executor, callback, to, 
expiresAtNanos, NANOSECONDS);
-            }
+                if (slowPreaccept == null || isRangeBarrier(request))
+                    break;
+                delayedAtNanos = nowNanos + slowPreaccept.computeWait(1, 
NANOSECONDS);
         }
+
+        Message<Request> message = Message.out(verb, request, expiresAtNanos);
+        InetAddressAndPort endpoint = endpointMapper.mappedEndpoint(to);
+        logger.trace("Sending {} {} to {}", verb, message.payload, endpoint);
+        callbacks.registerAt(message.id(), executor, callback, to, nowNanos, 
delayedAtNanos, expiresAtNanos, NANOSECONDS);
         messaging.send(message, endpoint);
     }
 
diff --git a/src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java b/src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java
index 34c7b26bd9..59fc056b90 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java
@@ -60,7 +60,7 @@ public class AccordVerbHandler<T extends Request> implements IVerbHandler<T>
         Node.Id fromNodeId = endpointMapper.mappedId(message.from());
         long waitForEpoch = request.waitForEpoch();
 
-        if (node.topology().hasEpoch(waitForEpoch))
+        if (node.topology().hasAtLeastEpoch(waitForEpoch))
             request.process(node, fromNodeId, message);
         else
             node.withEpoch(waitForEpoch, (ignored, withEpochFailure) -> {
diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordTopologySorter.java b/src/java/org/apache/cassandra/service/accord/api/AccordTopologySorter.java
index 2185df91cb..658851ebda 100644
--- a/src/java/org/apache/cassandra/service/accord/api/AccordTopologySorter.java
+++ b/src/java/org/apache/cassandra/service/accord/api/AccordTopologySorter.java
@@ -115,7 +115,7 @@ public class AccordTopologySorter implements TopologySorter
 
         VersionedValue event = 
epState.getApplicationState(ApplicationState.SEVERITY);
         if (event == null)
-            return true;
+            return false;
 
         return Double.parseDouble(event.value) == 0.0;
     }
diff --git a/src/java/org/apache/cassandra/service/accord/api/CompositeTopologySorter.java b/src/java/org/apache/cassandra/service/accord/api/CompositeTopologySorter.java
index d7f17e02b9..597e4aad86 100644
--- a/src/java/org/apache/cassandra/service/accord/api/CompositeTopologySorter.java
+++ b/src/java/org/apache/cassandra/service/accord/api/CompositeTopologySorter.java
@@ -85,6 +85,11 @@ public class CompositeTopologySorter implements TopologySorter
     @Override
     public boolean isFaulty(Node.Id node)
     {
+        for (int i = 0; i < delegates.length; i++)
+        {
+            if (delegates[i].isFaulty(node))
+                return true;
+        }
         return false;
     }
 }
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java
index 6ef51b957d..c537434d95 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java
@@ -145,7 +145,7 @@ public class ReadDataSerializers
         }
     };
 
-    private static final ReadDataSerializer<ReadEphemeralTxnData> 
readEphemeralTxnData = new ReadDataSerializer<ReadEphemeralTxnData>()
+    public static final ReadDataSerializer<ReadEphemeralTxnData> 
readEphemeralTxnData = new ReadDataSerializer<>()
     {
         @Override
         public void serialize(ReadEphemeralTxnData read, DataOutputPlus out, 
int version) throws IOException
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
index 8fc86c3fdc..bfb8ed8997 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
@@ -27,9 +27,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.util.concurrent.RateLimiter;
 import org.junit.BeforeClass;
@@ -62,7 +67,8 @@ public class AccordLoadTest extends AccordTestBase
     public static void setUp() throws IOException
     {
         
CassandraRelevantProperties.SIMULATOR_STARTED.setString(Long.toString(MILLISECONDS.toSeconds(currentTimeMillis())));
-        AccordTestBase.setupCluster(builder -> builder.withConfig(config -> 
config.with(Feature.values())), 3);
+        AccordTestBase.setupCluster(builder -> builder, 3);
+//        AccordTestBase.setupCluster(builder -> builder.withConfig(config -> 
config.with(Feature.values())), 3);
     }
 
     @Ignore
@@ -72,162 +78,187 @@ public class AccordLoadTest extends AccordTestBase
         test("CREATE TABLE " + qualifiedAccordTableName + " (k int, v int, 
PRIMARY KEY(k)) WITH transactional_mode = 'full'",
              cluster -> {
 
-                final ConcurrentHashMap<Verb, AtomicInteger> verbs = new 
ConcurrentHashMap<>();
-                cluster.filters().outbound().messagesMatching(new 
IMessageFilters.Matcher()
+                try
                 {
-                    @Override
-                    public boolean matches(int i, int i1, IMessage iMessage)
+
+                    final ConcurrentHashMap<Verb, AtomicInteger> verbs = new 
ConcurrentHashMap<>();
+                    cluster.filters().outbound().messagesMatching(new 
IMessageFilters.Matcher()
                     {
-                        verbs.computeIfAbsent(Verb.fromId(iMessage.verb()), 
ignore -> new AtomicInteger()).incrementAndGet();
-                        return false;
-                    }
-                }).drop();
-
-                 cluster.get(1).nodetoolResult("cms", "reconfigure", 
"3").asserts().success();
-                 cluster.forEach(i -> i.runOnInstance(() -> {
-                     ((AccordService) 
AccordService.instance()).journal().compactor().updateCompactionPeriod(1, 
SECONDS);
-//                     ((AccordSpec.JournalSpec)((AccordService) 
AccordService.instance()).journal().configuration()).segmentSize = 128 << 10;
-                 }));
-
-                 ICoordinator coordinator = cluster.coordinator(1);
-                 final int repairInterval = 3000;
-                 final int compactionInterval = 3000;
-                 final int flushInterval = 1000;
-                 final int restartInterval = 10_000;
-                 final int batchSizeLimit = 1000;
-                 final long batchTime = TimeUnit.SECONDS.toNanos(10);
-                 final int concurrency = 100;
-                 final int ratePerSecond = 1000;
-                 final int keyCount = 1000000;
-                 final float readChance = 0.33f;
-                 long nextRepairAt = repairInterval;
-                 long nextCompactionAt = compactionInterval;
-                 long nextFlushAt = flushInterval;
-                 long nextRestartAt = restartInterval;
-                 final BitSet initialised = new BitSet();
-
-                 Random random = new Random();
-//                 CopyOnWriteArrayList<Throwable> exceptions = new 
CopyOnWriteArrayList<>();
-                 final Semaphore inFlight = new Semaphore(concurrency);
-                 final RateLimiter rateLimiter = 
RateLimiter.create(ratePerSecond);
-//                 long testStart = System.nanoTime();
-//                 while (NANOSECONDS.toMinutes(System.nanoTime() - testStart) 
< 10 && exceptions.size() < 10000)
-                 while (true)
-                 {
-                     final EstimatedHistogram histogram = new 
EstimatedHistogram(200);
-                     long batchStart = System.nanoTime();
-                     long batchEnd = batchStart + batchTime;
-                     int batchSize = 0;
-                     while (batchSize < batchSizeLimit)
+                        @Override
+                        public boolean matches(int i, int i1, IMessage 
iMessage)
+                        {
+                            
verbs.computeIfAbsent(Verb.fromId(iMessage.verb()), ignore -> new 
AtomicInteger()).incrementAndGet();
+                            return false;
+                        }
+                    }).drop();
+
+                     ICoordinator coordinator = cluster.coordinator(1);
+                     final int repairInterval = Integer.MAX_VALUE;
+    //                 final int repairInterval = 3000;
+                     final int compactionInterval = Integer.MAX_VALUE;
+    //                 final int compactionInterval = 3000;
+                     final int flushInterval = Integer.MAX_VALUE;
+    //                 final int flushInterval = 1000;
+                     final int compactionPeriodSeconds = -1;
+                     final int restartInterval = 150_000_000;
+                     final int batchSizeLimit = 1000;
+                     final long batchTime = TimeUnit.SECONDS.toNanos(10);
+                     final int concurrency = 100;
+                     final int ratePerSecond = 1000;
+                     final int keyCount = 1000000;
+                     final float readChance = 0.33f;
+                     long nextRepairAt = repairInterval;
+                     long nextCompactionAt = compactionInterval;
+                     long nextFlushAt = flushInterval;
+                     long nextRestartAt = restartInterval;
+                     final ExecutorService restartExecutor = 
Executors.newSingleThreadExecutor();
+                     final BitSet initialised = new BitSet();
+
+                     cluster.get(1).nodetoolResult("cms", "reconfigure", 
"3").asserts().success();
+                     cluster.forEach(i -> i.runOnInstance(() -> {
+                         if (compactionPeriodSeconds > 0)
+                            ((AccordService) 
AccordService.instance()).journal().compactor().updateCompactionPeriod(1, 
SECONDS);
+    //                     ((AccordSpec.JournalSpec)((AccordService) 
AccordService.instance()).journal().configuration()).segmentSize = 128 << 10;
+                     }));
+
+                     Random random = new Random();
+    //                 CopyOnWriteArrayList<Throwable> exceptions = new 
CopyOnWriteArrayList<>();
+                     final Semaphore inFlight = new Semaphore(concurrency);
+                     final RateLimiter rateLimiter = 
RateLimiter.create(ratePerSecond);
+    //                 long testStart = System.nanoTime();
+    //                 while (NANOSECONDS.toMinutes(System.nanoTime() - 
testStart) < 10 && exceptions.size() < 10000)
+                     while (true)
                      {
-                         inFlight.acquire();
-                         rateLimiter.acquire();
-                         long commandStart = System.nanoTime();
-                         int k = random.nextInt(keyCount);
-                         if (random.nextFloat() < readChance)
+                         final EstimatedHistogram histogram = new 
EstimatedHistogram(200);
+                         long batchStart = System.nanoTime();
+                         long batchEnd = batchStart + batchTime;
+                         int batchSize = 0;
+                         while (batchSize < batchSizeLimit)
                          {
-                             coordinator.executeWithResult((success, fail) -> {
-                                 inFlight.release();
-                                 if (fail == null) 
histogram.add(NANOSECONDS.toMicros(System.nanoTime() - commandStart));
-                                 //                             else 
exceptions.add(fail);
-                             }, "SELECT * FROM " + qualifiedAccordTableName + 
" WHERE k = ?;", ConsistencyLevel.SERIAL, k);
+                             inFlight.acquire();
+                             rateLimiter.acquire();
+                             long commandStart = System.nanoTime();
+                             int k = random.nextInt(keyCount);
+                             if (random.nextFloat() < readChance)
+                             {
+                                 coordinator.executeWithResult((success, fail) 
-> {
+                                     inFlight.release();
+                                     if (fail == null) 
histogram.add(NANOSECONDS.toMicros(System.nanoTime() - commandStart));
+                                     //                             else 
exceptions.add(fail);
+                                 }, "SELECT * FROM " + 
qualifiedAccordTableName + " WHERE k = ?;", ConsistencyLevel.SERIAL, k);
+                             }
+                             else if (initialised.get(k))
+                             {
+                                 coordinator.executeWithResult((success, fail) 
-> {
+                                     inFlight.release();
+                                     if (fail == null) 
histogram.add(NANOSECONDS.toMicros(System.nanoTime() - commandStart));
+        //                             else exceptions.add(fail);
+                                 }, "UPDATE " + qualifiedAccordTableName + " 
SET v += 1 WHERE k = ? IF EXISTS;", ConsistencyLevel.SERIAL, 
ConsistencyLevel.QUORUM, k);
+                             }
+                             else
+                             {
+                                 initialised.set(k);
+                                 coordinator.executeWithResult((success, fail) 
-> {
+                                     inFlight.release();
+                                     if (fail == null) 
histogram.add(NANOSECONDS.toMicros(System.nanoTime() - commandStart));
+                                     //                             else 
exceptions.add(fail);
+                                 }, "UPDATE " + qualifiedAccordTableName + " 
SET v = 0 WHERE k = ? IF NOT EXISTS;", ConsistencyLevel.SERIAL, 
ConsistencyLevel.QUORUM, k);
+                             }
+                             batchSize++;
+                             if (System.nanoTime() >= batchEnd)
+                                 break;
                          }
-                         else if (initialised.get(k))
+
+                         if ((nextRepairAt -= batchSize) <= 0)
                          {
-                             coordinator.executeWithResult((success, fail) -> {
-                                 inFlight.release();
-                                 if (fail == null) 
histogram.add(NANOSECONDS.toMicros(System.nanoTime() - commandStart));
-    //                             else exceptions.add(fail);
-                             }, "UPDATE " + qualifiedAccordTableName + " SET v 
+= 1 WHERE k = ? IF EXISTS;", ConsistencyLevel.SERIAL, ConsistencyLevel.QUORUM, 
k);
+                             nextRepairAt += repairInterval;
+                             System.out.println("repairing...");
+                             
cluster.coordinator(1).instance().nodetool("repair", qualifiedAccordTableName);
                          }
-                         else
+
+                         if ((nextCompactionAt -= batchSize) <= 0)
                          {
-                             initialised.set(k);
-                             coordinator.executeWithResult((success, fail) -> {
-                                 inFlight.release();
-                                 if (fail == null) 
histogram.add(NANOSECONDS.toMicros(System.nanoTime() - commandStart));
-                                 //                             else 
exceptions.add(fail);
-                             }, "UPDATE " + qualifiedAccordTableName + " SET v 
= 0 WHERE k = ? IF NOT EXISTS;", ConsistencyLevel.SERIAL, 
ConsistencyLevel.QUORUM, k);
+                             nextCompactionAt += compactionInterval;
+                             System.out.println("compacting accord...");
+                             cluster.forEach(i -> {
+                                 i.nodetool("compact", 
"system_accord.journal");
+                                 i.runOnInstance(() -> {
+                                     ((AccordService) 
AccordService.instance()).journal().checkAllCommands();
+                                 });
+                             });
                          }
-                         batchSize++;
-                         if (System.nanoTime() >= batchEnd)
-                             break;
-                     }
 
-                     if ((nextRepairAt -= batchSize) <= 0)
-                     {
-                         nextRepairAt += repairInterval;
-                         System.out.println("repairing...");
-                         cluster.coordinator(1).instance().nodetool("repair", 
qualifiedAccordTableName);
-                     }
-
-                     if ((nextCompactionAt -= batchSize) <= 0)
-                     {
-                         nextCompactionAt += compactionInterval;
-                         System.out.println("compacting accord...");
-                         cluster.forEach(i -> {
-                             i.nodetool("compact", "system_accord.journal");
-                             i.runOnInstance(() -> {
+                         if ((nextFlushAt -= batchSize) <= 0)
+                         {
+                             nextFlushAt += flushInterval;
+                             System.out.println("flushing journal...");
+                             cluster.forEach(i -> i.runOnInstance(() -> {
+                                 ((AccordService) 
AccordService.instance()).journal().closeCurrentSegmentForTestingIfNonEmpty();
                                  ((AccordService) 
AccordService.instance()).journal().checkAllCommands();
-                             });
-                         });
-                     }
+                             }));
+                         }
 
-                     if ((nextFlushAt -= batchSize) <= 0)
-                     {
-                         nextFlushAt += flushInterval;
-                         System.out.println("flushing journal...");
-                         cluster.forEach(i -> i.runOnInstance(() -> {
-                             ((AccordService) 
AccordService.instance()).journal().closeCurrentSegmentForTestingIfNonEmpty();
-                             ((AccordService) 
AccordService.instance()).journal().checkAllCommands();
-                         }));
-                     }
+                         if ((nextRestartAt -= batchSize) <= 0)
+                         {
+                             nextRestartAt += restartInterval;
+                             int nodeIdx = random.nextInt(cluster.size());
 
-                     if ((nextRestartAt -= batchSize) <= 0)
-                     {
-                         nextRestartAt += flushInterval;
-                         int nodeIdx = random.nextInt(cluster.size());
-                         System.out.printf("restarting node %d...\n", nodeIdx);
-                         cluster.get(nodeIdx).shutdown().get();
-                         cluster.get(nodeIdx).startup();
-                     }
+                             restartExecutor.submit(() -> {
+                                 System.out.printf("restarting node %d...\n", 
nodeIdx);
+                                 try
+                                 {
+                                     cluster.get(nodeIdx).shutdown().get();
+                                     cluster.get(nodeIdx).startup();
+                                     return null;
+                                 }
+                                 catch (InterruptedException | 
ExecutionException e)
+                                 {
+                                     throw new RuntimeException(e);
+                                 }
+                             });
+                         }
 
-                     final Date date = new Date();
-                     System.out.printf("%tT rate: %.2f/s (%d total)\n", date, 
(((float)batchSizeLimit * 1000) / NANOSECONDS.toMillis(System.nanoTime() - 
batchStart)), batchSize);
-                     System.out.printf("%tT percentiles: %d %d %d %d\n", date, 
histogram.percentile(.25)/1000, histogram.percentile(.5)/1000, 
histogram.percentile(.75)/1000, histogram.percentile(1)/1000);
+                         final Date date = new Date();
+                         System.out.printf("%tT rate: %.2f/s (%d total)\n", 
date, (((float)batchSizeLimit * 1000) / NANOSECONDS.toMillis(System.nanoTime() 
- batchStart)), batchSize);
+                         System.out.printf("%tT percentiles: %d %d %d %d\n", 
date, histogram.percentile(.25)/1000, histogram.percentile(.5)/1000, 
histogram.percentile(.75)/1000, histogram.percentile(1)/1000);
 
-                     class VerbCount
-                     {
-                         final Verb verb;
-                         final int count;
+                         class VerbCount
+                         {
+                             final Verb verb;
+                             final int count;
 
-                         VerbCount(Verb verb, int count)
+                             VerbCount(Verb verb, int count)
+                             {
+                                 this.verb = verb;
+                                 this.count = count;
+                             }
+                         }
+                         List<VerbCount> verbCounts = new ArrayList<>();
+                         for (Map.Entry<Verb, AtomicInteger> e : 
verbs.entrySet())
                          {
-                             this.verb = verb;
-                             this.count = count;
+                             int count = e.getValue().getAndSet(0);
+                             if (count != 0) verbCounts.add(new 
VerbCount(e.getKey(), count));
                          }
-                     }
-                     List<VerbCount> verbCounts = new ArrayList<>();
-                     for (Map.Entry<Verb, AtomicInteger> e : verbs.entrySet())
-                     {
-                         int count = e.getValue().getAndSet(0);
-                         if (count != 0) verbCounts.add(new 
VerbCount(e.getKey(), count));
-                     }
-                     verbCounts.sort(Comparator.comparing(v -> -v.count));
+                         verbCounts.sort(Comparator.comparing(v -> -v.count));
 
-                     StringBuilder verbSummary = new StringBuilder();
-                     for (VerbCount vs : verbCounts)
-                     {
+                         StringBuilder verbSummary = new StringBuilder();
+                         for (VerbCount vs : verbCounts)
                          {
-                             if (verbSummary.length() > 0)
-                                 verbSummary.append(", ");
-                             verbSummary.append(vs.verb);
-                             verbSummary.append(": ");
-                             verbSummary.append(vs.count);
+                             {
+                                 if (verbSummary.length() > 0)
+                                     verbSummary.append(", ");
+                                 verbSummary.append(vs.verb);
+                                 verbSummary.append(": ");
+                                 verbSummary.append(vs.count);
+                             }
                          }
+                         System.out.printf("%tT verbs: %s\n", date, 
verbSummary);
                      }
-                     System.out.printf("%tT verbs: %s\n", date, verbSummary);
-                 }
+                }
+                catch (Throwable t)
+                {
+                    t.printStackTrace();
+                }
              }
         );
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to