This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-harry.git
commit 6ed308d6a814826be145f6ba30304ea474bf263e Author: Alex Petrov <oleksandr.pet...@gmail.com> AuthorDate: Thu Mar 9 17:13:18 2023 +0100 Pull in Cassandra concurrent utils until there is a common shared library Patch by Alex Petrov; reviewed by Caleb Rackliffe and Abe Ratnofsky for CASSANDRA-18315 --- harry-core/src/harry/concurrent/Awaitable.java | 399 +++++++++++++++++++++ harry-core/src/harry/concurrent/Clock.java | 103 ++++++ harry-core/src/harry/concurrent/Condition.java | 99 +++++ .../src/harry/concurrent/CountDownLatch.java | 107 ++++++ .../src/harry/concurrent/ExecutorFactory.java | 155 ++++++++ .../src/harry/concurrent/InfiniteLoopExecutor.java | 178 +++++++++ harry-core/src/harry/concurrent/Interruptible.java | 50 +++ .../src/harry/concurrent/NamedThreadFactory.java | 195 ++++++++++ harry-core/src/harry/concurrent/Shutdownable.java | 44 +++ .../concurrent/UncheckedInterruptedException.java | 35 ++ harry-core/src/harry/concurrent/WaitQueue.java | 390 ++++++++++++++++++++ 11 files changed, 1755 insertions(+) diff --git a/harry-core/src/harry/concurrent/Awaitable.java b/harry-core/src/harry/concurrent/Awaitable.java new file mode 100644 index 0000000..3781e78 --- /dev/null +++ b/harry-core/src/harry/concurrent/Awaitable.java @@ -0,0 +1,399 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package harry.concurrent; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Predicate; + +import static harry.concurrent.Clock.Global.nanoTime; +import static harry.concurrent.WaitQueue.newWaitQueue; + +/** + * A generic signal consumer, supporting all of the typical patterns used in Cassandra. + * All of the methods defined in {@link Awaitable} may be waited on without a loop, + * as this interface declares that there are no spurious wake-ups. + * + * This class was borrowed from Apache Cassandra, org.cassandra.utils.concurrent, until there's a shared concurrency lib. + */ +public interface Awaitable +{ + /** + * Await until the deadline (in nanoTime), throwing any interrupt. + * No spurious wakeups. + * @return true if we were signalled, false if the deadline elapsed + * @throws InterruptedException if interrupted + */ + boolean awaitUntil(long nanoTimeDeadline) throws InterruptedException; + + /** + * Await until the deadline (in nanoTime), throwing any interrupt as an unchecked exception. + * No spurious wakeups. + * @return true if we were signalled, false if the deadline elapsed + * @throws UncheckedInterruptedException if interrupted + */ + boolean awaitUntilThrowUncheckedOnInterrupt(long nanoTimeDeadline) throws UncheckedInterruptedException; + + /** + * Await until the deadline (in nanoTime), ignoring interrupts (but maintaining the interrupt flag on exit). + * No spurious wakeups. + * @return true if we were signalled, false if the deadline elapsed + */ + boolean awaitUntilUninterruptibly(long nanoTimeDeadline); + + /** + * Await for the specified period, throwing any interrupt. + * No spurious wakeups. 
+ * @return true if we were signalled, false if the timeout elapses + * @throws InterruptedException if interrupted + */ + boolean await(long time, TimeUnit units) throws InterruptedException; + + /** + * Await for the specified period, throwing any interrupt as an unchecked exception. + * No spurious wakeups. + * @return true if we were signalled, false if the timeout elapses + * @throws UncheckedInterruptedException if interrupted + */ + boolean awaitThrowUncheckedOnInterrupt(long time, TimeUnit units) throws UncheckedInterruptedException; + + /** + * Await until the deadline (in nanoTime), ignoring interrupts (but maintaining the interrupt flag on exit). + * No spurious wakeups. + * @return true if we were signalled, false if the timeout elapses + */ + boolean awaitUninterruptibly(long time, TimeUnit units); + + /** + * Await indefinitely, throwing any interrupt. + * No spurious wakeups. + * @throws InterruptedException if interrupted + */ + Awaitable await() throws InterruptedException; + + /** + * Await indefinitely, throwing any interrupt as an unchecked exception. + * No spurious wakeups. + * @throws UncheckedInterruptedException if interrupted + */ + Awaitable awaitThrowUncheckedOnInterrupt() throws UncheckedInterruptedException; + + /** + * Await indefinitely, ignoring interrupts (but maintaining the interrupt flag on exit). + * No spurious wakeups. 
+ */ + Awaitable awaitUninterruptibly(); + + // we must declare the static implementation methods outside of the interface, + // so that they can be loaded by different classloaders during simulation + class Defaults + { + public static boolean await(Awaitable await, long time, TimeUnit unit) throws InterruptedException + { + return await.awaitUntil(nanoTime() + unit.toNanos(time)); + } + + public static boolean awaitThrowUncheckedOnInterrupt(Awaitable await, long time, TimeUnit units) throws UncheckedInterruptedException + { + return awaitUntilThrowUncheckedOnInterrupt(await, nanoTime() + units.toNanos(time)); + } + + public static boolean awaitUninterruptibly(Awaitable await, long time, TimeUnit units) + { + return awaitUntilUninterruptibly(await, nanoTime() + units.toNanos(time)); + } + + public static <A extends Awaitable> A awaitThrowUncheckedOnInterrupt(A await) throws UncheckedInterruptedException + { + try + { + await.await(); + } + catch (InterruptedException e) + { + throw new UncheckedInterruptedException(); + } + return await; + } + + public static boolean awaitUntilThrowUncheckedOnInterrupt(Awaitable await, long nanoTimeDeadline) throws UncheckedInterruptedException + { + try + { + return await.awaitUntil(nanoTimeDeadline); + } + catch (InterruptedException e) + { + throw new UncheckedInterruptedException(); + } + } + + /** + * {@link Awaitable#awaitUntilUninterruptibly(long)} + */ + public static boolean awaitUntilUninterruptibly(Awaitable await, long nanoTimeDeadline) + { + boolean interrupted = false; + boolean result; + while (true) + { + try + { + result = await.awaitUntil(nanoTimeDeadline); + break; + } + catch (InterruptedException e) + { + interrupted = true; + } + } + if (interrupted) + Thread.currentThread().interrupt(); + return result; + } + + /** + * {@link Awaitable#awaitUninterruptibly()} + */ + public static <A extends Awaitable> A awaitUninterruptibly(A await) + { + boolean interrupted = false; + while (true) + { + try + { + 
await.await(); + break; + } + catch (InterruptedException e) + { + interrupted = true; + } + } + if (interrupted) + Thread.currentThread().interrupt(); + return await; + } + } + + abstract class AbstractAwaitable implements Awaitable + { + protected AbstractAwaitable() {} + + /** + * {@link Awaitable#await(long, TimeUnit)} + */ + @Override + public boolean await(long time, TimeUnit unit) throws InterruptedException + { + return Defaults.await(this, time, unit); + } + + /** + * {@link Awaitable#awaitThrowUncheckedOnInterrupt(long, TimeUnit)} + */ + @Override + public boolean awaitThrowUncheckedOnInterrupt(long time, TimeUnit units) throws UncheckedInterruptedException + { + return Defaults.awaitThrowUncheckedOnInterrupt(this, time, units); + } + + /** + * {@link Awaitable#awaitUninterruptibly(long, TimeUnit)} + */ + public boolean awaitUninterruptibly(long time, TimeUnit units) + { + return awaitUntilUninterruptibly(nanoTime() + units.toNanos(time)); + } + + /** + * {@link Awaitable#awaitThrowUncheckedOnInterrupt()} + */ + public Awaitable awaitThrowUncheckedOnInterrupt() throws UncheckedInterruptedException + { + return Defaults.awaitThrowUncheckedOnInterrupt(this); + } + + /** + * {@link Awaitable#awaitUntilThrowUncheckedOnInterrupt(long)} + */ + public boolean awaitUntilThrowUncheckedOnInterrupt(long nanoTimeDeadline) throws UncheckedInterruptedException + { + return Defaults.awaitUntilThrowUncheckedOnInterrupt(this, nanoTimeDeadline); + } + + /** + * {@link Awaitable#awaitUntilUninterruptibly(long)} + */ + public boolean awaitUntilUninterruptibly(long nanoTimeDeadline) + { + return Defaults.awaitUntilUninterruptibly(this, nanoTimeDeadline); + } + + /** + * {@link Awaitable#awaitUninterruptibly()} + */ + public Awaitable awaitUninterruptibly() + { + return Defaults.awaitUninterruptibly(this); + } + } + + /** + * A barebones asynchronous {@link Awaitable}. + * If your state is minimal, or can be updated concurrently, extend this class. 
+ */ + abstract class AsyncAwaitable extends AbstractAwaitable + { + /** + * Maintain an internal variable containing a lazily-initialized wait queue + * @return null if is done + */ + private static <A extends Awaitable> WaitQueue.Signal register(AtomicReferenceFieldUpdater<A, WaitQueue> waitingUpdater, Predicate<A> isDone, A awaitable) + { + if (isDone.test(awaitable)) + return null; + + WaitQueue waiting = waitingUpdater.get(awaitable); + if (waiting == null) + { + if (!waitingUpdater.compareAndSet(awaitable, null, waiting = newWaitQueue())) + { + waiting = waitingUpdater.get(awaitable); + if (waiting == null) + { + assert isDone.test(awaitable); + return null; + } + } + } + + WaitQueue.Signal s = waiting.register(); + if (!isDone.test(awaitable)) + return s; + + s.cancel(); + return null; + } + + static <A extends Awaitable> A await(AtomicReferenceFieldUpdater<A, WaitQueue> waitingUpdater, Predicate<A> isDone, A awaitable) throws InterruptedException + { + WaitQueue.Signal s = register(waitingUpdater, isDone, awaitable); + if (s != null) + s.await(); + return awaitable; + } + + static <A extends Awaitable> boolean awaitUntil(AtomicReferenceFieldUpdater<A, WaitQueue> waitingUpdater, Predicate<A> isDone, A awaitable, long nanoTimeDeadline) throws InterruptedException + { + WaitQueue.Signal s = register(waitingUpdater, isDone, awaitable); + return s == null || s.awaitUntil(nanoTimeDeadline) || isDone.test(awaitable); + } + + static <A extends Awaitable> void signalAll(AtomicReferenceFieldUpdater<A, WaitQueue> waitingUpdater, A awaitable) + { + WaitQueue waiting = waitingUpdater.get(awaitable); + if (waiting == null) + return; + + waiting.signalAll(); + waitingUpdater.lazySet(awaitable, null); + } + + private static final AtomicReferenceFieldUpdater<AsyncAwaitable, WaitQueue> waitingUpdater = AtomicReferenceFieldUpdater.newUpdater(AsyncAwaitable.class, WaitQueue.class, "waiting"); + private volatile WaitQueue waiting; + + protected AsyncAwaitable() {} + + /** + * 
{@link Awaitable#await()} + */ + public Awaitable await() throws InterruptedException + { + return await(waitingUpdater, AsyncAwaitable::isSignalled, this); + } + + /** + * {@link Awaitable#awaitUntil(long)} + */ + public boolean awaitUntil(long nanoTimeDeadline) throws InterruptedException + { + return awaitUntil(waitingUpdater, AsyncAwaitable::isSignalled, this, nanoTimeDeadline); + } + + /** + * Signal any waiting threads; {@link #isSignalled()} must return {@code true} before this method is invoked. + */ + protected void signal() + { + signalAll(waitingUpdater, this); + } + + /** + * Return true once signalled. Unidirectional; once true, must never again be false. + */ + protected abstract boolean isSignalled(); + } + + /** + * A barebones {@link Awaitable} that uses mutual exclusion. + * If your state will be updated while holding the object monitor, extend this class. + */ + abstract class SyncAwaitable extends AbstractAwaitable + { + protected SyncAwaitable() {} + + /** + * {@link Awaitable#await()} + */ + public synchronized Awaitable await() throws InterruptedException + { + while (!isSignalled()) + wait(); + return this; + } + + /** + * {@link Awaitable#awaitUntil(long)} + */ + public synchronized boolean awaitUntil(long nanoTimeDeadline) throws InterruptedException + { + while (true) + { + if (isSignalled()) return true; + if (!waitUntil(this, nanoTimeDeadline)) return false; + } + } + + /** + * Return true once signalled. Unidirectional; once true, must never again be false. 
+ */ + protected abstract boolean isSignalled(); + + public static boolean waitUntil(Object monitor, long deadlineNanos) throws InterruptedException + { + long wait = deadlineNanos - nanoTime(); + if (wait <= 0) + return false; + + monitor.wait((wait + 999999) / 1000000); + return true; + } + } +} diff --git a/harry-core/src/harry/concurrent/Clock.java b/harry-core/src/harry/concurrent/Clock.java new file mode 100644 index 0000000..4ebbce2 --- /dev/null +++ b/harry-core/src/harry/concurrent/Clock.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package harry.concurrent; + +import java.util.concurrent.TimeUnit; + +/** + * Wrapper around time related functions that are either implemented by using the default JVM calls + * or by using a custom implementation for testing purposes. + * + * See {@link Global#instance} for how to use a custom implementation. + * + * Please note that {@link java.time.Clock} wasn't used, as it would not be possible to provide an + * implementation for {@link #nanoTime()} with the exact same properties of {@link System#nanoTime()}. + * + * This class was borrowed from Apache Cassandra, org.cassandra.utils.concurrent, until there's a shared concurrency lib. 
/**
 * Wrapper around the time-related functions used by this package, so that callers go through
 * {@link Global} rather than calling {@link System#nanoTime()} / {@link System#currentTimeMillis()} directly.
 *
 * This class was borrowed from Apache Cassandra, org.cassandra.utils.concurrent, until there's a shared concurrency lib.
 */
