Repository: reef Updated Branches: refs/heads/master 977d4f58c -> 40a311e49
[REEF-1129] Driver side VortexAggregate submission This addresses the issue by * Modifying VortexThreadPool, VortexMaster, and RunningWorkers to support submission of aggregate-able VortexFunctions. * Adding AggregateFunctionRepository to share aggregate functions between VortexMaster and RunningWorkers. * Modifying tests. * Adding a TODO for REEF-1130 on passing VortexAggregateFunctions to Worker processes. JIRA: [REEF-1129](https://issues.apache.org/jira/browse/REEF-1129) Pull Request: This closes #777 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/40a311e4 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/40a311e4 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/40a311e4 Branch: refs/heads/master Commit: 40a311e49a765bc4e0a773792382cb0ed5b3c380 Parents: 977d4f5 Author: Andrew Chung <[email protected]> Authored: Mon Jan 18 22:41:12 2016 -0800 Committer: Yunseong Lee <[email protected]> Committed: Wed Jan 20 16:44:24 2016 +0800 ---------------------------------------------------------------------- .../vortex/api/VortexAggregateFunction.java | 2 +- .../reef/vortex/api/VortexAggregateFuture.java | 2 +- .../reef/vortex/api/VortexThreadPool.java | 32 ++++++++++++ .../driver/AggregateFunctionRepository.java | 51 ++++++++++++++++++++ .../reef/vortex/driver/DefaultVortexMaster.java | 44 +++++++++++++++-- .../reef/vortex/driver/RunningWorkers.java | 37 +++++++++++++- .../org/apache/reef/vortex/driver/Tasklet.java | 11 +++++ .../apache/reef/vortex/driver/VortexMaster.java | 15 ++++-- .../vortex/driver/DefaultVortexMasterTest.java | 41 ++++++++++------ .../reef/vortex/driver/RunningWorkersTest.java | 11 ++++- .../org/apache/reef/vortex/driver/TestUtil.java | 13 ++++- 11 files changed, 230 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/reef/blob/40a311e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFunction.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFunction.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFunction.java index fe3b96a..d7254d5 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFunction.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFunction.java @@ -21,7 +21,7 @@ package org.apache.reef.vortex.api; import org.apache.reef.annotations.Unstable; import org.apache.reef.annotations.audience.ClientSide; import org.apache.reef.annotations.audience.Public; -import org.apache.reef.wake.remote.Codec; +import org.apache.reef.io.serialization.Codec; import java.io.Serializable; import java.util.List; http://git-wip-us.apache.org/repos/asf/reef/blob/40a311e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFuture.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFuture.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFuture.java index d958fac..23017b7 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFuture.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregateFuture.java @@ -23,8 +23,8 @@ import org.apache.reef.annotations.Unstable; import org.apache.reef.annotations.audience.ClientSide; import org.apache.reef.annotations.audience.Private; import 
org.apache.reef.annotations.audience.Public; +import org.apache.reef.io.serialization.Codec; import org.apache.reef.vortex.common.VortexFutureDelegate; -import org.apache.reef.wake.remote.Codec; import javax.annotation.concurrent.NotThreadSafe; import java.util.*; http://git-wip-us.apache.org/repos/asf/reef/blob/40a311e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java index e2b1d35..fb06211 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java @@ -23,6 +23,7 @@ import org.apache.reef.util.Optional; import org.apache.reef.vortex.driver.VortexMaster; import javax.inject.Inject; +import java.util.List; /** * Distributed thread pool. 
@@ -61,4 +62,35 @@ public final class VortexThreadPool { final FutureCallback<TOutput> callback) { return vortexMaster.enqueueTasklet(function, input, Optional.of(callback)); } + + /** + * @param aggregateFunction to run on VortexFunction outputs + * @param function to run on Vortex + * @param inputs of the function + * @param <TInput> input type + * @param <TOutput> output type + * @return VortexAggregationFuture for tracking execution progress of aggregate-able functions + */ + public <TInput, TOutput> VortexAggregateFuture<TInput, TOutput> + submit(final VortexAggregateFunction<TOutput> aggregateFunction, + final VortexFunction<TInput, TOutput> function, final List<TInput> inputs) { + return vortexMaster.enqueueTasklets( + aggregateFunction, function, inputs, Optional.<FutureCallback<AggregateResult<TInput, TOutput>>>empty()); + } + + /** + * @param aggregateFunction to run on VortexFunction outputs + * @param function to run on Vortex + * @param inputs of the function + * @param callback of the aggregation + * @param <TInput> input type + * @param <TOutput> output type + * @return VortexAggregationFuture for tracking execution progress of aggregate-able functions + */ + public <TInput, TOutput> VortexAggregateFuture<TInput, TOutput> + submit(final VortexAggregateFunction<TOutput> aggregateFunction, + final VortexFunction<TInput, TOutput> function, final List<TInput> inputs, + final FutureCallback<AggregateResult<TInput, TOutput>> callback) { + return vortexMaster.enqueueTasklets(aggregateFunction, function, inputs, Optional.of(callback)); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/40a311e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/AggregateFunctionRepository.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/AggregateFunctionRepository.java 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/AggregateFunctionRepository.java new file mode 100644 index 0000000..9b1da7a --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/AggregateFunctionRepository.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.driver; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.vortex.api.VortexAggregateFunction; + +import javax.annotation.concurrent.ThreadSafe; +import javax.inject.Inject; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * A repository for {@link VortexAggregateFunction}, used to pass functions between {@link VortexMaster} and + * {@link RunningWorkers}. 
+ */ +@ThreadSafe +@Unstable +@Private +public final class AggregateFunctionRepository { + private final ConcurrentMap<Integer, VortexAggregateFunction> aggregateFunctionMap = new ConcurrentHashMap<>(); + + @Inject + private AggregateFunctionRepository() { + } + + VortexAggregateFunction put(final int aggregateFunctionId, final VortexAggregateFunction function) { + return aggregateFunctionMap.put(aggregateFunctionId, function); + } + + VortexAggregateFunction get(final int aggregateFunctionId) { + return aggregateFunctionMap.get(aggregateFunctionId); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/40a311e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java index f0b4949..55aeb5a 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java @@ -23,9 +23,7 @@ import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.io.serialization.Codec; import org.apache.reef.tang.annotations.Parameter; import org.apache.reef.util.Optional; -import org.apache.reef.vortex.api.FutureCallback; -import org.apache.reef.vortex.api.VortexFunction; -import org.apache.reef.vortex.api.VortexFuture; +import org.apache.reef.vortex.api.*; import org.apache.reef.vortex.common.*; import javax.inject.Inject; @@ -43,6 +41,8 @@ import java.util.concurrent.atomic.AtomicInteger; final class DefaultVortexMaster implements VortexMaster { private final Map<Integer, VortexFutureDelegate> taskletFutureMap = new 
HashMap<>(); private final AtomicInteger taskletIdCounter = new AtomicInteger(); + private final AtomicInteger aggregateIdCounter = new AtomicInteger(); + private final AggregateFunctionRepository aggregateFunctionRepository; private final RunningWorkers runningWorkers; private final PendingTasklets pendingTasklets; private final Executor executor; @@ -53,10 +53,12 @@ final class DefaultVortexMaster implements VortexMaster { @Inject DefaultVortexMaster(final RunningWorkers runningWorkers, final PendingTasklets pendingTasklets, + final AggregateFunctionRepository aggregateFunctionRepository, @Parameter(VortexMasterConf.CallbackThreadPoolSize.class) final int threadPoolSize) { this.executor = Executors.newFixedThreadPool(threadPoolSize); this.runningWorkers = runningWorkers; this.pendingTasklets = pendingTasklets; + this.aggregateFunctionRepository = aggregateFunctionRepository; } /** @@ -76,7 +78,7 @@ final class DefaultVortexMaster implements VortexMaster { vortexFuture = new VortexFuture<>(executor, this, id, outputCodec); } - final Tasklet tasklet = new Tasklet<>(id, function, input, vortexFuture); + final Tasklet tasklet = new Tasklet<>(id, Optional.<Integer>empty(), function, input, vortexFuture); putDelegate(Collections.singletonList(tasklet), vortexFuture); this.pendingTasklets.addLast(tasklet); @@ -84,6 +86,40 @@ final class DefaultVortexMaster implements VortexMaster { } /** + * Add aggregate-able Tasklets to pendingTasklets. 
+ */ + @Override + public <TInput, TOutput> VortexAggregateFuture<TInput, TOutput> + enqueueTasklets(final VortexAggregateFunction<TOutput> aggregateFunction, + final VortexFunction<TInput, TOutput> vortexFunction, final List<TInput> inputs, + final Optional<FutureCallback<AggregateResult<TInput, TOutput>>> callback) { + final int aggregateFunctionId = aggregateIdCounter.getAndIncrement(); + aggregateFunctionRepository.put(aggregateFunctionId, aggregateFunction); + final Codec<TOutput> aggOutputCodec = aggregateFunction.getOutputCodec(); + final List<Tasklet> tasklets = new ArrayList<>(inputs.size()); + final Map<Integer, TInput> taskletIdInputMap = new HashMap<>(inputs.size()); + + for (final TInput input : inputs) { + taskletIdInputMap.put(taskletIdCounter.getAndIncrement(), input); + } + + final VortexAggregateFuture<TInput, TOutput> vortexAggregateFuture = + callback.isPresent() ? + new VortexAggregateFuture<>(executor, taskletIdInputMap, aggOutputCodec, callback.get()) : + new VortexAggregateFuture<>(executor, taskletIdInputMap, aggOutputCodec, null); + + for (final Map.Entry<Integer, TInput> taskletIdInputEntry : taskletIdInputMap.entrySet()) { + final Tasklet tasklet = new Tasklet<>(taskletIdInputEntry.getKey(), Optional.of(aggregateFunctionId), + vortexFunction, taskletIdInputEntry.getValue(), vortexAggregateFuture); + tasklets.add(tasklet); + pendingTasklets.addLast(tasklet); + } + + putDelegate(tasklets, vortexAggregateFuture); + return vortexAggregateFuture; + } + + /** * Cancels tasklets on the running workers. 
*/ @Override http://git-wip-us.apache.org/repos/asf/reef/blob/40a311e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java index a1fb96f..2bddc18 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java @@ -20,6 +20,7 @@ package org.apache.reef.vortex.driver; import net.jcip.annotations.ThreadSafe; +import org.apache.commons.lang3.NotImplementedException; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.util.Optional; @@ -57,12 +58,18 @@ final class RunningWorkers { // Scheduling policy private final SchedulingPolicy schedulingPolicy; + private final AggregateFunctionRepository aggregateFunctionRepository; + + private final Map<String, Set<Integer>> workerAggregateFunctionMap = new HashMap<>(); + /** * RunningWorkers constructor. 
*/ @Inject - RunningWorkers(final SchedulingPolicy schedulingPolicy) { + RunningWorkers(final SchedulingPolicy schedulingPolicy, + final AggregateFunctionRepository aggregateFunctionRepository) { this.schedulingPolicy = schedulingPolicy; + this.aggregateFunctionRepository = aggregateFunctionRepository; } /** @@ -76,6 +83,7 @@ final class RunningWorkers { if (!removedBeforeAddedWorkers.contains(vortexWorkerManager.getId())) { this.runningWorkers.put(vortexWorkerManager.getId(), vortexWorkerManager); this.schedulingPolicy.workerAdded(vortexWorkerManager); + this.workerAggregateFunctionMap.put(vortexWorkerManager.getId(), new HashSet<Integer>()); // Notify (possibly) waiting scheduler noWorkerOrResource.signal(); @@ -111,7 +119,11 @@ final class RunningWorkers { return Optional.empty(); } } finally { - lock.unlock(); + try { + workerAggregateFunctionMap.remove(id); + } finally { + lock.unlock(); + } } } @@ -145,7 +157,15 @@ final class RunningWorkers { return; } + final Optional<Integer> taskletAggFunctionId = tasklet.getAggregateFunctionId(); final VortexWorkerManager vortexWorkerManager = runningWorkers.get(workerId.get()); + if (taskletAggFunctionId.isPresent() && + !workerHasAggregateFunction(vortexWorkerManager.getId(), taskletAggFunctionId.get())) { + // TODO[JIRA REEF-1130]: fetch aggregate function from repo and send aggregate function to worker. + throw new NotImplementedException("Serialize aggregate function to worker if it doesn't have it. 
" + + "Complete in REEF-1130."); + } + vortexWorkerManager.launchTasklet(tasklet); schedulingPolicy.taskletLaunched(vortexWorkerManager, tasklet); } @@ -247,4 +267,17 @@ final class RunningWorkers { boolean isWorkerRunning(final String workerId) { return runningWorkers.containsKey(workerId); } + + /** + * @return true if Vortex has sent the aggregation function to the worker specified by workerId + */ + private boolean workerHasAggregateFunction(final String workerId, final int aggregateFunctionId) { + if (!workerAggregateFunctionMap.containsKey(workerId)) { + LOG.log(Level.WARNING, "Trying to look up a worker's aggregation function for a worker with an ID that has " + + "not yet been added."); + return false; + } + + return workerAggregateFunctionMap.get(workerId).contains(aggregateFunctionId); + } } http://git-wip-us.apache.org/repos/asf/reef/blob/40a311e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java index 6f5d519..063835f 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java @@ -19,6 +19,7 @@ package org.apache.reef.vortex.driver; import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.util.Optional; import org.apache.reef.vortex.api.VortexFunction; import org.apache.reef.vortex.common.VortexFutureDelegate; @@ -29,13 +30,16 @@ import org.apache.reef.vortex.common.VortexFutureDelegate; class Tasklet<TInput, TOutput> { private final int taskletId; private final VortexFunction<TInput, TOutput> userTask; + private final Optional<Integer> aggregateFunctionId; 
private final TInput input; private final VortexFutureDelegate delegate; Tasklet(final int taskletId, + final Optional<Integer> aggregateFunctionId, final VortexFunction<TInput, TOutput> userTask, final TInput input, final VortexFutureDelegate delegate) { + this.aggregateFunctionId = aggregateFunctionId; this.taskletId = taskletId; this.userTask = userTask; this.input = input; @@ -50,6 +54,13 @@ class Tasklet<TInput, TOutput> { } /** + * @return aggregate function id of the tasklet, not present if the tasklet is not aggregate-able + */ + Optional<Integer> getAggregateFunctionId() { + return aggregateFunctionId; + } + + /** * @return the input of the tasklet */ TInput getInput() { http://git-wip-us.apache.org/repos/asf/reef/blob/40a311e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java index a423706..09b7e0a 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java @@ -22,11 +22,11 @@ import org.apache.reef.annotations.Unstable; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.tang.annotations.DefaultImplementation; import org.apache.reef.util.Optional; -import org.apache.reef.vortex.api.FutureCallback; -import org.apache.reef.vortex.api.VortexFunction; -import org.apache.reef.vortex.api.VortexFuture; +import org.apache.reef.vortex.api.*; import org.apache.reef.vortex.common.WorkerReport; +import java.util.List; + /** * The heart of Vortex. * Processes various tasklet related events/requests coming from different components of the system. 
@@ -43,6 +43,15 @@ public interface VortexMaster { final Optional<FutureCallback<TOutput>> callback); /** + * Submits aggregate-able Tasklets to be run sometime in the future, with an optional callback function on + * the aggregation progress. + */ + <TInput, TOutput> VortexAggregateFuture<TInput, TOutput> + enqueueTasklets(final VortexAggregateFunction<TOutput> aggregateFunction, + final VortexFunction<TInput, TOutput> vortexFunction, final List<TInput> inputs, + final Optional<FutureCallback<AggregateResult<TInput, TOutput>>> callback); + + /** * Call this when a Tasklet is to be cancelled. * @param mayInterruptIfRunning if true, will attempt to cancel running Tasklets; otherwise will only * prevent a pending Tasklet from running. http://git-wip-us.apache.org/repos/asf/reef/blob/40a311e4/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java index 40a819c..0770f77 100644 --- a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java +++ b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java @@ -19,6 +19,7 @@ package org.apache.reef.vortex.driver; import org.apache.reef.io.serialization.SerializableCodec; +import org.apache.reef.tang.exceptions.InjectionException; import org.apache.reef.util.Optional; import org.apache.reef.vortex.api.FutureCallback; import org.apache.reef.vortex.api.VortexFunction; @@ -52,9 +53,11 @@ public class DefaultVortexMasterTest { public void testSingleTaskletNoFailure() throws Exception { final VortexFunction vortexFunction = testUtil.newIntegerFunction(); 
final VortexWorkerManager vortexWorkerManager1 = testUtil.newWorker(); - final RunningWorkers runningWorkers = new RunningWorkers(new RandomSchedulingPolicy()); + final RunningWorkers runningWorkers = new RunningWorkers(new RandomSchedulingPolicy(), + testUtil.newAggregateFunctionRepository()); final PendingTasklets pendingTasklets = new PendingTasklets(); - final DefaultVortexMaster vortexMaster = new DefaultVortexMaster(runningWorkers, pendingTasklets, 5); + final DefaultVortexMaster vortexMaster = new DefaultVortexMaster(runningWorkers, pendingTasklets, + testUtil.newAggregateFunctionRepository(), 5); final AtomicBoolean callbackReceived = new AtomicBoolean(false); final CountDownLatch latch = new CountDownLatch(1); @@ -96,9 +99,11 @@ public class DefaultVortexMasterTest { final VortexFunction vortexFunction = testUtil.newFunction(); final VortexWorkerManager vortexWorkerManager1 = testUtil.newWorker(); final VortexWorkerManager vortexWorkerManager2 = testUtil.newWorker(); - final RunningWorkers runningWorkers = new RunningWorkers(new RandomSchedulingPolicy()); + final RunningWorkers runningWorkers = new RunningWorkers(new RandomSchedulingPolicy(), + testUtil.newAggregateFunctionRepository()); final PendingTasklets pendingTasklets = new PendingTasklets(); - final DefaultVortexMaster vortexMaster = new DefaultVortexMaster(runningWorkers, pendingTasklets, 5); + final DefaultVortexMaster vortexMaster = new DefaultVortexMaster(runningWorkers, pendingTasklets, + testUtil.newAggregateFunctionRepository(), 5); // Allocate worker & tasklet and schedule vortexMaster.workerAllocated(vortexWorkerManager1); @@ -132,9 +137,11 @@ public class DefaultVortexMasterTest { public void testMultipleTaskletsFailure() throws Exception { // The tasklets that need to be executed final ArrayList<VortexFuture> vortexFutures = new ArrayList<>(); - final RunningWorkers runningWorkers = new RunningWorkers(new RandomSchedulingPolicy()); + final RunningWorkers runningWorkers = new 
RunningWorkers(new RandomSchedulingPolicy(), + testUtil.newAggregateFunctionRepository()); final PendingTasklets pendingTasklets = new PendingTasklets(); - final DefaultVortexMaster vortexMaster = new DefaultVortexMaster(runningWorkers, pendingTasklets, 5); + final DefaultVortexMaster vortexMaster = new DefaultVortexMaster(runningWorkers, pendingTasklets, + testUtil.newAggregateFunctionRepository(), 5); // Allocate iniital evaluators (will all be preempted later...) final List<VortexWorkerManager> initialWorkers = new ArrayList<>(); @@ -185,9 +192,11 @@ public class DefaultVortexMasterTest { public void testTaskletThrowException() throws Exception { final VortexFunction vortexFunction = testUtil.newIntegerFunction(); final VortexWorkerManager vortexWorkerManager1 = testUtil.newWorker(); - final RunningWorkers runningWorkers = new RunningWorkers(new RandomSchedulingPolicy()); + final RunningWorkers runningWorkers = new RunningWorkers(new RandomSchedulingPolicy(), + testUtil.newAggregateFunctionRepository()); final PendingTasklets pendingTasklets = new PendingTasklets(); - final DefaultVortexMaster vortexMaster = new DefaultVortexMaster(runningWorkers, pendingTasklets, 5); + final DefaultVortexMaster vortexMaster = new DefaultVortexMaster(runningWorkers, pendingTasklets, + testUtil.newAggregateFunctionRepository(), 5); final AtomicBoolean callbackReceived = new AtomicBoolean(false); final CountDownLatch latch = new CountDownLatch(1); @@ -227,7 +236,8 @@ public class DefaultVortexMasterTest { */ @Test(timeout = 10000) public void testSingleTaskletCancellation() throws Exception { - final RunningWorkers runningWorkers = new RunningWorkers(new RandomSchedulingPolicy()); + final RunningWorkers runningWorkers = new RunningWorkers(new RandomSchedulingPolicy(), + testUtil.newAggregateFunctionRepository()); final PendingTasklets pendingTasklets = new PendingTasklets(); final VortexFuture future = createTaskletCancellationFuture(runningWorkers, pendingTasklets); 
launchTasklets(runningWorkers, pendingTasklets, 1); @@ -243,7 +253,8 @@ public class DefaultVortexMasterTest { @Test(timeout = 10000) public void testSingleTaskletCancellationBeforeLaunch() throws Exception { - final RunningWorkers runningWorkers = new RunningWorkers(new RandomSchedulingPolicy()); + final RunningWorkers runningWorkers = new RunningWorkers(new RandomSchedulingPolicy(), + testUtil.newAggregateFunctionRepository()); final PendingTasklets pendingTasklets = new PendingTasklets(); final VortexFuture future = createTaskletCancellationFuture(runningWorkers, pendingTasklets); @@ -260,10 +271,12 @@ public class DefaultVortexMasterTest { assertTrue("The VortexFuture should be done", future.isDone()); } - private VortexFuture createTaskletCancellationFuture(final RunningWorkers runningWorkers, - final PendingTasklets pendingTasklets) { + private VortexFuture createTaskletCancellationFuture( + final RunningWorkers runningWorkers, final PendingTasklets pendingTasklets) throws InjectionException { final VortexFunction vortexFunction = testUtil.newInfiniteLoopFunction(); - final DefaultVortexMaster vortexMaster = new DefaultVortexMaster(runningWorkers, pendingTasklets, 5); + final DefaultVortexMaster vortexMaster = new DefaultVortexMaster( + runningWorkers, pendingTasklets, + testUtil.newAggregateFunctionRepository(), 5); final VortexWorkerManager vortexWorkerManager1 = testUtil.newWorker(vortexMaster); @@ -288,4 +301,4 @@ public class DefaultVortexMasterTest { } return taskletIds; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/40a311e4/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java 
b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java index 76416cb..2828ccb 100644 --- a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java +++ b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java @@ -18,6 +18,8 @@ */ package org.apache.reef.vortex.driver; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.exceptions.InjectionException; import org.junit.Test; import java.util.ArrayList; @@ -34,7 +36,12 @@ import static org.junit.Assert.assertTrue; public class RunningWorkersTest { private final TestUtil testUtil = new TestUtil(); private final TestUtil.TestSchedulingPolicy schedulingPolicy = testUtil.newSchedulingPolicy(); - private final RunningWorkers runningWorkers = new RunningWorkers(schedulingPolicy); + private final RunningWorkers runningWorkers; + + public RunningWorkersTest() throws InjectionException { + runningWorkers = new RunningWorkers( + schedulingPolicy, Tang.Factory.getTang().newInjector().getInstance(AggregateFunctionRepository.class)); + } /** * Test executor preemption -> executor allocation. 
@@ -69,4 +76,4 @@ public class RunningWorkersTest { runningWorkers.doneTasklets(vortexWorkerManager.getId(), taskletIds); assertFalse("Tasklet must not have been completed", schedulingPolicy.taskletIsDone(tasklet.getId())); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/40a311e4/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java index c2dee99..1b9abac 100644 --- a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java +++ b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java @@ -21,6 +21,8 @@ package org.apache.reef.vortex.driver; import org.apache.reef.driver.task.RunningTask; import org.apache.reef.io.serialization.Codec; import org.apache.reef.io.serialization.SerializableCodec; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.exceptions.InjectionException; import org.apache.reef.vortex.util.VoidCodec; import org.apache.reef.util.Optional; import org.apache.reef.vortex.api.VortexFunction; @@ -90,7 +92,14 @@ public final class TestUtil { */ public Tasklet newTasklet() { final int id = taskletId.getAndIncrement(); - return new Tasklet(id, null, null, new VortexFuture(executor, vortexMaster, id, VOID_CODEC)); + return new Tasklet(id, Optional.empty(), null, null, new VortexFuture(executor, vortexMaster, id, VOID_CODEC)); + } + + /** + * @return a new {@link AggregateFunctionRepository} + */ + public AggregateFunctionRepository newAggregateFunctionRepository() throws InjectionException { + return Tang.Factory.getTang().newInjector().getInstance(AggregateFunctionRepository.class); } /** @@ -213,4 +222,4 
@@ public final class TestUtil { return doneTasklets.contains(taskletId); } } -} +} \ No newline at end of file
