This is an automated email from the ASF dual-hosted git repository.
ifesdjeen pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new 72abe53868 Bring back Journal simulator (w/o Accord at least for now);
add semaphore interceptor.
72abe53868 is described below
commit 72abe5386897f8c29b96f953eb8338aa06ee7bda
Author: Alex Petrov <[email protected]>
AuthorDate: Wed Jun 5 08:57:05 2024 +0200
Bring back Journal simulator (w/o Accord at least for now); add semaphore
interceptor.
Patch by Alex Petrov; reviewed by David Capwell for CASSANDRA-19695.
---
.../org/apache/cassandra/utils/MonotonicClock.java | 9 +-
.../distributed/test/log/CMSTestBase.java | 9 +-
.../systems/InterceptingGlobalMethods.java | 13 +
.../simulator/systems/InterceptingSemaphore.java | 197 ++++++++++++
.../systems/InterceptorOfGlobalMethods.java | 27 +-
.../test/AccordJournalSimulationTest.java | 331 ++++++++++-----------
.../cassandra/simulator/test/SemaphoreTest.java | 154 ++++++++++
.../simulator/test/SimulationTestBase.java | 36 ++-
.../simulator/test/TrivialSimulationTest.java | 14 +-
9 files changed, 608 insertions(+), 182 deletions(-)
diff --git a/src/java/org/apache/cassandra/utils/MonotonicClock.java
b/src/java/org/apache/cassandra/utils/MonotonicClock.java
index 7be54c008b..c68ed5d9ef 100644
--- a/src/java/org/apache/cassandra/utils/MonotonicClock.java
+++ b/src/java/org/apache/cassandra/utils/MonotonicClock.java
@@ -248,7 +248,14 @@ public interface MonotonicClock
public static class SystemClock extends AbstractEpochSamplingClock
{
- private SystemClock()
+ // Without making this constructor public you may start getting the
following exception in the simulator:
+ // java.lang.IncompatibleClassChangeError: Type
+ // org.apache.cassandra.utils.MonotonicClock$Global is not a
nest member of
+ // org.apache.cassandra.utils.MonotonicClock: types are in
different packages
+ // There might be a problem with a simulator and how we allow access,
but I verified the change access
+ // flags on <init> method of the
org/apache/cassandra/utils/MonotonicClock$SystemClock
+ // class to ACC_PUBLIC, and ensured proper testing relationship from
both the surrounding and nested class.
+ public SystemClock()
{
super(Clock.Global::currentTimeMillis);
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/log/CMSTestBase.java
b/test/distributed/org/apache/cassandra/distributed/test/log/CMSTestBase.java
index c1975cdbfe..e254798915 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/log/CMSTestBase.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/log/CMSTestBase.java
@@ -91,10 +91,15 @@ public class CMSTestBase
public final TokenPlacementModel.ReplicationFactor rf;
public CMSSut(IIsolatedExecutor.SerializableFunction<LocalLog,
Processor> processorFactory, boolean addListeners,
TokenPlacementModel.ReplicationFactor rf)
+ {
+ this(processorFactory, addListeners,
Mockito.mock(SchemaProvider.class), rf);
+ }
+
+ public CMSSut(IIsolatedExecutor.SerializableFunction<LocalLog,
Processor> processorFactory, boolean addListeners, SchemaProvider
schemaProvider, TokenPlacementModel.ReplicationFactor rf)
{
partitioner = Murmur3Partitioner.instance;
this.rf = rf;
- schemaProvider = Mockito.mock(SchemaProvider.class);
+ this.schemaProvider = schemaProvider;
ClusterMetadata initial = new ClusterMetadata(partitioner);
log = LocalLog.logSpec()
.sync()
@@ -127,7 +132,7 @@ public class CMSTestBase
}, schemaProvider));
}
- public void close() throws Exception
+ public void close()
{
ClusterMetadataService.unsetInstance();
}
diff --git
a/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptingGlobalMethods.java
b/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptingGlobalMethods.java
index abc71eae35..34c0f6bacc 100644
---
a/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptingGlobalMethods.java
+++
b/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptingGlobalMethods.java
@@ -33,6 +33,7 @@ import
org.apache.cassandra.simulator.systems.InterceptedWait.InterceptedConditi
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.concurrent.Condition;
import org.apache.cassandra.utils.concurrent.CountDownLatch;
+import org.apache.cassandra.utils.concurrent.Semaphore;
import org.apache.cassandra.utils.concurrent.WaitQueue;
import static
org.apache.cassandra.config.CassandraRelevantProperties.TEST_SIMULATOR_DETERMINISM_CHECK;
@@ -60,6 +61,18 @@ public class InterceptingGlobalMethods extends
InterceptingMonitors implements I
this.onUncaughtException = onUncaughtException;
}
+ @Override
+ public Semaphore newSemaphore(int count)
+ {
+ return new InterceptingSemaphore(count, false);
+ }
+
+ @Override
+ public Semaphore newFairSemaphore(int count)
+ {
+ return new InterceptingSemaphore(count, true);
+ }
+
@Override
public WaitQueue newWaitQueue()
{
diff --git
a/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptingSemaphore.java
b/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptingSemaphore.java
new file mode 100644
index 0000000000..60207e0c7f
--- /dev/null
+++
b/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptingSemaphore.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.simulator.systems;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.cassandra.utils.concurrent.Semaphore;
+import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
+
+import static
org.apache.cassandra.simulator.systems.InterceptorOfGlobalMethods.Global.ifIntercepted;
+
+public class InterceptingSemaphore extends Semaphore.Standard
+{
+ final Queue<SemaphoreSignal> interceptible = new ConcurrentLinkedQueue<>();
+ final AtomicInteger permits;
+ final boolean fair;
+
+ private static class SemaphoreSignal extends
InterceptingAwaitable.InterceptingSignal<Void>
+ {
+ private final int permits;
+
+ private SemaphoreSignal(int permits)
+ {
+ this.permits = permits;
+ }
+ }
+
+ public InterceptingSemaphore(int permits, boolean fair)
+ {
+ super(permits);
+ this.permits = new AtomicInteger(permits);
+ this.fair = fair;
+ }
+
+ @Override
+ public int permits()
+ {
+ if (ifIntercepted() == null)
+ return super.permits();
+
+ return permits.get();
+ }
+
+ @Override
+ public int drain()
+ {
+ if (ifIntercepted() == null)
+ return super.drain();
+
+ for (int i = 0; i < 10; i++)
+ {
+ int current = permits.get();
+ if (permits.compareAndSet(current, 0))
+ return current;
+ }
+
+ throw new IllegalStateException("Too much contention");
+ }
+
+ @Override
+ public void release(int release)
+ {
+ if (ifIntercepted() == null)
+ {
+ super.release(release);
+ return;
+ }
+
+ int remaining = permits.addAndGet(release);
+ while (!interceptible.isEmpty() && remaining > 0)
+ {
+ SemaphoreSignal signal = interceptible.peek();
+ if (signal.permits >= remaining)
+ interceptible.poll().signal();
+ else if (fair)
+ // Do not break enqueue order if using fair scheduler
+ break;
+ }
+ }
+
+ @Override
+ public boolean tryAcquire(int acquire)
+ {
+ if (ifIntercepted() == null)
+ return super.tryAcquire(acquire);
+
+ for (int i = 0; i < 10; i++)
+ {
+ int current = permits.get();
+ if (current >= acquire)
+ {
+ if (permits.compareAndSet(current, current - acquire))
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ throw new IllegalStateException("Too much contention");
+ }
+
+ @Override
+ public boolean tryAcquire(int acquire, long time, TimeUnit unit) throws
InterruptedException
+ {
+ if (ifIntercepted() == null)
+ return super.tryAcquire(acquire, time, unit);
+
+ while (true)
+ {
+ int current = permits.get();
+ if (current >= acquire && permits.compareAndSet(current, current -
acquire))
+ return true;
+
+ SemaphoreSignal signal = new SemaphoreSignal(acquire);
+ interceptible.add(signal);
+ boolean res = signal.await(time, unit);
+ interceptible.remove(signal);
+ if (!res)
+ return false;
+ }
+ }
+
+ @Override
+ public boolean tryAcquireUntil(int acquire, long deadline) throws
InterruptedException
+ {
+ if (ifIntercepted() == null)
+ return super.tryAcquireUntil(acquire, deadline);
+
+ while (true)
+ {
+ int current = permits.get();
+ if (current >= acquire && permits.compareAndSet(current, current -
acquire))
+ return true;
+
+ SemaphoreSignal signal = new SemaphoreSignal(acquire);
+ interceptible.add(signal);
+ boolean res = signal.awaitUntil(deadline);
+ interceptible.remove(signal);
+ if (!res)
+ return false;
+ }
+ }
+
+ @Override
+ public void acquire(int acquire) throws InterruptedException
+ {
+ if (ifIntercepted() == null)
+ {
+ super.acquire(acquire);
+ return;
+ }
+
+ while (true)
+ {
+ if (tryAcquire(acquire))
+ return;
+
+ SemaphoreSignal signal = new SemaphoreSignal(acquire);
+ interceptible.add(signal);
+ signal.await();
+ interceptible.remove(signal);
+ }
+ }
+
+ @Override
+ public void acquireThrowUncheckedOnInterrupt(int acquire) throws
UncheckedInterruptedException
+ {
+ try
+ {
+ acquire(acquire);
+ }
+ catch (InterruptedException e)
+ {
+ throw new UncheckedInterruptedException(e);
+ }
+ }
+}
diff --git
a/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptorOfGlobalMethods.java
b/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptorOfGlobalMethods.java
index bd8478d47b..eca4ebb2b4 100644
---
a/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptorOfGlobalMethods.java
+++
b/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptorOfGlobalMethods.java
@@ -37,7 +37,6 @@ import org.apache.cassandra.utils.concurrent.BlockingQueues;
import org.apache.cassandra.utils.concurrent.Condition;
import org.apache.cassandra.utils.concurrent.CountDownLatch;
import org.apache.cassandra.utils.concurrent.Semaphore;
-import org.apache.cassandra.utils.concurrent.Semaphore.Standard;
import org.apache.cassandra.utils.concurrent.WaitQueue;
import static org.apache.cassandra.utils.Shared.Recursive.INTERFACES;
@@ -47,6 +46,8 @@ import static
org.apache.cassandra.utils.Shared.Scope.SIMULATION;
@Shared(scope = SIMULATION, inner = INTERFACES)
public interface InterceptorOfGlobalMethods extends
InterceptorOfSystemMethods, Closeable
{
+ Semaphore newSemaphore(int count);
+ Semaphore newFairSemaphore(int count);
WaitQueue newWaitQueue();
CountDownLatch newCountDownLatch(int count);
Condition newOneTimeCondition();
@@ -72,6 +73,26 @@ public interface InterceptorOfGlobalMethods extends
InterceptorOfSystemMethods,
{
static LongConsumer threadLocalRandomCheck;
+ @Override
+ public Semaphore newSemaphore(int count)
+ {
+ Thread thread = Thread.currentThread();
+ if (thread instanceof InterceptibleThread)
+ return ((InterceptibleThread)
thread).interceptorOfGlobalMethods().newSemaphore(count);
+
+ return Semaphore.newSemaphore(count);
+ }
+
+ @Override
+ public Semaphore newFairSemaphore(int count)
+ {
+ Thread thread = Thread.currentThread();
+ if (thread instanceof InterceptibleThread)
+ return ((InterceptibleThread)
thread).interceptorOfGlobalMethods().newFairSemaphore(count);
+
+ return Semaphore.newFairSemaphore(count);
+ }
+
@Override
public WaitQueue newWaitQueue()
{
@@ -383,12 +404,12 @@ public interface InterceptorOfGlobalMethods extends
InterceptorOfSystemMethods,
public static Semaphore newSemaphore(int count)
{
- return new Standard(count, false);
+ return methods.newSemaphore(count);
}
public static Semaphore newFairSemaphore(int count)
{
- return new Standard(count, true);
+ return methods.newFairSemaphore(count);
}
public static Condition newOneTimeCondition()
diff --git
a/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java
b/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java
index 22dd55bb31..53569af1b1 100644
---
a/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java
+++
b/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java
@@ -18,250 +18,249 @@
package org.apache.cassandra.simulator.test;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.CopyOnWriteArrayList;
-import javax.annotation.Nullable;
+import java.util.zip.Checksum;
import com.google.common.collect.ImmutableMap;
+import com.google.common.jimfs.Jimfs;
-import accord.topology.TopologyUtils;
+import org.apache.cassandra.concurrent.ExecutorFactory;
+import org.apache.cassandra.concurrent.ExecutorPlus;
import org.apache.cassandra.config.AccordSpec;
-import org.apache.cassandra.schema.*;
-import org.junit.Ignore;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.filesystem.ListenableFileSystem;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.journal.AsyncCallbacks;
+import org.apache.cassandra.journal.Journal;
+import org.apache.cassandra.journal.KeySupport;
+import org.apache.cassandra.journal.ValueSerializer;
+
+import org.junit.Assert;
import org.junit.Test;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import accord.Utils;
-import accord.api.Data;
-import accord.api.RoutingKey;
-import accord.api.Update;
-import accord.api.Write;
-import accord.local.Node;
-import accord.messages.MessageType;
-import accord.messages.PreAccept;
-import accord.messages.TxnRequest;
-import accord.primitives.FullKeyRoute;
-import accord.primitives.FullRoute;
-import accord.primitives.Keys;
-import accord.primitives.Ranges;
-import accord.primitives.Seekables;
-import accord.primitives.Timestamp;
-import accord.primitives.Txn;
-import accord.primitives.TxnId;
-import accord.topology.Topologies;
-import org.apache.cassandra.concurrent.ExecutorFactory;
-import org.apache.cassandra.concurrent.ExecutorPlus;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ParameterizedClass;
-import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.io.util.File;
-import org.apache.cassandra.io.util.Files;
-import org.apache.cassandra.service.accord.AccordJournal;
-import org.apache.cassandra.service.accord.TokenRange;
-import org.apache.cassandra.service.accord.api.AccordRoutingKey;
-import org.apache.cassandra.service.accord.api.PartitionKey;
-import org.apache.cassandra.service.accord.txn.TxnNamedRead;
-import org.apache.cassandra.service.accord.txn.TxnQuery;
-import org.apache.cassandra.service.accord.txn.TxnRead;
-import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Isolated;
import org.apache.cassandra.utils.concurrent.CountDownLatch;
public class AccordJournalSimulationTest extends SimulationTestBase
{
@Test
- @Ignore // TODO: re-enable
- public void test() throws IOException
- {
- simulate(arr(() -> run()),
- () -> check());
- }
-
- private static void run()
+ public void simpleRWTest()
{
- for (int i = 0; i < State.events; i++)
- {
- int finalI = i;
- State.executor.execute(() -> State.append(finalI));
- }
+ simulate(arr(() -> {
+ ListenableFileSystem fs = new
ListenableFileSystem(Jimfs.newFileSystem());
+ File.unsafeSetFilesystem(fs);
+ DatabaseDescriptor.daemonInitialization();
+ DatabaseDescriptor.setCommitLogCompression(new
ParameterizedClass("LZ4Compressor", ImmutableMap.of())); //
+
DatabaseDescriptor.setCommitLogWriteDiskAccessMode(Config.DiskAccessMode.standard);
+ DatabaseDescriptor.initializeCommitLogDiskAccessMode();
+
DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+ DatabaseDescriptor.setAccordJournalDirectory("/journal");
+ new File("/journal").createDirectoriesIfNotExists();
- try
- {
- State.eventsDurable.await();
- State.logger.info("All events are durable done!");
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
+ DatabaseDescriptor.setDumpHeapOnUncaughtException(false);
- if (!State.exceptions.isEmpty())
- {
- AssertionError error = new AssertionError("Exceptions found during
test");
- State.exceptions.forEach(error::addSuppressed);
- throw error;
- }
+ Keyspace.setInitialized();
- State.journal.shutdown();
- State.logger.info("Run complete");
+ State.journal = new Journal<>("AccordJournal",
+ new File("/journal"),
+ new AccordSpec.JournalSpec(),
+ new TestCallbacks(),
+ new IdentityKeySerializer(),
+ new IdentityValueSerializer());
+ }),
+ () -> check());
}
- private static void check()
+ public static void check()
{
- State.logger.info("Check starting");
- State.journal.start(null); // to avoid a while true deadlock
+ State.journal.start();
try
{
- for (int i = 0; i < State.events; i++)
+ final int count = 100;
+ for (int i = 0; i < count; i++)
+ {
+ int finalI = i;
+ State.executor.submit(() -> State.journal.asyncWrite("test" +
finalI, "test" + finalI, Collections.singleton(1), null));
+ }
+
+ State.latch.await();
+
+ for (int i = 0; i < count; i++)
{
- TxnRequest<?> event =
State.journal.readMessage(State.toTxnId(i), MessageType.PRE_ACCEPT_REQ,
PreAccept.class);
- State.logger.info("Event {} -> {}", i, event);
- if (event == null)
- throw new AssertionError(String.format("Unable to read
event %d", i));
+ State.logger.debug("Reading {}", i);
+ Assert.assertEquals(State.journal.readFirst("test" + i),
"test" + i);
}
- State.logger.info("Check complete");
+ }
+
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
}
finally
{
State.journal.shutdown();
+
+ if (!State.thrown.isEmpty())
+ {
+ AssertionError throwable = new AssertionError("Caught
exceptions");
+ for (Throwable t: State.thrown)
+ throwable.addSuppressed(t);
+ throw throwable;
+ }
}
}
- @Isolated
- public static class State
+ public static class TestCallbacks implements AsyncCallbacks<String, String>
{
- private static final Logger logger =
LoggerFactory.getLogger(State.class);
- private static final String KEYSPACE = "test";
- static
+ @Override
+ public void onWrite(long segment, int position, int size, String key,
String value, Object writeContext)
{
- Files.newGlobalInMemoryFileSystem();
- DatabaseDescriptor.clientWithDaemonConfig();
-
DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
- DatabaseDescriptor.setAccordJournalDirectory("/journal");
- new File("/journal").createDirectoriesIfNotExists();
- DatabaseDescriptor.setCommitLogCompression(new
ParameterizedClass("LZ4Compressor", ImmutableMap.of()));
- DatabaseDescriptor.setDumpHeapOnUncaughtException(false);
-
- // in order to do journal.read, we need all this setup first!
- Keyspace.setInitialized();
-
Schema.instance.submit(SchemaTransformations.addKeyspace(KeyspaceMetadata.create(State.KEYSPACE,
KeyspaceParams.simple(1)), true));
- Keyspace ks = Keyspace.open(State.KEYSPACE);
- ks.initCfCustom(ColumnFamilyStore.createColumnFamilyStore(ks,
TableMetadataRef.forOfflineTools(TableMetadata.builder(State.KEYSPACE,
State.KEYSPACE)
-
.addPartitionKeyColumn("pk",
Int32Type.instance)
-
.build()).get(), false));
-
- try
- {
- CommitLog.instance.shutdownBlocking();
- }
- catch (InterruptedException e)
- {
- // ignore
- }
+ State.latch.decrement();
}
- private static final ExecutorPlus executor =
ExecutorFactory.Global.executorFactory().pooled("name", 10);
- private static final AccordJournal journal = new AccordJournal(null,
new AccordSpec.JournalSpec());
- private static final int events = 100;
- private static final CountDownLatch eventsWritten =
CountDownLatch.newCountDownLatch(events);
- private static final CountDownLatch eventsDurable =
CountDownLatch.newCountDownLatch(events);
- private static final List<Throwable> exceptions = new
CopyOnWriteArrayList<>();
-
- static
+
+ @Override
+ public void onWriteFailed(String key, String value, Object
writeContext, Throwable cause)
{
- journal.start(null);
+ State.thrown.add(new IllegalStateException("Write failed for " +
key));
+ State.latch.decrement();
}
- public static void append(int event)
+ @Override
+ public void onFlush(long segment, int position)
{
- TxnRequest<?> request = toRequest(event);
-// journal.appendMessageTest(request, executor, new
AsyncWriteCallback()
-// {
-// @Override
-// public void run()
-// {
-// durable(event);
-// }
-//
-// @Override
-// public void onFailure(Throwable error)
-// {
-// eventsDurable.decrement(); // to make sure we don't
block forever
-// exceptions.add(error);
-// }
-// });
- eventsWritten.decrement();
- logger.info("append({}); remaining {}", event,
eventsWritten.count());
}
- private static void durable(int event)
+ @Override
+ public void onFlushFailed(Throwable cause)
{
- eventsDurable.decrement();
- logger.info("durable({}); remaining {}", event,
eventsDurable.count());
+ State.thrown.add(new RuntimeException("Could not flush", cause));
}
+ }
- private static TxnRequest<?> toRequest(int event)
+ @Isolated
+ public static class IdentityValueSerializer implements
ValueSerializer<String, String>
+ {
+ @Override
+ public int serializedSize(String key, String value, int userVersion)
{
- TxnId id = toTxnId(event);
- Ranges ranges = Ranges.of(new
TokenRange(AccordRoutingKey.SentinelKey.min(tableId),
AccordRoutingKey.SentinelKey.max(tableId)));
- Topologies topologies =
Utils.topologies(TopologyUtils.initialTopology(new Node.Id[] { node}, ranges,
3));
- Keys keys = Keys.of(toKey(0));
- Txn txn = new Txn.InMemory(keys, new TxnRead(new TxnNamedRead[0],
keys, null), TxnQuery.ALL, new NoopUpdate());
- FullRoute<?> route = route();
- return new PreAccept(node, topologies, id, txn, route);
+ return TypeSizes.INT_SIZE + key.length();
}
- private static TxnId toTxnId(int event)
+ @Override
+ public void serialize(String key, String value, DataOutputPlus out,
int userVersion) throws IOException
{
- return TxnId.fromValues(1, event, 0, node);
+ out.writeInt(key.length());
+ out.writeBytes(key);
}
- private static PartitionKey toKey(int a)
+ @Override
+ public String deserialize(String key, DataInputPlus in, int
userVersion) throws IOException
{
- return new PartitionKey(tableId,
Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.bytes(a)));
+ int size = in.readInt();
+ byte[] value = new byte[size];
+ for (int i = 0; i < size; i++)
+ value[i] = in.readByte();
+
+ return new String(value);
}
+ }
- private static final TableId tableId = TableId.fromUUID(new UUID(0,
0));
- private static final Node.Id node = new Node.Id(0);
+ @Isolated
+ public static class IdentityKeySerializer implements KeySupport<String>
+ {
+ private final byte aByte = 0xd;
+ @Override
+ public int serializedSize(int userVersion)
+ {
+ return 16;
+ }
- private static FullRoute<?> route()
+ @Override
+ public void serialize(String key, DataOutputPlus out, int userVersion)
throws IOException
{
- return new FullKeyRoute(key, true, new RoutingKey[]{ key });
+ int maxSize = 16 - TypeSizes.INT_SIZE;
+ if (key.length() > maxSize)
+ throw new IllegalStateException();
+
+ out.writeInt(key.length());
+ out.writeBytes(key);
+ int remaining = maxSize - key.length();
+ for (int i = 0; i < remaining; i++)
+ out.writeByte(aByte + i);
}
- private static final RoutingKey key = new
AccordRoutingKey.TokenKey(tableId, new Murmur3Partitioner.LongToken(42));
- }
+ @Override
+ public String deserialize(DataInputPlus in, int userVersion) throws
IOException
+ {
+ int size = in.readInt();
+ byte[] key = new byte[size];
+ for (int i = 0; i < size; i++)
+ key[i] = in.readByte();
+
+ int maxSize = 16 - TypeSizes.INT_SIZE;
+ int remaining = maxSize - size;
+ for (int i = 0; i < remaining; i++)
+ Assert.assertEquals(aByte + i, in.readByte());
+
+ return new String(key);
+ }
- public static class NoopUpdate implements Update
- {
@Override
- public Seekables<?, ?> keys()
+ public String deserialize(ByteBuffer buffer, int position, int
userVersion)
{
- return null;
+ int size = buffer.getInt();
+ byte[] key = new byte[size];
+ for (int i = 0; i < size; i++)
+ key[i] = buffer.get();
+
+ int maxSize = 16 - TypeSizes.INT_SIZE;
+ int remaining = maxSize - size;
+ for (int i = 0; i < remaining; i++)
+ Assert.assertEquals(aByte + i, buffer.get());
+
+ return new String(key);
}
@Override
- public Write apply(Timestamp executeAt, @Nullable Data data)
+ public void updateChecksum(Checksum crc, String key, int userVersion)
{
- return null;
+ crc.update(key.getBytes());
}
@Override
- public Update slice(Ranges ranges)
+ public int compareWithKeyAt(String key, ByteBuffer buffer, int
position, int userVersion)
{
- return null;
+ throw new IllegalStateException();
}
@Override
- public Update merge(Update other)
+ public int compare(String o1, String o2)
{
- return null;
+ return o1.compareTo(o2);
}
}
-}
+
+ @Isolated
+ public static class State
+ {
+ private static final Logger logger =
LoggerFactory.getLogger(State.class);
+ static Journal<String, String> journal;
+ static CountDownLatch latch = CountDownLatch.newCountDownLatch(100);
+ static List<Throwable> thrown = new ArrayList<>();
+ static ExecutorPlus executor =
ExecutorFactory.Global.executorFactory().pooled("name", 10);
+ }
+}
\ No newline at end of file
diff --git
a/test/simulator/test/org/apache/cassandra/simulator/test/SemaphoreTest.java
b/test/simulator/test/org/apache/cassandra/simulator/test/SemaphoreTest.java
new file mode 100644
index 0000000000..9e1126a1e0
--- /dev/null
+++ b/test/simulator/test/org/apache/cassandra/simulator/test/SemaphoreTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.simulator.test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.concurrent.ExecutorFactory;
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.Shared;
+import org.apache.cassandra.utils.concurrent.CountDownLatch;
+import org.apache.cassandra.utils.concurrent.Semaphore;
+
+public class SemaphoreTest extends SimulationTestBase
+{
+ @Test
+ public void semaphoreAcquireUntilTest()
+ {
+ simulate(arr(() -> {
+ try
+ {
+ Semaphore semaphore = Semaphore.newSemaphore(1);
+ semaphore.acquire(1);
+ long start = Clock.Global.nanoTime();
+ Assert.assertFalse(semaphore.tryAcquire(1, 5000,
TimeUnit.MILLISECONDS));
+ long elapsed =
TimeUnit.NANOSECONDS.toMillis(Clock.Global.nanoTime() - start);
+ Assert.assertTrue(String.format("Elapsed only %sms,
while should have at least 5000", elapsed),
+ elapsed > 5000);
+ }
+ catch (Throwable t)
+ {
+ throw new RuntimeException(t);
+ }
+ }),
+ () -> {},
+ System.currentTimeMillis());
+ }
+
+ @Test
+ public void semaphoreAcquireReleaseRepeatabilityTest()
+ {
+ long seed = System.currentTimeMillis();
+ semaphoreTestInternal(seed);
+ State.record = false;
+ // Verify that subsequent interleavings will be the same
+ semaphoreTestInternal(seed);
+ }
+
+ protected void semaphoreTestInternal(long seed)
+ {
+ simulate(arr(() -> {
+ ExecutorPlus executor =
ExecutorFactory.Global.executorFactory().pooled("semaphore-test-", 10);
+ Semaphore semaphore = Semaphore.newSemaphore(5);
+ CountDownLatch latch =
CountDownLatch.newCountDownLatch(5);
+
+ for (int i = 0; i < 5; i++)
+ {
+ int thread = i;
+ executor.submit(() -> {
+ for (int j = 0; j < 100; j++)
+ {
+ int permits = semaphore.permits();
+ Assert.assertTrue(permits + " should be non
negative", permits >= 0);
+
+ try
+ {
+ semaphore.acquire(1);
+ State.tick(thread, j);
+ semaphore.release(1);
+ }
+ catch (Throwable e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ latch.decrement();
+ });
+ }
+
+ latch.awaitUninterruptibly();
+ int permits = semaphore.permits();
+ Assert.assertEquals(5, permits);
+ }),
+ () -> {},
+ seed);
+ }
+
+ @Shared
+ public static class State
+ {
+ static final List<Tick> ticks = new ArrayList<>();
+ static boolean record = true;
+ static int i = 0;
+
+ public static void tick(int thread, int iteration)
+ {
+ if (record)
+ {
+ Tick tick = new Tick(thread, iteration);
+ ticks.add(tick);
+ }
+ else
+ {
+ Tick tick = ticks.get(i);
+ Assert.assertEquals(tick.thread, thread);
+ Assert.assertEquals(tick.iteration, iteration);
+ i++;
+ }
+ }
+ }
+
+ @Shared
+ public static class Tick
+ {
+ final int thread;
+ final int iteration;
+
+ public Tick(int thread, int iteration)
+ {
+ this.thread = thread;
+ this.iteration = iteration;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Tick{" +
+ "thread=" + thread +
+ ", iteration=" + iteration +
+ '}';
+ }
+ }
+}
diff --git
a/test/simulator/test/org/apache/cassandra/simulator/test/SimulationTestBase.java
b/test/simulator/test/org/apache/cassandra/simulator/test/SimulationTestBase.java
index 2849a464ce..acc335645f 100644
---
a/test/simulator/test/org/apache/cassandra/simulator/test/SimulationTestBase.java
+++
b/test/simulator/test/org/apache/cassandra/simulator/test/SimulationTestBase.java
@@ -24,13 +24,16 @@ import java.util.Collections;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntSupplier;
+import java.util.function.LongConsumer;
import java.util.function.Predicate;
import com.google.common.collect.Iterators;
+import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.ExecutorFactory;
+import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.IIsolatedExecutor;
@@ -42,6 +45,7 @@ import org.apache.cassandra.simulator.ActionSchedule.Work;
import org.apache.cassandra.simulator.asm.InterceptClasses;
import org.apache.cassandra.simulator.asm.NemesisFieldSelectors;
import org.apache.cassandra.simulator.systems.Failures;
+import org.apache.cassandra.simulator.systems.InterceptedWait;
import org.apache.cassandra.simulator.systems.InterceptibleThread;
import org.apache.cassandra.simulator.systems.InterceptingExecutorFactory;
import org.apache.cassandra.simulator.systems.InterceptingGlobalMethods;
@@ -51,24 +55,36 @@ import
org.apache.cassandra.simulator.systems.SimulatedQuery;
import org.apache.cassandra.simulator.systems.SimulatedSystems;
import org.apache.cassandra.simulator.systems.SimulatedTime;
import org.apache.cassandra.simulator.utils.LongRange;
+import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.CloseableIterator;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static
org.apache.cassandra.config.CassandraRelevantProperties.CLOCK_GLOBAL;
+import static
org.apache.cassandra.config.CassandraRelevantProperties.CLOCK_MONOTONIC_APPROX;
+import static
org.apache.cassandra.config.CassandraRelevantProperties.CLOCK_MONOTONIC_PRECISE;
import static org.apache.cassandra.simulator.ActionSchedule.Mode.TIME_LIMITED;
import static org.apache.cassandra.simulator.ActionSchedule.Mode.UNLIMITED;
import static org.apache.cassandra.simulator.ClusterSimulation.ISOLATE;
import static org.apache.cassandra.simulator.ClusterSimulation.SHARE;
import static org.apache.cassandra.simulator.SimulatorUtils.failWithOOM;
-import static
org.apache.cassandra.simulator.systems.InterceptedWait.CaptureSites.Capture.NONE;
import static org.apache.cassandra.simulator.utils.KindOfSequence.UNIFORM;
import static org.apache.cassandra.utils.Shared.Scope.ANY;
import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
public class SimulationTestBase
{
+ @BeforeClass
+ public static void beforeAll()
+ {
+ // Disallow time on the bootstrap classloader
+ for (CassandraRelevantProperties property :
Arrays.asList(CLOCK_GLOBAL, CLOCK_MONOTONIC_APPROX, CLOCK_MONOTONIC_PRECISE))
+
property.setString("org.apache.cassandra.simulator.systems.SimulatedTime$Delegating");
+ try { Clock.Global.nanoTime(); } catch (IllegalStateException e) {} //
make sure static initializer gets called
+ }
+
private static final Logger logger = LoggerFactory.getLogger(Logger.class);
static abstract class DTestClusterSimulation implements Simulation
@@ -215,10 +231,16 @@ public class SimulationTestBase
public static void simulate(IIsolatedExecutor.SerializableRunnable[]
runnables,
IIsolatedExecutor.SerializableRunnable check)
+ {
+ simulate(runnables, check, System.currentTimeMillis());
+ }
+
+ public static void simulate(IIsolatedExecutor.SerializableRunnable[]
runnables,
+ IIsolatedExecutor.SerializableRunnable check,
+ long seed)
{
Failures failures = new HarrySimulatorTest.HaltOnError();
RandomSource random = new RandomSource.Default();
- long seed = System.currentTimeMillis();
System.out.println("Using seed: " + seed);
random.reset(seed);
SimulatedTime time = new SimulatedTime(1, random, 1577836800000L /*Jan
1st UTC*/, new LongRange(1, 100, MILLISECONDS, NANOSECONDS),
@@ -229,10 +251,16 @@ public class SimulationTestBase
InstanceClassLoader classLoader = new InstanceClassLoader(1, 1,
AbstractCluster.CURRENT_VERSION.classpath,
Thread.currentThread().getContextClassLoader(),
sharedClassPredicate,
- new
InterceptClasses((x) -> () -> 1.0f, (x) -> () -> 1.0f,
NemesisFieldSelectors.get(), ClassLoader.getSystemClassLoader(),
sharedClassPredicate.negate())::apply);
+ new
InterceptClasses((x) -> () -> 1.0f, (x) -> () -> 1.0f,
+
NemesisFieldSelectors.get(),
+
ClassLoader.getSystemClassLoader(),
+
sharedClassPredicate.negate())::apply);
ThreadGroup tg = new ThreadGroup("test");
- InterceptorOfGlobalMethods interceptorOfGlobalMethods = new
InterceptingGlobalMethods(NONE, null, failures, random);
+ InterceptedWait.CaptureSites.Capture capture = new
InterceptedWait.CaptureSites.Capture(false, false, false);
+ InterceptorOfGlobalMethods interceptorOfGlobalMethods =
IsolatedExecutor.transferAdhoc((IIsolatedExecutor.SerializableQuadFunction<InterceptedWait.CaptureSites.Capture,
LongConsumer, Consumer<Throwable>, RandomSource, InterceptorOfGlobalMethods>)
InterceptingGlobalMethods::new, classLoader)
+
.apply(capture, (ignore) -> {}, failures, random);
+
InterceptingExecutorFactory factory =
execution.factory(interceptorOfGlobalMethods, classLoader, tg);
time.setup(1, classLoader);
diff --git
a/test/simulator/test/org/apache/cassandra/simulator/test/TrivialSimulationTest.java
b/test/simulator/test/org/apache/cassandra/simulator/test/TrivialSimulationTest.java
index 548cf11671..7be09a1b5c 100644
---
a/test/simulator/test/org/apache/cassandra/simulator/test/TrivialSimulationTest.java
+++
b/test/simulator/test/org/apache/cassandra/simulator/test/TrivialSimulationTest.java
@@ -39,6 +39,13 @@ import static
org.apache.cassandra.simulator.cluster.ClusterActions.Options.noAc
public class TrivialSimulationTest extends SimulationTestBase
{
+ @Test
+ public void identityHashMapTest()
+ {
+ simulate(arr(() -> new IdentityHashMap<>().put(1, 1)),
+ () -> {});
+ }
+
@Test
public void trivialTest() throws IOException // for
demonstration/experiment purposes
{
@@ -84,10 +91,5 @@ public class TrivialSimulationTest extends SimulationTestBase
() -> {});
}
- @Test
- public void identityHashMapTest()
- {
- simulate(arr(() -> new IdentityHashMap<>().put(1, 1)),
- () -> {});
- }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]