[
https://issues.apache.org/jira/browse/GOBBLIN-1999?focusedWorklogId=903434&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-903434
]
ASF GitHub Bot logged work on GOBBLIN-1999:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 02/Feb/24 21:35
Start Date: 02/Feb/24 21:35
Worklog Time Spent: 10m
Work Description: umustafi commented on code in PR #3874:
URL: https://github.com/apache/gobblin/pull/3874#discussion_r1476753421
##########
gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java:
##########
@@ -38,25 +38,61 @@
public class FlowStatusGeneratorTest {
@Test
- public void testIsFlowRunning() {
+ public void testIsFlowRunningFirstExecution() {
JobStatusRetriever jobStatusRetriever = Mockito.mock(JobStatusRetriever.class);
String flowName = "testName";
String flowGroup = "testGroup";
+ long currFlowExecutionId = 1234L;
when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 1)).thenReturn(null);
FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);
- Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
+ Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, currFlowExecutionId));
+ }
- //If a flow is COMPILED, isFlowRunning() should return true.
+ @Test
+ public void testIsFlowRunningCompiledPastExecution() {
+ JobStatusRetriever jobStatusRetriever = Mockito.mock(JobStatusRetriever.class);
+ String flowName = "testName";
+ String flowGroup = "testGroup";
long flowExecutionId = 1234L;
when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 1)).thenReturn(
Lists.newArrayList(flowExecutionId));
JobStatus jobStatus = JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("COMPILED").build();
Iterator<JobStatus> jobStatusIterator = Lists.newArrayList(jobStatus).iterator();
- when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
- Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
+ when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId)).thenReturn(
+ jobStatusIterator);
+ FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);
+ // Block the next execution if the prior one is in COMPILED state, as it is considered still running
+ Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId + 1));
+ }
+
+ @Test
+ public void skipFlowConccurentCheckSameFlowExecutionId() {
Review Comment:
extra c and a missing r in concurrent (should be skipFlowConcurrentCheckSameFlowExecutionId)
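The three scenarios exercised by the tests in the diff above (no prior execution, a prior execution still in COMPILED, and the same execution ID, as the test name suggests) can be summarized with a small in-memory model. This is a hypothetical sketch, not Gobblin's `FlowStatusGenerator`: the class `FlowRunningCheckSketch`, its `recordStatus` helper, and the set of terminal statuses are assumptions made for illustration. It only assumes that a flow counts as running when its latest execution is not in a terminal state, and that the check is skipped when the latest execution ID equals the caller's own.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of the isFlowRunning semantics tested above; not a Gobblin class. */
final class FlowRunningCheckSketch {
  // Terminal flow states (assumed for this sketch); any other status, including COMPILED,
  // means the execution is treated as still running.
  private static final Set<String> TERMINAL_STATES = Set.of("COMPLETE", "FAILED", "CANCELLED");

  /** Latest known execution of one flow: its ID and flow-level status. */
  private record LatestExecution(long executionId, String status) {}

  // Keyed by "flowGroup.flowName"; stands in for JobStatusRetriever in this sketch.
  private final Map<String, LatestExecution> latestByFlow = new HashMap<>();

  private static String key(String flowName, String flowGroup) {
    return flowGroup + "." + flowName;
  }

  /** Hypothetical helper: records the latest status of a flow execution. */
  void recordStatus(String flowName, String flowGroup, long executionId, String status) {
    latestByFlow.put(key(flowName, flowGroup), new LatestExecution(executionId, status));
  }

  boolean isFlowRunning(String flowName, String flowGroup, long currFlowExecutionId) {
    LatestExecution latest = latestByFlow.get(key(flowName, flowGroup));
    if (latest == null) {
      return false; // first execution: nothing can conflict with it
    }
    if (latest.executionId() == currFlowExecutionId) {
      return false; // same execution (e.g. compiled by another leader): skip the concurrency check
    }
    // A different execution that has not reached a terminal state blocks this one.
    return !TERMINAL_STATES.contains(latest.status());
  }
}
```

In this model a prior execution in COMPILED blocks execution `1235`, while a host asking about execution `1234` itself is never blocked by it.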
Issue Time Tracking
-------------------
Worklog Id: (was: 903434)
Time Spent: 0.5h (was: 20m)
> Fix GaaS flowstatus reporting when multiple leaders execute flow
> ----------------------------------------------------------------
>
> Key: GOBBLIN-1999
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1999
> Project: Apache Gobblin
> Issue Type: Bug
> Reporter: William Lo
> Priority: Major
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> In GaaS multi-leader mode there is a race condition: one host compiles and
> kicks off a job, while the other hosts are expected to compile the job but
> skip submission until the DagManager supports multiple leaders. The slower
> hosts then check whether the current job is already running; because the
> leader submits the job at the same moment, they can see it as running and
> mark the flow as failed. As a result, flows that succeed can be reported as
> failed.
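The race described in the issue can be replayed deterministically, one step at a time. This is a hypothetical sketch: `MultiLeaderRaceSketch`, its in-memory status store, and the terminal-status set are illustrative stand-ins, not Gobblin classes, and it assumes that both hosts use the same flow execution ID for the same trigger. It contrasts a check that ignores the execution ID with one that skips the check for the caller's own execution, which is the direction the PR's new `isFlowRunning(flowName, flowGroup, currFlowExecutionId)` signature points to.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical single-threaded replay of the multi-leader race; not a Gobblin class. */
final class MultiLeaderRaceSketch {
  // Terminal flow states (assumed for illustration); anything else counts as running.
  private static final Set<String> TERMINAL_STATES = Set.of("COMPLETE", "FAILED", "CANCELLED");

  // Shared status store for a single flow: execution ID -> flow-level status.
  private final Map<Long, String> statusByExecution = new HashMap<>();
  private long latestExecutionId = -1L;

  void report(long executionId, String status) {
    statusByExecution.put(executionId, status);
    latestExecutionId = Math.max(latestExecutionId, executionId);
  }

  // Check that ignores the caller's execution ID: is the latest execution unfinished?
  boolean isFlowRunningIgnoringId() {
    String status = statusByExecution.get(latestExecutionId);
    return status != null && !TERMINAL_STATES.contains(status);
  }

  // ID-aware check: the caller's own execution never counts as a concurrent run.
  boolean isFlowRunning(long currFlowExecutionId) {
    return latestExecutionId != currFlowExecutionId && isFlowRunningIgnoringId();
  }

  /** Returns the flow status after a slower host runs the chosen concurrency check. */
  static String replay(boolean idAwareCheck) {
    MultiLeaderRaceSketch store = new MultiLeaderRaceSketch();
    long executionId = 1234L; // assumed to be shared by both hosts for the same trigger
    // 1. The leader compiles and submits the execution.
    store.report(executionId, "COMPILED");
    // 2. A slower host compiles the same execution and runs the concurrency check.
    boolean concurrent = idAwareCheck
        ? store.isFlowRunning(executionId)
        : store.isFlowRunningIgnoringId();
    // 3. If the check reports a concurrent run, the slower host marks the flow as failed.
    if (concurrent) {
      store.report(executionId, "FAILED");
    }
    return store.statusByExecution.get(executionId);
  }
}
```

`replay(false)` ends with the execution marked `FAILED` even though the leader submitted it, while `replay(true)` leaves it `COMPILED` for the leader to drive to completion.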