[REEF-504] Clean up Serializable in Vortex

This addressed the issue by
  * Using Kryo as the one and only serializer in Vortex
  * Making classes Kryo-compatible
  * Removing Avro-related code
  * Refactoring protocol names

JIRA:
  [REEF-504](https://issues.apache.org/jira/browse/REEF-504)
Pull Request: Closes #931 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/48bde0c0 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/48bde0c0 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/48bde0c0 Branch: refs/heads/master Commit: 48bde0c0ec0ad4ffbe88b65a11d9fccb453eff6e Parents: 120327f Author: John Yang <[email protected]> Authored: Sun Apr 3 16:50:12 2016 +0900 Committer: Andrew Chung <[email protected]> Committed: Wed Apr 13 10:08:02 2016 -0700 ---------------------------------------------------------------------- lang/java/reef-applications/reef-vortex/pom.xml | 44 +-- .../src/main/avro/VortexRequest.avsc | 80 ---- .../reef-vortex/src/main/avro/WorkerReport.avsc | 125 ------- .../vortex/api/VortexAggregateFunction.java | 18 +- .../reef/vortex/api/VortexAggregateFuture.java | 16 +- .../reef/vortex/api/VortexAggregatePolicy.java | 15 +- .../apache/reef/vortex/api/VortexFunction.java | 25 +- .../apache/reef/vortex/api/VortexFuture.java | 21 +- .../common/AggregateFunctionRepository.java | 81 ---- .../apache/reef/vortex/common/KryoUtils.java | 75 ++++ .../TaskletAggregateExecutionRequest.java | 69 ---- .../common/TaskletAggregationFailureReport.java | 65 ---- .../common/TaskletAggregationRequest.java | 105 ------ .../common/TaskletAggregationResultReport.java | 70 ---- .../common/TaskletCancellationRequest.java | 45 --- .../vortex/common/TaskletCancelledReport.java | 48 --- .../vortex/common/TaskletExecutionRequest.java | 86 ----- .../vortex/common/TaskletFailureReport.java | 65 ---- .../reef/vortex/common/TaskletReport.java | 47 --- .../reef/vortex/common/TaskletResultReport.java | 66 ---- .../reef/vortex/common/VortexAvroUtils.java | 374 ------------------- .../vortex/common/VortexFutureDelegate.java | 61 --- .../reef/vortex/common/VortexRequest.java | 42 --- .../apache/reef/vortex/common/WorkerReport.java | 51 --- .../apache/reef/vortex/common/package-info.java | 2 +- 
.../driver/AggregateFunctionRepository.java | 69 ++++ .../reef/vortex/driver/DefaultVortexMaster.java | 41 +- .../reef/vortex/driver/RunningWorkers.java | 1 - .../org/apache/reef/vortex/driver/Tasklet.java | 1 - .../apache/reef/vortex/driver/VortexDriver.java | 14 +- .../vortex/driver/VortexFutureDelegate.java | 60 +++ .../apache/reef/vortex/driver/VortexMaster.java | 4 +- .../reef/vortex/driver/VortexRequestor.java | 22 +- .../reef/vortex/driver/VortexWorkerManager.java | 8 +- .../vortex/evaluator/AggregateContainer.java | 32 +- .../reef/vortex/evaluator/VortexWorker.java | 74 ++-- .../vortex/examples/addone/AddOneFunction.java | 13 - .../examples/hello/HelloVortexFunction.java | 13 - .../vortex/examples/matmul/MatMulException.java | 5 + .../vortex/examples/matmul/MatMulFunction.java | 14 - .../vortex/examples/matmul/MatMulInput.java | 12 +- .../examples/matmul/MatMulInputCodec.java | 100 ----- .../vortex/examples/matmul/MatMulOutput.java | 10 +- .../examples/matmul/MatMulOutputCodec.java | 97 ----- .../reef/vortex/examples/matmul/RowMatrix.java | 8 +- .../sumones/AdditionAggregateFunction.java | 9 - .../examples/sumones/IdentityFunction.java | 14 - .../mastertoworker/MasterToWorkerRequest.java | 42 +++ .../TaskletAggregateExecutionRequest.java | 75 ++++ .../TaskletAggregationRequest.java | 106 ++++++ .../TaskletCancellationRequest.java | 51 +++ .../mastertoworker/TaskletExecutionRequest.java | 88 +++++ .../protocol/mastertoworker/package-info.java | 22 ++ .../TaskletAggregationFailureReport.java | 71 ++++ .../TaskletAggregationResultReport.java | 76 ++++ .../workertomaster/TaskletCancelledReport.java | 54 +++ .../workertomaster/TaskletFailureReport.java | 71 ++++ .../workertomaster/TaskletResultReport.java | 72 ++++ .../workertomaster/WorkerToMasterReport.java | 47 +++ .../workertomaster/WorkerToMasterReports.java | 55 +++ .../protocol/workertomaster/package-info.java | 22 ++ .../org/apache/reef/vortex/util/VoidCodec.java | 37 -- 
.../apache/reef/vortex/util/package-info.java | 22 -- .../vortex/driver/DefaultVortexMasterTest.java | 30 +- .../reef/vortex/driver/RunningWorkersTest.java | 1 - .../vortex/driver/SchedulingPolicyTest.java | 4 +- .../org/apache/reef/vortex/driver/TestUtil.java | 52 +-- .../applications/vortex/VortexTestSuite.java | 4 +- .../vortex/addone/AddOneFunction.java | 13 - .../InfiniteLoopWithCancellationFunction.java | 13 - .../TaskletCancellationRequestTest.java | 73 ++++ .../cancellation/TaskletCancellationTest.java | 73 ---- .../vortex/exception/ExceptionFunction.java | 13 - pom.xml | 2 + 74 files changed, 1315 insertions(+), 2191 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/pom.xml ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/pom.xml b/lang/java/reef-applications/reef-vortex/pom.xml index b140127..0662544 100644 --- a/lang/java/reef-applications/reef-vortex/pom.xml +++ b/lang/java/reef-applications/reef-vortex/pom.xml @@ -51,48 +51,20 @@ under the License. 
<version>${project.version}</version> </dependency> <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro</artifactId> + <groupId>com.esotericsoftware</groupId> + <artifactId>kryo-shaded</artifactId> + <version>${kryo.version}</version> + </dependency> + <dependency> + <groupId>de.javakaffee</groupId> + <artifactId>kryo-serializers</artifactId> + <version>${kryo-serializers.version}</version> </dependency> </dependencies> <build> <plugins> <plugin> - <groupId>org.apache.avro</groupId> - <artifactId>avro-maven-plugin</artifactId> - <executions> - <execution> - <phase>generate-sources</phase> - <goals> - <goal>schema</goal> - </goals> - <configuration> - <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory> - <outputDirectory>${project.basedir}/target/generated-sources/avro/</outputDirectory> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - <executions> - <execution> - <id>add-source</id> - <phase>generate-sources</phase> - <goals> - <goal>add-source</goal> - </goals> - <configuration> - <sources> - <source>target/generated-sources/avro</source> - </sources> - </configuration> - </execution> - </executions> - </plugin> - <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <executions> http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc b/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc deleted file mode 100644 index c0ab97b..0000000 --- a/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more 
contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -[ - { - "namespace": "org.apache.reef.vortex.common.avro", - "type": "record", - "name": "AvroTaskletAggregateExecutionRequest", - "fields": [ - {"name": "taskletId", "type": "int"}, - {"name": "aggregateFunctionId", "type": "int"}, - {"name": "serializedInput", "type": "bytes"} - ] - }, - { - "namespace": "org.apache.reef.vortex.common.avro", - "type": "record", - "name": "AvroTaskletAggregationRequest", - "fields": [ - {"name": "aggregateFunctionId", "type": "int"}, - {"name": "serializedUserFunction", "type": "bytes"}, - {"name": "serializedAggregateFunction", "type": "bytes"}, - {"name": "serializedPolicy", "type": "bytes"} - ] - }, - { - "namespace": "org.apache.reef.vortex.common.avro", - "type": "record", - "name": "AvroTaskletExecutionRequest", - "fields": [ - {"name": "taskletId", "type": "int"}, - {"name": "serializedUserFunction", "type": "bytes"}, - {"name": "serializedInput", "type": "bytes"} - ] - }, - { - "namespace": "org.apache.reef.vortex.common.avro", - "type": "record", - "name": "AvroTaskletCancellationRequest", - "fields": [{"name": "taskletId", "type": "int"}] - }, - { - "namespace": "org.apache.reef.vortex.common.avro", - "type": "record", - "name": "AvroVortexRequest", - "fields": [ - { - "name": "requestType", - 
"type": {"type": "enum", "name": "AvroRequestType", - "symbols": ["ExecuteTasklet", "CancelTasklet", "Aggregate", "AggregateExecute"]} - }, - { - "name": "taskletRequest", - "type": [ - "null", - "AvroTaskletAggregateExecutionRequest", - "AvroTaskletAggregationRequest", - "AvroTaskletExecutionRequest", - "AvroTaskletCancellationRequest" - ], - "default": null - } - ] - } -] http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc b/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc deleted file mode 100644 index 5d7395d..0000000 --- a/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -[ - { - "namespace": "org.apache.reef.vortex.common.avro", - "type": "record", - "name": "AvroTaskletResultReport", - "fields": [ - {"name": "taskletId", "type": "int"}, - {"name": "serializedOutput", "type": "bytes"} - ] - }, - { - "namespace": "org.apache.reef.vortex.common.avro", - "type": "record", - "name": "AvroTaskletAggregationResultReport", - "fields": [ - { - "name": "taskletIds", - "type": - { - "type": "array", - "items": "int" - } - }, - {"name": "serializedOutput", "type": "bytes"} - ] - }, - { - "namespace": "org.apache.reef.vortex.common.avro", - "type": "record", - "name": "AvroTaskletCancelledReport", - "fields": [ - {"name": "taskletId", "type": "int"} - ] - }, - { - "namespace": "org.apache.reef.vortex.common.avro", - "type": "record", - "name": "AvroTaskletFailureReport", - "fields": [ - {"name": "taskletId", "type": "int"}, - {"name": "serializedException", "type": "bytes"} - ] - }, - { - "namespace": "org.apache.reef.vortex.common.avro", - "type": "record", - "name": "AvroTaskletAggregationFailureReport", - "fields": [ - { - "name": "taskletIds", - "type": - { - "type": "array", - "items": "int" - } - }, - {"name": "serializedException", "type": "bytes"} - ] - }, - { - "namespace": "org.apache.reef.vortex.common.avro", - "type": "record", - "name": "AvroTaskletReport", - "fields": [ - { - "name": "reportType", - "type": - { - "type": "enum", - "name": "AvroReportType", - "symbols": [ - "TaskletResult", - "TaskletAggregationResult", - "TaskletCancelled", - "TaskletFailure", - "TaskletAggregationFailure" - ] - } - }, - { - "name": "taskletReport", - "type": [ - "AvroTaskletResultReport", - "AvroTaskletAggregationResultReport", - "AvroTaskletCancelledReport", - "AvroTaskletFailureReport", - "AvroTaskletAggregationFailureReport" - ] - } - ] - }, - { - "namespace": "org.apache.reef.vortex.common.avro", - "type": "record", - "name": "AvroWorkerReport", - "fields": [ - { - "name": "taskletReports", - "type": - { - "type": "array", - "items": 
"AvroTaskletReport" - } - } - ] - } -] http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFunction.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFunction.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFunction.java index d7254d5..b96d19c 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFunction.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFunction.java @@ -21,22 +21,16 @@ package org.apache.reef.vortex.api; import org.apache.reef.annotations.Unstable; import org.apache.reef.annotations.audience.ClientSide; import org.apache.reef.annotations.audience.Public; -import org.apache.reef.io.serialization.Codec; - -import java.io.Serializable; import java.util.List; /** * Typed user function for Local Aggregation. Implement your functions using this interface. - * TODO[REEF-504]: Clean up Serializable in Vortex. - * TODO[REEF-1003]: Use reflection instead of serialization when launching VortexFunction. - * * @param <TOutput> output type of the aggregation function and the functions to-be-aggregated. */ @Public @ClientSide @Unstable -public interface VortexAggregateFunction<TOutput> extends Serializable { +public interface VortexAggregateFunction<TOutput> { /** * Runs a custom local aggregation function on Tasklets assigned to a VortexWorker. @@ -45,14 +39,4 @@ public interface VortexAggregateFunction<TOutput> extends Serializable { * @throws Exception */ TOutput call(final List<TOutput> taskletOutputs) throws VortexAggregateException; - - /** - * Users must define codec for the AggregationOutput. 
- * {@link org.apache.reef.vortex.util.VoidCodec} can be used if the aggregation output is - * empty, and {@link org.apache.reef.io.serialization.SerializableCodec} can be used for ({@link Serializable} - * aggregation output. - * Custom aggregation output Codec can also be supplied. - * @return Codec used to serialize/deserialize the output. - */ - Codec<TOutput> getOutputCodec(); } http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFuture.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFuture.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFuture.java index d156a13..0a3aa7b 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFuture.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFuture.java @@ -25,8 +25,7 @@ import org.apache.reef.annotations.Unstable; import org.apache.reef.annotations.audience.ClientSide; import org.apache.reef.annotations.audience.Private; import org.apache.reef.annotations.audience.Public; -import org.apache.reef.io.serialization.Codec; -import org.apache.reef.vortex.common.VortexFutureDelegate; +import org.apache.reef.vortex.driver.VortexFutureDelegate; import javax.annotation.concurrent.NotThreadSafe; import java.util.*; @@ -41,9 +40,8 @@ import java.util.concurrent.*; @ClientSide @NotThreadSafe @Unstable -public final class VortexAggregateFuture<TInput, TOutput> implements VortexFutureDelegate { +public final class VortexAggregateFuture<TInput, TOutput> implements VortexFutureDelegate<TOutput> { private final Executor executor; - private final Codec<TOutput> aggOutputCodec; private final BlockingQueue<Pair<List<Integer>, 
AggregateResult>> resultQueue; private final ConcurrentMap<Integer, TInput> taskletIdInputMap; private final FutureCallback<AggregateResult<TInput, TOutput>> callbackHandler; @@ -51,12 +49,10 @@ public final class VortexAggregateFuture<TInput, TOutput> implements VortexFutur @Private public VortexAggregateFuture(final Executor executor, final Map<Integer, TInput> taskletIdInputMap, - final Codec<TOutput> aggOutputCodec, final FutureCallback<AggregateResult<TInput, TOutput>> callbackHandler) { this.executor = executor; this.taskletIdInputMap = new ConcurrentHashMap<>(taskletIdInputMap); this.resultQueue = new ArrayBlockingQueue<>(taskletIdInputMap.size()); - this.aggOutputCodec = aggOutputCodec; this.callbackHandler = callbackHandler; } @@ -115,10 +111,8 @@ public final class VortexAggregateFuture<TInput, TOutput> implements VortexFutur */ @Private @Override - public void completed(final int taskletId, final byte[] serializedResult) { + public void completed(final int taskletId, final TOutput result) { try { - // TODO[REEF-1113]: Handle serialization failure separately in Vortex - final TOutput result = aggOutputCodec.decode(serializedResult); completedTasklets(result, Collections.singletonList(taskletId)); } catch (final InterruptedException e) { throw new RuntimeException(e); @@ -130,10 +124,8 @@ public final class VortexAggregateFuture<TInput, TOutput> implements VortexFutur */ @Private @Override - public void aggregationCompleted(final List<Integer> taskletIds, final byte[] serializedResult) { + public void aggregationCompleted(final List<Integer> taskletIds, final TOutput result) { try { - // TODO[REEF-1113]: Handle serialization failure separately in Vortex - final TOutput result = aggOutputCodec.decode(serializedResult); completedTasklets(result, taskletIds); } catch (final InterruptedException e) { throw new RuntimeException(e); 
http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregatePolicy.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregatePolicy.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregatePolicy.java index ef10eb4..d13feb1 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregatePolicy.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregatePolicy.java @@ -24,20 +24,23 @@ import org.apache.reef.annotations.audience.Public; import org.apache.reef.util.Builder; import org.apache.reef.util.Optional; -import java.io.Serializable; - /** * The policy for local aggregation on the {@link org.apache.reef.vortex.evaluator.VortexWorker}s. * The Aggregation function will be triggered on the individual {@link VortexFunction} results on * an "OR" basis of what is specified by the policy. - * TODO[REEF-504]: Clean up Serializable in Vortex. */ @ClientSide @Public @Unstable -public final class VortexAggregatePolicy implements Serializable { - private final Optional<Integer> count; - private final int periodMilliseconds; +public final class VortexAggregatePolicy { + private Optional<Integer> count; + private int periodMilliseconds; + + /** + * No-arg constructor required for Kryo to serialize/deserialize. 
+ */ + VortexAggregatePolicy() { + } private VortexAggregatePolicy(final int periodMilliseconds, final Optional<Integer> count) { this.periodMilliseconds = periodMilliseconds; http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFunction.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFunction.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFunction.java index 3efe4c5..f53b1c1 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFunction.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFunction.java @@ -19,20 +19,17 @@ package org.apache.reef.vortex.api; import org.apache.reef.annotations.Unstable; -import org.apache.reef.io.serialization.Codec; - -import java.io.Serializable; /** * Typed user function. Implement your functions using this interface. - * TODO[REEF-504]: Clean up Serializable in Vortex. - * TODO[REEF-1003]: Use reflection instead of serialization when launching VortexFunction. + * Note that Kryo should be able to serialize/deserialize your function and input. + * Please refer to Kryo project's GitHub repository for how to make Kryo-compatible objects. * * @param <TInput> input type * @param <TOutput> output type */ @Unstable -public interface VortexFunction<TInput, TOutput> extends Serializable { +public interface VortexFunction<TInput, TOutput> { /** * @param input of the function * @return output of the function @@ -41,20 +38,4 @@ public interface VortexFunction<TInput, TOutput> extends Serializable { * For example if threads are spawned here, shut them down before throwing an exception */ TOutput call(TInput input) throws Exception; - - /** - * Users must define codec for the input. 
{@link org.apache.reef.vortex.util.VoidCodec} can be used if the input is - * empty, and {@link org.apache.reef.io.serialization.SerializableCodec} can be used for ({@link Serializable} input. - * {@link org.apache.reef.vortex.examples.matmul.MatMulInputCodec} is an example of codec for the custom input. - * @return Codec used to serialize/deserialize the input. - */ - Codec<TInput> getInputCodec(); - - /** - * Users must define codec for the output. {@link org.apache.reef.vortex.util.VoidCodec} can be used if the output is - * empty, and {@link org.apache.reef.io.serialization.SerializableCodec} can be used for ({@link Serializable} output. - * {@link org.apache.reef.vortex.examples.matmul.MatMulOutputCodec} is an example of codec for the custom output. - * @return Codec used to serialize/deserialize the output. - */ - Codec<TOutput> getOutputCodec(); } http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java index 3cb6a9b..446011c 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java @@ -20,9 +20,8 @@ package org.apache.reef.vortex.api; import org.apache.reef.annotations.Unstable; import org.apache.reef.annotations.audience.Private; -import org.apache.reef.io.serialization.Codec; import org.apache.reef.util.Optional; -import org.apache.reef.vortex.common.VortexFutureDelegate; +import org.apache.reef.vortex.driver.VortexFutureDelegate; import org.apache.reef.vortex.driver.VortexMaster; import java.util.List; @@ -35,8 +34,7 @@ 
import java.util.logging.Logger; * The interface between user code and submitted task. */ @Unstable -public final class VortexFuture<TOutput> - implements Future<TOutput>, VortexFutureDelegate { +public final class VortexFuture<TOutput> implements Future<TOutput>, VortexFutureDelegate<TOutput> { private static final Logger LOG = Logger.getLogger(VortexFuture.class.getName()); // userResult starts out as null. If not null => variable is set and tasklet returned. @@ -49,15 +47,13 @@ public final class VortexFuture<TOutput> private final Executor executor; private final VortexMaster vortexMaster; private final int taskletId; - private final Codec<TOutput> outputCodec; /** * Creates a {@link VortexFuture}. */ @Private - public VortexFuture(final Executor executor, final VortexMaster vortexMaster, final int taskletId, - final Codec<TOutput> outputCodec) { - this(executor, vortexMaster, taskletId, outputCodec, null); + public VortexFuture(final Executor executor, final VortexMaster vortexMaster, final int taskletId) { + this(executor, vortexMaster, taskletId, null); } /** @@ -67,12 +63,10 @@ public final class VortexFuture<TOutput> public VortexFuture(final Executor executor, final VortexMaster vortexMaster, final int taskletId, - final Codec<TOutput> outputCodec, final FutureCallback<TOutput> callbackHandler) { this.executor = executor; this.vortexMaster = vortexMaster; this.taskletId = taskletId; - this.outputCodec = outputCodec; this.callbackHandler = callbackHandler; } @@ -187,11 +181,8 @@ public final class VortexFuture<TOutput> */ @Private @Override - public void completed(final int pTaskletId, final byte[] serializedResult) { + public void completed(final int pTaskletId, final TOutput result) { assert taskletId == pTaskletId; - - // TODO[REEF-1113]: Handle serialization failure separately in Vortex - final TOutput result = outputCodec.decode(serializedResult); this.userResult = Optional.ofNullable(result); if (callbackHandler != null) { executor.execute(new 
Runnable() { @@ -209,7 +200,7 @@ public final class VortexFuture<TOutput> */ @Private @Override - public void aggregationCompleted(final List<Integer> taskletIds, final byte[] serializedResult) { + public void aggregationCompleted(final List<Integer> taskletIds, final TOutput result) { throw new RuntimeException("Functions not associated with AggregationFunctions cannot be aggregated."); } http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/AggregateFunctionRepository.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/AggregateFunctionRepository.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/AggregateFunctionRepository.java deleted file mode 100644 index c45dcde..0000000 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/AggregateFunctionRepository.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.reef.vortex.common; - -import org.apache.commons.lang3.tuple.ImmutableTriple; -import org.apache.commons.lang3.tuple.Triple; -import org.apache.reef.annotations.Unstable; -import org.apache.reef.annotations.audience.Private; -import org.apache.reef.vortex.api.VortexAggregateFunction; -import org.apache.reef.vortex.api.VortexAggregatePolicy; -import org.apache.reef.vortex.api.VortexFunction; - -import javax.annotation.concurrent.ThreadSafe; -import javax.inject.Inject; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -/** - * A repository for {@link VortexAggregateFunction} and its associated {@link VortexFunction}, - * used to pass functions between VortexMaster and RunningWorkers, as well as used to cache functions - * for VortexWorkers on AggregateRequests and AggregateExecutionRequests. - */ -@ThreadSafe -@Unstable -@Private -public final class AggregateFunctionRepository { - private final ConcurrentMap<Integer, Triple<VortexAggregateFunction, VortexFunction, VortexAggregatePolicy>> - aggregateFunctionMap = new ConcurrentHashMap<>(); - - @Inject - private AggregateFunctionRepository() { - } - - /** - * Associates an aggregate function ID with a {@link VortexAggregateFunction} and a {@link VortexFunction}. - */ - public Triple<VortexAggregateFunction, VortexFunction, VortexAggregatePolicy> put( - final int aggregateFunctionId, - final VortexAggregateFunction aggregateFunction, - final VortexFunction function, - final VortexAggregatePolicy policy) { - return aggregateFunctionMap.put(aggregateFunctionId, new ImmutableTriple<>(aggregateFunction, function, policy)); - } - - /** - * Gets the {@link VortexAggregateFunction} associated with the aggregate function ID. 
- */ - public VortexAggregateFunction getAggregateFunction(final int aggregateFunctionId) { - return aggregateFunctionMap.get(aggregateFunctionId).getLeft(); - } - - /** - * Gets the {@link VortexFunction} associated with the aggregate function ID. - */ - public VortexFunction getFunction(final int aggregateFunctionId) { - return aggregateFunctionMap.get(aggregateFunctionId).getMiddle(); - } - - /** - * Gets the {@link VortexAggregatePolicy} associated with the aggregate function ID. - */ - public VortexAggregatePolicy getPolicy(final int aggregateFunctionId) { - return aggregateFunctionMap.get(aggregateFunctionId).getRight(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/KryoUtils.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/KryoUtils.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/KryoUtils.java new file mode 100644 index 0000000..5fe3f17 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/KryoUtils.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.common; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.esotericsoftware.kryo.pool.KryoFactory; +import com.esotericsoftware.kryo.pool.KryoPool; +import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer; +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.Private; + +import javax.inject.Inject; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; + +/** + * The one and only serializer for the Vortex protocol. + */ +@Private +@Unstable +public final class KryoUtils { + /** + * For reducing Kryo object instantiation cost. + */ + private final KryoPool kryoPool; + + @Inject + private KryoUtils() { + final KryoFactory factory = new KryoFactory() { + @Override + public Kryo create() { + final Kryo kryo = new Kryo(); + UnmodifiableCollectionsSerializer.registerSerializers(kryo); // Required to serialize/deserialize Throwable + return kryo; + } + }; + kryoPool = new KryoPool.Builder(factory).softReferences().build(); + } + + public byte[] serialize(final Object object) { + try (final Output out = new Output(new ByteArrayOutputStream())) { + final Kryo kryo = kryoPool.borrow(); + kryo.writeClassAndObject(out, object); + kryoPool.release(kryo); + return out.toBytes(); + } + } + + public Object deserialize(final byte[] bytes) { + try (final Input input = new Input(new ByteArrayInputStream(bytes))) { + final Kryo kryo = kryoPool.borrow(); + final Object object = kryo.readClassAndObject(input); + kryoPool.release(kryo); + return object; + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregateExecutionRequest.java 
---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregateExecutionRequest.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregateExecutionRequest.java deleted file mode 100644 index db850fc..0000000 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregateExecutionRequest.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.vortex.common; - -import org.apache.reef.annotations.Unstable; -import org.apache.reef.annotations.audience.DriverSide; -import org.apache.reef.annotations.audience.Private; - -/** - * A request from the Vortex Driver to run an aggregate-able function. 
- */ -@Unstable -@Private -@DriverSide -public final class TaskletAggregateExecutionRequest<TInput> implements VortexRequest { - private final TInput input; - private final int aggregateFunctionId; - private final int taskletId; - - public TaskletAggregateExecutionRequest(final int taskletId, - final int aggregateFunctionId, - final TInput input) { - this.taskletId = taskletId; - this.input = input; - this.aggregateFunctionId = aggregateFunctionId; - } - - /** - * @return input of the request. - */ - public TInput getInput() { - return input; - } - - /** - * @return tasklet ID corresponding to the tasklet request. - */ - public int getTaskletId() { - return taskletId; - } - - /** - * @return the AggregateFunctionID of the request. - */ - public int getAggregateFunctionId() { - return aggregateFunctionId; - } - - @Override - public RequestType getType() { - return RequestType.ExecuteAggregateTasklet; - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationFailureReport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationFailureReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationFailureReport.java deleted file mode 100644 index e6a4c82..0000000 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationFailureReport.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.vortex.common; - -import org.apache.reef.annotations.Unstable; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -/** - * Report of a tasklet exception on aggregation. - */ -@Unstable -public final class TaskletAggregationFailureReport implements TaskletReport { - private final List<Integer> taskletIds; - private final Exception exception; - - /** - * @param taskletIds of the failed tasklet(s). - * @param exception that caused the tasklet failure. - */ - public TaskletAggregationFailureReport(final List<Integer> taskletIds, final Exception exception) { - this.taskletIds = Collections.unmodifiableList(new ArrayList<>(taskletIds)); - this.exception = exception; - } - - /** - * @return the type of this TaskletReport. - */ - @Override - public TaskletReportType getType() { - return TaskletReportType.TaskletAggregationFailure; - } - - /** - * @return the taskletIds that failed on aggregation. - */ - public List<Integer> getTaskletIds() { - return taskletIds; - } - - /** - * @return the exception that caused the tasklet aggregation failure. 
- */ - public Exception getException() { - return exception; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationRequest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationRequest.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationRequest.java deleted file mode 100644 index 6d0d3a6..0000000 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationRequest.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.reef.vortex.common; - -import org.apache.reef.annotations.Unstable; -import org.apache.reef.annotations.audience.DriverSide; -import org.apache.reef.annotations.audience.Private; -import org.apache.reef.io.serialization.Codec; -import org.apache.reef.vortex.api.VortexAggregateFunction; -import org.apache.reef.vortex.api.VortexAggregatePolicy; -import org.apache.reef.vortex.api.VortexFunction; - -import java.util.List; - -/** - * A request from the Vortex Driver for the {@link org.apache.reef.vortex.evaluator.VortexWorker} to - * record aggregate functions for later execution. - */ -@Unstable -@Private -@DriverSide -public final class TaskletAggregationRequest<TInput, TOutput> implements VortexRequest { - private final int aggregateFunctionId; - private final VortexAggregateFunction<TOutput> userAggregateFunction; - private final VortexFunction<TInput, TOutput> function; - private final VortexAggregatePolicy policy; - - public TaskletAggregationRequest(final int aggregateFunctionId, - final VortexAggregateFunction<TOutput> aggregateFunction, - final VortexFunction<TInput, TOutput> function, - final VortexAggregatePolicy policy) { - this.aggregateFunctionId = aggregateFunctionId; - this.userAggregateFunction = aggregateFunction; - this.function = function; - this.policy = policy; - } - - @Override - public RequestType getType() { - return RequestType.AggregateTasklets; - } - - /** - * @return the AggregateFunctionID of the aggregate function. - */ - public int getAggregateFunctionId() { - return aggregateFunctionId; - } - - /** - * @return the aggregate function as specified by the user. - */ - public VortexAggregateFunction getAggregateFunction() { - return userAggregateFunction; - } - - /** - * @return the user specified function. - */ - public VortexFunction getFunction() { - return function; - } - - /** - * @return the aggregation policy. 
- */ - public VortexAggregatePolicy getPolicy() { - return policy; - } - - /** - * Execute the aggregate function using the list of outputs. - * @return Output of the function in a serialized form. - */ - public byte[] executeAggregation(final List<TOutput> outputs) throws Exception { - final TOutput output = userAggregateFunction.call(outputs); - final Codec<TOutput> codec = userAggregateFunction.getOutputCodec(); - - // TODO[REEF-1113]: Handle serialization failure separately in Vortex - return codec.encode(output); - } - - /** - * Execute the user specified function. - */ - public TOutput executeFunction(final TInput input) throws Exception { - return function.call(input); - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationResultReport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationResultReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationResultReport.java deleted file mode 100644 index 1e52a2e..0000000 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationResultReport.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.vortex.common; - -import org.apache.reef.annotations.Unstable; -import org.apache.reef.annotations.audience.DriverSide; -import org.apache.reef.annotations.audience.Private; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -/** - * Report of a Tasklet aggregation execution result. - */ -@Private -@DriverSide -@Unstable -public final class TaskletAggregationResultReport implements TaskletReport { - private final List<Integer> taskletIds; - private final byte[] serializedResult; - - /** - * @param taskletIds of the tasklets. - * @param serializedResult of the tasklet execution in a serialized form. - */ - public TaskletAggregationResultReport(final List<Integer> taskletIds, final byte[] serializedResult) { - this.taskletIds = Collections.unmodifiableList(new ArrayList<>(taskletIds)); - this.serializedResult = serializedResult; - } - - /** - * @return the type of this TaskletReport. - */ - @Override - public TaskletReportType getType() { - return TaskletReportType.TaskletAggregationResult; - } - - /** - * @return the TaskletId(s) of this TaskletReport - */ - public List<Integer> getTaskletIds() { - return taskletIds; - } - - /** - * @return the result of the Tasklet aggregation execution in a serialized form. 
- */ - public byte[] getSerializedResult() { - return serializedResult; - } - -} http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancellationRequest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancellationRequest.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancellationRequest.java deleted file mode 100644 index 88f2d89..0000000 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancellationRequest.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.vortex.common; - -import org.apache.reef.annotations.Unstable; - -/** - * A {@link VortexRequest} to cancel tasklets. 
- */ -@Unstable -public final class TaskletCancellationRequest implements VortexRequest { - private final int taskletId; - - public TaskletCancellationRequest(final int taskletId) { - this.taskletId = taskletId; - } - - /** - * @return the ID of the VortexTasklet associated with this VortexRequest. - */ - public int getTaskletId() { - return taskletId; - } - - @Override - public RequestType getType() { - return RequestType.CancelTasklet; - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java deleted file mode 100644 index c09a02f..0000000 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.reef.vortex.common; - -import org.apache.reef.annotations.Unstable; - -/** - * The report of a cancelled Tasklet. - */ -@Unstable -public final class TaskletCancelledReport implements TaskletReport { - private int taskletId; - - /** - * @param taskletId of the cancelled tasklet. - */ - public TaskletCancelledReport(final int taskletId) { - this.taskletId = taskletId; - } - - @Override - public TaskletReportType getType() { - return TaskletReportType.TaskletCancelled; - } - - /** - * @return the taskletId of this TaskletReport. - */ - public int getTaskletId() { - return taskletId; - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java deleted file mode 100644 index e850c9a..0000000 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.vortex.common; - -import org.apache.reef.annotations.Unstable; -import org.apache.reef.annotations.audience.Private; -import org.apache.reef.io.serialization.Codec; -import org.apache.reef.vortex.api.VortexFunction; - -/** - * Request to execute a tasklet. - */ -@Unstable -@Private -public final class TaskletExecutionRequest<TInput, TOutput> implements VortexRequest { - private final int taskletId; - private final VortexFunction<TInput, TOutput> userFunction; - private final TInput input; - - /** - * @return the type of this VortexRequest. - */ - @Override - public RequestType getType() { - return RequestType.ExecuteTasklet; - } - - /** - * Request from Vortex Master to Vortex Worker to execute a tasklet. - */ - public TaskletExecutionRequest(final int taskletId, - final VortexFunction<TInput, TOutput> userFunction, - final TInput input) { - this.taskletId = taskletId; - this.userFunction = userFunction; - this.input = input; - } - - /** - * Execute the function using the input. - * @return Output of the function in a serialized form. - */ - public byte[] execute() throws Exception { - final TOutput output = userFunction.call(input); - final Codec<TOutput> codec = userFunction.getOutputCodec(); - // TODO[REEF-1113]: Handle serialization failure separately in Vortex - return codec.encode(output); - } - - /** - * @return the ID of the VortexTasklet associated with this VortexRequest. - */ - public int getTaskletId() { - return taskletId; - } - - /** - * Get function of the tasklet. 
- */ - public VortexFunction getFunction() { - return userFunction; - } - - /** - * Get input of the tasklet. - */ - public TInput getInput() { - return input; - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java deleted file mode 100644 index 5c0b2de..0000000 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.vortex.common; - -import org.apache.reef.annotations.Unstable; -import org.apache.reef.annotations.audience.DriverSide; -import org.apache.reef.annotations.audience.Private; - -/** - * Report of a Tasklet exception. 
- */ -@Unstable -@Private -@DriverSide -public final class TaskletFailureReport implements TaskletReport { - private final int taskletId; - private final Exception exception; - - /** - * @param taskletId of the failed Tasklet. - * @param exception that caused the tasklet failure. - */ - public TaskletFailureReport(final int taskletId, final Exception exception) { - this.taskletId = taskletId; - this.exception = exception; - } - - /** - * @return the type of this TaskletReport. - */ - @Override - public TaskletReportType getType() { - return TaskletReportType.TaskletFailure; - } - - /** - * @return the taskletId of this TaskletReport. - */ - public int getTaskletId() { - return taskletId; - } - - /** - * @return the exception that caused the Tasklet failure. - */ - public Exception getException() { - return exception; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java deleted file mode 100644 index 98149c0..0000000 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.vortex.common; - -import org.apache.reef.annotations.Unstable; -import org.apache.reef.annotations.audience.DriverSide; -import org.apache.reef.annotations.audience.Private; - -/** - * The interface for a status report from the {@link org.apache.reef.vortex.evaluator.VortexWorker}. - */ -@Unstable -@Private -@DriverSide -public interface TaskletReport { - /** - * Type of TaskletReport. - */ - enum TaskletReportType { - TaskletResult, - TaskletAggregationResult, - TaskletCancelled, - TaskletFailure, - TaskletAggregationFailure - } - - /** - * @return the type of this TaskletReport. - */ - TaskletReportType getType(); -} http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java deleted file mode 100644 index 2c32578..0000000 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.vortex.common; - -import org.apache.reef.annotations.Unstable; -import org.apache.reef.annotations.audience.DriverSide; -import org.apache.reef.annotations.audience.Private; - -/** - * Report of a tasklet execution result. - */ -@Unstable -@Private -@DriverSide -public final class TaskletResultReport implements TaskletReport { - private final int taskletId; - private final byte[] serializedResult; - - /** - * @param taskletId of the Tasklet. - * @param serializedResult of the tasklet execution in a serialized form. - */ - public TaskletResultReport(final int taskletId, final byte[] serializedResult) { - this.taskletId = taskletId; - this.serializedResult = serializedResult; - } - - /** - * @return the type of this TaskletReport. - */ - @Override - public TaskletReportType getType() { - return TaskletReportType.TaskletResult; - } - - /** - * @return the TaskletId of this TaskletReport - */ - public int getTaskletId() { - return taskletId; - } - - /** - * @return the result of the tasklet execution in a serialized form. 
- */ - public byte[] getSerializedResult() { - return serializedResult; - } - -} http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java deleted file mode 100644 index f0e930b..0000000 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java +++ /dev/null @@ -1,374 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.reef.vortex.common; - -import org.apache.avro.io.*; -import org.apache.avro.specific.SpecificDatumReader; -import org.apache.avro.specific.SpecificDatumWriter; -import org.apache.commons.lang.SerializationUtils; -import org.apache.reef.annotations.Unstable; -import org.apache.reef.annotations.audience.DriverSide; -import org.apache.reef.annotations.audience.Private; -import org.apache.reef.vortex.api.VortexAggregateFunction; -import org.apache.reef.vortex.api.VortexAggregatePolicy; -import org.apache.reef.vortex.api.VortexFunction; -import org.apache.reef.vortex.common.avro.*; - -import javax.inject.Inject; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; - -/** - * Serialize and deserialize Vortex message to/from byte array. - */ -@Private -@DriverSide -@Unstable -public final class VortexAvroUtils { - private final AggregateFunctionRepository aggregateFunctionRepository; - - @Inject - private VortexAvroUtils(final AggregateFunctionRepository aggregateFunctionRepository) { - this.aggregateFunctionRepository = aggregateFunctionRepository; - } - - /** - * Serialize VortexRequest to byte array. - * @param vortexRequest Vortex request message to serialize. - * @return Serialized byte array. - */ - public byte[] toBytes(final VortexRequest vortexRequest) { - // Convert VortexRequest message to Avro message. 
- final AvroVortexRequest avroVortexRequest; - switch (vortexRequest.getType()) { - case ExecuteAggregateTasklet: - final TaskletAggregateExecutionRequest taskletAggregateExecutionRequest = - (TaskletAggregateExecutionRequest) vortexRequest; - // TODO[REEF-1113]: Handle serialization failure separately in Vortex - final byte[] serializedInputForAggregate = - aggregateFunctionRepository.getFunction(taskletAggregateExecutionRequest.getAggregateFunctionId()) - .getInputCodec().encode(taskletAggregateExecutionRequest.getInput()); - avroVortexRequest = AvroVortexRequest.newBuilder() - .setRequestType(AvroRequestType.AggregateExecute) - .setTaskletRequest( - AvroTaskletAggregateExecutionRequest.newBuilder() - .setAggregateFunctionId(taskletAggregateExecutionRequest.getAggregateFunctionId()) - .setSerializedInput(ByteBuffer.wrap(serializedInputForAggregate)) - .setTaskletId(taskletAggregateExecutionRequest.getTaskletId()) - .build()) - .build(); - break; - case AggregateTasklets: - final TaskletAggregationRequest taskletAggregationRequest = (TaskletAggregationRequest) vortexRequest; - - // TODO[REEF-1003]: Use reflection instead of serialization when launching VortexFunction - final byte[] serializedAggregateFunction = SerializationUtils.serialize( - taskletAggregationRequest.getAggregateFunction()); - final byte[] serializedFunctionForAggregation = SerializationUtils.serialize( - taskletAggregationRequest.getFunction()); - final byte[] serializedPolicy = SerializationUtils.serialize( - taskletAggregationRequest.getPolicy()); - avroVortexRequest = AvroVortexRequest.newBuilder() - .setRequestType(AvroRequestType.Aggregate) - .setTaskletRequest(AvroTaskletAggregationRequest.newBuilder() - .setAggregateFunctionId(taskletAggregationRequest.getAggregateFunctionId()) - .setSerializedAggregateFunction(ByteBuffer.wrap(serializedAggregateFunction)) - .setSerializedUserFunction(ByteBuffer.wrap(serializedFunctionForAggregation)) - 
.setSerializedPolicy(ByteBuffer.wrap(serializedPolicy)) - .build()) - .build(); - break; - case ExecuteTasklet: - final TaskletExecutionRequest taskletExecutionRequest = (TaskletExecutionRequest) vortexRequest; - // The following TODOs are sub-issues of cleaning up Serializable in Vortex (REEF-504). - // The purpose is to reduce serialization cost, which leads to bottleneck in Master. - // Temporarily those are left as TODOs, but will be addressed in separate PRs. - final VortexFunction vortexFunction = taskletExecutionRequest.getFunction(); - // TODO[REEF-1113]: Handle serialization failure separately in Vortex - final byte[] serializedInput = vortexFunction.getInputCodec().encode(taskletExecutionRequest.getInput()); - // TODO[REEF-1003]: Use reflection instead of serialization when launching VortexFunction - final byte[] serializedFunction = SerializationUtils.serialize(vortexFunction); - avroVortexRequest = AvroVortexRequest.newBuilder() - .setRequestType(AvroRequestType.ExecuteTasklet) - .setTaskletRequest( - AvroTaskletExecutionRequest.newBuilder() - .setTaskletId(taskletExecutionRequest.getTaskletId()) - .setSerializedInput(ByteBuffer.wrap(serializedInput)) - .setSerializedUserFunction(ByteBuffer.wrap(serializedFunction)) - .build()) - .build(); - break; - case CancelTasklet: - final TaskletCancellationRequest taskletCancellationRequest = (TaskletCancellationRequest) vortexRequest; - avroVortexRequest = AvroVortexRequest.newBuilder() - .setRequestType(AvroRequestType.CancelTasklet) - .setTaskletRequest( - AvroTaskletCancellationRequest.newBuilder() - .setTaskletId(taskletCancellationRequest.getTaskletId()) - .build()) - .build(); - break; - default: - throw new RuntimeException("Undefined message type"); - } - - // Serialize the Avro message to byte array. - return toBytes(avroVortexRequest, AvroVortexRequest.class); - } - - /** - * Serialize WorkerReport to byte array. - * @param workerReport Worker report message to serialize. 
- * @return Serialized byte array. - */ - public byte[] toBytes(final WorkerReport workerReport) { - final List<AvroTaskletReport> workerTaskletReports = new ArrayList<>(); - - for (final TaskletReport taskletReport : workerReport.getTaskletReports()) { - final AvroTaskletReport avroTaskletReport; - switch (taskletReport.getType()) { - case TaskletResult: - final TaskletResultReport taskletResultReport = (TaskletResultReport) taskletReport; - avroTaskletReport = AvroTaskletReport.newBuilder() - .setReportType(AvroReportType.TaskletResult) - .setTaskletReport( - AvroTaskletResultReport.newBuilder() - .setTaskletId(taskletResultReport.getTaskletId()) - .setSerializedOutput(ByteBuffer.wrap(taskletResultReport.getSerializedResult())) - .build()) - .build(); - break; - case TaskletAggregationResult: - final TaskletAggregationResultReport taskletAggregationResultReport = - (TaskletAggregationResultReport) taskletReport; - avroTaskletReport = AvroTaskletReport.newBuilder() - .setReportType(AvroReportType.TaskletAggregationResult) - .setTaskletReport( - AvroTaskletAggregationResultReport.newBuilder() - .setTaskletIds(taskletAggregationResultReport.getTaskletIds()) - .setSerializedOutput(ByteBuffer.wrap(taskletAggregationResultReport.getSerializedResult())) - .build()) - .build(); - break; - case TaskletCancelled: - final TaskletCancelledReport taskletCancelledReport = (TaskletCancelledReport) taskletReport; - avroTaskletReport = AvroTaskletReport.newBuilder() - .setReportType(AvroReportType.TaskletCancelled) - .setTaskletReport( - AvroTaskletCancelledReport.newBuilder() - .setTaskletId(taskletCancelledReport.getTaskletId()) - .build()) - .build(); - break; - case TaskletFailure: - final TaskletFailureReport taskletFailureReport = (TaskletFailureReport) taskletReport; - final byte[] serializedException = SerializationUtils.serialize(taskletFailureReport.getException()); - avroTaskletReport = AvroTaskletReport.newBuilder() - .setReportType(AvroReportType.TaskletFailure) - 
.setTaskletReport( - AvroTaskletFailureReport.newBuilder() - .setTaskletId(taskletFailureReport.getTaskletId()) - .setSerializedException(ByteBuffer.wrap(serializedException)) - .build()) - .build(); - break; - case TaskletAggregationFailure: - final TaskletAggregationFailureReport taskletAggregationFailureReport = - (TaskletAggregationFailureReport) taskletReport; - final byte[] serializedAggregationException = - SerializationUtils.serialize(taskletAggregationFailureReport.getException()); - avroTaskletReport = AvroTaskletReport.newBuilder() - .setReportType(AvroReportType.TaskletAggregationFailure) - .setTaskletReport( - AvroTaskletAggregationFailureReport.newBuilder() - .setTaskletIds(taskletAggregationFailureReport.getTaskletIds()) - .setSerializedException(ByteBuffer.wrap(serializedAggregationException)) - .build()) - .build(); - break; - default: - throw new RuntimeException("Undefined message type"); - } - - workerTaskletReports.add(avroTaskletReport); - } - - // Convert WorkerReport message to Avro message. - final AvroWorkerReport avroWorkerReport = AvroWorkerReport.newBuilder() - .setTaskletReports(workerTaskletReports) - .build(); - - // Serialize the Avro message to byte array. - return toBytes(avroWorkerReport, AvroWorkerReport.class); - } - - /** - * Deserialize byte array to VortexRequest. - * @param bytes Byte array to deserialize. - * @return De-serialized VortexRequest. 
- */ - public VortexRequest toVortexRequest(final byte[] bytes) { - final AvroVortexRequest avroVortexRequest = toAvroObject(bytes, AvroVortexRequest.class); - - final VortexRequest vortexRequest; - switch (avroVortexRequest.getRequestType()) { - case AggregateExecute: - final AvroTaskletAggregateExecutionRequest taskletAggregateExecutionRequest = - (AvroTaskletAggregateExecutionRequest)avroVortexRequest.getTaskletRequest(); - vortexRequest = new TaskletAggregateExecutionRequest<>(taskletAggregateExecutionRequest.getTaskletId(), - taskletAggregateExecutionRequest.getAggregateFunctionId(), - aggregateFunctionRepository.getFunction(taskletAggregateExecutionRequest.getAggregateFunctionId()) - .getInputCodec().decode(taskletAggregateExecutionRequest.getSerializedInput().array())); - break; - case Aggregate: - final AvroTaskletAggregationRequest taskletAggregationRequest = - (AvroTaskletAggregationRequest)avroVortexRequest.getTaskletRequest(); - final VortexAggregateFunction aggregateFunction = - (VortexAggregateFunction) SerializationUtils.deserialize( - taskletAggregationRequest.getSerializedAggregateFunction().array()); - final VortexFunction functionForAggregation = - (VortexFunction) SerializationUtils.deserialize( - taskletAggregationRequest.getSerializedUserFunction().array()); - final VortexAggregatePolicy policy = - (VortexAggregatePolicy) SerializationUtils.deserialize( - taskletAggregationRequest.getSerializedPolicy().array()); - vortexRequest = new TaskletAggregationRequest<>(taskletAggregationRequest.getAggregateFunctionId(), - aggregateFunction, functionForAggregation, policy); - break; - case ExecuteTasklet: - final AvroTaskletExecutionRequest taskletExecutionRequest = - (AvroTaskletExecutionRequest)avroVortexRequest.getTaskletRequest(); - // TODO[REEF-1003]: Use reflection instead of serialization when launching VortexFunction - final VortexFunction function = - (VortexFunction) SerializationUtils.deserialize( - 
taskletExecutionRequest.getSerializedUserFunction().array()); - // TODO[REEF-1113]: Handle serialization failure separately in Vortex - vortexRequest = new TaskletExecutionRequest(taskletExecutionRequest.getTaskletId(), function, - function.getInputCodec().decode(taskletExecutionRequest.getSerializedInput().array())); - break; - case CancelTasklet: - final AvroTaskletCancellationRequest taskletCancellationRequest = - (AvroTaskletCancellationRequest)avroVortexRequest.getTaskletRequest(); - vortexRequest = new TaskletCancellationRequest(taskletCancellationRequest.getTaskletId()); - break; - default: - throw new RuntimeException("Undefined VortexRequest type"); - } - return vortexRequest; - } - - /** - * Deserialize byte array to WorkerReport. - * @param bytes Byte array to deserialize. - * @return De-serialized WorkerReport. - */ - public WorkerReport toWorkerReport(final byte[] bytes) { - final AvroWorkerReport avroWorkerReport = toAvroObject(bytes, AvroWorkerReport.class); - final List<TaskletReport> workerTaskletReports = new ArrayList<>(); - - for (final AvroTaskletReport avroTaskletReport : avroWorkerReport.getTaskletReports()) { - final TaskletReport taskletReport; - - switch (avroTaskletReport.getReportType()) { - case TaskletResult: - final AvroTaskletResultReport taskletResultReport = - (AvroTaskletResultReport) avroTaskletReport.getTaskletReport(); - taskletReport = new TaskletResultReport(taskletResultReport.getTaskletId(), - taskletResultReport.getSerializedOutput().array()); - break; - case TaskletAggregationResult: - final AvroTaskletAggregationResultReport taskletAggregationResultReport = - (AvroTaskletAggregationResultReport) avroTaskletReport.getTaskletReport(); - taskletReport = - new TaskletAggregationResultReport(taskletAggregationResultReport.getTaskletIds(), - taskletAggregationResultReport.getSerializedOutput().array()); - break; - case TaskletCancelled: - final AvroTaskletCancelledReport taskletCancelledReport = - (AvroTaskletCancelledReport) 
avroTaskletReport.getTaskletReport(); - taskletReport = new TaskletCancelledReport(taskletCancelledReport.getTaskletId()); - break; - case TaskletFailure: - final AvroTaskletFailureReport taskletFailureReport = - (AvroTaskletFailureReport) avroTaskletReport.getTaskletReport(); - final Exception exception = - (Exception) SerializationUtils.deserialize(taskletFailureReport.getSerializedException().array()); - taskletReport = new TaskletFailureReport(taskletFailureReport.getTaskletId(), exception); - break; - case TaskletAggregationFailure: - final AvroTaskletAggregationFailureReport taskletAggregationFailureReport = - (AvroTaskletAggregationFailureReport) avroTaskletReport.getTaskletReport(); - final Exception aggregationException = - (Exception) SerializationUtils.deserialize( - taskletAggregationFailureReport.getSerializedException().array()); - taskletReport = - new TaskletAggregationFailureReport(taskletAggregationFailureReport.getTaskletIds(), aggregationException); - break; - default: - throw new RuntimeException("Undefined TaskletReport type"); - } - - workerTaskletReports.add(taskletReport); - } - - return new WorkerReport(workerTaskletReports); - } - - /** - * Serialize Avro object to byte array. - * @param avroObject Avro object to serialize. - * @param theClass Class of the Avro object. - * @param <T> Type of the Avro object. - * @return Serialized byte array. - */ - private <T> byte[] toBytes(final T avroObject, final Class<T> theClass) { - final DatumWriter<T> reportWriter = new SpecificDatumWriter<>(theClass); - final byte[] theBytes; - try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) { - final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null); - reportWriter.write(avroObject, encoder); - encoder.flush(); - out.flush(); - theBytes = out.toByteArray(); - return theBytes; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - /** - * Deserialize byte array to Avro object. 
- * @param bytes Byte array to deserialize. - * @param theClass Class of the Avro object. - * @param <T> Type of the Avro object. - * @return Avro object de-serialized from byte array. - */ - private <T> T toAvroObject(final byte[] bytes, final Class<T> theClass) { - final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null); - final SpecificDatumReader<T> reader = new SpecificDatumReader<>(theClass); - try { - return reader.read(null, decoder); - } catch (IOException e) { - throw new RuntimeException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexFutureDelegate.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexFutureDelegate.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexFutureDelegate.java deleted file mode 100644 index e6fa91e..0000000 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexFutureDelegate.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.vortex.common; - -import org.apache.reef.annotations.Unstable; -import org.apache.reef.annotations.audience.DriverSide; -import org.apache.reef.annotations.audience.Private; - -import java.util.List; - -/** - * Exposes functions to be called by the {@link org.apache.reef.vortex.driver.VortexMaster} - * to note that a list of Tasklets associated with a Future has completed. - */ -@Unstable -@DriverSide -@Private -public interface VortexFutureDelegate { - - /** - * A Tasklet associated with the future has completed with a result. - * The result should be decoded as in {@link org.apache.reef.vortex.api.VortexFuture#completed(int, byte[])}. - */ - void completed(final int taskletId, final byte[] serializedResult); - - /** - * The list of aggregated Tasklets associated with the Future that have completed with a result. - */ - void aggregationCompleted(final List<Integer> taskletIds, final byte[] serializedResult); - - /** - * A Tasklet associated with the Future has thrown an Exception. - */ - void threwException(final int taskletId, final Exception exception); - - /** - * The list of Tasklets associated with the Future that have thrown an Exception. - */ - void aggregationThrewException(final List<Integer> taskletIds, final Exception exception); - - /** - * A Tasklet associated with the Future has been cancelled. 
- */ - void cancelled(final int taskletId); -} http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java deleted file mode 100644 index 18f44e0..0000000 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.vortex.common; - -import org.apache.reef.annotations.Unstable; - -/** - * Master-to-Worker protocol. - */ -@Unstable -public interface VortexRequest { - /** - * Type of Request. - */ - enum RequestType { - AggregateTasklets, - ExecuteTasklet, - CancelTasklet, - ExecuteAggregateTasklet - } - - /** - * @return the type of this VortexRequest. - */ - RequestType getType(); -}
