This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 1325688  [GOBBLIN-775] Add job level retries for gobblin service
1325688 is described below

commit 1325688d34e5d7f6e8cdda4390255e2b96781564
Author: Jack Moseley <[email protected]>
AuthorDate: Wed May 29 11:09:52 2019 -0700

    [GOBBLIN-775] Add job level retries for gobblin service
    
    Closes #2640 from jack-moseley/gaas-job-retry
---
 .../apache/gobblin/metrics/event/TimingEvent.java  |   3 +
 .../gobblin/service/monitoring/JobStatus.java      |   3 +
 .../service/monitoring/JobStatusRetriever.java     |   6 +-
 .../service/modules/orchestration/DagManager.java  |   7 ++
 .../modules/orchestration/DagManagerUtils.java     |   7 ++
 .../modules/orchestration/Orchestrator.java        |   1 +
 .../modules/orchestration/TimingEventUtils.java    |   3 +
 .../service/modules/spec/JobExecutionPlan.java     |  12 +-
 .../service/monitoring/KafkaJobStatusMonitor.java  |  13 ++
 .../modules/orchestration/DagManagerTest.java      | 138 ++++++++++++++++++++-
 10 files changed, 189 insertions(+), 4 deletions(-)

diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index 8f37adc..8a1e6ef 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -71,6 +71,9 @@ public class TimingEvent {
     public static final String LOW_WATERMARK_FIELD = "lowWatermark";
     public static final String HIGH_WATERMARK_FIELD = "highWatermark";
     public static final String PROCESSED_COUNT_FIELD = "processedCount";
+    public static final String MAX_ATTEMPTS_FIELD = "maxAttempts";
+    public static final String CURRENT_ATTEMPTS_FIELD = "currentAttempts";
+    public static final String SHOULD_RETRY_FIELD = "shouldRetry";
   }
 
   public static final String METADATA_START_TIME = "startTime";
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
index 87e52c6..4f3dc2f 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatus.java
@@ -43,4 +43,7 @@ public class JobStatus {
   private final long processedCount;
   private final String lowWatermark;
   private final String highWatermark;
+  private final int maxAttempts;
+  private final int currentAttempts;
+  private final boolean shouldRetry;
 }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
index 6827c3c..ebc48fe 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
@@ -79,11 +79,15 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker
     String lowWatermark = 
jobState.getProp(TimingEvent.FlowEventConstants.LOW_WATERMARK_FIELD, "");
     String highWatermark = 
jobState.getProp(TimingEvent.FlowEventConstants.HIGH_WATERMARK_FIELD, "");
     long processedCount = 
Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.PROCESSED_COUNT_FIELD,
 "0"));
+    int maxAttempts = 
Integer.parseInt(jobState.getProp(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD,
 "1"));
+    int currentAttempts = 
Integer.parseInt(jobState.getProp(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
 "1"));
+    boolean shouldRetry = 
Boolean.parseBoolean(jobState.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD,
 "false"));
 
     return 
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).flowExecutionId(flowExecutionId).
         
jobName(jobName).jobGroup(jobGroup).jobExecutionId(jobExecutionId).eventName(eventName).
         
lowWatermark(lowWatermark).highWatermark(highWatermark).startTime(startTime).endTime(endTime).
-        message(message).processedCount(processedCount).build();
+        
message(message).processedCount(processedCount).maxAttempts(maxAttempts).currentAttempts(currentAttempts).
+        shouldRetry(shouldRetry).build();
   }
 
   public abstract StateStore<State> getStateStore();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index e8f45d5..8140648 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -385,6 +385,12 @@ public class DagManager extends AbstractIdleService {
             jobExecutionPlan.setExecutionStatus(RUNNING);
             break;
         }
