[ 
https://issues.apache.org/jira/browse/GOBBLIN-1999?focusedWorklogId=903434&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-903434
 ]

ASF GitHub Bot logged work on GOBBLIN-1999:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Feb/24 21:35
            Start Date: 02/Feb/24 21:35
    Worklog Time Spent: 10m 
      Work Description: umustafi commented on code in PR #3874:
URL: https://github.com/apache/gobblin/pull/3874#discussion_r1476753421


##########
gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java:
##########
@@ -38,25 +38,61 @@
 public class FlowStatusGeneratorTest {
 
   @Test
-  public void testIsFlowRunning() {
+  public void testIsFlowRunningFirstExecution() {
     JobStatusRetriever jobStatusRetriever = 
Mockito.mock(JobStatusRetriever.class);
     String flowName = "testName";
     String flowGroup = "testGroup";
+    long currFlowExecutionId = 1234L;
     when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 
1)).thenReturn(null);
 
     FlowStatusGenerator flowStatusGenerator = new 
FlowStatusGenerator(jobStatusRetriever);
-    Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
+    Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, 
currFlowExecutionId));
+  }
 
-    //If a flow is COMPILED, isFlowRunning() should return true.
+  @Test
+  public void testIsFlowRunningCompiledPastExecution() {
+    JobStatusRetriever jobStatusRetriever = 
Mockito.mock(JobStatusRetriever.class);
+    String flowName = "testName";
+    String flowGroup = "testGroup";
     long flowExecutionId = 1234L;
     when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 
1)).thenReturn(
         Lists.newArrayList(flowExecutionId));
     JobStatus jobStatus = 
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
         
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("COMPILED").build();
     Iterator<JobStatus> jobStatusIterator = 
Lists.newArrayList(jobStatus).iterator();
-    when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, 
flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
-    Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
+    when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, 
flowGroup, flowExecutionId)).thenReturn(
+        jobStatusIterator);
+    FlowStatusGenerator flowStatusGenerator = new 
FlowStatusGenerator(jobStatusRetriever);
+    // Block the next execution if the prior one is in compiled as it's 
considered still running
+    Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup, 
flowExecutionId + 1));
+  }
+
+  @Test
+  public void skipFlowConccurentCheckSameFlowExecutionId() {

Review Comment:
   extra c, one less r in concurrent 





Issue Time Tracking
-------------------

    Worklog Id:     (was: 903434)
    Time Spent: 0.5h  (was: 20m)

> Fix GaaS flowstatus reporting when multiple leaders execute flow
> ----------------------------------------------------------------
>
>                 Key: GOBBLIN-1999
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1999
>             Project: Apache Gobblin
>          Issue Type: Bug
>            Reporter: William Lo
>            Priority: Major
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> In GaaS multileader there is a race condition where one host will 
> compile/kick off a job, and the other hosts have an expected behavior of 
> compiling the job but avoiding submission until the DagManager supports 
> multiple leaders. The other hosts that run slower will try to check if the 
> current job is running, and then mark the flow as failed at the same time 
> when the leader submits the job. This leads to scenarios where successful 
> flows can be marked as failed flows.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to