This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new 72abe53868 Bring back Journal simulator (w/o Accord at least for now); 
add semaphore interceptor.
72abe53868 is described below

commit 72abe5386897f8c29b96f953eb8338aa06ee7bda
Author: Alex Petrov <[email protected]>
AuthorDate: Wed Jun 5 08:57:05 2024 +0200

    Bring back Journal simulator (w/o Accord at least for now); add semaphore 
interceptor.
    
    Patch by Alex Petrov; reviewed by David Capwell for CASSANDRA-19695.
---
 .../org/apache/cassandra/utils/MonotonicClock.java |   9 +-
 .../distributed/test/log/CMSTestBase.java          |   9 +-
 .../systems/InterceptingGlobalMethods.java         |  13 +
 .../simulator/systems/InterceptingSemaphore.java   | 197 ++++++++++++
 .../systems/InterceptorOfGlobalMethods.java        |  27 +-
 .../test/AccordJournalSimulationTest.java          | 331 ++++++++++-----------
 .../cassandra/simulator/test/SemaphoreTest.java    | 154 ++++++++++
 .../simulator/test/SimulationTestBase.java         |  36 ++-
 .../simulator/test/TrivialSimulationTest.java      |  14 +-
 9 files changed, 608 insertions(+), 182 deletions(-)

diff --git a/src/java/org/apache/cassandra/utils/MonotonicClock.java 
b/src/java/org/apache/cassandra/utils/MonotonicClock.java
index 7be54c008b..c68ed5d9ef 100644
--- a/src/java/org/apache/cassandra/utils/MonotonicClock.java
+++ b/src/java/org/apache/cassandra/utils/MonotonicClock.java
@@ -248,7 +248,14 @@ public interface MonotonicClock
 
     public static class SystemClock extends AbstractEpochSamplingClock
     {
-        private SystemClock()
+        // Without making this constructor public you may start getting the 
following exception in the simulator:
+        //    java.lang.IncompatibleClassChangeError: Type
+        //       org.apache.cassandra.utils.MonotonicClock$Global is not a 
nest member of
+        //       org.apache.cassandra.utils.MonotonicClock: types are in 
different packages
+        // There might be a problem with a simulator and how we allow access, 
but I verified the change access
+        // flags on <init> method of the 
org/apache/cassandra/utils/MonotonicClock$SystemClock
+        // class to ACC_PUBLIC, and ensured proper testing relationship from 
both the surrounding and nested class.
+        public SystemClock()
         {
             super(Clock.Global::currentTimeMillis);
         }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/CMSTestBase.java 
b/test/distributed/org/apache/cassandra/distributed/test/log/CMSTestBase.java
index c1975cdbfe..e254798915 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/log/CMSTestBase.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/CMSTestBase.java
@@ -91,10 +91,15 @@ public class CMSTestBase
         public final TokenPlacementModel.ReplicationFactor rf;
 
         public CMSSut(IIsolatedExecutor.SerializableFunction<LocalLog, 
Processor> processorFactory, boolean addListeners, 
TokenPlacementModel.ReplicationFactor rf)
+        {
+            this(processorFactory, addListeners, 
Mockito.mock(SchemaProvider.class), rf);
+        }
+
+        public CMSSut(IIsolatedExecutor.SerializableFunction<LocalLog, 
Processor> processorFactory, boolean addListeners, SchemaProvider 
schemaProvider, TokenPlacementModel.ReplicationFactor rf)
         {
             partitioner = Murmur3Partitioner.instance;
             this.rf = rf;
-            schemaProvider = Mockito.mock(SchemaProvider.class);
+            this.schemaProvider = schemaProvider;
             ClusterMetadata initial = new ClusterMetadata(partitioner);
             log = LocalLog.logSpec()
                           .sync()
@@ -127,7 +132,7 @@ public class CMSTestBase
             }, schemaProvider));
         }
 
-        public void close() throws Exception
+        public void close()
         {
             ClusterMetadataService.unsetInstance();
         }
diff --git 
a/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptingGlobalMethods.java
 
b/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptingGlobalMethods.java
index abc71eae35..34c0f6bacc 100644
--- 
a/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptingGlobalMethods.java
+++ 
b/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptingGlobalMethods.java
@@ -33,6 +33,7 @@ import 
org.apache.cassandra.simulator.systems.InterceptedWait.InterceptedConditi
 import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.concurrent.Condition;
 import org.apache.cassandra.utils.concurrent.CountDownLatch;
+import org.apache.cassandra.utils.concurrent.Semaphore;
 import org.apache.cassandra.utils.concurrent.WaitQueue;
 
 import static 
org.apache.cassandra.config.CassandraRelevantProperties.TEST_SIMULATOR_DETERMINISM_CHECK;
@@ -60,6 +61,18 @@ public class InterceptingGlobalMethods extends 
InterceptingMonitors implements I
         this.onUncaughtException = onUncaughtException;
     }
 
