This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new a46d74e2cd [GOBBLIN-2123] create reevaluate dag action for jobs in 
state PENDING_RETRY (#4013)
a46d74e2cd is described below

commit a46d74e2cd4e79063345a16288df3375d9516b1f
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Fri Aug 2 22:01:22 2024 -0700

    [GOBBLIN-2123] create reevaluate dag action for jobs in state PENDING_RETRY 
(#4013)
    
    * add PENDING_RETRY to the finished statuses list so that a reevaluate dag 
proc is created for such GTEs
---
 .../runtime/KafkaAvroJobStatusMonitorTest.java     | 19 ++++++-----
 .../service/monitoring/FlowStatusGenerator.java    |  2 +-
 .../DagManagementTaskStreamImplTest.java           |  2 +-
 .../orchestration/proc/ReevaluateDagProcTest.java  | 39 ++++++++++++++++++++++
 4 files changed, 51 insertions(+), 11 deletions(-)

diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index 0017629487..d2f1588de4 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -272,21 +272,21 @@ public class KafkaAvroJobStatusMonitorTest {
     //Because the maximum attempt is set to 2, so the state is set to 
PENDING_RETRY after the first failure
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.PENDING_RETRY.name());
     
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
 Boolean.toString(true));
-    Mockito.verify(dagManagementStateStore, 
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).addJobDagAction(any(), any(), anyLong(), any(),
         eq(DagActionStore.DagActionType.REEVALUATE));
     Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
 
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
     //Job orchestrated for retrying
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.ORCHESTRATED.name());
-    Mockito.verify(dagManagementStateStore, 
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).addJobDagAction(any(), any(), anyLong(), any(),
         eq(DagActionStore.DagActionType.REEVALUATE));
     Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
 
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
     //Because the maximum attempt is set to 2, so the state is set to 
PENDING_RETRY after the first failure
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.RUNNING.name());
-    Mockito.verify(dagManagementStateStore, 
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).addJobDagAction(any(), any(), anyLong(), any(),
         eq(DagActionStore.DagActionType.REEVALUATE));
     Mockito.verify(dagManagementStateStore, 
Mockito.times(2)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
 
@@ -294,7 +294,7 @@ public class KafkaAvroJobStatusMonitorTest {
     //Because the maximum attempt is set to 2, so the state is set to Failed 
after trying twice
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.FAILED.name());
     
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
 Boolean.toString(false));
-    Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).addJobDagAction(any(), any(),
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(2)).addJobDagAction(any(), any(),
         anyLong(), any(), eq(DagActionStore.DagActionType.REEVALUATE));
     Mockito.verify(dagManagementStateStore, 
Mockito.times(2)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
 
@@ -475,27 +475,28 @@ public class KafkaAvroJobStatusMonitorTest {
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.PENDING_RETRY.name());
     
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
 Boolean.toString(true));
-    Mockito.verify(dagManagementStateStore, 
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).addJobDagAction(any(), any(), anyLong(), any(),
         eq(DagActionStore.DagActionType.REEVALUATE));
     Mockito.verify(dagManagementStateStore, 
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
 
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
     //Job orchestrated for retrying
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.ORCHESTRATED.name());
-    Mockito.verify(dagManagementStateStore, 
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).addJobDagAction(any(), any(), anyLong(), any(),
         eq(DagActionStore.DagActionType.REEVALUATE));
     Mockito.verify(dagManagementStateStore, 
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
 
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.PENDING_RETRY.name());
-    Mockito.verify(dagManagementStateStore, 
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+    // second pending retry, creates a second reevaluate dag proc
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(2)).addJobDagAction(any(), any(), anyLong(), any(),
         eq(DagActionStore.DagActionType.REEVALUATE));
     Mockito.verify(dagManagementStateStore, 
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
 
     state = getNextJobStatusState(jobStatusMonitor, recordIterator, 
this.jobGroup, this.jobName);
     //Job orchestrated for retrying
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.ORCHESTRATED.name());
-    Mockito.verify(dagManagementStateStore, 
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(2)).addJobDagAction(any(), any(), anyLong(), any(),
         eq(DagActionStore.DagActionType.REEVALUATE));
     Mockito.verify(dagManagementStateStore, 
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
 
@@ -503,7 +504,7 @@ public class KafkaAvroJobStatusMonitorTest {
     // Received kill flow event, should not retry the flow even though there 
is 1 pending attempt left
     Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.CANCELLED.name());
     
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
 Boolean.toString(false));
