Repository: incubator-reef Updated Branches: refs/heads/master 58811a7cc -> 531ddd9f3
[REEF-695] Reorganize reef-examples/scheduler classes into packages Move classes into client and driver packages. Task classes are in a separate package, o.a.r.examples.library. JIRA: [REEF-695](https://issues.apache.org/jira/browse/REEF-695) Pull Request: Closes #442 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/531ddd9f Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/531ddd9f Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/531ddd9f Branch: refs/heads/master Commit: 531ddd9f33df0e2c6a9b63bfb876aeb9a51a8196 Parents: 58811a7 Author: Brian Cho <chobr...@apache.org> Authored: Sun Aug 30 22:46:21 2015 +0900 Committer: Yunseong Lee <yunse...@apache.org> Committed: Mon Aug 31 08:39:35 2015 +0900 ---------------------------------------------------------------------- .../reef/examples/scheduler/Scheduler.java | 231 ------------- .../examples/scheduler/SchedulerDriver.java | 339 ------------------ .../scheduler/SchedulerHttpHandler.java | 107 ------ .../reef/examples/scheduler/SchedulerREEF.java | 119 ------- .../examples/scheduler/SchedulerREEFYarn.java | 52 --- .../examples/scheduler/SchedulerResponse.java | 114 ------- .../reef/examples/scheduler/TaskEntity.java | 79 ----- .../scheduler/client/SchedulerREEF.java | 121 +++++++ .../scheduler/client/SchedulerREEFYarn.java | 52 +++ .../examples/scheduler/client/package-info.java | 22 ++ .../examples/scheduler/driver/Scheduler.java | 231 +++++++++++++ .../scheduler/driver/SchedulerDriver.java | 340 +++++++++++++++++++ .../scheduler/driver/SchedulerHttpHandler.java | 107 ++++++ .../scheduler/driver/SchedulerResponse.java | 114 +++++++ .../examples/scheduler/driver/TaskEntity.java | 79 +++++ .../examples/scheduler/driver/package-info.java | 22 ++ 16 files changed, 1088 insertions(+), 1041 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/531ddd9f/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/Scheduler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/Scheduler.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/Scheduler.java deleted file mode 100644 index 0c79172..0000000 --- a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/Scheduler.java +++ /dev/null @@ -1,231 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.examples.scheduler; - -import org.apache.reef.driver.context.ActiveContext; -import org.apache.reef.driver.task.TaskConfiguration; -import org.apache.reef.examples.library.Command; -import org.apache.reef.examples.library.ShellTask; -import org.apache.reef.tang.Configuration; -import org.apache.reef.tang.Configurations; -import org.apache.reef.tang.Tang; - -import javax.annotation.concurrent.ThreadSafe; -import javax.inject.Inject; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * The body of Task scheduler. It owns a task queue - * and tracks the record of scheduled tasks. - */ -@ThreadSafe -final class Scheduler { - /** - * Tasks are waiting to be scheduled in the queue. - */ - private final Queue<TaskEntity> taskQueue; - - /** - * Lists of {@link TaskEntity} for different states - Running / Finished / Canceled. - */ - private final List<TaskEntity> runningTasks = new ArrayList<>(); - private final List<TaskEntity> finishedTasks = new ArrayList<>(); - private final List<TaskEntity> canceledTasks = new ArrayList<>(); - - /** - * Counts how many tasks have been scheduled. - */ - private final AtomicInteger taskCount = new AtomicInteger(0); - - @Inject - private Scheduler() { - taskQueue = new LinkedBlockingQueue<>(); - } - - /** - * Submit a task to the ActiveContext. - */ - public synchronized void submitTask(final ActiveContext context) { - final TaskEntity task = taskQueue.poll(); - final Integer taskId = task.getId(); - final String command = task.getCommand(); - - final Configuration taskConf = TaskConfiguration.CONF - .set(TaskConfiguration.TASK, ShellTask.class) - .set(TaskConfiguration.IDENTIFIER, taskId.toString()) - .build(); - final Configuration commandConf = Tang.Factory.getTang().newConfigurationBuilder() - .bindNamedParameter(Command.class, command) - .build(); - - final Configuration merged = Configurations.merge(taskConf, commandConf); - context.submitTask(merged); - runningTasks.add(task); - } - - /** - * Update the record of task to mark it as canceled. - */ - public synchronized SchedulerResponse cancelTask(final int taskId) { - if (getTask(taskId, runningTasks) != null) { - return SchedulerResponse.forbidden("The task " + taskId + " is running"); - } else if (getTask(taskId, finishedTasks) != null) { - return SchedulerResponse.forbidden("The task " + taskId + " has been finished"); - } - - final TaskEntity task = getTask(taskId, taskQueue); - if (task == null) { - final String message = - new StringBuilder().append("Task with ID ").append(taskId).append(" is not found").toString(); - return SchedulerResponse.notFound(message); - } else { - taskQueue.remove(task); - canceledTasks.add(task); - return SchedulerResponse.ok("Canceled " + taskId); - } - } - - /** - * Clear the pending list. - */ - public synchronized SchedulerResponse clear() { - final int count = taskQueue.size(); - for (final TaskEntity task : taskQueue) { - canceledTasks.add(task); - } - taskQueue.clear(); - return SchedulerResponse.ok(count + " tasks removed."); - } - - /** - * Get the list of Tasks, which are grouped by the states. - */ - public synchronized SchedulerResponse getList() { - final StringBuilder sb = new StringBuilder(); - sb.append("Running :"); - for (final TaskEntity running : runningTasks) { - sb.append(" ").append(running.getId()); - } - - sb.append("\nWaiting :"); - for (final TaskEntity waiting : taskQueue) { - sb.append(" ").append(waiting.getId()); - } - - sb.append("\nFinished :"); - for (final TaskEntity finished : finishedTasks) { - sb.append(" ").append(finished.getId()); - } - - sb.append("\nCanceled :"); - for (final TaskEntity canceled : canceledTasks) { - sb.append(" ").append(canceled.getId()); - } - return SchedulerResponse.ok(sb.toString()); - } - - /** - * Get the status of a Task. - */ - public synchronized SchedulerResponse getTaskStatus(final int taskId) { - - for (final TaskEntity running : runningTasks) { - if (taskId == running.getId()) { - return SchedulerResponse.ok("Running : " + running.toString()); - } - } - - for (final TaskEntity waiting : taskQueue) { - if (taskId == waiting.getId()) { - return SchedulerResponse.ok("Waiting : " + waiting.toString()); - } - } - - for (final TaskEntity finished : finishedTasks) { - if (taskId == finished.getId()) { - return SchedulerResponse.ok("Finished : " + finished.toString()); - } - } - - for (final TaskEntity finished : canceledTasks) { - if (taskId == finished.getId()) { - return SchedulerResponse.ok("Canceled: " + finished.toString()); - } - } - return SchedulerResponse.notFound( - new StringBuilder().append("Task with ID ").append(taskId).append(" is not found").toString()); - } - - /** - * Assigns a TaskId to submit. - */ - public synchronized int assignTaskId() { - return taskCount.incrementAndGet(); - } - - /** - * Add a task to the queue. - */ - public synchronized void addTask(final TaskEntity task) { - taskQueue.add(task); - } - - /** - * Check whether there are tasks waiting to be submitted. - */ - public synchronized boolean hasPendingTasks() { - return !taskQueue.isEmpty(); - } - - /** - * Get the number of pending tasks in the queue. - */ - public synchronized int getNumPendingTasks() { - return taskQueue.size(); - } - - /** - * Update the record of task to mark it as finished. - */ - public synchronized void setFinished(final int taskId) { - final TaskEntity task = getTask(taskId, runningTasks); - runningTasks.remove(task); - finishedTasks.add(task); - } - - /** - * Iterate over the collection to find a TaskEntity with ID. - */ - private TaskEntity getTask(final int taskId, final Collection<TaskEntity> tasks) { - TaskEntity result = null; - for (final TaskEntity task : tasks) { - if (taskId == task.getId()) { - result = task; - break; - } - } - return result; - } -} - http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/531ddd9f/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerDriver.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerDriver.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerDriver.java deleted file mode 100644 index d8a7059..0000000 --- a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerDriver.java +++ /dev/null @@ -1,339 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.examples.scheduler; - -import org.apache.reef.driver.context.ActiveContext; -import org.apache.reef.driver.context.ContextConfiguration; -import org.apache.reef.driver.evaluator.AllocatedEvaluator; -import org.apache.reef.driver.evaluator.EvaluatorRequest; -import org.apache.reef.driver.evaluator.EvaluatorRequestor; -import org.apache.reef.driver.task.CompletedTask; -import org.apache.reef.tang.annotations.Parameter; -import org.apache.reef.tang.annotations.Unit; -import org.apache.reef.wake.EventHandler; -import org.apache.reef.wake.remote.impl.ObjectSerializableCodec; -import org.apache.reef.wake.time.event.StartTime; - -import javax.annotation.concurrent.GuardedBy; -import javax.inject.Inject; -import java.util.List; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * Driver for TaskScheduler. It receives the commands by HttpRequest and - * execute them in a FIFO(First In First Out) order. - */ -@Unit -public final class SchedulerDriver { - - public static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>(); - private static final Logger LOG = Logger.getLogger(SchedulerDriver.class.getName()); - - /** - * Possible states of the job driver. Can be one of: - * <dl> - * <dt><code>INIT</code></dt><dd>Initial state. Ready to request an evaluator.</dd> - * <dt><code>WAIT_EVALUATORS</code></dt><dd>Waiting for an evaluator allocated with no active evaluators.</dd> - * <dt><code>READY</code></dt><dd>Wait for the commands. Reactivated when a new Task arrives.</dd> - * <dt><code>RUNNING</code></dt><dd>Run commands in the queue. Go back to READY state when the queue is empty.</dd> - * </dl> - */ - private enum State { - INIT, WAIT_EVALUATORS, READY, RUNNING - } - - /** - * If true, it reuses evaluators when Tasks done. - */ - private boolean retainable; - - @GuardedBy("SchedulerDriver.this") - private State state = State.INIT; - - @GuardedBy("SchedulerDriver.this") - private Scheduler scheduler; - - @GuardedBy("SchedulerDriver.this") - private int nMaxEval = 3, nActiveEval = 0, nRequestedEval = 0; - - private final EvaluatorRequestor requestor; - - @Inject - private SchedulerDriver(final EvaluatorRequestor requestor, - @Parameter(SchedulerREEF.Retain.class) final boolean retainable, - final Scheduler scheduler) { - this.requestor = requestor; - this.scheduler = scheduler; - this.retainable = retainable; - } - - /** - * The driver is ready to run. - */ - final class StartHandler implements EventHandler<StartTime> { - @Override - public void onNext(final StartTime startTime) { - LOG.log(Level.INFO, "Driver started at {0}", startTime); - assert (state == State.INIT); - state = State.WAIT_EVALUATORS; - - requestEvaluator(1); // Allocate an initial evaluator to avoid idle state. - } - } - - /** - * Evaluator is allocated. This occurs every time to run commands in Non-retainable version, - * while occurs only once in the Retainable version - */ - final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> { - @Override - public void onNext(final AllocatedEvaluator evaluator) { - LOG.log(Level.INFO, "Evaluator is ready"); - synchronized (SchedulerDriver.this) { - nActiveEval++; - nRequestedEval--; - } - - evaluator.submitContext(ContextConfiguration.CONF - .set(ContextConfiguration.IDENTIFIER, "SchedulerContext") - .build()); - } - } - - /** - * Now it is ready to schedule tasks. But if the queue is empty, - * wait until commands coming up. - * - * If there is no pending task, having more than 1 evaluators must be redundant. - * It may happen, for example, when tasks are canceled during allocation. - * In these cases, the new evaluator may be abandoned. - */ - final class ActiveContextHandler implements EventHandler<ActiveContext> { - @Override - public void onNext(final ActiveContext context) { - synchronized (SchedulerDriver.this) { - LOG.log(Level.INFO, "Context available : {0}", context.getId()); - - if (scheduler.hasPendingTasks()) { - state = State.RUNNING; - scheduler.submitTask(context); - } else if (nActiveEval > 1) { - nActiveEval--; - context.close(); - } else { - state = State.READY; - waitForCommands(context); - } - } - } - } - - /** - * When a Task completes, the task is marked as finished. - * The evaluator is reused for the next Task if retainable is set to {@code true}. - * Otherwise the evaluator is released. - */ - final class CompletedTaskHandler implements EventHandler<CompletedTask> { - @Override - public void onNext(final CompletedTask task) { - final int taskId = Integer.valueOf(task.getId()); - - synchronized (SchedulerDriver.this) { - scheduler.setFinished(taskId); - - LOG.log(Level.INFO, "Task completed. Reuse the evaluator : {0}", String.valueOf(retainable)); - final ActiveContext context = task.getActiveContext(); - - if (retainable) { - retainEvaluator(context); - } else { - reallocateEvaluator(context); - } - } - } - } - - /** - * Get the list of tasks in the scheduler. - */ - public synchronized SchedulerResponse getList() { - return scheduler.getList(); - } - - /** - * Clear all the Tasks from the waiting queue. - */ - public synchronized SchedulerResponse clearList() { - return scheduler.clear(); - } - - /** - * Get the status of a task. - */ - public SchedulerResponse getTaskStatus(final List<String> args) { - if (args.size() != 1) { - return SchedulerResponse.badRequest("Usage : only one ID at a time"); - } - - final Integer taskId = Integer.valueOf(args.get(0)); - - synchronized (SchedulerDriver.this) { - return scheduler.getTaskStatus(taskId); - } - } - - /** - * Cancel a Task waiting on the queue. A task cannot be canceled - * once it is running. - */ - public SchedulerResponse cancelTask(final List<String> args) { - if (args.size() != 1) { - return SchedulerResponse.badRequest("Usage : only one ID at a time"); - } - - final Integer taskId = Integer.valueOf(args.get(0)); - - synchronized (SchedulerDriver.this) { - return scheduler.cancelTask(taskId); - } - } - - /** - * Submit a command to schedule. - */ - public SchedulerResponse submitCommands(final List<String> args) { - if (args.size() != 1) { - return SchedulerResponse.badRequest("Usage : only one command at a time"); - } - - final String command = args.get(0); - final Integer id; - - synchronized (SchedulerDriver.this) { - id = scheduler.assignTaskId(); - scheduler.addTask(new TaskEntity(id, command)); - - if (state == State.READY) { - SchedulerDriver.this.notify(); // Wake up at {waitForCommands} - } else if (state == State.RUNNING && nMaxEval > nActiveEval + nRequestedEval) { - requestEvaluator(1); - } - } - return SchedulerResponse.ok("Task ID : " + id); - } - - /** - * Update the maximum number of evaluators to hold. - * Request more evaluators in case there are pending tasks - * in the queue and the number of evaluators is less than the limit. - */ - public SchedulerResponse setMaxEvaluators(final List<String> args) { - if (args.size() != 1) { - return SchedulerResponse.badRequest("Usage : Only one value can be used"); - } - - final int nTarget = Integer.valueOf(args.get(0)); - - synchronized (SchedulerDriver.this) { - if (nTarget < nActiveEval + nRequestedEval) { - return SchedulerResponse.forbidden(nActiveEval + nRequestedEval + - " evaluators are used now. Should be larger than that."); - } - nMaxEval = nTarget; - - if (scheduler.hasPendingTasks()) { - final int nToRequest = - Math.min(scheduler.getNumPendingTasks(), nMaxEval - nActiveEval) - nRequestedEval; - requestEvaluator(nToRequest); - } - return SchedulerResponse.ok("You can use evaluators up to " + nMaxEval + " evaluators."); - } - } - - /** - * Request evaluators. Passing a non positive number is illegal, - * so it does not make a trial for that situation. - */ - private void requestEvaluator(final int numToRequest) { - if (numToRequest <= 0) { - throw new IllegalArgumentException("The number of evaluator request should be a positive integer"); - } - - synchronized (SchedulerDriver.this) { - nRequestedEval += numToRequest; - requestor.submit(EvaluatorRequest.newBuilder() - .setMemory(32) - .setNumber(numToRequest) - .build()); - } - } - - /** - * Pick up a command from the queue and run it. Wait until - * any command coming up if no command exists. - */ - private void waitForCommands(final ActiveContext context) { - synchronized (SchedulerDriver.this) { - while (!scheduler.hasPendingTasks()) { - // Wait until any command enters in the queue - try { - SchedulerDriver.this.wait(); - } catch (final InterruptedException e) { - LOG.log(Level.WARNING, "InterruptedException occurred in SchedulerDriver", e); - } - } - // When wakes up, run the first command from the queue. - state = State.RUNNING; - scheduler.submitTask(context); - } - } - - /** - * Retain the complete evaluators submitting another task - * until there is no need to reuse them. - */ - private synchronized void retainEvaluator(final ActiveContext context) { - if (scheduler.hasPendingTasks()) { - scheduler.submitTask(context); - } else if (nActiveEval > 1) { - nActiveEval--; - context.close(); - } else { - state = State.READY; - waitForCommands(context); - } - } - - /** - * Always close the complete evaluators and - * allocate a new evaluator if necessary. - */ - private synchronized void reallocateEvaluator(final ActiveContext context) { - nActiveEval--; - context.close(); - - if (scheduler.hasPendingTasks()) { - requestEvaluator(1); - } else if (nActiveEval <= 0) { - state = State.WAIT_EVALUATORS; - requestEvaluator(1); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/531ddd9f/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerHttpHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerHttpHandler.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerHttpHandler.java deleted file mode 100644 index f390370..0000000 --- a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerHttpHandler.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.examples.scheduler; - -import org.apache.reef.tang.InjectionFuture; -import org.apache.reef.webserver.HttpHandler; -import org.apache.reef.webserver.ParsedHttpRequest; - -import javax.inject.Inject; -import javax.servlet.ServletException; -import javax.servlet.http.HttpServletResponse; -import java.io.IOException; -import java.util.List; -import java.util.Map; - -/** - * Receive HttpRequest so that it can handle the command list. - */ -final class SchedulerHttpHandler implements HttpHandler { - private final InjectionFuture<SchedulerDriver> schedulerDriver; - - private String uriSpecification = "reef-example-scheduler"; - - @Inject - private SchedulerHttpHandler(final InjectionFuture<SchedulerDriver> schedulerDriver) { - this.schedulerDriver = schedulerDriver; - } - - @Override - public String getUriSpecification() { - return uriSpecification; - } - - @Override - public void setUriSpecification(final String s) { - uriSpecification = s; - } - - /** - * HttpRequest handler. You must specify UriSpecification and REST API version. - * The request url is http://{address}:{port}/reef-example-scheduler/v1 - * - * APIs - * /list to get the status list for all tasks - * /status?id={id} to query the status of such a task, given id - * /submit?cmd={cmd} to submit a Task, which returns its id - * /cancel?id={id} to cancel the task's execution - * /max-eval?num={num} to set the maximum number of evaluators - * /clear to clear the waiting queue - */ - @Override - public void onHttpRequest(final ParsedHttpRequest request, final HttpServletResponse response) - throws IOException, ServletException { - final String target = request.getTargetEntity().toLowerCase(); - final Map<String, List<String>> queryMap = request.getQueryMap(); - - final SchedulerResponse result; - switch (target) { - case "list": - result = schedulerDriver.get().getList(); - break; - case "clear": - result = schedulerDriver.get().clearList(); - break; - case "status": - result = schedulerDriver.get().getTaskStatus(queryMap.get("id")); - break; - case "submit": - result = schedulerDriver.get().submitCommands(queryMap.get("cmd")); - break; - case "cancel": - result = schedulerDriver.get().cancelTask(queryMap.get("id")); - break; - case "max-eval": - result = schedulerDriver.get().setMaxEvaluators(queryMap.get("num")); - break; - default: - result = SchedulerResponse.notFound("Unsupported operation"); - } - - // Send response to the http client - final int status = result.getStatus(); - final String message= result.getMessage(); - - if (result.isOK()) { - response.getOutputStream().println(message); - } else { - response.sendError(status, message); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/531ddd9f/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEF.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEF.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEF.java deleted file mode 100644 index 33bb388..0000000 --- a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEF.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.examples.scheduler; - -import org.apache.commons.cli.ParseException; -import org.apache.reef.client.DriverConfiguration; -import org.apache.reef.client.REEF; -import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration; -import org.apache.reef.tang.Configuration; -import org.apache.reef.tang.Configurations; -import org.apache.reef.tang.Tang; -import org.apache.reef.tang.annotations.Name; -import org.apache.reef.tang.annotations.NamedParameter; -import org.apache.reef.tang.exceptions.InjectionException; -import org.apache.reef.tang.formats.CommandLine; -import org.apache.reef.util.EnvironmentUtils; -import org.apache.reef.webserver.HttpHandlerConfiguration; - -import java.io.IOException; - -/** - * REEF TaskScheduler. - */ -public final class SchedulerREEF { - /** - * The upper limit on the number of Evaluators that the local resourcemanager will hand out concurrently. - */ - private static final int MAX_NUMBER_OF_EVALUATORS = 3; - - /** - * Command line parameter = true to reuse evaluators,. - * or false to allocate/close for each iteration - */ - @NamedParameter(doc = "Whether or not to reuse evaluators", - short_name = "retain", default_value = "true") - public static final class Retain implements Name<Boolean> { - } - - /** - * @return The http configuration to use reef-webserver - */ - private static Configuration getHttpConf() { - final Configuration httpHandlerConf = HttpHandlerConfiguration.CONF - .set(HttpHandlerConfiguration.HTTP_HANDLERS, SchedulerHttpHandler.class) - .build(); - return httpHandlerConf; - } - - /** - * @return The Driver configuration. - */ - private static Configuration getDriverConf() { - final Configuration driverConf = DriverConfiguration.CONF - .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(SchedulerDriver.class)) - .set(DriverConfiguration.DRIVER_IDENTIFIER, "TaskScheduler") - .set(DriverConfiguration.ON_DRIVER_STARTED, SchedulerDriver.StartHandler.class) - .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, SchedulerDriver.EvaluatorAllocatedHandler.class) - .set(DriverConfiguration.ON_CONTEXT_ACTIVE, SchedulerDriver.ActiveContextHandler.class) - .set(DriverConfiguration.ON_TASK_COMPLETED, SchedulerDriver.CompletedTaskHandler.class) - .build(); - - return driverConf; - } - - /** - * Run the Task scheduler. If '-retain true' option is passed via command line, - * the scheduler reuses evaluators to submit new Tasks. - * @param runtimeConf The runtime configuration (e.g. Local, YARN, etc) - * @param args Command line arguments. - * @throws InjectionException - * @throws java.io.IOException - */ - public static void runTaskScheduler(final Configuration runtimeConf, final String[] args) - throws InjectionException, IOException, ParseException { - final Tang tang = Tang.Factory.getTang(); - - final Configuration commandLineConf = CommandLine.parseToConfiguration(args, Retain.class); - - // Merge the configurations to run Driver - final Configuration driverConf = Configurations.merge(getDriverConf(), getHttpConf(), commandLineConf); - - final REEF reef = tang.newInjector(runtimeConf).getInstance(REEF.class); - reef.submit(driverConf); - } - - /** - * Main program. - * @param args - * @throws InjectionException - */ - public static void main(final String[] args) throws InjectionException, IOException, ParseException { - final Configuration runtimeConfiguration = LocalRuntimeConfiguration.CONF - .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, MAX_NUMBER_OF_EVALUATORS) - .build(); - runTaskScheduler(runtimeConfiguration, args); - } - - /** - * Empty private constructor to prohibit instantiation of utility class. - */ - private SchedulerREEF() { - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/531ddd9f/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEFYarn.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEFYarn.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEFYarn.java deleted file mode 100644 index 7911f45..0000000 --- a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEFYarn.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.examples.scheduler; - -import org.apache.commons.cli.ParseException; -import org.apache.reef.runtime.yarn.client.YarnClientConfiguration; -import org.apache.reef.tang.Configuration; -import org.apache.reef.tang.exceptions.InjectionException; - -import java.io.IOException; - -import static org.apache.reef.examples.scheduler.SchedulerREEF.runTaskScheduler; - -/** - * REEF TaskScheduler on YARN runtime. - */ -public final class SchedulerREEFYarn { - /** - * Launch the scheduler with YARN client configuration. - * @param args - * @throws InjectionException - * @throws java.io.IOException - */ - public static void main(final String[] args) - throws InjectionException, IOException, ParseException { - final Configuration runtimeConfiguration = - YarnClientConfiguration.CONF.build(); - runTaskScheduler(runtimeConfiguration, args); - } - - /** - * Empty private constructor to prohibit instantiation of utility class. - */ - private SchedulerREEFYarn() { - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/531ddd9f/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerResponse.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerResponse.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerResponse.java deleted file mode 100644 index 65c6a4f..0000000 --- a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerResponse.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.examples.scheduler; - -/** - * This class specifies the response from the Scheduler. - * It includes the status code and message. - */ -final class SchedulerResponse { - /** - * 200 OK : The request succeeded normally. - */ - private static final int SC_OK = 200; - - /** - * 400 BAD REQUEST : The request is syntactically incorrect. - */ - private static final int SC_BAD_REQUEST = 400; - - /** - * 403 FORBIDDEN : Syntactically okay but refused to process. - */ - private static final int SC_FORBIDDEN = 403; - - /** - * 404 NOT FOUND : The resource is not available. - */ - private static final int SC_NOT_FOUND = 404; - - /** - * Create a response with OK status. - */ - public static SchedulerResponse ok(final String message){ - return new SchedulerResponse(SC_OK, message); - } - - /** - * Create a response with BAD_REQUEST status. - */ - public static SchedulerResponse badRequest(final String message){ - return new SchedulerResponse(SC_BAD_REQUEST, message); - } - - /** - * Create a response with FORBIDDEN status. - */ - public static SchedulerResponse forbidden(final String message){ - return new SchedulerResponse(SC_FORBIDDEN, message); - } - - /** - * Create a response with NOT FOUND status. - */ - public static SchedulerResponse notFound(final String message){ - return new SchedulerResponse(SC_NOT_FOUND, message); - } - - /** - * Return {@code true} if the response is OK. - */ - public boolean isOK(){ - return this.status == SC_OK; - } - - /** - * Status code of the request based on RFC 2068. - */ - private int status; - - /** - * Message to send. - */ - private String message; - - /** - * Constructor using status code and message. - * @param status - * @param message - */ - private SchedulerResponse(final int status, final String message) { - this.status = status; - this.message = message; - } - - /** - * Return the status code of this response. - */ - int getStatus() { - return status; - } - - /** - * Return the message of this response. - */ - String getMessage() { - return message; - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/531ddd9f/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/TaskEntity.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/TaskEntity.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/TaskEntity.java deleted file mode 100644 index a31cdeb..0000000 --- a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/TaskEntity.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.examples.scheduler; - -/** - * TaskEntity represent a single entry of task queue used in - * scheduler. Since REEF already has the class named {Task}, - * a different name is used for this class. - */ -final class TaskEntity { - private final int taskId; - private final String command; - - public TaskEntity(final int taskId, final String command) { - this.taskId = taskId; - this.command = command; - } - - /** - * Return the TaskID assigned to this Task. - */ - int getId() { - return taskId; - } - - String getCommand() { - return command; - } - - @Override - public boolean equals(final Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - final TaskEntity that = (TaskEntity) o; - - if (taskId != that.taskId) { - return false; - } - if (!command.equals(that.command)) { - return false; - } - - return true; - } - - @Override - public int hashCode() { - int result = taskId; - result = 31 * result + command.hashCode(); - return result; - } - - @Override - public String toString() { - return new StringBuilder().append("<Id=").append(taskId). - append(", Command=").append(command).append(">").toString(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/531ddd9f/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/client/SchedulerREEF.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/client/SchedulerREEF.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/client/SchedulerREEF.java new file mode 100644 index 0000000..15277f1 --- /dev/null +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/client/SchedulerREEF.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.examples.scheduler.client; + +import org.apache.commons.cli.ParseException; +import org.apache.reef.client.DriverConfiguration; +import org.apache.reef.client.REEF; +import org.apache.reef.examples.scheduler.driver.SchedulerDriver; +import org.apache.reef.examples.scheduler.driver.SchedulerHttpHandler; +import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Configurations; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; +import org.apache.reef.tang.exceptions.InjectionException; +import org.apache.reef.tang.formats.CommandLine; +import org.apache.reef.util.EnvironmentUtils; +import org.apache.reef.webserver.HttpHandlerConfiguration; + +import java.io.IOException; + +/** + * REEF TaskScheduler. + */ +public final class SchedulerREEF { + /** + * The upper limit on the number of Evaluators that the local resourcemanager will hand out concurrently. + */ + private static final int MAX_NUMBER_OF_EVALUATORS = 3; + + /** + * Command line parameter = true to reuse evaluators,. + * or false to allocate/close for each iteration + */ + @NamedParameter(doc = "Whether or not to reuse evaluators", + short_name = "retain", default_value = "true") + public static final class Retain implements Name<Boolean> { + } + + /** + * @return The http configuration to use reef-webserver + */ + private static Configuration getHttpConf() { + final Configuration httpHandlerConf = HttpHandlerConfiguration.CONF + .set(HttpHandlerConfiguration.HTTP_HANDLERS, SchedulerHttpHandler.class) + .build(); + return httpHandlerConf; + } + + /** + * @return The Driver configuration. + */ + private static Configuration getDriverConf() { + final Configuration driverConf = DriverConfiguration.CONF + .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(SchedulerDriver.class)) + .set(DriverConfiguration.DRIVER_IDENTIFIER, "TaskScheduler") + .set(DriverConfiguration.ON_DRIVER_STARTED, SchedulerDriver.StartHandler.class) + .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, SchedulerDriver.EvaluatorAllocatedHandler.class) + .set(DriverConfiguration.ON_CONTEXT_ACTIVE, SchedulerDriver.ActiveContextHandler.class) + .set(DriverConfiguration.ON_TASK_COMPLETED, SchedulerDriver.CompletedTaskHandler.class) + .build(); + + return driverConf; + } + + /** + * Run the Task scheduler. If '-retain true' option is passed via command line, + * the scheduler reuses evaluators to submit new Tasks. + * @param runtimeConf The runtime configuration (e.g. Local, YARN, etc) + * @param args Command line arguments. + * @throws InjectionException + * @throws java.io.IOException + */ + public static void runTaskScheduler(final Configuration runtimeConf, final String[] args) + throws InjectionException, IOException, ParseException { + final Tang tang = Tang.Factory.getTang(); + + final Configuration commandLineConf = CommandLine.parseToConfiguration(args, Retain.class); + + // Merge the configurations to run Driver + final Configuration driverConf = Configurations.merge(getDriverConf(), getHttpConf(), commandLineConf); + + final REEF reef = tang.newInjector(runtimeConf).getInstance(REEF.class); + reef.submit(driverConf); + } + + /** + * Main program. + * @param args + * @throws InjectionException + */ + public static void main(final String[] args) throws InjectionException, IOException, ParseException { + final Configuration runtimeConfiguration = LocalRuntimeConfiguration.CONF + .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, MAX_NUMBER_OF_EVALUATORS) + .build(); + runTaskScheduler(runtimeConfiguration, args); + } + + /** + * Empty private constructor to prohibit instantiation of utility class. + */ + private SchedulerREEF() { + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/531ddd9f/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/client/SchedulerREEFYarn.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/client/SchedulerREEFYarn.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/client/SchedulerREEFYarn.java new file mode 100644 index 0000000..1ed252a --- /dev/null +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/client/SchedulerREEFYarn.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.examples.scheduler.client; + +import org.apache.commons.cli.ParseException; +import org.apache.reef.runtime.yarn.client.YarnClientConfiguration; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.exceptions.InjectionException; + +import java.io.IOException; + +import static org.apache.reef.examples.scheduler.client.SchedulerREEF.runTaskScheduler; + +/** + * REEF TaskScheduler on YARN runtime. + */ +public final class SchedulerREEFYarn { + /** + * Launch the scheduler with YARN client configuration. + * @param args + * @throws InjectionException + * @throws java.io.IOException + */ + public static void main(final String[] args) + throws InjectionException, IOException, ParseException { + final Configuration runtimeConfiguration = + YarnClientConfiguration.CONF.build(); + runTaskScheduler(runtimeConfiguration, args); + } + + /** + * Empty private constructor to prohibit instantiation of utility class. + */ + private SchedulerREEFYarn() { + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/531ddd9f/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/client/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/client/package-info.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/client/package-info.java new file mode 100644 index 0000000..c11ab08 --- /dev/null +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/client/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Task scheduler example client classes. + */ +package org.apache.reef.examples.scheduler.client; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/531ddd9f/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/Scheduler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/Scheduler.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/Scheduler.java new file mode 100644 index 0000000..6dd6f15 --- /dev/null +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/Scheduler.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.examples.scheduler.driver; + +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.task.TaskConfiguration; +import org.apache.reef.examples.library.Command; +import org.apache.reef.examples.library.ShellTask; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Configurations; +import org.apache.reef.tang.Tang; + +import javax.annotation.concurrent.ThreadSafe; +import javax.inject.Inject; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * The body of Task scheduler. It owns a task queue + * and tracks the record of scheduled tasks. + */ +@ThreadSafe +final class Scheduler { + /** + * Tasks are waiting to be scheduled in the queue. + */ + private final Queue<TaskEntity> taskQueue; + + /** + * Lists of {@link TaskEntity} for different states - Running / Finished / Canceled. + */ + private final List<TaskEntity> runningTasks = new ArrayList<>(); + private final List<TaskEntity> finishedTasks = new ArrayList<>(); + private final List<TaskEntity> canceledTasks = new ArrayList<>(); + + /** + * Counts how many tasks have been scheduled. + */ + private final AtomicInteger taskCount = new AtomicInteger(0); + + @Inject + private Scheduler() { + taskQueue = new LinkedBlockingQueue<>(); + } + + /** + * Submit a task to the ActiveContext. + */ + public synchronized void submitTask(final ActiveContext context) { + final TaskEntity task = taskQueue.poll(); + final Integer taskId = task.getId(); + final String command = task.getCommand(); + + final Configuration taskConf = TaskConfiguration.CONF + .set(TaskConfiguration.TASK, ShellTask.class) + .set(TaskConfiguration.IDENTIFIER, taskId.toString()) + .build(); + final Configuration commandConf = Tang.Factory.getTang().newConfigurationBuilder() + .bindNamedParameter(Command.class, command) + .build(); + + final Configuration merged = Configurations.merge(taskConf, commandConf); + context.submitTask(merged); + runningTasks.add(task); + } + + /** + * Update the record of task to mark it as canceled. + */ + public synchronized SchedulerResponse cancelTask(final int taskId) { + if (getTask(taskId, runningTasks) != null) { + return SchedulerResponse.forbidden("The task " + taskId + " is running"); + } else if (getTask(taskId, finishedTasks) != null) { + return SchedulerResponse.forbidden("The task " + taskId + " has been finished"); + } + + final TaskEntity task = getTask(taskId, taskQueue); + if (task == null) { + final String message = + new StringBuilder().append("Task with ID ").append(taskId).append(" is not found").toString(); + return SchedulerResponse.notFound(message); + } else { + taskQueue.remove(task); + canceledTasks.add(task); + return SchedulerResponse.ok("Canceled " + taskId); + } + } + + /** + * Clear the pending list. + */ + public synchronized SchedulerResponse clear() { + final int count = taskQueue.size(); + for (final TaskEntity task : taskQueue) { + canceledTasks.add(task); + } + taskQueue.clear(); + return SchedulerResponse.ok(count + " tasks removed."); + } + + /** + * Get the list of Tasks, which are grouped by the states. + */ + public synchronized SchedulerResponse getList() { + final StringBuilder sb = new StringBuilder(); + sb.append("Running :"); + for (final TaskEntity running : runningTasks) { + sb.append(" ").append(running.getId()); + } + + sb.append("\nWaiting :"); + for (final TaskEntity waiting : taskQueue) { + sb.append(" ").append(waiting.getId()); + } + + sb.append("\nFinished :"); + for (final TaskEntity finished : finishedTasks) { + sb.append(" ").append(finished.getId()); + } + + sb.append("\nCanceled :"); + for (final TaskEntity canceled : canceledTasks) { + sb.append(" ").append(canceled.getId()); + } + return SchedulerResponse.ok(sb.toString()); + } + + /** + * Get the status of a Task. + */ + public synchronized SchedulerResponse getTaskStatus(final int taskId) { + + for (final TaskEntity running : runningTasks) { + if (taskId == running.getId()) { + return SchedulerResponse.ok("Running : " + running.toString()); + } + } + + for (final TaskEntity waiting : taskQueue) { + if (taskId == waiting.getId()) { + return SchedulerResponse.ok("Waiting : " + waiting.toString()); + } + } + + for (final TaskEntity finished : finishedTasks) { + if (taskId == finished.getId()) { + return SchedulerResponse.ok("Finished : " + finished.toString()); + } + } + + for (final TaskEntity finished : canceledTasks) { + if (taskId == finished.getId()) { + return SchedulerResponse.ok("Canceled: " + finished.toString()); + } + } + return SchedulerResponse.notFound( + new StringBuilder().append("Task with ID ").append(taskId).append(" is not found").toString()); + } + + /** + * Assigns a TaskId to submit. + */ + public synchronized int assignTaskId() { + return taskCount.incrementAndGet(); + } + + /** + * Add a task to the queue. + */ + public synchronized void addTask(final TaskEntity task) { + taskQueue.add(task); + } + + /** + * Check whether there are tasks waiting to be submitted. + */ + public synchronized boolean hasPendingTasks() { + return !taskQueue.isEmpty(); + } + + /** + * Get the number of pending tasks in the queue. + */ + public synchronized int getNumPendingTasks() { + return taskQueue.size(); + } + + /** + * Update the record of task to mark it as finished. + */ + public synchronized void setFinished(final int taskId) { + final TaskEntity task = getTask(taskId, runningTasks); + runningTasks.remove(task); + finishedTasks.add(task); + } + + /** + * Iterate over the collection to find a TaskEntity with ID. + */ + private TaskEntity getTask(final int taskId, final Collection<TaskEntity> tasks) { + TaskEntity result = null; + for (final TaskEntity task : tasks) { + if (taskId == task.getId()) { + result = task; + break; + } + } + return result; + } +} + http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/531ddd9f/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerDriver.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerDriver.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerDriver.java new file mode 100644 index 0000000..64e1770 --- /dev/null +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerDriver.java @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.examples.scheduler.driver; + +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.context.ContextConfiguration; +import org.apache.reef.driver.evaluator.AllocatedEvaluator; +import org.apache.reef.driver.evaluator.EvaluatorRequest; +import org.apache.reef.driver.evaluator.EvaluatorRequestor; +import org.apache.reef.driver.task.CompletedTask; +import org.apache.reef.examples.scheduler.client.SchedulerREEF; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.tang.annotations.Unit; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.remote.impl.ObjectSerializableCodec; +import org.apache.reef.wake.time.event.StartTime; + +import javax.annotation.concurrent.GuardedBy; +import javax.inject.Inject; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Driver for TaskScheduler. It receives the commands by HttpRequest and + * execute them in a FIFO(First In First Out) order. + */ +@Unit +public final class SchedulerDriver { + + public static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>(); + private static final Logger LOG = Logger.getLogger(SchedulerDriver.class.getName()); + + /** + * Possible states of the job driver. Can be one of: + * <dl> + * <dt><code>INIT</code></dt><dd>Initial state. Ready to request an evaluator.</dd> + * <dt><code>WAIT_EVALUATORS</code></dt><dd>Waiting for an evaluator allocated with no active evaluators.</dd> + * <dt><code>READY</code></dt><dd>Wait for the commands. Reactivated when a new Task arrives.</dd> + * <dt><code>RUNNING</code></dt><dd>Run commands in the queue. Go back to READY state when the queue is empty.</dd> + * </dl> + */ + private enum State { + INIT, WAIT_EVALUATORS, READY, RUNNING + } + + /** + * If true, it reuses evaluators when Tasks done. + */ + private boolean retainable; + + @GuardedBy("SchedulerDriver.this") + private State state = State.INIT; + + @GuardedBy("SchedulerDriver.this") + private Scheduler scheduler; + + @GuardedBy("SchedulerDriver.this") + private int nMaxEval = 3, nActiveEval = 0, nRequestedEval = 0; + + private final EvaluatorRequestor requestor; + + @Inject + private SchedulerDriver(final EvaluatorRequestor requestor, + @Parameter(SchedulerREEF.Retain.class) final boolean retainable, + final Scheduler scheduler) { + this.requestor = requestor; + this.scheduler = scheduler; + this.retainable = retainable; + } + + /** + * The driver is ready to run. + */ + public final class StartHandler implements EventHandler<StartTime> { + @Override + public void onNext(final StartTime startTime) { + LOG.log(Level.INFO, "Driver started at {0}", startTime); + assert (state == State.INIT); + state = State.WAIT_EVALUATORS; + + requestEvaluator(1); // Allocate an initial evaluator to avoid idle state. + } + } + + /** + * Evaluator is allocated. This occurs every time to run commands in Non-retainable version, + * while occurs only once in the Retainable version + */ + public final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> { + @Override + public void onNext(final AllocatedEvaluator evaluator) { + LOG.log(Level.INFO, "Evaluator is ready"); + synchronized (SchedulerDriver.this) { + nActiveEval++; + nRequestedEval--; + } + + evaluator.submitContext(ContextConfiguration.CONF + .set(ContextConfiguration.IDENTIFIER, "SchedulerContext") + .build()); + } + } + + /** + * Now it is ready to schedule tasks. But if the queue is empty, + * wait until commands coming up. + * + * If there is no pending task, having more than 1 evaluators must be redundant. + * It may happen, for example, when tasks are canceled during allocation. + * In these cases, the new evaluator may be abandoned. + */ + public final class ActiveContextHandler implements EventHandler<ActiveContext> { + @Override + public void onNext(final ActiveContext context) { + synchronized (SchedulerDriver.this) { + LOG.log(Level.INFO, "Context available : {0}", context.getId()); + + if (scheduler.hasPendingTasks()) { + state = State.RUNNING; + scheduler.submitTask(context); + } else if (nActiveEval > 1) { + nActiveEval--; + context.close(); + } else { + state = State.READY; + waitForCommands(context); + } + } + } + } + + /** + * When a Task completes, the task is marked as finished. + * The evaluator is reused for the next Task if retainable is set to {@code true}. + * Otherwise the evaluator is released. + */ + public final class CompletedTaskHandler implements EventHandler<CompletedTask> { + @Override + public void onNext(final CompletedTask task) { + final int taskId = Integer.valueOf(task.getId()); + + synchronized (SchedulerDriver.this) { + scheduler.setFinished(taskId); + + LOG.log(Level.INFO, "Task completed. Reuse the evaluator : {0}", String.valueOf(retainable)); + final ActiveContext context = task.getActiveContext(); + + if (retainable) { + retainEvaluator(context); + } else { + reallocateEvaluator(context); + } + } + } + } + + /** + * Get the list of tasks in the scheduler. + */ + public synchronized SchedulerResponse getList() { + return scheduler.getList(); + } + + /** + * Clear all the Tasks from the waiting queue. + */ + public synchronized SchedulerResponse clearList() { + return scheduler.clear(); + } + + /** + * Get the status of a task. + */ + public SchedulerResponse getTaskStatus(final List<String> args) { + if (args.size() != 1) { + return SchedulerResponse.badRequest("Usage : only one ID at a time"); + } + + final Integer taskId = Integer.valueOf(args.get(0)); + + synchronized (SchedulerDriver.this) { + return scheduler.getTaskStatus(taskId); + } + } + + /** + * Cancel a Task waiting on the queue. A task cannot be canceled + * once it is running. + */ + public SchedulerResponse cancelTask(final List<String> args) { + if (args.size() != 1) { + return SchedulerResponse.badRequest("Usage : only one ID at a time"); + } + + final Integer taskId = Integer.valueOf(args.get(0)); + + synchronized (SchedulerDriver.this) { + return scheduler.cancelTask(taskId); + } + } + + /** + * Submit a command to schedule. + */ + public SchedulerResponse submitCommands(final List<String> args) { + if (args.size() != 1) { + return SchedulerResponse.badRequest("Usage : only one command at a time"); + } + + final String command = args.get(0); + final Integer id; + + synchronized (SchedulerDriver.this) { + id = scheduler.assignTaskId(); + scheduler.addTask(new TaskEntity(id, command)); + + if (state == State.READY) { + SchedulerDriver.this.notify(); // Wake up at {waitForCommands} + } else if (state == State.RUNNING && nMaxEval > nActiveEval + nRequestedEval) { + requestEvaluator(1); + } + } + return SchedulerResponse.ok("Task ID : " + id); + } + + /** + * Update the maximum number of evaluators to hold. + * Request more evaluators in case there are pending tasks + * in the queue and the number of evaluators is less than the limit. + */ + public SchedulerResponse setMaxEvaluators(final List<String> args) { + if (args.size() != 1) { + return SchedulerResponse.badRequest("Usage : Only one value can be used"); + } + + final int nTarget = Integer.valueOf(args.get(0)); + + synchronized (SchedulerDriver.this) { + if (nTarget < nActiveEval + nRequestedEval) { + return SchedulerResponse.forbidden(nActiveEval + nRequestedEval + + " evaluators are used now. Should be larger than that."); + } + nMaxEval = nTarget; + + if (scheduler.hasPendingTasks()) { + final int nToRequest = + Math.min(scheduler.getNumPendingTasks(), nMaxEval - nActiveEval) - nRequestedEval; + requestEvaluator(nToRequest); + } + return SchedulerResponse.ok("You can use evaluators up to " + nMaxEval + " evaluators."); + } + } + + /** + * Request evaluators. Passing a non positive number is illegal, + * so it does not make a trial for that situation. + */ + private void requestEvaluator(final int numToRequest) { + if (numToRequest <= 0) { + throw new IllegalArgumentException("The number of evaluator request should be a positive integer"); + } + + synchronized (SchedulerDriver.this) { + nRequestedEval += numToRequest; + requestor.submit(EvaluatorRequest.newBuilder() + .setMemory(32) + .setNumber(numToRequest) + .build()); + } + } + + /** + * Pick up a command from the queue and run it. Wait until + * any command coming up if no command exists. + */ + private void waitForCommands(final ActiveContext context) { + synchronized (SchedulerDriver.this) { + while (!scheduler.hasPendingTasks()) { + // Wait until any command enters in the queue + try { + SchedulerDriver.this.wait(); + } catch (final InterruptedException e) { + LOG.log(Level.WARNING, "InterruptedException occurred in SchedulerDriver", e); + } + } + // When wakes up, run the first command from the queue. + state = State.RUNNING; + scheduler.submitTask(context); + } + } + + /** + * Retain the complete evaluators submitting another task + * until there is no need to reuse them. + */ + private synchronized void retainEvaluator(final ActiveContext context) { + if (scheduler.hasPendingTasks()) { + scheduler.submitTask(context); + } else if (nActiveEval > 1) { + nActiveEval--; + context.close(); + } else { + state = State.READY; + waitForCommands(context); + } + } + + /** + * Always close the complete evaluators and + * allocate a new evaluator if necessary. + */ + private synchronized void reallocateEvaluator(final ActiveContext context) { + nActiveEval--; + context.close(); + + if (scheduler.hasPendingTasks()) { + requestEvaluator(1); + } else if (nActiveEval <= 0) { + state = State.WAIT_EVALUATORS; + requestEvaluator(1); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/531ddd9f/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerHttpHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerHttpHandler.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerHttpHandler.java new file mode 100644 index 0000000..7d05e9b --- /dev/null +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerHttpHandler.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.examples.scheduler.driver; + +import org.apache.reef.tang.InjectionFuture; +import org.apache.reef.webserver.HttpHandler; +import org.apache.reef.webserver.ParsedHttpRequest; + +import javax.inject.Inject; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * Receive HttpRequest so that it can handle the command list. + */ +public final class SchedulerHttpHandler implements HttpHandler { + private final InjectionFuture<SchedulerDriver> schedulerDriver; + + private String uriSpecification = "reef-example-scheduler"; + + @Inject + private SchedulerHttpHandler(final InjectionFuture<SchedulerDriver> schedulerDriver) { + this.schedulerDriver = schedulerDriver; + } + + @Override + public String getUriSpecification() { + return uriSpecification; + } + + @Override + public void setUriSpecification(final String s) { + uriSpecification = s; + } + + /** + * HttpRequest handler. You must specify UriSpecification and REST API version. + * The request url is http://{address}:{port}/reef-example-scheduler/v1 + * + * APIs + * /list to get the status list for all tasks + * /status?id={id} to query the status of such a task, given id + * /submit?cmd={cmd} to submit a Task, which returns its id + * /cancel?id={id} to cancel the task's execution + * /max-eval?num={num} to set the maximum number of evaluators + * /clear to clear the waiting queue + */ + @Override + public void onHttpRequest(final ParsedHttpRequest request, final HttpServletResponse response) + throws IOException, ServletException { + final String target = request.getTargetEntity().toLowerCase(); + final Map<String, List<String>> queryMap = request.getQueryMap(); + + final SchedulerResponse result; + switch (target) { + case "list": + result = schedulerDriver.get().getList(); + break; + case "clear": + result = schedulerDriver.get().clearList(); + break; + case "status": + result = schedulerDriver.get().getTaskStatus(queryMap.get("id")); + break; + case "submit": + result = schedulerDriver.get().submitCommands(queryMap.get("cmd")); + break; + case "cancel": + result = schedulerDriver.get().cancelTask(queryMap.get("id")); + break; + case "max-eval": + result = schedulerDriver.get().setMaxEvaluators(queryMap.get("num")); + break; + default: + result = SchedulerResponse.notFound("Unsupported operation"); + } + + // Send response to the http client + final int status = result.getStatus(); + final String message= result.getMessage(); + + if (result.isOK()) { + response.getOutputStream().println(message); + } else { + response.sendError(status, message); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/531ddd9f/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerResponse.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerResponse.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerResponse.java new file mode 100644 index 0000000..e3d1272 --- /dev/null +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerResponse.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.examples.scheduler.driver; + +/** + * This class specifies the response from the Scheduler. + * It includes the status code and message. + */ +final class SchedulerResponse { + /** + * 200 OK : The request succeeded normally. + */ + private static final int SC_OK = 200; + + /** + * 400 BAD REQUEST : The request is syntactically incorrect. + */ + private static final int SC_BAD_REQUEST = 400; + + /** + * 403 FORBIDDEN : Syntactically okay but refused to process. + */ + private static final int SC_FORBIDDEN = 403; + + /** + * 404 NOT FOUND : The resource is not available. + */ + private static final int SC_NOT_FOUND = 404; + + /** + * Create a response with OK status. + */ + public static SchedulerResponse ok(final String message){ + return new SchedulerResponse(SC_OK, message); + } + + /** + * Create a response with BAD_REQUEST status. + */ + public static SchedulerResponse badRequest(final String message){ + return new SchedulerResponse(SC_BAD_REQUEST, message); + } + + /** + * Create a response with FORBIDDEN status. + */ + public static SchedulerResponse forbidden(final String message){ + return new SchedulerResponse(SC_FORBIDDEN, message); + } + + /** + * Create a response with NOT FOUND status. + */ + public static SchedulerResponse notFound(final String message){ + return new SchedulerResponse(SC_NOT_FOUND, message); + } + + /** + * Return {@code true} if the response is OK. + */ + public boolean isOK(){ + return this.status == SC_OK; + } + + /** + * Status code of the request based on RFC 2068. + */ + private int status; + + /** + * Message to send. + */ + private String message; + + /** + * Constructor using status code and message. + * @param status + * @param message + */ + private SchedulerResponse(final int status, final String message) { + this.status = status; + this.message = message; + } + + /** + * Return the status code of this response. + */ + int getStatus() { + return status; + } + + /** + * Return the message of this response. + */ + String getMessage() { + return message; + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/531ddd9f/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/TaskEntity.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/TaskEntity.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/TaskEntity.java new file mode 100644 index 0000000..bb29228 --- /dev/null +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/TaskEntity.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.examples.scheduler.driver; + +/** + * TaskEntity represent a single entry of task queue used in + * scheduler. Since REEF already has the class named {Task}, + * a different name is used for this class. + */ +final class TaskEntity { + private final int taskId; + private final String command; + + public TaskEntity(final int taskId, final String command) { + this.taskId = taskId; + this.command = command; + } + + /** + * Return the TaskID assigned to this Task. + */ + int getId() { + return taskId; + } + + String getCommand() { + return command; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + final TaskEntity that = (TaskEntity) o; + + if (taskId != that.taskId) { + return false; + } + if (!command.equals(that.command)) { + return false; + } + + return true; + } + + @Override + public int hashCode() { + int result = taskId; + result = 31 * result + command.hashCode(); + return result; + } + + @Override + public String toString() { + return new StringBuilder().append("<Id=").append(taskId). + append(", Command=").append(command).append(">").toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/531ddd9f/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/package-info.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/package-info.java new file mode 100644 index 0000000..6d731b4 --- /dev/null +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Task scheduler example driver classes. + */ +package org.apache.reef.examples.scheduler.driver;