Repository: reef Updated Branches: refs/heads/master 51b76fc72 -> 72ecec743
http://git-wip-us.apache.org/repos/asf/reef/blob/72ecec74/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockCompletedTask.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockCompletedTask.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockCompletedTask.java new file mode 100644 index 0000000..6ad6b35 --- /dev/null +++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockCompletedTask.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.mock.runtime; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.task.CompletedTask; + +/** + * mock completed task. 
+ */
/**
 * Mock {@link CompletedTask}: pairs the {@link MockRunningTask} that finished
 * with the raw bytes it returned.
 */
@Unstable
@Private
public final class MockCompletedTask implements CompletedTask {

  /** Task that ran to completion; its id and context are reported as this event's. */
  private final MockRunningTask finishedTask;

  /** Task result; stored and handed out by reference (no defensive copy). */
  private final byte[] result;

  /**
   * @param task the running task that completed
   * @param returnValue the bytes reported as the task's return value
   */
  public MockCompletedTask(final MockRunningTask task, final byte[] returnValue) {
    this.result = returnValue;
    this.finishedTask = task;
  }

  /**
   * @return the array instance given at construction time
   */
  @Override
  public byte[] get() {
    return result;
  }

  /**
   * @return the id of the underlying running task
   */
  @Override
  public String getId() {
    return finishedTask.getId();
  }

  /**
   * @return the context the underlying running task was hosted on
   */
  @Override
  public ActiveContext getActiveContext() {
    return finishedTask.getActiveContext();
  }
}
http://git-wip-us.apache.org/repos/asf/reef/blob/72ecec74/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockEvaluatorDescriptor.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockEvaluatorDescriptor.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockEvaluatorDescriptor.java new file mode 100644 index 0000000..2d98af0 --- /dev/null +++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockEvaluatorDescriptor.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.reef.mock.runtime; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.catalog.NodeDescriptor; +import org.apache.reef.driver.evaluator.EvaluatorDescriptor; +import org.apache.reef.driver.evaluator.EvaluatorProcess; +
/**
 * Mock {@link EvaluatorDescriptor} that reports fixed resource figures for the
 * node it was created with.
 */
@Unstable
@Private
public final class MockEvaluatorDescriptor implements EvaluatorDescriptor {

  /** Runtime name reported by every mock evaluator. */
  private static final String RUNTIME_NAME = "mock";

  /** Memory figure reported by every mock evaluator. */
  private static final int MEMORY = 0;

  /** Core count reported by every mock evaluator. */
  private static final int CORES = 1;

  /** Node this evaluator is described as running on. */
  private final NodeDescriptor node;

  MockEvaluatorDescriptor(final NodeDescriptor nodeDescriptor) {
    this.node = nodeDescriptor;
  }

  @Override
  public String getRuntimeName() {
    return RUNTIME_NAME;
  }

  @Override
  public int getNumberOfCores() {
    return CORES;
  }

  @Override
  public int getMemory() {
    return MEMORY;
  }

  /**
   * Not available in the mock.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public EvaluatorProcess getProcess() {
    throw new UnsupportedOperationException();
  }

  @Override
  public NodeDescriptor getNodeDescriptor() {
    return node;
  }
}
http://git-wip-us.apache.org/repos/asf/reef/blob/72ecec74/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockEvaluatorRequestor.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockEvaluatorRequestor.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockEvaluatorRequestor.java new file mode 100644 index 0000000..7f90039 --- /dev/null +++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockEvaluatorRequestor.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.mock.runtime; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.catalog.NodeDescriptor; +import org.apache.reef.driver.evaluator.EvaluatorRequest; +import org.apache.reef.driver.evaluator.EvaluatorRequestor; +import org.apache.reef.mock.request.AllocateEvaluator; +import org.apache.reef.tang.InjectionFuture; + +import javax.inject.Inject; +import java.util.UUID; + +/** + * mock evaluator requestor. 
+ */
/**
 * Mock {@link EvaluatorRequestor}: turns each evaluator request into
 * {@link AllocateEvaluator} process requests on the mock runtime driver.
 */
@Unstable
@Private
public final class MockEvaluatorRequestor implements EvaluatorRequestor {

  private final InjectionFuture<MockClock> clock;

  private final InjectionFuture<MockRuntimeDriver> mockRuntimeDriver;

  @Inject
  MockEvaluatorRequestor(
      final InjectionFuture<MockClock> clock,
      final InjectionFuture<MockRuntimeDriver> mockRuntimeDriver) {
    this.mockRuntimeDriver = mockRuntimeDriver;
    this.clock = clock;
  }

  /**
   * Starts a new request builder.
   *
   * @return a builder whose {@code submit()} routes back to this requestor
   * @throws IllegalStateException if the mock clock is closed
   */
  @Override
  public Builder newRequest() {
    failIfClockClosed();
    return new Builder();
  }

  /**
   * Queues one {@link AllocateEvaluator} request per evaluator asked for.
   * Every evaluator gets a fresh random UUID as id; all evaluators of one
   * request share a single descriptor instance.
   *
   * @param req the request; only its evaluator count is read
   * @throws IllegalStateException if the mock clock is closed
   */
  @Override
  public void submit(final EvaluatorRequest req) {
    failIfClockClosed();
    final MockEvaluatorDescriptor sharedDescriptor =
        new MockEvaluatorDescriptor(new MockNodeDescriptor());
    for (int remaining = req.getNumber(); remaining > 0; remaining--) {
      // Resolved inside the loop so that an empty request never touches the driver future.
      final MockRuntimeDriver driver = this.mockRuntimeDriver.get();
      final String evaluatorId = UUID.randomUUID().toString();
      driver.add(new AllocateEvaluator(
          new MockAllocatedEvalautor(driver, evaluatorId, sharedDescriptor)));
    }
  }

  /** Rejects any use of this requestor once the mock clock has been closed. */
  private void failIfClockClosed() {
    if (this.clock.get().isClosed()) {
      throw new IllegalStateException("clock closed");
    }
  }

