This is an automated email from the ASF dual-hosted git repository.
aplex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 384dc09 [GOBBLIN-1554] Fix `MysqlJobStatusRetriever` to return each
latest flow execution ID exactly once (#3406)
384dc09 is described below
commit 384dc096da8b7f05447be68fa001fcba39d64276
Author: Kip Kohn <[email protected]>
AuthorDate: Wed Sep 29 18:57:28 2021 -0700
[GOBBLIN-1554] Fix `MysqlJobStatusRetriever` to return each latest flow
execution ID exactly once (#3406)
A recent bug caused each flow execution ID to replicate as many times as the
flow has constituent jobs. It was mistakenly introduced in the refactoring:
https://github.com/apache/gobblin/pull/3378
---
.../apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java | 3 ++-
.../apache/gobblin/service/monitoring/JobStatusRetrieverTest.java | 7 ++++---
2 files changed, 6 insertions(+), 4 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
index 6d28ef9..0476782 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
@@ -112,7 +112,8 @@ public class MysqlJobStatusRetriever extends
JobStatusRetriever {
}
private List<Long> getLatestExecutionIds(List<State> jobStatusStates, int
count) {
- Iterator<Long> flowExecutionIds =
jobStatusStates.stream().map(this::getFlowExecutionId).iterator();
+ // `distinct()`, to avoid each flow execution ID replicating as many times
as it has child jobs
+ Iterator<Long> flowExecutionIds =
jobStatusStates.stream().map(this::getFlowExecutionId).distinct().iterator();
return Ordering.<Long>natural().greatestOf(flowExecutionIds, count);
}
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
index c7eb16a..d4aa468 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
@@ -204,13 +204,14 @@ public abstract class JobStatusRetrieverTest {
Assert.assertEquals(latestExecutionIdForFlow, flowExecutionId1);
long flowExecutionId2 = 1236L;
- addJobStatusToStateStore(flowExecutionId2, MY_JOB_NAME_1,
ExecutionStatus.RUNNING.name());
+ //IMPORTANT: multiple jobs for latest flow verifies that flow executions
counted exactly once, not once per constituent job
+ addJobStatusToStateStore(flowExecutionId2, MY_JOB_NAME_1,
ExecutionStatus.COMPLETE.name());
+ addJobStatusToStateStore(flowExecutionId2, MY_JOB_NAME_2,
ExecutionStatus.RUNNING.name());
//State store now has 3 flow executions - 1234, 1235, 1236. Get the latest
2 executions i.e. 1235 and 1236.
List<Long> latestFlowExecutionIds =
this.jobStatusRetriever.getLatestExecutionIdsForFlow(FLOW_NAME, FLOW_GROUP, 2);
Assert.assertEquals(latestFlowExecutionIds.size(), 2);
- Assert.assertEquals(latestFlowExecutionIds.get(0), (Long)
flowExecutionId2);
- Assert.assertEquals(latestFlowExecutionIds.get(1), (Long)
flowExecutionId1);
+ Assert.assertEquals(latestFlowExecutionIds,
ImmutableList.of(flowExecutionId2, flowExecutionId1));
//Remove all flow executions from state store
cleanUpDir();