Repository: incubator-reef Updated Branches: refs/heads/master 4d5ca8b07 -> 073f18e40
[REEF-518] Smarter VortexWorker-Tasklet scheduling This addressed the issue by * Creating pluggable SchedulingPolicy for choosing a worker to launch a tasklet onto. * Implementing two types of SchedulingPolicy: FirstFit and Random. * Creating unit tests for testing these two policies. * Making changes in related classes to configure and use the SchedulingPolicy. Zhengping Qian implemented the core logic. John Yang wrote tests, code for configurations, and JavaDocs. Note that we changed the name of the class PendingTaskletScheduler to PendingTaskletLauncher to make it clearer that the worker-tasklet matching is the responsibility of RunningWorkers. In the code you can see that SchedulingPolicy is injected into RunningWorkers and used there. We expect that later the PendingTaskletLauncher will be responsibile for addressing REEF-500. JIRA: [REEF-518](https://issues.apache.org/jira/browse/REEF-518) Pull Request: Closes #427 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/073f18e4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/073f18e4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/073f18e4 Branch: refs/heads/master Commit: 073f18e4043cb598d7bca129716cf4b3400b6f7c Parents: 4d5ca8b Author: John Yang <johnya...@apache.org> Authored: Wed Aug 26 14:50:52 2015 +0800 Committer: Byung-Gon Chun <bgc...@apache.org> Committed: Sun Aug 30 12:45:23 2015 +0900 ---------------------------------------------------------------------- .../reef/vortex/driver/DefaultVortexMaster.java | 9 +- .../vortex/driver/FirstFitSchedulingPolicy.java | 151 +++++++++++++ .../vortex/driver/PendingTaskletLauncher.java | 59 +++++ .../vortex/driver/PendingTaskletScheduler.java | 59 ----- .../vortex/driver/RandomSchedulingPolicy.java | 105 +++++++++ .../reef/vortex/driver/RunningWorkers.java | 219 +++++++++---------- .../reef/vortex/driver/SchedulingPolicy.java | 60 +++++ .../reef/vortex/driver/VortexConfHelper.java | 4 +- .../apache/reef/vortex/driver/VortexDriver.java | 4 +- .../reef/vortex/driver/VortexLauncher.java | 12 +- .../reef/vortex/driver/VortexMasterConf.java | 18 +- .../reef/vortex/driver/VortexWorkerManager.java | 27 ++- .../reef/vortex/examples/addone/AddOne.java | 2 +- .../reef/vortex/examples/hello/HelloVortex.java | 2 +- .../vortex/driver/DefaultVortexMasterTest.java | 31 ++- .../reef/vortex/driver/RunningWorkersTest.java | 6 +- .../vortex/driver/SchedulingPolicyTest.java | 120 ++++++++++ .../applications/vortex/addone/AddOneTest.java | 2 +- 18 files changed, 664 insertions(+), 226 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/073f18e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java index f1a55b7..206f7e3 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java @@ -20,6 +20,7 @@ package org.apache.reef.vortex.driver; import net.jcip.annotations.ThreadSafe; import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.util.Optional; import org.apache.reef.vortex.api.VortexFunction; import org.apache.reef.vortex.api.VortexFuture; @@ -74,9 +75,11 @@ final class DefaultVortexMaster implements VortexMaster { */ @Override public void workerPreempted(final String id) { - final Collection<Tasklet> preemptedTasklets = runningWorkers.removeWorker(id); - for (final Tasklet tasklet : preemptedTasklets) { - pendingTasklets.addFirst(tasklet); + final Optional<Collection<Tasklet>> preemptedTasklets = runningWorkers.removeWorker(id); + if (preemptedTasklets.isPresent()) { + for (final Tasklet tasklet : preemptedTasklets.get()) { + pendingTasklets.addFirst(tasklet); + } } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/073f18e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/FirstFitSchedulingPolicy.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/FirstFitSchedulingPolicy.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/FirstFitSchedulingPolicy.java new file mode 100644 index 0000000..6dfd8bb --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/FirstFitSchedulingPolicy.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.driver; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import javax.inject.Inject; + +import net.jcip.annotations.NotThreadSafe; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.util.Optional; + +/** + * Always select the next worker that has enough resource in a round-robin fashion + * based on the worker capacity configured. + */ +@NotThreadSafe +class FirstFitSchedulingPolicy implements SchedulingPolicy { + + private final int workerCapacity; + + /** + * Keep the load information for each worker. + */ + private final HashMap<String, Integer> idLoadMap = new HashMap<>(); + + /** + * A linked list for a circular buffer of worker ids for search in a round-robin fashion. + */ + private final List<String> idList = new ArrayList<>(); + + /** + * The index of the next/first worker to check. + */ + private int nextIndex = 0; + + + @Inject + FirstFitSchedulingPolicy(@Parameter(VortexMasterConf.WorkerCapacity.class) final int capacity) { + this.workerCapacity = capacity; + } + + /** + * Checking from nextIndex, choose the first worker that fits to schedule the tasklet onto. + * @param tasklet to schedule + * @return the next worker that has enough resources for the tasklet + */ + @Override + public Optional<String> trySchedule(final Tasklet tasklet) { + for (int i = 0; i < idList.size(); i++) { + final int index = (nextIndex + i) % idList.size(); + final String workerId = idList.get(index); + + if (idLoadMap.get(workerId) < workerCapacity) { + nextIndex = (index + 1) % idList.size(); + return Optional.of(workerId); + } + } + return Optional.empty(); + } + + /** + * @param vortexWorker added + */ + @Override + public void workerAdded(final VortexWorkerManager vortexWorker) { + final String workerId = vortexWorker.getId(); + if (!idLoadMap.containsKey(workerId)) { // Ignore duplicate add. + idLoadMap.put(workerId, 0); + idList.add(nextIndex, workerId); // Prefer to schedule the new worker ASAP. + } + } + + /** + * @param vortexWorker removed + */ + @Override + public void workerRemoved(final VortexWorkerManager vortexWorker) { + final String workerId = vortexWorker.getId(); + if (idLoadMap.remove(workerId) != null) { // Ignore invalid removal. + for (int i = 0; i < idList.size(); i++) { // This looping operation might degrade performance. + if (idList.get(i).equals(workerId)) { + idList.remove(i); + + if (i < nextIndex) { + nextIndex--; + } else if (nextIndex == idList.size()) { + nextIndex = 0; + } + return; + } + } + } + } + + /** + * @param vortexWorker that the tasklet was launched onto + * @param tasklet launched + */ + @Override + public void taskletLaunched(final VortexWorkerManager vortexWorker, final Tasklet tasklet) { + final String workerId = vortexWorker.getId(); + if (idLoadMap.containsKey(workerId)) { + idLoadMap.put(workerId, Math.min(workerCapacity, idLoadMap.get(workerId) + 1)); + } + } + + /** + * @param vortexWorker that the tasklet completed in + * @param tasklet completed + */ + @Override + public void taskletCompleted(final VortexWorkerManager vortexWorker, final Tasklet tasklet) { + final String workerId = vortexWorker.getId(); + removeTasklet(workerId); + } + + /** + * @param vortexWorker that the tasklet failed in + * @param tasklet failed + */ + @Override + public void taskletFailed(final VortexWorkerManager vortexWorker, final Tasklet tasklet) { + final String workerId = vortexWorker.getId(); + removeTasklet(workerId); + } + + private void removeTasklet(final String workerId) { + if (idLoadMap.containsKey(workerId)) { + idLoadMap.put(workerId, Math.max(0, idLoadMap.get(workerId) - 1)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/073f18e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTaskletLauncher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTaskletLauncher.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTaskletLauncher.java new file mode 100644 index 0000000..adb2c1d --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTaskletLauncher.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.driver; + +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.wake.EventHandler; + +import javax.inject.Inject; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Pending Tasklet Launcher. + */ +@DriverSide +final class PendingTaskletLauncher implements EventHandler<Integer> { + private static final Logger LOG = Logger.getLogger(PendingTaskletLauncher.class.getName()); + + private final RunningWorkers runningWorkers; + private final PendingTasklets pendingTasklets; + + @Inject + private PendingTaskletLauncher(final RunningWorkers runningWorkers, + final PendingTasklets pendingTasklets) { + this.runningWorkers = runningWorkers; + this.pendingTasklets = pendingTasklets; + } + + /** + * Repeatedly take a tasklet from the pending queue and launch it via RunningWorkers. + */ + @Override + public void onNext(final Integer integer) { + while (!runningWorkers.isTerminated()) { + try { + final Tasklet tasklet = pendingTasklets.takeFirst(); // blocks when no tasklet exists + runningWorkers.launchTasklet(tasklet); // blocks when no worker exists + } catch (InterruptedException e) { + LOG.log(Level.INFO, "Interrupted upon termination"); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/073f18e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTaskletScheduler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTaskletScheduler.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTaskletScheduler.java deleted file mode 100644 index 6a0b8b0..0000000 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTaskletScheduler.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.vortex.driver; - -import org.apache.reef.annotations.audience.DriverSide; -import org.apache.reef.wake.EventHandler; - -import javax.inject.Inject; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * Pending Tasklet Scheduler. - */ -@DriverSide -final class PendingTaskletScheduler implements EventHandler<Integer> { - private static final Logger LOG = Logger.getLogger(PendingTaskletScheduler.class.getName()); - - private final RunningWorkers runningWorkers; - private final PendingTasklets pendingTasklets; - - @Inject - private PendingTaskletScheduler(final RunningWorkers runningWorkers, - final PendingTasklets pendingTasklets) { - this.runningWorkers = runningWorkers; - this.pendingTasklets = pendingTasklets; - } - - /** - * Repeatedly take a tasklet from the pending queue and schedule/launch it via RunningWorkers. - */ - @Override - public void onNext(final Integer integer) { - while (!runningWorkers.isTerminated()) { - try { - final Tasklet tasklet = pendingTasklets.takeFirst(); // blocks when no tasklet exists - runningWorkers.launchTasklet(tasklet); // blocks when no worker exists - } catch (InterruptedException e) { - LOG.log(Level.INFO, "Interrupted upon termination"); - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/073f18e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RandomSchedulingPolicy.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RandomSchedulingPolicy.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RandomSchedulingPolicy.java new file mode 100644 index 0000000..2f71cd7 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RandomSchedulingPolicy.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.driver; + +import net.jcip.annotations.NotThreadSafe; +import org.apache.reef.util.Optional; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import javax.inject.Inject; + +/** + * Randomly select a running worker for scheduling a tasklet, + * without considering worker load or capacity. + */ +@NotThreadSafe +class RandomSchedulingPolicy implements SchedulingPolicy { + private final Random rand = new Random(); + + /** + * Keep the worker ids in an array for fast random selection. + * + * Add/removal from the array require O(n) complexity. + */ + private final List<String> idList = new ArrayList<>(); + + @Inject + RandomSchedulingPolicy() { + } + + /** + * @param tasklet to schedule + * @return a random worker + */ + @Override + public Optional<String> trySchedule(final Tasklet tasklet) { + if (idList.isEmpty()) { + return Optional.empty(); + } else { + final int index = rand.nextInt(idList.size()); + return Optional.of(idList.get(index)); + } + } + + /** + * @param vortexWorker added + */ + @Override + public void workerAdded(final VortexWorkerManager vortexWorker) { + if (idList.indexOf(vortexWorker.getId()) == -1) { // Ignore duplicate add. + idList.add(vortexWorker.getId()); + } + } + + /** + * @param vortexWorker removed + */ + @Override + public void workerRemoved(final VortexWorkerManager vortexWorker) { + idList.remove(vortexWorker.getId()); // Ignore invalid removal. + } + + /** + * Do nothing. + */ + @Override + public void taskletLaunched(final VortexWorkerManager vortexWorker, final Tasklet tasklet) { + // Do nothing + } + + /** + * Do nothing. + */ + @Override + public void taskletCompleted(final VortexWorkerManager vortexWorker, final Tasklet tasklet) { + // Do nothing + } + + /** + * Do nothing. + */ + @Override + public void taskletFailed(final VortexWorkerManager vortexWorker, final Tasklet tasklet) { + // Do nothing + } + +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/073f18e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java index f712786..01be3a6 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java @@ -19,14 +19,17 @@ package org.apache.reef.vortex.driver; import net.jcip.annotations.ThreadSafe; + import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.util.Optional; import javax.inject.Inject; + import java.io.Serializable; import java.util.*; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantLock; /** * Keeps track of all running VortexWorkers and Tasklets. @@ -37,22 +40,24 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; final class RunningWorkers { // RunningWorkers and its locks private final HashMap<String, VortexWorkerManager> runningWorkers = new HashMap<>(); // Running workers/tasklets - private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); - private final Lock readLock = rwLock.readLock(); // Need to acquire this to launch/complete tasklets - private final Lock writeLock = rwLock.writeLock(); // Need to acquire this to add/remove worker - private final Condition noRunningWorker = writeLock.newCondition(); // When there's no running worker + private final Lock lock = new ReentrantLock(); + private final Condition noWorkerOrResource = lock.newCondition(); // To keep track of workers that are preempted before acknowledged private final Set<String> removedBeforeAddedWorkers = new HashSet<>(); // Terminated - private volatile boolean terminated = false; + private boolean terminated = false; + + // Scheduling policy + private final SchedulingPolicy schedulingPolicy; /** * RunningWorkers constructor. */ @Inject - RunningWorkers() { + RunningWorkers(final SchedulingPolicy schedulingPolicy) { + this.schedulingPolicy = schedulingPolicy; } /** @@ -60,54 +65,49 @@ final class RunningWorkers { * Parameter: Called exactly once per vortexWorkerManager. */ void addWorker(final VortexWorkerManager vortexWorkerManager) { - if (!terminated) { - writeLock.lock(); - try { - if (!terminated) { // Make sure it is really terminated - if (!removedBeforeAddedWorkers.contains(vortexWorkerManager.getId())) { - this.runningWorkers.put(vortexWorkerManager.getId(), vortexWorkerManager); - - // Notify (possibly) waiting scheduler - if (runningWorkers.size() == 1) { - noRunningWorker.signalAll(); - } - } - return; + lock.lock(); + try { + if (!terminated) { + if (!removedBeforeAddedWorkers.contains(vortexWorkerManager.getId())) { + this.runningWorkers.put(vortexWorkerManager.getId(), vortexWorkerManager); + this.schedulingPolicy.workerAdded(vortexWorkerManager); + + // Notify (possibly) waiting scheduler + noWorkerOrResource.signal(); } - } finally { - writeLock.unlock(); + } else { + // Terminate the worker + vortexWorkerManager.terminate(); } + } finally { + lock.unlock(); } - - // Terminate the worker - vortexWorkerManager.terminate(); } /** * Concurrency: Called by multiple threads. * Parameter: Called exactly once per id. */ - Collection<Tasklet> removeWorker(final String id) { - if (!terminated) { - writeLock.lock(); - try { - if (!terminated) { // Make sure it is really terminated - final VortexWorkerManager vortexWorkerManager = this.runningWorkers.remove(id); - if (vortexWorkerManager != null) { - return vortexWorkerManager.removed(); - } else { - // Called before addWorker (e.g. RM preempted the resource before the Evaluator started) - removedBeforeAddedWorkers.add(id); - return new ArrayList<>(0); - } + Optional<Collection<Tasklet>> removeWorker(final String id) { + lock.lock(); + try { + if (!terminated) { + final VortexWorkerManager vortexWorkerManager = this.runningWorkers.remove(id); + if (vortexWorkerManager != null) { + this.schedulingPolicy.workerRemoved(vortexWorkerManager); + return Optional.ofNullable(vortexWorkerManager.removed()); + } else { + // Called before addWorker (e.g. RM preempted the resource before the Evaluator started) + removedBeforeAddedWorkers.add(id); + return Optional.empty(); } - } finally { - writeLock.unlock(); + } else { + // No need to return anything since it is terminated + return Optional.empty(); } + } finally { + lock.unlock(); } - - // No need to return anything since it is terminated - return new ArrayList<>(0); } /** @@ -115,35 +115,29 @@ final class RunningWorkers { * Parameter: Same tasklet can be launched multiple times. */ void launchTasklet(final Tasklet tasklet) { - if (!terminated) { - readLock.lock(); - try { - if (!terminated) { // Make sure it is really terminated - // Wait until there is a running worker - if (runningWorkers.isEmpty()) { - readLock.unlock(); - writeLock.lock(); + lock.lock(); + try { + if (!terminated) { + Optional<String> workerId; + while(true) { + workerId = schedulingPolicy.trySchedule(tasklet); + if (!workerId.isPresent()) { try { - while (runningWorkers.isEmpty()) { - try { - noRunningWorker.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - readLock.lock(); - } finally { - writeLock.unlock(); + noWorkerOrResource.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); } + } else { + break; } - - final VortexWorkerManager vortexWorkerManager = randomlyChooseWorker(); - vortexWorkerManager.launchTasklet(tasklet); - return; } - } finally { - readLock.unlock(); + + final VortexWorkerManager vortexWorkerManager = runningWorkers.get(workerId.get()); + vortexWorkerManager.launchTasklet(tasklet); + schedulingPolicy.taskletLaunched(vortexWorkerManager, tasklet); } + } finally { + lock.unlock(); } } @@ -153,20 +147,22 @@ final class RunningWorkers { * (e.g. preemption message coming before tasklet completion message multiple times) */ void completeTasklet(final String workerId, - final int taskletId, - final Serializable result) { - if (!terminated) { - readLock.lock(); - try { - if (!terminated) { // Make sure it is really terminated - if (runningWorkers.containsKey(workerId)) { // Preemption can come before - this.runningWorkers.get(workerId).taskletCompleted(taskletId, result); - } - return; + final int taskletId, + final Serializable result) { + lock.lock(); + try { + if (!terminated) { + if (runningWorkers.containsKey(workerId)) { // Preemption can come before + final VortexWorkerManager worker = this.runningWorkers.get(workerId); + final Tasklet tasklet = worker.taskletCompleted(taskletId, result); + this.schedulingPolicy.taskletCompleted(worker, tasklet); + + // Notify (possibly) waiting scheduler + noWorkerOrResource.signal(); } - } finally { - readLock.unlock(); } + } finally { + lock.unlock(); } } @@ -176,60 +172,47 @@ final class RunningWorkers { * (e.g. preemption message coming before tasklet error message multiple times) */ void errorTasklet(final String workerId, - final int taskletId, - final Exception exception) { - if (!terminated) { - readLock.lock(); - try { - if (!terminated) { // Make sure it is really terminated - if (runningWorkers.containsKey(workerId)) { // Preemption can come before - this.runningWorkers.get(workerId).taskletThrewException(taskletId, exception); - } - return; + final int taskletId, + final Exception exception) { + lock.lock(); + try { + if (!terminated) { + if (runningWorkers.containsKey(workerId)) { // Preemption can come before + final VortexWorkerManager worker = this.runningWorkers.get(workerId); + final Tasklet tasklet = worker.taskletThrewException(taskletId, exception); + this.schedulingPolicy.taskletFailed(worker, tasklet); + + // Notify (possibly) waiting scheduler + noWorkerOrResource.signal(); } - } finally { - readLock.unlock(); } + } finally { + lock.unlock(); } } void terminate() { - if (!terminated) { - writeLock.lock(); - try { - if (!terminated) { // Make sure it is really terminated - terminated = true; - for (final VortexWorkerManager vortexWorkerManager : runningWorkers.values()) { - vortexWorkerManager.terminate(); - } - runningWorkers.clear(); - return; + lock.lock(); + try { + if (!terminated) { + terminated = true; + for (final VortexWorkerManager vortexWorkerManager : runningWorkers.values()) { + vortexWorkerManager.terminate(); + schedulingPolicy.workerRemoved(vortexWorkerManager); } - } finally { - writeLock.unlock(); + runningWorkers.clear(); + } else { + throw new RuntimeException("Attempting to terminate an already terminated RunningWorkers"); } + } finally { + lock.unlock(); } - throw new RuntimeException("Attempting to terminate an already terminated RunningWorkers"); } boolean isTerminated() { return terminated; } - private VortexWorkerManager randomlyChooseWorker() { - final Collection<VortexWorkerManager> workers = runningWorkers.values(); - final int index = new Random().nextInt(workers.size()); - int i = 0; - for (final VortexWorkerManager vortexWorkerManager : workers) { - if (i == index) { - return vortexWorkerManager; - } else { - i++; - } - } - throw new RuntimeException("Bad Index"); - } - ///////////////////////////////////////// For Tests Only /** http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/073f18e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/SchedulingPolicy.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/SchedulingPolicy.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/SchedulingPolicy.java new file mode 100644 index 0000000..cef1cb6 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/SchedulingPolicy.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.driver; + +import org.apache.reef.tang.annotations.DefaultImplementation; +import org.apache.reef.util.Optional; + +/** + * For choosing which worker to schedule the tasklet onto. + */ +@DefaultImplementation(FirstFitSchedulingPolicy.class) +interface SchedulingPolicy { + /** + * Implementation of this method is expected to be fast. + * @param tasklet to schedule + * @return the worker onto which the tasklet should be scheduled, null if there's none + */ + Optional<String> trySchedule(final Tasklet tasklet); + + /** + * Worker added. + */ + void workerAdded(final VortexWorkerManager vortexWorker); + + /** + * Worker removed. + */ + void workerRemoved(final VortexWorkerManager vortexWorker); + + /** + * Tasklet launched. + */ + void taskletLaunched(final VortexWorkerManager vortexWorker, final Tasklet tasklet); + + /** + * Tasklet completed. + */ + void taskletCompleted(final VortexWorkerManager vortexWorker, final Tasklet tasklet); + + /** + * Tasklet failed. + */ + void taskletFailed(final VortexWorkerManager vortexWorker, final Tasklet tasklet); +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/073f18e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexConfHelper.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexConfHelper.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexConfHelper.java index a7b9106..5c8244d 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexConfHelper.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexConfHelper.java @@ -42,7 +42,8 @@ public final class VortexConfHelper { final Class<? extends VortexStart> vortexStart, final int numOfWorkers, final int workerMemory, - final int workerCores) { + final int workerCores, + final int workerCapacity) { final Configuration vortexDriverConf = DriverConfiguration.CONF .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(VortexDriver.class)) .set(DriverConfiguration.ON_DRIVER_STARTED, VortexDriver.StartHandler.class) @@ -57,6 +58,7 @@ public final class VortexConfHelper { .set(VortexMasterConf.WORKER_NUM, numOfWorkers) .set(VortexMasterConf.WORKER_MEM, workerMemory) .set(VortexMasterConf.WORKER_CORES, workerCores) + .set(VortexMasterConf.WORKER_CAPACITY, workerCapacity) .set(VortexMasterConf.VORTEX_START, vortexStart) .set(VortexMasterConf.NUM_OF_VORTEX_START_THERAD, DEFAULT_NUM_OF_VORTEX_START_THERAD) // fixed to 1 for now .build(); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/073f18e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java index ef3061b..977302f 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java @@ -74,14 +74,14 @@ final class VortexDriver { final VortexMaster vortexMaster, final VortexStart vortexStart, final VortexStartExecutor vortexStartExecutor, - final PendingTaskletScheduler pendingTaskletScheduler, + final PendingTaskletLauncher pendingTaskletLauncher, @Parameter(VortexMasterConf.WorkerMem.class) final int workerMem, @Parameter(VortexMasterConf.WorkerNum.class) final int workerNum, @Parameter(VortexMasterConf.WorkerCores.class) final int workerCores, @Parameter(VortexMasterConf.NumberOfVortexStartThreads.class) final int numOfStartThreads) { this.vortexStartEStage = new ThreadPoolStage<>(vortexStartExecutor, numOfStartThreads); this.vortexStart = vortexStart; - this.pendingTaskletSchedulerEStage = new SingleThreadStage<>(pendingTaskletScheduler, 1); + this.pendingTaskletSchedulerEStage = new SingleThreadStage<>(pendingTaskletLauncher, 1); this.evaluatorRequestor = evaluatorRequestor; this.vortexMaster = vortexMaster; this.vortexRequestor = vortexRequestor; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/073f18e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexLauncher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexLauncher.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexLauncher.java index b288824..c7461e5 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexLauncher.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexLauncher.java @@ -43,12 +43,18 @@ public final class VortexLauncher { final Class<? extends VortexStart> vortexUserCode, final int numOfWorkers, final int workerMemory, - final int workerCores) { + final int workerCores, + final int workerCapacity) { final Configuration runtimeConf = LocalRuntimeConfiguration.CONF .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, MAX_NUMBER_OF_EVALUATORS) .build(); - final Configuration vortexConf = - VortexConfHelper.getVortexConf(jobName, vortexUserCode, numOfWorkers, workerMemory, workerCores); + final Configuration vortexConf = VortexConfHelper.getVortexConf( + jobName, + vortexUserCode, + numOfWorkers, + workerMemory, + workerCores, + workerCapacity); return launch(runtimeConf, vortexConf); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/073f18e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMasterConf.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMasterConf.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMasterConf.java index 358dcb7..054afe5 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMasterConf.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMasterConf.java @@ -22,10 +22,7 @@ import org.apache.reef.annotations.Unstable; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.tang.annotations.Name; import org.apache.reef.tang.annotations.NamedParameter; -import org.apache.reef.tang.formats.ConfigurationModule; -import org.apache.reef.tang.formats.ConfigurationModuleBuilder; -import org.apache.reef.tang.formats.RequiredImpl; -import org.apache.reef.tang.formats.RequiredParameter; +import org.apache.reef.tang.formats.*; import org.apache.reef.vortex.api.VortexStart; /** @@ -56,6 +53,13 @@ public final class VortexMasterConf extends ConfigurationModuleBuilder { } /** + * Worker Capacity. + */ + @NamedParameter(doc = "Worker Capacity") + final class WorkerCapacity implements Name<Integer> { + } + + /** * Number of Vortex Start Threads. */ @NamedParameter(doc = "Number of Vortex Start Threads") @@ -78,6 +82,11 @@ public final class VortexMasterConf extends ConfigurationModuleBuilder { public static final RequiredParameter<Integer> WORKER_CORES = new RequiredParameter<>(); /** + * Worker Cores. + */ + public static final OptionalParameter<Integer> WORKER_CAPACITY = new OptionalParameter<>(); + + /** * Vortex Start. */ public static final RequiredImpl<VortexStart> VORTEX_START = new RequiredImpl<>(); @@ -94,6 +103,7 @@ public final class VortexMasterConf extends ConfigurationModuleBuilder { .bindNamedParameter(WorkerNum.class, WORKER_NUM) .bindNamedParameter(WorkerMem.class, WORKER_MEM) .bindNamedParameter(WorkerCores.class, WORKER_CORES) + .bindNamedParameter(WorkerCapacity.class, WORKER_CAPACITY) .bindImplementation(VortexStart.class, VORTEX_START) .bindNamedParameter(NumberOfVortexStartThreads.class, NUM_OF_VORTEX_START_THERAD) .build(); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/073f18e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java index 5aa4759..3b11ad6 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java @@ -18,24 +18,24 @@ */ package org.apache.reef.vortex.driver; -import net.jcip.annotations.ThreadSafe; +import net.jcip.annotations.NotThreadSafe; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.driver.task.RunningTask; import org.apache.reef.vortex.common.TaskletExecutionRequest; import java.io.Serializable; import java.util.Collection; -import java.util.concurrent.ConcurrentHashMap; +import java.util.HashMap; /** * Representation of a VortexWorkerManager in Driver. */ -@ThreadSafe +@NotThreadSafe @DriverSide class VortexWorkerManager { private final VortexRequestor vortexRequestor; private final RunningTask reefTask; - private final ConcurrentHashMap<Integer, Tasklet> runningTasklets = new ConcurrentHashMap<>(); + private final HashMap<Integer, Tasklet> runningTasklets = new HashMap<>(); VortexWorkerManager(final VortexRequestor vortexRequestor, final RunningTask reefTask) { this.vortexRequestor = vortexRequestor; @@ -51,23 +51,22 @@ class VortexWorkerManager { vortexRequestor.send(reefTask, taskletExecutionRequest); } - <TOutput extends Serializable> - void taskletCompleted(final Integer taskletId, final TOutput result) { + <TOutput extends Serializable> Tasklet taskletCompleted(final Integer taskletId, final TOutput result) { final Tasklet<?, TOutput> tasklet = runningTasklets.remove(taskletId); - if (tasklet != null) { // Tasklet should complete/error only once - tasklet.completed(result); - } + assert(tasklet != null); // Tasklet should complete/error only once + tasklet.completed(result); + return tasklet; } - void taskletThrewException(final Integer taskletId, final Exception exception) { + Tasklet taskletThrewException(final Integer taskletId, final Exception exception) { final Tasklet tasklet = runningTasklets.remove(taskletId); - if (tasklet != null) { // Tasklet should complete/error only once - tasklet.threwException(exception); - } + assert(tasklet != null); // Tasklet should complete/error only once + tasklet.threwException(exception); + return tasklet; } Collection<Tasklet> removed() { - return runningTasklets.values(); + return runningTasklets.isEmpty() ? null : runningTasklets.values(); } void terminate() { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/073f18e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/addone/AddOne.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/addone/AddOne.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/addone/AddOne.java index 4e27ba9..25573fc 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/addone/AddOne.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/addone/AddOne.java @@ -31,6 +31,6 @@ final class AddOne { * Launch the vortex job, passing appropriate arguments. */ public static void main(final String[] args) { - VortexLauncher.launchLocal("Vortex_Example_AddOne", AddOneStart.class, 2, 1024, 4); + VortexLauncher.launchLocal("Vortex_Example_AddOne", AddOneStart.class, 2, 1024, 4, 2000); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/073f18e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/hello/HelloVortex.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/hello/HelloVortex.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/hello/HelloVortex.java index 565bce2..c4e3b6f 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/hello/HelloVortex.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/hello/HelloVortex.java @@ -31,6 +31,6 @@ final class HelloVortex { * Launch the vortex job, passing appropriate arguments. */ public static void main(final String[] args) { - VortexLauncher.launchLocal("Vortex_Example_HelloVortex", HelloVortexStart.class, 1, 1024, 1); + VortexLauncher.launchLocal("Vortex_Example_HelloVortex", HelloVortexStart.class, 1, 1024, 1, 2000); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/073f18e4/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java index f2c2f92..c5c0b30 100644 --- a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java +++ b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java @@ -41,13 +41,13 @@ public class DefaultVortexMasterTest { public void testSingleTaskletNoFailure() throws Exception { final VortexFunction vortexFunction = testUtil.newFunction(); final VortexWorkerManager vortexWorkerManager1 = testUtil.newWorker(); - final RunningWorkers runningWorkers = new RunningWorkers(); + final RunningWorkers runningWorkers = new RunningWorkers(new RandomSchedulingPolicy()); final PendingTasklets pendingTasklets = new PendingTasklets(); final DefaultVortexMaster vortexMaster = new DefaultVortexMaster(runningWorkers, pendingTasklets); vortexMaster.workerAllocated(vortexWorkerManager1); final VortexFuture future = vortexMaster.enqueueTasklet(vortexFunction, null); - final ArrayList<Integer> taskletIds = scheduleTasklets(runningWorkers, pendingTasklets, 1); + final ArrayList<Integer> taskletIds = launchTasklets(runningWorkers, pendingTasklets, 1); for (final int taskletId : taskletIds) { vortexMaster.taskletCompleted(vortexWorkerManager1.getId(), taskletId, null); } @@ -63,14 +63,14 @@ public class DefaultVortexMasterTest { final VortexFunction vortexFunction = testUtil.newFunction(); final VortexWorkerManager vortexWorkerManager1 = testUtil.newWorker(); final VortexWorkerManager vortexWorkerManager2 = testUtil.newWorker(); - final RunningWorkers runningWorkers = new RunningWorkers(); + final RunningWorkers runningWorkers = new RunningWorkers(new RandomSchedulingPolicy()); final PendingTasklets pendingTasklets = new PendingTasklets(); final DefaultVortexMaster vortexMaster = new DefaultVortexMaster(runningWorkers, pendingTasklets); // Allocate worker & tasklet and schedule vortexMaster.workerAllocated(vortexWorkerManager1); final VortexFuture future = vortexMaster.enqueueTasklet(vortexFunction, null); - final ArrayList<Integer> taskletIds1 = scheduleTasklets(runningWorkers, pendingTasklets, 1); + final ArrayList<Integer> taskletIds1 = launchTasklets(runningWorkers, pendingTasklets, 1); // Preemption! vortexMaster.workerPreempted(vortexWorkerManager1.getId()); @@ -78,7 +78,7 @@ public class DefaultVortexMasterTest { // New resource allocation and scheduling vortexMaster.workerAllocated(vortexWorkerManager2); - final ArrayList<Integer> taskletIds2 = scheduleTasklets(runningWorkers, pendingTasklets, 1); + final ArrayList<Integer> taskletIds2 = launchTasklets(runningWorkers, pendingTasklets, 1); assertEquals("Both lists need to contain the same single tasklet id", taskletIds1, taskletIds2); // Completed? @@ -95,13 +95,13 @@ public class DefaultVortexMasterTest { public void testMultipleTaskletsFailure() throws Exception { // The tasklets that need to be executed final ArrayList<VortexFuture> vortexFutures = new ArrayList<>(); - final RunningWorkers runningWorkers = new RunningWorkers(); + final RunningWorkers runningWorkers = new RunningWorkers(new RandomSchedulingPolicy()); final PendingTasklets pendingTasklets = new PendingTasklets(); final DefaultVortexMaster vortexMaster = new DefaultVortexMaster(runningWorkers, pendingTasklets); // Allocate iniital evaluators (will all be preempted later...) final List<VortexWorkerManager> initialWorkers = new ArrayList<>(); - final int numOfWorkers = 10; + final int numOfWorkers = 100; for (int i = 0; i < numOfWorkers; i++) { final VortexWorkerManager vortexWorkerManager = testUtil.newWorker(); initialWorkers.add(vortexWorkerManager); @@ -113,7 +113,7 @@ public class DefaultVortexMasterTest { for (int i = 0; i < numOfTasklets; i++) { vortexFutures.add(vortexMaster.enqueueTasklet(testUtil.newFunction(), null)); } - final ArrayList<Integer> taskletIds1 = scheduleTasklets(runningWorkers, pendingTasklets, numOfTasklets); + final ArrayList<Integer> taskletIds1 = launchTasklets(runningWorkers, pendingTasklets, numOfTasklets); // Preempt all evaluators for (int i = 0; i < numOfWorkers; i++) { @@ -124,7 +124,7 @@ public class DefaultVortexMasterTest { for (int i = 0; i < numOfWorkers; i++) { vortexMaster.workerAllocated(testUtil.newWorker()); } - final ArrayList<Integer> taskletIds2 = scheduleTasklets(runningWorkers, pendingTasklets, numOfTasklets); + final ArrayList<Integer> taskletIds2 = launchTasklets(runningWorkers, pendingTasklets, numOfTasklets); assertEquals("Must contain same tasklet ids", new HashSet<>(taskletIds1), new HashSet<>(taskletIds2)); // Completed? @@ -139,19 +139,18 @@ public class DefaultVortexMasterTest { } /** - * Schedule specified number of tasklets. - * @return ids of scheduled tasklets + * Launch specified number of tasklets as a substitute for PendingTaskletLauncher. + * @return ids of launched tasklets */ - private ArrayList<Integer> scheduleTasklets(final RunningWorkers runningWorkers, - final PendingTasklets pendingTasklets, - final int numOfTasklets) throws InterruptedException { + private ArrayList<Integer> launchTasklets(final RunningWorkers runningWorkers, + final PendingTasklets pendingTasklets, + final int numOfTasklets) throws InterruptedException { final ArrayList<Integer> taskletIds = new ArrayList<>(); for (int i = 0; i < numOfTasklets; i++) { final Tasklet tasklet = pendingTasklets.takeFirst(); // blocks when no tasklet exists assertNotNull("Tasklet should exist in the pending queue", tasklet); - + runningWorkers.launchTasklet(tasklet); // blocks when no resource exists taskletIds.add(tasklet.getId()); - runningWorkers.launchTasklet(tasklet); // blocks when no worker exists } return taskletIds; } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/073f18e4/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java index 03c671e..7cd0847 100644 --- a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java +++ b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java @@ -30,7 +30,7 @@ import static org.junit.Assert.assertTrue; * Test Possible Race Conditions. */ public class RunningWorkersTest { - private final RunningWorkers runningWorkers = new RunningWorkers(); + private final RunningWorkers runningWorkers = new RunningWorkers(new RandomSchedulingPolicy()); private final TestUtil testUtil = new TestUtil(); /** @@ -40,7 +40,7 @@ public class RunningWorkersTest { @Test public void removeExecutorAndAddExecutor() throws Exception { final VortexWorkerManager vortexWorkerManager = testUtil.newWorker(); - assertEquals("Must be no running tasklets", 0, runningWorkers.removeWorker(vortexWorkerManager.getId()).size()); + assertFalse("Must be no running tasklets", runningWorkers.removeWorker(vortexWorkerManager.getId()).isPresent()); runningWorkers.addWorker(vortexWorkerManager); assertFalse("Executor should not be running", runningWorkers.isWorkerRunning(vortexWorkerManager.getId())); } @@ -55,7 +55,7 @@ public class RunningWorkersTest { final Tasklet tasklet = testUtil.newTasklet(); runningWorkers.addWorker(vortexWorkerManager); runningWorkers.launchTasklet(tasklet); // blocks when no worker exists - final Collection<Tasklet> tasklets = runningWorkers.removeWorker(vortexWorkerManager.getId()); + final Collection<Tasklet> tasklets = runningWorkers.removeWorker(vortexWorkerManager.getId()).get(); assertEquals("Only 1 Tasklet must have been running", 1, tasklets.size()); assertTrue("This Tasklet must have been running", tasklets.contains(tasklet)); runningWorkers.completeTasklet(vortexWorkerManager.getId(), tasklet.getId(), null); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/073f18e4/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/SchedulingPolicyTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/SchedulingPolicyTest.java b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/SchedulingPolicyTest.java new file mode 100644 index 0000000..fea4235 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/SchedulingPolicyTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.driver; + +import org.junit.Test; + +import java.util.ArrayDeque; +import java.util.Deque; + +import static org.junit.Assert.*; + +/** + * Test SchedulingPolicy. + */ +public class SchedulingPolicyTest { + private final TestUtil testUtil = new TestUtil(); + + /** + * Test common traits of different scheduling policies. + */ + @Test + public void testCommon() throws Exception { + commonPolicyTests(new RandomSchedulingPolicy()); + commonPolicyTests(new FirstFitSchedulingPolicy(10)); + } + + /** + * Test FirstFitSchedulingPolicy without preemption events. + */ + @Test + public void testFirstFitNoPreemption() throws Exception { + final int workerCapacity = 1; + final int numOfWorkers = 5; + final FirstFitSchedulingPolicy policy = new FirstFitSchedulingPolicy(workerCapacity); + + // Add workers + final Deque<VortexWorkerManager> workers = new ArrayDeque<>(); + for (int i = 0; i < numOfWorkers; i++) { + final VortexWorkerManager worker = testUtil.newWorker(); + workers.addFirst(worker); + policy.workerAdded(worker); + } + + // Launch 1 tasklet per worker + for (final VortexWorkerManager worker : workers) { + final Tasklet tasklet = testUtil.newTasklet(); + assertEquals("This should be the first fit", worker.getId(), policy.trySchedule(tasklet).get()); + policy.taskletLaunched(worker, tasklet); + } + + // When all workers are full... + assertFalse("All workers should be full", policy.trySchedule(testUtil.newTasklet()).isPresent()); + } + + /** + * Test FirstFitSchedulingPolicy with preemption events. + */ + @Test + public void testFirstFitPreemptions() throws Exception { + final int workerCapacity = 1; + final int numOfWorkers = 10; + final FirstFitSchedulingPolicy policy = new FirstFitSchedulingPolicy(workerCapacity); + + // Add workers and make the odd ones full + final ArrayDeque<VortexWorkerManager> evenWorkers = new ArrayDeque<>(); + for (int i = 0; i < numOfWorkers; i++) { + final VortexWorkerManager worker = testUtil.newWorker(); + policy.workerAdded(worker); + + if (i % 2 == 1) { + policy.taskletLaunched(worker, testUtil.newTasklet()); + } else { + evenWorkers.addFirst(worker); + } + } + + // Check whether the policy returns even ones in order + for (final VortexWorkerManager worker : evenWorkers) { + final Tasklet tasklet = testUtil.newTasklet(); + assertEquals("This should be the first fit", worker.getId(), policy.trySchedule(tasklet).get()); + policy.taskletLaunched(worker, tasklet); + } + + // When all workers are full... + assertFalse("All workers should be full", policy.trySchedule(testUtil.newTasklet()).isPresent()); + } + + /** + * Simple common tests. + */ + private void commonPolicyTests(final SchedulingPolicy policy) { + // Initial state + assertFalse("No worker added yet", policy.trySchedule(testUtil.newTasklet()).isPresent()); + + // One worker added + final VortexWorkerManager worker = testUtil.newWorker(); + policy.workerAdded(worker); + assertEquals("Only one worker exists", worker.getId(), policy.trySchedule(testUtil.newTasklet()).get()); + + // One worker removed + policy.workerRemoved(worker); + assertFalse("No worker exists", policy.trySchedule(testUtil.newTasklet()).isPresent()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/073f18e4/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneTest.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneTest.java index f935298..1fc2d7a 100644 --- a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneTest.java +++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneTest.java @@ -56,7 +56,7 @@ public final class AddOneTest { @Test public void testVortexAddOne() { final Configuration conf = - VortexConfHelper.getVortexConf("TEST_Vortex_AddOneTest", AddOneTestStart.class, 2, 1024, 4); + VortexConfHelper.getVortexConf("TEST_Vortex_AddOneTest", AddOneTestStart.class, 2, 1024, 4, 2000); final LauncherStatus status = this.testEnvironment.run(conf); Assert.assertTrue("Job state after execution: " + status, status.isSuccess()); }