[
https://issues.apache.org/jira/browse/GOBBLIN-2133?focusedWorklogId=930489&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-930489
]
ASF GitHub Bot logged work on GOBBLIN-2133:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 15/Aug/24 22:32
Start Date: 15/Aug/24 22:32
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #4027:
URL: https://github.com/apache/gobblin/pull/4027#discussion_r1719075016
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java:
##########
@@ -112,15 +141,35 @@ public void killDag() throws IOException,
URISyntaxException, InterruptedExcepti
null, this.dagManagementStateStore, mockedDagProcEngineMetrics),
ConfigFactory.empty());
killDagProc.process(this.dagManagementStateStore,
this.mockedDagProcEngineMetrics);
- long cancelJobCount = specProducers.stream()
+ int numOfLaunchedJobs = 1;
+ int numOfCancelledJobs = 5; // all jobs in the dag
+ int numOfCancelledFlows = 1;
+ int numOfCancelledJobsWithJobFuture = numOfLaunchedJobs;
+ long actualCancelJobCount = specProducers.stream()
.mapToLong(p -> Mockito.mockingDetails(p)
.getInvocations()
.stream()
.filter(a -> a.getMethod().getName().equals("cancelJob"))
.count())
.sum();
+
+ // kill dag procs kill only the launched jobs with parameters containing
jobFuture
+ Mockito.verify(specProducers.get(0),
Mockito.times(numOfCancelledJobsWithJobFuture)).cancelJob(any(), argThat(props
->
+ props.getProperty(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE,
"ABSENT").equals(MockedSpecExecutor.dummySerializedFuture)));
+
+ // job future object is not available for rest of the jobs cancel
parameters
+ specProducers.stream()
+ .skip(numOfCancelledJobsWithJobFuture) // separately verified
`specProducers.get(0)` above
+ .forEach(sp -> Mockito.verify(sp, Mockito.never()).cancelJob(any(),
argThat(props ->
+
props.getProperty(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE,
"ABSENT").equals(MockedSpecExecutor.dummySerializedFuture))));
+
// kill dag proc tries to cancel all the dag nodes
- Assert.assertEquals(cancelJobCount, 5);
+ Assert.assertEquals(actualCancelJobCount, numOfCancelledJobs);
+
+ Mockito.verify(this.mockedEventSubmitter,
Mockito.times(numOfCancelledJobs))
+ .submit(eq(TimingEvent.LauncherTimings.JOB_CANCEL), anyMap());
+ Mockito.verify(this.mockedEventSubmitter,
Mockito.times(numOfCancelledFlows))
+ .submit(eq(TimingEvent.FlowTimings.FLOW_CANCELLED), anyMap());
Review Comment:
nice work!
Issue Time Tracking
-------------------
Worklog Id: (was: 930489)
Time Spent: 1h (was: 50m)
> provide job future before calling SpecProducer::cancelJob
> ---------------------------------------------------------
>
> Key: GOBBLIN-2133
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2133
> Project: Apache Gobblin
> Issue Type: Bug
> Reporter: Arjun Singh Bora
> Priority: Major
> Time Spent: 1h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)