Repository: reef Updated Branches: refs/heads/master 766883f0c -> fe6194cac
[REEF-502] Support Vortex Tasklet(s) cancellation by user JIRA: [REEF-502](https://issues.apache.org/jira/browse/REEF-502) Pull Request: Closes #708 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/fe6194ca Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/fe6194ca Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/fe6194ca Branch: refs/heads/master Commit: fe6194cacbfa38714a295790fc302d5c86a01726 Parents: 766883f Author: Andrew Chung <[email protected]> Authored: Fri Dec 4 17:38:34 2015 -0800 Committer: Byung-Gon Chun <[email protected]> Committed: Thu Dec 10 18:46:11 2015 +0900 ---------------------------------------------------------------------- .../src/main/avro/VortexRequest.avsc | 18 +++- .../reef-vortex/src/main/avro/WorkerReport.avsc | 22 ++-- .../apache/reef/vortex/api/VortexFuture.java | 101 +++++++++++++++++-- .../common/TaskletCancellationRequest.java | 43 ++++++++ .../vortex/common/TaskletCancelledReport.java | 46 +++++++++ .../vortex/common/TaskletExecutionRequest.java | 1 + .../vortex/common/TaskletFailureReport.java | 1 + .../reef/vortex/common/TaskletResultReport.java | 1 + .../reef/vortex/common/VortexAvroUtils.java | 45 +++++++-- .../reef/vortex/common/VortexRequest.java | 8 +- .../apache/reef/vortex/common/WorkerReport.java | 6 ++ .../reef/vortex/driver/DefaultVortexMaster.java | 24 ++++- .../vortex/driver/FirstFitSchedulingPolicy.java | 6 ++ .../reef/vortex/driver/PendingTasklets.java | 2 +- .../vortex/driver/RandomSchedulingPolicy.java | 8 ++ .../reef/vortex/driver/RunningWorkers.java | 86 ++++++++++++++-- .../reef/vortex/driver/SchedulingPolicy.java | 5 + .../org/apache/reef/vortex/driver/Tasklet.java | 7 ++ .../apache/reef/vortex/driver/VortexDriver.java | 5 + .../apache/reef/vortex/driver/VortexMaster.java | 13 +++ .../reef/vortex/driver/VortexWorkerManager.java | 13 +++ .../reef/vortex/evaluator/VortexWorker.java | 93 +++++++++++------ .../vortex/driver/DefaultVortexMasterTest.java | 52 ++++++++++ .../org/apache/reef/vortex/driver/TestUtil.java | 42 +++++++- .../applications/vortex/VortexTestSuite.java | 4 +- .../InfiniteLoopWithCancellationFunction.java | 44 ++++++++ .../cancellation/TaskletCancellationTest.java | 62 ++++++++++++ .../TaskletCancellationTestStart.java | 72 +++++++++++++ .../vortex/cancellation/package-info.java | 23 +++++ 29 files changed, 780 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc b/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc index 2453761..bf4396e 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc +++ b/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc @@ -30,10 +30,24 @@ { "namespace": "org.apache.reef.vortex.common.avro", "type": "record", + "name": "AvroTaskletCancellationRequest", + "fields": [{"name": "taskletId", "type": "int"}] + }, + { + "namespace": "org.apache.reef.vortex.common.avro", + "type": "record", "name": "AvroVortexRequest", "fields": [ - {"name": "requestType", "type": {"type": "enum", "name": "AvroRequestType", "symbols": ["ExecuteTasklet"]}}, - {"name": "taskletExecutionRequest", "type": ["null", "AvroTaskletExecutionRequest"], "default": null} + { + "name": "requestType", + "type": {"type": "enum", "name": "AvroRequestType", + "symbols": ["ExecuteTasklet", "CancelTasklet"]} + }, + { + "name": "taskletRequest", + "type": ["null", "AvroTaskletExecutionRequest", "AvroTaskletCancellationRequest"], + "default": null + } ] } ] http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc b/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc index 19a655d..11adb12 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc +++ b/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc @@ -29,6 +29,14 @@ { "namespace": "org.apache.reef.vortex.common.avro", "type": "record", + "name": "AvroTaskletCancelledReport", + "fields": [ + {"name": "taskletId", "type": "int"} + ] + }, + { + "namespace": "org.apache.reef.vortex.common.avro", + "type": "record", "name": "AvroTaskletFailureReport", "fields": [ {"name": "taskletId", "type": "int"}, @@ -42,15 +50,15 @@ "fields": [ { "name": "reportType", - "type": {"type": "enum", "name": "AvroReportType", "symbols": ["TaskletResult", "TaskletFailure"]} - }, - { - "name": "taskletResult", - "type": ["null", "AvroTaskletResultReport"], "default": null + "type": { + "type": "enum", + "name": "AvroReportType", + "symbols": ["TaskletResult", "TaskletCancelled", "TaskletFailure"]} }, { - "name": "taskletFailure", - "type": ["null", "AvroTaskletFailureReport"], "default": null + "name": "taskletReport", + "type": ["null", "AvroTaskletResultReport", "AvroTaskletCancelledReport", "AvroTaskletFailureReport"], + "default": null } ] } http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java index 34cf6b6..019eeb0 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java @@ -21,28 +21,37 @@ package org.apache.reef.vortex.api; import org.apache.reef.annotations.Unstable; import org.apache.reef.annotations.audience.Private; import org.apache.reef.util.Optional; +import org.apache.reef.vortex.driver.VortexMaster; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; /** * The interface between user code and submitted task. */ @Unstable public final class VortexFuture<TOutput> implements Future<TOutput> { + private static final Logger LOG = Logger.getLogger(VortexFuture.class.getName()); + // userResult starts out as null. If not null => variable is set and tasklet returned. // Otherwise tasklet has not completed. private Optional<TOutput> userResult = null; private Exception userException; + private AtomicBoolean cancelled = new AtomicBoolean(false); private final CountDownLatch countDownLatch = new CountDownLatch(1); private final FutureCallback<TOutput> callbackHandler; private final Executor executor; + private final VortexMaster vortexMaster; + private final int taskletId; /** * Creates a {@link VortexFuture}. */ @Private - public VortexFuture(final Executor executor) { - this(executor, null); + public VortexFuture(final Executor executor, final VortexMaster vortexMaster, final int taskletId) { + this(executor, vortexMaster, taskletId, null); } /** @@ -50,25 +59,71 @@ public final class VortexFuture<TOutput> implements Future<TOutput> { */ @Private public VortexFuture(final Executor executor, + final VortexMaster vortexMaster, + final int taskletId, final FutureCallback<TOutput> callbackHandler) { this.executor = executor; + this.vortexMaster = vortexMaster; + this.taskletId = taskletId; this.callbackHandler = callbackHandler; } /** - * TODO[REEF-502]: Support Vortex Tasklet(s) cancellation by user. + * Sends a cancel signal and blocks and waits until the task is cancelled, completed, or failed. + * @return true if task did not start or was cancelled, false if task failed or completed */ @Override public boolean cancel(final boolean mayInterruptIfRunning) { - throw new UnsupportedOperationException("Cancel not yet supported"); + try { + return cancel(mayInterruptIfRunning, Optional.<Long>empty(), Optional.<TimeUnit>empty()); + } catch (final TimeoutException e) { + // This should never happen. + LOG.log(Level.WARNING, "Received a TimeoutException in VortexFuture.cancel(). Should not have occurred."); + return false; + } } /** - * TODO[REEF-502]: Support Vortex Tasklet(s) cancellation by user. + * Sends a cancel signal and blocks and waits until the task is cancelled, completed, or failed, or + * if the timeout has expired. + * @return true if task did not start or was cancelled, false if task failed or completed + */ + public boolean cancel(final boolean mayInterruptIfRunning, final long timeout, final TimeUnit unit) + throws TimeoutException { + return cancel(mayInterruptIfRunning, Optional.of(timeout), Optional.of(unit)); + } + + private boolean cancel(final boolean mayInterruptIfRunning, + final Optional<Long> timeout, + final Optional<TimeUnit> unit) throws TimeoutException { + if (isDone()) { + return isCancelled(); + } + + vortexMaster.cancelTasklet(mayInterruptIfRunning, taskletId); + + try { + if (timeout.isPresent() && unit.isPresent()) { + if (!countDownLatch.await(timeout.get(), unit.get())) { + throw new TimeoutException(); + } + } else { + countDownLatch.await(); + } + } catch (InterruptedException e) { + e.printStackTrace(); + return false; + } + + return isCancelled(); + } + + /** + * @return true if the task is cancelled, false if not. */ @Override public boolean isCancelled() { - throw new UnsupportedOperationException("Cancel not yet supported"); + return cancelled.get(); } /** @@ -88,8 +143,12 @@ public final class VortexFuture<TOutput> implements Future<TOutput> { if (userResult != null) { return userResult.get(); } else { - assert userException != null; - throw new ExecutionException(userException); + assert this.cancelled.get() || userException != null; + if (userException != null) { + throw new ExecutionException(userException); + } + + throw new ExecutionException(new InterruptedException("Task was cancelled.")); } } @@ -106,8 +165,12 @@ public final class VortexFuture<TOutput> implements Future<TOutput> { if (userResult != null) { return userResult.get(); } else { - assert userException != null; - throw new ExecutionException(userException); + assert this.cancelled.get() || userException != null; + if (userException != null) { + throw new ExecutionException(userException); + } + + throw new ExecutionException(new InterruptedException("Task was cancelled.")); } } @@ -130,6 +193,7 @@ public final class VortexFuture<TOutput> implements Future<TOutput> { /** * Called by VortexMaster to let the user know that the task threw an exception. */ + @Private public void threwException(final Exception exception) { this.userException = exception; if (callbackHandler != null) { @@ -142,4 +206,21 @@ public final class VortexFuture<TOutput> implements Future<TOutput> { } this.countDownLatch.countDown(); } + + /** + * Called by VortexMaster to let the user know that the task was cancelled. + */ + @Private + public void cancelled() { + this.cancelled.set(true); + if (callbackHandler != null) { + executor.execute(new Runnable() { + @Override + public void run() { + callbackHandler.onFailure(new InterruptedException("VortexFuture has been cancelled on request.")); + } + }); + } + this.countDownLatch.countDown(); + } } http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancellationRequest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancellationRequest.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancellationRequest.java new file mode 100644 index 0000000..8f725a8 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancellationRequest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.common; + +import org.apache.reef.annotations.Unstable; + +/** + * A {@link VortexRequest} to cancel tasklets. + */ +@Unstable +public final class TaskletCancellationRequest implements VortexRequest { + private final int taskletId; + + public TaskletCancellationRequest(final int taskletId) { + this.taskletId = taskletId; + } + + @Override + public int getTaskletId() { + return taskletId; + } + + @Override + public RequestType getType() { + return RequestType.CancelTasklet; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java new file mode 100644 index 0000000..b51219b --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.common; + +import org.apache.reef.annotations.Unstable; + +/** + * The report of a cancelled Tasklet. + */ +@Unstable +public final class TaskletCancelledReport implements WorkerReport { + private int taskletId; + + /** + * @param taskletId of the cancelled tasklet. + */ + public TaskletCancelledReport(final int taskletId) { + this.taskletId = taskletId; + } + + @Override + public WorkerReportType getType() { + return WorkerReportType.TaskletCancelled; + } + + @Override + public int getTaskletId() { + return taskletId; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java index 961b574..8e43e4b 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java @@ -62,6 +62,7 @@ public final class TaskletExecutionRequest<TInput extends Serializable, TOutput /** * Get id of the tasklet. */ + @Override public int getTaskletId() { return taskletId; } http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java index dc847e7..96df2e1 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java @@ -48,6 +48,7 @@ public final class TaskletFailureReport implements WorkerReport { /** * @return the id of the tasklet. */ + @Override public int getTaskletId() { return taskletId; } http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java index 0a6fab3..cd3a597 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java @@ -50,6 +50,7 @@ public final class TaskletResultReport<TOutput extends Serializable> implements /** * @return the id of the tasklet. */ + @Override public int getTaskletId() { return taskletId; } http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java index c3d01de..4ab4cb5 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java @@ -54,7 +54,7 @@ public final class VortexAvroUtils { final byte[] serializedFunction = SerializationUtils.serialize(taskletExecutionRequest.getFunction()); avroVortexRequest = AvroVortexRequest.newBuilder() .setRequestType(AvroRequestType.ExecuteTasklet) - .setTaskletExecutionRequest( + .setTaskletRequest( AvroTaskletExecutionRequest.newBuilder() .setTaskletId(taskletExecutionRequest.getTaskletId()) .setSerializedInput(ByteBuffer.wrap(serializedInput)) @@ -62,6 +62,16 @@ public final class VortexAvroUtils { .build()) .build(); break; + case CancelTasklet: + final TaskletCancellationRequest taskletCancellationRequest = (TaskletCancellationRequest) vortexRequest; + avroVortexRequest = AvroVortexRequest.newBuilder() + .setRequestType(AvroRequestType.CancelTasklet) + .setTaskletRequest( + AvroTaskletCancellationRequest.newBuilder() + .setTaskletId(taskletCancellationRequest.getTaskletId()) + .build()) + .build(); + break; default: throw new RuntimeException("Undefined message type"); } @@ -85,19 +95,29 @@ public final class VortexAvroUtils { final byte[] serializedOutput = SerializationUtils.serialize(taskletResultReport.getResult()); avroWorkerReport = AvroWorkerReport.newBuilder() .setReportType(AvroReportType.TaskletResult) - .setTaskletResult( + .setTaskletReport( AvroTaskletResultReport.newBuilder() .setTaskletId(taskletResultReport.getTaskletId()) .setSerializedOutput(ByteBuffer.wrap(serializedOutput)) .build()) .build(); break; + case TaskletCancelled: + final TaskletCancelledReport taskletCancelledReport = (TaskletCancelledReport) workerReport; + avroWorkerReport = AvroWorkerReport.newBuilder() + .setReportType(AvroReportType.TaskletCancelled) + .setTaskletReport( + AvroTaskletCancelledReport.newBuilder() + .setTaskletId(workerReport.getTaskletId()) + .build()) + .build(); + break; case TaskletFailure: final TaskletFailureReport taskletFailureReport = (TaskletFailureReport) workerReport; final byte[] serializedException = SerializationUtils.serialize(taskletFailureReport.getException()); avroWorkerReport = AvroWorkerReport.newBuilder() .setReportType(AvroReportType.TaskletFailure) - .setTaskletFailure( + .setTaskletReport( AvroTaskletFailureReport.newBuilder() .setTaskletId(taskletFailureReport.getTaskletId()) .setSerializedException(ByteBuffer.wrap(serializedException)) @@ -123,7 +143,8 @@ public final class VortexAvroUtils { final VortexRequest vortexRequest; switch (avroVortexRequest.getRequestType()) { case ExecuteTasklet: - final AvroTaskletExecutionRequest taskletExecutionRequest = avroVortexRequest.getTaskletExecutionRequest(); + final AvroTaskletExecutionRequest taskletExecutionRequest = + (AvroTaskletExecutionRequest)avroVortexRequest.getTaskletRequest(); // TODO[REEF-1003]: Use reflection instead of serialization when launching VortexFunction final VortexFunction function = (VortexFunction) SerializationUtils.deserialize( @@ -134,6 +155,11 @@ public final class VortexAvroUtils { taskletExecutionRequest.getSerializedInput().array()); vortexRequest = new TaskletExecutionRequest(taskletExecutionRequest.getTaskletId(), function, input); break; + case CancelTasklet: + final AvroTaskletCancellationRequest taskletCancellationRequest = + (AvroTaskletCancellationRequest)avroVortexRequest.getTaskletRequest(); + vortexRequest = new TaskletCancellationRequest(taskletCancellationRequest.getTaskletId()); + break; default: throw new RuntimeException("Undefined VortexRequest type"); } @@ -150,14 +176,21 @@ public final class VortexAvroUtils { final AvroWorkerReport avroWorkerReport = toAvroObject(bytes, AvroWorkerReport.class); switch (avroWorkerReport.getReportType()) { case TaskletResult: - final AvroTaskletResultReport taskletResultReport = avroWorkerReport.getTaskletResult(); + final AvroTaskletResultReport taskletResultReport = + (AvroTaskletResultReport)avroWorkerReport.getTaskletReport(); // TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex. final Serializable output = (Serializable) SerializationUtils.deserialize(taskletResultReport.getSerializedOutput().array()); workerReport = new TaskletResultReport<>(taskletResultReport.getTaskletId(), output); break; + case TaskletCancelled: + final AvroTaskletCancelledReport taskletCancelledReport = + (AvroTaskletCancelledReport)avroWorkerReport.getTaskletReport(); + workerReport = new TaskletCancelledReport(taskletCancelledReport.getTaskletId()); + break; case TaskletFailure: - final AvroTaskletFailureReport taskletFailureReport = avroWorkerReport.getTaskletFailure(); + final AvroTaskletFailureReport taskletFailureReport = + (AvroTaskletFailureReport)avroWorkerReport.getTaskletReport(); final Exception exception = (Exception) SerializationUtils.deserialize(taskletFailureReport.getSerializedException().array()); workerReport = new TaskletFailureReport(taskletFailureReport.getTaskletId(), exception); http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java index 51061c2..5d59a96 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java @@ -31,10 +31,16 @@ public interface VortexRequest extends Serializable { * Type of Request. */ enum RequestType { - ExecuteTasklet + ExecuteTasklet, + CancelTasklet } /** + * @return the ID of the VortexTasklet associated with this VortexRequest. + */ + int getTaskletId(); + + /** * @return the type of this VortexRequest. */ RequestType getType(); http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java index 7e320cb..192299a 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java @@ -32,6 +32,7 @@ public interface WorkerReport extends Serializable { */ enum WorkerReportType { TaskletResult, + TaskletCancelled, TaskletFailure } @@ -39,4 +40,9 @@ public interface WorkerReport extends Serializable { * @return the type of this WorkerReport. */ WorkerReportType getType(); + + /** + * @return the taskletId of this WorkerReport. + */ + int getTaskletId(); } http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java index a4b5ef0..cb62049 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java @@ -66,17 +66,27 @@ final class DefaultVortexMaster implements VortexMaster { final Optional<FutureCallback<TOutput>> callback) { // TODO[REEF-500]: Simple duplicate Vortex Tasklet launch. final VortexFuture<TOutput> vortexFuture; + final int id = taskletIdCounter.getAndIncrement(); if (callback.isPresent()) { - vortexFuture = new VortexFuture<>(executor, callback.get()); + vortexFuture = new VortexFuture<>(executor, this, id, callback.get()); } else { - vortexFuture = new VortexFuture<>(executor); + vortexFuture = new VortexFuture<>(executor, this, id); } - this.pendingTasklets.addLast(new Tasklet<>(taskletIdCounter.getAndIncrement(), function, input, vortexFuture)); + final Tasklet tasklet = new Tasklet<>(id, function, input, vortexFuture); + this.pendingTasklets.addLast(tasklet); return vortexFuture; } /** + * Cancels tasklets on the running workers. + */ + @Override + public void cancelTasklet(final boolean mayInterruptIfRunning, final int taskletId) { + this.runningWorkers.cancelTasklet(mayInterruptIfRunning, taskletId); + } + + /** * Add a new worker to runningWorkers. */ @Override @@ -116,6 +126,14 @@ final class DefaultVortexMaster implements VortexMaster { } /** + * Notify tasklet cancellation to runningWorkers. + */ + @Override + public void taskletCancelled(final String workerId, final int taskletId) { + runningWorkers.taskletCancelled(workerId, taskletId); + } + + /** * Terminate the job. */ @Override http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/FirstFitSchedulingPolicy.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/FirstFitSchedulingPolicy.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/FirstFitSchedulingPolicy.java index 6dfd8bb..8b28eab 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/FirstFitSchedulingPolicy.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/FirstFitSchedulingPolicy.java @@ -143,6 +143,12 @@ class FirstFitSchedulingPolicy implements SchedulingPolicy { removeTasklet(workerId); } + @Override + public void taskletCancelled(final VortexWorkerManager vortexWorker, final Tasklet tasklet) { + final String workerId = vortexWorker.getId(); + removeTasklet(workerId); + } + private void removeTasklet(final String workerId) { if (idLoadMap.containsKey(workerId)) { idLoadMap.put(workerId, Math.max(0, idLoadMap.get(workerId) - 1)); http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTasklets.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTasklets.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTasklets.java index eac4fc6..9b3e318 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTasklets.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTasklets.java @@ -48,4 +48,4 @@ final class PendingTasklets { Tasklet takeFirst() throws InterruptedException { return pendingTasklets.takeFirst(); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RandomSchedulingPolicy.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RandomSchedulingPolicy.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RandomSchedulingPolicy.java index 2f71cd7..ffa2fbf 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RandomSchedulingPolicy.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RandomSchedulingPolicy.java @@ -102,4 +102,12 @@ class RandomSchedulingPolicy implements SchedulingPolicy { // Do nothing } + /** + * Do nothing. + */ + @Override + public void taskletCancelled(final VortexWorkerManager vortexWorker, final Tasklet tasklet) { + // Do nothing + } + } http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java index eebac5f..7b05c0c 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java @@ -30,6 +30,8 @@ import java.util.*; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.logging.Level; +import java.util.logging.Logger; /** * Keeps track of all running VortexWorkers and Tasklets. @@ -38,8 +40,12 @@ import java.util.concurrent.locks.ReentrantLock; @ThreadSafe @DriverSide final class RunningWorkers { + private static final Logger LOG = Logger.getLogger(RunningWorkers.class.getName()); + // RunningWorkers and its locks private final HashMap<String, VortexWorkerManager> runningWorkers = new HashMap<>(); // Running workers/tasklets + private final Set<Integer> taskletsToCancel = new HashSet<>(); + private final Lock lock = new ReentrantLock(); private final Condition noWorkerOrResource = lock.newCondition(); @@ -132,6 +138,14 @@ final class RunningWorkers { } } + // TODO[JIRA REEF-500]: Will need to support duplicate tasklets. + if (taskletsToCancel.contains(tasklet.getId())) { + tasklet.cancelled(); + taskletsToCancel.remove(tasklet.getId()); + LOG.log(Level.FINE, "Cancelled tasklet {0}.", tasklet.getId()); + return; + } + final VortexWorkerManager vortexWorkerManager = runningWorkers.get(workerId.get()); vortexWorkerManager.launchTasklet(tasklet); schedulingPolicy.taskletLaunched(vortexWorkerManager, tasklet); @@ -143,6 +157,30 @@ final class RunningWorkers { /** * Concurrency: Called by multiple threads. + * Parameter: Same taskletId can come in multiple times. + */ + void cancelTasklet(final boolean mayInterruptIfRunning, final int taskletId) { + lock.lock(); + try { + // This is not ideal since we are using a linear time search on all the workers. + final String workerId = getWhereTaskletWasScheduledTo(taskletId); + if (workerId == null) { + // launchTasklet called but not yet running. + taskletsToCancel.add(taskletId); + return; + } + + if (mayInterruptIfRunning) { + LOG.log(Level.FINE, "Cancelling running Tasklet with ID {0}.", taskletId); + runningWorkers.get(workerId).cancelTasklet(taskletId); + } + } finally { + lock.unlock(); + } + } + + /** + * Concurrency: Called by multiple threads. * Parameter: Same arguments can come in multiple times. * (e.g. preemption message coming before tasklet completion message multiple times) */ @@ -158,6 +196,8 @@ final class RunningWorkers { // Notify (possibly) waiting scheduler noWorkerOrResource.signal(); + + taskletsToCancel.remove(taskletId); // cleanup to prevent memory leak. } } finally { lock.unlock(); @@ -181,6 +221,32 @@ final class RunningWorkers { // Notify (possibly) waiting scheduler noWorkerOrResource.signal(); + + taskletsToCancel.remove(taskletId); // cleanup to prevent memory leak. + } + } finally { + lock.unlock(); + } + } + + /** + * Concurrency: Called by multiple threads. + * Parameter: Same arguments can come in multiple times. + * (e.g. preemption message coming before tasklet error message multiple times) + */ + void taskletCancelled(final String workerId, + final int taskletId) { + lock.lock(); + try { + if (!terminated && runningWorkers.containsKey(workerId)) { // Preemption can come before + final VortexWorkerManager worker = this.runningWorkers.get(workerId); + final Tasklet tasklet = worker.taskletCancelled(taskletId); + this.schedulingPolicy.taskletCancelled(worker, tasklet); + + // Notify (possibly) waiting scheduler + noWorkerOrResource.signal(); + + taskletsToCancel.remove(taskletId); // cleanup to prevent memory leak. } } finally { lock.unlock(); @@ -209,17 +275,8 @@ final class RunningWorkers { return terminated; } - ///////////////////////////////////////// For Tests Only - - /** - * For unit tests to check whether the worker is running. - */ - boolean isWorkerRunning(final String workerId) { - return runningWorkers.containsKey(workerId); - } - /** - * For unit tests to see where a tasklet is scheduled to. + * Find where a tasklet is scheduled to. * @param taskletId id of the tasklet in question * @return id of the worker (null if the tasklet was not scheduled to any worker) */ @@ -233,4 +290,13 @@ final class RunningWorkers { } return null; } + + ///////////////////////////////////////// For Tests Only + + /** + * For unit tests to check whether the worker is running. + */ + boolean isWorkerRunning(final String workerId) { + return runningWorkers.containsKey(workerId); + } } http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/SchedulingPolicy.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/SchedulingPolicy.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/SchedulingPolicy.java index cef1cb6..a058f6f 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/SchedulingPolicy.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/SchedulingPolicy.java @@ -57,4 +57,9 @@ interface SchedulingPolicy { * Tasklet failed. */ void taskletFailed(final VortexWorkerManager vortexWorker, final Tasklet tasklet); + + /** + * Tasklet cancelled. + */ + void taskletCancelled(final VortexWorkerManager vortexWorker, final Tasklet tasklet); } http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java index 6cd6c44..b0138d0 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java @@ -80,6 +80,13 @@ class Tasklet<TInput extends Serializable, TOutput extends Serializable> { } /** + * Called by VortexMaster to let the user know that the task has been cancelled. + */ + void cancelled(){ + vortexFuture.cancelled(); + } + + /** * For tests. */ boolean isCompleted() { http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java index 352cdbd..75674cc 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java @@ -28,6 +28,7 @@ import org.apache.reef.tang.Configurations; import org.apache.reef.tang.annotations.Parameter; import org.apache.reef.tang.annotations.Unit; import org.apache.reef.vortex.api.VortexStart; +import org.apache.reef.vortex.common.TaskletCancelledReport; import org.apache.reef.vortex.common.TaskletFailureReport; import org.apache.reef.vortex.common.TaskletResultReport; import org.apache.reef.vortex.common.VortexAvroUtils; @@ -160,6 +161,10 @@ final class VortexDriver { final TaskletResultReport taskletResultReport = (TaskletResultReport) workerReport; vortexMaster.taskletCompleted(workerId, taskletResultReport.getTaskletId(), taskletResultReport.getResult()); break; + case TaskletCancelled: + final TaskletCancelledReport taskletCancelledReport = (TaskletCancelledReport)workerReport; + vortexMaster.taskletCancelled(workerId, taskletCancelledReport.getTaskletId()); + break; case TaskletFailure: final TaskletFailureReport taskletFailureReport = (TaskletFailureReport) workerReport; vortexMaster.taskletErrored(workerId, taskletFailureReport.getTaskletId(), http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java index 5c7f684..38bbcc5 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java @@ -44,6 +44,14 @@ public interface VortexMaster { final Optional<FutureCallback<TOutput>> callback); /** + * Call this when a Tasklet is to be cancelled. + * @param mayInterruptIfRunning if true, will attempt to cancel running Tasklets; otherwise will only + * prevent a pending Tasklet from running. + * @param taskletId the ID of the Tasklet. + */ + void cancelTasklet(final boolean mayInterruptIfRunning, final int taskletId); + + /** * Call this when a new worker is up and running. */ void workerAllocated(final VortexWorkerManager vortexWorkerManager); @@ -64,6 +72,11 @@ public interface VortexMaster { void taskletErrored(final String workerId, final int taskletId, final Exception exception); /** + * Call this when a Tasklet is cancelled and the cancellation is honored. + */ + void taskletCancelled(final String workerId, final int taskletId); + + /** * Release all resources and shut down. */ void terminate(); http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java index f019668..76d6cec 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java @@ -21,6 +21,7 @@ package org.apache.reef.vortex.driver; import net.jcip.annotations.NotThreadSafe; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.driver.task.RunningTask; +import org.apache.reef.vortex.common.TaskletCancellationRequest; import org.apache.reef.vortex.common.TaskletExecutionRequest; import java.io.Serializable; @@ -51,6 +52,11 @@ class VortexWorkerManager { vortexRequestor.send(reefTask, taskletExecutionRequest); } + void cancelTasklet(final int taskletId) { + final TaskletCancellationRequest cancellationRequest = new TaskletCancellationRequest(taskletId); + vortexRequestor.send(reefTask, cancellationRequest); + } + <TOutput extends Serializable> Tasklet taskletCompleted(final Integer taskletId, final TOutput result) { final Tasklet<?, TOutput> tasklet = runningTasklets.remove(taskletId); assert tasklet != null; // Tasklet should complete/error only once @@ -65,6 +71,13 @@ class VortexWorkerManager { return tasklet; } + Tasklet taskletCancelled(final Integer taskletId) { + final Tasklet tasklet = runningTasklets.remove(taskletId); + assert tasklet != null; // Tasklet should finish only once. + tasklet.cancelled(); + return tasklet; + } + Collection<Tasklet> removed() { return runningTasklets.isEmpty() ? null : runningTasklets.values(); } http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java index e65830c..10d23a8 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java @@ -36,6 +36,8 @@ import org.apache.reef.wake.EventHandler; import javax.inject.Inject; import java.io.Serializable; import java.util.concurrent.*; +import java.util.logging.Level; +import java.util.logging.Logger; /** * Receives commands from VortexMaster, executes them, and returns the results. @@ -45,6 +47,7 @@ import java.util.concurrent.*; @Unit @TaskSide public final class VortexWorker implements Task, TaskMessageSource { + private static final Logger LOG = Logger.getLogger(VortexWorker.class.getName()); private static final String MESSAGE_SOURCE_ID = ""; // empty string as there is no use for it private final BlockingDeque<byte[]> pendingRequests = new LinkedBlockingDeque<>(); @@ -68,6 +71,7 @@ public final class VortexWorker implements Task, TaskMessageSource { public byte[] call(final byte[] memento) throws Exception { final ExecutorService schedulerThread = Executors.newSingleThreadExecutor(); final ExecutorService commandExecutor = Executors.newFixedThreadPool(numOfThreads); + final ConcurrentMap<Integer, Future> futures = new ConcurrentHashMap<>(); // Scheduling thread starts schedulerThread.execute(new Runnable() { @@ -82,37 +86,66 @@ public final class VortexWorker implements Task, TaskMessageSource { throw new RuntimeException(e); } - // Scheduler Thread: Pass the command to the worker thread pool to be executed - commandExecutor.execute(new Runnable() { - @Override - public void run() { - // Command Executor: Deserialize the command - final VortexRequest vortexRequest = VortexAvroUtils.toVortexRequest(message); - switch (vortexRequest.getType()) { - case ExecuteTasklet: - final TaskletExecutionRequest taskletExecutionRequest = (TaskletExecutionRequest) vortexRequest; - try { - // Command Executor: Execute the command - final Serializable result = taskletExecutionRequest.execute(); - - // Command Executor: Tasklet successfully returns result - final WorkerReport report = - new TaskletResultReport<>(taskletExecutionRequest.getTaskletId(), result); - workerReports.addLast(VortexAvroUtils.toBytes(report)); - } catch (Exception e) { - // Command Executor: Tasklet throws an exception - final WorkerReport report = - new TaskletFailureReport(taskletExecutionRequest.getTaskletId(), e); - workerReports.addLast(VortexAvroUtils.toBytes(report)); - } - - heartBeatTriggerManager.triggerHeartBeat(); - break; - default: - throw new RuntimeException("Unknown Command"); + // Command Executor: Deserialize the command + final VortexRequest vortexRequest = VortexAvroUtils.toVortexRequest(message); + + switch (vortexRequest.getType()) { + case ExecuteTasklet: + final CountDownLatch latch = new CountDownLatch(1); + + // Scheduler Thread: Pass the command to the worker thread pool to be executed + // Record future to support cancellation. + futures.put( + vortexRequest.getTaskletId(), + commandExecutor.submit(new Runnable() { + @Override + public void run() { + final TaskletExecutionRequest taskletExecutionRequest = (TaskletExecutionRequest) vortexRequest; + try { + // Command Executor: Execute the command + final Serializable result = taskletExecutionRequest.execute(); + final WorkerReport report = + new TaskletResultReport<>(taskletExecutionRequest.getTaskletId(), result); + workerReports.addLast(VortexAvroUtils.toBytes(report)); + } catch (final InterruptedException ex) { + // Assumes that user's thread follows convention that cancelled Futures + // should throw InterruptedException. + final WorkerReport report = new TaskletCancelledReport(taskletExecutionRequest.getTaskletId()); + LOG.log(Level.WARNING, "Tasklet with ID {0} has been cancelled", vortexRequest.getTaskletId()); + workerReports.addLast(VortexAvroUtils.toBytes(report)); + } catch (Exception e) { + // Command Executor: Tasklet throws an exception + final WorkerReport report = + new TaskletFailureReport(taskletExecutionRequest.getTaskletId(), e); + workerReports.addLast(VortexAvroUtils.toBytes(report)); + } + + try { + latch.await(); + } catch (final InterruptedException e) { + LOG.log(Level.SEVERE, "Cannot wait for Future to be put."); + throw new RuntimeException(e); + } + + futures.remove(vortexRequest.getTaskletId()); + heartBeatTriggerManager.triggerHeartBeat(); + } + })); + + // Signal that future is put. + latch.countDown(); + break; + case CancelTasklet: + LOG.log(Level.FINE, "Cancelling Tasklet with ID {0}.", vortexRequest.getTaskletId()); + final Future future = futures.get(vortexRequest.getTaskletId()); + if (future != null) { + future.cancel(true); } - } - }); + + break; + default: + throw new RuntimeException("Unknown Command"); + } } } }); http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java index 37298fa..6c2162a 100644 --- a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java +++ b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java @@ -26,6 +26,8 @@ import org.junit.Test; import java.util.*; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import static org.junit.Assert.*; @@ -204,6 +206,56 @@ public class DefaultVortexMasterTest { } /** + * Test handling of single tasklet execution with a cancellation after launch. + */ + @Test(timeout = 10000) + public void testSingleTaskletCancellation() throws Exception { + final RunningWorkers runningWorkers = new RunningWorkers(new RandomSchedulingPolicy()); + final PendingTasklets pendingTasklets = new PendingTasklets(); + final VortexFuture future = createTaskletCancellationFuture(runningWorkers, pendingTasklets); + launchTasklets(runningWorkers, pendingTasklets, 1); + + assertTrue(future.cancel(true)); + assertTrue("The VortexFuture should be cancelled.", future.isCancelled()); + assertTrue("The VortexFuture should be done", future.isDone()); + } + + /** + * Test handling of single tasklet execution with a cancellation before launch. + */ + @Test(timeout = 10000) + public void testSingleTaskletCancellationBeforeLaunch() throws Exception { + + final RunningWorkers runningWorkers = new RunningWorkers(new RandomSchedulingPolicy()); + final PendingTasklets pendingTasklets = new PendingTasklets(); + final VortexFuture future = createTaskletCancellationFuture(runningWorkers, pendingTasklets); + + try { + future.cancel(true, 100, TimeUnit.MILLISECONDS); + fail(); + } catch (final TimeoutException e) { + // TimeoutException is expected. + } + + launchTasklets(runningWorkers, pendingTasklets, 1); + assertTrue(future.cancel(true)); + assertTrue("The VortexFuture should be cancelled.", future.isCancelled()); + assertTrue("The VortexFuture should be done", future.isDone()); + } + + private VortexFuture createTaskletCancellationFuture(final RunningWorkers runningWorkers, + final PendingTasklets pendingTasklets) { + final VortexFunction vortexFunction = testUtil.newInfiniteLoopFunction(); + final VortexWorkerManager vortexWorkerManager1 = testUtil.newWorker(); + + final DefaultVortexMaster vortexMaster = new DefaultVortexMaster(runningWorkers, pendingTasklets, 5); + + // Allocate worker & tasklet and schedule + vortexMaster.workerAllocated(vortexWorkerManager1); + return vortexMaster.enqueueTasklet(vortexFunction, null, Optional.<FutureCallback<Integer>>empty()); + } + + /** * Launch specified number of tasklets as a substitute for PendingTaskletLauncher. * @return ids of launched tasklets */ http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java index fc333e3..8704b0b 100644 --- a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java +++ b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java @@ -21,12 +21,18 @@ package org.apache.reef.vortex.driver; import org.apache.reef.driver.task.RunningTask; import org.apache.reef.vortex.api.VortexFunction; import org.apache.reef.vortex.api.VortexFuture; +import org.apache.reef.vortex.common.TaskletCancellationRequest; +import org.apache.reef.vortex.common.VortexRequest; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.io.Serializable; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -37,6 +43,7 @@ public final class TestUtil { private final AtomicInteger taskletId = new AtomicInteger(0); private final AtomicInteger workerId = new AtomicInteger(0); private final Executor executor = Executors.newFixedThreadPool(5); + private final VortexMaster vortexMaster = mock(VortexMaster.class); /** * @return a new mocked worker. @@ -45,14 +52,28 @@ public final class TestUtil { final RunningTask reefTask = mock(RunningTask.class); when(reefTask.getId()).thenReturn("worker" + String.valueOf(workerId.getAndIncrement())); final VortexRequestor vortexRequestor = mock(VortexRequestor.class); - return new VortexWorkerManager(vortexRequestor, reefTask); + final VortexWorkerManager workerManager = new VortexWorkerManager(vortexRequestor, reefTask); + doAnswer(new Answer() { + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable { + final VortexRequest request = (VortexRequest)invocation.getArguments()[1]; + if (request instanceof TaskletCancellationRequest) { + workerManager.taskletCancelled(request.getTaskletId()); + } + + return null; + } + }).when(vortexRequestor).send(any(RunningTask.class), any(VortexRequest.class)); + + return workerManager; } /** * @return a new dummy tasklet. */ public Tasklet newTasklet() { - return new Tasklet(taskletId.getAndIncrement(), null, null, new VortexFuture(executor)); + final int id = taskletId.getAndIncrement(); + return new Tasklet(id, null, null, new VortexFuture(executor, vortexMaster, id)); } /** @@ -68,6 +89,23 @@ public final class TestUtil { } /** + * @return a new dummy function. + */ + public VortexFunction newInfiniteLoopFunction() { + return new VortexFunction() { + @Override + public Serializable call(final Serializable serializable) throws Exception { + while(true) { + Thread.sleep(100); + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException(); + } + } + } + }; + } + + /** * @return a dummy integer-integer function. */ public VortexFunction<Integer, Integer> newIntegerFunction() { http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/VortexTestSuite.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/VortexTestSuite.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/VortexTestSuite.java index f9068cd..fca17f9 100644 --- a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/VortexTestSuite.java +++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/VortexTestSuite.java @@ -20,13 +20,15 @@ package org.apache.reef.tests.applications.vortex; import org.apache.reef.tests.applications.vortex.addone.AddOneTest; import org.apache.reef.tests.applications.vortex.exception.VortexExceptionTest; +import org.apache.reef.tests.applications.vortex.cancellation.TaskletCancellationTest; import org.junit.runner.RunWith; import org.junit.runners.Suite; @RunWith(Suite.class) @Suite.SuiteClasses({ AddOneTest.class, - VortexExceptionTest.class + VortexExceptionTest.class, + TaskletCancellationTest.class }) public final class VortexTestSuite { } http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/InfiniteLoopWithCancellationFunction.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/InfiniteLoopWithCancellationFunction.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/InfiniteLoopWithCancellationFunction.java new file mode 100644 index 0000000..f1c982e --- /dev/null +++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/InfiniteLoopWithCancellationFunction.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.tests.applications.vortex.cancellation; + +import org.apache.reef.vortex.api.VortexFunction; + +import java.io.Serializable; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Runs an infinite loop and waits for cancellation. + */ +public final class InfiniteLoopWithCancellationFunction implements VortexFunction { + private static final Logger LOG = Logger.getLogger(InfiniteLoopWithCancellationFunction.class.getName()); + + @Override + public Serializable call(final Serializable serializable) throws Exception { + LOG.log(Level.FINE, "Entered Infinite Loop Tasklet."); + while (true) { + Thread.sleep(100); + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException("Expected exception!"); + } + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTest.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTest.java new file mode 100644 index 0000000..d8ea09c --- /dev/null +++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.tests.applications.vortex.cancellation; + +import org.apache.reef.client.LauncherStatus; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tests.TestEnvironment; +import org.apache.reef.tests.TestEnvironmentFactory; +import org.apache.reef.vortex.driver.VortexConfHelper; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Tests the cancellation of a tasklet. + */ +public final class TaskletCancellationTest { + private final TestEnvironment testEnvironment = TestEnvironmentFactory.getNewTestEnvironment(); + + /** + * Set up the test environment. + */ + @Before + public void setUp() throws Exception { + this.testEnvironment.setUp(); + } + + /** + * Tear down the test environment. + */ + @After + public void tearDown() throws Exception { + this.testEnvironment.tearDown(); + } + + @Test + public void testVortexTaskletCancellation() { + final Configuration conf = + VortexConfHelper.getVortexConf( + "TEST_Vortex_TaskletCancellationTest", TaskletCancellationTestStart.class, 2, 64, 4, 2000); + final LauncherStatus status = this.testEnvironment.run(conf); + Assert.assertTrue("Job state after execution: " + status, status.isSuccess()); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTestStart.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTestStart.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTestStart.java new file mode 100644 index 0000000..d60e0b5 --- /dev/null +++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTestStart.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.tests.applications.vortex.cancellation; + +import org.apache.reef.vortex.api.VortexFuture; +import org.apache.reef.vortex.api.VortexStart; +import org.apache.reef.vortex.api.VortexThreadPool; +import org.junit.Assert; + +import javax.inject.Inject; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Tests the cancellation of a tasklet. + */ +public final class TaskletCancellationTestStart implements VortexStart { + + @Inject + private TaskletCancellationTestStart() { + } + + @Override + public void start(final VortexThreadPool vortexThreadPool) { + final InfiniteLoopWithCancellationFunction function = new InfiniteLoopWithCancellationFunction(); + final VortexFuture future = vortexThreadPool.submit(function, 0); + + try { + // Hacky way to increase probability that the task has been launched. + // TODO[JIRA REEF-1051]: Query the VortexMaster for the Tasklet status. + future.get(10, TimeUnit.SECONDS); + } catch (final TimeoutException e) { + // Harmless. + } catch (final Exception e) { + e.printStackTrace(); + throw new RuntimeException("Unexpected exception."); + } + + Assert.assertTrue(future.cancel(true)); + + try { + future.get(); + Assert.fail(); + } catch (final ExecutionException e) { + // Expected. + } catch (InterruptedException e) { + e.printStackTrace(); + Assert.fail(); + } + + Assert.assertTrue(future.isCancelled()); + Assert.assertTrue(future.isDone()); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/fe6194ca/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/package-info.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/package-info.java new file mode 100644 index 0000000..8bd04c7 --- /dev/null +++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Package for testing Vortex Tasklet cancellation. + */ +package org.apache.reef.tests.applications.vortex.cancellation; \ No newline at end of file
