This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 1325688 [GOBBLIN-775] Add job level retries for gobblin service
1325688 is described below
commit 1325688d34e5d7f6e8cdda4390255e2b96781564
Author: Jack Moseley <[email protected]>
AuthorDate: Wed May 29 11:09:52 2019 -0700
[GOBBLIN-775] Add job level retries for gobblin service
Closes #2640 from jack-moseley/gaas-job-retry
---
.../apache/gobblin/metrics/event/TimingEvent.java | 3 +
.../gobblin/service/monitoring/JobStatus.java | 3 +
.../service/monitoring/JobStatusRetriever.java | 6 +-
.../service/modules/orchestration/DagManager.java | 7 ++
.../modules/orchestration/DagManagerUtils.java | 7 ++
.../modules/orchestration/Orchestrator.java | 1 +
.../modules/orchestration/TimingEventUtils.java | 3 +
.../service/modules/spec/JobExecutionPlan.java | 12 +-
.../service/monitoring/KafkaJobStatusMonitor.java | 13 ++
.../modules/orchestration/DagManagerTest.java | 138 ++++++++++++++++++++-
10 files changed, 189 insertions(+), 4 deletions(-)
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index 8f37adc..8a1e6ef 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -71,6 +71,9 @@ public class TimingEvent {
public static final String LOW_WATERMARK_FIELD = "lowWatermark";
public static final String HIGH_WATERMARK_FIELD = "highWatermark";
public static final String PROCESSED_COUNT_FIELD = "processedCount";
+ public static final String MAX_ATTEMPTS_FIELD = "maxAttempts";
+ public static final String CURRENT_ATTEMPTS_FIELD = "currentAttempts";
+ public static final String SHOULD_RETRY_FIELD = "shouldRetry";
}
public static final String METADATA_START_TIME = "startTime";
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
index 87e52c6..4f3dc2f 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
@@ -43,4 +43,7 @@ public class JobStatus {
private final long processedCount;
private final String lowWatermark;
private final String highWatermark;
+ private final int maxAttempts;
+ private final int currentAttempts;
+ private final boolean shouldRetry;
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
index 6827c3c..ebc48fe 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
@@ -79,11 +79,15 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker
String lowWatermark = jobState.getProp(TimingEvent.FlowEventConstants.LOW_WATERMARK_FIELD, "");
String highWatermark = jobState.getProp(TimingEvent.FlowEventConstants.HIGH_WATERMARK_FIELD, "");
long processedCount = Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.PROCESSED_COUNT_FIELD, "0"));
+ int maxAttempts = Integer.parseInt(jobState.getProp(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, "1"));
+ int currentAttempts = Integer.parseInt(jobState.getProp(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, "1"));
+ boolean shouldRetry = Boolean.parseBoolean(jobState.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD, "false"));
return JobStatus.builder().flowName(flowName).flowGroup(flowGroup).flowExecutionId(flowExecutionId).
jobName(jobName).jobGroup(jobGroup).jobExecutionId(jobExecutionId).eventName(eventName).
lowWatermark(lowWatermark).highWatermark(highWatermark).startTime(startTime).endTime(endTime).
- message(message).processedCount(processedCount).build();
+ message(message).processedCount(processedCount).maxAttempts(maxAttempts).currentAttempts(currentAttempts).
+ shouldRetry(shouldRetry).build();
}
public abstract StateStore<State> getStateStore();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index e8f45d5..8140648 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -385,6 +385,12 @@ public class DagManager extends AbstractIdleService {
jobExecutionPlan.setExecutionStatus(RUNNING);
break;
}
+
+ if (jobStatus.isShouldRetry()) {
+ log.info("Retrying job: {}, current attempts: {}, max attempts: {}", DagManagerUtils.getFullyQualifiedJobName(node),
+ jobStatus.getCurrentAttempts(), jobStatus.getMaxAttempts());
+ submitJob(node);
+ }
}
for (Map.Entry<String, Set<DagNode<JobExecutionPlan>>> entry: nextSubmitted.entrySet()) {
@@ -446,6 +452,7 @@ public class DagManager extends AbstractIdleService {
* Submits a {@link JobSpec} to a {@link org.apache.gobblin.runtime.api.SpecExecutor}.
*/
private void submitJob(DagNode<JobExecutionPlan> dagNode) {
+ DagManagerUtils.incrementJobAttempt(dagNode);
JobExecutionPlan jobExecutionPlan = DagManagerUtils.getJobExecutionPlan(dagNode);
jobExecutionPlan.setExecutionStatus(RUNNING);
JobSpec jobSpec = DagManagerUtils.getJobSpec(dagNode);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 0044c2b..1c6a086 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -156,4 +156,11 @@ public class DagManagerUtils {
String failureOption = ConfigUtils.getString(getJobConfig(dagNode), ConfigurationKeys.FLOW_FAILURE_OPTION, DagManager.DEFAULT_FLOW_FAILURE_OPTION);
return FailureOption.valueOf(failureOption);
}
+
+ /**
+ * Increment the value of {@link JobExecutionPlan#currentAttempts}
+ */
+ static void incrementJobAttempt(DagNode<JobExecutionPlan> dagNode) {
+ dagNode.getValue().setCurrentAttempts(dagNode.getValue().getCurrentAttempts() + 1);
+ }
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index cda6b52..f3a4f06 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -279,6 +279,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
} else {
// Schedule all compiled JobSpecs on their respective Executor
for (Dag.DagNode<JobExecutionPlan> dagNode : jobExecutionPlanDag.getNodes()) {
+ DagManagerUtils.incrementJobAttempt(dagNode);
JobExecutionPlan jobExecutionPlan = dagNode.getValue();
// Run this spec on selected executor
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
index 38a9d07..6eb1a31 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
@@ -55,6 +55,9 @@ class TimingEventUtils {
jobMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, jobSpec.getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));
jobMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, jobSpec.getConfig().getString(ConfigurationKeys.JOB_GROUP_KEY));
jobMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, specExecutor.getClass().getCanonicalName());
+ jobMetadata.put(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, Integer.toString(jobExecutionPlan.getMaxAttempts()));
+ jobMetadata.put(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, Integer.toString(jobExecutionPlan.getCurrentAttempts()));
+ jobMetadata.put(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD, Boolean.toString(false));
return jobMetadata;
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 45e65bc..5433c2f 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -50,11 +50,15 @@ import org.apache.gobblin.util.ConfigUtils;
* where the {@link JobSpec} will be executed.
*/
@Data
-@EqualsAndHashCode (exclude = {"executionStatus"})
+@EqualsAndHashCode(exclude = {"executionStatus", "currentAttempts"})
public class JobExecutionPlan {
+ public static final String JOB_MAX_ATTEMPTS = "job.maxAttempts";
+
private final JobSpec jobSpec;
private final SpecExecutor specExecutor;
private ExecutionStatus executionStatus = ExecutionStatus.$UNKNOWN;
+ private final int maxAttempts;
+ private int currentAttempts = 0;
public static class Factory {
public static final String JOB_NAME_COMPONENT_SEPARATION_CHAR = "_";
@@ -167,6 +171,12 @@ public class JobExecutionPlan {
}
}
+ public JobExecutionPlan(JobSpec jobSpec, SpecExecutor specExecutor) {
+ this.jobSpec = jobSpec;
+ this.specExecutor = specExecutor;
+ this.maxAttempts = ConfigUtils.getInt(jobSpec.getConfig(), JOB_MAX_ATTEMPTS, 1);
+ }
+
/**
* Render the JobSpec into a JSON string.
* @return a valid JSON string representation of the JobSpec.
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 99c862a..c46e01e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -43,6 +43,7 @@ import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.metastore.util.StateStoreCleanerRunnable;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
+import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.util.ConfigUtils;
@@ -147,9 +148,21 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
jobStatus = mergedProperties(storeName, tableName, jobStatus, stateStore);
+ modifyStateIfRetryRequired(jobStatus);
+
stateStore.put(storeName, tableName, jobStatus);
}
+ private static void modifyStateIfRetryRequired(org.apache.gobblin.configuration.State state) {
+ int maxAttempts = state.getPropAsInt(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, 1);
+ int currentAttempts = state.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 1);
+ if (state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name()) && currentAttempts < maxAttempts) {
+ state.setProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD, true);
+ state.setProp(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.RUNNING.name());
+ state.removeProp(TimingEvent.JOB_END_TIME);
+ }
+ }
+
private static org.apache.gobblin.configuration.State mergedProperties(
String storeName, String tableName,
org.apache.gobblin.configuration.State jobStatus, StateStore stateStore) {
Properties mergedProperties = new Properties();
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index 467ac84..a67e5dd 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -44,6 +44,7 @@ import com.typesafe.config.ConfigValueFactory;
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
@@ -124,9 +125,13 @@ public class DagManagerTest {
return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
}
- private Iterator<JobStatus> getMockJobStatus(String flowName, String flowGroup, Long flowExecutionId, String jobGroup, String jobName, String eventName) {
+ private Iterator<JobStatus> getMockJobStatus(String flowName, String flowGroup, Long flowExecutionId, String jobGroup, String jobName, String eventName) {
+ return getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup, jobName, eventName, false);
+ }
+
+ private Iterator<JobStatus> getMockJobStatus(String flowName, String flowGroup, Long flowExecutionId, String jobGroup, String jobName, String eventName, boolean shouldRetry) {
return Iterators.singletonIterator(JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(jobGroup).jobName(jobName).flowExecutionId(flowExecutionId).
- message("Test message").eventName(eventName).startTime(5000L).build());
+ message("Test message").eventName(eventName).startTime(5000L).shouldRetry(shouldRetry).build());
}
@Test
@@ -313,6 +318,135 @@ public class DagManagerTest {
}
}
+ @Test
+ public void testSucceedAfterRetry() throws Exception {
+ long flowExecutionId = System.currentTimeMillis();
+ String flowGroupId = "0";
+ String flowGroup = "group" + flowGroupId;
+ String flowName = "flow" + flowGroupId;
+ String jobName0 = "job0";
+ String jobName1 = "job1";
+ String jobName2 = "job2";
+
+ Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId, "FINISH_RUNNING", true);
+ String dagId = DagManagerUtils.generateDagId(dag);
+
+ //Add a dag to the queue of dags
+ this.queue.offer(dag);
+ Iterator<JobStatus> jobStatusIterator1 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0, flowGroup, String.valueOf(ExecutionStatus.RUNNING));
+ Iterator<JobStatus> jobStatusIterator2 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0, flowGroup, String.valueOf(ExecutionStatus.RUNNING), true);
+ Iterator<JobStatus> jobStatusIterator3 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0, flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+ Iterator<JobStatus> jobStatusIterator4 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName1, flowGroup, String.valueOf(ExecutionStatus.RUNNING));
+ Iterator<JobStatus> jobStatusIterator5 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2, flowGroup, String.valueOf(ExecutionStatus.RUNNING));
+ Iterator<JobStatus> jobStatusIterator6 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName1, flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+ Iterator<JobStatus> jobStatusIterator7 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2, flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+
+ Mockito.when(_jobStatusRetriever.getJobStatusesForFlowExecution(Mockito.anyString(), Mockito.anyString(),
+ Mockito.anyLong(), Mockito.anyString(), Mockito.anyString())).
+ thenReturn(jobStatusIterator1).
+ thenReturn(jobStatusIterator2).
+ thenReturn(jobStatusIterator3).
+ thenReturn(jobStatusIterator4).
+ thenReturn(jobStatusIterator5).
+ thenReturn(jobStatusIterator6).
+ thenReturn(jobStatusIterator7);
+
+ //Run the thread once. Ensure the first job is running
+ this._dagManagerThread.run();
+ Assert.assertEquals(this.dags.size(), 1);
+ Assert.assertTrue(this.dags.containsKey(dagId));
+ Assert.assertEquals(this.jobToDag.size(), 1);
+ Assert.assertTrue(this.jobToDag.containsKey(dag.getStartNodes().get(0)));
+ Assert.assertEquals(this.dagToJobs.get(dagId).size(), 1);
+ Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getStartNodes().get(0)));
+
+ // Second run: check that first job failed and is running again after retry
+ this._dagManagerThread.run();
+ Assert.assertEquals(this.dags.size(), 1);
+ Assert.assertTrue(this.dags.containsKey(dagId));
+ Assert.assertEquals(this.jobToDag.size(), 1);
+ Assert.assertTrue(this.jobToDag.containsKey(dag.getStartNodes().get(0)));
+ Assert.assertEquals(this.dagToJobs.get(dagId).size(), 1);
+ Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getStartNodes().get(0)));
+
+ // Third run: check that first job completed successfully and now second and third job are submitted
+ this._dagManagerThread.run();
+ Assert.assertEquals(this.dags.size(), 1);
+ Assert.assertTrue(this.dags.containsKey(dagId));
+ Assert.assertEquals(this.jobToDag.size(), 2);
+ Assert.assertTrue(this.jobToDag.containsKey(dag.getNodes().get(1)));
+ Assert.assertTrue(this.jobToDag.containsKey(dag.getNodes().get(2)));
+ Assert.assertEquals(this.dagToJobs.get(dagId).size(), 2);
+ Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getNodes().get(1)));
+ Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getNodes().get(2)));
+
+ // Fourth run: second and third job are running
+ this._dagManagerThread.run();
+ Assert.assertEquals(this.dags.size(), 1);
+ Assert.assertTrue(this.dags.containsKey(dagId));
+ Assert.assertEquals(this.jobToDag.size(), 2);
+ Assert.assertTrue(this.jobToDag.containsKey(dag.getNodes().get(1)));
+ Assert.assertTrue(this.jobToDag.containsKey(dag.getNodes().get(2)));
+ Assert.assertEquals(this.dagToJobs.get(dagId).size(), 2);
+ Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getNodes().get(1)));
+ Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getNodes().get(2)));
+
+ // Fifth run: second and third job complete and dag is cleaned up
+ this._dagManagerThread.run();
+ Assert.assertEquals(this.dags.size(), 0);
+ Assert.assertEquals(this.jobToDag.size(), 0);
+ Assert.assertEquals(this.dagToJobs.size(), 0);
+ Assert.assertEquals(this._dagStateStore.getDags().size(), 0);
+ }
+
+ @Test
+ public void testFailAfterRetry() throws Exception {
+ long flowExecutionId = System.currentTimeMillis();
+ String flowGroupId = "0";
+ String flowGroup = "group" + flowGroupId;
+ String flowName = "flow" + flowGroupId;
+ String jobName0 = "job0";
+
+ Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId, "FINISH_RUNNING", true);
+ String dagId = DagManagerUtils.generateDagId(dag);
+
+ //Add a dag to the queue of dags
+ this.queue.offer(dag);
+ Iterator<JobStatus> jobStatusIterator1 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0, flowGroup, String.valueOf(ExecutionStatus.RUNNING));
+ Iterator<JobStatus> jobStatusIterator2 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0, flowGroup, String.valueOf(ExecutionStatus.RUNNING), true);
+ Iterator<JobStatus> jobStatusIterator3 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0, flowGroup, String.valueOf(ExecutionStatus.RUNNING), true);
+ Iterator<JobStatus> jobStatusIterator4 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0, flowGroup, String.valueOf(ExecutionStatus.RUNNING), true);
+ Iterator<JobStatus> jobStatusIterator5 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0, flowGroup, String.valueOf(ExecutionStatus.FAILED));
+
+
+ Mockito.when(_jobStatusRetriever.getJobStatusesForFlowExecution(Mockito.anyString(), Mockito.anyString(),
+ Mockito.anyLong(), Mockito.anyString(), Mockito.anyString())).
+ thenReturn(jobStatusIterator1).
+ thenReturn(jobStatusIterator2).
+ thenReturn(jobStatusIterator3).
+ thenReturn(jobStatusIterator4).
+ thenReturn(jobStatusIterator5);
+
+ // Run 4 times, first job fails every time and is retried
+ for (int i = 0; i < 4; i++) {
+ this._dagManagerThread.run();
+ Assert.assertEquals(this.dags.size(), 1);
+ Assert.assertTrue(this.dags.containsKey(dagId));
+ Assert.assertEquals(this.jobToDag.size(), 1);
+ Assert.assertTrue(this.jobToDag.containsKey(dag.getStartNodes().get(0)));
+ Assert.assertEquals(this.dagToJobs.get(dagId).size(), 1);
+ Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getStartNodes().get(0)));
+ Assert.assertEquals(dag.getStartNodes().get(0).getValue().getCurrentAttempts(), i + 1);
+ }
+
+ // Last run fails and dag is cleaned up
+ this._dagManagerThread.run();
+ Assert.assertEquals(this.dags.size(), 0);
+ Assert.assertEquals(this.jobToDag.size(), 0);
+ Assert.assertEquals(this.dagToJobs.size(), 0);
+ Assert.assertEquals(this._dagStateStore.getDags().size(), 0);
+ }
+
@AfterClass
public void cleanUp() throws Exception {
FileUtils.deleteDirectory(new File(this.dagStateStoreDir));