seojangho closed pull request #97: [NEMO-136] Rename SchedulerRunner to
TaskDispatcher
URL: https://github.com/apache/incubator-nemo/pull/97
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork once
it has been merged, the diff is reproduced below for the sake of
provenance:
diff --git
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
index ffee47920..24c0e435f 100644
---
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
+++
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
@@ -59,7 +59,7 @@
/**
* Components related to scheduling the given plan.
*/
- private final SchedulerRunner schedulerRunner;
+ private final TaskDispatcher taskDispatcher;
private final PendingTaskCollectionPointer pendingTaskCollectionPointer;
private final ExecutorRegistry executorRegistry;
@@ -77,13 +77,13 @@
private List<List<Stage>> sortedScheduleGroups;
@Inject
- private BatchScheduler(final SchedulerRunner schedulerRunner,
+ private BatchScheduler(final TaskDispatcher taskDispatcher,
final PendingTaskCollectionPointer
pendingTaskCollectionPointer,
final BlockManagerMaster blockManagerMaster,
final PubSubEventHandlerWrapper
pubSubEventHandlerWrapper,
final UpdatePhysicalPlanEventHandler
updatePhysicalPlanEventHandler,
final ExecutorRegistry executorRegistry) {
- this.schedulerRunner = schedulerRunner;
+ this.taskDispatcher = taskDispatcher;
this.pendingTaskCollectionPointer = pendingTaskCollectionPointer;
this.blockManagerMaster = blockManagerMaster;
this.pubSubEventHandlerWrapper = pubSubEventHandlerWrapper;
@@ -106,7 +106,7 @@ public void schedulePlan(final PhysicalPlan
submittedPhysicalPlan, final PlanSta
this.physicalPlan = submittedPhysicalPlan;
this.planStateManager = submittedPlanStateManager;
- schedulerRunner.run(this.planStateManager);
+ taskDispatcher.run(this.planStateManager);
LOG.info("Plan to schedule: {}", this.physicalPlan.getId());
this.sortedScheduleGroups =
this.physicalPlan.getStageDAG().getVertices().stream()
@@ -191,13 +191,13 @@ public void onTaskStateReportFromExecutor(final String
executorId,
break;
}
- // Invoke schedulerRunner.onExecutorSlotAvailable()
+ // Invoke taskDispatcher.onExecutorSlotAvailable()
switch (newState) {
// These three states mean that a slot is made available.
case COMPLETE:
case ON_HOLD:
case SHOULD_RETRY:
- schedulerRunner.onExecutorSlotAvailable();
+ taskDispatcher.onExecutorSlotAvailable();
break;
default:
break;
@@ -218,7 +218,7 @@ public void onTaskStateReportFromExecutor(final String
executorId,
public void onExecutorAdded(final ExecutorRepresenter executorRepresenter) {
LOG.info("{} added (node: {})", executorRepresenter.getExecutorId(),
executorRepresenter.getNodeName());
executorRegistry.registerExecutor(executorRepresenter);
- schedulerRunner.onExecutorSlotAvailable();
+ taskDispatcher.onExecutorSlotAvailable();
}
@Override
@@ -242,7 +242,7 @@ public void onExecutorRemoved(final String executorId) {
@Override
public void terminate() {
- this.schedulerRunner.terminate();
+ this.taskDispatcher.terminate();
this.executorRegistry.terminate();
}
@@ -254,7 +254,7 @@ public void terminate() {
*
* These are the reasons why.
* - We 'reset' {@link PendingTaskCollectionPointer}, and not 'add' new
tasks to it
- * - We make {@link SchedulerRunner} run only tasks that are READY.
+ * - We make {@link TaskDispatcher} dispatch only the tasks that are READY.
*/
private void doSchedule() {
final Optional<List<Stage>> earliest = selectEarliestSchedulableGroup();
@@ -277,8 +277,8 @@ private void doSchedule() {
// Set the pointer to the schedulable tasks.
pendingTaskCollectionPointer.setToOverwrite(tasksToSchedule);
- // Notify the runner that a new collection is available.
- schedulerRunner.onNewPendingTaskCollectionAvailable();
+ // Notify the dispatcher that a new collection is available.
+ taskDispatcher.onNewPendingTaskCollectionAvailable();
} else {
LOG.info("Skipping this round as no ScheduleGroup is schedulable.");
}
diff --git
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/TaskDispatcher.java
similarity index 90%
rename from
runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
rename to
runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/TaskDispatcher.java
index 750c0fcc9..6c0222c41 100644
---
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
+++
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/TaskDispatcher.java
@@ -37,15 +37,15 @@
import javax.inject.Inject;
/**
- * Schedules tasks in discrete batches (scheduling iterations).
- * A scheduling iteration occurs under one of the following conditions
+ * Dispatches tasks to executors in discrete batches (dispatch iterations).
+ * A dispatch iteration occurs under one of the following conditions
* - An executor slot becomes available (for reasons such as task
completion/failure, or executor addition)
* - A new list of tasks become available (for reasons such as stage
completion, task failure, or executor removal)
*/
@DriverSide
@NotThreadSafe
-public final class SchedulerRunner {
- private static final Logger LOG =
LoggerFactory.getLogger(SchedulerRunner.class.getName());
+final class TaskDispatcher {
+ private static final Logger LOG =
LoggerFactory.getLogger(TaskDispatcher.class.getName());
private final Map<String, PlanStateManager> planStateManagers;
private final PendingTaskCollectionPointer pendingTaskCollectionPointer;
private final ExecutorService schedulerThread;
@@ -58,14 +58,14 @@
private final SchedulingPolicy schedulingPolicy;
@Inject
- private SchedulerRunner(final SchedulingConstraintRegistry
schedulingConstraintRegistry,
- final SchedulingPolicy schedulingPolicy,
- final PendingTaskCollectionPointer
pendingTaskCollectionPointer,
- final ExecutorRegistry executorRegistry) {
+ private TaskDispatcher(final SchedulingConstraintRegistry
schedulingConstraintRegistry,
+ final SchedulingPolicy schedulingPolicy,
+ final PendingTaskCollectionPointer
pendingTaskCollectionPointer,
+ final ExecutorRegistry executorRegistry) {
this.planStateManagers = new HashMap<>();
this.pendingTaskCollectionPointer = pendingTaskCollectionPointer;
this.schedulerThread = Executors.newSingleThreadExecutor(runnable ->
- new Thread(runnable, "SchedulerRunner thread"));
+ new Thread(runnable, "TaskDispatcher thread"));
this.isSchedulerRunning = false;
this.isTerminated = false;
this.executorRegistry = executorRegistry;
@@ -74,7 +74,7 @@ private SchedulerRunner(final SchedulingConstraintRegistry
schedulingConstraintR
}
/**
- * A separate thread is run to schedule tasks to executors.
+ * A separate thread is run to dispatch tasks to executors.
* See comments in the {@link Scheduler} for avoiding race conditions.
*/
private final class SchedulerThread implements Runnable {
@@ -91,11 +91,11 @@ public void run() {
LOG.info("{} is incomplete.", planStateManager.getPlanId());
}
});
- LOG.info("SchedulerRunner Terminated!");
+ LOG.info("TaskDispatcher Terminated!");
}
}
- void doScheduleTaskList() {
+ private void doScheduleTaskList() {
final Optional<Collection<Task>> taskListOptional =
pendingTaskCollectionPointer.getAndSetNull();
if (!taskListOptional.isPresent()) {
// Task list is empty
@@ -162,7 +162,7 @@ void onNewPendingTaskCollectionAvailable() {
}
/**
- * Run the scheduler thread.
+ * Run the dispatcher thread.
*/
void run(final PlanStateManager planStateManager) {
planStateManagers.put(planStateManager.getPlanId(), planStateManager);
diff --git
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
index e1ab44936..2746bf52c 100644
---
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
+++
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
@@ -57,7 +57,7 @@
* Tests fault tolerance.
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest({BlockManagerMaster.class, SchedulerRunner.class,
SchedulingConstraintRegistry.class,
+@PrepareForTest({BlockManagerMaster.class, TaskDispatcher.class,
SchedulingConstraintRegistry.class,
PubSubEventHandlerWrapper.class, UpdatePhysicalPlanEventHandler.class,
MetricMessageHandler.class})
public final class TaskRetryTest {
@Rule public TestName testName = new TestName();
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services