This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4721d086f [GOBBLIN-2102]Concurrent flow status check fix (#3989)
4721d086f is described below

commit 4721d086f78553cab28e46bf406787df6cc2dea8
Author: pratapaditya04 <[email protected]>
AuthorDate: Sat Jun 29 00:10:20 2024 +0530

    [GOBBLIN-2102]Concurrent flow status check fix (#3989)
    
    Modified isFlowRunning to check all the flow statuses instead of just the 
latest one
    
    ---------
    
    Co-authored-by: Aditya Pratap Singh <[email protected]>
---
 .../org/apache/gobblin/service/FlowStatusTest.java |   6 ++
 .../service/monitoring/FlowStatusGenerator.java    |  18 ++--
 .../service/monitoring/JobStatusRetriever.java     |  32 +++++-
 .../monitoring/FlowStatusGeneratorTest.java        | 112 ++++++++++++++++++---
 .../service/monitoring/FsJobStatusRetriever.java   |  17 ++++
 .../monitoring/LocalFsJobStatusRetriever.java      |  35 ++++++-
 .../monitoring/MysqlJobStatusRetriever.java        |   8 ++
 .../service/monitoring/JobStatusRetrieverTest.java |  67 +++++++++++-
 8 files changed, 268 insertions(+), 27 deletions(-)

diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
index 4888a2c23..d4950fcf3 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
@@ -91,6 +91,12 @@ public class FlowStatusTest {
         int countJobStatusesPerFlowName) {
       return Lists.newArrayList(); // (as this method not exercised within 
`FlowStatusResource`)
     }
+
+    @Override
+    public List<org.apache.gobblin.service.monitoring.FlowStatus> 
getAllFlowStatusesForFlowExecutionsOrdered(
+        String flowGroup, String flowName) {
+      return Lists.newArrayList();// (as this method not exercised within 
`FlowStatusResource`)
+    }
   }
 
   @BeforeClass
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
index c6183cc52..b4ec8c353 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
@@ -154,16 +154,22 @@ public class FlowStatusGenerator {
    * @return true, if any jobs of the flow are RUNNING.
    */
   public boolean isFlowRunning(String flowName, String flowGroup, long 
flowExecutionId) {
-    List<FlowStatus> flowStatusList = getLatestFlowStatus(flowName, flowGroup, 
1, null);
+    List<FlowStatus> flowStatusList = 
jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, 
flowGroup);
+
     if (flowStatusList == null || flowStatusList.isEmpty()) {
       return false;
-    } else {
-      FlowStatus flowStatus = flowStatusList.get(0);
+    }
+    // Iterating through all flow statuses to check the condition
+    for (FlowStatus flowStatus : flowStatusList) {
       ExecutionStatus flowExecutionStatus = 
flowStatus.getFlowExecutionStatus();
-      log.info("Comparing flow execution status with flowExecutionId: " + 
flowStatus.getFlowExecutionId() + " and flowStatus: " + flowExecutionStatus + " 
with incoming flowExecutionId: " + flowExecutionId);
-      // If the latest flow status is the current job about to get kicked off, 
we should ignore this check
-      return flowStatus.getFlowExecutionId() != flowExecutionId && 
!FINISHED_STATUSES.contains(flowExecutionStatus.name());
+      // Check if it is not the current flowExecutionId and the status is not 
in FINISHED_STATUSES
+      if (flowStatus.getFlowExecutionId() != flowExecutionId && 
!FINISHED_STATUSES.contains(flowExecutionStatus.name())) {
+        log.info("Comparing flow execution status with flowExecutionId: " + 
flowStatus.getFlowExecutionId()
+            + " and flowStatus: " + flowExecutionStatus + " with incoming 
flowExecutionId: " + flowExecutionId);
+        return true;
+      }
     }
+    return false; // Return false if all flow statuses are in terminal status
   }
 
   /** @return only `jobStatuses` that represent a flow or, when `tag != null`, 
represent a job tagged as `tag` */
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
index bcbcc15fa..98889ac70 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.service.monitoring;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
@@ -81,6 +82,14 @@ public abstract class JobStatusRetriever implements 
LatestFlowExecutionIdTracker
    */
   public abstract List<FlowStatus> 
getFlowStatusesForFlowGroupExecutions(String flowGroup, int 
countJobStatusesPerFlowName);
 
+  /**
+   * Get all the {@link FlowStatus}es of executions of the flow identified by 
this flow group and flow name. Currently, latest flow execution
+   * is decided by comparing {@link JobStatus#getFlowExecutionId()}.
+   * @return `FlowStatus`es are ordered by descending flowExecutionId.
+   **/
+  public abstract List<FlowStatus> 
getAllFlowStatusesForFlowExecutionsOrdered(String flowGroup,String flowName);
+
+
   public long getLatestExecutionIdForFlow(String flowName, String flowGroup) {
     List<Long> lastKExecutionIds = getLatestExecutionIdsForFlow(flowName, 
flowGroup, 1);
     return lastKExecutionIds != null && !lastKExecutionIds.isEmpty() ? 
lastKExecutionIds.get(0) : -1L;
@@ -202,9 +211,18 @@ public abstract class JobStatusRetriever implements 
LatestFlowExecutionIdTracker
     private final long flowExecutionId;
     private final List<State> jobStates;
   }
-
+  /**
+   * Groups job status states by flow execution IDs optionally limiting the 
number of executions per flow name.
+   *
+   * @param flowGroup The group to which the flow executions belong.
+   * @param jobStatusStates List of job status states to process.
+   * @param maxCountPerFlowName Maximum number of executions to retain per 
flow name.
+   *                           If null, all executions are returned
+   * @return List of FlowExecutionJobStateGrouping objects containing the 
latest job states
+   *         grouped by flow execution ID and sorted by flow name in ascending 
order.
+   */
   protected List<FlowExecutionJobStateGrouping> 
groupByFlowExecutionAndRetainLatest(
-      String flowGroup, List<State> jobStatusStates, int maxCountPerFlowName) {
+      String flowGroup, List<State> jobStatusStates, Integer 
maxCountPerFlowName) {
     Map<String, Map<Long, List<State>>> statesByFlowExecutionIdByName = 
jobStatusStates.stream().collect(
         Collectors.groupingBy(JobStatusRetriever::getFlowName, 
Collectors.groupingBy(JobStatusRetriever::getFlowExecutionId)));
 
@@ -212,7 +230,15 @@ public abstract class JobStatusRetriever implements 
LatestFlowExecutionIdTracker
       String flowName = flowNameEntry.getKey();
       Map<Long, List<State>> statesByFlowExecutionIdForName = 
flowNameEntry.getValue();
 
-      List<Long> executionIds = 
Ordering.<Long>natural().greatestOf(statesByFlowExecutionIdForName.keySet(), 
maxCountPerFlowName);
+      List<Long> executionIds;
+      if (maxCountPerFlowName != null) {
+        // If maxCountPerFlowName is specified, limit the number of executions 
per flow name
+        executionIds = 
Ordering.natural().greatestOf(statesByFlowExecutionIdForName.keySet(), 
maxCountPerFlowName);
+      } else {
+        // If maxCountPerFlowName is not specified (null), return all 
execution IDs sorted in descending order
+        executionIds = new 
ArrayList<>(statesByFlowExecutionIdForName.keySet());
+        executionIds.sort(Comparator.reverseOrder());
+      }
       return executionIds.stream().map(executionId ->
           new FlowExecutionJobStateGrouping(flowGroup, flowName, executionId, 
statesByFlowExecutionIdForName.get(executionId)));
     }).collect(Collectors.toList());
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
index 109c71f40..9e84a1fa0 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
@@ -43,7 +43,7 @@ public class FlowStatusGeneratorTest {
     String flowName = "testName";
     String flowGroup = "testGroup";
     long currFlowExecutionId = 1234L;
-    when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 
1)).thenReturn(null);
+    
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, 
flowGroup)).thenReturn(null);
 
     FlowStatusGenerator flowStatusGenerator = new 