+    @Override
+    public Semaphore newSemaphore(int count)
+    {
+        return new InterceptingSemaphore(count, false);
+    }
+
+    @Override
+    public Semaphore newFairSemaphore(int count)
+    {
+        return new InterceptingSemaphore(count, true);
+    }
+
     @Override
     public WaitQueue newWaitQueue()
     {
diff --git 
a/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptingSemaphore.java
 
b/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptingSemaphore.java
new file mode 100644
index 0000000000..60207e0c7f
--- /dev/null
+++ 
b/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptingSemaphore.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.simulator.systems;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.cassandra.utils.concurrent.Semaphore;
+import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
+
+import static 
org.apache.cassandra.simulator.systems.InterceptorOfGlobalMethods.Global.ifIntercepted;
+
+public class InterceptingSemaphore extends Semaphore.Standard
+{
+    final Queue<SemaphoreSignal> interceptible = new ConcurrentLinkedQueue<>();
+    final AtomicInteger permits;
+    final boolean fair;
+
+    private static class SemaphoreSignal extends 
InterceptingAwaitable.InterceptingSignal<Void>
+    {
+        private final int permits;
+
+        private SemaphoreSignal(int permits)
+        {
+            this.permits = permits;
+        }
+    }
+
+    public InterceptingSemaphore(int permits, boolean fair)
+    {
+        super(permits);
+        this.permits = new AtomicInteger(permits);
+        this.fair = fair;
+    }
+
+    @Override
+    public int permits()
+    {
+        if (ifIntercepted() == null)
+            return super.permits();
+
+        return permits.get();
+    }
+
+    @Override
+    public int drain()
+    {
+        if (ifIntercepted() == null)
+            return super.drain();
+
+        for (int i = 0; i < 10; i++)
+        {
+            int current = permits.get();
+            if (permits.compareAndSet(current, 0))
+                return current;
+        }
+
+        throw new IllegalStateException("Too much contention");
+    }
+
+    @Override
+    public void release(int release)
+    {
+        if (ifIntercepted() == null)
+        {
+            super.release(release);
+            return;
+        }
+
+        int remaining = permits.addAndGet(release);
+        while (!interceptible.isEmpty() && remaining > 0)
+        {
+            SemaphoreSignal signal = interceptible.peek();
+            if (signal.permits >= remaining)
+                interceptible.poll().signal();
+            else if (fair)
+                // Do not break enqueue order if using fair scheduler
+                break;
+        }
+    }
+
+    @Override
+    public boolean tryAcquire(int acquire)
+    {
+        if (ifIntercepted() == null)
+            return super.tryAcquire(acquire);
+
+        for (int i = 0; i < 10; i++)
+        {
+            int current = permits.get();
+            if (current >= acquire)
+            {
+                if (permits.compareAndSet(current, current - acquire))
+                    return true;
+            }
+            else
+            {
+                return false;
+            }
+        }
+        throw new IllegalStateException("Too much contention");
+    }
+
+    @Override
+    public boolean tryAcquire(int acquire, long time, TimeUnit unit) throws 
InterruptedException
+    {
+        if (ifIntercepted() == null)
+            return super.tryAcquire(acquire, time, unit);
+
+        while (true)
+        {
+            int current = permits.get();
+            if (current >= acquire && permits.compareAndSet(current, current - 
acquire))
+                return true;
+
+            SemaphoreSignal signal = new SemaphoreSignal(acquire);
+            interceptible.add(signal);
+            boolean res = signal.await(time, unit);
+            interceptible.remove(signal);
+            if (!res)
+                return false;
+        }
+    }
+
+    @Override
+    public boolean tryAcquireUntil(int acquire, long deadline) throws 
InterruptedException
+    {
+        if (ifIntercepted() == null)
+            return super.tryAcquireUntil(acquire, deadline);
+
+        while (true)
+        {
+            int current = permits.get();
+            if (current >= acquire && permits.compareAndSet(current, current - 
acquire))
+                    return true;
+
+            SemaphoreSignal signal = new SemaphoreSignal(acquire);
+            interceptible.add(signal);
+            boolean res = signal.awaitUntil(deadline);
+            interceptible.remove(signal);
+            if (!res)
+                return false;
+        }
+    }
+
+    @Override
+    public void acquire(int acquire) throws InterruptedException
+    {
+        if (ifIntercepted() == null)
+        {
+            super.acquire(acquire);
+            return;
+        }
+
+        while (true)
+        {
+            if (tryAcquire(acquire))
+                return;
+
+            SemaphoreSignal signal = new SemaphoreSignal(acquire);
+            interceptible.add(signal);
+            signal.await();
+            interceptible.remove(signal);
+        }
+    }
+
+    @Override
+    public void acquireThrowUncheckedOnInterrupt(int acquire) throws 
UncheckedInterruptedException
+    {
+        try
+        {
+            acquire(acquire);
+        }
+        catch (InterruptedException e)
+        {
+            throw new UncheckedInterruptedException(e);
+        }
+    }
+}
diff --git 
a/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptorOfGlobalMethods.java
 
b/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptorOfGlobalMethods.java
index bd8478d47b..eca4ebb2b4 100644
--- 
a/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptorOfGlobalMethods.java
+++ 
b/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptorOfGlobalMethods.java
@@ -37,7 +37,6 @@ import org.apache.cassandra.utils.concurrent.BlockingQueues;
 import org.apache.cassandra.utils.concurrent.Condition;
 import org.apache.cassandra.utils.concurrent.CountDownLatch;
 import org.apache.cassandra.utils.concurrent.Semaphore;
-import org.apache.cassandra.utils.concurrent.Semaphore.Standard;
 import org.apache.cassandra.utils.concurrent.WaitQueue;
 
 import static org.apache.cassandra.utils.Shared.Recursive.INTERFACES;
@@ -47,6 +46,8 @@ import static 
org.apache.cassandra.utils.Shared.Scope.SIMULATION;
 @Shared(scope = SIMULATION, inner = INTERFACES)
 public interface InterceptorOfGlobalMethods extends 
InterceptorOfSystemMethods, Closeable
 {
+    Semaphore newSemaphore(int count);
+    Semaphore newFairSemaphore(int count);
     WaitQueue newWaitQueue();
     CountDownLatch newCountDownLatch(int count);
     Condition newOneTimeCondition();
@@ -72,6 +73,26 @@ public interface InterceptorOfGlobalMethods extends 
InterceptorOfSystemMethods,
     {
         static LongConsumer threadLocalRandomCheck;
 
+        @Override
+        public Semaphore newSemaphore(int count)
+        {
+            Thread thread = Thread.currentThread();
+            if (thread instanceof InterceptibleThread)
+                return ((InterceptibleThread) 
thread).interceptorOfGlobalMethods().newSemaphore(count);
+
+            return Semaphore.newSemaphore(count);
+        }
+
+        @Override
+        public Semaphore newFairSemaphore(int count)
+        {
+            Thread thread = Thread.currentThread();
+            if (thread instanceof InterceptibleThread)
+                return ((InterceptibleThread) 
thread).interceptorOfGlobalMethods().newFairSemaphore(count);
+
+            return Semaphore.newFairSemaphore(count);
+        }
+
         @Override
         public WaitQueue newWaitQueue()
         {
@@ -383,12 +404,12 @@ public interface InterceptorOfGlobalMethods extends 
InterceptorOfSystemMethods,
 
         public static Semaphore newSemaphore(int count)
         {
-            return new Standard(count, false);
+            return methods.newSemaphore(count);
         }
 
         public static Semaphore newFairSemaphore(int count)
         {
-            return new Standard(count, true);
+            return methods.newFairSemaphore(count);
         }
 
         public static Condition newOneTimeCondition()
diff --git 
a/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java
 
b/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java
index 22dd55bb31..53569af1b1 100644
--- 
a/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java
+++ 
b/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java
@@ -18,250 +18,249 @@
 package org.apache.cassandra.simulator.test;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.CopyOnWriteArrayList;
-import javax.annotation.Nullable;
+import java.util.zip.Checksum;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.jimfs.Jimfs;
 
-import accord.topology.TopologyUtils;
+import org.apache.cassandra.concurrent.ExecutorFactory;
+import org.apache.cassandra.concurrent.ExecutorPlus;
 import org.apache.cassandra.config.AccordSpec;
-import org.apache.cassandra.schema.*;
-import org.junit.Ignore;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.filesystem.ListenableFileSystem;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.journal.AsyncCallbacks;
+import org.apache.cassandra.journal.Journal;
+import org.apache.cassandra.journal.KeySupport;
+import org.apache.cassandra.journal.ValueSerializer;
+
+import org.junit.Assert;
 import org.junit.Test;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import accord.Utils;
-import accord.api.Data;
-import accord.api.RoutingKey;
-import accord.api.Update;
-import accord.api.Write;
-import accord.local.Node;
-import accord.messages.MessageType;
-import accord.messages.PreAccept;
-import accord.messages.TxnRequest;
-import accord.primitives.FullKeyRoute;
-import accord.primitives.FullRoute;
-import accord.primitives.Keys;
-import accord.primitives.Ranges;
-import accord.primitives.Seekables;
-import accord.primitives.Timestamp;
-import accord.primitives.Txn;
-import accord.primitives.TxnId;
-import accord.topology.Topologies;
-import org.apache.cassandra.concurrent.ExecutorFactory;
-import org.apache.cassandra.concurrent.ExecutorPlus;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.ParameterizedClass;
-import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.io.util.File;
-import org.apache.cassandra.io.util.Files;
-import org.apache.cassandra.service.accord.AccordJournal;
-import org.apache.cassandra.service.accord.TokenRange;
-import org.apache.cassandra.service.accord.api.AccordRoutingKey;
-import org.apache.cassandra.service.accord.api.PartitionKey;
-import org.apache.cassandra.service.accord.txn.TxnNamedRead;
-import org.apache.cassandra.service.accord.txn.TxnQuery;
-import org.apache.cassandra.service.accord.txn.TxnRead;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Isolated;
 import org.apache.cassandra.utils.concurrent.CountDownLatch;
 
 public class AccordJournalSimulationTest extends SimulationTestBase
 {
     @Test
-    @Ignore // TODO: re-enable
-    public void test() throws IOException
-    {
-        simulate(arr(() -> run()),
-                 () -> check());
-    }
-
-    private static void run()
+    public void simpleRWTest()
     {
-        for (int i = 0; i < State.events; i++)
-        {
-            int finalI = i;
-            State.executor.execute(() -> State.append(finalI));
-        }
+        simulate(arr(() -> {
+                    ListenableFileSystem fs = new 
ListenableFileSystem(Jimfs.newFileSystem());
+                    File.unsafeSetFilesystem(fs);
+                    DatabaseDescriptor.daemonInitialization();
+                    DatabaseDescriptor.setCommitLogCompression(new 
ParameterizedClass("LZ4Compressor", ImmutableMap.of())); //
+                    
DatabaseDescriptor.setCommitLogWriteDiskAccessMode(Config.DiskAccessMode.standard);
+                    DatabaseDescriptor.initializeCommitLogDiskAccessMode();
+                    
DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+                    DatabaseDescriptor.setAccordJournalDirectory("/journal");
+                    new File("/journal").createDirectoriesIfNotExists();
 
-        try
-        {
-            State.eventsDurable.await();
-            State.logger.info("All events are durable done!");
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
+                    DatabaseDescriptor.setDumpHeapOnUncaughtException(false);
 
-        if (!State.exceptions.isEmpty())
-        {
-            AssertionError error = new AssertionError("Exceptions found during 
test");
-            State.exceptions.forEach(error::addSuppressed);
-            throw error;
-        }
+                    Keyspace.setInitialized();
 
-        State.journal.shutdown();
-        State.logger.info("Run complete");
+                    State.journal = new Journal<>("AccordJournal",
+                            new File("/journal"),
+                            new AccordSpec.JournalSpec(),
+                            new TestCallbacks(),
+                            new IdentityKeySerializer(),
+                            new IdentityValueSerializer());
+                }),
+                () -> check());
     }
 
-    private static void check()
+    public static void check()
     {
-        State.logger.info("Check starting");
-        State.journal.start(null); // to avoid a while true deadlock
+        State.journal.start();
         try
         {
-            for (int i = 0; i < State.events; i++)
+            final int count = 100;
+            for (int i = 0; i < count; i++)
+            {
+                int finalI = i;
+                State.executor.submit(() -> State.journal.asyncWrite("test" + 
finalI, "test" + finalI, Collections.singleton(1), null));
+            }
+
+            State.latch.await();
+
+            for (int i = 0; i < count; i++)
             {
-                TxnRequest<?> event = 
State.journal.readMessage(State.toTxnId(i), MessageType.PRE_ACCEPT_REQ, 
PreAccept.class);
-                State.logger.info("Event {} -> {}", i, event);
-                if (event == null)
-                    throw new AssertionError(String.format("Unable to read 
event %d", i));
+                State.logger.debug("Reading {}", i);
+                Assert.assertEquals(State.journal.readFirst("test" + i), 
"test" + i);
             }
-            State.logger.info("Check complete");
+        }
+
+        catch (InterruptedException e)
+        {
+            throw new RuntimeException(e);
         }
         finally
         {
             State.journal.shutdown();
+
+            if (!State.thrown.isEmpty())
+            {
+                AssertionError throwable = new AssertionError("Caught 
exceptions");
+                for (Throwable t: State.thrown)
+                    throwable.addSuppressed(t);
+                throw throwable;
+            }
         }
     }
 
-    @Isolated
-    public static class State
+    public static class TestCallbacks implements AsyncCallbacks<String, String>
     {
-        private static final Logger logger = 
LoggerFactory.getLogger(State.class);
-        private static final String KEYSPACE = "test";
 
-        static
+        @Override
+        public void onWrite(long segment, int position, int size, String key, 
String value, Object writeContext)
         {
-            Files.newGlobalInMemoryFileSystem();
-            DatabaseDescriptor.clientWithDaemonConfig();
-            
DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
-            DatabaseDescriptor.setAccordJournalDirectory("/journal");
-            new File("/journal").createDirectoriesIfNotExists();
-            DatabaseDescriptor.setCommitLogCompression(new 
ParameterizedClass("LZ4Compressor", ImmutableMap.of()));
-            DatabaseDescriptor.setDumpHeapOnUncaughtException(false);
-
-            // in order to do journal.read, we need all this setup first!
-            Keyspace.setInitialized();
-            
Schema.instance.submit(SchemaTransformations.addKeyspace(KeyspaceMetadata.create(State.KEYSPACE,
 KeyspaceParams.simple(1)), true));
-            Keyspace ks = Keyspace.open(State.KEYSPACE);
-            ks.initCfCustom(ColumnFamilyStore.createColumnFamilyStore(ks, 
TableMetadataRef.forOfflineTools(TableMetadata.builder(State.KEYSPACE, 
State.KEYSPACE)
-                                                                               
                                         .addPartitionKeyColumn("pk", 
Int32Type.instance)
-                                                                               
                                         .build()).get(), false));
-
-            try
-            {
-                CommitLog.instance.shutdownBlocking();
-            }
-            catch (InterruptedException e)
-            {
-                // ignore
-            }
+            State.latch.decrement();
         }
-        private static final ExecutorPlus executor = 
ExecutorFactory.Global.executorFactory().pooled("name", 10);
-        private static final AccordJournal journal = new AccordJournal(null, 
new AccordSpec.JournalSpec());
-        private static final int events = 100;
-        private static final CountDownLatch eventsWritten = 
CountDownLatch.newCountDownLatch(events);
-        private static final CountDownLatch eventsDurable = 
CountDownLatch.newCountDownLatch(events);
-        private static final List<Throwable> exceptions = new 
CopyOnWriteArrayList<>();
-
-        static
+
+        @Override
+        public void onWriteFailed(String key, String value, Object 
writeContext, Throwable cause)
         {
-            journal.start(null);
+            State.thrown.add(new IllegalStateException("Write failed for " + 
key));
+            State.latch.decrement();
         }
 
-        public static void append(int event)
+        @Override
+        public void onFlush(long segment, int position)
         {
-            TxnRequest<?> request = toRequest(event);
-//            journal.appendMessageTest(request, executor, new 
AsyncWriteCallback()
-//            {
-//                @Override
-//                public void run()
-//                {
-//                    durable(event);
-//                }
-//
-//                @Override
-//                public void onFailure(Throwable error)
-//                {
-//                    eventsDurable.decrement(); // to make sure we don't 
block forever
-//                    exceptions.add(error);
-//                }
-//            });
-            eventsWritten.decrement();
-            logger.info("append({}); remaining {}", event, 
eventsWritten.count());
         }
 
-        private static void durable(int event)
+        @Override
+        public void onFlushFailed(Throwable cause)
         {
-            eventsDurable.decrement();
-            logger.info("durable({}); remaining {}", event, 
eventsDurable.count());
+            State.thrown.add(new RuntimeException("Could not flush", cause));
         }
+    }
 
-        private static TxnRequest<?> toRequest(int event)
+    @Isolated
+    public static class IdentityValueSerializer implements 
ValueSerializer<String, String>
+    {
+        @Override
+        public int serializedSize(String key, String value, int userVersion)
         {
-            TxnId id = toTxnId(event);
-            Ranges ranges = Ranges.of(new 
TokenRange(AccordRoutingKey.SentinelKey.min(tableId), 
AccordRoutingKey.SentinelKey.max(tableId)));
-            Topologies topologies = 
Utils.topologies(TopologyUtils.initialTopology(new Node.Id[] { node}, ranges, 
3));
-            Keys keys = Keys.of(toKey(0));
-            Txn txn = new Txn.InMemory(keys, new TxnRead(new TxnNamedRead[0], 
keys, null), TxnQuery.ALL, new NoopUpdate());
-            FullRoute<?> route = route();
-            return new PreAccept(node, topologies, id, txn, route);
+            return TypeSizes.INT_SIZE + key.length();
         }
 
-        private static TxnId toTxnId(int event)
+        @Override
+        public void serialize(String key, String value, DataOutputPlus out, 
int userVersion) throws IOException
         {
-            return TxnId.fromValues(1, event, 0, node);
+            out.writeInt(key.length());
+            out.writeBytes(key);
         }
 
-        private static PartitionKey toKey(int a)
+        @Override
+        public String deserialize(String key, DataInputPlus in, int 
userVersion) throws IOException
         {
-            return new PartitionKey(tableId, 
Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.bytes(a)));
+            int size = in.readInt();
+            byte[] value = new byte[size];
+            for (int i = 0; i < size; i++)
+                value[i] = in.readByte();
+
+            return new String(value);
         }
+    }
 
-        private static final TableId tableId = TableId.fromUUID(new UUID(0, 
0));
-        private static final Node.Id node = new Node.Id(0);
+    @Isolated
+    public static class IdentityKeySerializer implements KeySupport<String>
+    {
+        private final byte aByte = 0xd;
+        @Override
+        public int serializedSize(int userVersion)
+        {
+            return 16;
+        }
 
-        private static FullRoute<?> route()
+        @Override
+        public void serialize(String key, DataOutputPlus out, int userVersion) 
throws IOException
         {
-            return new FullKeyRoute(key, true, new RoutingKey[]{ key });
+            int maxSize = 16 - TypeSizes.INT_SIZE;
+            if (key.length() > maxSize)
+                throw new IllegalStateException();
+
+            out.writeInt(key.length());
+            out.writeBytes(key);
+            int remaining = maxSize - key.length();
+            for (int i = 0; i < remaining; i++)
+                out.writeByte(aByte + i);
         }
 
-        private static final RoutingKey key = new 
AccordRoutingKey.TokenKey(tableId, new Murmur3Partitioner.LongToken(42));
-    }
+        @Override
+        public String deserialize(DataInputPlus in, int userVersion) throws 
IOException
+        {
+            int size = in.readInt();
+            byte[] key = new byte[size];
+            for (int i = 0; i < size; i++)
+                key[i] = in.readByte();
+
+            int maxSize = 16 - TypeSizes.INT_SIZE;
+            int remaining = maxSize - size;
+            for (int i = 0; i < remaining; i++)
+                Assert.assertEquals(aByte + i, in.readByte());
+
+            return new String(key);
+        }
 
-    public static class NoopUpdate implements Update
-    {
         @Override
-        public Seekables<?, ?> keys()
+        public String deserialize(ByteBuffer buffer, int position, int 
userVersion)
         {
-            return null;
+            int size = buffer.getInt();
+            byte[] key = new byte[size];
+            for (int i = 0; i < size; i++)
+                key[i] = buffer.get();
+
+            int maxSize = 16 - TypeSizes.INT_SIZE;
+            int remaining = maxSize - size;
+            for (int i = 0; i < remaining; i++)
+                Assert.assertEquals(aByte + i, buffer.get());
+
+            return new String(key);
         }
 
         @Override
-        public Write apply(Timestamp executeAt, @Nullable Data data)
+        public void updateChecksum(Checksum crc, String key, int userVersion)
         {
-            return null;
+            crc.update(key.getBytes());
         }
 
         @Override
-        public Update slice(Ranges ranges)
+        public int compareWithKeyAt(String key, ByteBuffer buffer, int 
position, int userVersion)
         {
-            return null;
+            throw new IllegalStateException();
         }
 
         @Override
-        public Update merge(Update other)
+        public int compare(String o1, String o2)
         {
-            return null;
+            return o1.compareTo(o2);
         }
     }
-}
+
+    @Isolated
+    public static class State
+    {
+        private static final Logger logger = 
LoggerFactory.getLogger(State.class);
+        static Journal<String, String> journal;
+        static CountDownLatch latch = CountDownLatch.newCountDownLatch(100);
+        static List<Throwable> thrown = new ArrayList<>();
+        static ExecutorPlus executor = 
ExecutorFactory.Global.executorFactory().pooled("name", 10);
+    }
+}
\ No newline at end of file
diff --git 
a/test/simulator/test/org/apache/cassandra/simulator/test/SemaphoreTest.java 
b/test/simulator/test/org/apache/cassandra/simulator/test/SemaphoreTest.java
new file mode 100644
index 0000000000..9e1126a1e0
--- /dev/null
+++ b/test/simulator/test/org/apache/cassandra/simulator/test/SemaphoreTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.simulator.test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.concurrent.ExecutorFactory;
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.Shared;
+import org.apache.cassandra.utils.concurrent.CountDownLatch;
+import org.apache.cassandra.utils.concurrent.Semaphore;
+
+public class SemaphoreTest extends SimulationTestBase
+{
+    @Test
+    public void semaphoreAcquireUntilTest()
+    {
+        simulate(arr(() -> {
+                     try
+                     {
+                         Semaphore semaphore = Semaphore.newSemaphore(1);
+                         semaphore.acquire(1);
+                         long start = Clock.Global.nanoTime();
+                         Assert.assertFalse(semaphore.tryAcquire(1, 5000, 
TimeUnit.MILLISECONDS));
+                         long elapsed = 
TimeUnit.NANOSECONDS.toMillis(Clock.Global.nanoTime() - start);
+                         Assert.assertTrue(String.format("Elapsed only %sms, 
while should have at least 5000", elapsed),
+                                           elapsed > 5000);
+                     }
+                     catch (Throwable t)
+                     {
+                         throw new RuntimeException(t);
+                     }
+                 }),
+                 () -> {},
+                 System.currentTimeMillis());
+    }
+
+    @Test
+    public void semaphoreAcquireReleaseRepeatabilityTest()
+    {
+        long seed = System.currentTimeMillis();
+        semaphoreTestInternal(seed);
+        State.record = false;
+        // Verify that subsequent interleavings will be the same
+        semaphoreTestInternal(seed);
+    }
+
+    protected void semaphoreTestInternal(long seed)
+    {
+        simulate(arr(() -> {
+                     ExecutorPlus executor = 
ExecutorFactory.Global.executorFactory().pooled("semaphore-test-", 10);
+                     Semaphore semaphore = Semaphore.newSemaphore(5);
+                     CountDownLatch latch = 
CountDownLatch.newCountDownLatch(5);
+
+                     for (int i = 0; i < 5; i++)
+                     {
+                         int thread = i;
+                         executor.submit(() -> {
+                             for (int j = 0; j < 100; j++)
+                             {
+                                 int permits = semaphore.permits();
+                                 Assert.assertTrue(permits + " should be non 
negative", permits >= 0);
+
+                                 try
+                                 {
+                                     semaphore.acquire(1);
+                                     State.tick(thread, j);
+                                     semaphore.release(1);
+                                 }
+                                 catch (Throwable e)
+                                 {
+                                     throw new RuntimeException(e);
+                                 }
+                             }
+                             latch.decrement();
+                         });
+                     }
+
+                     latch.awaitUninterruptibly();
+                     int permits = semaphore.permits();
+                     Assert.assertEquals(5, permits);
+                 }),
+                 () -> {},
+                 seed);
+    }
+
+    @Shared
+    public static class State
+    {
+        static final List<Tick> ticks = new ArrayList<>();
+        static boolean record = true;
+        static int i = 0;
+
+        public static void tick(int thread, int iteration)
+        {
+            if (record)
+            {
+                Tick tick = new Tick(thread, iteration);
+                ticks.add(tick);
+            }
+            else
+            {
+                Tick tick = ticks.get(i);
+                Assert.assertEquals(tick.thread, thread);
+                Assert.assertEquals(tick.iteration, iteration);
+                i++;
+            }
+        }
+    }
+
+    @Shared
+    public static class Tick
+    {
+        final int thread;
+        final int iteration;
+
+        public Tick(int thread, int iteration)
+        {
+            this.thread = thread;
+            this.iteration = iteration;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "Tick{" +
+                   "thread=" + thread +
+                   ", iteration=" + iteration +
+                   '}';
+        }
+    }
+}
diff --git 
a/test/simulator/test/org/apache/cassandra/simulator/test/SimulationTestBase.java
 
b/test/simulator/test/org/apache/cassandra/simulator/test/SimulationTestBase.java
index 2849a464ce..acc335645f 100644
--- 
a/test/simulator/test/org/apache/cassandra/simulator/test/SimulationTestBase.java
+++ 
b/test/simulator/test/org/apache/cassandra/simulator/test/SimulationTestBase.java
@@ -24,13 +24,16 @@ import java.util.Collections;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.IntSupplier;
+import java.util.function.LongConsumer;
 import java.util.function.Predicate;
 
 import com.google.common.collect.Iterators;
+import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.ExecutorFactory;
+import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.IIsolatedExecutor;
@@ -42,6 +45,7 @@ import org.apache.cassandra.simulator.ActionSchedule.Work;
 import org.apache.cassandra.simulator.asm.InterceptClasses;
 import org.apache.cassandra.simulator.asm.NemesisFieldSelectors;
 import org.apache.cassandra.simulator.systems.Failures;
+import org.apache.cassandra.simulator.systems.InterceptedWait;
 import org.apache.cassandra.simulator.systems.InterceptibleThread;
 import org.apache.cassandra.simulator.systems.InterceptingExecutorFactory;
 import org.apache.cassandra.simulator.systems.InterceptingGlobalMethods;
@@ -51,24 +55,36 @@ import 
org.apache.cassandra.simulator.systems.SimulatedQuery;
 import org.apache.cassandra.simulator.systems.SimulatedSystems;
 import org.apache.cassandra.simulator.systems.SimulatedTime;
 import org.apache.cassandra.simulator.utils.LongRange;
+import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.CloseableIterator;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.CLOCK_GLOBAL;
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.CLOCK_MONOTONIC_APPROX;
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.CLOCK_MONOTONIC_PRECISE;
 import static org.apache.cassandra.simulator.ActionSchedule.Mode.TIME_LIMITED;
 import static org.apache.cassandra.simulator.ActionSchedule.Mode.UNLIMITED;
 import static org.apache.cassandra.simulator.ClusterSimulation.ISOLATE;
 import static org.apache.cassandra.simulator.ClusterSimulation.SHARE;
 import static org.apache.cassandra.simulator.SimulatorUtils.failWithOOM;
-import static 
org.apache.cassandra.simulator.systems.InterceptedWait.CaptureSites.Capture.NONE;
 import static org.apache.cassandra.simulator.utils.KindOfSequence.UNIFORM;
 import static org.apache.cassandra.utils.Shared.Scope.ANY;
 import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
 
 public class SimulationTestBase
 {
+    @BeforeClass
+    public static void beforeAll()
+    {
+        // Disallow time on the bootstrap classloader
+        for (CassandraRelevantProperties property : 
Arrays.asList(CLOCK_GLOBAL, CLOCK_MONOTONIC_APPROX, CLOCK_MONOTONIC_PRECISE))
+            
property.setString("org.apache.cassandra.simulator.systems.SimulatedTime$Delegating");
+        try { Clock.Global.nanoTime(); } catch (IllegalStateException e) {} // 
make sure static initializer gets called
+    }
+
     private static final Logger logger = LoggerFactory.getLogger(Logger.class);
 
     static abstract class DTestClusterSimulation implements Simulation
@@ -215,10 +231,16 @@ public class SimulationTestBase
 
     public static void simulate(IIsolatedExecutor.SerializableRunnable[] 
runnables,
                                 IIsolatedExecutor.SerializableRunnable check)
+    {
+        simulate(runnables, check, System.currentTimeMillis());
+    }
+
+    public static void simulate(IIsolatedExecutor.SerializableRunnable[] 
runnables,
+                                IIsolatedExecutor.SerializableRunnable check,
+                                long seed)
     {
         Failures failures = new HarrySimulatorTest.HaltOnError();
         RandomSource random = new RandomSource.Default();
-        long seed = System.currentTimeMillis();
         System.out.println("Using seed: " + seed);
         random.reset(seed);
         SimulatedTime time = new SimulatedTime(1, random, 1577836800000L /*Jan 
1st UTC*/, new LongRange(1, 100, MILLISECONDS, NANOSECONDS),
@@ -229,10 +251,16 @@ public class SimulationTestBase
         InstanceClassLoader classLoader = new InstanceClassLoader(1, 1, 
AbstractCluster.CURRENT_VERSION.classpath,
                                                                   
Thread.currentThread().getContextClassLoader(),
                                                                   
sharedClassPredicate,
-                                                                  new 
InterceptClasses((x) -> () -> 1.0f, (x) -> () -> 1.0f, 
NemesisFieldSelectors.get(), ClassLoader.getSystemClassLoader(), 
sharedClassPredicate.negate())::apply);
+                                                                  new 
InterceptClasses((x) -> () -> 1.0f, (x) -> () -> 1.0f,
+                                                                               
        NemesisFieldSelectors.get(),
+                                                                               
        ClassLoader.getSystemClassLoader(),
+                                                                               
        sharedClassPredicate.negate())::apply);
 
         ThreadGroup tg = new ThreadGroup("test");
-        InterceptorOfGlobalMethods interceptorOfGlobalMethods = new 
InterceptingGlobalMethods(NONE, null, failures, random);
+        InterceptedWait.CaptureSites.Capture capture = new 
InterceptedWait.CaptureSites.Capture(false, false, false);
+        InterceptorOfGlobalMethods interceptorOfGlobalMethods = 
IsolatedExecutor.transferAdhoc((IIsolatedExecutor.SerializableQuadFunction<InterceptedWait.CaptureSites.Capture,
 LongConsumer, Consumer<Throwable>, RandomSource, InterceptorOfGlobalMethods>) 
InterceptingGlobalMethods::new, classLoader)
+                                                                               
 .apply(capture, (ignore) -> {}, failures, random);
+
         InterceptingExecutorFactory factory = 
execution.factory(interceptorOfGlobalMethods, classLoader, tg);
 
         time.setup(1, classLoader);
diff --git 
a/test/simulator/test/org/apache/cassandra/simulator/test/TrivialSimulationTest.java
 
b/test/simulator/test/org/apache/cassandra/simulator/test/TrivialSimulationTest.java
index 548cf11671..7be09a1b5c 100644
--- 
a/test/simulator/test/org/apache/cassandra/simulator/test/TrivialSimulationTest.java
+++ 
b/test/simulator/test/org/apache/cassandra/simulator/test/TrivialSimulationTest.java
@@ -39,6 +39,13 @@ import static 
org.apache.cassandra.simulator.cluster.ClusterActions.Options.noAc
 
 public class TrivialSimulationTest extends SimulationTestBase
 {
+    @Test
+    public void identityHashMapTest()
+    {
+        simulate(arr(() -> new IdentityHashMap<>().put(1, 1)),
+                 () -> {});
+    }
+
     @Test
     public void trivialTest() throws IOException // for 
demonstration/experiment purposes
     {
@@ -84,10 +91,5 @@ public class TrivialSimulationTest extends SimulationTestBase
                  () -> {});
     }
 
-    @Test
-    public void identityHashMapTest()
-    {
-        simulate(arr(() -> new IdentityHashMap<>().put(1, 1)),
-                 () -> {});
-    }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to