Repository: reef
Updated Branches:
  refs/heads/master 81090b636 -> 4a847c5c5


[REEF-1084] Modify TaskletReport to allow aggregation of Tasklets for a single 
result/error

This addresses the issue by
  * Switching TaskletResultReport and TaskletFailureReport to associate 
a List of taskletIds with a single result or error.

JIRA:
  [REEF-1084](https://issues.apache.org/jira/browse/REEF-1084)

Pull Request:
  Closes #736


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/4a847c5c
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/4a847c5c
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/4a847c5c

Branch: refs/heads/master
Commit: 4a847c5c51f3561c7970445091553f5d90d85182
Parents: 81090b6
Author: Andrew Chung <[email protected]>
Authored: Sun Dec 13 16:49:02 2015 -0800
Committer: Yunseong Lee <[email protected]>
Committed: Wed Dec 16 16:02:07 2015 +0800

----------------------------------------------------------------------
 .../reef-vortex/src/main/avro/WorkerReport.avsc | 21 ++++++++++++++++----
 .../vortex/common/TaskletCancelledReport.java   |  4 +++-
 .../vortex/common/TaskletFailureReport.java     | 21 ++++++++++++--------
 .../reef/vortex/common/TaskletReport.java       |  5 -----
 .../reef/vortex/common/TaskletResultReport.java | 20 +++++++++++--------
 .../reef/vortex/common/VortexAvroUtils.java     |  8 ++++----
 .../apache/reef/vortex/driver/VortexDriver.java | 11 ++++++++--
 .../reef/vortex/evaluator/VortexWorker.java     |  7 +++++--
 8 files changed, 63 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/4a847c5c/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc 
b/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc
index 182f88e..24110fe 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc
+++ b/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc
@@ -22,7 +22,14 @@
     "type": "record",
     "name": "AvroTaskletResultReport",
     "fields": [
-      {"name": "taskletId", "type": "int"},
+      {
+        "name": "taskletIds",
+        "type":
+        {
+          "type": "array",
+          "items": "int"
+        }
+      },
       {"name": "serializedOutput", "type": "bytes"}
     ]
   },
@@ -39,7 +46,14 @@
     "type": "record",
     "name": "AvroTaskletFailureReport",
     "fields": [
-      {"name": "taskletId", "type": "int"},
+      {
+        "name": "taskletIds",
+        "type":
+        {
+          "type": "array",
+          "items": "int"
+        }
+      },
       {"name": "serializedException", "type": "bytes"}
     ]
   },
@@ -59,8 +73,7 @@
       },
       {
         "name": "taskletReport",
-        "type": ["null", "AvroTaskletResultReport", 
"AvroTaskletCancelledReport", "AvroTaskletFailureReport"],
-        "default": null
+        "type": ["AvroTaskletResultReport", "AvroTaskletCancelledReport", 
"AvroTaskletFailureReport"]
       }
     ]
   },

http://git-wip-us.apache.org/repos/asf/reef/blob/4a847c5c/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java
index 1ec1890..c09a02f 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java
@@ -39,7 +39,9 @@ public final class TaskletCancelledReport implements 
TaskletReport {
     return TaskletReportType.TaskletCancelled;
   }
 
-  @Override
+  /**
+   * @return the taskletId of this TaskletReport.
+   */
   public int getTaskletId() {
     return taskletId;
   }

