Repository: reef Updated Branches: refs/heads/master 7d5a14169 -> 71951320e
[REEF-1004] Define Avro schema for Vortex messages This addressed the issue by * Defining Avro schema for Vortex messages. * Adding VortexAvroUtils to serialize/deserialize messages. * Replacing SerializationUtils with Avro. * Adding TODO comments for user function, input, and output. JIRA: [REEF-1004](https://issues.apache.org/jira/browse/REEF-1004) Pull Request: Closes #677 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/71951320 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/71951320 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/71951320 Branch: refs/heads/master Commit: 71951320e12c1341f0b8c9b774636f7054455312 Parents: 7d5a141 Author: Yunseong Lee <[email protected]> Authored: Mon Nov 16 12:13:08 2015 +0800 Committer: Byung-Gon Chun <[email protected]> Committed: Mon Dec 7 10:19:40 2015 +0900 ---------------------------------------------------------------------- lang/java/reef-applications/reef-vortex/pom.xml | 42 ++++ .../src/main/avro/VortexRequest.avsc | 39 ++++ .../reef-vortex/src/main/avro/WorkerReport.avsc | 57 +++++ .../vortex/common/TaskletExecutionRequest.java | 14 ++ .../reef/vortex/common/VortexAvroUtils.java | 215 +++++++++++++++++++ .../apache/reef/vortex/driver/VortexDriver.java | 11 +- .../reef/vortex/driver/VortexRequestor.java | 4 +- .../reef/vortex/evaluator/VortexWorker.java | 48 ++--- 8 files changed, 398 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/71951320/lang/java/reef-applications/reef-vortex/pom.xml ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/pom.xml b/lang/java/reef-applications/reef-vortex/pom.xml index c82a856..5c11242 100644 --- a/lang/java/reef-applications/reef-vortex/pom.xml +++ b/lang/java/reef-applications/reef-vortex/pom.xml @@ -50,11 +50,53 @@ under the License. <artifactId>reef-runtime-local</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </dependency> </dependencies> <build> <plugins> <plugin> + <groupId>org.apache.avro</groupId> + <artifactId>avro-maven-plugin</artifactId> + <executions> + <execution> + <phase>generate-sources</phase> + <goals> + <goal>schema</goal> + </goals> + <configuration> + <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory> + <outputDirectory>${project.basedir}/target/generated-sources/avro/</outputDirectory> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <executions> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>target/generated-sources/avro</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.avro</groupId> + <artifactId>avro-maven-plugin</artifactId> + </plugin> + <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <executions> http://git-wip-us.apache.org/repos/asf/reef/blob/71951320/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc b/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc new file mode 100644 index 0000000..2453761 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +[ + { + "namespace": "org.apache.reef.vortex.common.avro", + "type": "record", + "name": "AvroTaskletExecutionRequest", + "fields": [ + {"name": "taskletId", "type": "int"}, + {"name": "serializedUserFunction", "type": "bytes"}, + {"name": "serializedInput", "type": "bytes"} + ] + }, + { + "namespace": "org.apache.reef.vortex.common.avro", + "type": "record", + "name": "AvroVortexRequest", + "fields": [ + {"name": "requestType", "type": {"type": "enum", "name": "AvroRequestType", "symbols": ["ExecuteTasklet"]}}, + {"name": "taskletExecutionRequest", "type": ["null", "AvroTaskletExecutionRequest"], "default": null} + ] + } +] http://git-wip-us.apache.org/repos/asf/reef/blob/71951320/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc b/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc new file mode 100644 index 0000000..19a655d --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +[ + { + "namespace": "org.apache.reef.vortex.common.avro", + "type": "record", + "name": "AvroTaskletResultReport", + "fields": [ + {"name": "taskletId", "type": "int"}, + {"name": "serializedOutput", "type": "bytes"} + ] + }, + { + "namespace": "org.apache.reef.vortex.common.avro", + "type": "record", + "name": "AvroTaskletFailureReport", + "fields": [ + {"name": "taskletId", "type": "int"}, + {"name": "serializedException", "type": "bytes"} + ] + }, + { + "namespace": "org.apache.reef.vortex.common.avro", + "type": "record", + "name": "AvroWorkerReport", + "fields": [ + { + "name": "reportType", + "type": {"type": "enum", "name": "AvroReportType", "symbols": ["TaskletResult", "TaskletFailure"]} + }, + { + "name": "taskletResult", + "type": ["null", "AvroTaskletResultReport"], "default": null + }, + { + "name": "taskletFailure", + "type": ["null", "AvroTaskletFailureReport"], "default": null + } + ] + } +] http://git-wip-us.apache.org/repos/asf/reef/blob/71951320/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java index c540572..961b574 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java @@ -65,4 +65,18 @@ public final class TaskletExecutionRequest<TInput extends Serializable, TOutput public int getTaskletId() { return taskletId; } + + /** + * Get function of the tasklet. + */ + public VortexFunction getFunction() { + return userFunction; + } + + /** + * Get input of the tasklet. + */ + public TInput getInput() { + return input; + } } http://git-wip-us.apache.org/repos/asf/reef/blob/71951320/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java new file mode 100644 index 0000000..c3d01de --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.common; + +import org.apache.avro.io.*; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.commons.lang.SerializationUtils; +import org.apache.reef.vortex.api.VortexFunction; +import org.apache.reef.vortex.common.avro.*; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.nio.ByteBuffer; + +/** + * Serialize and deserialize Vortex message to/from byte array. + */ +public final class VortexAvroUtils { + /** + * Serialize VortexRequest to byte array. + * @param vortexRequest Vortex request message to serialize. + * @return Serialized byte array. + */ + public static byte[] toBytes(final VortexRequest vortexRequest) { + // Convert VortexRequest message to Avro message. + final AvroVortexRequest avroVortexRequest; + switch (vortexRequest.getType()) { + case ExecuteTasklet: + final TaskletExecutionRequest taskletExecutionRequest = (TaskletExecutionRequest) vortexRequest; + // The following TODOs are sub-issues of cleaning up Serializable in Vortex (REEF-504). + // The purpose is to reduce serialization cost, which leads to bottleneck in Master. + // Temporarily those are left as TODOs, but will be addressed in separate PRs. + // TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex. + final byte[] serializedInput = SerializationUtils.serialize(taskletExecutionRequest.getInput()); + // TODO[REEF-1003]: Use reflection instead of serialization when launching VortexFunction + final byte[] serializedFunction = SerializationUtils.serialize(taskletExecutionRequest.getFunction()); + avroVortexRequest = AvroVortexRequest.newBuilder() + .setRequestType(AvroRequestType.ExecuteTasklet) + .setTaskletExecutionRequest( + AvroTaskletExecutionRequest.newBuilder() + .setTaskletId(taskletExecutionRequest.getTaskletId()) + .setSerializedInput(ByteBuffer.wrap(serializedInput)) + .setSerializedUserFunction(ByteBuffer.wrap(serializedFunction)) + .build()) + .build(); + break; + default: + throw new RuntimeException("Undefined message type"); + } + + // Serialize the Avro message to byte array. + return toBytes(avroVortexRequest, AvroVortexRequest.class); + } + + /** + * Serialize WorkerReport to byte array. + * @param workerReport Worker report message to serialize. + * @return Serialized byte array. + */ + public static byte[] toBytes(final WorkerReport workerReport) { + // Convert WorkerReport message to Avro message. + final AvroWorkerReport avroWorkerReport; + switch (workerReport.getType()) { + case TaskletResult: + final TaskletResultReport taskletResultReport = (TaskletResultReport) workerReport; + // TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex. + final byte[] serializedOutput = SerializationUtils.serialize(taskletResultReport.getResult()); + avroWorkerReport = AvroWorkerReport.newBuilder() + .setReportType(AvroReportType.TaskletResult) + .setTaskletResult( + AvroTaskletResultReport.newBuilder() + .setTaskletId(taskletResultReport.getTaskletId()) + .setSerializedOutput(ByteBuffer.wrap(serializedOutput)) + .build()) + .build(); + break; + case TaskletFailure: + final TaskletFailureReport taskletFailureReport = (TaskletFailureReport) workerReport; + final byte[] serializedException = SerializationUtils.serialize(taskletFailureReport.getException()); + avroWorkerReport = AvroWorkerReport.newBuilder() + .setReportType(AvroReportType.TaskletFailure) + .setTaskletFailure( + AvroTaskletFailureReport.newBuilder() + .setTaskletId(taskletFailureReport.getTaskletId()) + .setSerializedException(ByteBuffer.wrap(serializedException)) + .build()) + .build(); + break; + default: + throw new RuntimeException("Undefined message type"); + } + + // Serialize the Avro message to byte array. + return toBytes(avroWorkerReport, AvroWorkerReport.class); + } + + /** + * Deserialize byte array to VortexRequest. + * @param bytes Byte array to deserialize. + * @return De-serialized VortexRequest. + */ + public static VortexRequest toVortexRequest(final byte[] bytes) { + final AvroVortexRequest avroVortexRequest = toAvroObject(bytes, AvroVortexRequest.class); + + final VortexRequest vortexRequest; + switch (avroVortexRequest.getRequestType()) { + case ExecuteTasklet: + final AvroTaskletExecutionRequest taskletExecutionRequest = avroVortexRequest.getTaskletExecutionRequest(); + // TODO[REEF-1003]: Use reflection instead of serialization when launching VortexFunction + final VortexFunction function = + (VortexFunction) SerializationUtils.deserialize( + taskletExecutionRequest.getSerializedUserFunction().array()); + // TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex. + final Serializable input = + (Serializable) SerializationUtils.deserialize( + taskletExecutionRequest.getSerializedInput().array()); + vortexRequest = new TaskletExecutionRequest(taskletExecutionRequest.getTaskletId(), function, input); + break; + default: + throw new RuntimeException("Undefined VortexRequest type"); + } + return vortexRequest; + } + + /** + * Deserialize byte array to WorkerReport. + * @param bytes Byte array to deserialize. + * @return De-serialized WorkerReport. + */ + public static WorkerReport toWorkerReport(final byte[] bytes) { + final WorkerReport workerReport; + final AvroWorkerReport avroWorkerReport = toAvroObject(bytes, AvroWorkerReport.class); + switch (avroWorkerReport.getReportType()) { + case TaskletResult: + final AvroTaskletResultReport taskletResultReport = avroWorkerReport.getTaskletResult(); + // TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex. + final Serializable output = + (Serializable) SerializationUtils.deserialize(taskletResultReport.getSerializedOutput().array()); + workerReport = new TaskletResultReport<>(taskletResultReport.getTaskletId(), output); + break; + case TaskletFailure: + final AvroTaskletFailureReport taskletFailureReport = avroWorkerReport.getTaskletFailure(); + final Exception exception = + (Exception) SerializationUtils.deserialize(taskletFailureReport.getSerializedException().array()); + workerReport = new TaskletFailureReport(taskletFailureReport.getTaskletId(), exception); + break; + default: + throw new RuntimeException("Undefined WorkerReport type"); + } + return workerReport; + } + + /** + * Serialize Avro object to byte array. + * @param avroObject Avro object to serialize. + * @param theClass Class of the Avro object. + * @param <T> Type of the Avro object. + * @return Serialized byte array. + */ + private static <T> byte[] toBytes(final T avroObject, final Class<T> theClass) { + final DatumWriter<T> reportWriter = new SpecificDatumWriter<>(theClass); + final byte[] theBytes; + try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) { + final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null); + reportWriter.write(avroObject, encoder); + encoder.flush(); + out.flush(); + theBytes = out.toByteArray(); + return theBytes; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Deserialize byte array to Avro object. + * @param bytes Byte array to deserialize. + * @param theClass Class of the Avro object. + * @param <T> Type of the Avro object. + * @return Avro object de-serialized from byte array. + */ + private static <T> T toAvroObject(final byte[] bytes, final Class<T> theClass) { + final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null); + final SpecificDatumReader<T> reader = new SpecificDatumReader<>(theClass); + try { + return reader.read(null, decoder); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Empty private constructor to prohibit instantiation of utility class. + */ + private VortexAvroUtils() { + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/71951320/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java index 977302f..352cdbd 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java @@ -18,7 +18,6 @@ */ package org.apache.reef.vortex.driver; -import org.apache.commons.lang.SerializationUtils; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.driver.evaluator.*; import org.apache.reef.driver.task.RunningTask; @@ -31,6 +30,7 @@ import org.apache.reef.tang.annotations.Unit; import org.apache.reef.vortex.api.VortexStart; import org.apache.reef.vortex.common.TaskletFailureReport; import org.apache.reef.vortex.common.TaskletResultReport; +import org.apache.reef.vortex.common.VortexAvroUtils; import org.apache.reef.vortex.common.WorkerReport; import org.apache.reef.vortex.evaluator.VortexWorker; import org.apache.reef.wake.EStage; @@ -154,15 +154,16 @@ final class VortexDriver { @Override public void onNext(final TaskMessage taskMessage) { final String workerId = taskMessage.getId(); - final WorkerReport workerReport= (WorkerReport)SerializationUtils.deserialize(taskMessage.get()); + final WorkerReport workerReport = VortexAvroUtils.toWorkerReport(taskMessage.get()); switch (workerReport.getType()) { case TaskletResult: - final TaskletResultReport taskletResultReport = (TaskletResultReport)workerReport; + final TaskletResultReport taskletResultReport = (TaskletResultReport) workerReport; vortexMaster.taskletCompleted(workerId, taskletResultReport.getTaskletId(), taskletResultReport.getResult()); break; case TaskletFailure: - final TaskletFailureReport taskletFailureReport = (TaskletFailureReport)workerReport; - vortexMaster.taskletErrored(workerId, taskletFailureReport.getTaskletId(), taskletFailureReport.getException()); + final TaskletFailureReport taskletFailureReport = (TaskletFailureReport) workerReport; + vortexMaster.taskletErrored(workerId, taskletFailureReport.getTaskletId(), + taskletFailureReport.getException()); break; default: throw new RuntimeException("Unknown Report"); http://git-wip-us.apache.org/repos/asf/reef/blob/71951320/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java index 2c02d6e..b94b5b0 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java @@ -18,9 +18,9 @@ */ package org.apache.reef.vortex.driver; -import org.apache.commons.lang.SerializationUtils; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.driver.task.RunningTask; +import org.apache.reef.vortex.common.VortexAvroUtils; import org.apache.reef.vortex.common.VortexRequest; import javax.inject.Inject; @@ -43,7 +43,7 @@ class VortexRequestor { @Override public void run() { // Possible race condition with VortexWorkerManager#terminate is addressed by the global lock in VortexMaster - reefTask.send(SerializationUtils.serialize(vortexRequest)); + reefTask.send(VortexAvroUtils.toBytes(vortexRequest)); } }); } http://git-wip-us.apache.org/repos/asf/reef/blob/71951320/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java index 6a2821e..e65830c 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java @@ -18,7 +18,6 @@ */ package org.apache.reef.vortex.evaluator; -import org.apache.commons.lang.SerializationUtils; import org.apache.reef.annotations.Unstable; import org.apache.reef.annotations.audience.TaskSide; import org.apache.reef.tang.annotations.Parameter; @@ -57,7 +56,7 @@ public final class VortexWorker implements Task, TaskMessageSource { @Inject private VortexWorker(final HeartBeatTriggerManager heartBeatTriggerManager, - @Parameter(VortexWorkerConf.NumOfThreads.class) final int numOfThreads) { + @Parameter(VortexWorkerConf.NumOfThreads.class) final int numOfThreads) { this.heartBeatTriggerManager = heartBeatTriggerManager; this.numOfThreads = numOfThreads; } @@ -88,33 +87,32 @@ public final class VortexWorker implements Task, TaskMessageSource { @Override public void run() { // Command Executor: Deserialize the command - final VortexRequest vortexRequest = (VortexRequest) SerializationUtils.deserialize(message); + final VortexRequest vortexRequest = VortexAvroUtils.toVortexRequest(message); switch (vortexRequest.getType()) { - case ExecuteTasklet: - final TaskletExecutionRequest taskletExecutionRequest = (TaskletExecutionRequest) vortexRequest; - try { - // Command Executor: Execute the command - final Serializable result = taskletExecutionRequest.execute(); - - // Command Executor: Tasklet successfully returns result - final WorkerReport report = - new TaskletResultReport<>(taskletExecutionRequest.getTaskletId(), result); - workerReports.addLast(SerializationUtils.serialize(report)); - } catch (Exception e) { - // Command Executor: Tasklet throws an exception - final WorkerReport report = - new TaskletFailureReport(taskletExecutionRequest.getTaskletId(), e); - workerReports.addLast(SerializationUtils.serialize(report)); - } - - heartBeatTriggerManager.triggerHeartBeat(); - break; - default: - throw new RuntimeException("Unknown Command"); + case ExecuteTasklet: + final TaskletExecutionRequest taskletExecutionRequest = (TaskletExecutionRequest) vortexRequest; + try { + // Command Executor: Execute the command + final Serializable result = taskletExecutionRequest.execute(); + + // Command Executor: Tasklet successfully returns result + final WorkerReport report = + new TaskletResultReport<>(taskletExecutionRequest.getTaskletId(), result); + workerReports.addLast(VortexAvroUtils.toBytes(report)); + } catch (Exception e) { + // Command Executor: Tasklet throws an exception + final WorkerReport report = + new TaskletFailureReport(taskletExecutionRequest.getTaskletId(), e); + workerReports.addLast(VortexAvroUtils.toBytes(report)); + } + + heartBeatTriggerManager.triggerHeartBeat(); + break; + default: + throw new RuntimeException("Unknown Command"); } } }); - } } });
