[
https://issues.apache.org/jira/browse/GOBBLIN-2123?focusedWorklogId=928618&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-928618
]
ASF GitHub Bot logged work on GOBBLIN-2123:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 03/Aug/24 04:30
Start Date: 03/Aug/24 04:30
Worklog Time Spent: 10m
Work Description: arjun4084346 commented on code in PR #4013:
URL: https://github.com/apache/gobblin/pull/4013#discussion_r1702456107
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java:
##########
@@ -263,6 +262,47 @@ public void testMultipleNextJobToRun() throws Exception {
specProducers.forEach(sp -> Mockito.verify(sp,
Mockito.never()).addSpec(any()));
}
+ @Test
+ public void testRetryCurrentFailedJob() throws Exception {
+ String flowName = "fn5";
+ Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId,
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+ 2, "user5", ConfigFactory.empty()
+ .withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
+ .withValue(ConfigurationKeys.JOB_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
+ MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
+ );
+ List<SpecProducer<Spec>> specProducers = getDagSpecProducers(dag);
+ dagManagementStateStore.addDag(dag);
+ // a job status with shouldRetry=true
+ JobStatus jobStatus =
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup)
+ .jobName("job0").flowExecutionId(flowExecutionId).message("Test
message").eventName(ExecutionStatus.FAILED.name())
+
.startTime(flowExecutionId).shouldRetry(true).orchestratedTime(flowExecutionId).build();
+ doReturn(new ImmutablePair<>(Optional.of(dag.getNodes().get(0)),
Optional.of(jobStatus)))
+ .when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+
+ ReevaluateDagProc
+ reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new
DagActionStore.DagAction(flowGroup, flowName,
+ flowExecutionId, "job0", DagActionStore.DagActionType.REEVALUATE),
null,
+ dagManagementStateStore, mockedDagProcEngineMetrics));
+ reEvaluateDagProc.process(dagManagementStateStore,
mockedDagProcEngineMetrics);
+
+ int numOfLaunchedJobs = 1; // only the current job
+ // only the current job should have run
+ Mockito.verify(specProducers.get(0), Mockito.times(1)).addSpec(any());
+
+ specProducers.stream().skip(numOfLaunchedJobs) // separately verified
`specProducers.get(0)`
Review Comment:
yes, ok, updated the comment
Issue Time Tracking
-------------------
Worklog Id: (was: 928618)
Time Spent: 0.5h (was: 20m)
> create reevaluate dag action for jobs in state PENDING_RETRY
> ------------------------------------------------------------
>
> Key: GOBBLIN-2123
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2123
> Project: Apache Gobblin
> Issue Type: Bug
> Reporter: Arjun Singh Bora
> Priority: Major
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)