Repository: reef Updated Branches: refs/heads/master b6c9a93c4 -> 0314e61a5
[REEF-1902] Use external executor and Runnable in MultiAsyncToSync Summary of changes: * add a new `MultiAsyncToSync` constructor that takes an ExecutorService * make `MultiAsyncToSync` take `Runnable` instead of its subclass `FutureTask` * Use finer logging levels whenever possible * refactor the unut tests in `MultiAsyncToSyncTest` * minor fixes and refactorings in `MultiAsyncToSync` and `MultiAsyncToSyncTest` JIRA: [REEF-1902](https://issues.apache.org/jira/browse/REEF-1902) Pull request: The closes #1391 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/0314e61a Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/0314e61a Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/0314e61a Branch: refs/heads/master Commit: 0314e61a5829b1f3a8c15e34821043c2139367fd Parents: b6c9a93 Author: Sergiy Matusevych <[email protected]> Authored: Wed Oct 18 15:31:30 2017 -0700 Committer: Doug Service <[email protected]> Committed: Tue Oct 24 01:23:19 2017 +0000 ---------------------------------------------------------------------- .../org/apache/reef/util/MultiAsyncToSync.java | 40 ++++++++---- .../apache/reef/util/MultiAsyncToSyncTest.java | 66 +++++++++----------- 2 files changed, 58 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/0314e61a/lang/java/reef-utils/src/main/java/org/apache/reef/util/MultiAsyncToSync.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-utils/src/main/java/org/apache/reef/util/MultiAsyncToSync.java b/lang/java/reef-utils/src/main/java/org/apache/reef/util/MultiAsyncToSync.java index 272cab6..7fea49c 100644 --- a/lang/java/reef-utils/src/main/java/org/apache/reef/util/MultiAsyncToSync.java +++ b/lang/java/reef-utils/src/main/java/org/apache/reef/util/MultiAsyncToSync.java @@ -19,10 +19,7 @@ package org.apache.reef.util; import org.apache.reef.util.exception.InvalidIdentifierException; -import java.util.concurrent.FutureTask; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.logging.Level; import java.util.logging.Logger; @@ -37,12 +34,15 @@ import java.util.logging.Logger; * caller is released with a call to {@code release()}. */ public final class MultiAsyncToSync { + private static final Logger LOG = Logger.getLogger(MultiAsyncToSync.class.getName()); private final ConcurrentLinkedQueue<ComplexCondition> freeQueue = new ConcurrentLinkedQueue<>(); private final ConcurrentHashMap<Long, ComplexCondition> sleeperMap = new ConcurrentHashMap<>(); + private final long timeoutPeriod; private final TimeUnit timeoutUnits; + private final ExecutorService executor; /** * Initialize a multiple asynchronous to synchronous object with a specified timeout value. @@ -51,46 +51,63 @@ public final class MultiAsyncToSync { * @param timeoutUnits The unit of time for the timeoutPeriod parameter. */ public MultiAsyncToSync(final long timeoutPeriod, final TimeUnit timeoutUnits) { + this(timeoutPeriod, timeoutUnits, null); + } + + /** + * Initialize a multiple asynchronous to synchronous object with a specified timeout value. + * @param timeoutPeriod The length of time in units given by the the timeoutUnits + * parameter before the condition automatically times out. + * @param timeoutUnits The unit of time for the timeoutPeriod parameter. + * @param executor An executor service used to run async processors in the block method. Can be null. + */ + public MultiAsyncToSync(final long timeoutPeriod, final TimeUnit timeoutUnits, final ExecutorService executor) { this.timeoutPeriod = timeoutPeriod; this.timeoutUnits = timeoutUnits; + this.executor = executor; } /** * Put the caller to sleep on a specific release identifier. * @param identifier The identifier required to awake the caller via the {@code release()} method. - * @param asyncProcessor A {@code FutureTask} object which returns {@code TAsync} that initiates the asynchronous + * @param asyncProcessor A {@code Runnable} object that initiates the asynchronous * processing associated with the call. This will occur inside the condition lock * to prevent the processing from generating the signal before the calling thread blocks. * Error conditions should be handled by throwing an exception which the caller * will catch. The caller can retrieve the results of the processing by calling * {@code asyndProcessor.get()}. - * @param <TAsync> The return type of the {@code asyncProcessor}; * @return A boolean value that indicates whether or not a timeout or error occurred. * @throws InterruptedException The thread was interrupted while waiting on a condition. * @throws InvalidIdentifierException The identifier parameter is invalid. */ - public <TAsync> boolean block(final long identifier, final FutureTask<TAsync> asyncProcessor) + public boolean block(final long identifier, final Runnable asyncProcessor) throws InterruptedException, InvalidIdentifierException { - final boolean timeoutOccurred; + final ComplexCondition call = allocate(); if (call.isHeldByCurrentThread()) { throw new RuntimeException("release() must not be called on same thread as block() to prevent deadlock"); } + try { call.lock(); // Add the call identifier to the sleeper map so release() can identify this instantiation. addSleeper(identifier, call); // Invoke the caller's asynchronous processing while holding the lock // so a wakeup cannot occur before the caller sleeps. - asyncProcessor.run(); + if (executor == null) { + asyncProcessor.run(); + } else { + executor.execute(asyncProcessor); + } // Put the caller to sleep until the ack comes back. Note: we atomically // give up the look as the caller sleeps and atomically reacquire the // the lock as we wake up. LOG.log(Level.FINER, "Putting caller to sleep on identifier [{0}]", identifier); - timeoutOccurred = !call.await(); + final boolean timeoutOccurred = !call.await(); if (timeoutOccurred) { - LOG.log(Level.SEVERE, "Call timed out on identifier [{0}]", identifier); + LOG.log(Level.WARNING, "Call timed out on identifier [{0}]", identifier); } + return timeoutOccurred; } finally { // Whether or not the call completed successfully, always remove // the call from the sleeper map, release the lock and cleanup. @@ -101,7 +118,6 @@ public final class MultiAsyncToSync { call.unlock(); } } - return timeoutOccurred; } /** http://git-wip-us.apache.org/repos/asf/reef/blob/0314e61a/lang/java/reef-utils/src/test/java/org/apache/reef/util/MultiAsyncToSyncTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-utils/src/test/java/org/apache/reef/util/MultiAsyncToSyncTest.java b/lang/java/reef-utils/src/test/java/org/apache/reef/util/MultiAsyncToSyncTest.java index f31b453..1a1d4d3 100644 --- a/lang/java/reef-utils/src/test/java/org/apache/reef/util/MultiAsyncToSyncTest.java +++ b/lang/java/reef-utils/src/test/java/org/apache/reef/util/MultiAsyncToSyncTest.java @@ -27,13 +27,13 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; -import static java.util.concurrent.TimeUnit.SECONDS; - /** * Performs an asynchronous increment of an Integer. */ final class AsynchronousIncrementer implements Callable<Integer> { + private static final Logger LOG = Logger.getLogger(AsynchronousIncrementer.class.getName()); + private final int sleepTimeMillis; private final int input; private final long identifier; @@ -57,9 +57,9 @@ final class AsynchronousIncrementer implements Callable<Integer> { /** * Sleep and then increment the input value by one. * @return The input value of the operation incremented by one. - * @throws Exception */ - public Integer call() throws Exception { + @Override + public Integer call() throws InterruptedException, InvalidIdentifierException { LOG.log(Level.INFO, "Sleeping..."); Thread.sleep(sleepTimeMillis); LOG.log(Level.INFO, "Releasing caller on identifier [{0}]...", identifier); @@ -89,7 +89,7 @@ final class SynchronousApi implements AutoCloseable { SynchronousApi(final int incrementerSleepTimeSeconds, final long timeoutPeriodSeconds, final int numberOfThreads) { this.incrementerSleepTimeMillis = 1000 * incrementerSleepTimeSeconds; - this.blocker = new MultiAsyncToSync(timeoutPeriodSeconds, SECONDS); + this.blocker = new MultiAsyncToSync(timeoutPeriodSeconds, TimeUnit.SECONDS); this.executor = Executors.newFixedThreadPool(numberOfThreads); } @@ -105,6 +105,7 @@ final class SynchronousApi implements AutoCloseable { this.executor = executor; } + @Override public Boolean call() { executor.execute(task); return true; @@ -118,9 +119,8 @@ final class SynchronousApi implements AutoCloseable { * @throws InterruptedException Thread was interrupted by another thread. * @throws ExecutionException An exception was thrown an internal processing function. * @throws InvalidIdentifierException The call identifier is invalid. - * @throws Exception The asynchronous processing generated an exception. */ - public int apiCall(final Integer input) throws Exception { + public int apiCall(final Integer input) throws InterruptedException, InvalidIdentifierException, ExecutionException { // Create a future to run the asynchronous processing. final long identifier = idCounter.getAndIncrement(); final FutureTask<Integer> task = @@ -139,13 +139,12 @@ final class SynchronousApi implements AutoCloseable { /** * Ensure all test tasks have completed. - * @throws ExecutionException Asynchronous processing generated an exception. */ - public void close() throws ExecutionException, InterruptedException { + public void close() throws InterruptedException { for (final FutureTask<Integer> task : taskQueue) { try { task.get(); - } catch (final Exception e) { + } catch (final ExecutionException e) { LOG.log(Level.INFO, "Caught exception waiting for completion...", e); } } @@ -163,7 +162,7 @@ public final class MultiAsyncToSyncTest { * Verify calculations successfully complete when no timeout occurs. */ @Test - public void testNoTimeout() throws Exception { + public void testNoTimeout() throws InterruptedException, InvalidIdentifierException, ExecutionException { LOG.log(Level.INFO, "Starting..."); // Parameters that do not force a timeout. @@ -182,7 +181,7 @@ public final class MultiAsyncToSyncTest { * Verify an error is returned when a timeout occurs. */ @Test - public void testTimeout() throws Exception { + public void testTimeout() throws InterruptedException, InvalidIdentifierException, ExecutionException { LOG.log(Level.INFO, "Starting..."); // Parameters that force a timeout. @@ -201,7 +200,8 @@ public final class MultiAsyncToSyncTest { * Verify no interaction occurs when multiple calls are in flight. */ @Test - public void testMultipleCalls() throws Exception { + public void testMultipleCalls() + throws InterruptedException, InvalidIdentifierException, ExecutionException, NoSuchMethodException { LOG.log(Level.INFO, "Starting..."); @@ -237,7 +237,8 @@ public final class MultiAsyncToSyncTest { * Verify no race conditions occurs when multiple calls are in flight. */ @Test - public void testRaceConditions() throws Exception { + public void testRaceConditions() + throws InterruptedException, InvalidIdentifierException, ExecutionException, NoSuchMethodException { LOG.log(Level.INFO, "Starting..."); @@ -269,31 +270,24 @@ public final class MultiAsyncToSyncTest { /** * Verify calling block and release on same thread generates an exception. */ - @Test - public void testCallOnSameThread() throws Exception { + @Test(expected = ExecutionException.class) + public void testCallOnSameThread() throws InterruptedException, InvalidIdentifierException, ExecutionException { + LOG.log(Level.INFO, "Starting..."); - final long timeoutPeriodSeconds = 2; final long identifier = 78; - boolean result = false; - - try { - final MultiAsyncToSync asyncToSync = new MultiAsyncToSync(timeoutPeriodSeconds, TimeUnit.SECONDS); - FutureTask<Object> syncProc = new FutureTask<>(new Callable<Object>() { - public Object call() throws InterruptedException, InvalidIdentifierException { - asyncToSync.release(identifier); - return null; - } - }); - asyncToSync.block(identifier, syncProc); - syncProc.get(); - } catch (ExecutionException ee) { - if (ee.getCause() instanceof RuntimeException) { - LOG.log(Level.INFO, "Caught expected runtime exception...", ee); - result = true; + final MultiAsyncToSync asyncToSync = new MultiAsyncToSync(2, TimeUnit.SECONDS); + final FutureTask<Object> syncProc = new FutureTask<>(new Callable<Object>() { + @Override + public Object call() throws InterruptedException, InvalidIdentifierException { + asyncToSync.release(identifier); + return null; } - } - Assert.assertTrue("Expected runtime exception", result); + }); + + asyncToSync.block(identifier, syncProc); + syncProc.get(); // must throw ExecutionException + + Assert.fail("syncProc.get() must throw"); } } -