+
+        if (jobStatus.isShouldRetry()) {
+          log.info("Retrying job: {}, current attempts: {}, max attempts: {}", 
DagManagerUtils.getFullyQualifiedJobName(node),
+              jobStatus.getCurrentAttempts(), jobStatus.getMaxAttempts());
+          submitJob(node);
+        }
       }
 
       for (Map.Entry<String, Set<DagNode<JobExecutionPlan>>> entry: 
nextSubmitted.entrySet()) {
@@ -446,6 +452,7 @@ public class DagManager extends AbstractIdleService {
      * Submits a {@link JobSpec} to a {@link 
org.apache.gobblin.runtime.api.SpecExecutor}.
      */
     private void submitJob(DagNode<JobExecutionPlan> dagNode) {
+      DagManagerUtils.incrementJobAttempt(dagNode);
       JobExecutionPlan jobExecutionPlan = 
DagManagerUtils.getJobExecutionPlan(dagNode);
       jobExecutionPlan.setExecutionStatus(RUNNING);
       JobSpec jobSpec = DagManagerUtils.getJobSpec(dagNode);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 0044c2b..1c6a086 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -156,4 +156,11 @@ public class DagManagerUtils {
     String failureOption = ConfigUtils.getString(getJobConfig(dagNode), 
ConfigurationKeys.FLOW_FAILURE_OPTION, DagManager.DEFAULT_FLOW_FAILURE_OPTION);
     return FailureOption.valueOf(failureOption);
   }
+
+  /**
+   * Increment the value of {@link JobExecutionPlan#currentAttempts}
+   */
+  static void incrementJobAttempt(DagNode<JobExecutionPlan> dagNode) {
+    
dagNode.getValue().setCurrentAttempts(dagNode.getValue().getCurrentAttempts() + 
1);
+  }
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index cda6b52..f3a4f06 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -279,6 +279,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
       } else {
         // Schedule all compiled JobSpecs on their respective Executor
         for (Dag.DagNode<JobExecutionPlan> dagNode : 
jobExecutionPlanDag.getNodes()) {
+          DagManagerUtils.incrementJobAttempt(dagNode);
           JobExecutionPlan jobExecutionPlan = dagNode.getValue();
 
           // Run this spec on selected executor
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
index 38a9d07..6eb1a31 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
@@ -55,6 +55,9 @@ class TimingEventUtils {
     jobMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
jobSpec.getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));
     jobMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
jobSpec.getConfig().getString(ConfigurationKeys.JOB_GROUP_KEY));
     jobMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, 
specExecutor.getClass().getCanonicalName());
+    jobMetadata.put(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, 
Integer.toString(jobExecutionPlan.getMaxAttempts()));
+    jobMetadata.put(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 
Integer.toString(jobExecutionPlan.getCurrentAttempts()));
+    jobMetadata.put(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD, 
Boolean.toString(false));
 
     return jobMetadata;
   }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 45e65bc..5433c2f 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -50,11 +50,15 @@ import org.apache.gobblin.util.ConfigUtils;
  * where the {@link JobSpec} will be executed.
  */
 @Data