public interface Clock
{
    /**
     * Static access point to the process-wide {@link Clock}.
     */
    public static class Global
    {
        /**
         * Static singleton clock. In this copy it is always a {@link Default} (system clock):
         * the initializer installs it unconditionally and no system property is consulted.
         */
        private static final Clock instance;

        static
        {
            Clock clock = new Default();
            instance = clock;
        }

        /**
         * Semantically equivalent to {@link System#nanoTime()}
         */
        public static long nanoTime()
        {
            return instance.nanoTime();
        }

        /**
         * Semantically equivalent to {@link System#currentTimeMillis()}
         */
        public static long currentTimeMillis()
        {
            return instance.currentTimeMillis();
        }
    }

    /**
     * {@link Clock} backed directly by the JVM system clock.
     */
    public static class Default implements Clock
    {
        /**
         * {@link System#nanoTime()}
         */
        @Override
        public long nanoTime()
        {
            return System.nanoTime(); // checkstyle: permit system clock
        }

        /**
         * {@link System#currentTimeMillis()}
         */
        @Override
        public long currentTimeMillis()
        {
            return System.currentTimeMillis(); // checkstyle: permit system clock
        }
    }

    /**
     * Semantically equivalent to {@link System#nanoTime()}
     */
    public long nanoTime();

