http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java
deleted file mode 100644
index 7a82eee..0000000
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/WorkerReport.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.vortex.common;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.DriverSide;
-import org.apache.reef.annotations.audience.Private;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Worker-to-Master protocol.
- * A report of Tasklet statuses sent form the {@link 
org.apache.reef.vortex.evaluator.VortexWorker}
- * to the {@link org.apache.reef.vortex.driver.VortexMaster}.
- */
-@Private
-@Unstable
-@DriverSide
-public final class WorkerReport {
-  private ArrayList<TaskletReport> taskletReports;
-
-  public WorkerReport(final Collection<TaskletReport> taskletReports) {
-    this.taskletReports = new ArrayList<>(taskletReports);
-  }
-
-  /**
-   * @return the list of Tasklet reports.
-   */
-  public List<TaskletReport> getTaskletReports() {
-    return Collections.unmodifiableList(taskletReports);
-  }
-}

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/package-info.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/package-info.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/package-info.java
index 35cf59e..df2e161 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/package-info.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/package-info.java
@@ -17,6 +17,6 @@
  * under the License.
  */
 /**
- * Vortex Code used both in Vortex Driver and Vortex Evaluator.
+ * Code commonly used in VortexMaster and VortexWorker.
  */
 package org.apache.reef.vortex.common;

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/AggregateFunctionRepository.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/AggregateFunctionRepository.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/AggregateFunctionRepository.java
new file mode 100644
index 0000000..19d1dc8
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/AggregateFunctionRepository.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.driver;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.io.Tuple;
+import org.apache.reef.vortex.api.VortexAggregateFunction;
+import org.apache.reef.vortex.api.VortexAggregatePolicy;
+
+import javax.annotation.concurrent.ThreadSafe;
+import javax.inject.Inject;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * A repository for {@link VortexAggregateFunction} and its associated
+ * {@link VortexAggregatePolicy}, keyed by an integer aggregate function ID.
+ *
+ * <p>Each entry is held as a {@link Tuple} whose key is the aggregate function
+ * and whose value is the policy. The only state is a single
+ * {@link ConcurrentHashMap}; this class adds no synchronization of its own.
+ *
+ * <p>The sole constructor is private and annotated with {@code @Inject}, so
+ * instances are presumably obtained through injection rather than constructed
+ * directly (NOTE(review): confirm against the injector configuration).
+ */
+@ThreadSafe
+@Unstable
+@Private
+final class AggregateFunctionRepository {
+  private final ConcurrentMap<Integer, Tuple<VortexAggregateFunction, 
VortexAggregatePolicy>>
+      aggregateFunctionMap = new ConcurrentHashMap<>();
+
+  @Inject
+  private AggregateFunctionRepository() {
+  }
+
+  /**
+   * Associates an aggregate function ID with a {@link VortexAggregateFunction}
+   * and a {@link VortexAggregatePolicy}, replacing any pair already stored
+   * under that ID.
+   *
+   * @param aggregateFunctionId the ID to store the pair under.
+   * @param aggregateFunction the aggregate function to store.
+   * @param policy the policy to store alongside the function.
+   * @return the result of {@link ConcurrentMap#put}: the pair previously
+   *         stored under the ID, or {@code null} if there was none.
+   */
+  public Tuple<VortexAggregateFunction, VortexAggregatePolicy> put(
+      final int aggregateFunctionId,
+      final VortexAggregateFunction aggregateFunction,
+      final VortexAggregatePolicy policy) {
+    return aggregateFunctionMap.put(aggregateFunctionId, new 
Tuple<>(aggregateFunction, policy));
+  }
+
+  /**
+   * Gets the {@link VortexAggregateFunction} associated with the aggregate
+   * function ID.
+   *
+   * @param aggregateFunctionId the ID the function was stored under.
+   * @return the key of the tuple stored under the ID.
+   * @throws NullPointerException if nothing is stored under the ID, because
+   *         the map lookup result is dereferenced without a null check.
+   */
+  public VortexAggregateFunction getAggregateFunction(final int 
aggregateFunctionId) {
+    return aggregateFunctionMap.get(aggregateFunctionId).getKey();
+  }
+
+  /**
+   * Gets the {@link VortexAggregatePolicy} associated with the aggregate
+   * function ID.
+   *
+   * @param aggregateFunctionId the ID the policy was stored under.
+   * @return the value of the tuple stored under the ID.
+   * @throws NullPointerException if nothing is stored under the ID, because
+   *         the map lookup result is dereferenced without a null check.
+   */
+  public VortexAggregatePolicy getPolicy(final int aggregateFunctionId) {
+    return aggregateFunctionMap.get(aggregateFunctionId).getValue();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
index 21b1bd0..d508893 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
@@ -20,11 +20,10 @@ package org.apache.reef.vortex.driver;
 
 import net.jcip.annotations.ThreadSafe;
 import org.apache.reef.annotations.audience.DriverSide;
-import org.apache.reef.io.serialization.Codec;
 import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.util.Optional;
 import org.apache.reef.vortex.api.*;
-import org.apache.reef.vortex.common.*;
+import org.apache.reef.vortex.protocol.workertomaster.*;
 
 import javax.inject.Inject;
 import java.util.*;
@@ -71,11 +70,10 @@ final class DefaultVortexMaster implements VortexMaster {
     // TODO[REEF-500]: Simple duplicate Vortex Tasklet launch.
     final VortexFuture<TOutput> vortexFuture;
     final int id = taskletIdCounter.getAndIncrement();
-    final Codec<TOutput> outputCodec = function.getOutputCodec();
     if (callback.isPresent()) {
-      vortexFuture = new VortexFuture<>(executor, this, id, outputCodec, 
callback.get());
+      vortexFuture = new VortexFuture<>(executor, this, id, callback.get());
     } else {
-      vortexFuture = new VortexFuture<>(executor, this, id, outputCodec);
+      vortexFuture = new VortexFuture<>(executor, this, id);
     }
 
     final Tasklet tasklet = new Tasklet<>(id, Optional.<Integer>empty(), 
function, input, vortexFuture);
@@ -96,8 +94,7 @@ final class DefaultVortexMaster implements VortexMaster {
                       final List<TInput> inputs,
                       final Optional<FutureCallback<AggregateResult<TInput, 
TOutput>>> callback) {
     final int aggregateFunctionId = aggregateIdCounter.getAndIncrement();
-    aggregateFunctionRepository.put(aggregateFunctionId, aggregateFunction, 
vortexFunction, policy);
-    final Codec<TOutput> aggOutputCodec = aggregateFunction.getOutputCodec();
+    aggregateFunctionRepository.put(aggregateFunctionId, aggregateFunction, 
policy);
     final List<Tasklet> tasklets = new ArrayList<>(inputs.size());
     final Map<Integer, TInput> taskletIdInputMap = new 
HashMap<>(inputs.size());
 
@@ -105,10 +102,12 @@ final class DefaultVortexMaster implements VortexMaster {
       taskletIdInputMap.put(taskletIdCounter.getAndIncrement(), input);
     }
 
-    final VortexAggregateFuture<TInput, TOutput> vortexAggregateFuture =
-        callback.isPresent() ?
-        new VortexAggregateFuture<>(executor, taskletIdInputMap, 
aggOutputCodec, callback.get()) :
-        new VortexAggregateFuture<>(executor, taskletIdInputMap, 
aggOutputCodec, null);
+    final VortexAggregateFuture<TInput, TOutput> vortexAggregateFuture;
+    if (callback.isPresent()) {
+      vortexAggregateFuture = new VortexAggregateFuture<>(executor, 
taskletIdInputMap, callback.get());
+    } else {
+      vortexAggregateFuture = new VortexAggregateFuture<>(executor, 
taskletIdInputMap, null);
+    }
 
     for (final Map.Entry<Integer, TInput> taskletIdInputEntry : 
taskletIdInputMap.entrySet()) {
       final Tasklet tasklet = new Tasklet<>(taskletIdInputEntry.getKey(), 
Optional.of(aggregateFunctionId),
@@ -151,37 +150,37 @@ final class DefaultVortexMaster implements VortexMaster {
   }
 
   @Override
-  public void workerReported(final String workerId, final WorkerReport 
workerReport) {
-    for (final TaskletReport taskletReport : workerReport.getTaskletReports()) 
{
-      switch (taskletReport.getType()) {
+  public void workerReported(final String workerId, final 
WorkerToMasterReports workerToMasterReports) {
+    for (final WorkerToMasterReport workerToMasterReport : 
workerToMasterReports.getReports()) {
+      switch (workerToMasterReport.getType()) {
       case TaskletResult:
-        final TaskletResultReport taskletResultReport = (TaskletResultReport) 
taskletReport;
+        final TaskletResultReport taskletResultReport = (TaskletResultReport) 
workerToMasterReport;
 
         final int resultTaskletId = taskletResultReport.getTaskletId();
         final List<Integer> singletonResultTaskletId = 
Collections.singletonList(resultTaskletId);
         runningWorkers.doneTasklets(workerId, singletonResultTaskletId);
-        fetchDelegate(singletonResultTaskletId).completed(resultTaskletId, 
taskletResultReport.getSerializedResult());
+        fetchDelegate(singletonResultTaskletId).completed(resultTaskletId, 
taskletResultReport.getResult());
 
         break;
       case TaskletAggregationResult:
         final TaskletAggregationResultReport taskletAggregationResultReport =
-            (TaskletAggregationResultReport) taskletReport;
+            (TaskletAggregationResultReport) workerToMasterReport;
 
         final List<Integer> aggregatedTaskletIds = 
taskletAggregationResultReport.getTaskletIds();
         runningWorkers.doneTasklets(workerId, aggregatedTaskletIds);
         fetchDelegate(aggregatedTaskletIds).aggregationCompleted(
-            aggregatedTaskletIds, 
taskletAggregationResultReport.getSerializedResult());
+            aggregatedTaskletIds, taskletAggregationResultReport.getResult());
 
         break;
       case TaskletCancelled:
-        final TaskletCancelledReport taskletCancelledReport = 
(TaskletCancelledReport) taskletReport;
+        final TaskletCancelledReport taskletCancelledReport = 
(TaskletCancelledReport) workerToMasterReport;
         final List<Integer> cancelledIdToList = 
Collections.singletonList(taskletCancelledReport.getTaskletId());
         runningWorkers.doneTasklets(workerId, cancelledIdToList);
         
fetchDelegate(cancelledIdToList).cancelled(taskletCancelledReport.getTaskletId());
 
         break;
       case TaskletFailure:
-        final TaskletFailureReport taskletFailureReport = 
(TaskletFailureReport) taskletReport;
+        final TaskletFailureReport taskletFailureReport = 
(TaskletFailureReport) workerToMasterReport;
 
         final int failureTaskletId = taskletFailureReport.getTaskletId();
         final List<Integer> singletonFailedTaskletId = 
Collections.singletonList(failureTaskletId);
@@ -191,7 +190,7 @@ final class DefaultVortexMaster implements VortexMaster {
         break;
       case TaskletAggregationFailure:
         final TaskletAggregationFailureReport taskletAggregationFailureReport =
-            (TaskletAggregationFailureReport) taskletReport;
+            (TaskletAggregationFailureReport) workerToMasterReport;
 
         final List<Integer> aggregationFailedTaskletIds = 
taskletAggregationFailureReport.getTaskletIds();
         runningWorkers.doneTasklets(workerId, aggregationFailedTaskletIds);

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
index f207ab5..d016891 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
@@ -22,7 +22,6 @@ import net.jcip.annotations.ThreadSafe;
 
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.util.Optional;
-import org.apache.reef.vortex.common.AggregateFunctionRepository;
 
 import javax.inject.Inject;
 

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java
index 063835f..9f64bf0 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java
@@ -21,7 +21,6 @@ package org.apache.reef.vortex.driver;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.util.Optional;
 import org.apache.reef.vortex.api.VortexFunction;
-import org.apache.reef.vortex.common.VortexFutureDelegate;
 
 /**
  * Representation of user task in Driver.

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
index ca58174..5e796c9 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
@@ -28,8 +28,9 @@ import org.apache.reef.tang.Configurations;
 import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.tang.annotations.Unit;
 import org.apache.reef.vortex.api.VortexStart;
-import org.apache.reef.vortex.common.*;
+import org.apache.reef.vortex.common.KryoUtils;
 import org.apache.reef.vortex.evaluator.VortexWorker;
+import org.apache.reef.vortex.protocol.workertomaster.WorkerToMasterReports;
 import org.apache.reef.wake.EStage;
 import org.apache.reef.wake.EventHandler;
 import org.apache.reef.wake.impl.SingleThreadStage;
@@ -64,7 +65,7 @@ final class VortexDriver {
   private final EStage<VortexStart> vortexStartEStage;
   private final VortexStart vortexStart;
   private final EStage<Integer> pendingTaskletSchedulerEStage;
-  private final VortexAvroUtils vortexAvroUtils;
+  private final KryoUtils kryoUtils;
 
   @Inject
   private VortexDriver(final EvaluatorRequestor evaluatorRequestor,
@@ -73,7 +74,7 @@ final class VortexDriver {
                        final VortexStart vortexStart,
                        final VortexStartExecutor vortexStartExecutor,
                        final PendingTaskletLauncher pendingTaskletLauncher,
-                       final VortexAvroUtils vortexAvroUtils,
+                       final KryoUtils kryoUtils,
                        @Parameter(VortexMasterConf.WorkerMem.class) final int 
workerMem,
                        @Parameter(VortexMasterConf.WorkerNum.class) final int 
workerNum,
                        @Parameter(VortexMasterConf.WorkerCores.class) final 
int workerCores,
@@ -81,7 +82,7 @@ final class VortexDriver {
     this.vortexStartEStage = new ThreadPoolStage<>(vortexStartExecutor, 
numOfStartThreads);
     this.vortexStart = vortexStart;
     this.pendingTaskletSchedulerEStage = new 
SingleThreadStage<>(pendingTaskletLauncher, 1);
-    this.vortexAvroUtils = vortexAvroUtils;
+    this.kryoUtils = kryoUtils;
     this.evaluatorRequestor = evaluatorRequestor;
     this.vortexMaster = vortexMaster;
     this.vortexRequestor = vortexRequestor;
@@ -154,8 +155,9 @@ final class VortexDriver {
     @Override
     public void onNext(final TaskMessage taskMessage) {
       final String workerId = taskMessage.getId();
-      final WorkerReport workerReport = 
vortexAvroUtils.toWorkerReport(taskMessage.get());
-      vortexMaster.workerReported(workerId, workerReport);
+      final WorkerToMasterReports workerToMasterReports =
+          (WorkerToMasterReports)kryoUtils.deserialize(taskMessage.get());
+      vortexMaster.workerReported(workerId, workerToMasterReports);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexFutureDelegate.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexFutureDelegate.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexFutureDelegate.java
new file mode 100644
index 0000000..e84840d
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexFutureDelegate.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.driver;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+
+import java.util.List;
+
+/**
+ * Exposes functions to be called by the
+ * {@link org.apache.reef.vortex.driver.VortexMaster} to note that a list of
+ * Tasklets associated with a Future has completed.
+ *
+ * <p>There is one callback per outcome: a result, an exception, or a
+ * cancellation for a single Tasklet, and a result or an exception for a list
+ * of aggregated Tasklets.
+ *
+ * @param <TOutput> the type of the result handed to {@link #completed} and
+ *                  {@link #aggregationCompleted}.
+ */
+@Unstable
+@DriverSide
+@Private
+public interface VortexFutureDelegate<TOutput> {
+
+  /**
+   * A Tasklet associated with the future has completed with a result.
+   *
+   * @param taskletId the ID of the Tasklet that completed.
+   * @param result the result of that Tasklet.
+   */
+  void completed(final int taskletId, TOutput result);
+
+  /**
+   * The list of aggregated Tasklets associated with the Future that have
+   * completed with a result.
+   *
+   * @param taskletIds the IDs of the Tasklets covered by the result.
+   * @param result the single result reported for those Tasklets.
+   */
+  void aggregationCompleted(final List<Integer> taskletIds, final TOutput 
result);
+
+  /**
+   * A Tasklet associated with the Future has thrown an Exception.
+   *
+   * @param taskletId the ID of the Tasklet that failed.
+   * @param exception the Exception reported for that Tasklet.
+   */
+  void threwException(final int taskletId, final Exception exception);
+
+  /**
+   * The list of Tasklets associated with the Future that have thrown an
+   * Exception.
+   *
+   * @param taskletIds the IDs of the Tasklets covered by the failure.
+   * @param exception the single Exception reported for those Tasklets.
+   */
+  void aggregationThrewException(final List<Integer> taskletIds, final 
Exception exception);
+
+  /**
+   * A Tasklet associated with the Future has been cancelled.
+   *
+   * @param taskletId the ID of the Tasklet that was cancelled.
+   */
+  void cancelled(final int taskletId);
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
index 95cec93..959ed5f 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
@@ -23,7 +23,7 @@ import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.tang.annotations.DefaultImplementation;
 import org.apache.reef.util.Optional;
 import org.apache.reef.vortex.api.*;
-import org.apache.reef.vortex.common.WorkerReport;
+import org.apache.reef.vortex.protocol.workertomaster.WorkerToMasterReports;
 
 import java.util.List;
 
@@ -74,7 +74,7 @@ public interface VortexMaster {
   /**
    * Call this when a worker has reported back.
    */
-  void workerReported(final String workerId, final WorkerReport workerReport);
+  void workerReported(final String workerId, final WorkerToMasterReports 
workerToMasterReports);
 
   /**
    * Release all resources and shut down.

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java
index 4aabf32..0f93dd2 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexRequestor.java
@@ -20,8 +20,8 @@ package org.apache.reef.vortex.driver;
 
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.driver.task.RunningTask;
-import org.apache.reef.vortex.common.VortexAvroUtils;
-import org.apache.reef.vortex.common.VortexRequest;
+import org.apache.reef.vortex.common.KryoUtils;
+import org.apache.reef.vortex.protocol.mastertoworker.MasterToWorkerRequest;
 
 import javax.inject.Inject;
 import java.util.concurrent.ExecutorService;
@@ -33,30 +33,30 @@ import java.util.concurrent.Executors;
 @DriverSide
 class VortexRequestor {
   private final ExecutorService executorService = 
Executors.newCachedThreadPool();
-  private final VortexAvroUtils vortexAvroUtils;
+  private final KryoUtils kryoUtils;
 
   @Inject
-  VortexRequestor(final VortexAvroUtils vortexAvroUtils) {
-    this.vortexAvroUtils = vortexAvroUtils;
+  VortexRequestor(final KryoUtils kryoUtils) {
+    this.kryoUtils = kryoUtils;
   }
 
   /**
-   * Sends a {@link VortexRequest} asynchronously to a {@link 
org.apache.reef.vortex.evaluator.VortexWorker}.
+   * Sends a {@link MasterToWorkerRequest} asynchronously to a {@link 
org.apache.reef.vortex.evaluator.VortexWorker}.
    */
-  void sendAsync(final RunningTask reefTask, final VortexRequest 
vortexRequest) {
+  void sendAsync(final RunningTask reefTask, final MasterToWorkerRequest 
masterToWorkerRequest) {
     executorService.execute(new Runnable() {
       @Override
       public void run() {
         //  Possible race condition with VortexWorkerManager#terminate is 
addressed by the global lock in VortexMaster
-        send(reefTask, vortexRequest);
+        send(reefTask, masterToWorkerRequest);
       }
     });
   }
 
   /**
-   * Sends a {@link VortexRequest} synchronously to a {@link 
org.apache.reef.vortex.evaluator.VortexWorker}.
+   * Sends a {@link MasterToWorkerRequest} synchronously to a {@link 
org.apache.reef.vortex.evaluator.VortexWorker}.
    */
-  void send(final RunningTask reefTask, final VortexRequest vortexRequest) {
-    reefTask.send(vortexAvroUtils.toBytes(vortexRequest));
+  void send(final RunningTask reefTask, final MasterToWorkerRequest 
masterToWorkerRequest) {
+    reefTask.send(kryoUtils.serialize(masterToWorkerRequest));
   }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java
index 088e3cf..764d6a3 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java
@@ -24,10 +24,10 @@ import org.apache.reef.driver.task.RunningTask;
 import org.apache.reef.vortex.api.VortexAggregateFunction;
 import org.apache.reef.vortex.api.VortexAggregatePolicy;
 import org.apache.reef.vortex.api.VortexFunction;
-import org.apache.reef.vortex.common.TaskletAggregateExecutionRequest;
-import org.apache.reef.vortex.common.TaskletAggregationRequest;
-import org.apache.reef.vortex.common.TaskletCancellationRequest;
-import org.apache.reef.vortex.common.TaskletExecutionRequest;
+import 
org.apache.reef.vortex.protocol.mastertoworker.TaskletAggregateExecutionRequest;
+import 
org.apache.reef.vortex.protocol.mastertoworker.TaskletAggregationRequest;
+import 
org.apache.reef.vortex.protocol.mastertoworker.TaskletCancellationRequest;
+import org.apache.reef.vortex.protocol.mastertoworker.TaskletExecutionRequest;
 
 import java.util.*;
 

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/AggregateContainer.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/AggregateContainer.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/AggregateContainer.java
index 1ea3876..21f27d4 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/AggregateContainer.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/AggregateContainer.java
@@ -24,7 +24,9 @@ import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.task.HeartBeatTriggerManager;
-import org.apache.reef.vortex.common.*;
+import org.apache.reef.vortex.common.KryoUtils;
+import 
org.apache.reef.vortex.protocol.mastertoworker.TaskletAggregationRequest;
+import org.apache.reef.vortex.protocol.workertomaster.*;
 
 import javax.annotation.concurrent.GuardedBy;
 import java.util.*;
@@ -46,7 +48,7 @@ final class AggregateContainer {
   private final Object stateLock = new Object();
   private final TaskletAggregationRequest taskletAggregationRequest;
   private final HeartBeatTriggerManager heartBeatTriggerManager;
-  private final VortexAvroUtils vortexAvroUtils;
+  private final KryoUtils kryoUtils;
   private final BlockingDeque<byte[]> workerReportsQueue;
   private final ScheduledExecutorService timer = 
Executors.newScheduledThreadPool(1);
 
@@ -60,11 +62,11 @@ final class AggregateContainer {
   private final List<Pair<Integer, Exception>> failedTasklets = new 
ArrayList<>();
 
   AggregateContainer(final HeartBeatTriggerManager heartBeatTriggerManager,
-                     final VortexAvroUtils vortexAvroUtils,
+                     final KryoUtils kryoUtils,
                      final BlockingDeque<byte[]> workerReportsQueue,
                      final TaskletAggregationRequest 
taskletAggregationRequest) {
     this.heartBeatTriggerManager = heartBeatTriggerManager;
-    this.vortexAvroUtils = vortexAvroUtils;
+    this.kryoUtils = kryoUtils;
     this.workerReportsQueue = workerReportsQueue;
     this.taskletAggregationRequest = taskletAggregationRequest;
   }
@@ -74,7 +76,7 @@ final class AggregateContainer {
   }
 
   @GuardedBy("stateLock")
-  private void aggregateTasklets(final List<TaskletReport> taskletReports,
+  private void aggregateTasklets(final List<WorkerToMasterReport> 
workerToMasterReports,
                                  final List<Object> results,
                                  final List<Integer> aggregatedTasklets) {
     synchronized (stateLock) {
@@ -86,7 +88,7 @@ final class AggregateContainer {
 
       // Add failed tasklets to worker report.
       for (final Pair<Integer, Exception> failedPair : failedTasklets) {
-        taskletReports.add(new TaskletFailureReport(failedPair.getLeft(), 
failedPair.getRight()));
+        workerToMasterReports.add(new 
TaskletFailureReport(failedPair.getLeft(), failedPair.getRight()));
       }
 
       // Drain the tasklets.
@@ -96,11 +98,11 @@ final class AggregateContainer {
   }
 
   /**
-   * Performs the output aggregation and generates the {@link WorkerReport} to 
report back to the
+   * Performs the output aggregation and generates the {@link 
WorkerToMasterReports} to report back to the
    * {@link org.apache.reef.vortex.driver.VortexDriver}.
    */
   private void aggregateTasklets(final AggregateTriggerType type) {
-    final List<TaskletReport> taskletReports = new ArrayList<>();
+    final List<WorkerToMasterReport> workerToMasterReports = new ArrayList<>();
     final List<Object> results = new ArrayList<>();
     final List<Integer> aggregatedTasklets = new ArrayList<>();
 
@@ -108,14 +110,14 @@ final class AggregateContainer {
     synchronized (stateLock) {
       switch(type) {
       case ALARM:
-        aggregateTasklets(taskletReports, results, aggregatedTasklets);
+        aggregateTasklets(workerToMasterReports, results, aggregatedTasklets);
         break;
       case COUNT:
         if (!aggregateOnCount()) {
           return;
         }
 
-        aggregateTasklets(taskletReports, results, aggregatedTasklets);
+        aggregateTasklets(workerToMasterReports, results, aggregatedTasklets);
         break;
       default:
         throw new RuntimeException("Unexpected aggregate type.");
@@ -125,16 +127,16 @@ final class AggregateContainer {
     if (!results.isEmpty()) {
       // Run the aggregation function.
       try {
-        final byte[] aggregationResult = taskletAggregationRequest.executeAggregation(results);
-        taskletReports.add(new TaskletAggregationResultReport(aggregatedTasklets, aggregationResult));
+        final Object aggregationResult = taskletAggregationRequest.executeAggregation(results);
+        workerToMasterReports.add(new TaskletAggregationResultReport(aggregatedTasklets, aggregationResult));
       } catch (final Exception e) {
-        taskletReports.add(new TaskletAggregationFailureReport(aggregatedTasklets, e));
+        workerToMasterReports.add(new TaskletAggregationFailureReport(aggregatedTasklets, e));
       }
     }
 
     // Add to worker report only if there is something to report back.
-    if (!taskletReports.isEmpty()) {
-      workerReportsQueue.addLast(vortexAvroUtils.toBytes(new WorkerReport(taskletReports)));
+    if (!workerToMasterReports.isEmpty()) {
+      workerReportsQueue.addLast(kryoUtils.serialize(new WorkerToMasterReports(workerToMasterReports)));
       heartBeatTriggerManager.triggerHeartBeat();
     }
   }

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
index 3d53bc4..5b0d38d 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
@@ -29,8 +29,9 @@ import org.apache.reef.task.TaskMessageSource;
 import org.apache.reef.task.events.CloseEvent;
 import org.apache.reef.task.events.DriverMessage;
 import org.apache.reef.util.Optional;
-import org.apache.reef.vortex.common.*;
-import org.apache.reef.vortex.common.AggregateFunctionRepository;
+import org.apache.reef.vortex.common.KryoUtils;
+import org.apache.reef.vortex.protocol.mastertoworker.*;
+import org.apache.reef.vortex.protocol.workertomaster.*;
 import org.apache.reef.vortex.driver.VortexWorkerConf;
 import org.apache.reef.wake.EventHandler;
 
@@ -56,20 +57,17 @@ public final class VortexWorker implements Task, TaskMessageSource {
   private final BlockingDeque<byte[]> workerReports = new LinkedBlockingDeque<>();
   private final ConcurrentMap<Integer, AggregateContainer> aggregates = new ConcurrentHashMap<>();
 
-  private final AggregateFunctionRepository aggregateFunctionRepository;
-  private final VortexAvroUtils vortexAvroUtils;
+  private final KryoUtils kryoUtils;
   private final HeartBeatTriggerManager heartBeatTriggerManager;
   private final int numOfThreads;
   private final CountDownLatch terminated = new CountDownLatch(1);
 
   @Inject
   private VortexWorker(final HeartBeatTriggerManager heartBeatTriggerManager,
-                       final AggregateFunctionRepository aggregateFunctionRepository,
-                       final VortexAvroUtils vortexAvroUtils,
+                       final KryoUtils kryoUtils,
                        @Parameter(VortexWorkerConf.NumOfThreads.class) final int numOfThreads) {
     this.heartBeatTriggerManager = heartBeatTriggerManager;
-    this.aggregateFunctionRepository = aggregateFunctionRepository;
-    this.vortexAvroUtils = vortexAvroUtils;
+    this.kryoUtils = kryoUtils;
     this.numOfThreads = numOfThreads;
   }
 
@@ -97,30 +95,24 @@ public final class VortexWorker implements Task, 
TaskMessageSource {
           }
 
           // Command Executor: Deserialize the command
-          final VortexRequest vortexRequest = vortexAvroUtils.toVortexRequest(message);
+          final MasterToWorkerRequest masterToWorkerRequest = (MasterToWorkerRequest)kryoUtils.deserialize(message);
 
-          switch (vortexRequest.getType()) {
+          switch (masterToWorkerRequest.getType()) {
             case AggregateTasklets:
-              final TaskletAggregationRequest taskletAggregationRequest = (TaskletAggregationRequest) vortexRequest;
+              final TaskletAggregationRequest taskletAggregationRequest =
+                  (TaskletAggregationRequest) masterToWorkerRequest;
               
aggregates.put(taskletAggregationRequest.getAggregateFunctionId(),
-                  new AggregateContainer(heartBeatTriggerManager, 
vortexAvroUtils, workerReports,
+                  new AggregateContainer(heartBeatTriggerManager, kryoUtils, 
workerReports,
                       taskletAggregationRequest));
-
-              // VortexFunctions need to be put into the repository such that VortexAvroUtils will know how to
-              // convert inputs and functions into a VortexRequest on subsequent messages requesting to
-              // execute the aggregateable tasklets.
-              aggregateFunctionRepository.put(taskletAggregationRequest.getAggregateFunctionId(),
-                  taskletAggregationRequest.getAggregateFunction(), taskletAggregationRequest.getFunction(),
-                  taskletAggregationRequest.getPolicy());
               break;
             case ExecuteAggregateTasklet:
-              executeAggregateTasklet(commandExecutor, vortexRequest);
+              executeAggregateTasklet(commandExecutor, masterToWorkerRequest);
               break;
             case ExecuteTasklet:
-              executeTasklet(commandExecutor, futures, vortexRequest);
+              executeTasklet(commandExecutor, futures, masterToWorkerRequest);
               break;
             case CancelTasklet:
-              final TaskletCancellationRequest cancellationRequest = (TaskletCancellationRequest) vortexRequest;
+              final TaskletCancellationRequest cancellationRequest = (TaskletCancellationRequest) masterToWorkerRequest;
               LOG.log(Level.FINE, "Cancelling Tasklet with ID {0}.", cancellationRequest.getTaskletId());
               final Future future = futures.get(cancellationRequest.getTaskletId());
               if (future != null) {
@@ -143,9 +135,9 @@ public final class VortexWorker implements Task, 
TaskMessageSource {
    */
   private void executeTasklet(final ExecutorService commandExecutor,
                               final ConcurrentMap<Integer, Future> futures,
-                              final VortexRequest vortexRequest) {
+                              final MasterToWorkerRequest masterToWorkerRequest) {
     final CountDownLatch latch = new CountDownLatch(1);
-    final TaskletExecutionRequest taskletExecutionRequest = (TaskletExecutionRequest) vortexRequest;
+    final TaskletExecutionRequest taskletExecutionRequest = (TaskletExecutionRequest) masterToWorkerRequest;
 
     // Scheduler Thread: Pass the command to the worker thread pool to be executed
     // Record future to support cancellation.
@@ -154,32 +146,30 @@ public final class VortexWorker implements Task, 
TaskMessageSource {
         commandExecutor.submit(new Runnable() {
           @Override
           public void run() {
-            final WorkerReport workerReport;
-            final List<TaskletReport> taskletReports = new ArrayList<>();
+            final WorkerToMasterReports reports;
+            final List<WorkerToMasterReport> holder = new ArrayList<>();
 
             try {
               // Command Executor: Execute the command
-              final TaskletReport taskletReport =
-                  new TaskletResultReport(taskletExecutionRequest.getTaskletId(),
-                      taskletExecutionRequest.execute());
-              taskletReports.add(taskletReport);
+              final WorkerToMasterReport workerToMasterReport =
+                  new TaskletResultReport(taskletExecutionRequest.getTaskletId(), taskletExecutionRequest.execute());
+              holder.add(workerToMasterReport);
             } catch (final InterruptedException ex) {
               // Assumes that user's thread follows convention that cancelled Futures
               // should throw InterruptedException.
-              final TaskletReport taskletReport =
+              final WorkerToMasterReport workerToMasterReport =
                   new TaskletCancelledReport(taskletExecutionRequest.getTaskletId());
-              LOG.log(Level.WARNING, "Tasklet with ID {0} has been cancelled",
-                  taskletExecutionRequest.getTaskletId());
-              taskletReports.add(taskletReport);
+              LOG.log(Level.WARNING, "Tasklet with ID {0} has been cancelled", taskletExecutionRequest.getTaskletId());
+              holder.add(workerToMasterReport);
             } catch (Exception e) {
               // Command Executor: Tasklet throws an exception
-              final TaskletReport taskletReport =
+              final WorkerToMasterReport workerToMasterReport =
                   new TaskletFailureReport(taskletExecutionRequest.getTaskletId(), e);
-              taskletReports.add(taskletReport);
+              holder.add(workerToMasterReport);
             }
 
-            workerReport = new WorkerReport(taskletReports);
-            workerReports.addLast(vortexAvroUtils.toBytes(workerReport));
+            reports = new WorkerToMasterReports(holder);
+            workerReports.addLast(kryoUtils.serialize(reports));
             try {
               latch.await();
             } catch (final InterruptedException e) {
@@ -199,14 +189,14 @@ public final class VortexWorker implements Task, 
TaskMessageSource {
    * Executes an aggregation request from the {@link org.apache.reef.vortex.driver.VortexDriver}.
    */
   private void executeAggregateTasklet(final ExecutorService commandExecutor,
-                                       final VortexRequest vortexRequest) {
+                                       final MasterToWorkerRequest masterToWorkerRequest) {
     final TaskletAggregateExecutionRequest taskletAggregateExecutionRequest =
-        (TaskletAggregateExecutionRequest) vortexRequest;
+        (TaskletAggregateExecutionRequest) masterToWorkerRequest;
 
     assert aggregates.containsKey(taskletAggregateExecutionRequest.getAggregateFunctionId());
 
-    final AggregateContainer aggregateContainer = aggregates.get(
-        taskletAggregateExecutionRequest.getAggregateFunctionId());
+    final AggregateContainer aggregateContainer =
+        aggregates.get(taskletAggregateExecutionRequest.getAggregateFunctionId());
     final TaskletAggregationRequest aggregationRequest = aggregateContainer.getTaskletAggregationRequest();
 
     commandExecutor.submit(new Runnable() {

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/addone/AddOneFunction.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/addone/AddOneFunction.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/addone/AddOneFunction.java
index 5e32c8c..299a31a 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/addone/AddOneFunction.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/addone/AddOneFunction.java
@@ -18,15 +18,12 @@
  */
 package org.apache.reef.vortex.examples.addone;
 
-import org.apache.reef.io.serialization.Codec;
-import org.apache.reef.io.serialization.SerializableCodec;
 import org.apache.reef.vortex.api.VortexFunction;
 
 /**
  * Outputs input + 1.
  */
 final class AddOneFunction implements VortexFunction<Integer, Integer> {
-  private static final Codec<Integer> CODEC = new SerializableCodec<>();
   /**
    * Outputs input + 1.
    */
@@ -34,14 +31,4 @@ final class AddOneFunction implements 
VortexFunction<Integer, Integer> {
   public Integer call(final Integer input) throws Exception {
     return input + 1;
   }
-
-  @Override
-  public Codec<Integer> getInputCodec() {
-    return CODEC;
-  }
-
-  @Override
-  public Codec<Integer> getOutputCodec() {
-    return CODEC;
-  }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/hello/HelloVortexFunction.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/hello/HelloVortexFunction.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/hello/HelloVortexFunction.java
index 89c6918..0496f27 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/hello/HelloVortexFunction.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/hello/HelloVortexFunction.java
@@ -18,15 +18,12 @@
  */
 package org.apache.reef.vortex.examples.hello;
 
-import org.apache.reef.io.serialization.Codec;
 import org.apache.reef.vortex.api.VortexFunction;
-import org.apache.reef.vortex.util.VoidCodec;
 
 /**
  * Prints to stdout.
  */
 final class HelloVortexFunction implements VortexFunction<Void, Void> {
-  private static final Codec<Void> CODEC =  new VoidCodec();
   /**
    * Prints to stdout.
    */
@@ -35,14 +32,4 @@ final class HelloVortexFunction implements 
VortexFunction<Void, Void> {
     System.out.println("Hello, Vortex!");
     return null;
   }
-
-  @Override
-  public Codec<Void> getInputCodec() {
-    return CODEC;
-  }
-
-  @Override
-  public Codec<Void> getOutputCodec() {
-    return CODEC;
-  }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulException.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulException.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulException.java
index e07e977..e3e0a0d 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulException.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulException.java
@@ -23,6 +23,11 @@ package org.apache.reef.vortex.examples.matmul;
  */
 class MatMulException extends Exception {
   /**
+   * No-arg constructor required for Kryo to serialize/deserialize.
+   */
+  MatMulException() {}
+
+  /**
    * Constructor of MatMulException.
    * @param message Message to inform users.
    */

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulFunction.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulFunction.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulFunction.java
index db4a320..2a2374a 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulFunction.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulFunction.java
@@ -18,16 +18,12 @@
  */
 package org.apache.reef.vortex.examples.matmul;
 
-import org.apache.reef.io.serialization.Codec;
 import org.apache.reef.vortex.api.VortexFunction;
 
 /**
  * Computes multiplication of two matrices.
  */
 final class MatMulFunction implements VortexFunction<MatMulInput, MatMulOutput> {
-  private static final Codec<MatMulInput> INPUT_CODEC = new MatMulInputCodec();
-  private static final Codec<MatMulOutput> OUTPUT_CODEC = new MatMulOutputCodec();
-
   /**
    * Computes multiplication of two matrices.
    * @param input Input which contains two matrices to multiply,
@@ -43,14 +39,4 @@ final class MatMulFunction implements 
VortexFunction<MatMulInput, MatMulOutput>
     final Matrix<Double> result = leftMatrix.multiply(rightMatrix);
     return new MatMulOutput(index, result);
   }
-
-  @Override
-  public Codec<MatMulInput> getInputCodec() {
-    return INPUT_CODEC;
-  }
-
-  @Override
-  public Codec<MatMulOutput> getOutputCodec() {
-    return OUTPUT_CODEC;
-  }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulInput.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulInput.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulInput.java
index b35b402..baae96c 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulInput.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulInput.java
@@ -23,9 +23,15 @@ package org.apache.reef.vortex.examples.matmul;
  * and index of the sub-matrix in the entire result.
  */
 final class MatMulInput {
-  private final int index;
-  private final Matrix<Double> leftMatrix;
-  private final Matrix<Double> rightMatrix;
+  private int index;
+  private Matrix<Double> leftMatrix;
+  private Matrix<Double> rightMatrix;
+
+  /**
+   * No-arg constructor required for Kryo to serialize/deserialize.
+   */
+  MatMulInput() {
+  }
 
   /**
    * Constructor of MatMulInput which consists of two matrices.

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulInputCodec.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulInputCodec.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulInputCodec.java
deleted file mode 100644
index 229ba0e..0000000
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulInputCodec.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.vortex.examples.matmul;
-
-import org.apache.reef.io.serialization.Codec;
-
-import java.io.*;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Encodes/decodes {@link MatMulInput} to/from byte array.
- */
-final class MatMulInputCodec implements Codec<MatMulInput> {
-
-  @Override
-  public byte[] encode(final MatMulInput matMulInput) {
-    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-      try (DataOutputStream daos = new DataOutputStream(baos)) {
-        final int index = matMulInput.getIndex();
-        final Matrix<Double> leftMatrix = matMulInput.getLeftMatrix();
-        final Matrix<Double> rightMatrix = matMulInput.getRightMatrix();
-
-        daos.writeInt(index);
-        encodeMatrixToStream(daos, leftMatrix);
-        encodeMatrixToStream(daos, rightMatrix);
-
-        return baos.toByteArray();
-      }
-    } catch (final IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public MatMulInput decode(final byte[] buf) {
-    try (ByteArrayInputStream bais = new ByteArrayInputStream(buf)) {
-      try (DataInputStream dais = new DataInputStream(bais)) {
-        final int index = dais.readInt();
-        final Matrix leftMatrix = decodeMatrixFromStream(dais);
-        final Matrix rightMatrix = decodeMatrixFromStream(dais);
-        return new MatMulInput(index, leftMatrix, rightMatrix);
-      }
-    } catch (final IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * Encode a Matrix to output stream.
-   */
-  private void encodeMatrixToStream(final DataOutputStream stream, final Matrix<Double> matrix) throws IOException {
-    final int numRow = matrix.getNumRows();
-    final int numColumn = matrix.getNumColumns();
-
-    stream.writeInt(numRow);
-    stream.writeInt(numColumn);
-
-    for (final List<Double> row : matrix.getRows()) {
-      for (final double element : row) {
-        stream.writeDouble(element);
-      }
-    }
-  }
-
-  /**
-   * Decode a Matrix from input stream.
-   */
-  private Matrix decodeMatrixFromStream(final DataInputStream stream) throws IOException {
-    final int numRow = stream.readInt();
-    final int numColumn = stream.readInt();
-
-    final List<List<Double>> rows = new ArrayList<>(numRow);
-    for (int rowIndex = 0; rowIndex < numRow; rowIndex++) {
-      final List<Double> row = new ArrayList<>(numColumn);
-      for (int columnIndex = 0; columnIndex < numColumn; columnIndex++) {
-        row.add(stream.readDouble());
-      }
-      rows.add(row);
-    }
-    return new RowMatrix(Collections.unmodifiableList(rows));
-  }
-}

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulOutput.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulOutput.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulOutput.java
index 99ee5bb..8daed08 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulOutput.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulOutput.java
@@ -22,8 +22,14 @@ package org.apache.reef.vortex.examples.matmul;
  * Output of {@link MatMulFunction} which contains the sub-matrix and index of it in the entire result.
  */
 final class MatMulOutput {
-  private final int index;
-  private final Matrix<Double> result;
+  private int index;
+  private Matrix<Double> result;
+
+  /**
+   * No-arg constructor required for Kryo to serialize/deserialize.
+   */
+  MatMulOutput() {
+  }
 
   /**
    * Constructor of the output.

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulOutputCodec.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulOutputCodec.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulOutputCodec.java
deleted file mode 100644
index 04d974a..0000000
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/MatMulOutputCodec.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.vortex.examples.matmul;
-
-import org.apache.reef.io.serialization.Codec;
-
-import java.io.*;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Encodes/decodes {@link MatMulOutput} to/from byte array.
- */
-final class MatMulOutputCodec implements Codec<MatMulOutput> {
-
-  @Override
-  public byte[] encode(final MatMulOutput matMulOutput) {
-    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-      try (DataOutputStream daos = new DataOutputStream(baos)) {
-        final int index = matMulOutput.getIndex();
-        final Matrix<Double> result = matMulOutput.getResult();
-
-        daos.writeInt(index);
-        encodeMatrixToStream(daos, result);
-
-        return baos.toByteArray();
-      }
-    } catch (final IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public MatMulOutput decode(final byte[] buf) {
-    try (ByteArrayInputStream bais = new ByteArrayInputStream(buf)) {
-      try (DataInputStream dais = new DataInputStream(bais)) {
-        final int index = dais.readInt();
-        final Matrix result = decodeMatrixFromStream(dais);
-        return new MatMulOutput(index, result);
-      }
-    } catch (final IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * Encode a Matrix to output stream.
-   */
-  private void encodeMatrixToStream(final DataOutputStream stream, final Matrix<Double> matrix) throws IOException {
-    final int numRow = matrix.getNumRows();
-    final int numColumn = matrix.getNumColumns();
-
-    stream.writeInt(numRow);
-    stream.writeInt(numColumn);
-
-    for (final List<Double> row : matrix.getRows()) {
-      for (final double element : row) {
-        stream.writeDouble(element);
-      }
-    }
-  }
-
-  /**
-   * Decode a Matrix from input stream.
-   */
-  private Matrix decodeMatrixFromStream(final DataInputStream stream) throws IOException {
-    final int numRow = stream.readInt();
-    final int numColumn = stream.readInt();
-
-    final List<List<Double>> rows = new ArrayList<>(numRow);
-    for (int rowIndex = 0; rowIndex < numRow; rowIndex++) {
-      final List<Double> row = new ArrayList<>(numColumn);
-      for (int columnIndex = 0; columnIndex < numColumn; columnIndex++) {
-        row.add(stream.readDouble());
-      }
-      rows.add(row);
-    }
-    return new RowMatrix(Collections.unmodifiableList(rows));
-  }
-}

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/RowMatrix.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/RowMatrix.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/RowMatrix.java
index a0ca817..9de71d8 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/RowMatrix.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/matmul/RowMatrix.java
@@ -26,7 +26,13 @@ import java.util.List;
  * Row-oriented matrix implementation used in {@link MatMul} example.
  */
 final class RowMatrix implements Matrix<Double> {
-  private final List<List<Double>> rows;
+  private List<List<Double>> rows;
+
+  /**
+   * No-arg constructor required for Kryo to serialize/deserialize.
+   */
+  RowMatrix() {
+  }
 
   /**
    * Constructor of matrix which creates an empty matrix of size (numRow x numColumn).

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/AdditionAggregateFunction.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/AdditionAggregateFunction.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/AdditionAggregateFunction.java
index 391de6e..dde2b5a 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/AdditionAggregateFunction.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/AdditionAggregateFunction.java
@@ -18,8 +18,6 @@
  */
 package org.apache.reef.vortex.examples.sumones;
 
-import org.apache.reef.io.serialization.Codec;
-import org.apache.reef.io.serialization.SerializableCodec;
 import org.apache.reef.vortex.api.VortexAggregateException;
 import org.apache.reef.vortex.api.VortexAggregateFunction;
 
@@ -29,8 +27,6 @@ import java.util.List;
  * Aggregates and sums the outputs.
  */
 public final class AdditionAggregateFunction implements VortexAggregateFunction<Integer> {
-  private static final Codec<Integer> CODEC = new SerializableCodec<>();
-
   @Override
   public Integer call(final List<Integer> taskletOutputs) throws VortexAggregateException {
     int sum = 0;
@@ -40,9 +36,4 @@ public final class AdditionAggregateFunction implements 
VortexAggregateFunction<
 
     return sum;
   }
-
-  @Override
-  public Codec<Integer> getOutputCodec() {
-    return CODEC;
-  }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/IdentityFunction.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/IdentityFunction.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/IdentityFunction.java
index b7a69ef..890b9d9 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/IdentityFunction.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/IdentityFunction.java
@@ -18,16 +18,12 @@
  */
 package org.apache.reef.vortex.examples.sumones;
 
-import org.apache.reef.io.serialization.Codec;
-import org.apache.reef.io.serialization.SerializableCodec;
 import org.apache.reef.vortex.api.VortexFunction;
 
 /**
  * Identity function.
  */
 public final class IdentityFunction implements VortexFunction<Integer, Integer> {
-  private static final Codec<Integer> CODEC = new SerializableCodec<>();
-
   /**
    * Outputs input.
    */
@@ -35,14 +31,4 @@ public final class IdentityFunction implements VortexFunction<Integer, Integer>
   public Integer call(final Integer input) throws Exception {
     return input;
   }
-
-  @Override
-  public Codec<Integer> getInputCodec() {
-    return CODEC;
-  }
-
-  @Override
-  public Codec<Integer> getOutputCodec() {
-    return CODEC;
-  }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/MasterToWorkerRequest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/MasterToWorkerRequest.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/MasterToWorkerRequest.java
new file mode 100644
index 0000000..5b59ccc
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/MasterToWorkerRequest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.protocol.mastertoworker;
+
+import org.apache.reef.annotations.Unstable;
+
+/**
+ * Master-to-Worker protocol.
+ * Implemented by the request classes of this package; each implementation identifies its kind
+ * through {@link #getType()}.
+ */
+@Unstable
+public interface MasterToWorkerRequest {
+  /**
+   * Type of Request.
+   * {@code AggregateTasklets} is reported by {@link TaskletAggregationRequest},
+   * {@code ExecuteTasklet} by {@link TaskletExecutionRequest},
+   * {@code CancelTasklet} by {@link TaskletCancellationRequest} and
+   * {@code ExecuteAggregateTasklet} by {@link TaskletAggregateExecutionRequest}.
+   */
+  enum Type {
+    AggregateTasklets,
+    ExecuteTasklet,
+    CancelTasklet,
+    ExecuteAggregateTasklet
+  }
+
+  /**
+   * @return the type of this MasterToWorkerRequest.
+   */
+  Type getType();
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/TaskletAggregateExecutionRequest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/TaskletAggregateExecutionRequest.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/TaskletAggregateExecutionRequest.java
new file mode 100644
index 0000000..24527d4
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/TaskletAggregateExecutionRequest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.protocol.mastertoworker;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+
+/**
+ * A request from the Vortex Driver to run an aggregate-able function.
+ * The request carries only the tasklet ID, the ID of the aggregate function and the input;
+ * the function object itself is not part of this message.
+ * NOTE(review): presumably the function is the one recorded earlier under the same ID through a
+ * {@link TaskletAggregationRequest} - confirm against the worker implementation.
+ *
+ * @param <TInput> type of the input carried by this request.
+ */
+@Unstable
+@Private
+@DriverSide
+public final class TaskletAggregateExecutionRequest<TInput> implements 
MasterToWorkerRequest {
+  private TInput input;
+  private int aggregateFunctionId;
+  private int taskletId;
+
+  /**
+   * No-arg constructor required for Kryo to serialize/deserialize.
+   * Package-private; it assigns none of the fields.
+   */
+  TaskletAggregateExecutionRequest() {
+  }
+
+  public TaskletAggregateExecutionRequest(final int taskletId,
+                                          final int aggregateFunctionId,
+                                          final TInput input) {
+    this.taskletId = taskletId;
+    this.input = input;
+    this.aggregateFunctionId = aggregateFunctionId;
+  }
+
+  /**
+   * @return input of the request.
+   */
+  public TInput getInput() {
+    return input;
+  }
+
+  /**
+   * @return tasklet ID corresponding to the tasklet request.
+   */
+  public int getTaskletId() {
+    return taskletId;
+  }
+
+  /**
+   * @return the AggregateFunctionID of the request.
+   */
+  public int getAggregateFunctionId() {
+    return aggregateFunctionId;
+  }
+
+  @Override
+  public Type getType() {
+    return Type.ExecuteAggregateTasklet;
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/TaskletAggregationRequest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/TaskletAggregationRequest.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/TaskletAggregationRequest.java
new file mode 100644
index 0000000..bfa90d9
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/TaskletAggregationRequest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.protocol.mastertoworker;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.vortex.api.VortexAggregateFunction;
+import org.apache.reef.vortex.api.VortexAggregatePolicy;
+import org.apache.reef.vortex.api.VortexFunction;
+
+import java.util.List;
+
+/**
+ * A request from the Vortex Driver for the {@link org.apache.reef.vortex.evaluator.VortexWorker} to
+ * record aggregate functions for later execution.
+ * Bundles an aggregate function ID with the user's aggregate function, the user function and the
+ * aggregation policy, and offers helpers that invoke the two functions.
+ *
+ * @param <TInput> input type of the user function.
+ * @param <TOutput> output type of the user function; also the element type consumed and the
+ *                  result type produced by the aggregate function.
+ */
+@Unstable
+@Private
+@DriverSide
+public final class TaskletAggregationRequest<TInput, TOutput> implements 
MasterToWorkerRequest {
+  private int aggregateFunctionId;
+  private VortexAggregateFunction<TOutput> userAggregateFunction;
+  private VortexFunction<TInput, TOutput> function;
+  private VortexAggregatePolicy policy;
+
+  /**
+   * No-arg constructor required for Kryo to serialize/deserialize.
+   * Package-private; it assigns none of the fields.
+   */
+  TaskletAggregationRequest() {
+  }
+
+  public TaskletAggregationRequest(final int aggregateFunctionId,
+                                   final VortexAggregateFunction<TOutput> 
aggregateFunction,
+                                   final VortexFunction<TInput, TOutput> 
function,
+                                   final VortexAggregatePolicy policy) {
+    this.aggregateFunctionId = aggregateFunctionId;
+    this.userAggregateFunction = aggregateFunction;
+    this.function = function;
+    this.policy = policy;
+  }
+
+  @Override
+  public Type getType() {
+    return Type.AggregateTasklets;
+  }
+
+  /**
+   * @return the AggregateFunctionID of the aggregate function.
+   */
+  public int getAggregateFunctionId() {
+    return aggregateFunctionId;
+  }
+
+  /**
+   * NOTE(review): the return type is the raw {@code VortexAggregateFunction}, so callers lose the
+   * {@code TOutput} type argument held by the field.
+   * @return the aggregate function as specified by the user.
+   */
+  public VortexAggregateFunction getAggregateFunction() {
+    return userAggregateFunction;
+  }
+
+  /**
+   * NOTE(review): the return type is the raw {@code VortexFunction}, so callers lose the
+   * {@code TInput}/{@code TOutput} type arguments held by the field.
+   * @return the user specified function.
+   */
+  public VortexFunction getFunction() {
+    return function;
+  }
+
+  /**
+   * @return the aggregation policy.
+   */
+  public VortexAggregatePolicy getPolicy() {
+    return policy;
+  }
+
+  /**
+   * Execute the aggregate function using the list of outputs.
+   * Delegates directly to the user's aggregate function.
+   * @param outputs the tasklet outputs to aggregate.
+   * @return Output of the function.
+   * @throws Exception if the aggregate function throws.
+   */
+  public TOutput executeAggregation(final List<TOutput> outputs) throws 
Exception {
+    return userAggregateFunction.call(outputs);
+  }
+
+  /**
+   * Execute the user specified function.
+   * Delegates directly to the user function.
+   * @param input the input to hand to the user function.
+   * @return Output of the function.
+   * @throws Exception if the user function throws.
+   */
+  public TOutput executeFunction(final TInput input) throws Exception {
+    return function.call(input);
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/TaskletCancellationRequest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/TaskletCancellationRequest.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/TaskletCancellationRequest.java
new file mode 100644
index 0000000..36d0233
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/TaskletCancellationRequest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.protocol.mastertoworker;
+
+import org.apache.reef.annotations.Unstable;
+
+/**
+ * A {@link MasterToWorkerRequest} to cancel a tasklet, identified by its tasklet ID.
+ */
+@Unstable
+public final class TaskletCancellationRequest implements MasterToWorkerRequest 
{
+  private int taskletId;
+
+  /**
+   * No-arg constructor required for Kryo to serialize/deserialize.
+   * Package-private; it does not assign the tasklet ID.
+   */
+  TaskletCancellationRequest() {
+  }
+
+  public TaskletCancellationRequest(final int taskletId) {
+    this.taskletId = taskletId;
+  }
+
+  /**
+   * @return the ID of the VortexTasklet associated with this MasterToWorkerRequest.
+   */
+  public int getTaskletId() {
+    return taskletId;
+  }
+
+  @Override
+  public Type getType() {
+    return Type.CancelTasklet;
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/TaskletExecutionRequest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/TaskletExecutionRequest.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/TaskletExecutionRequest.java
new file mode 100644
index 0000000..2963dc1
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/TaskletExecutionRequest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.protocol.mastertoworker;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.vortex.api.VortexFunction;
+
+/**
+ * Request to execute a tasklet.
+ * Carries the tasklet ID together with the user function and the input to call it with.
+ *
+ * @param <TInput> input type of the user function.
+ * @param <TOutput> output type of the user function.
+ */
+@Unstable
+@Private
+public final class TaskletExecutionRequest<TInput, TOutput> implements 
MasterToWorkerRequest {
+  private int taskletId;
+  private VortexFunction<TInput, TOutput> userFunction;
+  private TInput input;
+
+  /**
+   * @return the type of this MasterToWorkerRequest, always {@code Type.ExecuteTasklet}.
+   */
+  @Override
+  public Type getType() {
+    return Type.ExecuteTasklet;
+  }
+
+  /**
+   * No-arg constructor required for Kryo to serialize/deserialize.
+   * Package-private; it assigns none of the fields.
+   */
+  TaskletExecutionRequest() {
+  }
+
+  /**
+   * Request from Vortex Master to Vortex Worker to execute a tasklet.
+   * @param taskletId the ID of the tasklet to execute.
+   * @param userFunction the function to call.
+   * @param input the input to call the function with.
+   */
+  public TaskletExecutionRequest(final int taskletId,
+                                 final VortexFunction<TInput, TOutput> 
userFunction,
+                                 final TInput input) {
+    this.taskletId = taskletId;
+    this.userFunction = userFunction;
+    this.input = input;
+  }
+
+  /**
+   * Execute the function using the input.
+   * @return Output of the function.
+   * @throws Exception if the user function throws.
+   */
+  public TOutput execute() throws Exception {
+    return userFunction.call(input);
+  }
+
+  /**
+   * @return the ID of the VortexTasklet associated with this MasterToWorkerRequest.
+   */
+  public int getTaskletId() {
+    return taskletId;
+  }
+
+  /**
+   * Get function of the tasklet.
+   * NOTE(review): the return type is the raw {@code VortexFunction}, so callers lose the
+   * {@code TInput}/{@code TOutput} type arguments held by the field.
+   * @return the user function of this request.
+   */
+  public VortexFunction getFunction() {
+    return userFunction;
+  }
+
+  /**
+   * Get input of the tasklet.
+   * @return the input of this request.
+   */
+  public TInput getInput() {
+    return input;
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/package-info.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/package-info.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/package-info.java
new file mode 100644
index 0000000..6b106b6
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/mastertoworker/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Protocol from VortexMaster to VortexWorker.
+ */
+package org.apache.reef.vortex.protocol.mastertoworker;

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletAggregationFailureReport.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletAggregationFailureReport.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletAggregationFailureReport.java
new file mode 100644
index 0000000..bd5dabb
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletAggregationFailureReport.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.protocol.workertomaster;
+
+import org.apache.reef.annotations.Unstable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Report of a tasklet exception on aggregation.
+ * Pairs the IDs of the tasklets whose aggregation failed with the exception that caused it.
+ */
+@Unstable
+public final class TaskletAggregationFailureReport implements 
WorkerToMasterReport {
+  private List<Integer> taskletIds;
+  private Exception exception;
+
+  /**
+   * No-arg constructor required for Kryo to serialize/deserialize.
+   * Package-private; it assigns neither field.
+   */
+  TaskletAggregationFailureReport() {
+  }
+
+  /**
+   * @param taskletIds of the failed tasklet(s); copied into an unmodifiable list, so later changes
+   *                   to the argument do not affect this report.
+   * @param exception that caused the tasklet failure.
+   */
+  public TaskletAggregationFailureReport(final List<Integer> taskletIds, final 
Exception exception) {
+    this.taskletIds = Collections.unmodifiableList(new 
ArrayList<>(taskletIds));
+    this.exception = exception;
+  }
+
+  /**
+   * @return the type of this WorkerToMasterReport, always {@code Type.TaskletAggregationFailure}.
+   */
+  @Override
+  public Type getType() {
+    return Type.TaskletAggregationFailure;
+  }
+
+  /**
+   * @return the taskletIds that failed on aggregation.
+   */
+  public List<Integer> getTaskletIds() {
+    return taskletIds;
+  }
+
+  /**
+   * @return the exception that caused the tasklet aggregation failure.
+   */
+  public Exception getException() {
+    return exception;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletAggregationResultReport.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletAggregationResultReport.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletAggregationResultReport.java
new file mode 100644
index 0000000..52cfcfa
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletAggregationResultReport.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.protocol.workertomaster;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Report of a Tasklet aggregation execution result.
+ * Pairs the IDs of the aggregated tasklets with the result object of the aggregation.
+ */
+@Private
+@DriverSide
+@Unstable
+public final class TaskletAggregationResultReport implements 
WorkerToMasterReport {
+  private List<Integer> taskletIds;
+  private Object result;
+
+  /**
+   * No-arg constructor required for Kryo to serialize/deserialize.
+   * Package-private; it assigns neither field.
+   */
+  TaskletAggregationResultReport() {
+  }
+
+  /**
+   * @param taskletIds of the tasklets; copied into an unmodifiable list, so later changes to the
+   *                   argument do not affect this report.
+   * @param result of the tasklet execution.
+   */
+  public TaskletAggregationResultReport(final List<Integer> taskletIds, final 
Object result) {
+    this.taskletIds = Collections.unmodifiableList(new 
ArrayList<>(taskletIds));
+    this.result = result;
+  }
+
+  /**
+   * @return the type of this WorkerToMasterReport, always {@code Type.TaskletAggregationResult}.
+   */
+  @Override
+  public Type getType() {
+    return Type.TaskletAggregationResult;
+  }
+
+  /**
+   * @return the TaskletId(s) of this WorkerToMasterReport.
+   */
+  public List<Integer> getTaskletIds() {
+    return taskletIds;
+  }
+
+  /**
+   * @return the result of the Tasklet aggregation execution.
+   */
+  public Object getResult() {
+    return result;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletCancelledReport.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletCancelledReport.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletCancelledReport.java
new file mode 100644
index 0000000..53d531d
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletCancelledReport.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.protocol.workertomaster;
+
+import org.apache.reef.annotations.Unstable;
+
+/**
+ * The report of a cancelled Tasklet.
+ * Carries only the ID of the tasklet that was cancelled.
+ */
+@Unstable
+public final class TaskletCancelledReport implements WorkerToMasterReport {
+  private int taskletId;
+
+  /**
+   * No-arg constructor required for Kryo to serialize/deserialize.
+   * Package-private; it does not assign the tasklet ID.
+   */
+  TaskletCancelledReport() {
+  }
+
+  /**
+   * @param taskletId of the cancelled tasklet.
+   */
+  public TaskletCancelledReport(final int taskletId) {
+    this.taskletId = taskletId;
+  }
+
+  @Override
+  public Type getType() {
+    return Type.TaskletCancelled;
+  }
+
+  /**
+   * @return the taskletId of this WorkerToMasterReport.
+   */
+  public int getTaskletId() {
+    return taskletId;
+  }
+}

Reply via email to