This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-harry.git

commit 6ed308d6a814826be145f6ba30304ea474bf263e
Author: Alex Petrov <oleksandr.pet...@gmail.com>
AuthorDate: Thu Mar 9 17:13:18 2023 +0100

    Pull in Cassandra concurrent utils until there is a common shared library
    
    Patch by Alex Petrov; reviewed by Caleb Rackliffe and Abe Ratnofsky for 
CASSANDRA-18315
---
 harry-core/src/harry/concurrent/Awaitable.java     | 399 +++++++++++++++++++++
 harry-core/src/harry/concurrent/Clock.java         | 103 ++++++
 harry-core/src/harry/concurrent/Condition.java     |  99 +++++
 .../src/harry/concurrent/CountDownLatch.java       | 107 ++++++
 .../src/harry/concurrent/ExecutorFactory.java      | 155 ++++++++
 .../src/harry/concurrent/InfiniteLoopExecutor.java | 178 +++++++++
 harry-core/src/harry/concurrent/Interruptible.java |  50 +++
 .../src/harry/concurrent/NamedThreadFactory.java   | 195 ++++++++++
 harry-core/src/harry/concurrent/Shutdownable.java  |  44 +++
 .../concurrent/UncheckedInterruptedException.java  |  35 ++
 harry-core/src/harry/concurrent/WaitQueue.java     | 390 ++++++++++++++++++++
 11 files changed, 1755 insertions(+)

