This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new a46d74e2cd [GOBBLIN-2123] create reevaluate dag action for jobs in
state PENDING_RETRY (#4013)
a46d74e2cd is described below
commit a46d74e2cd4e79063345a16288df3375d9516b1f
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Fri Aug 2 22:01:22 2024 -0700
[GOBBLIN-2123] create reevaluate dag action for jobs in state PENDING_RETRY
(#4013)
* add PENDING_RETRY to the FINISHED_STATUSES list (FlowStatusGenerator) so that a reevaluate dag proc
is created for job status events (GTEs) reporting that state
---
.../runtime/KafkaAvroJobStatusMonitorTest.java | 19 ++++++-----
.../service/monitoring/FlowStatusGenerator.java | 2 +-
.../DagManagementTaskStreamImplTest.java | 2 +-
.../orchestration/proc/ReevaluateDagProcTest.java | 39 ++++++++++++++++++++++
4 files changed, 51 insertions(+), 11 deletions(-)
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index 0017629487..d2f1588de4 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -272,21 +272,21 @@ public class KafkaAvroJobStatusMonitorTest {
//Because the maximum attempt is set to 2, so the state is set to
PENDING_RETRY after the first failure
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.PENDING_RETRY.name());
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
Boolean.toString(true));
- Mockito.verify(dagManagementStateStore,
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+ Mockito.verify(dagManagementStateStore,
Mockito.times(1)).addJobDagAction(any(), any(), anyLong(), any(),
eq(DagActionStore.DagActionType.REEVALUATE));
Mockito.verify(dagManagementStateStore,
Mockito.times(1)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
//Job orchestrated for retrying
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
- Mockito.verify(dagManagementStateStore,
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+ Mockito.verify(dagManagementStateStore,
Mockito.times(1)).addJobDagAction(any(), any(), anyLong(), any(),
eq(DagActionStore.DagActionType.REEVALUATE));
Mockito.verify(dagManagementStateStore,
Mockito.times(1)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
//Because the maximum attempt is set to 2, so the state is set to
PENDING_RETRY after the first failure
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.RUNNING.name());
- Mockito.verify(dagManagementStateStore,
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+ Mockito.verify(dagManagementStateStore,
Mockito.times(1)).addJobDagAction(any(), any(), anyLong(), any(),
eq(DagActionStore.DagActionType.REEVALUATE));
Mockito.verify(dagManagementStateStore,
Mockito.times(2)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
@@ -294,7 +294,7 @@ public class KafkaAvroJobStatusMonitorTest {
//Because the maximum attempt is set to 2, so the state is set to Failed
after trying twice
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.FAILED.name());
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
Boolean.toString(false));
- Mockito.verify(dagManagementStateStore,
Mockito.times(1)).addJobDagAction(any(), any(),
+ Mockito.verify(dagManagementStateStore,
Mockito.times(2)).addJobDagAction(any(), any(),
anyLong(), any(), eq(DagActionStore.DagActionType.REEVALUATE));
Mockito.verify(dagManagementStateStore,
Mockito.times(2)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
@@ -475,27 +475,28 @@ public class KafkaAvroJobStatusMonitorTest {
state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.PENDING_RETRY.name());
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
Boolean.toString(true));
- Mockito.verify(dagManagementStateStore,
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+ Mockito.verify(dagManagementStateStore,
Mockito.times(1)).addJobDagAction(any(), any(), anyLong(), any(),
eq(DagActionStore.DagActionType.REEVALUATE));
Mockito.verify(dagManagementStateStore,
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
//Job orchestrated for retrying
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
- Mockito.verify(dagManagementStateStore,
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+ Mockito.verify(dagManagementStateStore,
Mockito.times(1)).addJobDagAction(any(), any(), anyLong(), any(),
eq(DagActionStore.DagActionType.REEVALUATE));
Mockito.verify(dagManagementStateStore,
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.PENDING_RETRY.name());
- Mockito.verify(dagManagementStateStore,
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+ // second pending retry, creates a second reevaluate dag proc
+ Mockito.verify(dagManagementStateStore,
Mockito.times(2)).addJobDagAction(any(), any(), anyLong(), any(),
eq(DagActionStore.DagActionType.REEVALUATE));
Mockito.verify(dagManagementStateStore,
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
state = getNextJobStatusState(jobStatusMonitor, recordIterator,
this.jobGroup, this.jobName);
//Job orchestrated for retrying
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
- Mockito.verify(dagManagementStateStore,
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+ Mockito.verify(dagManagementStateStore,
Mockito.times(2)).addJobDagAction(any(), any(), anyLong(), any(),
eq(DagActionStore.DagActionType.REEVALUATE));
Mockito.verify(dagManagementStateStore,
Mockito.never()).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
@@ -503,7 +504,7 @@ public class KafkaAvroJobStatusMonitorTest {
// Received kill flow event, should not retry the flow even though there
is 1 pending attempt left
Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.CANCELLED.name());
Assert.assertEquals(state.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD),
Boolean.toString(false));
- Mockito.verify(dagManagementStateStore,
Mockito.times(1)).addJobDagAction(any(), any(),
+ Mockito.verify(dagManagementStateStore,
Mockito.times(3)).addJobDagAction(any(), any(),
anyLong(), any(), eq(DagActionStore.DagActionType.REEVALUATE));
Mockito.verify(dagManagementStateStore,
Mockito.times(1)).deleteDagAction(eq(this.enforceJobStartDeadlineDagAction));
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
index b4ec8c3530..3be35de041 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
@@ -41,7 +41,7 @@ import org.apache.gobblin.service.ExecutionStatus;
@Slf4j
public class FlowStatusGenerator {
public static final List<String> FINISHED_STATUSES =
Lists.newArrayList(ExecutionStatus.FAILED.name(),
- ExecutionStatus.COMPLETE.name(), ExecutionStatus.CANCELLED.name());
+ ExecutionStatus.COMPLETE.name(), ExecutionStatus.CANCELLED.name(),
ExecutionStatus.PENDING_RETRY.name());
public static final int MAX_LOOKBACK = 100;
private final JobStatusRetriever jobStatusRetriever;
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
index af8acb7be9..1fa237fd17 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
@@ -20,7 +20,6 @@ package org.apache.gobblin.service.modules.orchestration;
import java.io.IOException;
import java.util.Optional;
-import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.junit.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -33,6 +32,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.apache.gobblin.service.modules.orchestration.task.DagTask;
import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
index d2438bd64c..2d3d02461f 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
@@ -262,6 +262,45 @@ public class ReevaluateDagProcTest {
specProducers.forEach(sp -> Mockito.verify(sp,
Mockito.never()).addSpec(any()));
}
+ @Test
+ public void testRetryCurrentFailedJob() throws Exception {
+ String flowName = "fn5";
+ Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId,
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+ 2, "user5", ConfigFactory.empty()
+ .withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
+ .withValue(ConfigurationKeys.JOB_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
+ MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
+ );
+ List<SpecProducer<Spec>> specProducers = getDagSpecProducers(dag);
+ dagManagementStateStore.addDag(dag);
+ // a job status with shouldRetry=true
+ JobStatus jobStatus =
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup)
+ .jobName("job0").flowExecutionId(flowExecutionId).message("Test
message").eventName(ExecutionStatus.FAILED.name())
+
.startTime(flowExecutionId).shouldRetry(true).orchestratedTime(flowExecutionId).build();
+ doReturn(new ImmutablePair<>(Optional.of(dag.getNodes().get(0)),
Optional.of(jobStatus)))
+ .when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+
+ ReevaluateDagProc
+ reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new
DagActionStore.DagAction(flowGroup, flowName,
+ flowExecutionId, "job0", DagActionStore.DagActionType.REEVALUATE),
null,
+ dagManagementStateStore, mockedDagProcEngineMetrics));
+ reEvaluateDagProc.process(dagManagementStateStore,
mockedDagProcEngineMetrics);
+
+ int numOfLaunchedJobs = 1; // only the current job
+ // only the current job, that was failed, should have been retried by the
reevaluate dag proc, because jobStatus has shouldRetry=true
+ Mockito.verify(specProducers.get(0), Mockito.times(1)).addSpec(any());
+
+ specProducers.stream().skip(numOfLaunchedJobs) // separately verified
`specProducers.get(0)`
+ .forEach(sp -> Mockito.verify(sp, Mockito.never()).addSpec(any()));
+
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).deleteDagAction(any());
+ Mockito.verify(dagManagementStateStore, Mockito.never()).deleteDag(any());
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
+ eq(DagActionStore.DagActionType.REEVALUATE));
+ }
+
public static List<SpecProducer<Spec>>
getDagSpecProducers(Dag<JobExecutionPlan> dag) {
return dag.getNodes().stream().map(n -> {
try {