This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new e64b831  Clear flow event after cancel (#3271)
e64b831 is described below

commit e64b8315523ae3ecf71256c8c87bcc0697a8b141
Author: umustafi <[email protected]>
AuthorDate: Mon Apr 26 16:47:54 2021 -0700

    Clear flow event after cancel (#3271)
    
    * cleared flow event after cancelled job to allow resume event status to work
    
    * moved clearing of FlowEvent to occur before dags are cleaned up to avoid regression
    
    * added comment to clarify nullification of FlowEvent field
    
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 .../service/modules/orchestration/DagManager.java  |  2 +
 .../modules/orchestration/DagManagerTest.java      | 82 ++++++++++++++++++++++
 2 files changed, 84 insertions(+)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 2641b92..da46d72 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -1200,6 +1200,8 @@ public class DagManager extends AbstractIdleService {
      * @param dagId
      */
     private synchronized void cleanUpDag(String dagId) {
+      // clears flow event after cancelled job to allow resume event status to be set
+      this.dags.get(dagId).setFlowEvent(null);
        try {
          this.dagStateStore.cleanUp(dags.get(dagId));
        } catch (IOException ioe) {
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index 62fa545..70cb769 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.metrics.event.TimingEvent;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -553,6 +554,87 @@ public class DagManagerTest {
     Assert.assertEquals(this._dagStateStore.getDags().size(), 0);
   }
 
+  @Test (dependsOnMethods = "testFailAfterRetry")
+  public void testResumeCancelledDag() throws URISyntaxException, IOException {
+    long flowExecutionId = System.currentTimeMillis();
+    String flowGroupId = "0";
+    String flowGroup = "group" + flowGroupId;
+    String flowName = "flow" + flowGroupId;
+    String jobName0 = "job0";
+    String jobName1 = "job1";
+    String jobName2 = "job2";
+
+    Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId, 
"FINISH_RUNNING", true);
+    String dagId = DagManagerUtils.generateDagId(dag);
+
+    //Add a dag to the queue of dags
+    this.queue.offer(dag);
+    Iterator<JobStatus> jobStatusIterator1 =
+        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0, 
flowGroup, String.valueOf(ExecutionStatus.RUNNING));
+    Iterator<JobStatus> jobStatusIterator2 =
+        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0, 
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+    Iterator<JobStatus> jobStatusIterator3 =
+        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName1, 
flowGroup, String.valueOf(ExecutionStatus.RUNNING));
+    Iterator<JobStatus> jobStatusIterator4 =
+        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2, 
flowGroup, String.valueOf(ExecutionStatus.RUNNING));
+    Iterator<JobStatus> jobStatusIterator5 =
+        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName1, 
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+    Iterator<JobStatus> jobStatusIterator6 =
+        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2, 
flowGroup, String.valueOf(ExecutionStatus.CANCELLED));
+    Iterator<JobStatus> jobStatusIterator7 =
+        getMockJobStatus(flowName, flowGroup, flowExecutionId, "NA_KEY", 
"NA_KEY", String.valueOf(ExecutionStatus.PENDING_RESUME));
+        Iterator<JobStatus> jobStatusIterator8 =
+        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0, 
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+    Iterator<JobStatus> jobStatusIterator9 =
+        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName1, 
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+    Iterator<JobStatus> jobStatusIterator10 =
+        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2, 
flowGroup, String.valueOf(ExecutionStatus.PENDING_RESUME));
+    Iterator<JobStatus> jobStatusIterator11 =
+        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2, 
flowGroup, String.valueOf(ExecutionStatus.RUNNING));
+    Iterator<JobStatus> jobStatusIterator12 =
+        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2, 
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+
+    Mockito.when(_jobStatusRetriever
+        .getJobStatusesForFlowExecution(Mockito.anyString(), 
Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), 
Mockito.anyString())).
+        thenReturn(jobStatusIterator1).
+        thenReturn(jobStatusIterator2).
+        thenReturn(jobStatusIterator3).
+        thenReturn(jobStatusIterator4).
+        thenReturn(jobStatusIterator5).
+        thenReturn(jobStatusIterator6).
+        thenReturn(jobStatusIterator7).
+        thenReturn(jobStatusIterator8).
+        thenReturn(jobStatusIterator9).
+        thenReturn(jobStatusIterator10).
+        thenReturn(jobStatusIterator11).
+        thenReturn(jobStatusIterator12);
+
+    // Run until job2 cancelled
+    for (int i = 0; i < 3; i++) {
+      this._dagManagerThread.run();
+    }
+
+    // Cancel job2
+    this.cancelQueue.offer(dagId);
+
+    this._dagManagerThread.run();
+    Assert.assertTrue(this.failedDags.containsKey(dagId));
+    Assert.assertTrue((this.failedDags.get(dagId).getFlowEvent() == null));
+
+    // Resume dag
+    this.resumeQueue.offer(dagId);
+
+    // Job2 rerunning
+    this._dagManagerThread.run();
+    Assert.assertFalse(this.failedDags.containsKey(dagId));
+    Assert.assertTrue(this.dags.containsKey(dagId));
+
+    // Job2 complete
+    this._dagManagerThread.run();
+    Assert.assertFalse(this.failedDags.containsKey(dagId));
+    Assert.assertFalse(this.dags.containsKey(dagId));
+  }
+
   @AfterClass
   public void cleanUp() throws Exception {
     FileUtils.deleteDirectory(new File(this.dagStateStoreDir));

Reply via email to