Repository: reef
Updated Branches:
  refs/heads/master 3cafad6b1 -> 8b997771b


[REEF-1005] Allow custom codecs for input/output data in Vortex

This addressed the issue by
  * Allowing users to specify codecs in VortexMasterConf
  * Removing Serializable from input and output
  * Defining custom codec for MatMul example

JIRA:
  [REEF-1005](https://issues.apache.org/jira/browse/REEF-1005)

Pull Request:
  Closes #739


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/8b997771
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/8b997771
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/8b997771

Branch: refs/heads/master
Commit: 8b997771b3390217e7725a775ebbd33f9ef8a9c2
Parents: 3cafad6
Author: Yunseong Lee <[email protected]>
Authored: Tue Dec 22 20:28:26 2015 +0800
Committer: Andrew Chung <[email protected]>
Committed: Thu Dec 24 18:41:45 2015 -0800

----------------------------------------------------------------------
 .../apache/reef/vortex/api/VortexFunction.java  |  23 ++++-
 .../apache/reef/vortex/api/VortexFuture.java    |  20 ++--
 .../reef/vortex/api/VortexThreadPool.java       |   5 +-
 .../common/TaskletAggregationResultReport.java  |  19 ++--
 .../vortex/common/TaskletExecutionRequest.java  |  16 +--
 .../reef/vortex/common/TaskletReport.java       |   4 +-
 .../reef/vortex/common/TaskletResultReport.java |  20 ++--
 .../reef/vortex/common/VortexAvroUtils.java     |  37 +++----
 .../vortex/common/VortexFutureDelegate.java     |   8 +-
 .../reef/vortex/common/VortexRequest.java       |   4 +-
 .../apache/reef/vortex/common/WorkerReport.java |   3 +-
 .../reef/vortex/driver/DefaultVortexMaster.java |  13 +--
 .../org/apache/reef/vortex/driver/Tasklet.java  |   4 +-
 .../apache/reef/vortex/driver/VortexMaster.java |   4 +-
 .../reef/vortex/driver/VortexWorkerManager.java |   6 +-
 .../reef/vortex/evaluator/VortexWorker.java     |   6 +-
 .../vortex/examples/addone/AddOneFunction.java  |  13 +++
 .../examples/hello/HelloVortexFunction.java     |  19 +++-
 .../vortex/examples/matmul/MatMulFunction.java  |  14 +++
 .../vortex/examples/matmul/MatMulInput.java     |   4 +-
 .../examples/matmul/MatMulInputCodec.java       | 100 +++++++++++++++++++
 .../vortex/examples/matmul/MatMulOutput.java    |   4 +-
 .../examples/matmul/MatMulOutputCodec.java      |  97 ++++++++++++++++++
 .../reef/vortex/examples/matmul/Matrix.java     |   5 +-
 .../org/apache/reef/vortex/util/VoidCodec.java  |  37 +++++++
 .../apache/reef/vortex/util/package-info.java   |  22 ++++
 .../vortex/driver/DefaultVortexMasterTest.java  |  11 +-
 .../org/apache/reef/vortex/driver/TestUtil.java |  53 ++++++++--
 .../vortex/addone/AddOneFunction.java           |  13 +++
 .../InfiniteLoopWithCancellationFunction.java   |  18 +++-
 .../TaskletCancellationTestStart.java           |   2 +-
 .../vortex/exception/ExceptionFunction.java     |  15 ++-
 32 files changed, 492 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFunction.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFunction.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFunction.java
index 96e47b6..3efe4c5 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFunction.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFunction.java
@@ -19,19 +19,20 @@
 package org.apache.reef.vortex.api;
 
 import org.apache.reef.annotations.Unstable;
+import org.apache.reef.io.serialization.Codec;
 
 import java.io.Serializable;
 
 /**
- * Typed user function.
- * Implement your functions using this interface.
+ * Typed user function. Implement your functions using this interface.
  * TODO[REEF-504]: Clean up Serializable in Vortex.
+ * TODO[REEF-1003]: Use reflection instead of serialization when launching 
VortexFunction.
  *
  * @param <TInput> input type
  * @param <TOutput> output type
  */
 @Unstable
-public interface VortexFunction<TInput extends Serializable, TOutput extends 
Serializable> extends Serializable {
+public interface VortexFunction<TInput, TOutput> extends Serializable {
   /**
    * @param input of the function
    * @return output of the function
@@ -40,4 +41,20 @@ public interface VortexFunction<TInput extends Serializable, 
TOutput extends Ser
    * For example if threads are spawned here, shut them down before throwing 
an exception
    */
   TOutput call(TInput input) throws Exception;
+
+  /**
+   * Users must define codec for the input. {@link 
org.apache.reef.vortex.util.VoidCodec} can be used if the input is
+   * empty, and {@link org.apache.reef.io.serialization.SerializableCodec} can 
be used for {@link Serializable} input.
+   * {@link org.apache.reef.vortex.examples.matmul.MatMulInputCodec} is an 
example of codec for the custom input.
+   * @return Codec used to serialize/deserialize the input.
+   */
+  Codec<TInput> getInputCodec();
+
+  /**
+   * Users must define codec for the output. {@link 
org.apache.reef.vortex.util.VoidCodec} can be used if the output is
+   * empty, and {@link org.apache.reef.io.serialization.SerializableCodec} can 
be used for {@link Serializable} output.
+   * {@link org.apache.reef.vortex.examples.matmul.MatMulOutputCodec} is an 
example of codec for the custom output.
+   * @return Codec used to serialize/deserialize the output.
+   */
+  Codec<TOutput> getOutputCodec();
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
index 5e7f9a2..388ef16 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
@@ -20,11 +20,11 @@ package org.apache.reef.vortex.api;
 
 import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.io.serialization.Codec;
 import org.apache.reef.util.Optional;
 import org.apache.reef.vortex.common.VortexFutureDelegate;
 import org.apache.reef.vortex.driver.VortexMaster;
 
-import java.io.Serializable;
 import java.util.List;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -35,8 +35,8 @@ import java.util.logging.Logger;
  * The interface between user code and submitted task.
  */
 @Unstable
-public final class VortexFuture<TOutput extends Serializable>
-    implements Future<TOutput>, VortexFutureDelegate<TOutput> {
+public final class VortexFuture<TOutput>
+    implements Future<TOutput>, VortexFutureDelegate {
   private static final Logger LOG = 
Logger.getLogger(VortexFuture.class.getName());
 
   // userResult starts out as null. If not null => variable is set and tasklet 
returned.
@@ -49,13 +49,15 @@ public final class VortexFuture<TOutput extends 
Serializable>
   private final Executor executor;
   private final VortexMaster vortexMaster;
   private final int taskletId;
+  private final Codec<TOutput> outputCodec;
 
   /**
    * Creates a {@link VortexFuture}.
    */
   @Private
-  public VortexFuture(final Executor executor, final VortexMaster 
vortexMaster, final int taskletId) {
-    this(executor, vortexMaster, taskletId, null);
+  public VortexFuture(final Executor executor, final VortexMaster 
vortexMaster, final int taskletId,
+                      final Codec<TOutput> outputCodec) {
+    this(executor, vortexMaster, taskletId, outputCodec, null);
   }
 
   /**
@@ -65,10 +67,12 @@ public final class VortexFuture<TOutput extends 
Serializable>
   public VortexFuture(final Executor executor,
                       final VortexMaster vortexMaster,
                       final int taskletId,
+                      final Codec<TOutput> outputCodec,
                       final FutureCallback<TOutput> callbackHandler) {
     this.executor = executor;
     this.vortexMaster = vortexMaster;
     this.taskletId = taskletId;
+    this.outputCodec = outputCodec;
     this.callbackHandler = callbackHandler;
   }
 
@@ -181,9 +185,11 @@ public final class VortexFuture<TOutput extends 
Serializable>
    */
   @Private
   @Override
-  public void completed(final int pTaskletId, final TOutput result) {
+  public void completed(final int pTaskletId, final byte[] serializedResult) {
     assert taskletId == pTaskletId;
 
+    // TODO[REEF-1113]: Handle serialization failure separately in Vortex
+    final TOutput result = outputCodec.decode(serializedResult);
     this.userResult = Optional.ofNullable(result);
     if (callbackHandler != null) {
       executor.execute(new Runnable() {
@@ -201,7 +207,7 @@ public final class VortexFuture<TOutput extends 
Serializable>
    */
   @Private
   @Override
-  public void aggregationCompleted(final List<Integer> taskletIds, final 
TOutput result) {
+  public void aggregationCompleted(final List<Integer> taskletIds, final 
byte[] serializedResult) {
     throw new RuntimeException("Functions not associated with 
AggregationFunctions cannot be aggregated.");
   }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java
index 1b02894..e2b1d35 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java
@@ -23,7 +23,6 @@ import org.apache.reef.util.Optional;
 import org.apache.reef.vortex.driver.VortexMaster;
 
 import javax.inject.Inject;
-import java.io.Serializable;
 
 /**
  * Distributed thread pool.
@@ -44,7 +43,7 @@ public final class VortexThreadPool {
    * @param <TOutput> output type
    * @return VortexFuture for tracking execution progress
    */
-  public <TInput extends Serializable, TOutput extends Serializable> 
VortexFuture<TOutput>
+  public <TInput, TOutput> VortexFuture<TOutput>
       submit(final VortexFunction<TInput, TOutput> function, final TInput 
input) {
     return vortexMaster.enqueueTasklet(function, input, 
Optional.<FutureCallback<TOutput>>empty());
   }
@@ -57,7 +56,7 @@ public final class VortexThreadPool {
    * @param <TOutput> output type
    * @return VortexFuture for tracking execution progress
    */
-  public <TInput extends Serializable, TOutput extends Serializable> 
VortexFuture<TOutput>
+  public <TInput, TOutput> VortexFuture<TOutput>
       submit(final VortexFunction<TInput, TOutput> function, final TInput 
input,
              final FutureCallback<TOutput> callback) {
     return vortexMaster.enqueueTasklet(function, input, Optional.of(callback));

http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationResultReport.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationResultReport.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationResultReport.java
index ce4a015..1e52a2e 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationResultReport.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationResultReport.java
@@ -22,7 +22,6 @@ import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -33,17 +32,17 @@ import java.util.List;
 @Private
 @DriverSide
 @Unstable
-public final class TaskletAggregationResultReport<TOutput extends 
Serializable> implements TaskletReport {
+public final class TaskletAggregationResultReport implements TaskletReport {
   private final List<Integer> taskletIds;
-  private final TOutput result;
+  private final byte[] serializedResult;
 
   /**
    * @param taskletIds of the tasklets.
-   * @param result of the tasklet execution.
+   * @param serializedResult of the tasklet execution in a serialized form.
    */
-  public TaskletAggregationResultReport(final List<Integer> taskletIds, final 
TOutput result) {
+  public TaskletAggregationResultReport(final List<Integer> taskletIds, final 
byte[] serializedResult) {
     this.taskletIds = Collections.unmodifiableList(new 
ArrayList<>(taskletIds));
-    this.result = result;
+    this.serializedResult = serializedResult;
   }
 
   /**
@@ -62,10 +61,10 @@ public final class TaskletAggregationResultReport<TOutput 
extends Serializable>
   }
 
   /**
-   * @return the result of the Tasklet aggregation execution.
+   * @return the result of the Tasklet aggregation execution in a serialized 
form.
    */
-  public TOutput getResult() {
-    return result;
+  public byte[] getSerializedResult() {
+    return serializedResult;
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java
index 8e43e4b..d85c69b 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java
@@ -19,16 +19,16 @@
 package org.apache.reef.vortex.common;
 
 import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.io.serialization.Codec;
 import org.apache.reef.vortex.api.VortexFunction;
 
-import java.io.Serializable;
-
 /**
  * Request to execute a tasklet.
  */
 @Unstable
-public final class TaskletExecutionRequest<TInput extends Serializable, 
TOutput extends Serializable>
-    implements VortexRequest {
+@Private
+public final class TaskletExecutionRequest<TInput, TOutput> implements 
VortexRequest {
   private final int taskletId;
   private final VortexFunction<TInput, TOutput> userFunction;
   private final TInput input;
@@ -54,9 +54,13 @@ public final class TaskletExecutionRequest<TInput extends 
Serializable, TOutput
 
   /**
    * Execute the function using the input.
+   * @return Output of the function in a serialized form.
    */
-  public TOutput execute() throws Exception {
-    return userFunction.call(input);
+  public byte[] execute() throws Exception {
+    final TOutput output = userFunction.call(input);
+    final Codec<TOutput> codec = userFunction.getOutputCodec();
+    // TODO[REEF-1113]: Handle serialization failure separately in Vortex
+    return codec.encode(output);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java
index 6392b23..98149c0 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java
@@ -22,15 +22,13 @@ import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
 
-import java.io.Serializable;
-
 /**
  * The interface for a status report from the {@link 
org.apache.reef.vortex.evaluator.VortexWorker}.
  */
 @Unstable
 @Private
 @DriverSide
-public interface TaskletReport extends Serializable {
+public interface TaskletReport {
   /**
    * Type of TaskletReport.
    */

http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
index 8e3bac3..2c32578 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
@@ -22,25 +22,23 @@ import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
 
-import java.io.Serializable;
-
 /**
  * Report of a tasklet execution result.
  */
 @Unstable
 @Private
 @DriverSide
-public final class TaskletResultReport<TOutput extends Serializable> 
implements TaskletReport {
+public final class TaskletResultReport implements TaskletReport {
   private final int taskletId;
-  private final TOutput result;
+  private final byte[] serializedResult;
 
   /**
    * @param taskletId of the Tasklet.
-   * @param result of the Tasklet execution.
+   * @param serializedResult of the tasklet execution in a serialized form.
    */
-  public TaskletResultReport(final int taskletId, final TOutput result) {
+  public TaskletResultReport(final int taskletId, final byte[] 
serializedResult) {
     this.taskletId = taskletId;
-    this.result = result;
+    this.serializedResult = serializedResult;
   }
 
   /**
@@ -59,10 +57,10 @@ public final class TaskletResultReport<TOutput extends 
Serializable> implements
   }
 
   /**
-   * @return the result of the Tasklet execution.
+   * @return the result of the tasklet execution in a serialized form.
    */
-  public TOutput getResult() {
-    return result;
+  public byte[] getSerializedResult() {
+    return serializedResult;
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
index cc3cced..2200af3 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
@@ -30,7 +30,6 @@ import org.apache.reef.vortex.common.avro.*;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -56,10 +55,11 @@ public final class VortexAvroUtils {
       // The following TODOs are sub-issues of cleaning up Serializable in 
Vortex (REEF-504).
       // The purpose is to reduce serialization cost, which leads to 
bottleneck in Master.
       // Temporarily those are left as TODOs, but will be addressed in 
separate PRs.
-      // TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex.
-      final byte[] serializedInput = 
SerializationUtils.serialize(taskletExecutionRequest.getInput());
+      final VortexFunction vortexFunction = 
taskletExecutionRequest.getFunction();
+      // TODO[REEF-1113]: Handle serialization failure separately in Vortex
+      final byte[] serializedInput = 
vortexFunction.getInputCodec().encode(taskletExecutionRequest.getInput());
       // TODO[REEF-1003]: Use reflection instead of serialization when 
launching VortexFunction
-      final byte[] serializedFunction = 
SerializationUtils.serialize(taskletExecutionRequest.getFunction());
+      final byte[] serializedFunction = 
SerializationUtils.serialize(vortexFunction);
       avroVortexRequest = AvroVortexRequest.newBuilder()
           .setRequestType(AvroRequestType.ExecuteTasklet)
           .setTaskletRequest(
@@ -101,29 +101,24 @@ public final class VortexAvroUtils {
       switch (taskletReport.getType()) {
       case TaskletResult:
         final TaskletResultReport taskletResultReport = (TaskletResultReport) 
taskletReport;
-        // TODO[REEF-1005]: Allow custom codecs for input/output data in 
Vortex.
-        final byte[] serializedOutput = 
SerializationUtils.serialize(taskletResultReport.getResult());
         avroTaskletReport = AvroTaskletReport.newBuilder()
             .setReportType(AvroReportType.TaskletResult)
             .setTaskletReport(
                 AvroTaskletResultReport.newBuilder()
                     .setTaskletId(taskletResultReport.getTaskletId())
-                    .setSerializedOutput(ByteBuffer.wrap(serializedOutput))
+                    
.setSerializedOutput(ByteBuffer.wrap(taskletResultReport.getSerializedResult()))
                     .build())
             .build();
         break;
       case TaskletAggregationResult:
         final TaskletAggregationResultReport taskletAggregationResultReport =
             (TaskletAggregationResultReport) taskletReport;
-        // TODO[REEF-1005]: Allow custom codecs for input/output data in 
Vortex.
-        final byte[] serializedAggregationOutput =
-            
SerializationUtils.serialize(taskletAggregationResultReport.getResult());
         avroTaskletReport = AvroTaskletReport.newBuilder()
             .setReportType(AvroReportType.TaskletAggregationResult)
             .setTaskletReport(
                 AvroTaskletAggregationResultReport.newBuilder()
                     
.setTaskletIds(taskletAggregationResultReport.getTaskletIds())
-                    
.setSerializedOutput(ByteBuffer.wrap(serializedAggregationOutput))
+                    
.setSerializedOutput(ByteBuffer.wrap(taskletAggregationResultReport.getSerializedResult()))
                     .build())
             .build();
         break;
@@ -196,11 +191,9 @@ public final class VortexAvroUtils {
       final VortexFunction function =
           (VortexFunction) SerializationUtils.deserialize(
               taskletExecutionRequest.getSerializedUserFunction().array());
-      // TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex.
-      final Serializable input =
-          (Serializable) SerializationUtils.deserialize(
-              taskletExecutionRequest.getSerializedInput().array());
-      vortexRequest = new 
TaskletExecutionRequest(taskletExecutionRequest.getTaskletId(), function, 
input);
+      // TODO[REEF-1113]: Handle serialization failure separately in Vortex
+      vortexRequest = new 
TaskletExecutionRequest(taskletExecutionRequest.getTaskletId(), function,
+         
function.getInputCodec().decode(taskletExecutionRequest.getSerializedInput().array()));
       break;
     case CancelTasklet:
       final AvroTaskletCancellationRequest taskletCancellationRequest =
@@ -229,19 +222,15 @@ public final class VortexAvroUtils {
       case TaskletResult:
         final AvroTaskletResultReport taskletResultReport =
             (AvroTaskletResultReport)avroTaskletReport.getTaskletReport();
-        // TODO[REEF-1005]: Allow custom codecs for input/output data in 
Vortex.
-        final Serializable output =
-            (Serializable) 
SerializationUtils.deserialize(taskletResultReport.getSerializedOutput().array());
-        taskletReport = new 
TaskletResultReport<>(taskletResultReport.getTaskletId(), output);
+        taskletReport = new 
TaskletResultReport(taskletResultReport.getTaskletId(),
+            taskletResultReport.getSerializedOutput().array());
         break;
       case TaskletAggregationResult:
         final AvroTaskletAggregationResultReport 
taskletAggregationResultReport =
             
(AvroTaskletAggregationResultReport)avroTaskletReport.getTaskletReport();
-        final Serializable aggregationOutput =
-            (Serializable) SerializationUtils.deserialize(
-                taskletAggregationResultReport.getSerializedOutput().array());
         taskletReport =
-            new 
TaskletAggregationResultReport<>(taskletAggregationResultReport.getTaskletIds(),
 aggregationOutput);
+            new 
TaskletAggregationResultReport(taskletAggregationResultReport.getTaskletIds(),
+                taskletAggregationResultReport.getSerializedOutput().array());
         break;
       case TaskletCancelled:
         final AvroTaskletCancelledReport taskletCancelledReport =

http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexFutureDelegate.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexFutureDelegate.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexFutureDelegate.java
index 55f3cf5..e6fa91e 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexFutureDelegate.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexFutureDelegate.java
@@ -22,7 +22,6 @@ import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
 
-import java.io.Serializable;
 import java.util.List;
 
 /**
@@ -32,17 +31,18 @@ import java.util.List;
 @Unstable
 @DriverSide
 @Private
-public interface VortexFutureDelegate<TOutput extends Serializable> {
+public interface VortexFutureDelegate {
 
   /**
    * A Tasklet associated with the future has completed with a result.
+   * The result should be decoded as in {@link 
org.apache.reef.vortex.api.VortexFuture#completed(int, byte[])}.
    */
-  void completed(final int taskletId, final TOutput result);
+  void completed(final int taskletId, final byte[] serializedResult);
 
   /**
    * The list of aggregated Tasklets associated with the Future that have 
completed with a result.
    */
-  void aggregationCompleted(final List<Integer> taskletIds, final TOutput 
result);
+  void aggregationCompleted(final List<Integer> taskletIds, final byte[] 
serializedResult);
 
   /**
    * A Tasklet associated with the Future has thrown an Exception.

http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java
index 5d59a96..133b007 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java
@@ -20,13 +20,11 @@ package org.apache.reef.vortex.common;
 
 import org.apache.reef.annotations.Unstable;
 
-import java.io.Serializable;
-
 /**
  * Master-to-Worker protocol.
  */
 @Unstable
-public interface VortexRequest extends Serializable {
+public interface VortexRequest {
   /**
    * Type of Request.
    */

http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java
index 7c88a44..7a82eee 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java
@@ -22,7 +22,6 @@ import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -36,7 +35,7 @@ import java.util.List;
 @Private
 @Unstable
 @DriverSide
-public final class WorkerReport implements Serializable {
+public final class WorkerReport {
   private ArrayList<TaskletReport> taskletReports;
 
   public WorkerReport(final Collection<TaskletReport> taskletReports) {

http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
index 80c3cb0..f0b4949 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
@@ -20,6 +20,7 @@ package org.apache.reef.vortex.driver;
 
 import net.jcip.annotations.ThreadSafe;
 import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.io.serialization.Codec;
 import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.util.Optional;
 import org.apache.reef.vortex.api.FutureCallback;
@@ -28,7 +29,6 @@ import org.apache.reef.vortex.api.VortexFuture;
 import org.apache.reef.vortex.common.*;
 
 import javax.inject.Inject;
-import java.io.Serializable;
 import java.util.*;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
@@ -63,16 +63,17 @@ final class DefaultVortexMaster implements VortexMaster {
    * Add a new tasklet to pendingTasklets.
    */
   @Override
-  public <TInput extends Serializable, TOutput extends Serializable> VortexFuture<TOutput>
+  public <TInput, TOutput> VortexFuture<TOutput>
       enqueueTasklet(final VortexFunction<TInput, TOutput> function, final TInput input,
                      final Optional<FutureCallback<TOutput>> callback) {
     // TODO[REEF-500]: Simple duplicate Vortex Tasklet launch.
     final VortexFuture<TOutput> vortexFuture;
     final int id = taskletIdCounter.getAndIncrement();
+    final Codec<TOutput> outputCodec = function.getOutputCodec();
     if (callback.isPresent()) {
-      vortexFuture = new VortexFuture<>(executor, this, id, callback.get());
+      vortexFuture = new VortexFuture<>(executor, this, id, outputCodec, callback.get());
     } else {
-      vortexFuture = new VortexFuture<>(executor, this, id);
+      vortexFuture = new VortexFuture<>(executor, this, id, outputCodec);
     }
 
     final Tasklet tasklet = new Tasklet<>(id, function, input, vortexFuture);
@@ -121,7 +122,7 @@ final class DefaultVortexMaster implements VortexMaster {
         final int resultTaskletId = taskletResultReport.getTaskletId();
         final List<Integer> singletonResultTaskletId = Collections.singletonList(resultTaskletId);
         runningWorkers.doneTasklets(workerId, singletonResultTaskletId);
-        fetchDelegate(singletonResultTaskletId).completed(resultTaskletId, taskletResultReport.getResult());
+        fetchDelegate(singletonResultTaskletId).completed(resultTaskletId, taskletResultReport.getSerializedResult());
 
         break;
       case TaskletAggregationResult:
@@ -131,7 +132,7 @@ final class DefaultVortexMaster implements VortexMaster {
         final List<Integer> aggregatedTaskletIds = taskletAggregationResultReport.getTaskletIds();
         runningWorkers.doneTasklets(workerId, aggregatedTaskletIds);
         fetchDelegate(aggregatedTaskletIds).aggregationCompleted(
-            aggregatedTaskletIds, taskletAggregationResultReport.getResult());
+            aggregatedTaskletIds, taskletAggregationResultReport.getSerializedResult());
 
         break;
       case TaskletCancelled:

http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java
index 24db3cb..6f5d519 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java
@@ -22,13 +22,11 @@ import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.vortex.api.VortexFunction;
 import org.apache.reef.vortex.common.VortexFutureDelegate;
 
-import java.io.Serializable;
-
 /**
  * Representation of user task in Driver.
  */
 @DriverSide
-class Tasklet<TInput extends Serializable, TOutput extends Serializable> {
+class Tasklet<TInput, TOutput> {
   private final int taskletId;
   private final VortexFunction<TInput, TOutput> userTask;
   private final TInput input;

http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
index becf3f9..a423706 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
@@ -27,8 +27,6 @@ import org.apache.reef.vortex.api.VortexFunction;
 import org.apache.reef.vortex.api.VortexFuture;
 import org.apache.reef.vortex.common.WorkerReport;
 
-import java.io.Serializable;
-
 /**
  * The heart of Vortex.
  * Processes various tasklet related events/requests coming from different components of the system.
@@ -40,7 +38,7 @@ public interface VortexMaster {
   /**
    * Submit a new Tasklet to be run sometime in the future, with an optional callback function on the result.
    */
-  <TInput extends Serializable, TOutput extends Serializable> VortexFuture<TOutput>
+  <TInput, TOutput> VortexFuture<TOutput>
       enqueueTasklet(final VortexFunction<TInput, TOutput> vortexFunction, final TInput input,
                      final Optional<FutureCallback<TOutput>> callback);
 

http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java
index ffba985..88911e3 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java
@@ -24,7 +24,6 @@ import org.apache.reef.driver.task.RunningTask;
 import org.apache.reef.vortex.common.TaskletCancellationRequest;
 import org.apache.reef.vortex.common.TaskletExecutionRequest;
 
-import java.io.Serializable;
 import java.util.*;
 
 /**
@@ -42,8 +41,7 @@ class VortexWorkerManager {
     this.reefTask = reefTask;
   }
 
-  <TInput extends Serializable, TOutput extends Serializable>
-      void launchTasklet(final Tasklet<TInput, TOutput> tasklet) {
+  <TInput, TOutput> void launchTasklet(final Tasklet<TInput, TOutput> tasklet) {
     assert !runningTasklets.containsKey(tasklet.getId());
     runningTasklets.put(tasklet.getId(), tasklet);
     final TaskletExecutionRequest<TInput, TOutput> taskletExecutionRequest
@@ -91,4 +89,4 @@ class VortexWorkerManager {
   boolean containsTasklet(final Integer taskletId) {
     return runningTasklets.containsKey(taskletId);
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
index 920f1a9..3390c22 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
@@ -34,7 +34,6 @@ import org.apache.reef.vortex.driver.VortexWorkerConf;
 import org.apache.reef.wake.EventHandler;
 
 import javax.inject.Inject;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.*;
@@ -108,9 +107,9 @@ public final class VortexWorker implements Task, TaskMessageSource {
 
                       try {
                         // Command Executor: Execute the command
-                        final Serializable result = taskletExecutionRequest.execute();
                         final TaskletReport taskletReport =
-                            new TaskletResultReport<>(taskletExecutionRequest.getTaskletId(), result);
+                            new TaskletResultReport(taskletExecutionRequest.getTaskletId(),
+                                taskletExecutionRequest.execute());
                         taskletReports.add(taskletReport);
                       } catch (final InterruptedException ex) {
                         // Assumes that user's thread follows convention that cancelled Futures
@@ -149,7 +148,6 @@ public final class VortexWorker implements Task, TaskMessageSource {
               if (future != null) {
                 future.cancel(true);
               }
-
               break;
             default:
               throw new RuntimeException("Unknown Command");

http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/addone/AddOneFunction.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/addone/AddOneFunction.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/addone/AddOneFunction.java
index 299a31a..5e32c8c 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/addone/AddOneFunction.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/addone/AddOneFunction.java
@@ -18,12 +18,15 @@
  */
 package org.apache.reef.vortex.examples.addone;
 
+import org.apache.reef.io.serialization.Codec;
+import org.apache.reef.io.serialization.SerializableCodec;
 import org.apache.reef.vortex.api.VortexFunction;
 
 /**
  * Outputs input + 1.
  */
 final class AddOneFunction implements VortexFunction<Integer, Integer> {
+  private static final Codec<Integer> CODEC = new SerializableCodec<>();
   /**
    * Outputs input + 1.
    */
@@ -31,4 +34,14 @@ final class AddOneFunction implements VortexFunction<Integer, Integer> {
   public Integer call(final Integer input) throws Exception {
     return input + 1;
   }
+
+  @Override
+  public Codec<Integer> getInputCodec() {
+    return CODEC;
+  }
+
+  @Override
+  public Codec<Integer> getOutputCodec() {
+    return CODEC;
+  }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/hello/HelloVortexFunction.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/hello/HelloVortexFunction.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/hello/HelloVortexFunction.java
index 52f7d21..89c6918 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/hello/HelloVortexFunction.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/hello/HelloVortexFunction.java
@@ -18,20 +18,31 @@
  */
 package org.apache.reef.vortex.examples.hello;
 
+import org.apache.reef.io.serialization.Codec;
 import org.apache.reef.vortex.api.VortexFunction;
-
-import java.io.Serializable;
+import org.apache.reef.vortex.util.VoidCodec;
 
 /**
  * Prints to stdout.
  */
-final class HelloVortexFunction implements VortexFunction {
+final class HelloVortexFunction implements VortexFunction<Void, Void> {
+  private static final Codec<Void> CODEC =  new VoidCodec();
   /**
    * Prints to stdout.
    */
   @Override
-  public Serializable call(final Serializable serializable) throws Exception {
+  public Void call(final Void input) throws Exception {
     System.out.println("Hello, Vortex!");
     return null;
   }
+
+  @Override
+  public Codec<Void> getInputCodec() {
+    return CODEC;
+  }
+
+  @Override
+  public Codec<Void> getOutputCodec() {
+    return CODEC;
+  }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulFunction.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulFunction.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulFunction.java
index 2a2374a..db4a320 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulFunction.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulFunction.java
@@ -18,12 +18,16 @@
  */
 package org.apache.reef.vortex.examples.matmul;
 
+import org.apache.reef.io.serialization.Codec;
 import org.apache.reef.vortex.api.VortexFunction;
 
 /**
  * Computes multiplication of two matrices.
  */
 final class MatMulFunction implements VortexFunction<MatMulInput, MatMulOutput> {
+  private static final Codec<MatMulInput> INPUT_CODEC = new MatMulInputCodec();
+  private static final Codec<MatMulOutput> OUTPUT_CODEC = new MatMulOutputCodec();
+
   /**
    * Computes multiplication of two matrices.
    * @param input Input which contains two matrices to multiply,
@@ -39,4 +43,14 @@ final class MatMulFunction implements VortexFunction<MatMulInput, MatMulOutput>
     final Matrix<Double> result = leftMatrix.multiply(rightMatrix);
     return new MatMulOutput(index, result);
   }
+
+  @Override
+  public Codec<MatMulInput> getInputCodec() {
+    return INPUT_CODEC;
+  }
+
+  @Override
+  public Codec<MatMulOutput> getOutputCodec() {
+    return OUTPUT_CODEC;
+  }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulInput.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulInput.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulInput.java
index 86be004..b35b402 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulInput.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulInput.java
@@ -18,13 +18,11 @@
  */
 package org.apache.reef.vortex.examples.matmul;
 
-import java.io.Serializable;
-
 /**
  * Input of {@link MatMulFunction} which contains two matrices to multiply,
  * and index of the sub-matrix in the entire result.
  */
-class MatMulInput implements Serializable {
+final class MatMulInput {
   private final int index;
   private final Matrix<Double> leftMatrix;
   private final Matrix<Double> rightMatrix;

http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulInputCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulInputCodec.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulInputCodec.java
new file mode 100644
index 0000000..229ba0e
--- /dev/null
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulInputCodec.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.examples.matmul;
+
+import org.apache.reef.io.serialization.Codec;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Encodes/decodes {@link MatMulInput} to/from byte array.
+ */
+final class MatMulInputCodec implements Codec<MatMulInput> {
+
+  @Override
+  public byte[] encode(final MatMulInput matMulInput) {
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+      try (DataOutputStream daos = new DataOutputStream(baos)) {
+        final int index = matMulInput.getIndex();
+        final Matrix<Double> leftMatrix = matMulInput.getLeftMatrix();
+        final Matrix<Double> rightMatrix = matMulInput.getRightMatrix();
+
+        daos.writeInt(index);
+        encodeMatrixToStream(daos, leftMatrix);
+        encodeMatrixToStream(daos, rightMatrix);
+
+        return baos.toByteArray();
+      }
+    } catch (final IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public MatMulInput decode(final byte[] buf) {
+    try (ByteArrayInputStream bais = new ByteArrayInputStream(buf)) {
+      try (DataInputStream dais = new DataInputStream(bais)) {
+        final int index = dais.readInt();
+        final Matrix leftMatrix = decodeMatrixFromStream(dais);
+        final Matrix rightMatrix = decodeMatrixFromStream(dais);
+        return new MatMulInput(index, leftMatrix, rightMatrix);
+      }
+    } catch (final IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Encode a Matrix to output stream.
+   */
+  private void encodeMatrixToStream(final DataOutputStream stream, final Matrix<Double> matrix) throws IOException {
+    final int numRow = matrix.getNumRows();
+    final int numColumn = matrix.getNumColumns();
+
+    stream.writeInt(numRow);
+    stream.writeInt(numColumn);
+
+    for (final List<Double> row : matrix.getRows()) {
+      for (final double element : row) {
+        stream.writeDouble(element);
+      }
+    }
+  }
+
+  /**
+   * Decode a Matrix from input stream.
+   */
+  private Matrix decodeMatrixFromStream(final DataInputStream stream) throws IOException {
+    final int numRow = stream.readInt();
+    final int numColumn = stream.readInt();
+
+    final List<List<Double>> rows = new ArrayList<>(numRow);
+    for (int rowIndex = 0; rowIndex < numRow; rowIndex++) {
+      final List<Double> row = new ArrayList<>(numColumn);
+      for (int columnIndex = 0; columnIndex < numColumn; columnIndex++) {
+        row.add(stream.readDouble());
+      }
+      rows.add(row);
+    }
+    return new RowMatrix(Collections.unmodifiableList(rows));
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulOutput.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulOutput.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulOutput.java
index 42c118c..99ee5bb 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulOutput.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulOutput.java
@@ -18,12 +18,10 @@
  */
 package org.apache.reef.vortex.examples.matmul;
 
-import java.io.Serializable;
-
 /**
  * Output of {@link MatMulFunction} which contains the sub-matrix and index of it in the entire result.
  */
-class MatMulOutput implements Serializable {
+final class MatMulOutput {
   private final int index;
   private final Matrix<Double> result;
 

http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulOutputCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulOutputCodec.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulOutputCodec.java
new file mode 100644
index 0000000..04d974a
--- /dev/null
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulOutputCodec.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.examples.matmul;
+
+import org.apache.reef.io.serialization.Codec;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Encodes/decodes {@link MatMulOutput} to/from byte array.
+ */
+final class MatMulOutputCodec implements Codec<MatMulOutput> {
+
+  @Override
+  public byte[] encode(final MatMulOutput matMulOutput) {
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+      try (DataOutputStream daos = new DataOutputStream(baos)) {
+        final int index = matMulOutput.getIndex();
+        final Matrix<Double> result = matMulOutput.getResult();
+
+        daos.writeInt(index);
+        encodeMatrixToStream(daos, result);
+
+        return baos.toByteArray();
+      }
+    } catch (final IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public MatMulOutput decode(final byte[] buf) {
+    try (ByteArrayInputStream bais = new ByteArrayInputStream(buf)) {
+      try (DataInputStream dais = new DataInputStream(bais)) {
+        final int index = dais.readInt();
+        final Matrix result = decodeMatrixFromStream(dais);
+        return new MatMulOutput(index, result);
+      }
+    } catch (final IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Encode a Matrix to output stream.
+   */
+  private void encodeMatrixToStream(final DataOutputStream stream, final Matrix<Double> matrix) throws IOException {
+    final int numRow = matrix.getNumRows();
+    final int numColumn = matrix.getNumColumns();
+
+    stream.writeInt(numRow);
+    stream.writeInt(numColumn);
+
+    for (final List<Double> row : matrix.getRows()) {
+      for (final double element : row) {
+        stream.writeDouble(element);
+      }
+    }
+  }
+
+  /**
+   * Decode a Matrix from input stream.
+   */
+  private Matrix decodeMatrixFromStream(final DataInputStream stream) throws IOException {
+    final int numRow = stream.readInt();
+    final int numColumn = stream.readInt();
+
+    final List<List<Double>> rows = new ArrayList<>(numRow);
+    for (int rowIndex = 0; rowIndex < numRow; rowIndex++) {
+      final List<Double> row = new ArrayList<>(numColumn);
+      for (int columnIndex = 0; columnIndex < numColumn; columnIndex++) {
+        row.add(stream.readDouble());
+      }
+      rows.add(row);
+    }
+    return new RowMatrix(Collections.unmodifiableList(rows));
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/Matrix.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/Matrix.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/Matrix.java
index 0b1fe2e..6cd4c05 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/Matrix.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/Matrix.java
@@ -18,14 +18,13 @@
  */
 package org.apache.reef.vortex.examples.matmul;
 
-import java.io.Serializable;
 import java.util.List;
 
 /**
- * Interface of serializable Matrix.
+ * Interface of Matrix.
  * @param <T> Type of elements in Matrix.
  */
-interface Matrix<T> extends Serializable {
+interface Matrix<T> {
 
   /**
    * Add another matrix. Note that dimensions of two matrices should be identical.

http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/util/VoidCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/util/VoidCodec.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/util/VoidCodec.java
new file mode 100644
index 0000000..7e21066
--- /dev/null
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/util/VoidCodec.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.util;
+
+import org.apache.reef.io.serialization.Codec;
+
+/**
+ * Codec for empty input/output.
+ */
+public final class VoidCodec implements Codec<Void> {
+
+  @Override
+  public byte[] encode(final Void obj) {
+    return new byte[0];
+  }
+
+  @Override
+  public Void decode(final byte[] buf) {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/util/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/util/package-info.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/util/package-info.java
new file mode 100644
index 0000000..fcbb9f5
--- /dev/null
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/util/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Utilities used in Vortex.
+ */
+package org.apache.reef.vortex.util;

http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
index 325d3d4..18e328d 100644
--- a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
+++ b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.reef.vortex.driver;
 
+import org.apache.reef.io.serialization.SerializableCodec;
 import org.apache.reef.util.Optional;
 import org.apache.reef.vortex.api.FutureCallback;
 import org.apache.reef.vortex.api.VortexFunction;
@@ -40,6 +41,8 @@ import static org.junit.Assert.*;
  * Test whether DefaultVortexMaster correctly handles (simulated) events.
  */
 public class DefaultVortexMasterTest {
+  private static final byte[] EMPTY_RESULT = new byte[0];
+  private static final byte[] INTEGER_RESULT = new SerializableCodec<Integer>().encode(1);
   private TestUtil testUtil = new TestUtil();
 
   /**
@@ -75,7 +78,7 @@ public class DefaultVortexMasterTest {
 
     final ArrayList<Integer> taskletIds = launchTasklets(runningWorkers, pendingTasklets, 1);
     for (final int taskletId : taskletIds) {
-      final TaskletReport taskletReport = new TaskletResultReport<>(taskletId, null);
+      final TaskletReport taskletReport = new TaskletResultReport(taskletId, INTEGER_RESULT);
       vortexMaster.workerReported(
           vortexWorkerManager1.getId(), new WorkerReport(Collections.singletonList(taskletReport)));
     }
@@ -115,7 +118,7 @@ public class DefaultVortexMasterTest {
 
     // Completed?
     for (final int taskletId : taskletIds2) {
-      final TaskletReport taskletReport = new TaskletResultReport<>(taskletId, null);
+      final TaskletReport taskletReport = new TaskletResultReport(taskletId, EMPTY_RESULT);
       vortexMaster.workerReported(
           vortexWorkerManager2.getId(), new WorkerReport(Collections.singletonList(taskletReport)));
     }
@@ -146,7 +149,7 @@ public class DefaultVortexMasterTest {
     final int numOfTasklets = 100;
     for (int i = 0; i < numOfTasklets; i++) {
       vortexFutures.add(vortexMaster.enqueueTasklet(testUtil.newFunction(), null,
-          Optional.<FutureCallback<Integer>>empty()));
+          Optional.<FutureCallback<Void>>empty()));
     }
     final ArrayList<Integer> taskletIds1 = launchTasklets(runningWorkers, pendingTasklets, numOfTasklets);
 
@@ -166,7 +169,7 @@ public class DefaultVortexMasterTest {
     for (final int taskletId : taskletIds2) {
       final String workerId = runningWorkers.getWhereTaskletWasScheduledTo(taskletId);
       assertNotNull("The tasklet must have been scheduled", workerId);
-      final TaskletReport taskletReport = new TaskletResultReport<>(taskletId, null);
+      final TaskletReport taskletReport = new TaskletResultReport(taskletId, EMPTY_RESULT);
       vortexMaster.workerReported(
           workerId, new WorkerReport(Collections.singletonList(taskletReport)));
     }

http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
index ab93a08..c2dee99 100644
--- a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
+++ b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
@@ -19,6 +19,9 @@
 package org.apache.reef.vortex.driver;
 
 import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.io.serialization.Codec;
+import org.apache.reef.io.serialization.SerializableCodec;
+import org.apache.reef.vortex.util.VoidCodec;
 import org.apache.reef.util.Optional;
 import org.apache.reef.vortex.api.VortexFunction;
 import org.apache.reef.vortex.api.VortexFuture;
@@ -26,7 +29,6 @@ import org.apache.reef.vortex.common.*;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import java.io.Serializable;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -44,6 +46,9 @@ import static org.mockito.Mockito.when;
  * Utility methods for tests.
  */
 public final class TestUtil {
+  private static final Codec<Void> VOID_CODEC = new VoidCodec();
+  private static final Codec<Integer> INTEGER_CODEC = new SerializableCodec<>();
+
   private final AtomicInteger taskletId = new AtomicInteger(0);
   private final AtomicInteger workerId = new AtomicInteger(0);
   private final Executor executor = Executors.newFixedThreadPool(5);
@@ -85,18 +90,28 @@ public final class TestUtil {
    */
   public Tasklet newTasklet() {
     final int id = taskletId.getAndIncrement();
-    return new Tasklet(id, null, null, new VortexFuture(executor, vortexMaster, id));
+    return new Tasklet(id, null, null, new VortexFuture(executor, vortexMaster, id, VOID_CODEC));
   }
 
   /**
    * @return a new dummy function.
    */
-  public VortexFunction newFunction() {
-    return new VortexFunction() {
+  public VortexFunction<Void, Void> newFunction() {
+    return new VortexFunction<Void, Void>() {
       @Override
-      public Serializable call(final Serializable serializable) throws Exception {
+      public Void call(final Void input) throws Exception {
         return null;
       }
+
+      @Override
+      public Codec getInputCodec() {
+        return VOID_CODEC;
+      }
+
+      @Override
+      public Codec getOutputCodec() {
+        return VOID_CODEC;
+      }
     };
   }
 
@@ -110,10 +125,10 @@ public final class TestUtil {
   /**
    * @return a new dummy function.
    */
-  public VortexFunction newInfiniteLoopFunction() {
-    return new VortexFunction() {
+  public VortexFunction<Void, Void> newInfiniteLoopFunction() {
+    return new VortexFunction<Void, Void>() {
       @Override
-      public Serializable call(final Serializable serializable) throws Exception {
+      public Void call(final Void input) throws Exception {
         while(true) {
           Thread.sleep(100);
           if (Thread.currentThread().isInterrupted()) {
@@ -121,6 +136,16 @@ public final class TestUtil {
           }
         }
       }
+
+      @Override
+      public Codec getInputCodec() {
+        return VOID_CODEC;
+      }
+
+      @Override
+      public Codec getOutputCodec() {
+        return VOID_CODEC;
+      }
     };
   }
 
@@ -130,9 +155,19 @@ public final class TestUtil {
   public VortexFunction<Integer, Integer> newIntegerFunction() {
     return new VortexFunction<Integer, Integer>() {
       @Override
-      public Integer call(final Integer integer) throws Exception {
+      public Integer call(final Integer input) throws Exception {
         return 1;
       }
+
+      @Override
+      public Codec<Integer> getInputCodec() {
+        return INTEGER_CODEC;
+      }
+
+      @Override
+      public Codec<Integer> getOutputCodec() {
+        return INTEGER_CODEC;
+      }
     };
   }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneFunction.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneFunction.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneFunction.java
index e064976..8db6827 100644
--- a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneFunction.java
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneFunction.java
@@ -18,12 +18,15 @@
  */
 package org.apache.reef.tests.applications.vortex.addone;
 
+import org.apache.reef.io.serialization.Codec;
+import org.apache.reef.io.serialization.SerializableCodec;
 import org.apache.reef.vortex.api.VortexFunction;
 
 /**
  * Outputs Input+1.
  */
 public final class AddOneFunction implements VortexFunction<Integer, Integer> {
+  private static final Codec<Integer> CODEC = new SerializableCodec<>();
   /**
    * Outputs Input+1.
    */
@@ -31,4 +34,14 @@ public final class AddOneFunction implements VortexFunction<Integer, Integer> {
   public Integer call(final Integer input) throws Exception {
     return input + 1;
   }
+
+  @Override
+  public Codec<Integer> getInputCodec() {
+    return CODEC;
+  }
+
+  @Override
+  public Codec<Integer> getOutputCodec() {
+    return CODEC;
+  }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/InfiniteLoopWithCancellationFunction.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/InfiniteLoopWithCancellationFunction.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/InfiniteLoopWithCancellationFunction.java
index f1c982e..b885c82 100644
--- a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/InfiniteLoopWithCancellationFunction.java
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/InfiniteLoopWithCancellationFunction.java
@@ -19,20 +19,22 @@
 
 package org.apache.reef.tests.applications.vortex.cancellation;
 
+import org.apache.reef.io.serialization.Codec;
 import org.apache.reef.vortex.api.VortexFunction;
+import org.apache.reef.vortex.util.VoidCodec;
 
-import java.io.Serializable;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 /**
  * Runs an infinite loop and waits for cancellation.
  */
-public final class InfiniteLoopWithCancellationFunction implements VortexFunction {
+public final class InfiniteLoopWithCancellationFunction implements VortexFunction<Void, Void> {
   private static final Logger LOG = Logger.getLogger(InfiniteLoopWithCancellationFunction.class.getName());
+  private static final Codec<Void> CODEC = new VoidCodec();
 
   @Override
-  public Serializable call(final Serializable serializable) throws Exception {
+  public Void call(final Void input) throws Exception {
     LOG.log(Level.FINE, "Entered Infinite Loop Tasklet.");
     while (true) {
       Thread.sleep(100);
@@ -41,4 +43,14 @@ public final class InfiniteLoopWithCancellationFunction implements VortexFunctio
       }
     }
   }
+
+  @Override
+  public Codec<Void> getInputCodec() {
+    return CODEC;
+  }
+
+  @Override
+  public Codec<Void> getOutputCodec() {
+    return CODEC;
+  }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTestStart.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTestStart.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTestStart.java
index 481bb5f..e37e6f1 100644
--- a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTestStart.java
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTestStart.java
@@ -42,7 +42,7 @@ public final class TaskletCancellationTestStart implements VortexStart {
   @Override
   public void start(final VortexThreadPool vortexThreadPool) {
     final InfiniteLoopWithCancellationFunction function = new InfiniteLoopWithCancellationFunction();
-    final VortexFuture future = vortexThreadPool.submit(function, 0);
+    final VortexFuture future = vortexThreadPool.submit(function, null);
 
     try {
       // Hacky way to increase probability that the task has been launched.

http://git-wip-us.apache.org/repos/asf/reef/blob/8b997771/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/ExceptionFunction.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/ExceptionFunction.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/ExceptionFunction.java
index a6a5737..aeff37b 100644
--- a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/ExceptionFunction.java
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/ExceptionFunction.java
@@ -18,14 +18,27 @@
  */
 package org.apache.reef.tests.applications.vortex.exception;
 
+import org.apache.reef.io.serialization.Codec;
+import org.apache.reef.io.serialization.SerializableCodec;
 import org.apache.reef.vortex.api.VortexFunction;
 
 /**
  * A test Vortex function that throws an Exception.
  */
 public final class ExceptionFunction implements VortexFunction<Integer, Integer> {
+  private static final Codec<Integer> CODEC = new SerializableCodec<>();
   @Override
-  public Integer call(final Integer integer) throws Exception {
+  public Integer call(final Integer input) throws Exception {
     throw new RuntimeException("Expected test exception.");
   }
+
+  @Override
+  public Codec<Integer> getInputCodec() {
+    return CODEC;
+  }
+
+  @Override
+  public Codec<Integer> getOutputCodec() {
+    return CODEC;
+  }
 }

Reply via email to