This is an automated email from the ASF dual-hosted git repository.
sanha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 0c42189 [NEMO-152] Support Multiple DAGs Execution in a Single User
Program (#94)
0c42189 is described below
commit 0c4218994f5700272b2b826610d102e004e63b76
Author: Won Wook SONG <[email protected]>
AuthorDate: Thu Aug 9 13:56:45 2018 +0900
[NEMO-152] Support Multiple DAGs Execution in a Single User Program (#94)
JIRA: [NEMO-152: Support Multiple DAGs Execution in a Single User
Program](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-152)
**Major changes:**
- Distinguishes between the terms 'Job' and 'Plan' to be more specific about
what is being run. Previously, a job had a single physical plan, so the two
terms could be used interchangeably; now that a job can contain multiple
physical plans, the terms must be distinguished.
- Fixes the scheduler and the scheduler runner to accept multiple physical
plans
**Minor changes to note:**
- Fixes grammatical errors in comments.
- The `convert` method on `DAG` had almost no usages and was misleading in
how it was used, so it was removed.
- Multiple Spark contexts are now allowed on an evaluator, so that an
evaluator can be reused for multiple physical plans.
**Tests for the changes:**
- A sample program that performs a line count in one execution and then a
word count in another is added as an integration test. All existing tests pass
with these changes.
**Other comments:**
- N/A
resolves
[NEMO-152](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-152)
---
bin/json2dot.py | 18 +--
.../java/edu/snu/nemo/client/ClientEndpoint.java | 24 ++--
.../java/edu/snu/nemo/client/DriverEndpoint.java | 46 +++----
.../java/edu/snu/nemo/client/StateTranslator.java | 11 +-
.../edu/snu/nemo/client/ClientEndpointTest.java | 26 ++--
.../src/main/java/edu/snu/nemo/common/dag/DAG.java | 12 --
.../nemo/compiler/backend/nemo/NemoBackend.java | 5 +-
.../frontend/beam/BeamStateTranslator.java | 4 +-
.../compiler/frontend/beam/NemoPipelineResult.java | 2 +-
.../compiler/frontend/spark/sql/SparkSession.java | 3 +
.../snu/nemo/compiler/optimizer/policy/Policy.java | 1 +
.../compiler/backend/nemo/DAGConverterTest.java | 2 +-
.../java/edu/snu/nemo/examples/spark/MRJava.java | 22 +++-
.../snu/nemo/runtime/common/metric/JobMetric.java | 10 +-
.../edu/snu/nemo/runtime/common/plan/Task.java | 18 +--
.../common/state/{JobState.java => PlanState.java} | 16 +--
.../edu/snu/nemo/driver/UserApplicationRunner.java | 10 +-
.../nemo/runtime/master/BlockManagerMaster.java | 5 +-
...{JobStateManager.java => PlanStateManager.java} | 139 +++++++++++----------
.../edu/snu/nemo/runtime/master/RuntimeMaster.java | 24 ++--
.../UpdatePhysicalPlanEventHandler.java | 2 +-
...SingleJobScheduler.java => BatchScheduler.java} | 76 ++++++-----
.../nemo/runtime/master/scheduler/Scheduler.java | 16 +--
.../runtime/master/scheduler/SchedulerRunner.java | 24 ++--
...eManagerTest.java => PlanStateManagerTest.java} | 50 ++++----
...bSchedulerTest.java => BatchSchedulerTest.java} | 32 ++---
.../master/scheduler/SchedulerTestUtil.java | 14 +--
.../runtime/master/scheduler/TaskRetryTest.java | 50 ++++----
.../runtime/common/plan/TestPlanGenerator.java | 2 +-
29 files changed, 336 insertions(+), 328 deletions(-)
diff --git a/bin/json2dot.py b/bin/json2dot.py
index 582f30a..f41146b 100755
--- a/bin/json2dot.py
+++ b/bin/json2dot.py
@@ -44,15 +44,15 @@ def stateToColor(state):
except:
return 'white'
-class JobState:
+class PlanState:
def __init__(self, data):
- self.id = data['jobId']
+ self.id = data['planId']
self.stages = {}
for stage in data['stages']:
self.stages[stage['id']] = StageState(stage)
@classmethod
def empty(cls):
- return cls({'jobId': None, 'stages': []})
+ return cls({'planId': None, 'stages': []})
def get(self, id):
try:
return self.stages[id]
@@ -96,11 +96,11 @@ class DAG:
A class for converting DAG to Graphviz representation.
JSON representation should be formatted like what toString method in
DAG.java does.
'''
- def __init__(self, dag, jobState):
+ def __init__(self, dag, planState):
self.vertices = {}
self.edges = []
for vertex in dag['vertices']:
- self.vertices[vertex['id']] = Vertex(vertex['id'],
vertex['properties'], jobState.get(vertex['id']))
+ self.vertices[vertex['id']] = Vertex(vertex['id'],
vertex['properties'], planState.get(vertex['id']))
for edge in dag['edges']:
self.edges.append(Edge(self.vertices[edge['src']],
self.vertices[edge['dst']], edge['properties']))
@property
@@ -178,7 +178,7 @@ class NormalVertex:
class LoopVertex:
def __init__(self, id, properties):
self.id = id
- self.dag = DAG(properties['DAG'], JobState.empty())
+ self.dag = DAG(properties['DAG'], PlanState.empty())
self.remaining_iteration = properties['remainingIteration']
self.executionProperties = properties['executionProperties']
self.incoming = properties['dagIncomingEdges']
@@ -217,7 +217,7 @@ class Stage:
def __init__(self, id, properties, state):
self.id = id
self.properties = properties
- self.stageDAG = DAG(properties['irDag'], JobState.empty())
+ self.stageDAG = DAG(properties['irDag'], PlanState.empty())
self.idx = getIdx()
self.state = state
self.executionProperties = self.properties['executionProperties']
@@ -316,9 +316,9 @@ class RuntimeEdge:
def jsonToDot(jsonDict):
try:
- dag = DAG(jsonDict['dag'], JobState(jsonDict['jobState']))
+ dag = DAG(jsonDict['dag'], PlanState(jsonDict['planState']))
except:
- dag = DAG(jsonDict, JobState.empty())
+ dag = DAG(jsonDict, PlanState.empty())
return 'digraph dag {compound=true; nodesep=1.0; forcelabels=true;' +
dag.dot + '}'
if __name__ == "__main__":
diff --git a/client/src/main/java/edu/snu/nemo/client/ClientEndpoint.java
b/client/src/main/java/edu/snu/nemo/client/ClientEndpoint.java
index 2d27647..b31807a 100644
--- a/client/src/main/java/edu/snu/nemo/client/ClientEndpoint.java
+++ b/client/src/main/java/edu/snu/nemo/client/ClientEndpoint.java
@@ -15,7 +15,7 @@
*/
package edu.snu.nemo.client;
-import edu.snu.nemo.runtime.common.state.JobState;
+import edu.snu.nemo.runtime.common.state.PlanState;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -24,12 +24,12 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
- * A request endpoint in client side of a job.
+ * A request endpoint in client side of a plan.
*/
public abstract class ClientEndpoint {
/**
- * The request endpoint in driver side of the job.
+ * The request endpoint in driver side of the plan.
*/
private final AtomicReference<DriverEndpoint> driverEndpoint;
@@ -41,13 +41,13 @@ public abstract class ClientEndpoint {
private static final long DEFAULT_DRIVER_WAIT_IN_MILLIS = 100;
/**
- * A {@link StateTranslator} for this job.
+ * A {@link StateTranslator} for this plan.
*/
private final StateTranslator stateTranslator;
/**
* Constructor.
- * @param stateTranslator translator to translate between the state of job
and corresponding.
+ * @param stateTranslator translator to translate between the state of plan
and corresponding.
*/
public ClientEndpoint(final StateTranslator stateTranslator) {
this.driverEndpoint = new AtomicReference<>();
@@ -57,7 +57,7 @@ public abstract class ClientEndpoint {
}
/**
- * Connect the driver endpoint of this job.
+ * Connect the driver endpoint of this plan.
* This method will be called by {@link DriverEndpoint}.
*
* @param dep connected with this client.
@@ -122,15 +122,15 @@ public abstract class ClientEndpoint {
}
/**
- * Get the current state of the running job.
+ * Get the current state of the running plan.
*
- * @return the current state of the running job.
+ * @return the current state of the running plan.
*/
- public final synchronized Enum getJobState() {
+ public final synchronized Enum getPlanState() {
if (driverEndpoint.get() != null) {
return stateTranslator.translateState(driverEndpoint.get().getState());
} else {
- return stateTranslator.translateState(JobState.State.READY);
+ return stateTranslator.translateState(PlanState.State.READY);
}
}
@@ -161,7 +161,7 @@ public abstract class ClientEndpoint {
return stateTranslator.translateState(driverEndpoint.get().
waitUntilFinish(timeout - unit.convert(consumedTime,
TimeUnit.NANOSECONDS), unit));
} else {
- return JobState.State.READY;
+ return PlanState.State.READY;
}
}
}
@@ -181,7 +181,7 @@ public abstract class ClientEndpoint {
if (driverIsConnected) {
return
stateTranslator.translateState(driverEndpoint.get().waitUntilFinish());
} else {
- return JobState.State.READY;
+ return PlanState.State.READY;
}
}
}
diff --git a/client/src/main/java/edu/snu/nemo/client/DriverEndpoint.java
b/client/src/main/java/edu/snu/nemo/client/DriverEndpoint.java
index fc33eea..4338877 100644
--- a/client/src/main/java/edu/snu/nemo/client/DriverEndpoint.java
+++ b/client/src/main/java/edu/snu/nemo/client/DriverEndpoint.java
@@ -15,67 +15,67 @@
*/
package edu.snu.nemo.client;
-import edu.snu.nemo.runtime.common.state.JobState;
-import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.common.state.PlanState;
+import edu.snu.nemo.runtime.master.PlanStateManager;
import java.util.concurrent.TimeUnit;
/**
- * A request endpoint in driver side of a job.
+ * A request endpoint in driver side of a plan.
*/
public final class DriverEndpoint {
/**
- * The {@link JobStateManager} of the running job.
+ * The {@link PlanStateManager} of the running plan.
*/
- private final JobStateManager jobStateManager;
+ private final PlanStateManager planStateManager;
/**
- * The {@link ClientEndpoint} of the job.
+ * The {@link ClientEndpoint} of the plan.
*/
private final ClientEndpoint clientEndpoint;
/**
* Construct an endpoint in driver side.
* This method will be called by {@link ClientEndpoint}.
- * @param jobStateManager of running job.
- * @param clientEndpoint of running job.
+ * @param planStateManager of running plan.
+ * @param clientEndpoint of running plan.
*/
- public DriverEndpoint(final JobStateManager jobStateManager,
+ public DriverEndpoint(final PlanStateManager planStateManager,
final ClientEndpoint clientEndpoint) {
- this.jobStateManager = jobStateManager;
+ this.planStateManager = planStateManager;
this.clientEndpoint = clientEndpoint;
clientEndpoint.connectDriver(this);
}
/**
- * Get the current state of the running job.
+ * Get the current state of the running plan.
* This method will be called by {@link ClientEndpoint}.
- * @return the current state of the running job.
+ * @return the current state of the running plan.
*/
- JobState.State getState() {
- return jobStateManager.getJobState();
+ PlanState.State getState() {
+ return planStateManager.getPlanState();
}
/**
- * Wait for this job to be finished and return the final state.
+ * Wait for this plan to be finished and return the final state.
* It wait for at most the given time.
* This method will be called by {@link ClientEndpoint}.
* @param timeout of waiting.
* @param unit of the timeout.
- * @return the final state of this job.
+ * @return the final state of this plan.
*/
- JobState.State waitUntilFinish(final long timeout,
- final TimeUnit unit) {
- return jobStateManager.waitUntilFinish(timeout, unit);
+ PlanState.State waitUntilFinish(final long timeout,
+ final TimeUnit unit) {
+ return planStateManager.waitUntilFinish(timeout, unit);
}
/**
- * Wait for this job to be finished and return the final state.
+ * Wait for this plan to be finished and return the final state.
* This method will be called by {@link ClientEndpoint}.
- * @return the final state of this job.
+ * @return the final state of this plan.
*/
- JobState.State waitUntilFinish() {
- return jobStateManager.waitUntilFinish();
+ PlanState.State waitUntilFinish() {
+ return planStateManager.waitUntilFinish();
}
}
diff --git a/client/src/main/java/edu/snu/nemo/client/StateTranslator.java
b/client/src/main/java/edu/snu/nemo/client/StateTranslator.java
index 35082d7..287a0a0 100644
--- a/client/src/main/java/edu/snu/nemo/client/StateTranslator.java
+++ b/client/src/main/java/edu/snu/nemo/client/StateTranslator.java
@@ -15,19 +15,18 @@
*/
package edu.snu.nemo.client;
-import edu.snu.nemo.runtime.common.state.JobState;
+import edu.snu.nemo.runtime.common.state.PlanState;
/**
- * A class provides the translation between the state of job and corresponding
- * {@link ClientEndpoint}.
+ * A class provides the translation between the state of plan and
corresponding {@link ClientEndpoint}.
*/
public interface StateTranslator {
/**
- * Translate a job state of nemo to a corresponding client endpoint state.
+ * Translate a plan state of nemo to a corresponding client endpoint state.
*
- * @param jobState to translate.
+ * @param planState to translate.
* @return the translated state.
*/
- Enum translateState(final JobState.State jobState);
+ Enum translateState(final PlanState.State planState);
}
diff --git a/client/src/test/java/edu/snu/nemo/client/ClientEndpointTest.java
b/client/src/test/java/edu/snu/nemo/client/ClientEndpointTest.java
index 3f0d1a0..744a992 100644
--- a/client/src/test/java/edu/snu/nemo/client/ClientEndpointTest.java
+++ b/client/src/test/java/edu/snu/nemo/client/ClientEndpointTest.java
@@ -16,10 +16,10 @@
package edu.snu.nemo.client;
import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
-import edu.snu.nemo.runtime.common.state.JobState;
+import edu.snu.nemo.runtime.common.state.PlanState;
import edu.snu.nemo.runtime.common.state.TaskState;
import edu.snu.nemo.runtime.master.MetricMessageHandler;
-import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.PlanStateManager;
import edu.snu.nemo.runtime.common.plan.TestPlanGenerator;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -50,32 +50,32 @@ public class ClientEndpointTest {
final StateTranslator stateTranslator = mock(StateTranslator.class);
when(stateTranslator.translateState(any())).then(state ->
state.getArgument(0));
final ClientEndpoint clientEndpoint = new
TestClientEndpoint(stateTranslator);
- assertEquals(clientEndpoint.getJobState(), JobState.State.READY);
+ assertEquals(clientEndpoint.getPlanState(), PlanState.State.READY);
// Wait for connection but not connected.
- assertEquals(clientEndpoint.waitUntilJobFinish(100,
TimeUnit.MILLISECONDS), JobState.State.READY);
+ assertEquals(clientEndpoint.waitUntilJobFinish(100,
TimeUnit.MILLISECONDS), PlanState.State.READY);
- // Create a JobStateManager of a dag and create a DriverEndpoint with it.
+ // Create a PlanStateManager of a dag and create a DriverEndpoint with it.
final PhysicalPlan physicalPlan =
TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined,
false);
- final JobStateManager jobStateManager =
- new JobStateManager(physicalPlan, metricMessageHandler,
MAX_SCHEDULE_ATTEMPT);
+ final PlanStateManager planStateManager =
+ new PlanStateManager(physicalPlan, metricMessageHandler,
MAX_SCHEDULE_ATTEMPT);
- final DriverEndpoint driverEndpoint = new DriverEndpoint(jobStateManager,
clientEndpoint);
+ final DriverEndpoint driverEndpoint = new DriverEndpoint(planStateManager,
clientEndpoint);
// Check the current state.
- assertEquals(clientEndpoint.getJobState(), JobState.State.EXECUTING);
+ assertEquals(clientEndpoint.getPlanState(), PlanState.State.EXECUTING);
// Wait for the job to finish but not finished
- assertEquals(clientEndpoint.waitUntilJobFinish(100,
TimeUnit.MILLISECONDS), JobState.State.EXECUTING);
+ assertEquals(clientEndpoint.waitUntilJobFinish(100,
TimeUnit.MILLISECONDS), PlanState.State.EXECUTING);
// Check finish.
final List<String> tasks =
physicalPlan.getStageDAG().getTopologicalSort().stream()
.flatMap(stage -> stage.getTaskIds().stream())
.collect(Collectors.toList());
- tasks.forEach(taskId -> jobStateManager.onTaskStateChanged(taskId,
TaskState.State.EXECUTING));
- tasks.forEach(taskId -> jobStateManager.onTaskStateChanged(taskId,
TaskState.State.COMPLETE));
- assertEquals(JobState.State.COMPLETE, clientEndpoint.waitUntilJobFinish());
+ tasks.forEach(taskId -> planStateManager.onTaskStateChanged(taskId,
TaskState.State.EXECUTING));
+ tasks.forEach(taskId -> planStateManager.onTaskStateChanged(taskId,
TaskState.State.COMPLETE));
+ assertEquals(PlanState.State.COMPLETE,
clientEndpoint.waitUntilJobFinish());
}
/**
diff --git a/common/src/main/java/edu/snu/nemo/common/dag/DAG.java
b/common/src/main/java/edu/snu/nemo/common/dag/DAG.java
index aeaea6f..593a99b 100644
--- a/common/src/main/java/edu/snu/nemo/common/dag/DAG.java
+++ b/common/src/main/java/edu/snu/nemo/common/dag/DAG.java
@@ -27,7 +27,6 @@ import java.io.PrintWriter;
import java.io.Serializable;
import java.util.*;
import java.util.function.Consumer;
-import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -84,17 +83,6 @@ public final class DAG<V extends Vertex, E extends Edge<V>>
implements Serializa
}
/**
- * Converts a DAG into another DAG according to a function.
- * @param function to apply when converting a DAG to another.
- * @param <V2> the converted DAG's vertex type.
- * @param <E2> the converted DAG's edge type.
- * @return the converted DAG.
- */
- public <V2 extends Vertex, E2 extends Edge<V2>> DAG<V2, E2> convert(final
Function<DAG<V, E>, DAG<V2, E2>> function) {
- return function.apply(this);
- }
-
- /**
* Retrieves the vertex given its ID.
* @param id of the vertex to retrieve
* @return the vertex
diff --git
a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
index 465fb77..3e933a2 100644
---
a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
+++
b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
@@ -60,8 +60,7 @@ public final class NemoBackend implements
Backend<PhysicalPlan> {
*/
public PhysicalPlan compile(final DAG<IRVertex, IREdge> irDAG,
final PhysicalPlanGenerator
physicalPlanGenerator) {
- final DAG<Stage, StageEdge> stageDAG =
irDAG.convert(physicalPlanGenerator);
- final PhysicalPlan physicalPlan = new
PhysicalPlan(RuntimeIdGenerator.generatePhysicalPlanId(), stageDAG);
- return physicalPlan;
+ final DAG<Stage, StageEdge> stageDAG = physicalPlanGenerator.apply(irDAG);
+ return new PhysicalPlan(RuntimeIdGenerator.generatePhysicalPlanId(),
stageDAG);
}
}
diff --git
a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/BeamStateTranslator.java
b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/BeamStateTranslator.java
index 0212c20..3f420e8 100644
---
a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/BeamStateTranslator.java
+++
b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/BeamStateTranslator.java
@@ -16,7 +16,7 @@
package edu.snu.nemo.compiler.frontend.beam;
import edu.snu.nemo.client.StateTranslator;
-import edu.snu.nemo.runtime.common.state.JobState;
+import edu.snu.nemo.runtime.common.state.PlanState;
import static org.apache.beam.sdk.PipelineResult.State.*;
@@ -33,7 +33,7 @@ public final class BeamStateTranslator implements
StateTranslator {
* @return the translated state.
*/
@Override
- public Enum translateState(final JobState.State jobState) {
+ public Enum translateState(final PlanState.State jobState) {
switch (jobState) {
case READY:
return RUNNING;
diff --git
a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineResult.java
b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineResult.java
index ffd09cf..cf0ab51 100644
---
a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineResult.java
+++
b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineResult.java
@@ -37,7 +37,7 @@ public final class NemoPipelineResult extends ClientEndpoint
implements Pipeline
@Override
public State getState() {
- return (State) super.getJobState();
+ return (State) super.getPlanState();
}
@Override
diff --git
a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/sql/SparkSession.java
b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/sql/SparkSession.java
index 65515cc..9393ec6 100644
---
a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/sql/SparkSession.java
+++
b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/sql/SparkSession.java
@@ -321,6 +321,9 @@ public final class SparkSession extends
org.apache.spark.sql.SparkSession implem
if (!options.containsKey("spark.master")) { // default spark_master
option.
return this.master("local[*]").getOrCreate();
}
+ if (!options.containsKey("spark.driver.allowMultipleContexts")) {
+ return this.config("spark.driver.allowMultipleContexts",
"true").getOrCreate();
+ }
UserGroupInformation.setLoginUser(UserGroupInformation.createRemoteUser("ubuntu"));
diff --git
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/Policy.java
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/Policy.java
index 6516012..60d897b 100644
---
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/Policy.java
+++
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/Policy.java
@@ -26,6 +26,7 @@ import java.io.Serializable;
/**
* An interface for policies, each of which is composed of a list of static
optimization passes.
* The list of static optimization passes are run in the order provided by the
implementation.
+ * Most policies follow the implementation in {@link PolicyImpl}.
*/
public interface Policy extends Serializable {
/**
diff --git
a/compiler/test/src/test/java/edu/snu/nemo/compiler/backend/nemo/DAGConverterTest.java
b/compiler/test/src/test/java/edu/snu/nemo/compiler/backend/nemo/DAGConverterTest.java
index b4237bf..52cc006 100644
---
a/compiler/test/src/test/java/edu/snu/nemo/compiler/backend/nemo/DAGConverterTest.java
+++
b/compiler/test/src/test/java/edu/snu/nemo/compiler/backend/nemo/DAGConverterTest.java
@@ -76,7 +76,7 @@ public final class DAGConverterTest {
final DAG<IRVertex, IREdge> irDAG = new
TestPolicy().runCompileTimeOptimization(
irDAGBuilder.buildWithoutSourceSinkCheck(), DAG.EMPTY_DAG_DIRECTORY);
final DAG<Stage, StageEdge> DAGOfStages =
physicalPlanGenerator.stagePartitionIrDAG(irDAG);
- final DAG<Stage, StageEdge> physicalDAG =
irDAG.convert(physicalPlanGenerator);
+ final DAG<Stage, StageEdge> physicalDAG =
physicalPlanGenerator.apply(irDAG);
// Test DAG of stages
final List<Stage> sortedDAGOfStages = DAGOfStages.getTopologicalSort();
diff --git
a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java
b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java
index e49dd0c..31ac261 100644
--- a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java
+++ b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java
@@ -67,5 +67,25 @@ public final class MRJava {
}
}
- // TODO #152: enable execution of multiple jobs (call scheduleJob multiple
times with caching).
+ @Test(timeout = TIMEOUT)
+ public void testSparkWordAndLineCount() throws Exception {
+ final String inputFileName = "test_input_wordcount_spark";
+ final String outputFileName = "test_output_wordcount_spark";
+ final String expectedOutputFilename =
"expected_output_word_and_line_count";
+ final String inputFilePath = fileBasePath + inputFileName;
+ final String outputFilePath = fileBasePath + outputFileName;
+
+ JobLauncher.main(builder
+ .addJobId(JavaWordAndLineCount.class.getSimpleName() + "_test")
+ .addUserMain(JavaWordAndLineCount.class.getCanonicalName())
+ .addUserArgs(inputFilePath, outputFilePath)
+ .addOptimizationPolicy(DefaultPolicy.class.getCanonicalName())
+ .build());
+
+ try {
+ ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName,
expectedOutputFilename);
+ } finally {
+ ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
+ }
+ }
}
diff --git
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/JobMetric.java
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/JobMetric.java
index fea3a20..ffedf21 100644
---
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/JobMetric.java
+++
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/JobMetric.java
@@ -20,7 +20,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
-import edu.snu.nemo.runtime.common.state.JobState;
+import edu.snu.nemo.runtime.common.state.PlanState;
import java.io.IOException;
import java.util.ArrayList;
@@ -29,9 +29,9 @@ import java.util.List;
/**
* Metric class for Job (or {@link PhysicalPlan}).
*/
-public final class JobMetric implements StateMetric<JobState.State> {
+public final class JobMetric implements StateMetric<PlanState.State> {
private String id;
- private List<StateTransitionEvent<JobState.State>> stateTransitionEvents =
new ArrayList<>();
+ private List<StateTransitionEvent<PlanState.State>> stateTransitionEvents =
new ArrayList<>();
private JsonNode stageDagJson;
public JobMetric(final PhysicalPlan physicalPlan) {
@@ -63,12 +63,12 @@ public final class JobMetric implements
StateMetric<JobState.State> {
}
@Override
- public List<StateTransitionEvent<JobState.State>> getStateTransitionEvents()
{
+ public List<StateTransitionEvent<PlanState.State>>
getStateTransitionEvents() {
return stateTransitionEvents;
}
@Override
- public void addEvent(final JobState.State prevState, final JobState.State
newState) {
+ public void addEvent(final PlanState.State prevState, final PlanState.State
newState) {
stateTransitionEvents.add(new
StateTransitionEvent<>(System.currentTimeMillis(), prevState, newState));
}
diff --git
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
index 7663a8c..0cfe863 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
@@ -28,7 +28,7 @@ import java.util.Optional;
* A Task is a self-contained executable that can be executed on a machine.
*/
public final class Task implements Serializable {
- private final String jobId;
+ private final String planId;
private final String taskId;
private final List<StageEdge> taskIncomingEdges;
private final List<StageEdge> taskOutgoingEdges;
@@ -40,7 +40,7 @@ public final class Task implements Serializable {
/**
* Constructor.
*
- * @param jobId the id of the job.
+ * @param planId the id of the physical plan.
* @param taskId the ID of the task.
* @param attemptIdx the attempt index.
* @param executionProperties {@link VertexExecutionProperty} map for the
corresponding stage
@@ -49,7 +49,7 @@ public final class Task implements Serializable {
* @param taskOutgoingEdges the outgoing edges of the task.
* @param irVertexIdToReadable the map between IRVertex id to readable.
*/
- public Task(final String jobId,
+ public Task(final String planId,
final String taskId,
final int attemptIdx,
final ExecutionPropertyMap<VertexExecutionProperty>
executionProperties,
@@ -57,7 +57,7 @@ public final class Task implements Serializable {
final List<StageEdge> taskIncomingEdges,
final List<StageEdge> taskOutgoingEdges,
final Map<String, Readable> irVertexIdToReadable) {
- this.jobId = jobId;
+ this.planId = planId;
this.taskId = taskId;
this.attemptIdx = attemptIdx;
this.executionProperties = executionProperties;
@@ -68,10 +68,10 @@ public final class Task implements Serializable {
}
/**
- * @return the id of the job.
+ * @return the id of the plan.
*/
- public String getJobId() {
- return jobId;
+ public String getPlanId() {
+ return planId;
}
/**
@@ -138,8 +138,8 @@ public final class Task implements Serializable {
@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
- sb.append("jobId: ");
- sb.append(jobId);
+ sb.append("planId: ");
+ sb.append(planId);
sb.append(" / taskId: ");
sb.append(taskId);
sb.append(" / attempt: ");
diff --git
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/JobState.java
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/PlanState.java
similarity index 79%
rename from
runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/JobState.java
rename to
runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/PlanState.java
index 6d65691..7edafab 100644
---
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/JobState.java
+++
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/PlanState.java
@@ -20,10 +20,10 @@ import edu.snu.nemo.common.StateMachine;
/**
* Represents the states and their transitions of a physical plan.
*/
-public final class JobState {
+public final class PlanState {
private final StateMachine stateMachine;
- public JobState() {
+ public PlanState() {
stateMachine = buildTaskStateMachine();
}
@@ -31,16 +31,16 @@ public final class JobState {
final StateMachine.Builder stateMachineBuilder = StateMachine.newBuilder();
// Add states
- stateMachineBuilder.addState(State.READY, "The job has been created and
submitted to runtime.");
- stateMachineBuilder.addState(State.EXECUTING, "The job is executing (with
its stages executing).");
- stateMachineBuilder.addState(State.COMPLETE, "The job is complete.");
- stateMachineBuilder.addState(State.FAILED, "Job failed.");
+ stateMachineBuilder.addState(State.READY, "The plan has been created and
submitted to runtime.");
+ stateMachineBuilder.addState(State.EXECUTING, "The plan is executing (with
its stages executing).");
+ stateMachineBuilder.addState(State.COMPLETE, "The plan is complete.");
+ stateMachineBuilder.addState(State.FAILED, "Plan failed.");
// Add transitions
stateMachineBuilder.addTransition(State.READY, State.EXECUTING,
"Begin executing!");
stateMachineBuilder.addTransition(State.EXECUTING, State.COMPLETE,
- "All stages complete, job complete");
+ "All stages complete, plan complete");
stateMachineBuilder.addTransition(State.EXECUTING, State.FAILED,
"Unrecoverable failure in a stage");
@@ -54,7 +54,7 @@ public final class JobState {
}
/**
- * JobState.
+ * PlanState.
*/
public enum State {
READY,
diff --git
a/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
b/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
index 52ea468..6863b6f 100644
---
a/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
+++
b/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
@@ -26,7 +26,7 @@ import edu.snu.nemo.compiler.backend.nemo.NemoBackend;
import edu.snu.nemo.compiler.optimizer.policy.Policy;
import edu.snu.nemo.conf.JobConf;
import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
-import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.PlanStateManager;
import edu.snu.nemo.runtime.master.RuntimeMaster;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.reef.tang.Injector;
@@ -102,16 +102,16 @@ public final class UserApplicationRunner {
physicalPlan.getStageDAG().storeJSON(dagDirectory, "plan", "physical
execution plan by compiler");
// Execute!
- final Pair<JobStateManager, ScheduledExecutorService> executionResult =
+ final Pair<PlanStateManager, ScheduledExecutorService> executionResult =
runtimeMaster.execute(physicalPlan, maxScheduleAttempt);
// Wait for the job to finish and stop logging
- final JobStateManager jobStateManager = executionResult.left();
+ final PlanStateManager planStateManager = executionResult.left();
final ScheduledExecutorService dagLoggingExecutor =
executionResult.right();
- jobStateManager.waitUntilFinish();
+ planStateManager.waitUntilFinish();
dagLoggingExecutor.shutdown();
- jobStateManager.storeJSON(dagDirectory, "final");
+ planStateManager.storeJSON(dagDirectory, "final");
LOG.info("{} is complete!", physicalPlan.getId());
} catch (final Exception e) {
throw new RuntimeException(e);
diff --git
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
index f07fe1b..5461039 100644
---
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
+++
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
@@ -50,6 +50,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static edu.snu.nemo.runtime.common.state.BlockState.State.IN_PROGRESS;
+import static edu.snu.nemo.runtime.common.state.BlockState.State.NOT_AVAILABLE;
/**
* Master-side block manager.
@@ -229,8 +230,8 @@ public final class BlockManagerMaster {
try {
if (producerTaskIdToBlockIds.containsKey(scheduledTaskId)) {
producerTaskIdToBlockIds.get(scheduledTaskId).forEach(blockId -> {
- if (!blockIdToMetadata.get(blockId).getBlockState()
- .getStateMachine().getCurrentState().equals(IN_PROGRESS)) {
+ if (blockIdToMetadata.get(blockId).getBlockState()
+ .getStateMachine().getCurrentState().equals(NOT_AVAILABLE)) {
onBlockStateChanged(blockId, IN_PROGRESS, null);
}
});
diff --git
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/PlanStateManager.java
similarity index 73%
rename from
runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
rename to
runtime/master/src/main/java/edu/snu/nemo/runtime/master/PlanStateManager.java
index 9ef2f8e..dd9f302 100644
---
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
+++
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/PlanStateManager.java
@@ -23,7 +23,7 @@ import edu.snu.nemo.common.StateMachine;
import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
import edu.snu.nemo.runtime.common.plan.Stage;
-import edu.snu.nemo.runtime.common.state.JobState;
+import edu.snu.nemo.runtime.common.state.PlanState;
import edu.snu.nemo.runtime.common.state.StageState;
import java.io.File;
@@ -48,23 +48,23 @@ import javax.annotation.concurrent.ThreadSafe;
import static edu.snu.nemo.common.dag.DAG.EMPTY_DAG_DIRECTORY;
/**
- * Maintains three levels of state machines (JobState, StageState, and
TaskState) of a physical plan.
+ * Maintains three levels of state machines (PlanState, StageState, and
TaskState) of a physical plan.
* The main API this class provides is onTaskStateReportFromExecutor(), which
directly changes a TaskState.
- * JobState and StageState are updated internally in the class, and can only
be read from the outside.
+ * PlanState and StageState are updated internally in the class, and can only
be read from the outside.
*
* (CONCURRENCY) The public methods of this class are synchronized.
*/
@DriverSide
@ThreadSafe
-public final class JobStateManager {
- private static final Logger LOG =
LoggerFactory.getLogger(JobStateManager.class.getName());
- private final String jobId;
+public final class PlanStateManager {
+ private static final Logger LOG =
LoggerFactory.getLogger(PlanStateManager.class.getName());
+ private final String planId;
private final int maxScheduleAttempt;
/**
- * The data structures below track the execution states of this job.
+ * The data structures below track the execution states of this plan.
*/
- private final JobState jobState;
+ private final PlanState planState;
private final Map<String, StageState> idToStageStates;
private final Map<String, TaskState> idToTaskStates;
@@ -75,15 +75,15 @@ public final class JobStateManager {
private final Map<String, Integer> taskIdToCurrentAttempt;
/**
- * Represents the job to manage.
+ * Represents the plan to manage.
*/
private final PhysicalPlan physicalPlan;
/**
- * A lock and condition to check whether the job is finished or not.
+ * A lock and condition to check whether the plan is finished or not.
*/
private final Lock finishLock;
- private final Condition jobFinishedCondition;
+ private final Condition planFinishedCondition;
/**
* For metrics.
@@ -92,33 +92,33 @@ public final class JobStateManager {
private MetricStore metricStore;
- public JobStateManager(final PhysicalPlan physicalPlan,
- final MetricMessageHandler metricMessageHandler,
- final int maxScheduleAttempt) {
- this.jobId = physicalPlan.getId();
+ public PlanStateManager(final PhysicalPlan physicalPlan,
+ final MetricMessageHandler metricMessageHandler,
+ final int maxScheduleAttempt) {
+ this.planId = physicalPlan.getId();
this.physicalPlan = physicalPlan;
this.metricMessageHandler = metricMessageHandler;
this.maxScheduleAttempt = maxScheduleAttempt;
- this.jobState = new JobState();
+ this.planState = new PlanState();
this.idToStageStates = new HashMap<>();
this.idToTaskStates = new HashMap<>();
this.taskIdToCurrentAttempt = new HashMap<>();
this.finishLock = new ReentrantLock();
- this.jobFinishedCondition = finishLock.newCondition();
+ this.planFinishedCondition = finishLock.newCondition();
this.metricStore = MetricStore.getStore();
- metricStore.getOrCreateMetric(JobMetric.class,
jobId).setStageDAG(physicalPlan.getStageDAG());
- metricStore.triggerBroadcast(JobMetric.class, jobId);
+ metricStore.getOrCreateMetric(JobMetric.class,
planId).setStageDAG(physicalPlan.getStageDAG());
+ metricStore.triggerBroadcast(JobMetric.class, planId);
initializeComputationStates();
}
/**
- * Initializes the states for the job/stages/tasks for this job.
+ * Initializes the states for the plan/stages/tasks for this plan.
*/
private void initializeComputationStates() {
- onJobStateChanged(JobState.State.EXECUTING);
+ onPlanStateChanged(PlanState.State.EXECUTING);
- // Initialize the states for the job down to task-level.
+ // Initialize the states for the plan down to task-level.
physicalPlan.getStageDAG().topologicalDo(stage -> {
idToStageStates.put(stage.getId(), new StageState());
stage.getTaskIds().forEach(taskId -> {
@@ -132,9 +132,9 @@ public final class JobStateManager {
* Updates the state of a task.
* Task state changes can occur both in master and executor.
* State changes that occur in master are
- * initiated in {@link
edu.snu.nemo.runtime.master.scheduler.BatchSingleJobScheduler}.
+ * initiated in {@link edu.snu.nemo.runtime.master.scheduler.BatchScheduler}.
* State changes that occur in executors are sent to master as a control
message,
- * and the call to this method is initiated in {@link
edu.snu.nemo.runtime.master.scheduler.BatchSingleJobScheduler}
+ * and the call to this method is initiated in {@link
edu.snu.nemo.runtime.master.scheduler.BatchScheduler}
* when the message/event is received.
*
* @param taskId the ID of the task.
@@ -226,97 +226,98 @@ public final class JobStateManager {
new Object[]{stageId, stageStateMachine.getCurrentState(),
newStageState});
stageStateMachine.setState(newStageState);
- // Change job state if needed
+ // Change plan state if needed
final boolean allStagesCompleted =
idToStageStates.values().stream().allMatch(state ->
state.getStateMachine().getCurrentState().equals(StageState.State.COMPLETE));
if (allStagesCompleted) {
- onJobStateChanged(JobState.State.COMPLETE);
+ onPlanStateChanged(PlanState.State.COMPLETE);
}
}
/**
* (PRIVATE METHOD)
- * Updates the state of the job.
- * @param newState of the job.
+ * Updates the state of the plan.
+ * @param newState of the plan.
*/
- private void onJobStateChanged(final JobState.State newState) {
- metricStore.getOrCreateMetric(JobMetric.class, jobId)
- .addEvent((JobState.State)
jobState.getStateMachine().getCurrentState(), newState);
- metricStore.triggerBroadcast(JobMetric.class, jobId);
+ private void onPlanStateChanged(final PlanState.State newState) {
+ metricStore.getOrCreateMetric(JobMetric.class, planId)
+ .addEvent((PlanState.State)
planState.getStateMachine().getCurrentState(), newState);
+ metricStore.triggerBroadcast(JobMetric.class, planId);
- jobState.getStateMachine().setState(newState);
+ planState.getStateMachine().setState(newState);
- if (newState == JobState.State.EXECUTING) {
- LOG.debug("Executing Job ID {}...", this.jobId);
- } else if (newState == JobState.State.COMPLETE || newState ==
JobState.State.FAILED) {
- LOG.debug("Job ID {} {}!", new Object[]{jobId, newState});
+ if (newState == PlanState.State.EXECUTING) {
+ LOG.debug("Executing Plan ID {}...", this.planId);
+ } else if (newState == PlanState.State.COMPLETE || newState ==
PlanState.State.FAILED) {
+ LOG.debug("Plan ID {} {}!", new Object[]{planId, newState});
- // Awake all threads waiting the finish of this job.
+ // Wake up all threads waiting for this plan to finish.
finishLock.lock();
try {
- jobFinishedCondition.signalAll();
+ planFinishedCondition.signalAll();
} finally {
finishLock.unlock();
}
} else {
- throw new IllegalStateTransitionException(new Exception("Illegal Job
State Transition"));
+ throw new IllegalStateTransitionException(new Exception("Illegal Plan
State Transition"));
}
}
/**
- * Wait for this job to be finished and return the final state.
- * @return the final state of this job.
+ * Wait for this plan to be finished and return the final state.
+ * @return the final state of this plan.
*/
- public JobState.State waitUntilFinish() {
+ public PlanState.State waitUntilFinish() {
finishLock.lock();
try {
- if (!isJobDone()) {
- jobFinishedCondition.await();
+ if (!isPlanDone()) {
+ planFinishedCondition.await();
}
} catch (final InterruptedException e) {
- LOG.warn("Interrupted during waiting the finish of Job ID {}", jobId);
+ LOG.warn("Interrupted during waiting the finish of Plan ID {}", planId);
Thread.currentThread().interrupt();
} finally {
finishLock.unlock();
}
- return getJobState();
+ return getPlanState();
}
/**
- * Wait for this job to be finished and return the final state.
+ * Wait for this plan to be finished and return the final state.
* It wait for at most the given time.
* @param timeout of waiting.
* @param unit of the timeout.
- * @return the final state of this job.
+ * @return the final state of this plan.
*/
- public JobState.State waitUntilFinish(final long timeout, final TimeUnit
unit) {
+ public PlanState.State waitUntilFinish(final long timeout, final TimeUnit
unit) {
finishLock.lock();
try {
- if (!isJobDone()) {
- if (!jobFinishedCondition.await(timeout, unit)) {
- LOG.warn("Timeout during waiting the finish of Job ID {}", jobId);
+ if (!isPlanDone()) {
+ if (!planFinishedCondition.await(timeout, unit)) {
+ LOG.warn("Timeout during waiting the finish of Plan ID {}", planId);
}
}
} catch (final InterruptedException e) {
- LOG.warn("Interrupted during waiting the finish of Job ID {}", jobId);
+ LOG.warn("Interrupted during waiting the finish of Plan ID {}", planId);
Thread.currentThread().interrupt();
} finally {
finishLock.unlock();
}
- return getJobState();
+ return getPlanState();
}
- public synchronized boolean isJobDone() {
- return (getJobState() == JobState.State.COMPLETE || getJobState() ==
JobState.State.FAILED);
+ public synchronized boolean isPlanDone() {
+ return (getPlanState() == PlanState.State.COMPLETE || getPlanState() ==
PlanState.State.FAILED);
}
- public synchronized String getJobId() {
- return jobId;
+
+ public synchronized String getPlanId() {
+ return planId;
}
- public synchronized JobState.State getJobState() {
- return (JobState.State) jobState.getStateMachine().getCurrentState();
+ public synchronized PlanState.State getPlanState() {
+ return (PlanState.State) planState.getStateMachine().getCurrentState();
}
public synchronized StageState.State getStageState(final String stageId) {
@@ -341,7 +342,7 @@ public final class JobStateManager {
}
/**
- * Stores JSON representation of job state into a file.
+ * Stores JSON representation of plan state into a file.
* @param directory the directory which JSON representation is saved to
* @param suffix suffix for file name
*/
@@ -350,29 +351,29 @@ public final class JobStateManager {
return;
}
- final File file = new File(directory, jobId + "-" + suffix + ".json");
+ final File file = new File(directory, planId + "-" + suffix + ".json");
file.getParentFile().mkdirs();
try (final PrintWriter printWriter = new PrintWriter(file)) {
printWriter.println(toStringWithPhysicalPlan());
- LOG.debug(String.format("JSON representation of job state for %s(%s) was
saved to %s",
- jobId, suffix, file.getPath()));
+ LOG.debug(String.format("JSON representation of plan state for %s(%s)
was saved to %s",
+ planId, suffix, file.getPath()));
} catch (final IOException e) {
- LOG.warn(String.format("Cannot store JSON representation of job state
for %s(%s) to %s: %s",
- jobId, suffix, file.getPath(), e.toString()));
+ LOG.warn(String.format("Cannot store JSON representation of plan state
for %s(%s) to %s: %s",
+ planId, suffix, file.getPath(), e.toString()));
}
}
public String toStringWithPhysicalPlan() {
final StringBuilder sb = new StringBuilder("{");
sb.append("\"dag\":
").append(physicalPlan.getStageDAG().toString()).append(", ");
- sb.append("\"jobState\": ").append(toString()).append("}");
+ sb.append("\"planState\": ").append(toString()).append("}");
return sb.toString();
}
@Override
public synchronized String toString() {
final StringBuilder sb = new StringBuilder("{");
- sb.append("\"jobId\": \"").append(jobId).append("\", ");
+ sb.append("\"planId\": \"").append(planId).append("\", ");
sb.append("\"stages\": [");
boolean isFirstStage = true;
for (final Stage stage : physicalPlan.getStageDAG().getVertices()) {
diff --git
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
index 0aef6a6..41ad243 100644
---
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
+++
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
@@ -62,7 +62,7 @@ import static
edu.snu.nemo.runtime.common.state.TaskState.State.ON_HOLD;
* Runtime Master is the central controller of Runtime.
* Compiler submits an {@link PhysicalPlan} to Runtime Master to execute a job.
* Runtime Master handles:
- * a) Scheduling the job with {@link Scheduler}.
+ * a) Scheduling the plan with {@link Scheduler}.
* b) Managing resources with {@link ContainerManager}.
* c) Managing blocks with {@link BlockManagerMaster}.
* d) Receiving and sending control messages with {@link
MessageEnvironment}.
@@ -156,22 +156,22 @@ public final class RuntimeMaster {
* @param plan to execute
* @param maxScheduleAttempt the max number of times this plan/sub-part of
the plan should be attempted.
*/
- public Pair<JobStateManager, ScheduledExecutorService> execute(final
PhysicalPlan plan,
- final int
maxScheduleAttempt) {
- final Callable<Pair<JobStateManager, ScheduledExecutorService>>
jobExecutionCallable = () -> {
+ public Pair<PlanStateManager, ScheduledExecutorService> execute(final
PhysicalPlan plan,
+ final int
maxScheduleAttempt) {
+ final Callable<Pair<PlanStateManager, ScheduledExecutorService>>
planExecutionCallable = () -> {
this.irVertices.addAll(plan.getIdToIRVertex().values());
try {
blockManagerMaster.initialize(plan);
- final JobStateManager jobStateManager = new JobStateManager(plan,
metricMessageHandler, maxScheduleAttempt);
- scheduler.scheduleJob(plan, jobStateManager);
- final ScheduledExecutorService dagLoggingExecutor =
scheduleDagLogging(jobStateManager);
- return Pair.of(jobStateManager, dagLoggingExecutor);
+ final PlanStateManager planStateManager = new PlanStateManager(plan,
metricMessageHandler, maxScheduleAttempt);
+ scheduler.schedulePlan(plan, planStateManager);
+ final ScheduledExecutorService dagLoggingExecutor =
scheduleDagLogging(planStateManager);
+ return Pair.of(planStateManager, dagLoggingExecutor);
} catch (Exception e) {
throw new RuntimeException(e);
}
};
try {
- return runtimeMasterThread.submit(jobExecutionCallable).get();
+ return runtimeMasterThread.submit(planExecutionCallable).get();
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -444,17 +444,17 @@ public final class RuntimeMaster {
/**
* Schedules a periodic DAG logging thread.
- * @param jobStateManager for the job the DAG should be logged.
+ * @param planStateManager the state manager of the plan whose DAG should be logged.
* TODO #20: RESTful APIs to Access Job State and Metric.
* @return the scheduled executor service.
*/
- private ScheduledExecutorService scheduleDagLogging(final JobStateManager
jobStateManager) {
+ private ScheduledExecutorService scheduleDagLogging(final PlanStateManager
planStateManager) {
final ScheduledExecutorService dagLoggingExecutor =
Executors.newSingleThreadScheduledExecutor();
dagLoggingExecutor.scheduleAtFixedRate(new Runnable() {
private int dagLogFileIndex = 0;
public void run() {
- jobStateManager.storeJSON(dagDirectory,
String.valueOf(dagLogFileIndex++));
+ planStateManager.storeJSON(dagDirectory,
String.valueOf(dagLogFileIndex++));
}
}, DAG_LOGGING_PERIOD, DAG_LOGGING_PERIOD, TimeUnit.MILLISECONDS);
diff --git
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java
index 4047cbb..9872f95 100644
---
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java
+++
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java
@@ -48,6 +48,6 @@ public final class UpdatePhysicalPlanEventHandler implements
CompilerEventHandle
public void onNext(final UpdatePhysicalPlanEvent updatePhysicalPlanEvent) {
final PhysicalPlan newPlan = updatePhysicalPlanEvent.getNewPhysicalPlan();
- this.scheduler.updateJob(newPlan.getId(), newPlan);
+ this.scheduler.updatePlan(newPlan.getId(), newPlan);
}
}
diff --git
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
similarity index 88%
rename from
runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
rename to
runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
index 57db43b..ffee479 100644
---
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
+++
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
@@ -31,7 +31,7 @@ import edu.snu.nemo.common.exception.*;
import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
import edu.snu.nemo.runtime.common.state.StageState;
import edu.snu.nemo.runtime.master.BlockManagerMaster;
-import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.PlanStateManager;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import org.apache.reef.annotations.audience.DriverSide;
import org.slf4j.LoggerFactory;
@@ -49,15 +49,15 @@ import org.slf4j.Logger;
* (CONCURRENCY) Only a single dedicated thread should use the public methods
of this class.
* (i.e., runtimeMasterThread in RuntimeMaster)
*
- * BatchSingleJobScheduler receives a single {@link PhysicalPlan} to execute
and schedules the Tasks.
+ * BatchScheduler receives each {@link PhysicalPlan} to execute (one at a time) and
schedules its Tasks.
*/
@DriverSide
@NotThreadSafe
-public final class BatchSingleJobScheduler implements Scheduler {
- private static final Logger LOG =
LoggerFactory.getLogger(BatchSingleJobScheduler.class.getName());
+public final class BatchScheduler implements Scheduler {
+ private static final Logger LOG =
LoggerFactory.getLogger(BatchScheduler.class.getName());
/**
- * Components related to scheduling the given job.
+ * Components related to scheduling the given plan.
*/
private final SchedulerRunner schedulerRunner;
private final PendingTaskCollectionPointer pendingTaskCollectionPointer;
@@ -70,19 +70,19 @@ public final class BatchSingleJobScheduler implements
Scheduler {
private final PubSubEventHandlerWrapper pubSubEventHandlerWrapper;
/**
- * The below variables depend on the submitted job to execute.
+ * The variables below depend on the plan submitted for execution.
*/
private PhysicalPlan physicalPlan;
- private JobStateManager jobStateManager;
+ private PlanStateManager planStateManager;
private List<List<Stage>> sortedScheduleGroups;
@Inject
- private BatchSingleJobScheduler(final SchedulerRunner schedulerRunner,
- final PendingTaskCollectionPointer
pendingTaskCollectionPointer,
- final BlockManagerMaster blockManagerMaster,
- final PubSubEventHandlerWrapper
pubSubEventHandlerWrapper,
- final UpdatePhysicalPlanEventHandler
updatePhysicalPlanEventHandler,
- final ExecutorRegistry executorRegistry) {
+ private BatchScheduler(final SchedulerRunner schedulerRunner,
+ final PendingTaskCollectionPointer
pendingTaskCollectionPointer,
+ final BlockManagerMaster blockManagerMaster,
+ final PubSubEventHandlerWrapper
pubSubEventHandlerWrapper,
+ final UpdatePhysicalPlanEventHandler
updatePhysicalPlanEventHandler,
+ final ExecutorRegistry executorRegistry) {
this.schedulerRunner = schedulerRunner;
this.pendingTaskCollectionPointer = pendingTaskCollectionPointer;
this.blockManagerMaster = blockManagerMaster;
@@ -96,26 +96,22 @@ public final class BatchSingleJobScheduler implements
Scheduler {
}
/**
- * @param physicalPlanOfJob of the job.
- * @param jobStateManagerOfJob of the job.
+ * @param submittedPhysicalPlan the physical plan to schedule.
+ * @param submittedPlanStateManager the state manager of the plan.
*/
@Override
- public void scheduleJob(final PhysicalPlan physicalPlanOfJob, final
JobStateManager jobStateManagerOfJob) {
- if (this.physicalPlan != null || this.jobStateManager != null) {
- throw new IllegalStateException("scheduleJob() has been called more than
once");
- }
+ public void schedulePlan(final PhysicalPlan submittedPhysicalPlan, final
PlanStateManager submittedPlanStateManager) {
+ LOG.info("Scheduled plan");
- this.physicalPlan = physicalPlanOfJob;
- this.jobStateManager = jobStateManagerOfJob;
+ this.physicalPlan = submittedPhysicalPlan;
+ this.planStateManager = submittedPlanStateManager;
- schedulerRunner.run(jobStateManager);
- LOG.info("Job to schedule: {}", this.physicalPlan.getId());
+ schedulerRunner.run(this.planStateManager);
+ LOG.info("Plan to schedule: {}", this.physicalPlan.getId());
- this.sortedScheduleGroups = this.physicalPlan.getStageDAG().getVertices()
- .stream()
+ this.sortedScheduleGroups =
this.physicalPlan.getStageDAG().getVertices().stream()
.collect(Collectors.groupingBy(Stage::getScheduleGroup))
- .entrySet()
- .stream()
+ .entrySet().stream()
.sorted(Map.Entry.comparingByKey())
.map(Map.Entry::getValue)
.collect(Collectors.toList());
@@ -124,8 +120,8 @@ public final class BatchSingleJobScheduler implements
Scheduler {
}
@Override
- public void updateJob(final String jobId, final PhysicalPlan
newPhysicalPlan) {
- // update the job in the scheduler.
+ public void updatePlan(final String planId, final PhysicalPlan
newPhysicalPlan) {
+ // update the physical plan in the scheduler.
// NOTE: what's already been executed is not modified in the new physical
plan.
this.physicalPlan = newPhysicalPlan;
}
@@ -148,11 +144,11 @@ public final class BatchSingleJobScheduler implements
Scheduler {
final TaskState.State newState,
@Nullable final String
vertexPutOnHold,
final
TaskState.RecoverableTaskFailureCause failureCause) {
- final int currentTaskAttemptIndex = jobStateManager.getTaskAttempt(taskId);
+ final int currentTaskAttemptIndex =
planStateManager.getTaskAttempt(taskId);
if (taskAttemptIndex == currentTaskAttemptIndex) {
// Do change state, as this notification is for the current task attempt.
- jobStateManager.onTaskStateChanged(taskId, newState);
+ planStateManager.onTaskStateChanged(taskId, newState);
switch (newState) {
case COMPLETE:
onTaskExecutionComplete(executorId, taskId);
@@ -165,7 +161,7 @@ public final class BatchSingleJobScheduler implements
Scheduler {
onTaskExecutionOnHold(executorId, taskId, vertexPutOnHold);
break;
case FAILED:
- throw new UnrecoverableFailureException(new Exception(new
StringBuffer().append("The job failed on Task #")
+ throw new UnrecoverableFailureException(new Exception(new
StringBuffer().append("The plan failed on Task #")
.append(taskId).append(" in Executor
").append(executorId).toString()));
case READY:
case EXECUTING:
@@ -181,8 +177,8 @@ public final class BatchSingleJobScheduler implements
Scheduler {
case ON_HOLD:
// If the stage has completed
final String stageIdForTaskUponCompletion =
RuntimeIdGenerator.getStageIdFromTaskId(taskId);
- if
(jobStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE))
{
- if (!jobStateManager.isJobDone()) {
+ if
(planStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE))
{
+ if (!planStateManager.isPlanDone()) {
doSchedule();
}
}
@@ -296,7 +292,7 @@ public final class BatchSingleJobScheduler implements
Scheduler {
return sortedScheduleGroups.stream()
.filter(scheduleGroup -> scheduleGroup.stream()
.map(Stage::getId)
- .map(jobStateManager::getStageState)
+ .map(planStateManager::getStageState)
.anyMatch(state -> state.equals(StageState.State.INCOMPLETE))) //
any incomplete stage in the group
.findFirst(); // selects the one with the smallest scheduling group
index.
}
@@ -309,7 +305,7 @@ public final class BatchSingleJobScheduler implements
Scheduler {
final List<String> taskIdsToSchedule = new LinkedList<>();
for (final String taskId : stageToSchedule.getTaskIds()) {
- final TaskState.State taskState = jobStateManager.getTaskState(taskId);
+ final TaskState.State taskState = planStateManager.getTaskState(taskId);
switch (taskState) {
// Don't schedule these.
@@ -320,7 +316,7 @@ public final class BatchSingleJobScheduler implements
Scheduler {
// These are schedulable.
case SHOULD_RETRY:
- jobStateManager.onTaskStateChanged(taskId, TaskState.State.READY);
+ planStateManager.onTaskStateChanged(taskId, TaskState.State.READY);
case READY:
taskIdsToSchedule.add(taskId);
break;
@@ -337,7 +333,7 @@ public final class BatchSingleJobScheduler implements
Scheduler {
taskIdsToSchedule.forEach(taskId -> {
blockManagerMaster.onProducerTaskScheduled(taskId); // Notify the block
manager early for push edges.
final int taskIdx = RuntimeIdGenerator.getIndexFromTaskId(taskId);
- final int attemptIdx = jobStateManager.getTaskAttempt(taskId);
+ final int attemptIdx = planStateManager.getTaskAttempt(taskId);
tasks.add(new Task(
physicalPlan.getId(),
taskId,
@@ -385,7 +381,7 @@ public final class BatchSingleJobScheduler implements
Scheduler {
final String stageIdForTaskUponCompletion =
RuntimeIdGenerator.getStageIdFromTaskId(taskId);
final boolean stageComplete =
-
jobStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE);
+
planStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE);
if (stageComplete) {
// get optimization vertex from the task.
@@ -441,7 +437,7 @@ public final class BatchSingleJobScheduler implements
Scheduler {
final Set<String> tasksToRetry = Sets.union(tasks, requiredParents);
LOG.info("Will be retried: {}", tasksToRetry);
tasksToRetry.forEach(
- taskToReExecute -> jobStateManager.onTaskStateChanged(taskToReExecute,
TaskState.State.SHOULD_RETRY));
+ taskToReExecute ->
planStateManager.onTaskStateChanged(taskToReExecute,
TaskState.State.SHOULD_RETRY));
}
private Set<String> recursivelyGetParentTasksForLostBlocks(final Set<String>
children) {
diff --git
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
index aaa82e0..f8413f1 100644
---
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
+++
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
@@ -17,7 +17,7 @@ package edu.snu.nemo.runtime.master.scheduler;
import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
import edu.snu.nemo.runtime.common.state.TaskState;
-import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.PlanStateManager;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.tang.annotations.DefaultImplementation;
@@ -31,23 +31,23 @@ import javax.annotation.Nullable;
* Other scheduler-related classes that are accessed by only one of the two
threads are not synchronized(NotThreadSafe).
*/
@DriverSide
-@DefaultImplementation(BatchSingleJobScheduler.class)
+@DefaultImplementation(BatchScheduler.class)
public interface Scheduler {
/**
- * Schedules the given job.
+ * Schedules the given plan.
* @param physicalPlan of the job being submitted.
- * @param jobStateManager to manage the states of the submitted job.
+ * @param planStateManager to manage the states of the submitted plan.
*/
- void scheduleJob(PhysicalPlan physicalPlan,
- JobStateManager jobStateManager);
+ void schedulePlan(PhysicalPlan physicalPlan,
+ PlanStateManager planStateManager);
/**
* Receives and updates the scheduler with a new physical plan for a job.
- * @param jobId the ID of the job to change the physical plan.
+ * @param planId the ID of the physical plan to change.
* @param newPhysicalPlan new physical plan for the job.
*/
- void updateJob(String jobId, PhysicalPlan newPhysicalPlan);
+ void updatePlan(String planId, PhysicalPlan newPhysicalPlan);
/**
* Called when an executor is added to Runtime, so that the extra resource
can be used to execute the job.
diff --git
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
index defc91b..750c0fc 100644
---
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
+++
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
@@ -17,7 +17,7 @@ package edu.snu.nemo.runtime.master.scheduler;
import edu.snu.nemo.runtime.common.plan.Task;
import edu.snu.nemo.runtime.common.state.TaskState;
-import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.PlanStateManager;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.reef.annotations.audience.DriverSide;
@@ -46,7 +46,7 @@ import javax.inject.Inject;
@NotThreadSafe
public final class SchedulerRunner {
private static final Logger LOG =
LoggerFactory.getLogger(SchedulerRunner.class.getName());
- private final Map<String, JobStateManager> jobStateManagers;
+ private final Map<String, PlanStateManager> planStateManagers;
private final PendingTaskCollectionPointer pendingTaskCollectionPointer;
private final ExecutorService schedulerThread;
private boolean isSchedulerRunning;
@@ -62,7 +62,7 @@ public final class SchedulerRunner {
final SchedulingPolicy schedulingPolicy,
final PendingTaskCollectionPointer
pendingTaskCollectionPointer,
final ExecutorRegistry executorRegistry) {
- this.jobStateManagers = new HashMap<>();
+ this.planStateManagers = new HashMap<>();
this.pendingTaskCollectionPointer = pendingTaskCollectionPointer;
this.schedulerThread = Executors.newSingleThreadExecutor(runnable ->
new Thread(runnable, "SchedulerRunner thread"));
@@ -84,11 +84,11 @@ public final class SchedulerRunner {
doScheduleTaskList();
schedulingIteration.await();
}
- jobStateManagers.values().forEach(jobStateManager -> {
- if (jobStateManager.isJobDone()) {
- LOG.info("{} is complete.", jobStateManager.getJobId());
+ planStateManagers.values().forEach(planStateManager -> {
+ if (planStateManager.isPlanDone()) {
+ LOG.info("{} is complete.", planStateManager.getPlanId());
} else {
- LOG.info("{} is incomplete.", jobStateManager.getJobId());
+ LOG.info("{} is incomplete.", planStateManager.getPlanId());
}
});
LOG.info("SchedulerRunner Terminated!");
@@ -106,8 +106,8 @@ public final class SchedulerRunner {
final Collection<Task> taskList = taskListOptional.get();
final List<Task> couldNotSchedule = new ArrayList<>();
for (final Task task : taskList) {
- final JobStateManager jobStateManager =
jobStateManagers.get(task.getJobId());
- if
(!jobStateManager.getTaskState(task.getTaskId()).equals(TaskState.State.READY))
{
+ final PlanStateManager planStateManager =
planStateManagers.get(task.getPlanId());
+ if
(!planStateManager.getTaskState(task.getTaskId()).equals(TaskState.State.READY))
{
// Guard against race conditions causing duplicate task launches
LOG.debug("Skipping {} as it is not READY", task.getTaskId());
continue;
@@ -128,7 +128,7 @@ public final class SchedulerRunner {
final ExecutorRepresenter selectedExecutor
= schedulingPolicy.selectExecutor(candidateExecutors.getValue(),
task);
// update metadata first
- jobStateManager.onTaskStateChanged(task.getTaskId(),
TaskState.State.EXECUTING);
+ planStateManager.onTaskStateChanged(task.getTaskId(),
TaskState.State.EXECUTING);
LOG.info("{} scheduled to {}", task.getTaskId(),
selectedExecutor.getExecutorId());
@@ -164,9 +164,9 @@ public final class SchedulerRunner {
/**
* Run the scheduler thread.
*/
- void run(final JobStateManager jobStateManager) {
+ void run(final PlanStateManager planStateManager) {
+ planStateManagers.put(planStateManager.getPlanId(), planStateManager);
if (!isTerminated && !isSchedulerRunning) {
- jobStateManagers.put(jobStateManager.getJobId(), jobStateManager);
schedulerThread.execute(new SchedulerThread());
schedulerThread.shutdown();
isSchedulerRunning = true;
diff --git
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/PlanStateManagerTest.java
similarity index 67%
rename from
runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java
rename to
runtime/master/src/test/java/edu/snu/nemo/runtime/master/PlanStateManagerTest.java
index 73176be..2003153 100644
---
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java
+++
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/PlanStateManagerTest.java
@@ -22,7 +22,7 @@ import
edu.snu.nemo.runtime.common.message.local.LocalMessageDispatcher;
import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
import edu.snu.nemo.runtime.common.plan.Stage;
-import edu.snu.nemo.runtime.common.state.JobState;
+import edu.snu.nemo.runtime.common.state.PlanState;
import edu.snu.nemo.runtime.common.state.StageState;
import edu.snu.nemo.runtime.common.state.TaskState;
import edu.snu.nemo.runtime.common.plan.TestPlanGenerator;
@@ -42,11 +42,11 @@ import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.mock;
/**
- * Tests {@link JobStateManager}.
+ * Tests {@link PlanStateManager}.
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest(MetricMessageHandler.class)
-public final class JobStateManagerTest {
+public final class PlanStateManagerTest {
private static final int MAX_SCHEDULE_ATTEMPT = 2;
private MetricMessageHandler metricMessageHandler;
@@ -59,17 +59,17 @@ public final class JobStateManagerTest {
}
/**
- * This method builds a physical DAG starting from an IR DAG and submits it
to {@link JobStateManager}.
+ * This method builds a physical DAG starting from an IR DAG and submits it
to {@link PlanStateManager}.
* State changes are explicitly called to check whether states are managed
correctly or not.
*/
@Test
public void testPhysicalPlanStateChanges() throws Exception {
final PhysicalPlan physicalPlan =
TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined,
false);
- final JobStateManager jobStateManager =
- new JobStateManager(physicalPlan, metricMessageHandler,
MAX_SCHEDULE_ATTEMPT);
+ final PlanStateManager planStateManager =
+ new PlanStateManager(physicalPlan, metricMessageHandler,
MAX_SCHEDULE_ATTEMPT);
- assertEquals(jobStateManager.getJobId(), "TestPlan");
+ assertEquals(planStateManager.getPlanId(), "TestPlan");
final List<Stage> stageList =
physicalPlan.getStageDAG().getTopologicalSort();
@@ -77,45 +77,45 @@ public final class JobStateManagerTest {
final Stage stage = stageList.get(stageIdx);
final List<String> taskIds = stage.getTaskIds();
taskIds.forEach(taskId -> {
- jobStateManager.onTaskStateChanged(taskId, TaskState.State.EXECUTING);
- jobStateManager.onTaskStateChanged(taskId, TaskState.State.COMPLETE);
+ planStateManager.onTaskStateChanged(taskId, TaskState.State.EXECUTING);
+ planStateManager.onTaskStateChanged(taskId, TaskState.State.COMPLETE);
if (RuntimeIdGenerator.getIndexFromTaskId(taskId) == taskIds.size() -
1) {
- assertEquals(StageState.State.COMPLETE,
jobStateManager.getStageState(stage.getId()));
+ assertEquals(StageState.State.COMPLETE,
planStateManager.getStageState(stage.getId()));
}
});
- taskIds.forEach(taskId ->
assertEquals(jobStateManager.getTaskState(taskId), TaskState.State.COMPLETE));
+ taskIds.forEach(taskId ->
assertEquals(planStateManager.getTaskState(taskId), TaskState.State.COMPLETE));
if (stageIdx == stageList.size() - 1) {
- assertEquals(jobStateManager.getJobState(), JobState.State.COMPLETE);
+ assertEquals(planStateManager.getPlanState(),
PlanState.State.COMPLETE);
}
}
}
/**
- * Test whether the methods waiting finish of job works properly.
+ * Test whether the methods waiting for the plan to finish work
properly.
*/
@Test(timeout = 2000)
public void testWaitUntilFinish() throws Exception {
final PhysicalPlan physicalPlan =
TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined,
false);
- final JobStateManager jobStateManager =
- new JobStateManager(physicalPlan, metricMessageHandler,
MAX_SCHEDULE_ATTEMPT);
+ final PlanStateManager planStateManager =
+ new PlanStateManager(physicalPlan, metricMessageHandler,
MAX_SCHEDULE_ATTEMPT);
- assertFalse(jobStateManager.isJobDone());
+ assertFalse(planStateManager.isPlanDone());
- // Wait for the job to finish and check the job state.
+ // Wait for the plan to finish and check the plan state.
// It have to return EXECUTING state after timeout.
- final JobState.State executingState = jobStateManager.waitUntilFinish(100,
TimeUnit.MILLISECONDS);
- assertEquals(JobState.State.EXECUTING, executingState);
+ final PlanState.State executingState =
planStateManager.waitUntilFinish(100, TimeUnit.MILLISECONDS);
+ assertEquals(PlanState.State.EXECUTING, executingState);
- // Complete the job and check the result again.
- // It have to return COMPLETE.
+ // Complete the plan and check the result again.
+ // It has to return COMPLETE.
final List<String> tasks =
physicalPlan.getStageDAG().getTopologicalSort().stream()
.flatMap(stage -> stage.getTaskIds().stream())
.collect(Collectors.toList());
- tasks.forEach(taskId -> jobStateManager.onTaskStateChanged(taskId,
TaskState.State.EXECUTING));
- tasks.forEach(taskId -> jobStateManager.onTaskStateChanged(taskId,
TaskState.State.COMPLETE));
- final JobState.State completedState = jobStateManager.waitUntilFinish();
- assertEquals(JobState.State.COMPLETE, completedState);
+ tasks.forEach(taskId -> planStateManager.onTaskStateChanged(taskId,
TaskState.State.EXECUTING));
+ tasks.forEach(taskId -> planStateManager.onTaskStateChanged(taskId,
TaskState.State.COMPLETE));
+ final PlanState.State completedState = planStateManager.waitUntilFinish();
+ assertEquals(PlanState.State.COMPLETE, completedState);
}
}
diff --git
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSchedulerTest.java
similarity index 88%
rename from
runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
rename to
runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSchedulerTest.java
index cfa6194..9733c75 100644
---
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
+++
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSchedulerTest.java
@@ -23,7 +23,7 @@ import edu.snu.nemo.runtime.common.message.MessageSender;
import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
import edu.snu.nemo.runtime.common.plan.Stage;
import edu.snu.nemo.runtime.common.plan.StageEdge;
-import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.PlanStateManager;
import edu.snu.nemo.runtime.master.MetricMessageHandler;
import edu.snu.nemo.runtime.master.BlockManagerMaster;
import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
@@ -54,13 +54,13 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
/**
- * Tests {@link BatchSingleJobScheduler}.
+ * Tests {@link BatchScheduler}.
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({ContainerManager.class, BlockManagerMaster.class,
PubSubEventHandlerWrapper.class, UpdatePhysicalPlanEventHandler.class,
MetricMessageHandler.class})
-public final class BatchSingleJobSchedulerTest {
- private static final Logger LOG =
LoggerFactory.getLogger(BatchSingleJobSchedulerTest.class.getName());
+public final class BatchSchedulerTest {
+ private static final Logger LOG =
LoggerFactory.getLogger(BatchSchedulerTest.class.getName());
private Scheduler scheduler;
private ExecutorRegistry executorRegistry;
private final MetricMessageHandler metricMessageHandler =
mock(MetricMessageHandler.class);
@@ -80,7 +80,7 @@ public final class BatchSingleJobSchedulerTest {
injector.bindVolatileInstance(BlockManagerMaster.class,
mock(BlockManagerMaster.class));
injector.bindVolatileInstance(PubSubEventHandlerWrapper.class,
mock(PubSubEventHandlerWrapper.class));
injector.bindVolatileInstance(UpdatePhysicalPlanEventHandler.class,
mock(UpdatePhysicalPlanEventHandler.class));
- scheduler = injector.getInstance(BatchSingleJobScheduler.class);
+ scheduler = injector.getInstance(BatchScheduler.class);
final ActiveContext activeContext = mock(ActiveContext.class);
Mockito.doThrow(new RuntimeException()).when(activeContext).close();
@@ -114,28 +114,28 @@ public final class BatchSingleJobSchedulerTest {
}
/**
- * This method builds a physical DAG starting from an IR DAG and submits it
to {@link BatchSingleJobScheduler}.
+ * This method builds a physical DAG starting from an IR DAG and submits it
to {@link BatchScheduler}.
* Task state changes are explicitly submitted to scheduler instead of
executor messages.
*/
@Test(timeout=10000)
public void testPull() throws Exception {
- scheduleAndCheckJobTermination(
+ scheduleAndCheckPlanTermination(
TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined,
false));
}
/**
- * This method builds a physical DAG starting from an IR DAG and submits it
to {@link BatchSingleJobScheduler}.
+ * This method builds a physical DAG starting from an IR DAG and submits it
to {@link BatchScheduler}.
* Task state changes are explicitly submitted to scheduler instead of
executor messages.
*/
@Test(timeout=10000)
public void testPush() throws Exception {
- scheduleAndCheckJobTermination(
+ scheduleAndCheckPlanTermination(
TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined,
true));
}
- private void scheduleAndCheckJobTermination(final PhysicalPlan plan) throws
InjectionException {
- final JobStateManager jobStateManager = new JobStateManager(plan,
metricMessageHandler, 1);
- scheduler.scheduleJob(plan, jobStateManager);
+ private void scheduleAndCheckPlanTermination(final PhysicalPlan plan) throws
InjectionException {
+ final PlanStateManager planStateManager = new PlanStateManager(plan,
metricMessageHandler, 1);
+ scheduler.schedulePlan(plan, planStateManager);
// For each ScheduleGroup, test if the tasks of the next ScheduleGroup are
scheduled
// after the stages of each ScheduleGroup are made "complete".
@@ -146,14 +146,14 @@ public final class BatchSingleJobSchedulerTest {
LOG.debug("Checking that all stages of ScheduleGroup {} enter the
executing state", scheduleGroupIdx);
stages.forEach(stage -> {
SchedulerTestUtil.completeStage(
- jobStateManager, scheduler, executorRegistry, stage,
SCHEDULE_ATTEMPT_INDEX);
+ planStateManager, scheduler, executorRegistry, stage,
SCHEDULE_ATTEMPT_INDEX);
});
}
- LOG.debug("Waiting for job termination after sending stage completion
events");
- while (!jobStateManager.isJobDone()) {
+ LOG.debug("Waiting for plan termination after sending stage completion
events");
+ while (!planStateManager.isPlanDone()) {
}
- assertTrue(jobStateManager.isJobDone());
+ assertTrue(planStateManager.isPlanDone());
}
private List<Stage> filterStagesWithAScheduleGroup(
diff --git
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
index 815dff8..d31965b 100644
---
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
+++
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
@@ -18,7 +18,7 @@ package edu.snu.nemo.runtime.master.scheduler;
import edu.snu.nemo.runtime.common.plan.Stage;
import edu.snu.nemo.runtime.common.state.StageState;
import edu.snu.nemo.runtime.common.state.TaskState;
-import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.PlanStateManager;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import java.util.Optional;
@@ -29,25 +29,25 @@ import java.util.Optional;
final class SchedulerTestUtil {
/**
* Complete the stage by completing all of its Tasks.
- * @param jobStateManager for the submitted job.
- * @param scheduler for the submitted job.
+ * @param planStateManager for the submitted plan.
+ * @param scheduler for the submitted plan.
* @param executorRegistry provides executor representers
* @param stage for which the states should be marked as complete.
*/
- static void completeStage(final JobStateManager jobStateManager,
+ static void completeStage(final PlanStateManager planStateManager,
final Scheduler scheduler,
final ExecutorRegistry executorRegistry,
final Stage stage,
final int attemptIdx) {
// Loop until the stage completes.
while (true) {
- final StageState.State stageState =
jobStateManager.getStageState(stage.getId());
+ final StageState.State stageState =
planStateManager.getStageState(stage.getId());
if (StageState.State.COMPLETE == stageState) {
// Stage has completed, so we break out of the loop.
break;
} else if (StageState.State.INCOMPLETE == stageState) {
stage.getTaskIds().forEach(taskId -> {
- final TaskState.State taskState =
jobStateManager.getTaskState(taskId);
+ final TaskState.State taskState =
planStateManager.getTaskState(taskId);
if (TaskState.State.EXECUTING == taskState) {
sendTaskStateEventToScheduler(scheduler, executorRegistry, taskId,
TaskState.State.COMPLETE, attemptIdx, null);
@@ -66,7 +66,7 @@ final class SchedulerTestUtil {
/**
* Sends task state change event to scheduler.
* This replaces executor's task completion messages for testing purposes.
- * @param scheduler for the submitted job.
+ * @param scheduler for the submitted plan.
* @param executorRegistry provides executor representers
* @param taskId for the task to change the state.
* @param newState for the task.
diff --git
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
index cd54432..e1ab449 100644
---
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
+++
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
@@ -20,10 +20,10 @@ import
edu.snu.nemo.common.ir.vertex.executionproperty.ResourcePriorityProperty;
import edu.snu.nemo.runtime.common.comm.ControlMessage;
import edu.snu.nemo.runtime.common.message.MessageSender;
import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
-import edu.snu.nemo.runtime.common.state.JobState;
+import edu.snu.nemo.runtime.common.state.PlanState;
import edu.snu.nemo.runtime.common.state.TaskState;
import edu.snu.nemo.runtime.master.BlockManagerMaster;
-import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.PlanStateManager;
import edu.snu.nemo.runtime.master.MetricMessageHandler;
import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
@@ -68,7 +68,7 @@ public final class TaskRetryTest {
private Random random;
private Scheduler scheduler;
private ExecutorRegistry executorRegistry;
- private JobStateManager jobStateManager;
+ private PlanStateManager planStateManager;
private static final int MAX_SCHEDULE_ATTEMPT = Integer.MAX_VALUE;
@@ -91,14 +91,14 @@ public final class TaskRetryTest {
injector.bindVolatileInstance(BlockManagerMaster.class,
mock(BlockManagerMaster.class));
scheduler = injector.getInstance(Scheduler.class);
- // Get JobStateManager
- jobStateManager =
runPhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined);
+ // Get PlanStateManager
+ planStateManager =
runPhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined);
}
@Test(timeout=7000)
public void testExecutorRemoved() throws Exception {
- // Until the job finishes, events happen
- while (!jobStateManager.isJobDone()) {
+ // Until the plan finishes, events happen
+ while (!planStateManager.isPlanDone()) {
// 50% chance remove, 50% chance add, 80% chance task completed
executorRemoved(0.5);
executorAdded(0.5);
@@ -108,9 +108,9 @@ public final class TaskRetryTest {
Thread.sleep(10);
}
- // Job should COMPLETE
- assertEquals(JobState.State.COMPLETE, jobStateManager.getJobState());
- assertTrue(jobStateManager.isJobDone());
+ // Plan should COMPLETE
+ assertEquals(PlanState.State.COMPLETE, planStateManager.getPlanState());
+ assertTrue(planStateManager.isPlanDone());
}
@Test(timeout=7000)
@@ -120,8 +120,8 @@ public final class TaskRetryTest {
executorAdded(1.0);
executorAdded(1.0);
- // Until the job finishes, events happen
- while (!jobStateManager.isJobDone()) {
+ // Until the plan finishes, events happen
+ while (!planStateManager.isPlanDone()) {
// 50% chance task completed
// 50% chance task output write failed
taskCompleted(0.5);
@@ -131,9 +131,9 @@ public final class TaskRetryTest {
Thread.sleep(10);
}
- // Job should COMPLETE
- assertEquals(JobState.State.COMPLETE, jobStateManager.getJobState());
- assertTrue(jobStateManager.isJobDone());
+ // Plan should COMPLETE
+ assertEquals(PlanState.State.COMPLETE, planStateManager.getPlanState());
+ assertTrue(planStateManager.isPlanDone());
}
////////////////////////////////////////////////////////////////// Events
@@ -177,12 +177,12 @@ public final class TaskRetryTest {
return;
}
- final List<String> executingTasks = getTasksInState(jobStateManager,
TaskState.State.EXECUTING);
+ final List<String> executingTasks = getTasksInState(planStateManager,
TaskState.State.EXECUTING);
if (!executingTasks.isEmpty()) {
final int randomIndex = random.nextInt(executingTasks.size());
final String selectedTask = executingTasks.get(randomIndex);
SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler,
executorRegistry, selectedTask,
- TaskState.State.COMPLETE,
jobStateManager.getTaskAttempt(selectedTask));
+ TaskState.State.COMPLETE,
planStateManager.getTaskAttempt(selectedTask));
}
}
@@ -191,30 +191,30 @@ public final class TaskRetryTest {
return;
}
- final List<String> executingTasks = getTasksInState(jobStateManager,
TaskState.State.EXECUTING);
+ final List<String> executingTasks = getTasksInState(planStateManager,
TaskState.State.EXECUTING);
if (!executingTasks.isEmpty()) {
final int randomIndex = random.nextInt(executingTasks.size());
final String selectedTask = executingTasks.get(randomIndex);
SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler,
executorRegistry, selectedTask,
- TaskState.State.SHOULD_RETRY,
jobStateManager.getTaskAttempt(selectedTask),
+ TaskState.State.SHOULD_RETRY,
planStateManager.getTaskAttempt(selectedTask),
TaskState.RecoverableTaskFailureCause.OUTPUT_WRITE_FAILURE);
}
}
////////////////////////////////////////////////////////////////// Helper
methods
- private List<String> getTasksInState(final JobStateManager jobStateManager,
final TaskState.State state) {
- return jobStateManager.getAllTaskStates().entrySet().stream()
+ private List<String> getTasksInState(final PlanStateManager
planStateManager, final TaskState.State state) {
+ return planStateManager.getAllTaskStates().entrySet().stream()
.filter(entry ->
entry.getValue().getStateMachine().getCurrentState().equals(state))
.map(Map.Entry::getKey)
.collect(Collectors.toList());
}
- private JobStateManager runPhysicalPlan(final TestPlanGenerator.PlanType
planType) throws Exception {
+ private PlanStateManager runPhysicalPlan(final TestPlanGenerator.PlanType
planType) throws Exception {
final MetricMessageHandler metricMessageHandler =
mock(MetricMessageHandler.class);
final PhysicalPlan plan = TestPlanGenerator.generatePhysicalPlan(planType,
false);
- final JobStateManager jobStateManager = new JobStateManager(plan,
metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
- scheduler.scheduleJob(plan, jobStateManager);
- return jobStateManager;
+ final PlanStateManager planStateManager = new PlanStateManager(plan,
metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
+ scheduler.schedulePlan(plan, planStateManager);
+ return planStateManager;
}
}
diff --git
a/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java
b/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java
index 5742238..3b3646e 100644
---
a/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java
+++
b/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java
@@ -94,7 +94,7 @@ public final class TestPlanGenerator {
private static PhysicalPlan convertIRToPhysical(final DAG<IRVertex, IREdge>
irDAG,
final Policy policy) throws
Exception {
final DAG<IRVertex, IREdge> optimized =
policy.runCompileTimeOptimization(irDAG, EMPTY_DAG_DIRECTORY);
- final DAG<Stage, StageEdge> physicalDAG =
optimized.convert(PLAN_GENERATOR);
+ final DAG<Stage, StageEdge> physicalDAG = PLAN_GENERATOR.apply(optimized);
return new PhysicalPlan("TestPlan", physicalDAG);
}