Repository: reef
Updated Branches:
  refs/heads/master b6c9a93c4 -> 0314e61a5


[REEF-1902] Use external executor and Runnable in MultiAsyncToSync

Summary of changes:
   * add a new `MultiAsyncToSync` constructor that takes an ExecutorService
   * make `MultiAsyncToSync` take `Runnable` instead of its implementation 
`FutureTask`
   * Use finer logging levels whenever possible
   * refactor the unit tests in `MultiAsyncToSyncTest`
   * minor fixes and refactorings in `MultiAsyncToSync` and 
`MultiAsyncToSyncTest`

JIRA: [REEF-1902](https://issues.apache.org/jira/browse/REEF-1902)

Pull request:
  This closes #1391


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/0314e61a
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/0314e61a
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/0314e61a

Branch: refs/heads/master
Commit: 0314e61a5829b1f3a8c15e34821043c2139367fd
Parents: b6c9a93
Author: Sergiy Matusevych <[email protected]>
Authored: Wed Oct 18 15:31:30 2017 -0700
Committer: Doug Service <[email protected]>
Committed: Tue Oct 24 01:23:19 2017 +0000

----------------------------------------------------------------------
 .../org/apache/reef/util/MultiAsyncToSync.java  | 40 ++++++++----
 .../apache/reef/util/MultiAsyncToSyncTest.java  | 66 +++++++++-----------
 2 files changed, 58 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/0314e61a/lang/java/reef-utils/src/main/java/org/apache/reef/util/MultiAsyncToSync.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-utils/src/main/java/org/apache/reef/util/MultiAsyncToSync.java 
b/lang/java/reef-utils/src/main/java/org/apache/reef/util/MultiAsyncToSync.java
index 272cab6..7fea49c 100644
--- 
a/lang/java/reef-utils/src/main/java/org/apache/reef/util/MultiAsyncToSync.java
+++ 
b/lang/java/reef-utils/src/main/java/org/apache/reef/util/MultiAsyncToSync.java
@@ -19,10 +19,7 @@ package org.apache.reef.util;
 
 import org.apache.reef.util.exception.InvalidIdentifierException;
 
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -37,12 +34,15 @@ import java.util.logging.Logger;
  * caller is released with a call to {@code release()}.
  */
 public final class MultiAsyncToSync {
+
   private static final Logger LOG = 
Logger.getLogger(MultiAsyncToSync.class.getName());
 
   private final ConcurrentLinkedQueue<ComplexCondition> freeQueue = new 
ConcurrentLinkedQueue<>();
   private final ConcurrentHashMap<Long, ComplexCondition> sleeperMap = new 
ConcurrentHashMap<>();
+
   private final long timeoutPeriod;
   private final TimeUnit timeoutUnits;
+  private final ExecutorService executor;
 
   /**
    * Initialize a multiple asynchronous to synchronous object with a specified 
timeout value.
@@ -51,46 +51,63 @@ public final class MultiAsyncToSync {
    * @param timeoutUnits The unit of time for the timeoutPeriod parameter.
    */
   public MultiAsyncToSync(final long timeoutPeriod, final TimeUnit 
timeoutUnits) {
+    this(timeoutPeriod, timeoutUnits, null);
+  }
+
+  /**
+   * Initialize a multiple asynchronous to synchronous object with a specified 
timeout value.
+   * @param timeoutPeriod The length of time in units given by the the 
timeoutUnits
+   *                      parameter before the condition automatically times 
out.
+   * @param timeoutUnits The unit of time for the timeoutPeriod parameter.
+   * @param executor An executor service used to run async processors in the 
block method. Can be null.
+   */
+  public MultiAsyncToSync(final long timeoutPeriod, final TimeUnit 
timeoutUnits, final ExecutorService executor) {
     this.timeoutPeriod = timeoutPeriod;
     this.timeoutUnits = timeoutUnits;
+    this.executor = executor;
   }
 
   /**
    * Put the caller to sleep on a specific release identifier.
    * @param identifier The identifier required to awake the caller via the 
{@code release()} method.
-   * @param asyncProcessor A {@code FutureTask} object which returns {@code 
TAsync} that initiates the asynchronous
+   * @param asyncProcessor A {@code Runnable} object that initiates the 
asynchronous
    *                       processing associated with the call. This will 
occur inside the condition lock
    *                       to prevent the processing from generating the 
signal before the calling thread blocks.
    *                       Error conditions should be handled by throwing an 
exception which the caller
    *                       will catch. The caller can retrieve the results of 
the processing by calling
    *                       {@code asyndProcessor.get()}.
-   * @param <TAsync> The return type of the {@code asyncProcessor};
    * @return A boolean value that indicates whether or not a timeout or error 
occurred.
    * @throws InterruptedException The thread was interrupted while waiting on 
a condition.
    * @throws InvalidIdentifierException The identifier parameter is invalid.
    */
-  public <TAsync> boolean block(final long identifier, final 
FutureTask<TAsync> asyncProcessor)
+  public boolean block(final long identifier, final Runnable asyncProcessor)
         throws InterruptedException, InvalidIdentifierException {
-    final boolean timeoutOccurred;
+
     final ComplexCondition call = allocate();
     if (call.isHeldByCurrentThread()) {
       throw new RuntimeException("release() must not be called on same thread 
as block() to prevent deadlock");
     }
+
     try {
       call.lock();
       // Add the call identifier to the sleeper map so release() can identify 
this instantiation.
       addSleeper(identifier, call);
       // Invoke the caller's asynchronous processing while holding the lock
       // so a wakeup cannot occur before the caller sleeps.
-      asyncProcessor.run();
+      if (executor == null) {
+        asyncProcessor.run();
+      } else {
+        executor.execute(asyncProcessor);
+      }
       // Put the caller to sleep until the ack comes back. Note: we atomically
       // give up the look as the caller sleeps and atomically reacquire the
       // the lock as we wake up.
       LOG.log(Level.FINER, "Putting caller to sleep on identifier [{0}]", 
identifier);
-      timeoutOccurred = !call.await();
+      final boolean timeoutOccurred = !call.await();
       if (timeoutOccurred) {
-        LOG.log(Level.SEVERE, "Call timed out on identifier [{0}]", 
identifier);
+        LOG.log(Level.WARNING, "Call timed out on identifier [{0}]", 
identifier);
       }
+      return timeoutOccurred;
     } finally {
       // Whether or not the call completed successfully, always remove
       // the call from the sleeper map, release the lock and cleanup.
@@ -101,7 +118,6 @@ public final class MultiAsyncToSync {
         call.unlock();
       }
     }
-    return timeoutOccurred;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/reef/blob/0314e61a/lang/java/reef-utils/src/test/java/org/apache/reef/util/MultiAsyncToSyncTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-utils/src/test/java/org/apache/reef/util/MultiAsyncToSyncTest.java
 
b/lang/java/reef-utils/src/test/java/org/apache/reef/util/MultiAsyncToSyncTest.java
index f31b453..1a1d4d3 100644
--- 
a/lang/java/reef-utils/src/test/java/org/apache/reef/util/MultiAsyncToSyncTest.java
+++ 
b/lang/java/reef-utils/src/test/java/org/apache/reef/util/MultiAsyncToSyncTest.java
@@ -27,13 +27,13 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import static java.util.concurrent.TimeUnit.SECONDS;
-
 /**
  * Performs an asynchronous increment of an Integer.
  */
 final class AsynchronousIncrementer implements Callable<Integer> {
+
   private static final Logger LOG = 
Logger.getLogger(AsynchronousIncrementer.class.getName());
+
   private final int sleepTimeMillis;
   private final int input;
   private final long identifier;
@@ -57,9 +57,9 @@ final class AsynchronousIncrementer implements 
Callable<Integer> {
   /**
    * Sleep and then increment the input value by one.
    * @return The input value of the operation incremented by one.
-   * @throws Exception
    */
-  public Integer call() throws Exception {
+  @Override
+  public Integer call() throws InterruptedException, 
InvalidIdentifierException {
     LOG.log(Level.INFO, "Sleeping...");
     Thread.sleep(sleepTimeMillis);
     LOG.log(Level.INFO, "Releasing caller on identifier [{0}]...", identifier);
@@ -89,7 +89,7 @@ final class SynchronousApi implements AutoCloseable {
   SynchronousApi(final int incrementerSleepTimeSeconds,
                  final long timeoutPeriodSeconds, final int numberOfThreads) {
     this.incrementerSleepTimeMillis = 1000 * incrementerSleepTimeSeconds;
-    this.blocker = new MultiAsyncToSync(timeoutPeriodSeconds, SECONDS);
+    this.blocker = new MultiAsyncToSync(timeoutPeriodSeconds, 
TimeUnit.SECONDS);
     this.executor = Executors.newFixedThreadPool(numberOfThreads);
   }
 
@@ -105,6 +105,7 @@ final class SynchronousApi implements AutoCloseable {
       this.executor = executor;
     }
 
+    @Override
     public Boolean call() {
       executor.execute(task);
       return true;
@@ -118,9 +119,8 @@ final class SynchronousApi implements AutoCloseable {
    * @throws InterruptedException Thread was interrupted by another thread.
    * @throws ExecutionException An exception was thrown an internal processing 
function.
    * @throws InvalidIdentifierException The call identifier is invalid.
-   * @throws Exception The asynchronous processing generated an exception.
    */
-  public int apiCall(final Integer input) throws Exception {
+  public int apiCall(final Integer input) throws InterruptedException, 
InvalidIdentifierException, ExecutionException {
     // Create a future to run the asynchronous processing.
     final long identifier = idCounter.getAndIncrement();
     final FutureTask<Integer> task =
@@ -139,13 +139,12 @@ final class SynchronousApi implements AutoCloseable {
 
   /**
    * Ensure all test tasks have completed.
-   * @throws ExecutionException Asynchronous processing generated an exception.
    */
-  public void close() throws ExecutionException, InterruptedException {
+  public void close() throws InterruptedException {
     for (final FutureTask<Integer> task : taskQueue) {
       try {
         task.get();
-      } catch (final Exception e) {
+      } catch (final ExecutionException e) {
         LOG.log(Level.INFO, "Caught exception waiting for completion...", e);
       }
     }
@@ -163,7 +162,7 @@ public final class MultiAsyncToSyncTest {
    * Verify calculations successfully complete when no timeout occurs.
    */
   @Test
-  public void testNoTimeout() throws Exception {
+  public void testNoTimeout() throws InterruptedException, 
InvalidIdentifierException, ExecutionException {
     LOG.log(Level.INFO, "Starting...");
 
     // Parameters that do not force a timeout.
@@ -182,7 +181,7 @@ public final class MultiAsyncToSyncTest {
    * Verify an error is returned when a timeout occurs.
    */
   @Test
-  public void testTimeout() throws Exception {
+  public void testTimeout()  throws InterruptedException, 
InvalidIdentifierException, ExecutionException {
     LOG.log(Level.INFO, "Starting...");
 
     // Parameters that force a timeout.
@@ -201,7 +200,8 @@ public final class MultiAsyncToSyncTest {
    * Verify no interaction occurs when multiple calls are in flight.
    */
   @Test
-  public void testMultipleCalls() throws Exception {
+  public void testMultipleCalls()
+      throws InterruptedException, InvalidIdentifierException, 
ExecutionException, NoSuchMethodException {
 
     LOG.log(Level.INFO, "Starting...");
 
@@ -237,7 +237,8 @@ public final class MultiAsyncToSyncTest {
    * Verify no race conditions occurs when multiple calls are in flight.
    */
   @Test
-  public void testRaceConditions() throws Exception {
+  public void testRaceConditions()
+      throws InterruptedException, InvalidIdentifierException, 
ExecutionException, NoSuchMethodException {
 
     LOG.log(Level.INFO, "Starting...");
 
@@ -269,31 +270,24 @@ public final class MultiAsyncToSyncTest {
   /**
    * Verify calling block and release on same thread generates an exception.
    */
-  @Test
-  public void testCallOnSameThread() throws Exception {
+  @Test(expected = ExecutionException.class)
+  public void testCallOnSameThread()  throws InterruptedException, 
InvalidIdentifierException, ExecutionException {
+
     LOG.log(Level.INFO, "Starting...");
 
-    final long timeoutPeriodSeconds = 2;
     final long identifier = 78;
-    boolean result = false;
-
-    try {
-      final MultiAsyncToSync asyncToSync = new 
MultiAsyncToSync(timeoutPeriodSeconds, TimeUnit.SECONDS);
-      FutureTask<Object> syncProc = new FutureTask<>(new Callable<Object>() {
-        public Object call() throws InterruptedException, 
InvalidIdentifierException {
-          asyncToSync.release(identifier);
-          return null;
-        }
-      });
-      asyncToSync.block(identifier, syncProc);
-      syncProc.get();
-    } catch (ExecutionException ee) {
-      if (ee.getCause() instanceof RuntimeException) {
-        LOG.log(Level.INFO, "Caught expected runtime exception...", ee);
-        result = true;
+    final MultiAsyncToSync asyncToSync = new MultiAsyncToSync(2, 
TimeUnit.SECONDS);
+    final FutureTask<Object> syncProc = new FutureTask<>(new 
Callable<Object>() {
+      @Override
+      public Object call() throws InterruptedException, 
InvalidIdentifierException {
+        asyncToSync.release(identifier);
+        return null;
       }
-    }
-    Assert.assertTrue("Expected runtime exception", result);
+    });
+
+    asyncToSync.block(identifier, syncProc);
+    syncProc.get(); // must throw ExecutionException
+
+    Assert.fail("syncProc.get() must throw");
   }
 }
-

Reply via email to