This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e5756234 [GOBBLIN-1999] Ignore concurrent check if the flow execution 
ID is the same as the c… (#3874)
3e5756234 is described below

commit 3e575623413bf83883532478b5b8b70937d92ed9
Author: William Lo <[email protected]>
AuthorDate: Fri Feb 2 17:57:54 2024 -0500

    [GOBBLIN-1999] Ignore concurrent check if the flow execution ID is the same 
as the c… (#3874)
    
    * Ignore concurrent check if the flow execution ID is the same as the 
currently running flow execution ID to handle race condition of concurrent 
hosts misreporting status
    
    * cleanup
---
 .../service/monitoring/FlowStatusGenerator.java    |  6 ++-
 .../monitoring/FlowStatusGeneratorTest.java        | 54 ++++++++++++++++++----
 .../modules/orchestration/Orchestrator.java        |  2 +-
 .../utils/FlowCompilationValidationHelper.java     | 20 ++++----
 4 files changed, 63 insertions(+), 19 deletions(-)

diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
index 0bf8b71ea..775fbeb4d 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
@@ -152,13 +152,17 @@ public class FlowStatusGenerator {
    * @param flowGroup
    * @return true, if any jobs of the flow are RUNNING.
    */
-  public boolean isFlowRunning(String flowName, String flowGroup) {
+  public boolean isFlowRunning(String flowName, String flowGroup, long 
flowExecutionId) {
     List<FlowStatus> flowStatusList = getLatestFlowStatus(flowName, flowGroup, 
1, null);
     if (flowStatusList == null || flowStatusList.isEmpty()) {
       return false;
     } else {
       FlowStatus flowStatus = flowStatusList.get(0);
       ExecutionStatus flowExecutionStatus = 
flowStatus.getFlowExecutionStatus();
+      // If the latest flow status belongs to the flow execution about to get kicked off, 
we should skip this check
+      if (flowStatus.getFlowExecutionId() == flowExecutionId) {
+        return false;
+      }
       return !FINISHED_STATUSES.contains(flowExecutionStatus.name());
     }
   }
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
index 380177936..601e40e5f 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
@@ -38,25 +38,61 @@ import static org.mockito.Mockito.when;
 public class FlowStatusGeneratorTest {
 
   @Test
-  public void testIsFlowRunning() {
+  public void testIsFlowRunningFirstExecution() {
     JobStatusRetriever jobStatusRetriever = 
Mockito.mock(JobStatusRetriever.class);
     String flowName = "testName";
     String flowGroup = "testGroup";
+    long currFlowExecutionId = 1234L;
     when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 
1)).thenReturn(null);
 
     FlowStatusGenerator flowStatusGenerator = new 
FlowStatusGenerator(jobStatusRetriever);
-    Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
+    Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, 
currFlowExecutionId));
+  }
 
-    //If a flow is COMPILED, isFlowRunning() should return true.
+  @Test
+  public void testIsFlowRunningCompiledPastExecution() {
+    JobStatusRetriever jobStatusRetriever = 
Mockito.mock(JobStatusRetriever.class);
+    String flowName = "testName";
+    String flowGroup = "testGroup";
     long flowExecutionId = 1234L;
     when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 
1)).thenReturn(
         Lists.newArrayList(flowExecutionId));
     JobStatus jobStatus = 
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
         
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("COMPILED").build();
     Iterator<JobStatus> jobStatusIterator = 
Lists.newArrayList(jobStatus).iterator();
-    when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, 
flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
-    Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
+    when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, 
flowGroup, flowExecutionId)).thenReturn(
+        jobStatusIterator);
+    FlowStatusGenerator flowStatusGenerator = new 
FlowStatusGenerator(jobStatusRetriever);
+    // Block the next execution if the prior one is in the COMPILED state, as it's 
considered still running
+    Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup, 
flowExecutionId + 1));
+  }
+
+  @Test
+  public void skipFlowConcurrentCheckSameFlowExecutionId() {
+    JobStatusRetriever jobStatusRetriever = 
Mockito.mock(JobStatusRetriever.class);
+    String flowName = "testName";
+    String flowGroup = "testGroup";
+    long flowExecutionId = 1234L;
+    when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 
1)).thenReturn(
+        Lists.newArrayList(flowExecutionId));
+    JobStatus jobStatus = 
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
+        
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("COMPILED").build();
+    Iterator<JobStatus> jobStatusIterator = 
Lists.newArrayList(jobStatus).iterator();
+    when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, 
flowGroup, flowExecutionId)).thenReturn(
+        jobStatusIterator);
+    FlowStatusGenerator flowStatusGenerator = new 
FlowStatusGenerator(jobStatusRetriever);
+    // If the flow is compiled but the flow execution ID is the same as 
the one about to be kicked off, do not consider it as running.
+    Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, 
flowExecutionId));
+  }
 
