Repository: incubator-gobblin Updated Branches: refs/heads/master 391615e72 -> 71184cdb1
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java new file mode 100644 index 0000000..f7aaaed --- /dev/null +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.service.modules.orchestration; + +import java.io.File; +import java.lang.reflect.Field; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.commons.io.FileUtils; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.google.common.collect.Iterators; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; + +import org.apache.gobblin.config.ConfigBuilder; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metrics.event.TimingEvent; +import org.apache.gobblin.runtime.api.JobSpec; +import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory; +import org.apache.gobblin.service.monitoring.JobStatus; +import org.apache.gobblin.service.monitoring.JobStatusRetriever; + + +public class DagManagerTest { + private final String dagStateStoreDir = "/tmp/dagManagerTest/dagStateStore"; + private DagStateStore _dagStateStore; + private JobStatusRetriever _jobStatusRetriever; + private DagManager.DagManagerThread _dagManagerThread; + private LinkedBlockingQueue<Dag<JobExecutionPlan>> queue; + private Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag; + private Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs; + private Map<String, Dag<JobExecutionPlan>> dags; + + @BeforeClass + public void setUp() throws Exception { + FileUtils.deleteDirectory(new File(this.dagStateStoreDir)); + Config config = ConfigFactory.empty() + .withValue(DagManager.DAG_STATESTORE_DIR, ConfigValueFactory.fromAnyRef(this.dagStateStoreDir)); + this._dagStateStore = new FSDagStateStore(config); + this._jobStatusRetriever = Mockito.mock(JobStatusRetriever.class); + this.queue = new LinkedBlockingQueue<>(); + this._dagManagerThread = new DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore, queue, true); + + Field jobToDagField = DagManager.DagManagerThread.class.getDeclaredField("jobToDag"); + jobToDagField.setAccessible(true); + this.jobToDag = (Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>>) jobToDagField.get(this._dagManagerThread); + + Field dagToJobsField = DagManager.DagManagerThread.class.getDeclaredField("dagToJobs"); + dagToJobsField.setAccessible(true); + this.dagToJobs = (Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>>) dagToJobsField.get(this._dagManagerThread); + + Field dagsField = DagManager.DagManagerThread.class.getDeclaredField("dags"); + dagsField.setAccessible(true); + this.dags = (Map<String, Dag<JobExecutionPlan>>) dagsField.get(this._dagManagerThread); + + } + + /** + * Create a {@link Dag < JobExecutionPlan >} with one parent and remaining nodes as its children. + * @return a Dag. + */ + public Dag<JobExecutionPlan> buildDag(String id, Long flowExecutionId, int numNodes) + throws URISyntaxException { + List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>(); + for (int i = 0; i < numNodes; i++) { + String suffix = Integer.toString(i); + Config jobConfig = ConfigBuilder.create(). + addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "group" + id). + addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flow" + id). + addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId). + addPrimitive(ConfigurationKeys.JOB_GROUP_KEY, "group" + id). + addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job" + suffix).build(); + if (i > 0) { + jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, ConfigValueFactory.fromAnyRef("job0")); + } + JobSpec js = JobSpec.builder("test_job" + suffix).withVersion(suffix).withConfig(jobConfig). + withTemplate(new URI("job" + suffix)).build(); + SpecExecutor specExecutor = InMemorySpecExecutor.createDummySpecExecutor(new URI("job" + i)); + JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js, specExecutor); + jobExecutionPlans.add(jobExecutionPlan); + } + return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans); + } + + private Iterator<JobStatus> getMockJobStatus(String flowName, String flowGroup, Long flowExecutionId, String jobGroup, String jobName, String eventName) { + return Iterators.singletonIterator(JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(jobGroup).jobName(jobName).flowExecutionId(flowExecutionId). + message("Test message").eventName(eventName).startTime(5000L).build()); + } + + @Test + public void testSuccessfulDag() throws URISyntaxException { + long flowExecutionId = System.currentTimeMillis(); + String flowGroupId = "0"; + String flowGroup = "group" + flowGroupId; + String flowName = "flow" + flowGroupId; + String jobGroup = flowGroup; + String jobName0 = "job0"; + String jobName1 = "job1"; + String jobName2 = "job2"; + + Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId, 3); + String dagId = DagManagerUtils.generateDagId(dag); + + //Add a dag to the queue of dags + this.queue.offer(dag); + Iterator<JobStatus> jobStatusIterator1 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup, jobName0, TimingEvent.LauncherTimings.JOB_RUN); + Iterator<JobStatus> jobStatusIterator2 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup, jobName0, TimingEvent.LauncherTimings.JOB_COMPLETE); + Iterator<JobStatus> jobStatusIterator3 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup, jobName1, TimingEvent.LauncherTimings.JOB_RUN); + Iterator<JobStatus> jobStatusIterator4 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup, jobName2, TimingEvent.LauncherTimings.JOB_RUN); + Iterator<JobStatus> jobStatusIterator5 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup, jobName1, TimingEvent.LauncherTimings.JOB_RUN); + Iterator<JobStatus> jobStatusIterator6 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup, jobName2, TimingEvent.LauncherTimings.JOB_COMPLETE); + Iterator<JobStatus> jobStatusIterator7 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup, jobName2, TimingEvent.LauncherTimings.JOB_COMPLETE); + + Mockito.when(_jobStatusRetriever.getJobStatusesForFlowExecution(Mockito.anyString(), Mockito.anyString(), + Mockito.anyLong(), Mockito.anyString(), Mockito.anyString())). + thenReturn(jobStatusIterator1). + thenReturn(jobStatusIterator2). + thenReturn(jobStatusIterator3). + thenReturn(jobStatusIterator4). + thenReturn(jobStatusIterator5). + thenReturn(jobStatusIterator6). + thenReturn(jobStatusIterator7); + + //Run the thread once. Ensure the first job is running + this._dagManagerThread.run(); + Assert.assertEquals(this.dags.size(), 1); + Assert.assertTrue(this.dags.containsKey(dagId)); + Assert.assertEquals(this.jobToDag.size(), 1); + Assert.assertTrue(this.jobToDag.containsKey(dag.getStartNodes().get(0))); + Assert.assertEquals(this.dagToJobs.get(dagId).size(), 1); + Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getStartNodes().get(0))); + + //Run the thread 2nd time. Ensure the job0 is complete and job1 and job2 are submitted. + this._dagManagerThread.run(); + Assert.assertEquals(this.dags.size(), 1); + Assert.assertTrue(this.dags.containsKey(dagId)); + Assert.assertEquals(this.jobToDag.size(), 2); + Assert.assertTrue(this.jobToDag.containsKey(dag.getEndNodes().get(0))); + Assert.assertTrue(this.jobToDag.containsKey(dag.getEndNodes().get(1))); + Assert.assertEquals(this.dagToJobs.get(dagId).size(), 2); + Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getEndNodes().get(0))); + Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getEndNodes().get(1))); + + //Run the thread 3rd time. Ensure job1 and job2 are running. + this._dagManagerThread.run(); + Assert.assertEquals(this.dags.size(), 1); + Assert.assertTrue(this.dags.containsKey(dagId)); + Assert.assertEquals(this.jobToDag.size(), 2); + Assert.assertTrue(this.jobToDag.containsKey(dag.getEndNodes().get(0))); + Assert.assertTrue(this.jobToDag.containsKey(dag.getEndNodes().get(1))); + Assert.assertEquals(this.dagToJobs.get(dagId).size(), 2); + Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getEndNodes().get(0))); + Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getEndNodes().get(1))); + + //Run the thread 4th time. One of the jobs is completed. + this._dagManagerThread.run(); + Assert.assertEquals(this.dags.size(), 1); + Assert.assertTrue(this.dags.containsKey(dagId)); + Assert.assertEquals(this.jobToDag.size(), 1); + Assert.assertEquals(this.dagToJobs.get(dagId).size(), 1); + + //Run the thread again. Ensure all jobs completed and dag is cleaned up. + this._dagManagerThread.run(); + Assert.assertEquals(this.dags.size(), 0); + Assert.assertEquals(this.jobToDag.size(), 0); + Assert.assertEquals(this.dagToJobs.size(), 0); + } + + @Test (dependsOnMethods = "testSuccessfulDag") + public void testFailedDag() throws URISyntaxException { + long flowExecutionId = System.currentTimeMillis(); + String flowGroupId = "0"; + String flowGroup = "group" + flowGroupId; + String flowName = "flow" + flowGroupId; + String jobGroup = flowGroup; + String jobName0 = "job0"; + String jobName1 = "job1"; + String jobName2 = "job2"; + + Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId, 3); + String dagId = DagManagerUtils.generateDagId(dag); + + //Add a dag to the queue of dags + this.queue.offer(dag); + Iterator<JobStatus> jobStatusIterator1 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup, jobName0, TimingEvent.LauncherTimings.JOB_START); + Iterator<JobStatus> jobStatusIterator2 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup, jobName0, TimingEvent.LauncherTimings.JOB_COMPLETE); + + Iterator<JobStatus> jobStatusIterator3 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup, jobName1, TimingEvent.LauncherTimings.JOB_RUN); + Iterator<JobStatus> jobStatusIterator4 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup, jobName2, TimingEvent.LauncherTimings.JOB_RUN); + + Iterator<JobStatus> jobStatusIterator5 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup, jobName1, TimingEvent.LauncherTimings.JOB_RUN); + Iterator<JobStatus> jobStatusIterator6 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup, jobName2, TimingEvent.LauncherTimings.JOB_FAILED); + + Mockito.when(_jobStatusRetriever.getJobStatusesForFlowExecution(Mockito.anyString(), Mockito.anyString(), + Mockito.anyLong(), Mockito.anyString(), Mockito.anyString())). + thenReturn(jobStatusIterator1). + thenReturn(jobStatusIterator2). + thenReturn(jobStatusIterator3). + thenReturn(jobStatusIterator4). + thenReturn(jobStatusIterator5). + thenReturn(jobStatusIterator6); + + //Run the thread once. Ensure the first job is running + this._dagManagerThread.run(); + Assert.assertEquals(this.dags.size(), 1); + Assert.assertTrue(this.dags.containsKey(dagId)); + Assert.assertEquals(this.jobToDag.size(), 1); + Assert.assertTrue(this.jobToDag.containsKey(dag.getStartNodes().get(0))); + Assert.assertEquals(this.dagToJobs.get(dagId).size(), 1); + Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getStartNodes().get(0))); + + //Run the thread 2nd time. Ensure the job0 is complete and job1 and job2 are submitted. + this._dagManagerThread.run(); + Assert.assertEquals(this.dags.size(), 1); + Assert.assertTrue(this.dags.containsKey(dagId)); + Assert.assertEquals(this.jobToDag.size(), 2); + Assert.assertTrue(this.jobToDag.containsKey(dag.getEndNodes().get(0))); + Assert.assertTrue(this.jobToDag.containsKey(dag.getEndNodes().get(1))); + Assert.assertEquals(this.dagToJobs.get(dagId).size(), 2); + Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getEndNodes().get(0))); + Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getEndNodes().get(1))); + + //Run the thread 3rd time. Ensure the job0 is complete and job1 and job2 are running. + this._dagManagerThread.run(); + Assert.assertEquals(this.dags.size(), 1); + Assert.assertTrue(this.dags.containsKey(dagId)); + Assert.assertEquals(this.jobToDag.size(), 2); + Assert.assertTrue(this.jobToDag.containsKey(dag.getEndNodes().get(0))); + Assert.assertTrue(this.jobToDag.containsKey(dag.getEndNodes().get(1))); + Assert.assertEquals(this.dagToJobs.get(dagId).size(), 2); + Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getEndNodes().get(0))); + Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getEndNodes().get(1))); + + //Run the thread 4th time. One of the jobs is failed and so the dag is failed and all state is cleaned up. + this._dagManagerThread.run(); + Assert.assertEquals(this.dags.size(), 0); + Assert.assertEquals(this.jobToDag.size(), 0); + Assert.assertEquals(this.dagToJobs.size(), 0); + } + + @AfterClass + public void cleanUp() throws Exception { + FileUtils.deleteDirectory(new File(this.dagStateStoreDir)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java new file mode 100644 index 0000000..e97d860 --- /dev/null +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.service.modules.orchestration; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import org.junit.Assert; +import org.testng.annotations.Test; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigValueFactory; + +import org.apache.gobblin.config.ConfigBuilder; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.runtime.api.JobSpec; +import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor; +import org.apache.gobblin.service.ExecutionStatus; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory; + +import static org.testng.Assert.*; + + +public class DagManagerUtilsTest { + + /** + * Create a {@link Dag <JobExecutionPlan>} with 2 parents and 1 child (i.e. a V-shaped dag). + * @return a Dag. + */ + public Dag<JobExecutionPlan> buildDag() throws URISyntaxException { + List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>(); + Config baseConfig = ConfigBuilder.create(). + addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "group0"). + addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flow0"). + addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis()). + addPrimitive(ConfigurationKeys.JOB_GROUP_KEY, "group0").build(); + for (int i = 0; i < 3; i++) { + String suffix = Integer.toString(i); + Config jobConfig = baseConfig.withValue(ConfigurationKeys.JOB_NAME_KEY, ConfigValueFactory.fromAnyRef("job" + suffix)); + if (i == 2) { + jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, ConfigValueFactory.fromAnyRef("job0,job1")); + } + JobSpec js = JobSpec.builder("test_job" + suffix).withVersion(suffix).withConfig(jobConfig). + withTemplate(new URI("job" + suffix)).build(); + SpecExecutor specExecutor = InMemorySpecExecutor.createDummySpecExecutor(new URI("job" + i)); + JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js, specExecutor); + jobExecutionPlans.add(jobExecutionPlan); + } + return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans); + } + + @Test + public void testGetNext() throws URISyntaxException { + Dag<JobExecutionPlan> dag = buildDag(); + + Set<Dag.DagNode<JobExecutionPlan>> dagNodeSet = DagManagerUtils.getNext(dag); + Assert.assertEquals(dagNodeSet.size(), 2); + + //Set 1st job to complete and 2nd job running state + JobExecutionPlan jobExecutionPlan1 = dag.getNodes().get(0).getValue(); + jobExecutionPlan1.setExecutionStatus(ExecutionStatus.COMPLETE); + JobExecutionPlan jobExecutionPlan2 = dag.getNodes().get(1).getValue(); + jobExecutionPlan2.setExecutionStatus(ExecutionStatus.RUNNING); + + //No new job to run + dagNodeSet = DagManagerUtils.getNext(dag); + Assert.assertEquals(dagNodeSet.size(), 0); + + //Set 2nd job to complete; we must have 3rd job to run next + jobExecutionPlan2.setExecutionStatus(ExecutionStatus.COMPLETE); + dagNodeSet = DagManagerUtils.getNext(dag); + Assert.assertEquals(dagNodeSet.size(), 1); + + //Set the 3rd job to running state, no new jobs to run + JobExecutionPlan jobExecutionPlan3 = dag.getNodes().get(2).getValue(); + jobExecutionPlan3.setExecutionStatus(ExecutionStatus.RUNNING); + dagNodeSet = DagManagerUtils.getNext(dag); + Assert.assertEquals(dagNodeSet.size(), 0); + + //Set the 3rd job to complete; no new jobs to run + jobExecutionPlan3.setExecutionStatus(ExecutionStatus.COMPLETE); + dagNodeSet = DagManagerUtils.getNext(dag); + Assert.assertEquals(dagNodeSet.size(), 0); + + + dag = buildDag(); + dagNodeSet = DagManagerUtils.getNext(dag); + Assert.assertEquals(dagNodeSet.size(), 2); + //Set 1st job to failed; no new jobs to run + jobExecutionPlan1 = dag.getNodes().get(0).getValue(); + jobExecutionPlan1.setExecutionStatus(ExecutionStatus.FAILED); + dagNodeSet = DagManagerUtils.getNext(dag); + Assert.assertEquals(dagNodeSet.size(), 0); + } + + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java new file mode 100644 index 0000000..05d800d --- /dev/null +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.service.modules.orchestration; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.io.FileUtils; +import org.junit.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.google.common.collect.Lists; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; + +import org.apache.gobblin.config.ConfigBuilder; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.runtime.api.JobSpec; +import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor; +import org.apache.gobblin.service.ExecutionStatus; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory; + + +public class FSDagStateStoreTest { + private DagStateStore _dagStateStore; + private final String dagStateStoreDir = "/tmp/fsDagStateStoreTest/dagStateStore"; + private File checkpointDir; + + @BeforeClass + public void setUp() throws IOException { + Config config = ConfigFactory.empty().withValue(DagManager.DAG_STATESTORE_DIR, ConfigValueFactory.fromAnyRef( + this.dagStateStoreDir)); + this._dagStateStore = new FSDagStateStore(config); + this.checkpointDir = new File(dagStateStoreDir); + FileUtils.deleteDirectory(this.checkpointDir); + } + + /** + * Create a {@link Dag<JobExecutionPlan>} with one parent and one child. + * @return a Dag. + */ + public Dag<JobExecutionPlan> buildDag(String id, Long flowExecutionId) throws URISyntaxException { + List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>(); + for (int i = 0; i < 2; i++) { + String suffix = Integer.toString(i); + Config jobConfig = ConfigBuilder.create(). + addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "group" + id). + addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flow" + id). + addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId). + addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job" + suffix).build(); + if (i > 0) { + jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, ConfigValueFactory.fromAnyRef("job" + (i - 1))); + } + JobSpec js = JobSpec.builder("test_job" + suffix).withVersion(suffix).withConfig(jobConfig). + withTemplate(new URI("job" + suffix)).build(); + SpecExecutor specExecutor = new InMemorySpecExecutor(ConfigFactory.empty()); + JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js, specExecutor); + jobExecutionPlan.setExecutionStatus(ExecutionStatus.RUNNING); + jobExecutionPlans.add(jobExecutionPlan); + } + return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans); + } + + @Test + public void testWriteCheckpoint() throws IOException, URISyntaxException { + long flowExecutionId = System.currentTimeMillis(); + String flowGroupId = "0"; + Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId); + this._dagStateStore.writeCheckpoint(dag); + + String fileName = DagManagerUtils.generateDagId(dag) + FSDagStateStore.DAG_FILE_EXTENSION; + File dagFile = new File(this.checkpointDir, fileName); + Dag<JobExecutionPlan> dagDeserialized = ((FSDagStateStore) this._dagStateStore).getDag(dagFile); + Assert.assertEquals(dagDeserialized.getNodes().size(), 2); + Assert.assertEquals(dagDeserialized.getStartNodes().size(), 1); + Assert.assertEquals(dagDeserialized.getEndNodes().size(), 1); + Dag.DagNode<JobExecutionPlan> child = dagDeserialized.getEndNodes().get(0); + Dag.DagNode<JobExecutionPlan> parent = dagDeserialized.getStartNodes().get(0); + Assert.assertEquals(dagDeserialized.getParentChildMap().size(), 1); + Assert.assertTrue(dagDeserialized.getParentChildMap().get(parent).contains(child)); + + for (int i = 0; i < 2; i++) { + JobExecutionPlan plan = dagDeserialized.getNodes().get(i).getValue(); + Config jobConfig = plan.getJobSpec().getConfig(); + Assert.assertEquals(jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY), "group" + flowGroupId); + Assert.assertEquals(jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY), "flow" + flowGroupId); + Assert.assertEquals(jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY), flowExecutionId); + Assert.assertEquals(plan.getExecutionStatus(), ExecutionStatus.RUNNING); + } + } + + @Test (dependsOnMethods = "testWriteCheckpoint") + public void testCleanUp() throws IOException, URISyntaxException { + long flowExecutionId = System.currentTimeMillis(); + String flowGroupId = "0"; + Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId); + this._dagStateStore.writeCheckpoint(dag); + String fileName = DagManagerUtils.generateDagId(dag) + FSDagStateStore.DAG_FILE_EXTENSION; + File dagFile = new File(this.checkpointDir, fileName); + Assert.assertTrue(dagFile.exists()); + this._dagStateStore.cleanUp(dag); + Assert.assertFalse(dagFile.exists()); + } + + @Test (dependsOnMethods = "testCleanUp") + public void testGetDags() throws IOException, URISyntaxException { + //Delete dag checkpoint dir + FileUtils.deleteDirectory(this.checkpointDir); + List<Long> flowExecutionIds = Lists.newArrayList(System.currentTimeMillis(), System.currentTimeMillis() + 1); + for (int i = 0; i < 2; i++) { + String flowGroupId = Integer.toString(i); + Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionIds.get(i)); + this._dagStateStore.writeCheckpoint(dag); + } + + List<Dag<JobExecutionPlan>> dags = this._dagStateStore.getDags(); + Assert.assertEquals(dags.size(), 2); + for (Dag<JobExecutionPlan> dag: dags) { + Assert.assertEquals(dag.getNodes().size(), 2); + Assert.assertEquals(dag.getStartNodes().size(), 1); + Assert.assertEquals(dag.getEndNodes().size(), 1); + Assert.assertEquals(dag.getParentChildMap().size(), 1); + } + } + + @AfterClass + public void cleanUp() throws IOException { + FileUtils.deleteDirectory(this.checkpointDir); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java index 6d75f9e..bdacbb8 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java @@ -95,7 +95,7 @@ public class OrchestratorTest { this.serviceLauncher.addService(flowCatalog); this.orchestrator = new Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties), - Optional.of(this.topologyCatalog), Optional.of(logger)); + Optional.of(this.topologyCatalog), Optional.<DagManager>absent(), Optional.of(logger)); this.topologyCatalog.addListener(orchestrator); this.flowCatalog.addListener(orchestrator);
