This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new e64b831 Clear flow event after cancel (#3271)
e64b831 is described below
commit e64b8315523ae3ecf71256c8c87bcc0697a8b141
Author: umustafi <[email protected]>
AuthorDate: Mon Apr 26 16:47:54 2021 -0700
Clear flow event after cancel (#3271)
* cleared the flow event after a job is cancelled so that the resume event
status can be set
* moved the clearing of FlowEvent to occur before dags are cleaned up, to
avoid a regression
* added comment to clarify nullification of FlowEvent field
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../service/modules/orchestration/DagManager.java | 2 +
.../modules/orchestration/DagManagerTest.java | 82 ++++++++++++++++++++++
2 files changed, 84 insertions(+)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 2641b92..da46d72 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -1200,6 +1200,8 @@ public class DagManager extends AbstractIdleService {
* @param dagId
*/
private synchronized void cleanUpDag(String dagId) {
+ // clears flow event after cancelled job to allow resume event status to be set
+ this.dags.get(dagId).setFlowEvent(null);
try {
this.dagStateStore.cleanUp(dags.get(dagId));
} catch (IOException ioe) {
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index 62fa545..70cb769 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.metrics.event.TimingEvent;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -553,6 +554,87 @@ public class DagManagerTest {
Assert.assertEquals(this._dagStateStore.getDags().size(), 0);
}
+ @Test (dependsOnMethods = "testFailAfterRetry") // Verifies that a dag whose job was cancelled can be resumed and then run to completion.
+ public void testResumeCancelledDag() throws URISyntaxException, IOException {
+ long flowExecutionId = System.currentTimeMillis();
+ String flowGroupId = "0";
+ String flowGroup = "group" + flowGroupId;
+ String flowName = "flow" + flowGroupId;
+ String jobName0 = "job0";
+ String jobName1 = "job1";
+ String jobName2 = "job2";
+
+ Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId,
"FINISH_RUNNING", true);
+ String dagId = DagManagerUtils.generateDagId(dag);
+
+ // Add the dag to the queue of dags
+ this.queue.offer(dag);
+ Iterator<JobStatus> jobStatusIterator1 =
+ getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0,
flowGroup, String.valueOf(ExecutionStatus.RUNNING));
+ Iterator<JobStatus> jobStatusIterator2 =
+ getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0,
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+ Iterator<JobStatus> jobStatusIterator3 =
+ getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName1,
flowGroup, String.valueOf(ExecutionStatus.RUNNING));
+ Iterator<JobStatus> jobStatusIterator4 =
+ getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2,
flowGroup, String.valueOf(ExecutionStatus.RUNNING));
+ Iterator<JobStatus> jobStatusIterator5 =
+ getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName1,
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+ Iterator<JobStatus> jobStatusIterator6 =
+ getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2,
flowGroup, String.valueOf(ExecutionStatus.CANCELLED));
+ Iterator<JobStatus> jobStatusIterator7 =
+ getMockJobStatus(flowName, flowGroup, flowExecutionId, "NA_KEY",
"NA_KEY", String.valueOf(ExecutionStatus.PENDING_RESUME));
+ Iterator<JobStatus> jobStatusIterator8 =
+ getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0,
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+ Iterator<JobStatus> jobStatusIterator9 =
+ getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName1,
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+ Iterator<JobStatus> jobStatusIterator10 =
+ getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2,
flowGroup, String.valueOf(ExecutionStatus.PENDING_RESUME));
+ Iterator<JobStatus> jobStatusIterator11 =
+ getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2,
flowGroup, String.valueOf(ExecutionStatus.RUNNING));
+ Iterator<JobStatus> jobStatusIterator12 =
+ getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2,
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+
+ Mockito.when(_jobStatusRetriever
+ .getJobStatusesForFlowExecution(Mockito.anyString(),
Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(),
Mockito.anyString())).
+ thenReturn(jobStatusIterator1).
+ thenReturn(jobStatusIterator2).
+ thenReturn(jobStatusIterator3).
+ thenReturn(jobStatusIterator4).
+ thenReturn(jobStatusIterator5).
+ thenReturn(jobStatusIterator6).
+ thenReturn(jobStatusIterator7).
+ thenReturn(jobStatusIterator8).
+ thenReturn(jobStatusIterator9).
+ thenReturn(jobStatusIterator10).
+ thenReturn(jobStatusIterator11).
+ thenReturn(jobStatusIterator12); // consecutive stubbing: each retriever call receives the next iterator, in the order listed
+
+ // Run the dag manager thread three times to advance the dag up to the point where job2 is cancelled
+ for (int i = 0; i < 3; i++) {
+ this._dagManagerThread.run();
+ }
+
+ // Cancel job2 by offering the dag id to the cancel queue
+ this.cancelQueue.offer(dagId);
+
+ this._dagManagerThread.run();
+ Assert.assertTrue(this.failedDags.containsKey(dagId));
+ Assert.assertTrue((this.failedDags.get(dagId).getFlowEvent() == null)); // the flow event must be cleared after the cancel so a resume status can be set
+
+ // Resume the dag by offering its id to the resume queue
+ this.resumeQueue.offer(dagId);
+
+ // Job2 rerunning: the dag is expected to leave failedDags and be tracked in dags again
+ this._dagManagerThread.run();
+ Assert.assertFalse(this.failedDags.containsKey(dagId));
+ Assert.assertTrue(this.dags.containsKey(dagId));
+
+ // Job2 complete: the dag is expected to be absent from both failedDags and dags
+ this._dagManagerThread.run();
+ Assert.assertFalse(this.failedDags.containsKey(dagId));
+ Assert.assertFalse(this.dags.containsKey(dagId));
+ }
+
@AfterClass
public void cleanUp() throws Exception {
FileUtils.deleteDirectory(new File(this.dagStateStoreDir));