This is an automated email from the ASF dual-hosted git repository.
kipk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new e5d897edae [GOBBLIN-2173] Disallow adhoc flows sharing the same FlowId
within epsilon (#4076)
e5d897edae is described below
commit e5d897edaee391d05a55e6ac8a420e3416fef6d9
Author: vsinghal85 <[email protected]>
AuthorDate: Wed Nov 20 01:27:07 2024 +0530
[GOBBLIN-2173] Disallow adhoc flows sharing the same FlowId within epsilon
(#4076)
where epsilon is the multi-active execution/lease consolidation period
---
.../restli/FlowConfigsV2ResourceHandler.java | 4 +
.../api/TooSoonToRerunSameFlowException.java | 49 +++++++++++
.../orchestration/DagManagementStateStore.java | 9 ++
.../orchestration/InstrumentedLeaseArbiter.java | 5 ++
.../orchestration/MultiActiveLeaseArbiter.java | 11 +++
.../MySqlDagManagementStateStore.java | 12 ++-
.../MysqlMultiActiveLeaseArbiter.java | 6 ++
.../modules/orchestration/Orchestrator.java | 27 ++++++
.../MySqlDagManagementStateStoreTest.java | 26 +++++-
.../MysqlMultiActiveLeaseArbiterTest.java | 45 +++++++++-
.../modules/orchestration/OrchestratorTest.java | 95 +++++++++++++++++++---
11 files changed, 272 insertions(+), 17 deletions(-)
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java
index 927909e57d..055724d83d 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java
@@ -60,6 +60,7 @@ import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.FlowSpecSearchObject;
+import org.apache.gobblin.runtime.api.TooSoonToRerunSameFlowException;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
@@ -256,6 +257,9 @@ public class FlowConfigsV2ResourceHandler implements
FlowConfigsResourceHandlerI
responseMap = this.flowCatalog.put(flowSpec, true);
} catch (QuotaExceededException e) {
throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE,
e.getMessage());
+ } catch(TooSoonToRerunSameFlowException e) {
+ return new CreateKVResponse<>(new
RestLiServiceException(HttpStatus.S_409_CONFLICT,
+ "FlowSpec with URI " + flowSpec.getUri() + " was previously launched
within the lease consolidation period, no action will be taken"));
} catch (Throwable e) {
// TODO: Compilation errors should fall under throwable exceptions as
well instead of checking for strings
log.warn(String.format("Failed to add flow configuration %s.%s to
catalog due to", flowConfig.getId().getFlowGroup(),
flowConfig.getId().getFlowName()), e);
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TooSoonToRerunSameFlowException.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TooSoonToRerunSameFlowException.java
new file mode 100644
index 0000000000..f718ec4a98
--- /dev/null
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TooSoonToRerunSameFlowException.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import lombok.Getter;
+
+
+/**
+ * An exception thrown when another {@link FlowSpec} with the same flow name and flow group
+ * is submitted within the lease consolidation period.
+ */
+public class TooSoonToRerunSameFlowException extends RuntimeException {
+  @Getter
+  private final FlowSpec flowSpec;
+
+  /**
+   * Creates an instance whose cause is another {@code TooSoonToRerunSameFlowException} for the same spec.
+   * This accounts for the unwrapping done within {@code FlowCatalog#updateOrAddSpecHelper}'s
+   * {@code CallbackResult} error handling for {@code SpecCatalogListener}s.
+   *
+   * @param flowSpec the spec whose submission is being rejected
+   * @return a {@code TooSoonToRerunSameFlowException} wrapped in another {@code TooSoonToRerunSameFlowException}
+   */
+  public static TooSoonToRerunSameFlowException wrappedOnce(FlowSpec flowSpec) {
+    return new TooSoonToRerunSameFlowException(flowSpec, new TooSoonToRerunSameFlowException(flowSpec));
+  }
+
+  /**
+   * @param flowSpec the spec whose submission is being rejected
+   */
+  public TooSoonToRerunSameFlowException(FlowSpec flowSpec) {
+    super(buildMessage(flowSpec));
+    this.flowSpec = flowSpec;
+  }
+
+  /** restricted-access ctor: use {@link #wrappedOnce(FlowSpec)} instead */
+  private TooSoonToRerunSameFlowException(FlowSpec flowSpec, Throwable cause) {
+    super(buildMessage(flowSpec), cause);
+    this.flowSpec = flowSpec;
+  }
+
+  /** Builds the message shared by both constructors so the two cannot drift apart. */
+  private static String buildMessage(FlowSpec flowSpec) {
+    return "Lease already occupied by another recent execution of this flow: " + flowSpec;
+  }
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
index fb7b23fdf0..8059eab4e1 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
@@ -103,6 +103,15 @@ public interface DagManagementStateStore {
*/
void updateDagNode(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException;
+  /**
+   * Returns whether a flow with the same flow group and flow name has been launched recently.
+   *
+   * @param flowGroup flow group for the flow
+   * @param flowName flow name for the flow
+   * @param flowExecutionId flow execution id for the flow
+   * @return true if a recent launch of the same flow exists, false otherwise
+   * @throws IOException if the check could not be performed
+   */
+  boolean existsCurrentlyLaunchingExecOfSameFlow(String flowGroup, String flowName, long flowExecutionId) throws IOException;
+
/**
* Returns the requested {@link
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} and its {@link
JobStatus}.
* Both params are returned as optional and are empty if not present in the
store.
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java
index 9e1c270c49..746ab66237 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java
@@ -90,6 +90,11 @@ public class InstrumentedLeaseArbiter implements
MultiActiveLeaseArbiter {
throw new RuntimeException(String.format("Unexpected LeaseAttemptStatus
(%s) for %s", leaseAttemptStatus.getClass().getName(), leaseParams));
}
+ @Override
+ public boolean
existsSimilarLeaseWithinConsolidationPeriod(DagActionStore.LeaseParams
leaseParams) throws IOException {
+ return
decoratedMultiActiveLeaseArbiter.existsSimilarLeaseWithinConsolidationPeriod(leaseParams);
+ }
+
@Override
public boolean recordLeaseSuccess(LeaseAttemptStatus.LeaseObtainedStatus
status)
throws IOException {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java
index c9a3b152bf..f580e936a5 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java
@@ -61,6 +61,17 @@ public interface MultiActiveLeaseArbiter {
LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams,
boolean adoptConsensusFlowExecutionId)
throws IOException;
+  /**
+   * Checks whether an entry for the same flow name and flow group exists within the lease consolidation
+   * period (aka. epsilon).
+   *
+   * @param leaseParams uniquely identifies the flow, the present action upon it, the time the action was triggered,
+   *                    and if the dag action event we're checking on is a reminder event
+   * @return true if a lease for a recently launched flow already exists for the flow details in leaseParams,
+   *         false otherwise
+   * @throws IOException if the check could not be performed
+   */
+  boolean existsSimilarLeaseWithinConsolidationPeriod(DagActionStore.LeaseParams leaseParams)
+      throws IOException;
+
/**
* This method is used to indicate the owner of the lease has successfully
completed required actions while holding
* the lease of the dag action event. It marks the lease as "no longer
leasing", if the eventTimeMillis and
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
index 29e652cce8..b14d6bc85c 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
@@ -65,6 +65,7 @@ public class MySqlDagManagementStateStore implements
DagManagementStateStore {
// todo - these two stores should merge
private DagStateStoreWithDagNodes dagStateStore;
private DagStateStoreWithDagNodes failedDagStateStore;
+ private MultiActiveLeaseArbiter multiActiveLeaseArbiter;
private final JobStatusRetriever jobStatusRetriever;
private boolean dagStoresInitialized = false;
private final UserQuotaManager quotaManager;
@@ -79,13 +80,14 @@ public class MySqlDagManagementStateStore implements
DagManagementStateStore {
@Inject
public MySqlDagManagementStateStore(Config config, FlowCatalog flowCatalog,
UserQuotaManager userQuotaManager,
- JobStatusRetriever jobStatusRetriever, DagActionStore dagActionStore) {
+ JobStatusRetriever jobStatusRetriever, DagActionStore dagActionStore,
MultiActiveLeaseArbiter multiActiveLeaseArbiter) {
this.quotaManager = userQuotaManager;
this.config = config;
this.flowCatalog = flowCatalog;
this.jobStatusRetriever = jobStatusRetriever;
this.dagManagerMetrics.activate();
this.dagActionStore = dagActionStore;
+ this.multiActiveLeaseArbiter = multiActiveLeaseArbiter;
}
// It should be called after topology spec map is set
@@ -168,6 +170,14 @@ public class MySqlDagManagementStateStore implements
DagManagementStateStore {
this.dagStateStore.updateDagNode(dagNode);
}
+ @Override
+ public boolean existsCurrentlyLaunchingExecOfSameFlow(String flowGroup,
String flowName, long flowExecutionId) throws IOException {
+ DagActionStore.DagAction dagAction =
DagActionStore.DagAction.forFlow(flowGroup, flowName,
+ flowExecutionId, DagActionStore.DagActionType.LAUNCH);
+ DagActionStore.LeaseParams leaseParams = new
DagActionStore.LeaseParams(dagAction, System.currentTimeMillis());
+ return
multiActiveLeaseArbiter.existsSimilarLeaseWithinConsolidationPeriod(leaseParams);
+ }
+
@Override
public Optional<Dag<JobExecutionPlan>> getDag(Dag.DagId dagId) throws
IOException {
return Optional.ofNullable(this.dagStateStore.getDag(dagId));
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
index 4811279048..fed800c838 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
@@ -362,6 +362,12 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
}
}
+ @Override
+ public boolean
existsSimilarLeaseWithinConsolidationPeriod(DagActionStore.LeaseParams
leaseParams) throws IOException {
+ Optional<GetEventInfoResult> infoResult =
getExistingEventInfo(leaseParams);
+ return infoResult.isPresent() ? infoResult.get().isWithinEpsilon() : false;
+ }
+
/**
* Checks leaseArbiterTable for an existing entry for this dag action and
event time
*/
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index ae053ab51b..f0a9fdd43d 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -53,6 +53,7 @@ import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecCatalogListener;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.runtime.api.TooSoonToRerunSameFlowException;
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
import org.apache.gobblin.service.modules.flow.FlowUtils;
@@ -78,6 +79,7 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
protected final SpecCompiler specCompiler;
protected final TopologyCatalog topologyCatalog;
private final JobStatusRetriever jobStatusRetriever;
+ private final DagManagementStateStore dagManagementStateStore;
protected final MetricContext metricContext;
@@ -100,6 +102,7 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
this.topologyCatalog = topologyCatalog;
this.flowLaunchHandler = flowLaunchHandler;
this.sharedFlowMetricsSingleton = sharedFlowMetricsSingleton;
+ this.dagManagementStateStore = dagManagementStateStore;
this.jobStatusRetriever = jobStatusRetriever;
this.specCompiler = flowCompilationValidationHelper.getSpecCompiler();
// todo remove the need to set topology factory outside of constructor
GOBBLIN-2056
@@ -125,6 +128,7 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
_log.info("Orchestrator - onAdd[Topology]Spec: " + addedSpec);
this.specCompiler.onAddSpec(addedSpec);
} else if (addedSpec instanceof FlowSpec) {
+ enforceNoRecentAdhocExecOfSameFlow((FlowSpec) addedSpec);
_log.info("Orchestrator - onAdd[Flow]Spec: " + addedSpec);
return this.specCompiler.onAddSpec(addedSpec);
} else {
@@ -133,6 +137,29 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
return new AddSpecResponse<>(null);
}
+  /**
+   * Enforces that no similar adhoc flow (same flow group and flow name) is currently launching.
+   * Scheduled flow specs are not checked.
+   *
+   * @param flowSpec the flow spec being added
+   * @throws TooSoonToRerunSameFlowException if {@link DagManagementStateStore#existsCurrentlyLaunchingExecOfSameFlow}
+   *         reports a recent execution of the same flow
+   * @throws RuntimeException wrapping the {@link IOException} if that check itself fails
+   */
+  private void enforceNoRecentAdhocExecOfSameFlow(FlowSpec flowSpec) {
+    if (flowSpec.isScheduled()) {
+      return; // only adhoc (unscheduled) submissions are subject to this check
+    }
+    Config flowConfig = flowSpec.getConfig();
+    String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    String flowId = flowGroup + "." + flowName;
+
+    _log.info("Checking existing adhoc flow entry for " + flowId);
+    boolean similarFlowLaunching;
+    try {
+      similarFlowLaunching = dagManagementStateStore.existsCurrentlyLaunchingExecOfSameFlow(
+          flowGroup, flowName, FlowUtils.getOrCreateFlowExecutionId(flowSpec));
+    } catch (IOException exception) {
+      // pass the cause to the logger so its stack trace accompanies this error line
+      _log.error("Unable to check whether similar flow exists " + flowId, exception);
+      throw new RuntimeException("Unable to check whether similar flow exists " + flowId, exception);
+    }
+    if (similarFlowLaunching) {
+      _log.warn("Another recent adhoc flow execution found for " + flowId);
+      throw TooSoonToRerunSameFlowException.wrappedOnce(flowSpec);
+    }
+  }
+
public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) {
onDeleteSpec(deletedSpecURI, deletedSpecVersion, new Properties());
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java
index c14a7b6238..6dbbd0ba8b 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -47,8 +48,7 @@ import org.apache.gobblin.service.monitoring.JobStatus;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.util.CompletedFuture;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -59,6 +59,7 @@ import static org.mockito.Mockito.mock;
public class MySqlDagManagementStateStoreTest {
private ITestMetastoreDatabase testDb;
+ private static MultiActiveLeaseArbiter leaseArbiter;
private MySqlDagManagementStateStore dagManagementStateStore;
private static final String TEST_USER = "testUser";
public static final String TEST_PASSWORD = "testPassword";
@@ -68,6 +69,7 @@ public class MySqlDagManagementStateStoreTest {
@BeforeClass
public void setUp() throws Exception {
// Setting up mock DB
+    // leaseArbiter is (re)assigned a fresh mock inside getDummyDMSS(); a mock created here would be overwritten at once
this.testDb = TestMetastoreDatabaseFactory.get();
this.dagManagementStateStore = getDummyDMSS(this.testDb);
}
@@ -92,6 +94,22 @@ public class MySqlDagManagementStateStoreTest {
return true;
}
+ @Test
+ public void testExistsCurrentlyLaunchingSimilarFlowGivesTrue() throws
Exception{
+
Mockito.when(leaseArbiter.existsSimilarLeaseWithinConsolidationPeriod(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true);
+ String flowName = "testFlow";
+ String flowGroup = "testGroup";
+
Assert.assertTrue(dagManagementStateStore.existsCurrentlyLaunchingExecOfSameFlow(flowGroup,
flowName, System.currentTimeMillis()));
+ }
+
+ @Test
+ public void testExistsCurrentlyLaunchingSimilarFlowGivesFalse() throws
Exception{
+
Mockito.when(leaseArbiter.existsSimilarLeaseWithinConsolidationPeriod(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(false);
+ String flowName = "testFlow";
+ String flowGroup = "testGroup";
+
Assert.assertFalse(dagManagementStateStore.existsCurrentlyLaunchingExecOfSameFlow(flowGroup,
flowName, System.currentTimeMillis()));
+ }
+
@Test
public void testAddDag() throws Exception {
Dag<JobExecutionPlan> dag = DagTestUtils.buildDag("test", 12345L);
@@ -150,9 +168,11 @@ public class MySqlDagManagementStateStoreTest {
TopologySpec topologySpec =
LaunchDagProcTest.buildNaiveTopologySpec(TEST_SPEC_EXECUTOR_URI);
URI specExecURI = new URI(TEST_SPEC_EXECUTOR_URI);
topologySpecMap.put(specExecURI, topologySpec);
+ MultiActiveLeaseArbiter multiActiveLeaseArbiter =
Mockito.mock(MultiActiveLeaseArbiter.class);
+ leaseArbiter = multiActiveLeaseArbiter;
MySqlDagManagementStateStore dagManagementStateStore =
new MySqlDagManagementStateStore(config, null, null,
jobStatusRetriever,
-
MysqlDagActionStoreTest.getTestDagActionStore(testMetastoreDatabase));
+
MysqlDagActionStoreTest.getTestDagActionStore(testMetastoreDatabase),
multiActiveLeaseArbiter);
dagManagementStateStore.setTopologySpecMap(topologySpecMap);
return dagManagementStateStore;
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
index 9b132fe0d9..9ba8f40c6b 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
@@ -44,6 +44,7 @@ import org.apache.gobblin.util.ExponentialBackoff;
public class MysqlMultiActiveLeaseArbiterTest {
private static final long EPSILON = 10000L;
private static final long MORE_THAN_EPSILON = (long) (EPSILON * 1.1);
+ private static final long LESS_THAN_EPSILON = (long) (EPSILON * 0.90);
// NOTE: `sleep`ing this long SIGNIFICANTLY slows tests, but we need a large
enough value that exec. variability won't cause spurious failure
private static final long LINGER = 20000L;
private static final long MORE_THAN_LINGER = (long) (LINGER * 1.1);
@@ -53,9 +54,12 @@ public class MysqlMultiActiveLeaseArbiterTest {
private static final String CONSTANTS_TABLE = "constants_store";
private static final String flowGroup = "testFlowGroup";
private static final String flowGroup2 = "testFlowGroup2";
+ private static final String flowGroup3 = "testFlowGroup3";
+ private static final String flowGroup4 = "testFlowGroup4";
private static final String flowName = "testFlowName";
private static final String jobName = "testJobName";
- private static final long flowExecutionId = 12345677L;
+ private static final long flowExecutionId = 12345677213L;
+ private static final long flowExecutionId1 = 12345996546L;
private static final long eventTimeMillis = 1710451837L;
// Dag actions with the same flow info but different flow action types are
considered unique
private static final DagActionStore.DagAction launchDagAction =
@@ -70,6 +74,18 @@ public class MysqlMultiActiveLeaseArbiterTest {
new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId,
jobName, DagActionStore.DagActionType.LAUNCH);
private static final DagActionStore.LeaseParams
launchLeaseParams2 = new DagActionStore.LeaseParams(launchDagAction2,
false, eventTimeMillis);
+ private static final DagActionStore.LeaseParams
+ launchLeaseParams3 = new DagActionStore.LeaseParams(new
DagActionStore.DagAction(flowGroup3, flowName, flowExecutionId, jobName,
+ DagActionStore.DagActionType.LAUNCH), false, eventTimeMillis);
+ private static final DagActionStore.LeaseParams
+ launchLeaseParams3_similar = new DagActionStore.LeaseParams(new
DagActionStore.DagAction(flowGroup3, flowName, flowExecutionId1, jobName,
+ DagActionStore.DagActionType.LAUNCH), false, eventTimeMillis);
+ private static final DagActionStore.LeaseParams
+ launchLeaseParams4 = new DagActionStore.LeaseParams(new
DagActionStore.DagAction(flowGroup4, flowName, flowExecutionId, jobName,
+ DagActionStore.DagActionType.LAUNCH), false, eventTimeMillis);
+ private static final DagActionStore.LeaseParams
+ launchLeaseParams4_similar = new DagActionStore.LeaseParams(new
DagActionStore.DagAction(flowGroup4, flowName, flowExecutionId1, jobName,
+ DagActionStore.DagActionType.LAUNCH), false, eventTimeMillis);
private static final Timestamp dummyTimestamp = new Timestamp(99999);
private ITestMetastoreDatabase testDb;
private MysqlMultiActiveLeaseArbiter mysqlMultiActiveLeaseArbiter;
@@ -201,6 +217,33 @@ public class MysqlMultiActiveLeaseArbiterTest {
<= sixthObtainedStatus.getLeaseAcquisitionTimestamp());
}
+  /*
+   Verifies that after a LAUNCH lease is acquired (and completeLeaseHelper is called) for a flow, a LAUNCH for the same
+   flow group and flow name but a different flow execution id is reported as a similar lease while still inside epsilon.
+   Sleeping LESS_THAN_EPSILON exercises the check close to the end of that window.
+   */
+  @Test
+  public void testExistsSimilarLeaseWithinConsolidationPeriod() throws Exception {
+    LeaseAttemptStatus firstLaunchStatus =
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams3, true);
+    Assert.assertTrue(firstLaunchStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus);
+    completeLeaseHelper(launchLeaseParams3);
+    Thread.sleep(LESS_THAN_EPSILON);
+    Assert.assertTrue(mysqlMultiActiveLeaseArbiter.existsSimilarLeaseWithinConsolidationPeriod(launchLeaseParams3_similar));
+  }
+
+  /*
+   Verifies that once more than epsilon has elapsed after a LAUNCH lease was acquired (and completeLeaseHelper called),
+   a LAUNCH for the same flow group and flow name with a different flow execution id is NOT reported as a similar lease.
+   */
+  @Test
+  public void testDoesNotExistsSimilarLeaseWithinConsolidationPeriod() throws Exception {
+    LeaseAttemptStatus firstLaunchStatus =
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams4, true);
+    Assert.assertTrue(firstLaunchStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus);
+    completeLeaseHelper(launchLeaseParams4);
+    Thread.sleep(MORE_THAN_EPSILON);
+    Assert.assertFalse(mysqlMultiActiveLeaseArbiter.existsSimilarLeaseWithinConsolidationPeriod(launchLeaseParams4_similar));
+  }
+
/*
Tests attemptLeaseIfNewRow() method to ensure a new row is inserted if no
row matches the primary key in the table.
If such a row does exist, the method should disregard the resulting SQL
error and return 0 rows updated, indicating
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index ee5f14cb87..acfa6c51ca 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -18,13 +18,20 @@
package org.apache.gobblin.service.modules.orchestration;
import java.io.File;
+import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Properties;
import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.runtime.api.TooSoonToRerunSameFlowException;
+import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
+import org.apache.gobblin.service.modules.flow.FlowUtils;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
import org.apache.hadoop.fs.Path;
+import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -63,6 +70,7 @@ import
org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PathUtils;
+import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -85,7 +93,9 @@ public class OrchestratorTest {
private FlowCatalog flowCatalog;
private FlowSpec flowSpec;
private ITestMetastoreDatabase testMetastoreDatabase;
- private Orchestrator dagMgrNotFlowLaunchHandlerBasedOrchestrator;
+ private Orchestrator orchestrator;
+ private DagManagementStateStore dagManagementStateStore;
+ private SpecCompiler specCompiler;
@BeforeClass
public void setUpClass() throws Exception {
@@ -107,7 +117,7 @@ public class OrchestratorTest {
flowProperties.put("specStore.fs.dir", FLOW_SPEC_STORE_DIR);
this.serviceLauncher = new ServiceBasedAppLauncher(orchestratorProperties,
"OrchestratorCatalogTest");
-
+ this.specCompiler = Mockito.mock(SpecCompiler.class);
this.topologyCatalog = new
TopologyCatalog(ConfigUtils.propertiesToConfig(topologyProperties),
Optional.of(logger));
this.serviceLauncher.addService(topologyCatalog);
@@ -116,20 +126,21 @@ public class OrchestratorTest {
this.flowCatalog = new
FlowCatalog(ConfigUtils.propertiesToConfig(flowProperties),
Optional.of(logger), Optional.absent(), true);
this.serviceLauncher.addService(flowCatalog);
-
+ MultiActiveLeaseArbiter leaseArbiter =
Mockito.mock(MultiActiveLeaseArbiter.class);
MySqlDagManagementStateStore dagManagementStateStore =
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+ this.dagManagementStateStore = dagManagementStateStore;
SharedFlowMetricsSingleton sharedFlowMetricsSingleton = new
SharedFlowMetricsSingleton(ConfigUtils.propertiesToConfig(orchestratorProperties));
FlowCompilationValidationHelper flowCompilationValidationHelper = new
FlowCompilationValidationHelper(ConfigFactory.empty(),
sharedFlowMetricsSingleton, mock(UserQuotaManager.class),
dagManagementStateStore);
- this.dagMgrNotFlowLaunchHandlerBasedOrchestrator = new
Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
+ this.orchestrator = new
Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
this.topologyCatalog, Optional.of(logger),
mock(FlowLaunchHandler.class), sharedFlowMetricsSingleton,
dagManagementStateStore,
flowCompilationValidationHelper, mock(JobStatusRetriever.class));
-
this.topologyCatalog.addListener(dagMgrNotFlowLaunchHandlerBasedOrchestrator);
- this.flowCatalog.addListener(dagMgrNotFlowLaunchHandlerBasedOrchestrator);
+ this.topologyCatalog.addListener(orchestrator);
+ this.flowCatalog.addListener(orchestrator);
// Start application
this.serviceLauncher.start();
// Create Spec to play with
@@ -233,7 +244,7 @@ public class OrchestratorTest {
// TODO: this test doesn't exercise `Orchestrator` and so belongs elsewhere
- move it, then rework into `@BeforeMethod` init (since others depend on this)
@Test
public void createTopologySpec() {
- IdentityFlowToJobSpecCompiler specCompiler =
(IdentityFlowToJobSpecCompiler)
this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.getSpecCompiler();
+ IdentityFlowToJobSpecCompiler specCompiler =
(IdentityFlowToJobSpecCompiler) this.orchestrator.getSpecCompiler();
// List Current Specs
Collection<Spec> specs = topologyCatalog.getSpecs();
@@ -272,7 +283,7 @@ public class OrchestratorTest {
// TODO: fix this lingering inter-test dep from when `@BeforeClass` init,
which we've since replaced by `Mockito.verify`-friendly `@BeforeMethod`
createTopologySpec(); // make 1 Topology with 1 SpecProducer available and
responsible for our new FlowSpec
- IdentityFlowToJobSpecCompiler specCompiler =
(IdentityFlowToJobSpecCompiler)
this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.getSpecCompiler();
+ IdentityFlowToJobSpecCompiler specCompiler =
(IdentityFlowToJobSpecCompiler) this.orchestrator.getSpecCompiler();
SpecExecutor sei =
specCompiler.getTopologySpecMap().values().iterator().next().getSpecExecutor();
// List Current Specs
@@ -311,12 +322,72 @@ public class OrchestratorTest {
"SpecProducer should contain 0 Spec after addition");
}
+ /*
+ If another execution of the same flow has already acquired the lease within the lease
+ consolidation period, this adhoc flow is not executed: its spec is neither processed nor
+ stored, and onAddSpec throws TooSoonToRerunSameFlowException.
+ */
+ @Test(expectedExceptions = TooSoonToRerunSameFlowException.class)
+ public void
onAddSpecForAdhocFlowWhenSimilarExistingFlowIsCurrentlyLaunching() throws
IOException {
+ // No JOB_SCHEDULE_KEY is set, so this spec is an adhoc flow (contrast onAddSpecForScheduledFlow).
+ // FLOW_EXECUTION_ID_KEY is pinned; presumably FlowUtils.getOrCreateFlowExecutionId reads it back, so the stub below matches.
+ ConfigBuilder configBuilder = ConfigBuilder.create()
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup")
+ .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName")
+ .addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
System.currentTimeMillis())
+ .addPrimitive("gobblin.flow.sourceIdentifier", "source")
+ .addPrimitive("gobblin.flow.destinationIdentifier", "destination");
+ Config config = configBuilder.build();
+ FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();
+ // Simulate another execution of testGroup/testName that is already launching (holds the lease).
+
Mockito.when(dagManagementStateStore.existsCurrentlyLaunchingExecOfSameFlow("testGroup","testName",
FlowUtils.getOrCreateFlowExecutionId(flowSpec))).thenReturn(true);
+ // Must throw TooSoonToRerunSameFlowException, per expectedExceptions on @Test above.
+ orchestrator.onAddSpec(flowSpec);
+ }
+
+ /*
+ If no other execution of the same flow has acquired the lease within epsilon (the lease
+ consolidation period), flow compilation and addition to the store proceed normally.
+ */
+ @Test
+ public void onAddSpecForAdhocFlowWhenNoExistingFlowIsCurrentlyLaunching()
throws IOException {
+ // Adhoc spec (no JOB_SCHEDULE_KEY). NOTE(review): setup duplicates the previous test; consider a shared helper.
+ ConfigBuilder configBuilder = ConfigBuilder.create()
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup")
+ .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName")
+ .addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
System.currentTimeMillis())
+ .addPrimitive("gobblin.flow.sourceIdentifier", "source")
+ .addPrimitive("gobblin.flow.destinationIdentifier", "destination");
+ Config config = configBuilder.build();
+ FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();
+ // No concurrent launch of the same flow: the lease check passes.
+
Mockito.when(dagManagementStateStore.existsCurrentlyLaunchingExecOfSameFlow("testGroup","testName",
FlowUtils.getOrCreateFlowExecutionId(flowSpec))).thenReturn(false);
+ // NOTE(review): unlike onAddSpecForScheduledFlow, specCompiler.onAddSpec is not stubbed here; confirm the non-null result does not depend on it.
+ AddSpecResponse addSpecResponse = orchestrator.onAddSpec(flowSpec);
+ Assert.assertNotNull(addSpecResponse);
+ }
+
+ /*
+ For a scheduled flow the lease check is skipped entirely,
+ and flow compilation proceeds normally.
+ */
+ @Test
+ public void onAddSpecForScheduledFlow() throws IOException {
+ // JOB_SCHEDULE_KEY marks this spec as a scheduled (not adhoc) flow.
+ // NOTE(review): the "1/0" minute increment in the cron expression looks unusual; confirm the intended schedule.
+ ConfigBuilder configBuilder = ConfigBuilder.create()
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup")
+ .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName")
+ .addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, "0 1/0 * ? * *")
+ .addPrimitive("gobblin.flow.sourceIdentifier", "source")
+ .addPrimitive("gobblin.flow.destinationIdentifier", "destination");
+ Config config = configBuilder.build();
+ FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();
+ // Stub compilation to return a non-null response for this spec.
+ AddSpecResponse response = new AddSpecResponse<>(new Object());
+ Mockito.when(specCompiler.onAddSpec(flowSpec)).thenReturn(response);
+ AddSpecResponse addSpecResponse = orchestrator.onAddSpec(flowSpec);
+ Assert.assertNotNull(addSpecResponse);
+ // For a scheduled flow, existsCurrentlyLaunchingExecOfSameFlow must never be called.
+ Mockito.verify(dagManagementStateStore,
Mockito.never()).existsCurrentlyLaunchingExecOfSameFlow(anyString(),
anyString(), anyLong());
+ }
+
@Test
public void deleteFlowSpec() throws Throwable {
// TODO: fix this lingering inter-test dep from when `@BeforeClass` init,
which we've since replaced by `Mockito.verify`-friendly `@BeforeMethod`
createFlowSpec(); // make 1 Flow available (for deletion herein)
- IdentityFlowToJobSpecCompiler specCompiler =
(IdentityFlowToJobSpecCompiler)
this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.getSpecCompiler();
+ IdentityFlowToJobSpecCompiler specCompiler =
(IdentityFlowToJobSpecCompiler) this.orchestrator.getSpecCompiler();
SpecExecutor sei =
specCompiler.getTopologySpecMap().values().iterator().next().getSpecExecutor();
// List Current Specs
@@ -359,19 +430,19 @@ public class OrchestratorTest {
createTopologySpec(); // for flow compilation to pass
FlowId flowId =
GobblinServiceManagerTest.createFlowIdWithUniqueName(TEST_FLOW_GROUP_NAME);
- MetricContext metricContext =
this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.getSharedFlowMetricsSingleton().getMetricContext();
+ MetricContext metricContext =
this.orchestrator.getSharedFlowMetricsSingleton().getMetricContext();
String metricName =
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
flowId.getFlowGroup(), flowId.getFlowName(), ServiceMetricNames.COMPILED);
this.topologyCatalog.getInitComplete().countDown(); // unblock
orchestration
FlowSpec adhocSpec = createBasicFlowSpecForFlowId(flowId);
- this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.orchestrate(adhocSpec,
new Properties(), 0, false);
+ this.orchestrator.orchestrate(adhocSpec, new Properties(), 0, false);
Assert.assertNull(metricContext.getParent().get().getGauges().get(metricName));
Properties scheduledProps = new Properties();
scheduledProps.setProperty("job.schedule", "0/2 * * * * ?");
FlowSpec scheduledSpec = createBasicFlowSpecForFlowId(flowId,
scheduledProps);
-
this.dagMgrNotFlowLaunchHandlerBasedOrchestrator.orchestrate(scheduledSpec, new
Properties(), 0, false);
+ this.orchestrator.orchestrate(scheduledSpec, new Properties(), 0, false);
Assert.assertNotNull(metricContext.getParent().get().getGauges().get(metricName));
}