-    Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).addJobDagAction(any(), any(),
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(3)).addJobDagAction(any(), any(),
         anyLong(), any(), eq(DagActionStore.DagActionType.REEVALUATE));
     Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
 
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
index b4ec8c3530..3be35de041 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
@@ -41,7 +41,7 @@ import org.apache.gobblin.service.ExecutionStatus;
 @Slf4j
 public class FlowStatusGenerator {
   public static final List<String> FINISHED_STATUSES = 
Lists.newArrayList(ExecutionStatus.FAILED.name(),
-      ExecutionStatus.COMPLETE.name(), ExecutionStatus.CANCELLED.name());
+      ExecutionStatus.COMPLETE.name(), ExecutionStatus.CANCELLED.name(), 
ExecutionStatus.PENDING_RETRY.name());
   public static final int MAX_LOOKBACK = 100;
 
   private final JobStatusRetriever jobStatusRetriever;
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
index af8acb7be9..1fa237fd17 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
@@ -20,7 +20,6 @@ package org.apache.gobblin.service.modules.orchestration;
 import java.io.IOException;
 import java.util.Optional;
 
-import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.junit.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -33,6 +32,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
 import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.apache.gobblin.service.modules.orchestration.task.DagTask;
 import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
 
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
index d2438bd64c..2d3d02461f 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
@@ -262,6 +262,45 @@ public class ReevaluateDagProcTest {
     specProducers.forEach(sp -> Mockito.verify(sp, 
Mockito.never()).addSpec(any()));
   }
 
+  @Test
+  public void testRetryCurrentFailedJob() throws Exception {
+    String flowName = "fn5";
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId, 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+        2, "user5", ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.JOB_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
+                MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
+    );
+    List<SpecProducer<Spec>> specProducers = getDagSpecProducers(dag);
+    dagManagementStateStore.addDag(dag);
+    // a job status with shouldRetry=true
+    JobStatus jobStatus = 
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup)
+        .jobName("job0").flowExecutionId(flowExecutionId).message("Test 
message").eventName(ExecutionStatus.FAILED.name())
+        
.startTime(flowExecutionId).shouldRetry(true).orchestratedTime(flowExecutionId).build();
+    doReturn(new ImmutablePair<>(Optional.of(dag.getNodes().get(0)), 
Optional.of(jobStatus)))
+        .when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+
+    ReevaluateDagProc
+        reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new 
DagActionStore.DagAction(flowGroup, flowName,
+        flowExecutionId, "job0", DagActionStore.DagActionType.REEVALUATE), 
null,
+        dagManagementStateStore, mockedDagProcEngineMetrics));
+    reEvaluateDagProc.process(dagManagementStateStore, 
mockedDagProcEngineMetrics);
+
+    int numOfLaunchedJobs = 1; // only the current job
+    // only the current job, that was failed, should have been retried by the 
reevaluate dag proc, because jobStatus has shouldRetry=true
+    Mockito.verify(specProducers.get(0), Mockito.times(1)).addSpec(any());
+
+    specProducers.stream().skip(numOfLaunchedJobs) // separately verified 
`specProducers.get(0)`
+        .forEach(sp -> Mockito.verify(sp, Mockito.never()).addSpec(any()));
+
+    Mockito.verify(dagManagementStateStore, 
Mockito.never()).deleteDagAction(any());
+    Mockito.verify(dagManagementStateStore, Mockito.never()).deleteDag(any());
+    Mockito.verify(dagManagementStateStore, 
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+        eq(DagActionStore.DagActionType.REEVALUATE));
+  }
+
   public static List<SpecProducer<Spec>> 
getDagSpecProducers(Dag<JobExecutionPlan> dag) {
     return dag.getNodes().stream().map(n -> {
       try {

Reply via email to