-@EqualsAndHashCode (exclude = {"executionStatus"})
+@EqualsAndHashCode(exclude = {"executionStatus", "currentAttempts"})
 public class JobExecutionPlan {
+  public static final String JOB_MAX_ATTEMPTS = "job.maxAttempts";
+
   private final JobSpec jobSpec;
   private final SpecExecutor specExecutor;
   private ExecutionStatus executionStatus = ExecutionStatus.$UNKNOWN;
+  private final int maxAttempts;
+  private int currentAttempts = 0;
 
   public static class Factory {
     public static final String JOB_NAME_COMPONENT_SEPARATION_CHAR = "_";
@@ -167,6 +171,12 @@ public class JobExecutionPlan {
     }
   }
 
+  public JobExecutionPlan(JobSpec jobSpec, SpecExecutor specExecutor) {
+    this.jobSpec = jobSpec;
+    this.specExecutor = specExecutor;
+    this.maxAttempts = ConfigUtils.getInt(jobSpec.getConfig(), 
JOB_MAX_ATTEMPTS, 1);
+  }
+
   /**
    * Render the JobSpec into a JSON string.
    * @return a valid JSON string representation of the JobSpec.
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 99c862a..c46e01e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -43,6 +43,7 @@ import org.apache.gobblin.metastore.StateStore;
 import org.apache.gobblin.metastore.util.StateStoreCleanerRunnable;
 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
+import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.util.ConfigUtils;
 
 
@@ -147,9 +148,21 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
 
     jobStatus = mergedProperties(storeName, tableName, jobStatus, stateStore);
 
+    modifyStateIfRetryRequired(jobStatus);
+
     stateStore.put(storeName, tableName, jobStatus);
   }
 
+  private static void 
modifyStateIfRetryRequired(org.apache.gobblin.configuration.State state) {
+    int maxAttempts = 
state.getPropAsInt(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, 1);
+    int currentAttempts = 
state.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 1);
+    if 
(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name())
 && currentAttempts < maxAttempts) {
+      state.setProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD, true);
+      state.setProp(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.RUNNING.name());
+      state.removeProp(TimingEvent.JOB_END_TIME);
+    }
+  }
+
   private static org.apache.gobblin.configuration.State mergedProperties(
       String storeName, String tableName, 
org.apache.gobblin.configuration.State jobStatus, StateStore stateStore) {
     Properties mergedProperties = new Properties();
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index 467ac84..a67e5dd 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -44,6 +44,7 @@ import com.typesafe.config.ConfigValueFactory;
 
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
@@ -124,9 +125,13 @@ public class DagManagerTest {
     return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
   }
 
-  private Iterator<JobStatus> getMockJobStatus(String flowName, String 
flowGroup,  Long flowExecutionId, String jobGroup, String jobName, String 
eventName) {
+  private Iterator<JobStatus> getMockJobStatus(String flowName, String 
flowGroup, Long flowExecutionId, String jobGroup, String jobName, String 
eventName) {
+    return getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup, 
jobName, eventName, false);
+  }
+
+  private Iterator<JobStatus> getMockJobStatus(String flowName, String 
flowGroup,  Long flowExecutionId, String jobGroup, String jobName, String 
eventName, boolean shouldRetry) {
     return 
Iterators.singletonIterator(JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(jobGroup).jobName(jobName).flowExecutionId(flowExecutionId).
-        message("Test message").eventName(eventName).startTime(5000L).build());
+        message("Test 
message").eventName(eventName).startTime(5000L).shouldRetry(shouldRetry).build());
   }
 
   @Test
@@ -313,6 +318,135 @@ public class DagManagerTest {
     }
   }
 
+  @Test
+  public void testSucceedAfterRetry() throws Exception {
+    long flowExecutionId = System.currentTimeMillis();
+    String flowGroupId = "0";
+    String flowGroup = "group" + flowGroupId;
+    String flowName = "flow" + flowGroupId;
+    String jobName0 = "job0";
+    String jobName1 = "job1";
+    String jobName2 = "job2";
+
+    Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId, 
"FINISH_RUNNING", true);
+    String dagId = DagManagerUtils.generateDagId(dag);
+
+    //Add a dag to the queue of dags
+    this.queue.offer(dag);
+    Iterator<JobStatus> jobStatusIterator1 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobName0, flowGroup, 
String.valueOf(ExecutionStatus.RUNNING));
+    Iterator<JobStatus> jobStatusIterator2 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobName0, flowGroup, 
String.valueOf(ExecutionStatus.RUNNING), true);
+    Iterator<JobStatus> jobStatusIterator3 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobName0, flowGroup, 
String.valueOf(ExecutionStatus.COMPLETE));
+    Iterator<JobStatus> jobStatusIterator4 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobName1, flowGroup, 
String.valueOf(ExecutionStatus.RUNNING));
+    Iterator<JobStatus> jobStatusIterator5 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobName2, flowGroup, 
String.valueOf(ExecutionStatus.RUNNING));
+    Iterator<JobStatus> jobStatusIterator6 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobName1, flowGroup, 
String.valueOf(ExecutionStatus.COMPLETE));
+    Iterator<JobStatus> jobStatusIterator7 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobName2, flowGroup, 
String.valueOf(ExecutionStatus.COMPLETE));
+
+    
Mockito.when(_jobStatusRetriever.getJobStatusesForFlowExecution(Mockito.anyString(),
 Mockito.anyString(),
+        Mockito.anyLong(), Mockito.anyString(), Mockito.anyString())).
+        thenReturn(jobStatusIterator1).
+        thenReturn(jobStatusIterator2).
+        thenReturn(jobStatusIterator3).
+        thenReturn(jobStatusIterator4).
+        thenReturn(jobStatusIterator5).
+        thenReturn(jobStatusIterator6).
+        thenReturn(jobStatusIterator7);
+
+    //Run the thread once. Ensure the first job is running
+    this._dagManagerThread.run();
+    Assert.assertEquals(this.dags.size(), 1);
+    Assert.assertTrue(this.dags.containsKey(dagId));
+    Assert.assertEquals(this.jobToDag.size(), 1);
+    Assert.assertTrue(this.jobToDag.containsKey(dag.getStartNodes().get(0)));
+    Assert.assertEquals(this.dagToJobs.get(dagId).size(), 1);
+    
Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getStartNodes().get(0)));
+
+    // Second run: check that first job failed and is running again after retry
+    this._dagManagerThread.run();
+    Assert.assertEquals(this.dags.size(), 1);
+    Assert.assertTrue(this.dags.containsKey(dagId));
+    Assert.assertEquals(this.jobToDag.size(), 1);
+    Assert.assertTrue(this.jobToDag.containsKey(dag.getStartNodes().get(0)));
+    Assert.assertEquals(this.dagToJobs.get(dagId).size(), 1);
+    
Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getStartNodes().get(0)));
+
+    // Third run: check that first job completed successfully and now second 
and third job are submitted
+    this._dagManagerThread.run();
+    Assert.assertEquals(this.dags.size(), 1);
+    Assert.assertTrue(this.dags.containsKey(dagId));
+    Assert.assertEquals(this.jobToDag.size(), 2);
+    Assert.assertTrue(this.jobToDag.containsKey(dag.getNodes().get(1)));
+    Assert.assertTrue(this.jobToDag.containsKey(dag.getNodes().get(2)));
+    Assert.assertEquals(this.dagToJobs.get(dagId).size(), 2);
+    
Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getNodes().get(1)));
+    
Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getNodes().get(2)));
+
+    // Fourth run: second and third job are running
+    this._dagManagerThread.run();
+    Assert.assertEquals(this.dags.size(), 1);
+    Assert.assertTrue(this.dags.containsKey(dagId));
+    Assert.assertEquals(this.jobToDag.size(), 2);
+    Assert.assertTrue(this.jobToDag.containsKey(dag.getNodes().get(1)));
+    Assert.assertTrue(this.jobToDag.containsKey(dag.getNodes().get(2)));
+    Assert.assertEquals(this.dagToJobs.get(dagId).size(), 2);
+    
Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getNodes().get(1)));
+    
Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getNodes().get(2)));
+
+    // Fifth run: second and third job complete and dag is cleaned up
+    this._dagManagerThread.run();
+    Assert.assertEquals(this.dags.size(), 0);
+    Assert.assertEquals(this.jobToDag.size(), 0);
+    Assert.assertEquals(this.dagToJobs.size(), 0);
+    Assert.assertEquals(this._dagStateStore.getDags().size(), 0);
+  }
+
+  @Test
+  public void testFailAfterRetry() throws Exception {
+    long flowExecutionId = System.currentTimeMillis();
+    String flowGroupId = "0";
+    String flowGroup = "group" + flowGroupId;
+    String flowName = "flow" + flowGroupId;
+    String jobName0 = "job0";
+
+    Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId, 
"FINISH_RUNNING", true);
+    String dagId = DagManagerUtils.generateDagId(dag);
+
+    //Add a dag to the queue of dags
+    this.queue.offer(dag);
+    Iterator<JobStatus> jobStatusIterator1 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobName0, flowGroup, 
String.valueOf(ExecutionStatus.RUNNING));
+    Iterator<JobStatus> jobStatusIterator2 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobName0, flowGroup, 
String.valueOf(ExecutionStatus.RUNNING), true);
+    Iterator<JobStatus> jobStatusIterator3 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobName0, flowGroup, 
String.valueOf(ExecutionStatus.RUNNING), true);
+    Iterator<JobStatus> jobStatusIterator4 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobName0, flowGroup, 
String.valueOf(ExecutionStatus.RUNNING), true);
+    Iterator<JobStatus> jobStatusIterator5 = getMockJobStatus(flowName, 
flowGroup, flowExecutionId, jobName0, flowGroup, 
String.valueOf(ExecutionStatus.FAILED));
+
+
+    
Mockito.when(_jobStatusRetriever.getJobStatusesForFlowExecution(Mockito.anyString(),
 Mockito.anyString(),
+        Mockito.anyLong(), Mockito.anyString(), Mockito.anyString())).
+        thenReturn(jobStatusIterator1).
+        thenReturn(jobStatusIterator2).
+        thenReturn(jobStatusIterator3).
+        thenReturn(jobStatusIterator4).
+        thenReturn(jobStatusIterator5);
+
+    // Run 4 times, first job fails every time and is retried
+    for (int i = 0; i < 4; i++) {
+      this._dagManagerThread.run();
+      Assert.assertEquals(this.dags.size(), 1);
+      Assert.assertTrue(this.dags.containsKey(dagId));
+      Assert.assertEquals(this.jobToDag.size(), 1);
+      Assert.assertTrue(this.jobToDag.containsKey(dag.getStartNodes().get(0)));
+      Assert.assertEquals(this.dagToJobs.get(dagId).size(), 1);
+      
Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getStartNodes().get(0)));
+      
Assert.assertEquals(dag.getStartNodes().get(0).getValue().getCurrentAttempts(), 
i + 1);
+    }
+
+    // Last run fails and dag is cleaned up
+    this._dagManagerThread.run();
+    Assert.assertEquals(this.dags.size(), 0);
+    Assert.assertEquals(this.jobToDag.size(), 0);
+    Assert.assertEquals(this.dagToJobs.size(), 0);
+    Assert.assertEquals(this._dagStateStore.getDags().size(), 0);
+  }
+
   @AfterClass
   public void cleanUp() throws Exception {
     FileUtils.deleteDirectory(new File(this.dagStateStoreDir));

Reply via email to