This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new ca7ad90e98 reevaluate dag proc should accept pending_retry status if 
it is a retry (#4019)
ca7ad90e98 is described below

commit ca7ad90e98872a085bb4bdf2c19122215234c130
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Wed Aug 7 20:01:50 2024 -0700

    reevaluate dag proc should accept pending_retry status if it is a retry 
(#4019)
---
 .../gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java | 3 ++-
 .../service/modules/orchestration/proc/ReevaluateDagProcTest.java     | 4 ++--
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
index 04ea1258dc..1b3d10bff3 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
@@ -82,8 +82,9 @@ public class ReevaluateDagProc extends 
DagProc<Pair<Optional<Dag.DagNode<JobExec
     JobStatus jobStatus = dagNodeWithJobStatus.getRight().get();
     ExecutionStatus executionStatus = 
ExecutionStatus.valueOf(jobStatus.getEventName());
     updateDagNodeStatus(dagManagementStateStore, dagNode, executionStatus);
+    boolean isRetry = jobStatus.isShouldRetry();
 
-    if 
(!FlowStatusGenerator.FINISHED_STATUSES.contains(executionStatus.name())) {
+    if (!isRetry && 
!FlowStatusGenerator.FINISHED_STATUSES.contains(executionStatus.name())) {
       // this may happen if adding job status in the store failed/delayed 
after adding a ReevaluateDagAction in KafkaJobStatusMonitor
       throw new RuntimeException(String.format("Job status for dagNode %s is 
%s. Re-evaluate dag action should have been "
               + "created only for finished status - %s. This may happen if 
reevaluate dag action launched reevaluate dag "
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
index 3f856c2085..bcf16c7e40 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
@@ -275,9 +275,9 @@ public class ReevaluateDagProcTest {
     );
     List<SpecProducer<Spec>> specProducers = getDagSpecProducers(dag);
     dagManagementStateStore.addDag(dag);
-    // a job status with shouldRetry=true
+    // a job status with shouldRetry=true, it should have execution status = 
PENDING_RETRY
     JobStatus jobStatus = 
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup)
-        .jobName("job0").flowExecutionId(flowExecutionId).message("Test 
message").eventName(ExecutionStatus.FAILED.name())
+        .jobName("job0").flowExecutionId(flowExecutionId).message("Test 
message").eventName(ExecutionStatus.PENDING_RETRY.name())
         
.startTime(flowExecutionId).shouldRetry(true).orchestratedTime(flowExecutionId).build();
     doReturn(new ImmutablePair<>(Optional.of(dag.getNodes().get(0)), 
Optional.of(jobStatus)))
         .when(dagManagementStateStore).getDagNodeWithJobStatus(any());

Reply via email to