[REEF-504] Clean up Serializable in Vortex
 
This addressed the issue by:
  * Using Kryo as the one and only serializer in Vortex
  * Making classes Kryo-compatible
  * Removing Avro-related code
  * Refactoring protocol names
 
JIRA:
  [REEF-504](https://issues.apache.org/jira/browse/REEF-504)

Pull Request:
  Closes #931


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/48bde0c0
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/48bde0c0
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/48bde0c0

Branch: refs/heads/master
Commit: 48bde0c0ec0ad4ffbe88b65a11d9fccb453eff6e
Parents: 120327f
Author: John Yang <[email protected]>
Authored: Sun Apr 3 16:50:12 2016 +0900
Committer: Andrew Chung <[email protected]>
Committed: Wed Apr 13 10:08:02 2016 -0700

----------------------------------------------------------------------
 lang/java/reef-applications/reef-vortex/pom.xml |  44 +--
 .../src/main/avro/VortexRequest.avsc            |  80 ----
 .../reef-vortex/src/main/avro/WorkerReport.avsc | 125 -------
 .../vortex/api/VortexAggregateFunction.java     |  18 +-
 .../reef/vortex/api/VortexAggregateFuture.java  |  16 +-
 .../reef/vortex/api/VortexAggregatePolicy.java  |  15 +-
 .../apache/reef/vortex/api/VortexFunction.java  |  25 +-
 .../apache/reef/vortex/api/VortexFuture.java    |  21 +-
 .../common/AggregateFunctionRepository.java     |  81 ----
 .../apache/reef/vortex/common/KryoUtils.java    |  75 ++++
 .../TaskletAggregateExecutionRequest.java       |  69 ----
 .../common/TaskletAggregationFailureReport.java |  65 ----
 .../common/TaskletAggregationRequest.java       | 105 ------
 .../common/TaskletAggregationResultReport.java  |  70 ----
 .../common/TaskletCancellationRequest.java      |  45 ---
 .../vortex/common/TaskletCancelledReport.java   |  48 ---
 .../vortex/common/TaskletExecutionRequest.java  |  86 -----
 .../vortex/common/TaskletFailureReport.java     |  65 ----
 .../reef/vortex/common/TaskletReport.java       |  47 ---
 .../reef/vortex/common/TaskletResultReport.java |  66 ----
 .../reef/vortex/common/VortexAvroUtils.java     | 374 -------------------
 .../vortex/common/VortexFutureDelegate.java     |  61 ---
 .../reef/vortex/common/VortexRequest.java       |  42 ---
 .../apache/reef/vortex/common/WorkerReport.java |  51 ---
 .../apache/reef/vortex/common/package-info.java |   2 +-
 .../driver/AggregateFunctionRepository.java     |  69 ++++
 .../reef/vortex/driver/DefaultVortexMaster.java |  41 +-
 .../reef/vortex/driver/RunningWorkers.java      |   1 -
 .../org/apache/reef/vortex/driver/Tasklet.java  |   1 -
 .../apache/reef/vortex/driver/VortexDriver.java |  14 +-
 .../vortex/driver/VortexFutureDelegate.java     |  60 +++
 .../apache/reef/vortex/driver/VortexMaster.java |   4 +-
 .../reef/vortex/driver/VortexRequestor.java     |  22 +-
 .../reef/vortex/driver/VortexWorkerManager.java |   8 +-
 .../vortex/evaluator/AggregateContainer.java    |  32 +-
 .../reef/vortex/evaluator/VortexWorker.java     |  74 ++--
 .../vortex/examples/addone/AddOneFunction.java  |  13 -
 .../examples/hello/HelloVortexFunction.java     |  13 -
 .../vortex/examples/matmul/MatMulException.java |   5 +
 .../vortex/examples/matmul/MatMulFunction.java  |  14 -
 .../vortex/examples/matmul/MatMulInput.java     |  12 +-
 .../examples/matmul/MatMulInputCodec.java       | 100 -----
 .../vortex/examples/matmul/MatMulOutput.java    |  10 +-
 .../examples/matmul/MatMulOutputCodec.java      |  97 -----
 .../reef/vortex/examples/matmul/RowMatrix.java  |   8 +-
 .../sumones/AdditionAggregateFunction.java      |   9 -
 .../examples/sumones/IdentityFunction.java      |  14 -
 .../mastertoworker/MasterToWorkerRequest.java   |  42 +++
 .../TaskletAggregateExecutionRequest.java       |  75 ++++
 .../TaskletAggregationRequest.java              | 106 ++++++
 .../TaskletCancellationRequest.java             |  51 +++
 .../mastertoworker/TaskletExecutionRequest.java |  88 +++++
 .../protocol/mastertoworker/package-info.java   |  22 ++
 .../TaskletAggregationFailureReport.java        |  71 ++++
 .../TaskletAggregationResultReport.java         |  76 ++++
 .../workertomaster/TaskletCancelledReport.java  |  54 +++
 .../workertomaster/TaskletFailureReport.java    |  71 ++++
 .../workertomaster/TaskletResultReport.java     |  72 ++++
 .../workertomaster/WorkerToMasterReport.java    |  47 +++
 .../workertomaster/WorkerToMasterReports.java   |  55 +++
 .../protocol/workertomaster/package-info.java   |  22 ++
 .../org/apache/reef/vortex/util/VoidCodec.java  |  37 --
 .../apache/reef/vortex/util/package-info.java   |  22 --
 .../vortex/driver/DefaultVortexMasterTest.java  |  30 +-
 .../reef/vortex/driver/RunningWorkersTest.java  |   1 -
 .../vortex/driver/SchedulingPolicyTest.java     |   4 +-
 .../org/apache/reef/vortex/driver/TestUtil.java |  52 +--
 .../applications/vortex/VortexTestSuite.java    |   4 +-
 .../vortex/addone/AddOneFunction.java           |  13 -
 .../InfiniteLoopWithCancellationFunction.java   |  13 -
 .../TaskletCancellationRequestTest.java         |  73 ++++
 .../cancellation/TaskletCancellationTest.java   |  73 ----
 .../vortex/exception/ExceptionFunction.java     |  13 -
 pom.xml                                         |   2 +
 74 files changed, 1315 insertions(+), 2191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/pom.xml 
b/lang/java/reef-applications/reef-vortex/pom.xml
index b140127..0662544 100644
--- a/lang/java/reef-applications/reef-vortex/pom.xml
+++ b/lang/java/reef-applications/reef-vortex/pom.xml
@@ -51,48 +51,20 @@ under the License.
             <version>${project.version}</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.avro</groupId>
-            <artifactId>avro</artifactId>
+            <groupId>com.esotericsoftware</groupId>
+            <artifactId>kryo-shaded</artifactId>
+            <version>${kryo.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>de.javakaffee</groupId>
+            <artifactId>kryo-serializers</artifactId>
+            <version>${kryo-serializers.version}</version>
         </dependency>
     </dependencies>
 
     <build>
         <plugins>
             <plugin>
-                <groupId>org.apache.avro</groupId>
-                <artifactId>avro-maven-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <phase>generate-sources</phase>
-                        <goals>
-                            <goal>schema</goal>
-                        </goals>
-                        <configuration>
-                            
<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
-                            
<outputDirectory>${project.basedir}/target/generated-sources/avro/</outputDirectory>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>org.codehaus.mojo</groupId>
-                <artifactId>build-helper-maven-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <id>add-source</id>
-                        <phase>generate-sources</phase>
-                        <goals>
-                            <goal>add-source</goal>
-                        </goals>
-                        <configuration>
-                            <sources>
-                                <source>target/generated-sources/avro</source>
-                            </sources>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-shade-plugin</artifactId>
                 <executions>

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc 
b/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc
deleted file mode 100644
index c0ab97b..0000000
--- a/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-[
-  {
-    "namespace": "org.apache.reef.vortex.common.avro",
-    "type": "record",
-    "name": "AvroTaskletAggregateExecutionRequest",
-    "fields": [
-      {"name": "taskletId", "type": "int"},
-      {"name": "aggregateFunctionId", "type": "int"},
-      {"name": "serializedInput", "type": "bytes"}
-    ]
-  },
-  {
-    "namespace": "org.apache.reef.vortex.common.avro",
-    "type": "record",
-    "name": "AvroTaskletAggregationRequest",
-    "fields": [
-      {"name": "aggregateFunctionId", "type": "int"},
-      {"name": "serializedUserFunction", "type": "bytes"},
-      {"name": "serializedAggregateFunction", "type": "bytes"},
-      {"name": "serializedPolicy", "type": "bytes"}
-    ]
-  },
-  {
-    "namespace": "org.apache.reef.vortex.common.avro",
-    "type": "record",
-    "name": "AvroTaskletExecutionRequest",
-    "fields": [
-      {"name": "taskletId", "type": "int"},
-      {"name": "serializedUserFunction", "type": "bytes"},
-      {"name": "serializedInput", "type": "bytes"}
-    ]
-  },
-  {
-    "namespace": "org.apache.reef.vortex.common.avro",
-    "type": "record",
-    "name": "AvroTaskletCancellationRequest",
-    "fields": [{"name": "taskletId", "type": "int"}]
-  },
-  {
-    "namespace": "org.apache.reef.vortex.common.avro",
-    "type": "record",
-    "name": "AvroVortexRequest",
-    "fields": [
-      {
-        "name": "requestType",
-        "type": {"type": "enum", "name": "AvroRequestType",
-        "symbols": ["ExecuteTasklet", "CancelTasklet", "Aggregate", 
"AggregateExecute"]}
-      },
-      {
-        "name": "taskletRequest",
-        "type": [
-          "null",
-          "AvroTaskletAggregateExecutionRequest",
-          "AvroTaskletAggregationRequest",
-          "AvroTaskletExecutionRequest",
-          "AvroTaskletCancellationRequest"
-        ],
-        "default": null
-      }
-    ]
-  }
-]

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc 
b/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc
deleted file mode 100644
index 5d7395d..0000000
--- a/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-[
-  {
-    "namespace": "org.apache.reef.vortex.common.avro",
-    "type": "record",
-    "name": "AvroTaskletResultReport",
-    "fields": [
-      {"name": "taskletId", "type": "int"},
-      {"name": "serializedOutput", "type": "bytes"}
-    ]
-  },
-  {
-    "namespace": "org.apache.reef.vortex.common.avro",
-    "type": "record",
-    "name": "AvroTaskletAggregationResultReport",
-    "fields": [
-      {
-        "name": "taskletIds",
-        "type":
-        {
-          "type": "array",
-          "items": "int"
-        }
-      },
-      {"name": "serializedOutput", "type": "bytes"}
-    ]
-  },
-  {
-    "namespace": "org.apache.reef.vortex.common.avro",
-    "type": "record",
-    "name": "AvroTaskletCancelledReport",
-    "fields": [
-      {"name": "taskletId", "type": "int"}
-    ]
-  },
-  {
-    "namespace": "org.apache.reef.vortex.common.avro",
-    "type": "record",
-    "name": "AvroTaskletFailureReport",
-    "fields": [
-      {"name": "taskletId", "type": "int"},
-      {"name": "serializedException", "type": "bytes"}
-    ]
-  },
-  {
-    "namespace": "org.apache.reef.vortex.common.avro",
-    "type": "record",
-    "name": "AvroTaskletAggregationFailureReport",
-    "fields": [
-      {
-        "name": "taskletIds",
-        "type":
-        {
-          "type": "array",
-          "items": "int"
-        }
-      },
-      {"name": "serializedException", "type": "bytes"}
-    ]
-  },
-  {
-    "namespace": "org.apache.reef.vortex.common.avro",
-    "type": "record",
-    "name": "AvroTaskletReport",
-    "fields": [
-      {
-        "name": "reportType",
-        "type":
-        {
-          "type": "enum",
-          "name": "AvroReportType",
-          "symbols": [
-            "TaskletResult",
-            "TaskletAggregationResult",
-            "TaskletCancelled",
-            "TaskletFailure",
-            "TaskletAggregationFailure"
-          ]
-        }
-      },
-      {
-        "name": "taskletReport",
-        "type": [
-          "AvroTaskletResultReport",
-          "AvroTaskletAggregationResultReport",
-          "AvroTaskletCancelledReport",
-          "AvroTaskletFailureReport",
-          "AvroTaskletAggregationFailureReport"
-        ]
-      }
-    ]
-  },
-  {
-    "namespace": "org.apache.reef.vortex.common.avro",
-    "type": "record",
-    "name": "AvroWorkerReport",
-    "fields": [
-      {
-        "name": "taskletReports",
-        "type":
-        {
-          "type": "array",
-          "items": "AvroTaskletReport"
-        }
-      }
-    ]
-  }
-]

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFunction.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFunction.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFunction.java
index d7254d5..b96d19c 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFunction.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFunction.java
@@ -21,22 +21,16 @@ package org.apache.reef.vortex.api;
 import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.ClientSide;
 import org.apache.reef.annotations.audience.Public;
-import org.apache.reef.io.serialization.Codec;
-
-import java.io.Serializable;
 import java.util.List;
 
 /**
  * Typed user function for Local Aggregation. Implement your functions using 
this interface.
- * TODO[REEF-504]: Clean up Serializable in Vortex.
- * TODO[REEF-1003]: Use reflection instead of serialization when launching 
VortexFunction.
- *
  * @param <TOutput> output type of the aggregation function and the functions 
to-be-aggregated.
  */
 @Public
 @ClientSide
 @Unstable
-public interface VortexAggregateFunction<TOutput> extends Serializable {
+public interface VortexAggregateFunction<TOutput> {
 
   /**
    * Runs a custom local aggregation function on Tasklets assigned to a 
VortexWorker.
@@ -45,14 +39,4 @@ public interface VortexAggregateFunction<TOutput> extends 
Serializable {
    * @throws Exception
    */
   TOutput call(final List<TOutput> taskletOutputs) throws 
VortexAggregateException;
-
-  /**
-   * Users must define codec for the AggregationOutput.
-   * {@link org.apache.reef.vortex.util.VoidCodec} can be used if the 
aggregation output is
-   * empty, and {@link org.apache.reef.io.serialization.SerializableCodec} can 
be used for ({@link Serializable}
-   * aggregation output.
-   * Custom aggregation output Codec can also be supplied.
-   * @return Codec used to serialize/deserialize the output.
-   */
-  Codec<TOutput> getOutputCodec();
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFuture.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFuture.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFuture.java
index d156a13..0a3aa7b 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFuture.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFuture.java
@@ -25,8 +25,7 @@ import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.ClientSide;
 import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.annotations.audience.Public;
-import org.apache.reef.io.serialization.Codec;
-import org.apache.reef.vortex.common.VortexFutureDelegate;
+import org.apache.reef.vortex.driver.VortexFutureDelegate;
 
 import javax.annotation.concurrent.NotThreadSafe;
 import java.util.*;
@@ -41,9 +40,8 @@ import java.util.concurrent.*;
 @ClientSide
 @NotThreadSafe
 @Unstable
-public final class VortexAggregateFuture<TInput, TOutput> implements 
VortexFutureDelegate {
+public final class VortexAggregateFuture<TInput, TOutput> implements 
VortexFutureDelegate<TOutput> {
   private final Executor executor;
-  private final Codec<TOutput> aggOutputCodec;
   private final BlockingQueue<Pair<List<Integer>, AggregateResult>> 
resultQueue;
   private final ConcurrentMap<Integer, TInput> taskletIdInputMap;
   private final FutureCallback<AggregateResult<TInput, TOutput>> 
callbackHandler;
@@ -51,12 +49,10 @@ public final class VortexAggregateFuture<TInput, TOutput> 
implements VortexFutur
   @Private
   public VortexAggregateFuture(final Executor executor,
                                final Map<Integer, TInput> taskletIdInputMap,
-                               final Codec<TOutput> aggOutputCodec,
                                final FutureCallback<AggregateResult<TInput, 
TOutput>> callbackHandler) {
     this.executor = executor;
     this.taskletIdInputMap = new ConcurrentHashMap<>(taskletIdInputMap);
     this.resultQueue = new ArrayBlockingQueue<>(taskletIdInputMap.size());
-    this.aggOutputCodec = aggOutputCodec;
     this.callbackHandler = callbackHandler;
   }
 
@@ -115,10 +111,8 @@ public final class VortexAggregateFuture<TInput, TOutput> 
implements VortexFutur
    */
   @Private
   @Override
-  public void completed(final int taskletId, final byte[] serializedResult) {
+  public void completed(final int taskletId, final TOutput result) {
     try {
-      // TODO[REEF-1113]: Handle serialization failure separately in Vortex
-      final TOutput result = aggOutputCodec.decode(serializedResult);
       completedTasklets(result, Collections.singletonList(taskletId));
     } catch (final InterruptedException e) {
       throw new RuntimeException(e);
@@ -130,10 +124,8 @@ public final class VortexAggregateFuture<TInput, TOutput> 
implements VortexFutur
    */
   @Private
   @Override
-  public void aggregationCompleted(final List<Integer> taskletIds, final 
byte[] serializedResult) {
+  public void aggregationCompleted(final List<Integer> taskletIds, final 
TOutput result) {
     try {
-      // TODO[REEF-1113]: Handle serialization failure separately in Vortex
-      final TOutput result = aggOutputCodec.decode(serializedResult);
       completedTasklets(result, taskletIds);
     } catch (final InterruptedException e) {
       throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregatePolicy.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregatePolicy.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregatePolicy.java
index ef10eb4..d13feb1 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregatePolicy.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregatePolicy.java
@@ -24,20 +24,23 @@ import org.apache.reef.annotations.audience.Public;
 import org.apache.reef.util.Builder;
 import org.apache.reef.util.Optional;
 
-import java.io.Serializable;
-
 /**
  * The policy for local aggregation on the {@link 
org.apache.reef.vortex.evaluator.VortexWorker}s.
  * The Aggregation function will be triggered on the individual {@link 
VortexFunction} results on
  * an "OR" basis of what is specified by the policy.
- * TODO[REEF-504]: Clean up Serializable in Vortex.
  */
 @ClientSide
 @Public
 @Unstable
-public final class VortexAggregatePolicy implements Serializable {
-  private final Optional<Integer> count;
-  private final int periodMilliseconds;
+public final class VortexAggregatePolicy {
+  private Optional<Integer> count;
+  private int periodMilliseconds;
+
+  /**
+   * No-arg constructor required for Kryo to serialize/deserialize.
+   */
+  VortexAggregatePolicy() {
+  }
 
   private VortexAggregatePolicy(final int periodMilliseconds, final 
Optional<Integer> count) {
     this.periodMilliseconds = periodMilliseconds;

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFunction.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFunction.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFunction.java
index 3efe4c5..f53b1c1 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFunction.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFunction.java
@@ -19,20 +19,17 @@
 package org.apache.reef.vortex.api;
 
 import org.apache.reef.annotations.Unstable;
-import org.apache.reef.io.serialization.Codec;
-
-import java.io.Serializable;
 
 /**
  * Typed user function. Implement your functions using this interface.
- * TODO[REEF-504]: Clean up Serializable in Vortex.
- * TODO[REEF-1003]: Use reflection instead of serialization when launching 
VortexFunction.
+ * Note that Kryo should be able to serialize/deserialize your function and 
input.
+ * Please refer to Kryo project's GitHub repository for how to make 
Kryo-compatible objects.
  *
  * @param <TInput> input type
  * @param <TOutput> output type
  */
 @Unstable
-public interface VortexFunction<TInput, TOutput> extends Serializable {
+public interface VortexFunction<TInput, TOutput> {
   /**
    * @param input of the function
    * @return output of the function
@@ -41,20 +38,4 @@ public interface VortexFunction<TInput, TOutput> extends 
Serializable {
    * For example if threads are spawned here, shut them down before throwing 
an exception
    */
   TOutput call(TInput input) throws Exception;
-
-  /**
-   * Users must define codec for the input. {@link 
org.apache.reef.vortex.util.VoidCodec} can be used if the input is
-   * empty, and {@link org.apache.reef.io.serialization.SerializableCodec} can 
be used for ({@link Serializable} input.
-   * {@link org.apache.reef.vortex.examples.matmul.MatMulInputCodec} is an 
example of codec for the custom input.
-   * @return Codec used to serialize/deserialize the input.
-   */
-  Codec<TInput> getInputCodec();
-
-  /**
-   * Users must define codec for the output. {@link 
org.apache.reef.vortex.util.VoidCodec} can be used if the output is
-   * empty, and {@link org.apache.reef.io.serialization.SerializableCodec} can 
be used for ({@link Serializable} output.
-   * {@link org.apache.reef.vortex.examples.matmul.MatMulOutputCodec} is an 
example of codec for the custom output.
-   * @return Codec used to serialize/deserialize the output.
-   */
-  Codec<TOutput> getOutputCodec();
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
index 3cb6a9b..446011c 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
@@ -20,9 +20,8 @@ package org.apache.reef.vortex.api;
 
 import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.io.serialization.Codec;
 import org.apache.reef.util.Optional;
-import org.apache.reef.vortex.common.VortexFutureDelegate;
+import org.apache.reef.vortex.driver.VortexFutureDelegate;
 import org.apache.reef.vortex.driver.VortexMaster;
 
 import java.util.List;
@@ -35,8 +34,7 @@ import java.util.logging.Logger;
  * The interface between user code and submitted task.
  */
 @Unstable
-public final class VortexFuture<TOutput>
-    implements Future<TOutput>, VortexFutureDelegate {
+public final class VortexFuture<TOutput> implements Future<TOutput>, 
VortexFutureDelegate<TOutput> {
   private static final Logger LOG = 
Logger.getLogger(VortexFuture.class.getName());
 
   // userResult starts out as null. If not null => variable is set and tasklet 
returned.
@@ -49,15 +47,13 @@ public final class VortexFuture<TOutput>
   private final Executor executor;
   private final VortexMaster vortexMaster;
   private final int taskletId;
-  private final Codec<TOutput> outputCodec;
 
   /**
    * Creates a {@link VortexFuture}.
    */
   @Private
-  public VortexFuture(final Executor executor, final VortexMaster 
vortexMaster, final int taskletId,
-                      final Codec<TOutput> outputCodec) {
-    this(executor, vortexMaster, taskletId, outputCodec, null);
+  public VortexFuture(final Executor executor, final VortexMaster 
vortexMaster, final int taskletId) {
+    this(executor, vortexMaster, taskletId, null);
   }
 
   /**
@@ -67,12 +63,10 @@ public final class VortexFuture<TOutput>
   public VortexFuture(final Executor executor,
                       final VortexMaster vortexMaster,
                       final int taskletId,
-                      final Codec<TOutput> outputCodec,
                       final FutureCallback<TOutput> callbackHandler) {
     this.executor = executor;
     this.vortexMaster = vortexMaster;
     this.taskletId = taskletId;
-    this.outputCodec = outputCodec;
     this.callbackHandler = callbackHandler;
   }
 
@@ -187,11 +181,8 @@ public final class VortexFuture<TOutput>
    */
   @Private
   @Override
-  public void completed(final int pTaskletId, final byte[] serializedResult) {
+  public void completed(final int pTaskletId, final TOutput result) {
     assert taskletId == pTaskletId;
-
-    // TODO[REEF-1113]: Handle serialization failure separately in Vortex
-    final TOutput result = outputCodec.decode(serializedResult);
     this.userResult = Optional.ofNullable(result);
     if (callbackHandler != null) {
       executor.execute(new Runnable() {
@@ -209,7 +200,7 @@ public final class VortexFuture<TOutput>
    */
   @Private
   @Override
-  public void aggregationCompleted(final List<Integer> taskletIds, final 
byte[] serializedResult) {
+  public void aggregationCompleted(final List<Integer> taskletIds, final 
TOutput result) {
     throw new RuntimeException("Functions not associated with 
AggregationFunctions cannot be aggregated.");
   }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/AggregateFunctionRepository.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/AggregateFunctionRepository.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/AggregateFunctionRepository.java
deleted file mode 100644
index c45dcde..0000000
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/AggregateFunctionRepository.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.vortex.common;
-
-import org.apache.commons.lang3.tuple.ImmutableTriple;
-import org.apache.commons.lang3.tuple.Triple;
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.vortex.api.VortexAggregateFunction;
-import org.apache.reef.vortex.api.VortexAggregatePolicy;
-import org.apache.reef.vortex.api.VortexFunction;
-
-import javax.annotation.concurrent.ThreadSafe;
-import javax.inject.Inject;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * A repository for {@link VortexAggregateFunction} and its associated {@link 
VortexFunction},
- * used to pass functions between VortexMaster and RunningWorkers, as well as 
used to cache functions
- * for VortexWorkers on AggregateRequests and AggregateExecutionRequests.
- */
-@ThreadSafe
-@Unstable
-@Private
-public final class AggregateFunctionRepository {
-  private final ConcurrentMap<Integer, Triple<VortexAggregateFunction, 
VortexFunction, VortexAggregatePolicy>>
-      aggregateFunctionMap = new ConcurrentHashMap<>();
-
-  @Inject
-  private AggregateFunctionRepository() {
-  }
-
-  /**
-   * Associates an aggregate function ID with a {@link 
VortexAggregateFunction} and a {@link VortexFunction}.
-   */
-  public Triple<VortexAggregateFunction, VortexFunction, 
VortexAggregatePolicy> put(
-      final int aggregateFunctionId,
-      final VortexAggregateFunction aggregateFunction,
-      final VortexFunction function,
-      final VortexAggregatePolicy policy) {
-    return aggregateFunctionMap.put(aggregateFunctionId, new 
ImmutableTriple<>(aggregateFunction, function, policy));
-  }
-
-  /**
-   * Gets the {@link VortexAggregateFunction} associated with the aggregate 
function ID.
-   */
-  public VortexAggregateFunction getAggregateFunction(final int 
aggregateFunctionId) {
-    return aggregateFunctionMap.get(aggregateFunctionId).getLeft();
-  }
-
-  /**
-   * Gets the {@link VortexFunction} associated with the aggregate function ID.
-   */
-  public VortexFunction getFunction(final int aggregateFunctionId) {
-    return aggregateFunctionMap.get(aggregateFunctionId).getMiddle();
-  }
-
-  /**
-   * Gets the {@link VortexAggregatePolicy} associated with the aggregate 
function ID.
-   */
-  public VortexAggregatePolicy getPolicy(final int aggregateFunctionId) {
-    return aggregateFunctionMap.get(aggregateFunctionId).getRight();
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/KryoUtils.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/KryoUtils.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/KryoUtils.java
new file mode 100644
index 0000000..5fe3f17
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/KryoUtils.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.common;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.pool.KryoFactory;
+import com.esotericsoftware.kryo.pool.KryoPool;
+import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer;
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+
+import javax.inject.Inject;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+
+/**
+ * The one and only serializer for the Vortex protocol.
+ * Kryo instances are borrowed from a pool per call and always returned to it.
+ */
+@Private
+@Unstable
+public final class KryoUtils {
+  /**
+   * For reducing Kryo object instantiation cost.
+   */
+  private final KryoPool kryoPool;
+
+  @Inject
+  private KryoUtils() {
+    final KryoFactory factory = new KryoFactory() {
+      @Override
+      public Kryo create() {
+        final Kryo kryo = new Kryo();
+        // Required to serialize/deserialize Throwable
+        UnmodifiableCollectionsSerializer.registerSerializers(kryo);
+        return kryo;
+      }
+    };
+    kryoPool = new KryoPool.Builder(factory).softReferences().build();
+  }
+
+  /**
+   * Serializes the object, together with its class information, into a byte array.
+   *
+   * @param object the object to serialize.
+   * @return the complete serialized form of the object.
+   */
+  public byte[] serialize(final Object object) {
+    final ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+    final Kryo kryo = kryoPool.borrow();
+    try (final Output out = new Output(byteStream)) {
+      kryo.writeClassAndObject(out, object);
+      // Output#toBytes only returns what is still buffered, so flush and read the stream instead.
+      out.flush();
+      return byteStream.toByteArray();
+    } finally {
+      // Return the instance to the pool even when serialization fails.
+      kryoPool.release(kryo);
+    }
+  }
+
+  /**
+   * Deserializes an object from bytes produced by {@link #serialize(Object)}.
+   *
+   * @param bytes the serialized form of the object.
+   * @return the deserialized object.
+   */
+  public Object deserialize(final byte[] bytes) {
+    final Kryo kryo = kryoPool.borrow();
+    try (final Input input = new Input(new ByteArrayInputStream(bytes))) {
+      return kryo.readClassAndObject(input);
+    } finally {
+      // Return the instance to the pool even when deserialization fails.
+      kryoPool.release(kryo);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregateExecutionRequest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregateExecutionRequest.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregateExecutionRequest.java
deleted file mode 100644
index db850fc..0000000
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregateExecutionRequest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.vortex.common;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.DriverSide;
-import org.apache.reef.annotations.audience.Private;
-
-/**
- * A request from the Vortex Driver to run an aggregate-able function.
- */
-@Unstable
-@Private
-@DriverSide
-public final class TaskletAggregateExecutionRequest<TInput> implements 
VortexRequest {
-  private final TInput input;
-  private final int aggregateFunctionId;
-  private final int taskletId;
-
-  public TaskletAggregateExecutionRequest(final int taskletId,
-                                          final int aggregateFunctionId,
-                                          final TInput input) {
-    this.taskletId = taskletId;
-    this.input = input;
-    this.aggregateFunctionId = aggregateFunctionId;
-  }
-
-  /**
-   * @return input of the request.
-   */
-  public TInput getInput() {
-    return input;
-  }
-
-  /**
-   * @return tasklet ID corresponding to the tasklet request.
-   */
-  public int getTaskletId() {
-    return taskletId;
-  }
-
-  /**
-   * @return the AggregateFunctionID of the request.
-   */
-  public int getAggregateFunctionId() {
-    return aggregateFunctionId;
-  }
-
-  @Override
-  public RequestType getType() {
-    return RequestType.ExecuteAggregateTasklet;
-  }
-}

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationFailureReport.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationFailureReport.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationFailureReport.java
deleted file mode 100644
index e6a4c82..0000000
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationFailureReport.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.vortex.common;
-
-import org.apache.reef.annotations.Unstable;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Report of a tasklet exception on aggregation.
- */
-@Unstable
-public final class TaskletAggregationFailureReport implements TaskletReport {
-  private final List<Integer> taskletIds;
-  private final Exception exception;
-
-  /**
-   * @param taskletIds of the failed tasklet(s).
-   * @param exception that caused the tasklet failure.
-   */
-  public TaskletAggregationFailureReport(final List<Integer> taskletIds, final 
Exception exception) {
-    this.taskletIds = Collections.unmodifiableList(new 
ArrayList<>(taskletIds));
-    this.exception = exception;
-  }
-
-  /**
-   * @return the type of this TaskletReport.
-   */
-  @Override
-  public TaskletReportType getType() {
-    return TaskletReportType.TaskletAggregationFailure;
-  }
-
-  /**
-   * @return the taskletIds that failed on aggregation.
-   */
-  public List<Integer> getTaskletIds() {
-    return taskletIds;
-  }
-
-  /**
-   * @return the exception that caused the tasklet aggregation failure.
-   */
-  public Exception getException() {
-    return exception;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationRequest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationRequest.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationRequest.java
deleted file mode 100644
index 6d0d3a6..0000000
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationRequest.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.vortex.common;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.DriverSide;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.io.serialization.Codec;
-import org.apache.reef.vortex.api.VortexAggregateFunction;
-import org.apache.reef.vortex.api.VortexAggregatePolicy;
-import org.apache.reef.vortex.api.VortexFunction;
-
-import java.util.List;
-
-/**
- * A request from the Vortex Driver for the {@link 
org.apache.reef.vortex.evaluator.VortexWorker} to
- * record aggregate functions for later execution.
- */
-@Unstable
-@Private
-@DriverSide
-public final class TaskletAggregationRequest<TInput, TOutput> implements 
VortexRequest {
-  private final int aggregateFunctionId;
-  private final VortexAggregateFunction<TOutput> userAggregateFunction;
-  private final VortexFunction<TInput, TOutput> function;
-  private final VortexAggregatePolicy policy;
-
-  public TaskletAggregationRequest(final int aggregateFunctionId,
-                                   final VortexAggregateFunction<TOutput> 
aggregateFunction,
-                                   final VortexFunction<TInput, TOutput> 
function,
-                                   final VortexAggregatePolicy policy) {
-    this.aggregateFunctionId = aggregateFunctionId;
-    this.userAggregateFunction = aggregateFunction;
-    this.function = function;
-    this.policy = policy;
-  }
-
-  @Override
-  public RequestType getType() {
-    return RequestType.AggregateTasklets;
-  }
-
-  /**
-   * @return the AggregateFunctionID of the aggregate function.
-   */
-  public int getAggregateFunctionId() {
-    return aggregateFunctionId;
-  }
-
-  /**
-   * @return the aggregate function as specified by the user.
-   */
-  public VortexAggregateFunction getAggregateFunction() {
-    return userAggregateFunction;
-  }
-
-  /**
-   * @return the user specified function.
-   */
-  public VortexFunction getFunction() {
-    return function;
-  }
-
-  /**
-   * @return the aggregation policy.
-   */
-  public VortexAggregatePolicy getPolicy() {
-    return policy;
-  }
-
-  /**
-   * Execute the aggregate function using the list of outputs.
-   * @return Output of the function in a serialized form.
-   */
-  public byte[] executeAggregation(final List<TOutput> outputs) throws 
Exception {
-    final TOutput output = userAggregateFunction.call(outputs);
-    final Codec<TOutput> codec = userAggregateFunction.getOutputCodec();
-
-    // TODO[REEF-1113]: Handle serialization failure separately in Vortex
-    return codec.encode(output);
-  }
-
-  /**
-   * Execute the user specified function.
-   */
-  public TOutput executeFunction(final TInput input) throws Exception {
-    return function.call(input);
-  }
-}

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationResultReport.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationResultReport.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationResultReport.java
deleted file mode 100644
index 1e52a2e..0000000
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationResultReport.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.vortex.common;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.DriverSide;
-import org.apache.reef.annotations.audience.Private;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Report of a Tasklet aggregation execution result.
- */
-@Private
-@DriverSide
-@Unstable
-public final class TaskletAggregationResultReport implements TaskletReport {
-  private final List<Integer> taskletIds;
-  private final byte[] serializedResult;
-
-  /**
-   * @param taskletIds of the tasklets.
-   * @param serializedResult of the tasklet execution in a serialized form.
-   */
-  public TaskletAggregationResultReport(final List<Integer> taskletIds, final 
byte[] serializedResult) {
-    this.taskletIds = Collections.unmodifiableList(new 
ArrayList<>(taskletIds));
-    this.serializedResult = serializedResult;
-  }
-
-  /**
-   * @return the type of this TaskletReport.
-   */
-  @Override
-  public TaskletReportType getType() {
-    return TaskletReportType.TaskletAggregationResult;
-  }
-
-  /**
-   * @return the TaskletId(s) of this TaskletReport
-   */
-  public List<Integer> getTaskletIds() {
-    return taskletIds;
-  }
-
-  /**
-   * @return the result of the Tasklet aggregation execution in a serialized 
form.
-   */
-  public byte[] getSerializedResult() {
-    return serializedResult;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancellationRequest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancellationRequest.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancellationRequest.java
deleted file mode 100644
index 88f2d89..0000000
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancellationRequest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.vortex.common;
-
-import org.apache.reef.annotations.Unstable;
-
-/**
- * A {@link VortexRequest} to cancel tasklets.
- */
-@Unstable
-public final class TaskletCancellationRequest implements VortexRequest {
-  private final int taskletId;
-
-  public TaskletCancellationRequest(final int taskletId) {
-    this.taskletId = taskletId;
-  }
-
-  /**
-   * @return the ID of the VortexTasklet associated with this VortexRequest.
-   */
-  public int getTaskletId() {
-    return taskletId;
-  }
-
-  @Override
-  public RequestType getType() {
-    return RequestType.CancelTasklet;
-  }
-}

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java
deleted file mode 100644
index c09a02f..0000000
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletCancelledReport.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.vortex.common;
-
-import org.apache.reef.annotations.Unstable;
-
-/**
- * The report of a cancelled Tasklet.
- */
-@Unstable
-public final class TaskletCancelledReport implements TaskletReport {
-  private int taskletId;
-
-  /**
-   * @param taskletId of the cancelled tasklet.
-   */
-  public TaskletCancelledReport(final int taskletId) {
-    this.taskletId = taskletId;
-  }
-
-  @Override
-  public TaskletReportType getType() {
-    return TaskletReportType.TaskletCancelled;
-  }
-
-  /**
-   * @return the taskletId of this TaskletReport.
-   */
-  public int getTaskletId() {
-    return taskletId;
-  }
-}

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java
deleted file mode 100644
index e850c9a..0000000
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletExecutionRequest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.vortex.common;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.io.serialization.Codec;
-import org.apache.reef.vortex.api.VortexFunction;
-
-/**
- * Request to execute a tasklet.
- */
-@Unstable
-@Private
-public final class TaskletExecutionRequest<TInput, TOutput> implements 
VortexRequest {
-  private final int taskletId;
-  private final VortexFunction<TInput, TOutput> userFunction;
-  private final TInput input;
-
-  /**
-   * @return the type of this VortexRequest.
-   */
-  @Override
-  public RequestType getType() {
-    return RequestType.ExecuteTasklet;
-  }
-
-  /**
-   * Request from Vortex Master to Vortex Worker to execute a tasklet.
-   */
-  public TaskletExecutionRequest(final int taskletId,
-                                 final VortexFunction<TInput, TOutput> 
userFunction,
-                                 final TInput input) {
-    this.taskletId = taskletId;
-    this.userFunction = userFunction;
-    this.input = input;
-  }
-
-  /**
-   * Execute the function using the input.
-   * @return Output of the function in a serialized form.
-   */
-  public byte[] execute() throws Exception {
-    final TOutput output = userFunction.call(input);
-    final Codec<TOutput> codec = userFunction.getOutputCodec();
-    // TODO[REEF-1113]: Handle serialization failure separately in Vortex
-    return codec.encode(output);
-  }
-
-  /**
-   * @return the ID of the VortexTasklet associated with this VortexRequest.
-   */
-  public int getTaskletId() {
-    return taskletId;
-  }
-
-  /**
-   * Get function of the tasklet.
-   */
-  public VortexFunction getFunction() {
-    return userFunction;
-  }
-
-  /**
-   * Get input of the tasklet.
-   */
-  public TInput getInput() {
-    return input;
-  }
-}

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java
deleted file mode 100644
index 5c0b2de..0000000
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.vortex.common;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.DriverSide;
-import org.apache.reef.annotations.audience.Private;
-
-/**
- * Report of a Tasklet exception.
- */
-@Unstable
-@Private
-@DriverSide
-public final class TaskletFailureReport implements TaskletReport {
-  private final int taskletId;
-  private final Exception exception;
-
-  /**
-   * @param taskletId of the failed Tasklet.
-   * @param exception that caused the tasklet failure.
-   */
-  public TaskletFailureReport(final int taskletId, final Exception exception) {
-    this.taskletId = taskletId;
-    this.exception = exception;
-  }
-
-  /**
-   * @return the type of this TaskletReport.
-   */
-  @Override
-  public TaskletReportType getType() {
-    return TaskletReportType.TaskletFailure;
-  }
-
-  /**
-   * @return the taskletId of this TaskletReport.
-   */
-  public int getTaskletId() {
-    return taskletId;
-  }
-
-  /**
-   * @return the exception that caused the Tasklet failure.
-   */
-  public Exception getException() {
-    return exception;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java
deleted file mode 100644
index 98149c0..0000000
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.vortex.common;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.DriverSide;
-import org.apache.reef.annotations.audience.Private;
-
-/**
- * The interface for a status report from the {@link 
org.apache.reef.vortex.evaluator.VortexWorker}.
- */
-@Unstable
-@Private
-@DriverSide
-public interface TaskletReport {
-  /**
-   * Type of TaskletReport.
-   */
-  enum TaskletReportType {
-    TaskletResult,
-    TaskletAggregationResult,
-    TaskletCancelled,
-    TaskletFailure,
-    TaskletAggregationFailure
-  }
-
-  /**
-   * @return the type of this TaskletReport.
-   */
-  TaskletReportType getType();
-}

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
deleted file mode 100644
index 2c32578..0000000
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.vortex.common;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.DriverSide;
-import org.apache.reef.annotations.audience.Private;
-
-/**
- * Report of a tasklet execution result.
- */
-@Unstable
-@Private
-@DriverSide
-public final class TaskletResultReport implements TaskletReport {
-  private final int taskletId;
-  private final byte[] serializedResult;
-
-  /**
-   * @param taskletId of the Tasklet.
-   * @param serializedResult of the tasklet execution in a serialized form.
-   */
-  public TaskletResultReport(final int taskletId, final byte[] 
serializedResult) {
-    this.taskletId = taskletId;
-    this.serializedResult = serializedResult;
-  }
-
-  /**
-   * @return the type of this TaskletReport.
-   */
-  @Override
-  public TaskletReportType getType() {
-    return TaskletReportType.TaskletResult;
-  }
-
-  /**
-   * @return the TaskletId of this TaskletReport
-   */
-  public int getTaskletId() {
-    return taskletId;
-  }
-
-  /**
-   * @return the result of the tasklet execution in a serialized form.
-   */
-  public byte[] getSerializedResult() {
-    return serializedResult;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
deleted file mode 100644
index f0e930b..0000000
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
+++ /dev/null
@@ -1,374 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.vortex.common;
-
-import org.apache.avro.io.*;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.commons.lang.SerializationUtils;
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.DriverSide;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.vortex.api.VortexAggregateFunction;
-import org.apache.reef.vortex.api.VortexAggregatePolicy;
-import org.apache.reef.vortex.api.VortexFunction;
-import org.apache.reef.vortex.common.avro.*;
-
-import javax.inject.Inject;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Serialize and deserialize Vortex message to/from byte array.
- */
-@Private
-@DriverSide
-@Unstable
-public final class VortexAvroUtils {
-  private final AggregateFunctionRepository aggregateFunctionRepository;
-
-  @Inject
-  private VortexAvroUtils(final AggregateFunctionRepository 
aggregateFunctionRepository) {
-    this.aggregateFunctionRepository = aggregateFunctionRepository;
-  }
-
-  /**
-   * Serialize VortexRequest to byte array.
-   * @param vortexRequest Vortex request message to serialize.
-   * @return Serialized byte array.
-   */
-  public byte[] toBytes(final VortexRequest vortexRequest) {
-    // Convert VortexRequest message to Avro message.
-    final AvroVortexRequest avroVortexRequest;
-    switch (vortexRequest.getType()) {
-    case ExecuteAggregateTasklet:
-      final TaskletAggregateExecutionRequest taskletAggregateExecutionRequest =
-          (TaskletAggregateExecutionRequest) vortexRequest;
-      // TODO[REEF-1113]: Handle serialization failure separately in Vortex
-      final byte[] serializedInputForAggregate =
-        
aggregateFunctionRepository.getFunction(taskletAggregateExecutionRequest.getAggregateFunctionId())
-            
.getInputCodec().encode(taskletAggregateExecutionRequest.getInput());
-      avroVortexRequest = AvroVortexRequest.newBuilder()
-          .setRequestType(AvroRequestType.AggregateExecute)
-          .setTaskletRequest(
-              AvroTaskletAggregateExecutionRequest.newBuilder()
-                  
.setAggregateFunctionId(taskletAggregateExecutionRequest.getAggregateFunctionId())
-                  
.setSerializedInput(ByteBuffer.wrap(serializedInputForAggregate))
-                  
.setTaskletId(taskletAggregateExecutionRequest.getTaskletId())
-                  .build())
-          .build();
-      break;
-    case AggregateTasklets:
-      final TaskletAggregationRequest taskletAggregationRequest = 
(TaskletAggregationRequest) vortexRequest;
-
-      // TODO[REEF-1003]: Use reflection instead of serialization when 
launching VortexFunction
-      final byte[] serializedAggregateFunction = SerializationUtils.serialize(
-          taskletAggregationRequest.getAggregateFunction());
-      final byte[] serializedFunctionForAggregation = 
SerializationUtils.serialize(
-          taskletAggregationRequest.getFunction());
-      final byte[] serializedPolicy = SerializationUtils.serialize(
-          taskletAggregationRequest.getPolicy());
-      avroVortexRequest = AvroVortexRequest.newBuilder()
-          .setRequestType(AvroRequestType.Aggregate)
-          .setTaskletRequest(AvroTaskletAggregationRequest.newBuilder()
-              
.setAggregateFunctionId(taskletAggregationRequest.getAggregateFunctionId())
-              
.setSerializedAggregateFunction(ByteBuffer.wrap(serializedAggregateFunction))
-              
.setSerializedUserFunction(ByteBuffer.wrap(serializedFunctionForAggregation))
-              .setSerializedPolicy(ByteBuffer.wrap(serializedPolicy))
-              .build())
-          .build();
-      break;
-    case ExecuteTasklet:
-      final TaskletExecutionRequest taskletExecutionRequest = 
(TaskletExecutionRequest) vortexRequest;
-      // The following TODOs are sub-issues of cleaning up Serializable in 
Vortex (REEF-504).
-      // The purpose is to reduce serialization cost, which leads to 
bottleneck in Master.
-      // Temporarily those are left as TODOs, but will be addressed in 
separate PRs.
-      final VortexFunction vortexFunction = 
taskletExecutionRequest.getFunction();
-      // TODO[REEF-1113]: Handle serialization failure separately in Vortex
-      final byte[] serializedInput = 
vortexFunction.getInputCodec().encode(taskletExecutionRequest.getInput());
-      // TODO[REEF-1003]: Use reflection instead of serialization when 
launching VortexFunction
-      final byte[] serializedFunction = 
SerializationUtils.serialize(vortexFunction);
-      avroVortexRequest = AvroVortexRequest.newBuilder()
-          .setRequestType(AvroRequestType.ExecuteTasklet)
-          .setTaskletRequest(
-              AvroTaskletExecutionRequest.newBuilder()
-                  .setTaskletId(taskletExecutionRequest.getTaskletId())
-                  .setSerializedInput(ByteBuffer.wrap(serializedInput))
-                  
.setSerializedUserFunction(ByteBuffer.wrap(serializedFunction))
-                  .build())
-          .build();
-      break;
-    case CancelTasklet:
-      final TaskletCancellationRequest taskletCancellationRequest = 
(TaskletCancellationRequest) vortexRequest;
-      avroVortexRequest = AvroVortexRequest.newBuilder()
-          .setRequestType(AvroRequestType.CancelTasklet)
-          .setTaskletRequest(
-              AvroTaskletCancellationRequest.newBuilder()
-                  .setTaskletId(taskletCancellationRequest.getTaskletId())
-                  .build())
-          .build();
-      break;
-    default:
-      throw new RuntimeException("Undefined message type");
-    }
-
-    // Serialize the Avro message to byte array.
-    return toBytes(avroVortexRequest, AvroVortexRequest.class);
-  }
-
-  /**
-   * Serialize WorkerReport to byte array.
-   * @param workerReport Worker report message to serialize.
-   * @return Serialized byte array.
-   */
-  public byte[] toBytes(final WorkerReport workerReport) {
-    final List<AvroTaskletReport> workerTaskletReports = new ArrayList<>();
-
-    for (final TaskletReport taskletReport : workerReport.getTaskletReports()) 
{
-      final AvroTaskletReport avroTaskletReport;
-      switch (taskletReport.getType()) {
-      case TaskletResult:
-        final TaskletResultReport taskletResultReport = (TaskletResultReport) 
taskletReport;
-        avroTaskletReport = AvroTaskletReport.newBuilder()
-            .setReportType(AvroReportType.TaskletResult)
-            .setTaskletReport(
-                AvroTaskletResultReport.newBuilder()
-                    .setTaskletId(taskletResultReport.getTaskletId())
-                    
.setSerializedOutput(ByteBuffer.wrap(taskletResultReport.getSerializedResult()))
-                    .build())
-            .build();
-        break;
-      case TaskletAggregationResult:
-        final TaskletAggregationResultReport taskletAggregationResultReport =
-            (TaskletAggregationResultReport) taskletReport;
-        avroTaskletReport = AvroTaskletReport.newBuilder()
-            .setReportType(AvroReportType.TaskletAggregationResult)
-            .setTaskletReport(
-                AvroTaskletAggregationResultReport.newBuilder()
-                    
.setTaskletIds(taskletAggregationResultReport.getTaskletIds())
-                    
.setSerializedOutput(ByteBuffer.wrap(taskletAggregationResultReport.getSerializedResult()))
-                    .build())
-            .build();
-        break;
-      case TaskletCancelled:
-        final TaskletCancelledReport taskletCancelledReport = 
(TaskletCancelledReport) taskletReport;
-        avroTaskletReport = AvroTaskletReport.newBuilder()
-            .setReportType(AvroReportType.TaskletCancelled)
-            .setTaskletReport(
-                AvroTaskletCancelledReport.newBuilder()
-                    .setTaskletId(taskletCancelledReport.getTaskletId())
-                    .build())
-            .build();
-        break;
-      case TaskletFailure:
-        final TaskletFailureReport taskletFailureReport = 
(TaskletFailureReport) taskletReport;
-        final byte[] serializedException = 
SerializationUtils.serialize(taskletFailureReport.getException());
-        avroTaskletReport = AvroTaskletReport.newBuilder()
-            .setReportType(AvroReportType.TaskletFailure)
-            .setTaskletReport(
-                AvroTaskletFailureReport.newBuilder()
-                    .setTaskletId(taskletFailureReport.getTaskletId())
-                    
.setSerializedException(ByteBuffer.wrap(serializedException))
-                    .build())
-            .build();
-        break;
-      case TaskletAggregationFailure:
-        final TaskletAggregationFailureReport taskletAggregationFailureReport =
-            (TaskletAggregationFailureReport) taskletReport;
-        final byte[] serializedAggregationException =
-            
SerializationUtils.serialize(taskletAggregationFailureReport.getException());
-        avroTaskletReport = AvroTaskletReport.newBuilder()
-            .setReportType(AvroReportType.TaskletAggregationFailure)
-            .setTaskletReport(
-                AvroTaskletAggregationFailureReport.newBuilder()
-                    
.setTaskletIds(taskletAggregationFailureReport.getTaskletIds())
-                    
.setSerializedException(ByteBuffer.wrap(serializedAggregationException))
-                    .build())
-            .build();
-        break;
-      default:
-        throw new RuntimeException("Undefined message type");
-      }
-
-      workerTaskletReports.add(avroTaskletReport);
-    }
-
-    // Convert WorkerReport message to Avro message.
-    final AvroWorkerReport avroWorkerReport = AvroWorkerReport.newBuilder()
-        .setTaskletReports(workerTaskletReports)
-        .build();
-
-    // Serialize the Avro message to byte array.
-    return toBytes(avroWorkerReport, AvroWorkerReport.class);
-  }
-
-  /**
-   * Deserialize byte array to VortexRequest.
-   * @param bytes Byte array to deserialize.
-   * @return De-serialized VortexRequest.
-   */
-  public VortexRequest toVortexRequest(final byte[] bytes) {
-    final AvroVortexRequest avroVortexRequest = toAvroObject(bytes, 
AvroVortexRequest.class);
-
-    final VortexRequest vortexRequest;
-    switch (avroVortexRequest.getRequestType()) {
-    case AggregateExecute:
-      final AvroTaskletAggregateExecutionRequest 
taskletAggregateExecutionRequest =
-          
(AvroTaskletAggregateExecutionRequest)avroVortexRequest.getTaskletRequest();
-      vortexRequest = new 
TaskletAggregateExecutionRequest<>(taskletAggregateExecutionRequest.getTaskletId(),
-          taskletAggregateExecutionRequest.getAggregateFunctionId(),
-          
aggregateFunctionRepository.getFunction(taskletAggregateExecutionRequest.getAggregateFunctionId())
-              
.getInputCodec().decode(taskletAggregateExecutionRequest.getSerializedInput().array()));
-      break;
-    case Aggregate:
-      final AvroTaskletAggregationRequest taskletAggregationRequest =
-          (AvroTaskletAggregationRequest)avroVortexRequest.getTaskletRequest();
-      final VortexAggregateFunction aggregateFunction =
-          (VortexAggregateFunction) SerializationUtils.deserialize(
-              
taskletAggregationRequest.getSerializedAggregateFunction().array());
-      final VortexFunction functionForAggregation =
-          (VortexFunction) SerializationUtils.deserialize(
-              taskletAggregationRequest.getSerializedUserFunction().array());
-      final VortexAggregatePolicy policy =
-          (VortexAggregatePolicy) SerializationUtils.deserialize(
-              taskletAggregationRequest.getSerializedPolicy().array());
-      vortexRequest = new 
TaskletAggregationRequest<>(taskletAggregationRequest.getAggregateFunctionId(),
-          aggregateFunction, functionForAggregation, policy);
-      break;
-    case ExecuteTasklet:
-      final AvroTaskletExecutionRequest taskletExecutionRequest =
-          (AvroTaskletExecutionRequest)avroVortexRequest.getTaskletRequest();
-      // TODO[REEF-1003]: Use reflection instead of serialization when 
launching VortexFunction
-      final VortexFunction function =
-          (VortexFunction) SerializationUtils.deserialize(
-              taskletExecutionRequest.getSerializedUserFunction().array());
-      // TODO[REEF-1113]: Handle serialization failure separately in Vortex
-      vortexRequest = new 
TaskletExecutionRequest(taskletExecutionRequest.getTaskletId(), function,
-         
function.getInputCodec().decode(taskletExecutionRequest.getSerializedInput().array()));
-      break;
-    case CancelTasklet:
-      final AvroTaskletCancellationRequest taskletCancellationRequest =
-          
(AvroTaskletCancellationRequest)avroVortexRequest.getTaskletRequest();
-      vortexRequest = new 
TaskletCancellationRequest(taskletCancellationRequest.getTaskletId());
-      break;
-    default:
-      throw new RuntimeException("Undefined VortexRequest type");
-    }
-    return vortexRequest;
-  }
-
-  /**
-   * Deserialize byte array to WorkerReport.
-   * @param bytes Byte array to deserialize.
-   * @return De-serialized WorkerReport.
-   */
-  public WorkerReport toWorkerReport(final byte[] bytes) {
-    final AvroWorkerReport avroWorkerReport = toAvroObject(bytes, 
AvroWorkerReport.class);
-    final List<TaskletReport> workerTaskletReports = new ArrayList<>();
-
-    for (final AvroTaskletReport avroTaskletReport : 
avroWorkerReport.getTaskletReports()) {
-      final TaskletReport taskletReport;
-
-      switch (avroTaskletReport.getReportType()) {
-      case TaskletResult:
-        final AvroTaskletResultReport taskletResultReport =
-            (AvroTaskletResultReport) avroTaskletReport.getTaskletReport();
-        taskletReport = new 
TaskletResultReport(taskletResultReport.getTaskletId(),
-            taskletResultReport.getSerializedOutput().array());
-        break;
-      case TaskletAggregationResult:
-        final AvroTaskletAggregationResultReport 
taskletAggregationResultReport =
-            (AvroTaskletAggregationResultReport) 
avroTaskletReport.getTaskletReport();
-        taskletReport =
-            new 
TaskletAggregationResultReport(taskletAggregationResultReport.getTaskletIds(),
-                taskletAggregationResultReport.getSerializedOutput().array());
-        break;
-      case TaskletCancelled:
-        final AvroTaskletCancelledReport taskletCancelledReport =
-            (AvroTaskletCancelledReport) avroTaskletReport.getTaskletReport();
-        taskletReport = new 
TaskletCancelledReport(taskletCancelledReport.getTaskletId());
-        break;
-      case TaskletFailure:
-        final AvroTaskletFailureReport taskletFailureReport =
-            (AvroTaskletFailureReport) avroTaskletReport.getTaskletReport();
-        final Exception exception =
-            (Exception) 
SerializationUtils.deserialize(taskletFailureReport.getSerializedException().array());
-        taskletReport = new 
TaskletFailureReport(taskletFailureReport.getTaskletId(), exception);
-        break;
-      case TaskletAggregationFailure:
-        final AvroTaskletAggregationFailureReport 
taskletAggregationFailureReport =
-            (AvroTaskletAggregationFailureReport) 
avroTaskletReport.getTaskletReport();
-        final Exception aggregationException =
-            (Exception) SerializationUtils.deserialize(
-                
taskletAggregationFailureReport.getSerializedException().array());
-        taskletReport =
-            new 
TaskletAggregationFailureReport(taskletAggregationFailureReport.getTaskletIds(),
 aggregationException);
-        break;
-      default:
-        throw new RuntimeException("Undefined TaskletReport type");
-      }
-
-      workerTaskletReports.add(taskletReport);
-    }
-
-    return new WorkerReport(workerTaskletReports);
-  }
-
-  /**
-   * Serialize Avro object to byte array.
-   * @param avroObject Avro object to serialize.
-   * @param theClass Class of the Avro object.
-   * @param <T> Type of the Avro object.
-   * @return Serialized byte array.
-   */
-  private <T> byte[] toBytes(final T avroObject, final Class<T> theClass) {
-    final DatumWriter<T> reportWriter = new SpecificDatumWriter<>(theClass);
-    final byte[] theBytes;
-    try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-      final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, 
null);
-      reportWriter.write(avroObject, encoder);
-      encoder.flush();
-      out.flush();
-      theBytes = out.toByteArray();
-      return theBytes;
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * Deserialize byte array to Avro object.
-   * @param bytes Byte array to deserialize.
-   * @param theClass Class of the Avro object.
-   * @param <T> Type of the Avro object.
-   * @return Avro object de-serialized from byte array.
-   */
-  private <T> T toAvroObject(final byte[] bytes, final Class<T> theClass) {
-    final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, 
null);
-    final SpecificDatumReader<T> reader = new SpecificDatumReader<>(theClass);
-    try {
-      return reader.read(null, decoder);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexFutureDelegate.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexFutureDelegate.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexFutureDelegate.java
deleted file mode 100644
index e6fa91e..0000000
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexFutureDelegate.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.vortex.common;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.DriverSide;
-import org.apache.reef.annotations.audience.Private;
-
-import java.util.List;
-
-/**
- * Exposes functions to be called by the {@link 
org.apache.reef.vortex.driver.VortexMaster}
- * to note that a list of Tasklets associated with a Future has completed.
- */
-@Unstable
-@DriverSide
-@Private
-public interface VortexFutureDelegate {
-
-  /**
-   * A Tasklet associated with the future has completed with a result.
-   * The result should be decoded as in {@link 
org.apache.reef.vortex.api.VortexFuture#completed(int, byte[])}.
-   */
-  void completed(final int taskletId, final byte[] serializedResult);
-
-  /**
-   * The list of aggregated Tasklets associated with the Future that have 
completed with a result.
-   */
-  void aggregationCompleted(final List<Integer> taskletIds, final byte[] 
serializedResult);
-
-  /**
-   * A Tasklet associated with the Future has thrown an Exception.
-   */
-  void threwException(final int taskletId, final Exception exception);
-
-  /**
-   * The list of Tasklets associated with the Future that have thrown an 
Exception.
-   */
-  void aggregationThrewException(final List<Integer> taskletIds, final 
Exception exception);
-
-  /**
-   * A Tasklet associated with the Future has been cancelled.
-   */
-  void cancelled(final int taskletId);
-}

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java
deleted file mode 100644
index 18f44e0..0000000
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexRequest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.vortex.common;
-
-import org.apache.reef.annotations.Unstable;
-
-/**
- * Master-to-Worker protocol.
- */
-@Unstable
-public interface VortexRequest {
-  /**
-   * Type of Request.
-   */
-  enum RequestType {
-    AggregateTasklets,
-    ExecuteTasklet,
-    CancelTasklet,
-    ExecuteAggregateTasklet
-  }
-
-  /**
-   * @return the type of this VortexRequest.
-   */
-  RequestType getType();
-}

Reply via email to