This is an automated email from the ASF dual-hosted git repository.

kipk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new f9b80aca9f [GOBBLIN-2151] ignore flows that are running beyond job 
start and flow finish deadline when doing the concurrency check (#4050)
f9b80aca9f is described below

commit f9b80aca9fd010acfd81c04df835d1f68e442cec
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Wed Sep 11 12:05:55 2024 -0700

    [GOBBLIN-2151] ignore flows that are running beyond job start and flow 
finish deadline when doing the concurrency check (#4050)
---
 .../apache/gobblin/service/FlowConfigsV2Test.java  |   3 +-
 .../monitoring/FlowStatusGeneratorTest.java        |   4 +-
 .../orchestration/DagManagementStateStore.java     |   6 +
 .../MySqlDagManagementStateStore.java              |   7 +
 .../utils/FlowCompilationValidationHelper.java     | 105 ++++++++++++--
 .../monitoring/MysqlJobStatusRetriever.java        |   2 +-
 .../modules/orchestration/OrchestratorTest.java    |   3 +-
 .../utils/FlowCompilationValidationHelperTest.java | 160 ++++++++++++++++++++-
 8 files changed, 272 insertions(+), 18 deletions(-)

diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigsV2Test.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigsV2Test.java
index 01e9d77f8f..07b7c5070d 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigsV2Test.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigsV2Test.java
@@ -132,8 +132,7 @@ public class FlowConfigsV2Test {
       
binder.bind(GroupOwnershipService.class).toInstance(groupOwnershipService);
     });
 
-    _server = EmbeddedRestliServer.builder().resources(
-        
Lists.newArrayList(FlowConfigsV2Resource.class)).injector(injector).build();
+    _server = 
EmbeddedRestliServer.builder().resources(Lists.newArrayList(FlowConfigsV2Resource.class)).injector(injector).build();
 
     _server.startAsync();
     _server.awaitRunning();
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
index 00ad043df8..deeffe98d3 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
@@ -58,7 +58,7 @@ public class FlowStatusGeneratorTest {
     JobStatus jobStatus = 
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
         
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName(ExecutionStatus.COMPILED.name()).build();
     Iterator<JobStatus> jobStatusIterator = 
Lists.newArrayList(jobStatus).iterator();
-    FlowStatus flowStatus = new 
FlowStatus(flowName,flowGroup,flowExecutionId,jobStatusIterator,ExecutionStatus.COMPILED);
+    FlowStatus flowStatus = new FlowStatus(flowName, flowGroup, 
flowExecutionId, jobStatusIterator, ExecutionStatus.COMPILED);
     
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowGroup, 
flowName)).thenReturn(
         Lists.newArrayList(flowStatus));
     FlowStatusGenerator flowStatusGenerator = new 
FlowStatusGenerator(jobStatusRetriever);
@@ -77,7 +77,7 @@ public class FlowStatusGeneratorTest {
     JobStatus jobStatus = 
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
         
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName(ExecutionStatus.COMPILED.name()).build();
     Iterator<JobStatus> jobStatusIterator = 
Lists.newArrayList(jobStatus).iterator();
-    FlowStatus flowStatus = new 
FlowStatus(flowName,flowGroup,flowExecutionId,jobStatusIterator,ExecutionStatus.COMPILED);
+    FlowStatus flowStatus = new FlowStatus(flowName, flowGroup, 
flowExecutionId, jobStatusIterator, ExecutionStatus.COMPILED);
     
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowGroup, 
flowName)).thenReturn(
         Lists.newArrayList(flowStatus));
     FlowStatusGenerator flowStatusGenerator = new 
FlowStatusGenerator(jobStatusRetriever);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
index d936f0129b..fb7b23fdf0 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.sql.SQLException;
 import java.util.Collection;
