Repository: reef
Updated Branches:
  refs/heads/master 120327fea -> 48bde0c0e


http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletFailureReport.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletFailureReport.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletFailureReport.java
new file mode 100644
index 0000000..2974beb
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletFailureReport.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.protocol.workertomaster;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+
+/**
+ * Report of a Tasklet exception.
+ */
+@Unstable
+@Private
+@DriverSide
+public final class TaskletFailureReport implements WorkerToMasterReport {
+  private int taskletId;
+  private Exception exception;
+
+  /**
+   * No-arg constructor required for Kryo to serialize/deserialize.
+   */
+  TaskletFailureReport() {
+  }
+
+  /**
+   * @param taskletId of the failed Tasklet.
+   * @param exception that caused the tasklet failure.
+   */
+  public TaskletFailureReport(final int taskletId, final Exception exception) {
+    this.taskletId = taskletId;
+    this.exception = exception;
+  }
+
+  /**
+   * @return the type of this WorkerToMasterReport.
+   */
+  @Override
+  public Type getType() {
+    return Type.TaskletFailure;
+  }
+
+  /**
+   * @return the taskletId of this TaskletFailureReport.
+   */
+  public int getTaskletId() {
+    return taskletId;
+  }
+
+  /**
+   * @return the exception that caused the Tasklet failure.
+   */
+  public Exception getException() {
+    return exception;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletResultReport.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletResultReport.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletResultReport.java
new file mode 100644
index 0000000..dec4a59
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/TaskletResultReport.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.protocol.workertomaster;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+
+/**
+ * Report of a tasklet execution result.
+ */
+@Unstable
+@Private
+@DriverSide
+public final class TaskletResultReport implements WorkerToMasterReport {
+  private int taskletId;
+  private Object result;
+
+  /**
+   * No-arg constructor required for Kryo to serialize/deserialize.
+   */
+  TaskletResultReport() {
+  }
+
+  /**
+   * @param taskletId of the Tasklet.
+   * @param result of the tasklet execution.
+   */
+  public TaskletResultReport(final int taskletId, final Object result) {
+    this.taskletId = taskletId;
+    this.result = result;
+  }
+
+  /**
+   * @return the type of this WorkerToMasterReport.
+   */
+  @Override
+  public Type getType() {
+    return Type.TaskletResult;
+  }
+
+  /**
+   * @return the taskletId of this TaskletResultReport.
+   */
+  public int getTaskletId() {
+    return taskletId;
+  }
+
+  /**
+   * @return the result of the tasklet execution.
+   */
+  public Object getResult() {
+    return result;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/WorkerToMasterReport.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/WorkerToMasterReport.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/WorkerToMasterReport.java
new file mode 100644
index 0000000..eb1238a
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/WorkerToMasterReport.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.protocol.workertomaster;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+
+/**
+ * The interface for a status report from the {@link org.apache.reef.vortex.evaluator.VortexWorker}.
+ */
+@Unstable
+@Private
+@DriverSide
+public interface WorkerToMasterReport {
+  /**
+   * Type of WorkerToMasterReport.
+   */
+  enum Type {
+    TaskletResult,
+    TaskletAggregationResult,
+    TaskletCancelled,
+    TaskletFailure,
+    TaskletAggregationFailure
+  }
+
+  /**
+   * @return the type of this WorkerToMasterReport.
+   */
+  Type getType();
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/WorkerToMasterReports.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/WorkerToMasterReports.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/WorkerToMasterReports.java
new file mode 100644
index 0000000..cb0cdbf
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/WorkerToMasterReports.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.protocol.workertomaster;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Container for multiple WorkerToMasterReports.
+ */
+@Private
+@Unstable
+@DriverSide
+public final class WorkerToMasterReports {
+  private ArrayList<WorkerToMasterReport> workerToMasterReports;
+
+  /**
+   * No-arg constructor required for Kryo to serialize/deserialize.
+   */
+  WorkerToMasterReports() {
+  }
+
+  public WorkerToMasterReports(final Collection<WorkerToMasterReport> workerToMasterReports) {
+    this.workerToMasterReports = new ArrayList<>(workerToMasterReports);
+  }
+
+  /**
+   * @return an unmodifiable view of the contained WorkerToMasterReports.
+   */
+  public List<WorkerToMasterReport> getReports() {
+    return Collections.unmodifiableList(workerToMasterReports);
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/package-info.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/package-info.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/package-info.java
new file mode 100644
index 0000000..584847f
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/protocol/workertomaster/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Protocol from VortexWorker to VortexMaster.
+ */
+package org.apache.reef.vortex.protocol.workertomaster;

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/util/VoidCodec.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/util/VoidCodec.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/util/VoidCodec.java
deleted file mode 100644
index 7e21066..0000000
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/util/VoidCodec.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.vortex.util;
-
-import org.apache.reef.io.serialization.Codec;
-
-/**
- * Codec for empty input/output.
- */
-public final class VoidCodec implements Codec<Void> {
-
-  @Override
-  public byte[] encode(final Void obj) {
-    return new byte[0];
-  }
-
-  @Override
-  public Void decode(final byte[] buf) {
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/util/package-info.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/util/package-info.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/util/package-info.java
deleted file mode 100644
index fcbb9f5..0000000
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/util/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/**
- * Utilities used in Vortex.
- */
-package org.apache.reef.vortex.util;

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
 
b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
index 0770f77..026453d 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
@@ -18,16 +18,15 @@
  */
 package org.apache.reef.vortex.driver;
 
-import org.apache.reef.io.serialization.SerializableCodec;
 import org.apache.reef.tang.exceptions.InjectionException;
 import org.apache.reef.util.Optional;
 import org.apache.reef.vortex.api.FutureCallback;
 import org.apache.reef.vortex.api.VortexFunction;
 import org.apache.reef.vortex.api.VortexFuture;
-import org.apache.reef.vortex.common.TaskletFailureReport;
-import org.apache.reef.vortex.common.TaskletReport;
-import org.apache.reef.vortex.common.TaskletResultReport;
-import org.apache.reef.vortex.common.WorkerReport;
+import org.apache.reef.vortex.protocol.workertomaster.TaskletFailureReport;
+import org.apache.reef.vortex.protocol.workertomaster.WorkerToMasterReport;
+import org.apache.reef.vortex.protocol.workertomaster.TaskletResultReport;
+import org.apache.reef.vortex.protocol.workertomaster.WorkerToMasterReports;
 import org.junit.Test;
 
 import java.util.*;
@@ -42,8 +41,8 @@ import static org.junit.Assert.*;
  * Test whether DefaultVortexMaster correctly handles (simulated) events.
  */
 public class DefaultVortexMasterTest {
-  private static final byte[] EMPTY_RESULT = new byte[0];
-  private static final byte[] INTEGER_RESULT = new 
SerializableCodec<Integer>().encode(1);
+  private static final Object EMPTY_RESULT = null;
+  private static final int INTEGER_RESULT = 1;
   private TestUtil testUtil = new TestUtil();
 
   /**
@@ -81,9 +80,9 @@ public class DefaultVortexMasterTest {
 
     final ArrayList<Integer> taskletIds = launchTasklets(runningWorkers, 
pendingTasklets, 1);
     for (final int taskletId : taskletIds) {
-      final TaskletReport taskletReport = new TaskletResultReport(taskletId, 
INTEGER_RESULT);
+      final WorkerToMasterReport workerToMasterReport = new 
TaskletResultReport(taskletId, INTEGER_RESULT);
       vortexMaster.workerReported(
-          vortexWorkerManager1.getId(), new 
WorkerReport(Collections.singletonList(taskletReport)));
+          vortexWorkerManager1.getId(), new 
WorkerToMasterReports(Collections.singletonList(workerToMasterReport)));
     }
 
     assertTrue("The VortexFuture should be done", future.isDone());
@@ -123,9 +122,9 @@ public class DefaultVortexMasterTest {
 
     // Completed?
     for (final int taskletId : taskletIds2) {
-      final TaskletReport taskletReport = new TaskletResultReport(taskletId, 
EMPTY_RESULT);
+      final WorkerToMasterReport workerToMasterReport = new 
TaskletResultReport(taskletId, EMPTY_RESULT);
       vortexMaster.workerReported(
-          vortexWorkerManager2.getId(), new 
WorkerReport(Collections.singletonList(taskletReport)));
+          vortexWorkerManager2.getId(), new 
WorkerToMasterReports(Collections.singletonList(workerToMasterReport)));
     }
     assertTrue("The VortexFuture should be done", future.isDone());
   }
@@ -176,9 +175,9 @@ public class DefaultVortexMasterTest {
     for (final int taskletId : taskletIds2) {
       final String workerId = 
runningWorkers.getWhereTaskletWasScheduledTo(taskletId);
       assertNotNull("The tasklet must have been scheduled", workerId);
-      final TaskletReport taskletReport = new TaskletResultReport(taskletId, 
EMPTY_RESULT);
+      final WorkerToMasterReport workerToMasterReport = new 
TaskletResultReport(taskletId, EMPTY_RESULT);
       vortexMaster.workerReported(
-          workerId, new 
WorkerReport(Collections.singletonList(taskletReport)));
+          workerId, new 
WorkerToMasterReports(Collections.singletonList(workerToMasterReport)));
     }
     for (final VortexFuture vortexFuture : vortexFutures) {
       assertTrue("The VortexFuture should be done", vortexFuture.isDone());
@@ -221,9 +220,10 @@ public class DefaultVortexMasterTest {
     final ArrayList<Integer> taskletIds = launchTasklets(runningWorkers, 
pendingTasklets, 1);
 
     for (final int taskletId : taskletIds) {
-      final TaskletReport taskletReport = new TaskletFailureReport(taskletId, 
new RuntimeException("Test exception."));
+      final WorkerToMasterReport workerToMasterReport =
+          new TaskletFailureReport(taskletId, new RuntimeException("Test 
exception."));
       vortexMaster.workerReported(
-          vortexWorkerManager1.getId(), new 
WorkerReport(Collections.singletonList(taskletReport)));
+          vortexWorkerManager1.getId(), new 
WorkerToMasterReports(Collections.singletonList(workerToMasterReport)));
     }
 
     assertTrue("The VortexFuture should be done", future.isDone());

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java
 
b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java
index 110143f..2828ccb 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java
@@ -20,7 +20,6 @@ package org.apache.reef.vortex.driver;
 
 import org.apache.reef.tang.Tang;
 import org.apache.reef.tang.exceptions.InjectionException;
-import org.apache.reef.vortex.common.AggregateFunctionRepository;
 import org.junit.Test;
 
 import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/SchedulingPolicyTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/SchedulingPolicyTest.java
 
b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/SchedulingPolicyTest.java
index fea4235..b349231 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/SchedulingPolicyTest.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/SchedulingPolicyTest.java
@@ -32,7 +32,7 @@ public class SchedulingPolicyTest {
   private final TestUtil testUtil = new TestUtil();
 
   /**
-   * Test common traits of different scheduling policies.
+   * Test traits common to the different scheduling policies.
    */
   @Test
   public void testCommon() throws Exception {
@@ -102,7 +102,7 @@ public class SchedulingPolicyTest {
   }
 
   /**
-   * Simple common tests.
+   * Simple tests common to all scheduling policies.
    */
   private void commonPolicyTests(final SchedulingPolicy policy) {
     // Initial state

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
 
b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
index 1656f80..c8d2b9e 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
@@ -19,15 +19,16 @@
 package org.apache.reef.vortex.driver;
 
 import org.apache.reef.driver.task.RunningTask;
-import org.apache.reef.io.serialization.Codec;
-import org.apache.reef.io.serialization.SerializableCodec;
 import org.apache.reef.tang.Tang;
 import org.apache.reef.tang.exceptions.InjectionException;
-import org.apache.reef.vortex.util.VoidCodec;
 import org.apache.reef.util.Optional;
 import org.apache.reef.vortex.api.VortexFunction;
 import org.apache.reef.vortex.api.VortexFuture;
-import org.apache.reef.vortex.common.*;
+import org.apache.reef.vortex.protocol.mastertoworker.MasterToWorkerRequest;
+import 
org.apache.reef.vortex.protocol.mastertoworker.TaskletCancellationRequest;
+import org.apache.reef.vortex.protocol.workertomaster.TaskletCancelledReport;
+import org.apache.reef.vortex.protocol.workertomaster.WorkerToMasterReport;
+import org.apache.reef.vortex.protocol.workertomaster.WorkerToMasterReports;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -48,9 +49,6 @@ import static org.mockito.Mockito.when;
  * Utility methods for tests.
  */
 public final class TestUtil {
-  private static final Codec<Void> VOID_CODEC = new VoidCodec();
-  private static final Codec<Integer> INTEGER_CODEC = new 
SerializableCodec<>();
-
   private final AtomicInteger taskletId = new AtomicInteger(0);
   private final AtomicInteger workerId = new AtomicInteger(0);
   private final Executor executor = Executors.newFixedThreadPool(5);
@@ -74,16 +72,16 @@ public final class TestUtil {
     doAnswer(new Answer() {
       @Override
       public Object answer(final InvocationOnMock invocation) throws Throwable 
{
-        final VortexRequest request = 
(VortexRequest)invocation.getArguments()[1];
+        final MasterToWorkerRequest request = 
(MasterToWorkerRequest)invocation.getArguments()[1];
         if (request instanceof TaskletCancellationRequest) {
-          final TaskletReport cancelReport = new TaskletCancelledReport(
+          final WorkerToMasterReport cancelReport = new TaskletCancelledReport(
               ((TaskletCancellationRequest)request).getTaskletId());
-          master.workerReported(workerManager.getId(), new 
WorkerReport(Collections.singleton(cancelReport)));
+          master.workerReported(workerManager.getId(), new 
WorkerToMasterReports(Collections.singleton(cancelReport)));
         }
 
         return null;
       }
-    }).when(vortexRequestor).sendAsync(any(RunningTask.class), 
any(VortexRequest.class));
+    }).when(vortexRequestor).sendAsync(any(RunningTask.class), 
any(MasterToWorkerRequest.class));
 
     return workerManager;
   }
@@ -93,7 +91,7 @@ public final class TestUtil {
    */
   public Tasklet newTasklet() {
     final int id = taskletId.getAndIncrement();
-    return new Tasklet(id, Optional.empty(), null, null, new 
VortexFuture(executor, vortexMaster, id, VOID_CODEC));
+    return new Tasklet(id, Optional.empty(), null, null, new 
VortexFuture(executor, vortexMaster, id));
   }
 
   /**
@@ -112,16 +110,6 @@ public final class TestUtil {
       public Void call(final Void input) throws Exception {
         return null;
       }
-
-      @Override
-      public Codec getInputCodec() {
-        return VOID_CODEC;
-      }
-
-      @Override
-      public Codec getOutputCodec() {
-        return VOID_CODEC;
-      }
     };
   }
 
@@ -146,16 +134,6 @@ public final class TestUtil {
           }
         }
       }
-
-      @Override
-      public Codec getInputCodec() {
-        return VOID_CODEC;
-      }
-
-      @Override
-      public Codec getOutputCodec() {
-        return VOID_CODEC;
-      }
     };
   }
 
@@ -168,16 +146,6 @@ public final class TestUtil {
       public Integer call(final Integer input) throws Exception {
         return 1;
       }
-
-      @Override
-      public Codec<Integer> getInputCodec() {
-        return INTEGER_CODEC;
-      }
-
-      @Override
-      public Codec<Integer> getOutputCodec() {
-        return INTEGER_CODEC;
-      }
     };
   }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/VortexTestSuite.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/VortexTestSuite.java
 
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/VortexTestSuite.java
index f97d559..3cce93c 100644
--- 
a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/VortexTestSuite.java
+++ 
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/VortexTestSuite.java
@@ -20,7 +20,7 @@ package org.apache.reef.tests.applications.vortex;
 
 import org.apache.reef.tests.applications.vortex.addone.AddOneTest;
 import org.apache.reef.tests.applications.vortex.exception.VortexExceptionTest;
-import 
org.apache.reef.tests.applications.vortex.cancellation.TaskletCancellationTest;
+import 
org.apache.reef.tests.applications.vortex.cancellation.TaskletCancellationRequestTest;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 
@@ -31,7 +31,7 @@ import org.junit.runners.Suite;
 @Suite.SuiteClasses({
     AddOneTest.class,
     VortexExceptionTest.class,
-    TaskletCancellationTest.class
+    TaskletCancellationRequestTest.class
     })
 public final class VortexTestSuite {
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneFunction.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneFunction.java
 
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneFunction.java
index 8db6827..e064976 100644
--- 
a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneFunction.java
+++ 
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneFunction.java
@@ -18,15 +18,12 @@
  */
 package org.apache.reef.tests.applications.vortex.addone;
 
-import org.apache.reef.io.serialization.Codec;
-import org.apache.reef.io.serialization.SerializableCodec;
 import org.apache.reef.vortex.api.VortexFunction;
 
 /**
  * Outputs Input+1.
  */
 public final class AddOneFunction implements VortexFunction<Integer, Integer> {
-  private static final Codec<Integer> CODEC = new SerializableCodec<>();
   /**
    * Outputs Input+1.
    */
@@ -34,14 +31,4 @@ public final class AddOneFunction implements 
VortexFunction<Integer, Integer> {
   public Integer call(final Integer input) throws Exception {
     return input + 1;
   }
-
-  @Override
-  public Codec<Integer> getInputCodec() {
-    return CODEC;
-  }
-
-  @Override
-  public Codec<Integer> getOutputCodec() {
-    return CODEC;
-  }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/InfiniteLoopWithCancellationFunction.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/InfiniteLoopWithCancellationFunction.java
 
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/InfiniteLoopWithCancellationFunction.java
index b885c82..3225c53 100644
--- 
a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/InfiniteLoopWithCancellationFunction.java
+++ 
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/InfiniteLoopWithCancellationFunction.java
@@ -19,9 +19,7 @@
 
 package org.apache.reef.tests.applications.vortex.cancellation;
 
-import org.apache.reef.io.serialization.Codec;
 import org.apache.reef.vortex.api.VortexFunction;
-import org.apache.reef.vortex.util.VoidCodec;
 
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -31,7 +29,6 @@ import java.util.logging.Logger;
  */
 public final class InfiniteLoopWithCancellationFunction implements VortexFunction<Void, Void> {
   private static final Logger LOG = Logger.getLogger(InfiniteLoopWithCancellationFunction.class.getName());
-  private static final Codec<Void> CODEC = new VoidCodec();
 
   @Override
   public Void call(final Void input) throws Exception {
@@ -43,14 +40,4 @@ public final class InfiniteLoopWithCancellationFunction implements VortexFunctio
       }
     }
   }
-
-  @Override
-  public Codec<Void> getInputCodec() {
-    return CODEC;
-  }
-
-  @Override
-  public Codec<Void> getOutputCodec() {
-    return CODEC;
-  }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationRequestTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationRequestTest.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationRequestTest.java
new file mode 100644
index 0000000..fa572fa
--- /dev/null
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationRequestTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.tests.applications.vortex.cancellation;
+
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tests.TestEnvironment;
+import org.apache.reef.tests.TestEnvironmentFactory;
+import org.apache.reef.vortex.driver.VortexJobConf;
+import org.apache.reef.vortex.driver.VortexMasterConf;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests the cancellation of a tasklet.
+ */
+public final class TaskletCancellationRequestTest {
+  private final TestEnvironment testEnvironment = TestEnvironmentFactory.getNewTestEnvironment();
+
+  /**
+   * Set up the test environment.
+   */
+  @Before
+  public void setUp() throws Exception {
+    this.testEnvironment.setUp();
+  }
+
+  /**
+   * Tear down the test environment.
+   */
+  @After
+  public void tearDown() throws Exception {
+    this.testEnvironment.tearDown();
+  }
+
+  @Test
+  public void testVortexTaskletCancellation() {
+    final Configuration vortexMasterConf = VortexMasterConf.CONF
+        .set(VortexMasterConf.WORKER_NUM, 2)
+        .set(VortexMasterConf.WORKER_MEM, 64)
+        .set(VortexMasterConf.WORKER_CORES, 4)
+        .set(VortexMasterConf.WORKER_CAPACITY, 2000)
+        .set(VortexMasterConf.VORTEX_START, TaskletCancellationTestStart.class)
+        .build();
+
+    final VortexJobConf vortexJobConf = VortexJobConf.newBuilder()
+        .setJobName("TEST_Vortex_TaskletCancellationTest")
+        .setVortexMasterConf(vortexMasterConf)
+        .build();
+
+    final LauncherStatus status = this.testEnvironment.run(vortexJobConf.getConfiguration());
+    Assert.assertTrue("Job state after execution: " + status, status.isSuccess());
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTest.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTest.java
deleted file mode 100644
index 0d86154..0000000
--- a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/cancellation/TaskletCancellationTest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.tests.applications.vortex.cancellation;
-
-import org.apache.reef.client.LauncherStatus;
-import org.apache.reef.tang.Configuration;
-import org.apache.reef.tests.TestEnvironment;
-import org.apache.reef.tests.TestEnvironmentFactory;
-import org.apache.reef.vortex.driver.VortexJobConf;
-import org.apache.reef.vortex.driver.VortexMasterConf;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Tests the cancellation of a tasklet.
- */
-public final class TaskletCancellationTest {
-  private final TestEnvironment testEnvironment = TestEnvironmentFactory.getNewTestEnvironment();
-
-  /**
-   * Set up the test environment.
-   */
-  @Before
-  public void setUp() throws Exception {
-    this.testEnvironment.setUp();
-  }
-
-  /**
-   * Tear down the test environment.
-   */
-  @After
-  public void tearDown() throws Exception {
-    this.testEnvironment.tearDown();
-  }
-
-  @Test
-  public void testVortexTaskletCancellation() {
-    final Configuration vortexMasterConf = VortexMasterConf.CONF
-        .set(VortexMasterConf.WORKER_NUM, 2)
-        .set(VortexMasterConf.WORKER_MEM, 64)
-        .set(VortexMasterConf.WORKER_CORES, 4)
-        .set(VortexMasterConf.WORKER_CAPACITY, 2000)
-        .set(VortexMasterConf.VORTEX_START, TaskletCancellationTestStart.class)
-        .build();
-
-    final VortexJobConf vortexJobConf = VortexJobConf.newBuilder()
-        .setJobName("TEST_Vortex_TaskletCancellationTest")
-        .setVortexMasterConf(vortexMasterConf)
-        .build();
-
-    final LauncherStatus status = this.testEnvironment.run(vortexJobConf.getConfiguration());
-    Assert.assertTrue("Job state after execution: " + status, status.isSuccess());
-  }
-}

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/ExceptionFunction.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/ExceptionFunction.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/ExceptionFunction.java
index aeff37b..e78eb75 100644
--- a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/ExceptionFunction.java
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/exception/ExceptionFunction.java
@@ -18,27 +18,14 @@
  */
 package org.apache.reef.tests.applications.vortex.exception;
 
-import org.apache.reef.io.serialization.Codec;
-import org.apache.reef.io.serialization.SerializableCodec;
 import org.apache.reef.vortex.api.VortexFunction;
 
 /**
  * A test Vortex function that throws an Exception.
  */
 public final class ExceptionFunction implements VortexFunction<Integer, Integer> {
-  private static final Codec<Integer> CODEC = new SerializableCodec<>();
   @Override
   public Integer call(final Integer input) throws Exception {
     throw new RuntimeException("Expected test exception.");
   }
-
-  @Override
-  public Codec<Integer> getInputCodec() {
-    return CODEC;
-  }
-
-  @Override
-  public Codec<Integer> getOutputCodec() {
-    return CODEC;
-  }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/48bde0c0/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f277566..d3c5fc4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -59,6 +59,8 @@ under the License.
         <findbugs.version>3.0.2</findbugs.version>
         <reflections.version>0.9.9-RC1</reflections.version>
         <jsr305.version>3.0.1</jsr305.version>
+        <kryo.version>3.0.3</kryo.version>
+        <kryo-serializers.version>0.37</kryo-serializers.version>
         <rootPath>${user.dir}</rootPath>
     </properties>
 

Reply via email to