  /**
   * {@link EvaluatorRequest.Builder} whose {@code submit()} builds the request and
   * hands it straight to the enclosing requestor.
   */
  private final class Builder extends EvaluatorRequest.Builder<Builder> {
    @Override
    public void submit() {
      final EvaluatorRequest built = this.build();
      MockEvaluatorRequestor.this.submit(built);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/reef/blob/72ecec74/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockFailedContext.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockFailedContext.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockFailedContext.java new file mode 100644 index 0000000..55eafe1 --- /dev/null +++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockFailedContext.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.mock.runtime; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.context.FailedContext; +import org.apache.reef.driver.evaluator.EvaluatorDescriptor; +import org.apache.reef.util.Optional; + +/** + * mock failed context.
+ */
/**
 * Mock {@link FailedContext}: the failure view of a {@link MockActiveContext}.
 * Identity information is delegated to the wrapped context; the failure details
 * are fixed placeholder values.
 */
@Unstable
@Private
public final class MockFailedContext implements FailedContext {

  /** The context that is being reported as failed. */
  private final MockActiveContext failed;

  public MockFailedContext(final MockActiveContext context) {
    this.failed = context;
  }

  // ---- identity, delegated to the wrapped context ----

  @Override
  public String getId() {
    return failed.getId();
  }

  @Override
  public String getEvaluatorId() {
    return failed.getEvaluatorId();
  }

  @Override
  public EvaluatorDescriptor getEvaluatorDescriptor() {
    return failed.getEvaluatorDescriptor();
  }

  @Override
  public Optional<String> getParentId() {
    return failed.getParentId();
  }

  /**
   * @return the wrapped context's parent widened to {@link ActiveContext},
   *         or empty when the wrapped context has no parent
   */
  @Override
  public Optional<ActiveContext> getParentContext() {
    if (!failed.getParentContext().isPresent()) {
      return Optional.<ActiveContext>empty();
    }
    return Optional.of((ActiveContext) failed.getParentContext().get());
  }

  // ---- failure details, fixed for the mock ----

  @Override
  public String getMessage() {
    return "mock";
  }

  @Override
  public Optional<String> getDescription() {
    return Optional.empty();
  }

  @Override
  public Optional<Throwable> getReason() {
    return Optional.empty();
  }

  @Override
  public Optional<byte[]> getData() {
    return Optional.empty();
  }

  /**
   * @return a new {@link Exception} on every call, with the message {@code "mock"}
   */
  @Override
  public Throwable asError() {
    return new Exception("mock");
  }
}
http://git-wip-us.apache.org/repos/asf/reef/blob/72ecec74/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockFailedEvaluator.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockFailedEvaluator.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockFailedEvaluator.java new file mode 100644 index 0000000..d9c0c3c --- /dev/null +++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockFailedEvaluator.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.mock.runtime; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.context.FailedContext; +import org.apache.reef.driver.evaluator.FailedEvaluator; +import org.apache.reef.driver.task.FailedTask; +import org.apache.reef.exception.EvaluatorException; +import org.apache.reef.util.Optional; + +import java.util.ArrayList; +import java.util.List; + +/** + * mock failed evaluator. 
+ */
/**
 * Mock {@link FailedEvaluator}: carries the evaluator id together with the
 * contexts and (optionally) the task that went down with it.
 */
@Unstable
@Private
public final class MockFailedEvaluator implements FailedEvaluator {

  private final String id;

  /** Stored by reference; the list passed in is not copied. */
  private final List<FailedContext> contexts;

  private final Optional<FailedTask> task;

  /**
   * Creates a failure that took no contexts and no task with it.
   *
   * @param evaluatorID id of the failed evaluator
   */
  public MockFailedEvaluator(final String evaluatorID) {
    this(evaluatorID, new ArrayList<FailedContext>(), Optional.<FailedTask>empty());
  }

  /**
   * @param evaluatorID id of the failed evaluator
   * @param failedContextList contexts that failed along with the evaluator
   * @param failedTask the task that failed along with the evaluator, if any
   */
  public MockFailedEvaluator(
      final String evaluatorID,
      final List<FailedContext> failedContextList,
      final Optional<FailedTask> failedTask) {
    this.id = evaluatorID;
    this.contexts = failedContextList;
    this.task = failedTask;
  }

  @Override
  public String getId() {
    return id;
  }

  @Override
  public Optional<FailedTask> getFailedTask() {
    return task;
  }

  @Override
  public List<FailedContext> getFailedContextList() {
    return contexts;
  }

  /**
   * The mock does not model an evaluator exception.
   *
   * @return always {@code null}
   */
  @Override
  public EvaluatorException getEvaluatorException() {
    return null;
  }
}
http://git-wip-us.apache.org/repos/asf/reef/blob/72ecec74/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockNodeDescriptor.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockNodeDescriptor.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockNodeDescriptor.java new file mode 100644 index 0000000..be04994 --- /dev/null +++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockNodeDescriptor.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.mock.runtime; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.catalog.NodeDescriptor; +import org.apache.reef.driver.catalog.RackDescriptor; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +
/**
 * Mock {@link NodeDescriptor}: a node named {@code "mock"} that sits alone in a
 * rack of the same name and has no socket address.
 */
@Unstable
@Private
public final class MockNodeDescriptor implements NodeDescriptor {

  /** Name and id of the node, and name of its rack. */
  private static final String MOCK = "mock";

  @Override
  public String getId() {
    return MOCK;
  }

  @Override
  public String getName() {
    return MOCK;
  }

  /**
   * Not available in the mock.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public InetSocketAddress getInetSocketAddress() {
    throw new UnsupportedOperationException();
  }

  /**
   * @return a new rack descriptor on every call, listing only this node
   */
  @Override
  public RackDescriptor getRackDescriptor() {
    return new SingleNodeRack();
  }

  /** Rack whose only member is the enclosing node. */
  private final class SingleNodeRack implements RackDescriptor {

    @Override
    public String getName() {
      return MOCK;
    }

    /**
     * @return a fresh, mutable list holding the enclosing node
     */
    @Override
    public List<NodeDescriptor> getNodes() {
      final List<NodeDescriptor> onlyThisNode = new ArrayList<>();
      onlyThisNode.add(MockNodeDescriptor.this);
      return onlyThisNode;
    }
  }
}
http://git-wip-us.apache.org/repos/asf/reef/blob/72ecec74/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockRunningTask.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockRunningTask.java
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockRunningTask.java new file mode 100644 index 0000000..bec26f4 --- /dev/null +++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockRunningTask.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.mock.runtime; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.task.RunningTask; +import org.apache.reef.mock.request.CloseTask; +import org.apache.reef.mock.request.SendMessageDriverToTask; +import org.apache.reef.mock.request.SuspendTask; +import org.apache.reef.runtime.common.driver.task.TaskRepresenter; +import org.apache.reef.util.Optional; + +/** + * mock running task. 
+ */
/**
 * Mock {@link RunningTask}. Every driver-side action (send, suspend, close) is
 * recorded as a process request on the {@link MockRuntimeDriver} instead of
 * being carried out directly.
 */
@Unstable
@Private
public final class MockRunningTask implements RunningTask {

  /** Receives the process requests raised by this task. */
  private final MockRuntimeDriver driver;

  private final String id;

  /** Context this task is hosted on. */
  private final ActiveContext hostContext;

  MockRunningTask(
      final MockRuntimeDriver mockRuntimeDriver,
      final String taskID,
      final ActiveContext context) {
    this.driver = mockRuntimeDriver;
    this.id = taskID;
    this.hostContext = context;
  }

  @Override
  public String getId() {
    return id;
  }

  @Override
  public ActiveContext getActiveContext() {
    return hostContext;
  }

  /**
   * @return the id of the evaluator that hosts this task's context
   */
  public String evaluatorID() {
    return hostContext.getEvaluatorId();
  }

  @Override
  public void send(final byte[] message) {
    driver.add(new SendMessageDriverToTask(this, message));
  }

  @Override
  public void suspend() {
    requestSuspend(Optional.<byte[]>empty());
  }

  @Override
  public void suspend(final byte[] message) {
    requestSuspend(Optional.of(message));
  }

  @Override
  public void close() {
    driver.add(new CloseTask(this, driver.getTaskReturnValueProvider()));
  }

  /**
   * Same as {@link #close()}: the message is not forwarded with the close request.
   */
  @Override
  public void close(final byte[] message) {
    close();
  }

  /**
   * Not available in the mock.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public TaskRepresenter getTaskRepresenter() {
    throw new UnsupportedOperationException();
  }

  /** Queues a suspend request that carries the given (possibly absent) message. */
  private void requestSuspend(final Optional<byte[]> message) {
    driver.add(new SuspendTask(this, message));
  }
}
http://git-wip-us.apache.org/repos/asf/reef/blob/72ecec74/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockRuntimeDriver.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockRuntimeDriver.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockRuntimeDriver.java new file mode 100644 index 0000000..0b89e61 --- /dev/null +++
b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockRuntimeDriver.java @@ -0,0 +1,454 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.mock.runtime; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.context.ContextMessage; +import org.apache.reef.driver.context.FailedContext; +import org.apache.reef.driver.evaluator.AllocatedEvaluator; +import org.apache.reef.driver.evaluator.CompletedEvaluator; +import org.apache.reef.driver.evaluator.FailedEvaluator; +import org.apache.reef.driver.parameters.*; +import org.apache.reef.driver.task.*; +import org.apache.reef.io.Tuple; +import org.apache.reef.mock.MockRuntime; +import org.apache.reef.mock.MockTaskReturnValueProvider; +import org.apache.reef.mock.ProcessRequest; +import org.apache.reef.mock.request.*; +import org.apache.reef.tang.InjectionFuture; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.util.Optional; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.time.Clock; +import org.apache.reef.wake.time.event.StartTime; 
+import org.apache.reef.wake.time.event.StopTime; + +import javax.inject.Inject; +import java.util.*; + +/** + * mock runtime driver. + */ +@Unstable +@Private +public final class MockRuntimeDriver implements MockRuntime { + + private final InjectionFuture<MockClock> clock; + + private final List<ProcessRequest> processRequestQueue = new ArrayList<>(); + + private final Set<EventHandler<StartTime>> driverStartHandlers; + + private final Set<EventHandler<StopTime>> driverStopHandlers; + + private final Set<EventHandler<AllocatedEvaluator>> allocatedEvaluatorHandlers; + + private final Set<EventHandler<CompletedEvaluator>> completedEvaluatorHandlers; + + private final Set<EventHandler<FailedEvaluator>> failedEvaluatorHandlers; + + private final Set<EventHandler<TaskRunningHandlers>> taskRunningHandlers; + + private final Set<EventHandler<FailedTask>> taskFailedHandlers; + + private final Set<EventHandler<TaskMessage>> taskMessageHandlers; + + private final Set<EventHandler<CompletedTask>> taskCompletedHandlers; + + private final Set<EventHandler<SuspendedTask>> taskSuspendedHandlers; + + private final Set<EventHandler<ActiveContext>> contextActiveHandlers; + + private final Set<EventHandler<CloseContext>> contextClosedHandlers; + + private final Set<EventHandler<ContextMessage>> contextMessageHandlers; + + private final Set<EventHandler<FailedContext>> contextFailedHandlers; + + private final Map<String, MockAllocatedEvalautor> allocatedEvaluatorMap = new HashMap<>(); + + private final Map<String, List<MockActiveContext>> allocatedContextsMap = new HashMap<>(); + + private final Map<String, MockRunningTask> runningTasks = new HashMap<>(); + + private final MockTaskReturnValueProvider taskReturnValueProvider; + + @Inject + MockRuntimeDriver( + final InjectionFuture<MockClock> clock, + final MockTaskReturnValueProvider taskReturnValueProvider, + @Parameter(DriverStartHandler.class) final Set<EventHandler<StartTime>> driverStartHandlers, + 
@Parameter(Clock.StopHandler.class) final Set<EventHandler<StopTime>> driverStopHandlers, + @Parameter(EvaluatorAllocatedHandlers.class) final Set<EventHandler<AllocatedEvaluator>> + allocatedEvaluatorHandlers, + @Parameter(EvaluatorCompletedHandlers.class) final Set<EventHandler<CompletedEvaluator>> + completedEvaluatorHandlers, + @Parameter(EvaluatorFailedHandlers.class) final Set<EventHandler<FailedEvaluator>> failedEvaluatorHandlers, + @Parameter(TaskRunningHandlers.class) final Set<EventHandler<TaskRunningHandlers>> taskRunningHandlers, + @Parameter(TaskFailedHandlers.class) final Set<EventHandler<FailedTask>> taskFailedHandlers, + @Parameter(TaskMessageHandlers.class) final Set<EventHandler<TaskMessage>> taskMessageHandlers, + @Parameter(TaskCompletedHandlers.class) final Set<EventHandler<CompletedTask>> taskCompletedHandlers, + @Parameter(TaskSuspendedHandlers.class) final Set<EventHandler<SuspendedTask>> taskSuspendedHandlers, + @Parameter(ContextActiveHandlers.class) final Set<EventHandler<ActiveContext>> contextActiveHandlers, + @Parameter(ContextClosedHandlers.class) final Set<EventHandler<CloseContext>> contextClosedHandlers, + @Parameter(ContextMessageHandlers.class) final Set<EventHandler<ContextMessage>> contextMessageHandlers, + @Parameter(ContextFailedHandlers.class) final Set<EventHandler<FailedContext>> contextFailedHandlers) { + this.clock = clock; + this.taskReturnValueProvider = taskReturnValueProvider; + this.driverStartHandlers = driverStartHandlers; + this.driverStopHandlers = driverStopHandlers; + this.allocatedEvaluatorHandlers = allocatedEvaluatorHandlers; + this.completedEvaluatorHandlers = completedEvaluatorHandlers; + this.failedEvaluatorHandlers = failedEvaluatorHandlers; + this.taskRunningHandlers = taskRunningHandlers; + this.taskFailedHandlers = taskFailedHandlers; + this.taskMessageHandlers = taskMessageHandlers; + this.taskCompletedHandlers = taskCompletedHandlers; + this.taskSuspendedHandlers = taskSuspendedHandlers; + 
this.contextActiveHandlers = contextActiveHandlers; + this.contextClosedHandlers = contextClosedHandlers; + this.contextMessageHandlers = contextMessageHandlers; + this.contextFailedHandlers = contextFailedHandlers; + } + + @Override + public Collection<AllocatedEvaluator> getCurrentAllocatedEvaluators() { + if (this.clock.get().isClosed()) { + throw new IllegalStateException("clock is closed"); + } + return new ArrayList<AllocatedEvaluator>(this.allocatedEvaluatorMap.values()); + } + + @Override + public void fail(final AllocatedEvaluator evaluator) { + if (this.clock.get().isClosed()) { + throw new IllegalStateException("clock is closed"); + } + if (this.allocatedEvaluatorMap.containsKey(evaluator.getId())) { + FailedTask failedTask = null; + if (this.runningTasks.containsKey(evaluator.getId())) { + final RunningTask task = this.runningTasks.remove(evaluator.getId()); + failedTask = new FailedTask( + task.getId(), + "mock", + Optional.<String>empty(), + Optional.<Throwable>empty(), + Optional.<byte[]>empty(), + Optional.<ActiveContext>of(task.getActiveContext())); + } + final List<FailedContext> failedContexts = new ArrayList<>(); + for (final MockActiveContext context : this.allocatedContextsMap.get(evaluator.getId())) { + failedContexts.add(new MockFailedContext(context)); + } + this.allocatedContextsMap.remove(evaluator.getId()); + + post(this.failedEvaluatorHandlers, new MockFailedEvaluator( + evaluator.getId(), failedContexts, + failedTask == null ? 
Optional.<FailedTask>empty() : Optional.of(failedTask))); + } else { + throw new IllegalStateException("unknown evaluator " + evaluator); + } + } + + @Override + public Collection<ActiveContext> getCurrentActiveContexts() { + if (this.clock.get().isClosed()) { + throw new IllegalStateException("clock is closed"); + } + final List<ActiveContext> currentActiveContexts = new ArrayList<>(); + for (final List<MockActiveContext> contexts : this.allocatedContextsMap.values()) { + currentActiveContexts.addAll(contexts); + } + return currentActiveContexts; + } + + @Override + public void fail(final ActiveContext context) { + if (this.clock.get().isClosed()) { + throw new IllegalStateException("clock is closed"); + } + final MockAllocatedEvalautor evaluator = ((MockActiveContext) context).getEvaluator(); + post(this.contextFailedHandlers, new MockFailedContext((MockActiveContext) context)); + if (!((MockActiveContext) context).getParentContext().isPresent()) { + // root context failure shuts evalautor down + fail(evaluator); + } else { + this.allocatedContextsMap.get(evaluator.getId()).remove(context); + } + } + + @Override + public Collection<RunningTask> getCurrentRunningTasks() { + if (this.clock.get().isClosed()) { + throw new IllegalStateException("clock is closed"); + } + return new ArrayList<RunningTask>(this.runningTasks.values()); + } + + @Override + public void fail(final RunningTask task) { + if (this.clock.get().isClosed()) { + throw new IllegalStateException("clock is closed"); + } + final String evaluatorID = task.getActiveContext().getEvaluatorId(); + if (this.runningTasks.containsKey(evaluatorID) && + this.runningTasks.get(evaluatorID).equals(task)) { + this.runningTasks.remove(evaluatorID); + post(taskFailedHandlers, new FailedTask( + task.getId(), + "mock", + Optional.<String>empty(), + Optional.<Throwable>empty(), + Optional.<byte[]>empty(), + Optional.of(task.getActiveContext()))); + } else { + throw new IllegalStateException("unknown running task " + 
task); + } + } + + @Override + public void start() { + post(this.driverStartHandlers, new StartTime(this.clock.get().getCurrentTime())); + } + + @Override + public void stop() { + post(this.driverStopHandlers, new StopTime(this.clock.get().getCurrentTime())); + } + + @Override + public boolean hasProcessRequest() { + return this.processRequestQueue.size() > 0; + } + + @Override + public ProcessRequest getNextProcessRequest() { + if (this.processRequestQueue.size() > 0) { + return this.processRequestQueue.remove(0); + } else { + return null; + } + } + + @Override + public void succeed(final ProcessRequest pr) { + if (this.clock.get().isClosed()) { + throw new IllegalStateException("clock is closed"); + } + final ProcessRequestInternal request = (ProcessRequestInternal) pr; + switch (request.getType()) { + case ALLOCATE_EVALUATOR: + final MockAllocatedEvalautor allocatedEvalautor = ((AllocateEvaluator)request).getSuccessEvent(); + validateAndCreate(allocatedEvalautor); + post(this.allocatedEvaluatorHandlers, allocatedEvalautor); + post(this.contextActiveHandlers, allocatedEvalautor.getRootContext()); + break; + case CLOSE_EVALUATOR: + final CompletedEvaluator closedEvaluator = ((CloseEvaluator)request).getSuccessEvent(); + validateAndClose(closedEvaluator); + post(this.completedEvaluatorHandlers, closedEvaluator); + break; + case CREATE_CONTEXT: + final MockActiveContext createContext = ((CreateContext) request).getSuccessEvent(); + validateAndCreate(createContext); + post(this.contextActiveHandlers, createContext); + break; + case CLOSE_CONTEXT: + final MockClosedContext closeContext = ((CloseContext) request).getSuccessEvent(); + validateAndClose(closeContext); + post(this.contextClosedHandlers, closeContext); + break; + case CREATE_TASK: + final MockRunningTask createTask = ((CreateTask)request).getSuccessEvent(); + validateAndCreate(createTask); + post(this.taskRunningHandlers, request.getSuccessEvent()); + break; + case SUSPEND_TASK: + final MockRunningTask 
suspendedTask = ((SuspendTask)request).getTask(); + validateAndClose(suspendedTask); + post(this.taskSuspendedHandlers, request.getSuccessEvent()); + break; + case CLOSE_TASK: + case COMPLETE_TASK: + final MockRunningTask completedTask = ((CompleteTask)request).getTask(); + validateAndClose(completedTask); + post(this.taskCompletedHandlers, request.getSuccessEvent()); + break; + case CREATE_CONTEXT_AND_TASK: + final CreateContextAndTask createContextTask = (CreateContextAndTask) request; + final Tuple<MockActiveContext, MockRunningTask> events = createContextTask.getSuccessEvent(); + validateAndCreate(events.getKey()); + post(this.contextActiveHandlers, events.getKey()); + validateAndCreate(events.getValue()); + post(this.taskRunningHandlers, events.getValue()); + break; + case SEND_MESSAGE_DRIVER_TO_TASK: + // ignore + break; + case SEND_MESSAGE_DRIVER_TO_CONTEXT: + // ignore + break; + default: + throw new IllegalStateException("unknown type"); + } + + if (request.doAutoComplete()) { + add(request.getCompletionProcessRequest()); + } else if (!this.clock.get().isClosed() && isIdle()) { + this.clock.get().close(); + } + } + + @Override + public void fail(final ProcessRequest pr) { + if (this.clock.get().isClosed()) { + throw new IllegalStateException("clock is closed"); + } + final ProcessRequestInternal request = (ProcessRequestInternal) pr; + switch (request.getType()) { + case ALLOCATE_EVALUATOR: + post(this.failedEvaluatorHandlers, request.getFailureEvent()); + break; + case CLOSE_EVALUATOR: + final CompletedEvaluator evaluator = ((CloseEvaluator)request).getSuccessEvent(); + validateAndClose(evaluator); + post(this.failedEvaluatorHandlers, request.getFailureEvent()); + break; + case CREATE_CONTEXT: + post(this.contextFailedHandlers, request.getFailureEvent()); + break; + case CLOSE_CONTEXT: + final MockClosedContext context = ((CloseContext)request).getSuccessEvent(); + validateAndClose(context); + if (context.getParentContext() == null) { + add(new 
CloseEvaluator(context.getMockActiveContext().getEvaluator())); + } + post(this.contextFailedHandlers, request.getFailureEvent()); + break; + case CREATE_TASK: + post(this.taskFailedHandlers, request.getFailureEvent()); + break; + case SUSPEND_TASK: + validateAndClose(((SuspendTask)request).getTask()); + post(this.taskFailedHandlers, request.getFailureEvent()); + break; + case CLOSE_TASK: + case COMPLETE_TASK: + validateAndClose(((CloseTask)request).getTask()); + post(this.taskFailedHandlers, request.getFailureEvent()); + break; + case CREATE_CONTEXT_AND_TASK: + final CreateContextAndTask createContextTask = (CreateContextAndTask) request; + final Tuple<MockFailedContext, FailedTask> events = createContextTask.getFailureEvent(); + post(this.taskFailedHandlers, events.getValue()); + post(this.contextFailedHandlers, events.getKey()); + break; + case SEND_MESSAGE_DRIVER_TO_TASK: + // ignore + break; + case SEND_MESSAGE_DRIVER_TO_CONTEXT: + // ignore + break; + default: + throw new IllegalStateException("unknown type"); + } + + if (!this.clock.get().isClosed() && isIdle()) { + this.clock.get().close(); + } + } + + MockTaskReturnValueProvider getTaskReturnValueProvider() { + return this.taskReturnValueProvider; + } + /** + * Used by mock REEF entities (e.g., AllocatedEvaluator, RunningTask) to inject + * process requests from initiated actions e.g., RunningTask.close(). 
+ * @param request to inject + */ + void add(final ProcessRequest request) { + this.processRequestQueue.add(request); + } + + private boolean isIdle() { + return this.clock.get().isIdle() && + this.processRequestQueue.isEmpty() && + this.allocatedEvaluatorMap.isEmpty(); + } + + private <T> void post(final Set<EventHandler<T>> handlers, final Object event) { + for (final EventHandler<T> handler : handlers) { + handler.onNext((T) event); + } + } + + private void validateAndCreate(final MockActiveContext context) { + if (!this.allocatedEvaluatorMap.containsKey(context.getEvaluatorId())) { + throw new IllegalStateException("unknown evaluator id " + context.getEvaluatorId()); + } else if (!this.allocatedContextsMap.containsKey(context.getEvaluatorId())) { + this.allocatedContextsMap.put(context.getEvaluatorId(), new ArrayList<MockActiveContext>()); + } + this.allocatedContextsMap.get(context.getEvaluatorId()).add(context); + } + + private void validateAndClose(final MockClosedContext context) { + if (!this.allocatedContextsMap.containsKey(context.getEvaluatorId())) { + throw new IllegalStateException("unknown evaluator id " + context.getEvaluatorId()); + } + final List<MockActiveContext> contexts = this.allocatedContextsMap.get(context.getEvaluatorId()); + if (!contexts.get(contexts.size() - 1).equals(context.getMockActiveContext())) { + throw new IllegalStateException("closing context that is not on the top of the stack"); + } + contexts.remove(context.getMockActiveContext()); + } + + private void validateAndCreate(final MockRunningTask task) { + if (this.runningTasks.containsKey(task.evaluatorID())) { + throw new IllegalStateException("task already running on evaluator " + + task.evaluatorID()); + } + this.runningTasks.put(task.evaluatorID(), task); + } + + private void validateAndClose(final MockRunningTask task) { + if (!this.runningTasks.containsKey(task.getActiveContext().getEvaluatorId())) { + throw new IllegalStateException("no task running on evaluator"); + } + 
this.runningTasks.remove(task.getActiveContext().getEvaluatorId()); + } + + private void validateAndCreate(final MockAllocatedEvalautor evalutor) { + if (this.allocatedEvaluatorMap.containsKey(evalutor.getId())) { + throw new IllegalStateException("evaluator id " + evalutor.getId() + " already exists"); + } + this.allocatedEvaluatorMap.put(evalutor.getId(), evalutor); + this.allocatedContextsMap.put(evalutor.getId(), new ArrayList<MockActiveContext>()); + this.allocatedContextsMap.get(evalutor.getId()).add(evalutor.getRootContext()); + } + + private void validateAndClose(final CompletedEvaluator evalautor) { + if (!this.allocatedEvaluatorMap.containsKey(evalautor.getId())) { + throw new IllegalStateException("unknown evaluator id " + evalautor.getId()); + } + this.allocatedEvaluatorMap.remove(evalautor.getId()); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/72ecec74/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockSuspendedTask.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockSuspendedTask.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockSuspendedTask.java new file mode 100644 index 0000000..84569ff --- /dev/null +++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockSuspendedTask.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.mock.runtime; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.task.SuspendedTask; + +/** + * mock suspended task. + */ +@Unstable +@Private +public final class MockSuspendedTask implements SuspendedTask { + + private final MockRunningTask task; + + public MockSuspendedTask(final MockRunningTask task) { + this.task = task; + } + + @Override + public ActiveContext getActiveContext() { + return this.task.getActiveContext(); + } + + @Override + public byte[] get() { + return new byte[0]; + } + + @Override + public String getId() { + return this.task.getId(); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/72ecec74/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockUtils.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockUtils.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockUtils.java new file mode 100644 index 0000000..0b073c8 --- /dev/null +++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockUtils.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.mock.runtime; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Injector; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.exceptions.InjectionException; + +/** + * mock utilities. + */ +@Unstable +@Private +final class MockUtils { + + private MockUtils() { + } + + public static <U, T extends Name<U>> U getValue(final Configuration configuration, final Class<T> name) { + try { + final Injector injector = Tang.Factory.getTang().newInjector(configuration); + return injector.getNamedInstance(name); + } catch (InjectionException e) { + throw new IllegalStateException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/72ecec74/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/package-info.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/package-info.java new file mode 100644 index 0000000..b5cf639 --- /dev/null +++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +/** + * mock runtime implementation. + */ +package org.apache.reef.mock.runtime; http://git-wip-us.apache.org/repos/asf/reef/blob/72ecec74/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/BasicMockTests.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/BasicMockTests.java b/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/BasicMockTests.java new file mode 100644 index 0000000..984a9f4 --- /dev/null +++ b/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/BasicMockTests.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.mock; + +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.evaluator.AllocatedEvaluator; +import org.apache.reef.driver.task.RunningTask; +import org.apache.reef.mock.request.ProcessRequestInternal; +import org.apache.reef.mock.runtime.MockAllocatedEvalautor; +import org.apache.reef.mock.runtime.MockClock; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Injector; +import org.apache.reef.tang.Tang; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * basic mock tests. 
+ */ +final class BasicMockTests { + + private MockApplication mockApplication; + + private MockRuntime mockRuntime; + + private MockClock mockClock; + + @Before + public void initialize() throws Exception { + final Configuration conf = MockConfiguration.CONF + .set(MockConfiguration.ON_DRIVER_STARTED, MockApplication.StartHandler.class) + .set(MockConfiguration.ON_DRIVER_STOP, MockApplication.StopHandler.class) + .set(MockConfiguration.ON_CONTEXT_ACTIVE, MockApplication.ActiveContextHandler.class) + .set(MockConfiguration.ON_CONTEXT_CLOSED, MockApplication.ContextClosedHandler.class) + .set(MockConfiguration.ON_CONTEXT_FAILED, MockApplication.FailedContextHandler.class) + .set(MockConfiguration.ON_EVALUATOR_ALLOCATED, MockApplication.AllocatedEvaluatorHandler.class) + .set(MockConfiguration.ON_EVALUATOR_COMPLETED, MockApplication.CompletedEvaluatorHandler.class) + .set(MockConfiguration.ON_EVALUATOR_FAILED, MockApplication.FailedEvaluatorHandler.class) + .set(MockConfiguration.ON_TASK_COMPLETED, MockApplication.CompletedTaskHandler.class) + .set(MockConfiguration.ON_TASK_FAILED, MockApplication.FailedTaskHandler.class) + .set(MockConfiguration.ON_TASK_RUNNING, MockApplication.RunningTaskHandler.class) + .set(MockConfiguration.ON_TASK_SUSPENDED, MockApplication.SuspendedTaskHandler.class) + .build(); + + final Injector injector = Tang.Factory.getTang().newInjector(conf); + this.mockApplication = injector.getInstance(MockApplication.class); + this.mockRuntime = injector.getInstance(MockRuntime.class); + this.mockClock = injector.getInstance(MockClock.class); + + this.mockClock.run(); + } + + @Test + public void testSuccessRequests() throws Exception { + assertTrue("mock application received start event", this.mockApplication.isRunning()); + + this.mockApplication.requestEvaluators(1); + assertTrue("check for process event", this.mockRuntime.hasProcessRequest()); + final ProcessRequest allocateEvaluatorRequest = this.mockRuntime.getNextProcessRequest(); + 
assertEquals("allocate evalautor request", ProcessRequest.Type.ALLOCATE_EVALUATOR, + allocateEvaluatorRequest.getType()); + final AllocatedEvaluator evaluator = + ((ProcessRequestInternal<AllocatedEvaluator, Object>)allocateEvaluatorRequest) + .getSuccessEvent(); + this.mockRuntime.succeed(allocateEvaluatorRequest); + assertTrue("evaluator allocation succeeded", + this.mockApplication.getAllocatedEvaluators().contains(evaluator)); + final ActiveContext rootContext = this.mockApplication.getContext(evaluator, + MockAllocatedEvalautor.ROOT_CONTEXT_IDENTIFIER_PREFIX + evaluator.getId()); + assertTrue("root context", rootContext != null); + + + // submit a task + this.mockApplication.submitTask(rootContext, "test-task"); + assertTrue("create task queued", this.mockRuntime.hasProcessRequest()); + final ProcessRequest createTaskRequest = this.mockRuntime.getNextProcessRequest(); + assertEquals("create task request", ProcessRequest.Type.CREATE_TASK, + createTaskRequest.getType()); + final RunningTask task = (RunningTask) ((ProcessRequestInternal)createTaskRequest).getSuccessEvent(); + this.mockRuntime.succeed(createTaskRequest); + assertTrue("task running", this.mockApplication.getRunningTasks().contains(task)); + + // check task auto complete + assertTrue("check for request", this.mockRuntime.hasProcessRequest()); + final ProcessRequestInternal completedTask = + (ProcessRequestInternal) this.mockRuntime.getNextProcessRequest(); + assertEquals("complete task request", ProcessRequest.Type.COMPLETE_TASK, + completedTask.getType()); + this.mockRuntime.succeed(completedTask); + assertEquals("no running tasks", 0, this.mockApplication.getRunningTasks().size()); + + // create a sub-context + this.mockApplication.submitContext(rootContext, "child"); + assertTrue("check for request", this.mockRuntime.hasProcessRequest()); + final ProcessRequestInternal createContextRequest = + (ProcessRequestInternal) this.mockRuntime.getNextProcessRequest(); + assertEquals("create context 
request", ProcessRequest.Type.CREATE_CONTEXT, + createContextRequest.getType()); + this.mockRuntime.succeed(createContextRequest); + final ActiveContext context = this.mockApplication.getContext(evaluator, "child"); + assertTrue("child context", context.getParentId().get().equals(rootContext.getId())); + } + + @Test + public void testFailureRequests() throws Exception { + assertTrue("mock application received start event", this.mockApplication.isRunning()); + + this.mockApplication.requestEvaluators(1); + assertTrue("check for process event", this.mockRuntime.hasProcessRequest()); + ProcessRequest allocateEvaluatorRequest = this.mockRuntime.getNextProcessRequest(); + this.mockRuntime.fail(allocateEvaluatorRequest); + assertEquals("evaluator allocation failed", 1, + this.mockApplication.getFailedEvaluators().size()); + + this.mockApplication.requestEvaluators(1); + allocateEvaluatorRequest = this.mockRuntime.getNextProcessRequest(); + final AllocatedEvaluator evaluator = + (AllocatedEvaluator)((ProcessRequestInternal)allocateEvaluatorRequest).getSuccessEvent(); + this.mockRuntime.succeed(allocateEvaluatorRequest); + final ActiveContext rootContext = this.mockApplication + .getContext(evaluator, MockAllocatedEvalautor.ROOT_CONTEXT_IDENTIFIER_PREFIX + evaluator.getId()); + + + // submit a task + this.mockApplication.submitTask(rootContext, "test-task"); + assertTrue("create task queued", this.mockRuntime.hasProcessRequest()); + final ProcessRequest createTaskRequest = this.mockRuntime.getNextProcessRequest(); + assertEquals("create task request", ProcessRequest.Type.CREATE_TASK, + createTaskRequest.getType()); + this.mockRuntime.fail(createTaskRequest); + assertEquals("task running", 1, this.mockApplication.getFailedTasks().size()); + + // create a sub-context + this.mockApplication.submitContext(rootContext, "child"); + assertTrue("check for request", this.mockRuntime.hasProcessRequest()); + final ProcessRequestInternal createContextRequest = + 
(ProcessRequestInternal) this.mockRuntime.getNextProcessRequest(); + this.mockRuntime.fail(createContextRequest); + assertEquals("child context", 1, this.mockApplication.getFailedContext().size()); + } + + @Test + public void testMockFailures() { + // make sure we're running + assertTrue("mock application received start event", this.mockApplication.isRunning()); + + // allocate an evaluator and get root context + this.mockApplication.requestEvaluators(1); + this.mockRuntime.succeed(this.mockRuntime.getNextProcessRequest()); + final AllocatedEvaluator evaluator = this.mockRuntime.getCurrentAllocatedEvaluators().iterator().next(); + final ActiveContext rootContext = this.mockApplication.getContext(evaluator, + MockAllocatedEvalautor.ROOT_CONTEXT_IDENTIFIER_PREFIX + evaluator.getId()); + + // create a child context off of root context + this.mockApplication.submitContext(rootContext, "child"); + this.mockRuntime.succeed(this.mockRuntime.getNextProcessRequest()); + final ActiveContext childContext = this.mockApplication.getContext(evaluator, "child"); + + // submit a task from child context + this.mockApplication.submitTask(childContext, "test-task"); + final ProcessRequest createTaskRequest = this.mockRuntime.getNextProcessRequest(); + createTaskRequest.setAutoComplete(false); // keep it running + this.mockRuntime.succeed(createTaskRequest); + final RunningTask task = this.mockRuntime.getCurrentRunningTasks().iterator().next(); + + // fail task + this.mockRuntime.fail(task); + assertEquals("task failed", 1, this.mockApplication.getFailedTasks().size()); + + // fail child context + this.mockRuntime.fail(childContext); + assertTrue("child context failed", + this.mockApplication.getFailedContext().iterator().next().getId().equals(childContext.getId())); + // evaluator should still be up + assertEquals("check evaluator", 0, this.mockApplication.getFailedEvaluators().size()); + + // fail evaluator + this.mockRuntime.fail(evaluator); + assertEquals("evaluator failed", 1, 
this.mockApplication.getFailedEvaluators().size()); + + // both contexts should be failed + assertEquals("root and child contexts failed", 2, + this.mockApplication.getFailedContext().size()); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/72ecec74/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/MockApplication.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/MockApplication.java b/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/MockApplication.java new file mode 100644 index 0000000..86a105e --- /dev/null +++ b/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/MockApplication.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
package org.apache.reef.mock;

import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.ClosedContext;
import org.apache.reef.driver.context.ContextConfiguration;
import org.apache.reef.driver.context.FailedContext;
import org.apache.reef.driver.evaluator.AllocatedEvaluator;
import org.apache.reef.driver.evaluator.CompletedEvaluator;
import org.apache.reef.driver.evaluator.EvaluatorRequestor;
import org.apache.reef.driver.evaluator.FailedEvaluator;
import org.apache.reef.driver.task.*;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.task.Task;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.time.Clock;
import org.apache.reef.wake.time.event.Alarm;
import org.apache.reef.wake.time.event.StartTime;
import org.apache.reef.wake.time.event.StopTime;

import javax.inject.Inject;
import java.util.*;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Test driver application for the mock runtime. Its event handlers only record what
 * they are told (allocated/failed evaluators, active/failed contexts, running/failed/
 * suspended tasks) so that tests can inspect the resulting state.
 */
@Unit
final class MockApplication {

  private static final Logger LOG = Logger.getLogger(MockApplication.class.getName());

  private final Clock clock;

  private final EvaluatorRequestor evaluatorRequestor;

  /** Active contexts, keyed by evaluator id and then by context id. */
  private final Map<String, Map<String, ActiveContext>> evaluatorId2ContextId2ContextMap = new HashMap<>();

  /** Currently allocated evaluators, keyed by evaluator id. */
  private final Map<String, AllocatedEvaluator> evaluatorMap = new HashMap<>();

  /** Failed evaluators, keyed by evaluator id. */
  private final Map<String, FailedEvaluator> failedEvaluatorMap = new HashMap<>();

  /** Running tasks, keyed by the id of the evaluator they run on. */
  private final Map<String, RunningTask> evaluatorIdRunningTaskMap = new HashMap<>();

  private final Set<FailedContext> failedContextSet = new HashSet<>();

  private final Set<FailedTask> failedTaskSet = new HashSet<>();

  private final Set<SuspendedTask> suspendedTaskSet = new HashSet<>();

  /** True between the start and stop events. */
  private boolean running = false;

  @Inject
  MockApplication(final Clock clock, final EvaluatorRequestor evaluatorRequestor) {
    this.clock = clock;
    this.evaluatorRequestor = evaluatorRequestor;
  }

  /**
   * Looks up an active context on an evaluator.
   *
   * @param evaluator evaluator hosting the context; at least one context must have been
   *     reported active on it, otherwise this throws {@link NullPointerException}
   * @param identifier context id
   * @return the context, or {@code null} if no context with that id is recorded
   */
  ActiveContext getContext(final AllocatedEvaluator evaluator, final String identifier) {
    return this.evaluatorId2ContextId2ContextMap.get(evaluator.getId()).get(identifier);
  }

  /**
   * @return read-only view of the running tasks
   */
  Collection<RunningTask> getRunningTasks() {
    return Collections.unmodifiableCollection(this.evaluatorIdRunningTaskMap.values());
  }

  /**
   * @return read-only view of the allocated evaluators
   */
  Collection<AllocatedEvaluator> getAllocatedEvaluators() {
    return Collections.unmodifiableCollection(this.evaluatorMap.values());
  }

  /**
   * @return read-only view of the failed evaluators
   */
  Collection<FailedEvaluator> getFailedEvaluators() {
    return Collections.unmodifiableCollection(this.failedEvaluatorMap.values());
  }

  /**
   * @return read-only view of the failed tasks
   */
  Collection<FailedTask> getFailedTasks() {
    return Collections.unmodifiableCollection(this.failedTaskSet);
  }

  /**
   * @return read-only view of the failed contexts
   */
  Collection<FailedContext> getFailedContext() {
    return Collections.unmodifiableCollection(this.failedContextSet);
  }

  /**
   * Submits one evaluator request for {@code numEvaluators} evaluators with 128 memory
   * units and one core each.
   *
   * @param numEvaluators number of evaluators to request
   */
  void requestEvaluators(final int numEvaluators) {
    LOG.log(Level.INFO, "request {0} Evaluators", numEvaluators);
    evaluatorRequestor.newRequest()
        .setMemory(128)
        .setNumberOfCores(1)
        .setNumber(numEvaluators)
        .submit();
  }

  /**
   * Submits a {@link DummyTestTask} with the given identifier to a context.
   *
   * @param context context to submit the task to
   * @param identifier task identifier
   */
  void submitTask(final ActiveContext context, final String identifier) {
    context.submitTask(TaskConfiguration.CONF
        .set(TaskConfiguration.IDENTIFIER, identifier)
        .set(TaskConfiguration.TASK, DummyTestTask.class)
        .build());
  }

  /**
   * Submits a child context with the given identifier to a context.
   *
   * @param context parent context
   * @param identifier identifier of the new context
   */
  void submitContext(final ActiveContext context, final String identifier) {
    context.submitContext(ContextConfiguration.CONF
        .set(ContextConfiguration.IDENTIFIER, identifier)
        .build());
  }

  /**
   * @return true once the start event was received and until the stop event arrives
   */
  boolean isRunning() {
    return this.running;
  }

  /**
   * @param evaluator evaluator to look for
   * @return true if the evaluator is currently recorded as allocated
   */
  boolean exists(final AllocatedEvaluator evaluator) {
    return this.evaluatorMap.containsKey(evaluator.getId());
  }

  /**
   * Driver start: marks the application as running and schedules an alarm at
   * {@code Integer.MAX_VALUE} whose handler throws if it ever fires.
   */
  final class StartHandler implements EventHandler<StartTime> {
    @Override
    public void onNext(final StartTime startTime) {
      // NOTE(review): presumably the pending alarm keeps the clock from going idle - confirm
      clock.scheduleAlarm(Integer.MAX_VALUE, new EventHandler<Alarm>() {
        @Override
        public void onNext(final Alarm value) {
          throw new RuntimeException("should not happen");
        }
      });
      running = true;
    }
  }

  /**
   * Driver stop: marks the application as no longer running.
   */
  final class StopHandler implements EventHandler<StopTime> {
    @Override
    public void onNext(final StopTime stopTime) {
      running = false;
    }
  }

  /**
   * Records a newly allocated evaluator.
   */
  final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
    @Override
    public void onNext(final AllocatedEvaluator eval) {
      evaluatorMap.put(eval.getId(), eval);
    }
  }

  /**
   * Forgets an evaluator that has been shut down, together with its contexts and task.
   */
  final class CompletedEvaluatorHandler implements EventHandler<CompletedEvaluator> {
    @Override
    public void onNext(final CompletedEvaluator eval) {
      evaluatorMap.remove(eval.getId());
      evaluatorId2ContextId2ContextMap.remove(eval.getId());
      evaluatorIdRunningTaskMap.remove(eval.getId());
    }
  }

  /**
   * Moves a failed evaluator from the allocated to the failed map, forgets its contexts
   * and task, and records the failed contexts the event reports.
   */
  final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> {

    @Override
    public void onNext(final FailedEvaluator eval) {
      evaluatorMap.remove(eval.getId());
      evaluatorId2ContextId2ContextMap.remove(eval.getId());
      evaluatorIdRunningTaskMap.remove(eval.getId());
      failedEvaluatorMap.put(eval.getId(), eval);
      failedContextSet.addAll(eval.getFailedContextList());
    }
  }

  /**
   * Records a context that became active; a second context with the same id on the same
   * evaluator is rejected with {@link IllegalStateException}.
   */
  final class ActiveContextHandler implements EventHandler<ActiveContext> {
    @Override
    public void onNext(final ActiveContext context) {
      if (!evaluatorId2ContextId2ContextMap.containsKey(context.getEvaluatorId())) {
        evaluatorId2ContextId2ContextMap.put(context.getEvaluatorId(), new HashMap<String, ActiveContext>());
      }
      if (evaluatorId2ContextId2ContextMap.get(context.getEvaluatorId()).containsKey(context.getId())) {
        throw new IllegalStateException(
            String.format("Context %s on evaluator %s already exists on evaluator with " +
                "same identifier", context.getId(), context.getEvaluatorId()));
      }
      evaluatorId2ContextId2ContextMap.get(context.getEvaluatorId()).put(context.getId(), context);
    }
  }

  /**
   * Forgets a context that was closed; the context is expected to be recorded as active.
   */
  final class ContextClosedHandler implements EventHandler<ClosedContext> {
    @Override
    public void onNext(final ClosedContext value) {
      assert evaluatorId2ContextId2ContextMap.containsKey(value.getEvaluatorId());
      assert evaluatorId2ContextId2ContextMap.get(value.getEvaluatorId()).containsKey(value.getId());
      evaluatorId2ContextId2ContextMap.get(value.getEvaluatorId()).remove(value.getId());
    }
  }

  /**
   * Records a failed context and, if it had been reported active, removes it from the
   * active contexts of its evaluator.
   */
  final class FailedContextHandler implements EventHandler<FailedContext> {
    @Override
    public void onNext(final FailedContext value) {
      final Map<String, ActiveContext> contexts = evaluatorId2ContextId2ContextMap.get(value.getEvaluatorId());
      if (contexts != null) {
        // the inner map is keyed by context id; a context that failed before it ever
        // became active is simply not present and the removal is a no-op
        contexts.remove(value.getId());
      }
      failedContextSet.add(value);
    }
  }

  /**
   * Records a task that started running, keyed by its evaluator.
   */
  final class RunningTaskHandler implements EventHandler<RunningTask> {
    @Override
    public void onNext(final RunningTask task) {
      evaluatorIdRunningTaskMap.put(task.getActiveContext().getEvaluatorId(), task);
    }
  }

  /**
   * Forgets the running task of the evaluator on which a task completed.
   */
  final class CompletedTaskHandler implements EventHandler<CompletedTask> {
    @Override
    public void onNext(final CompletedTask task) {
      evaluatorIdRunningTaskMap.remove(task.getActiveContext().getEvaluatorId());
    }
  }

  /**
   * Records a failed task and, when the event carries a context, forgets the running
   * task of that context's evaluator.
   */
  final class FailedTaskHandler implements EventHandler<FailedTask> {
    @Override
    public void onNext(final FailedTask value) {
      // the context is optional on a FailedTask; still record the failure when it is absent
      if (value.getActiveContext().isPresent()) {
        evaluatorIdRunningTaskMap.remove(value.getActiveContext().get().getEvaluatorId());
      }
      failedTaskSet.add(value);
    }
  }

  /**
   * Records a suspended task and forgets the running task of its evaluator.
   */
  final class SuspendedTaskHandler implements EventHandler<SuspendedTask> {
    @Override
    public void onNext(final SuspendedTask value) {
      evaluatorIdRunningTaskMap.remove(value.getActiveContext().getEvaluatorId());
      suspendedTaskSet.add(value);
    }
  }

  /**
   * Task used by {@link #submitTask}; its call returns an empty byte array.
   */
  private static final class DummyTestTask implements Task {
    @Override
    public byte[] call(final byte[] memento) throws Exception {
      return new byte[0];
    }
  }
}
b/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/package-info.java new file mode 100644 index 0000000..e93688c --- /dev/null +++ b/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +/** + * mock runtime tests. + */ +package org.apache.reef.mock; http://git-wip-us.apache.org/repos/asf/reef/blob/72ecec74/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 6633316..7d4431e 100644 --- a/pom.xml +++ b/pom.xml @@ -774,6 +774,7 @@ under the License. <module>lang/java/reef-runtime-local</module> <module>lang/java/reef-runtime-yarn</module> <module>lang/java/reef-runtime-mesos</module> + <module>lang/java/reef-runtime-mock</module> <module>lang/java/reef-runtime-multi</module> <module>lang/java/reef-runtime-standalone</module> <module>lang/java/reef-tang</module>