FlowStatusGenerator(jobStatusRetriever);
     Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, 
currFlowExecutionId));
@@ -55,13 +55,12 @@ public class FlowStatusGeneratorTest {
     String flowName = "testName";
     String flowGroup = "testGroup";
     long flowExecutionId = 1234L;
-    when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 
1)).thenReturn(
-        Lists.newArrayList(flowExecutionId));
     JobStatus jobStatus = 
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
         
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName(ExecutionStatus.COMPILED.name()).build();
     Iterator<JobStatus> jobStatusIterator = 
Lists.newArrayList(jobStatus).iterator();
-    when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, 
flowGroup, flowExecutionId)).thenReturn(
-        jobStatusIterator);
+    FlowStatus flowStatus = new 
FlowStatus(flowName,flowGroup,flowExecutionId,jobStatusIterator,ExecutionStatus.COMPILED);
+    
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, 
flowGroup)).thenReturn(
+        Lists.newArrayList(flowStatus));
     FlowStatusGenerator flowStatusGenerator = new 
FlowStatusGenerator(jobStatusRetriever);
     // Block the next execution if the prior one is in compiled as it's 
considered still running
     Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup, 
flowExecutionId + 1));
@@ -78,8 +77,9 @@ public class FlowStatusGeneratorTest {
     JobStatus jobStatus = 
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
         
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName(ExecutionStatus.COMPILED.name()).build();
     Iterator<JobStatus> jobStatusIterator = 
Lists.newArrayList(jobStatus).iterator();
-    when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, 
flowGroup, flowExecutionId)).thenReturn(
-        jobStatusIterator);
+    FlowStatus flowStatus = new 
FlowStatus(flowName,flowGroup,flowExecutionId,jobStatusIterator,ExecutionStatus.COMPILED);
+    
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, 
flowGroup)).thenReturn(
+        Lists.newArrayList(flowStatus));
     FlowStatusGenerator flowStatusGenerator = new 