http://git-wip-us.apache.org/repos/asf/reef/blob/4a847c5c/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java
index cbf6953..487a3d2 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java
@@ -20,20 +20,24 @@ package org.apache.reef.vortex.common;
 
 import org.apache.reef.annotations.Unstable;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
 /**
  * Report of a tasklet exception.
  */
 @Unstable
 public final class TaskletFailureReport implements TaskletReport {
-  private final int taskletId;
+  private final List<Integer> taskletIds;
   private final Exception exception;
 
   /**
-   * @param taskletId of the failed tasklet.
+   * @param taskletIds of the failed tasklet(s).
    * @param exception that caused the tasklet failure.
    */
-  public TaskletFailureReport(final int taskletId, final Exception exception) {
-    this.taskletId = taskletId;
+  public TaskletFailureReport(final List<Integer> taskletIds, final Exception 
exception) {
+    this.taskletIds = Collections.unmodifiableList(new 
ArrayList<>(taskletIds));
     this.exception = exception;
   }
 
@@ -46,11 +50,12 @@ public final class TaskletFailureReport implements 
TaskletReport {
   }
 
   /**
-   * @return the id of the tasklet.
+   * Returns multiple TaskletIds if an aggregation of Tasklets fail.
+   * Returns a single TaskletId if a Tasklet fails.
+   * @return the taskletId(s) of this TaskletReport.
    */
-  @Override
-  public int getTaskletId() {
-    return taskletId;
+  public List<Integer> getTaskletIds() {
+    return taskletIds;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/reef/blob/4a847c5c/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java
index 196f597..7e083eb 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java
@@ -44,9 +44,4 @@ public interface TaskletReport extends Serializable {
    * @return the type of this TaskletReport.
    */
   TaskletReportType getType();
-
-  /**
-   * @return the taskletId of this TaskletReport.
-   */
-  int getTaskletId();
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/4a847c5c/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
index de05185..08e8d06 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
@@ -21,21 +21,24 @@ package org.apache.reef.vortex.common;
 import org.apache.reef.annotations.Unstable;
 
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 
 /**
  * Report of a tasklet execution result.
  */
 @Unstable
 public final class TaskletResultReport<TOutput extends Serializable> 
implements TaskletReport {
-  private final int taskletId;
+  private final List<Integer> taskletIds;
   private final TOutput result;
 
   /**
-   * @param taskletId of the tasklet.
+   * @param taskletIds of the tasklets.
    * @param result of the tasklet execution.
    */
-  public TaskletResultReport(final int taskletId, final TOutput result) {
-    this.taskletId = taskletId;
+  public TaskletResultReport(final List<Integer> taskletIds, final TOutput 
result) {
+    this.taskletIds = Collections.unmodifiableList(new 
ArrayList<>(taskletIds));
     this.result = result;
   }
 
@@ -48,11 +51,12 @@ public final class TaskletResultReport<TOutput extends 
Serializable> implements
   }
 
   /**
-   * @return the id of the tasklet.
+   * Returns multiple TaskletIds if the result is from an Aggregation.
+   * Returns a single TaskletId if the result is from a single Tasklet.
+   * @return the TaskletId(s) of this TaskletReport
    */
-  @Override
-  public int getTaskletId() {
-    return taskletId;
+  public List<Integer> getTaskletIds() {
+    return taskletIds;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/reef/blob/4a847c5c/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
index 660f059..e4f747d 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
@@ -101,7 +101,7 @@ public final class VortexAvroUtils {
             .setReportType(AvroReportType.TaskletResult)
             .setTaskletReport(
                 AvroTaskletResultReport.newBuilder()
-                    .setTaskletId(taskletResultReport.getTaskletId())
+                    .setTaskletIds(taskletResultReport.getTaskletIds())
                     .setSerializedOutput(ByteBuffer.wrap(serializedOutput))
                     .build())
             .build();
@@ -123,7 +123,7 @@ public final class VortexAvroUtils {
             .setReportType(AvroReportType.TaskletFailure)
             .setTaskletReport(
                 AvroTaskletFailureReport.newBuilder()
-                    .setTaskletId(taskletFailureReport.getTaskletId())
+                    .setTaskletIds(taskletFailureReport.getTaskletIds())
                     
.setSerializedException(ByteBuffer.wrap(serializedException))
                     .build())
             .build();
@@ -197,7 +197,7 @@ public final class VortexAvroUtils {
         // TODO[REEF-1005]: Allow custom codecs for input/output data in 
Vortex.
         final Serializable output =
             (Serializable) 
SerializationUtils.deserialize(taskletResultReport.getSerializedOutput().array());
-        taskletReport = new 
TaskletResultReport<>(taskletResultReport.getTaskletId(), output);
+        taskletReport = new 
TaskletResultReport<>(taskletResultReport.getTaskletIds(), output);
         break;
       case TaskletCancelled:
         final AvroTaskletCancelledReport taskletCancelledReport =
@@ -209,7 +209,7 @@ public final class VortexAvroUtils {
             (AvroTaskletFailureReport)avroTaskletReport.getTaskletReport();
         final Exception exception =
             (Exception) 
SerializationUtils.deserialize(taskletFailureReport.getSerializedException().array());
-        taskletReport = new 
TaskletFailureReport(taskletFailureReport.getTaskletId(), exception);
+        taskletReport = new 
TaskletFailureReport(taskletFailureReport.getTaskletIds(), exception);
         break;
       default:
         throw new RuntimeException("Undefined TaskletReport type");

http://git-wip-us.apache.org/repos/asf/reef/blob/4a847c5c/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
index e3c24d2..993b32f 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
@@ -37,6 +37,7 @@ import org.apache.reef.wake.impl.ThreadPoolStage;
 import org.apache.reef.wake.time.event.StartTime;
 
 import javax.inject.Inject;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -160,7 +161,13 @@ final class VortexDriver {
       switch (taskletReport.getType()) {
       case TaskletResult:
         final TaskletResultReport taskletResultReport = (TaskletResultReport) 
taskletReport;
-        vortexMaster.taskletCompleted(workerId, 
taskletResultReport.getTaskletId(), taskletResultReport.getResult());
+
+        // TODO[JIRA REEF-942]: Fix when aggregation is allowed.
+        final List<Integer> resultTaskletIds = 
taskletResultReport.getTaskletIds();
+
+        assert resultTaskletIds.size() == 1;
+        vortexMaster.taskletCompleted(workerId, resultTaskletIds.get(0),
+            taskletResultReport.getResult());
         break;
       case TaskletCancelled:
         final TaskletCancelledReport taskletCancelledReport = 
(TaskletCancelledReport) taskletReport;
@@ -168,7 +175,7 @@ final class VortexDriver {
         break;
       case TaskletFailure:
         final TaskletFailureReport taskletFailureReport = 
(TaskletFailureReport) taskletReport;
-        vortexMaster.taskletErrored(workerId, 
taskletFailureReport.getTaskletId(),
+        vortexMaster.taskletErrored(workerId, 
taskletFailureReport.getTaskletIds().get(0),
             taskletFailureReport.getException());
         break;
       default:

http://git-wip-us.apache.org/repos/asf/reef/blob/4a847c5c/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
index 920f1a9..26d4287 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
@@ -36,6 +36,7 @@ import org.apache.reef.wake.EventHandler;
 import javax.inject.Inject;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.*;
 import java.util.logging.Level;
@@ -110,7 +111,8 @@ public final class VortexWorker implements Task, 
TaskMessageSource {
                         // Command Executor: Execute the command
                         final Serializable result = 
taskletExecutionRequest.execute();
                         final TaskletReport taskletReport =
-                            new 
TaskletResultReport<>(taskletExecutionRequest.getTaskletId(), result);
+                            new 
TaskletResultReport<>(Collections.singletonList(
+                                taskletExecutionRequest.getTaskletId()), 
result);
                         taskletReports.add(taskletReport);
                       } catch (final InterruptedException ex) {
                         // Assumes that user's thread follows convention that 
cancelled Futures
@@ -122,7 +124,8 @@ public final class VortexWorker implements Task, 
TaskMessageSource {
                       } catch (Exception e) {
                         // Command Executor: Tasklet throws an exception
                         final TaskletReport taskletReport =
-                            new 
TaskletFailureReport(taskletExecutionRequest.getTaskletId(), e);
+                            new TaskletFailureReport(Collections.singletonList(
+                                taskletExecutionRequest.getTaskletId()), e);
                         taskletReports.add(taskletReport);
                       }
 

Reply via email to