Repository: reef Updated Branches: refs/heads/master 9937c7519 -> d609c002c
[REEF-1864] Utility to convert asynchronous API to synchronous. JIRA: [REEF-1864](https://issues.apache.org/jira/browse/REEF-1864) * Adds condition variable class to manage sleep on condition variables. * Adds MultiAsyncToSync class for managing the callers of an API which is synchronous but is implemented asynchronously. * Adds MultiAsyncToSyncTest which verifies that non-timeout, timeout, and multiple non-timeout calls complete correctly. Pull Request: Closes #1365 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/d609c002 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/d609c002 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/d609c002 Branch: refs/heads/master Commit: d609c002c38032f55c915c72434ae1443604461b Parents: 9937c75 Author: Doug Service <[email protected]> Authored: Fri Aug 11 01:41:29 2017 +0000 Committer: Sergiy Matusevych <[email protected]> Committed: Mon Aug 28 13:50:08 2017 -0700 ---------------------------------------------------------------------- .../org/apache/reef/util/ComplexCondition.java | 135 +++++++++ .../org/apache/reef/util/MethodCallable.java | 64 ++++ .../org/apache/reef/util/MultiAsyncToSync.java | 181 +++++++++++ .../org/apache/reef/util/SimpleCondition.java | 136 +++++++++ .../exception/InvalidIdentifierException.java | 42 +++ .../reef/util/exception/package-info.java | 22 ++ .../apache/reef/util/MultiAsyncToSyncTest.java | 299 +++++++++++++++++++ .../apache/reef/util/SimpleConditionTest.java | 80 +++++ 8 files changed, 959 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/d609c002/lang/java/reef-utils/src/main/java/org/apache/reef/util/ComplexCondition.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-utils/src/main/java/org/apache/reef/util/ComplexCondition.java 
b/lang/java/reef-utils/src/main/java/org/apache/reef/util/ComplexCondition.java new file mode 100644 index 0000000..cf039ae --- /dev/null +++ b/lang/java/reef-utils/src/main/java/org/apache/reef/util/ComplexCondition.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.reef.util; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Manages Java lock and condition objects to create a simplified + * condition variable interface. + */ +public final class ComplexCondition { + private final ReentrantLock lockVar = new ReentrantLock(); + private final Condition conditionVar = lockVar.newCondition(); + private final long timeoutPeriod; + private final TimeUnit timeoutUnits; + private volatile boolean isSignal = false; + + /** + * Default constructor which with infinite timeout period. + */ + public ComplexCondition() { + this(Long.MAX_VALUE, TimeUnit.DAYS); + } + + /** + * Wrap a lock and associated condition together into a single atomic condition + * variable that can be used synchronize waiters and signalers. Signal must + * come from a different thread than the sleeper thread. 
+ * Typical usage for the sleeper: + * {@code + * try { + * cv.lock(); + * // Access shared objects. + * cv.await(); // lock is atomically given up sleeping and reacquired on wakeup. + * // Access shared objects. + * } finally { + * // Cleanup. + * cv.unlock(); + * } + * } + * Typical usage for the signaler: + * {@code + * try { + * cv.lock(); + * // Access shared objects. + * cv.signal(); + * // Access shared objects. + * } finally { + * cv.unlock() + * } + * } + * @param timeoutPeriod The length of time in units given by the the timeoutUnits + * parameter before the condition automatically times out. + * @param timeoutUnits The unit of time for the timeoutPeriod parameter. + */ + public ComplexCondition(final long timeoutPeriod, final TimeUnit timeoutUnits) { + this.timeoutPeriod = timeoutPeriod; + this.timeoutUnits = timeoutUnits; + } + + /** + * Declare a threads intention to either wait or signal the condition. Any work + * with objects that are shared between the waiter and signaler should only be + * accessed after calling {@code preop()} and before calling {@code await()} or + * {@code signal()}. + */ + public void lock() { + lockVar.lock(); + } + + /** + * Declare a threads intention release the condition after a call to wait or signal. + * Any work with objects that are shared between the waiter and signaler should only + * be access after {@code await()} or {@code signal()} and before + * calling {@code unlock()}. + */ + public void unlock() { + lockVar.unlock(); + } + + /** + * Wait for a signal on the condition. Must call {@code lock()} first + * and {@code unlock()} afterwards. + * @return A boolean value that indicates whether or not a signal was received. False + * indicates a timeout occurred before a signal was received. + * @throws InterruptedException The calling thread was interrupted by another thread. 
+ */ + public boolean await() throws InterruptedException { + boolean noTimeout = true; + // Use a loop and a boolean to avoid spurious time outs. + try { + while (!isSignal && noTimeout) { + noTimeout = conditionVar.await(timeoutPeriod, timeoutUnits); + } + } finally { + isSignal = false; + } + return noTimeout; + } + + /** + * Signal the sleeper on the condition. Must have called {@code lock()} first + * and {@code unlock()} afterwards. + */ + public void signal() { + isSignal = true; + conditionVar.signal(); + } + + /** + * Check if the internal lock is currently held by the calling thread. + * @return A boolean value that indicates if the internal lock is currently + * held by the calling thread. + */ + public boolean isHeldByCurrentThread() { + return lockVar.isHeldByCurrentThread(); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/d609c002/lang/java/reef-utils/src/main/java/org/apache/reef/util/MethodCallable.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-utils/src/main/java/org/apache/reef/util/MethodCallable.java b/lang/java/reef-utils/src/main/java/org/apache/reef/util/MethodCallable.java new file mode 100644 index 0000000..ee96344 --- /dev/null +++ b/lang/java/reef-utils/src/main/java/org/apache/reef/util/MethodCallable.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.util; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.concurrent.Callable; + +/** + * Generic class which provides a simple mechanism to call a single method + * on a concrete class. + * @param <TReturn> The class type of the method return value. + */ +public final class MethodCallable<TReturn> implements Callable<TReturn> { + private final Object obj; + private final Object[] input; + private final Method method; + + /** + * @param obj An subclass of object on which the method will be invoked. + * @param input Parameter input values for the method invocation. + * @param function A string which contains the name of the method to be invoked. + */ + public MethodCallable(final Object obj, final String function, + final Object... input) throws NoSuchMethodException { + this.obj = obj; + this.input = input; + + // Get the argument types. + final Class[] inputClass = new Class[input.length]; + for (int idx = 0; idx < input.length; ++idx) { + inputClass[idx] = input[idx].getClass(); + } + + this.method = obj.getClass().getDeclaredMethod(function, inputClass); + } + + /** + * @return A object of class type TReturn. + * @throws InvocationTargetException The function specified in the constructor threw + * an exception. + * @throws IllegalAccessException Method is not accessible in calling context. 
+ */ + @SuppressWarnings("unchecked") + public TReturn call() throws InvocationTargetException, IllegalAccessException { + return (TReturn)method.invoke(obj, input); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/d609c002/lang/java/reef-utils/src/main/java/org/apache/reef/util/MultiAsyncToSync.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-utils/src/main/java/org/apache/reef/util/MultiAsyncToSync.java b/lang/java/reef-utils/src/main/java/org/apache/reef/util/MultiAsyncToSync.java new file mode 100644 index 0000000..272cab6 --- /dev/null +++ b/lang/java/reef-utils/src/main/java/org/apache/reef/util/MultiAsyncToSync.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.reef.util; + +import org.apache.reef.util.exception.InvalidIdentifierException; + +import java.util.concurrent.FutureTask; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Assists a class in presenting a synchronous interface that is implemented + * via asynchronous interfaces and events. When a method call is received + * by the interface, parameter values are captured and the initiation of + * asynchronous processing is encapsulated in a callable object. When + * {@code block()} is called with the internal interface identifier, the + * lock is taken, the asynchronous processing is initiated, and the caller + * is put to sleep. After all of the asynchronous processing is complete the + * caller is released with a call to {@code release()}. + */ +public final class MultiAsyncToSync { + private static final Logger LOG = Logger.getLogger(MultiAsyncToSync.class.getName()); + + private final ConcurrentLinkedQueue<ComplexCondition> freeQueue = new ConcurrentLinkedQueue<>(); + private final ConcurrentHashMap<Long, ComplexCondition> sleeperMap = new ConcurrentHashMap<>(); + private final long timeoutPeriod; + private final TimeUnit timeoutUnits; + + /** + * Initialize a multiple asynchronous to synchronous object with a specified timeout value. + * @param timeoutPeriod The length of time in units given by the the timeoutUnits + * parameter before the condition automatically times out. + * @param timeoutUnits The unit of time for the timeoutPeriod parameter. + */ + public MultiAsyncToSync(final long timeoutPeriod, final TimeUnit timeoutUnits) { + this.timeoutPeriod = timeoutPeriod; + this.timeoutUnits = timeoutUnits; + } + + /** + * Put the caller to sleep on a specific release identifier. 
+ * @param identifier The identifier required to awake the caller via the {@code release()} method. + * @param asyncProcessor A {@code FutureTask} object which returns {@code TAsync} that initiates the asynchronous + * processing associated with the call. This will occur inside the condition lock + * to prevent the processing from generating the signal before the calling thread blocks. + * Error conditions should be handled by throwing an exception which the caller + * will catch. The caller can retrieve the results of the processing by calling + * {@code asyndProcessor.get()}. + * @param <TAsync> The return type of the {@code asyncProcessor}; + * @return A boolean value that indicates whether or not a timeout or error occurred. + * @throws InterruptedException The thread was interrupted while waiting on a condition. + * @throws InvalidIdentifierException The identifier parameter is invalid. + */ + public <TAsync> boolean block(final long identifier, final FutureTask<TAsync> asyncProcessor) + throws InterruptedException, InvalidIdentifierException { + final boolean timeoutOccurred; + final ComplexCondition call = allocate(); + if (call.isHeldByCurrentThread()) { + throw new RuntimeException("release() must not be called on same thread as block() to prevent deadlock"); + } + try { + call.lock(); + // Add the call identifier to the sleeper map so release() can identify this instantiation. + addSleeper(identifier, call); + // Invoke the caller's asynchronous processing while holding the lock + // so a wakeup cannot occur before the caller sleeps. + asyncProcessor.run(); + // Put the caller to sleep until the ack comes back. Note: we atomically + // give up the look as the caller sleeps and atomically reacquire the + // the lock as we wake up. 
+ LOG.log(Level.FINER, "Putting caller to sleep on identifier [{0}]", identifier); + timeoutOccurred = !call.await(); + if (timeoutOccurred) { + LOG.log(Level.SEVERE, "Call timed out on identifier [{0}]", identifier); + } + } finally { + // Whether or not the call completed successfully, always remove + // the call from the sleeper map, release the lock and cleanup. + try { + removeSleeper(identifier); + recycle(call); + } finally { + call.unlock(); + } + } + return timeoutOccurred; + } + + /** + * Wake the caller sleeping on the specific identifier. + * @param identifier The message identifier of the caller who should be released. + */ + public void release(final long identifier) throws InterruptedException, InvalidIdentifierException { + final ComplexCondition call = getSleeper(identifier); + if (call.isHeldByCurrentThread()) { + throw new RuntimeException("release() must not be called on same thread as block() to prevent deadlock"); + } + try { + call.lock(); + LOG.log(Level.FINER, "Waking caller sleeping on identifier [{0}]", identifier); + call.signal(); + } finally { + call.unlock(); + } + } + + /** + * Allocate a condition variable. May reuse existing ones. + * @return A complex condition object. + */ + private ComplexCondition allocate() { + final ComplexCondition call = freeQueue.poll(); + return call != null ? call : new ComplexCondition(timeoutPeriod, timeoutUnits); + } + + /** + * Return a complex condition object to the free queueu. + * @param call The complex condition to be recycled. + */ + private void recycle(final ComplexCondition call) { + freeQueue.add(call); + } + + /** + * Atomically add a coll to the sleeper map. + * @param identifier The unique call identifier. + * @param call The call object to be added to the sleeper map. 
+ */ + private void addSleeper(final long identifier, final ComplexCondition call) { + if (sleeperMap.put(identifier, call) != null) { + throw new RuntimeException(String.format("Duplicate identifier [%d] in sleeper map", identifier)); + } + } + + /** + * Get a reference to a sleeper with a specific identifier without removing + * it from the sleeper map. + * @param identifier The unique identifier of the sleeper to be retrieved. + * @return The complex condition object associated with the input identifier. + * @throws InvalidIdentifierException The sleeper map does not contain a call + * with the specified identifier. + */ + private ComplexCondition getSleeper(final long identifier) throws InvalidIdentifierException { + final ComplexCondition call = sleeperMap.get(identifier); + if (null == call) { + throw new InvalidIdentifierException(identifier); + } + return call; + } + + /** + * Remove the specified call from the sleeper map. + * @param identifier The unique identifier of the call to be removed. + * @throws InvalidIdentifierException The sleeper map does not contain a call + * with the specified identifier. 
+ */ + private void removeSleeper(final long identifier) throws InvalidIdentifierException { + final ComplexCondition call = sleeperMap.remove(identifier); + if (null == call) { + throw new InvalidIdentifierException(identifier); + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/d609c002/lang/java/reef-utils/src/main/java/org/apache/reef/util/SimpleCondition.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-utils/src/main/java/org/apache/reef/util/SimpleCondition.java b/lang/java/reef-utils/src/main/java/org/apache/reef/util/SimpleCondition.java new file mode 100644 index 0000000..be8ee12 --- /dev/null +++ b/lang/java/reef-utils/src/main/java/org/apache/reef/util/SimpleCondition.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.reef.util; + +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Manages Java lock and condition objects to create a simplified + * condition variable interface. 
+ */ +public final class SimpleCondition { + private static final Logger LOG = Logger.getLogger(SimpleCondition.class.getName()); + private final ReentrantLock lockVar = new ReentrantLock(); + private final Condition conditionVar = lockVar.newCondition(); + private final long timeoutPeriod; + private final TimeUnit timeoutUnits; + private volatile boolean isSignal = false; + + /** + * Default constructor which initializes timeout period to 10 seconds. + */ + public SimpleCondition() { + this(Long.MAX_VALUE, TimeUnit.DAYS); + } + + /** + * Initialize condition variable with user specified timeout. + * @param timeoutPeriod The length of time in units given by the the timeoutUnits + * parameter before the condition automatically times out. + * @param timeoutUnits The unit of time for the timeoutPeriod parameter. + */ + public SimpleCondition(final long timeoutPeriod, final TimeUnit timeoutUnits) { + this.timeoutPeriod = timeoutPeriod; + this.timeoutUnits = timeoutUnits; + } + + /** + * Blocks the caller until {@code signal()} is called or a timeout occurs. + * Logical structure: + * {@code + * cv.lock(); + * try { + * doTry.run(); + * cv.await(); // or cv.signal() + * } finally { + * doFinally.run(); + * cv.unlock(); + * } + * } + * @param doTry A {@code FutureTask<TTry>} object that is run after the internal + * condition lock is taken but before waiting on the condition occurs. + * @param doFinally A {@code FutureTask<TFinally>} object that is run after the wakeup + * on the condition occurs but before giving up the condition lock + * is released. + * @param <TTry> The return type of the {@code doTry} future task. + * @param <TFinally> The return type of the {@code doFinally} future task. + * @return A boolean value that indicates whether or not a signal was received. False + * indicates that a timeout occurred before a signal was received. + * @throws InterruptedException Thread was interrupted by another thread while + * waiting for the signal. 
+ * @throws Exception The callers (@code doTry} or {@code doFinally} future task + * threw an exception. + */ + public <TTry, TFinally> boolean await(final FutureTask<TTry> doTry, + final FutureTask<TFinally> doFinally) throws Exception { + boolean noTimeout = true; + if (lockVar.isHeldByCurrentThread()) { + throw new RuntimeException("signal() must not be called on same thread as await()"); + } + try { + lockVar.lock(); + if (null != doTry) { + // Invoke the caller's asynchronous processing while holding the lock + // so a wakeup cannot occur before the caller sleeps. + doTry.run(); + } + // Put the caller asleep on the condition until a signal is received + // or a timeout occurs. Ignore spurious wake ups. + LOG.log(Level.FINER, "Putting caller to sleep..."); + while (!isSignal && noTimeout) { + noTimeout = conditionVar.await(timeoutPeriod, timeoutUnits); + } + LOG.log(Level.FINER, "Caller waking up..."); + } finally { + isSignal = false; + if (null != doFinally) { + try { + // Whether or not a timeout occurred, call the user's cleanup code. + doFinally.run(); + } finally { + lockVar.unlock(); + } + } else { + lockVar.unlock(); + } + } + return noTimeout; + } + + /** + * Wakes the thread sleeping in (@code await()}. 
+ */ + public void signal() { + if (lockVar.isHeldByCurrentThread()) { + throw new RuntimeException("signal() must not be called on same thread as await()"); + } + try { + lockVar.lock(); + LOG.log(Level.INFO, "Signalling sleeper..."); + isSignal = true; + conditionVar.signal(); + } finally { + lockVar.unlock(); + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/d609c002/lang/java/reef-utils/src/main/java/org/apache/reef/util/exception/InvalidIdentifierException.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-utils/src/main/java/org/apache/reef/util/exception/InvalidIdentifierException.java b/lang/java/reef-utils/src/main/java/org/apache/reef/util/exception/InvalidIdentifierException.java new file mode 100644 index 0000000..f921c36 --- /dev/null +++ b/lang/java/reef-utils/src/main/java/org/apache/reef/util/exception/InvalidIdentifierException.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.util.exception; +/** + * Generated when the identifier passed to MultiAsyncToSync.release is + * invalid. 
+ */ +public final class InvalidIdentifierException extends Exception { + final long identifier; + /**: + * See java.lang.Exception. + */ + public InvalidIdentifierException(final long identifier) { + super("Unknown blocked caller identifier"); + this.identifier = identifier; + } + + /** + * Concatenates the invalid identifier to the error message. + * @return The message string with the invalid identifier. + */ + public String toString() { + return String.format("%s = %d", super.toString(), identifier); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/d609c002/lang/java/reef-utils/src/main/java/org/apache/reef/util/exception/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-utils/src/main/java/org/apache/reef/util/exception/package-info.java b/lang/java/reef-utils/src/main/java/org/apache/reef/util/exception/package-info.java new file mode 100644 index 0000000..cfee839 --- /dev/null +++ b/lang/java/reef-utils/src/main/java/org/apache/reef/util/exception/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Exceptions generated by utilities used across REEF modules. 
+ */ +package org.apache.reef.util.exception; http://git-wip-us.apache.org/repos/asf/reef/blob/d609c002/lang/java/reef-utils/src/test/java/org/apache/reef/util/MultiAsyncToSyncTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-utils/src/test/java/org/apache/reef/util/MultiAsyncToSyncTest.java b/lang/java/reef-utils/src/test/java/org/apache/reef/util/MultiAsyncToSyncTest.java new file mode 100644 index 0000000..f31b453 --- /dev/null +++ b/lang/java/reef-utils/src/test/java/org/apache/reef/util/MultiAsyncToSyncTest.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.util; + +import org.apache.reef.util.exception.InvalidIdentifierException; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Level; +import java.util.logging.Logger; + +import static java.util.concurrent.TimeUnit.SECONDS; + +/** + * Performs an asynchronous increment of an Integer. 
+ */ +final class AsynchronousIncrementer implements Callable<Integer> { + private static final Logger LOG = Logger.getLogger(AsynchronousIncrementer.class.getName()); + private final int sleepTimeMillis; + private final int input; + private final long identifier; + private final MultiAsyncToSync blocker; + + /** + * Instantiate an incrementer with specific job parameters. + * @param input The input parameter for the work. + * @param identifier The identifier of the caller to wake on completion. + * @param sleepTimeMillis How long to work. + * @param blocker The MultiAsyncToSync object which is holding the blocked client. + */ + AsynchronousIncrementer(final int input, final long identifier, + final int sleepTimeMillis, final MultiAsyncToSync blocker) { + this.sleepTimeMillis = sleepTimeMillis; + this.input = input; + this.identifier = identifier; + this.blocker = blocker; + } + + /** + * Sleep and then increment the input value by one. + * @return The input value of the operation incremented by one. + * @throws Exception + */ + public Integer call() throws Exception { + LOG.log(Level.INFO, "Sleeping..."); + Thread.sleep(sleepTimeMillis); + LOG.log(Level.INFO, "Releasing caller on identifier [{0}]...", identifier); + blocker.release(identifier); + return input + 1; + } +} + +/** + * Use the MultiAsyncToSync class to implement a synchronous API + * that uses asynchronous processing internally. + */ +final class SynchronousApi implements AutoCloseable { + private static final Logger LOG = Logger.getLogger(SynchronousApi.class.getName()); + private final int incrementerSleepTimeMillis; + private final MultiAsyncToSync blocker; + private final ExecutorService executor; + private final ConcurrentLinkedQueue<FutureTask<Integer>> taskQueue = new ConcurrentLinkedQueue<>(); + private final AtomicLong idCounter = new AtomicLong(0); + + /** + * Parameterize the object as to length of processing time and call timeout. 
+ * @param incrementerSleepTimeSeconds Length of time the incrementer sleeps before + * performing the increment and returning. + * @param timeoutPeriodSeconds The length of time before the call will timeout. + */ + SynchronousApi(final int incrementerSleepTimeSeconds, + final long timeoutPeriodSeconds, final int numberOfThreads) { + this.incrementerSleepTimeMillis = 1000 * incrementerSleepTimeSeconds; + this.blocker = new MultiAsyncToSync(timeoutPeriodSeconds, SECONDS); + this.executor = Executors.newFixedThreadPool(numberOfThreads); + } + + /** + * Initiates asynchronous processing inside the condition lock. + */ + private class AsyncInitiator implements Callable<Boolean> { + private final FutureTask<Integer> task; + private final ExecutorService executor; + + AsyncInitiator(final FutureTask<Integer> task, final ExecutorService executor) { + this.task = task; + this.executor = executor; + } + + public Boolean call() { + executor.execute(task); + return true; + } + } + + /** + * Asynchronously increment the input parameter. + * @param input An integer object whose value is to be incremented by one. + * @return The input parameter incremented by one or zero for a timeout. + * @throws InterruptedException Thread was interrupted by another thread. + * @throws ExecutionException An exception was thrown an internal processing function. + * @throws InvalidIdentifierException The call identifier is invalid. + * @throws Exception The asynchronous processing generated an exception. + */ + public int apiCall(final Integer input) throws Exception { + // Create a future to run the asynchronous processing. 
+ final long identifier = idCounter.getAndIncrement(); + final FutureTask<Integer> task = + new FutureTask<>(new AsynchronousIncrementer(input, identifier, incrementerSleepTimeMillis, blocker)); + taskQueue.add(task); + + LOG.log(Level.INFO, "Running the incrementer on identifier [{0}]...", identifier); + if (blocker.block(identifier, new FutureTask<>(new AsyncInitiator(task, executor)))) { + LOG.log(Level.INFO, "Call timed out..."); + // Timeout occurred before the asynchronous processing completed. + return 0; + } + LOG.log(Level.INFO, "Call getting task result..."); + return task.get(); + } + + /** + * Ensure all test tasks have completed. + * @throws ExecutionException Asynchronous processing generated an exception. + */ + public void close() throws ExecutionException, InterruptedException { + for (final FutureTask<Integer> task : taskQueue) { + try { + task.get(); + } catch (final Exception e) { + LOG.log(Level.INFO, "Caught exception waiting for completion...", e); + } + } + executor.shutdownNow(); + } +} + +/** + * Verify proper operation of the MultiAsyncToSync class. + */ +public final class MultiAsyncToSyncTest { + private static final Logger LOG = Logger.getLogger(MultiAsyncToSyncTest.class.getName()); + + /** + * Verify calculations successfully complete when no timeout occurs. + */ + @Test + public void testNoTimeout() throws Exception { + LOG.log(Level.INFO, "Starting..."); + + // Parameters that do not force a timeout. + final int incrementerSleepTimeSeconds = 2; + final long timeoutPeriodSeconds = 4; + final int input = 1; + + try (final SynchronousApi apiObject = + new SynchronousApi(incrementerSleepTimeSeconds, timeoutPeriodSeconds, 2)) { + final int result = apiObject.apiCall(input); + Assert.assertEquals("Value incremented by one", input + 1, result); + } + } + + /** + * Verify an error is returned when a timeout occurs. 
+ */ + @Test + public void testTimeout() throws Exception { + LOG.log(Level.INFO, "Starting..."); + + // Parameters that force a timeout. + final int incrementerSleepTimeSeconds = 4; + final long timeoutPeriodSeconds = 2; + final int input = 1; + + try (final SynchronousApi apiObject = + new SynchronousApi(incrementerSleepTimeSeconds, timeoutPeriodSeconds, 2)) { + final int result = apiObject.apiCall(input); + Assert.assertEquals("Timeout occurred", result, 0); + } + } + + /** + * Verify no interaction occurs when multiple calls are in flight. + */ + @Test + public void testMultipleCalls() throws Exception { + + LOG.log(Level.INFO, "Starting..."); + + // Parameters that do not force a timeout. + final int incrementerSleepTimeSeconds = 2; + final long timeoutPeriodSeconds = 4; + + try (final SynchronousApi apiObject = + new SynchronousApi(incrementerSleepTimeSeconds, timeoutPeriodSeconds, 2)) { + final String function = "apiCall"; + final int input = 1; + final FutureTask<Integer> task1 = + new FutureTask<>(new MethodCallable<Integer>(apiObject, function, input)); + final FutureTask<Integer> task2 + = new FutureTask<>(new MethodCallable<Integer>(apiObject, function, input + 1)); + + // Execute API calls concurrently. + final ExecutorService executor = Executors.newFixedThreadPool(2); + executor.execute(task1); + executor.execute(task2); + + final int result1 = task1.get(); + final int result2 = task2.get(); + + Assert.assertEquals("Input must be incremented by one", input + 1, result1); + Assert.assertEquals("Input must be incremented by one", input + 2, result2); + + executor.shutdownNow(); + } + } + + /** + * Verify no race conditions occurs when multiple calls are in flight. + */ + @Test + public void testRaceConditions() throws Exception { + + LOG.log(Level.INFO, "Starting..."); + + // Parameters that do not force a timeout. 
+ final int incrementerSleepTimeSeconds = 1; + final long timeoutPeriodSeconds = 10; + final String function = "apiCall"; + + final int nTasks = 100; + final FutureTask[] tasks = new FutureTask[nTasks]; + final ExecutorService executor = Executors.newFixedThreadPool(10); + + try (final SynchronousApi apiObject = + new SynchronousApi(incrementerSleepTimeSeconds, timeoutPeriodSeconds, 10)) { + + for (int idx = 0; idx < nTasks; ++idx) { + tasks[idx] = new FutureTask<>(new MethodCallable<Integer>(apiObject, function, idx)); + executor.execute(tasks[idx]); + } + + for (int idx = 0; idx < nTasks; ++idx) { + final int result = (int)tasks[idx].get(); + Assert.assertEquals("Input must be incremented by one", idx + 1, result); + } + } + executor.shutdownNow(); + } + + /** + * Verify calling block and release on same thread generates an exception. + */ + @Test + public void testCallOnSameThread() throws Exception { + LOG.log(Level.INFO, "Starting..."); + + final long timeoutPeriodSeconds = 2; + final long identifier = 78; + boolean result = false; + + try { + final MultiAsyncToSync asyncToSync = new MultiAsyncToSync(timeoutPeriodSeconds, TimeUnit.SECONDS); + FutureTask<Object> syncProc = new FutureTask<>(new Callable<Object>() { + public Object call() throws InterruptedException, InvalidIdentifierException { + asyncToSync.release(identifier); + return null; + } + }); + asyncToSync.block(identifier, syncProc); + syncProc.get(); + } catch (ExecutionException ee) { + if (ee.getCause() instanceof RuntimeException) { + LOG.log(Level.INFO, "Caught expected runtime exception...", ee); + result = true; + } + } + Assert.assertTrue("Expected runtime exception", result); + } +} + http://git-wip-us.apache.org/repos/asf/reef/blob/d609c002/lang/java/reef-utils/src/test/java/org/apache/reef/util/SimpleConditionTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-utils/src/test/java/org/apache/reef/util/SimpleConditionTest.java 
b/lang/java/reef-utils/src/test/java/org/apache/reef/util/SimpleConditionTest.java
new file mode 100644
index 0000000..431208c
--- /dev/null
+++ b/lang/java/reef-utils/src/test/java/org/apache/reef/util/SimpleConditionTest.java
@@ -0,0 +1,80 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.reef.util;

import org.junit.Assert;
import org.junit.Test;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Verify functionality of simple condition objects.
 */
public final class SimpleConditionTest {
  private static final Logger LOG = Logger.getLogger(SimpleConditionTest.class.getName());

  /**
   * Verify proper operation when no timeout occurs.
   * @throws Exception An unexpected exception occurred.
   */
  @Test
  public void testNoTimeout() throws Exception {
    LOG.log(Level.INFO, "Starting...");

    final ExecutorService executor = Executors.newCachedThreadPool();
    try {
      final SimpleCondition condition = new SimpleCondition();

      // doTry starts the background work and returns its future to the caller.
      final FutureTask<FutureTask<Integer>> doTry = new FutureTask<>(new Callable<FutureTask<Integer>>() {
        @Override
        public FutureTask<Integer> call() throws Exception {
          LOG.log(Level.INFO, "doTry executing...");

          // Spawn a future which can be passed back to the caller.
          final FutureTask<Integer> task = new FutureTask<>(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
              LOG.log(Level.INFO, "doTry sleeping...");
              Thread.sleep(3000);
              LOG.log(Level.INFO, "doTry signaling the condition...");
              condition.signal();
              LOG.log(Level.INFO, "doTry condition is signaled...");
              return 5;
            }
          });
          executor.submit(task);

          return task;
        }
      });

      final FutureTask<Integer> doFinally = new FutureTask<>(new Callable<Integer>() {
        @Override
        public Integer call() {
          LOG.log(Level.INFO, "doFinally executing...");
          return 5;
        }
      });

      condition.await(doTry, doFinally);
      // NOTE(review): the get() calls below already block until the futures complete,
      // so this pause looks redundant - confirm before removing.
      Thread.sleep(3000);
      Assert.assertEquals("No exceptions", doTry.get().get(), doFinally.get());
    } finally {
      // Always release the pool threads, even when await/get/assert throws.
      executor.shutdownNow();
    }
  }
}