FlowStatusGenerator(jobStatusRetriever);
     // If the flow is compiled but the flow execution status is the same as 
the one about to be kicked off, do not consider it as running.
     Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, 
flowExecutionId));
@@ -103,18 +103,20 @@ public class FlowStatusGeneratorTest {
         .jobName(job2).eventName("FAILED").build();
     JobStatus jobStatus3 = 
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
         .jobName(job3).eventName("CANCELLED").build();
-    JobStatus flowStatus = 
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
+    JobStatus jobStatus4 = 
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
         
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("CANCELLED").build();
-    Iterator<JobStatus> jobStatusIterator = Lists.newArrayList(jobStatus1, 
jobStatus2, jobStatus3, flowStatus).iterator();
+    Iterator<JobStatus> jobStatusIterator = Lists.newArrayList(jobStatus1, 
jobStatus2, jobStatus3, jobStatus4).iterator();
     FlowStatusGenerator flowStatusGenerator = new 
FlowStatusGenerator(jobStatusRetriever);
 
-    when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, 
flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
+    FlowStatus flowStatus = new 
FlowStatus(flowName,flowGroup,flowExecutionId,jobStatusIterator,JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusIterator));
+    
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, 
flowGroup)).thenReturn(Lists.newArrayList(flowStatus));
     Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, 
flowExecutionId));
 
-    flowStatus = 
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
+    jobStatus4 = 
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
         
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("RUNNING").build();
-    jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2, jobStatus3, 
flowStatus).iterator();
-    when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, 
flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
+    jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2, jobStatus3, 
jobStatus4).iterator();
+    flowStatus = new 
FlowStatus(flowName,flowGroup,flowExecutionId,jobStatusIterator,JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusIterator));
+    
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, 
flowGroup)).thenReturn(Collections.singletonList(flowStatus));
     Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup, 
