Repository: reef Updated Branches: refs/heads/master d8fe048d1 -> 7d5a14169
[REEF-1023] Handle Exceptions in Vortex callback This addressed the issue by: * Using FutureCallback instead of EventHandler. * Using an Executor for executing callbacks. * Adding tests for tasklet exception scenarios. JIRA: [REEF-1023](https://issues.apache.org/jira/browse/REEF-1023) Pull Request: Closes #693 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/7d5a1416 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/7d5a1416 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/7d5a1416 Branch: refs/heads/master Commit: 7d5a14169daf26970b56e3f1273e65fd54815c47 Parents: d8fe048 Author: Andrew Chung <[email protected]> Authored: Tue Dec 1 23:33:28 2015 -0800 Committer: Byung-Gon Chun <[email protected]> Committed: Mon Dec 7 08:55:51 2015 +0900 ---------------------------------------------------------------------- .../apache/reef/vortex/api/FutureCallback.java | 40 +++++++++++ .../apache/reef/vortex/api/VortexFuture.java | 35 +++++++--- .../reef/vortex/api/VortexThreadPool.java | 5 +- .../reef/vortex/driver/DefaultVortexMaster.java | 16 +++-- .../reef/vortex/driver/VortexConfHelper.java | 3 + .../apache/reef/vortex/driver/VortexMaster.java | 4 +- .../reef/vortex/driver/VortexMasterConf.java | 13 ++++ .../vortex/driver/DefaultVortexMasterTest.java | 65 +++++++++++++++--- .../org/apache/reef/vortex/driver/TestUtil.java | 7 +- .../applications/vortex/VortexTestSuite.java | 4 +- .../vortex/addone/AddOneCallbackTestStart.java | 13 ++-- .../exception/ExceptionCallbackTestStart.java | 72 ++++++++++++++++++++ .../vortex/exception/ExceptionFunction.java | 31 +++++++++ .../vortex/exception/VortexExceptionTest.java | 64 +++++++++++++++++ .../vortex/exception/package-info.java | 23 +++++++ 15 files changed, 360 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/reef/blob/7d5a1416/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/FutureCallback.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/FutureCallback.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/FutureCallback.java new file mode 100644 index 0000000..75df796 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/FutureCallback.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.api; + +/** + * A callback for accepting the results of a {@link VortexFuture} computation asynchronously. + * Only one of either {@link FutureCallback#onSuccess} or {@link FutureCallback#onFailure(Throwable)} + * will be invoked. + * Based on Google Guava's FutureCallback. + */ +public interface FutureCallback<V> { + + /** + * Invoked with the result of the Tasklet computation on success. + * @param result the result. 
+ */ + void onSuccess(final V result); + + /** + * Invoked with the error of the Tasklet computation on failure. + * @param t the error. + */ + void onFailure(final Throwable t); +} http://git-wip-us.apache.org/repos/asf/reef/blob/7d5a1416/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java index 7b84b19..34cf6b6 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java @@ -19,9 +19,8 @@ package org.apache.reef.vortex.api; import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.Private; import org.apache.reef.util.Optional; -import org.apache.reef.wake.EventHandler; -import org.apache.reef.wake.impl.ThreadPoolStage; import java.util.concurrent.*; @@ -35,20 +34,25 @@ public final class VortexFuture<TOutput> implements Future<TOutput> { private Optional<TOutput> userResult = null; private Exception userException; private final CountDownLatch countDownLatch = new CountDownLatch(1); - private final ThreadPoolStage<TOutput> stage; + private final FutureCallback<TOutput> callbackHandler; + private final Executor executor; /** * Creates a {@link VortexFuture}. */ - public VortexFuture() { - stage = null; + @Private + public VortexFuture(final Executor executor) { + this(executor, null); } /** * Creates a {@link VortexFuture} with a callback. 
*/ - public VortexFuture(final EventHandler<TOutput> callbackHandler) { - stage = new ThreadPoolStage<>(callbackHandler, 1); + @Private + public VortexFuture(final Executor executor, + final FutureCallback<TOutput> callbackHandler) { + this.executor = executor; + this.callbackHandler = callbackHandler; } /** @@ -112,8 +116,13 @@ public final class VortexFuture<TOutput> implements Future<TOutput> { */ public void completed(final TOutput result) { this.userResult = Optional.ofNullable(result); - if (stage != null) { - stage.onNext(userResult.get()); + if (callbackHandler != null) { + executor.execute(new Runnable() { + @Override + public void run() { + callbackHandler.onSuccess(userResult.get()); + } + }); } this.countDownLatch.countDown(); } @@ -123,6 +132,14 @@ public final class VortexFuture<TOutput> implements Future<TOutput> { */ public void threwException(final Exception exception) { this.userException = exception; + if (callbackHandler != null) { + executor.execute(new Runnable() { + @Override + public void run() { + callbackHandler.onFailure(exception); + } + }); + } this.countDownLatch.countDown(); } } http://git-wip-us.apache.org/repos/asf/reef/blob/7d5a1416/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java index 221f852..1b02894 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java @@ -21,7 +21,6 @@ package org.apache.reef.vortex.api; import org.apache.reef.annotations.Unstable; import org.apache.reef.util.Optional; import 
org.apache.reef.vortex.driver.VortexMaster; -import org.apache.reef.wake.EventHandler; import javax.inject.Inject; import java.io.Serializable; @@ -47,7 +46,7 @@ public final class VortexThreadPool { */ public <TInput extends Serializable, TOutput extends Serializable> VortexFuture<TOutput> submit(final VortexFunction<TInput, TOutput> function, final TInput input) { - return vortexMaster.enqueueTasklet(function, input, Optional.<EventHandler<TOutput>>empty()); + return vortexMaster.enqueueTasklet(function, input, Optional.<FutureCallback<TOutput>>empty()); } /** @@ -60,7 +59,7 @@ public final class VortexThreadPool { */ public <TInput extends Serializable, TOutput extends Serializable> VortexFuture<TOutput> submit(final VortexFunction<TInput, TOutput> function, final TInput input, - final EventHandler<TOutput> callback) { + final FutureCallback<TOutput> callback) { return vortexMaster.enqueueTasklet(function, input, Optional.of(callback)); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/7d5a1416/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java index a2b6161..a4b5ef0 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java @@ -20,14 +20,17 @@ package org.apache.reef.vortex.driver; import net.jcip.annotations.ThreadSafe; import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.tang.annotations.Parameter; import org.apache.reef.util.Optional; +import 
org.apache.reef.vortex.api.FutureCallback; import org.apache.reef.vortex.api.VortexFunction; import org.apache.reef.vortex.api.VortexFuture; -import org.apache.reef.wake.EventHandler; import javax.inject.Inject; import java.io.Serializable; import java.util.*; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; /** @@ -40,13 +43,16 @@ final class DefaultVortexMaster implements VortexMaster { private final AtomicInteger taskletIdCounter = new AtomicInteger(); private final RunningWorkers runningWorkers; private final PendingTasklets pendingTasklets; + private final Executor executor; /** * @param runningWorkers for managing all running workers. */ @Inject DefaultVortexMaster(final RunningWorkers runningWorkers, - final PendingTasklets pendingTasklets) { + final PendingTasklets pendingTasklets, + @Parameter(VortexMasterConf.CallbackThreadPoolSize.class) final int threadPoolSize) { + this.executor = Executors.newFixedThreadPool(threadPoolSize); this.runningWorkers = runningWorkers; this.pendingTasklets = pendingTasklets; } @@ -57,13 +63,13 @@ final class DefaultVortexMaster implements VortexMaster { @Override public <TInput extends Serializable, TOutput extends Serializable> VortexFuture<TOutput> enqueueTasklet(final VortexFunction<TInput, TOutput> function, final TInput input, - final Optional<EventHandler<TOutput>> callback) { + final Optional<FutureCallback<TOutput>> callback) { // TODO[REEF-500]: Simple duplicate Vortex Tasklet launch. 
final VortexFuture<TOutput> vortexFuture; if (callback.isPresent()) { - vortexFuture = new VortexFuture<>(callback.get()); + vortexFuture = new VortexFuture<>(executor, callback.get()); } else { - vortexFuture = new VortexFuture<>(); + vortexFuture = new VortexFuture<>(executor); } this.pendingTasklets.addLast(new Tasklet<>(taskletIdCounter.getAndIncrement(), function, input, vortexFuture)); http://git-wip-us.apache.org/repos/asf/reef/blob/7d5a1416/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexConfHelper.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexConfHelper.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexConfHelper.java index 7b58f0c..6117e4e 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexConfHelper.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexConfHelper.java @@ -63,6 +63,9 @@ public final class VortexConfHelper { .set(VortexMasterConf.NUM_OF_VORTEX_START_THREAD, DEFAULT_NUM_OF_VORTEX_START_THREAD) // fixed to 1 for now .build(); + // TODO[JIRA REEF-1000]: Consider exposing VortexMasterConf.FUTURE_CALLBACK_THREAD_POOL_SIZE. + // For now, use default value defined in the NamedParameter. 
+ return Configurations.merge(vortexDriverConf, vortexMasterConf); } } http://git-wip-us.apache.org/repos/asf/reef/blob/7d5a1416/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java index c7c3e79..5c7f684 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java @@ -22,9 +22,9 @@ import org.apache.reef.annotations.Unstable; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.tang.annotations.DefaultImplementation; import org.apache.reef.util.Optional; +import org.apache.reef.vortex.api.FutureCallback; import org.apache.reef.vortex.api.VortexFunction; import org.apache.reef.vortex.api.VortexFuture; -import org.apache.reef.wake.EventHandler; import java.io.Serializable; @@ -41,7 +41,7 @@ public interface VortexMaster { */ <TInput extends Serializable, TOutput extends Serializable> VortexFuture<TOutput> enqueueTasklet(final VortexFunction<TInput, TOutput> vortexFunction, final TInput input, - final Optional<EventHandler<TOutput>> callback); + final Optional<FutureCallback<TOutput>> callback); /** * Call this when a new worker is up and running. 
http://git-wip-us.apache.org/repos/asf/reef/blob/7d5a1416/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMasterConf.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMasterConf.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMasterConf.java index 9851fd8..da7c4ad 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMasterConf.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMasterConf.java @@ -67,6 +67,13 @@ public final class VortexMasterConf extends ConfigurationModuleBuilder { } /** + * Size of threadpool for callbacks on {@link org.apache.reef.vortex.api.VortexFuture}. + */ + @NamedParameter(doc = "Size of threadpool for callbacks on VortexFuture.", default_value = "10") + final class CallbackThreadPoolSize implements Name<Integer> { + } + + /** * Number of Workers. */ public static final RequiredParameter<Integer> WORKER_NUM = new RequiredParameter<>(); @@ -97,6 +104,11 @@ public final class VortexMasterConf extends ConfigurationModuleBuilder { public static final RequiredParameter<Integer> NUM_OF_VORTEX_START_THREAD = new RequiredParameter<>(); /** + * Size of threadpool for callbacks on VortexFuture. + */ + public static final OptionalParameter<Integer> FUTURE_CALLBACK_THREAD_POOL_SIZE = new OptionalParameter<>(); + + /** * Vortex Master configuration. 
*/ public static final ConfigurationModule CONF = new VortexMasterConf() @@ -106,5 +118,6 @@ public final class VortexMasterConf extends ConfigurationModuleBuilder { .bindNamedParameter(WorkerCapacity.class, WORKER_CAPACITY) .bindImplementation(VortexStart.class, VORTEX_START) .bindNamedParameter(NumberOfVortexStartThreads.class, NUM_OF_VORTEX_START_THREAD) + .bindNamedParameter(CallbackThreadPoolSize.class, FUTURE_CALLBACK_THREAD_POOL_SIZE) .build(); } http://git-wip-us.apache.org/repos/asf/reef/blob/7d5a1416/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java index ff3ae92..37298fa 100644 --- a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java +++ b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java @@ -19,9 +19,9 @@ package org.apache.reef.vortex.driver; import org.apache.reef.util.Optional; +import org.apache.reef.vortex.api.FutureCallback; import org.apache.reef.vortex.api.VortexFunction; import org.apache.reef.vortex.api.VortexFuture; -import org.apache.reef.wake.EventHandler; import org.junit.Test; import java.util.*; @@ -45,19 +45,25 @@ public class DefaultVortexMasterTest { final VortexWorkerManager vortexWorkerManager1 = testUtil.newWorker(); final RunningWorkers runningWorkers = new RunningWorkers(new RandomSchedulingPolicy()); final PendingTasklets pendingTasklets = new PendingTasklets(); - final DefaultVortexMaster vortexMaster = new DefaultVortexMaster(runningWorkers, pendingTasklets); + final DefaultVortexMaster vortexMaster = new 
DefaultVortexMaster(runningWorkers, pendingTasklets, 5); final AtomicBoolean callbackReceived = new AtomicBoolean(false); final CountDownLatch latch = new CountDownLatch(1); vortexMaster.workerAllocated(vortexWorkerManager1); - final EventHandler<Integer> testCallbackHandler = new EventHandler<Integer>() { + final FutureCallback<Integer> testCallbackHandler = new FutureCallback<Integer>() { @Override - public void onNext(final Integer value) { + public void onSuccess(final Integer integer) { callbackReceived.set(true); latch.countDown(); - }}; + } + + @Override + public void onFailure(final Throwable throwable) { + throw new RuntimeException("Did not expect exception in test.", throwable); + } + }; final VortexFuture future = vortexMaster.enqueueTasklet(vortexFunction, null, Optional.of(testCallbackHandler)); @@ -81,12 +87,12 @@ public class DefaultVortexMasterTest { final VortexWorkerManager vortexWorkerManager2 = testUtil.newWorker(); final RunningWorkers runningWorkers = new RunningWorkers(new RandomSchedulingPolicy()); final PendingTasklets pendingTasklets = new PendingTasklets(); - final DefaultVortexMaster vortexMaster = new DefaultVortexMaster(runningWorkers, pendingTasklets); + final DefaultVortexMaster vortexMaster = new DefaultVortexMaster(runningWorkers, pendingTasklets, 5); // Allocate worker & tasklet and schedule vortexMaster.workerAllocated(vortexWorkerManager1); final VortexFuture future = vortexMaster.enqueueTasklet(vortexFunction, null, - Optional.<EventHandler<Integer>>empty()); + Optional.<FutureCallback<Integer>>empty()); final ArrayList<Integer> taskletIds1 = launchTasklets(runningWorkers, pendingTasklets, 1); // Preemption! 
@@ -114,7 +120,7 @@ public class DefaultVortexMasterTest { final ArrayList<VortexFuture> vortexFutures = new ArrayList<>(); final RunningWorkers runningWorkers = new RunningWorkers(new RandomSchedulingPolicy()); final PendingTasklets pendingTasklets = new PendingTasklets(); - final DefaultVortexMaster vortexMaster = new DefaultVortexMaster(runningWorkers, pendingTasklets); + final DefaultVortexMaster vortexMaster = new DefaultVortexMaster(runningWorkers, pendingTasklets, 5); // Allocate iniital evaluators (will all be preempted later...) final List<VortexWorkerManager> initialWorkers = new ArrayList<>(); @@ -129,7 +135,7 @@ public class DefaultVortexMasterTest { final int numOfTasklets = 100; for (int i = 0; i < numOfTasklets; i++) { vortexFutures.add(vortexMaster.enqueueTasklet(testUtil.newFunction(), null, - Optional.<EventHandler<Integer>>empty())); + Optional.<FutureCallback<Integer>>empty())); } final ArrayList<Integer> taskletIds1 = launchTasklets(runningWorkers, pendingTasklets, numOfTasklets); @@ -157,6 +163,47 @@ public class DefaultVortexMasterTest { } /** + * Test handling of single tasklet execution with a failure. 
+ */ + @Test(timeout = 10000) + public void testTaskletThrowException() throws Exception { + final VortexFunction vortexFunction = testUtil.newIntegerFunction(); + final VortexWorkerManager vortexWorkerManager1 = testUtil.newWorker(); + final RunningWorkers runningWorkers = new RunningWorkers(new RandomSchedulingPolicy()); + final PendingTasklets pendingTasklets = new PendingTasklets(); + final DefaultVortexMaster vortexMaster = new DefaultVortexMaster(runningWorkers, pendingTasklets, 5); + + final AtomicBoolean callbackReceived = new AtomicBoolean(false); + final CountDownLatch latch = new CountDownLatch(1); + + vortexMaster.workerAllocated(vortexWorkerManager1); + + final FutureCallback<Integer> testCallbackHandler = new FutureCallback<Integer>() { + @Override + public void onSuccess(final Integer integer) { + throw new RuntimeException("Did not expect success in test."); + } + + @Override + public void onFailure(final Throwable throwable) { + callbackReceived.set(true); + latch.countDown(); + } + }; + + final VortexFuture future = vortexMaster.enqueueTasklet(vortexFunction, null, Optional.of(testCallbackHandler)); + + final ArrayList<Integer> taskletIds = launchTasklets(runningWorkers, pendingTasklets, 1); + for (final int taskletId : taskletIds) { + vortexMaster.taskletErrored(vortexWorkerManager1.getId(), taskletId, new RuntimeException("Test exception")); + } + + assertTrue("The VortexFuture should be done", future.isDone()); + latch.await(); + assertTrue("Callback should have been received", callbackReceived.get()); + } + + /** * Launch specified number of tasklets as a substitute for PendingTaskletLauncher. 
* @return ids of launched tasklets */ http://git-wip-us.apache.org/repos/asf/reef/blob/7d5a1416/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java index 80ee597..fc333e3 100644 --- a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java +++ b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java @@ -23,6 +23,8 @@ import org.apache.reef.vortex.api.VortexFunction; import org.apache.reef.vortex.api.VortexFuture; import java.io.Serializable; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import static org.mockito.Mockito.mock; @@ -31,9 +33,10 @@ import static org.mockito.Mockito.when; /** * Utility methods for tests. */ -public class TestUtil { +public final class TestUtil { private final AtomicInteger taskletId = new AtomicInteger(0); private final AtomicInteger workerId = new AtomicInteger(0); + private final Executor executor = Executors.newFixedThreadPool(5); /** * @return a new mocked worker. @@ -49,7 +52,7 @@ public class TestUtil { * @return a new dummy tasklet. 
*/ public Tasklet newTasklet() { - return new Tasklet(taskletId.getAndIncrement(), null, null, new VortexFuture()); + return new Tasklet(taskletId.getAndIncrement(), null, null, new VortexFuture(executor)); } /** http://git-wip-us.apache.org/repos/asf/reef/blob/7d5a1416/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/VortexTestSuite.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/VortexTestSuite.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/VortexTestSuite.java index d47f427..f9068cd 100644 --- a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/VortexTestSuite.java +++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/VortexTestSuite.java @@ -19,12 +19,14 @@ package org.apache.reef.tests.applications.vortex; import org.apache.reef.tests.applications.vortex.addone.AddOneTest; +import org.apache.reef.tests.applications.vortex.exception.VortexExceptionTest; import org.junit.runner.RunWith; import org.junit.runners.Suite; @RunWith(Suite.class) @Suite.SuiteClasses({ - AddOneTest.class + AddOneTest.class, + VortexExceptionTest.class }) public final class VortexTestSuite { } http://git-wip-us.apache.org/repos/asf/reef/blob/7d5a1416/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneCallbackTestStart.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneCallbackTestStart.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneCallbackTestStart.java index 3ab4f5f..016e77f 100644 --- a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneCallbackTestStart.java +++ 
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneCallbackTestStart.java @@ -19,10 +19,10 @@ package org.apache.reef.tests.applications.vortex.addone; import io.netty.util.internal.ConcurrentSet; +import org.apache.reef.vortex.api.FutureCallback; import org.apache.reef.vortex.api.VortexFuture; import org.apache.reef.vortex.api.VortexStart; import org.apache.reef.vortex.api.VortexThreadPool; -import org.apache.reef.wake.EventHandler; import org.junit.Assert; import javax.inject.Inject; @@ -54,12 +54,17 @@ public final class AddOneCallbackTestStart implements VortexStart { final AddOneFunction addOneFunction = new AddOneFunction(); for (final int i : inputVector) { - futures.add(vortexThreadPool.submit(addOneFunction, i, new EventHandler<Integer>() { + futures.add(vortexThreadPool.submit(addOneFunction, i, new FutureCallback<Integer>() { @Override - public void onNext(final Integer value) { - outputSet.add(value - 1); + public void onSuccess(final Integer result) { + outputSet.add(result - 1); latch.countDown(); } + + @Override + public void onFailure(final Throwable t) { + throw new RuntimeException("Did not expect exception in test.", t); + } })); } http://git-wip-us.apache.org/repos/asf/reef/blob/7d5a1416/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/ExceptionCallbackTestStart.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/ExceptionCallbackTestStart.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/ExceptionCallbackTestStart.java new file mode 100644 index 0000000..48cc5e0 --- /dev/null +++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/ExceptionCallbackTestStart.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.tests.applications.vortex.exception; + +import org.apache.reef.vortex.api.FutureCallback; +import org.apache.reef.vortex.api.VortexFuture; +import org.apache.reef.vortex.api.VortexStart; +import org.apache.reef.vortex.api.VortexThreadPool; +import org.junit.Assert; + +import javax.inject.Inject; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; + +/** + * Test that an {@link ExecutionException} is thrown on tasklets that throws an Exception, and + * that the callback handler gets invoked on failure. 
+ */ +public final class ExceptionCallbackTestStart implements VortexStart { + + @Inject + private ExceptionCallbackTestStart() { + } + + @Override + public void start(final VortexThreadPool vortexThreadPool) { + final CountDownLatch latch = new CountDownLatch(1); + + final ExceptionFunction exceptionFunction = new ExceptionFunction(); + + final VortexFuture<Integer> future = vortexThreadPool.submit(exceptionFunction, 1, new FutureCallback<Integer>() { + @Override + public void onSuccess(final Integer result) { + throw new RuntimeException("Did not expect success in test."); + } + + @Override + public void onFailure(final Throwable t) { + latch.countDown(); + } + }); + + boolean gotFailure = false; + try { + latch.await(); + future.get(); + } catch (final InterruptedException e) { + e.printStackTrace(); + Assert.fail(); + } catch (final ExecutionException ex) { + gotFailure = true; + } + + Assert.assertTrue(gotFailure); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/7d5a1416/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/ExceptionFunction.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/ExceptionFunction.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/ExceptionFunction.java new file mode 100644 index 0000000..a6a5737 --- /dev/null +++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/ExceptionFunction.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.tests.applications.vortex.exception; + +import org.apache.reef.vortex.api.VortexFunction; + +/** + * A test Vortex function that throws an Exception. + */ +public final class ExceptionFunction implements VortexFunction<Integer, Integer> { + @Override + public Integer call(final Integer integer) throws Exception { + throw new RuntimeException("Expected test exception."); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/7d5a1416/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/VortexExceptionTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/VortexExceptionTest.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/VortexExceptionTest.java new file mode 100644 index 0000000..948fc04 --- /dev/null +++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/VortexExceptionTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.applications.vortex.exception;
+
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tests.TestEnvironment;
+import org.apache.reef.tests.TestEnvironmentFactory;
+import org.apache.reef.vortex.driver.VortexConfHelper;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Integration test(s) covering Vortex tasklets whose function throws an Exception.
+ */
+public final class VortexExceptionTest {
+  private final TestEnvironment testEnvironment = TestEnvironmentFactory.getNewTestEnvironment();
+
+  /**
+   * Prepares the test environment before each test.
+   */
+  @Before
+  public void setUp() throws Exception {
+    testEnvironment.setUp();
+  }
+
+  /**
+   * Releases the test environment after each test.
+   */
+  @After
+  public void tearDown() throws Exception {
+    testEnvironment.tearDown();
+  }
+
+  /**
+   * Runs the Vortex job started by {@link ExceptionCallbackTestStart} and expects the
+   * launcher to report success, i.e. the failure callback was invoked for the failing tasklet.
+   */
+  @Test
+  public void testVortexExceptionCallback() {
+    final Configuration vortexConf = VortexConfHelper.getVortexConf(
+        "TEST_Vortex_ExceptionCallbackTest", ExceptionCallbackTestStart.class, 2, 64, 4, 2000);
+    final LauncherStatus launcherStatus = testEnvironment.run(vortexConf);
+    final String failureMessage = "Job state after execution: " + launcherStatus;
+    Assert.assertTrue(failureMessage, launcherStatus.isSuccess());
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/7d5a1416/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/package-info.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/package-info.java
new file mode 100644
index 0000000..bf800a1
--- /dev/null
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Vortex Exception test.
+ */
+package org.apache.reef.tests.applications.vortex.exception;
\ No newline at end of file
