Repository: reef
Updated Branches:
  refs/heads/master 977d4f58c -> 40a311e49


[REEF-1129] Driver side VortexAggregate submission

This addresses the issue by
  * Modifying VortexThreadPool, VortexMaster, and RunningWorkers to support 
submission of aggregate-able VortexFunctions.
  * Adding AggregateFunctionRepository to share aggregate functions between 
VortexMaster and RunningWorkers.
  * Modifying tests.
  * Adding a TODO for REEF-1130 on passing VortexAggregateFunctions to Worker 
processes.

JIRA:
  [REEF-1129](https://issues.apache.org/jira/browse/REEF-1129)

Pull Request:
  This closes #777


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/40a311e4
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/40a311e4
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/40a311e4

Branch: refs/heads/master
Commit: 40a311e49a765bc4e0a773792382cb0ed5b3c380
Parents: 977d4f5
Author: Andrew Chung <[email protected]>
Authored: Mon Jan 18 22:41:12 2016 -0800
Committer: Yunseong Lee <[email protected]>
Committed: Wed Jan 20 16:44:24 2016 +0800

----------------------------------------------------------------------
 .../vortex/api/VortexAggregateFunction.java     |  2 +-
 .../reef/vortex/api/VortexAggregateFuture.java  |  2 +-
 .../reef/vortex/api/VortexThreadPool.java       | 32 ++++++++++++
 .../driver/AggregateFunctionRepository.java     | 51 ++++++++++++++++++++
 .../reef/vortex/driver/DefaultVortexMaster.java | 44 +++++++++++++++--
 .../reef/vortex/driver/RunningWorkers.java      | 37 +++++++++++++-
 .../org/apache/reef/vortex/driver/Tasklet.java  | 11 +++++
 .../apache/reef/vortex/driver/VortexMaster.java | 15 ++++--
 .../vortex/driver/DefaultVortexMasterTest.java  | 41 ++++++++++------
 .../reef/vortex/driver/RunningWorkersTest.java  | 11 ++++-
 .../org/apache/reef/vortex/driver/TestUtil.java | 13 ++++-
 11 files changed, 230 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/40a311e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFunction.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFunction.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFunction.java
index fe3b96a..d7254d5 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFunction.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFunction.java
@@ -21,7 +21,7 @@ package org.apache.reef.vortex.api;
 import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.ClientSide;
 import org.apache.reef.annotations.audience.Public;
-import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.io.serialization.Codec;
 
 import java.io.Serializable;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/reef/blob/40a311e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFuture.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFuture.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFuture.java
index d958fac..23017b7 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFuture.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFuture.java
@@ -23,8 +23,8 @@ import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.ClientSide;
 import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.io.serialization.Codec;
 import org.apache.reef.vortex.common.VortexFutureDelegate;
-import org.apache.reef.wake.remote.Codec;
 
 import javax.annotation.concurrent.NotThreadSafe;
 import java.util.*;

http://git-wip-us.apache.org/repos/asf/reef/blob/40a311e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java
index e2b1d35..fb06211 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java
@@ -23,6 +23,7 @@ import org.apache.reef.util.Optional;
 import org.apache.reef.vortex.driver.VortexMaster;
 
 import javax.inject.Inject;
+import java.util.List;
 
 /**
  * Distributed thread pool.
@@ -61,4 +62,35 @@ public final class VortexThreadPool {
              final FutureCallback<TOutput> callback) {
     return vortexMaster.enqueueTasklet(function, input, Optional.of(callback));
   }
+
+  /**
+   * @param aggregateFunction to run on VortexFunction outputs
+   * @param function to run on Vortex
+   * @param inputs of the function
+   * @param <TInput> input type
+   * @param <TOutput> output type
+   * @return VortexAggregationFuture for tracking execution progress of 
aggregate-able functions
+   */
+  public <TInput, TOutput> VortexAggregateFuture<TInput, TOutput>
+      submit(final VortexAggregateFunction<TOutput> aggregateFunction,
+             final VortexFunction<TInput, TOutput> function, final 
List<TInput> inputs) {
+    return vortexMaster.enqueueTasklets(
+        aggregateFunction, function, inputs, 
Optional.<FutureCallback<AggregateResult<TInput, TOutput>>>empty());
+  }
+
+  /**
+   * @param aggregateFunction to run on VortexFunction outputs
+   * @param function to run on Vortex
+   * @param inputs of the function
+   * @param callback of the aggregation
+   * @param <TInput> input type
+   * @param <TOutput> output type
+   * @return VortexAggregationFuture for tracking execution progress of 
aggregate-able functions
+   */
+  public <TInput, TOutput> VortexAggregateFuture<TInput, TOutput>
+      submit(final VortexAggregateFunction<TOutput> aggregateFunction,
+             final VortexFunction<TInput, TOutput> function, final 
List<TInput> inputs,
+             final FutureCallback<AggregateResult<TInput, TOutput>> callback) {
+    return vortexMaster.enqueueTasklets(aggregateFunction, function, inputs, 
Optional.of(callback));
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/40a311e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/AggregateFunctionRepository.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/AggregateFunctionRepository.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/AggregateFunctionRepository.java
new file mode 100644
index 0000000..9b1da7a
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/AggregateFunctionRepository.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.driver;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.vortex.api.VortexAggregateFunction;
+
+import javax.annotation.concurrent.ThreadSafe;
+import javax.inject.Inject;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * A repository for {@link VortexAggregateFunction}, used to pass functions 
between {@link VortexMaster} and
+ * {@link RunningWorkers}.
+ */
+@ThreadSafe
+@Unstable
+@Private
+public final class AggregateFunctionRepository {
+  private final ConcurrentMap<Integer, VortexAggregateFunction> 
aggregateFunctionMap = new ConcurrentHashMap<>();
+
+  @Inject
+  private AggregateFunctionRepository() {
+  }
+
+  VortexAggregateFunction put(final int aggregateFunctionId, final 
VortexAggregateFunction function) {
+    return aggregateFunctionMap.put(aggregateFunctionId, function);
+  }
+
+  VortexAggregateFunction get(final int aggregateFunctionId) {
+    return aggregateFunctionMap.get(aggregateFunctionId);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/40a311e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
index f0b4949..55aeb5a 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
@@ -23,9 +23,7 @@ import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.io.serialization.Codec;
 import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.util.Optional;
-import org.apache.reef.vortex.api.FutureCallback;
-import org.apache.reef.vortex.api.VortexFunction;
-import org.apache.reef.vortex.api.VortexFuture;
+import org.apache.reef.vortex.api.*;
 import org.apache.reef.vortex.common.*;
 
 import javax.inject.Inject;
@@ -43,6 +41,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 final class DefaultVortexMaster implements VortexMaster {
   private final Map<Integer, VortexFutureDelegate> taskletFutureMap = new 
HashMap<>();
   private final AtomicInteger taskletIdCounter = new AtomicInteger();
+  private final AtomicInteger aggregateIdCounter = new AtomicInteger();
+  private final AggregateFunctionRepository aggregateFunctionRepository;
   private final RunningWorkers runningWorkers;
   private final PendingTasklets pendingTasklets;
   private final Executor executor;
@@ -53,10 +53,12 @@ final class DefaultVortexMaster implements VortexMaster {
   @Inject
   DefaultVortexMaster(final RunningWorkers runningWorkers,
                       final PendingTasklets pendingTasklets,
+                      final AggregateFunctionRepository 
aggregateFunctionRepository,
                       
@Parameter(VortexMasterConf.CallbackThreadPoolSize.class) final int 
threadPoolSize) {
     this.executor = Executors.newFixedThreadPool(threadPoolSize);
     this.runningWorkers = runningWorkers;
     this.pendingTasklets = pendingTasklets;
+    this.aggregateFunctionRepository = aggregateFunctionRepository;
   }
 
   /**
@@ -76,7 +78,7 @@ final class DefaultVortexMaster implements VortexMaster {
       vortexFuture = new VortexFuture<>(executor, this, id, outputCodec);
     }
 
-    final Tasklet tasklet = new Tasklet<>(id, function, input, vortexFuture);
+    final Tasklet tasklet = new Tasklet<>(id, Optional.<Integer>empty(), 
function, input, vortexFuture);
     putDelegate(Collections.singletonList(tasklet), vortexFuture);
     this.pendingTasklets.addLast(tasklet);
 
@@ -84,6 +86,40 @@ final class DefaultVortexMaster implements VortexMaster {
   }
 
   /**
+   * Add aggregate-able Tasklets to pendingTasklets.
+   */
+  @Override
+  public <TInput, TOutput> VortexAggregateFuture<TInput, TOutput>
+      enqueueTasklets(final VortexAggregateFunction<TOutput> aggregateFunction,
+                      final VortexFunction<TInput, TOutput> vortexFunction, 
final List<TInput> inputs,
+                      final Optional<FutureCallback<AggregateResult<TInput, 
TOutput>>> callback) {
+    final int aggregateFunctionId = aggregateIdCounter.getAndIncrement();
+    aggregateFunctionRepository.put(aggregateFunctionId, aggregateFunction);
+    final Codec<TOutput> aggOutputCodec = aggregateFunction.getOutputCodec();
+    final List<Tasklet> tasklets = new ArrayList<>(inputs.size());
+    final Map<Integer, TInput> taskletIdInputMap = new 
HashMap<>(inputs.size());
+
+    for (final TInput input : inputs) {
+      taskletIdInputMap.put(taskletIdCounter.getAndIncrement(), input);
+    }
+
+    final VortexAggregateFuture<TInput, TOutput> vortexAggregateFuture =
+        callback.isPresent() ?
+        new VortexAggregateFuture<>(executor, taskletIdInputMap, 
aggOutputCodec, callback.get()) :
+        new VortexAggregateFuture<>(executor, taskletIdInputMap, 
aggOutputCodec, null);
+
+    for (final Map.Entry<Integer, TInput> taskletIdInputEntry : 
taskletIdInputMap.entrySet()) {
+      final Tasklet tasklet = new Tasklet<>(taskletIdInputEntry.getKey(), 
Optional.of(aggregateFunctionId),
+          vortexFunction, taskletIdInputEntry.getValue(), 
vortexAggregateFuture);
+      tasklets.add(tasklet);
+      pendingTasklets.addLast(tasklet);
+    }
+
+    putDelegate(tasklets, vortexAggregateFuture);
+    return vortexAggregateFuture;
+  }
+
+  /**
    * Cancels tasklets on the running workers.
    */
   @Override

http://git-wip-us.apache.org/repos/asf/reef/blob/40a311e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
index a1fb96f..2bddc18 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
@@ -20,6 +20,7 @@ package org.apache.reef.vortex.driver;
 
 import net.jcip.annotations.ThreadSafe;
 
+import org.apache.commons.lang3.NotImplementedException;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.util.Optional;
 
@@ -57,12 +58,18 @@ final class RunningWorkers {
   // Scheduling policy
   private final SchedulingPolicy schedulingPolicy;
 
+  private final AggregateFunctionRepository aggregateFunctionRepository;
+
+  private final Map<String, Set<Integer>> workerAggregateFunctionMap = new 
HashMap<>();
+
   /**
    * RunningWorkers constructor.
    */
   @Inject
-  RunningWorkers(final SchedulingPolicy schedulingPolicy) {
+  RunningWorkers(final SchedulingPolicy schedulingPolicy,
+                 final AggregateFunctionRepository 
aggregateFunctionRepository) {
     this.schedulingPolicy = schedulingPolicy;
+    this.aggregateFunctionRepository = aggregateFunctionRepository;
   }
 
   /**
@@ -76,6 +83,7 @@ final class RunningWorkers {
         if (!removedBeforeAddedWorkers.contains(vortexWorkerManager.getId())) {
           this.runningWorkers.put(vortexWorkerManager.getId(), 
vortexWorkerManager);
           this.schedulingPolicy.workerAdded(vortexWorkerManager);
+          this.workerAggregateFunctionMap.put(vortexWorkerManager.getId(), new 
HashSet<Integer>());
 
           // Notify (possibly) waiting scheduler
           noWorkerOrResource.signal();
@@ -111,7 +119,11 @@ final class RunningWorkers {
         return Optional.empty();
       }
     } finally {
-      lock.unlock();
+      try {
+        workerAggregateFunctionMap.remove(id);
+      } finally {
+        lock.unlock();
+      }
     }
   }
 
@@ -145,7 +157,15 @@ final class RunningWorkers {
           return;
         }
 
+        final Optional<Integer> taskletAggFunctionId =  
tasklet.getAggregateFunctionId();
         final VortexWorkerManager vortexWorkerManager = 
runningWorkers.get(workerId.get());
+        if (taskletAggFunctionId.isPresent() &&
+            !workerHasAggregateFunction(vortexWorkerManager.getId(), 
taskletAggFunctionId.get())) {
+          // TODO[JIRA REEF-1130]: fetch aggregate function from repo and send 
aggregate function to worker.
+          throw new NotImplementedException("Serialize aggregate function to 
worker if it doesn't have it. " +
+              "Complete in REEF-1130.");
+        }
+
         vortexWorkerManager.launchTasklet(tasklet);
         schedulingPolicy.taskletLaunched(vortexWorkerManager, tasklet);
       }
@@ -247,4 +267,17 @@ final class RunningWorkers {
   boolean isWorkerRunning(final String workerId) {
     return runningWorkers.containsKey(workerId);
   }
+
+  /**
+   * @return true if Vortex has sent the aggregation function to the worker 
specified by workerId
+   */
+  private boolean workerHasAggregateFunction(final String workerId, final int 
aggregateFunctionId) {
+    if (!workerAggregateFunctionMap.containsKey(workerId)) {
+      LOG.log(Level.WARNING, "Trying to look up a worker's aggregation 
function for a worker with an ID that has " +
+          "not yet been added.");
+      return false;
+    }
+
+    return 
workerAggregateFunctionMap.get(workerId).contains(aggregateFunctionId);
+  }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/40a311e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java
index 6f5d519..063835f 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java
@@ -19,6 +19,7 @@
 package org.apache.reef.vortex.driver;
 
 import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.util.Optional;
 import org.apache.reef.vortex.api.VortexFunction;
 import org.apache.reef.vortex.common.VortexFutureDelegate;
 
@@ -29,13 +30,16 @@ import org.apache.reef.vortex.common.VortexFutureDelegate;
 class Tasklet<TInput, TOutput> {
   private final int taskletId;
   private final VortexFunction<TInput, TOutput> userTask;
+  private final Optional<Integer> aggregateFunctionId;
   private final TInput input;
   private final VortexFutureDelegate delegate;
 
   Tasklet(final int taskletId,
+          final Optional<Integer> aggregateFunctionId,
           final VortexFunction<TInput, TOutput> userTask,
           final TInput input,
           final VortexFutureDelegate delegate) {
+    this.aggregateFunctionId = aggregateFunctionId;
     this.taskletId = taskletId;
     this.userTask = userTask;
     this.input = input;
@@ -50,6 +54,13 @@ class Tasklet<TInput, TOutput> {
   }
 
   /**
+   * @return aggregate function id of the tasklet, not present if the tasklet 
is not aggregate-able
+   */
+  Optional<Integer> getAggregateFunctionId() {
+    return aggregateFunctionId;
+  }
+
+  /**
    * @return the input of the tasklet
    */
   TInput getInput() {

http://git-wip-us.apache.org/repos/asf/reef/blob/40a311e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
index a423706..09b7e0a 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
@@ -22,11 +22,11 @@ import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.tang.annotations.DefaultImplementation;
 import org.apache.reef.util.Optional;
-import org.apache.reef.vortex.api.FutureCallback;
-import org.apache.reef.vortex.api.VortexFunction;
-import org.apache.reef.vortex.api.VortexFuture;
+import org.apache.reef.vortex.api.*;
 import org.apache.reef.vortex.common.WorkerReport;
 
+import java.util.List;
+
 /**
  * The heart of Vortex.
  * Processes various tasklet related events/requests coming from different 
components of the system.
@@ -43,6 +43,15 @@ public interface VortexMaster {
                      final Optional<FutureCallback<TOutput>> callback);
 
   /**
+   * Submits aggregate-able Tasklets to be run sometime in the future, with an 
optional callback function on
+   * the aggregation progress.
+   */
+  <TInput, TOutput> VortexAggregateFuture<TInput, TOutput>
+      enqueueTasklets(final VortexAggregateFunction<TOutput> aggregateFunction,
+                      final VortexFunction<TInput, TOutput> vortexFunction, 
final List<TInput> inputs,
+                      final Optional<FutureCallback<AggregateResult<TInput, 
TOutput>>> callback);
+
+  /**
    * Call this when a Tasklet is to be cancelled.
    * @param mayInterruptIfRunning if true, will attempt to cancel running 
Tasklets; otherwise will only
    *                              prevent a pending Tasklet from running.

http://git-wip-us.apache.org/repos/asf/reef/blob/40a311e4/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
 
b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
index 40a819c..0770f77 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
@@ -19,6 +19,7 @@
 package org.apache.reef.vortex.driver;
 
 import org.apache.reef.io.serialization.SerializableCodec;
+import org.apache.reef.tang.exceptions.InjectionException;
 import org.apache.reef.util.Optional;
 import org.apache.reef.vortex.api.FutureCallback;
 import org.apache.reef.vortex.api.VortexFunction;
@@ -52,9 +53,11 @@ public class DefaultVortexMasterTest {
   public void testSingleTaskletNoFailure() throws Exception {
     final VortexFunction vortexFunction = testUtil.newIntegerFunction();
     final VortexWorkerManager vortexWorkerManager1 = testUtil.newWorker();
-    final RunningWorkers runningWorkers = new RunningWorkers(new 
RandomSchedulingPolicy());
+    final RunningWorkers runningWorkers = new RunningWorkers(new 
RandomSchedulingPolicy(),
+        testUtil.newAggregateFunctionRepository());
     final PendingTasklets pendingTasklets = new PendingTasklets();
-    final DefaultVortexMaster vortexMaster = new 
DefaultVortexMaster(runningWorkers, pendingTasklets, 5);
+    final DefaultVortexMaster vortexMaster = new 
DefaultVortexMaster(runningWorkers, pendingTasklets,
+        testUtil.newAggregateFunctionRepository(), 5);
 
     final AtomicBoolean callbackReceived = new AtomicBoolean(false);
     final CountDownLatch latch = new CountDownLatch(1);
@@ -96,9 +99,11 @@ public class DefaultVortexMasterTest {
     final VortexFunction vortexFunction = testUtil.newFunction();
     final VortexWorkerManager vortexWorkerManager1 = testUtil.newWorker();
     final VortexWorkerManager vortexWorkerManager2 = testUtil.newWorker();
-    final RunningWorkers runningWorkers = new RunningWorkers(new 
RandomSchedulingPolicy());
+    final RunningWorkers runningWorkers = new RunningWorkers(new 
RandomSchedulingPolicy(),
+        testUtil.newAggregateFunctionRepository());
     final PendingTasklets pendingTasklets = new PendingTasklets();
-    final DefaultVortexMaster vortexMaster = new 
DefaultVortexMaster(runningWorkers, pendingTasklets, 5);
+    final DefaultVortexMaster vortexMaster = new 
DefaultVortexMaster(runningWorkers, pendingTasklets,
+        testUtil.newAggregateFunctionRepository(), 5);
 
     // Allocate worker & tasklet and schedule
     vortexMaster.workerAllocated(vortexWorkerManager1);
@@ -132,9 +137,11 @@ public class DefaultVortexMasterTest {
   public void testMultipleTaskletsFailure() throws Exception {
     // The tasklets that need to be executed
     final ArrayList<VortexFuture> vortexFutures = new ArrayList<>();
-    final RunningWorkers runningWorkers = new RunningWorkers(new 
RandomSchedulingPolicy());
+    final RunningWorkers runningWorkers = new RunningWorkers(new 
RandomSchedulingPolicy(),
+        testUtil.newAggregateFunctionRepository());
     final PendingTasklets pendingTasklets = new PendingTasklets();
-    final DefaultVortexMaster vortexMaster = new 
DefaultVortexMaster(runningWorkers, pendingTasklets, 5);
+    final DefaultVortexMaster vortexMaster = new 
DefaultVortexMaster(runningWorkers, pendingTasklets,
+        testUtil.newAggregateFunctionRepository(), 5);
 
     // Allocate iniital evaluators (will all be preempted later...)
     final List<VortexWorkerManager> initialWorkers = new ArrayList<>();
@@ -185,9 +192,11 @@ public class DefaultVortexMasterTest {
   public void testTaskletThrowException() throws Exception {
     final VortexFunction vortexFunction = testUtil.newIntegerFunction();
     final VortexWorkerManager vortexWorkerManager1 = testUtil.newWorker();
-    final RunningWorkers runningWorkers = new RunningWorkers(new 
RandomSchedulingPolicy());
+    final RunningWorkers runningWorkers = new RunningWorkers(new 
RandomSchedulingPolicy(),
+        testUtil.newAggregateFunctionRepository());
     final PendingTasklets pendingTasklets = new PendingTasklets();
-    final DefaultVortexMaster vortexMaster = new 
DefaultVortexMaster(runningWorkers, pendingTasklets, 5);
+    final DefaultVortexMaster vortexMaster = new 
DefaultVortexMaster(runningWorkers, pendingTasklets,
+        testUtil.newAggregateFunctionRepository(), 5);
 
     final AtomicBoolean callbackReceived = new AtomicBoolean(false);
     final CountDownLatch latch = new CountDownLatch(1);
@@ -227,7 +236,8 @@ public class DefaultVortexMasterTest {
    */
   @Test(timeout = 10000)
   public void testSingleTaskletCancellation() throws Exception {
-    final RunningWorkers runningWorkers = new RunningWorkers(new 
RandomSchedulingPolicy());
+    final RunningWorkers runningWorkers = new RunningWorkers(new 
RandomSchedulingPolicy(),
+        testUtil.newAggregateFunctionRepository());
     final PendingTasklets pendingTasklets = new PendingTasklets();
     final VortexFuture future = 
createTaskletCancellationFuture(runningWorkers, pendingTasklets);
     launchTasklets(runningWorkers, pendingTasklets, 1);
@@ -243,7 +253,8 @@ public class DefaultVortexMasterTest {
   @Test(timeout = 10000)
   public void testSingleTaskletCancellationBeforeLaunch() throws Exception {
 
-    final RunningWorkers runningWorkers = new RunningWorkers(new 
RandomSchedulingPolicy());
+    final RunningWorkers runningWorkers = new RunningWorkers(new 
RandomSchedulingPolicy(),
+        testUtil.newAggregateFunctionRepository());
     final PendingTasklets pendingTasklets = new PendingTasklets();
     final VortexFuture future = 
createTaskletCancellationFuture(runningWorkers, pendingTasklets);
 
@@ -260,10 +271,12 @@ public class DefaultVortexMasterTest {
     assertTrue("The VortexFuture should be done", future.isDone());
   }
 
-  private VortexFuture createTaskletCancellationFuture(final RunningWorkers 
runningWorkers,
-                                                       final PendingTasklets 
pendingTasklets) {
+  private VortexFuture createTaskletCancellationFuture(
+      final RunningWorkers runningWorkers, final PendingTasklets 
pendingTasklets) throws InjectionException {
     final VortexFunction vortexFunction = testUtil.newInfiniteLoopFunction();
-    final DefaultVortexMaster vortexMaster = new 
DefaultVortexMaster(runningWorkers, pendingTasklets, 5);
+    final DefaultVortexMaster vortexMaster = new DefaultVortexMaster(
+        runningWorkers, pendingTasklets,
+        testUtil.newAggregateFunctionRepository(), 5);
     final VortexWorkerManager vortexWorkerManager1 = 
testUtil.newWorker(vortexMaster);
 
 
@@ -288,4 +301,4 @@ public class DefaultVortexMasterTest {
     }
     return taskletIds;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/40a311e4/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java
 
b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java
index 76416cb..2828ccb 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.reef.vortex.driver;
 
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -34,7 +36,12 @@ import static org.junit.Assert.assertTrue;
 public class RunningWorkersTest {
   private final TestUtil testUtil = new TestUtil();
   private final TestUtil.TestSchedulingPolicy schedulingPolicy = 
testUtil.newSchedulingPolicy();
-  private final RunningWorkers runningWorkers = new 
RunningWorkers(schedulingPolicy);
+  private final RunningWorkers runningWorkers;
+
+  public RunningWorkersTest() throws InjectionException {
+    runningWorkers = new RunningWorkers(
+        schedulingPolicy, 
Tang.Factory.getTang().newInjector().getInstance(AggregateFunctionRepository.class));
+  }
 
   /**
    * Test executor preemption -> executor allocation.
@@ -69,4 +76,4 @@ public class RunningWorkersTest {
     runningWorkers.doneTasklets(vortexWorkerManager.getId(), taskletIds);
     assertFalse("Tasklet must not have been completed", 
schedulingPolicy.taskletIsDone(tasklet.getId()));
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/40a311e4/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
 
b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
index c2dee99..1b9abac 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
@@ -21,6 +21,8 @@ package org.apache.reef.vortex.driver;
 import org.apache.reef.driver.task.RunningTask;
 import org.apache.reef.io.serialization.Codec;
 import org.apache.reef.io.serialization.SerializableCodec;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
 import org.apache.reef.vortex.util.VoidCodec;
 import org.apache.reef.util.Optional;
 import org.apache.reef.vortex.api.VortexFunction;
@@ -90,7 +92,14 @@ public final class TestUtil {
    */
   public Tasklet newTasklet() {
     final int id = taskletId.getAndIncrement();
-    return new Tasklet(id, null, null, new VortexFuture(executor, 
vortexMaster, id, VOID_CODEC));
+    return new Tasklet(id, Optional.empty(), null, null, new 
VortexFuture(executor, vortexMaster, id, VOID_CODEC));
+  }
+
+  /**
+   * @return a new {@link AggregateFunctionRepository}
+   */
+  public AggregateFunctionRepository newAggregateFunctionRepository() throws 
InjectionException {
+    return 
Tang.Factory.getTang().newInjector().getInstance(AggregateFunctionRepository.class);
   }
 
   /**
@@ -213,4 +222,4 @@ public final class TestUtil {
       return doneTasklets.contains(taskletId);
     }
   }
-}
+}
\ No newline at end of file

Reply via email to