flowExecutionId+1));
   }
 
@@ -175,6 +177,90 @@ public class FlowStatusGeneratorTest {
         Arrays.asList(f0jsmDep2)));
   }
 
+  @Test
+  public void testIsFlowRunning_NoFlowStatuses_ReturnsFalse() {
+    String flowName = "testName";
+    String flowGroup = "testGroup";
+    long flowExecutionId = 1234L;
+    JobStatusRetriever jobStatusRetriever = 
Mockito.mock(JobStatusRetriever.class);
+    FlowStatusGenerator flowStatusGenerator = new 
FlowStatusGenerator(jobStatusRetriever);
+
+    // Mocking the retrieval of empty flowStatusList
+    
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, 
flowGroup))
+        .thenReturn(Collections.emptyList());
+
+    Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, 
flowExecutionId));
+  }
+
+  @Test
+  public void testIsFlowRunning_AllFinishedFlowStatuses_ReturnsFalse() {
+    String flowName = "testName";
+    String flowGroup = "testGroup";
+    long flowExecutionId = 1234L;
+    JobStatusRetriever jobStatusRetriever = 
Mockito.mock(JobStatusRetriever.class);
+    FlowStatusGenerator flowStatusGenerator = new 
FlowStatusGenerator(jobStatusRetriever);
+
+    // Mocking flowStatusList with all finished statuses
+    List<FlowStatus> flowStatusList = Arrays.asList(
+        createFlowStatus(flowName, flowGroup, flowExecutionId, "COMPLETE"),
+        createFlowStatus(flowName, flowGroup, flowExecutionId, "FAILED"),
+        createFlowStatus(flowName, flowGroup, flowExecutionId, "CANCELLED")
+    );
+
+    
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, 
flowGroup))
+        .thenReturn(flowStatusList);
+
+    Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, 
flowExecutionId));
+  }
+
+  @Test
+  public void 
testIsFlowRunning_FlowStatusNotMatchingFlowExecutionIdAndOneOfTheStatusIsRunning_ReturnsTrue()
 {
+    String flowName = "testName";
+    String flowGroup = "testGroup";
+    long flowExecutionId = 1234L;
+    JobStatusRetriever jobStatusRetriever = 
Mockito.mock(JobStatusRetriever.class);
+    FlowStatusGenerator flowStatusGenerator = new 
FlowStatusGenerator(jobStatusRetriever);
+
+    // Mocking flowStatusList with a running status and a different flow 
execution id
+    List<FlowStatus> flowStatusList = Arrays.asList(
+        createFlowStatus(flowName, flowGroup, flowExecutionId+4, "COMPLETE"),
+        createFlowStatus(flowName, flowGroup, flowExecutionId+3, "COMPLETE"),
+        createFlowStatus(flowName, flowGroup, flowExecutionId+2, "RUNNING"),
+        createFlowStatus(flowName, flowGroup, flowExecutionId+1, "FAILED"),
+        createFlowStatus(flowName, flowGroup, flowExecutionId, "COMPLETE")
+
+        );
+
+    
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, 
flowGroup))
+        .thenReturn(flowStatusList);
+
+    Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup, 
flowExecutionId + 1));
+  }
+
+  @Test
+  public void 
testIsFlowRunning_FlowStatusMatchingFlowExecutionId_ReturnsFalse() {
+    String flowName = "testName";
+    String flowGroup = "testGroup";
+    long flowExecutionId = 1234L;
+    JobStatusRetriever jobStatusRetriever = 
Mockito.mock(JobStatusRetriever.class);
+    FlowStatusGenerator flowStatusGenerator = new 
FlowStatusGenerator(jobStatusRetriever);
+
+    // Mocking flowStatusList with a running status and the same flow 
execution id
+    List<FlowStatus> flowStatusList = Collections.singletonList(
+        createFlowStatus(flowName, flowGroup, flowExecutionId, "RUNNING")
+    );
+
+    
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, 
flowGroup))
+        .thenReturn(flowStatusList);
+
+    Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, 
flowExecutionId));
+  }
+
+  private FlowStatus createFlowStatus(String flowName, String flowGroup, long 
flowExecutionId, String status) {
+    ExecutionStatus executionStatus = ExecutionStatus.valueOf(status);
+    return new FlowStatus(flowName, flowGroup, flowExecutionId, null, 
executionStatus);
+  }
+
   private FlowStatus createFlowStatus(String flowGroup, String flowName, long 
flowExecutionId, List<JobStatus> jobStatuses) {
     return new FlowStatus(flowName, flowGroup, flowExecutionId, 
jobStatuses.iterator(),
         
JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatuses.iterator()));
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
index e27ce534e..f2aeaba2d 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
@@ -131,6 +131,23 @@ public class FsJobStatusRetriever extends 
JobStatusRetriever {
     }
   }
 