+    @Test
+    public void testIsFlowRunningJobExecutionIgnored() {
+    String flowName = "testName";
+    String flowGroup = "testGroup";
+    long flowExecutionId = 1234L;
+    JobStatusRetriever jobStatusRetriever = 
Mockito.mock(JobStatusRetriever.class);
+    when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 
1)).thenReturn(
+        Lists.newArrayList(flowExecutionId));
     //JobStatuses should be ignored, only the flow level status matters.
     String job1 = "job1";
     String job2 = "job2";
@@ -69,15 +105,17 @@ public class FlowStatusGeneratorTest {
         .jobName(job3).eventName("CANCELLED").build();
     JobStatus flowStatus = 
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
         
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("CANCELLED").build();
-    jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2, jobStatus3, 
flowStatus).iterator();
+    Iterator<JobStatus> jobStatusIterator = Lists.newArrayList(jobStatus1, 
jobStatus2, jobStatus3, flowStatus).iterator();
+    FlowStatusGenerator flowStatusGenerator = new 
FlowStatusGenerator(jobStatusRetriever);
+
     when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, 
flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
-    Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
+    Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, 
flowExecutionId));
 
     flowStatus = 
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
         
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("RUNNING").build();
     jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2, jobStatus3, 
flowStatus).iterator();
     when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, 
flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
-    Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
+    Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup, 
flowExecutionId+1));
   }
 
   @Test
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 809779f10..618d8b050 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -247,7 +247,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
         TimingEvent flowCompilationTimer = new 
TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED);
         Optional<Dag<JobExecutionPlan>> compiledDagOptional =
             
this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig,
 flowSpec, flowGroup,
-                flowName);
+                flowName, flowMetadata);
 
         if (!compiledDagOptional.isPresent()) {
           Instrumented.markMeter(this.flowOrchestrationFailedMeter);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
index cda69ebd9..7400602f7 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
@@ -76,9 +76,9 @@ public final class FlowCompilationValidationHelper {
     specCompiler.awaitHealthy();
 
     TimingEvent flowCompilationTimer = new TimingEvent(this.eventSubmitter, 
TimingEvent.FlowTimings.FLOW_COMPILED);
-    Optional<Dag<JobExecutionPlan>> jobExecutionPlanDagOptional =
-        validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup, 
flowName);
     Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata(flowSpec);
+    Optional<Dag<JobExecutionPlan>> jobExecutionPlanDagOptional =
+        validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup, 
flowName, flowMetadata);
 
     if (!jobExecutionPlanDagOptional.isPresent()) {
       return Optional.absent();
@@ -89,7 +89,6 @@ public final class FlowCompilationValidationHelper {
       return Optional.absent();
     }
 
-    addFlowExecutionIdIfAbsent(flowMetadata, 
jobExecutionPlanDagOptional.get());
     flowCompilationTimer.stop(flowMetadata);
     return jobExecutionPlanDagOptional;
   }
@@ -101,13 +100,18 @@ public final class FlowCompilationValidationHelper {
    * @throws IOException
    */
   public Optional<Dag<JobExecutionPlan>> 
validateAndHandleConcurrentExecution(Config flowConfig, FlowSpec flowSpec,
-      String flowGroup, String flowName) throws IOException {
+      String flowGroup, String flowName, Map<String,String> flowMetadata) 
throws IOException {
     boolean allowConcurrentExecution = ConfigUtils.getBoolean(flowConfig,
         ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, 
isFlowConcurrencyEnabled);
 
     Dag<JobExecutionPlan> jobExecutionPlanDag = 
specCompiler.compileFlow(flowSpec);
+    if (jobExecutionPlanDag.isEmpty()) {
+      return Optional.absent();
+    }
+    addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
 
-    if (isExecutionPermitted(flowStatusGenerator, flowName, flowGroup, 
allowConcurrentExecution)) {
+    if (isExecutionPermitted(flowStatusGenerator, flowName, flowGroup, 
allowConcurrentExecution,
+        
Long.parseLong(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD))))
 {
       return Optional.fromNullable(jobExecutionPlanDag);
     } else {
       log.warn("Another instance of flowGroup: {}, flowName: {} running; 
Skipping flow execution since "
@@ -121,9 +125,7 @@ public final class FlowCompilationValidationHelper {
           quotaManager.releaseQuota(dagNode);
         }
       }
-
       // Send FLOW_FAILED event
-      Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata(flowSpec);
       flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow failed because 
another instance is running and concurrent "
           + "executions are disabled. Set flow.allowConcurrentExecution to 
true in the flowSpec to change this behaviour.");
       new TimingEvent(eventSubmitter, 
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
@@ -140,8 +142,8 @@ public final class FlowCompilationValidationHelper {
    * @return true if the {@link FlowSpec} allows concurrent executions or if 
no other instance of the flow is currently RUNNING.
    */
   private boolean isExecutionPermitted(FlowStatusGenerator 
flowStatusGenerator, String flowName, String flowGroup,
-      boolean allowConcurrentExecution) {
-    return allowConcurrentExecution || 
!flowStatusGenerator.isFlowRunning(flowName, flowGroup);
+      boolean allowConcurrentExecution, long flowExecutionId) {
+    return allowConcurrentExecution || 
!flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId);
   }
 
   /**

Reply via email to