Repository: reef Updated Branches: refs/heads/master 3cafad6b1 -> 8b997771b
[REEF-1005] Allow custom codecs for input/output data in Vortex This addressed the issue by * Allowing users to specify codecs in VortexMasterConf * Removing Serializable from input and output * Defining custom codec for MatMul example JIRA: [REEF-1005](https://issues.apache.org/jira/browse/REEF-1005) Pull Request: Closes #739 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/8b997771 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/8b997771 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/8b997771 Branch: refs/heads/master Commit: 8b997771b3390217e7725a775ebbd33f9ef8a9c2 Parents: 3cafad6 Author: Yunseong Lee <[email protected]> Authored: Tue Dec 22 20:28:26 2015 +0800 Committer: Andrew Chung <[email protected]> Committed: Thu Dec 24 18:41:45 2015 -0800 ---------------------------------------------------------------------- .../apache/reef/vortex/api/VortexFunction.java | 23 ++++- .../apache/reef/vortex/api/VortexFuture.java | 20 ++-- .../reef/vortex/api/VortexThreadPool.java | 5 +- .../common/TaskletAggregationResultReport.java | 19 ++-- .../vortex/common/TaskletExecutionRequest.java | 16 +-- .../reef/vortex/common/TaskletReport.java | 4 +- .../reef/vortex/common/TaskletResultReport.java | 20 ++-- .../reef/vortex/common/VortexAvroUtils.java | 37 +++---- .../vortex/common/VortexFutureDelegate.java | 8 +- .../reef/vortex/common/VortexRequest.java | 4 +- .../apache/reef/vortex/common/WorkerReport.java | 3 +- .../reef/vortex/driver/DefaultVortexMaster.java | 13 +-- .../org/apache/reef/vortex/driver/Tasklet.java | 4 +- .../apache/reef/vortex/driver/VortexMaster.java | 4 +- .../reef/vortex/driver/VortexWorkerManager.java | 6 +- .../reef/vortex/evaluator/VortexWorker.java | 6 +- .../vortex/examples/addone/AddOneFunction.java | 13 +++ .../examples/hello/HelloVortexFunction.java | 19 +++- .../vortex/examples/matmul/MatMulFunction.java | 14 +++ .../vortex/examples/matmul/MatMulInput.java | 4 +- 
.../examples/matmul/MatMulInputCodec.java | 100 +++++++++++++++++++ .../vortex/examples/matmul/MatMulOutput.java | 4 +- .../examples/matmul/MatMulOutputCodec.java | 97 ++++++++++++++++++ .../reef/vortex/examples/matmul/Matrix.java | 5 +- .../org/apache/reef/vortex/util/VoidCodec.java | 37 +++++++ .../apache/reef/vortex/util/package-info.java | 22 ++++ .../vortex/driver/DefaultVortexMasterTest.java | 11 +- .../org/apache/reef/vortex/driver/TestUtil.java | 53 ++++++++-- .../vortex/addone/AddOneFunction.java | 13 +++ .../InfiniteLoopWithCancellationFunction.java | 18 +++- .../TaskletCancellationTestStart.java | 2 +- .../vortex/exception/ExceptionFunction.java | 15 ++- 32 files changed, 492 insertions(+), 127 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFunction.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFunction.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFunction.java index 96e47b6..3efe4c5 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFunction.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFunction.java @@ -19,19 +19,20 @@ package org.apache.reef.vortex.api; import org.apache.reef.annotations.Unstable; +import org.apache.reef.io.serialization.Codec; import java.io.Serializable; /** - * Typed user function. - * Implement your functions using this interface. + * Typed user function. Implement your functions using this interface. * TODO[REEF-504]: Clean up Serializable in Vortex. + * TODO[REEF-1003]: Use reflection instead of serialization when launching VortexFunction. 
* * @param <TInput> input type * @param <TOutput> output type */ @Unstable -public interface VortexFunction<TInput extends Serializable, TOutput extends Serializable> extends Serializable { +public interface VortexFunction<TInput, TOutput> extends Serializable { /** * @param input of the function * @return output of the function @@ -40,4 +41,20 @@ public interface VortexFunction<TInput extends Serializable, TOutput extends Ser * For example if threads are spawned here, shut them down before throwing an exception */ TOutput call(TInput input) throws Exception; + + /** + * Users must define codec for the input. {@link org.apache.reef.vortex.util.VoidCodec} can be used if the input is + * empty, and {@link org.apache.reef.io.serialization.SerializableCodec} can be used for {@link Serializable} input. + * {@link org.apache.reef.vortex.examples.matmul.MatMulInputCodec} is an example of codec for the custom input. + * @return Codec used to serialize/deserialize the input. + */ + Codec<TInput> getInputCodec(); + + /** + * Users must define codec for the output. {@link org.apache.reef.vortex.util.VoidCodec} can be used if the output is + * empty, and {@link org.apache.reef.io.serialization.SerializableCodec} can be used for {@link Serializable} output. + * {@link org.apache.reef.vortex.examples.matmul.MatMulOutputCodec} is an example of codec for the custom output. + * @return Codec used to serialize/deserialize the output.
+ */ + Codec<TOutput> getOutputCodec(); } http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java index 5e7f9a2..388ef16 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java @@ -20,11 +20,11 @@ package org.apache.reef.vortex.api; import org.apache.reef.annotations.Unstable; import org.apache.reef.annotations.audience.Private; +import org.apache.reef.io.serialization.Codec; import org.apache.reef.util.Optional; import org.apache.reef.vortex.common.VortexFutureDelegate; import org.apache.reef.vortex.driver.VortexMaster; -import java.io.Serializable; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; @@ -35,8 +35,8 @@ import java.util.logging.Logger; * The interface between user code and submitted task. */ @Unstable -public final class VortexFuture<TOutput extends Serializable> - implements Future<TOutput>, VortexFutureDelegate<TOutput> { +public final class VortexFuture<TOutput> + implements Future<TOutput>, VortexFutureDelegate { private static final Logger LOG = Logger.getLogger(VortexFuture.class.getName()); // userResult starts out as null. If not null => variable is set and tasklet returned. @@ -49,13 +49,15 @@ public final class VortexFuture<TOutput extends Serializable> private final Executor executor; private final VortexMaster vortexMaster; private final int taskletId; + private final Codec<TOutput> outputCodec; /** * Creates a {@link VortexFuture}. 
*/ @Private - public VortexFuture(final Executor executor, final VortexMaster vortexMaster, final int taskletId) { - this(executor, vortexMaster, taskletId, null); + public VortexFuture(final Executor executor, final VortexMaster vortexMaster, final int taskletId, + final Codec<TOutput> outputCodec) { + this(executor, vortexMaster, taskletId, outputCodec, null); } /** @@ -65,10 +67,12 @@ public final class VortexFuture<TOutput extends Serializable> public VortexFuture(final Executor executor, final VortexMaster vortexMaster, final int taskletId, + final Codec<TOutput> outputCodec, final FutureCallback<TOutput> callbackHandler) { this.executor = executor; this.vortexMaster = vortexMaster; this.taskletId = taskletId; + this.outputCodec = outputCodec; this.callbackHandler = callbackHandler; } @@ -181,9 +185,11 @@ public final class VortexFuture<TOutput extends Serializable> */ @Private @Override - public void completed(final int pTaskletId, final TOutput result) { + public void completed(final int pTaskletId, final byte[] serializedResult) { assert taskletId == pTaskletId; + // TODO[REEF-1113]: Handle serialization failure separately in Vortex + final TOutput result = outputCodec.decode(serializedResult); this.userResult = Optional.ofNullable(result); if (callbackHandler != null) { executor.execute(new Runnable() { @@ -201,7 +207,7 @@ public final class VortexFuture<TOutput extends Serializable> */ @Private @Override - public void aggregationCompleted(final List<Integer> taskletIds, final TOutput result) { + public void aggregationCompleted(final List<Integer> taskletIds, final byte[] serializedResult) { throw new RuntimeException("Functions not associated with AggregationFunctions cannot be aggregated."); } http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java ---------------------------------------------------------------------- diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java index 1b02894..e2b1d35 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java @@ -23,7 +23,6 @@ import org.apache.reef.util.Optional; import org.apache.reef.vortex.driver.VortexMaster; import javax.inject.Inject; -import java.io.Serializable; /** * Distributed thread pool. @@ -44,7 +43,7 @@ public final class VortexThreadPool { * @param <TOutput> output type * @return VortexFuture for tracking execution progress */ - public <TInput extends Serializable, TOutput extends Serializable> VortexFuture<TOutput> + public <TInput, TOutput> VortexFuture<TOutput> submit(final VortexFunction<TInput, TOutput> function, final TInput input) { return vortexMaster.enqueueTasklet(function, input, Optional.<FutureCallback<TOutput>>empty()); } @@ -57,7 +56,7 @@ public final class VortexThreadPool { * @param <TOutput> output type * @return VortexFuture for tracking execution progress */ - public <TInput extends Serializable, TOutput extends Serializable> VortexFuture<TOutput> + public <TInput, TOutput> VortexFuture<TOutput> submit(final VortexFunction<TInput, TOutput> function, final TInput input, final FutureCallback<TOutput> callback) { return vortexMaster.enqueueTasklet(function, input, Optional.of(callback)); http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationResultReport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationResultReport.java 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationResultReport.java index ce4a015..1e52a2e 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationResultReport.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationResultReport.java @@ -22,7 +22,6 @@ import org.apache.reef.annotations.Unstable; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; -import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -33,17 +32,17 @@ import java.util.List; @Private @DriverSide @Unstable -public final class TaskletAggregationResultReport<TOutput extends Serializable> implements TaskletReport { +public final class TaskletAggregationResultReport implements TaskletReport { private final List<Integer> taskletIds; - private final TOutput result; + private final byte[] serializedResult; /** * @param taskletIds of the tasklets. - * @param result of the tasklet execution. + * @param serializedResult of the tasklet execution in a serialized form. */ - public TaskletAggregationResultReport(final List<Integer> taskletIds, final TOutput result) { + public TaskletAggregationResultReport(final List<Integer> taskletIds, final byte[] serializedResult) { this.taskletIds = Collections.unmodifiableList(new ArrayList<>(taskletIds)); - this.result = result; + this.serializedResult = serializedResult; } /** @@ -62,10 +61,10 @@ public final class TaskletAggregationResultReport<TOutput extends Serializable> } /** - * @return the result of the Tasklet aggregation execution. + * @return the result of the Tasklet aggregation execution in a serialized form. 
*/ - public TOutput getResult() { - return result; + public byte[] getSerializedResult() { + return serializedResult; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java index 8e43e4b..d85c69b 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java @@ -19,16 +19,16 @@ package org.apache.reef.vortex.common; import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.io.serialization.Codec; import org.apache.reef.vortex.api.VortexFunction; -import java.io.Serializable; - /** * Request to execute a tasklet. */ @Unstable -public final class TaskletExecutionRequest<TInput extends Serializable, TOutput extends Serializable> - implements VortexRequest { +@Private +public final class TaskletExecutionRequest<TInput, TOutput> implements VortexRequest { private final int taskletId; private final VortexFunction<TInput, TOutput> userFunction; private final TInput input; @@ -54,9 +54,13 @@ public final class TaskletExecutionRequest<TInput extends Serializable, TOutput /** * Execute the function using the input. + * @return Output of the function in a serialized form. 
*/ - public TOutput execute() throws Exception { - return userFunction.call(input); + public byte[] execute() throws Exception { + final TOutput output = userFunction.call(input); + final Codec<TOutput> codec = userFunction.getOutputCodec(); + // TODO[REEF-1113]: Handle serialization failure separately in Vortex + return codec.encode(output); } /** http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java index 6392b23..98149c0 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java @@ -22,15 +22,13 @@ import org.apache.reef.annotations.Unstable; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; -import java.io.Serializable; - /** * The interface for a status report from the {@link org.apache.reef.vortex.evaluator.VortexWorker}. */ @Unstable @Private @DriverSide -public interface TaskletReport extends Serializable { +public interface TaskletReport { /** * Type of TaskletReport. 
*/ http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java index 8e3bac3..2c32578 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java @@ -22,25 +22,23 @@ import org.apache.reef.annotations.Unstable; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; -import java.io.Serializable; - /** * Report of a tasklet execution result. */ @Unstable @Private @DriverSide -public final class TaskletResultReport<TOutput extends Serializable> implements TaskletReport { +public final class TaskletResultReport implements TaskletReport { private final int taskletId; - private final TOutput result; + private final byte[] serializedResult; /** * @param taskletId of the Tasklet. - * @param result of the Tasklet execution. + * @param serializedResult of the tasklet execution in a serialized form. */ - public TaskletResultReport(final int taskletId, final TOutput result) { + public TaskletResultReport(final int taskletId, final byte[] serializedResult) { this.taskletId = taskletId; - this.result = result; + this.serializedResult = serializedResult; } /** @@ -59,10 +57,10 @@ public final class TaskletResultReport<TOutput extends Serializable> implements } /** - * @return the result of the Tasklet execution. + * @return the result of the tasklet execution in a serialized form. 
*/ - public TOutput getResult() { - return result; + public byte[] getSerializedResult() { + return serializedResult; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java index cc3cced..2200af3 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java @@ -30,7 +30,6 @@ import org.apache.reef.vortex.common.avro.*; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.Serializable; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; @@ -56,10 +55,11 @@ public final class VortexAvroUtils { // The following TODOs are sub-issues of cleaning up Serializable in Vortex (REEF-504). // The purpose is to reduce serialization cost, which leads to bottleneck in Master. // Temporarily those are left as TODOs, but will be addressed in separate PRs. - // TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex. 
- final byte[] serializedInput = SerializationUtils.serialize(taskletExecutionRequest.getInput()); + final VortexFunction vortexFunction = taskletExecutionRequest.getFunction(); + // TODO[REEF-1113]: Handle serialization failure separately in Vortex + final byte[] serializedInput = vortexFunction.getInputCodec().encode(taskletExecutionRequest.getInput()); // TODO[REEF-1003]: Use reflection instead of serialization when launching VortexFunction - final byte[] serializedFunction = SerializationUtils.serialize(taskletExecutionRequest.getFunction()); + final byte[] serializedFunction = SerializationUtils.serialize(vortexFunction); avroVortexRequest = AvroVortexRequest.newBuilder() .setRequestType(AvroRequestType.ExecuteTasklet) .setTaskletRequest( @@ -101,29 +101,24 @@ public final class VortexAvroUtils { switch (taskletReport.getType()) { case TaskletResult: final TaskletResultReport taskletResultReport = (TaskletResultReport) taskletReport; - // TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex. - final byte[] serializedOutput = SerializationUtils.serialize(taskletResultReport.getResult()); avroTaskletReport = AvroTaskletReport.newBuilder() .setReportType(AvroReportType.TaskletResult) .setTaskletReport( AvroTaskletResultReport.newBuilder() .setTaskletId(taskletResultReport.getTaskletId()) - .setSerializedOutput(ByteBuffer.wrap(serializedOutput)) + .setSerializedOutput(ByteBuffer.wrap(taskletResultReport.getSerializedResult())) .build()) .build(); break; case TaskletAggregationResult: final TaskletAggregationResultReport taskletAggregationResultReport = (TaskletAggregationResultReport) taskletReport; - // TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex. 
- final byte[] serializedAggregationOutput = - SerializationUtils.serialize(taskletAggregationResultReport.getResult()); avroTaskletReport = AvroTaskletReport.newBuilder() .setReportType(AvroReportType.TaskletAggregationResult) .setTaskletReport( AvroTaskletAggregationResultReport.newBuilder() .setTaskletIds(taskletAggregationResultReport.getTaskletIds()) - .setSerializedOutput(ByteBuffer.wrap(serializedAggregationOutput)) + .setSerializedOutput(ByteBuffer.wrap(taskletAggregationResultReport.getSerializedResult())) .build()) .build(); break; @@ -196,11 +191,9 @@ public final class VortexAvroUtils { final VortexFunction function = (VortexFunction) SerializationUtils.deserialize( taskletExecutionRequest.getSerializedUserFunction().array()); - // TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex. - final Serializable input = - (Serializable) SerializationUtils.deserialize( - taskletExecutionRequest.getSerializedInput().array()); - vortexRequest = new TaskletExecutionRequest(taskletExecutionRequest.getTaskletId(), function, input); + // TODO[REEF-1113]: Handle serialization failure separately in Vortex + vortexRequest = new TaskletExecutionRequest(taskletExecutionRequest.getTaskletId(), function, + function.getInputCodec().decode(taskletExecutionRequest.getSerializedInput().array())); break; case CancelTasklet: final AvroTaskletCancellationRequest taskletCancellationRequest = @@ -229,19 +222,15 @@ public final class VortexAvroUtils { case TaskletResult: final AvroTaskletResultReport taskletResultReport = (AvroTaskletResultReport)avroTaskletReport.getTaskletReport(); - // TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex. 
- final Serializable output = - (Serializable) SerializationUtils.deserialize(taskletResultReport.getSerializedOutput().array()); - taskletReport = new TaskletResultReport<>(taskletResultReport.getTaskletId(), output); + taskletReport = new TaskletResultReport(taskletResultReport.getTaskletId(), + taskletResultReport.getSerializedOutput().array()); break; case TaskletAggregationResult: final AvroTaskletAggregationResultReport taskletAggregationResultReport = (AvroTaskletAggregationResultReport)avroTaskletReport.getTaskletReport(); - final Serializable aggregationOutput = - (Serializable) SerializationUtils.deserialize( - taskletAggregationResultReport.getSerializedOutput().array()); taskletReport = - new TaskletAggregationResultReport<>(taskletAggregationResultReport.getTaskletIds(), aggregationOutput); + new TaskletAggregationResultReport(taskletAggregationResultReport.getTaskletIds(), + taskletAggregationResultReport.getSerializedOutput().array()); break; case TaskletCancelled: final AvroTaskletCancelledReport taskletCancelledReport = http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexFutureDelegate.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexFutureDelegate.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexFutureDelegate.java index 55f3cf5..e6fa91e 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexFutureDelegate.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexFutureDelegate.java @@ -22,7 +22,6 @@ import org.apache.reef.annotations.Unstable; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; -import java.io.Serializable; import 
java.util.List; /** @@ -32,17 +31,18 @@ import java.util.List; @Unstable @DriverSide @Private -public interface VortexFutureDelegate<TOutput extends Serializable> { +public interface VortexFutureDelegate { /** * A Tasklet associated with the future has completed with a result. + * The result should be decoded as in {@link org.apache.reef.vortex.api.VortexFuture#completed(int, byte[])}. */ - void completed(final int taskletId, final TOutput result); + void completed(final int taskletId, final byte[] serializedResult); /** * The list of aggregated Tasklets associated with the Future that have completed with a result. */ - void aggregationCompleted(final List<Integer> taskletIds, final TOutput result); + void aggregationCompleted(final List<Integer> taskletIds, final byte[] serializedResult); /** * A Tasklet associated with the Future has thrown an Exception. http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java index 5d59a96..133b007 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java @@ -20,13 +20,11 @@ package org.apache.reef.vortex.common; import org.apache.reef.annotations.Unstable; -import java.io.Serializable; - /** * Master-to-Worker protocol. */ @Unstable -public interface VortexRequest extends Serializable { +public interface VortexRequest { /** * Type of Request. 
*/ http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java index 7c88a44..7a82eee 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java @@ -22,7 +22,6 @@ import org.apache.reef.annotations.Unstable; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; -import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -36,7 +35,7 @@ import java.util.List; @Private @Unstable @DriverSide -public final class WorkerReport implements Serializable { +public final class WorkerReport { private ArrayList<TaskletReport> taskletReports; public WorkerReport(final Collection<TaskletReport> taskletReports) { http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java index 80c3cb0..f0b4949 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java +++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java @@ -20,6 +20,7 @@ package org.apache.reef.vortex.driver; import net.jcip.annotations.ThreadSafe; import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.io.serialization.Codec; import org.apache.reef.tang.annotations.Parameter; import org.apache.reef.util.Optional; import org.apache.reef.vortex.api.FutureCallback; @@ -28,7 +29,6 @@ import org.apache.reef.vortex.api.VortexFuture; import org.apache.reef.vortex.common.*; import javax.inject.Inject; -import java.io.Serializable; import java.util.*; import java.util.concurrent.Executor; import java.util.concurrent.Executors; @@ -63,16 +63,17 @@ final class DefaultVortexMaster implements VortexMaster { * Add a new tasklet to pendingTasklets. */ @Override - public <TInput extends Serializable, TOutput extends Serializable> VortexFuture<TOutput> + public <TInput, TOutput> VortexFuture<TOutput> enqueueTasklet(final VortexFunction<TInput, TOutput> function, final TInput input, final Optional<FutureCallback<TOutput>> callback) { // TODO[REEF-500]: Simple duplicate Vortex Tasklet launch. 
final VortexFuture<TOutput> vortexFuture; final int id = taskletIdCounter.getAndIncrement(); + final Codec<TOutput> outputCodec = function.getOutputCodec(); if (callback.isPresent()) { - vortexFuture = new VortexFuture<>(executor, this, id, callback.get()); + vortexFuture = new VortexFuture<>(executor, this, id, outputCodec, callback.get()); } else { - vortexFuture = new VortexFuture<>(executor, this, id); + vortexFuture = new VortexFuture<>(executor, this, id, outputCodec); } final Tasklet tasklet = new Tasklet<>(id, function, input, vortexFuture); @@ -121,7 +122,7 @@ final class DefaultVortexMaster implements VortexMaster { final int resultTaskletId = taskletResultReport.getTaskletId(); final List<Integer> singletonResultTaskletId = Collections.singletonList(resultTaskletId); runningWorkers.doneTasklets(workerId, singletonResultTaskletId); - fetchDelegate(singletonResultTaskletId).completed(resultTaskletId, taskletResultReport.getResult()); + fetchDelegate(singletonResultTaskletId).completed(resultTaskletId, taskletResultReport.getSerializedResult()); break; case TaskletAggregationResult: @@ -131,7 +132,7 @@ final class DefaultVortexMaster implements VortexMaster { final List<Integer> aggregatedTaskletIds = taskletAggregationResultReport.getTaskletIds(); runningWorkers.doneTasklets(workerId, aggregatedTaskletIds); fetchDelegate(aggregatedTaskletIds).aggregationCompleted( - aggregatedTaskletIds, taskletAggregationResultReport.getResult()); + aggregatedTaskletIds, taskletAggregationResultReport.getSerializedResult()); break; case TaskletCancelled: http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java index 24db3cb..6f5d519 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java @@ -22,13 +22,11 @@ import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.vortex.api.VortexFunction; import org.apache.reef.vortex.common.VortexFutureDelegate; -import java.io.Serializable; - /** * Representation of user task in Driver. */ @DriverSide -class Tasklet<TInput extends Serializable, TOutput extends Serializable> { +class Tasklet<TInput, TOutput> { private final int taskletId; private final VortexFunction<TInput, TOutput> userTask; private final TInput input; http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java index becf3f9..a423706 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java @@ -27,8 +27,6 @@ import org.apache.reef.vortex.api.VortexFunction; import org.apache.reef.vortex.api.VortexFuture; import org.apache.reef.vortex.common.WorkerReport; -import java.io.Serializable; - /** * The heart of Vortex. * Processes various tasklet related events/requests coming from different components of the system. 
@@ -40,7 +38,7 @@ public interface VortexMaster { /** * Submit a new Tasklet to be run sometime in the future, with an optional callback function on the result. */ - <TInput extends Serializable, TOutput extends Serializable> VortexFuture<TOutput> + <TInput, TOutput> VortexFuture<TOutput> enqueueTasklet(final VortexFunction<TInput, TOutput> vortexFunction, final TInput input, final Optional<FutureCallback<TOutput>> callback); http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java index ffba985..88911e3 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java @@ -24,7 +24,6 @@ import org.apache.reef.driver.task.RunningTask; import org.apache.reef.vortex.common.TaskletCancellationRequest; import org.apache.reef.vortex.common.TaskletExecutionRequest; -import java.io.Serializable; import java.util.*; /** @@ -42,8 +41,7 @@ class VortexWorkerManager { this.reefTask = reefTask; } - <TInput extends Serializable, TOutput extends Serializable> - void launchTasklet(final Tasklet<TInput, TOutput> tasklet) { + <TInput, TOutput> void launchTasklet(final Tasklet<TInput, TOutput> tasklet) { assert !runningTasklets.containsKey(tasklet.getId()); runningTasklets.put(tasklet.getId(), tasklet); final TaskletExecutionRequest<TInput, TOutput> taskletExecutionRequest @@ -91,4 +89,4 @@ class VortexWorkerManager { boolean containsTasklet(final Integer taskletId) { return 
runningTasklets.containsKey(taskletId); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java index 920f1a9..3390c22 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java @@ -34,7 +34,6 @@ import org.apache.reef.vortex.driver.VortexWorkerConf; import org.apache.reef.wake.EventHandler; import javax.inject.Inject; -import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; @@ -108,9 +107,9 @@ public final class VortexWorker implements Task, TaskMessageSource { try { // Command Executor: Execute the command - final Serializable result = taskletExecutionRequest.execute(); final TaskletReport taskletReport = - new TaskletResultReport<>(taskletExecutionRequest.getTaskletId(), result); + new TaskletResultReport(taskletExecutionRequest.getTaskletId(), + taskletExecutionRequest.execute()); taskletReports.add(taskletReport); } catch (final InterruptedException ex) { // Assumes that user's thread follows convention that cancelled Futures @@ -149,7 +148,6 @@ public final class VortexWorker implements Task, TaskMessageSource { if (future != null) { future.cancel(true); } - break; default: throw new RuntimeException("Unknown Command"); http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/addone/AddOneFunction.java 
---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/addone/AddOneFunction.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/addone/AddOneFunction.java index 299a31a..5e32c8c 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/addone/AddOneFunction.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/addone/AddOneFunction.java @@ -18,12 +18,15 @@ */ package org.apache.reef.vortex.examples.addone; +import org.apache.reef.io.serialization.Codec; +import org.apache.reef.io.serialization.SerializableCodec; import org.apache.reef.vortex.api.VortexFunction; /** * Outputs input + 1. */ final class AddOneFunction implements VortexFunction<Integer, Integer> { + private static final Codec<Integer> CODEC = new SerializableCodec<>(); /** * Outputs input + 1. 
*/ @@ -31,4 +34,14 @@ final class AddOneFunction implements VortexFunction<Integer, Integer> { public Integer call(final Integer input) throws Exception { return input + 1; } + + @Override + public Codec<Integer> getInputCodec() { + return CODEC; + } + + @Override + public Codec<Integer> getOutputCodec() { + return CODEC; + } } http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/hello/HelloVortexFunction.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/hello/HelloVortexFunction.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/hello/HelloVortexFunction.java index 52f7d21..89c6918 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/hello/HelloVortexFunction.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/hello/HelloVortexFunction.java @@ -18,20 +18,31 @@ */ package org.apache.reef.vortex.examples.hello; +import org.apache.reef.io.serialization.Codec; import org.apache.reef.vortex.api.VortexFunction; - -import java.io.Serializable; +import org.apache.reef.vortex.util.VoidCodec; /** * Prints to stdout. */ -final class HelloVortexFunction implements VortexFunction { +final class HelloVortexFunction implements VortexFunction<Void, Void> { + private static final Codec<Void> CODEC = new VoidCodec(); /** * Prints to stdout. 
*/ @Override - public Serializable call(final Serializable serializable) throws Exception { + public Void call(final Void input) throws Exception { System.out.println("Hello, Vortex!"); return null; } + + @Override + public Codec<Void> getInputCodec() { + return CODEC; + } + + @Override + public Codec<Void> getOutputCodec() { + return CODEC; + } } http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulFunction.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulFunction.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulFunction.java index 2a2374a..db4a320 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulFunction.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulFunction.java @@ -18,12 +18,16 @@ */ package org.apache.reef.vortex.examples.matmul; +import org.apache.reef.io.serialization.Codec; import org.apache.reef.vortex.api.VortexFunction; /** * Computes multiplication of two matrices. */ final class MatMulFunction implements VortexFunction<MatMulInput, MatMulOutput> { + private static final Codec<MatMulInput> INPUT_CODEC = new MatMulInputCodec(); + private static final Codec<MatMulOutput> OUTPUT_CODEC = new MatMulOutputCodec(); + /** * Computes multiplication of two matrices. 
* @param input Input which contains two matrices to multiply, @@ -39,4 +43,14 @@ final class MatMulFunction implements VortexFunction<MatMulInput, MatMulOutput> final Matrix<Double> result = leftMatrix.multiply(rightMatrix); return new MatMulOutput(index, result); } + + @Override + public Codec<MatMulInput> getInputCodec() { + return INPUT_CODEC; + } + + @Override + public Codec<MatMulOutput> getOutputCodec() { + return OUTPUT_CODEC; + } } http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulInput.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulInput.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulInput.java index 86be004..b35b402 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulInput.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulInput.java @@ -18,13 +18,11 @@ */ package org.apache.reef.vortex.examples.matmul; -import java.io.Serializable; - /** * Input of {@link MatMulFunction} which contains two matrices to multiply, * and index of the sub-matrix in the entire result. 
*/ -class MatMulInput implements Serializable { +final class MatMulInput { private final int index; private final Matrix<Double> leftMatrix; private final Matrix<Double> rightMatrix; http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulInputCodec.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulInputCodec.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulInputCodec.java new file mode 100644 index 0000000..229ba0e --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulInputCodec.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.examples.matmul; + +import org.apache.reef.io.serialization.Codec; + +import java.io.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Encodes/decodes {@link MatMulInput} to/from byte array. 
+ */ +final class MatMulInputCodec implements Codec<MatMulInput> { + + @Override + public byte[] encode(final MatMulInput matMulInput) { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + try (DataOutputStream daos = new DataOutputStream(baos)) { + final int index = matMulInput.getIndex(); + final Matrix<Double> leftMatrix = matMulInput.getLeftMatrix(); + final Matrix<Double> rightMatrix = matMulInput.getRightMatrix(); + + daos.writeInt(index); + encodeMatrixToStream(daos, leftMatrix); + encodeMatrixToStream(daos, rightMatrix); + + return baos.toByteArray(); + } + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public MatMulInput decode(final byte[] buf) { + try (ByteArrayInputStream bais = new ByteArrayInputStream(buf)) { + try (DataInputStream dais = new DataInputStream(bais)) { + final int index = dais.readInt(); + final Matrix leftMatrix = decodeMatrixFromStream(dais); + final Matrix rightMatrix = decodeMatrixFromStream(dais); + return new MatMulInput(index, leftMatrix, rightMatrix); + } + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Encode a Matrix to output stream. + */ + private void encodeMatrixToStream(final DataOutputStream stream, final Matrix<Double> matrix) throws IOException { + final int numRow = matrix.getNumRows(); + final int numColumn = matrix.getNumColumns(); + + stream.writeInt(numRow); + stream.writeInt(numColumn); + + for (final List<Double> row : matrix.getRows()) { + for (final double element : row) { + stream.writeDouble(element); + } + } + } + + /** + * Decode a Matrix from input stream. 
+ */ + private Matrix decodeMatrixFromStream(final DataInputStream stream) throws IOException { + final int numRow = stream.readInt(); + final int numColumn = stream.readInt(); + + final List<List<Double>> rows = new ArrayList<>(numRow); + for (int rowIndex = 0; rowIndex < numRow; rowIndex++) { + final List<Double> row = new ArrayList<>(numColumn); + for (int columnIndex = 0; columnIndex < numColumn; columnIndex++) { + row.add(stream.readDouble()); + } + rows.add(row); + } + return new RowMatrix(Collections.unmodifiableList(rows)); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulOutput.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulOutput.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulOutput.java index 42c118c..99ee5bb 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulOutput.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulOutput.java @@ -18,12 +18,10 @@ */ package org.apache.reef.vortex.examples.matmul; -import java.io.Serializable; - /** * Output of {@link MatMulFunction} which contains the sub-matrix and index of it in the entire result. 
*/ -class MatMulOutput implements Serializable { +final class MatMulOutput { private final int index; private final Matrix<Double> result; http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulOutputCodec.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulOutputCodec.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulOutputCodec.java new file mode 100644 index 0000000..04d974a --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulOutputCodec.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.examples.matmul; + +import org.apache.reef.io.serialization.Codec; + +import java.io.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Encodes/decodes {@link MatMulOutput} to/from byte array. 
+ */ +final class MatMulOutputCodec implements Codec<MatMulOutput> { + + @Override + public byte[] encode(final MatMulOutput matMulOutput) { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + try (DataOutputStream daos = new DataOutputStream(baos)) { + final int index = matMulOutput.getIndex(); + final Matrix<Double> result = matMulOutput.getResult(); + + daos.writeInt(index); + encodeMatrixToStream(daos, result); + + return baos.toByteArray(); + } + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public MatMulOutput decode(final byte[] buf) { + try (ByteArrayInputStream bais = new ByteArrayInputStream(buf)) { + try (DataInputStream dais = new DataInputStream(bais)) { + final int index = dais.readInt(); + final Matrix result = decodeMatrixFromStream(dais); + return new MatMulOutput(index, result); + } + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Encode a Matrix to output stream. + */ + private void encodeMatrixToStream(final DataOutputStream stream, final Matrix<Double> matrix) throws IOException { + final int numRow = matrix.getNumRows(); + final int numColumn = matrix.getNumColumns(); + + stream.writeInt(numRow); + stream.writeInt(numColumn); + + for (final List<Double> row : matrix.getRows()) { + for (final double element : row) { + stream.writeDouble(element); + } + } + } + + /** + * Decode a Matrix from input stream. 
+ */ + private Matrix decodeMatrixFromStream(final DataInputStream stream) throws IOException { + final int numRow = stream.readInt(); + final int numColumn = stream.readInt(); + + final List<List<Double>> rows = new ArrayList<>(numRow); + for (int rowIndex = 0; rowIndex < numRow; rowIndex++) { + final List<Double> row = new ArrayList<>(numColumn); + for (int columnIndex = 0; columnIndex < numColumn; columnIndex++) { + row.add(stream.readDouble()); + } + rows.add(row); + } + return new RowMatrix(Collections.unmodifiableList(rows)); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/Matrix.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/Matrix.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/Matrix.java index 0b1fe2e..6cd4c05 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/Matrix.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/Matrix.java @@ -18,14 +18,13 @@ */ package org.apache.reef.vortex.examples.matmul; -import java.io.Serializable; import java.util.List; /** - * Interface of serializable Matrix. + * Interface of Matrix. * @param <T> Type of elements in Matrix. */ -interface Matrix<T> extends Serializable { +interface Matrix<T> { /** * Add another matrix. Note that dimensions of two matrices should be identical. 
http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/util/VoidCodec.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/util/VoidCodec.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/util/VoidCodec.java new file mode 100644 index 0000000..7e21066 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/util/VoidCodec.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.util; + +import org.apache.reef.io.serialization.Codec; + +/** + * Codec for empty input/output. 
+ */ +public final class VoidCodec implements Codec<Void> { + + @Override + public byte[] encode(final Void obj) { + return new byte[0]; + } + + @Override + public Void decode(final byte[] buf) { + return null; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/util/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/util/package-info.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/util/package-info.java new file mode 100644 index 0000000..fcbb9f5 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/util/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Utilities used in Vortex. 
+ */ +package org.apache.reef.vortex.util; http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java index 325d3d4..18e328d 100644 --- a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java +++ b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java @@ -18,6 +18,7 @@ */ package org.apache.reef.vortex.driver; +import org.apache.reef.io.serialization.SerializableCodec; import org.apache.reef.util.Optional; import org.apache.reef.vortex.api.FutureCallback; import org.apache.reef.vortex.api.VortexFunction; @@ -40,6 +41,8 @@ import static org.junit.Assert.*; * Test whether DefaultVortexMaster correctly handles (simulated) events. */ public class DefaultVortexMasterTest { + private static final byte[] EMPTY_RESULT = new byte[0]; + private static final byte[] INTEGER_RESULT = new SerializableCodec<Integer>().encode(1); private TestUtil testUtil = new TestUtil(); /** @@ -75,7 +78,7 @@ public class DefaultVortexMasterTest { final ArrayList<Integer> taskletIds = launchTasklets(runningWorkers, pendingTasklets, 1); for (final int taskletId : taskletIds) { - final TaskletReport taskletReport = new TaskletResultReport<>(taskletId, null); + final TaskletReport taskletReport = new TaskletResultReport(taskletId, INTEGER_RESULT); vortexMaster.workerReported( vortexWorkerManager1.getId(), new WorkerReport(Collections.singletonList(taskletReport))); } @@ -115,7 +118,7 @@ public class DefaultVortexMasterTest { // Completed? 
for (final int taskletId : taskletIds2) { - final TaskletReport taskletReport = new TaskletResultReport<>(taskletId, null); + final TaskletReport taskletReport = new TaskletResultReport(taskletId, EMPTY_RESULT); vortexMaster.workerReported( vortexWorkerManager2.getId(), new WorkerReport(Collections.singletonList(taskletReport))); } @@ -146,7 +149,7 @@ public class DefaultVortexMasterTest { final int numOfTasklets = 100; for (int i = 0; i < numOfTasklets; i++) { vortexFutures.add(vortexMaster.enqueueTasklet(testUtil.newFunction(), null, - Optional.<FutureCallback<Integer>>empty())); + Optional.<FutureCallback<Void>>empty())); } final ArrayList<Integer> taskletIds1 = launchTasklets(runningWorkers, pendingTasklets, numOfTasklets); @@ -166,7 +169,7 @@ public class DefaultVortexMasterTest { for (final int taskletId : taskletIds2) { final String workerId = runningWorkers.getWhereTaskletWasScheduledTo(taskletId); assertNotNull("The tasklet must have been scheduled", workerId); - final TaskletReport taskletReport = new TaskletResultReport<>(taskletId, null); + final TaskletReport taskletReport = new TaskletResultReport(taskletId, EMPTY_RESULT); vortexMaster.workerReported( workerId, new WorkerReport(Collections.singletonList(taskletReport))); } http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java index ab93a08..c2dee99 100644 --- a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java +++ b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java @@ -19,6 +19,9 @@ package org.apache.reef.vortex.driver; import 
org.apache.reef.driver.task.RunningTask; +import org.apache.reef.io.serialization.Codec; +import org.apache.reef.io.serialization.SerializableCodec; +import org.apache.reef.vortex.util.VoidCodec; import org.apache.reef.util.Optional; import org.apache.reef.vortex.api.VortexFunction; import org.apache.reef.vortex.api.VortexFuture; @@ -26,7 +29,6 @@ import org.apache.reef.vortex.common.*; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import java.io.Serializable; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -44,6 +46,9 @@ import static org.mockito.Mockito.when; * Utility methods for tests. */ public final class TestUtil { + private static final Codec<Void> VOID_CODEC = new VoidCodec(); + private static final Codec<Integer> INTEGER_CODEC = new SerializableCodec<>(); + private final AtomicInteger taskletId = new AtomicInteger(0); private final AtomicInteger workerId = new AtomicInteger(0); private final Executor executor = Executors.newFixedThreadPool(5); @@ -85,18 +90,28 @@ public final class TestUtil { */ public Tasklet newTasklet() { final int id = taskletId.getAndIncrement(); - return new Tasklet(id, null, null, new VortexFuture(executor, vortexMaster, id)); + return new Tasklet(id, null, null, new VortexFuture(executor, vortexMaster, id, VOID_CODEC)); } /** * @return a new dummy function. */ - public VortexFunction newFunction() { - return new VortexFunction() { + public VortexFunction<Void, Void> newFunction() { + return new VortexFunction<Void, Void>() { @Override - public Serializable call(final Serializable serializable) throws Exception { + public Void call(final Void input) throws Exception { return null; } + + @Override + public Codec getInputCodec() { + return VOID_CODEC; + } + + @Override + public Codec getOutputCodec() { + return VOID_CODEC; + } }; } @@ -110,10 +125,10 @@ public final class TestUtil { /** * @return a new dummy function. 
*/ - public VortexFunction newInfiniteLoopFunction() { - return new VortexFunction() { + public VortexFunction<Void, Void> newInfiniteLoopFunction() { + return new VortexFunction<Void, Void>() { @Override - public Serializable call(final Serializable serializable) throws Exception { + public Void call(final Void input) throws Exception { while(true) { Thread.sleep(100); if (Thread.currentThread().isInterrupted()) { @@ -121,6 +136,16 @@ public final class TestUtil { } } } + + @Override + public Codec getInputCodec() { + return VOID_CODEC; + } + + @Override + public Codec getOutputCodec() { + return VOID_CODEC; + } }; } @@ -130,9 +155,19 @@ public final class TestUtil { public VortexFunction<Integer, Integer> newIntegerFunction() { return new VortexFunction<Integer, Integer>() { @Override - public Integer call(final Integer integer) throws Exception { + public Integer call(final Integer input) throws Exception { return 1; } + + @Override + public Codec<Integer> getInputCodec() { + return INTEGER_CODEC; + } + + @Override + public Codec<Integer> getOutputCodec() { + return INTEGER_CODEC; + } }; } http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneFunction.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneFunction.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneFunction.java index e064976..8db6827 100644 --- a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneFunction.java +++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneFunction.java @@ -18,12 +18,15 @@ */ package org.apache.reef.tests.applications.vortex.addone; +import org.apache.reef.io.serialization.Codec; +import 
org.apache.reef.io.serialization.SerializableCodec; import org.apache.reef.vortex.api.VortexFunction; /** * Outputs Input+1. */ public final class AddOneFunction implements VortexFunction<Integer, Integer> { + private static final Codec<Integer> CODEC = new SerializableCodec<>(); /** * Outputs Input+1. */ @@ -31,4 +34,14 @@ public final class AddOneFunction implements VortexFunction<Integer, Integer> { public Integer call(final Integer input) throws Exception { return input + 1; } + + @Override + public Codec<Integer> getInputCodec() { + return CODEC; + } + + @Override + public Codec<Integer> getOutputCodec() { + return CODEC; + } } http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/InfiniteLoopWithCancellationFunction.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/InfiniteLoopWithCancellationFunction.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/InfiniteLoopWithCancellationFunction.java index f1c982e..b885c82 100644 --- a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/InfiniteLoopWithCancellationFunction.java +++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/InfiniteLoopWithCancellationFunction.java @@ -19,20 +19,22 @@ package org.apache.reef.tests.applications.vortex.cancellation; +import org.apache.reef.io.serialization.Codec; import org.apache.reef.vortex.api.VortexFunction; +import org.apache.reef.vortex.util.VoidCodec; -import java.io.Serializable; import java.util.logging.Level; import java.util.logging.Logger; /** * Runs an infinite loop and waits for cancellation. 
*/ -public final class InfiniteLoopWithCancellationFunction implements VortexFunction { +public final class InfiniteLoopWithCancellationFunction implements VortexFunction<Void, Void> { private static final Logger LOG = Logger.getLogger(InfiniteLoopWithCancellationFunction.class.getName()); + private static final Codec<Void> CODEC = new VoidCodec(); @Override - public Serializable call(final Serializable serializable) throws Exception { + public Void call(final Void input) throws Exception { LOG.log(Level.FINE, "Entered Infinite Loop Tasklet."); while (true) { Thread.sleep(100); @@ -41,4 +43,14 @@ public final class InfiniteLoopWithCancellationFunction implements VortexFunctio } } } + + @Override + public Codec<Void> getInputCodec() { + return CODEC; + } + + @Override + public Codec<Void> getOutputCodec() { + return CODEC; + } } http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTestStart.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTestStart.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTestStart.java index 481bb5f..e37e6f1 100644 --- a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTestStart.java +++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTestStart.java @@ -42,7 +42,7 @@ public final class TaskletCancellationTestStart implements VortexStart { @Override public void start(final VortexThreadPool vortexThreadPool) { final InfiniteLoopWithCancellationFunction function = new InfiniteLoopWithCancellationFunction(); - final VortexFuture future = vortexThreadPool.submit(function, 0); + final VortexFuture 
future = vortexThreadPool.submit(function, null); try { // Hacky way to increase probability that the task has been launched. http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/ExceptionFunction.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/ExceptionFunction.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/ExceptionFunction.java index a6a5737..aeff37b 100644 --- a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/ExceptionFunction.java +++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/ExceptionFunction.java @@ -18,14 +18,27 @@ */ package org.apache.reef.tests.applications.vortex.exception; +import org.apache.reef.io.serialization.Codec; +import org.apache.reef.io.serialization.SerializableCodec; import org.apache.reef.vortex.api.VortexFunction; /** * A test Vortex function that throws an Exception. */ public final class ExceptionFunction implements VortexFunction<Integer, Integer> { + private static final Codec<Integer> CODEC = new SerializableCodec<>(); @Override - public Integer call(final Integer integer) throws Exception { + public Integer call(final Integer input) throws Exception { throw new RuntimeException("Expected test exception."); } + + @Override + public Codec<Integer> getInputCodec() { + return CODEC; + } + + @Override + public Codec<Integer> getOutputCodec() { + return CODEC; + } }