+  @Override
+  public List<FlowStatus> getAllFlowStatusesForFlowExecutionsOrdered(String 
flowGroup, String flowName) {
+    Preconditions.checkArgument(flowGroup != null, "flowGroup cannot be null");
+    Preconditions.checkArgument(flowName != null, "flowName cannot be null");
+    try {
+      String storeNamePrefix = 
KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
+      List<String> storeNamesForFlowGroup = stateStore.getStoreNames(storeName 
-> storeName.startsWith(storeNamePrefix));
+      List<State> flowGroupExecutionsStates = 
storeNamesForFlowGroup.stream().flatMap(CheckedExceptionFunction.wrapToUnchecked(storeName
 ->
+          stateStore.getAll(storeName).stream()
+      )).collect(Collectors.toList());
+      return asFlowStatuses(groupByFlowExecutionAndRetainLatest(flowGroup, 
flowGroupExecutionsStates, null));
+    } catch (IOException | RuntimeException e) { // (latter likely wrapping 
`IOException` originating within `wrapUnchecked`)
+      log.error(String.format("Exception encountered when listing files for 
flow group: %s", flowGroup), e);
+      return ImmutableList.of();
+    }
+  }
+
   /**
    * @param flowName
    * @param flowGroup
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
index 8a9346571..72561d39a 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
@@ -90,11 +90,25 @@ public class LocalFsJobStatusRetriever extends 
JobStatusRetriever {
 
     String JOB_DONE_SUFFIX = ".done";
     if (this.doesJobExist(flowName, flowGroup, flowExecutionId, 
JOB_DONE_SUFFIX)) {
-      jobStatus = 
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).flowExecutionId(flowExecutionId).
-          
jobName(jobName).jobGroup(jobGroup).jobExecutionId(flowExecutionId).eventName(ExecutionStatus.COMPLETE.name()).build();
+      jobStatus = JobStatus.builder()
+          .flowName(flowName)
+          .flowGroup(flowGroup)
+          .flowExecutionId(flowExecutionId)
+          .jobName(jobName)
+          .jobGroup(jobGroup)
+          .jobExecutionId(flowExecutionId)
+          .eventName(ExecutionStatus.COMPLETE.name())
+          .build();
     } else if (this.doesJobExist(flowName, flowGroup, flowExecutionId, "")) {
-      jobStatus = 
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).flowExecutionId(flowExecutionId).
-          
jobName(jobName).jobGroup(jobGroup).jobExecutionId(flowExecutionId).eventName(ExecutionStatus.PENDING.name()).build();
+      jobStatus = JobStatus.builder()
+          .flowName(flowName)
+          .flowGroup(flowGroup)
+          .flowExecutionId(flowExecutionId)
+          .jobName(jobName)
+          .jobGroup(jobGroup)
+          .jobExecutionId(flowExecutionId)
+          .eventName(ExecutionStatus.PENDING.name())
+          .build();
     } else {
       return Collections.emptyIterator();
     }
@@ -131,6 +145,19 @@ public class LocalFsJobStatusRetriever extends 
JobStatusRetriever {
     throw new UnsupportedOperationException("Not yet implemented");
   }
 
+  /**
+   * @param flowGroup
+   * @param flowName
+   * @return  all the  flow statuses for the given flowGroup and flowName.
+   */
+  @Override
+  public List<FlowStatus> getAllFlowStatusesForFlowExecutionsOrdered(String 
flowGroup, String flowName) {
+    Preconditions.checkArgument(flowGroup != null, "flowGroup cannot be null");
+    Preconditions.checkArgument(flowName != null, "flowName cannot be null");
+
+    throw new UnsupportedOperationException("Not yet implemented");
+  }
+
   public StateStore<State> getStateStore() {
     // this jobstatus retriever does not have a state store
     // only used in tests so this is okay
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
index d323d710c..81d8776c5 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
@@ -95,6 +95,14 @@ public class MysqlJobStatusRetriever extends 
JobStatusRetriever {
     return asFlowStatuses(groupByFlowExecutionAndRetainLatest(flowGroup, 
jobStatusStates, countJobStatusesPerFlowName));
   }
 
+  @Override
+  public List<FlowStatus> getAllFlowStatusesForFlowExecutionsOrdered(String 
flowGroup, String flowName) {
+    String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, 
flowName);
+    List<State> jobStatusStates = timeOpAndWrapIOException(() -> 
this.stateStore.getAllWithPrefix(storeName),
+        GET_LATEST_FLOW_GROUP_STATUS_METRIC);
+    return asFlowStatuses(groupByFlowExecutionAndRetainLatest(flowGroup, 
jobStatusStates,null));
+  }
+
   @Override
   public List<Long> getLatestExecutionIdsForFlow(String flowName, String 
flowGroup, int count) {
     String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, 
flowName);
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
index ca505f73e..1b8a496f9 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.service.monitoring;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
@@ -44,9 +45,13 @@ public abstract class JobStatusRetrieverTest {
   protected static final String FLOW_NAME = "myFlowName";
   protected static final String FLOW_GROUP_ALT_A = "myFlowGroup-alt-A";
   protected static final String FLOW_GROUP_ALT_B = "myFlowGroup-alt-B";
+  protected static final String FLOW_GROUP_ALT_C = "myFlowGroup-alt-C";
+  protected static final String FLOW_GROUP_ALT_D = "myFlowGroup-alt-D";
   protected static final String FLOW_NAME_ALT_1 = "myFlowName-alt-1";
   protected static final String FLOW_NAME_ALT_2 = "myFlowName-alt-2";
   protected static final String FLOW_NAME_ALT_3 = "myFlowName-alt-3";
+  protected static final String FLOW_NAME_ALT_4 = "myFlowName-alt-4";
+  protected static final String FLOW_NAME_ALT_5 = "myFlowName-alt-5";
   protected String jobGroup;
   private static final String MY_JOB_GROUP = "myJobGroup";
   protected static final String MY_JOB_NAME_1 = "myJobName1";
@@ -280,7 +285,7 @@ public abstract class JobStatusRetrieverTest {
   }
 
   @Test
-  public void testGetFlowStatusesForFlowGroupExecutions() throws IOException {
+  public void testGetFlowStatusesForFlowGroupExecutions() throws Exception {
     // a.) simplify to begin, in `FLOW_GROUP_ALT_A`, leaving out job-level 
status
     addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_A, FLOW_NAME, 101L, 
JobStatusRetriever.NA_KEY, ExecutionStatus.COMPILED.name());
     addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_A, FLOW_NAME, 102L, 
JobStatusRetriever.NA_KEY, ExecutionStatus.RUNNING.name());
@@ -332,6 +337,66 @@ public abstract class JobStatusRetrieverTest {
             JobStatusMatch.Dependent.of(MY_JOB_GROUP, MY_JOB_NAME_1, 1111L, 
ExecutionStatus.COMPLETE.name()),
             JobStatusMatch.Dependent.of(MY_JOB_GROUP, MY_JOB_NAME_2, 1111L, 
ExecutionStatus.ORCHESTRATED.name()))));
   }
