Repository: reef Updated Branches: refs/heads/master f10bbf462 -> d6b316925
[REEF-1075] Create Tasklet abstraction to enable one-to-many mapping of "Future-like" object to TaskletIds This addressed the issue by: * Adding VortexFutureDelegate to map a single result/failure to multiple tasklets. * Moving the logic of calling back to the Future into VortexMaster. JIRA: [REEF-1075](https://issues.apache.org/jira/browse/REEF-1075) Pull request: This closes #738 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/d6b31692 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/d6b31692 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/d6b31692 Branch: refs/heads/master Commit: d6b31692589addfa8dccf7e70892b8eebabc8093 Parents: f10bbf4 Author: Andrew Chung <[email protected]> Authored: Wed Dec 16 11:27:33 2015 -0800 Committer: Yunseong Lee <[email protected]> Committed: Tue Dec 22 14:54:52 2015 +0800 ---------------------------------------------------------------------- .../reef-vortex/src/main/avro/WorkerReport.avsc | 34 ++++++- .../apache/reef/vortex/api/VortexFuture.java | 46 +++++++-- .../common/TaskletAggregationFailureReport.java | 65 ++++++++++++ .../common/TaskletAggregationResultReport.java | 71 +++++++++++++ .../vortex/common/TaskletFailureReport.java | 28 +++--- .../reef/vortex/common/TaskletReport.java | 4 +- .../reef/vortex/common/TaskletResultReport.java | 27 +++-- .../reef/vortex/common/VortexAvroUtils.java | 61 ++++++++++- .../vortex/common/VortexFutureDelegate.java | 61 +++++++++++ .../reef/vortex/driver/DefaultVortexMaster.java | 100 +++++++++++++++---- .../vortex/driver/FirstFitSchedulingPolicy.java | 26 +---- .../vortex/driver/RandomSchedulingPolicy.java | 19 +--- .../reef/vortex/driver/RunningWorkers.java | 60 +---------- .../reef/vortex/driver/SchedulingPolicy.java | 16 +-- .../org/apache/reef/vortex/driver/Tasklet.java | 35 ++----- .../apache/reef/vortex/driver/VortexDriver.java | 30 +----- .../apache/reef/vortex/driver/VortexMaster.java | 15 +-- 
.../reef/vortex/driver/VortexWorkerManager.java | 27 ++--- .../reef/vortex/evaluator/VortexWorker.java | 7 +- .../vortex/driver/DefaultVortexMasterTest.java | 26 +++-- .../reef/vortex/driver/RunningWorkersTest.java | 14 ++- .../org/apache/reef/vortex/driver/TestUtil.java | 70 ++++++++++++- 22 files changed, 567 insertions(+), 275 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc b/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc index 24110fe..5d7395d 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc +++ b/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc @@ -22,6 +22,15 @@ "type": "record", "name": "AvroTaskletResultReport", "fields": [ + {"name": "taskletId", "type": "int"}, + {"name": "serializedOutput", "type": "bytes"} + ] + }, + { + "namespace": "org.apache.reef.vortex.common.avro", + "type": "record", + "name": "AvroTaskletAggregationResultReport", + "fields": [ { "name": "taskletIds", "type": @@ -46,6 +55,15 @@ "type": "record", "name": "AvroTaskletFailureReport", "fields": [ + {"name": "taskletId", "type": "int"}, + {"name": "serializedException", "type": "bytes"} + ] + }, + { + "namespace": "org.apache.reef.vortex.common.avro", + "type": "record", + "name": "AvroTaskletAggregationFailureReport", + "fields": [ { "name": "taskletIds", "type": @@ -68,12 +86,24 @@ { "type": "enum", "name": "AvroReportType", - "symbols": ["TaskletResult", "TaskletCancelled", "TaskletFailure"] + "symbols": [ + "TaskletResult", + "TaskletAggregationResult", + "TaskletCancelled", + "TaskletFailure", + "TaskletAggregationFailure" + ] } }, { "name": "taskletReport", - "type": 
["AvroTaskletResultReport", "AvroTaskletCancelledReport", "AvroTaskletFailureReport"] + "type": [ + "AvroTaskletResultReport", + "AvroTaskletAggregationResultReport", + "AvroTaskletCancelledReport", + "AvroTaskletFailureReport", + "AvroTaskletAggregationFailureReport" + ] } ] }, http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java index 7f4fbb7..5e7f9a2 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java @@ -21,8 +21,11 @@ package org.apache.reef.vortex.api; import org.apache.reef.annotations.Unstable; import org.apache.reef.annotations.audience.Private; import org.apache.reef.util.Optional; +import org.apache.reef.vortex.common.VortexFutureDelegate; import org.apache.reef.vortex.driver.VortexMaster; +import java.io.Serializable; +import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; @@ -32,7 +35,8 @@ import java.util.logging.Logger; * The interface between user code and submitted task. */ @Unstable -public final class VortexFuture<TOutput> implements Future<TOutput> { +public final class VortexFuture<TOutput extends Serializable> + implements Future<TOutput>, VortexFutureDelegate<TOutput> { private static final Logger LOG = Logger.getLogger(VortexFuture.class.getName()); // userResult starts out as null. If not null => variable is set and tasklet returned. 
@@ -173,9 +177,13 @@ public final class VortexFuture<TOutput> implements Future<TOutput> { } /** - * Called by VortexMaster to let the user know that the task completed. + * Called by VortexMaster to let the user know that the Tasklet completed. */ - public void completed(final TOutput result) { + @Private + @Override + public void completed(final int pTaskletId, final TOutput result) { + assert taskletId == pTaskletId; + this.userResult = Optional.ofNullable(result); if (callbackHandler != null) { executor.execute(new Runnable() { @@ -189,10 +197,22 @@ public final class VortexFuture<TOutput> implements Future<TOutput> { } /** - * Called by VortexMaster to let the user know that the task threw an exception. + * VortexMaster should never call this. + */ + @Private + @Override + public void aggregationCompleted(final List<Integer> taskletIds, final TOutput result) { + throw new RuntimeException("Functions not associated with AggregationFunctions cannot be aggregated."); + } + + /** + * Called by VortexMaster to let the user know that the Tasklet threw an exception. */ @Private - public void threwException(final Exception exception) { + @Override + public void threwException(final int pTaskletId, final Exception exception) { + assert taskletId == pTaskletId; + this.userException = exception; if (callbackHandler != null) { executor.execute(new Runnable() { @@ -206,10 +226,22 @@ public final class VortexFuture<TOutput> implements Future<TOutput> { } /** - * Called by VortexMaster to let the user know that the task was cancelled. + * VortexMaster should never call this. */ @Private - public void cancelled() { + @Override + public void aggregationThrewException(final List<Integer> taskletIds, final Exception exception) { + throw new RuntimeException("Functions not associated with AggregationFunctions cannot be aggregated"); + } + + /** + * Called by VortexMaster to let the user know that the Tasklet was cancelled. 
+ */ + @Private + @Override + public void cancelled(final int pTaskletId) { + assert taskletId == pTaskletId; + this.cancelled.set(true); if (callbackHandler != null) { executor.execute(new Runnable() { http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationFailureReport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationFailureReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationFailureReport.java new file mode 100644 index 0000000..e6a4c82 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationFailureReport.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.common; + +import org.apache.reef.annotations.Unstable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Report of a tasklet exception on aggregation. 
+ */ +@Unstable +public final class TaskletAggregationFailureReport implements TaskletReport { + private final List<Integer> taskletIds; + private final Exception exception; + + /** + * @param taskletIds of the failed tasklet(s). + * @param exception that caused the tasklet failure. + */ + public TaskletAggregationFailureReport(final List<Integer> taskletIds, final Exception exception) { + this.taskletIds = Collections.unmodifiableList(new ArrayList<>(taskletIds)); + this.exception = exception; + } + + /** + * @return the type of this TaskletReport. + */ + @Override + public TaskletReportType getType() { + return TaskletReportType.TaskletAggregationFailure; + } + + /** + * @return the taskletIds that failed on aggregation. + */ + public List<Integer> getTaskletIds() { + return taskletIds; + } + + /** + * @return the exception that caused the tasklet aggregation failure. + */ + public Exception getException() { + return exception; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationResultReport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationResultReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationResultReport.java new file mode 100644 index 0000000..ce4a015 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationResultReport.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.common; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Report of a Tasklet aggregation execution result. + */ +@Private +@DriverSide +@Unstable +public final class TaskletAggregationResultReport<TOutput extends Serializable> implements TaskletReport { + private final List<Integer> taskletIds; + private final TOutput result; + + /** + * @param taskletIds of the tasklets. + * @param result of the tasklet execution. + */ + public TaskletAggregationResultReport(final List<Integer> taskletIds, final TOutput result) { + this.taskletIds = Collections.unmodifiableList(new ArrayList<>(taskletIds)); + this.result = result; + } + + /** + * @return the type of this TaskletReport. + */ + @Override + public TaskletReportType getType() { + return TaskletReportType.TaskletAggregationResult; + } + + /** + * @return the TaskletId(s) of this TaskletReport + */ + public List<Integer> getTaskletIds() { + return taskletIds; + } + + /** + * @return the result of the Tasklet aggregation execution. 
+ */ + public TOutput getResult() { + return result; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java index 487a3d2..5c0b2de 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java @@ -19,25 +19,25 @@ package org.apache.reef.vortex.common; import org.apache.reef.annotations.Unstable; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; /** - * Report of a tasklet exception. + * Report of a Tasklet exception. */ @Unstable +@Private +@DriverSide public final class TaskletFailureReport implements TaskletReport { - private final List<Integer> taskletIds; + private final int taskletId; private final Exception exception; /** - * @param taskletIds of the failed tasklet(s). + * @param taskletId of the failed Tasklet. * @param exception that caused the tasklet failure. 
*/ - public TaskletFailureReport(final List<Integer> taskletIds, final Exception exception) { - this.taskletIds = Collections.unmodifiableList(new ArrayList<>(taskletIds)); + public TaskletFailureReport(final int taskletId, final Exception exception) { + this.taskletId = taskletId; this.exception = exception; } @@ -50,16 +50,14 @@ public final class TaskletFailureReport implements TaskletReport { } /** - * Returns multiple TaskletIds if an aggregation of Tasklets fail. - * Returns a single TaskletId if a Tasklet fails. - * @return the taskletId(s) of this TaskletReport. + * @return the taskletId of this TaskletReport. */ - public List<Integer> getTaskletIds() { - return taskletIds; + public int getTaskletId() { + return taskletId; } /** - * @return the exception that caused the tasklet failure. + * @return the exception that caused the Tasklet failure. */ public Exception getException() { return exception; http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java index 7e083eb..6392b23 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java @@ -36,8 +36,10 @@ public interface TaskletReport extends Serializable { */ enum TaskletReportType { TaskletResult, + TaskletAggregationResult, TaskletCancelled, - TaskletFailure + TaskletFailure, + TaskletAggregationFailure } /** 
http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java index 08e8d06..8e3bac3 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java @@ -19,26 +19,27 @@ package org.apache.reef.vortex.common; import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; /** * Report of a tasklet execution result. */ @Unstable +@Private +@DriverSide public final class TaskletResultReport<TOutput extends Serializable> implements TaskletReport { - private final List<Integer> taskletIds; + private final int taskletId; private final TOutput result; /** - * @param taskletIds of the tasklets. - * @param result of the tasklet execution. + * @param taskletId of the Tasklet. + * @param result of the Tasklet execution. */ - public TaskletResultReport(final List<Integer> taskletIds, final TOutput result) { - this.taskletIds = Collections.unmodifiableList(new ArrayList<>(taskletIds)); + public TaskletResultReport(final int taskletId, final TOutput result) { + this.taskletId = taskletId; this.result = result; } @@ -51,16 +52,14 @@ public final class TaskletResultReport<TOutput extends Serializable> implements } /** - * Returns multiple TaskletIds if the result is from an Aggregation. 
- * Returns a single TaskletId if the result is from a single Tasklet. - * @return the TaskletId(s) of this TaskletReport + * @return the TaskletId of this TaskletReport */ - public List<Integer> getTaskletIds() { - return taskletIds; + public int getTaskletId() { + return taskletId; } /** - * @return the result of the tasklet execution. + * @return the result of the Tasklet execution. */ public TOutput getResult() { return result; http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java index e4f747d..cc3cced 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java @@ -22,6 +22,9 @@ import org.apache.avro.io.*; import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificDatumWriter; import org.apache.commons.lang.SerializationUtils; +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; import org.apache.reef.vortex.api.VortexFunction; import org.apache.reef.vortex.common.avro.*; @@ -35,6 +38,9 @@ import java.util.List; /** * Serialize and deserialize Vortex message to/from byte array. */ +@Private +@DriverSide +@Unstable public final class VortexAvroUtils { /** * Serialize VortexRequest to byte array. 
@@ -101,11 +107,26 @@ public final class VortexAvroUtils { .setReportType(AvroReportType.TaskletResult) .setTaskletReport( AvroTaskletResultReport.newBuilder() - .setTaskletIds(taskletResultReport.getTaskletIds()) + .setTaskletId(taskletResultReport.getTaskletId()) .setSerializedOutput(ByteBuffer.wrap(serializedOutput)) .build()) .build(); break; + case TaskletAggregationResult: + final TaskletAggregationResultReport taskletAggregationResultReport = + (TaskletAggregationResultReport) taskletReport; + // TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex. + final byte[] serializedAggregationOutput = + SerializationUtils.serialize(taskletAggregationResultReport.getResult()); + avroTaskletReport = AvroTaskletReport.newBuilder() + .setReportType(AvroReportType.TaskletAggregationResult) + .setTaskletReport( + AvroTaskletAggregationResultReport.newBuilder() + .setTaskletIds(taskletAggregationResultReport.getTaskletIds()) + .setSerializedOutput(ByteBuffer.wrap(serializedAggregationOutput)) + .build()) + .build(); + break; case TaskletCancelled: final TaskletCancelledReport taskletCancelledReport = (TaskletCancelledReport) taskletReport; avroTaskletReport = AvroTaskletReport.newBuilder() @@ -123,11 +144,25 @@ public final class VortexAvroUtils { .setReportType(AvroReportType.TaskletFailure) .setTaskletReport( AvroTaskletFailureReport.newBuilder() - .setTaskletIds(taskletFailureReport.getTaskletIds()) + .setTaskletId(taskletFailureReport.getTaskletId()) .setSerializedException(ByteBuffer.wrap(serializedException)) .build()) .build(); break; + case TaskletAggregationFailure: + final TaskletAggregationFailureReport taskletAggregationFailureReport = + (TaskletAggregationFailureReport) taskletReport; + final byte[] serializedAggregationException = + SerializationUtils.serialize(taskletAggregationFailureReport.getException()); + avroTaskletReport = AvroTaskletReport.newBuilder() + .setReportType(AvroReportType.TaskletAggregationFailure) + .setTaskletReport( + 
AvroTaskletAggregationFailureReport.newBuilder() + .setTaskletIds(taskletAggregationFailureReport.getTaskletIds()) + .setSerializedException(ByteBuffer.wrap(serializedAggregationException)) + .build()) + .build(); + break; default: throw new RuntimeException("Undefined message type"); } @@ -197,7 +232,16 @@ public final class VortexAvroUtils { // TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex. final Serializable output = (Serializable) SerializationUtils.deserialize(taskletResultReport.getSerializedOutput().array()); - taskletReport = new TaskletResultReport<>(taskletResultReport.getTaskletIds(), output); + taskletReport = new TaskletResultReport<>(taskletResultReport.getTaskletId(), output); + break; + case TaskletAggregationResult: + final AvroTaskletAggregationResultReport taskletAggregationResultReport = + (AvroTaskletAggregationResultReport)avroTaskletReport.getTaskletReport(); + final Serializable aggregationOutput = + (Serializable) SerializationUtils.deserialize( + taskletAggregationResultReport.getSerializedOutput().array()); + taskletReport = + new TaskletAggregationResultReport<>(taskletAggregationResultReport.getTaskletIds(), aggregationOutput); break; case TaskletCancelled: final AvroTaskletCancelledReport taskletCancelledReport = @@ -209,7 +253,16 @@ public final class VortexAvroUtils { (AvroTaskletFailureReport)avroTaskletReport.getTaskletReport(); final Exception exception = (Exception) SerializationUtils.deserialize(taskletFailureReport.getSerializedException().array()); - taskletReport = new TaskletFailureReport(taskletFailureReport.getTaskletIds(), exception); + taskletReport = new TaskletFailureReport(taskletFailureReport.getTaskletId(), exception); + break; + case TaskletAggregationFailure: + final AvroTaskletAggregationFailureReport taskletAggregationFailureReport = + (AvroTaskletAggregationFailureReport)avroTaskletReport.getTaskletReport(); + final Exception aggregationException = + (Exception) 
SerializationUtils.deserialize( + taskletAggregationFailureReport.getSerializedException().array()); + taskletReport = + new TaskletAggregationFailureReport(taskletAggregationFailureReport.getTaskletIds(), aggregationException); break; default: throw new RuntimeException("Undefined TaskletReport type"); http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexFutureDelegate.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexFutureDelegate.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexFutureDelegate.java new file mode 100644 index 0000000..55f3cf5 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexFutureDelegate.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.vortex.common; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; + +import java.io.Serializable; +import java.util.List; + +/** + * Exposes functions to be called by the {@link org.apache.reef.vortex.driver.VortexMaster} + * to note that a list of Tasklets associated with a Future has completed. + */ +@Unstable +@DriverSide +@Private +public interface VortexFutureDelegate<TOutput extends Serializable> { + + /** + * A Tasklet associated with the future has completed with a result. + */ + void completed(final int taskletId, final TOutput result); + + /** + * The list of aggregated Tasklets associated with the Future that have completed with a result. + */ + void aggregationCompleted(final List<Integer> taskletIds, final TOutput result); + + /** + * A Tasklet associated with the Future has thrown an Exception. + */ + void threwException(final int taskletId, final Exception exception); + + /** + * The list of Tasklets associated with the Future that have thrown an Exception. + */ + void aggregationThrewException(final List<Integer> taskletIds, final Exception exception); + + /** + * A Tasklet associated with the Future has been cancelled. 
+ */ + void cancelled(final int taskletId); +} http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java index cb62049..80c3cb0 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java @@ -25,6 +25,7 @@ import org.apache.reef.util.Optional; import org.apache.reef.vortex.api.FutureCallback; import org.apache.reef.vortex.api.VortexFunction; import org.apache.reef.vortex.api.VortexFuture; +import org.apache.reef.vortex.common.*; import javax.inject.Inject; import java.io.Serializable; @@ -40,6 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger; @ThreadSafe @DriverSide final class DefaultVortexMaster implements VortexMaster { + private final Map<Integer, VortexFutureDelegate> taskletFutureMap = new HashMap<>(); private final AtomicInteger taskletIdCounter = new AtomicInteger(); private final RunningWorkers runningWorkers; private final PendingTasklets pendingTasklets; @@ -74,7 +76,9 @@ final class DefaultVortexMaster implements VortexMaster { } final Tasklet tasklet = new Tasklet<>(id, function, input, vortexFuture); + putDelegate(Collections.singletonList(tasklet), vortexFuture); this.pendingTasklets.addLast(tasklet); + return vortexFuture; } @@ -107,37 +111,97 @@ final class DefaultVortexMaster implements VortexMaster { } } - /** - * Notify task completion to runningWorkers. 
- */ @Override - public void taskletCompleted(final String workerId, - final int taskletId, - final Serializable result) { - runningWorkers.completeTasklet(workerId, taskletId, result); + public void workerReported(final String workerId, final WorkerReport workerReport) { + for (final TaskletReport taskletReport : workerReport.getTaskletReports()) { + switch (taskletReport.getType()) { + case TaskletResult: + final TaskletResultReport taskletResultReport = (TaskletResultReport) taskletReport; + + final int resultTaskletId = taskletResultReport.getTaskletId(); + final List<Integer> singletonResultTaskletId = Collections.singletonList(resultTaskletId); + runningWorkers.doneTasklets(workerId, singletonResultTaskletId); + fetchDelegate(singletonResultTaskletId).completed(resultTaskletId, taskletResultReport.getResult()); + + break; + case TaskletAggregationResult: + final TaskletAggregationResultReport taskletAggregationResultReport = + (TaskletAggregationResultReport) taskletReport; + + final List<Integer> aggregatedTaskletIds = taskletAggregationResultReport.getTaskletIds(); + runningWorkers.doneTasklets(workerId, aggregatedTaskletIds); + fetchDelegate(aggregatedTaskletIds).aggregationCompleted( + aggregatedTaskletIds, taskletAggregationResultReport.getResult()); + + break; + case TaskletCancelled: + final TaskletCancelledReport taskletCancelledReport = (TaskletCancelledReport) taskletReport; + final List<Integer> cancelledIdToList = Collections.singletonList(taskletCancelledReport.getTaskletId()); + runningWorkers.doneTasklets(workerId, cancelledIdToList); + fetchDelegate(cancelledIdToList).cancelled(taskletCancelledReport.getTaskletId()); + + break; + case TaskletFailure: + final TaskletFailureReport taskletFailureReport = (TaskletFailureReport) taskletReport; + + final int failureTaskletId = taskletFailureReport.getTaskletId(); + final List<Integer> singletonFailedTaskletId = Collections.singletonList(failureTaskletId); + runningWorkers.doneTasklets(workerId, 
singletonFailedTaskletId); + fetchDelegate(singletonFailedTaskletId).threwException(failureTaskletId, taskletFailureReport.getException()); + + break; + case TaskletAggregationFailure: + final TaskletAggregationFailureReport taskletAggregationFailureReport = + (TaskletAggregationFailureReport) taskletReport; + + final List<Integer> aggregationFailedTaskletIds = taskletAggregationFailureReport.getTaskletIds(); + runningWorkers.doneTasklets(workerId, aggregationFailedTaskletIds); + fetchDelegate(aggregationFailedTaskletIds).aggregationThrewException(aggregationFailedTaskletIds, + taskletAggregationFailureReport.getException()); + break; + default: + throw new RuntimeException("Unknown Report"); + } + } } /** - * Notify task failure to runningWorkers. + * Terminate the job. */ @Override - public void taskletErrored(final String workerId, final int taskletId, final Exception exception) { - runningWorkers.errorTasklet(workerId, taskletId, exception); + public void terminate() { + runningWorkers.terminate(); } /** - * Notify tasklet cancellation to runningWorkers. + * Puts a delegate to associate with a Tasklet. */ - @Override - public void taskletCancelled(final String workerId, final int taskletId) { - runningWorkers.taskletCancelled(workerId, taskletId); + private synchronized void putDelegate(final List<Tasklet> tasklets, final VortexFutureDelegate delegate) { + for (final Tasklet tasklet : tasklets) { + taskletFutureMap.put(tasklet.getId(), delegate); + } } /** - * Terminate the job. + * Fetches a delegate that maps to the list of Tasklets. */ - @Override - public void terminate() { - runningWorkers.terminate(); + private synchronized VortexFutureDelegate fetchDelegate(final List<Integer> taskletIds) { + VortexFutureDelegate delegate = null; + for (final int taskletId : taskletIds) { + final VortexFutureDelegate currDelegate = taskletFutureMap.remove(taskletId); + if (currDelegate == null) { + // TODO[JIRA REEF-500]: Consider duplicate tasklets. 
+ throw new RuntimeException("Tasklet should only be removed once."); + } + + if (delegate == null) { + delegate = currDelegate; + } else { + assert delegate == currDelegate; + } + } + + return delegate; } + } http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/FirstFitSchedulingPolicy.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/FirstFitSchedulingPolicy.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/FirstFitSchedulingPolicy.java index 8b28eab..489751f 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/FirstFitSchedulingPolicy.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/FirstFitSchedulingPolicy.java @@ -125,33 +125,17 @@ class FirstFitSchedulingPolicy implements SchedulingPolicy { /** * @param vortexWorker that the tasklet completed in - * @param tasklet completed + * @param tasklets completed */ @Override - public void taskletCompleted(final VortexWorkerManager vortexWorker, final Tasklet tasklet) { + public void taskletsDone(final VortexWorkerManager vortexWorker, final List<Tasklet> tasklets) { final String workerId = vortexWorker.getId(); - removeTasklet(workerId); + removeTasklet(workerId, tasklets); } - /** - * @param vortexWorker that the tasklet failed in - * @param tasklet failed - */ - @Override - public void taskletFailed(final VortexWorkerManager vortexWorker, final Tasklet tasklet) { - final String workerId = vortexWorker.getId(); - removeTasklet(workerId); - } - - @Override - public void taskletCancelled(final VortexWorkerManager vortexWorker, final Tasklet tasklet) { - final String workerId = vortexWorker.getId(); - removeTasklet(workerId); - } - - private void removeTasklet(final String workerId) 
{ + private void removeTasklet(final String workerId, final List<Tasklet> tasklets) { if (idLoadMap.containsKey(workerId)) { - idLoadMap.put(workerId, Math.max(0, idLoadMap.get(workerId) - 1)); + idLoadMap.put(workerId, Math.max(0, idLoadMap.get(workerId) - tasklets.size())); } } } http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RandomSchedulingPolicy.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RandomSchedulingPolicy.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RandomSchedulingPolicy.java index ffa2fbf..76dc5ca 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RandomSchedulingPolicy.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RandomSchedulingPolicy.java @@ -90,24 +90,7 @@ class RandomSchedulingPolicy implements SchedulingPolicy { * Do nothing. */ @Override - public void taskletCompleted(final VortexWorkerManager vortexWorker, final Tasklet tasklet) { + public void taskletsDone(final VortexWorkerManager vortexWorker, final List<Tasklet> tasklets) { // Do nothing } - - /** - * Do nothing. - */ - @Override - public void taskletFailed(final VortexWorkerManager vortexWorker, final Tasklet tasklet) { - // Do nothing - } - - /** - * Do nothing. 
- */ - @Override - public void taskletCancelled(final VortexWorkerManager vortexWorker, final Tasklet tasklet) { - // Do nothing - } - } http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java index 7b05c0c..a1fb96f 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java @@ -25,7 +25,6 @@ import org.apache.reef.util.Optional; import javax.inject.Inject; -import java.io.Serializable; import java.util.*; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @@ -184,69 +183,18 @@ final class RunningWorkers { * Parameter: Same arguments can come in multiple times. * (e.g. 
preemption message coming before tasklet completion message multiple times) */ - void completeTasklet(final String workerId, - final int taskletId, - final Serializable result) { + void doneTasklets(final String workerId, final List<Integer> taskletIds) { lock.lock(); try { if (!terminated && runningWorkers.containsKey(workerId)) { // Preemption can come before final VortexWorkerManager worker = this.runningWorkers.get(workerId); - final Tasklet tasklet = worker.taskletCompleted(taskletId, result); - this.schedulingPolicy.taskletCompleted(worker, tasklet); + final List<Tasklet> tasklets = worker.taskletsDone(taskletIds); + this.schedulingPolicy.taskletsDone(worker, tasklets); - // Notify (possibly) waiting scheduler - noWorkerOrResource.signal(); - - taskletsToCancel.remove(taskletId); // cleanup to prevent memory leak. - } - } finally { - lock.unlock(); - } - } - - /** - * Concurrency: Called by multiple threads. - * Parameter: Same arguments can come in multiple times. - * (e.g. preemption message coming before tasklet error message multiple times) - */ - void errorTasklet(final String workerId, - final int taskletId, - final Exception exception) { - lock.lock(); - try { - if (!terminated && runningWorkers.containsKey(workerId)) { // Preemption can come before - final VortexWorkerManager worker = this.runningWorkers.get(workerId); - final Tasklet tasklet = worker.taskletThrewException(taskletId, exception); - this.schedulingPolicy.taskletFailed(worker, tasklet); - - // Notify (possibly) waiting scheduler - noWorkerOrResource.signal(); - - taskletsToCancel.remove(taskletId); // cleanup to prevent memory leak. - } - } finally { - lock.unlock(); - } - } - - /** - * Concurrency: Called by multiple threads. - * Parameter: Same arguments can come in multiple times. - * (e.g. 
preemption message coming before tasklet error message multiple times) - */ - void taskletCancelled(final String workerId, - final int taskletId) { - lock.lock(); - try { - if (!terminated && runningWorkers.containsKey(workerId)) { // Preemption can come before - final VortexWorkerManager worker = this.runningWorkers.get(workerId); - final Tasklet tasklet = worker.taskletCancelled(taskletId); - this.schedulingPolicy.taskletCancelled(worker, tasklet); + taskletsToCancel.removeAll(taskletIds); // cleanup to prevent memory leak. // Notify (possibly) waiting scheduler noWorkerOrResource.signal(); - - taskletsToCancel.remove(taskletId); // cleanup to prevent memory leak. } } finally { lock.unlock(); http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/SchedulingPolicy.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/SchedulingPolicy.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/SchedulingPolicy.java index a058f6f..bccc0ea 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/SchedulingPolicy.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/SchedulingPolicy.java @@ -21,6 +21,8 @@ package org.apache.reef.vortex.driver; import org.apache.reef.tang.annotations.DefaultImplementation; import org.apache.reef.util.Optional; +import java.util.List; + /** * For choosing which worker to schedule the tasklet onto. */ @@ -49,17 +51,7 @@ interface SchedulingPolicy { void taskletLaunched(final VortexWorkerManager vortexWorker, final Tasklet tasklet); /** - * Tasklet completed. - */ - void taskletCompleted(final VortexWorkerManager vortexWorker, final Tasklet tasklet); - - /** - * Tasklet failed. 
- */ - void taskletFailed(final VortexWorkerManager vortexWorker, final Tasklet tasklet); - - /** - * Tasklet cancelled. + * Tasklets completed. */ - void taskletCancelled(final VortexWorkerManager vortexWorker, final Tasklet tasklet); + void taskletsDone(final VortexWorkerManager vortexWorker, final List<Tasklet> tasklets); } http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java index b0138d0..24db3cb 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java @@ -20,7 +20,7 @@ package org.apache.reef.vortex.driver; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.vortex.api.VortexFunction; -import org.apache.reef.vortex.api.VortexFuture; +import org.apache.reef.vortex.common.VortexFutureDelegate; import java.io.Serializable; @@ -32,16 +32,16 @@ class Tasklet<TInput extends Serializable, TOutput extends Serializable> { private final int taskletId; private final VortexFunction<TInput, TOutput> userTask; private final TInput input; - private final VortexFuture<TOutput> vortexFuture; + private final VortexFutureDelegate delegate; Tasklet(final int taskletId, final VortexFunction<TInput, TOutput> userTask, final TInput input, - final VortexFuture<TOutput> vortexFuture) { + final VortexFutureDelegate delegate) { this.taskletId = taskletId; this.userTask = userTask; this.input = input; - this.vortexFuture = vortexFuture; + this.delegate = delegate; } /** @@ -66,31 +66,10 @@ class Tasklet<TInput extends Serializable, 
TOutput extends Serializable> { } /** - * Called by VortexMaster to let the user know that the task completed. + * Called by {@link RunningWorkers} to cancel the Tasklet before launch. */ - void completed(final TOutput result) { - vortexFuture.completed(result); - } - - /** - * Called by VortexMaster to let the user know that the task threw an exception. - */ - void threwException(final Exception exception) { - vortexFuture.threwException(exception); - } - - /** - * Called by VortexMaster to let the user know that the task has been cancelled. - */ - void cancelled(){ - vortexFuture.cancelled(); - } - - /** - * For tests. - */ - boolean isCompleted() { - return vortexFuture.isDone(); + void cancelled() { + delegate.cancelled(taskletId); } /** http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java index 993b32f..f581f99 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java @@ -37,7 +37,6 @@ import org.apache.reef.wake.impl.ThreadPoolStage; import org.apache.reef.wake.time.event.StartTime; import javax.inject.Inject; -import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; @@ -153,34 +152,7 @@ final class VortexDriver { public void onNext(final TaskMessage taskMessage) { final String workerId = taskMessage.getId(); final WorkerReport workerReport = VortexAvroUtils.toWorkerReport(taskMessage.get()); - - // TODO[JIRA REEF-942]: Fix when 
aggregation is allowed. - assert workerReport.getTaskletReports().size() == 1; - - final TaskletReport taskletReport = workerReport.getTaskletReports().get(0); - switch (taskletReport.getType()) { - case TaskletResult: - final TaskletResultReport taskletResultReport = (TaskletResultReport) taskletReport; - - // TODO[JIRA REEF-942]: Fix when aggregation is allowed. - final List<Integer> resultTaskletIds = taskletResultReport.getTaskletIds(); - - assert resultTaskletIds.size() == 1; - vortexMaster.taskletCompleted(workerId, resultTaskletIds.get(0), - taskletResultReport.getResult()); - break; - case TaskletCancelled: - final TaskletCancelledReport taskletCancelledReport = (TaskletCancelledReport) taskletReport; - vortexMaster.taskletCancelled(workerId, taskletCancelledReport.getTaskletId()); - break; - case TaskletFailure: - final TaskletFailureReport taskletFailureReport = (TaskletFailureReport) taskletReport; - vortexMaster.taskletErrored(workerId, taskletFailureReport.getTaskletIds().get(0), - taskletFailureReport.getException()); - break; - default: - throw new RuntimeException("Unknown Report"); - } + vortexMaster.workerReported(workerId, workerReport); } } http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java index 38bbcc5..becf3f9 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java @@ -25,6 +25,7 @@ import org.apache.reef.util.Optional; import org.apache.reef.vortex.api.FutureCallback; import 
org.apache.reef.vortex.api.VortexFunction; import org.apache.reef.vortex.api.VortexFuture; +import org.apache.reef.vortex.common.WorkerReport; import java.io.Serializable; @@ -62,19 +63,9 @@ public interface VortexMaster { void workerPreempted(final String id); /** - * Call this when a Tasklet is completed. + * Call this when a worker has reported back. */ - void taskletCompleted(final String workerId, final int taskletId, final Serializable result); - - /** - * Call this when a Tasklet errored. - */ - void taskletErrored(final String workerId, final int taskletId, final Exception exception); - - /** - * Call this when a Tasklet is cancelled and the cancellation is honored. - */ - void taskletCancelled(final String workerId, final int taskletId); + void workerReported(final String workerId, final WorkerReport workerReport); /** * Release all resources and shut down. http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java index 76d6cec..ffba985 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java @@ -25,8 +25,7 @@ import org.apache.reef.vortex.common.TaskletCancellationRequest; import org.apache.reef.vortex.common.TaskletExecutionRequest; import java.io.Serializable; -import java.util.Collection; -import java.util.HashMap; +import java.util.*; /** * Representation of a VortexWorkerManager in Driver. 
@@ -57,25 +56,13 @@ class VortexWorkerManager { vortexRequestor.send(reefTask, cancellationRequest); } - <TOutput extends Serializable> Tasklet taskletCompleted(final Integer taskletId, final TOutput result) { - final Tasklet<?, TOutput> tasklet = runningTasklets.remove(taskletId); - assert tasklet != null; // Tasklet should complete/error only once - tasklet.completed(result); - return tasklet; - } - - Tasklet taskletThrewException(final Integer taskletId, final Exception exception) { - final Tasklet tasklet = runningTasklets.remove(taskletId); - assert tasklet != null; // Tasklet should complete/error only once - tasklet.threwException(exception); - return tasklet; - } + List<Tasklet> taskletsDone(final List<Integer> taskletIds) { + final List<Tasklet> taskletList = new ArrayList<>(); + for (final int taskletId : taskletIds) { + taskletList.add(runningTasklets.remove(taskletId)); + } - Tasklet taskletCancelled(final Integer taskletId) { - final Tasklet tasklet = runningTasklets.remove(taskletId); - assert tasklet != null; // Tasklet should finish only once. 
- tasklet.cancelled(); - return tasklet; + return Collections.unmodifiableList(taskletList); } Collection<Tasklet> removed() { http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java index 26d4287..920f1a9 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java @@ -36,7 +36,6 @@ import org.apache.reef.wake.EventHandler; import javax.inject.Inject; import java.io.Serializable; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.concurrent.*; import java.util.logging.Level; @@ -111,8 +110,7 @@ public final class VortexWorker implements Task, TaskMessageSource { // Command Executor: Execute the command final Serializable result = taskletExecutionRequest.execute(); final TaskletReport taskletReport = - new TaskletResultReport<>(Collections.singletonList( - taskletExecutionRequest.getTaskletId()), result); + new TaskletResultReport<>(taskletExecutionRequest.getTaskletId(), result); taskletReports.add(taskletReport); } catch (final InterruptedException ex) { // Assumes that user's thread follows convention that cancelled Futures @@ -124,8 +122,7 @@ public final class VortexWorker implements Task, TaskMessageSource { } catch (Exception e) { // Command Executor: Tasklet throws an exception final TaskletReport taskletReport = - new TaskletFailureReport(Collections.singletonList( - taskletExecutionRequest.getTaskletId()), e); + new 
TaskletFailureReport(taskletExecutionRequest.getTaskletId(), e); taskletReports.add(taskletReport); } http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java index 6c2162a..325d3d4 100644 --- a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java +++ b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java @@ -22,6 +22,10 @@ import org.apache.reef.util.Optional; import org.apache.reef.vortex.api.FutureCallback; import org.apache.reef.vortex.api.VortexFunction; import org.apache.reef.vortex.api.VortexFuture; +import org.apache.reef.vortex.common.TaskletFailureReport; +import org.apache.reef.vortex.common.TaskletReport; +import org.apache.reef.vortex.common.TaskletResultReport; +import org.apache.reef.vortex.common.WorkerReport; import org.junit.Test; import java.util.*; @@ -71,7 +75,9 @@ public class DefaultVortexMasterTest { final ArrayList<Integer> taskletIds = launchTasklets(runningWorkers, pendingTasklets, 1); for (final int taskletId : taskletIds) { - vortexMaster.taskletCompleted(vortexWorkerManager1.getId(), taskletId, null); + final TaskletReport taskletReport = new TaskletResultReport<>(taskletId, null); + vortexMaster.workerReported( + vortexWorkerManager1.getId(), new WorkerReport(Collections.singletonList(taskletReport))); } assertTrue("The VortexFuture should be done", future.isDone()); @@ -106,9 +112,12 @@ public class DefaultVortexMasterTest { final ArrayList<Integer> taskletIds2 = launchTasklets(runningWorkers, 
pendingTasklets, 1); assertEquals("Both lists need to contain the same single tasklet id", taskletIds1, taskletIds2); + // Completed? for (final int taskletId : taskletIds2) { - vortexMaster.taskletCompleted(vortexWorkerManager2.getId(), taskletId, null); + final TaskletReport taskletReport = new TaskletResultReport<>(taskletId, null); + vortexMaster.workerReported( + vortexWorkerManager2.getId(), new WorkerReport(Collections.singletonList(taskletReport))); } assertTrue("The VortexFuture should be done", future.isDone()); } @@ -157,7 +166,9 @@ public class DefaultVortexMasterTest { for (final int taskletId : taskletIds2) { final String workerId = runningWorkers.getWhereTaskletWasScheduledTo(taskletId); assertNotNull("The tasklet must have been scheduled", workerId); - vortexMaster.taskletCompleted(workerId, taskletId, null); + final TaskletReport taskletReport = new TaskletResultReport<>(taskletId, null); + vortexMaster.workerReported( + workerId, new WorkerReport(Collections.singletonList(taskletReport))); } for (final VortexFuture vortexFuture : vortexFutures) { assertTrue("The VortexFuture should be done", vortexFuture.isDone()); @@ -196,8 +207,11 @@ public class DefaultVortexMasterTest { final VortexFuture future = vortexMaster.enqueueTasklet(vortexFunction, null, Optional.of(testCallbackHandler)); final ArrayList<Integer> taskletIds = launchTasklets(runningWorkers, pendingTasklets, 1); + for (final int taskletId : taskletIds) { - vortexMaster.taskletErrored(vortexWorkerManager1.getId(), taskletId, new RuntimeException("Test exception")); + final TaskletReport taskletReport = new TaskletFailureReport(taskletId, new RuntimeException("Test exception.")); + vortexMaster.workerReported( + vortexWorkerManager1.getId(), new WorkerReport(Collections.singletonList(taskletReport))); } assertTrue("The VortexFuture should be done", future.isDone()); @@ -246,9 +260,9 @@ public class DefaultVortexMasterTest { private VortexFuture createTaskletCancellationFuture(final 
RunningWorkers runningWorkers, final PendingTasklets pendingTasklets) { final VortexFunction vortexFunction = testUtil.newInfiniteLoopFunction(); - final VortexWorkerManager vortexWorkerManager1 = testUtil.newWorker(); - final DefaultVortexMaster vortexMaster = new DefaultVortexMaster(runningWorkers, pendingTasklets, 5); + final VortexWorkerManager vortexWorkerManager1 = testUtil.newWorker(vortexMaster); + // Allocate worker & tasklet and schedule vortexMaster.workerAllocated(vortexWorkerManager1); http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java index 7cd0847..76416cb 100644 --- a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java +++ b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java @@ -20,7 +20,9 @@ package org.apache.reef.vortex.driver; import org.junit.Test; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -30,8 +32,9 @@ import static org.junit.Assert.assertTrue; * Test Possible Race Conditions. */ public class RunningWorkersTest { - private final RunningWorkers runningWorkers = new RunningWorkers(new RandomSchedulingPolicy()); private final TestUtil testUtil = new TestUtil(); + private final TestUtil.TestSchedulingPolicy schedulingPolicy = testUtil.newSchedulingPolicy(); + private final RunningWorkers runningWorkers = new RunningWorkers(schedulingPolicy); /** * Test executor preemption -> executor allocation. 
@@ -58,7 +61,12 @@ public class RunningWorkersTest { final Collection<Tasklet> tasklets = runningWorkers.removeWorker(vortexWorkerManager.getId()).get(); assertEquals("Only 1 Tasklet must have been running", 1, tasklets.size()); assertTrue("This Tasklet must have been running", tasklets.contains(tasklet)); - runningWorkers.completeTasklet(vortexWorkerManager.getId(), tasklet.getId(), null); - assertFalse("Tasklet must not have been completed", tasklet.isCompleted()); + final List<Integer> taskletIds = new ArrayList<>(); + for (final Tasklet taskletIter : tasklets) { + taskletIds.add(taskletIter.getId()); + } + + runningWorkers.doneTasklets(vortexWorkerManager.getId(), taskletIds); + assertFalse("Tasklet must not have been completed", schedulingPolicy.taskletIsDone(tasklet.getId())); } } http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java index 8704b0b..ab93a08 100644 --- a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java +++ b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java @@ -19,14 +19,18 @@ package org.apache.reef.vortex.driver; import org.apache.reef.driver.task.RunningTask; +import org.apache.reef.util.Optional; import org.apache.reef.vortex.api.VortexFunction; import org.apache.reef.vortex.api.VortexFuture; -import org.apache.reef.vortex.common.TaskletCancellationRequest; -import org.apache.reef.vortex.common.VortexRequest; +import org.apache.reef.vortex.common.*; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import java.io.Serializable; +import 
java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; @@ -46,9 +50,16 @@ public final class TestUtil { private final VortexMaster vortexMaster = mock(VortexMaster.class); /** - * @return a new mocked worker. + * @return a new mocked worker, with a mocked {@link VortexMaster}. */ public VortexWorkerManager newWorker() { + return newWorker(vortexMaster); + } + + /** + * @return a new mocked worker, with the {@link VortexMaster} passed in. + */ + public VortexWorkerManager newWorker(final VortexMaster master) { final RunningTask reefTask = mock(RunningTask.class); when(reefTask.getId()).thenReturn("worker" + String.valueOf(workerId.getAndIncrement())); final VortexRequestor vortexRequestor = mock(VortexRequestor.class); @@ -58,7 +69,8 @@ public final class TestUtil { public Object answer(final InvocationOnMock invocation) throws Throwable { final VortexRequest request = (VortexRequest)invocation.getArguments()[1]; if (request instanceof TaskletCancellationRequest) { - workerManager.taskletCancelled(request.getTaskletId()); + final TaskletReport cancelReport = new TaskletCancelledReport(request.getTaskletId()); + master.workerReported(workerManager.getId(), new WorkerReport(Collections.singleton(cancelReport))); } return null; @@ -89,6 +101,13 @@ public final class TestUtil { } /** + * @return a queryable {@link org.apache.reef.vortex.driver.TestUtil.TestSchedulingPolicy} + */ + public TestSchedulingPolicy newSchedulingPolicy() { + return new TestSchedulingPolicy(); + } + + /** * @return a new dummy function. 
*/ public VortexFunction newInfiniteLoopFunction() { @@ -116,4 +135,47 @@ public final class TestUtil { } }; } + + static final class TestSchedulingPolicy implements SchedulingPolicy { + private final SchedulingPolicy policy = new RandomSchedulingPolicy(); + private final Set<Integer> doneTasklets = new HashSet<>(); + + private TestSchedulingPolicy() { + } + + @Override + public Optional<String> trySchedule(final Tasklet tasklet) { + return policy.trySchedule(tasklet); + } + + @Override + public void workerAdded(final VortexWorkerManager vortexWorker) { + policy.workerAdded(vortexWorker); + } + + @Override + public void workerRemoved(final VortexWorkerManager vortexWorker) { + policy.workerRemoved(vortexWorker); + } + + @Override + public void taskletLaunched(final VortexWorkerManager vortexWorker, final Tasklet tasklet) { + policy.taskletLaunched(vortexWorker, tasklet); + } + + @Override + public void taskletsDone(final VortexWorkerManager vortexWorker, final List<Tasklet> tasklets) { + policy.taskletsDone(vortexWorker, tasklets); + for (final Tasklet t : tasklets) { + doneTasklets.add(t.getId()); + } + } + + /** + * @return true if Tasklet with taskletId is done, false otherwise. + */ + public boolean taskletIsDone(final int taskletId) { + return doneTasklets.contains(taskletId); + } + } }
