This is an automated email from the ASF dual-hosted git repository.

aplex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 384dc09  [GOBBLIN-1554] Fix `MysqlJobStatusRetriever` to return each 
latest flow execution ID exactly once (#3406)
384dc09 is described below

commit 384dc096da8b7f05447be68fa001fcba39d64276
Author: Kip Kohn <[email protected]>
AuthorDate: Wed Sep 29 18:57:28 2021 -0700

    [GOBBLIN-1554] Fix `MysqlJobStatusRetriever` to return each latest flow 
execution ID exactly once (#3406)
    
    A recent bug caused the flow execution ID to replicate as many times as the 
flow has
    constituent jobs.  It was mistakenly introduced in the refactoring:
      https://github.com/apache/gobblin/pull/3378
---
 .../apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java | 3 ++-
 .../apache/gobblin/service/monitoring/JobStatusRetrieverTest.java  | 7 ++++---
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
index 6d28ef9..0476782 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
@@ -112,7 +112,8 @@ public class MysqlJobStatusRetriever extends 
JobStatusRetriever {
   }
 
   private List<Long> getLatestExecutionIds(List<State> jobStatusStates, int 
count) {
-    Iterator<Long> flowExecutionIds = 
jobStatusStates.stream().map(this::getFlowExecutionId).iterator();
+    // `distinct()`, to avoid each flow execution ID replicating as many times 
as it has child jobs
+    Iterator<Long> flowExecutionIds = 
jobStatusStates.stream().map(this::getFlowExecutionId).distinct().iterator();
     return Ordering.<Long>natural().greatestOf(flowExecutionIds, count);
   }
 }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
index c7eb16a..d4aa468 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
@@ -204,13 +204,14 @@ public abstract class JobStatusRetrieverTest {
     Assert.assertEquals(latestExecutionIdForFlow, flowExecutionId1);
 
     long flowExecutionId2 = 1236L;
-    addJobStatusToStateStore(flowExecutionId2, MY_JOB_NAME_1, 
ExecutionStatus.RUNNING.name());
+    //IMPORTANT: multiple jobs for latest flow verifies that flow executions 
counted exactly once, not once per constituent job
+    addJobStatusToStateStore(flowExecutionId2, MY_JOB_NAME_1, 
ExecutionStatus.COMPLETE.name());
+    addJobStatusToStateStore(flowExecutionId2, MY_JOB_NAME_2, 
ExecutionStatus.RUNNING.name());
 
     //State store now has 3 flow executions - 1234, 1235, 1236. Get the latest 
2 executions i.e. 1235 and 1236.
     List<Long> latestFlowExecutionIds = 
this.jobStatusRetriever.getLatestExecutionIdsForFlow(FLOW_NAME, FLOW_GROUP, 2);
     Assert.assertEquals(latestFlowExecutionIds.size(), 2);
-    Assert.assertEquals(latestFlowExecutionIds.get(0), (Long) 
flowExecutionId2);
-    Assert.assertEquals(latestFlowExecutionIds.get(1), (Long) 
flowExecutionId1);
+    Assert.assertEquals(latestFlowExecutionIds, 
ImmutableList.of(flowExecutionId2, flowExecutionId1));
 
     //Remove all flow executions from state store
     cleanUpDir();

Reply via email to