+  @Test
+  public void testAllFlowStatusesForFlowExecutionsOrdered() throws Exception {
+    // a.) simplify to begin, in `FLOW_GROUP_ALT_C`, leaving out job-level 
status
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_C, FLOW_NAME_ALT_4, 101L, 
JobStatusRetriever.NA_KEY, ExecutionStatus.COMPILED.name());
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_C, FLOW_NAME_ALT_4, 102L, 
JobStatusRetriever.NA_KEY, ExecutionStatus.RUNNING.name());
+
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_C, FLOW_NAME_ALT_4, 111L, 
JobStatusRetriever.NA_KEY, ExecutionStatus.COMPILED.name());
+
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_C, FLOW_NAME_ALT_4, 121L, 
JobStatusRetriever.NA_KEY, ExecutionStatus.COMPLETE.name());
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_C, FLOW_NAME_ALT_4, 122L, 
JobStatusRetriever.NA_KEY, ExecutionStatus.COMPILED.name());
+
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_C, FLOW_NAME_ALT_4, 131L, 
JobStatusRetriever.NA_KEY, ExecutionStatus.FAILED.name());
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_C, FLOW_NAME_ALT_4, 132L, 
JobStatusRetriever.NA_KEY, ExecutionStatus.COMPLETE.name());
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_C, FLOW_NAME_ALT_4, 133L, 
JobStatusRetriever.NA_KEY, ExecutionStatus.PENDING_RESUME.name());
+
+    // b.) include job-level status, in `FLOW_GROUP_ALT_D`
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_D, FLOW_NAME_ALT_5, 211L, 
JobStatusRetriever.NA_KEY, ExecutionStatus.FAILED.name());
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_D, FLOW_NAME_ALT_5, 211L, 
MY_JOB_NAME_2, ExecutionStatus.ORCHESTRATED.name());
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_D, FLOW_NAME_ALT_5, 231L, 
JobStatusRetriever.NA_KEY, ExecutionStatus.COMPLETE.name());
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_D, FLOW_NAME_ALT_5, 231L, 
MY_JOB_NAME_1, ExecutionStatus.FAILED.name());
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_D, FLOW_NAME_ALT_5, 231L, 
MY_JOB_NAME_2, ExecutionStatus.COMPLETE.name());
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_D, FLOW_NAME_ALT_5, 232L, 
JobStatusRetriever.NA_KEY, ExecutionStatus.FAILED.name());
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_D, FLOW_NAME_ALT_5, 233L, 
JobStatusRetriever.NA_KEY, ExecutionStatus.ORCHESTRATED.name());
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_D, FLOW_NAME_ALT_5, 233L, 
MY_JOB_NAME_1, ExecutionStatus.COMPLETE.name());
+    addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_D, FLOW_NAME_ALT_5, 233L, 
MY_JOB_NAME_2, ExecutionStatus.ORCHESTRATED.name());
+
+    List<FlowStatus> flowStatusesForGroupAltA = 
this.jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(FLOW_GROUP_ALT_C,
 FLOW_NAME_ALT_4);
