Repository: incubator-reef
Updated Branches:
  refs/heads/master 58811a7cc -> 531ddd9f3


[REEF-695] Reorganize reef-examples/scheduler classes into packages

Move classes into client and driver packages.
Task classes are in a separate package, o.a.r.examples.library.

JIRA:
  [REEF-695](https://issues.apache.org/jira/browse/REEF-695)

Pull Request:
  Closes #442


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/531ddd9f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/531ddd9f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/531ddd9f

Branch: refs/heads/master
Commit: 531ddd9f33df0e2c6a9b63bfb876aeb9a51a8196
Parents: 58811a7
Author: Brian Cho <chobr...@apache.org>
Authored: Sun Aug 30 22:46:21 2015 +0900
Committer: Yunseong Lee <yunse...@apache.org>
Committed: Mon Aug 31 08:39:35 2015 +0900

----------------------------------------------------------------------
 .../reef/examples/scheduler/Scheduler.java      | 231 -------------
 .../examples/scheduler/SchedulerDriver.java     | 339 ------------------
 .../scheduler/SchedulerHttpHandler.java         | 107 ------
 .../reef/examples/scheduler/SchedulerREEF.java  | 119 -------
 .../examples/scheduler/SchedulerREEFYarn.java   |  52 ---
 .../examples/scheduler/SchedulerResponse.java   | 114 -------
 .../reef/examples/scheduler/TaskEntity.java     |  79 -----
 .../scheduler/client/SchedulerREEF.java         | 121 +++++++
 .../scheduler/client/SchedulerREEFYarn.java     |  52 +++
 .../examples/scheduler/client/package-info.java |  22 ++
 .../examples/scheduler/driver/Scheduler.java    | 231 +++++++++++++
 .../scheduler/driver/SchedulerDriver.java       | 340 +++++++++++++++++++
 .../scheduler/driver/SchedulerHttpHandler.java  | 107 ++++++
 .../scheduler/driver/SchedulerResponse.java     | 114 +++++++
 .../examples/scheduler/driver/TaskEntity.java   |  79 +++++
 .../examples/scheduler/driver/package-info.java |  22 ++
 16 files changed, 1088 insertions(+), 1041 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/531ddd9f/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/Scheduler.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/Scheduler.java
deleted file mode 100644
index 0c79172..0000000
--- 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/Scheduler.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.examples.scheduler;
-
-import org.apache.reef.driver.context.ActiveContext;
-import org.apache.reef.driver.task.TaskConfiguration;
-import org.apache.reef.examples.library.Command;
-import org.apache.reef.examples.library.ShellTask;
-import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.Configurations;
-import org.apache.reef.tang.Tang;
-
-import javax.annotation.concurrent.ThreadSafe;
-import javax.inject.Inject;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * The body of Task scheduler. It owns a task queue
- * and tracks the record of scheduled tasks.
- */
-@ThreadSafe
-final class Scheduler {
-  /**
-   * Tasks are waiting to be scheduled in the queue.
-   */
-  private final Queue<TaskEntity> taskQueue;
-
-  /**
-   * Lists of {@link TaskEntity} for different states - Running / Finished / 
Canceled.
-   */
-  private final List<TaskEntity> runningTasks = new ArrayList<>();
-  private final List<TaskEntity> finishedTasks = new ArrayList<>();
-  private final List<TaskEntity> canceledTasks = new ArrayList<>();
-
-  /**
-   * Counts how many tasks have been scheduled.
-   */
-  private final AtomicInteger taskCount = new AtomicInteger(0);
-
-  @Inject
-  private Scheduler() {
-    taskQueue = new LinkedBlockingQueue<>();
-  }
-
-  /**
-   * Submit a task to the ActiveContext.
-   */
-  public synchronized void submitTask(final ActiveContext context) {
-    final TaskEntity task = taskQueue.poll();
-    final Integer taskId = task.getId();
-    final String command = task.getCommand();
-
-    final Configuration taskConf = TaskConfiguration.CONF
-        .set(TaskConfiguration.TASK, ShellTask.class)
-        .set(TaskConfiguration.IDENTIFIER, taskId.toString())
-        .build();
-    final Configuration commandConf = 
Tang.Factory.getTang().newConfigurationBuilder()
-        .bindNamedParameter(Command.class, command)
-        .build();
-
-    final Configuration merged = Configurations.merge(taskConf, commandConf);
-    context.submitTask(merged);
-    runningTasks.add(task);
-  }
-
-  /**
-   * Update the record of task to mark it as canceled.
-   */
-  public synchronized SchedulerResponse cancelTask(final int taskId) {
-    if (getTask(taskId, runningTasks) != null) {
-      return SchedulerResponse.forbidden("The task " + taskId + " is running");
-    } else if (getTask(taskId, finishedTasks) != null) {
-      return SchedulerResponse.forbidden("The task " + taskId + " has been 
finished");
-    }
-
-    final TaskEntity task = getTask(taskId, taskQueue);
-    if (task == null) {
-      final String message =
-          new StringBuilder().append("Task with ID ").append(taskId).append(" 
is not found").toString();
-      return SchedulerResponse.notFound(message);
-    } else {
-      taskQueue.remove(task);
-      canceledTasks.add(task);
-      return SchedulerResponse.ok("Canceled " + taskId);
-    }
-  }
-
-  /**
-   * Clear the pending list.
-   */
-  public synchronized SchedulerResponse clear() {
-    final int count = taskQueue.size();
-    for (final TaskEntity task : taskQueue) {
-      canceledTasks.add(task);
-    }
-    taskQueue.clear();
-    return SchedulerResponse.ok(count + " tasks removed.");
-  }
-
-  /**
-   * Get the list of Tasks, which are grouped by the states.
-   */
-  public synchronized SchedulerResponse getList() {
-    final StringBuilder sb = new StringBuilder();
-    sb.append("Running :");
-    for (final TaskEntity running : runningTasks) {
-      sb.append(" ").append(running.getId());
-    }
-
-    sb.append("\nWaiting :");
-    for (final TaskEntity waiting : taskQueue) {
-      sb.append(" ").append(waiting.getId());
-    }
-
-    sb.append("\nFinished :");
-    for (final TaskEntity finished : finishedTasks) {
-      sb.append(" ").append(finished.getId());
-    }
-
-    sb.append("\nCanceled :");
-    for (final TaskEntity canceled : canceledTasks) {
-      sb.append(" ").append(canceled.getId());
-    }
-    return SchedulerResponse.ok(sb.toString());
-  }
-
-  /**
-   * Get the status of a Task.
-   */
-  public synchronized SchedulerResponse getTaskStatus(final int taskId) {
-
-    for (final TaskEntity running : runningTasks) {
-      if (taskId == running.getId()) {
-        return SchedulerResponse.ok("Running : " + running.toString());
-      }
-    }
-
-    for (final TaskEntity waiting : taskQueue) {
-      if (taskId == waiting.getId()) {
-        return SchedulerResponse.ok("Waiting : " + waiting.toString());
-      }
-    }
-
-    for (final TaskEntity finished : finishedTasks) {
-      if (taskId == finished.getId()) {
-        return SchedulerResponse.ok("Finished : " + finished.toString());
-      }
-    }
-
-    for (final TaskEntity finished : canceledTasks) {
-      if (taskId == finished.getId()) {
-        return SchedulerResponse.ok("Canceled: " + finished.toString());
-      }
-    }
-    return SchedulerResponse.notFound(
-        new StringBuilder().append("Task with ID ").append(taskId).append(" is 
not found").toString());
-  }
-
-  /**
-   * Assigns a TaskId to submit.
-   */
-  public synchronized int assignTaskId() {
-    return taskCount.incrementAndGet();
-  }
-
-  /**
-   * Add a task to the queue.
-   */
-  public synchronized void addTask(final TaskEntity task) {
-    taskQueue.add(task);
-  }
-
-  /**
-   * Check whether there are tasks waiting to be submitted.
-   */
-  public synchronized boolean hasPendingTasks() {
-    return !taskQueue.isEmpty();
-  }
-
-  /**
-   * Get the number of pending tasks in the queue.
-   */
-  public synchronized int getNumPendingTasks() {
-    return taskQueue.size();
-  }
-
-  /**
-   * Update the record of task to mark it as finished.
-   */
-  public synchronized void setFinished(final int taskId) {
-    final TaskEntity task = getTask(taskId, runningTasks);
-    runningTasks.remove(task);
-    finishedTasks.add(task);
-  }
-
-  /**
-   * Iterate over the collection to find a TaskEntity with ID.
-   */
-  private TaskEntity getTask(final int taskId, final Collection<TaskEntity> 
tasks) {
-    TaskEntity result = null;
-    for (final TaskEntity task : tasks) {
-      if (taskId == task.getId()) {
-        result = task;
-        break;
-      }
-    }
-    return result;
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/531ddd9f/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerDriver.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerDriver.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerDriver.java
deleted file mode 100644
index d8a7059..0000000
--- 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerDriver.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.examples.scheduler;
-
-import org.apache.reef.driver.context.ActiveContext;
-import org.apache.reef.driver.context.ContextConfiguration;
-import org.apache.reef.driver.evaluator.AllocatedEvaluator;
-import org.apache.reef.driver.evaluator.EvaluatorRequest;
-import org.apache.reef.driver.evaluator.EvaluatorRequestor;
-import org.apache.reef.driver.task.CompletedTask;
-import org.apache.reef.tang.annotations.Parameter;
-import org.apache.reef.tang.annotations.Unit;
-import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
-import org.apache.reef.wake.time.event.StartTime;
-
-import javax.annotation.concurrent.GuardedBy;
-import javax.inject.Inject;
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Driver for TaskScheduler. It receives the commands by HttpRequest and
- * execute them in a FIFO(First In First Out) order.
- */
-@Unit
-public final class SchedulerDriver {
-
-  public static final ObjectSerializableCodec<String> CODEC = new 
ObjectSerializableCodec<>();
-  private static final Logger LOG = 
Logger.getLogger(SchedulerDriver.class.getName());
-
-  /**
-   * Possible states of the job driver. Can be one of:
-   * <dl>
-   * <dt><code>INIT</code></dt><dd>Initial state. Ready to request an 
evaluator.</dd>
-   * <dt><code>WAIT_EVALUATORS</code></dt><dd>Waiting for an evaluator 
allocated with no active evaluators.</dd>
-   * <dt><code>READY</code></dt><dd>Wait for the commands. Reactivated when a 
new Task arrives.</dd>
-   * <dt><code>RUNNING</code></dt><dd>Run commands in the queue. Go back to 
READY state when the queue is empty.</dd>
-   * </dl>
-   */
-  private enum State {
-    INIT, WAIT_EVALUATORS, READY, RUNNING
-  }
-
-  /**
-   * If true, it reuses evaluators when Tasks done.
-   */
-  private boolean retainable;
-
-  @GuardedBy("SchedulerDriver.this")
-  private State state = State.INIT;
-
-  @GuardedBy("SchedulerDriver.this")
-  private Scheduler scheduler;
-
-  @GuardedBy("SchedulerDriver.this")
-  private int nMaxEval = 3, nActiveEval = 0, nRequestedEval = 0;
-
-  private final EvaluatorRequestor requestor;
-
-  @Inject
-  private SchedulerDriver(final EvaluatorRequestor requestor,
-                          @Parameter(SchedulerREEF.Retain.class) final boolean 
retainable,
-                          final Scheduler scheduler) {
-    this.requestor = requestor;
-    this.scheduler = scheduler;
-    this.retainable = retainable;
-  }
-
-  /**
-   * The driver is ready to run.
-   */
-  final class StartHandler implements EventHandler<StartTime> {
-    @Override
-    public void onNext(final StartTime startTime) {
-      LOG.log(Level.INFO, "Driver started at {0}", startTime);
-      assert (state == State.INIT);
-      state = State.WAIT_EVALUATORS;
-
-      requestEvaluator(1); // Allocate an initial evaluator to avoid idle 
state.
-    }
-  }
-
-  /**
-   * Evaluator is allocated. This occurs every time to run commands in 
Non-retainable version,
-   * while occurs only once in the Retainable version
-   */
-  final class EvaluatorAllocatedHandler implements 
EventHandler<AllocatedEvaluator> {
-    @Override
-    public void onNext(final AllocatedEvaluator evaluator) {
-      LOG.log(Level.INFO, "Evaluator is ready");
-      synchronized (SchedulerDriver.this) {
-        nActiveEval++;
-        nRequestedEval--;
-      }
-
-      evaluator.submitContext(ContextConfiguration.CONF
-          .set(ContextConfiguration.IDENTIFIER, "SchedulerContext")
-          .build());
-    }
-  }
-
-  /**
-   * Now it is ready to schedule tasks. But if the queue is empty,
-   * wait until commands coming up.
-   *
-   * If there is no pending task, having more than 1 evaluators must be 
redundant.
-   * It may happen, for example, when tasks are canceled during allocation.
-   * In these cases, the new evaluator may be abandoned.
-   */
-  final class ActiveContextHandler implements EventHandler<ActiveContext> {
-    @Override
-    public void onNext(final ActiveContext context) {
-      synchronized (SchedulerDriver.this) {
-        LOG.log(Level.INFO, "Context available : {0}", context.getId());
-
-        if (scheduler.hasPendingTasks()) {
-          state = State.RUNNING;
-          scheduler.submitTask(context);
-        } else if (nActiveEval > 1) {
-          nActiveEval--;
-          context.close();
-        } else {
-          state = State.READY;
-          waitForCommands(context);
-        }
-      }
-    }
-  }
-
-  /**
-   * When a Task completes, the task is marked as finished.
-   * The evaluator is reused for the next Task if retainable is set to {@code 
true}.
-   * Otherwise the evaluator is released.
-   */
-  final class CompletedTaskHandler implements EventHandler<CompletedTask> {
-    @Override
-    public void onNext(final CompletedTask task) {
-      final int taskId = Integer.valueOf(task.getId());
-
-      synchronized (SchedulerDriver.this) {
-        scheduler.setFinished(taskId);
-
-        LOG.log(Level.INFO, "Task completed. Reuse the evaluator : {0}", 
String.valueOf(retainable));
-        final ActiveContext context = task.getActiveContext();
-
-        if (retainable) {
-          retainEvaluator(context);
-        } else {
-          reallocateEvaluator(context);
-        }
-      }
-    }
-  }
-
-  /**
-   * Get the list of tasks in the scheduler.
-   */
-  public synchronized SchedulerResponse getList() {
-    return scheduler.getList();
-  }
-
-  /**
-   * Clear all the Tasks from the waiting queue.
-   */
-  public synchronized SchedulerResponse clearList() {
-    return scheduler.clear();
-  }
-
-  /**
-   * Get the status of a task.
-   */
-  public SchedulerResponse getTaskStatus(final List<String> args) {
-    if (args.size() != 1) {
-      return SchedulerResponse.badRequest("Usage : only one ID at a time");
-    }
-
-    final Integer taskId = Integer.valueOf(args.get(0));
-
-    synchronized (SchedulerDriver.this) {
-      return scheduler.getTaskStatus(taskId);
-    }
-  }
-
-  /**
-   * Cancel a Task waiting on the queue. A task cannot be canceled
-   * once it is running.
-   */
-  public SchedulerResponse cancelTask(final List<String> args) {
-    if (args.size() != 1) {
-      return SchedulerResponse.badRequest("Usage : only one ID at a time");
-    }
-
-    final Integer taskId = Integer.valueOf(args.get(0));
-
-    synchronized (SchedulerDriver.this) {
-      return scheduler.cancelTask(taskId);
-    }
-  }
-
-  /**
-   * Submit a command to schedule.
-   */
-  public SchedulerResponse submitCommands(final List<String> args) {
-    if (args.size() != 1) {
-      return SchedulerResponse.badRequest("Usage : only one command at a 
time");
-    }
-
-    final String command = args.get(0);
-    final Integer id;
-
-    synchronized (SchedulerDriver.this) {
-      id = scheduler.assignTaskId();
-      scheduler.addTask(new TaskEntity(id, command));
-
-      if (state == State.READY) {
-        SchedulerDriver.this.notify(); // Wake up at {waitForCommands}
-      } else if (state == State.RUNNING && nMaxEval > nActiveEval + 
nRequestedEval) {
-        requestEvaluator(1);
-      }
-    }
-    return SchedulerResponse.ok("Task ID : " + id);
-  }
-
-  /**
-   * Update the maximum number of evaluators to hold.
-   * Request more evaluators in case there are pending tasks
-   * in the queue and the number of evaluators is less than the limit.
-   */
-  public SchedulerResponse setMaxEvaluators(final List<String> args) {
-    if (args.size() != 1) {
-      return SchedulerResponse.badRequest("Usage : Only one value can be 
used");
-    }
-
-    final int nTarget = Integer.valueOf(args.get(0));
-
-    synchronized (SchedulerDriver.this) {
-      if (nTarget < nActiveEval + nRequestedEval) {
-        return SchedulerResponse.forbidden(nActiveEval + nRequestedEval +
-            " evaluators are used now. Should be larger than that.");
-      }
-      nMaxEval = nTarget;
-
-      if (scheduler.hasPendingTasks()) {
-        final int nToRequest =
-            Math.min(scheduler.getNumPendingTasks(), nMaxEval - nActiveEval) - 
nRequestedEval;
-        requestEvaluator(nToRequest);
-      }
-      return SchedulerResponse.ok("You can use evaluators up to " + nMaxEval + 
" evaluators.");
-    }
-  }
-
-  /**
-   * Request evaluators. Passing a non positive number is illegal,
-   * so it does not make a trial for that situation.
-   */
-  private void requestEvaluator(final int numToRequest) {
-    if (numToRequest <= 0) {
-      throw new IllegalArgumentException("The number of evaluator request 
should be a positive integer");
-    }
-
-    synchronized (SchedulerDriver.this) {
-      nRequestedEval += numToRequest;
-      requestor.submit(EvaluatorRequest.newBuilder()
-          .setMemory(32)
-          .setNumber(numToRequest)
-          .build());
-    }
-  }
-
-  /**
-   * Pick up a command from the queue and run it. Wait until
-   * any command coming up if no command exists.
-   */
-  private void waitForCommands(final ActiveContext context) {
-    synchronized (SchedulerDriver.this) {
-      while (!scheduler.hasPendingTasks()) {
-        // Wait until any command enters in the queue
-        try {
-          SchedulerDriver.this.wait();
-        } catch (final InterruptedException e) {
-          LOG.log(Level.WARNING, "InterruptedException occurred in 
SchedulerDriver", e);
-        }
-      }
-      // When wakes up, run the first command from the queue.
-      state = State.RUNNING;
-      scheduler.submitTask(context);
-    }
-  }
-
-  /**
-   * Retain the complete evaluators submitting another task
-   * until there is no need to reuse them.
-   */
-  private synchronized void retainEvaluator(final ActiveContext context) {
-    if (scheduler.hasPendingTasks()) {
-      scheduler.submitTask(context);
-    } else if (nActiveEval > 1) {
-      nActiveEval--;
-      context.close();
-    } else {
-      state = State.READY;
-      waitForCommands(context);
-    }
-  }
-
-  /**
-   * Always close the complete evaluators and
-   * allocate a new evaluator if necessary.
-   */
-  private synchronized void reallocateEvaluator(final ActiveContext context) {
-    nActiveEval--;
-    context.close();
-
-    if (scheduler.hasPendingTasks()) {
-      requestEvaluator(1);
-    } else if (nActiveEval <= 0) {
-      state = State.WAIT_EVALUATORS;
-      requestEvaluator(1);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/531ddd9f/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerHttpHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerHttpHandler.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerHttpHandler.java
deleted file mode 100644
index f390370..0000000
--- 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerHttpHandler.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.examples.scheduler;
-
-import org.apache.reef.tang.InjectionFuture;
-import org.apache.reef.webserver.HttpHandler;
-import org.apache.reef.webserver.ParsedHttpRequest;
-
-import javax.inject.Inject;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServletResponse;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Receive HttpRequest so that it can handle the command list.
- */
-final class SchedulerHttpHandler implements HttpHandler {
-  private final InjectionFuture<SchedulerDriver> schedulerDriver;
-
-  private String uriSpecification = "reef-example-scheduler";
-
-  @Inject
-  private SchedulerHttpHandler(final InjectionFuture<SchedulerDriver> 
schedulerDriver) {
-    this.schedulerDriver = schedulerDriver;
-  }
-
-  @Override
-  public String getUriSpecification() {
-    return uriSpecification;
-  }
-
-  @Override
-  public void setUriSpecification(final String s) {
-    uriSpecification = s;
-  }
-
-  /**
-   * HttpRequest handler. You must specify UriSpecification and REST API 
version.
-   * The request url is http://{address}:{port}/reef-example-scheduler/v1
-   *
-   * APIs
-   *   /list                to get the status list for all tasks
-   *   /status?id={id}      to query the status of such a task, given id
-   *   /submit?cmd={cmd}    to submit a Task, which returns its id
-   *   /cancel?id={id}      to cancel the task's execution
-   *   /max-eval?num={num}  to set the maximum number of evaluators
-   *   /clear               to clear the waiting queue
-   */
-  @Override
-  public void onHttpRequest(final ParsedHttpRequest request, final 
HttpServletResponse response)
-    throws IOException, ServletException {
-    final String target = request.getTargetEntity().toLowerCase();
-    final Map<String, List<String>> queryMap = request.getQueryMap();
-
-    final SchedulerResponse result;
-    switch (target) {
-    case "list":
-      result = schedulerDriver.get().getList();
-      break;
-    case "clear":
-      result = schedulerDriver.get().clearList();
-      break;
-    case "status":
-      result = schedulerDriver.get().getTaskStatus(queryMap.get("id"));
-      break;
-    case "submit":
-      result = schedulerDriver.get().submitCommands(queryMap.get("cmd"));
-      break;
-    case "cancel":
-      result = schedulerDriver.get().cancelTask(queryMap.get("id"));
-      break;
-    case "max-eval":
-      result = schedulerDriver.get().setMaxEvaluators(queryMap.get("num"));
-      break;
-    default:
-      result = SchedulerResponse.notFound("Unsupported operation");
-    }
-
-    // Send response to the http client
-    final int status = result.getStatus();
-    final String message= result.getMessage();
-
-    if (result.isOK()) {
-      response.getOutputStream().println(message);
-    } else {
-      response.sendError(status, message);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/531ddd9f/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEF.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEF.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEF.java
deleted file mode 100644
index 33bb388..0000000
--- 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEF.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.examples.scheduler;
-
-import org.apache.commons.cli.ParseException;
-import org.apache.reef.client.DriverConfiguration;
-import org.apache.reef.client.REEF;
-import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
-import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.Configurations;
-import org.apache.reef.tang.Tang;
-import org.apache.reef.tang.annotations.Name;
-import org.apache.reef.tang.annotations.NamedParameter;
-import org.apache.reef.tang.exceptions.InjectionException;
-import org.apache.reef.tang.formats.CommandLine;
-import org.apache.reef.util.EnvironmentUtils;
-import org.apache.reef.webserver.HttpHandlerConfiguration;
-
-import java.io.IOException;
-
-/**
- * REEF TaskScheduler.
- */
-public final class SchedulerREEF {
-  /**
-   * The upper limit on the number of Evaluators that the local 
resourcemanager will hand out concurrently.
-   */
-  private static final int MAX_NUMBER_OF_EVALUATORS = 3;
-
-  /**
-   * Command line parameter = true to reuse evaluators,.
-   * or false to allocate/close for each iteration
-   */
-  @NamedParameter(doc = "Whether or not to reuse evaluators",
-      short_name = "retain", default_value = "true")
-  public static final class Retain implements Name<Boolean> {
-  }
-
-  /**
-   * @return The http configuration to use reef-webserver
-   */
-  private static Configuration getHttpConf() {
-    final Configuration httpHandlerConf = HttpHandlerConfiguration.CONF
-        .set(HttpHandlerConfiguration.HTTP_HANDLERS, 
SchedulerHttpHandler.class)
-        .build();
-    return httpHandlerConf;
-  }
-
-  /**
-   * @return The Driver configuration.
-   */
-  private static Configuration getDriverConf() {
-    final Configuration driverConf = DriverConfiguration.CONF
-        .set(DriverConfiguration.GLOBAL_LIBRARIES, 
EnvironmentUtils.getClassLocation(SchedulerDriver.class))
-        .set(DriverConfiguration.DRIVER_IDENTIFIER, "TaskScheduler")
-        .set(DriverConfiguration.ON_DRIVER_STARTED, 
SchedulerDriver.StartHandler.class)
-        .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, 
SchedulerDriver.EvaluatorAllocatedHandler.class)
-        .set(DriverConfiguration.ON_CONTEXT_ACTIVE, 
SchedulerDriver.ActiveContextHandler.class)
-        .set(DriverConfiguration.ON_TASK_COMPLETED, 
SchedulerDriver.CompletedTaskHandler.class)
-        .build();
-
-    return driverConf;
-  }
-
-  /**
-   * Run the Task scheduler. If '-retain true' option is passed via command 
line,
-   * the scheduler reuses evaluators to submit new Tasks.
-   * @param runtimeConf The runtime configuration (e.g. Local, YARN, etc)
-   * @param args Command line arguments.
-   * @throws InjectionException
-   * @throws java.io.IOException
-   */
-  public static void runTaskScheduler(final Configuration runtimeConf, final 
String[] args)
-    throws InjectionException, IOException, ParseException {
-    final Tang tang = Tang.Factory.getTang();
-
-    final Configuration commandLineConf = 
CommandLine.parseToConfiguration(args, Retain.class);
-
-    // Merge the configurations to run Driver
-    final Configuration driverConf = Configurations.merge(getDriverConf(), 
getHttpConf(), commandLineConf);
-
-    final REEF reef = tang.newInjector(runtimeConf).getInstance(REEF.class);
-    reef.submit(driverConf);
-  }
-
-  /**
-   * Main program.
-   * @param args
-   * @throws InjectionException
-   */
-  public static void main(final String[] args) throws InjectionException, 
IOException, ParseException {
-    final Configuration runtimeConfiguration = LocalRuntimeConfiguration.CONF
-        .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, 
MAX_NUMBER_OF_EVALUATORS)
-        .build();
-    runTaskScheduler(runtimeConfiguration, args);
-  }
-
-  /**
-   * Empty private constructor to prohibit instantiation of utility class.
-   */
-  private SchedulerREEF() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/531ddd9f/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEFYarn.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEFYarn.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEFYarn.java
deleted file mode 100644
index 7911f45..0000000
--- 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEFYarn.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.examples.scheduler;
-
-import org.apache.commons.cli.ParseException;
-import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
-import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.exceptions.InjectionException;
-
-import java.io.IOException;
-
-import static 
org.apache.reef.examples.scheduler.SchedulerREEF.runTaskScheduler;
-
-/**
- * REEF TaskScheduler on YARN runtime.
- */
-public final class SchedulerREEFYarn {
-  /**
-   * Launch the scheduler with YARN client configuration.
-   * @param args
-   * @throws InjectionException
-   * @throws java.io.IOException
-   */
-  public static void main(final String[] args)
-    throws InjectionException, IOException, ParseException {
-    final Configuration runtimeConfiguration =
-        YarnClientConfiguration.CONF.build();
-    runTaskScheduler(runtimeConfiguration, args);
-  }
-
-  /**
-   * Empty private constructor to prohibit instantiation of utility class.
-   */
-  private SchedulerREEFYarn() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/531ddd9f/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerResponse.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerResponse.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerResponse.java
deleted file mode 100644
index 65c6a4f..0000000
--- 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerResponse.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.examples.scheduler;
-
-/**
- * This class specifies the response from the Scheduler.
- * It includes the status code and message.
- */
-final class SchedulerResponse {
-  /**
-   * 200 OK : The request succeeded normally.
-   */
-  private static final int SC_OK = 200;
-
-  /**
-   * 400 BAD REQUEST : The request is syntactically incorrect.
-   */
-  private static final int SC_BAD_REQUEST = 400;
-
-  /**
-   * 403 FORBIDDEN : Syntactically okay but refused to process.
-   */
-  private static final int SC_FORBIDDEN = 403;
-
-  /**
-   * 404 NOT FOUND :  The resource is not available.
-   */
-  private static final int SC_NOT_FOUND = 404;
-
-  /**
-   * Create a response with OK status.
-   */
-  public static SchedulerResponse ok(final String message){
-    return new SchedulerResponse(SC_OK, message);
-  }
-
-  /**
-   * Create a response with BAD_REQUEST status.
-   */
-  public static SchedulerResponse badRequest(final String message){
-    return new SchedulerResponse(SC_BAD_REQUEST, message);
-  }
-
-  /**
-   * Create a response with FORBIDDEN status.
-   */
-  public static SchedulerResponse forbidden(final String message){
-    return new SchedulerResponse(SC_FORBIDDEN, message);
-  }
-
-  /**
-   * Create a response with NOT FOUND status.
-   */
-  public static SchedulerResponse notFound(final String message){
-    return new SchedulerResponse(SC_NOT_FOUND, message);
-  }
-
-  /**
-   * Return {@code true} if the response is OK.
-   */
-  public boolean isOK(){
-    return this.status == SC_OK;
-  }
-
-  /**
-   * Status code of the request based on RFC 2068.
-   */
-  private int status;
-
-  /**
-   * Message to send.
-   */
-  private String message;
-
-  /**
-   * Constructor using status code and message.
-   * @param status
-   * @param message
-   */
-  private SchedulerResponse(final int status, final String message) {
-    this.status = status;
-    this.message = message;
-  }
-
-  /**
-   * Return the status code of this response.
-   */
-  int getStatus() {
-    return status;
-  }
-
-  /**
-   * Return the message of this response.
-   */
-  String getMessage() {
-    return message;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/531ddd9f/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/TaskEntity.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/TaskEntity.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/TaskEntity.java
deleted file mode 100644
index a31cdeb..0000000
--- 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/TaskEntity.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.examples.scheduler;
-
-/**
- * TaskEntity represent a single entry of task queue used in
- * scheduler. Since REEF already has the class named {Task},
- * a different name is used for this class.
- */
-final class TaskEntity {
-  private final int taskId;
-  private final String command;
-
-  public TaskEntity(final int taskId, final String command) {
-    this.taskId = taskId;
-    this.command = command;
-  }
-
-  /**
-   * Return the TaskID assigned to this Task.
-   */
-  int getId() {
-    return taskId;
-  }
-
-  String getCommand() {
-    return command;
-  }
-
-  @Override
-  public boolean equals(final Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-
-    final TaskEntity that = (TaskEntity) o;
-
-    if (taskId != that.taskId) {
-      return false;
-    }
-    if (!command.equals(that.command)) {
-      return false;
-    }
-
-    return true;
-  }
-
-  @Override
-  public int hashCode() {
-    int result = taskId;
-    result = 31 * result + command.hashCode();
-    return result;
-  }
-
-  @Override
-  public String toString() {
-    return new StringBuilder().append("<Id=").append(taskId).
-      append(", Command=").append(command).append(">").toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/531ddd9f/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/client/SchedulerREEF.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/client/SchedulerREEF.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/client/SchedulerREEF.java
new file mode 100644
index 0000000..15277f1
--- /dev/null
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/client/SchedulerREEF.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler.client;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.client.REEF;
+import org.apache.reef.examples.scheduler.driver.SchedulerDriver;
+import org.apache.reef.examples.scheduler.driver.SchedulerHttpHandler;
+import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.CommandLine;
+import org.apache.reef.util.EnvironmentUtils;
+import org.apache.reef.webserver.HttpHandlerConfiguration;
+
+import java.io.IOException;
+
+/**
+ * REEF TaskScheduler.
+ */
+public final class SchedulerREEF {
+  /**
+   * The upper limit on the number of Evaluators that the local 
resourcemanager will hand out concurrently.
+   */
+  private static final int MAX_NUMBER_OF_EVALUATORS = 3;
+
+  /**
+   * Command line parameter = true to reuse evaluators,.
+   * or false to allocate/close for each iteration
+   */
+  @NamedParameter(doc = "Whether or not to reuse evaluators",
+      short_name = "retain", default_value = "true")
+  public static final class Retain implements Name<Boolean> {
+  }
+
+  /**
+   * @return The http configuration to use reef-webserver
+   */
+  private static Configuration getHttpConf() {
+    final Configuration httpHandlerConf = HttpHandlerConfiguration.CONF
+        .set(HttpHandlerConfiguration.HTTP_HANDLERS, 
SchedulerHttpHandler.class)
+        .build();
+    return httpHandlerConf;
+  }
+
+  /**
+   * @return The Driver configuration.
+   */
+  private static Configuration getDriverConf() {
+    final Configuration driverConf = DriverConfiguration.CONF
+        .set(DriverConfiguration.GLOBAL_LIBRARIES, 
EnvironmentUtils.getClassLocation(SchedulerDriver.class))
+        .set(DriverConfiguration.DRIVER_IDENTIFIER, "TaskScheduler")
+        .set(DriverConfiguration.ON_DRIVER_STARTED, 
SchedulerDriver.StartHandler.class)
+        .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, 
SchedulerDriver.EvaluatorAllocatedHandler.class)
+        .set(DriverConfiguration.ON_CONTEXT_ACTIVE, 
SchedulerDriver.ActiveContextHandler.class)
+        .set(DriverConfiguration.ON_TASK_COMPLETED, 
SchedulerDriver.CompletedTaskHandler.class)
+        .build();
+
+    return driverConf;
+  }
+
+  /**
+   * Run the Task scheduler. If '-retain true' option is passed via command 
line,
+   * the scheduler reuses evaluators to submit new Tasks.
+   * @param runtimeConf The runtime configuration (e.g. Local, YARN, etc)
+   * @param args Command line arguments.
+   * @throws InjectionException
+   * @throws java.io.IOException
+   */
+  public static void runTaskScheduler(final Configuration runtimeConf, final 
String[] args)
+    throws InjectionException, IOException, ParseException {
+    final Tang tang = Tang.Factory.getTang();
+
+    final Configuration commandLineConf = 
CommandLine.parseToConfiguration(args, Retain.class);
+
+    // Merge the configurations to run Driver
+    final Configuration driverConf = Configurations.merge(getDriverConf(), 
getHttpConf(), commandLineConf);
+
+    final REEF reef = tang.newInjector(runtimeConf).getInstance(REEF.class);
+    reef.submit(driverConf);
+  }
+
+  /**
+   * Main program.
+   * @param args
+   * @throws InjectionException
+   */
+  public static void main(final String[] args) throws InjectionException, 
IOException, ParseException {
+    final Configuration runtimeConfiguration = LocalRuntimeConfiguration.CONF
+        .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, 
MAX_NUMBER_OF_EVALUATORS)
+        .build();
+    runTaskScheduler(runtimeConfiguration, args);
+  }
+
+  /**
+   * Empty private constructor to prohibit instantiation of utility class.
+   */
+  private SchedulerREEF() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/531ddd9f/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/client/SchedulerREEFYarn.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/client/SchedulerREEFYarn.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/client/SchedulerREEFYarn.java
new file mode 100644
index 0000000..1ed252a
--- /dev/null
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/client/SchedulerREEFYarn.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler.client;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.exceptions.InjectionException;
+
+import java.io.IOException;
+
+import static 
org.apache.reef.examples.scheduler.client.SchedulerREEF.runTaskScheduler;
+
+/**
+ * REEF TaskScheduler on YARN runtime.
+ */
+public final class SchedulerREEFYarn {
+  /**
+   * Launch the scheduler with YARN client configuration.
+   * @param args
+   * @throws InjectionException
+   * @throws java.io.IOException
+   */
+  public static void main(final String[] args)
+    throws InjectionException, IOException, ParseException {
+    final Configuration runtimeConfiguration =
+        YarnClientConfiguration.CONF.build();
+    runTaskScheduler(runtimeConfiguration, args);
+  }
+
+  /**
+   * Empty private constructor to prohibit instantiation of utility class.
+   */
+  private SchedulerREEFYarn() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/531ddd9f/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/client/package-info.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/client/package-info.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/client/package-info.java
new file mode 100644
index 0000000..c11ab08
--- /dev/null
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/client/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Task scheduler example client classes.
+ */
+package org.apache.reef.examples.scheduler.client;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/531ddd9f/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/Scheduler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/Scheduler.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/Scheduler.java
new file mode 100644
index 0000000..6dd6f15
--- /dev/null
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/Scheduler.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler.driver;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.task.TaskConfiguration;
+import org.apache.reef.examples.library.Command;
+import org.apache.reef.examples.library.ShellTask;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.Tang;
+
+import javax.annotation.concurrent.ThreadSafe;
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The body of Task scheduler. It owns a task queue
+ * and tracks the record of scheduled tasks.
+ */
+@ThreadSafe
+final class Scheduler {
+  /**
+   * Tasks are waiting to be scheduled in the queue.
+   */
+  private final Queue<TaskEntity> taskQueue;
+
+  /**
+   * Lists of {@link TaskEntity} for different states - Running / Finished / 
Canceled.
+   */
+  private final List<TaskEntity> runningTasks = new ArrayList<>();
+  private final List<TaskEntity> finishedTasks = new ArrayList<>();
+  private final List<TaskEntity> canceledTasks = new ArrayList<>();
+
+  /**
+   * Counts how many tasks have been scheduled.
+   */
+  private final AtomicInteger taskCount = new AtomicInteger(0);
+
+  @Inject
+  private Scheduler() {
+    taskQueue = new LinkedBlockingQueue<>();
+  }
+
+  /**
+   * Submit a task to the ActiveContext.
+   */
+  public synchronized void submitTask(final ActiveContext context) {
+    final TaskEntity task = taskQueue.poll();
+    final Integer taskId = task.getId();
+    final String command = task.getCommand();
+
+    final Configuration taskConf = TaskConfiguration.CONF
+        .set(TaskConfiguration.TASK, ShellTask.class)
+        .set(TaskConfiguration.IDENTIFIER, taskId.toString())
+        .build();
+    final Configuration commandConf = 
Tang.Factory.getTang().newConfigurationBuilder()
+        .bindNamedParameter(Command.class, command)
+        .build();
+
+    final Configuration merged = Configurations.merge(taskConf, commandConf);
+    context.submitTask(merged);
+    runningTasks.add(task);
+  }
+
+  /**
+   * Update the record of task to mark it as canceled.
+   */
+  public synchronized SchedulerResponse cancelTask(final int taskId) {
+    if (getTask(taskId, runningTasks) != null) {
+      return SchedulerResponse.forbidden("The task " + taskId + " is running");
+    } else if (getTask(taskId, finishedTasks) != null) {
+      return SchedulerResponse.forbidden("The task " + taskId + " has been 
finished");
+    }
+
+    final TaskEntity task = getTask(taskId, taskQueue);
+    if (task == null) {
+      final String message =
+          new StringBuilder().append("Task with ID ").append(taskId).append(" 
is not found").toString();
+      return SchedulerResponse.notFound(message);
+    } else {
+      taskQueue.remove(task);
+      canceledTasks.add(task);
+      return SchedulerResponse.ok("Canceled " + taskId);
+    }
+  }
+
+  /**
+   * Clear the pending list.
+   */
+  public synchronized SchedulerResponse clear() {
+    final int count = taskQueue.size();
+    for (final TaskEntity task : taskQueue) {
+      canceledTasks.add(task);
+    }
+    taskQueue.clear();
+    return SchedulerResponse.ok(count + " tasks removed.");
+  }
+
+  /**
+   * Get the list of Tasks, which are grouped by the states.
+   */
+  public synchronized SchedulerResponse getList() {
+    final StringBuilder sb = new StringBuilder();
+    sb.append("Running :");
+    for (final TaskEntity running : runningTasks) {
+      sb.append(" ").append(running.getId());
+    }
+
+    sb.append("\nWaiting :");
+    for (final TaskEntity waiting : taskQueue) {
+      sb.append(" ").append(waiting.getId());
+    }
+
+    sb.append("\nFinished :");
+    for (final TaskEntity finished : finishedTasks) {
+      sb.append(" ").append(finished.getId());
+    }
+
+    sb.append("\nCanceled :");
+    for (final TaskEntity canceled : canceledTasks) {
+      sb.append(" ").append(canceled.getId());
+    }
+    return SchedulerResponse.ok(sb.toString());
+  }
+
+  /**
+   * Get the status of a Task.
+   */
+  public synchronized SchedulerResponse getTaskStatus(final int taskId) {
+
+    for (final TaskEntity running : runningTasks) {
+      if (taskId == running.getId()) {
+        return SchedulerResponse.ok("Running : " + running.toString());
+      }
+    }
+
+    for (final TaskEntity waiting : taskQueue) {
+      if (taskId == waiting.getId()) {
+        return SchedulerResponse.ok("Waiting : " + waiting.toString());
+      }
+    }
+
+    for (final TaskEntity finished : finishedTasks) {
+      if (taskId == finished.getId()) {
+        return SchedulerResponse.ok("Finished : " + finished.toString());
+      }
+    }
+
+    for (final TaskEntity finished : canceledTasks) {
+      if (taskId == finished.getId()) {
+        return SchedulerResponse.ok("Canceled: " + finished.toString());
+      }
+    }
+    return SchedulerResponse.notFound(
+        new StringBuilder().append("Task with ID ").append(taskId).append(" is 
not found").toString());
+  }
+
+  /**
+   * Assigns a TaskId to submit.
+   */
+  public synchronized int assignTaskId() {
+    return taskCount.incrementAndGet();
+  }
+
+  /**
+   * Add a task to the queue.
+   */
+  public synchronized void addTask(final TaskEntity task) {
+    taskQueue.add(task);
+  }
+
+  /**
+   * Check whether there are tasks waiting to be submitted.
+   */
+  public synchronized boolean hasPendingTasks() {
+    return !taskQueue.isEmpty();
+  }
+
+  /**
+   * Get the number of pending tasks in the queue.
+   */
+  public synchronized int getNumPendingTasks() {
+    return taskQueue.size();
+  }
+
+  /**
+   * Update the record of task to mark it as finished.
+   */
+  public synchronized void setFinished(final int taskId) {
+    final TaskEntity task = getTask(taskId, runningTasks);
+    runningTasks.remove(task);
+    finishedTasks.add(task);
+  }
+
+  /**
+   * Iterate over the collection to find a TaskEntity with ID.
+   */
+  private TaskEntity getTask(final int taskId, final Collection<TaskEntity> 
tasks) {
+    TaskEntity result = null;
+    for (final TaskEntity task : tasks) {
+      if (taskId == task.getId()) {
+        result = task;
+        break;
+      }
+    }
+    return result;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/531ddd9f/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerDriver.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerDriver.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerDriver.java
new file mode 100644
index 0000000..64e1770
--- /dev/null
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerDriver.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler.driver;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ContextConfiguration;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.task.CompletedTask;
+import org.apache.reef.examples.scheduler.client.SchedulerREEF;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+import org.apache.reef.wake.time.event.StartTime;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.inject.Inject;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Driver for TaskScheduler. It receives the commands by HttpRequest and
+ * execute them in a FIFO(First In First Out) order.
+ */
+@Unit
+public final class SchedulerDriver {
+
+  public static final ObjectSerializableCodec<String> CODEC = new 
ObjectSerializableCodec<>();
+  private static final Logger LOG = 
Logger.getLogger(SchedulerDriver.class.getName());
+
+  /**
+   * Possible states of the job driver. Can be one of:
+   * <dl>
+   * <dt><code>INIT</code></dt><dd>Initial state. Ready to request an 
evaluator.</dd>
+   * <dt><code>WAIT_EVALUATORS</code></dt><dd>Waiting for an evaluator 
allocated with no active evaluators.</dd>
+   * <dt><code>READY</code></dt><dd>Wait for the commands. Reactivated when a 
new Task arrives.</dd>
+   * <dt><code>RUNNING</code></dt><dd>Run commands in the queue. Go back to 
READY state when the queue is empty.</dd>
+   * </dl>
+   */
+  private enum State {
+    INIT, WAIT_EVALUATORS, READY, RUNNING
+  }
+
+  /**
+   * If true, it reuses evaluators when Tasks done.
+   */
+  private boolean retainable;
+
+  @GuardedBy("SchedulerDriver.this")
+  private State state = State.INIT;
+
+  @GuardedBy("SchedulerDriver.this")
+  private Scheduler scheduler;
+
+  @GuardedBy("SchedulerDriver.this")
+  private int nMaxEval = 3, nActiveEval = 0, nRequestedEval = 0;
+
+  private final EvaluatorRequestor requestor;
+
+  @Inject
+  private SchedulerDriver(final EvaluatorRequestor requestor,
+                          @Parameter(SchedulerREEF.Retain.class) final boolean 
retainable,
+                          final Scheduler scheduler) {
+    this.requestor = requestor;
+    this.scheduler = scheduler;
+    this.retainable = retainable;
+  }
+
+  /**
+   * The driver is ready to run.
+   */
+  public final class StartHandler implements EventHandler<StartTime> {
+    @Override
+    public void onNext(final StartTime startTime) {
+      LOG.log(Level.INFO, "Driver started at {0}", startTime);
+      assert (state == State.INIT);
+      state = State.WAIT_EVALUATORS;
+
+      requestEvaluator(1); // Allocate an initial evaluator to avoid idle 
state.
+    }
+  }
+
+  /**
+   * Evaluator is allocated. This occurs every time to run commands in 
Non-retainable version,
+   * while occurs only once in the Retainable version
+   */
+  public final class EvaluatorAllocatedHandler implements 
EventHandler<AllocatedEvaluator> {
+    @Override
+    public void onNext(final AllocatedEvaluator evaluator) {
+      LOG.log(Level.INFO, "Evaluator is ready");
+      synchronized (SchedulerDriver.this) {
+        nActiveEval++;
+        nRequestedEval--;
+      }
+
+      evaluator.submitContext(ContextConfiguration.CONF
+          .set(ContextConfiguration.IDENTIFIER, "SchedulerContext")
+          .build());
+    }
+  }
+
+  /**
+   * Now it is ready to schedule tasks. But if the queue is empty,
+   * wait until commands coming up.
+   *
+   * If there is no pending task, having more than 1 evaluators must be 
redundant.
+   * It may happen, for example, when tasks are canceled during allocation.
+   * In these cases, the new evaluator may be abandoned.
+   */
+  public final class ActiveContextHandler implements 
EventHandler<ActiveContext> {
+    @Override
+    public void onNext(final ActiveContext context) {
+      synchronized (SchedulerDriver.this) {
+        LOG.log(Level.INFO, "Context available : {0}", context.getId());
+
+        if (scheduler.hasPendingTasks()) {
+          state = State.RUNNING;
+          scheduler.submitTask(context);
+        } else if (nActiveEval > 1) {
+          nActiveEval--;
+          context.close();
+        } else {
+          state = State.READY;
+          waitForCommands(context);
+        }
+      }
+    }
+  }
+
+  /**
+   * When a Task completes, the task is marked as finished.
+   * The evaluator is reused for the next Task if retainable is set to {@code 
true}.
+   * Otherwise the evaluator is released.
+   */
+  public final class CompletedTaskHandler implements 
EventHandler<CompletedTask> {
+    @Override
+    public void onNext(final CompletedTask task) {
+      final int taskId = Integer.valueOf(task.getId());
+
+      synchronized (SchedulerDriver.this) {
+        scheduler.setFinished(taskId);
+
+        LOG.log(Level.INFO, "Task completed. Reuse the evaluator : {0}", 
String.valueOf(retainable));
+        final ActiveContext context = task.getActiveContext();
+
+        if (retainable) {
+          retainEvaluator(context);
+        } else {
+          reallocateEvaluator(context);
+        }
+      }
+    }
+  }
+
+  /**
+   * Get the list of tasks in the scheduler.
+   */
+  public synchronized SchedulerResponse getList() {
+    return scheduler.getList();
+  }
+
+  /**
+   * Clear all the Tasks from the waiting queue.
+   */
+  public synchronized SchedulerResponse clearList() {
+    return scheduler.clear();
+  }
+
+  /**
+   * Get the status of a task.
+   */
+  public SchedulerResponse getTaskStatus(final List<String> args) {
+    if (args.size() != 1) {
+      return SchedulerResponse.badRequest("Usage : only one ID at a time");
+    }
+
+    final Integer taskId = Integer.valueOf(args.get(0));
+
+    synchronized (SchedulerDriver.this) {
+      return scheduler.getTaskStatus(taskId);
+    }
+  }
+
+  /**
+   * Cancel a Task waiting on the queue. A task cannot be canceled
+   * once it is running.
+   */
+  public SchedulerResponse cancelTask(final List<String> args) {
+    if (args.size() != 1) {
+      return SchedulerResponse.badRequest("Usage : only one ID at a time");
+    }
+
+    final Integer taskId = Integer.valueOf(args.get(0));
+
+    synchronized (SchedulerDriver.this) {
+      return scheduler.cancelTask(taskId);
+    }
+  }
+
+  /**
+   * Submit a command to schedule.
+   */
+  public SchedulerResponse submitCommands(final List<String> args) {
+    if (args.size() != 1) {
+      return SchedulerResponse.badRequest("Usage : only one command at a 
time");
+    }
+
+    final String command = args.get(0);
+    final Integer id;
+
+    synchronized (SchedulerDriver.this) {
+      id = scheduler.assignTaskId();
+      scheduler.addTask(new TaskEntity(id, command));
+
+      if (state == State.READY) {
+        SchedulerDriver.this.notify(); // Wake up at {waitForCommands}
+      } else if (state == State.RUNNING && nMaxEval > nActiveEval + 
nRequestedEval) {
+        requestEvaluator(1);
+      }
+    }
+    return SchedulerResponse.ok("Task ID : " + id);
+  }
+
+  /**
+   * Update the maximum number of evaluators to hold.
+   * Request more evaluators in case there are pending tasks
+   * in the queue and the number of evaluators is less than the limit.
+   */
+  public SchedulerResponse setMaxEvaluators(final List<String> args) {
+    if (args.size() != 1) {
+      return SchedulerResponse.badRequest("Usage : Only one value can be 
used");
+    }
+
+    final int nTarget = Integer.valueOf(args.get(0));
+
+    synchronized (SchedulerDriver.this) {
+      if (nTarget < nActiveEval + nRequestedEval) {
+        return SchedulerResponse.forbidden(nActiveEval + nRequestedEval +
+            " evaluators are used now. Should be larger than that.");
+      }
+      nMaxEval = nTarget;
+
+      if (scheduler.hasPendingTasks()) {
+        final int nToRequest =
+            Math.min(scheduler.getNumPendingTasks(), nMaxEval - nActiveEval) - 
nRequestedEval;
+        requestEvaluator(nToRequest);
+      }
+      return SchedulerResponse.ok("You can use evaluators up to " + nMaxEval + 
" evaluators.");
+    }
+  }
+
+  /**
+   * Request evaluators. Passing a non positive number is illegal,
+   * so it does not make a trial for that situation.
+   */
+  private void requestEvaluator(final int numToRequest) {
+    if (numToRequest <= 0) {
+      throw new IllegalArgumentException("The number of evaluator request 
should be a positive integer");
+    }
+
+    synchronized (SchedulerDriver.this) {
+      nRequestedEval += numToRequest;
+      requestor.submit(EvaluatorRequest.newBuilder()
+          .setMemory(32)
+          .setNumber(numToRequest)
+          .build());
+    }
+  }
+
+  /**
+   * Pick up a command from the queue and run it. Wait until
+   * any command coming up if no command exists.
+   */
+  private void waitForCommands(final ActiveContext context) {
+    synchronized (SchedulerDriver.this) {
+      while (!scheduler.hasPendingTasks()) {
+        // Wait until any command enters in the queue
+        try {
+          SchedulerDriver.this.wait();
+        } catch (final InterruptedException e) {
+          LOG.log(Level.WARNING, "InterruptedException occurred in 
SchedulerDriver", e);
+        }
+      }
+      // When wakes up, run the first command from the queue.
+      state = State.RUNNING;
+      scheduler.submitTask(context);
+    }
+  }
+
+  /**
+   * Retain the complete evaluators submitting another task
+   * until there is no need to reuse them.
+   */
+  private synchronized void retainEvaluator(final ActiveContext context) {
+    if (scheduler.hasPendingTasks()) {
+      scheduler.submitTask(context);
+    } else if (nActiveEval > 1) {
+      nActiveEval--;
+      context.close();
+    } else {
+      state = State.READY;
+      waitForCommands(context);
+    }
+  }
+
+  /**
+   * Always close the complete evaluators and
+   * allocate a new evaluator if necessary.
+   */
+  private synchronized void reallocateEvaluator(final ActiveContext context) {
+    nActiveEval--;
+    context.close();
+
+    if (scheduler.hasPendingTasks()) {
+      requestEvaluator(1);
+    } else if (nActiveEval <= 0) {
+      state = State.WAIT_EVALUATORS;
+      requestEvaluator(1);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/531ddd9f/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerHttpHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerHttpHandler.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerHttpHandler.java
new file mode 100644
index 0000000..7d05e9b
--- /dev/null
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerHttpHandler.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler.driver;
+
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.webserver.HttpHandler;
+import org.apache.reef.webserver.ParsedHttpRequest;
+
+import javax.inject.Inject;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Receive HttpRequest so that it can handle the command list.
+ */
+public final class SchedulerHttpHandler implements HttpHandler {
+  private final InjectionFuture<SchedulerDriver> schedulerDriver;
+
+  private String uriSpecification = "reef-example-scheduler";
+
+  @Inject
+  private SchedulerHttpHandler(final InjectionFuture<SchedulerDriver> 
schedulerDriver) {
+    this.schedulerDriver = schedulerDriver;
+  }
+
+  @Override
+  public String getUriSpecification() {
+    return uriSpecification;
+  }
+
+  @Override
+  public void setUriSpecification(final String s) {
+    uriSpecification = s;
+  }
+
+  /**
+   * HttpRequest handler. You must specify UriSpecification and REST API 
version.
+   * The request url is http://{address}:{port}/reef-example-scheduler/v1
+   *
+   * APIs
+   *   /list                to get the status list for all tasks
+   *   /status?id={id}      to query the status of such a task, given id
+   *   /submit?cmd={cmd}    to submit a Task, which returns its id
+   *   /cancel?id={id}      to cancel the task's execution
+   *   /max-eval?num={num}  to set the maximum number of evaluators
+   *   /clear               to clear the waiting queue
+   */
+  @Override
+  public void onHttpRequest(final ParsedHttpRequest request, final 
HttpServletResponse response)
+    throws IOException, ServletException {
+    final String target = request.getTargetEntity().toLowerCase();
+    final Map<String, List<String>> queryMap = request.getQueryMap();
+
+    final SchedulerResponse result;
+    switch (target) {
+    case "list":
+      result = schedulerDriver.get().getList();
+      break;
+    case "clear":
+      result = schedulerDriver.get().clearList();
+      break;
+    case "status":
+      result = schedulerDriver.get().getTaskStatus(queryMap.get("id"));
+      break;
+    case "submit":
+      result = schedulerDriver.get().submitCommands(queryMap.get("cmd"));
+      break;
+    case "cancel":
+      result = schedulerDriver.get().cancelTask(queryMap.get("id"));
+      break;
+    case "max-eval":
+      result = schedulerDriver.get().setMaxEvaluators(queryMap.get("num"));
+      break;
+    default:
+      result = SchedulerResponse.notFound("Unsupported operation");
+    }
+
+    // Send response to the http client
+    final int status = result.getStatus();
+    final String message= result.getMessage();
+
+    if (result.isOK()) {
+      response.getOutputStream().println(message);
+    } else {
+      response.sendError(status, message);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/531ddd9f/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerResponse.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerResponse.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerResponse.java
new file mode 100644
index 0000000..e3d1272
--- /dev/null
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/SchedulerResponse.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler.driver;
+
+/**
+ * This class specifies the response from the Scheduler.
+ * It includes the status code and message.
+ */
+final class SchedulerResponse {
+  /**
+   * 200 OK : The request succeeded normally.
+   */
+  private static final int SC_OK = 200;
+
+  /**
+   * 400 BAD REQUEST : The request is syntactically incorrect.
+   */
+  private static final int SC_BAD_REQUEST = 400;
+
+  /**
+   * 403 FORBIDDEN : Syntactically okay but refused to process.
+   */
+  private static final int SC_FORBIDDEN = 403;
+
+  /**
+   * 404 NOT FOUND :  The resource is not available.
+   */
+  private static final int SC_NOT_FOUND = 404;
+
+  /**
+   * Create a response with OK status.
+   */
+  public static SchedulerResponse ok(final String message){
+    return new SchedulerResponse(SC_OK, message);
+  }
+
+  /**
+   * Create a response with BAD_REQUEST status.
+   */
+  public static SchedulerResponse badRequest(final String message){
+    return new SchedulerResponse(SC_BAD_REQUEST, message);
+  }
+
+  /**
+   * Create a response with FORBIDDEN status.
+   */
+  public static SchedulerResponse forbidden(final String message){
+    return new SchedulerResponse(SC_FORBIDDEN, message);
+  }
+
+  /**
+   * Create a response with NOT FOUND status.
+   */
+  public static SchedulerResponse notFound(final String message){
+    return new SchedulerResponse(SC_NOT_FOUND, message);
+  }
+
+  /**
+   * Return {@code true} if the response is OK.
+   */
+  public boolean isOK(){
+    return this.status == SC_OK;
+  }
+
+  /**
+   * Status code of the request based on RFC 2068.
+   */
+  private int status;
+
+  /**
+   * Message to send.
+   */
+  private String message;
+
+  /**
+   * Constructor using status code and message.
+   * @param status
+   * @param message
+   */
+  private SchedulerResponse(final int status, final String message) {
+    this.status = status;
+    this.message = message;
+  }
+
+  /**
+   * Return the status code of this response.
+   */
+  int getStatus() {
+    return status;
+  }
+
+  /**
+   * Return the message of this response.
+   */
+  String getMessage() {
+    return message;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/531ddd9f/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/TaskEntity.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/TaskEntity.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/TaskEntity.java
new file mode 100644
index 0000000..bb29228
--- /dev/null
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/TaskEntity.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler.driver;
+
+/**
+ * TaskEntity represent a single entry of task queue used in
+ * scheduler. Since REEF already has the class named {Task},
+ * a different name is used for this class.
+ */
+final class TaskEntity {
+  private final int taskId;
+  private final String command;
+
+  public TaskEntity(final int taskId, final String command) {
+    this.taskId = taskId;
+    this.command = command;
+  }
+
+  /**
+   * Return the TaskID assigned to this Task.
+   */
+  int getId() {
+    return taskId;
+  }
+
+  String getCommand() {
+    return command;
+  }
+
+  @Override
+  public boolean equals(final Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    final TaskEntity that = (TaskEntity) o;
+
+    if (taskId != that.taskId) {
+      return false;
+    }
+    if (!command.equals(that.command)) {
+      return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = taskId;
+    result = 31 * result + command.hashCode();
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder().append("<Id=").append(taskId).
+      append(", Command=").append(command).append(">").toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/531ddd9f/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/package-info.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/package-info.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/package-info.java
new file mode 100644
index 0000000..6d731b4
--- /dev/null
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/driver/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Task scheduler example driver classes.
+ */
+package org.apache.reef.examples.scheduler.driver;

Reply via email to