This is an automated email from the ASF dual-hosted git repository.

kipk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new e5d897edae [GOBBLIN-2173] Disallow adhoc flows sharing the same FlowId within epsilon (#4076)
e5d897edae is described below

commit e5d897edaee391d05a55e6ac8a420e3416fef6d9
Author: vsinghal85 <[email protected]>
AuthorDate: Wed Nov 20 01:27:07 2024 +0530

    [GOBBLIN-2173] Disallow adhoc flows sharing the same FlowId within epsilon (#4076)
    
    Here, epsilon is the multi-active execution/lease consolidation period.
---
 .../restli/FlowConfigsV2ResourceHandler.java       |  4 +
 .../api/TooSoonToRerunSameFlowException.java       | 49 +++++++++++
 .../orchestration/DagManagementStateStore.java     |  9 ++
 .../orchestration/InstrumentedLeaseArbiter.java    |  5 ++
 .../orchestration/MultiActiveLeaseArbiter.java     | 11 +++
 .../MySqlDagManagementStateStore.java              | 12 ++-
 .../MysqlMultiActiveLeaseArbiter.java              |  6 ++
 .../modules/orchestration/Orchestrator.java        | 27 ++++++
 .../MySqlDagManagementStateStoreTest.java          | 26 +++++-
 .../MysqlMultiActiveLeaseArbiterTest.java          | 45 +++++++++-
 .../modules/orchestration/OrchestratorTest.java    | 95 +++++++++++++++++++---
 11 files changed, 272 insertions(+), 17 deletions(-)

diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java
index 927909e57d..055724d83d 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java
@@ -60,6 +60,7 @@ import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.FlowSpecSearchObject;
+import org.apache.gobblin.runtime.api.TooSoonToRerunSameFlowException;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
@@ -256,6 +257,9 @@ public class FlowConfigsV2ResourceHandler implements 
FlowConfigsResourceHandlerI
       responseMap = this.flowCatalog.put(flowSpec, true);
     } catch (QuotaExceededException e) {
       throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, 
e.getMessage());
+    } catch(TooSoonToRerunSameFlowException e) {
+      return new CreateKVResponse<>(new 
RestLiServiceException(HttpStatus.S_409_CONFLICT,
+          "FlowSpec with URI " + flowSpec.getUri() + " was previously launched 
within the lease consolidation period, no action will be taken"));
     } catch (Throwable e) {
       // TODO: Compilation errors should fall under throwable exceptions as 
well instead of checking for strings
       log.warn(String.format("Failed to add flow configuration %s.%s to 
catalog due to", flowConfig.getId().getFlowGroup(), 
flowConfig.getId().getFlowName()), e);
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TooSoonToRerunSameFlowException.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TooSoonToRerunSameFlowException.java
new file mode 100644
index 0000000000..f718ec4a98
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TooSoonToRerunSameFlowException.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import lombok.Getter;
+
+
+/**
+ * Thrown when a {@link FlowSpec} with the same flow group and flow name as a recently launched flow
+ * is submitted within the lease consolidation period.
+ */
+public class TooSoonToRerunSameFlowException extends RuntimeException {
+  @Getter
+  private final FlowSpec flowSpec;
+
+  /**
+   * Accounts for the unwrapping done within {@code FlowCatalog#updateOrAddSpecHelper}'s `CallbackResult` error handling for `SpecCatalogListener`s.
+   * @return a `TooSoonToRerunSameFlowException` wrapped in another `TooSoonToRerunSameFlowException`
+   */
+  public static TooSoonToRerunSameFlowException wrappedOnce(FlowSpec flowSpec) 
{
+    return new TooSoonToRerunSameFlowException(flowSpec, new 
TooSoonToRerunSameFlowException(flowSpec));
+  }
+
+  public TooSoonToRerunSameFlowException(FlowSpec flowSpec) {
+    super("Lease already occupied by another recent execution of this flow: " 
+ flowSpec);
+    this.flowSpec = flowSpec;
+  }
+
+  /** restricted-access ctor: use {@link #wrappedOnce(FlowSpec)} instead */
+  private TooSoonToRerunSameFlowException(FlowSpec flowSpec, Throwable cause) {
+    super("Lease already occupied by another recent execution of this flow: " 
+ flowSpec, cause);
+    this.flowSpec = flowSpec;
+  }
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
index fb7b23fdf0..8059eab4e1 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
@@ -103,6 +103,15 @@ public interface DagManagementStateStore {
    */
   void updateDagNode(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException;
 
+  /**
+   * Returns true if a flow with the same flow group and flow name has been launched recently.
+   * @param flowGroup flow group for the flow
+   * @param flowName flow name for the flow
+   * @param flowExecutionId flow execution id of the flow being checked
+   * @throws IOException if the lookup fails
+   */
+  boolean existsCurrentlyLaunchingExecOfSameFlow(String flowGroup, String 
flowName, long flowExecutionId) throws IOException;
+
   /**
    * Returns the requested {@link  
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} and its {@link 
JobStatus}.
    * Both params are returned as optional and are empty if not present in the 
store.
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java
index 9e1c270c49..746ab66237 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java
@@ -90,6 +90,11 @@ public class InstrumentedLeaseArbiter implements 
MultiActiveLeaseArbiter {
     throw new RuntimeException(String.format("Unexpected LeaseAttemptStatus 
(%s) for %s", leaseAttemptStatus.getClass().getName(), leaseParams));
   }
 
+  @Override
+  public boolean 
existsSimilarLeaseWithinConsolidationPeriod(DagActionStore.LeaseParams 
leaseParams) throws IOException {
+    return 
decoratedMultiActiveLeaseArbiter.existsSimilarLeaseWithinConsolidationPeriod(leaseParams); // plain delegation to the decorated arbiter; nothing is instrumented here
+  }
+
   @Override
   public boolean recordLeaseSuccess(LeaseAttemptStatus.LeaseObtainedStatus 
status)
       throws IOException {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java
index c9a3b152bf..f580e936a5 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java
@@ -61,6 +61,17 @@ public interface MultiActiveLeaseArbiter {
   LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams, 
boolean adoptConsensusFlowExecutionId)
       throws IOException;
 
+  /**
+   * Checks whether a lease entry for the same flow group and flow name exists within the lease consolidation
+   * period (aka. epsilon).
+   * @param leaseParams   uniquely identifies the flow, the present action upon it, the time the action
+   *                      was triggered, and if the dag action event we're checking on is a reminder event
+   * @return true if a lease for a recently launched flow already exists for the flow details in leaseParams, else false
+   * @throws IOException if the lookup fails
+   */
+  boolean 
existsSimilarLeaseWithinConsolidationPeriod(DagActionStore.LeaseParams 
leaseParams)
+      throws IOException;
+
   /**
    * This method is used to indicate the owner of the lease has successfully 
completed required actions while holding
    * the lease of the dag action event. It marks the lease as "no longer 
leasing", if the eventTimeMillis and
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
index 29e652cce8..b14d6bc85c 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
@@ -65,6 +65,7 @@ public class MySqlDagManagementStateStore implements 
DagManagementStateStore {
   // todo - these two stores should merge
   private DagStateStoreWithDagNodes dagStateStore;
   private DagStateStoreWithDagNodes failedDagStateStore;
+  private MultiActiveLeaseArbiter multiActiveLeaseArbiter;
   private final JobStatusRetriever jobStatusRetriever;
   private boolean dagStoresInitialized = false;
   private final UserQuotaManager quotaManager;
@@ -79,13 +80,14 @@ public class MySqlDagManagementStateStore implements 
DagManagementStateStore {
 
   @Inject
   public MySqlDagManagementStateStore(Config config, FlowCatalog flowCatalog, 
UserQuotaManager userQuotaManager,
-      JobStatusRetriever jobStatusRetriever, DagActionStore dagActionStore) {
+      JobStatusRetriever jobStatusRetriever, DagActionStore dagActionStore, 
MultiActiveLeaseArbiter multiActiveLeaseArbiter) {
     this.quotaManager = userQuotaManager;
     this.config = config;
     this.flowCatalog = flowCatalog;
     this.jobStatusRetriever = jobStatusRetriever;
     this.dagManagerMetrics.activate();
     this.dagActionStore = dagActionStore;
+    this.multiActiveLeaseArbiter = multiActiveLeaseArbiter;
    }
 
   // It should be called after topology spec map is set
@@ -168,6 +170,14 @@ public class MySqlDagManagementStateStore implements 
DagManagementStateStore {
     this.dagStateStore.updateDagNode(dagNode);
   }
 
+  @Override
+  public boolean existsCurrentlyLaunchingExecOfSameFlow(String flowGroup, 
String flowName, long flowExecutionId) throws IOException {
+    DagActionStore.DagAction dagAction = 
DagActionStore.DagAction.forFlow(flowGroup, flowName,
+        flowExecutionId, DagActionStore.DagActionType.LAUNCH); // the check is made against a LAUNCH action of this flow
+    DagActionStore.LeaseParams leaseParams = new 
DagActionStore.LeaseParams(dagAction, System.currentTimeMillis()); // presumably the 2nd arg is the event time, taken as "now" — TODO confirm
+    return 
multiActiveLeaseArbiter.existsSimilarLeaseWithinConsolidationPeriod(leaseParams);
+  }
+
   @Override
   public Optional<Dag<JobExecutionPlan>> getDag(Dag.DagId dagId) throws 
IOException {
     return Optional.ofNullable(this.dagStateStore.getDag(dagId));
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
index 4811279048..fed800c838 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
@@ -362,6 +362,12 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
     }
   }
 
+  @Override
+  public boolean 
existsSimilarLeaseWithinConsolidationPeriod(DagActionStore.LeaseParams 
leaseParams) throws IOException {
+    Optional<GetEventInfoResult> infoResult = 
getExistingEventInfo(leaseParams);
+    return infoResult.isPresent() ? infoResult.get().isWithinEpsilon() : false; // no existing entry for this dag action means no similar lease
+  }
+
   /**
    * Checks leaseArbiterTable for an existing entry for this dag action and 
event time
    */
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index ae053ab51b..f0a9fdd43d 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -53,6 +53,7 @@ import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecCatalogListener;
 import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.runtime.api.TooSoonToRerunSameFlowException;
 import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
 import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
 import org.apache.gobblin.service.modules.flow.FlowUtils;
@@ -78,6 +79,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
   protected final SpecCompiler specCompiler;
   protected final TopologyCatalog topologyCatalog;
   private final JobStatusRetriever jobStatusRetriever;
+  private final DagManagementStateStore dagManagementStateStore;
 
   protected final MetricContext metricContext;
 
@@ -100,6 +102,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
     this.topologyCatalog = topologyCatalog;
     this.flowLaunchHandler = flowLaunchHandler;
     this.sharedFlowMetricsSingleton = sharedFlowMetricsSingleton;
+    this.dagManagementStateStore = dagManagementStateStore;
     this.jobStatusRetriever = jobStatusRetriever;
     this.specCompiler = flowCompilationValidationHelper.getSpecCompiler();
     // todo remove the need to set topology factory outside of constructor 
GOBBLIN-2056
@@ -125,6 +128,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
       _log.info("Orchestrator - onAdd[Topology]Spec: " + addedSpec);
       this.specCompiler.onAddSpec(addedSpec);
     } else if (addedSpec instanceof FlowSpec) {
+      enforceNoRecentAdhocExecOfSameFlow((FlowSpec) addedSpec);
       _log.info("Orchestrator - onAdd[Flow]Spec: " + addedSpec);
       return this.specCompiler.onAddSpec(addedSpec);
     } else {
@@ -133,6 +137,29 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
     return new AddSpecResponse<>(null);
   }
 
+  /**
+   * For ad hoc (unscheduled) flows, throws {@link TooSoonToRerunSameFlowException} (wrapped once) when another execution
+   * of the same flow group and flow name is currently launching; a failure to perform that check is rethrown as a RuntimeException.
+   */
+  private void enforceNoRecentAdhocExecOfSameFlow(FlowSpec flowSpec) {
+    if (!flowSpec.isScheduled()) { // scheduled flows are not checked here
+      Config flowConfig = flowSpec.getConfig();
+      String flowGroup = 
flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+      String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+
+      _log.info("Checking existing adhoc flow entry for " + flowGroup + "." + 
flowName);
+      try {
+        if 
(dagManagementStateStore.existsCurrentlyLaunchingExecOfSameFlow(flowGroup, 
flowName, FlowUtils.getOrCreateFlowExecutionId(flowSpec))) {
+          _log.warn("Another recent adhoc flow execution found for " + 
flowGroup + "." + flowName);
+          throw TooSoonToRerunSameFlowException.wrappedOnce(flowSpec); // wrapped to survive FlowCatalog's CallbackResult unwrapping
+        }
+      } catch (IOException exception) {
+        _log.error("Unable to check whether similar flow exists " +  flowGroup 
+ "." + flowName);
+        throw new RuntimeException("Unable to check whether similar flow 
exists " +  flowGroup + "." + flowName, exception);
+      }
+    }
+  }
+
   public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) {
     onDeleteSpec(deletedSpecURI, deletedSpecVersion, new Properties());
   }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java
index c14a7b6238..6dbbd0ba8b 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -47,8 +48,7 @@ import org.apache.gobblin.service.monitoring.JobStatus;
 import org.apache.gobblin.service.monitoring.JobStatusRetriever;
 import org.apache.gobblin.util.CompletedFuture;
 
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.*;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 
@@ -59,6 +59,7 @@ import static org.mockito.Mockito.mock;
 public class MySqlDagManagementStateStoreTest {
 
   private ITestMetastoreDatabase testDb;
+  private static MultiActiveLeaseArbiter leaseArbiter;
   private MySqlDagManagementStateStore dagManagementStateStore;
   private static final String TEST_USER = "testUser";
   public static final String TEST_PASSWORD = "testPassword";
@@ -68,6 +69,7 @@ public class MySqlDagManagementStateStoreTest {
   @BeforeClass
   public void setUp() throws Exception {
     // Setting up mock DB
+    this.leaseArbiter = mock(MultiActiveLeaseArbiter.class);
     this.testDb = TestMetastoreDatabaseFactory.get();
     this.dagManagementStateStore = getDummyDMSS(this.testDb);
   }
@@ -92,6 +94,22 @@ public class MySqlDagManagementStateStoreTest {
     return true;
   }
 
+  @Test
+  public void testExistsCurrentlyLaunchingSimilarFlowGivesTrue() throws 
Exception{
+    
Mockito.when(leaseArbiter.existsSimilarLeaseWithinConsolidationPeriod(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true); // leaseArbiter is the mock getDummyDMSS wired into dagManagementStateStore
+    String flowName = "testFlow";
+    String flowGroup = "testGroup";
+    
Assert.assertTrue(dagManagementStateStore.existsCurrentlyLaunchingExecOfSameFlow(flowGroup,
 flowName, System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testExistsCurrentlyLaunchingSimilarFlowGivesFalse() throws 
Exception{
+    
Mockito.when(leaseArbiter.existsSimilarLeaseWithinConsolidationPeriod(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(false); // leaseArbiter is the mock getDummyDMSS wired into dagManagementStateStore
+    String flowName = "testFlow";
+    String flowGroup = "testGroup";
+    
Assert.assertFalse(dagManagementStateStore.existsCurrentlyLaunchingExecOfSameFlow(flowGroup,
 flowName, System.currentTimeMillis()));
+  }
+
   @Test
   public void testAddDag() throws Exception {
     Dag<JobExecutionPlan> dag = DagTestUtils.buildDag("test", 12345L);
@@ -150,9 +168,11 @@ public class MySqlDagManagementStateStoreTest {
     TopologySpec topologySpec = 
LaunchDagProcTest.buildNaiveTopologySpec(TEST_SPEC_EXECUTOR_URI);
     URI specExecURI = new URI(TEST_SPEC_EXECUTOR_URI);
     topologySpecMap.put(specExecURI, topologySpec);
+    MultiActiveLeaseArbiter multiActiveLeaseArbiter = 
Mockito.mock(MultiActiveLeaseArbiter.class);
+    leaseArbiter = multiActiveLeaseArbiter;
     MySqlDagManagementStateStore dagManagementStateStore =
         new MySqlDagManagementStateStore(config, null, null, 
jobStatusRetriever,
-            
MysqlDagActionStoreTest.getTestDagActionStore(testMetastoreDatabase));
+            
MysqlDagActionStoreTest.getTestDagActionStore(testMetastoreDatabase), 
multiActiveLeaseArbiter);
     dagManagementStateStore.setTopologySpecMap(topologySpecMap);
     return dagManagementStateStore;
   }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
index 9b132fe0d9..9ba8f40c6b 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
@@ -44,6 +44,7 @@ import org.apache.gobblin.util.ExponentialBackoff;
 public class MysqlMultiActiveLeaseArbiterTest {
   private static final long EPSILON = 10000L;
   private static final long MORE_THAN_EPSILON = (long) (EPSILON * 1.1);
+  private static final long LESS_THAN_EPSILON = (long) (EPSILON * 0.90);
   // NOTE: `sleep`ing this long SIGNIFICANTLY slows tests, but we need a large 
enough value that exec. variability won't cause spurious failure
   private static final long LINGER = 20000L;
   private static final long MORE_THAN_LINGER = (long) (LINGER * 1.1);
@@ -53,9 +54,12 @@ public class MysqlMultiActiveLeaseArbiterTest {
   private static final String CONSTANTS_TABLE = "constants_store";
   private static final String flowGroup = "testFlowGroup";
   private static final String flowGroup2 = "testFlowGroup2";
+  private static final String flowGroup3 = "testFlowGroup3";
+  private static final String flowGroup4 = "testFlowGroup4";
   private static final String flowName = "testFlowName";
   private static final String jobName = "testJobName";
-  private static final long flowExecutionId = 12345677L;
+  private static final long flowExecutionId = 12345677213L;
+  private static final long flowExecutionId1 = 12345996546L;
   private static final long eventTimeMillis = 1710451837L;
   // Dag actions with the same flow info but different flow action types are 
considered unique
   private static final DagActionStore.DagAction launchDagAction =
@@ -70,6 +74,18 @@ public class MysqlMultiActiveLeaseArbiterTest {
       new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId, 
jobName, DagActionStore.DagActionType.LAUNCH);
   private static final DagActionStore.LeaseParams
       launchLeaseParams2 = new DagActionStore.LeaseParams(launchDagAction2, 
false, eventTimeMillis);
+  private static final DagActionStore.LeaseParams
+      launchLeaseParams3 = new DagActionStore.LeaseParams(new 
DagActionStore.DagAction(flowGroup3, flowName, flowExecutionId, jobName,
+      DagActionStore.DagActionType.LAUNCH), false, eventTimeMillis);
+  private static final DagActionStore.LeaseParams
+      launchLeaseParams3_similar = new DagActionStore.LeaseParams(new 
DagActionStore.DagAction(flowGroup3, flowName, flowExecutionId1, jobName,
+      DagActionStore.DagActionType.LAUNCH), false, eventTimeMillis);
+  private static final DagActionStore.LeaseParams
+      launchLeaseParams4 = new DagActionStore.LeaseParams(new 
DagActionStore.DagAction(flowGroup4, flowName, flowExecutionId, jobName,
+      DagActionStore.DagActionType.LAUNCH), false, eventTimeMillis);
+  private static final DagActionStore.LeaseParams
+      launchLeaseParams4_similar = new DagActionStore.LeaseParams(new 
DagActionStore.DagAction(flowGroup4, flowName, flowExecutionId1, jobName,
+      DagActionStore.DagActionType.LAUNCH), false, eventTimeMillis);
   private static final Timestamp dummyTimestamp = new Timestamp(99999);
   private ITestMetastoreDatabase testDb;
   private MysqlMultiActiveLeaseArbiter mysqlMultiActiveLeaseArbiter;
@@ -201,6 +217,33 @@ public class MysqlMultiActiveLeaseArbiterTest {
         <= sixthObtainedStatus.getLeaseAcquisitionTimestamp());
   }
 
+  /*
+   Verifies that a lease for the same flow group and flow name, but a different flowExecutionId, is reported
+   as existing when checked before epsilon elapses (sleeping LESS_THAN_EPSILON leaves a margin below epsilon).
+  */
+  @Test
+  public void testExistsSimilarLeaseWithinConsolidationPeriod() throws 
Exception{
+    LeaseAttemptStatus firstLaunchStatus =
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams3, true);
+    Assert.assertTrue(firstLaunchStatus instanceof 
LeaseAttemptStatus.LeaseObtainedStatus);
+    completeLeaseHelper(launchLeaseParams3);
+    Thread.sleep(LESS_THAN_EPSILON);
+    
Assert.assertTrue(mysqlMultiActiveLeaseArbiter.existsSimilarLeaseWithinConsolidationPeriod(launchLeaseParams3_similar));
+  }
+
+  /*
+     Verifies that no similar lease is reported once more than epsilon has elapsed since the first lease.
+   */
+  @Test
+  public void testDoesNotExistsSimilarLeaseWithinConsolidationPeriod() throws 
Exception{
+    LeaseAttemptStatus firstLaunchStatus =
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams4, true);
+    Assert.assertTrue(firstLaunchStatus instanceof 
LeaseAttemptStatus.LeaseObtainedStatus);
+    completeLeaseHelper(launchLeaseParams4);
+    Thread.sleep(MORE_THAN_EPSILON);
+    
Assert.assertFalse(mysqlMultiActiveLeaseArbiter.existsSimilarLeaseWithinConsolidationPeriod(launchLeaseParams4_similar));
+  }
+
   /*
      Tests attemptLeaseIfNewRow() method to ensure a new row is inserted if no 
row matches the primary key in the table.
      If such a row does exist, the method should disregard the resulting SQL 
error and return 0 rows updated, indicating
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index ee5f14cb87..acfa6c51ca 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -18,13 +18,20 @@
 package org.apache.gobblin.service.modules.orchestration;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collection;
 import java.util.Properties;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.runtime.api.TooSoonToRerunSameFlowException;
+import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
+import org.apache.gobblin.service.modules.flow.FlowUtils;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
 import org.apache.hadoop.fs.Path;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -63,6 +70,7 @@ import 
org.apache.gobblin.service.monitoring.JobStatusRetriever;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PathUtils;
 
+import static org.mockito.ArgumentMatchers.*;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 
@@ -85,7 +93,9 @@ public class OrchestratorTest {
   private FlowCatalog flowCatalog;
   private FlowSpec flowSpec;
   private ITestMetastoreDatabase testMetastoreDatabase;
-  private Orchestrator dagMgrNotFlowLaunchHandlerBasedOrchestrator;
+  private Orchestrator orchestrator;
+  private DagManagementStateStore dagManagementStateStore;
+  private SpecCompiler specCompiler;
 
   @BeforeClass
   public void setUpClass() throws Exception {
@@ -107,7 +117,7 @@ public class OrchestratorTest {
     flowProperties.put("specStore.fs.dir", FLOW_SPEC_STORE_DIR);
 
     this.serviceLauncher = new ServiceBasedAppLauncher(orchestratorProperties, 
"OrchestratorCatalogTest");
-
+    this.specCompiler = Mockito.mock(SpecCompiler.class);
     this.topologyCatalog = new 
TopologyCatalog(ConfigUtils.propertiesToConfig(topologyProperties),
         Optional.of(logger));
     this.serviceLauncher.addService(topologyCatalog);
@@ -116,20 +126,21 @@ public class OrchestratorTest {
     this.flowCatalog = new 
FlowCatalog(ConfigUtils.propertiesToConfig(flowProperties), 
Optional.of(logger), Optional.absent(), true);
 
     this.serviceLauncher.addService(flowCatalog);
-
+    MultiActiveLeaseArbiter leaseArbiter = 
Mockito.mock(MultiActiveLeaseArbiter.class);
     MySqlDagManagementStateStore dagManagementStateStore =
         
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+    this.dagManagementStateStore = dagManagementStateStore;
 
     SharedFlowMetricsSingleton sharedFlowMetricsSingleton = new 
SharedFlowMetricsSingleton(ConfigUtils.propertiesToConfig(orchestratorProperties));
 
     FlowCompilationValidationHelper flowCompilationValidationHelper = new 
FlowCompilationValidationHelper(ConfigFactory.empty(),
         sharedFlowMetricsSingleton, mock(UserQuotaManager.class), 
dagManagementStateStore);
-    this.dagMgrNotFlowLaunchHandlerBasedOrchestrator = new 
Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
+    this.orchestrator = new 
Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
         this.topologyCatalog, Optional.of(logger), 
mock(FlowLaunchHandler.class), sharedFlowMetricsSingleton, 
dagManagementStateStore,
         flowCompilationValidationHelper, mock(JobStatusRetriever.class));
 
-    
this.topologyCatalog.addListener(dagMgrNotFlowLaunchHandlerBasedOrchestrator);
-    this.flowCatalog.addListener(dagMgrNotFlowLaunchHandlerBasedOrchestrator);
+    this.topologyCatalog.addListener(orchestrator);
+    this.flowCatalog.addListener(orchestrator);
     // Start application
     this.serviceLauncher.start();
     // Create Spec to play with
@@ -233,7 +244,7 @@ public class OrchestratorTest {
   // TODO: this test doesn't exercise `Orchestrator` and so belongs elsewhere 
- move it, then rework into `@BeforeMethod` init (since others depend on this)
   @Test
   public void createTopologySpec() {
-    IdentityFlowToJobSpecCompiler specCompiler = 
(IdentityFlowToJobSpecCompiler) 
this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.getSpecCompiler();
+    IdentityFlowToJobSpecCompiler specCompiler = 
(IdentityFlowToJobSpecCompiler) this.orchestrator.getSpecCompiler();
 
     // List Current Specs
     Collection<Spec> specs = topologyCatalog.getSpecs();
@@ -272,7 +283,7 @@ public class OrchestratorTest {
     // TODO: fix this lingering inter-test dep from when `@BeforeClass` init, 
which we've since replaced by `Mockito.verify`-friendly `@BeforeMethod`
     createTopologySpec(); // make 1 Topology with 1 SpecProducer available and 
responsible for our new FlowSpec
 
-    IdentityFlowToJobSpecCompiler specCompiler = 
(IdentityFlowToJobSpecCompiler) 
this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.getSpecCompiler();
+    IdentityFlowToJobSpecCompiler specCompiler = 
(IdentityFlowToJobSpecCompiler) this.orchestrator.getSpecCompiler();
     SpecExecutor sei = 
specCompiler.getTopologySpecMap().values().iterator().next().getSpecExecutor();
 
     // List Current Specs
@@ -311,12 +322,72 @@ public class OrchestratorTest {
         "SpecProducer should contain 0 Spec after addition");
   }
 
+  /*
+     If another flow has already acquired lease for this flowspec details 
within
+     lease consolidation time, then we do not execute this flow, hence do not 
process and store the spec
+     and throw TooSoonToRerunSameFlowException
+   */
+  @Test(expectedExceptions = TooSoonToRerunSameFlowException.class)
+  public void 
onAddSpecForAdhocFlowWhenSimilarExistingFlowIsCurrentlyLaunching() throws 
IOException {
+    ConfigBuilder configBuilder = ConfigBuilder.create()
+        .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup")
+        .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName")
+        .addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
System.currentTimeMillis())
+        .addPrimitive("gobblin.flow.sourceIdentifier", "source")
+        .addPrimitive("gobblin.flow.destinationIdentifier", "destination");
+    Config config = configBuilder.build();
+    FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();
+    
Mockito.when(dagManagementStateStore.existsCurrentlyLaunchingExecOfSameFlow("testGroup","testName",
 FlowUtils.getOrCreateFlowExecutionId(flowSpec))).thenReturn(true);
+    orchestrator.onAddSpec(flowSpec);
+  }
+
+  /*
+   If no other flow has acquired lease within the epsilon time, then flow
+   compilation and addition to the store occurs normally
+ */
+  @Test
+  public void onAddSpecForAdhocFlowWhenNoExistingFlowIsCurrentlyLaunching() 
throws IOException {
+    ConfigBuilder configBuilder = ConfigBuilder.create()
+        .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup")
+        .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName")
+        .addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
System.currentTimeMillis())
+        .addPrimitive("gobblin.flow.sourceIdentifier", "source")
+        .addPrimitive("gobblin.flow.destinationIdentifier", "destination");
+    Config config = configBuilder.build();
+    FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();
+    
Mockito.when(dagManagementStateStore.existsCurrentlyLaunchingExecOfSameFlow("testGroup","testName",
 FlowUtils.getOrCreateFlowExecutionId(flowSpec))).thenReturn(false);
+    AddSpecResponse addSpecResponse = orchestrator.onAddSpec(flowSpec);
+    Assert.assertNotNull(addSpecResponse);
+  }
+
+  /*
+    For Scheduled flow lease acquirable check does not occur,
+    and flow compilation occurs successfully
+   */
+  @Test
+  public void onAddSpecForScheduledFlow() throws IOException {
+    ConfigBuilder configBuilder = ConfigBuilder.create()
+        .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup")
+        .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName")
+        .addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, "0 1/0 * ? * *")
+        .addPrimitive("gobblin.flow.sourceIdentifier", "source")
+        .addPrimitive("gobblin.flow.destinationIdentifier", "destination");
+    Config config = configBuilder.build();
+    FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();
+    AddSpecResponse response = new AddSpecResponse<>(new Object());
+    Mockito.when(specCompiler.onAddSpec(flowSpec)).thenReturn(response);
+    AddSpecResponse addSpecResponse = orchestrator.onAddSpec(flowSpec);
+    Assert.assertNotNull(addSpecResponse);
+    // Verifying that for scheduled flow 
existsCurrentlyLaunchingExecOfSameFlow is not called
+    Mockito.verify(dagManagementStateStore, 
Mockito.never()).existsCurrentlyLaunchingExecOfSameFlow(anyString(), 
anyString(), anyLong());
+  }
+
   @Test
   public void deleteFlowSpec() throws Throwable {
     // TODO: fix this lingering inter-test dep from when `@BeforeClass` init, 
which we've since replaced by `Mockito.verify`-friendly `@BeforeMethod`
     createFlowSpec(); // make 1 Flow available (for deletion herein)
 
-    IdentityFlowToJobSpecCompiler specCompiler = 
(IdentityFlowToJobSpecCompiler) 
this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.getSpecCompiler();
+    IdentityFlowToJobSpecCompiler specCompiler = 
(IdentityFlowToJobSpecCompiler) this.orchestrator.getSpecCompiler();
     SpecExecutor sei = 
specCompiler.getTopologySpecMap().values().iterator().next().getSpecExecutor();
 
     // List Current Specs
@@ -359,19 +430,19 @@ public class OrchestratorTest {
     createTopologySpec(); // for flow compilation to pass
 
     FlowId flowId = 
GobblinServiceManagerTest.createFlowIdWithUniqueName(TEST_FLOW_GROUP_NAME);
-    MetricContext metricContext = 
this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.getSharedFlowMetricsSingleton().getMetricContext();
+    MetricContext metricContext = 
this.orchestrator.getSharedFlowMetricsSingleton().getMetricContext();
     String metricName = 
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, 
flowId.getFlowGroup(), flowId.getFlowName(), ServiceMetricNames.COMPILED);
 
     this.topologyCatalog.getInitComplete().countDown(); // unblock 
orchestration
 
     FlowSpec adhocSpec = createBasicFlowSpecForFlowId(flowId);
-    this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.orchestrate(adhocSpec, 
new Properties(), 0, false);
+    this.orchestrator.orchestrate(adhocSpec, new Properties(), 0, false);
     
Assert.assertNull(metricContext.getParent().get().getGauges().get(metricName));
 
     Properties scheduledProps = new Properties();
     scheduledProps.setProperty("job.schedule", "0/2 * * * * ?");
     FlowSpec scheduledSpec = createBasicFlowSpecForFlowId(flowId, 
scheduledProps);
-    
this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.orchestrate(scheduledSpec, new 
Properties(), 0, false);
+    this.orchestrator.orchestrate(scheduledSpec, new Properties(), 0, false);
     
Assert.assertNotNull(metricContext.getParent().get().getGauges().get(metricName));
   }
 


Reply via email to