This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new deab0ce22 [GOBBLIN-2000] Add logging for concurrent flow status check 
(#3875)
deab0ce22 is described below

commit deab0ce22f5b9aad8784385c7cd3d19e74f417d0
Author: William Lo <[email protected]>
AuthorDate: Tue Feb 6 16:54:00 2024 -0500

    [GOBBLIN-2000] Add logging for concurrent flow status check (#3875)
---
 .../apache/gobblin/service/monitoring/FlowStatusGenerator.java | 10 ++++------
 .../gobblin/service/monitoring/FlowStatusGeneratorTest.java    |  4 ++--
 2 files changed, 6 insertions(+), 8 deletions(-)

diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
index 775fbeb4d..4755f8368 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
@@ -31,14 +31,14 @@ import com.google.common.collect.Lists;
 
 import javax.inject.Inject;
 
-import org.apache.gobblin.annotation.Alpha;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.service.ExecutionStatus;
 
 
 /**
  * Generator for {@link FlowStatus}, which relies on a {@link 
JobStatusRetriever}.
  */
-@Alpha
+@Slf4j
 public class FlowStatusGenerator {
   public static final List<String> FINISHED_STATUSES = 
Lists.newArrayList("FAILED", "COMPLETE", "CANCELLED");
   public static final int MAX_LOOKBACK = 100;
@@ -159,11 +159,9 @@ public class FlowStatusGenerator {
     } else {
       FlowStatus flowStatus = flowStatusList.get(0);
       ExecutionStatus flowExecutionStatus = 
flowStatus.getFlowExecutionStatus();
+      log.info("Comparing flow execution status with flowExecutionId: " + 
flowStatus.getFlowExecutionId() + " and flowStatus: " + flowExecutionStatus + " 
with incoming flowExecutionId: " + flowExecutionId);
       // If the latest flow status is the current job about to get kicked off, 
we should ignore this check
-      if (flowStatus.getFlowExecutionId() == flowExecutionId) {
-        return false;
-      }
-      return !FINISHED_STATUSES.contains(flowExecutionStatus.name());
+      return flowStatus.getFlowExecutionId() != flowExecutionId && 
!FINISHED_STATUSES.contains(flowExecutionStatus.name());
     }
   }
 
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
index 601e40e5f..109c71f40 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
@@ -58,7 +58,7 @@ public class FlowStatusGeneratorTest {
     when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 
1)).thenReturn(
         Lists.newArrayList(flowExecutionId));
     JobStatus jobStatus = 
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
-        
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("COMPILED").build();
+        
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName(ExecutionStatus.COMPILED.name()).build();
     Iterator<JobStatus> jobStatusIterator = 
Lists.newArrayList(jobStatus).iterator();
     when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, 
flowGroup, flowExecutionId)).thenReturn(
         jobStatusIterator);
@@ -76,7 +76,7 @@ public class FlowStatusGeneratorTest {
     when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 
1)).thenReturn(
         Lists.newArrayList(flowExecutionId));
     JobStatus jobStatus = 
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
-        
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("COMPILED").build();
+        
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName(ExecutionStatus.COMPILED.name()).build();
     Iterator<JobStatus> jobStatusIterator = 
Lists.newArrayList(jobStatus).iterator();
     when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, 
flowGroup, flowExecutionId)).thenReturn(
         jobStatusIterator);

Reply via email to