    /**
     * Semantically equivalent to {@link System#currentTimeMillis()}
     */
    public long currentTimeMillis();

    /**
     * Sleeps the calling thread until {@code deadlineNanos} has been reached on the
     * {@link Global#nanoTime()} time base. Returns immediately if the deadline has already passed.
     *
     * @param deadlineNanos absolute deadline, as a {@link Global#nanoTime()} value
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    public static void waitUntil(long deadlineNanos) throws InterruptedException
    {
        // time remaining = deadline - now; the subtraction form stays correct across nanoTime wrap-around
        long waitNanos = deadlineNanos - Clock.Global.nanoTime();
        if (waitNanos > 0)
            TimeUnit.NANOSECONDS.sleep(waitNanos);
    }
}

// ---- remainder of the original patch text on this line (preamble of the next file), kept verbatim ----
// diff --git a/harry-core/src/harry/concurrent/Condition.java b/harry-core/src/harry/concurrent/Condition.java
// new file mode 100644
// index 0000000..ad59289
// --- /dev/null
// +++ b/harry-core/src/harry/concurrent/Condition.java
// @@ -0,0 +1,99 @@
// +/*
// + * Licensed to the Apache Software Foundation (ASF) under one
// + * or more contributor license agreements. See the NOTICE file
// + * distributed with this work for additional information
// + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package harry.concurrent; + +/** + * Simpler API than java.util.concurrent.Condition; would be nice to extend it, but also nice + * to share API with Future, for which Netty's API is incompatible with java.util.concurrent.Condition + * + * {@link Awaitable} for explicit external signals. + * + * This class was borrowed from Apache Cassandra, org.cassandra.utils.concurrent, until there's a shared concurrency lib. + */ +public interface Condition extends Awaitable +{ + /** + * Returns true once signalled. Unidirectional; once true, will never again be false. + */ + boolean isSignalled(); + + /** + * Signal the condition as met, and wake all waiting threads. + */ + void signal(); + + /** + * Signal the condition as met, and wake all waiting threads. + */ + default void signalAll() { signal(); } + + /** + * Factory method used to capture and redirect instantiations for simulation + */ + static Condition newOneTimeCondition() + { + return new Async(); + } + + /** + * An asynchronous {@link Condition}. Typically lower overhead than {@link Sync}. 
+ */ + public static class Async extends AsyncAwaitable implements Condition + { + private volatile boolean signaled = false; + + // WARNING: if extending this class, consider simulator interactions + protected Async() {} + + public boolean isSignalled() + { + return signaled; + } + + public void signal() + { + signaled = true; + super.signal(); + } + } + + /** + * A {@link Condition} based on its object monitor. + * WARNING: lengthy operations performed while holding the lock may prevent timely notification of waiting threads + * that a deadline has passed. + */ + public static class Sync extends SyncAwaitable implements Condition + { + private boolean signaled = false; + + // this can be instantiated directly, as we intercept monitors directly with byte weaving + public Sync() {} + + public synchronized boolean isSignalled() + { + return signaled; + } + + public synchronized void signal() + { + signaled = true; + notifyAll(); + } + } +} diff --git a/harry-core/src/harry/concurrent/CountDownLatch.java b/harry-core/src/harry/concurrent/CountDownLatch.java new file mode 100644 index 0000000..c673550 --- /dev/null +++ b/harry-core/src/harry/concurrent/CountDownLatch.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package harry.concurrent; + +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + +/** + * This class was borrowed from Apache Cassandra, org.cassandra.utils.concurrent, until there's a shared concurrency lib. + */ +public interface CountDownLatch extends Awaitable +{ + /** + * Count down by 1, signalling waiters if we have reached zero + */ + void decrement(); + + /** + * @return the current count + */ + int count(); + + /** + * Factory method used to capture and redirect instantiations for simulation + */ + static CountDownLatch newCountDownLatch(int count) + { + return new Async(count); + } + + static class Async extends AsyncAwaitable implements CountDownLatch + { + private static final AtomicIntegerFieldUpdater<CountDownLatch.Async> countUpdater = AtomicIntegerFieldUpdater.newUpdater(CountDownLatch.Async.class, "count"); + private volatile int count; + + // WARNING: if extending this class, consider simulator interactions + protected Async(int count) + { + this.count = count; + if (count == 0) + signal(); + } + + public void decrement() + { + if (countUpdater.decrementAndGet(this) == 0) + signal(); + } + + public int count() + { + return count; + } + + @Override + protected boolean isSignalled() + { + return count <= 0; + } + } + + static final class Sync extends SyncAwaitable implements CountDownLatch + { + private int count; + + public Sync(int count) + { + this.count = count; + } + + public synchronized void decrement() + { + if (count > 0 && --count == 0) + notifyAll(); + } + + public synchronized int count() + { + return count; + } + + /** + * not synchronized as only intended for internal usage by externally synchronized methods + */ + + @Override + protected boolean isSignalled() + { + return count <= 0; + } + } +} diff --git a/harry-core/src/harry/concurrent/ExecutorFactory.java 
b/harry-core/src/harry/concurrent/ExecutorFactory.java new file mode 100644 index 0000000..834d511 --- /dev/null +++ b/harry-core/src/harry/concurrent/ExecutorFactory.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package harry.concurrent; + +import static harry.concurrent.InfiniteLoopExecutor.Daemon.DAEMON; +import static harry.concurrent.InfiniteLoopExecutor.Interrupts.UNSYNCHRONIZED; +import static harry.concurrent.NamedThreadFactory.createThread; +import static harry.concurrent.NamedThreadFactory.setupThread; + +/** + * Entry point for configuring and creating new executors. + * + * + * This class was borrowed from Apache Cassandra, org.cassandra.utils.concurrent, until there's a shared concurrency lib. 
+ */ +public interface ExecutorFactory +{ + public enum SimulatorSemantics + { + NORMAL, DISCARD + } + + /** + * Create and start a new thread to execute {@code runnable} + * @param name the name of the thread + * @param runnable the task to execute + * @param daemon flag to indicate whether the thread should be a daemon or not + * @return the new thread + */ + Thread startThread(String name, Runnable runnable, InfiniteLoopExecutor.Daemon daemon); + + /** + * Create and start a new thread to execute {@code runnable}; this thread will be a daemon thread. + * @param name the name of the thread + * @param runnable the task to execute + * @return the new thread + */ + default Thread startThread(String name, Runnable runnable) + { + return startThread(name, runnable, DAEMON); + } + + /** + * Create and start a new InfiniteLoopExecutor to repeatedly invoke {@code runnable}. + * On shutdown, the executing thread will be interrupted; to support clean shutdown + * {@code runnable} should propagate {@link InterruptedException} + * + * @param name the name of the thread used to invoke the task repeatedly + * @param task the task to execute repeatedly + * @param simulatorSafe flag indicating if the loop thread can be intercepted / rescheduled during cluster simulation + * @param daemon flag to indicate whether the loop thread should be a daemon thread or not + * @param interrupts flag to indicate whether to synchronize interrupts of the task execution thread + * using the task's monitor this can be used to prevent interruption while performing + * IO operations which forbid interrupted threads. 
+ * See: {@link org.apache.cassandra.db.commitlog.AbstractCommitLogSegmentManager::start} + * @return the new thread + */ + Interruptible infiniteLoop(String name, Interruptible.Task task, InfiniteLoopExecutor.SimulatorSafe simulatorSafe, InfiniteLoopExecutor.Daemon daemon, InfiniteLoopExecutor.Interrupts interrupts); + + /** + * Create and start a new InfiniteLoopExecutor to repeatedly invoke {@code runnable}. + * On shutdown, the executing thread will be interrupted; to support clean shutdown + * {@code runnable} should propagate {@link InterruptedException} + * + * @param name the name of the thread used to invoke the task repeatedly + * @param task the task to execute repeatedly + * @param simulatorSafe flag indicating if the loop thread can be intercepted / rescheduled during cluster simulation + * @return the new thread + */ + default Interruptible infiniteLoop(String name, Interruptible.SimpleTask task, InfiniteLoopExecutor.SimulatorSafe simulatorSafe) + { + return infiniteLoop(name, Interruptible.Task.from(task), simulatorSafe, DAEMON, UNSYNCHRONIZED); + } + + /** + * Create a new thread group for use with builders - this thread group will be situated within + * this factory's parent thread group, and may be supplied to multiple executor builders. 
+ */ + ThreadGroup newThreadGroup(String name); + + public static final class Global + { + // deliberately not volatile to ensure zero overhead outside of testing; + // depend on other memory visibility primitives to ensure visibility + private static ExecutorFactory FACTORY = new ExecutorFactory.Default(Global.class.getClassLoader(), null, (t, e) -> e.printStackTrace()); + private static boolean modified; + + public static ExecutorFactory executorFactory() + { + return FACTORY; + } + + public static synchronized void unsafeSet(ExecutorFactory executorFactory) + { + FACTORY = executorFactory; + modified = true; + } + + public static synchronized boolean tryUnsafeSet(ExecutorFactory executorFactory) + { + if (modified) + return false; + unsafeSet(executorFactory); + return true; + } + } + + public static final class Default extends NamedThreadFactory.MetaFactory implements ExecutorFactory + { + public Default(ClassLoader contextClassLoader, ThreadGroup threadGroup, Thread.UncaughtExceptionHandler uncaughtExceptionHandler) + { + super(contextClassLoader, threadGroup, uncaughtExceptionHandler); + } + + + @Override + public Thread startThread(String name, Runnable runnable, InfiniteLoopExecutor.Daemon daemon) + { + Thread thread = setupThread(createThread(threadGroup, runnable, name, daemon == DAEMON), + Thread.NORM_PRIORITY, + contextClassLoader, + uncaughtExceptionHandler); + thread.start(); + return thread; + } + + @Override + public Interruptible infiniteLoop(String name, Interruptible.Task task, InfiniteLoopExecutor.SimulatorSafe simulatorSafe, InfiniteLoopExecutor.Daemon daemon, InfiniteLoopExecutor.Interrupts interrupts) + { + return new InfiniteLoopExecutor(this, name, task, daemon, interrupts); + } + + @Override + public ThreadGroup newThreadGroup(String name) + { + return threadGroup == null ? 
null : new ThreadGroup(threadGroup, name); + } + } +} \ No newline at end of file diff --git a/harry-core/src/harry/concurrent/InfiniteLoopExecutor.java b/harry-core/src/harry/concurrent/InfiniteLoopExecutor.java new file mode 100644 index 0000000..32fcf93 --- /dev/null +++ b/harry-core/src/harry/concurrent/InfiniteLoopExecutor.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package harry.concurrent; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.BiFunction; +import java.util.function.Consumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static harry.concurrent.Clock.Global.nanoTime; +import static harry.concurrent.Condition.newOneTimeCondition; +import static harry.concurrent.InfiniteLoopExecutor.InternalState.SHUTTING_DOWN_NOW; +import static harry.concurrent.InfiniteLoopExecutor.InternalState.TERMINATED; +import static harry.concurrent.InfiniteLoopExecutor.Interrupts.UNSYNCHRONIZED; +import static harry.concurrent.Interruptible.State.INTERRUPTED; +import static harry.concurrent.Interruptible.State.NORMAL; +import static harry.concurrent.Interruptible.State.SHUTTING_DOWN; + +/** + * This class was borrowed from Apache Cassandra, org.cassandra.utils.concurrent, until there's a shared concurrency lib. + */ +public class InfiniteLoopExecutor implements Interruptible +{ + private static final Logger logger = LoggerFactory.getLogger(InfiniteLoopExecutor.class); + + public enum InternalState { SHUTTING_DOWN_NOW, TERMINATED } + + public enum SimulatorSafe { SAFE, UNSAFE } + + public enum Daemon { DAEMON, NON_DAEMON } + + public enum Interrupts { SYNCHRONIZED, UNSYNCHRONIZED } + + private static final AtomicReferenceFieldUpdater<InfiniteLoopExecutor, Object> stateUpdater = AtomicReferenceFieldUpdater.newUpdater(InfiniteLoopExecutor.class, Object.class, "state"); + private final Thread thread; + private final Task task; + private volatile Object state = NORMAL; + private final Consumer<Thread> interruptHandler; + private final Condition isTerminated = newOneTimeCondition(); + + public InfiniteLoopExecutor(String name, Task task, Daemon daemon) + { + this(ExecutorFactory.Global.executorFactory(), name, task, daemon, UNSYNCHRONIZED); + } + + public InfiniteLoopExecutor(ExecutorFactory factory, String name, Task task, Daemon 
daemon) + { + this(factory, name, task, daemon, UNSYNCHRONIZED); + } + + public InfiniteLoopExecutor(ExecutorFactory factory, String name, Task task, Daemon daemon, Interrupts interrupts) + { + this.task = task; + this.thread = factory.startThread(name, this::loop, daemon); + this.interruptHandler = interrupts == Interrupts.SYNCHRONIZED + ? interruptHandler(task) + : Thread::interrupt; + } + + public InfiniteLoopExecutor(BiFunction<String, Runnable, Thread> threadStarter, String name, Task task, Interrupts interrupts) + { + this.task = task; + this.thread = threadStarter.apply(name, this::loop); + this.interruptHandler = interrupts == Interrupts.SYNCHRONIZED + ? interruptHandler(task) + : Thread::interrupt; + } + + private static Consumer<Thread> interruptHandler(final Object monitor) + { + return thread -> { + synchronized (monitor) + { + thread.interrupt(); + } + }; + } + + + private void loop() + { + boolean interrupted = false; + try + { + while (true) + { + try + { + Object cur = state; + if (cur == SHUTTING_DOWN_NOW) break; + + interrupted |= Thread.interrupted(); + if (cur == NORMAL && interrupted) cur = INTERRUPTED; + task.run((State) cur); + + interrupted = false; + if (cur == SHUTTING_DOWN) break; + } + catch (TerminateException ignore) + { + break; + } + catch (UncheckedInterruptedException | InterruptedException ignore) + { + interrupted = true; + } + catch (Throwable t) + { + logger.error("Exception thrown by runnable, continuing with loop", t); + } + } + } + finally + { + state = TERMINATED; + isTerminated.signal(); + } + } + + public void interrupt() + { + interruptHandler.accept(thread); + } + + public void shutdown() + { + stateUpdater.updateAndGet(this, cur -> cur != TERMINATED && cur != SHUTTING_DOWN_NOW ? 
SHUTTING_DOWN : cur); + // TODO: InfiniteLoopExecutor should let the threads quiesce themselves rather then send interrupts + //interruptHandler.accept(thread); + } + + public Object shutdownNow() + { + stateUpdater.updateAndGet(this, cur -> cur != TERMINATED ? SHUTTING_DOWN_NOW : TERMINATED); + interruptHandler.accept(thread); + return null; + } + + @Override + public boolean isTerminated() + { + return state == TERMINATED; + } + + public boolean awaitTermination(long time, TimeUnit unit) throws InterruptedException + { + if (isTerminated()) + return true; + + long deadlineNanos = nanoTime() + unit.toNanos(time); + isTerminated.awaitUntil(deadlineNanos); + return isTerminated(); + } + + public boolean isAlive() + { + return this.thread.isAlive(); + } +} diff --git a/harry-core/src/harry/concurrent/Interruptible.java b/harry-core/src/harry/concurrent/Interruptible.java new file mode 100644 index 0000000..18e605d --- /dev/null +++ b/harry-core/src/harry/concurrent/Interruptible.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package harry.concurrent; + +/** + * This class was borrowed from Apache Cassandra, org.cassandra.utils.concurrent, until there's a shared concurrency lib. 
+ */ +public interface Interruptible extends Shutdownable +{ + public enum State { NORMAL, INTERRUPTED, SHUTTING_DOWN } + + public static class TerminateException extends InterruptedException {} + + public interface Task + { + void run(State state) throws InterruptedException; + + static Task from(SimpleTask simpleTask) + { + return state -> { if (state == State.NORMAL) simpleTask.run(); }; + } + } + + /** + * A Task that only runs on NORMAL states + */ + public interface SimpleTask + { + void run() throws InterruptedException; + } + + void interrupt(); +} + diff --git a/harry-core/src/harry/concurrent/NamedThreadFactory.java b/harry-core/src/harry/concurrent/NamedThreadFactory.java new file mode 100644 index 0000000..9ed4f78 --- /dev/null +++ b/harry-core/src/harry/concurrent/NamedThreadFactory.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package harry.concurrent; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This class is an implementation of the <i>ThreadFactory</i> interface. This + * is useful to give Java threads meaningful names which is useful when using + * a tool like JConsole. 
/**
 * A {@link ThreadFactory} that gives the threads it creates meaningful names, which is useful when
 * inspecting a process with a tool like JConsole.
 *
 * Names produced by an instance have the form {@code id:n}; every name created through the static
 * {@code createThread} helpers is additionally prefixed (exactly once) with the global prefix, if one is set.
 *
 * This class was borrowed from Apache Cassandra until there's a shared concurrency lib.
 */
public class NamedThreadFactory implements ThreadFactory
{
    private static final AtomicInteger anonymousCounter = new AtomicInteger();
    private static volatile String globalPrefix;

    public static void setGlobalPrefix(String prefix) { globalPrefix = prefix; }

    /**
     * @return the global thread name prefix, or the empty string if none has been set
     */
    public static String globalPrefix()
    {
        String prefix = globalPrefix;
        return prefix == null ? "" : prefix;
    }

    /**
     * Holds the settings shared by a family of factories and creates a factory per name.
     */
    public static class MetaFactory
    {
        protected ClassLoader contextClassLoader;
        protected ThreadGroup threadGroup;
        protected Thread.UncaughtExceptionHandler uncaughtExceptionHandler;

        public MetaFactory(ClassLoader contextClassLoader, ThreadGroup threadGroup, Thread.UncaughtExceptionHandler uncaughtExceptionHandler)
        {
            this.contextClassLoader = contextClassLoader;
            if (threadGroup == null)
            {
                // no group supplied: walk up from the current thread's group to the root group
                threadGroup = Thread.currentThread().getThreadGroup();
                while (threadGroup.getParent() != null)
                    threadGroup = threadGroup.getParent();
            }
            this.threadGroup = threadGroup;
            this.uncaughtExceptionHandler = uncaughtExceptionHandler;
        }

        NamedThreadFactory newThreadFactory(String name, int threadPriority)
        {
            // We create a unique thread group for each factory, so that e.g. executors can determine which threads are members of the executor
            ThreadGroup threadGroup = this.threadGroup == null ? null : new ThreadGroup(this.threadGroup, name);
            return new NamedThreadFactory(name, threadPriority, contextClassLoader, threadGroup, uncaughtExceptionHandler);
        }
    }

    public final String id;
    private final int priority;
    private final ClassLoader contextClassLoader;
    // may be null, in which case new threads are created without an explicit group
    public final ThreadGroup threadGroup;
    protected final AtomicInteger n = new AtomicInteger(1);
    private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler;

    public NamedThreadFactory(String id)
    {
        this(id, Thread.NORM_PRIORITY);
    }

    public NamedThreadFactory(String id, int priority)
    {
        this(id, priority, null, null, (t, e) -> e.printStackTrace());
    }

    public NamedThreadFactory(String id, ClassLoader contextClassLoader, ThreadGroup threadGroup)
    {
        this(id, Thread.NORM_PRIORITY, contextClassLoader, threadGroup, (t, e) -> e.printStackTrace());
    }

    public NamedThreadFactory(String id, int priority, ClassLoader contextClassLoader, ThreadGroup threadGroup)
    {
        this(id, priority, contextClassLoader, threadGroup, (t, e) -> e.printStackTrace());
    }

    public NamedThreadFactory(String id, int priority, ClassLoader contextClassLoader, ThreadGroup threadGroup, Thread.UncaughtExceptionHandler uncaughtExceptionHandler)
    {
        this.id = id;
        this.priority = priority;
        this.contextClassLoader = contextClassLoader;
        this.threadGroup = threadGroup;
        this.uncaughtExceptionHandler = uncaughtExceptionHandler;
    }

    /**
     * Creates a daemon thread named {@code id:n}, where n is this factory's running counter.
     */
    @Override
    public Thread newThread(Runnable runnable)
    {
        String name = id + ':' + n.getAndIncrement();
        return newThread(threadGroup, runnable, name);
    }

    protected Thread newThread(ThreadGroup threadGroup, Runnable runnable, String name)
    {
        return setupThread(createThread(threadGroup, runnable, name, true));
    }

    protected <T extends Thread> T setupThread(T thread)
    {
        return setupThread(thread, priority, contextClassLoader, uncaughtExceptionHandler);
    }

    /**
     * Creates a daemon thread with the given priority and, when non-null, the given context class loader
     * and uncaught exception handler.
     */
    public static Thread createThread(ThreadGroup threadGroup, Runnable runnable, String name, int priority, ClassLoader contextClassLoader, Thread.UncaughtExceptionHandler uncaughtExceptionHandler)
    {
        // the four-argument overload already prepends the global prefix; prepending it here as well
        // would produce names carrying the prefix twice
        Thread thread = createThread(threadGroup, runnable, name, true);
        return setupThread(thread, priority, contextClassLoader, uncaughtExceptionHandler);
    }

    public static Thread createAnonymousThread(Runnable runnable)
    {
        return createThread(null, runnable, "anonymous-" + anonymousCounter.incrementAndGet());
    }

    public static Thread createThread(Runnable runnable, String name)
    {
        return createThread(null, runnable, name);
    }

    public Thread createThread(Runnable runnable, String name, boolean daemon)
    {
        return createThread(null, runnable, name, daemon);
    }

    /**
     * Creates a non-daemon thread in the given group.
     */
    public static Thread createThread(ThreadGroup threadGroup, Runnable runnable, String name)
    {
        return createThread(threadGroup, runnable, name, false);
    }

    /**
     * Creates a thread whose name is the global prefix (if set) followed by {@code name}.
     */
    public static Thread createThread(ThreadGroup threadGroup, Runnable runnable, String name, boolean daemon)
    {
        String prefix = globalPrefix;
        Thread thread = new Thread(threadGroup, runnable, prefix != null ? prefix + name : name);
        thread.setDaemon(daemon);
        return thread;
    }

    public static <T extends Thread> T setupThread(T thread, int priority, ClassLoader contextClassLoader, Thread.UncaughtExceptionHandler uncaughtExceptionHandler)
    {
        thread.setPriority(priority);
        if (contextClassLoader != null)
            thread.setContextClassLoader(contextClassLoader);
        if (uncaughtExceptionHandler != null)
            thread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
        return thread;
    }

    @Override
    public String toString()
    {
        return threadGroup != null ? id + " in " + threadGroup.getName() : id;
    }

    /**
     * Marks this factory's thread group as daemon and attempts to destroy it.
     * Does nothing when the factory was created without a thread group.
     */
    public void close()
    {
        // factories built without an explicit group have nothing to release
        if (threadGroup == null)
            return;

        synchronized (threadGroup)
        {
            threadGroup.setDaemon(true);
            // ThreadGroup API is terrible; setDaemon does not destroy if already empty, and establishing if empty
            // otherwise is tortuous - easier to just try to destroy and fail if currently an invalid action
            try
            {
                threadGroup.destroy();
            }
            catch (IllegalThreadStateException ignore)
            {
                // the group still has live threads (or is already destroyed); closing is best-effort
            }
        }
    }
}
+ */ + void shutdown(); + + /** + * Shutdown immediately, possibly interrupting ongoing work, and cancelling work that is queued. + */ + Object shutdownNow(); + + /** + * Await termination of this object, i.e. the cessation of all current and future work. + */ + public boolean awaitTermination(long timeout, TimeUnit units) throws InterruptedException; +} diff --git a/harry-core/src/harry/concurrent/UncheckedInterruptedException.java b/harry-core/src/harry/concurrent/UncheckedInterruptedException.java new file mode 100644 index 0000000..c2be5a5 --- /dev/null +++ b/harry-core/src/harry/concurrent/UncheckedInterruptedException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package harry.concurrent; + +/** + * Unchecked {@link InterruptedException}, to be thrown in places where an interrupt is unexpected + * + * This class was borrowed from Apache Cassandra, org.cassandra.utils.concurrent, until there's a shared concurrency lib. 
/**
 * Unchecked counterpart of {@link InterruptedException}, to be thrown in places where an interrupt is unexpected.
 *
 * This class was borrowed from Apache Cassandra until there's a shared concurrency lib.
 */
public class UncheckedInterruptedException extends RuntimeException
{
    /** Creates an instance with neither a message nor a cause. */
    public UncheckedInterruptedException()
    {
        super();
    }

    /**
     * Creates an instance wrapping the checked exception that triggered it.
     *
     * @param cause the original interrupt
     */
    public UncheckedInterruptedException(InterruptedException cause)
    {
        super(cause);
    }
}
+ */ +public interface WaitQueue +{ + /** + * A Signal is a one-time-use mechanism for a thread to wait for notification that some condition + * state has transitioned that it may be interested in (and hence should check if it is). + * It is potentially transient, i.e. the state can change in the meantime, it only indicates + * that it should be checked, not necessarily anything about what the expected state should be. + * + * Signal implementations should never wake up spuriously, they are always woken up by a + * signal() or signalAll(). + * + * This abstract definition of Signal does not need to be tied to a WaitQueue. + * Whilst RegisteredSignal is the main building block of Signals, this abstract + * definition allows us to compose Signals in useful ways. The Signal is 'owned' by the + * thread that registered itself with WaitQueue(s) to obtain the underlying RegisteredSignal(s); + * only the owning thread should use a Signal. + */ + public static interface Signal extends Condition + { + /** + * @return true if cancelled; once cancelled, must be discarded by the owning thread. + */ + public boolean isCancelled(); + + /** + * @return isSignalled() || isCancelled(). Once true, the state is fixed and the Signal should be discarded + * by the owning thread. + */ + public boolean isSet(); + + /** + * atomically: cancels the Signal if !isSet(), or returns true if isSignalled() + * + * @return true if isSignalled() + */ + public boolean checkAndClear(); + + /** + * Should only be called by the owning thread. Indicates the signal can be retired, + * and if signalled propagates the signal to another waiting thread + */ + public abstract void cancel(); + } + + /** + * The calling thread MUST be the thread that uses the signal + */ + public Signal register(); + + /** + * The calling thread MUST be the thread that uses the signal. 
+ * If the Signal is waited on, context.stop() will be called when the wait times out, the Signal is signalled, + * or the waiting thread is interrupted. + */ + public <V> Signal register(V supplyOnDone, Consumer<V> receiveOnDone); + + /** + * Signal one waiting thread + */ + public boolean signal(); + + /** + * Signal all waiting threads + */ + public void signalAll(); + + /** getWaiting() > 0 */ + public boolean hasWaiters(); + + /** Return how many threads are waiting */ + public int getWaiting(); + + /** + * Factory method used to capture and redirect instantiations for simulation + */ + public static WaitQueue newWaitQueue() + { + return new Standard(); + } + + class Standard implements WaitQueue + { + private static final int CANCELLED = -1; + private static final int SIGNALLED = 1; + private static final int NOT_SET = 0; + + private static final AtomicIntegerFieldUpdater<RegisteredSignal> signalledUpdater = AtomicIntegerFieldUpdater.newUpdater(RegisteredSignal.class, "state"); + + // the waiting signals + private final ConcurrentLinkedQueue<RegisteredSignal> queue = new ConcurrentLinkedQueue<>(); + + protected Standard() {} + + /** + * The calling thread MUST be the thread that uses the signal + * @return x + */ + public Signal register() + { + RegisteredSignal signal = new RegisteredSignal(); + queue.add(signal); + return signal; + } + + /** + * The calling thread MUST be the thread that uses the signal. + * If the Signal is waited on, context.stop() will be called when the wait times out, the Signal is signalled, + * or the waiting thread is interrupted. 
+ */ + public <V> Signal register(V supplyOnDone, Consumer<V> receiveOnDone) + { + RegisteredSignal signal = new SignalWithListener<>(supplyOnDone, receiveOnDone); + queue.add(signal); + return signal; + } + + /** + * Signal one waiting thread + */ + public boolean signal() + { + while (true) + { + RegisteredSignal s = queue.poll(); + if (s == null || s.doSignal() != null) + return s != null; + } + } + + /** + * Signal all waiting threads + */ + public void signalAll() + { + if (!hasWaiters()) + return; + + // to avoid a race where the condition is not met and the woken thread managed to wait on the queue before + // we finish signalling it all, we pick a random thread we have woken-up and hold onto it, so that if we encounter + // it again we know we're looping. We reselect a random thread periodically, progressively less often. + // the "correct" solution to this problem is to use a queue that permits snapshot iteration, but this solution is sufficient + // TODO: this is only necessary because we use CLQ - which is only for historical any-NIH reasons + int i = 0, s = 5; + Thread randomThread = null; + Iterator<RegisteredSignal> iter = queue.iterator(); + while (iter.hasNext()) + { + RegisteredSignal signal = iter.next(); + Thread signalled = signal.doSignal(); + + if (signalled != null) + { + if (signalled == randomThread) + break; + + if (++i == s) + { + randomThread = signalled; + s <<= 1; + } + } + + iter.remove(); + } + } + + private void cleanUpCancelled() + { + // TODO: attempt to remove the cancelled from the beginning only (need atomic cas of head) + queue.removeIf(RegisteredSignal::isCancelled); + } + + public boolean hasWaiters() + { + return !queue.isEmpty(); + } + + /** + * @return how many threads are waiting + */ + public int getWaiting() + { + if (!hasWaiters()) + return 0; + Iterator<RegisteredSignal> iter = queue.iterator(); + int count = 0; + while (iter.hasNext()) + { + Signal next = iter.next(); + if (!next.isCancelled()) + count++; + } + 
return count; + } + + /** + * An abstract signal implementation + * + * TODO: use intrusive linked list + */ + public static abstract class AbstractSignal extends AbstractAwaitable implements Signal + { + public Signal await() throws InterruptedException + { + while (!isSignalled()) + { + checkInterrupted(); + LockSupport.park(); + } + checkAndClear(); + return this; + } + + public boolean awaitUntil(long nanoTimeDeadline) throws InterruptedException + { + long now; + while (nanoTimeDeadline > (now = nanoTime()) && !isSignalled()) + { + checkInterrupted(); + long delta = nanoTimeDeadline - now; + LockSupport.parkNanos(delta); + } + return checkAndClear(); + } + + private void checkInterrupted() throws InterruptedException + { + if (Thread.interrupted()) + { + cancel(); + throw new InterruptedException(); + } + } + } + + /** + * A signal registered with this WaitQueue + */ + private class RegisteredSignal extends AbstractSignal + { + private volatile Thread thread = Thread.currentThread(); + volatile int state; + + public boolean isSignalled() + { + return state == SIGNALLED; + } + + public boolean isCancelled() + { + return state == CANCELLED; + } + + public boolean isSet() + { + return state != NOT_SET; + } + + private Thread doSignal() + { + if (!isSet() && signalledUpdater.compareAndSet(this, NOT_SET, SIGNALLED)) + { + Thread thread = this.thread; + LockSupport.unpark(thread); + this.thread = null; + return thread; + } + return null; + } + + public void signal() + { + doSignal(); + } + + public boolean checkAndClear() + { + if (!isSet() && signalledUpdater.compareAndSet(this, NOT_SET, CANCELLED)) + { + thread = null; + cleanUpCancelled(); + return false; + } + // must now be signalled assuming correct API usage + return true; + } + + /** + * Should only be called by the registered thread. 
Indicates the signal can be retired, + * and if signalled propagates the signal to another waiting thread + */ + public void cancel() + { + if (isCancelled()) + return; + if (!signalledUpdater.compareAndSet(this, NOT_SET, CANCELLED)) + { + // must already be signalled - switch to cancelled and + state = CANCELLED; + // propagate the signal + WaitQueue.Standard.this.signal(); + } + thread = null; + cleanUpCancelled(); + } + } + + /** + * A RegisteredSignal that stores a TimerContext, and stops the timer when either cancelled or + * finished waiting. i.e. if the timer is started when the signal is registered it tracks the + * time in between registering and invalidating the signal. + */ + private final class SignalWithListener<V> extends RegisteredSignal + { + private final V supplyOnDone; + private final Consumer<V> receiveOnDone; + + private SignalWithListener(V supplyOnDone, Consumer<V> receiveOnDone) + { + this.receiveOnDone = receiveOnDone; + this.supplyOnDone = supplyOnDone; + } + + + @Override + public boolean checkAndClear() + { + receiveOnDone.accept(supplyOnDone); + return super.checkAndClear(); + } + + @Override + public void cancel() + { + if (!isCancelled()) + { + receiveOnDone.accept(supplyOnDone); + super.cancel(); + } + } + } + } + + /** + * Loops waiting on the supplied condition and WaitQueue and will not return until the condition is true + */ + public static void waitOnCondition(BooleanSupplier condition, WaitQueue queue) throws InterruptedException + { + while (!condition.getAsBoolean()) + { + Signal s = queue.register(); + if (!condition.getAsBoolean()) s.await(); + else s.cancel(); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org