This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new ca7ad90e98 reevaluate dag proc should accept pending_retry status if it is a retry (#4019)
ca7ad90e98 is described below
commit ca7ad90e98872a085bb4bdf2c19122215234c130
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Wed Aug 7 20:01:50 2024 -0700
reevaluate dag proc should accept pending_retry status if it is a retry (#4019)
---
.../gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java | 3 ++-
.../service/modules/orchestration/proc/ReevaluateDagProcTest.java | 4 ++--
2 files changed, 4 insertions(+), 3 deletions(-)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
index 04ea1258dc..1b3d10bff3 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
@@ -82,8 +82,9 @@ public class ReevaluateDagProc extends
DagProc<Pair<Optional<Dag.DagNode<JobExec
JobStatus jobStatus = dagNodeWithJobStatus.getRight().get();
ExecutionStatus executionStatus =
ExecutionStatus.valueOf(jobStatus.getEventName());
updateDagNodeStatus(dagManagementStateStore, dagNode, executionStatus);
+ boolean isRetry = jobStatus.isShouldRetry();
- if
(!FlowStatusGenerator.FINISHED_STATUSES.contains(executionStatus.name())) {
+ if (!isRetry &&
!FlowStatusGenerator.FINISHED_STATUSES.contains(executionStatus.name())) {
// this may happen if adding job status in the store failed/delayed
after adding a ReevaluateDagAction in KafkaJobStatusMonitor
throw new RuntimeException(String.format("Job status for dagNode %s is
%s. Re-evaluate dag action should have been "
+ "created only for finished status - %s. This may happen if
reevaluate dag action launched reevaluate dag "
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
index 3f856c2085..bcf16c7e40 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
@@ -275,9 +275,9 @@ public class ReevaluateDagProcTest {
);
List<SpecProducer<Spec>> specProducers = getDagSpecProducers(dag);
dagManagementStateStore.addDag(dag);
- // a job status with shouldRetry=true
+ // a job status with shouldRetry=true, it should have execution status =
PENDING_RETRY
JobStatus jobStatus =
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup)
- .jobName("job0").flowExecutionId(flowExecutionId).message("Test
message").eventName(ExecutionStatus.FAILED.name())
+ .jobName("job0").flowExecutionId(flowExecutionId).message("Test
message").eventName(ExecutionStatus.PENDING_RETRY.name())
.startTime(flowExecutionId).shouldRetry(true).orchestratedTime(flowExecutionId).build();
doReturn(new ImmutablePair<>(Optional.of(dag.getNodes().get(0)),
Optional.of(jobStatus)))
.when(dagManagementStateStore).getDagNodeWithJobStatus(any());