This is an automated email from the ASF dual-hosted git repository.
taegeonum pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new d2b016e [NEMO-224] Simple StreamingScheduler (#126)
d2b016e is described below
commit d2b016ea875fcf07a64a1942dd20b037d930d34b
Author: John Yang <[email protected]>
AuthorDate: Wed Oct 24 08:51:36 2018 +0900
[NEMO-224] Simple StreamingScheduler (#126)
JIRA: [NEMO-224: Simple StreamingScheduler](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-224)
**Major changes:**
- StreamingScheduler with the following properties:
  - Keeps track of new executors.
  - Schedules all tasks in a reverse topological order.
  - Crashes the system upon any other events (should be fixed in the future).
  - Never stops running.
**Minor changes to note:**
- N/A
**Tests for the changes:**
- StreamingSchedulerTest
**Other comments:**
- N/A
Closes #126
---
.../master/scheduler/StreamingScheduler.java | 151 +++++++++++++++++++++
.../master/scheduler/BatchSchedulerTest.java | 6 +-
.../master/scheduler/StreamingSchedulerTest.java | 83 +++++++++++
3 files changed, 236 insertions(+), 4 deletions(-)
diff --git
a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/StreamingScheduler.java
b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/StreamingScheduler.java
new file mode 100644
index 0000000..d098905
--- /dev/null
+++
b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/StreamingScheduler.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.runtime.master.scheduler;
+
+import com.google.common.collect.Lists;
+import org.apache.nemo.common.exception.UnknownExecutionStateException;
+import org.apache.nemo.common.ir.Readable;
+import org.apache.nemo.runtime.common.RuntimeIdManager;
+import org.apache.nemo.runtime.common.plan.PhysicalPlan;
+import org.apache.nemo.runtime.common.plan.Stage;
+import org.apache.nemo.runtime.common.plan.StageEdge;
+import org.apache.nemo.runtime.common.plan.Task;
+import org.apache.nemo.runtime.common.state.TaskState;
+import org.apache.nemo.runtime.master.*;
+import org.apache.nemo.runtime.master.resource.ExecutorRepresenter;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * A simple scheduler for streaming workloads.
+ * - Keeps track of new executors
+ * - Schedules all tasks in a reverse topological order.
+ * - Crashes the system upon any other events (should be fixed in the future)
+ * - Never stops running.
+ */
+@DriverSide
+@NotThreadSafe
+public final class StreamingScheduler implements Scheduler {
+ private static final Logger LOG =
LoggerFactory.getLogger(StreamingScheduler.class.getName());
+
+ private final TaskDispatcher taskDispatcher;
+ private final PendingTaskCollectionPointer pendingTaskCollectionPointer;
+ private final ExecutorRegistry executorRegistry;
+ private final PlanStateManager planStateManager;
+
+ StreamingScheduler(final TaskDispatcher taskDispatcher,
+ final PendingTaskCollectionPointer
pendingTaskCollectionPointer,
+ final ExecutorRegistry executorRegistry,
+ final PlanStateManager planStateManager) {
+ this.taskDispatcher = taskDispatcher;
+ this.pendingTaskCollectionPointer = pendingTaskCollectionPointer;
+ this.executorRegistry = executorRegistry;
+ this.planStateManager = planStateManager;
+ }
+
+ @Override
+ public void schedulePlan(final PhysicalPlan submittedPhysicalPlan,
+ final int maxScheduleAttempt) {
+ // Housekeeping stuff
+ taskDispatcher.run();
+ planStateManager.updatePlan(submittedPhysicalPlan, maxScheduleAttempt);
+ planStateManager.storeJSON("submitted");
+
+ // Prepare tasks
+ final List<Stage> reverseTopoStages =
Lists.reverse(submittedPhysicalPlan.getStageDAG().getTopologicalSort());
+ final List<Task> reverseTopoTasks =
reverseTopoStages.stream().flatMap(stageToSchedule -> {
+ // Helper variables for this stage
+ final List<StageEdge> stageIncomingEdges =
+
submittedPhysicalPlan.getStageDAG().getIncomingEdgesOf(stageToSchedule.getId());
+ final List<StageEdge> stageOutgoingEdges =
+
submittedPhysicalPlan.getStageDAG().getOutgoingEdgesOf(stageToSchedule.getId());
+ final List<Map<String, Readable>> vertexIdToReadables =
stageToSchedule.getVertexIdToReadables();
+ final List<String> taskIdsToSchedule =
planStateManager.getTaskAttemptsToSchedule(stageToSchedule.getId());
+
+ // Create tasks of this stage
+ return taskIdsToSchedule.stream().map(taskId -> new Task(
+ submittedPhysicalPlan.getPlanId(),
+ taskId,
+ stageToSchedule.getExecutionProperties(),
+ stageToSchedule.getSerializedIRDAG(),
+ stageIncomingEdges,
+ stageOutgoingEdges,
+ vertexIdToReadables.get(RuntimeIdManager.getIndexFromTaskId(taskId))));
+ }).collect(Collectors.toList());
+
+ // Schedule everything at once
+ pendingTaskCollectionPointer.setToOverwrite(reverseTopoTasks);
+ }
+
+ @Override
+ public void updatePlan(final PhysicalPlan newPhysicalPlan) {
+ // TODO #227: StreamingScheduler Dynamic Optimization
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void onTaskStateReportFromExecutor(final String executorId,
+ final String taskId,
+ final int taskAttemptIndex,
+ final TaskState.State newState,
+ @Nullable final String
vertexPutOnHold,
+ final
TaskState.RecoverableTaskFailureCause failureCause) {
+ switch (newState) {
+ case COMPLETE:
+ case SHOULD_RETRY:
+ case ON_HOLD:
+ case FAILED:
+ // TODO #226: StreamingScheduler Fault Tolerance
+ throw new UnsupportedOperationException();
+ case READY:
+ case EXECUTING:
+ throw new RuntimeException("The states READY/EXECUTING cannot occur at
this point");
+ default:
+ throw new UnknownExecutionStateException(new Exception("This TaskState
is unknown: " + newState));
+ }
+ }
+
+ @Override
+ public void onSpeculativeExecutionCheck() {
+ // TODO #228: StreamingScheduler Speculative Execution
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void onExecutorAdded(final ExecutorRepresenter executorRepresenter) {
+ LOG.info("{} added (node: {})", executorRepresenter.getExecutorId(),
executorRepresenter.getNodeName());
+ executorRegistry.registerExecutor(executorRepresenter);
+ }
+
+ @Override
+ public void onExecutorRemoved(final String executorId) {
+ // TODO #226: StreamingScheduler Fault Tolerance
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void terminate() {
+ this.taskDispatcher.terminate();
+ this.executorRegistry.terminate();
+ }
+}
diff --git
a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/BatchSchedulerTest.java
b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/BatchSchedulerTest.java
index 3021afa..b4591f4 100644
---
a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/BatchSchedulerTest.java
+++
b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/BatchSchedulerTest.java
@@ -27,7 +27,6 @@ import org.apache.nemo.runtime.master.PlanStateManager;
import org.apache.nemo.runtime.master.MetricMessageHandler;
import org.apache.nemo.runtime.master.BlockManagerMaster;
import
org.apache.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
-import org.apache.nemo.runtime.master.resource.ContainerManager;
import org.apache.nemo.runtime.master.resource.ExecutorRepresenter;
import org.apache.nemo.runtime.master.resource.ResourceSpecification;
import org.apache.nemo.common.dag.DAG;
@@ -57,11 +56,10 @@ import static org.mockito.Mockito.mock;
* Tests {@link BatchScheduler}.
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest({ContainerManager.class, BlockManagerMaster.class,
- PubSubEventHandlerWrapper.class, UpdatePhysicalPlanEventHandler.class})
+@PrepareForTest({BlockManagerMaster.class, PubSubEventHandlerWrapper.class,
UpdatePhysicalPlanEventHandler.class})
public final class BatchSchedulerTest {
private static final Logger LOG =
LoggerFactory.getLogger(BatchSchedulerTest.class.getName());
- private Scheduler scheduler;
+ private BatchScheduler scheduler;
private PlanStateManager planStateManager;
private ExecutorRegistry executorRegistry;
private final MessageSender<ControlMessage.Message> mockMsgSender =
mock(MessageSender.class);
diff --git
a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/StreamingSchedulerTest.java
b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/StreamingSchedulerTest.java
new file mode 100644
index 0000000..276af97
--- /dev/null
+++
b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/StreamingSchedulerTest.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.runtime.master.scheduler;
+
+import org.apache.nemo.common.eventhandler.PubSubEventHandlerWrapper;
+import org.apache.nemo.runtime.common.RuntimeIdManager;
+import org.apache.nemo.runtime.common.plan.PhysicalPlan;
+import org.apache.nemo.runtime.common.plan.TestPlanGenerator;
+import org.apache.nemo.runtime.master.BlockManagerMaster;
+import org.apache.nemo.runtime.master.PlanStateManager;
+import
org.apache.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests {@link StreamingScheduler}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({BlockManagerMaster.class, PubSubEventHandlerWrapper.class,
+    UpdatePhysicalPlanEventHandler.class, TaskDispatcher.class, PendingTaskCollectionPointer.class,
+    ExecutorRegistry.class, PlanStateManager.class})
+public final class StreamingSchedulerTest {
+  private static final int ATTEMPTS_PER_STAGE = 2;
+
+  private StreamingScheduler scheduler;
+  private PendingTaskCollectionPointer pendingTaskCollectionPointer;
+
+  /**
+   * Builds a scheduler whose collaborators are all mocks.
+   * The mocked plan state manager answers with {@link #ATTEMPTS_PER_STAGE} task ids for any stage.
+   */
+  @Before
+  public void setUp() throws Exception {
+    // Kept in a field so that the test can verify what the scheduler hands over
+    pendingTaskCollectionPointer = mock(PendingTaskCollectionPointer.class);
+
+    final PlanStateManager mockedPlanStateManager = mock(PlanStateManager.class);
+    when(mockedPlanStateManager.getTaskAttemptsToSchedule(any())).thenAnswer(invocation -> {
+      final String requestedStageId = invocation.getArgument(0);
+      return generateAttempts(requestedStageId);
+    });
+
+    scheduler = new StreamingScheduler(
+        mock(TaskDispatcher.class),
+        pendingTaskCollectionPointer,
+        mock(ExecutorRegistry.class),
+        mockedPlanStateManager);
+  }
+
+  /**
+   * @param stageId the stage to generate task ids for.
+   * @return one task id per task index in [0, ATTEMPTS_PER_STAGE), each generated with 0 as the last argument.
+   */
+  private List<String> generateAttempts(final String stageId) {
+    final IntStream taskIndices = IntStream.range(0, ATTEMPTS_PER_STAGE);
+    return taskIndices
+        .mapToObj(index -> RuntimeIdManager.generateTaskId(stageId, index, 0))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Checks that a single overwrite of the pending tasks carries the tasks of every stage.
+   */
+  @Test(timeout=10000)
+  public void testScheduleEverything() throws Exception {
+    final PhysicalPlan plan =
+        TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, false);
+    final int numOfStages = plan.getStageDAG().getVertices().size();
+    final int expectedNumOfTasks = numOfStages * ATTEMPTS_PER_STAGE;
+
+    scheduler.schedulePlan(plan, 1);
+
+    verify(pendingTaskCollectionPointer)
+        .setToOverwrite(argThat(scheduledTasks -> scheduledTasks.size() == expectedNumOfTasks));
+  }
+}