Repository: reef
Updated Branches:
  refs/heads/master 9937c7519 -> d609c002c


[REEF-1864] Utility to convert asynchronous API to synchronous.

JIRA:
  [REEF-1864](https://issues.apache.org/jira/browse/REEF-1864)

  * Adds condition variable class to manage sleep on condition variables.
  * Adds MultiAsyncToSync class for managing the callers of an API
    which is synchronous but is implemented asynchronously.
  * Adds MultiAsyncToSyncTest which verifies that non-timeout, timeout,
    and multiple non-timeout calls complete correctly.

Pull Request:
  Closes #1365


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/d609c002
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/d609c002
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/d609c002

Branch: refs/heads/master
Commit: d609c002c38032f55c915c72434ae1443604461b
Parents: 9937c75
Author: Doug Service <[email protected]>
Authored: Fri Aug 11 01:41:29 2017 +0000
Committer: Sergiy Matusevych <[email protected]>
Committed: Mon Aug 28 13:50:08 2017 -0700

----------------------------------------------------------------------
 .../org/apache/reef/util/ComplexCondition.java  | 135 +++++++++
 .../org/apache/reef/util/MethodCallable.java    |  64 ++++
 .../org/apache/reef/util/MultiAsyncToSync.java  | 181 +++++++++++
 .../org/apache/reef/util/SimpleCondition.java   | 136 +++++++++
 .../exception/InvalidIdentifierException.java   |  42 +++
 .../reef/util/exception/package-info.java       |  22 ++
 .../apache/reef/util/MultiAsyncToSyncTest.java  | 299 +++++++++++++++++++
 .../apache/reef/util/SimpleConditionTest.java   |  80 +++++
 8 files changed, 959 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/d609c002/lang/java/reef-utils/src/main/java/org/apache/reef/util/ComplexCondition.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-utils/src/main/java/org/apache/reef/util/ComplexCondition.java 
b/lang/java/reef-utils/src/main/java/org/apache/reef/util/ComplexCondition.java
new file mode 100644
index 0000000..cf039ae
--- /dev/null
+++ 
b/lang/java/reef-utils/src/main/java/org/apache/reef/util/ComplexCondition.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.util;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Manages Java lock and condition objects to create a simplified
+ * condition variable interface. The caller takes and releases the internal
+ * lock explicitly with {@code lock()} and {@code unlock()} around calls to
+ * {@code await()} and {@code signal()}.
+ */
+public final class ComplexCondition {
+  private final ReentrantLock lockVar = new ReentrantLock();
+  private final Condition conditionVar = lockVar.newCondition();
+  private final long timeoutPeriod;
+  private final TimeUnit timeoutUnits;
+  private volatile boolean isSignal = false;
+
+  /**
+   * Default constructor which uses the largest representable timeout
+   * ({@code Long.MAX_VALUE} days), i.e. an effectively infinite timeout period.
+   */
+  public ComplexCondition() {
+    this(Long.MAX_VALUE, TimeUnit.DAYS);
+  }
+
+  /**
+   * Wrap a lock and associated condition together into a single atomic condition
+   * variable that can be used to synchronize waiters and signalers. Signal must
+   * come from a different thread than the sleeper thread.
+   * Typical usage for the sleeper:
+   * {@code
+   *   try {
+   *     cv.lock();
+   *     // Access shared objects.
+   *     cv.await(); // lock is atomically given up while sleeping and reacquired on 
wakeup.
+   *     // Access shared objects.
+   *   } finally {
+   *     // Cleanup.
+   *     cv.unlock();
+   *   }
+   * }
+   * Typical usage for the signaler:
+   * {@code
+   *   try {
+   *     cv.lock();
+   *     // Access shared objects.
+   *     cv.signal();
+   *     // Access shared objects.
+   *   } finally {
+   *     cv.unlock();
+   *   }
+   * }
+   * @param timeoutPeriod The length of time in units given by the timeoutUnits
+   *                      parameter before the condition automatically times out.
+   * @param timeoutUnits The unit of time for the timeoutPeriod parameter.
+   */
+  public ComplexCondition(final long timeoutPeriod, final TimeUnit 
timeoutUnits) {
+    this.timeoutPeriod = timeoutPeriod;
+    this.timeoutUnits = timeoutUnits;
+  }
+
+  /**
+   * Declare a thread's intention to either wait on or signal the condition by taking
+   * the internal lock. Any work with objects that are shared between the waiter and
+   * signaler should only be done after calling {@code lock()} and before calling
+   * {@code await()} or {@code signal()}.
+   */
+  public void lock() {
+    lockVar.lock();
+  }
+
+  /**
+   * Declare a thread's intention to release the condition after a call to wait or signal
+   * by releasing the internal lock. Any work with objects that are shared between the
+   * waiter and signaler should only be done after {@code await()} or {@code signal()}
+   * and before calling {@code unlock()}.
+   */
+  public void unlock() {
+    lockVar.unlock();
+  }
+
+  /**
+   * Wait for a signal on the condition. Must call {@code lock()} first
+   * and {@code unlock()} afterwards. The signal flag is cleared before returning,
+   * whether the wait ended by signal, timeout or interruption.
+   * @return A boolean value that indicates whether or not a signal was received. False
+   * indicates a timeout occurred before a signal was received.
+   * @throws InterruptedException The calling thread was interrupted by another thread.
+   */
+  public boolean await() throws InterruptedException {
+    boolean noTimeout = true;
+    // Loop on the signal flag to ignore spurious wakeups; each pass waits a full timeout period.
+    try {
+      while (!isSignal && noTimeout) {
+        noTimeout = conditionVar.await(timeoutPeriod, timeoutUnits);
+      }
+    } finally {
+      isSignal = false;
+    }
+    return noTimeout;
+  }
+
+  /**
+   * Signal the sleeper on the condition by setting the signal flag and waking one
+   * waiting thread. Must have called {@code lock()} first and must call
+   * {@code unlock()} afterwards.
+   */
+  public void signal() {
+    isSignal = true;
+    conditionVar.signal();
+  }
+
+  /**
+   * Check if the internal lock is currently held by the calling thread.
+   * @return A boolean value that indicates if the internal lock is currently
+   * held by the calling thread.
+   */
+  public boolean isHeldByCurrentThread() {
+    return lockVar.isHeldByCurrentThread();
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/d609c002/lang/java/reef-utils/src/main/java/org/apache/reef/util/MethodCallable.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-utils/src/main/java/org/apache/reef/util/MethodCallable.java 
b/lang/java/reef-utils/src/main/java/org/apache/reef/util/MethodCallable.java
new file mode 100644
index 0000000..ee96344
--- /dev/null
+++ 
b/lang/java/reef-utils/src/main/java/org/apache/reef/util/MethodCallable.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.util;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.concurrent.Callable;
+
+/**
+ * Generic class which provides a simple mechanism to call a single method
+ * on a concrete class. The method is resolved by reflection when the object is
+ * constructed and invoked each time {@code call()} is executed.
+ * @param <TReturn> The class type of the method return value.
+ */
+public final class MethodCallable<TReturn> implements Callable<TReturn> {
+  private final Object obj;
+  private final Object[] input;
+  private final Method method;
+
+  /**
+   * Resolve the method to call from its name and the runtime classes of the arguments.
+   * The lookup uses {@code getDeclaredMethod} on the runtime class of {@code obj} with
+   * the exact class of each element of {@code input}.
+   * NOTE(review): a null element in {@code input} fails with a NullPointerException, and
+   * a method whose declared parameter types are primitives, interfaces or superclasses
+   * of the argument classes will presumably not be found - confirm against callers.
+   * @param obj The object on which the method will be invoked.
+   * @param function A string which contains the name of the method to be invoked.
+   * @param input Parameter input values for the method invocation.
+   * @throws NoSuchMethodException The class of {@code obj} declares no method with the
+   *         given name and argument classes.
+   */
+  public MethodCallable(final Object obj, final String function,
+                        final Object... input) throws NoSuchMethodException {
+    this.obj = obj;
+    this.input = input;
+
+    // Get the argument types.
+    final Class[] inputClass = new Class[input.length];
+    for (int idx = 0; idx < input.length; ++idx) {
+      inputClass[idx] = input[idx].getClass();
+    }
+
+    this.method = obj.getClass().getDeclaredMethod(function, inputClass);
+  }
+
+  /**
+   * Invoke the method resolved in the constructor on the target object with the
+   * captured argument values. The result is cast to TReturn without a runtime check.
+   * @return An object of class type TReturn.
+   * @throws InvocationTargetException The function specified in the constructor threw
+   *         an exception.
+   * @throws IllegalAccessException Method is not accessible in calling context.
+   */
+  @SuppressWarnings("unchecked")
+  public TReturn call() throws InvocationTargetException, 
IllegalAccessException {
+    return (TReturn)method.invoke(obj, input);
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/d609c002/lang/java/reef-utils/src/main/java/org/apache/reef/util/MultiAsyncToSync.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-utils/src/main/java/org/apache/reef/util/MultiAsyncToSync.java 
b/lang/java/reef-utils/src/main/java/org/apache/reef/util/MultiAsyncToSync.java
new file mode 100644
index 0000000..272cab6
--- /dev/null
+++ 
b/lang/java/reef-utils/src/main/java/org/apache/reef/util/MultiAsyncToSync.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.util;
+
+import org.apache.reef.util.exception.InvalidIdentifierException;
+
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Assists a class in presenting a synchronous interface that is implemented
+ * via asynchronous interfaces and events. When a method call is received
+ * by the interface, parameter values are captured and the initiation of
+ * asynchronous processing is encapsulated in a callable object. When
+ * {@code block()} is called with the internal interface identifier, the
+ * lock is taken, the asynchronous processing is initiated, and the caller
+ * is put to sleep. After all of the asynchronous processing is complete the
+ * caller is released with a call to {@code release()}.
+ */
+public final class MultiAsyncToSync {
+  private static final Logger LOG = 
Logger.getLogger(MultiAsyncToSync.class.getName());
+
+  private final ConcurrentLinkedQueue<ComplexCondition> freeQueue = new 
ConcurrentLinkedQueue<>();
+  private final ConcurrentHashMap<Long, ComplexCondition> sleeperMap = new 
ConcurrentHashMap<>();
+  private final long timeoutPeriod;
+  private final TimeUnit timeoutUnits;
+
+  /**
+   * Initialize a multiple asynchronous to synchronous object with a specified timeout value.
+   * @param timeoutPeriod The length of time in units given by the timeoutUnits
+   *                      parameter before the condition automatically times out.
+   * @param timeoutUnits The unit of time for the timeoutPeriod parameter.
+   */
+  public MultiAsyncToSync(final long timeoutPeriod, final TimeUnit 
timeoutUnits) {
+    this.timeoutPeriod = timeoutPeriod;
+    this.timeoutUnits = timeoutUnits;
+  }
+
+  /**
+   * Put the caller to sleep on a specific release identifier.
+   * NOTE(review): {@code FutureTask.run()} records an exception thrown by the task instead
+   * of rethrowing it, so a failure in {@code asyncProcessor} presumably surfaces only from
+   * {@code asyncProcessor.get()} and the caller still sleeps until release or timeout - confirm.
+   * NOTE(review): the condition is recycled on timeout while a concurrent {@code release()}
+   * may already hold a reference to it; a late signal would presumably wake an unrelated
+   * later caller that reuses the condition - confirm.
+   * @param identifier The identifier required to awake the caller via the {@code release()} method.
+   * @param asyncProcessor A {@code FutureTask} object which returns {@code TAsync} that initiates the asynchronous
+   *                       processing associated with the call. This will occur inside the condition lock
+   *                       to prevent the processing from generating the signal before the calling thread blocks.
+   *                       Error conditions should be handled by throwing an exception which the caller
+   *                       will catch. The caller can retrieve the results of the processing by calling
+   *                       {@code asyncProcessor.get()}.
+   * @param <TAsync> The return type of the {@code asyncProcessor}.
+   * @return True if the wait timed out before {@code release()} signaled the caller,
+   *         false if the signal was received.
+   * @throws InterruptedException The thread was interrupted while waiting on a condition.
+   * @throws InvalidIdentifierException The identifier was not found in the sleeper map
+   *         when the call was cleaned up.
+   */
+  public <TAsync> boolean block(final long identifier, final 
FutureTask<TAsync> asyncProcessor)
+        throws InterruptedException, InvalidIdentifierException {
+    final boolean timeoutOccurred;
+    final ComplexCondition call = allocate();
+    if (call.isHeldByCurrentThread()) {
+      throw new RuntimeException("release() must not be called on same thread 
as block() to prevent deadlock");
+    }
+    try {
+      call.lock();
+      // Add the call identifier to the sleeper map so release() can identify 
this instantiation.
+      addSleeper(identifier, call);
+      // Invoke the caller's asynchronous processing while holding the lock
+      // so a wakeup cannot occur before the caller sleeps.
+      asyncProcessor.run();
+      // Put the caller to sleep until the ack comes back. Note: we atomically
+      // give up the lock as the caller sleeps and atomically reacquire the
+      // lock as we wake up.
+      LOG.log(Level.FINER, "Putting caller to sleep on identifier [{0}]", 
identifier);
+      timeoutOccurred = !call.await();
+      if (timeoutOccurred) {
+        LOG.log(Level.SEVERE, "Call timed out on identifier [{0}]", 
identifier);
+      }
+    } finally {
+      // Whether or not the call completed successfully, always remove
+      // the call from the sleeper map, release the lock and clean up.
+      try {
+        removeSleeper(identifier);
+        recycle(call);
+      } finally {
+        call.unlock();
+      }
+    }
+    return timeoutOccurred;
+  }
+
+  /**
+   * Wake the caller sleeping on the specific identifier.
+   * NOTE(review): InterruptedException is declared, but no call visible in this method
+   * appears to throw it - confirm whether it can be dropped from the signature.
+   * @param identifier The message identifier of the caller who should be released.
+   * @throws InvalidIdentifierException The sleeper map does not contain a call
+   * with the specified identifier.
+   */
+  public void release(final long identifier) throws InterruptedException, 
InvalidIdentifierException {
+    final ComplexCondition call = getSleeper(identifier);
+    if (call.isHeldByCurrentThread()) {
+      throw new RuntimeException("release() must not be called on same thread 
as block() to prevent deadlock");
+    }
+    try {
+      call.lock();
+      LOG.log(Level.FINER, "Waking caller sleeping on identifier [{0}]", 
identifier);
+      call.signal();
+    } finally {
+      call.unlock();
+    }
+  }
+
+  /**
+   * Allocate a condition variable. Reuses one from the free queue when available,
+   * otherwise creates a new one with this object's timeout settings.
+   * @return A complex condition object.
+   */
+  private ComplexCondition allocate() {
+    final ComplexCondition call = freeQueue.poll();
+    return call != null ? call : new ComplexCondition(timeoutPeriod, 
timeoutUnits);
+  }
+
+  /**
+   * Return a complex condition object to the free queue.
+   * @param call The complex condition to be recycled.
+   */
+  private void recycle(final ComplexCondition call) {
+    freeQueue.add(call);
+  }
+
+  /**
+   * Atomically add a call to the sleeper map. Throws a RuntimeException if the
+   * identifier was already present.
+   * NOTE(review): {@code put} replaces the existing mapping before the exception is
+   * thrown, and the cleanup in {@code block()} then removes the identifier, so the
+   * earlier sleeper presumably can no longer be released - confirm.
+   * @param identifier The unique call identifier.
+   * @param call The call object to be added to the sleeper map.
+   */
+  private void addSleeper(final long identifier, final ComplexCondition call) {
+    if (sleeperMap.put(identifier, call) != null) {
+      throw new RuntimeException(String.format("Duplicate identifier [%d] in 
sleeper map", identifier));
+    }
+  }
+
+  /**
+   * Get a reference to a sleeper with a specific identifier without removing
+   * it from the sleeper map.
+   * @param identifier The unique identifier of the sleeper to be retrieved.
+   * @return The complex condition object associated with the input identifier.
+   * @throws InvalidIdentifierException The sleeper map does not contain a call
+   * with the specified identifier.
+   */
+  private ComplexCondition getSleeper(final long identifier) throws 
InvalidIdentifierException {
+    final ComplexCondition call = sleeperMap.get(identifier);
+    if (null == call) {
+      throw new InvalidIdentifierException(identifier);
+    }
+    return call;
+  }
+
+  /**
+   * Remove the specified call from the sleeper map.
+   * @param identifier The unique identifier of the call to be removed.
+   * @throws InvalidIdentifierException The sleeper map does not contain a call
+   * with the specified identifier.
+   */
+  private void removeSleeper(final long identifier) throws 
InvalidIdentifierException {
+    final ComplexCondition call = sleeperMap.remove(identifier);
+    if (null == call) {
+      throw new InvalidIdentifierException(identifier);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/d609c002/lang/java/reef-utils/src/main/java/org/apache/reef/util/SimpleCondition.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-utils/src/main/java/org/apache/reef/util/SimpleCondition.java 
b/lang/java/reef-utils/src/main/java/org/apache/reef/util/SimpleCondition.java
new file mode 100644
index 0000000..be8ee12
--- /dev/null
+++ 
b/lang/java/reef-utils/src/main/java/org/apache/reef/util/SimpleCondition.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.util;
+
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Manages Java lock and condition objects to create a simplified
+ * condition variable interface. The internal lock is taken and released
+ * inside {@code await()} and {@code signal()}; callers never lock explicitly.
+ */
+public final class SimpleCondition {
+  private static final Logger LOG = 
Logger.getLogger(SimpleCondition.class.getName());
+  private final ReentrantLock lockVar = new ReentrantLock();
+  private final Condition conditionVar = lockVar.newCondition();
+  private final long timeoutPeriod;
+  private final TimeUnit timeoutUnits;
+  private volatile boolean isSignal = false;
+
+  /**
+   * Default constructor which uses the largest representable timeout
+   * ({@code Long.MAX_VALUE} days), i.e. an effectively infinite timeout period.
+   */
+  public SimpleCondition() {
+    this(Long.MAX_VALUE, TimeUnit.DAYS);
+  }
+
+  /**
+   * Initialize condition variable with user specified timeout.
+   * @param timeoutPeriod The length of time in units given by the timeoutUnits
+   *                      parameter before the condition automatically times out.
+   * @param timeoutUnits The unit of time for the timeoutPeriod parameter.
+   */
+  public SimpleCondition(final long timeoutPeriod, final TimeUnit 
timeoutUnits) {
+    this.timeoutPeriod = timeoutPeriod;
+    this.timeoutUnits = timeoutUnits;
+  }
+
+  /**
+   * Blocks the caller until {@code signal()} is called or a timeout occurs.
+   * Logical structure:
+   * {@code
+   *   cv.lock();
+   *   try {
+   *     doTry.run();
+   *     cv.await(); // or cv.signal()
+   *   } finally {
+   *     doFinally.run();
+   *     cv.unlock();
+   *   }
+   * }
+   * Either task may be null, in which case that step is skipped. The signal flag is
+   * cleared before returning.
+   * @param doTry A {@code FutureTask<TTry>} object that is run after the internal
+   *              condition lock is taken but before waiting on the condition occurs.
+   * @param doFinally A {@code FutureTask<TFinally>} object that is run after the wakeup
+   *                  on the condition occurs but before the condition lock
+   *                  is released.
+   * @param <TTry> The return type of the {@code doTry} future task.
+   * @param <TFinally> The return type of the {@code doFinally} future task.
+   * @return A boolean value that indicates whether or not a signal was received. False
+   * indicates that a timeout occurred before a signal was received.
+   * @throws InterruptedException Thread was interrupted by another thread while
+   * waiting for the signal.
+   * @throws Exception The caller's {@code doTry} or {@code doFinally} future task
+   *                   threw an exception.
+   *                   NOTE(review): {@code FutureTask.run()} records task exceptions
+   *                   instead of rethrowing them, so they presumably surface only
+   *                   from the task's {@code get()} - confirm.
+   */
+  public <TTry, TFinally> boolean await(final FutureTask<TTry> doTry,
+                                        final FutureTask<TFinally> doFinally) 
throws Exception {
+    boolean noTimeout = true;
+    if (lockVar.isHeldByCurrentThread()) {
+      throw new RuntimeException("signal() must not be called on same thread 
as await()");
+    }
+    try {
+      lockVar.lock();
+      if (null != doTry) {
+        // Invoke the caller's asynchronous processing while holding the lock
+        // so a wakeup cannot occur before the caller sleeps.
+        doTry.run();
+      }
+      // Put the caller asleep on the condition until a signal is received
+      // or a timeout occurs. Ignore spurious wake ups.
+      LOG.log(Level.FINER, "Putting caller to sleep...");
+      while (!isSignal && noTimeout) {
+        noTimeout = conditionVar.await(timeoutPeriod, timeoutUnits);
+      }
+      LOG.log(Level.FINER, "Caller waking up...");
+    } finally {
+      isSignal = false;
+      if (null != doFinally) {
+        try {
+          // Whether or not a timeout occurred, call the user's cleanup code.
+          doFinally.run();
+        } finally {
+          lockVar.unlock();
+        }
+      } else {
+        lockVar.unlock();
+      }
+    }
+    return noTimeout;
+  }
+
+  /**
+   * Wakes the thread sleeping in {@code await()}. Takes the internal lock, sets the
+   * signal flag, signals the condition and releases the lock. Throws a
+   * RuntimeException if the calling thread already holds the internal lock.
+   */
+  public void signal() {
+    if (lockVar.isHeldByCurrentThread()) {
+      throw new RuntimeException("signal() must not be called on same thread 
as await()");
+    }
+    try {
+      lockVar.lock();
+      LOG.log(Level.INFO, "Signalling sleeper...");
+      isSignal = true;
+      conditionVar.signal();
+    } finally {
+      lockVar.unlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/d609c002/lang/java/reef-utils/src/main/java/org/apache/reef/util/exception/InvalidIdentifierException.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-utils/src/main/java/org/apache/reef/util/exception/InvalidIdentifierException.java
 
b/lang/java/reef-utils/src/main/java/org/apache/reef/util/exception/InvalidIdentifierException.java
new file mode 100644
index 0000000..f921c36
--- /dev/null
+++ 
b/lang/java/reef-utils/src/main/java/org/apache/reef/util/exception/InvalidIdentifierException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.util.exception;
+/**
+ * Generated when an identifier given to MultiAsyncToSync (for example
+ * via MultiAsyncToSync.release) does not match a blocked caller.
+ */
+public final class InvalidIdentifierException extends Exception {
+  final long identifier;
+  /**
+   * Create an exception that records the identifier which was not recognized.
+   * The exception message itself is fixed; the identifier is reported by
+   * {@code toString()}.
+   * @param identifier The identifier that has no associated blocked caller.
+   */
+  public InvalidIdentifierException(final long identifier) {
+    super("Unknown blocked caller identifier");
+    this.identifier = identifier;
+  }
+
+  /**
+   * Appends the invalid identifier to the default exception string.
+   * @return The result of {@code super.toString()} followed by " = " and
+   * the invalid identifier.
+   */
+  public String toString() {
+    return String.format("%s = %d", super.toString(), identifier);
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/d609c002/lang/java/reef-utils/src/main/java/org/apache/reef/util/exception/package-info.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-utils/src/main/java/org/apache/reef/util/exception/package-info.java
 
b/lang/java/reef-utils/src/main/java/org/apache/reef/util/exception/package-info.java
new file mode 100644
index 0000000..cfee839
--- /dev/null
+++ 
b/lang/java/reef-utils/src/main/java/org/apache/reef/util/exception/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Exceptions generated by utilities used across REEF modules.
+ */
+package org.apache.reef.util.exception;

http://git-wip-us.apache.org/repos/asf/reef/blob/d609c002/lang/java/reef-utils/src/test/java/org/apache/reef/util/MultiAsyncToSyncTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-utils/src/test/java/org/apache/reef/util/MultiAsyncToSyncTest.java
 
b/lang/java/reef-utils/src/test/java/org/apache/reef/util/MultiAsyncToSyncTest.java
new file mode 100644
index 0000000..f31b453
--- /dev/null
+++ 
b/lang/java/reef-utils/src/test/java/org/apache/reef/util/MultiAsyncToSyncTest.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.util;
+
+import org.apache.reef.util.exception.InvalidIdentifierException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * Performs an asynchronous increment of an Integer: sleeps for a configured
+ * period, releases the caller blocked on the given identifier, and returns
+ * the input value incremented by one.
+ */
+final class AsynchronousIncrementer implements Callable<Integer> {
+  private static final Logger LOG =
+      Logger.getLogger(AsynchronousIncrementer.class.getName());
+  private final int sleepTimeMillis;
+  private final int input;
+  private final long identifier;
+  private final MultiAsyncToSync blocker;
+
+  /**
+   * Instantiate an incrementer with specific job parameters.
+   * @param input The input parameter for the work.
+   * @param identifier The identifier of the caller to wake on completion.
+   * @param sleepTimeMillis How long to work, in milliseconds.
+   * @param blocker The MultiAsyncToSync object which is holding the
+   *                blocked client.
+   */
+  AsynchronousIncrementer(final int input, final long identifier,
+        final int sleepTimeMillis, final MultiAsyncToSync blocker) {
+    this.sleepTimeMillis = sleepTimeMillis;
+    this.input = input;
+    this.identifier = identifier;
+    this.blocker = blocker;
+  }
+
+  /**
+   * Sleep, release the blocked caller, and then return the input value
+   * incremented by one.
+   * @return The input value of the operation incremented by one.
+   * @throws Exception The sleep was interrupted or releasing the blocked
+   *                   caller failed.
+   */
+  @Override
+  public Integer call() throws Exception {
+    LOG.log(Level.INFO, "Sleeping...");
+    Thread.sleep(sleepTimeMillis);
+    LOG.log(Level.INFO, "Releasing caller on identifier [{0}]...", identifier);
+    // Wake the caller before returning; the caller then reads the result
+    // from the FutureTask wrapping this callable.
+    blocker.release(identifier);
+    return input + 1;
+  }
+}
+
+/**
+ * Use the MultiAsyncToSync class to implement a synchronous API
+ * that uses asynchronous processing internally.
+ */
+final class SynchronousApi implements AutoCloseable {
+  private static final Logger LOG =
+      Logger.getLogger(SynchronousApi.class.getName());
+  private final int incrementerSleepTimeMillis;
+  private final MultiAsyncToSync blocker;
+  private final ExecutorService executor;
+  // Every task started by apiCall(), so that close() can wait for all of
+  // them, including those whose caller already returned on a timeout.
+  private final ConcurrentLinkedQueue<FutureTask<Integer>> taskQueue =
+      new ConcurrentLinkedQueue<>();
+  private final AtomicLong idCounter = new AtomicLong(0);
+
+  /**
+   * Parameterize the object as to length of processing time, call timeout,
+   * and the number of worker threads.
+   * @param incrementerSleepTimeSeconds Length of time the incrementer sleeps
+   *                                    before performing the increment and
+   *                                    returning.
+   * @param timeoutPeriodSeconds The length of time before the call will
+   *                             time out.
+   * @param numberOfThreads Size of the fixed thread pool which runs the
+   *                        asynchronous processing.
+   */
+  SynchronousApi(final int incrementerSleepTimeSeconds,
+                 final long timeoutPeriodSeconds, final int numberOfThreads) {
+    this.incrementerSleepTimeMillis = 1000 * incrementerSleepTimeSeconds;
+    this.blocker = new MultiAsyncToSync(timeoutPeriodSeconds, SECONDS);
+    this.executor = Executors.newFixedThreadPool(numberOfThreads);
+  }
+
+  /**
+   * Initiates asynchronous processing inside the condition lock.
+   * Static because it needs no state from the enclosing SynchronousApi:
+   * both the task and the executor are handed in explicitly.
+   */
+  private static final class AsyncInitiator implements Callable<Boolean> {
+    private final FutureTask<Integer> task;
+    private final ExecutorService executor;
+
+    AsyncInitiator(final FutureTask<Integer> task,
+                   final ExecutorService executor) {
+      this.task = task;
+      this.executor = executor;
+    }
+
+    /**
+     * Hand the task to the executor.
+     * @return Always true.
+     */
+    @Override
+    public Boolean call() {
+      executor.execute(task);
+      return true;
+    }
+  }
+
+  /**
+   * Asynchronously increment the input parameter.
+   * @param input An integer object whose value is to be incremented by one.
+   * @return The input parameter incremented by one or zero for a timeout.
+   * @throws InterruptedException Thread was interrupted by another thread.
+   * @throws ExecutionException An exception was thrown by an internal
+   *                            processing function.
+   * @throws InvalidIdentifierException The call identifier is invalid.
+   * @throws Exception The asynchronous processing generated an exception.
+   */
+  public int apiCall(final Integer input) throws Exception {
+    // Create a future to run the asynchronous processing.
+    final long identifier = idCounter.getAndIncrement();
+    final FutureTask<Integer> task = new FutureTask<>(
+        new AsynchronousIncrementer(
+            input, identifier, incrementerSleepTimeMillis, blocker));
+    taskQueue.add(task);
+
+    LOG.log(Level.INFO,
+        "Running the incrementer on identifier [{0}]...", identifier);
+    // block() returns true when the timeout expired before release().
+    if (blocker.block(identifier,
+        new FutureTask<>(new AsyncInitiator(task, executor)))) {
+      LOG.log(Level.INFO, "Call timed out...");
+      // Timeout occurred before the asynchronous processing completed.
+      return 0;
+    }
+    LOG.log(Level.INFO, "Call getting task result...");
+    return task.get();
+  }
+
+  /**
+   * Ensure all test tasks have completed and stop the executor.
+   * Failures of individual tasks are logged rather than propagated. If the
+   * closing thread is interrupted, waiting stops early and the interrupt
+   * flag is restored for the caller.
+   * @throws ExecutionException Asynchronous processing generated an exception.
+   * @throws InterruptedException Thread was interrupted by another thread.
+   */
+  @Override
+  public void close() throws ExecutionException, InterruptedException {
+    try {
+      for (final FutureTask<Integer> task : taskQueue) {
+        try {
+          task.get();
+        } catch (final InterruptedException e) {
+          // Do not swallow the interrupt: restore the flag and stop waiting.
+          LOG.log(Level.INFO, "Interrupted waiting for completion...", e);
+          Thread.currentThread().interrupt();
+          break;
+        } catch (final Exception e) {
+          LOG.log(Level.INFO, "Caught exception waiting for completion...", e);
+        }
+      }
+    } finally {
+      // Always release the pool threads, even on an unexpected error.
+      executor.shutdownNow();
+    }
+  }
+}
+
+/**
+ * Verify proper operation of the MultiAsyncToSync class.
+ */
+public final class MultiAsyncToSyncTest {
+  private static final Logger LOG =
+      Logger.getLogger(MultiAsyncToSyncTest.class.getName());
+
+  /**
+   * Verify calculations successfully complete when no timeout occurs.
+   * @throws Exception An unexpected exception occurred.
+   */
+  @Test
+  public void testNoTimeout() throws Exception {
+    LOG.log(Level.INFO, "Starting...");
+
+    // Parameters that do not force a timeout.
+    final int incrementerSleepTimeSeconds = 2;
+    final long timeoutPeriodSeconds = 4;
+    final int input = 1;
+
+    try (final SynchronousApi apiObject = new SynchronousApi(
+        incrementerSleepTimeSeconds, timeoutPeriodSeconds, 2)) {
+      final int result = apiObject.apiCall(input);
+      Assert.assertEquals("Value incremented by one", input + 1, result);
+    }
+  }
+
+  /**
+   * Verify an error is returned when a timeout occurs.
+   * @throws Exception An unexpected exception occurred.
+   */
+  @Test
+  public void testTimeout() throws Exception {
+    LOG.log(Level.INFO, "Starting...");
+
+    // Parameters that force a timeout.
+    final int incrementerSleepTimeSeconds = 4;
+    final long timeoutPeriodSeconds = 2;
+    final int input = 1;
+
+    try (final SynchronousApi apiObject = new SynchronousApi(
+        incrementerSleepTimeSeconds, timeoutPeriodSeconds, 2)) {
+      final int result = apiObject.apiCall(input);
+      // JUnit expects (message, expected, actual); zero signals a timeout.
+      Assert.assertEquals("Timeout occurred", 0, result);
+    }
+  }
+
+  /**
+   * Verify no interaction occurs when multiple calls are in flight.
+   * @throws Exception An unexpected exception occurred.
+   */
+  @Test
+  public void testMultipleCalls() throws Exception {
+
+    LOG.log(Level.INFO, "Starting...");
+
+    // Parameters that do not force a timeout.
+    final int incrementerSleepTimeSeconds = 2;
+    final long timeoutPeriodSeconds = 4;
+
+    try (final SynchronousApi apiObject = new SynchronousApi(
+        incrementerSleepTimeSeconds, timeoutPeriodSeconds, 2)) {
+      final String function = "apiCall";
+      final int input = 1;
+      final FutureTask<Integer> task1 = new FutureTask<>(
+          new MethodCallable<Integer>(apiObject, function, input));
+      final FutureTask<Integer> task2 = new FutureTask<>(
+          new MethodCallable<Integer>(apiObject, function, input + 1));
+
+      // Execute API calls concurrently.
+      final ExecutorService executor = Executors.newFixedThreadPool(2);
+      try {
+        executor.execute(task1);
+        executor.execute(task2);
+
+        final int result1 = task1.get();
+        final int result2 = task2.get();
+
+        Assert.assertEquals(
+            "Input must be incremented by one", input + 1, result1);
+        Assert.assertEquals(
+            "Input must be incremented by one", input + 2, result2);
+      } finally {
+        // Release the pool threads even when an assertion or get() fails.
+        executor.shutdownNow();
+      }
+    }
+  }
+
+  /**
+   * Verify no race conditions occurs when multiple calls are in flight.
+   * @throws Exception An unexpected exception occurred.
+   */
+  @Test
+  public void testRaceConditions() throws Exception {
+
+    LOG.log(Level.INFO, "Starting...");
+
+    // Parameters that do not force a timeout.
+    final int incrementerSleepTimeSeconds = 1;
+    final long timeoutPeriodSeconds = 10;
+    final String function = "apiCall";
+
+    final int nTasks = 100;
+    // Generic arrays cannot be created directly; the raw array is only ever
+    // filled with FutureTask<Integer> instances below, so this is safe.
+    @SuppressWarnings("unchecked")
+    final FutureTask<Integer>[] tasks = new FutureTask[nTasks];
+    final ExecutorService executor = Executors.newFixedThreadPool(10);
+
+    try {
+      try (final SynchronousApi apiObject = new SynchronousApi(
+          incrementerSleepTimeSeconds, timeoutPeriodSeconds, 10)) {
+
+        for (int idx = 0; idx < nTasks; ++idx) {
+          tasks[idx] = new FutureTask<>(
+              new MethodCallable<Integer>(apiObject, function, idx));
+          executor.execute(tasks[idx]);
+        }
+
+        for (int idx = 0; idx < nTasks; ++idx) {
+          final int result = tasks[idx].get();
+          Assert.assertEquals(
+              "Input must be incremented by one", idx + 1, result);
+        }
+      }
+    } finally {
+      // Release the pool threads even when an assertion or get() fails.
+      executor.shutdownNow();
+    }
+  }
+
+  /**
+   * Verify calling block and release on same thread generates an exception.
+   * @throws Exception An unexpected exception occurred.
+   */
+  @Test
+  public void testCallOnSameThread() throws Exception {
+    LOG.log(Level.INFO, "Starting...");
+
+    final long timeoutPeriodSeconds = 2;
+    final long identifier = 78;
+    boolean result = false;
+
+    try {
+      final MultiAsyncToSync asyncToSync =
+          new MultiAsyncToSync(timeoutPeriodSeconds, TimeUnit.SECONDS);
+      // The procedure handed to block() releases the very identifier that
+      // is being blocked on.
+      final FutureTask<Object> syncProc =
+          new FutureTask<>(new Callable<Object>() {
+            @Override
+            public Object call()
+                throws InterruptedException, InvalidIdentifierException {
+              asyncToSync.release(identifier);
+              return null;
+            }
+          });
+      asyncToSync.block(identifier, syncProc);
+      syncProc.get();
+    } catch (ExecutionException ee) {
+      if (ee.getCause() instanceof RuntimeException) {
+        LOG.log(Level.INFO, "Caught expected runtime exception...", ee);
+        result = true;
+      }
+    }
+    Assert.assertTrue("Expected runtime exception", result);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/reef/blob/d609c002/lang/java/reef-utils/src/test/java/org/apache/reef/util/SimpleConditionTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-utils/src/test/java/org/apache/reef/util/SimpleConditionTest.java
 
b/lang/java/reef-utils/src/test/java/org/apache/reef/util/SimpleConditionTest.java
new file mode 100644
index 0000000..431208c
--- /dev/null
+++ 
b/lang/java/reef-utils/src/test/java/org/apache/reef/util/SimpleConditionTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Verify functionality of simple condition objects.
+ */
+public class SimpleConditionTest {
+  private static final Logger LOG =
+      Logger.getLogger(SimpleConditionTest.class.getName());
+
+  /**
+   * Verify proper operation when no timeout occurs.
+   * @throws Exception An unexpected exception occurred.
+   */
+  @Test
+  public void testNoTimeout() throws Exception {
+    LOG.log(Level.INFO, "Starting...");
+
+    final ExecutorService executor = Executors.newCachedThreadPool();
+    try {
+      final SimpleCondition condition = new SimpleCondition();
+
+      final FutureTask<FutureTask<Integer>> doTry =
+          new FutureTask<>(new Callable<FutureTask<Integer>>() {
+            @Override
+            public FutureTask<Integer> call() throws Exception {
+              LOG.log(Level.INFO, "doTry executing...");
+
+              // Spawn a future which can be passed back to the caller.
+              final FutureTask<Integer> task =
+                  new FutureTask<>(new Callable<Integer>() {
+                    @Override
+                    public Integer call() throws Exception {
+                      LOG.log(Level.INFO, "doTry sleeping...");
+                      Thread.sleep(3000);
+                      LOG.log(Level.INFO, "doTry signaling the condition...");
+                      condition.signal();
+                      LOG.log(Level.INFO, "doTry condition is signaled...");
+                      return 5;
+                    }
+                  });
+              executor.submit(task);
+
+              return task;
+            }
+          });
+
+      final FutureTask<Integer> doFinally =
+          new FutureTask<>(new Callable<Integer>() {
+            @Override
+            public Integer call() {
+              LOG.log(Level.INFO, "doFinally executing...");
+              return 5;
+            }
+          });
+
+      condition.await(doTry, doFinally);
+      // NOTE(review): extra wait kept from the original test; presumably it
+      // gives the signaling task time to finish logging - confirm.
+      Thread.sleep(3000);
+      Assert.assertEquals("No exceptions", doTry.get().get(), doFinally.get());
+    } finally {
+      // Release the pool threads even when an assertion or get() fails.
+      executor.shutdownNow();
+    }
+  }
+}

Reply via email to