+    Assert.assertEquals(flowStatusesForGroupAltA.size(), 8);
+
+    assertThat(flowStatusesForGroupAltA.get(0), 
FlowStatusMatch.of(FLOW_GROUP_ALT_C, FLOW_NAME_ALT_4, 133L, 
ExecutionStatus.PENDING_RESUME));
+    assertThat(flowStatusesForGroupAltA.get(1), 
FlowStatusMatch.of(FLOW_GROUP_ALT_C, FLOW_NAME_ALT_4, 132L, 
ExecutionStatus.COMPLETE));
+    assertThat(flowStatusesForGroupAltA.get(2), 
FlowStatusMatch.of(FLOW_GROUP_ALT_C, FLOW_NAME_ALT_4, 131L, 
ExecutionStatus.FAILED));
+
+    assertThat(flowStatusesForGroupAltA.get(3), 
FlowStatusMatch.of(FLOW_GROUP_ALT_C, FLOW_NAME_ALT_4, 122L, 
ExecutionStatus.COMPILED));
+
+    assertThat(flowStatusesForGroupAltA.get(4), 
FlowStatusMatch.of(FLOW_GROUP_ALT_C, FLOW_NAME_ALT_4, 121L, 
ExecutionStatus.COMPLETE));
+    assertThat(flowStatusesForGroupAltA.get(5), 
FlowStatusMatch.of(FLOW_GROUP_ALT_C, FLOW_NAME_ALT_4, 111L, 
ExecutionStatus.COMPILED));
+
+    assertThat(flowStatusesForGroupAltA.get(6), 
FlowStatusMatch.of(FLOW_GROUP_ALT_C, FLOW_NAME_ALT_4, 102L, 
ExecutionStatus.RUNNING));
+    assertThat(flowStatusesForGroupAltA.get(7), 
FlowStatusMatch.of(FLOW_GROUP_ALT_C, FLOW_NAME_ALT_4, 101L, 
ExecutionStatus.COMPILED));
+
+
+    List<FlowStatus> flowStatusesForGroupAltB = 
this.jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(FLOW_GROUP_ALT_D,
 FLOW_NAME_ALT_5);
