Repository: reef Updated Branches: refs/heads/master 81090b636 -> 4a847c5c5
[REEF-1084] Modify TaskletReport to allow aggregation of Tasklets for a single result/error This addressed the issue by * Switch TaskletResultReport and TaskletFailureReport to be able to associate a List of taskletIds to a result or error. JIRA: [REEF-1084](https://issues.apache.org/jira/browse/REEF-1084) Pull Request: Closes #736 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/4a847c5c Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/4a847c5c Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/4a847c5c Branch: refs/heads/master Commit: 4a847c5c51f3561c7970445091553f5d90d85182 Parents: 81090b6 Author: Andrew Chung <[email protected]> Authored: Sun Dec 13 16:49:02 2015 -0800 Committer: Yunseong Lee <[email protected]> Committed: Wed Dec 16 16:02:07 2015 +0800 ---------------------------------------------------------------------- .../reef-vortex/src/main/avro/WorkerReport.avsc | 21 ++++++++++++++++---- .../vortex/common/TaskletCancelledReport.java | 4 +++- .../vortex/common/TaskletFailureReport.java | 21 ++++++++++++-------- .../reef/vortex/common/TaskletReport.java | 5 ----- .../reef/vortex/common/TaskletResultReport.java | 20 +++++++++++-------- .../reef/vortex/common/VortexAvroUtils.java | 8 ++++---- .../apache/reef/vortex/driver/VortexDriver.java | 11 ++++++++-- .../reef/vortex/evaluator/VortexWorker.java | 7 +++++-- 8 files changed, 63 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/4a847c5c/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc b/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc index 182f88e..24110fe 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc +++ b/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc @@ -22,7 +22,14 @@ "type": "record", "name": "AvroTaskletResultReport", "fields": [ - {"name": "taskletId", "type": "int"}, + { + "name": "taskletIds", + "type": + { + "type": "array", + "items": "int" + } + }, {"name": "serializedOutput", "type": "bytes"} ] }, @@ -39,7 +46,14 @@ "type": "record", "name": "AvroTaskletFailureReport", "fields": [ - {"name": "taskletId", "type": "int"}, + { + "name": "taskletIds", + "type": + { + "type": "array", + "items": "int" + } + }, {"name": "serializedException", "type": "bytes"} ] }, @@ -59,8 +73,7 @@ }, { "name": "taskletReport", - "type": ["null", "AvroTaskletResultReport", "AvroTaskletCancelledReport", "AvroTaskletFailureReport"], - "default": null + "type": ["AvroTaskletResultReport", "AvroTaskletCancelledReport", "AvroTaskletFailureReport"] } ] }, http://git-wip-us.apache.org/repos/asf/reef/blob/4a847c5c/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java index 1ec1890..c09a02f 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java @@ -39,7 +39,9 @@ public final class TaskletCancelledReport implements TaskletReport { return TaskletReportType.TaskletCancelled; } - @Override + /** + * @return the taskletId of this TaskletReport. + */ public int getTaskletId() { return taskletId; } http://git-wip-us.apache.org/repos/asf/reef/blob/4a847c5c/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java index cbf6953..487a3d2 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java @@ -20,20 +20,24 @@ package org.apache.reef.vortex.common; import org.apache.reef.annotations.Unstable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + /** * Report of a tasklet exception. */ @Unstable public final class TaskletFailureReport implements TaskletReport { - private final int taskletId; + private final List<Integer> taskletIds; private final Exception exception; /** - * @param taskletId of the failed tasklet. + * @param taskletIds of the failed tasklet(s). * @param exception that caused the tasklet failure. */ - public TaskletFailureReport(final int taskletId, final Exception exception) { - this.taskletId = taskletId; + public TaskletFailureReport(final List<Integer> taskletIds, final Exception exception) { + this.taskletIds = Collections.unmodifiableList(new ArrayList<>(taskletIds)); this.exception = exception; } @@ -46,11 +50,12 @@ public final class TaskletFailureReport implements TaskletReport { } /** - * @return the id of the tasklet. + * Returns multiple TaskletIds if an aggregation of Tasklets fail. + * Returns a single TaskletId if a Tasklet fails. + * @return the taskletId(s) of this TaskletReport. */ - @Override - public int getTaskletId() { - return taskletId; + public List<Integer> getTaskletIds() { + return taskletIds; } /** http://git-wip-us.apache.org/repos/asf/reef/blob/4a847c5c/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java index 196f597..7e083eb 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java @@ -44,9 +44,4 @@ public interface TaskletReport extends Serializable { * @return the type of this TaskletReport. */ TaskletReportType getType(); - - /** - * @return the taskletId of this TaskletReport. - */ - int getTaskletId(); } http://git-wip-us.apache.org/repos/asf/reef/blob/4a847c5c/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java index de05185..08e8d06 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java @@ -21,21 +21,24 @@ package org.apache.reef.vortex.common; import org.apache.reef.annotations.Unstable; import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; /** * Report of a tasklet execution result. */ @Unstable public final class TaskletResultReport<TOutput extends Serializable> implements TaskletReport { - private final int taskletId; + private final List<Integer> taskletIds; private final TOutput result; /** - * @param taskletId of the tasklet. + * @param taskletIds of the tasklets. * @param result of the tasklet execution. */ - public TaskletResultReport(final int taskletId, final TOutput result) { - this.taskletId = taskletId; + public TaskletResultReport(final List<Integer> taskletIds, final TOutput result) { + this.taskletIds = Collections.unmodifiableList(new ArrayList<>(taskletIds)); this.result = result; } @@ -48,11 +51,12 @@ public final class TaskletResultReport<TOutput extends Serializable> implements } /** - * @return the id of the tasklet. + * Returns multiple TaskletIds if the result is from an Aggregation. + * Returns a single TaskletId if the result is from a single Tasklet. + * @return the TaskletId(s) of this TaskletReport */ - @Override - public int getTaskletId() { - return taskletId; + public List<Integer> getTaskletIds() { + return taskletIds; } /** http://git-wip-us.apache.org/repos/asf/reef/blob/4a847c5c/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java index 660f059..e4f747d 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java @@ -101,7 +101,7 @@ public final class VortexAvroUtils { .setReportType(AvroReportType.TaskletResult) .setTaskletReport( AvroTaskletResultReport.newBuilder() - .setTaskletId(taskletResultReport.getTaskletId()) + .setTaskletIds(taskletResultReport.getTaskletIds()) .setSerializedOutput(ByteBuffer.wrap(serializedOutput)) .build()) .build(); @@ -123,7 +123,7 @@ public final class VortexAvroUtils { .setReportType(AvroReportType.TaskletFailure) .setTaskletReport( AvroTaskletFailureReport.newBuilder() - .setTaskletId(taskletFailureReport.getTaskletId()) + .setTaskletIds(taskletFailureReport.getTaskletIds()) .setSerializedException(ByteBuffer.wrap(serializedException)) .build()) .build(); @@ -197,7 +197,7 @@ public final class VortexAvroUtils { // TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex. final Serializable output = (Serializable) SerializationUtils.deserialize(taskletResultReport.getSerializedOutput().array()); - taskletReport = new TaskletResultReport<>(taskletResultReport.getTaskletId(), output); + taskletReport = new TaskletResultReport<>(taskletResultReport.getTaskletIds(), output); break; case TaskletCancelled: final AvroTaskletCancelledReport taskletCancelledReport = @@ -209,7 +209,7 @@ public final class VortexAvroUtils { (AvroTaskletFailureReport)avroTaskletReport.getTaskletReport(); final Exception exception = (Exception) SerializationUtils.deserialize(taskletFailureReport.getSerializedException().array()); - taskletReport = new TaskletFailureReport(taskletFailureReport.getTaskletId(), exception); + taskletReport = new TaskletFailureReport(taskletFailureReport.getTaskletIds(), exception); break; default: throw new RuntimeException("Undefined TaskletReport type"); http://git-wip-us.apache.org/repos/asf/reef/blob/4a847c5c/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java index e3c24d2..993b32f 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java @@ -37,6 +37,7 @@ import org.apache.reef.wake.impl.ThreadPoolStage; import org.apache.reef.wake.time.event.StartTime; import javax.inject.Inject; +import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; @@ -160,7 +161,13 @@ final class VortexDriver { switch (taskletReport.getType()) { case TaskletResult: final TaskletResultReport taskletResultReport = (TaskletResultReport) taskletReport; - vortexMaster.taskletCompleted(workerId, taskletResultReport.getTaskletId(), taskletResultReport.getResult()); + + // TODO[JIRA REEF-942]: Fix when aggregation is allowed. + final List<Integer> resultTaskletIds = taskletResultReport.getTaskletIds(); + + assert resultTaskletIds.size() == 1; + vortexMaster.taskletCompleted(workerId, resultTaskletIds.get(0), + taskletResultReport.getResult()); break; case TaskletCancelled: final TaskletCancelledReport taskletCancelledReport = (TaskletCancelledReport) taskletReport; @@ -168,7 +175,7 @@ final class VortexDriver { break; case TaskletFailure: final TaskletFailureReport taskletFailureReport = (TaskletFailureReport) taskletReport; - vortexMaster.taskletErrored(workerId, taskletFailureReport.getTaskletId(), + vortexMaster.taskletErrored(workerId, taskletFailureReport.getTaskletIds().get(0), taskletFailureReport.getException()); break; default: http://git-wip-us.apache.org/repos/asf/reef/blob/4a847c5c/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java index 920f1a9..26d4287 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java @@ -36,6 +36,7 @@ import org.apache.reef.wake.EventHandler; import javax.inject.Inject; import java.io.Serializable; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.*; import java.util.logging.Level; @@ -110,7 +111,8 @@ public final class VortexWorker implements Task, TaskMessageSource { // Command Executor: Execute the command final Serializable result = taskletExecutionRequest.execute(); final TaskletReport taskletReport = - new TaskletResultReport<>(taskletExecutionRequest.getTaskletId(), result); + new TaskletResultReport<>(Collections.singletonList( + taskletExecutionRequest.getTaskletId()), result); taskletReports.add(taskletReport); } catch (final InterruptedException ex) { // Assumes that user's thread follows convention that cancelled Futures @@ -122,7 +124,8 @@ public final class VortexWorker implements Task, TaskMessageSource { } catch (Exception e) { // Command Executor: Tasklet throws an exception final TaskletReport taskletReport = - new TaskletFailureReport(taskletExecutionRequest.getTaskletId(), e); + new TaskletFailureReport(Collections.singletonList( + taskletExecutionRequest.getTaskletId()), e); taskletReports.add(taskletReport); }
