Repository: reef
Updated Branches:
  refs/heads/master d8fe048d1 -> 7d5a14169


[REEF-1023] Handle Exceptions in Vortex callback

This addresses the issue by:
  * Using FutureCallback instead of EventHandler.
  * Using an Executor for executing callbacks.
  * Adding tests for tasklet exception scenarios.

JIRA:
  [REEF-1023](https://issues.apache.org/jira/browse/REEF-1023)

Pull Request:
  Closes #693


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/7d5a1416
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/7d5a1416
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/7d5a1416

Branch: refs/heads/master
Commit: 7d5a14169daf26970b56e3f1273e65fd54815c47
Parents: d8fe048
Author: Andrew Chung <[email protected]>
Authored: Tue Dec 1 23:33:28 2015 -0800
Committer: Byung-Gon Chun <[email protected]>
Committed: Mon Dec 7 08:55:51 2015 +0900

----------------------------------------------------------------------
 .../apache/reef/vortex/api/FutureCallback.java  | 40 +++++++++++
 .../apache/reef/vortex/api/VortexFuture.java    | 35 +++++++---
 .../reef/vortex/api/VortexThreadPool.java       |  5 +-
 .../reef/vortex/driver/DefaultVortexMaster.java | 16 +++--
 .../reef/vortex/driver/VortexConfHelper.java    |  3 +
 .../apache/reef/vortex/driver/VortexMaster.java |  4 +-
 .../reef/vortex/driver/VortexMasterConf.java    | 13 ++++
 .../vortex/driver/DefaultVortexMasterTest.java  | 65 +++++++++++++++---
 .../org/apache/reef/vortex/driver/TestUtil.java |  7 +-
 .../applications/vortex/VortexTestSuite.java    |  4 +-
 .../vortex/addone/AddOneCallbackTestStart.java  | 13 ++--
 .../exception/ExceptionCallbackTestStart.java   | 72 ++++++++++++++++++++
 .../vortex/exception/ExceptionFunction.java     | 31 +++++++++
 .../vortex/exception/VortexExceptionTest.java   | 64 +++++++++++++++++
 .../vortex/exception/package-info.java          | 23 +++++++
 15 files changed, 360 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/7d5a1416/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/FutureCallback.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/FutureCallback.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/FutureCallback.java
new file mode 100644
index 0000000..75df796
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/FutureCallback.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.api;
+
+/**
+ * A callback for accepting the results of a {@link VortexFuture} computation 
asynchronously.
+ * Only one of either {@link FutureCallback#onSuccess} or {@link 
FutureCallback#onFailure(Throwable)}
+ * will be invoked.
+ * Based on Google Guava's FutureCallback.
+ */
+public interface FutureCallback<V> {
+
+  /**
+   * Invoked with the result of the Tasklet computation on success.
+   * @param result the result.
+   */
+  void onSuccess(final V result);
+
+  /**
+   * Invoked with the error of the Tasklet computation on failure.
+   * @param t the error.
+   */
+  void onFailure(final Throwable t);
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/7d5a1416/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
index 7b84b19..34cf6b6 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
@@ -19,9 +19,8 @@
 package org.apache.reef.vortex.api;
 
 import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.util.Optional;
-import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.impl.ThreadPoolStage;
 
 import java.util.concurrent.*;
 
@@ -35,20 +34,25 @@ public final class VortexFuture<TOutput> implements 
Future<TOutput> {
   private Optional<TOutput> userResult = null;
   private Exception userException;
   private final CountDownLatch countDownLatch = new CountDownLatch(1);
-  private final ThreadPoolStage<TOutput> stage;
+  private final FutureCallback<TOutput> callbackHandler;
+  private final Executor executor;
 
   /**
    * Creates a {@link VortexFuture}.
    */
-  public VortexFuture() {
-    stage = null;
+  @Private
+  public VortexFuture(final Executor executor) {
+    this(executor, null);
   }
 
   /**
    * Creates a {@link VortexFuture} with a callback.
    */
-  public VortexFuture(final EventHandler<TOutput> callbackHandler) {
-    stage = new ThreadPoolStage<>(callbackHandler, 1);
+  @Private
+  public VortexFuture(final Executor executor,
+                      final FutureCallback<TOutput> callbackHandler) {
+    this.executor = executor;
+    this.callbackHandler = callbackHandler;
   }
 
   /**
@@ -112,8 +116,13 @@ public final class VortexFuture<TOutput> implements 
Future<TOutput> {
    */
   public void completed(final TOutput result) {
     this.userResult = Optional.ofNullable(result);
-    if (stage != null) {
-      stage.onNext(userResult.get());
+    if (callbackHandler != null) {
+      executor.execute(new Runnable() {
+        @Override
+        public void run() {
+          callbackHandler.onSuccess(userResult.get());
+        }
+      });
     }
     this.countDownLatch.countDown();
   }
@@ -123,6 +132,14 @@ public final class VortexFuture<TOutput> implements 
Future<TOutput> {
    */
   public void threwException(final Exception exception) {
     this.userException = exception;
+    if (callbackHandler != null) {
+      executor.execute(new Runnable() {
+        @Override
+        public void run() {
+          callbackHandler.onFailure(exception);
+        }
+      });
+    }
     this.countDownLatch.countDown();
   }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/7d5a1416/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java
index 221f852..1b02894 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java
@@ -21,7 +21,6 @@ package org.apache.reef.vortex.api;
 import org.apache.reef.annotations.Unstable;
 import org.apache.reef.util.Optional;
 import org.apache.reef.vortex.driver.VortexMaster;
-import org.apache.reef.wake.EventHandler;
 
 import javax.inject.Inject;
 import java.io.Serializable;
@@ -47,7 +46,7 @@ public final class VortexThreadPool {
    */
   public <TInput extends Serializable, TOutput extends Serializable> 
VortexFuture<TOutput>
       submit(final VortexFunction<TInput, TOutput> function, final TInput 
input) {
-    return vortexMaster.enqueueTasklet(function, input, 
Optional.<EventHandler<TOutput>>empty());
+    return vortexMaster.enqueueTasklet(function, input, 
Optional.<FutureCallback<TOutput>>empty());
   }
 
   /**
@@ -60,7 +59,7 @@ public final class VortexThreadPool {
    */
   public <TInput extends Serializable, TOutput extends Serializable> 
VortexFuture<TOutput>
       submit(final VortexFunction<TInput, TOutput> function, final TInput 
input,
-             final EventHandler<TOutput> callback) {
+             final FutureCallback<TOutput> callback) {
     return vortexMaster.enqueueTasklet(function, input, Optional.of(callback));
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/7d5a1416/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
index a2b6161..a4b5ef0 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
@@ -20,14 +20,17 @@ package org.apache.reef.vortex.driver;
 
 import net.jcip.annotations.ThreadSafe;
 import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.util.Optional;
+import org.apache.reef.vortex.api.FutureCallback;
 import org.apache.reef.vortex.api.VortexFunction;
 import org.apache.reef.vortex.api.VortexFuture;
-import org.apache.reef.wake.EventHandler;
 
 import javax.inject.Inject;
 import java.io.Serializable;
 import java.util.*;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -40,13 +43,16 @@ final class DefaultVortexMaster implements VortexMaster {
   private final AtomicInteger taskletIdCounter = new AtomicInteger();
   private final RunningWorkers runningWorkers;
   private final PendingTasklets pendingTasklets;
+  private final Executor executor;
 
   /**
    * @param runningWorkers for managing all running workers.
    */
   @Inject
   DefaultVortexMaster(final RunningWorkers runningWorkers,
-                      final PendingTasklets pendingTasklets) {
+                      final PendingTasklets pendingTasklets,
+                      
@Parameter(VortexMasterConf.CallbackThreadPoolSize.class) final int 
threadPoolSize) {
+    this.executor = Executors.newFixedThreadPool(threadPoolSize);
     this.runningWorkers = runningWorkers;
     this.pendingTasklets = pendingTasklets;
   }
@@ -57,13 +63,13 @@ final class DefaultVortexMaster implements VortexMaster {
   @Override
   public <TInput extends Serializable, TOutput extends Serializable> 
VortexFuture<TOutput>
       enqueueTasklet(final VortexFunction<TInput, TOutput> function, final 
TInput input,
-                     final Optional<EventHandler<TOutput>> callback) {
+                     final Optional<FutureCallback<TOutput>> callback) {
     // TODO[REEF-500]: Simple duplicate Vortex Tasklet launch.
     final VortexFuture<TOutput> vortexFuture;
     if (callback.isPresent()) {
-      vortexFuture = new VortexFuture<>(callback.get());
+      vortexFuture = new VortexFuture<>(executor, callback.get());
     } else {
-      vortexFuture = new VortexFuture<>();
+      vortexFuture = new VortexFuture<>(executor);
     }
 
     this.pendingTasklets.addLast(new 
Tasklet<>(taskletIdCounter.getAndIncrement(), function, input, vortexFuture));

http://git-wip-us.apache.org/repos/asf/reef/blob/7d5a1416/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexConfHelper.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexConfHelper.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexConfHelper.java
index 7b58f0c..6117e4e 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexConfHelper.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexConfHelper.java
@@ -63,6 +63,9 @@ public final class VortexConfHelper {
         .set(VortexMasterConf.NUM_OF_VORTEX_START_THREAD, 
DEFAULT_NUM_OF_VORTEX_START_THREAD) // fixed to 1 for now
         .build();
 
+    // TODO[JIRA REEF-1000]: Consider exposing 
VortexMasterConf.FUTURE_CALLBACK_THREAD_POOL_SIZE.
+    // For now, use default value defined in the NamedParameter.
+
     return Configurations.merge(vortexDriverConf, vortexMasterConf);
   }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/7d5a1416/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
index c7c3e79..5c7f684 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
@@ -22,9 +22,9 @@ import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.tang.annotations.DefaultImplementation;
 import org.apache.reef.util.Optional;
+import org.apache.reef.vortex.api.FutureCallback;
 import org.apache.reef.vortex.api.VortexFunction;
 import org.apache.reef.vortex.api.VortexFuture;
-import org.apache.reef.wake.EventHandler;
 
 import java.io.Serializable;
 
@@ -41,7 +41,7 @@ public interface VortexMaster {
    */
   <TInput extends Serializable, TOutput extends Serializable> 
VortexFuture<TOutput>
       enqueueTasklet(final VortexFunction<TInput, TOutput> vortexFunction, 
final TInput input,
-                     final Optional<EventHandler<TOutput>> callback);
+                     final Optional<FutureCallback<TOutput>> callback);
 
   /**
    * Call this when a new worker is up and running.

http://git-wip-us.apache.org/repos/asf/reef/blob/7d5a1416/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMasterConf.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMasterConf.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMasterConf.java
index 9851fd8..da7c4ad 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMasterConf.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMasterConf.java
@@ -67,6 +67,13 @@ public final class VortexMasterConf extends 
ConfigurationModuleBuilder {
   }
 
   /**
+   * Size of threadpool for callbacks on {@link 
org.apache.reef.vortex.api.VortexFuture}.
+   */
+  @NamedParameter(doc = "Size of threadpool for callbacks on VortexFuture.", 
default_value = "10")
+  final class CallbackThreadPoolSize implements Name<Integer> {
+  }
+
+  /**
    * Number of Workers.
    */
   public static final RequiredParameter<Integer> WORKER_NUM = new 
RequiredParameter<>();
@@ -97,6 +104,11 @@ public final class VortexMasterConf extends 
ConfigurationModuleBuilder {
   public static final RequiredParameter<Integer> NUM_OF_VORTEX_START_THREAD = 
new RequiredParameter<>();
 
   /**
+   * Size of threadpool for callbacks on VortexFuture.
+   */
+  public static final OptionalParameter<Integer> 
FUTURE_CALLBACK_THREAD_POOL_SIZE = new OptionalParameter<>();
+
+  /**
    * Vortex Master configuration.
    */
   public static final ConfigurationModule CONF = new VortexMasterConf()
@@ -106,5 +118,6 @@ public final class VortexMasterConf extends 
ConfigurationModuleBuilder {
       .bindNamedParameter(WorkerCapacity.class, WORKER_CAPACITY)
       .bindImplementation(VortexStart.class, VORTEX_START)
       .bindNamedParameter(NumberOfVortexStartThreads.class, 
NUM_OF_VORTEX_START_THREAD)
+      .bindNamedParameter(CallbackThreadPoolSize.class, 
FUTURE_CALLBACK_THREAD_POOL_SIZE)
       .build();
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/7d5a1416/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
 
b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
index ff3ae92..37298fa 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
@@ -19,9 +19,9 @@
 package org.apache.reef.vortex.driver;
 
 import org.apache.reef.util.Optional;
+import org.apache.reef.vortex.api.FutureCallback;
 import org.apache.reef.vortex.api.VortexFunction;
 import org.apache.reef.vortex.api.VortexFuture;
-import org.apache.reef.wake.EventHandler;
 import org.junit.Test;
 
 import java.util.*;
@@ -45,19 +45,25 @@ public class DefaultVortexMasterTest {
     final VortexWorkerManager vortexWorkerManager1 = testUtil.newWorker();
     final RunningWorkers runningWorkers = new RunningWorkers(new 
RandomSchedulingPolicy());
     final PendingTasklets pendingTasklets = new PendingTasklets();
-    final DefaultVortexMaster vortexMaster = new 
DefaultVortexMaster(runningWorkers, pendingTasklets);
+    final DefaultVortexMaster vortexMaster = new 
DefaultVortexMaster(runningWorkers, pendingTasklets, 5);
 
     final AtomicBoolean callbackReceived = new AtomicBoolean(false);
     final CountDownLatch latch = new CountDownLatch(1);
 
     vortexMaster.workerAllocated(vortexWorkerManager1);
 
-    final EventHandler<Integer> testCallbackHandler = new 
EventHandler<Integer>() {
+    final FutureCallback<Integer> testCallbackHandler = new 
FutureCallback<Integer>() {
       @Override
-      public void onNext(final Integer value) {
+      public void onSuccess(final Integer integer) {
         callbackReceived.set(true);
         latch.countDown();
-      }};
+      }
+
+      @Override
+      public void onFailure(final Throwable throwable) {
+        throw new RuntimeException("Did not expect exception in test.", 
throwable);
+      }
+    };
 
     final VortexFuture future = vortexMaster.enqueueTasklet(vortexFunction, 
null, Optional.of(testCallbackHandler));
 
@@ -81,12 +87,12 @@ public class DefaultVortexMasterTest {
     final VortexWorkerManager vortexWorkerManager2 = testUtil.newWorker();
     final RunningWorkers runningWorkers = new RunningWorkers(new 
RandomSchedulingPolicy());
     final PendingTasklets pendingTasklets = new PendingTasklets();
-    final DefaultVortexMaster vortexMaster = new 
DefaultVortexMaster(runningWorkers, pendingTasklets);
+    final DefaultVortexMaster vortexMaster = new 
DefaultVortexMaster(runningWorkers, pendingTasklets, 5);
 
     // Allocate worker & tasklet and schedule
     vortexMaster.workerAllocated(vortexWorkerManager1);
     final VortexFuture future = vortexMaster.enqueueTasklet(vortexFunction, 
null,
-        Optional.<EventHandler<Integer>>empty());
+        Optional.<FutureCallback<Integer>>empty());
     final ArrayList<Integer> taskletIds1 = launchTasklets(runningWorkers, 
pendingTasklets, 1);
 
     // Preemption!
@@ -114,7 +120,7 @@ public class DefaultVortexMasterTest {
     final ArrayList<VortexFuture> vortexFutures = new ArrayList<>();
     final RunningWorkers runningWorkers = new RunningWorkers(new 
RandomSchedulingPolicy());
     final PendingTasklets pendingTasklets = new PendingTasklets();
-    final DefaultVortexMaster vortexMaster = new 
DefaultVortexMaster(runningWorkers, pendingTasklets);
+    final DefaultVortexMaster vortexMaster = new 
DefaultVortexMaster(runningWorkers, pendingTasklets, 5);
 
     // Allocate iniital evaluators (will all be preempted later...)
     final List<VortexWorkerManager> initialWorkers = new ArrayList<>();
@@ -129,7 +135,7 @@ public class DefaultVortexMasterTest {
     final int numOfTasklets = 100;
     for (int i = 0; i < numOfTasklets; i++) {
       vortexFutures.add(vortexMaster.enqueueTasklet(testUtil.newFunction(), 
null,
-          Optional.<EventHandler<Integer>>empty()));
+          Optional.<FutureCallback<Integer>>empty()));
     }
     final ArrayList<Integer> taskletIds1 = launchTasklets(runningWorkers, 
pendingTasklets, numOfTasklets);
 
@@ -157,6 +163,47 @@ public class DefaultVortexMasterTest {
   }
 
   /**
+   * Test handling of single tasklet execution with a failure.
+   */
+  @Test(timeout = 10000)
+  public void testTaskletThrowException() throws Exception {
+    final VortexFunction vortexFunction = testUtil.newIntegerFunction();
+    final VortexWorkerManager vortexWorkerManager1 = testUtil.newWorker();
+    final RunningWorkers runningWorkers = new RunningWorkers(new 
RandomSchedulingPolicy());
+    final PendingTasklets pendingTasklets = new PendingTasklets();
+    final DefaultVortexMaster vortexMaster = new 
DefaultVortexMaster(runningWorkers, pendingTasklets, 5);
+
+    final AtomicBoolean callbackReceived = new AtomicBoolean(false);
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    vortexMaster.workerAllocated(vortexWorkerManager1);
+
+    final FutureCallback<Integer> testCallbackHandler = new 
FutureCallback<Integer>() {
+      @Override
+      public void onSuccess(final Integer integer) {
+        throw new RuntimeException("Did not expect success in test.");
+      }
+
+      @Override
+      public void onFailure(final Throwable throwable) {
+        callbackReceived.set(true);
+        latch.countDown();
+      }
+    };
+
+    final VortexFuture future = vortexMaster.enqueueTasklet(vortexFunction, 
null, Optional.of(testCallbackHandler));
+
+    final ArrayList<Integer> taskletIds = launchTasklets(runningWorkers, 
pendingTasklets, 1);
+    for (final int taskletId : taskletIds) {
+      vortexMaster.taskletErrored(vortexWorkerManager1.getId(), taskletId, new 
RuntimeException("Test exception"));
+    }
+
+    assertTrue("The VortexFuture should be done", future.isDone());
+    latch.await();
+    assertTrue("Callback should have been received", callbackReceived.get());
+  }
+
+  /**
    * Launch specified number of tasklets as a substitute for 
PendingTaskletLauncher.
    * @return ids of launched tasklets
    */

http://git-wip-us.apache.org/repos/asf/reef/blob/7d5a1416/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
 
b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
index 80ee597..fc333e3 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
@@ -23,6 +23,8 @@ import org.apache.reef.vortex.api.VortexFunction;
 import org.apache.reef.vortex.api.VortexFuture;
 
 import java.io.Serializable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.mockito.Mockito.mock;
@@ -31,9 +33,10 @@ import static org.mockito.Mockito.when;
 /**
  * Utility methods for tests.
  */
-public class TestUtil {
+public final class TestUtil {
   private final AtomicInteger taskletId = new AtomicInteger(0);
   private final AtomicInteger workerId = new AtomicInteger(0);
+  private final Executor executor = Executors.newFixedThreadPool(5);
 
   /**
    * @return a new mocked worker.
@@ -49,7 +52,7 @@ public class TestUtil {
    * @return a new dummy tasklet.
    */
   public Tasklet newTasklet() {
-    return new Tasklet(taskletId.getAndIncrement(), null, null, new 
VortexFuture());
+    return new Tasklet(taskletId.getAndIncrement(), null, null, new 
VortexFuture(executor));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/reef/blob/7d5a1416/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/VortexTestSuite.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/VortexTestSuite.java
 
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/VortexTestSuite.java
index d47f427..f9068cd 100644
--- 
a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/VortexTestSuite.java
+++ 
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/VortexTestSuite.java
@@ -19,12 +19,14 @@
 package org.apache.reef.tests.applications.vortex;
 
 import org.apache.reef.tests.applications.vortex.addone.AddOneTest;
+import org.apache.reef.tests.applications.vortex.exception.VortexExceptionTest;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 
 @RunWith(Suite.class)
 @Suite.SuiteClasses({
-    AddOneTest.class
+    AddOneTest.class,
+    VortexExceptionTest.class
     })
 public final class VortexTestSuite {
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/7d5a1416/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneCallbackTestStart.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneCallbackTestStart.java
 
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneCallbackTestStart.java
index 3ab4f5f..016e77f 100644
--- 
a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneCallbackTestStart.java
+++ 
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneCallbackTestStart.java
@@ -19,10 +19,10 @@
 package org.apache.reef.tests.applications.vortex.addone;
 
 import io.netty.util.internal.ConcurrentSet;
+import org.apache.reef.vortex.api.FutureCallback;
 import org.apache.reef.vortex.api.VortexFuture;
 import org.apache.reef.vortex.api.VortexStart;
 import org.apache.reef.vortex.api.VortexThreadPool;
-import org.apache.reef.wake.EventHandler;
 import org.junit.Assert;
 
 import javax.inject.Inject;
@@ -54,12 +54,17 @@ public final class AddOneCallbackTestStart implements 
VortexStart {
     final AddOneFunction addOneFunction = new AddOneFunction();
 
     for (final int i : inputVector) {
-      futures.add(vortexThreadPool.submit(addOneFunction, i, new 
EventHandler<Integer>() {
+      futures.add(vortexThreadPool.submit(addOneFunction, i, new 
FutureCallback<Integer>() {
         @Override
-        public void onNext(final Integer value) {
-          outputSet.add(value - 1);
+        public void onSuccess(final Integer result) {
+          outputSet.add(result - 1);
           latch.countDown();
         }
+
+        @Override
+        public void onFailure(final Throwable t) {
+          throw new RuntimeException("Did not expect exception in test.", t);
+        }
       }));
     }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/7d5a1416/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/ExceptionCallbackTestStart.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/ExceptionCallbackTestStart.java
 
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/ExceptionCallbackTestStart.java
new file mode 100644
index 0000000..48cc5e0
--- /dev/null
+++ 
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/ExceptionCallbackTestStart.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.applications.vortex.exception;
+
+import org.apache.reef.vortex.api.FutureCallback;
+import org.apache.reef.vortex.api.VortexFuture;
+import org.apache.reef.vortex.api.VortexStart;
+import org.apache.reef.vortex.api.VortexThreadPool;
+import org.junit.Assert;
+
+import javax.inject.Inject;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Test that an {@link ExecutionException} is thrown on tasklets that throws 
an Exception, and
+ * that the callback handler gets invoked on failure.
+ */
+public final class ExceptionCallbackTestStart implements VortexStart {
+
+  @Inject
+  private ExceptionCallbackTestStart() {
+  }
+
+  @Override
+  public void start(final VortexThreadPool vortexThreadPool) {
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    final ExceptionFunction exceptionFunction = new ExceptionFunction();
+
+    final VortexFuture<Integer> future = 
vortexThreadPool.submit(exceptionFunction, 1, new FutureCallback<Integer>() {
+      @Override
+      public void onSuccess(final Integer result) {
+        throw new RuntimeException("Did not expect success in test.");
+      }
+
+      @Override
+      public void onFailure(final Throwable t) {
+        latch.countDown();
+      }
+    });
+
+    boolean gotFailure = false;
+    try {
+      latch.await();
+      future.get();
+    } catch (final InterruptedException e) {
+      e.printStackTrace();
+      Assert.fail();
+    } catch (final ExecutionException ex) {
+      gotFailure = true;
+    }
+
+    Assert.assertTrue(gotFailure);
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/7d5a1416/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/ExceptionFunction.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/ExceptionFunction.java
 
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/ExceptionFunction.java
new file mode 100644
index 0000000..a6a5737
--- /dev/null
+++ 
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/ExceptionFunction.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.applications.vortex.exception;
+
+import org.apache.reef.vortex.api.VortexFunction;
+
+/**
+ * Vortex function used by the exception tests: every invocation fails with a {@link RuntimeException}.
+ */
+public final class ExceptionFunction implements VortexFunction<Integer, Integer> {
+  /**
+   * Never returns normally.
+   *
+   * @param integer the tasklet input; it is not read.
+   * @return nothing, since the call always throws.
+   * @throws Exception always, in the form of a {@link RuntimeException}.
+   */
+  @Override
+  public Integer call(final Integer integer) throws Exception {
+    final RuntimeException expectedFailure = new RuntimeException("Expected test exception.");
+    throw expectedFailure;
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/7d5a1416/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/VortexExceptionTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/VortexExceptionTest.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/VortexExceptionTest.java
new file mode 100644
index 0000000..948fc04
--- /dev/null
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/VortexExceptionTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.applications.vortex.exception;
+
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tests.TestEnvironment;
+import org.apache.reef.tests.TestEnvironmentFactory;
+import org.apache.reef.vortex.driver.VortexConfHelper;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests that exercise Vortex tasklets which throw Exceptions.
+ */
+public final class VortexExceptionTest {
+  private final TestEnvironment testEnvironment = TestEnvironmentFactory.getNewTestEnvironment();
+
+  /**
+   * Prepares the test environment before each test.
+   */
+  @Before
+  public void setUp() throws Exception {
+    testEnvironment.setUp();
+  }
+
+  /**
+   * Releases the test environment after each test.
+   */
+  @After
+  public void tearDown() throws Exception {
+    testEnvironment.tearDown();
+  }
+
+  /**
+   * Runs a Vortex job started by {@link ExceptionCallbackTestStart} and expects the job to succeed.
+   */
+  @Test
+  public void testVortexExceptionCallback() {
+    final Configuration vortexConf = VortexConfHelper.getVortexConf(
+        "TEST_Vortex_ExceptionCallbackTest", ExceptionCallbackTestStart.class, 2, 64, 4, 2000);
+    final LauncherStatus jobStatus = testEnvironment.run(vortexConf);
+    final String failureMessage = "Job state after execution: " + jobStatus;
+    Assert.assertTrue(failureMessage, jobStatus.isSuccess());
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/7d5a1416/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/package-info.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/package-info.java
new file mode 100644
index 0000000..bf800a1
--- /dev/null
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Vortex Exception test.
+ */
+package org.apache.reef.tests.applications.vortex.exception;
\ No newline at end of file

Reply via email to