+    Assert.assertEquals(flowStatusesForGroupAltB.size(), 4);
+
+    assertThat(flowStatusesForGroupAltB.get(0), 
FlowStatusMatch.withDependentJobStatuses(FLOW_GROUP_ALT_D, FLOW_NAME_ALT_5, 
233L, ExecutionStatus.ORCHESTRATED,
+        ImmutableList.of(
+            JobStatusMatch.Dependent.of(MY_JOB_GROUP, MY_JOB_NAME_1, 1111L, 
ExecutionStatus.COMPLETE.name()),
+            JobStatusMatch.Dependent.of(MY_JOB_GROUP, MY_JOB_NAME_2, 1111L, 
ExecutionStatus.ORCHESTRATED.name()))));
+
+    assertThat(flowStatusesForGroupAltB.get(1), 
FlowStatusMatch.withDependentJobStatuses(FLOW_GROUP_ALT_D, FLOW_NAME_ALT_5, 
232L, ExecutionStatus.FAILED,
+        Collections.emptyList()));
+    assertThat(flowStatusesForGroupAltB.get(2), 
FlowStatusMatch.withDependentJobStatuses(FLOW_GROUP_ALT_D, FLOW_NAME_ALT_5, 
231L, ExecutionStatus.COMPLETE,
+        ImmutableList.of(
+            JobStatusMatch.Dependent.of(MY_JOB_GROUP, MY_JOB_NAME_1, 1111L, 
ExecutionStatus.FAILED.name()),
+            JobStatusMatch.Dependent.of(MY_JOB_GROUP, MY_JOB_NAME_2, 1111L, 
ExecutionStatus.COMPLETE.name()))));
+    assertThat(flowStatusesForGroupAltB.get(3), 
FlowStatusMatch.withDependentJobStatuses(FLOW_GROUP_ALT_D, FLOW_NAME_ALT_5, 
211L, ExecutionStatus.FAILED,
+        ImmutableList.of(
+            JobStatusMatch.Dependent.of(MY_JOB_GROUP, MY_JOB_NAME_2, 1111L, 
ExecutionStatus.ORCHESTRATED.name()))));
+  }
 
   abstract void cleanUpDir() throws Exception;
 

Reply via email to