Repository: reef
Updated Branches:
  refs/heads/master 51b76fc72 -> 72ecec743


http://git-wip-us.apache.org/repos/asf/reef/blob/72ecec74/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockCompletedTask.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockCompletedTask.java
 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockCompletedTask.java
new file mode 100644
index 0000000..6ad6b35
--- /dev/null
+++ 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockCompletedTask.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.runtime;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.task.CompletedTask;
+
+/**
+ * Mock {@link CompletedTask}: couples a finished mock running task with the
+ * return value reported for it.
+ */
+@Unstable
+@Private
+public final class MockCompletedTask implements CompletedTask {
+
+  /** Bytes reported as the task's return value; handed out without copying. */
+  private final byte[] result;
+
+  /** The running task this completion refers to; supplies id and context. */
+  private final MockRunningTask finishedTask;
+
+  /**
+   * @param task the mock running task that completed
+   * @param returnValue the bytes to hand out from {@link #get()}
+   */
+  public MockCompletedTask(final MockRunningTask task, final byte[] returnValue) {
+    this.finishedTask = task;
+    this.result = returnValue;
+  }
+
+  /**
+   * @return the return value passed to the constructor
+   */
+  @Override
+  public byte[] get() {
+    return result;
+  }
+
+  /**
+   * @return the id of the underlying running task
+   */
+  @Override
+  public String getId() {
+    return finishedTask.getId();
+  }
+
+  /**
+   * @return the active context of the underlying running task
+   */
+  @Override
+  public ActiveContext getActiveContext() {
+    return finishedTask.getActiveContext();
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/72ecec74/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockEvaluatorDescriptor.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockEvaluatorDescriptor.java
 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockEvaluatorDescriptor.java
new file mode 100644
index 0000000..2d98af0
--- /dev/null
+++ 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockEvaluatorDescriptor.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.runtime;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.catalog.NodeDescriptor;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.driver.evaluator.EvaluatorProcess;
+
+/**
+ * Mock {@link EvaluatorDescriptor} that wraps a node descriptor and reports
+ * fixed values for every other property.
+ */
+@Unstable
+@Private
+public final class MockEvaluatorDescriptor implements EvaluatorDescriptor {
+
+  /** Runtime name reported by every mock evaluator. */
+  private static final String RUNTIME_NAME = "mock";
+
+  /** Memory size reported by every mock evaluator. */
+  private static final int MEMORY = 0;
+
+  /** Core count reported by every mock evaluator. */
+  private static final int NUMBER_OF_CORES = 1;
+
+  /** The node this evaluator is described as running on. */
+  private final NodeDescriptor node;
+
+  /**
+   * @param nodeDescriptor the node descriptor returned by {@link #getNodeDescriptor()}
+   */
+  MockEvaluatorDescriptor(final NodeDescriptor nodeDescriptor) {
+    this.node = nodeDescriptor;
+  }
+
+  /**
+   * @return the node descriptor given at construction
+   */
+  @Override
+  public NodeDescriptor getNodeDescriptor() {
+    return node;
+  }
+
+  /**
+   * Not supported by the mock.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public EvaluatorProcess getProcess() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * @return always 0
+   */
+  @Override
+  public int getMemory() {
+    return MEMORY;
+  }
+
+  /**
+   * @return always 1
+   */
+  @Override
+  public int getNumberOfCores() {
+    return NUMBER_OF_CORES;
+  }
+
+  /**
+   * @return always the string "mock"
+   */
+  @Override
+  public String getRuntimeName() {
+    return RUNTIME_NAME;
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/72ecec74/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockEvaluatorRequestor.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockEvaluatorRequestor.java
 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockEvaluatorRequestor.java
new file mode 100644
index 0000000..7f90039
--- /dev/null
+++ 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockEvaluatorRequestor.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.runtime;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.catalog.NodeDescriptor;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.mock.request.AllocateEvaluator;
+import org.apache.reef.tang.InjectionFuture;
+
+import javax.inject.Inject;
+import java.util.UUID;
+
+/**
+ * mock evaluator requestor.
+ */
+@Unstable
+@Private
+public final class MockEvaluatorRequestor implements EvaluatorRequestor {
+
+  private final InjectionFuture<MockRuntimeDriver> mockRuntimeDriver;
+
+  private final InjectionFuture<MockClock> clock;
+
+  @Inject
+  MockEvaluatorRequestor(
+      final InjectionFuture<MockClock> clock,
+      final InjectionFuture<MockRuntimeDriver> mockRuntimeDriver) {
+    this.clock = clock;
+    this.mockRuntimeDriver = mockRuntimeDriver;
+  }
+
+  @Override
+  public void submit(final EvaluatorRequest req) {
+    if (this.clock.get().isClosed()) {
+      throw new IllegalStateException("clock closed");
+    }
+    final NodeDescriptor nodeDescriptor = new MockNodeDescriptor();
+    final MockEvaluatorDescriptor evaluatorDescriptor = new 
MockEvaluatorDescriptor(nodeDescriptor);
+    for (int i = 0; i < req.getNumber(); i++) {
+      final MockAllocatedEvalautor mockEvaluator = new MockAllocatedEvalautor(
+          this.mockRuntimeDriver.get(), UUID.randomUUID().toString(), 
evaluatorDescriptor);
+      this.mockRuntimeDriver.get().add(new AllocateEvaluator(mockEvaluator));
+    }
+  }
+
+  @Override
+  public Builder newRequest() {
+    if (this.clock.get().isClosed()) {
+      throw new IllegalStateException("clock closed");
+    }
+    return new Builder();
+  }
+
+
+  /**
+   * {@link EvaluatorRequest.Builder} extended with a new submit method.
+   * {@link EvaluatorRequest}s are built using this builder.
+   */
+  private final class Builder extends EvaluatorRequest.Builder<Builder> {
+    @Override
+    public void submit() {
+      MockEvaluatorRequestor.this.submit(this.build());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/72ecec74/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockFailedContext.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockFailedContext.java
 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockFailedContext.java
new file mode 100644
index 0000000..55eafe1
--- /dev/null
+++ 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockFailedContext.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.runtime;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.util.Optional;
+
+/**
+ * Mock {@link FailedContext} built from the mock active context that failed.
+ * Identity information is delegated to that context; the failure details are
+ * fixed placeholders.
+ */
+@Unstable
+@Private
+public final class MockFailedContext implements FailedContext {
+
+  /** Placeholder text used for the failure message and the error. */
+  private static final String MOCK = "mock";
+
+  /** The context that is being reported as failed. */
+  private final MockActiveContext failed;
+
+  /**
+   * @param context the mock active context to report as failed
+   */
+  public MockFailedContext(final MockActiveContext context) {
+    this.failed = context;
+  }
+
+  /**
+   * @return the id of the failed context
+   */
+  @Override
+  public String getId() {
+    return failed.getId();
+  }
+
+  /**
+   * @return the id of the evaluator the failed context belongs to
+   */
+  @Override
+  public String getEvaluatorId() {
+    return failed.getEvaluatorId();
+  }
+
+  /**
+   * @return the evaluator descriptor of the failed context
+   */
+  @Override
+  public EvaluatorDescriptor getEvaluatorDescriptor() {
+    return failed.getEvaluatorDescriptor();
+  }
+
+  /**
+   * @return the parent id as reported by the failed context
+   */
+  @Override
+  public Optional<String> getParentId() {
+    return failed.getParentId();
+  }
+
+  /**
+   * @return the failed context's parent viewed as an {@link ActiveContext},
+   *     or empty when it has no parent
+   */
+  @Override
+  public Optional<ActiveContext> getParentContext() {
+    if (!failed.getParentContext().isPresent()) {
+      return Optional.<ActiveContext>empty();
+    }
+    return Optional.of((ActiveContext) failed.getParentContext().get());
+  }
+
+  /**
+   * @return always the string "mock"
+   */
+  @Override
+  public String getMessage() {
+    return MOCK;
+  }
+
+  /**
+   * @return always empty
+   */
+  @Override
+  public Optional<String> getDescription() {
+    return Optional.empty();
+  }
+
+  /**
+   * @return always empty
+   */
+  @Override
+  public Optional<Throwable> getReason() {
+    return Optional.empty();
+  }
+
+  /**
+   * @return always empty
+   */
+  @Override
+  public Optional<byte[]> getData() {
+    return Optional.empty();
+  }
+
+  /**
+   * @return a new {@link Exception} with the message "mock"
+   */
+  @Override
+  public Throwable asError() {
+    return new Exception(MOCK);
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/72ecec74/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockFailedEvaluator.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockFailedEvaluator.java
 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockFailedEvaluator.java
new file mode 100644
index 0000000..d9c0c3c
--- /dev/null
+++ 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockFailedEvaluator.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.mock.runtime;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.driver.task.FailedTask;
+import org.apache.reef.exception.EvaluatorException;
+import org.apache.reef.util.Optional;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Mock {@link FailedEvaluator} carrying an evaluator id together with the
+ * contexts and the task (if any) that failed along with it.
+ */
+@Unstable
+@Private
+public final class MockFailedEvaluator implements FailedEvaluator {
+
+  /** Id of the evaluator that failed. */
+  private final String id;
+
+  /** Contexts reported as failed with the evaluator; returned as-is. */
+  private final List<FailedContext> contexts;
+
+  /** Task reported as failed with the evaluator, if there was one. */
+  private final Optional<FailedTask> task;
+
+  /**
+   * Creates a failed evaluator with no failed contexts and no failed task.
+   *
+   * @param evaluatorID id of the evaluator that failed
+   */
+  public MockFailedEvaluator(final String evaluatorID) {
+    this(evaluatorID, new ArrayList<FailedContext>(), Optional.<FailedTask>empty());
+  }
+
+  /**
+   * @param evaluatorID id of the evaluator that failed
+   * @param failedContextList contexts that failed with the evaluator
+   * @param failedTask the task that failed with the evaluator, or empty
+   */
+  public MockFailedEvaluator(
+      final String evaluatorID,
+      final List<FailedContext> failedContextList,
+      final Optional<FailedTask> failedTask) {
+    this.id = evaluatorID;
+    this.contexts = failedContextList;
+    this.task = failedTask;
+  }
+
+  /**
+   * @return the evaluator id given at construction
+   */
+  @Override
+  public String getId() {
+    return id;
+  }
+
+  /**
+   * @return always {@code null}; the mock carries no evaluator exception
+   */
+  @Override
+  public EvaluatorException getEvaluatorException() {
+    return null;
+  }
+
+  /**
+   * @return the failed context list held by this instance
+   */
+  @Override
+  public List<FailedContext> getFailedContextList() {
+    return contexts;
+  }
+
+  /**
+   * @return the failed task held by this instance, possibly empty
+   */
+  @Override
+  public Optional<FailedTask> getFailedTask() {
+    return task;
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/72ecec74/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockNodeDescriptor.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockNodeDescriptor.java
 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockNodeDescriptor.java
new file mode 100644
index 0000000..be04994
--- /dev/null
+++ 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockNodeDescriptor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.runtime;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.catalog.NodeDescriptor;
+import org.apache.reef.driver.catalog.RackDescriptor;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Mock {@link NodeDescriptor}. Name and id are the fixed string "mock", and
+ * the node lives in a rack that contains only this node.
+ */
+@Unstable
+@Private
+public final class MockNodeDescriptor implements NodeDescriptor {
+
+  /** Value reported for every name and id of the mock node and its rack. */
+  private static final String MOCK = "mock";
+
+  /**
+   * @return always the string "mock"
+   */
+  @Override
+  public String getId() {
+    return MOCK;
+  }
+
+  /**
+   * @return always the string "mock"
+   */
+  @Override
+  public String getName() {
+    return MOCK;
+  }
+
+  /**
+   * Not supported by the mock.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public InetSocketAddress getInetSocketAddress() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * @return a new rack descriptor, created on every call, whose only node is this one
+   */
+  @Override
+  public RackDescriptor getRackDescriptor() {
+    return new SingleNodeRack();
+  }
+
+  /**
+   * Rack named "mock" whose node list holds just the enclosing node descriptor.
+   */
+  private final class SingleNodeRack implements RackDescriptor {
+
+    @Override
+    public String getName() {
+      return MOCK;
+    }
+
+    /**
+     * @return a fresh mutable list containing only the enclosing node
+     */
+    @Override
+    public List<NodeDescriptor> getNodes() {
+      final List<NodeDescriptor> members = new ArrayList<>();
+      members.add(MockNodeDescriptor.this);
+      return members;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/72ecec74/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockRunningTask.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockRunningTask.java
 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockRunningTask.java
new file mode 100644
index 0000000..bec26f4
--- /dev/null
+++ 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockRunningTask.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.runtime;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.mock.request.CloseTask;
+import org.apache.reef.mock.request.SendMessageDriverToTask;
+import org.apache.reef.mock.request.SuspendTask;
+import org.apache.reef.runtime.common.driver.task.TaskRepresenter;
+import org.apache.reef.util.Optional;
+
+/**
+ * Mock {@link RunningTask}. Operations on the task do not act directly;
+ * each one adds the matching process request to the mock runtime driver.
+ */
+@Unstable
+@Private
+public final class MockRunningTask implements RunningTask {
+
+  /** Driver that receives the process requests created by this task. */
+  private final MockRuntimeDriver driver;
+
+  /** Identifier of this task. */
+  private final String id;
+
+  /** Context the task runs on. */
+  private final ActiveContext activeContext;
+
+  MockRunningTask(
+      final MockRuntimeDriver mockRuntimeDriver,
+      final String taskID,
+      final ActiveContext context) {
+    this.driver = mockRuntimeDriver;
+    this.id = taskID;
+    this.activeContext = context;
+  }
+
+  /**
+   * @return the task id given at construction
+   */
+  @Override
+  public String getId() {
+    return id;
+  }
+
+  /**
+   * @return the id of the evaluator, as reported by the task's context
+   */
+  public String evaluatorID() {
+    return activeContext.getEvaluatorId();
+  }
+
+  /**
+   * @return the context given at construction
+   */
+  @Override
+  public ActiveContext getActiveContext() {
+    return activeContext;
+  }
+
+  /**
+   * Not supported by the mock.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public TaskRepresenter getTaskRepresenter() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Enqueues a driver-to-task message request.
+   *
+   * @param message the payload handed to the request
+   */
+  @Override
+  public void send(final byte[] message) {
+    final SendMessageDriverToTask request = new SendMessageDriverToTask(this, message);
+    driver.add(request);
+  }
+
+  /**
+   * Enqueues a suspend request without a message.
+   */
+  @Override
+  public void suspend() {
+    final Optional<byte[]> noMessage = Optional.<byte[]>empty();
+    driver.add(new SuspendTask(this, noMessage));
+  }
+
+  /**
+   * Enqueues a suspend request carrying the given message.
+   *
+   * @param message the message wrapped with {@code Optional.of}
+   */
+  @Override
+  public void suspend(final byte[] message) {
+    final Optional<byte[]> wrapped = Optional.of(message);
+    driver.add(new SuspendTask(this, wrapped));
+  }
+
+  /**
+   * Enqueues a close request built from this task and the driver's task
+   * return value provider.
+   */
+  @Override
+  public void close() {
+    driver.add(new CloseTask(this, driver.getTaskReturnValueProvider()));
+  }
+
+  /**
+   * Enqueues the same close request as {@link #close()}.
+   *
+   * @param message not forwarded; the close request has no message argument
+   */
+  @Override
+  public void close(final byte[] message) {
+    close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/72ecec74/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockRuntimeDriver.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockRuntimeDriver.java
 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockRuntimeDriver.java
new file mode 100644
index 0000000..0b89e61
--- /dev/null
+++ 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockRuntimeDriver.java
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.runtime;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ContextMessage;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.CompletedEvaluator;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.driver.parameters.*;
+import org.apache.reef.driver.task.*;
+import org.apache.reef.io.Tuple;
+import org.apache.reef.mock.MockRuntime;
+import org.apache.reef.mock.MockTaskReturnValueProvider;
+import org.apache.reef.mock.ProcessRequest;
+import org.apache.reef.mock.request.*;
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.Clock;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.event.StopTime;
+
+import javax.inject.Inject;
+import java.util.*;
+
+/**
+ * mock runtime driver.
+ */
+@Unstable
+@Private
+public final class MockRuntimeDriver implements MockRuntime {
+
+  private final InjectionFuture<MockClock> clock; // checked for isClosed() before most operations
+
+  private final List<ProcessRequest> processRequestQueue = new ArrayList<>(); // consumed from index 0
+
+  private final Set<EventHandler<StartTime>> driverStartHandlers;
+
+  private final Set<EventHandler<StopTime>> driverStopHandlers;
+
+  private final Set<EventHandler<AllocatedEvaluator>> 
allocatedEvaluatorHandlers;
+
+  private final Set<EventHandler<CompletedEvaluator>> 
completedEvaluatorHandlers;
+
+  private final Set<EventHandler<FailedEvaluator>> failedEvaluatorHandlers;
+
+  private final Set<EventHandler<TaskRunningHandlers>> taskRunningHandlers; // NOTE(review): typed on the parameter class - confirm
+
+  private final Set<EventHandler<FailedTask>> taskFailedHandlers;
+
+  private final Set<EventHandler<TaskMessage>> taskMessageHandlers;
+
+  private final Set<EventHandler<CompletedTask>> taskCompletedHandlers;
+
+  private final Set<EventHandler<SuspendedTask>> taskSuspendedHandlers;
+
+  private final Set<EventHandler<ActiveContext>> contextActiveHandlers;
+
+  private final Set<EventHandler<CloseContext>> contextClosedHandlers; // NOTE(review): typed on the request class - confirm
+
+  private final Set<EventHandler<ContextMessage>> contextMessageHandlers;
+
+  private final Set<EventHandler<FailedContext>> contextFailedHandlers;
+
+  private final Map<String, MockAllocatedEvalautor> allocatedEvaluatorMap = 
new HashMap<>(); // keyed by evaluator id
+
+  private final Map<String, List<MockActiveContext>> allocatedContextsMap = 
new HashMap<>(); // keyed by evaluator id
+
+  private final Map<String, MockRunningTask> runningTasks = new HashMap<>(); // keyed by evaluator id, not task id
+
+  private final MockTaskReturnValueProvider taskReturnValueProvider;
+
+  /**
+   * Injection constructor. Keeps the clock future, the task return value
+   * provider, and one handler set for each named driver parameter.
+   */
+  @Inject
+  MockRuntimeDriver(
+      final InjectionFuture<MockClock> clock,
+      final MockTaskReturnValueProvider taskReturnValueProvider,
+      @Parameter(DriverStartHandler.class)
+      final Set<EventHandler<StartTime>> driverStartHandlers,
+      @Parameter(Clock.StopHandler.class)
+      final Set<EventHandler<StopTime>> driverStopHandlers,
+      @Parameter(EvaluatorAllocatedHandlers.class)
+      final Set<EventHandler<AllocatedEvaluator>> allocatedEvaluatorHandlers,
+      @Parameter(EvaluatorCompletedHandlers.class)
+      final Set<EventHandler<CompletedEvaluator>> completedEvaluatorHandlers,
+      @Parameter(EvaluatorFailedHandlers.class)
+      final Set<EventHandler<FailedEvaluator>> failedEvaluatorHandlers,
+      @Parameter(TaskRunningHandlers.class)
+      final Set<EventHandler<TaskRunningHandlers>> taskRunningHandlers,
+      @Parameter(TaskFailedHandlers.class)
+      final Set<EventHandler<FailedTask>> taskFailedHandlers,
+      @Parameter(TaskMessageHandlers.class)
+      final Set<EventHandler<TaskMessage>> taskMessageHandlers,
+      @Parameter(TaskCompletedHandlers.class)
+      final Set<EventHandler<CompletedTask>> taskCompletedHandlers,
+      @Parameter(TaskSuspendedHandlers.class)
+      final Set<EventHandler<SuspendedTask>> taskSuspendedHandlers,
+      @Parameter(ContextActiveHandlers.class)
+      final Set<EventHandler<ActiveContext>> contextActiveHandlers,
+      @Parameter(ContextClosedHandlers.class)
+      final Set<EventHandler<CloseContext>> contextClosedHandlers,
+      @Parameter(ContextMessageHandlers.class)
+      final Set<EventHandler<ContextMessage>> contextMessageHandlers,
+      @Parameter(ContextFailedHandlers.class)
+      final Set<EventHandler<FailedContext>> contextFailedHandlers) {
+    // runtime services
+    this.clock = clock;
+    this.taskReturnValueProvider = taskReturnValueProvider;
+
+    // driver lifecycle handlers
+    this.driverStartHandlers = driverStartHandlers;
+    this.driverStopHandlers = driverStopHandlers;
+
+    // evaluator handlers
+    this.allocatedEvaluatorHandlers = allocatedEvaluatorHandlers;
+    this.completedEvaluatorHandlers = completedEvaluatorHandlers;
+    this.failedEvaluatorHandlers = failedEvaluatorHandlers;
+
+    // context handlers
+    this.contextActiveHandlers = contextActiveHandlers;
+    this.contextClosedHandlers = contextClosedHandlers;
+    this.contextMessageHandlers = contextMessageHandlers;
+    this.contextFailedHandlers = contextFailedHandlers;
+
+    // task handlers
+    this.taskRunningHandlers = taskRunningHandlers;
+    this.taskFailedHandlers = taskFailedHandlers;
+    this.taskMessageHandlers = taskMessageHandlers;
+    this.taskCompletedHandlers = taskCompletedHandlers;
+    this.taskSuspendedHandlers = taskSuspendedHandlers;
+  }
+
+  /**
+   * @return a new list holding the evaluators currently in the allocated evaluator map
+   * @throws IllegalStateException if the clock is closed
+   */
+  @Override
+  public Collection<AllocatedEvaluator> getCurrentAllocatedEvaluators() {
+    final boolean closed = this.clock.get().isClosed();
+    if (closed) {
+      throw new IllegalStateException("clock is closed");
+    }
+    final Collection<MockAllocatedEvalautor> allocated = this.allocatedEvaluatorMap.values();
+    return new ArrayList<AllocatedEvaluator>(allocated);
+  }
+
+  /**
+   * Fails an allocated evaluator and notifies the failed-evaluator handlers.
+   * The evaluator, the task registered under its id (if any) and all of its
+   * contexts are dropped from the driver's bookkeeping; the task and contexts
+   * are reported as failed inside the posted {@link MockFailedEvaluator}.
+   *
+   * @param evaluator the evaluator to fail
+   * @throws IllegalStateException if the clock is closed or the evaluator is
+   *     not currently allocated
+   */
+  @Override
+  public void fail(final AllocatedEvaluator evaluator) {
+    if (this.clock.get().isClosed()) {
+      throw new IllegalStateException("clock is closed");
+    }
+    final String evaluatorId = evaluator.getId();
+    // A failed evaluator is no longer allocated. Removing it here keeps it out
+    // of getCurrentAllocatedEvaluators() and makes a repeated fail() report
+    // "unknown evaluator" instead of tripping over the removed context list.
+    if (this.allocatedEvaluatorMap.remove(evaluatorId) == null) {
+      throw new IllegalStateException("unknown evaluator " + evaluator);
+    }
+
+    // runningTasks is keyed by evaluator id; at most one task goes down with it.
+    Optional<FailedTask> failedTask = Optional.<FailedTask>empty();
+    final RunningTask task = this.runningTasks.remove(evaluatorId);
+    if (task != null) {
+      failedTask = Optional.of(new FailedTask(
+          task.getId(),
+          "mock",
+          Optional.<String>empty(),
+          Optional.<Throwable>empty(),
+          Optional.<byte[]>empty(),
+          Optional.<ActiveContext>of(task.getActiveContext())));
+    }
+
+    final List<FailedContext> failedContexts = new ArrayList<>();
+    final List<MockActiveContext> contexts = this.allocatedContextsMap.remove(evaluatorId);
+    if (contexts != null) {
+      for (final MockActiveContext context : contexts) {
+        failedContexts.add(new MockFailedContext(context));
+      }
+    }
+
+    post(this.failedEvaluatorHandlers,
+        new MockFailedEvaluator(evaluatorId, failedContexts, failedTask));
+  }
+
+  /**
+   * @return a new list with the contexts of every evaluator in the allocated contexts map
+   * @throws IllegalStateException if the clock is closed
+   */
+  @Override
+  public Collection<ActiveContext> getCurrentActiveContexts() {
+    if (this.clock.get().isClosed()) {
+      throw new IllegalStateException("clock is closed");
+    }
+    final List<ActiveContext> flattened = new ArrayList<>();
+    for (final List<MockActiveContext> perEvaluator : this.allocatedContextsMap.values()) {
+      for (final MockActiveContext active : perEvaluator) {
+        flattened.add(active);
+      }
+    }
+    return flattened;
+  }
+
+  /**
+   * Fails an active context. The context-failed handlers are notified first;
+   * a context without a parent then takes its whole evaluator down, while any
+   * other context is only removed from its evaluator's context list.
+   *
+   * @param context the context to fail; cast to {@code MockActiveContext}
+   * @throws IllegalStateException if the clock is closed
+   */
+  @Override
+  public void fail(final ActiveContext context) {
+    if (this.clock.get().isClosed()) {
+      throw new IllegalStateException("clock is closed");
+    }
+    final MockActiveContext failed = (MockActiveContext) context;
+    final MockAllocatedEvalautor evaluator = failed.getEvaluator();
+    post(this.contextFailedHandlers, new MockFailedContext(failed));
+    if (failed.getParentContext().isPresent()) {
+      this.allocatedContextsMap.get(evaluator.getId()).remove(failed);
+      return;
+    }
+    // root context failure shuts the evaluator down
+    fail(evaluator);
+  }
+
+  /**
+   * @return a new list holding the tasks currently in the running task map
+   * @throws IllegalStateException if the clock is closed
+   */
+  @Override
+  public Collection<RunningTask> getCurrentRunningTasks() {
+    final boolean closed = this.clock.get().isClosed();
+    if (closed) {
+      throw new IllegalStateException("clock is closed");
+    }
+    final Collection<MockRunningTask> running = this.runningTasks.values();
+    return new ArrayList<RunningTask>(running);
+  }
+
+  /**
+   * Fails a running task: removes it from the running task map, which is
+   * keyed by evaluator id, and notifies the task-failed handlers.
+   *
+   * @param task the task to fail
+   * @throws IllegalStateException if the clock is closed, or if the task is
+   *     not the one registered for its evaluator
+   */
+  @Override
+  public void fail(final RunningTask task) {
+    if (this.clock.get().isClosed()) {
+      throw new IllegalStateException("clock is closed");
+    }
+    final String evaluatorID = task.getActiveContext().getEvaluatorId();
+    if (!this.runningTasks.containsKey(evaluatorID) ||
+        !this.runningTasks.get(evaluatorID).equals(task)) {
+      throw new IllegalStateException("unknown running task " + task);
+    }
+    this.runningTasks.remove(evaluatorID);
+    final FailedTask failure = new FailedTask(
+        task.getId(),
+        "mock",
+        Optional.<String>empty(),
+        Optional.<Throwable>empty(),
+        Optional.<byte[]>empty(),
+        Optional.of(task.getActiveContext()));
+    post(taskFailedHandlers, failure);
+  }
+
+  /**
+   * Posts a {@link StartTime} stamped with the clock's current time to the
+   * driver start handlers.
+   */
+  @Override
+  public void start() {
+    final StartTime startTime = new StartTime(this.clock.get().getCurrentTime());
+    post(this.driverStartHandlers, startTime);
+  }
+
+  /**
+   * Posts a {@link StopTime} stamped with the clock's current time to the
+   * driver stop handlers.
+   */
+  @Override
+  public void stop() {
+    final StopTime stopTime = new StopTime(this.clock.get().getCurrentTime());
+    post(this.driverStopHandlers, stopTime);
+  }
+
+  /**
+   * @return true if at least one process request is waiting in the queue
+   */
+  @Override
+  public boolean hasProcessRequest() {
+    return !this.processRequestQueue.isEmpty();
+  }
+
+  /**
+   * Removes and returns the request at the front of the queue.
+   *
+   * @return the oldest queued process request, or {@code null} if the queue is empty
+   */
+  @Override
+  public ProcessRequest getNextProcessRequest() {
+    if (this.processRequestQueue.isEmpty()) {
+      return null;
+    }
+    return this.processRequestQueue.remove(0);
+  }
+
+  /**
+   * Complete the given process request successfully: update the bookkeeping
+   * for the entity the request refers to and post the request's success
+   * event(s) to the matching handlers.
+   * Afterwards, either the request's completion request is queued (when the
+   * request asks for auto completion), or the clock is closed if the runtime
+   * has become idle.
+   * @param pr the process request to succeed
+   * @throws IllegalStateException if the clock is closed, the request type is
+   *         unknown, or validation of the affected entity fails
+   */
+  @Override
+  public void succeed(final ProcessRequest pr) {
+    if (this.clock.get().isClosed()) {
+      throw new IllegalStateException("clock is closed");
+    }
+    final ProcessRequestInternal request = (ProcessRequestInternal) pr;
+    switch (request.getType()) {
+    case ALLOCATE_EVALUATOR:
+      final MockAllocatedEvalautor allocatedEvalautor = ((AllocateEvaluator)request).getSuccessEvent();
+      validateAndCreate(allocatedEvalautor);
+      post(this.allocatedEvaluatorHandlers, allocatedEvalautor);
+      // a newly allocated evaluator also announces its root context
+      post(this.contextActiveHandlers, allocatedEvalautor.getRootContext());
+      break;
+    case CLOSE_EVALUATOR:
+      final CompletedEvaluator closedEvaluator = ((CloseEvaluator)request).getSuccessEvent();
+      validateAndClose(closedEvaluator);
+      post(this.completedEvaluatorHandlers, closedEvaluator);
+      break;
+    case CREATE_CONTEXT:
+      final MockActiveContext createContext = ((CreateContext) request).getSuccessEvent();
+      validateAndCreate(createContext);
+      post(this.contextActiveHandlers, createContext);
+      break;
+    case CLOSE_CONTEXT:
+      final MockClosedContext closeContext = ((CloseContext) request).getSuccessEvent();
+      validateAndClose(closeContext);
+      post(this.contextClosedHandlers, closeContext);
+      break;
+    case CREATE_TASK:
+      final MockRunningTask createTask = ((CreateTask)request).getSuccessEvent();
+      validateAndCreate(createTask);
+      post(this.taskRunningHandlers, request.getSuccessEvent());
+      break;
+    case SUSPEND_TASK:
+      final MockRunningTask suspendedTask = ((SuspendTask)request).getTask();
+      validateAndClose(suspendedTask);
+      post(this.taskSuspendedHandlers, request.getSuccessEvent());
+      break;
+    case CLOSE_TASK:
+    case COMPLETE_TASK:
+      // Both request types share this branch, so pick the cast by runtime type;
+      // an unconditional cast to one class fails for requests of the other.
+      final MockRunningTask completedTask = request instanceof CloseTask
+          ? ((CloseTask)request).getTask()
+          : ((CompleteTask)request).getTask();
+      validateAndClose(completedTask);
+      post(this.taskCompletedHandlers, request.getSuccessEvent());
+      break;
+    case CREATE_CONTEXT_AND_TASK:
+      final CreateContextAndTask createContextTask = (CreateContextAndTask) request;
+      final Tuple<MockActiveContext, MockRunningTask> events = createContextTask.getSuccessEvent();
+      validateAndCreate(events.getKey());
+      post(this.contextActiveHandlers, events.getKey());
+      validateAndCreate(events.getValue());
+      post(this.taskRunningHandlers, events.getValue());
+      break;
+    case SEND_MESSAGE_DRIVER_TO_TASK:
+      // ignore
+      break;
+    case SEND_MESSAGE_DRIVER_TO_CONTEXT:
+      // ignore
+      break;
+    default:
+      throw new IllegalStateException("unknown type");
+    }
+
+    if (request.doAutoComplete()) {
+      add(request.getCompletionProcessRequest());
+    } else if (!this.clock.get().isClosed() && isIdle()) {
+      this.clock.get().close();
+    }
+  }
+
+  @Override
+  public void fail(final ProcessRequest pr) {
+    if (this.clock.get().isClosed()) {
+      throw new IllegalStateException("clock is closed");
+    }
+    final ProcessRequestInternal request = (ProcessRequestInternal) pr;
+    switch (request.getType()) {
+    case ALLOCATE_EVALUATOR:
+      post(this.failedEvaluatorHandlers, request.getFailureEvent());
+      break;
+    case CLOSE_EVALUATOR:
+      final CompletedEvaluator evaluator = 
((CloseEvaluator)request).getSuccessEvent();
+      validateAndClose(evaluator);
+      post(this.failedEvaluatorHandlers, request.getFailureEvent());
+      break;
+    case CREATE_CONTEXT:
+      post(this.contextFailedHandlers, request.getFailureEvent());
+      break;
+    case CLOSE_CONTEXT:
+      final MockClosedContext context = 
((CloseContext)request).getSuccessEvent();
+      validateAndClose(context);
+      if (context.getParentContext() == null) {
+        add(new CloseEvaluator(context.getMockActiveContext().getEvaluator()));
+      }
+      post(this.contextFailedHandlers, request.getFailureEvent());
+      break;
+    case CREATE_TASK:
+      post(this.taskFailedHandlers, request.getFailureEvent());
+      break;
+    case SUSPEND_TASK:
+      validateAndClose(((SuspendTask)request).getTask());
+      post(this.taskFailedHandlers, request.getFailureEvent());
+      break;
+    case CLOSE_TASK:
+    case COMPLETE_TASK:
+      validateAndClose(((CloseTask)request).getTask());
+      post(this.taskFailedHandlers, request.getFailureEvent());
+      break;
+    case CREATE_CONTEXT_AND_TASK:
+      final CreateContextAndTask createContextTask = (CreateContextAndTask) 
request;
+      final Tuple<MockFailedContext, FailedTask> events = 
createContextTask.getFailureEvent();
+      post(this.taskFailedHandlers, events.getValue());
+      post(this.contextFailedHandlers, events.getKey());
+      break;
+    case SEND_MESSAGE_DRIVER_TO_TASK:
+      // ignore
+      break;
+    case SEND_MESSAGE_DRIVER_TO_CONTEXT:
+      // ignore
+      break;
+    default:
+      throw new IllegalStateException("unknown type");
+    }
+
+    if (!this.clock.get().isClosed() && isIdle()) {
+      this.clock.get().close();
+    }
+  }
+
+  /**
+   * @return the task return value provider held by this runtime
+   */
+  MockTaskReturnValueProvider getTaskReturnValueProvider() {
+    return this.taskReturnValueProvider;
+  }
+  /**
+   * Used by mock REEF entities (e.g., AllocatedEvaluator, RunningTask) to
+   * inject process requests from initiated actions, e.g., RunningTask.close().
+   * The request is appended to the end of the process request queue.
+   * @param request to inject
+   */
+  void add(final ProcessRequest request) {
+    this.processRequestQueue.add(request);
+  }
+
+  /**
+   * @return true when the clock is idle, no process request is queued,
+   *         and no evaluator is allocated
+   */
+  private boolean isIdle() {
+    if (!this.clock.get().isIdle()) {
+      return false;
+    }
+    if (!this.processRequestQueue.isEmpty()) {
+      return false;
+    }
+    return this.allocatedEvaluatorMap.isEmpty();
+  }
+
+  private <T> void post(final Set<EventHandler<T>> handlers, final Object 
event) {
+    for (final EventHandler<T> handler : handlers) {
+      handler.onNext((T) event);
+    }
+  }
+
+  private void validateAndCreate(final MockActiveContext context) {
+    if (!this.allocatedEvaluatorMap.containsKey(context.getEvaluatorId())) {
+      throw new IllegalStateException("unknown evaluator id " + 
context.getEvaluatorId());
+    } else if 
(!this.allocatedContextsMap.containsKey(context.getEvaluatorId())) {
+      this.allocatedContextsMap.put(context.getEvaluatorId(), new 
ArrayList<MockActiveContext>());
+    }
+    this.allocatedContextsMap.get(context.getEvaluatorId()).add(context);
+  }
+
+  /**
+   * Unregister a closed context. Contexts form a stack per evaluator, so only
+   * the most recently created context may be closed.
+   * @param context the closed context to unregister
+   * @throws IllegalStateException if the evaluator has no context list, the
+   *         list is empty, or the context is not on the top of the stack
+   */
+  private void validateAndClose(final MockClosedContext context) {
+    if (!this.allocatedContextsMap.containsKey(context.getEvaluatorId())) {
+      throw new IllegalStateException("unknown evaluator id " + context.getEvaluatorId());
+    }
+    final List<MockActiveContext> contexts = this.allocatedContextsMap.get(context.getEvaluatorId());
+    // The list stays in the map after its last context is closed; report that
+    // state explicitly instead of failing with an index out of bounds.
+    if (contexts.isEmpty()) {
+      throw new IllegalStateException("no active context on evaluator " + context.getEvaluatorId());
+    }
+    final int top = contexts.size() - 1;
+    if (!contexts.get(top).equals(context.getMockActiveContext())) {
+      throw new IllegalStateException("closing context that is not on the top of the stack");
+    }
+    // remove by index so that exactly the top entry is dropped
+    contexts.remove(top);
+  }
+
+  private void validateAndCreate(final MockRunningTask task) {
+    if (this.runningTasks.containsKey(task.evaluatorID())) {
+      throw new IllegalStateException("task already running on evaluator " +
+          task.evaluatorID());
+    }
+    this.runningTasks.put(task.evaluatorID(), task);
+  }
+
+  private void validateAndClose(final MockRunningTask task) {
+    if 
(!this.runningTasks.containsKey(task.getActiveContext().getEvaluatorId())) {
+      throw new IllegalStateException("no task running on evaluator");
+    }
+    this.runningTasks.remove(task.getActiveContext().getEvaluatorId());
+  }
+
+  private void validateAndCreate(final MockAllocatedEvalautor evalutor) {
+    if (this.allocatedEvaluatorMap.containsKey(evalutor.getId())) {
+      throw new IllegalStateException("evaluator id " + evalutor.getId() + " 
already exists");
+    }
+    this.allocatedEvaluatorMap.put(evalutor.getId(), evalutor);
+    this.allocatedContextsMap.put(evalutor.getId(), new 
ArrayList<MockActiveContext>());
+    
this.allocatedContextsMap.get(evalutor.getId()).add(evalutor.getRootContext());
+  }
+
+  private void validateAndClose(final CompletedEvaluator evalautor) {
+    if (!this.allocatedEvaluatorMap.containsKey(evalautor.getId())) {
+      throw new IllegalStateException("unknown evaluator id " + 
evalautor.getId());
+    }
+    this.allocatedEvaluatorMap.remove(evalautor.getId());
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/72ecec74/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockSuspendedTask.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockSuspendedTask.java
 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockSuspendedTask.java
new file mode 100644
index 0000000..84569ff
--- /dev/null
+++ 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockSuspendedTask.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.runtime;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.task.SuspendedTask;
+
+/**
+ * Mock suspended task: a SuspendedTask view over the MockRunningTask it was
+ * created from.
+ */
+@Unstable
+@Private
+public final class MockSuspendedTask implements SuspendedTask {
+
+  // the running task this suspended task was created from
+  private final MockRunningTask task;
+
+  /**
+   * @param task the running task backing this suspended task
+   */
+  public MockSuspendedTask(final MockRunningTask task) {
+    this.task = task;
+  }
+
+  /**
+   * @return the active context of the underlying running task
+   */
+  @Override
+  public ActiveContext getActiveContext() {
+    return this.task.getActiveContext();
+  }
+
+  /**
+   * @return always a new, empty byte array
+   */
+  @Override
+  public byte[] get() {
+    return new byte[0];
+  }
+
+  /**
+   * @return the identifier of the underlying running task
+   */
+  @Override
+  public String getId() {
+    return this.task.getId();
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/72ecec74/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockUtils.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockUtils.java
 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockUtils.java
new file mode 100644
index 0000000..0b073c8
--- /dev/null
+++ 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockUtils.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.runtime;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.exceptions.InjectionException;
+
+/**
+ * Static helpers for the mock runtime.
+ */
+@Unstable
+@Private
+final class MockUtils {
+
+  /** Not instantiable. */
+  private MockUtils() {
+  }
+
+  /**
+   * Resolve the value bound to a named parameter in a configuration.
+   * @param configuration the configuration to build an injector from
+   * @param name the named parameter class to look up
+   * @param <U> the value type of the named parameter
+   * @param <T> the named parameter type
+   * @return the named instance produced by the injector
+   * @throws IllegalStateException wrapping any InjectionException raised during the lookup
+   */
+  public static <U, T extends Name<U>> U getValue(final Configuration configuration, final Class<T> name) {
+    try {
+      return Tang.Factory.getTang().newInjector(configuration).getNamedInstance(name);
+    } catch (final InjectionException cause) {
+      throw new IllegalStateException(cause);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/72ecec74/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/package-info.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/package-info.java
 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/package-info.java
new file mode 100644
index 0000000..b5cf639
--- /dev/null
+++ 
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+/**
+ * mock runtime implementation.
+ */
+package org.apache.reef.mock.runtime;

http://git-wip-us.apache.org/repos/asf/reef/blob/72ecec74/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/BasicMockTests.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/BasicMockTests.java
 
b/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/BasicMockTests.java
new file mode 100644
index 0000000..984a9f4
--- /dev/null
+++ 
b/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/BasicMockTests.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.mock.request.ProcessRequestInternal;
+import org.apache.reef.mock.runtime.MockAllocatedEvalautor;
+import org.apache.reef.mock.runtime.MockClock;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Basic mock tests: drive a MockApplication through the MockRuntime and check
+ * the events the application observes.
+ * The class is public because JUnit 4 runners require a public test class
+ * with a public constructor.
+ */
+public final class BasicMockTests {
+
+  private MockApplication mockApplication;
+
+  private MockRuntime mockRuntime;
+
+  private MockClock mockClock;
+
+  /**
+   * Wire every MockApplication handler into a mock configuration, inject the
+   * application, runtime and clock from it, and run the clock.
+   */
+  @Before
+  public void initialize() throws Exception {
+    final Configuration conf = MockConfiguration.CONF
+        .set(MockConfiguration.ON_DRIVER_STARTED, MockApplication.StartHandler.class)
+        .set(MockConfiguration.ON_DRIVER_STOP, MockApplication.StopHandler.class)
+        .set(MockConfiguration.ON_CONTEXT_ACTIVE, MockApplication.ActiveContextHandler.class)
+        .set(MockConfiguration.ON_CONTEXT_CLOSED, MockApplication.ContextClosedHandler.class)
+        .set(MockConfiguration.ON_CONTEXT_FAILED, MockApplication.FailedContextHandler.class)
+        .set(MockConfiguration.ON_EVALUATOR_ALLOCATED, MockApplication.AllocatedEvaluatorHandler.class)
+        .set(MockConfiguration.ON_EVALUATOR_COMPLETED, MockApplication.CompletedEvaluatorHandler.class)
+        .set(MockConfiguration.ON_EVALUATOR_FAILED, MockApplication.FailedEvaluatorHandler.class)
+        .set(MockConfiguration.ON_TASK_COMPLETED, MockApplication.CompletedTaskHandler.class)
+        .set(MockConfiguration.ON_TASK_FAILED, MockApplication.FailedTaskHandler.class)
+        .set(MockConfiguration.ON_TASK_RUNNING, MockApplication.RunningTaskHandler.class)
+        .set(MockConfiguration.ON_TASK_SUSPENDED, MockApplication.SuspendedTaskHandler.class)
+        .build();
+
+    final Injector injector = Tang.Factory.getTang().newInjector(conf);
+    this.mockApplication = injector.getInstance(MockApplication.class);
+    this.mockRuntime = injector.getInstance(MockRuntime.class);
+    this.mockClock = injector.getInstance(MockClock.class);
+
+    this.mockClock.run();
+  }
+
+  /**
+   * Succeed an evaluator allocation, a task submission (including its
+   * auto-generated completion request) and a child context submission.
+   */
+  @Test
+  public void testSuccessRequests() throws Exception {
+    assertTrue("mock application received start event", this.mockApplication.isRunning());
+
+    this.mockApplication.requestEvaluators(1);
+    assertTrue("check for process event", this.mockRuntime.hasProcessRequest());
+    final ProcessRequest allocateEvaluatorRequest = this.mockRuntime.getNextProcessRequest();
+    assertEquals("allocate evalautor request", ProcessRequest.Type.ALLOCATE_EVALUATOR,
+        allocateEvaluatorRequest.getType());
+    final AllocatedEvaluator evaluator =
+        ((ProcessRequestInternal<AllocatedEvaluator, Object>)allocateEvaluatorRequest)
+            .getSuccessEvent();
+    this.mockRuntime.succeed(allocateEvaluatorRequest);
+    assertTrue("evaluator allocation succeeded",
+        this.mockApplication.getAllocatedEvaluators().contains(evaluator));
+    final ActiveContext rootContext = this.mockApplication.getContext(evaluator,
+        MockAllocatedEvalautor.ROOT_CONTEXT_IDENTIFIER_PREFIX + evaluator.getId());
+    assertTrue("root context", rootContext != null);
+
+
+    // submit a task
+    this.mockApplication.submitTask(rootContext, "test-task");
+    assertTrue("create task queued", this.mockRuntime.hasProcessRequest());
+    final ProcessRequest createTaskRequest = this.mockRuntime.getNextProcessRequest();
+    assertEquals("create task request", ProcessRequest.Type.CREATE_TASK,
+        createTaskRequest.getType());
+    final RunningTask task = (RunningTask) ((ProcessRequestInternal)createTaskRequest).getSuccessEvent();
+    this.mockRuntime.succeed(createTaskRequest);
+    assertTrue("task running", this.mockApplication.getRunningTasks().contains(task));
+
+    // check task auto complete
+    assertTrue("check for request", this.mockRuntime.hasProcessRequest());
+    final ProcessRequestInternal completedTask =
+        (ProcessRequestInternal) this.mockRuntime.getNextProcessRequest();
+    assertEquals("complete task request", ProcessRequest.Type.COMPLETE_TASK,
+        completedTask.getType());
+    this.mockRuntime.succeed(completedTask);
+    assertEquals("no running tasks", 0, this.mockApplication.getRunningTasks().size());
+
+    // create a sub-context
+    this.mockApplication.submitContext(rootContext, "child");
+    assertTrue("check for request", this.mockRuntime.hasProcessRequest());
+    final ProcessRequestInternal createContextRequest =
+        (ProcessRequestInternal) this.mockRuntime.getNextProcessRequest();
+    assertEquals("create context request", ProcessRequest.Type.CREATE_CONTEXT,
+        createContextRequest.getType());
+    this.mockRuntime.succeed(createContextRequest);
+    final ActiveContext context = this.mockApplication.getContext(evaluator, "child");
+    assertTrue("child context", context.getParentId().get().equals(rootContext.getId()));
+  }
+
+  /**
+   * Fail an evaluator allocation, a task submission and a child context
+   * submission, and check that the application records each failure.
+   */
+  @Test
+  public void testFailureRequests() throws Exception {
+    assertTrue("mock application received start event", this.mockApplication.isRunning());
+
+    this.mockApplication.requestEvaluators(1);
+    assertTrue("check for process event", this.mockRuntime.hasProcessRequest());
+    ProcessRequest allocateEvaluatorRequest = this.mockRuntime.getNextProcessRequest();
+    this.mockRuntime.fail(allocateEvaluatorRequest);
+    assertEquals("evaluator allocation failed", 1,
+        this.mockApplication.getFailedEvaluators().size());
+
+    this.mockApplication.requestEvaluators(1);
+    allocateEvaluatorRequest = this.mockRuntime.getNextProcessRequest();
+    final AllocatedEvaluator evaluator =
+        (AllocatedEvaluator)((ProcessRequestInternal)allocateEvaluatorRequest).getSuccessEvent();
+    this.mockRuntime.succeed(allocateEvaluatorRequest);
+    final ActiveContext rootContext = this.mockApplication
+        .getContext(evaluator, MockAllocatedEvalautor.ROOT_CONTEXT_IDENTIFIER_PREFIX + evaluator.getId());
+
+
+    // submit a task
+    this.mockApplication.submitTask(rootContext, "test-task");
+    assertTrue("create task queued", this.mockRuntime.hasProcessRequest());
+    final ProcessRequest createTaskRequest = this.mockRuntime.getNextProcessRequest();
+    assertEquals("create task request", ProcessRequest.Type.CREATE_TASK,
+        createTaskRequest.getType());
+    this.mockRuntime.fail(createTaskRequest);
+    assertEquals("task failed", 1, this.mockApplication.getFailedTasks().size());
+
+    // create a sub-context
+    this.mockApplication.submitContext(rootContext, "child");
+    assertTrue("check for request", this.mockRuntime.hasProcessRequest());
+    final ProcessRequestInternal createContextRequest =
+        (ProcessRequestInternal) this.mockRuntime.getNextProcessRequest();
+    this.mockRuntime.fail(createContextRequest);
+    assertEquals("child context failed", 1, this.mockApplication.getFailedContext().size());
+  }
+
+  /**
+   * Fail live entities directly through the mock runtime: a running task,
+   * then its child context, then the whole evaluator.
+   */
+  @Test
+  public void testMockFailures() {
+    // make sure we're running
+    assertTrue("mock application received start event", this.mockApplication.isRunning());
+
+    // allocate an evaluator and get root context
+    this.mockApplication.requestEvaluators(1);
+    this.mockRuntime.succeed(this.mockRuntime.getNextProcessRequest());
+    final AllocatedEvaluator evaluator = this.mockRuntime.getCurrentAllocatedEvaluators().iterator().next();
+    final ActiveContext rootContext = this.mockApplication.getContext(evaluator,
+        MockAllocatedEvalautor.ROOT_CONTEXT_IDENTIFIER_PREFIX + evaluator.getId());
+
+    // create a child context off of root context
+    this.mockApplication.submitContext(rootContext, "child");
+    this.mockRuntime.succeed(this.mockRuntime.getNextProcessRequest());
+    final ActiveContext childContext = this.mockApplication.getContext(evaluator, "child");
+
+    // submit a task from child context
+    this.mockApplication.submitTask(childContext, "test-task");
+    final ProcessRequest createTaskRequest = this.mockRuntime.getNextProcessRequest();
+    createTaskRequest.setAutoComplete(false); // keep it running
+    this.mockRuntime.succeed(createTaskRequest);
+    final RunningTask task = this.mockRuntime.getCurrentRunningTasks().iterator().next();
+
+    // fail task
+    this.mockRuntime.fail(task);
+    assertEquals("task failed", 1, this.mockApplication.getFailedTasks().size());
+
+    // fail child context
+    this.mockRuntime.fail(childContext);
+    assertTrue("child context failed",
+        this.mockApplication.getFailedContext().iterator().next().getId().equals(childContext.getId()));
+    // evaluator should still be up
+    assertEquals("check evaluator", 0, this.mockApplication.getFailedEvaluators().size());
+
+    // fail evaluator
+    this.mockRuntime.fail(evaluator);
+    assertEquals("evaluator failed", 1, this.mockApplication.getFailedEvaluators().size());
+
+    // both contexts should be failed
+    assertEquals("root and child contexts failed", 2,
+        this.mockApplication.getFailedContext().size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/72ecec74/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/MockApplication.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/MockApplication.java
 
b/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/MockApplication.java
new file mode 100644
index 0000000..86a105e
--- /dev/null
+++ 
b/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/MockApplication.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.context.ContextConfiguration;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.CompletedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.driver.task.*;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.task.Task;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.Clock;
+import org.apache.reef.wake.time.event.Alarm;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.event.StopTime;
+
+import javax.inject.Inject;
+import java.util.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * mock application.
+ */
+@Unit
+final class MockApplication {
+
+  private static final Logger LOG = 
Logger.getLogger(MockApplication.class.getName());
+
+  private final Clock clock;
+
+  private final EvaluatorRequestor evaluatorRequestor;
+
+  private final Map<String, Map<String, ActiveContext>> 
evaluatorId2ContextId2ContextMap = new HashMap<>();
+
+  private final Map<String, AllocatedEvaluator> evaluatorMap = new HashMap<>();
+
+  private final Map<String, FailedEvaluator> failedEvaluatorMap = new 
HashMap<>();
+
+  private final Map<String, RunningTask> evaluatorIdRunningTaskMap = new 
HashMap<>();
+
+  private final Set<FailedContext> failedContextSet = new HashSet<>();
+
+  private final Set<FailedTask> failedTaskSet = new HashSet<>();
+
+  private final Set<SuspendedTask> suspendedTaskSet = new HashSet<>();
+
+  private boolean running = false;
+
+  @Inject
+  MockApplication(final Clock clock, final EvaluatorRequestor 
evaluatorRequestor) {
+    this.clock = clock;
+    this.evaluatorRequestor = evaluatorRequestor;
+  }
+
+  ActiveContext getContext(final AllocatedEvaluator evaluator, final String 
identifier) {
+    return 
this.evaluatorId2ContextId2ContextMap.get(evaluator.getId()).get(identifier);
+  }
+
+  Collection<RunningTask> getRunningTasks() {
+    return 
Collections.unmodifiableCollection(this.evaluatorIdRunningTaskMap.values());
+  }
+
+  Collection<AllocatedEvaluator> getAllocatedEvaluators() {
+    return Collections.unmodifiableCollection(this.evaluatorMap.values());
+  }
+
+  Collection<FailedEvaluator> getFailedEvaluators() {
+    return 
Collections.unmodifiableCollection(this.failedEvaluatorMap.values());
+  }
+
+  Collection<FailedTask> getFailedTasks() {
+    return Collections.unmodifiableCollection(this.failedTaskSet);
+  }
+
+  Collection<FailedContext> getFailedContext() {
+    return Collections.unmodifiableCollection(this.failedContextSet);
+  }
+
+  void requestEvaluators(final int numEvaluators) {
+    LOG.log(Level.INFO, "request {0} Evaluators", numEvaluators);
+    evaluatorRequestor.newRequest()
+        .setMemory(128)
+        .setNumberOfCores(1)
+        .setNumber(numEvaluators)
+        .submit();
+  }
+
+  void submitTask(final ActiveContext context, final String identifier) {
+    context.submitTask(TaskConfiguration.CONF
+        .set(TaskConfiguration.IDENTIFIER, identifier)
+        .set(TaskConfiguration.TASK, DummyTestTask.class)
+        .build());
+  }
+
+  void submitContext(final ActiveContext context, final String identifier) {
+    context.submitContext(ContextConfiguration.CONF
+        .set(ContextConfiguration.IDENTIFIER, identifier)
+        .build());
+  }
+
+  boolean isRunning() {
+    return this.running;
+  }
+
+  boolean exists(final AllocatedEvaluator evaluator) {
+    return this.evaluatorMap.containsKey(evaluator.getId());
+  }
+
+  /**
+   * Driver start handler: schedules an alarm at Integer.MAX_VALUE whose
+   * handler throws if it ever fires, then marks the application as running.
+   */
+  final class StartHandler implements EventHandler<StartTime> {
+    @Override
+    public void onNext(final StartTime startTime) {
+      // NOTE(review): presumably this far-future alarm only keeps the clock
+      // from becoming idle; confirm against the clock implementation.
+      clock.scheduleAlarm(Integer.MAX_VALUE, new EventHandler<Alarm>() {
+        @Override
+        public void onNext(final Alarm value) {
+          throw new RuntimeException("should not happen");
+        }
+      });
+      running = true;
+    }
+  }
+
+  /**
+   * Driver stop handler: marks the application as no longer running.
+   */
+  final class StopHandler implements EventHandler<StopTime> {
+    @Override
+    public void onNext(final StopTime stopTime) {
+      running = false;
+    }
+  }
+
+  /**
+   * Receive notification that an Evaluator had been allocated,
+   * and submitTask a new Task in that Evaluator.
+   */
+  final class AllocatedEvaluatorHandler implements 
EventHandler<AllocatedEvaluator> {
+    @Override
+    public void onNext(final AllocatedEvaluator eval) {
+      evaluatorMap.put(eval.getId(), eval);
+    }
+  }
+
+  /**
+   * Receive notification that the Evaluator has been shut down.
+   */
+  final class CompletedEvaluatorHandler implements 
EventHandler<CompletedEvaluator> {
+    @Override
+    public void onNext(final CompletedEvaluator eval) {
+      evaluatorMap.remove(eval.getId());
+      evaluatorId2ContextId2ContextMap.remove(eval.getId());
+      evaluatorIdRunningTaskMap.remove(eval.getId());
+    }
+  }
+
+  final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
+
+    @Override
+    public void onNext(final FailedEvaluator eval) {
+      evaluatorMap.remove(eval.getId());
+      evaluatorId2ContextId2ContextMap.remove(eval.getId());
+      evaluatorIdRunningTaskMap.remove(eval.getId());
+      failedEvaluatorMap.put(eval.getId(), eval);
+      failedContextSet.addAll(eval.getFailedContextList());
+    }
+  }
+
+  /**
+   * Receive notification that the Context is active.
+   */
+  final class ActiveContextHandler implements EventHandler<ActiveContext> {
+    @Override
+    public void onNext(final ActiveContext context) {
+      if 
(!evaluatorId2ContextId2ContextMap.containsKey(context.getEvaluatorId())) {
+        evaluatorId2ContextId2ContextMap.put(context.getEvaluatorId(), new 
HashMap<String, ActiveContext>());
+      }
+      if 
(evaluatorId2ContextId2ContextMap.get(context.getEvaluatorId()).containsKey(context.getId()))
 {
+        throw new IllegalStateException(
+                String.format("Context %s on evaluator %s already exists on 
evaluator with " +
+                        "same identifier", context.getId(), 
context.getEvaluatorId()));
+      }
+      
evaluatorId2ContextId2ContextMap.get(context.getEvaluatorId()).put(context.getId(),
 context);
+    }
+  }
+
+  final class ContextClosedHandler implements EventHandler<ClosedContext> {
+    @Override
+    public void onNext(final ClosedContext value) {
+      assert 
evaluatorId2ContextId2ContextMap.containsKey(value.getEvaluatorId());
+      assert 
evaluatorId2ContextId2ContextMap.get(value.getEvaluatorId()).containsKey(value.getId());
+      
evaluatorId2ContextId2ContextMap.get(value.getEvaluatorId()).remove(value.getId());
+    }
+  }
+
+  /**
+   * Receive notification that a Context has failed: drop it from the active
+   * contexts of its evaluator (if it was ever active) and record the failure.
+   */
+  final class FailedContextHandler implements EventHandler<FailedContext> {
+    @Override
+    public void onNext(final FailedContext value) {
+      final Map<String, ActiveContext> contexts = evaluatorId2ContextId2ContextMap.get(value.getEvaluatorId());
+      if (contexts != null) {
+        // The inner map is keyed by context id, so remove by context id.
+        // Nothing is removed if the context failed before it became active.
+        contexts.remove(value.getId());
+      }
+      failedContextSet.add(value);
+    }
+  }
+
+  /**
+   * Receive notification that the Task is running.
+   */
+  final class RunningTaskHandler implements EventHandler<RunningTask> {
+    @Override
+    public void onNext(final RunningTask task) {
+      evaluatorIdRunningTaskMap.put(task.getActiveContext().getEvaluatorId(), 
task);
+    }
+  }
+
+  /**
+   * Receive notification that the Task has completed successfully.
+   */
+  final class CompletedTaskHandler implements EventHandler<CompletedTask> {
+    @Override
+    public void onNext(final CompletedTask task) {
+      
evaluatorIdRunningTaskMap.remove(task.getActiveContext().getEvaluatorId());
+    }
+  }
+
+  final class FailedTaskHandler implements EventHandler<FailedTask> {
+    @Override
+    public void onNext(final FailedTask value) {
+      
evaluatorIdRunningTaskMap.remove(value.getActiveContext().get().getEvaluatorId());
+      failedTaskSet.add(value);
+    }
+  }
+
+  final class SuspendedTaskHandler implements EventHandler<SuspendedTask> {
+    @Override
+    public void onNext(final SuspendedTask value) {
+      
evaluatorIdRunningTaskMap.remove(value.getActiveContext().getEvaluatorId());
+      suspendedTaskSet.add(value);
+    }
+  }
+
+  private static final class DummyTestTask implements Task {
+    @Override
+    public byte[] call(final byte[] memento) throws Exception {
+      return new byte[0];
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/72ecec74/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/package-info.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/package-info.java
 
b/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/package-info.java
new file mode 100644
index 0000000..e93688c
--- /dev/null
+++ 
b/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+/**
+ * mock runtime tests.
+ */
+package org.apache.reef.mock;

http://git-wip-us.apache.org/repos/asf/reef/blob/72ecec74/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6633316..7d4431e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -774,6 +774,7 @@ under the License.
         <module>lang/java/reef-runtime-local</module>
         <module>lang/java/reef-runtime-yarn</module>
         <module>lang/java/reef-runtime-mesos</module>
+        <module>lang/java/reef-runtime-mock</module>
         <module>lang/java/reef-runtime-multi</module>
         <module>lang/java/reef-runtime-standalone</module>
         <module>lang/java/reef-tang</module>

Reply via email to