+import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
@@ -142,6 +143,11 @@ public interface DagManagementStateStore {
    */
   Optional<JobStatus> getJobStatus(DagNodeId dagNodeId);
 
+  /**
+   * @return the {@link org.apache.gobblin.service.monitoring.FlowStatus} of each execution known for the provided flow group and flow name; may be empty.
+   */
+  List<org.apache.gobblin.service.monitoring.FlowStatus> getAllFlowStatusesForFlow(String flowGroup, String flowName);
+
   /**
    * Check if an action exists in dagAction store by flow group, flow name, 
flow execution id, and job name.
    * @param flowGroup flow group for the dag action
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
index adfe5157d4..d10302000a 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
@@ -21,6 +21,7 @@ import java.net.URI;
 import java.sql.SQLException;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
@@ -43,6 +44,7 @@ import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatus;
 import org.apache.gobblin.service.monitoring.JobStatus;
 import org.apache.gobblin.service.monitoring.JobStatusRetriever;
 import org.apache.gobblin.util.ConfigUtils;
@@ -211,6 +213,11 @@ public class MySqlDagManagementStateStore implements 
DagManagementStateStore {
     }
   }
 
+  @Override
+  public List<FlowStatus> getAllFlowStatusesForFlow(String flowGroup, String flowName) {
+    return this.jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowGroup, flowName); // delegates to the job status retriever
+  }
+
   @Override
   public boolean existsJobDagAction(String flowGroup, String flowName, long 
flowExecutionId, String jobName,
       DagActionStore.DagActionType dagActionType) throws IOException {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
index 473ff3cf1a..3763658da0 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
@@ -19,10 +19,12 @@ package org.apache.gobblin.service.modules.utils;
 
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.lang3.reflect.ConstructorUtils;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
@@ -38,16 +40,23 @@ import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.flow.SpecCompiler;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine;
+import org.apache.gobblin.service.modules.orchestration.DagUtils;
 import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
 import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatus;
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 
+import static org.apache.gobblin.service.ExecutionStatus.*;
+
 
 /**
  * Helper class with functionality meant to be re-used between the 
LaunchDagProc and Orchestrator when launching
@@ -69,12 +78,12 @@ public class FlowCompilationValidationHelper {
   private final SpecCompiler specCompiler;
   private final UserQuotaManager quotaManager;
   private final EventSubmitter eventSubmitter;
-  private final FlowStatusGenerator flowStatusGenerator;
+  private final DagManagementStateStore dagManagementStateStore;
   private final boolean isFlowConcurrencyEnabled;
 
   @Inject
   public FlowCompilationValidationHelper(Config config, 
SharedFlowMetricsSingleton sharedFlowMetricsSingleton,
-      UserQuotaManager userQuotaManager, FlowStatusGenerator 
flowStatusGenerator) {
+      UserQuotaManager userQuotaManager, DagManagementStateStore 
dagManagementStateStore) {
     try {
       String specCompilerClassName = ConfigUtils.getString(config, 
ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY,
           ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS);
@@ -88,7 +97,7 @@ public class FlowCompilationValidationHelper {
     this.quotaManager = userQuotaManager;
     MetricContext metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(config), 
this.specCompiler.getClass());
     this.eventSubmitter = new EventSubmitter.Builder(metricContext, 
"org.apache.gobblin.service").build();
-    this.flowStatusGenerator = flowStatusGenerator;
+    this.dagManagementStateStore = dagManagementStateStore;
     this.isFlowConcurrencyEnabled = ConfigUtils.getBoolean(config, 
ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
         ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED);
   }
@@ -156,9 +165,9 @@ public class FlowCompilationValidationHelper {
     }
     addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
 
-    if (isExecutionPermitted(flowStatusGenerator, flowGroup, flowName, 
allowConcurrentExecution,
-        
Long.parseLong(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD))))
 {
-      return Optional.fromNullable(jobExecutionPlanDag);
+    if (isExecutionPermitted(flowGroup, flowName,
+        
Long.parseLong(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD)),
 allowConcurrentExecution)) {
+      return Optional.of(jobExecutionPlanDag);
     } else {
       log.warn("Another instance of flowGroup: {}, flowName: {} running; 
Skipping flow execution since "
           + "concurrent executions are disabled for this flow.", flowGroup, 
flowName);
@@ -167,7 +176,7 @@ public class FlowCompilationValidationHelper {
       
Instrumented.markMeter(sharedFlowMetricsSingleton.getSkippedFlowsMeter());
       if (!flowSpec.isScheduled()) {
         // For ad-hoc flow, we might already increase quota, we need to 
decrease here
-        for (Dag.DagNode dagNode : jobExecutionPlanDag.getStartNodes()) {
+        for (Dag.DagNode<JobExecutionPlan> dagNode : 
jobExecutionPlanDag.getStartNodes()) {
           quotaManager.releaseQuota(dagNode);
         }
       }
@@ -187,9 +196,85 @@ public class FlowCompilationValidationHelper {
    * @param allowConcurrentExecution
    * @return true if the {@link FlowSpec} allows concurrent executions or if no other instance of the flow is currently RUNNING.
    */
-  private boolean isExecutionPermitted(FlowStatusGenerator flowStatusGenerator, String flowGroup, String flowName,
-      boolean allowConcurrentExecution, long flowExecutionId) {
-    return allowConcurrentExecution || !flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId);
+  private boolean isExecutionPermitted(String flowGroup, String flowName, long flowExecutionId, boolean allowConcurrentExecution)
+      throws IOException {
+    return allowConcurrentExecution || !isPriorFlowExecutionRunning(flowGroup, flowName, flowExecutionId, dagManagementStateStore);
+  }
+
+  /**
+   * Returns true if any previous execution of the flow identified by the provided flowGroup and flowName is running.
+   * We ignore the execution that has the provided flowExecutionId so that, if the first attempt of some LaunchDagProc fails
+   * to complete the lease after creating and storing a Dag, the second attempt can continue the unfinished work without
+   * thinking that the flow is already running.
+   * We also ignore executions whose Dag is no longer stored and executions running beyond their job start or flow finish deadline.
+   * If this method returns `false`, callers may start a flow and subsequent calls to this method may return `true`.
+   */
+  @VisibleForTesting
+  static boolean isPriorFlowExecutionRunning(String flowGroup, String flowName, long flowExecutionId, DagManagementStateStore dagManagementStateStore)
+      throws IOException {
+    List<FlowStatus> flowStatusList = dagManagementStateStore.getAllFlowStatusesForFlow(flowGroup, flowName);
+
+    if (flowStatusList == null || flowStatusList.isEmpty()) {
+      return false;
+    }
+
+    for (FlowStatus flowStatus : flowStatusList) {
+      ExecutionStatus flowExecutionStatus = flowStatus.getFlowExecutionStatus();
+      if (flowStatus.getFlowExecutionId() == flowExecutionId) {
+        // a duplicate call to this method indicates that the prior caller of this method could not complete the required action,
+        // so we ignore any flow status for the current execution to give the caller another chance to complete it;
+        // this should be rare, so log it
+        if (flowExecutionStatus == COMPILED) {
+          log.info("A previous execution with the same flowExecutionId found {}. Previous execution may not be "
+              + "successfully submitted.", flowStatus);
+        } else if (flowExecutionStatus == RUNNING) {
+          log.error("A previous execution with the same flowExecutionId found {}. This is a rare case of previous "
+              + "execution getting submitted but then LaunchDagProc failed to complete the lease", flowStatus);
+        } else {
+          log.warn("A previous execution with the same flowExecutionId and an unexpected status is found {}.", flowStatus);
+        }
+        continue;
+      }
+
+      log.debug("Verifying if {} is running...", flowStatus);
+
+      if (FlowStatusGenerator.FINISHED_STATUSES.contains(flowExecutionStatus.name()) || flowExecutionStatus == $UNKNOWN) {
+        // ignore finished entries
+        // todo - make changes so `getAllFlowStatusesForFlow` never returns $UNKNOWN flow status
+      } else if (flowExecutionStatus == COMPILED || flowExecutionStatus == PENDING
+          || flowExecutionStatus == PENDING_RESUME || flowExecutionStatus == RUNNING) {
+        // these are the only four non-terminal statuses that a flow can have. jobs have two more non-terminal statuses
+        // ORCHESTRATED and PENDING_RETRY
+        Dag.DagId dagIdOfOldExecution = new Dag.DagId(flowGroup, flowName, flowStatus.getFlowExecutionId());
+        java.util.Optional<Dag<JobExecutionPlan>> dag = dagManagementStateStore.getDag(dagIdOfOldExecution);
+
+        if (!dag.isPresent()) {
+          log.error("Dag {} is finished and cleaned up, job status monitor somehow did not receive/update the flow status. Ignoring it here...", dagIdOfOldExecution);
+          continue;
+        }
+
+        Dag.DagNode<JobExecutionPlan> dagNode = dag.get().getNodes().get(0);
+        long flowStartTime = DagUtils.getFlowStartTime(dagNode);
+        long jobStartDeadline = DagUtils.getJobStartDeadline(dagNode, DagProcessingEngine.getDefaultJobStartDeadlineTimeMillis());
+        long flowFinishDeadline = DagUtils.getFlowFinishDeadline(dagNode);
+        long now = System.currentTimeMillis();
+        // COMPILED/PENDING executions are bounded by the job start deadline, RUNNING/PENDING_RESUME ones by the flow finish deadline
+        boolean withinJobStartDeadline = (flowExecutionStatus == COMPILED || flowExecutionStatus == PENDING) && now < flowStartTime + jobStartDeadline;
+        boolean withinFlowFinishDeadline = (flowExecutionStatus == RUNNING || flowExecutionStatus == PENDING_RESUME) && now < flowStartTime + flowFinishDeadline;
+        if (withinJobStartDeadline || withinFlowFinishDeadline) {
+          log.info("{} is still running. Found a dag for this, flowStartTime {}, jobStartDeadline {}, flowFinishDeadline {}",
+              flowStatus, flowStartTime, jobStartDeadline, flowFinishDeadline);
+          return true;
+        }
+        // a prior execution running beyond its deadline does not block this one
+        log.warn("Dag {} is still running beyond deadline! flowStartTime {}, jobStartDeadline {}, flowFinishDeadline {}",
+            dagIdOfOldExecution, flowStartTime, jobStartDeadline, flowFinishDeadline);
+      } else {
+        log.error("Unknown status {}", flowExecutionStatus);
+      }
+    }
+
+    return false;
   }
 
   /**
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
index 81d8776c53..5678fef6aa 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
@@ -98,7 +98,7 @@ public class MysqlJobStatusRetriever extends 
JobStatusRetriever {
   @Override
   public List<FlowStatus> getAllFlowStatusesForFlowExecutionsOrdered(String flowGroup, String flowName) {
     String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
-    List<State> jobStatusStates = timeOpAndWrapIOException(() -> this.stateStore.getAllWithPrefix(storeName),
+    List<State> jobStatusStates = timeOpAndWrapIOException(() -> this.stateStore.getAll(storeName), // exact store name instead of a prefix match; presumably so flows whose store name merely starts with this one are excluded - TODO confirm
         GET_LATEST_FLOW_GROUP_STATUS_METRIC);
     return asFlowStatuses(groupByFlowExecutionAndRetainLatest(flowGroup, jobStatusStates,null));
   }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index 9652e0faac..ee5f14cb87 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -59,7 +59,6 @@ import org.apache.gobblin.service.GobblinServiceManagerTest;
 import org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler;
 import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
 import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
-import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 import org.apache.gobblin.service.monitoring.JobStatusRetriever;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PathUtils;
@@ -124,7 +123,7 @@ public class OrchestratorTest {
     SharedFlowMetricsSingleton sharedFlowMetricsSingleton = new 
SharedFlowMetricsSingleton(ConfigUtils.propertiesToConfig(orchestratorProperties));
 
     FlowCompilationValidationHelper flowCompilationValidationHelper = new 
FlowCompilationValidationHelper(ConfigFactory.empty(),
-        sharedFlowMetricsSingleton, mock(UserQuotaManager.class), 
mock(FlowStatusGenerator.class));
+        sharedFlowMetricsSingleton, mock(UserQuotaManager.class), 
dagManagementStateStore);
     this.dagMgrNotFlowLaunchHandlerBasedOrchestrator = new 
Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
         this.topologyCatalog, Optional.of(logger), 
mock(FlowLaunchHandler.class), sharedFlowMetricsSingleton, 
dagManagementStateStore,
         flowCompilationValidationHelper, mock(JobStatusRetriever.class));
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java
index 349b161ad8..c865cae14a 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java
@@ -17,17 +17,45 @@
 
 package org.apache.gobblin.service.modules.utils;
 
+import java.io.IOException;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import org.junit.Assert;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
 import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
+import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine;
 import org.apache.gobblin.service.modules.orchestration.DagTestUtils;
+import 
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStoreTest;
+import org.apache.gobblin.service.modules.orchestration.proc.LaunchDagProcTest;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatus;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 
 /**
@@ -36,12 +64,19 @@ import 
org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 public class FlowCompilationValidationHelperTest {
   private final Long jobSpecFlowExecutionId = 1234L;
   private Dag<JobExecutionPlan> jobExecutionPlanDag;
+  private MySqlDagManagementStateStore dagManagementStateStore;
 
   @BeforeClass
-  public void setup() throws URISyntaxException {
+  public void setup() throws Exception {
     String dagId = "testDag";
     jobExecutionPlanDag =  DagTestUtils.buildDag(dagId, jobSpecFlowExecutionId);
+  }
 
+  @BeforeMethod
+  public void resetDMSS() throws Exception {
+    ITestMetastoreDatabase testMetastoreDatabase = TestMetastoreDatabaseFactory.get(); // NOTE(review): obtained per test method and never closed here; consider closing it in an @AfterMethod if it holds resources
+    this.dagManagementStateStore = spy(MySqlDagManagementStateStoreTest.getDummyDMSS(testMetastoreDatabase)); // spied so tests can stub getAllFlowStatusesForFlow
+    LaunchDagProcTest.mockDMSSCommonBehavior(this.dagManagementStateStore);
   }
 
   /*
@@ -66,4 +101,127 @@ public class FlowCompilationValidationHelperTest {
     
Assert.assertEquals(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD),
         existingFlowExecutionId);
   }
+
+  @Test
+  public void testConcurrentFlowPreviousFlowWithNonTerminalStatusWithNoDag() throws IOException {
+    List<FlowStatus> list = new ArrayList<>();
+    String flowGroup = "fg";
+    String flowName = "fn";
+    long previousFlowExecutionId = 12345L;  // no dag is stored for this execution; the checked (current) execution is previousFlowExecutionId + 1
+    JobStatus jobStatus = JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(previousFlowExecutionId)
+        .jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName(ExecutionStatus.COMPILED.name()).build();
+    Iterator<JobStatus> jobStatusIterator = Lists.newArrayList(jobStatus).iterator();
+    list.add(new FlowStatus(flowName, flowGroup, previousFlowExecutionId, jobStatusIterator, ExecutionStatus.COMPILED));
+    when(this.dagManagementStateStore.getAllFlowStatusesForFlow(anyString(), anyString())).thenReturn(list);
+
+    Assert.assertFalse(FlowCompilationValidationHelper.isPriorFlowExecutionRunning(flowGroup, flowName,
+        previousFlowExecutionId + 1, this.dagManagementStateStore));
+  }
+
+  @Test
+  public void testConcurrentFlowPreviousExecutionWithNonTerminalStatusRunningBeyondJobStartDeadline()
+      throws IOException, URISyntaxException {
+    String flowGroup = "fg";
+    String flowName = "fn";
+    long jobStartDeadline = 10L;
+    // subtract an extra 1 ms as a safety margin so the PENDING flow is already past its job start deadline when checked
+    long flowStartTime = System.currentTimeMillis() - jobStartDeadline - 1;
+    long currentFlowExecutionId = System.currentTimeMillis() ;
+
+    insertFlowIntoDMSSMock(flowGroup, flowName, flowStartTime, ExecutionStatus.PENDING,
+        ConfigFactory.empty()
+        .withValue(ConfigurationKeys.GOBBLIN_JOB_START_DEADLINE_TIME_UNIT, ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
+        .withValue(ConfigurationKeys.GOBBLIN_JOB_START_DEADLINE_TIME, ConfigValueFactory.fromAnyRef(jobStartDeadline)));
+
+    Assert.assertFalse(FlowCompilationValidationHelper.isPriorFlowExecutionRunning(flowGroup, flowName,
+        currentFlowExecutionId, this.dagManagementStateStore));
+  }
+
+  @Test
+  public void testConcurrentFlowPreviousExecutionWithNonTerminalStatusRunningBeyondFlowFinishDeadline()
+      throws IOException, URISyntaxException {
+    String flowGroup = "fg";
+    String flowName = "fn";
+    long flowFinishDeadline = 30L;
+    long flowStartTime = System.currentTimeMillis() - flowFinishDeadline - 1;  // already 1 ms past the flow finish deadline
+    long currentFlowExecutionId = System.currentTimeMillis() ;
+
+    insertFlowIntoDMSSMock(flowGroup, flowName, flowStartTime, ExecutionStatus.PENDING_RESUME,
+        ConfigFactory.empty()
+            .withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME_UNIT, ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
+            .withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME, ConfigValueFactory.fromAnyRef(flowFinishDeadline)));
+
+    Assert.assertFalse(FlowCompilationValidationHelper.isPriorFlowExecutionRunning(flowGroup, flowName,
+        currentFlowExecutionId, this.dagManagementStateStore));
+  }
+
+  @Test
+  public void testConcurrentFlowPreviousExecutionWithNonTerminalStatusRunningWithinFlowFinishDeadline()
+      throws IOException, URISyntaxException {
+    String flowGroup = "fg";
+    String flowName = "fn";
+    long flowFinishDeadline = 10000L;
+    long flowStartTime = System.currentTimeMillis() - 1 ;  // started 1 ms ago, so the check must run within flowFinishDeadline - 1 ms
+    long currentFlowExecutionId = System.currentTimeMillis() ;
+
+    insertFlowIntoDMSSMock(flowGroup, flowName, flowStartTime, ExecutionStatus.RUNNING,
+        ConfigFactory.empty()
+            .withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME_UNIT, ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
+            .withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME, ConfigValueFactory.fromAnyRef(flowFinishDeadline)));
+
+    Assert.assertTrue(FlowCompilationValidationHelper.isPriorFlowExecutionRunning(flowGroup, flowName,
+        currentFlowExecutionId, this.dagManagementStateStore));
+  }
+
+  @Test
+  public void testConcurrentFlowNoPreviousExecutionRunning() throws IOException, URISyntaxException {
+    String flowGroup = "fg";
+    String flowName = "fn";
+    long flowStartTime = System.currentTimeMillis();  // also used below as the current flowExecutionId
+    insertFlowIntoDMSSMock(flowGroup, flowName, flowStartTime, ExecutionStatus.PENDING,
+        ConfigFactory.empty()
+            .withValue(ConfigurationKeys.GOBBLIN_JOB_START_DEADLINE_TIME_UNIT, ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
+            .withValue(ConfigurationKeys.GOBBLIN_JOB_START_DEADLINE_TIME, ConfigValueFactory.fromAnyRef(10000L)));
+
+    // change the mock to not return any previous flow status
+    when(this.dagManagementStateStore.getAllFlowStatusesForFlow(anyString(), anyString())).thenReturn(Collections.emptyList());
+
+    Assert.assertFalse(FlowCompilationValidationHelper.isPriorFlowExecutionRunning(flowGroup, flowName,
+        flowStartTime, this.dagManagementStateStore));
+  }
+
+  @Test
+  public void testSameFlowExecAlreadyCompiledWithinJobStartDeadline() throws IOException, URISyntaxException {
+    String flowGroup = "fg";
+    String flowName = "fn";
+    long jobStartDeadline = 10000L;
+    long flowStartTime = System.currentTimeMillis();
+
+    insertFlowIntoDMSSMock(flowGroup, flowName, flowStartTime, ExecutionStatus.COMPILED,
+        ConfigFactory.empty()
+            .withValue(ConfigurationKeys.GOBBLIN_JOB_START_DEADLINE_TIME_UNIT, ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
+            .withValue(ConfigurationKeys.GOBBLIN_JOB_START_DEADLINE_TIME, ConfigValueFactory.fromAnyRef(jobStartDeadline)));
+
+    // pass flowStartTime as the current flowExecutionId: the only stubbed status belongs to this same execution, so it is skipped
+    Assert.assertFalse(FlowCompilationValidationHelper.isPriorFlowExecutionRunning(flowGroup, flowName,
+        flowStartTime, this.dagManagementStateStore));
+  }
+
+  private void insertFlowIntoDMSSMock(String flowGroup, String flowName, long flowStartTime, ExecutionStatus executionStatus, Config config) // stubs one flow status whose flowExecutionId is flowStartTime and stores a matching dag
+      throws URISyntaxException, IOException {
+    List<FlowStatus> list = new ArrayList<>();
+    JobStatus jobStatus = JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowStartTime)
+        .jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName(executionStatus.name()).build();
+    Iterator<JobStatus> jobStatusIterator = Lists.newArrayList(jobStatus).iterator();
+    list.add(new FlowStatus(flowName, flowGroup, flowStartTime, jobStatusIterator, executionStatus));
+    when(this.dagManagementStateStore.getAllFlowStatusesForFlow(anyString(), anyString())).thenReturn(list); // NOTE(review): when(...) on a spy invokes the real method once; doReturn(list).when(...) would avoid that
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowStartTime,
+        DagProcessingEngine.FailureOption.FINISH_ALL_POSSIBLE.name(), 5, "user5", config
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef(
+                MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+    dag.getNodes().forEach(node -> node.getValue().setFlowStartTime(flowStartTime)); // isPriorFlowExecutionRunning reads the start time from the dag's first node
+    this.dagManagementStateStore.addDag(dag);
+  }
 }

Reply via email to