This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new deab0ce22 [GOBBLIN-2000] Add logging for concurrent flow status check
(#3875)
deab0ce22 is described below
commit deab0ce22f5b9aad8784385c7cd3d19e74f417d0
Author: William Lo <[email protected]>
AuthorDate: Tue Feb 6 16:54:00 2024 -0500
[GOBBLIN-2000] Add logging for concurrent flow status check (#3875)
---
.../apache/gobblin/service/monitoring/FlowStatusGenerator.java | 10 ++++------
.../gobblin/service/monitoring/FlowStatusGeneratorTest.java | 4 ++--
2 files changed, 6 insertions(+), 8 deletions(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
index 775fbeb4d..4755f8368 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
@@ -31,14 +31,14 @@ import com.google.common.collect.Lists;
import javax.inject.Inject;
-import org.apache.gobblin.annotation.Alpha;
+import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.service.ExecutionStatus;
/**
* Generator for {@link FlowStatus}, which relies on a {@link
JobStatusRetriever}.
*/
-@Alpha
+@Slf4j
public class FlowStatusGenerator {
public static final List<String> FINISHED_STATUSES =
Lists.newArrayList("FAILED", "COMPLETE", "CANCELLED");
public static final int MAX_LOOKBACK = 100;
@@ -159,11 +159,9 @@ public class FlowStatusGenerator {
} else {
FlowStatus flowStatus = flowStatusList.get(0);
ExecutionStatus flowExecutionStatus =
flowStatus.getFlowExecutionStatus();
+ log.info("Comparing flow execution status with flowExecutionId: " +
flowStatus.getFlowExecutionId() + " and flowStatus: " + flowExecutionStatus + "
with incoming flowExecutionId: " + flowExecutionId);
// If the latest flow status is the current job about to get kicked off,
we should ignore this check
- if (flowStatus.getFlowExecutionId() == flowExecutionId) {
- return false;
- }
- return !FINISHED_STATUSES.contains(flowExecutionStatus.name());
+ return flowStatus.getFlowExecutionId() != flowExecutionId &&
!FINISHED_STATUSES.contains(flowExecutionStatus.name());
}
}
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
index 601e40e5f..109c71f40 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
@@ -58,7 +58,7 @@ public class FlowStatusGeneratorTest {
when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup,
1)).thenReturn(
Lists.newArrayList(flowExecutionId));
JobStatus jobStatus =
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
-
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("COMPILED").build();
+
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName(ExecutionStatus.COMPILED.name()).build();
Iterator<JobStatus> jobStatusIterator =
Lists.newArrayList(jobStatus).iterator();
when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName,
flowGroup, flowExecutionId)).thenReturn(
jobStatusIterator);
@@ -76,7 +76,7 @@ public class FlowStatusGeneratorTest {
when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup,
1)).thenReturn(
Lists.newArrayList(flowExecutionId));
JobStatus jobStatus =
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
-
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("COMPILED").build();
+
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName(ExecutionStatus.COMPILED.name()).build();
Iterator<JobStatus> jobStatusIterator =
Lists.newArrayList(jobStatus).iterator();
when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName,
flowGroup, flowExecutionId)).thenReturn(
jobStatusIterator);