diff --git a/harry-core/src/harry/concurrent/Awaitable.java 
b/harry-core/src/harry/concurrent/Awaitable.java
new file mode 100644
index 0000000..3781e78
--- /dev/null
+++ b/harry-core/src/harry/concurrent/Awaitable.java
@@ -0,0 +1,399 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.concurrent;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Predicate;
+
+import static harry.concurrent.Clock.Global.nanoTime;
+import static harry.concurrent.WaitQueue.newWaitQueue;
+
+/**
+ * A generic signal consumer, supporting all of the typical patterns used in 
Cassandra.
+ * All of the methods defined in {@link Awaitable} may be waited on without a 
loop,
+ * as this interface declares that there are no spurious wake-ups.
+ *
+ * This class was borrowed from Apache Cassandra
+ * (org.apache.cassandra.utils.concurrent), until there's a shared concurrency lib.
+ */
+public interface Awaitable
+{
+    /**
+     * Await until the deadline (in nanoTime), throwing any interrupt.
+     * No spurious wakeups.
+     * @return true if we were signalled, false if the deadline elapsed
+     * @throws InterruptedException if interrupted
+     */
+    boolean awaitUntil(long nanoTimeDeadline) throws InterruptedException;
+
+    /**
+     * Await until the deadline (in nanoTime), throwing any interrupt as an 
unchecked exception.
+     * No spurious wakeups.
+     * @return true if we were signalled, false if the deadline elapsed
+     * @throws UncheckedInterruptedException if interrupted
+     */
+    boolean awaitUntilThrowUncheckedOnInterrupt(long nanoTimeDeadline) throws 
UncheckedInterruptedException;
+
+    /**
+     * Await until the deadline (in nanoTime), ignoring interrupts (but 
maintaining the interrupt flag on exit).
+     * No spurious wakeups.
+     * @return true if we were signalled, false if the deadline elapsed
+     */
+    boolean awaitUntilUninterruptibly(long nanoTimeDeadline);
+
+    /**
+     * Await for the specified period, throwing any interrupt.
+     * No spurious wakeups.
+     * @return true if we were signalled, false if the timeout elapses
+     * @throws InterruptedException if interrupted
+     */
+    boolean await(long time, TimeUnit units) throws InterruptedException;
+
+    /**
+     * Await for the specified period, throwing any interrupt as an unchecked 
exception.
+     * No spurious wakeups.
+     * @return true if we were signalled, false if the timeout elapses
+     * @throws UncheckedInterruptedException if interrupted
+     */
+    boolean awaitThrowUncheckedOnInterrupt(long time, TimeUnit units) throws 
UncheckedInterruptedException;
+
+    /**
+     * Await for the specified period, ignoring interrupts (but maintaining
+     * the interrupt flag on exit).
+     * No spurious wakeups.
+     * @param time the maximum time to wait, expressed in {@code units}
+     * @param units the unit of {@code time}
+     * @return true if we were signalled, false if the timeout elapses
+     */
+    boolean awaitUninterruptibly(long time, TimeUnit units);
+
+    /**
+     * Await indefinitely, throwing any interrupt.
+     * No spurious wakeups.
+     * @throws InterruptedException if interrupted
+     */
+    Awaitable await() throws InterruptedException;
+
+    /**
+     * Await indefinitely, throwing any interrupt as an unchecked exception.
+     * No spurious wakeups.
+     * @throws UncheckedInterruptedException if interrupted
+     */
+    Awaitable awaitThrowUncheckedOnInterrupt() throws 
UncheckedInterruptedException;
+
+    /**
+     * Await indefinitely, ignoring interrupts (but maintaining the interrupt 
flag on exit).
+     * No spurious wakeups.
+     */
+    Awaitable awaitUninterruptibly();
+
+    // we must declare the static implementation methods outside of the 
interface,
+    // so that they can be loaded by different classloaders during simulation
+    class Defaults
+    {
+        public static boolean await(Awaitable await, long time, TimeUnit unit) 
throws InterruptedException
+        {
+            return await.awaitUntil(nanoTime() + unit.toNanos(time));
+        }
+
+        public static boolean awaitThrowUncheckedOnInterrupt(Awaitable await, 
long time, TimeUnit units) throws UncheckedInterruptedException
+        {
+            return awaitUntilThrowUncheckedOnInterrupt(await, nanoTime() + 
units.toNanos(time));
+        }
+
+        public static boolean awaitUninterruptibly(Awaitable await, long time, 
TimeUnit units)
+        {
+            return awaitUntilUninterruptibly(await, nanoTime() + 
units.toNanos(time));
+        }
+
+        public static <A extends Awaitable> A awaitThrowUncheckedOnInterrupt(A 
await) throws UncheckedInterruptedException
+        {
+            try
+            {
+                await.await();
+            }
+            catch (InterruptedException e)
+            {
+                throw new UncheckedInterruptedException();
+            }
+            return await;
+        }
+
+        public static boolean awaitUntilThrowUncheckedOnInterrupt(Awaitable 
await, long nanoTimeDeadline) throws UncheckedInterruptedException
+        {
+            try
+            {
+                return await.awaitUntil(nanoTimeDeadline);
+            }
+            catch (InterruptedException e)
+            {
+                throw new UncheckedInterruptedException();
+            }
+        }
+
+        /**
+         * Static implementation backing {@link Awaitable#awaitUntilUninterruptibly(long)}.
+         * Retries {@code awaitUntil} against the same deadline each time it is interrupted,
+         * and re-asserts the interrupt flag before returning if any interrupt was absorbed.
+         *
+         * @return the result of the first {@code awaitUntil} call that completed without interruption
+         */
+        public static boolean awaitUntilUninterruptibly(Awaitable await, long nanoTimeDeadline)
+        {
+            boolean sawInterrupt = false;
+            boolean signalled;
+            for (;;)
+            {
+                try
+                {
+                    signalled = await.awaitUntil(nanoTimeDeadline);
+                }
+                catch (InterruptedException ignored)
+                {
+                    // remember the interrupt and go round again
+                    sawInterrupt = true;
+                    continue;
+                }
+                break;
+            }
+
+            if (sawInterrupt)
+                Thread.currentThread().interrupt();
+            return signalled;
+        }
+
+        /**
+         * Static implementation backing {@link Awaitable#awaitUninterruptibly()}.
+         * Calls {@code await()} repeatedly until one call completes without interruption,
+         * then re-asserts the interrupt flag if any interrupt was absorbed.
+         *
+         * @return the {@code await} argument, to permit call chaining
+         */
+        public static <A extends Awaitable> A awaitUninterruptibly(A await)
+        {
+            boolean sawInterrupt = false;
+            for (boolean done = false; !done; )
+            {
+                try
+                {
+                    await.await();
+                    done = true;
+                }
+                catch (InterruptedException ignored)
+                {
+                    // remember the interrupt and go round again
+                    sawInterrupt = true;
+                }
+            }
+
+            if (sawInterrupt)
+                Thread.currentThread().interrupt();
+            return await;
+        }
+    }
+
+    abstract class AbstractAwaitable implements Awaitable
+    {
+        protected AbstractAwaitable() {}
+
+        /**
+         * {@link Awaitable#await(long, TimeUnit)}
+         */
+        @Override
+        public boolean await(long time, TimeUnit unit) throws 
InterruptedException
+        {
+            return Defaults.await(this, time, unit);
+        }
+
+        /**
+         * {@link Awaitable#awaitThrowUncheckedOnInterrupt(long, TimeUnit)}
+         */
+        @Override
+        public boolean awaitThrowUncheckedOnInterrupt(long time, TimeUnit 
units) throws UncheckedInterruptedException
+        {
+            return Defaults.awaitThrowUncheckedOnInterrupt(this, time, units);
+        }
+
+        /**
+         * {@link Awaitable#awaitUninterruptibly(long, TimeUnit)}
+         */
+        public boolean awaitUninterruptibly(long time, TimeUnit units)
+        {
+            return awaitUntilUninterruptibly(nanoTime() + units.toNanos(time));
+        }
+
+        /**
+         * {@link Awaitable#awaitThrowUncheckedOnInterrupt()}
+         */
+        public Awaitable awaitThrowUncheckedOnInterrupt() throws 
UncheckedInterruptedException
+        {
+            return Defaults.awaitThrowUncheckedOnInterrupt(this);
+        }
+
+        /**
+         * {@link Awaitable#awaitUntilThrowUncheckedOnInterrupt(long)}
+         */
+        public boolean awaitUntilThrowUncheckedOnInterrupt(long 
nanoTimeDeadline) throws UncheckedInterruptedException
+        {
+            return Defaults.awaitUntilThrowUncheckedOnInterrupt(this, 
nanoTimeDeadline);
+        }
+
+        /**
+         * {@link Awaitable#awaitUntilUninterruptibly(long)}
+         */
+        public boolean awaitUntilUninterruptibly(long nanoTimeDeadline)
+        {
+            return Defaults.awaitUntilUninterruptibly(this, nanoTimeDeadline);
+        }
+
+        /**
+         * {@link Awaitable#awaitUninterruptibly()}
+         */
+        public Awaitable awaitUninterruptibly()
+        {
+            return Defaults.awaitUninterruptibly(this);
+        }
+    }
+
+    /**
+     * A barebones asynchronous {@link Awaitable}.
+     * If your state is minimal, or can be updated concurrently, extend this 
class.
+     */
+    abstract class AsyncAwaitable extends AbstractAwaitable
+    {
+        /**
+         * Maintain an internal variable containing a lazily-initialized wait 
queue
+         * @return null if is done
+         */
+        private static <A extends Awaitable> WaitQueue.Signal 
register(AtomicReferenceFieldUpdater<A, WaitQueue> waitingUpdater, Predicate<A> 
isDone, A awaitable)
+        {
+            if (isDone.test(awaitable))
+                return null;
+
+            WaitQueue waiting = waitingUpdater.get(awaitable);
+            if (waiting == null)
+            {
+                if (!waitingUpdater.compareAndSet(awaitable, null, waiting = 
newWaitQueue()))
+                {
+                    waiting = waitingUpdater.get(awaitable);
+                    if (waiting == null)
+                    {
+                        assert isDone.test(awaitable);
+                        return null;
+                    }
+                }
+            }
+
+            WaitQueue.Signal s = waiting.register();
+            if (!isDone.test(awaitable))
+                return s;
+
+            s.cancel();
+            return null;
+        }
+
+        static <A extends Awaitable> A await(AtomicReferenceFieldUpdater<A, 
WaitQueue> waitingUpdater, Predicate<A> isDone, A awaitable) throws 
InterruptedException
+        {
+            WaitQueue.Signal s = register(waitingUpdater, isDone, awaitable);
+            if (s != null)
+                s.await();
+            return awaitable;
+        }
+
+        static <A extends Awaitable> boolean 
awaitUntil(AtomicReferenceFieldUpdater<A, WaitQueue> waitingUpdater, 
Predicate<A> isDone, A awaitable, long nanoTimeDeadline) throws 
InterruptedException
+        {
+            WaitQueue.Signal s = register(waitingUpdater, isDone, awaitable);
+            return s == null || s.awaitUntil(nanoTimeDeadline) || 
isDone.test(awaitable);
+        }
+
+        static <A extends Awaitable> void 
signalAll(AtomicReferenceFieldUpdater<A, WaitQueue> waitingUpdater, A awaitable)
+        {
+            WaitQueue waiting = waitingUpdater.get(awaitable);
+            if (waiting == null)
+                return;
+
+            waiting.signalAll();
+            waitingUpdater.lazySet(awaitable, null);
+        }
+
+        private static final AtomicReferenceFieldUpdater<AsyncAwaitable, 
WaitQueue> waitingUpdater = 
AtomicReferenceFieldUpdater.newUpdater(AsyncAwaitable.class, WaitQueue.class, 
"waiting");
+        private volatile WaitQueue waiting;
+
+        protected AsyncAwaitable() {}
+
+        /**
+         * {@link Awaitable#await()}
+         */
+        public Awaitable await() throws InterruptedException
+        {
+            return await(waitingUpdater, AsyncAwaitable::isSignalled, this);
+        }
+
+        /**
+         * {@link Awaitable#awaitUntil(long)}
+         */
+        public boolean awaitUntil(long nanoTimeDeadline) throws 
InterruptedException
+        {
+            return awaitUntil(waitingUpdater, AsyncAwaitable::isSignalled, 
this, nanoTimeDeadline);
+        }
+
+        /**
+         * Signal any waiting threads; {@link #isSignalled()} must return 
{@code true} before this method is invoked.
+         */
+        protected void signal()
+        {
+            signalAll(waitingUpdater, this);
+        }
+
+        /**
+         * Return true once signalled. Unidirectional; once true, must never 
again be false.
+         */
+        protected abstract boolean isSignalled();
+    }
+
+    /**
+     * A barebones {@link Awaitable} that uses mutual exclusion.
+     * If your state will be updated while holding the object monitor, extend 
this class.
+     */
+    abstract class SyncAwaitable extends AbstractAwaitable
+    {
+        protected SyncAwaitable() {}
+
+        /**
+         * {@link Awaitable#await()}
+         */
+        public synchronized Awaitable await() throws InterruptedException
+        {
+            while (!isSignalled())
+                wait();
+            return this;
+        }
+
+        /**
+         * {@link Awaitable#awaitUntil(long)}, implemented on this object's monitor:
+         * re-checks {@link #isSignalled()} after every wake-up and gives up once
+         * {@link #waitUntil(Object, long)} reports that the deadline has passed.
+         */
+        public synchronized boolean awaitUntil(long nanoTimeDeadline) throws InterruptedException
+        {
+            while (!isSignalled())
+            {
+                if (!waitUntil(this, nanoTimeDeadline))
+                    return false;
+            }
+            return true;
+        }
+
+        /**
+         * Return true once signalled. Unidirectional; once true, must never 
again be false.
+         */
+        protected abstract boolean isSignalled();
+
+        /**
+         * Block on {@code monitor} until it is notified or {@code deadlineNanos} is reached.
+         * The caller must hold {@code monitor}'s lock, as required by {@link Object#wait(long)}.
+         * A {@code true} result only means a wait was performed; it may have ended by timeout,
+         * so callers should re-check their condition and call again.
+         *
+         * @param monitor the object whose monitor is waited on
+         * @param deadlineNanos the deadline, on the same timeline as {@code nanoTime()}
+         * @return false if the deadline had already passed (no wait performed), true otherwise
+         * @throws InterruptedException if interrupted while waiting
+         */
+        public static boolean waitUntil(Object monitor, long deadlineNanos) throws InterruptedException
+        {
+            long wait = deadlineNanos - nanoTime();
+            if (wait <= 0)
+                return false;
+
+            // Round up to whole milliseconds so we never pass 0 (which means "wait forever").
+            // Use quotient + remainder rather than (wait + 999999) / 1000000: the addition
+            // overflows to a negative timeout when wait is within 999999 of Long.MAX_VALUE,
+            // which Object.wait rejects with IllegalArgumentException.
+            long millis = wait / 1000000 + (wait % 1000000 == 0 ? 0 : 1);
+            monitor.wait(millis);
+            return true;
+        }
+    }
+}
diff --git a/harry-core/src/harry/concurrent/Clock.java 
b/harry-core/src/harry/concurrent/Clock.java
new file mode 100644
index 0000000..4ebbce2
--- /dev/null
+++ b/harry-core/src/harry/concurrent/Clock.java
@@ -0,0 +1,103 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.concurrent;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Wrapper around time related functions that are either implemented by using 
the default JVM calls
+ * or by using a custom implementation for testing purposes.
+ *
+ * See {@link Global#instance} for how to use a custom implementation.
+ *
+ * Please note that {@link java.time.Clock} wasn't used, as it would not be 
possible to provide an
+ * implementation for {@link #nanoTime()} with the exact same properties of 
{@link System#nanoTime()}.
+ *
+ * This class was borrowed from Apache Cassandra, 
org.cassandra.utils.concurrent, until there's a shared concurrency lib.
+ */
+public interface Clock
+{
+    public static class Global
+    {
+        /**
+         * Static singleton clock used by the static accessors below.
+         * In this copy it is always a {@link Default} (system clock) instance:
+         * the static initializer does not read any system property, so the
+         * implementation cannot be swapped through configuration.
+         */
+        private static final Clock instance;
+
+        static
+        {
+            Clock clock = new Default();
+            instance = clock;
+        }
+
+        /**
+         * Semantically equivalent to {@link System#nanoTime()}
+         */
+        public static long nanoTime()
+        {
+            return instance.nanoTime();
+        }
+
+        /**
+         * Semantically equivalent to {@link System#currentTimeMillis()}
+         */
+        public static long currentTimeMillis()
+        {
+            return instance.currentTimeMillis();
+        }
+    }
+
+    public static class Default implements Clock
+    {
+        /**
+         * {@link System#nanoTime()}
+         */
+        public long nanoTime()
+        {
+            return System.nanoTime(); // checkstyle: permit system clock
+        }
+
+        /**
+         * {@link System#currentTimeMillis()}
+         */
+        public long currentTimeMillis()
+        {
+            return System.currentTimeMillis(); // checkstyle: permit system 
clock
+        }
+    }
+
+    /**
+     * Semantically equivalent to {@link System#nanoTime()}
+     */
+    public long nanoTime();
+
+    /**
+     * Semantically equivalent to {@link System#currentTimeMillis()}
+     */
+    public long currentTimeMillis();
+
+    /**
+     * Sleep the calling thread until {@code deadlineNanos} has been reached.
+     * Returns immediately if the deadline has already passed.
+     *
+     * @param deadlineNanos the deadline, on the timeline of {@link Global#nanoTime()}
+     * @throws InterruptedException if interrupted while sleeping
+     */
+    public static void waitUntil(long deadlineNanos) throws InterruptedException
+    {
+        // remaining time is deadline minus now; the reverse order would return at once
+        // for a future deadline and sleep only after the deadline had already passed
+        long waitNanos = deadlineNanos - Clock.Global.nanoTime();
+        if (waitNanos > 0)
+            TimeUnit.NANOSECONDS.sleep(waitNanos);
+    }
+}
diff --git a/harry-core/src/harry/concurrent/Condition.java 
b/harry-core/src/harry/concurrent/Condition.java
new file mode 100644
index 0000000..ad59289
--- /dev/null
+++ b/harry-core/src/harry/concurrent/Condition.java
@@ -0,0 +1,99 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.concurrent;
+
+/**
+ * Simpler API than java.util.concurrent.Condition; would be nice to extend 
it, but also nice
+ * to share API with Future, for which Netty's API is incompatible with 
java.util.concurrent.Condition
+ *
+ * {@link Awaitable} for explicit external signals.
+ *
+ * This class was borrowed from Apache Cassandra, 
org.cassandra.utils.concurrent, until there's a shared concurrency lib.
+ */
+public interface Condition extends Awaitable
+{
+    /**
+     * Returns true once signalled. Unidirectional; once true, will never 
again be false.
+     */
+    boolean isSignalled();
+
+    /**
+     * Signal the condition as met, and wake all waiting threads.
+     */
+    void signal();
+
+    /**
+     * Signal the condition as met, and wake all waiting threads.
+     */
+    default void signalAll() { signal(); }
+
+    /**
+     * Factory method used to capture and redirect instantiations for 
simulation
+     */
+    static Condition newOneTimeCondition()
+    {
+        return new Async();
+    }
+
+    /**
+     * An asynchronous {@link Condition}. Typically lower overhead than {@link 
Sync}.
+     */
+    public static class Async extends AsyncAwaitable implements Condition
+    {
+        private volatile boolean signaled = false;
+
+        // WARNING: if extending this class, consider simulator interactions
+        protected Async() {}
+
+        public boolean isSignalled()
+        {
+            return signaled;
+        }
+
+        public void signal()
+        {
+            signaled = true;
+            super.signal();
+        }
+    }
+
+    /**
+     * A {@link Condition} based on its object monitor.
+     * WARNING: lengthy operations performed while holding the lock may 
prevent timely notification of waiting threads
+     * that a deadline has passed.
+     */
+    public static class Sync extends SyncAwaitable implements Condition
+    {
+        private boolean signaled = false;
+
+        // this can be instantiated directly, as we intercept monitors 
directly with byte weaving
+        public Sync() {}
+
+        public synchronized boolean isSignalled()
+        {
+            return signaled;
+        }
+
+        public synchronized void signal()
+        {
+            signaled = true;
+            notifyAll();
+        }
+    }
+}
diff --git a/harry-core/src/harry/concurrent/CountDownLatch.java 
b/harry-core/src/harry/concurrent/CountDownLatch.java
new file mode 100644
index 0000000..c673550
--- /dev/null
+++ b/harry-core/src/harry/concurrent/CountDownLatch.java
@@ -0,0 +1,107 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.concurrent;
+
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+/**
+ * This class was borrowed from Apache Cassandra, 
org.cassandra.utils.concurrent, until there's a shared concurrency lib.
+ */
+public interface CountDownLatch extends Awaitable
+{
+    /**
+     * Count down by 1, signalling waiters if we have reached zero
+     */
+    void decrement();
+
+    /**
+     * @return the current count
+     */
+    int count();
+
+    /**
+     * Factory method used to capture and redirect instantiations for 
simulation
+     */
+    static CountDownLatch newCountDownLatch(int count)
+    {
+        return new Async(count);
+    }
+
+    static class Async extends AsyncAwaitable implements CountDownLatch
+    {
+        private static final AtomicIntegerFieldUpdater<CountDownLatch.Async> 
countUpdater = AtomicIntegerFieldUpdater.newUpdater(CountDownLatch.Async.class, 
"count");
+        private volatile int count;
+
+        // WARNING: if extending this class, consider simulator interactions
+        protected Async(int count)
+        {
+            this.count = count;
+            if (count == 0)
+                signal();
+        }
+
+        public void decrement()
+        {
+            if (countUpdater.decrementAndGet(this) == 0)
+                signal();
+        }
+
+        public int count()
+        {
+            return count;
+        }
+
+        @Override
+        protected boolean isSignalled()
+        {
+            return count <= 0;
+        }
+    }
+
+    static final class Sync extends SyncAwaitable implements CountDownLatch
+    {
+        private int count;
+
+        public Sync(int count)
+        {
+            this.count = count;
+        }
+
+        public synchronized void decrement()
+        {
+            if (count > 0 && --count == 0)
+                notifyAll();
+        }
+
+        public synchronized int count()
+        {
+            return count;
+        }
+
+        /**
+         * not synchronized as only intended for internal usage by externally 
synchronized methods
+         */
+
+        @Override
+        protected boolean isSignalled()
+        {
+            return count <= 0;
+        }
+    }
+}
diff --git a/harry-core/src/harry/concurrent/ExecutorFactory.java 
b/harry-core/src/harry/concurrent/ExecutorFactory.java
new file mode 100644
index 0000000..834d511
--- /dev/null
+++ b/harry-core/src/harry/concurrent/ExecutorFactory.java
@@ -0,0 +1,155 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.concurrent;
+
+import static harry.concurrent.InfiniteLoopExecutor.Daemon.DAEMON;
+import static harry.concurrent.InfiniteLoopExecutor.Interrupts.UNSYNCHRONIZED;
+import static harry.concurrent.NamedThreadFactory.createThread;
+import static harry.concurrent.NamedThreadFactory.setupThread;
+
+/**
+ * Entry point for configuring and creating new executors.
+ *
+ *
+ * This class was borrowed from Apache Cassandra, 
org.cassandra.utils.concurrent, until there's a shared concurrency lib.
+ */
+public interface ExecutorFactory
+{
+    public enum SimulatorSemantics
+    {
+        NORMAL, DISCARD
+    }
+
+    /**
+     * Create and start a new thread to execute {@code runnable}
+     * @param name the name of the thread
+     * @param runnable the task to execute
+     * @param daemon flag to indicate whether the thread should be a daemon or 
not
+     * @return the new thread
+     */
+    Thread startThread(String name, Runnable runnable, 
InfiniteLoopExecutor.Daemon daemon);
+
+    /**
+     * Create and start a new thread to execute {@code runnable}; this thread 
will be a daemon thread.
+     * @param name the name of the thread
+     * @param runnable the task to execute
+     * @return the new thread
+     */
+    default Thread startThread(String name, Runnable runnable)
+    {
+        return startThread(name, runnable, DAEMON);
+    }
+
+    /**
+     * Create and start a new InfiniteLoopExecutor to repeatedly invoke {@code task}.
+     * On shutdown, the executing thread will be interrupted; to support clean shutdown
+     * {@code task} should propagate {@link InterruptedException}
+     *
+     * @param name the name of the thread used to invoke the task repeatedly
+     * @param task the task to execute repeatedly
+     * @param simulatorSafe flag indicating if the loop thread can be
+     *                      intercepted / rescheduled during cluster simulation
+     * @param daemon flag to indicate whether the loop thread should be a daemon thread or not
+     * @param interrupts flag to indicate whether to synchronize interrupts of the task execution thread
+     *                   using the task's monitor; this can be used to prevent interruption while performing
+     *                   IO operations which forbid interrupted threads.
+     *                   See (in Apache Cassandra):
+     *                   {@code org.apache.cassandra.db.commitlog.AbstractCommitLogSegmentManager#start}
+     * @return the {@link Interruptible} representing the started loop
+     */
+    Interruptible infiniteLoop(String name, Interruptible.Task task, 
InfiniteLoopExecutor.SimulatorSafe simulatorSafe, InfiniteLoopExecutor.Daemon 
daemon, InfiniteLoopExecutor.Interrupts interrupts);
+
+    /**
+     * Create and start a new InfiniteLoopExecutor to repeatedly invoke {@code task}.
+     * On shutdown, the executing thread will be interrupted; to support clean shutdown
+     * {@code task} should propagate {@link InterruptedException}
+     *
+     * Convenience overload: adapts {@code task} with {@code Interruptible.Task.from} and
+     * delegates to the five-argument variant with {@code DAEMON} and {@code UNSYNCHRONIZED}.
+     *
+     * @param name the name of the thread used to invoke the task repeatedly
+     * @param task the task to execute repeatedly
+     * @param simulatorSafe flag indicating if the loop thread can be
+     *                      intercepted / rescheduled during cluster simulation
+     * @return the {@link Interruptible} representing the started loop
+     */
+    default Interruptible infiniteLoop(String name, Interruptible.SimpleTask 
task, InfiniteLoopExecutor.SimulatorSafe simulatorSafe)
+    {
+        return infiniteLoop(name, Interruptible.Task.from(task), 
simulatorSafe, DAEMON, UNSYNCHRONIZED);
+    }
+
+    /**
+     * Create a new thread group for use with builders - this thread group 
will be situated within
+     * this factory's parent thread group, and may be supplied to multiple 
executor builders.
+     */
+    ThreadGroup newThreadGroup(String name);
+
+    public static final class Global
+    {
+        // deliberately not volatile to ensure zero overhead outside of 
testing;
+        // depend on other memory visibility primitives to ensure visibility
+        private static ExecutorFactory FACTORY = new 
ExecutorFactory.Default(Global.class.getClassLoader(), null, (t, e) -> 
e.printStackTrace());
+        private static boolean modified;
+
+        public static ExecutorFactory executorFactory()
+        {
+            return FACTORY;
+        }
+
+        public static synchronized void unsafeSet(ExecutorFactory 
executorFactory)
+        {
+            FACTORY = executorFactory;
+            modified = true;
+        }
+
+        public static synchronized boolean tryUnsafeSet(ExecutorFactory 
executorFactory)
+        {
+            if (modified)
+                return false;
+            unsafeSet(executorFactory);
+            return true;
+        }
+    }
+
+    public static final class Default extends NamedThreadFactory.MetaFactory 
implements ExecutorFactory
+    {
+        public Default(ClassLoader contextClassLoader, ThreadGroup 
threadGroup, Thread.UncaughtExceptionHandler uncaughtExceptionHandler)
+        {
+            super(contextClassLoader, threadGroup, uncaughtExceptionHandler);
+        }
+
+
+        @Override
+        public Thread startThread(String name, Runnable runnable, 
InfiniteLoopExecutor.Daemon daemon)
+        {
+            Thread thread = setupThread(createThread(threadGroup, runnable, 
name, daemon == DAEMON),
+                                        Thread.NORM_PRIORITY,
+                                        contextClassLoader,
+                                        uncaughtExceptionHandler);
+            thread.start();
+            return thread;
+        }
+
+        /**
+         * Builds an {@link InfiniteLoopExecutor} for {@code task}, using this factory.
+         * Note that {@code simulatorSafe} is accepted for interface compatibility but is
+         * not forwarded to the executor here.
+         */
+        @Override
+        public Interruptible infiniteLoop(String name, Interruptible.Task 
task, InfiniteLoopExecutor.SimulatorSafe simulatorSafe, 
InfiniteLoopExecutor.Daemon daemon, InfiniteLoopExecutor.Interrupts interrupts)
+        {
+            return new InfiniteLoopExecutor(this, name, task, daemon, 
interrupts);
+        }
+
+        @Override
+        public ThreadGroup newThreadGroup(String name)
+        {
+            return threadGroup == null ? null : new ThreadGroup(threadGroup, 
name);
+        }
+    }
+}
\ No newline at end of file
diff --git a/harry-core/src/harry/concurrent/InfiniteLoopExecutor.java 
b/harry-core/src/harry/concurrent/InfiniteLoopExecutor.java
new file mode 100644
index 0000000..32fcf93
--- /dev/null
+++ b/harry-core/src/harry/concurrent/InfiniteLoopExecutor.java
@@ -0,0 +1,178 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.concurrent;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static harry.concurrent.Clock.Global.nanoTime;
+import static harry.concurrent.Condition.newOneTimeCondition;
+import static 
harry.concurrent.InfiniteLoopExecutor.InternalState.SHUTTING_DOWN_NOW;
+import static harry.concurrent.InfiniteLoopExecutor.InternalState.TERMINATED;
+import static harry.concurrent.InfiniteLoopExecutor.Interrupts.UNSYNCHRONIZED;
+import static harry.concurrent.Interruptible.State.INTERRUPTED;
+import static harry.concurrent.Interruptible.State.NORMAL;
+import static harry.concurrent.Interruptible.State.SHUTTING_DOWN;
+
+/**
+ * Runs an {@link Interruptible.Task} repeatedly on a dedicated thread until it is shut down.
+ *
+ * The {@code state} field holds either an {@link Interruptible.State} ({@code NORMAL} or {@code SHUTTING_DOWN})
+ * or an {@link InternalState} ({@code SHUTTING_DOWN_NOW} or {@code TERMINATED}), which is why it is typed as Object.
+ *
+ * This class was borrowed from Apache Cassandra, org.cassandra.utils.concurrent, until there's a shared concurrency lib.
+ */
+public class InfiniteLoopExecutor implements Interruptible
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(InfiniteLoopExecutor.class);
+
+    /** States that are never passed to the task: an immediate shutdown request, and the final terminated state. */
+    public enum InternalState { SHUTTING_DOWN_NOW, TERMINATED }
+
+    public enum SimulatorSafe { SAFE, UNSAFE }
+
+    /** Whether the loop thread should be a daemon thread. */
+    public enum Daemon        { DAEMON, NON_DAEMON }
+
+    /** SYNCHRONIZED: interrupts are delivered while holding the task's monitor; UNSYNCHRONIZED: plain Thread.interrupt(). */
+    public enum Interrupts    { SYNCHRONIZED, UNSYNCHRONIZED }
+
+    private static final AtomicReferenceFieldUpdater<InfiniteLoopExecutor, 
Object> stateUpdater = 
AtomicReferenceFieldUpdater.newUpdater(InfiniteLoopExecutor.class, 
Object.class, "state");
+    private final Thread thread;
+    private final Task task;
+    private volatile Object state = NORMAL;
+    private final Consumer<Thread> interruptHandler;
+    // signalled exactly once, from the finally block of loop()
+    private final Condition isTerminated = newOneTimeCondition();
+
+    /**
+     * Uses the global executor factory and UNSYNCHRONIZED interrupts.
+     */
+    public InfiniteLoopExecutor(String name, Task task, Daemon daemon)
+    {
+        this(ExecutorFactory.Global.executorFactory(), name, task, daemon, 
UNSYNCHRONIZED);
+    }
+
+    /**
+     * Uses the supplied factory and UNSYNCHRONIZED interrupts.
+     */
+    public InfiniteLoopExecutor(ExecutorFactory factory, String name, Task 
task, Daemon daemon)
+    {
+        this(factory, name, task, daemon, UNSYNCHRONIZED);
+    }
+
+    /**
+     * Obtains the loop thread from {@code factory.startThread}. ExecutorFactory.Default starts the thread before
+     * returning it, so {@link #loop()} may already be running while this constructor is still assigning
+     * {@code interruptHandler}; loop() itself does not read that field.
+     *
+     * @param interrupts SYNCHRONIZED to deliver interrupts while synchronized on {@code task}
+     */
+    public InfiniteLoopExecutor(ExecutorFactory factory, String name, Task 
task, Daemon daemon, Interrupts interrupts)
+    {
+        this.task = task;
+        this.thread = factory.startThread(name, this::loop, daemon);
+        this.interruptHandler = interrupts == Interrupts.SYNCHRONIZED
+                                ? interruptHandler(task)
+                                : Thread::interrupt;
+    }
+
+    /**
+     * Same as the factory-based constructor, but the thread comes from {@code threadStarter.apply(name, loop)}.
+     * NOTE(review): presumably threadStarter is expected to start the thread it returns - nothing here starts it.
+     */
+    public InfiniteLoopExecutor(BiFunction<String, Runnable, Thread> 
threadStarter, String name, Task task, Interrupts interrupts)
+    {
+        this.task = task;
+        this.thread = threadStarter.apply(name, this::loop);
+        this.interruptHandler = interrupts == Interrupts.SYNCHRONIZED
+                                ? interruptHandler(task)
+                                : Thread::interrupt;
+    }
+
+    /**
+     * @return a handler that interrupts the given thread while holding {@code monitor}'s lock
+     */
+    private static Consumer<Thread> interruptHandler(final Object monitor)
+    {
+        return thread -> {
+            synchronized (monitor)
+            {
+                thread.interrupt();
+            }
+        };
+    }
+
+
+    /**
+     * Body of the loop thread. Each iteration reads the current state and invokes the task with it:
+     * NORMAL, INTERRUPTED (if an interrupt was observed while NORMAL) or SHUTTING_DOWN. The loop exits when the
+     * state is SHUTTING_DOWN_NOW, after one successful run in SHUTTING_DOWN, or when the task throws
+     * TerminateException. On exit the state becomes TERMINATED and the termination condition is signalled.
+     */
+    private void loop()
+    {
+        boolean interrupted = false;
+        try
+        {
+            while (true)
+            {
+                try
+                {
+                    Object cur = state;
+                    if (cur == SHUTTING_DOWN_NOW) break;
+
+                    // Thread.interrupted() clears the flag; remember it so the task is told via its State argument
+                    interrupted |= Thread.interrupted();
+                    if (cur == NORMAL && interrupted) cur = INTERRUPTED;
+                    task.run((State) cur);
+
+                    // only reached when the task returned normally
+                    interrupted = false;
+                    if (cur == SHUTTING_DOWN) break;
+                }
+                // TerminateException extends InterruptedException, so it must be caught first
+                catch (TerminateException ignore)
+                {
+                    break;
+                }
+                catch (UncheckedInterruptedException | InterruptedException 
ignore)
+                {
+                    // carry the interrupt into the next iteration
+                    interrupted = true;
+                }
+                catch (Throwable t)
+                {
+                    logger.error("Exception thrown by runnable, continuing 
with loop", t);
+                }
+            }
+        }
+        finally
+        {
+            state = TERMINATED;
+            isTerminated.signal();
+        }
+    }
+
+    /**
+     * Interrupts the loop thread through the configured interrupt handler.
+     */
+    public void interrupt()
+    {
+        interruptHandler.accept(thread);
+    }
+
+    /**
+     * Moves the state to SHUTTING_DOWN unless it is already SHUTTING_DOWN_NOW or TERMINATED.
+     * Does not interrupt the loop thread.
+     */
+    public void shutdown()
+    {
+        stateUpdater.updateAndGet(this, cur -> cur != TERMINATED && cur != 
SHUTTING_DOWN_NOW ? SHUTTING_DOWN : cur);
+        // TODO: InfiniteLoopExecutor should let the threads quiesce 
themselves rather then send interrupts
+        //interruptHandler.accept(thread);
+    }
+
+    /**
+     * Moves the state to SHUTTING_DOWN_NOW unless already TERMINATED, then interrupts the loop thread.
+     *
+     * @return always null
+     */
+    public Object shutdownNow()
+    {
+        stateUpdater.updateAndGet(this, cur -> cur != TERMINATED ? 
SHUTTING_DOWN_NOW : TERMINATED);
+        interruptHandler.accept(thread);
+        return null;
+    }
+
+    /**
+     * @return true once loop() has exited and set the state to TERMINATED
+     */
+    @Override
+    public boolean isTerminated()
+    {
+        return state == TERMINATED;
+    }
+
+    /**
+     * Waits on the termination condition until the deadline derived from {@code time} and {@code unit}.
+     *
+     * @return whether the executor is terminated when the wait ends
+     */
+    public boolean awaitTermination(long time, TimeUnit unit) throws 
InterruptedException
+    {
+        if (isTerminated())
+            return true;
+
+        long deadlineNanos = nanoTime() + unit.toNanos(time);
+        isTerminated.awaitUntil(deadlineNanos);
+        return isTerminated();
+    }
+
+    /**
+     * @return whether the loop thread is alive, as reported by {@link Thread#isAlive()}
+     */
+    public boolean isAlive()
+    {
+        return this.thread.isAlive();
+    }
+}
diff --git a/harry-core/src/harry/concurrent/Interruptible.java 
b/harry-core/src/harry/concurrent/Interruptible.java
new file mode 100644
index 0000000..18e605d
--- /dev/null
+++ b/harry-core/src/harry/concurrent/Interruptible.java
@@ -0,0 +1,50 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.concurrent;
+
+/**
+ * A {@link Shutdownable} that repeatedly runs a {@link Task} and can be interrupted.
+ *
+ * This class was borrowed from Apache Cassandra, org.cassandra.utils.concurrent, until there's a shared concurrency lib.
+ */
+public interface Interruptible extends Shutdownable
+{
+    /** The state handed to {@link Task#run(State)} on each invocation. */
+    public enum State { NORMAL, INTERRUPTED, SHUTTING_DOWN }
+
+    /** A task may throw this to make InfiniteLoopExecutor exit its loop. */
+    public static class TerminateException extends InterruptedException {}
+
+    /** A unit of work that is told the state it is being run in. */
+    public interface Task
+    {
+        void run(State state) throws InterruptedException;
+
+        /**
+         * Adapts a {@link SimpleTask}: the returned Task runs it when the state is NORMAL and does nothing otherwise.
+         */
+        static Task from(SimpleTask simpleTask)
+        {
+            return state -> { if (state == State.NORMAL) simpleTask.run(); };
+        }
+    }
+
+    /**
+     * A Task that only runs on NORMAL states
+     */
+    public interface SimpleTask
+    {
+        void run() throws InterruptedException;
+    }
+
+    /** Interrupts the work in progress. */
+    void interrupt();
+}
+
diff --git a/harry-core/src/harry/concurrent/NamedThreadFactory.java 
b/harry-core/src/harry/concurrent/NamedThreadFactory.java
new file mode 100644
index 0000000..9ed4f78
--- /dev/null
+++ b/harry-core/src/harry/concurrent/NamedThreadFactory.java
@@ -0,0 +1,195 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.concurrent;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This class is an implementation of the <i>ThreadFactory</i> interface. This
+ * is useful to give Java threads meaningful names which is useful when using
+ * a tool like JConsole.
+ *
+ * Threads are named {@code <globalPrefix><id>:<n>}, where {@code n} counts up from 1 per factory.
+ *
+ * This class was borrowed from Apache Cassandra, org.cassandra.utils.concurrent, until there's a shared concurrency lib.
+ */
+public class NamedThreadFactory implements ThreadFactory
+{
+    private static final AtomicInteger anonymousCounter = new AtomicInteger();
+    private static volatile String globalPrefix;
+
+    /** Sets the prefix prepended to the name of every thread created through the static createThread methods. */
+    public static void setGlobalPrefix(String prefix) { globalPrefix = prefix; }
+
+    /** @return the global thread name prefix, or the empty string if none has been set */
+    public static String globalPrefix()
+    {
+        String prefix = globalPrefix;
+        return prefix == null ? "" : prefix;
+    }
+
+    /**
+     * Holds the settings shared by every factory it creates: context class loader, parent thread group and
+     * uncaught exception handler.
+     */
+    public static class MetaFactory
+    {
+        protected ClassLoader contextClassLoader;
+        protected ThreadGroup threadGroup;
+        protected Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
+
+        /**
+         * @param threadGroup parent group for created threads; if null, the root ancestor of the calling
+         *                    thread's group is used
+         */
+        public MetaFactory(ClassLoader contextClassLoader, ThreadGroup threadGroup, Thread.UncaughtExceptionHandler uncaughtExceptionHandler)
+        {
+            this.contextClassLoader = contextClassLoader;
+            if (threadGroup == null)
+            {
+                // walk up to the top-most thread group
+                threadGroup = Thread.currentThread().getThreadGroup();
+                while (threadGroup.getParent() != null)
+                    threadGroup = threadGroup.getParent();
+            }
+            this.threadGroup = threadGroup;
+            this.uncaughtExceptionHandler = uncaughtExceptionHandler;
+        }
+
+        /** Creates a factory named {@code name} whose threads live in a new child thread group of the same name. */
+        NamedThreadFactory newThreadFactory(String name, int threadPriority)
+        {
+            // We create a unique thread group for each factory, so that e.g. executors can determine which threads are members of the executor
+            ThreadGroup threadGroup = this.threadGroup == null ? null : new ThreadGroup(this.threadGroup, name);
+            return new NamedThreadFactory(name, threadPriority, contextClassLoader, threadGroup, uncaughtExceptionHandler);
+        }
+    }
+
+    public final String id;
+    private final int priority;
+    private final ClassLoader contextClassLoader;
+    /** May be null, in which case threads are created without an explicit group. */
+    public final ThreadGroup threadGroup;
+    protected final AtomicInteger n = new AtomicInteger(1);
+    private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
+
+    public NamedThreadFactory(String id)
+    {
+        this(id, Thread.NORM_PRIORITY);
+    }
+
+    public NamedThreadFactory(String id, int priority)
+    {
+        this(id, priority, null, null, (t, e) -> e.printStackTrace());
+    }
+
+    public NamedThreadFactory(String id, ClassLoader contextClassLoader, ThreadGroup threadGroup)
+    {
+        this(id, Thread.NORM_PRIORITY, contextClassLoader, threadGroup, (t, e) -> e.printStackTrace());
+    }
+
+    public NamedThreadFactory(String id, int priority, ClassLoader contextClassLoader, ThreadGroup threadGroup)
+    {
+        this(id, priority, contextClassLoader, threadGroup, (t, e) -> e.printStackTrace());
+    }
+
+    /**
+     * @param id                       base name of the threads created by this factory
+     * @param priority                 priority assigned to every created thread
+     * @param contextClassLoader       context class loader for created threads; left untouched if null
+     * @param threadGroup              group for created threads; may be null
+     * @param uncaughtExceptionHandler handler for created threads; left untouched if null
+     */
+    public NamedThreadFactory(String id, int priority, ClassLoader contextClassLoader, ThreadGroup threadGroup, Thread.UncaughtExceptionHandler uncaughtExceptionHandler)
+    {
+        this.id = id;
+        this.priority = priority;
+        this.contextClassLoader = contextClassLoader;
+        this.threadGroup = threadGroup;
+        this.uncaughtExceptionHandler = uncaughtExceptionHandler;
+    }
+
+    /** Creates (but does not start) a daemon thread named {@code id:n}, configured with this factory's settings. */
+    @Override
+    public Thread newThread(Runnable runnable)
+    {
+        String name = id + ':' + n.getAndIncrement();
+        return newThread(threadGroup, runnable, name);
+    }
+
+    protected Thread newThread(ThreadGroup threadGroup, Runnable runnable, String name)
+    {
+        return setupThread(createThread(threadGroup, runnable, name, true));
+    }
+
+    /** Applies this factory's priority, context class loader and uncaught exception handler to {@code thread}. */
+    protected <T extends Thread> T setupThread(T thread)
+    {
+        return setupThread(thread, priority, contextClassLoader, uncaughtExceptionHandler);
+    }
+
+    /**
+     * Creates (but does not start) a daemon thread with the given priority, class loader and handler.
+     * The global prefix is prepended to {@code name} exactly once.
+     */
+    public static Thread createThread(ThreadGroup threadGroup, Runnable runnable, String name, int priority, ClassLoader contextClassLoader, Thread.UncaughtExceptionHandler uncaughtExceptionHandler)
+    {
+        // The four-argument createThread already prepends the global prefix; prepending it here as well
+        // would produce names such as "prefixprefixname".
+        Thread thread = createThread(threadGroup, runnable, name, true);
+        return setupThread(thread, priority, contextClassLoader, uncaughtExceptionHandler);
+    }
+
+    /** Creates a non-daemon thread named {@code anonymous-<counter>} with no explicit thread group. */
+    public static Thread createAnonymousThread(Runnable runnable)
+    {
+        return createThread(null, runnable, "anonymous-" + anonymousCounter.incrementAndGet());
+    }
+
+    /** Creates a non-daemon thread with no explicit thread group. */
+    public static Thread createThread(Runnable runnable, String name)
+    {
+        return createThread(null, runnable, name);
+    }
+
+    public Thread createThread(Runnable runnable, String name, boolean daemon)
+    {
+        return createThread(null, runnable, name, daemon);
+    }
+
+    /** Creates a non-daemon thread in {@code threadGroup}. */
+    public static Thread createThread(ThreadGroup threadGroup, Runnable runnable, String name)
+    {
+        return createThread(threadGroup, runnable, name, false);
+    }
+
+    /**
+     * Creates (but does not start) a thread named {@code globalPrefix + name} (just {@code name} when no prefix
+     * is set) with the requested daemon status.
+     */
+    public static Thread createThread(ThreadGroup threadGroup, Runnable runnable, String name, boolean daemon)
+    {
+        String prefix = globalPrefix;
+        Thread thread = new Thread(threadGroup, runnable, prefix != null ? prefix + name : name);
+        thread.setDaemon(daemon);
+        return thread;
+    }
+
+    /**
+     * Sets the priority, and, when non-null, the context class loader and uncaught exception handler.
+     *
+     * @return the same {@code thread}, for chaining
+     */
+    public static  <T extends Thread> T setupThread(T thread, int priority, ClassLoader contextClassLoader, Thread.UncaughtExceptionHandler uncaughtExceptionHandler)
+    {
+        thread.setPriority(priority);
+        if (contextClassLoader != null)
+            thread.setContextClassLoader(contextClassLoader);
+        if (uncaughtExceptionHandler != null)
+            thread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
+        return thread;
+    }
+
+    @Override
+    public String toString()
+    {
+        return threadGroup != null ? id + " in " + threadGroup.getName() : id;
+    }
+
+    /**
+     * Marks this factory's thread group as a daemon group and attempts to destroy it.
+     * Does nothing for factories created without a thread group.
+     */
+    public void close()
+    {
+        // factories built via NamedThreadFactory(String) / (String, int) have no group; synchronizing on null would throw
+        if (threadGroup == null)
+            return;
+
+        synchronized (threadGroup)
+        {
+            threadGroup.setDaemon(true);
+            // ThreadGroup API is terrible; setDaemon does not destroy if already empty, and establishing if empty
+            // otherwise is tortuous - easier to just try to destroy and fail if currently an invalid action
+            try
+            {
+                threadGroup.destroy();
+            }
+            catch (IllegalThreadStateException ignore)
+            {
+                // the group still has live threads (or is already destroyed); nothing more to do
+            }
+        }
+    }
+}
diff --git a/harry-core/src/harry/concurrent/Shutdownable.java 
b/harry-core/src/harry/concurrent/Shutdownable.java
new file mode 100644
index 0000000..0e8f0da
--- /dev/null
+++ b/harry-core/src/harry/concurrent/Shutdownable.java
@@ -0,0 +1,44 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.concurrent;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Something that can be asked to stop, and whose termination can be observed and awaited.
+ *
+ * This class was borrowed from Apache Cassandra, org.cassandra.utils.concurrent, until there's a shared concurrency lib.
+ */
+public interface Shutdownable
+{
+    /**
+     * @return true once this object has terminated
+     */
+    boolean isTerminated();
+
+    /**
+     * Shutdown once any remaining work has completed (however this is defined 
for the implementation).
+     */
+    void shutdown();
+
+    /**
+     * Shutdown immediately, possibly interrupting ongoing work, and 
cancelling work that is queued.
+     *
+     * @return implementation-defined; InfiniteLoopExecutor in this package always returns null
+     */
+    Object shutdownNow();
+
+    /**
+     * Await termination of this object, i.e. the cessation of all current and 
future work.
+     *
+     * @param timeout maximum time to wait, in {@code units}
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    public boolean awaitTermination(long timeout, TimeUnit units) throws 
InterruptedException;
+}
diff --git a/harry-core/src/harry/concurrent/UncheckedInterruptedException.java 
b/harry-core/src/harry/concurrent/UncheckedInterruptedException.java
new file mode 100644
index 0000000..c2be5a5
--- /dev/null
+++ b/harry-core/src/harry/concurrent/UncheckedInterruptedException.java
@@ -0,0 +1,35 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.concurrent;
+
+/**
+ * Runtime (unchecked) counterpart of {@link InterruptedException}, to be thrown in places where an
+ * interrupt is unexpected.
+ *
+ * This class was borrowed from Apache Cassandra, org.cassandra.utils.concurrent, until there's a shared concurrency lib.
+ */
+public class UncheckedInterruptedException extends RuntimeException
+{
+    /** Creates an instance with neither a message nor a cause. */
+    public UncheckedInterruptedException()
+    {
+        super();
+    }
+
+    /**
+     * Creates an instance wrapping the checked exception that triggered it.
+     *
+     * @param cause the original interrupt, retrievable afterwards through {@link #getCause()}
+     */
+    public UncheckedInterruptedException(InterruptedException cause)
+    {
+        super((Throwable) cause);
+    }
+}
diff --git a/harry-core/src/harry/concurrent/WaitQueue.java 
b/harry-core/src/harry/concurrent/WaitQueue.java
new file mode 100644
index 0000000..ab96dc5
--- /dev/null
+++ b/harry-core/src/harry/concurrent/WaitQueue.java
@@ -0,0 +1,390 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.concurrent;
+
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.locks.LockSupport;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
+
+import harry.concurrent.Awaitable.AbstractAwaitable;
+
+import static harry.concurrent.Clock.Global.nanoTime;
+
+/**
+ * A queue on which threads register a {@link Signal} and then wait to be woken by {@link #signal()} or
+ * {@link #signalAll()}.
+ *
+ * This class was borrowed from Apache Cassandra, org.cassandra.utils.concurrent, until there's a shared concurrency lib.
+ */
+public interface WaitQueue
+{
+    /**
+     * A Signal is a one-time-use mechanism for a thread to wait for 
notification that some condition
+     * state has transitioned that it may be interested in (and hence should 
check if it is).
+     * It is potentially transient, i.e. the state can change in the meantime, 
it only indicates
+     * that it should be checked, not necessarily anything about what the 
expected state should be.
+     *
+     * Signal implementations should never wake up spuriously, they are always 
woken up by a
+     * signal() or signalAll().
+     *
+     * This abstract definition of Signal does not need to be tied to a 
WaitQueue.
+     * Whilst RegisteredSignal is the main building block of Signals, this 
abstract
+     * definition allows us to compose Signals in useful ways. The Signal is 
'owned' by the
+     * thread that registered itself with WaitQueue(s) to obtain the 
underlying RegisteredSignal(s);
+     * only the owning thread should use a Signal.
+     */
+    public static interface Signal extends Condition
+    {
+        /**
+         * @return true if cancelled; once cancelled, must be discarded by the 
owning thread.
+         */
+        public boolean isCancelled();
+
+        /**
+         * @return isSignalled() || isCancelled(). Once true, the state is 
fixed and the Signal should be discarded
+         * by the owning thread.
+         */
+        public boolean isSet();
+
+        /**
+         * atomically: cancels the Signal if !isSet(), or returns true if 
isSignalled()
+         *
+         * @return true if isSignalled()
+         */
+        public boolean checkAndClear();
+
+        /**
+         * Should only be called by the owning thread. Indicates the signal 
can be retired,
+         * and if signalled propagates the signal to another waiting thread
+         */
+        public abstract void cancel();
+    }
+
+    /**
+     * The calling thread MUST be the thread that uses the signal
+     */
+    public Signal register();
+
+    /**
+     * The calling thread MUST be the thread that uses the signal.
+     * In the Standard implementation, receiveOnDone.accept(supplyOnDone) is invoked when a wait on the Signal
+     * finishes (signalled or timed out) and when the Signal is cancelled, e.g. because the waiting thread
+     * was interrupted.
+     */
+    public <V> Signal register(V supplyOnDone, Consumer<V> receiveOnDone);
+
+    /**
+     * Signal one waiting thread
+     */
+    public boolean signal();
+
+    /**
+     * Signal all waiting threads
+     */
+    public void signalAll();
+
+    /** getWaiting() > 0 */
+    public boolean hasWaiters();
+
+    /** Return how many threads are waiting */
+    public int getWaiting();
+
+    /**
+     * Factory method used to capture and redirect instantiations for 
simulation
+     */
+    public static WaitQueue newWaitQueue()
+    {
+        return new Standard();
+    }
+
+    /**
+     * WaitQueue backed by a ConcurrentLinkedQueue of registered signals. Each signal moves once from
+     * NOT_SET to either SIGNALLED or CANCELLED by compare-and-set; cancel() may additionally move a
+     * SIGNALLED signal to CANCELLED, passing the signal on to another waiter.
+     */
+    class Standard implements WaitQueue
+    {
+        private static final int CANCELLED = -1;
+        private static final int SIGNALLED = 1;
+        private static final int NOT_SET = 0;
+
+        private static final AtomicIntegerFieldUpdater<RegisteredSignal> 
signalledUpdater = AtomicIntegerFieldUpdater.newUpdater(RegisteredSignal.class, 
"state");
+
+        // the waiting signals
+        private final ConcurrentLinkedQueue<RegisteredSignal> queue = new 
ConcurrentLinkedQueue<>();
+
+        protected Standard() {}
+
+        /**
+         * The calling thread MUST be the thread that uses the signal
+         * @return a new Signal, bound to the calling thread and appended to this queue
+         */
+        public Signal register()
+        {
+            RegisteredSignal signal = new RegisteredSignal();
+            queue.add(signal);
+            return signal;
+        }
+
+        /**
+         * The calling thread MUST be the thread that uses the signal.
+         * receiveOnDone.accept(supplyOnDone) is invoked when a wait on the returned Signal finishes
+         * (signalled or timed out) and when the Signal is cancelled, e.g. because the waiting thread
+         * was interrupted.
+         */
+        public <V> Signal register(V supplyOnDone, Consumer<V> receiveOnDone)
+        {
+            RegisteredSignal signal = new SignalWithListener<>(supplyOnDone, 
receiveOnDone);
+            queue.add(signal);
+            return signal;
+        }
+
+        /**
+         * Signal one waiting thread
+         *
+         * @return true if a registered signal was successfully signalled, false if the queue ran empty first
+         */
+        public boolean signal()
+        {
+            // keep polling past signals that were already set (doSignal() returns null for those)
+            while (true)
+            {
+                RegisteredSignal s = queue.poll();
+                if (s == null || s.doSignal() != null)
+                    return s != null;
+            }
+        }
+
+        /**
+         * Signal all waiting threads
+         */
+        public void signalAll()
+        {
+            if (!hasWaiters())
+                return;
+
+            // to avoid a race where the condition is not met and the woken 
thread managed to wait on the queue before
+            // we finish signalling it all, we pick a random thread we have 
woken-up and hold onto it, so that if we encounter
+            // it again we know we're looping. We reselect a random thread 
periodically, progressively less often.
+            // the "correct" solution to this problem is to use a queue that 
permits snapshot iteration, but this solution is sufficient
+            // TODO: this is only necessary because we use CLQ - which is only 
for historical any-NIH reasons
+            int i = 0, s = 5;
+            Thread randomThread = null;
+            Iterator<RegisteredSignal> iter = queue.iterator();
+            while (iter.hasNext())
+            {
+                RegisteredSignal signal = iter.next();
+                Thread signalled = signal.doSignal();
+
+                if (signalled != null)
+                {
+                    if (signalled == randomThread)
+                        break;
+
+                    if (++i == s)
+                    {
+                        randomThread = signalled;
+                        s <<= 1;
+                    }
+                }
+
+                iter.remove();
+            }
+        }
+
+        /** Removes every cancelled signal from the queue. */
+        private void cleanUpCancelled()
+        {
+            // TODO: attempt to remove the cancelled from the beginning only 
(need atomic cas of head)
+            queue.removeIf(RegisteredSignal::isCancelled);
+        }
+
+        /**
+         * @return true if the queue is non-empty; unlike getWaiting(), this does not filter out cancelled signals
+         */
+        public boolean hasWaiters()
+        {
+            return !queue.isEmpty();
+        }
+
+        /**
+         * @return how many threads are waiting
+         */
+        public int getWaiting()
+        {
+            if (!hasWaiters())
+                return 0;
+            Iterator<RegisteredSignal> iter = queue.iterator();
+            int count = 0;
+            while (iter.hasNext())
+            {
+                Signal next = iter.next();
+                if (!next.isCancelled())
+                    count++;
+            }
+            return count;
+        }
+
+        /**
+         * An abstract signal implementation
+         *
+         * TODO: use intrusive linked list
+         */
+        public static abstract class AbstractSignal extends AbstractAwaitable 
implements Signal
+        {
+            /**
+             * Parks until signalled, re-checking after every wake-up. If the thread is interrupted the
+             * signal is cancelled and InterruptedException is thrown.
+             */
+            public Signal await() throws InterruptedException
+            {
+                while (!isSignalled())
+                {
+                    checkInterrupted();
+                    LockSupport.park();
+                }
+                checkAndClear();
+                return this;
+            }
+
+            /**
+             * Parks until signalled or until nanoTime() reaches {@code nanoTimeDeadline}.
+             *
+             * @return the result of checkAndClear(): true if signalled, false if the signal was cancelled here
+             */
+            public boolean awaitUntil(long nanoTimeDeadline) throws 
InterruptedException
+            {
+                long now;
+                while (nanoTimeDeadline > (now = nanoTime()) && !isSignalled())
+                {
+                    checkInterrupted();
+                    long delta = nanoTimeDeadline - now;
+                    LockSupport.parkNanos(delta);
+                }
+                return checkAndClear();
+            }
+
+            /** Clears the interrupt flag; if it was set, cancels this signal and throws. */
+            private void checkInterrupted() throws InterruptedException
+            {
+                if (Thread.interrupted())
+                {
+                    cancel();
+                    throw new InterruptedException();
+                }
+            }
+        }
+
+        /**
+         * A signal registered with this WaitQueue
+         */
+        private class RegisteredSignal extends AbstractSignal
+        {
+            // the thread that created the signal; cleared once the signal is signalled or cancelled
+            private volatile Thread thread = Thread.currentThread();
+            volatile int state;
+
+            public boolean isSignalled()
+            {
+                return state == SIGNALLED;
+            }
+
+            public boolean isCancelled()
+            {
+                return state == CANCELLED;
+            }
+
+            public boolean isSet()
+            {
+                return state != NOT_SET;
+            }
+
+            /**
+             * Moves NOT_SET to SIGNALLED and unparks the registered thread.
+             *
+             * @return the thread that was woken, or null if the signal was already set
+             */
+            private Thread doSignal()
+            {
+                if (!isSet() && signalledUpdater.compareAndSet(this, NOT_SET, 
SIGNALLED))
+                {
+                    Thread thread = this.thread;
+                    LockSupport.unpark(thread);
+                    this.thread = null;
+                    return thread;
+                }
+                return null;
+            }
+
+            public void signal()
+            {
+                doSignal();
+            }
+
+            /**
+             * Cancels the signal if it is still NOT_SET (returning false); otherwise returns true.
+             */
+            public boolean checkAndClear()
+            {
+                if (!isSet() && signalledUpdater.compareAndSet(this, NOT_SET, 
CANCELLED))
+                {
+                    thread = null;
+                    cleanUpCancelled();
+                    return false;
+                }
+                // must now be signalled assuming correct API usage
+                return true;
+            }
+
+            /**
+             * Should only be called by the registered thread. Indicates the 
signal can be retired,
+             * and if signalled propagates the signal to another waiting thread
+             */
+            public void cancel()
+            {
+                if (isCancelled())
+                    return;
+                if (!signalledUpdater.compareAndSet(this, NOT_SET, CANCELLED))
+                {
+                    // must already be signalled - switch to cancelled and
+                    state = CANCELLED;
+                    // propagate the signal
+                    WaitQueue.Standard.this.signal();
+                }
+                thread = null;
+                cleanUpCancelled();
+            }
+        }
+
+        /**
+         * A RegisteredSignal that carries a value and a callback: receiveOnDone.accept(supplyOnDone) is
+         * invoked each time checkAndClear() is called, and when a not-yet-cancelled signal is cancelled.
+         * For example, if supplyOnDone is a timer started at registration and receiveOnDone stops it, the
+         * timer measures the time between registering and invalidating the signal.
+         */
+        private final class SignalWithListener<V> extends RegisteredSignal
+        {
+            private final V supplyOnDone;
+            private final Consumer<V> receiveOnDone;
+
+            private SignalWithListener(V supplyOnDone, Consumer<V> 
receiveOnDone)
+            {
+                this.receiveOnDone = receiveOnDone;
+                this.supplyOnDone = supplyOnDone;
+            }
+
+
+            @Override
+            public boolean checkAndClear()
+            {
+                receiveOnDone.accept(supplyOnDone);
+                return super.checkAndClear();
+            }
+
+            @Override
+            public void cancel()
+            {
+                if (!isCancelled())
+                {
+                    receiveOnDone.accept(supplyOnDone);
+                    super.cancel();
+                }
+            }
+        }
+    }
+
+    /**
+     * Loops waiting on the supplied condition and WaitQueue and will not 
return until the condition is true
+     */
+    public static void waitOnCondition(BooleanSupplier condition, WaitQueue 
queue) throws InterruptedException
+    {
+        while (!condition.getAsBoolean())
+        {
+            // register first, then re-check, so a signal sent between the check and the wait is not missed
+            Signal s = queue.register();
+            if (!condition.getAsBoolean()) s.await();
+            else s.cancel();
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to