Repository: reef Updated Branches: refs/heads/master ccecdd577 -> bc5e14dcc
[REEF-1077] Allow VortexWorker reports to report about more than a single Tasklet This addressed the issue by * Change the original WorkerReport to TaskletReport. * Allow WorkerReports to hold a List of TaskletReports. JIRA: [REEF-1077](https://issues.apache.org/jira/browse/REEF-1077) Pull Request: Closes #729 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/bc5e14dc Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/bc5e14dc Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/bc5e14dc Branch: refs/heads/master Commit: bc5e14dcc648b489d61bc09155fd523ecd6d276a Parents: ccecdd5 Author: Andrew Chung <[email protected]> Authored: Sun Dec 13 16:49:02 2015 -0800 Committer: Byung-Gon Chun <[email protected]> Committed: Tue Dec 15 12:16:29 2015 +0900 ---------------------------------------------------------------------- .../reef-vortex/src/main/avro/WorkerReport.avsc | 23 ++- .../vortex/common/TaskletCancelledReport.java | 6 +- .../vortex/common/TaskletFailureReport.java | 8 +- .../reef/vortex/common/TaskletReport.java | 52 +++++++ .../reef/vortex/common/TaskletResultReport.java | 8 +- .../reef/vortex/common/VortexAvroUtils.java | 150 +++++++++++-------- .../apache/reef/vortex/common/WorkerReport.java | 34 +++-- .../apache/reef/vortex/driver/VortexDriver.java | 19 +-- .../reef/vortex/evaluator/VortexWorker.java | 20 ++- 9 files changed, 211 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/bc5e14dc/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc b/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc index 11adb12..182f88e 100644 --- 
a/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc +++ b/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc @@ -46,14 +46,16 @@ { "namespace": "org.apache.reef.vortex.common.avro", "type": "record", - "name": "AvroWorkerReport", + "name": "AvroTaskletReport", "fields": [ { "name": "reportType", - "type": { + "type": + { "type": "enum", "name": "AvroReportType", - "symbols": ["TaskletResult", "TaskletCancelled", "TaskletFailure"]} + "symbols": ["TaskletResult", "TaskletCancelled", "TaskletFailure"] + } }, { "name": "taskletReport", @@ -61,5 +63,20 @@ "default": null } ] + }, + { + "namespace": "org.apache.reef.vortex.common.avro", + "type": "record", + "name": "AvroWorkerReport", + "fields": [ + { + "name": "taskletReports", + "type": + { + "type": "array", + "items": "AvroTaskletReport" + } + } + ] } ] http://git-wip-us.apache.org/repos/asf/reef/blob/bc5e14dc/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java index b51219b..1ec1890 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java @@ -24,7 +24,7 @@ import org.apache.reef.annotations.Unstable; * The report of a cancelled Tasklet. 
*/ @Unstable -public final class TaskletCancelledReport implements WorkerReport { +public final class TaskletCancelledReport implements TaskletReport { private int taskletId; /** @@ -35,8 +35,8 @@ public final class TaskletCancelledReport implements WorkerReport { } @Override - public WorkerReportType getType() { - return WorkerReportType.TaskletCancelled; + public TaskletReportType getType() { + return TaskletReportType.TaskletCancelled; } @Override http://git-wip-us.apache.org/repos/asf/reef/blob/bc5e14dc/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java index 96df2e1..cbf6953 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java @@ -24,7 +24,7 @@ import org.apache.reef.annotations.Unstable; * Report of a tasklet exception. */ @Unstable -public final class TaskletFailureReport implements WorkerReport { +public final class TaskletFailureReport implements TaskletReport { private final int taskletId; private final Exception exception; @@ -38,11 +38,11 @@ public final class TaskletFailureReport implements WorkerReport { } /** - * @return the type of this WorkerReport. + * @return the type of this TaskletReport. 
*/ @Override - public WorkerReportType getType() { - return WorkerReportType.TaskletFailure; + public TaskletReportType getType() { + return TaskletReportType.TaskletFailure; } /** http://git-wip-us.apache.org/repos/asf/reef/blob/bc5e14dc/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java new file mode 100644 index 0000000..196f597 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.common; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; + +import java.io.Serializable; + +/** + * The interface for a status report from the {@link org.apache.reef.vortex.evaluator.VortexWorker}. 
+ */ +@Unstable +@Private +@DriverSide +public interface TaskletReport extends Serializable { + /** + * Type of TaskletReport. + */ + enum TaskletReportType { + TaskletResult, + TaskletCancelled, + TaskletFailure + } + + /** + * @return the type of this TaskletReport. + */ + TaskletReportType getType(); + + /** + * @return the taskletId of this TaskletReport. + */ + int getTaskletId(); +} http://git-wip-us.apache.org/repos/asf/reef/blob/bc5e14dc/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java index cd3a597..de05185 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java @@ -26,7 +26,7 @@ import java.io.Serializable; * Report of a tasklet execution result. */ @Unstable -public final class TaskletResultReport<TOutput extends Serializable> implements WorkerReport { +public final class TaskletResultReport<TOutput extends Serializable> implements TaskletReport { private final int taskletId; private final TOutput result; @@ -40,11 +40,11 @@ public final class TaskletResultReport<TOutput extends Serializable> implements } /** - * @return the type of this WorkerReport. + * @return the type of this TaskletReport. 
*/ @Override - public WorkerReportType getType() { - return WorkerReportType.TaskletResult; + public TaskletReportType getType() { + return TaskletReportType.TaskletResult; } /** http://git-wip-us.apache.org/repos/asf/reef/blob/bc5e14dc/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java index 4ab4cb5..660f059 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java @@ -29,6 +29,8 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.Serializable; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; /** * Serialize and deserialize Vortex message to/from byte array. @@ -86,48 +88,58 @@ public final class VortexAvroUtils { * @return Serialized byte array. */ public static byte[] toBytes(final WorkerReport workerReport) { - // Convert WorkerReport message to Avro message. - final AvroWorkerReport avroWorkerReport; - switch (workerReport.getType()) { - case TaskletResult: - final TaskletResultReport taskletResultReport = (TaskletResultReport) workerReport; - // TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex. 
- final byte[] serializedOutput = SerializationUtils.serialize(taskletResultReport.getResult()); - avroWorkerReport = AvroWorkerReport.newBuilder() - .setReportType(AvroReportType.TaskletResult) - .setTaskletReport( - AvroTaskletResultReport.newBuilder() - .setTaskletId(taskletResultReport.getTaskletId()) - .setSerializedOutput(ByteBuffer.wrap(serializedOutput)) - .build()) - .build(); - break; - case TaskletCancelled: - final TaskletCancelledReport taskletCancelledReport = (TaskletCancelledReport) workerReport; - avroWorkerReport = AvroWorkerReport.newBuilder() - .setReportType(AvroReportType.TaskletCancelled) - .setTaskletReport( - AvroTaskletCancelledReport.newBuilder() - .setTaskletId(workerReport.getTaskletId()) - .build()) - .build(); - break; - case TaskletFailure: - final TaskletFailureReport taskletFailureReport = (TaskletFailureReport) workerReport; - final byte[] serializedException = SerializationUtils.serialize(taskletFailureReport.getException()); - avroWorkerReport = AvroWorkerReport.newBuilder() - .setReportType(AvroReportType.TaskletFailure) - .setTaskletReport( - AvroTaskletFailureReport.newBuilder() - .setTaskletId(taskletFailureReport.getTaskletId()) - .setSerializedException(ByteBuffer.wrap(serializedException)) - .build()) - .build(); - break; - default: - throw new RuntimeException("Undefined message type"); + final List<AvroTaskletReport> workerTaskletReports = new ArrayList<>(); + + for (final TaskletReport taskletReport : workerReport.getTaskletReports()) { + final AvroTaskletReport avroTaskletReport; + switch (taskletReport.getType()) { + case TaskletResult: + final TaskletResultReport taskletResultReport = (TaskletResultReport) taskletReport; + // TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex. 
+ final byte[] serializedOutput = SerializationUtils.serialize(taskletResultReport.getResult()); + avroTaskletReport = AvroTaskletReport.newBuilder() + .setReportType(AvroReportType.TaskletResult) + .setTaskletReport( + AvroTaskletResultReport.newBuilder() + .setTaskletId(taskletResultReport.getTaskletId()) + .setSerializedOutput(ByteBuffer.wrap(serializedOutput)) + .build()) + .build(); + break; + case TaskletCancelled: + final TaskletCancelledReport taskletCancelledReport = (TaskletCancelledReport) taskletReport; + avroTaskletReport = AvroTaskletReport.newBuilder() + .setReportType(AvroReportType.TaskletCancelled) + .setTaskletReport( + AvroTaskletCancelledReport.newBuilder() + .setTaskletId(taskletCancelledReport.getTaskletId()) + .build()) + .build(); + break; + case TaskletFailure: + final TaskletFailureReport taskletFailureReport = (TaskletFailureReport) taskletReport; + final byte[] serializedException = SerializationUtils.serialize(taskletFailureReport.getException()); + avroTaskletReport = AvroTaskletReport.newBuilder() + .setReportType(AvroReportType.TaskletFailure) + .setTaskletReport( + AvroTaskletFailureReport.newBuilder() + .setTaskletId(taskletFailureReport.getTaskletId()) + .setSerializedException(ByteBuffer.wrap(serializedException)) + .build()) + .build(); + break; + default: + throw new RuntimeException("Undefined message type"); + } + + workerTaskletReports.add(avroTaskletReport); } + // Convert WorkerReport message to Avro message. + final AvroWorkerReport avroWorkerReport = AvroWorkerReport.newBuilder() + .setTaskletReports(workerTaskletReports) + .build(); + // Serialize the Avro message to byte array. return toBytes(avroWorkerReport, AvroWorkerReport.class); } @@ -172,33 +184,41 @@ public final class VortexAvroUtils { * @return De-serialized WorkerReport. 
*/ public static WorkerReport toWorkerReport(final byte[] bytes) { - final WorkerReport workerReport; final AvroWorkerReport avroWorkerReport = toAvroObject(bytes, AvroWorkerReport.class); - switch (avroWorkerReport.getReportType()) { - case TaskletResult: - final AvroTaskletResultReport taskletResultReport = - (AvroTaskletResultReport)avroWorkerReport.getTaskletReport(); - // TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex. - final Serializable output = - (Serializable) SerializationUtils.deserialize(taskletResultReport.getSerializedOutput().array()); - workerReport = new TaskletResultReport<>(taskletResultReport.getTaskletId(), output); - break; - case TaskletCancelled: - final AvroTaskletCancelledReport taskletCancelledReport = - (AvroTaskletCancelledReport)avroWorkerReport.getTaskletReport(); - workerReport = new TaskletCancelledReport(taskletCancelledReport.getTaskletId()); - break; - case TaskletFailure: - final AvroTaskletFailureReport taskletFailureReport = - (AvroTaskletFailureReport)avroWorkerReport.getTaskletReport(); - final Exception exception = - (Exception) SerializationUtils.deserialize(taskletFailureReport.getSerializedException().array()); - workerReport = new TaskletFailureReport(taskletFailureReport.getTaskletId(), exception); - break; - default: - throw new RuntimeException("Undefined WorkerReport type"); + final List<TaskletReport> workerTaskletReports = new ArrayList<>(); + + for (final AvroTaskletReport avroTaskletReport : avroWorkerReport.getTaskletReports()) { + final TaskletReport taskletReport; + + switch (avroTaskletReport.getReportType()) { + case TaskletResult: + final AvroTaskletResultReport taskletResultReport = + (AvroTaskletResultReport)avroTaskletReport.getTaskletReport(); + // TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex. 
+ final Serializable output = + (Serializable) SerializationUtils.deserialize(taskletResultReport.getSerializedOutput().array()); + taskletReport = new TaskletResultReport<>(taskletResultReport.getTaskletId(), output); + break; + case TaskletCancelled: + final AvroTaskletCancelledReport taskletCancelledReport = + (AvroTaskletCancelledReport)avroTaskletReport.getTaskletReport(); + taskletReport = new TaskletCancelledReport(taskletCancelledReport.getTaskletId()); + break; + case TaskletFailure: + final AvroTaskletFailureReport taskletFailureReport = + (AvroTaskletFailureReport)avroTaskletReport.getTaskletReport(); + final Exception exception = + (Exception) SerializationUtils.deserialize(taskletFailureReport.getSerializedException().array()); + taskletReport = new TaskletFailureReport(taskletFailureReport.getTaskletId(), exception); + break; + default: + throw new RuntimeException("Undefined TaskletReport type"); + } + + workerTaskletReports.add(taskletReport); } - return workerReport; + + return new WorkerReport(workerTaskletReports); } /** http://git-wip-us.apache.org/repos/asf/reef/blob/bc5e14dc/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java index 192299a..7c88a44 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java @@ -19,30 +19,34 @@ package org.apache.reef.vortex.common; import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; import java.io.Serializable; 
+import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; /** * Worker-to-Master protocol. + * A report of Tasklet statuses sent from the {@link org.apache.reef.vortex.evaluator.VortexWorker} + * to the {@link org.apache.reef.vortex.driver.VortexMaster}. */ +@Private @Unstable -public interface WorkerReport extends Serializable { - /** - * Type of WorkerReport. - */ - enum WorkerReportType { - TaskletResult, - TaskletCancelled, - TaskletFailure - } +@DriverSide +public final class WorkerReport implements Serializable { + private ArrayList<TaskletReport> taskletReports; - /** - * @return the type of this WorkerReport. - */ - WorkerReportType getType(); + public WorkerReport(final Collection<TaskletReport> taskletReports) { + this.taskletReports = new ArrayList<>(taskletReports); + } /** - * @return the taskletId of this WorkerReport. + * @return the list of Tasklet reports. */ - int getTaskletId(); + public List<TaskletReport> getTaskletReports() { + return Collections.unmodifiableList(taskletReports); + } } http://git-wip-us.apache.org/repos/asf/reef/blob/bc5e14dc/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java index 75674cc..e3c24d2 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java @@ -28,11 +28,7 @@ import org.apache.reef.tang.Configurations; import org.apache.reef.tang.annotations.Parameter; import org.apache.reef.tang.annotations.Unit; import org.apache.reef.vortex.api.VortexStart; -import 
org.apache.reef.vortex.common.TaskletCancelledReport; -import org.apache.reef.vortex.common.TaskletFailureReport; -import org.apache.reef.vortex.common.TaskletResultReport; -import org.apache.reef.vortex.common.VortexAvroUtils; -import org.apache.reef.vortex.common.WorkerReport; +import org.apache.reef.vortex.common.*; import org.apache.reef.vortex.evaluator.VortexWorker; import org.apache.reef.wake.EStage; import org.apache.reef.wake.EventHandler; @@ -156,17 +152,22 @@ final class VortexDriver { public void onNext(final TaskMessage taskMessage) { final String workerId = taskMessage.getId(); final WorkerReport workerReport = VortexAvroUtils.toWorkerReport(taskMessage.get()); - switch (workerReport.getType()) { + + // TODO[JIRA REEF-942]: Fix when aggregation is allowed. + assert workerReport.getTaskletReports().size() == 1; + + final TaskletReport taskletReport = workerReport.getTaskletReports().get(0); + switch (taskletReport.getType()) { case TaskletResult: - final TaskletResultReport taskletResultReport = (TaskletResultReport) workerReport; + final TaskletResultReport taskletResultReport = (TaskletResultReport) taskletReport; vortexMaster.taskletCompleted(workerId, taskletResultReport.getTaskletId(), taskletResultReport.getResult()); break; case TaskletCancelled: - final TaskletCancelledReport taskletCancelledReport = (TaskletCancelledReport)workerReport; + final TaskletCancelledReport taskletCancelledReport = (TaskletCancelledReport) taskletReport; vortexMaster.taskletCancelled(workerId, taskletCancelledReport.getTaskletId()); break; case TaskletFailure: - final TaskletFailureReport taskletFailureReport = (TaskletFailureReport) workerReport; + final TaskletFailureReport taskletFailureReport = (TaskletFailureReport) taskletReport; vortexMaster.taskletErrored(workerId, taskletFailureReport.getTaskletId(), taskletFailureReport.getException()); break; 
http://git-wip-us.apache.org/repos/asf/reef/blob/bc5e14dc/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java index 10d23a8..920f1a9 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java @@ -35,6 +35,8 @@ import org.apache.reef.wake.EventHandler; import javax.inject.Inject; import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.*; import java.util.logging.Level; import java.util.logging.Logger; @@ -101,25 +103,31 @@ public final class VortexWorker implements Task, TaskMessageSource { @Override public void run() { final TaskletExecutionRequest taskletExecutionRequest = (TaskletExecutionRequest) vortexRequest; + final WorkerReport workerReport; + final List<TaskletReport> taskletReports = new ArrayList<>(); + try { // Command Executor: Execute the command final Serializable result = taskletExecutionRequest.execute(); - final WorkerReport report = + final TaskletReport taskletReport = new TaskletResultReport<>(taskletExecutionRequest.getTaskletId(), result); - workerReports.addLast(VortexAvroUtils.toBytes(report)); + taskletReports.add(taskletReport); } catch (final InterruptedException ex) { // Assumes that user's thread follows convention that cancelled Futures // should throw InterruptedException. 
- final WorkerReport report = new TaskletCancelledReport(taskletExecutionRequest.getTaskletId()); + final TaskletReport taskletReport = + new TaskletCancelledReport(taskletExecutionRequest.getTaskletId()); LOG.log(Level.WARNING, "Tasklet with ID {0} has been cancelled", vortexRequest.getTaskletId()); - workerReports.addLast(VortexAvroUtils.toBytes(report)); + taskletReports.add(taskletReport); } catch (Exception e) { // Command Executor: Tasklet throws an exception - final WorkerReport report = + final TaskletReport taskletReport = new TaskletFailureReport(taskletExecutionRequest.getTaskletId(), e); - workerReports.addLast(VortexAvroUtils.toBytes(report)); + taskletReports.add(taskletReport); } + workerReport = new WorkerReport(taskletReports); + workerReports.addLast(VortexAvroUtils.toBytes(workerReport)); try { latch.await(); } catch (final InterruptedException e) {
