Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 391615e72 -> 71184cdb1


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
new file mode 100644
index 0000000..f7aaaed
--- /dev/null
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.io.FileUtils;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Iterators;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+
+public class DagManagerTest {
+  private final String dagStateStoreDir = "/tmp/dagManagerTest/dagStateStore";
+  private DagStateStore _dagStateStore;
+  private JobStatusRetriever _jobStatusRetriever;
+  private DagManager.DagManagerThread _dagManagerThread;
+  private LinkedBlockingQueue<Dag<JobExecutionPlan>> queue;
+  private Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag;
+  private Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs;
+  private Map<String, Dag<JobExecutionPlan>> dags;
+
+  /**
+   * Prepares the fixtures shared by all tests in this class: wipes the state store directory, builds an
+   * {@link FSDagStateStore} configured with it, mocks the {@link JobStatusRetriever}, and creates the
+   * {@link DagManager.DagManagerThread} under test. The thread's private "jobToDag", "dagToJobs" and
+   * "dags" fields are then read reflectively so the tests can assert on them.
+   */
+  @BeforeClass
+  public void setUp() throws Exception {
+    // Remove anything a previous run may have left behind.
+    File staleStateStoreDir = new File(this.dagStateStoreDir);
+    FileUtils.deleteDirectory(staleStateStoreDir);
+
+    Config stateStoreConfig = ConfigFactory.empty()
+        .withValue(DagManager.DAG_STATESTORE_DIR, ConfigValueFactory.fromAnyRef(this.dagStateStoreDir));
+    this._dagStateStore = new FSDagStateStore(stateStoreConfig);
+    this._jobStatusRetriever = Mockito.mock(JobStatusRetriever.class);
+    this.queue = new LinkedBlockingQueue<>();
+    // NOTE(review): the meaning of the trailing boolean is not visible from this test - confirm in DagManager.
+    this._dagManagerThread =
+        new DagManager.DagManagerThread(this._jobStatusRetriever, this._dagStateStore, this.queue, true);
+
+    this.jobToDag = readThreadField("jobToDag");
+    this.dagToJobs = readThreadField("dagToJobs");
+    this.dags = readThreadField("dags");
+  }
+
+  /**
+   * Reads a private field of the {@link DagManager.DagManagerThread} under test via reflection.
+   *
+   * @param fieldName name of the field declared on {@link DagManager.DagManagerThread}
+   * @return the current value of that field, cast to the type expected by the caller
+   * @throws ReflectiveOperationException if the field does not exist or cannot be read
+   */
+  @SuppressWarnings("unchecked")
+  private <T> T readThreadField(String fieldName) throws ReflectiveOperationException {
+    Field field = DagManager.DagManagerThread.class.getDeclaredField(fieldName);
+    field.setAccessible(true);
+    // Unchecked by necessity: the declared generic type of the field is erased at runtime.
+    return (T) field.get(this._dagManagerThread);
+  }
+
+  /**
+   * Builds a {@link Dag} of {@link JobExecutionPlan}s in which "job0" is the only parent and every other
+   * job declares a dependency on "job0".
+   *
+   * @param id suffix appended to "group" and "flow" to form the flow group, flow name and job group
+   * @param flowExecutionId flow execution id written into every job config
+   * @param numNodes number of jobs to create
+   * @return the Dag created by {@link JobExecutionPlanDagFactory} from the generated plans
+   * @throws URISyntaxException if one of the generated "job" URIs cannot be parsed
+   */
+  public Dag<JobExecutionPlan> buildDag(String id, Long flowExecutionId, int numNodes)
+      throws URISyntaxException {
+    List<JobExecutionPlan> plans = new ArrayList<>();
+    for (int nodeIndex = 0; nodeIndex < numNodes; nodeIndex++) {
+      String nodeSuffix = Integer.toString(nodeIndex);
+      Config nodeConfig = ConfigBuilder.create()
+          .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "group" + id)
+          .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flow" + id)
+          .addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId)
+          .addPrimitive(ConfigurationKeys.JOB_GROUP_KEY, "group" + id)
+          .addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job" + nodeSuffix)
+          .build();
+      // Every node except the first one hangs off job0.
+      if (nodeIndex > 0) {
+        nodeConfig = nodeConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES,
+            ConfigValueFactory.fromAnyRef("job0"));
+      }
+
+      URI templateUri = new URI("job" + nodeSuffix);
+      JobSpec jobSpec = JobSpec.builder("test_job" + nodeSuffix)
+          .withVersion(nodeSuffix)
+          .withConfig(nodeConfig)
+          .withTemplate(templateUri)
+          .build();
+
+      URI executorUri = new URI("job" + nodeIndex);
+      SpecExecutor executor = InMemorySpecExecutor.createDummySpecExecutor(executorUri);
+      plans.add(new JobExecutionPlan(jobSpec, executor));
+    }
+    JobExecutionPlanDagFactory dagFactory = new JobExecutionPlanDagFactory();
+    return dagFactory.createDag(plans);
+  }
+
+  /**
+   * Wraps a single canned {@link JobStatus} in an iterator, suitable as a stubbed return value of the
+   * mocked {@link JobStatusRetriever}.
+   *
+   * @param flowName flow name set on the status
+   * @param flowGroup flow group set on the status
+   * @param flowExecutionId flow execution id set on the status
+   * @param jobGroup job group set on the status
+   * @param jobName job name set on the status
+   * @param eventName event name set on the status
+   * @return an iterator yielding exactly one status with message "Test message" and start time 5000
+   */
+  private Iterator<JobStatus> getMockJobStatus(String flowName, String flowGroup, Long flowExecutionId,
+      String jobGroup, String jobName, String eventName) {
+    JobStatus cannedStatus = JobStatus.builder()
+        .flowName(flowName)
+        .flowGroup(flowGroup)
+        .jobGroup(jobGroup)
+        .jobName(jobName)
+        .flowExecutionId(flowExecutionId)
+        .message("Test message")
+        .eventName(eventName)
+        .startTime(5000L)
+        .build();
+    return Iterators.singletonIterator(cannedStatus);
+  }
+
+  /**
+   * Drives a 3-node dag (job0 as parent of job1 and job2) through the {@link DagManager.DagManagerThread}
+   * by calling {@code run()} repeatedly while the mocked {@link JobStatusRetriever} hands out a scripted
+   * sequence of statuses, and checks the thread's "dags", "jobToDag" and "dagToJobs" maps after each pass.
+   * The final pass expects all three maps to be empty.
+   *
+   * @throws URISyntaxException if building the test dag fails
+   */
+  @Test
+  public void testSuccessfulDag() throws URISyntaxException {
+    long flowExecutionId = System.currentTimeMillis();
+    String flowGroupId = "0";
+    String flowGroup = "group" + flowGroupId;
+    String flowName = "flow" + flowGroupId;
+    String jobGroup = flowGroup;
+    String jobName0 = "job0";
+    String jobName1 = "job1";
+    String jobName2 = "job2";
+
+    Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId, 3);
+    String dagId = DagManagerUtils.generateDagId(dag);
+
+    //Add a dag to the queue of dags
+    this.queue.offer(dag);
+
+    // Scripted statuses, handed out one per retriever call in this order.
+    Iterator<JobStatus> jobStatusIterator1 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup,
+        jobName0, TimingEvent.LauncherTimings.JOB_RUN);
+    Iterator<JobStatus> jobStatusIterator2 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup,
+        jobName0, TimingEvent.LauncherTimings.JOB_COMPLETE);
+    Iterator<JobStatus> jobStatusIterator3 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup,
+        jobName1, TimingEvent.LauncherTimings.JOB_RUN);
+    Iterator<JobStatus> jobStatusIterator4 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup,
+        jobName2, TimingEvent.LauncherTimings.JOB_RUN);
+    Iterator<JobStatus> jobStatusIterator5 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup,
+        jobName1, TimingEvent.LauncherTimings.JOB_RUN);
+    Iterator<JobStatus> jobStatusIterator6 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup,
+        jobName2, TimingEvent.LauncherTimings.JOB_COMPLETE);
+    // job2 already completed via jobStatusIterator6; the last outstanding job is job1.
+    Iterator<JobStatus> jobStatusIterator7 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup,
+        jobName1, TimingEvent.LauncherTimings.JOB_COMPLETE);
+
+    // The stub matches any arguments, so only the order of the thenReturn() calls decides what is returned.
+    Mockito.when(_jobStatusRetriever.getJobStatusesForFlowExecution(Mockito.anyString(), Mockito.anyString(),
+        Mockito.anyLong(), Mockito.anyString(), Mockito.anyString())).
+        thenReturn(jobStatusIterator1).
+        thenReturn(jobStatusIterator2).
+        thenReturn(jobStatusIterator3).
+        thenReturn(jobStatusIterator4).
+        thenReturn(jobStatusIterator5).
+        thenReturn(jobStatusIterator6).
+        thenReturn(jobStatusIterator7);
+
+    //Run the thread once. Ensure the first job is running
+    this._dagManagerThread.run();
+    Assert.assertEquals(this.dags.size(), 1);
+    Assert.assertTrue(this.dags.containsKey(dagId));
+    Assert.assertEquals(this.jobToDag.size(), 1);
+    Assert.assertTrue(this.jobToDag.containsKey(dag.getStartNodes().get(0)));
+    Assert.assertEquals(this.dagToJobs.get(dagId).size(), 1);
+    Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getStartNodes().get(0)));
+
+    //Run the thread 2nd time. Ensure the job0 is complete and job1 and job2 are submitted.
+    this._dagManagerThread.run();
+    Assert.assertEquals(this.dags.size(), 1);
+    Assert.assertTrue(this.dags.containsKey(dagId));
+    Assert.assertEquals(this.jobToDag.size(), 2);
+    Assert.assertTrue(this.jobToDag.containsKey(dag.getEndNodes().get(0)));
+    Assert.assertTrue(this.jobToDag.containsKey(dag.getEndNodes().get(1)));
+    Assert.assertEquals(this.dagToJobs.get(dagId).size(), 2);
+    Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getEndNodes().get(0)));
+    Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getEndNodes().get(1)));
+
+    //Run the thread 3rd time. Ensure job1 and job2 are running.
+    this._dagManagerThread.run();
+    Assert.assertEquals(this.dags.size(), 1);
+    Assert.assertTrue(this.dags.containsKey(dagId));
+    Assert.assertEquals(this.jobToDag.size(), 2);
+    Assert.assertTrue(this.jobToDag.containsKey(dag.getEndNodes().get(0)));
+    Assert.assertTrue(this.jobToDag.containsKey(dag.getEndNodes().get(1)));
+    Assert.assertEquals(this.dagToJobs.get(dagId).size(), 2);
+    Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getEndNodes().get(0)));
+    Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getEndNodes().get(1)));
+
+    //Run the thread 4th time. One of the jobs is completed.
+    this._dagManagerThread.run();
+    Assert.assertEquals(this.dags.size(), 1);
+    Assert.assertTrue(this.dags.containsKey(dagId));
+    Assert.assertEquals(this.jobToDag.size(), 1);
+    Assert.assertEquals(this.dagToJobs.get(dagId).size(), 1);
+
+    //Run the thread again. Ensure all jobs completed and dag is cleaned up.
+    this._dagManagerThread.run();
+    Assert.assertEquals(this.dags.size(), 0);
+    Assert.assertEquals(this.jobToDag.size(), 0);
+    Assert.assertEquals(this.dagToJobs.size(), 0);
+  }
+
+  /**
+   * Runs a 3-node dag (job0 as parent of job1 and job2) through the {@link DagManager.DagManagerThread}
+   * with a scripted status sequence whose last entry is a JOB_FAILED event, and checks that after that
+   * pass the thread's "dags", "jobToDag" and "dagToJobs" maps are all empty.
+   *
+   * @throws URISyntaxException if building the test dag fails
+   */
+  @Test (dependsOnMethods = "testSuccessfulDag")
+  public void testFailedDag() throws URISyntaxException {
+    long executionId = System.currentTimeMillis();
+    String idSuffix = "0";
+    String group = "group" + idSuffix;
+    String flow = "flow" + idSuffix;
+    String parentJob = "job0";
+    String firstChildJob = "job1";
+    String secondChildJob = "job2";
+
+    Dag<JobExecutionPlan> failingDag = buildDag(idSuffix, executionId, 3);
+    String failingDagId = DagManagerUtils.generateDagId(failingDag);
+
+    // Hand the dag to the manager thread.
+    this.queue.offer(failingDag);
+
+    // Parent job: started, then completed.
+    Iterator<JobStatus> parentStarted = getMockJobStatus(flow, group, executionId, group,
+        parentJob, TimingEvent.LauncherTimings.JOB_START);
+    Iterator<JobStatus> parentCompleted = getMockJobStatus(flow, group, executionId, group,
+        parentJob, TimingEvent.LauncherTimings.JOB_COMPLETE);
+
+    // Both children reported as running.
+    Iterator<JobStatus> firstChildRunning = getMockJobStatus(flow, group, executionId, group,
+        firstChildJob, TimingEvent.LauncherTimings.JOB_RUN);
+    Iterator<JobStatus> secondChildRunning = getMockJobStatus(flow, group, executionId, group,
+        secondChildJob, TimingEvent.LauncherTimings.JOB_RUN);
+
+    // First child still running, second child failed.
+    Iterator<JobStatus> firstChildStillRunning = getMockJobStatus(flow, group, executionId, group,
+        firstChildJob, TimingEvent.LauncherTimings.JOB_RUN);
+    Iterator<JobStatus> secondChildFailed = getMockJobStatus(flow, group, executionId, group,
+        secondChildJob, TimingEvent.LauncherTimings.JOB_FAILED);
+
+    // The stub matches any arguments; statuses are returned strictly in the order listed here.
+    Mockito.when(_jobStatusRetriever.getJobStatusesForFlowExecution(Mockito.anyString(), Mockito.anyString(),
+        Mockito.anyLong(), Mockito.anyString(), Mockito.anyString()))
+        .thenReturn(parentStarted)
+        .thenReturn(parentCompleted)
+        .thenReturn(firstChildRunning)
+        .thenReturn(secondChildRunning)
+        .thenReturn(firstChildStillRunning)
+        .thenReturn(secondChildFailed);
+
+    // Pass 1: only the start node (job0) is tracked.
+    this._dagManagerThread.run();
+    Assert.assertEquals(this.dags.size(), 1);
+    Assert.assertTrue(this.dags.containsKey(failingDagId));
+    Assert.assertEquals(this.jobToDag.size(), 1);
+    Assert.assertTrue(this.jobToDag.containsKey(failingDag.getStartNodes().get(0)));
+    Assert.assertEquals(this.dagToJobs.get(failingDagId).size(), 1);
+    Assert.assertTrue(this.dagToJobs.get(failingDagId).contains(failingDag.getStartNodes().get(0)));
+
+    // Pass 2: job0 is done, both end nodes (job1 and job2) are now tracked.
+    this._dagManagerThread.run();
+    Assert.assertEquals(this.dags.size(), 1);
+    Assert.assertTrue(this.dags.containsKey(failingDagId));
+    Assert.assertEquals(this.jobToDag.size(), 2);
+    Assert.assertTrue(this.jobToDag.containsKey(failingDag.getEndNodes().get(0)));
+    Assert.assertTrue(this.jobToDag.containsKey(failingDag.getEndNodes().get(1)));
+    Assert.assertEquals(this.dagToJobs.get(failingDagId).size(), 2);
+    Assert.assertTrue(this.dagToJobs.get(failingDagId).contains(failingDag.getEndNodes().get(0)));
+    Assert.assertTrue(this.dagToJobs.get(failingDagId).contains(failingDag.getEndNodes().get(1)));
+
+    // Pass 3: both end nodes are still tracked while they run.
+    this._dagManagerThread.run();
+    Assert.assertEquals(this.dags.size(), 1);
+    Assert.assertTrue(this.dags.containsKey(failingDagId));
+    Assert.assertEquals(this.jobToDag.size(), 2);
+    Assert.assertTrue(this.jobToDag.containsKey(failingDag.getEndNodes().get(0)));
+    Assert.assertTrue(this.jobToDag.containsKey(failingDag.getEndNodes().get(1)));
+    Assert.assertEquals(this.dagToJobs.get(failingDagId).size(), 2);
+    Assert.assertTrue(this.dagToJobs.get(failingDagId).contains(failingDag.getEndNodes().get(0)));
+    Assert.assertTrue(this.dagToJobs.get(failingDagId).contains(failingDag.getEndNodes().get(1)));
+
+    // Pass 4: one job fails, so the dag is failed and all state is cleaned up.
+    this._dagManagerThread.run();
+    Assert.assertEquals(this.dags.size(), 0);
+    Assert.assertEquals(this.jobToDag.size(), 0);
+    Assert.assertEquals(this.dagToJobs.size(), 0);
+  }
+
+  /**
+   * Deletes the dag state store directory used by this test class once all tests have run.
+   */
+  @AfterClass
+  public void cleanUp() throws Exception {
+    File stateStoreDir = new File(this.dagStateStoreDir);
+    FileUtils.deleteDirectory(stateStoreDir);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java
new file mode 100644
index 0000000..e97d860
--- /dev/null
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
+
+import static org.testng.Assert.*;
+
+
+public class DagManagerUtilsTest {
+
+  /**
+   * Builds a V-shaped {@link Dag} of {@link JobExecutionPlan}s: "job0" and "job1" declare no dependencies,
+   * and "job2" depends on both of them.
+   *
+   * @return the Dag created by {@link JobExecutionPlanDagFactory} from the three generated plans
+   * @throws URISyntaxException if one of the generated "job" URIs cannot be parsed
+   */
+  public Dag<JobExecutionPlan> buildDag() throws URISyntaxException {
+    // Settings common to all three jobs.
+    Config sharedConfig = ConfigBuilder.create()
+        .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "group0")
+        .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flow0")
+        .addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis())
+        .addPrimitive(ConfigurationKeys.JOB_GROUP_KEY, "group0")
+        .build();
+
+    List<JobExecutionPlan> plans = new ArrayList<>();
+    for (int jobIndex = 0; jobIndex < 3; jobIndex++) {
+      String jobSuffix = Integer.toString(jobIndex);
+      Config perJobConfig = sharedConfig.withValue(ConfigurationKeys.JOB_NAME_KEY,
+          ConfigValueFactory.fromAnyRef("job" + jobSuffix));
+      // Only the last job has parents: it waits for both job0 and job1.
+      if (jobIndex == 2) {
+        perJobConfig = perJobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES,
+            ConfigValueFactory.fromAnyRef("job0,job1"));
+      }
+
+      URI templateUri = new URI("job" + jobSuffix);
+      JobSpec jobSpec = JobSpec.builder("test_job" + jobSuffix)
+          .withVersion(jobSuffix)
+          .withConfig(perJobConfig)
+          .withTemplate(templateUri)
+          .build();
+
+      URI executorUri = new URI("job" + jobIndex);
+      SpecExecutor executor = InMemorySpecExecutor.createDummySpecExecutor(executorUri);
+      plans.add(new JobExecutionPlan(jobSpec, executor));
+    }
+    JobExecutionPlanDagFactory dagFactory = new JobExecutionPlanDagFactory();
+    return dagFactory.createDag(plans);
+  }
+
+  /**
+   * Checks how many nodes {@link DagManagerUtils#getNext(Dag)} returns for the V-shaped test dag as the
+   * execution status of its jobs is changed step by step, including the case where a parent job fails.
+   *
+   * <p>{@code Assert} in this class is {@code org.junit.Assert}, so every {@code assertEquals} call passes
+   * the expected value first and the actual value second.
+   *
+   * @throws URISyntaxException if building the test dag fails
+   */
+  @Test
+  public void testGetNext() throws URISyntaxException {
+    Dag<JobExecutionPlan> dag = buildDag();
+
+    // Nothing has run yet: both parents are returned.
+    Set<Dag.DagNode<JobExecutionPlan>> dagNodeSet = DagManagerUtils.getNext(dag);
+    Assert.assertEquals(2, dagNodeSet.size());
+
+    //Set 1st job to complete and 2nd job running state
+    JobExecutionPlan jobExecutionPlan1 = dag.getNodes().get(0).getValue();
+    jobExecutionPlan1.setExecutionStatus(ExecutionStatus.COMPLETE);
+    JobExecutionPlan jobExecutionPlan2 = dag.getNodes().get(1).getValue();
+    jobExecutionPlan2.setExecutionStatus(ExecutionStatus.RUNNING);
+
+    //No new job to run
+    dagNodeSet = DagManagerUtils.getNext(dag);
+    Assert.assertEquals(0, dagNodeSet.size());
+
+    //Set 2nd job to complete; we must have 3rd job to run next
+    jobExecutionPlan2.setExecutionStatus(ExecutionStatus.COMPLETE);
+    dagNodeSet = DagManagerUtils.getNext(dag);
+    Assert.assertEquals(1, dagNodeSet.size());
+
+    //Set the 3rd job to running state, no new jobs to run
+    JobExecutionPlan jobExecutionPlan3 = dag.getNodes().get(2).getValue();
+    jobExecutionPlan3.setExecutionStatus(ExecutionStatus.RUNNING);
+    dagNodeSet = DagManagerUtils.getNext(dag);
+    Assert.assertEquals(0, dagNodeSet.size());
+
+    //Set the 3rd job to complete; no new jobs to run
+    jobExecutionPlan3.setExecutionStatus(ExecutionStatus.COMPLETE);
+    dagNodeSet = DagManagerUtils.getNext(dag);
+    Assert.assertEquals(0, dagNodeSet.size());
+
+
+    // Fresh dag for the failure scenario.
+    dag = buildDag();
+    dagNodeSet = DagManagerUtils.getNext(dag);
+    Assert.assertEquals(2, dagNodeSet.size());
+    //Set 1st job to failed; no new jobs to run
+    jobExecutionPlan1 = dag.getNodes().get(0).getValue();
+    jobExecutionPlan1.setExecutionStatus(ExecutionStatus.FAILED);
+    dagNodeSet = DagManagerUtils.getNext(dag);
+    Assert.assertEquals(0, dagNodeSet.size());
+  }
+
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
new file mode 100644
index 0000000..05d800d
--- /dev/null
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
+
+
+public class FSDagStateStoreTest {
+  private DagStateStore _dagStateStore;
+  private final String dagStateStoreDir = 
"/tmp/fsDagStateStoreTest/dagStateStore";
+  private File checkpointDir;
+
+  /**
+   * Creates the {@link FSDagStateStore} under test on top of a freshly wiped checkpoint directory.
+   *
+   * <p>The directory is deleted before the store is constructed, so that a stale directory from an earlier
+   * run is removed without discarding anything the store's constructor might set up there.
+   *
+   * @throws IOException if the stale directory cannot be deleted or the store cannot be created
+   */
+  @BeforeClass
+  public void setUp() throws IOException {
+    // Clean first, construct second.
+    this.checkpointDir = new File(this.dagStateStoreDir);
+    FileUtils.deleteDirectory(this.checkpointDir);
+
+    Config config = ConfigFactory.empty().withValue(DagManager.DAG_STATESTORE_DIR,
+        ConfigValueFactory.fromAnyRef(this.dagStateStoreDir));
+    this._dagStateStore = new FSDagStateStore(config);
+  }
+
+  /**
+   * Builds a two-node {@link Dag} of {@link JobExecutionPlan}s in which "job1" depends on "job0". Both
+   * plans are marked {@link ExecutionStatus#RUNNING} before the dag is created.
+   *
+   * @param id suffix appended to "group" and "flow" to form the flow group and flow name
+   * @param flowExecutionId flow execution id written into both job configs
+   * @return the Dag created by {@link JobExecutionPlanDagFactory} from the two generated plans
+   * @throws URISyntaxException if one of the generated "job" template URIs cannot be parsed
+   */
+  public Dag<JobExecutionPlan> buildDag(String id, Long flowExecutionId) throws URISyntaxException {
+    List<JobExecutionPlan> plans = new ArrayList<>();
+    for (int jobIndex = 0; jobIndex < 2; jobIndex++) {
+      String jobSuffix = Integer.toString(jobIndex);
+      Config perJobConfig = ConfigBuilder.create()
+          .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "group" + id)
+          .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flow" + id)
+          .addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId)
+          .addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job" + jobSuffix)
+          .build();
+      // Each job after the first depends on its immediate predecessor.
+      if (jobIndex > 0) {
+        perJobConfig = perJobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES,
+            ConfigValueFactory.fromAnyRef("job" + (jobIndex - 1)));
+      }
+
+      URI templateUri = new URI("job" + jobSuffix);
+      JobSpec jobSpec = JobSpec.builder("test_job" + jobSuffix)
+          .withVersion(jobSuffix)
+          .withConfig(perJobConfig)
+          .withTemplate(templateUri)
+          .build();
+
+      SpecExecutor executor = new InMemorySpecExecutor(ConfigFactory.empty());
+      JobExecutionPlan plan = new JobExecutionPlan(jobSpec, executor);
+      plan.setExecutionStatus(ExecutionStatus.RUNNING);
+      plans.add(plan);
+    }
+    JobExecutionPlanDagFactory dagFactory = new JobExecutionPlanDagFactory();
+    return dagFactory.createDag(plans);
+  }
+
+  /**
+   * Writes a two-node dag through {@link DagStateStore#writeCheckpoint}, reads the checkpoint file back
+   * with {@link FSDagStateStore#getDag}, and verifies the deserialized dag's structure, flow settings and
+   * execution status.
+   *
+   * <p>{@code Assert} in this class is {@code org.junit.Assert}, so every {@code assertEquals} call passes
+   * the expected value first and the actual value second.
+   *
+   * @throws IOException if the checkpoint cannot be written or read
+   * @throws URISyntaxException if building the test dag fails
+   */
+  @Test
+  public void testWriteCheckpoint() throws IOException, URISyntaxException {
+    long flowExecutionId = System.currentTimeMillis();
+    String flowGroupId = "0";
+    Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId);
+    this._dagStateStore.writeCheckpoint(dag);
+
+    // The checkpoint file is named after the dag id plus the store's file extension.
+    String fileName = DagManagerUtils.generateDagId(dag) + FSDagStateStore.DAG_FILE_EXTENSION;
+    File dagFile = new File(this.checkpointDir, fileName);
+    Dag<JobExecutionPlan> dagDeserialized = ((FSDagStateStore) this._dagStateStore).getDag(dagFile);
+    Assert.assertEquals(2, dagDeserialized.getNodes().size());
+    Assert.assertEquals(1, dagDeserialized.getStartNodes().size());
+    Assert.assertEquals(1, dagDeserialized.getEndNodes().size());
+    Dag.DagNode<JobExecutionPlan> child = dagDeserialized.getEndNodes().get(0);
+    Dag.DagNode<JobExecutionPlan> parent = dagDeserialized.getStartNodes().get(0);
+    Assert.assertEquals(1, dagDeserialized.getParentChildMap().size());
+    Assert.assertTrue(dagDeserialized.getParentChildMap().get(parent).contains(child));
+
+    for (int i = 0; i < 2; i++) {
+      JobExecutionPlan plan = dagDeserialized.getNodes().get(i).getValue();
+      Config jobConfig = plan.getJobSpec().getConfig();
+      Assert.assertEquals("group" + flowGroupId, jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY));
+      Assert.assertEquals("flow" + flowGroupId, jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY));
+      Assert.assertEquals(flowExecutionId, jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
+      Assert.assertEquals(ExecutionStatus.RUNNING, plan.getExecutionStatus());
+    }
+  }
+
+  /**
+   * Verifies that a checkpoint file exists after {@link DagStateStore#writeCheckpoint} and is gone after
+   * {@link DagStateStore#cleanUp} is called for the same dag.
+   *
+   * @throws IOException if the checkpoint cannot be written or removed
+   * @throws URISyntaxException if building the test dag fails
+   */
+  @Test (dependsOnMethods = "testWriteCheckpoint")
+  public void testCleanUp() throws IOException, URISyntaxException {
+    long executionId = System.currentTimeMillis();
+    String idSuffix = "0";
+    Dag<JobExecutionPlan> checkpointedDag = buildDag(idSuffix, executionId);
+    this._dagStateStore.writeCheckpoint(checkpointedDag);
+
+    // The checkpoint file is named after the dag id plus the store's file extension.
+    String dagId = DagManagerUtils.generateDagId(checkpointedDag);
+    File checkpointFile = new File(this.checkpointDir, dagId + FSDagStateStore.DAG_FILE_EXTENSION);
+    Assert.assertTrue(checkpointFile.exists());
+
+    this._dagStateStore.cleanUp(checkpointedDag);
+    Assert.assertFalse(checkpointFile.exists());
+  }
+
+  /**
+   * Checkpoints two distinct two-node dags into an emptied checkpoint directory and verifies that
+   * {@link DagStateStore#getDags} returns both of them with the expected structure.
+   *
+   * <p>{@code Assert} in this class is {@code org.junit.Assert}, so every {@code assertEquals} call passes
+   * the expected value first and the actual value second.
+   *
+   * @throws IOException if the directory cannot be deleted or a checkpoint cannot be written or read
+   * @throws URISyntaxException if building a test dag fails
+   */
+  @Test (dependsOnMethods = "testCleanUp")
+  public void testGetDags() throws IOException, URISyntaxException {
+    //Delete dag checkpoint dir
+    FileUtils.deleteDirectory(this.checkpointDir);
+    List<Long> flowExecutionIds = Lists.newArrayList(System.currentTimeMillis(), System.currentTimeMillis() + 1);
+    for (int i = 0; i < 2; i++) {
+      String flowGroupId = Integer.toString(i);
+      Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionIds.get(i));
+      this._dagStateStore.writeCheckpoint(dag);
+    }
+
+    List<Dag<JobExecutionPlan>> dags = this._dagStateStore.getDags();
+    Assert.assertEquals(2, dags.size());
+    for (Dag<JobExecutionPlan> dag: dags) {
+      Assert.assertEquals(2, dag.getNodes().size());
+      Assert.assertEquals(1, dag.getStartNodes().size());
+      Assert.assertEquals(1, dag.getEndNodes().size());
+      Assert.assertEquals(1, dag.getParentChildMap().size());
+    }
+  }
+
+  /**
+   * Deletes the checkpoint directory used by this test class once all tests have run.
+   *
+   * @throws IOException if the directory cannot be deleted
+   */
+  @AfterClass
+  public void cleanUp() throws IOException {
+    File directoryToRemove = this.checkpointDir;
+    FileUtils.deleteDirectory(directoryToRemove);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index 6d75f9e..bdacbb8 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -95,7 +95,7 @@ public class OrchestratorTest {
     this.serviceLauncher.addService(flowCatalog);
 
     this.orchestrator = new 
Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
-        Optional.of(this.topologyCatalog), Optional.of(logger));
+        Optional.of(this.topologyCatalog), Optional.<DagManager>absent(), 
Optional.of(logger));
     this.topologyCatalog.addListener(orchestrator);
     this.flowCatalog.addListener(orchestrator);
 

Reply via email to