This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 079746870 [GOBBLIN-2094] verify dag action exist in deadline dag procs
(#3979)
079746870 is described below
commit 0797468702e9a931e464980f8cf2f329ab892177
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Tue Jun 18 14:40:52 2024 -0700
[GOBBLIN-2094] verify dag action exist in deadline dag procs (#3979)
* verify dag action exist in deadline dag procs
* fix test
* address review comment
---
.../modules/orchestration/DagActionStore.java | 2 +-
.../orchestration/DagManagementStateStore.java | 2 +-
.../modules/orchestration/DagProcFactory.java | 8 +--
.../MostlyMySqlDagManagementStateStore.java | 2 +-
.../modules/orchestration/MysqlDagActionStore.java | 46 +++++--------
.../proc/DeadlineEnforcementDagProc.java | 76 ++++++++++++++++++++++
.../proc/EnforceFlowFinishDeadlineDagProc.java | 36 ++--------
.../proc/EnforceJobStartDeadlineDagProc.java | 31 ++-------
.../MostlyMySqlDagManagementStateStoreTest.java | 3 +-
.../orchestration/MysqlDagActionStoreTest.java | 21 +++---
.../proc/EnforceDeadlineDagProcsTest.java | 54 +++++++++++++--
.../orchestration/proc/LaunchDagProcTest.java | 2 +-
12 files changed, 175 insertions(+), 108 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
index 54f9a65f5..57b40a1d6 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
@@ -106,7 +106,7 @@ public interface DagActionStore {
* @param dagActionType the value of the dag action
* @throws IOException
*/
- boolean exists(String flowGroup, String flowName, long flowExecutionId,
String jobName, DagActionType dagActionType) throws IOException, SQLException;
+ boolean exists(String flowGroup, String flowName, long flowExecutionId,
String jobName, DagActionType dagActionType) throws IOException;
/**
* Check if an action exists in dagAction store by flow group, flow name,
and flow execution id, it assumes jobName is
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
index 7cfe0ff94..2367a810a 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
@@ -207,7 +207,7 @@ public interface DagManagementStateStore {
* @throws IOException
*/
boolean existsJobDagAction(String flowGroup, String flowName, long
flowExecutionId, String jobName,
- DagActionStore.DagActionType dagActionType) throws IOException,
SQLException;
+ DagActionStore.DagActionType dagActionType) throws IOException;
/**
* Check if an action exists in dagAction store by flow group, flow name,
and flow execution id, it assumes jobName is
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java
index 51f3c6ad2..0503f2d0c 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java
@@ -46,7 +46,7 @@ import
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
@Alpha
@Singleton
-public class DagProcFactory implements DagTaskVisitor<DagProc> {
+public class DagProcFactory implements DagTaskVisitor<DagProc<?>> {
private final FlowCompilationValidationHelper
flowCompilationValidationHelper;
@@ -56,16 +56,15 @@ public class DagProcFactory implements
DagTaskVisitor<DagProc> {
}
@Override
- public DagProc meet(EnforceFlowFinishDeadlineDagTask
enforceFlowFinishDeadlineDagTask) {
+ public EnforceFlowFinishDeadlineDagProc
meet(EnforceFlowFinishDeadlineDagTask enforceFlowFinishDeadlineDagTask) {
return new
EnforceFlowFinishDeadlineDagProc(enforceFlowFinishDeadlineDagTask);
}
@Override
- public DagProc meet(EnforceJobStartDeadlineDagTask
enforceJobStartDeadlineDagTask) {
+ public EnforceJobStartDeadlineDagProc meet(EnforceJobStartDeadlineDagTask
enforceJobStartDeadlineDagTask) {
return new EnforceJobStartDeadlineDagProc(enforceJobStartDeadlineDagTask);
}
-
@Override
public LaunchDagProc meet(LaunchDagTask launchDagTask) {
return new LaunchDagProc(launchDagTask,
this.flowCompilationValidationHelper);
@@ -85,6 +84,5 @@ public class DagProcFactory implements
DagTaskVisitor<DagProc> {
public ResumeDagProc meet(ResumeDagTask resumeDagTask) {
return new ResumeDagProc(resumeDagTask);
}
- //todo - overload meet method for other dag tasks
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
index 82504c729..b69f7d740 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
@@ -277,7 +277,7 @@ public class MostlyMySqlDagManagementStateStore implements
DagManagementStateSto
@Override
public boolean existsJobDagAction(String flowGroup, String flowName, long
flowExecutionId, String jobName,
- DagActionStore.DagActionType dagActionType) throws IOException,
SQLException {
+ DagActionStore.DagActionType dagActionType) throws IOException {
return this.dagActionStore.exists(flowGroup, flowName, flowExecutionId,
jobName, dagActionType);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
index 12d786966..5ba32539f 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
@@ -97,26 +97,15 @@ public class MysqlDagActionStore implements DagActionStore {
}
@Override
- public boolean exists(String flowGroup, String flowName, long
flowExecutionId, String jobName, DagActionType dagActionType) throws
IOException, SQLException {
+ public boolean exists(String flowGroup, String flowName, long
flowExecutionId, String jobName, DagActionType dagActionType) throws
IOException {
return
dbStatementExecutor.withPreparedStatement(String.format(EXISTS_STATEMENT,
tableName), existStatement -> {
- int i = 0;
- existStatement.setString(++i, flowGroup);
- existStatement.setString(++i, flowName);
- existStatement.setString(++i, String.valueOf(flowExecutionId));
- existStatement.setString(++i, jobName);
- existStatement.setString(++i, dagActionType.toString());
- ResultSet rs = null;
- try {
- rs = existStatement.executeQuery();
+ fillPreparedStatement(flowGroup, flowName, flowExecutionId, jobName,
dagActionType, existStatement);
+ try (ResultSet rs = existStatement.executeQuery()) {
rs.next();
return rs.getBoolean(1);
} catch (SQLException e) {
throw new IOException(String.format("Failure checking existence of
DagAction: %s in table %s",
new DagAction(flowGroup, flowName, flowExecutionId, jobName,
dagActionType), tableName), e);
- } finally {
- if (rs != null) {
- rs.close();
- }
}
}, true);
}
@@ -131,12 +120,7 @@ public class MysqlDagActionStore implements DagActionStore
{
throws IOException {
dbStatementExecutor.withPreparedStatement(String.format(INSERT_STATEMENT,
tableName), insertStatement -> {
try {
- int i = 0;
- insertStatement.setString(++i, flowGroup);
- insertStatement.setString(++i, flowName);
- insertStatement.setString(++i, String.valueOf(flowExecutionId));
- insertStatement.setString(++i, jobName);
- insertStatement.setString(++i, dagActionType.toString());
+ fillPreparedStatement(flowGroup, flowName, flowExecutionId, jobName,
dagActionType, insertStatement);
return insertStatement.executeUpdate();
} catch (SQLException e) {
throw new IOException(String.format("Failure adding action for
DagAction: %s in table %s",
@@ -148,14 +132,9 @@ public class MysqlDagActionStore implements DagActionStore
{
public boolean deleteDagAction(DagAction dagAction) throws IOException {
return
dbStatementExecutor.withPreparedStatement(String.format(DELETE_STATEMENT,
tableName), deleteStatement -> {
try {
- int i = 0;
- deleteStatement.setString(++i, dagAction.getFlowGroup());
- deleteStatement.setString(++i, dagAction.getFlowName());
- deleteStatement.setString(++i,
String.valueOf(dagAction.getFlowExecutionId()));
- deleteStatement.setString(++i, dagAction.getJobName());
- deleteStatement.setString(++i, dagAction.getDagActionType().toString());
- int result = deleteStatement.executeUpdate();
- return result != 0;
+ fillPreparedStatement(dagAction.getFlowGroup(), dagAction.getFlowName(),
dagAction.getFlowExecutionId(),
+ dagAction.getJobName(), dagAction.getDagActionType(),
deleteStatement);
+ return deleteStatement.executeUpdate() != 0;
} catch (SQLException e) {
throw new IOException(String.format("Failure deleting action for
DagAction: %s in table %s", dagAction,
tableName), e);
@@ -202,4 +181,15 @@ public class MysqlDagActionStore implements DagActionStore
{
}
}, true);
}
+
+ private static void fillPreparedStatement(String flowGroup, String flowName,
long flowExecutionId, String jobName,
+ DagActionType dagActionType, PreparedStatement statement)
+ throws SQLException {
+ int i = 0;
+ statement.setString(++i, flowGroup);
+ statement.setString(++i, flowName);
+ statement.setString(++i, String.valueOf(flowExecutionId));
+ statement.setString(++i, jobName);
+ statement.setString(++i, dagActionType.toString());
+ }
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DeadlineEnforcementDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DeadlineEnforcementDagProc.java
new file mode 100644
index 000000000..032a26df6
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DeadlineEnforcementDagProc.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+/**
+ * An abstract implementation for {@link DagProc} that enforces deadline for
jobs.
+ */
+@Slf4j
+abstract public class DeadlineEnforcementDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>> {
+
+ public DeadlineEnforcementDagProc(DagTask dagTask) {
+ super(dagTask);
+ }
+
+ @Override
+ protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore
dagManagementStateStore)
+ throws IOException {
+ return dagManagementStateStore.getDag(getDagId());
+ }
+
+ @Override
+ protected void act(DagManagementStateStore dagManagementStateStore,
Optional<Dag<JobExecutionPlan>> dag)
+ throws IOException {
+ if (validate(dag, dagManagementStateStore)) {
+ enforceDeadline(dagManagementStateStore, dag.get());
+ }
+ }
+
+ private boolean validate(Optional<Dag<JobExecutionPlan>> dag,
DagManagementStateStore dagManagementStateStore) throws IOException {
+ log.info("Request to enforce deadlines for dag {}", getDagId());
+ DagActionStore.DagAction dagAction = getDagTask().getDagAction();
+
+ if (!dag.isPresent()) {
+ // todo - add a metric here
+ log.error("Dag not present when validating {}. It may already have
cancelled/finished. Dag {}",
+ getDagId(), dagAction);
+ return false;
+ }
+
+ if (!dagManagementStateStore.existsJobDagAction(dagAction.getFlowGroup(),
dagAction.getFlowName(),
+ dagAction.getFlowExecutionId(), dagAction.getJobName(),
dagAction.getDagActionType())) {
+ log.warn("Dag action {} is cleaned up from DMSS. No further action is
required.", dagAction);
+ return false;
+ }
+
+ return true;
+ }
+
+ abstract void enforceDeadline(DagManagementStateStore
dagManagementStateStore, Dag<JobExecutionPlan> dag) throws IOException;
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java
index 3b1ac2886..a7d388b9b 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.service.modules.orchestration.proc;
import java.io.IOException;
import java.util.List;
-import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
@@ -36,51 +35,30 @@ import
org.apache.gobblin.service.modules.spec.JobExecutionPlan;
* {@link
org.apache.gobblin.configuration.ConfigurationKeys#GOBBLIN_FLOW_SLA_TIME} time.
*/
@Slf4j
-public class EnforceFlowFinishDeadlineDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>> {
+public class EnforceFlowFinishDeadlineDagProc extends
DeadlineEnforcementDagProc {
public EnforceFlowFinishDeadlineDagProc(EnforceFlowFinishDeadlineDagTask
enforceFlowFinishDeadlineDagTask) {
super(enforceFlowFinishDeadlineDagTask);
}
- @Override
- protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore
dagManagementStateStore)
+ protected void enforceDeadline(DagManagementStateStore
dagManagementStateStore, Dag<JobExecutionPlan> dag)
throws IOException {
- return dagManagementStateStore.getDag(getDagId());
- }
-
- @Override
- protected void act(DagManagementStateStore dagManagementStateStore,
Optional<Dag<JobExecutionPlan>> dag)
- throws IOException {
- log.info("Request to enforce deadlines for dag {}", getDagId());
-
- if (!dag.isPresent()) {
- // todo - add a metric here
- log.error("Did not find Dag with id {}, it might be already
cancelled/finished and thus cleaned up from the store.",
- getDagId());
- return;
- }
-
- enforceFlowFinishDeadline(dagManagementStateStore, dag);
- }
-
- private void enforceFlowFinishDeadline(DagManagementStateStore
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
- throws IOException {
- Dag.DagNode<JobExecutionPlan> dagNode = dag.get().getNodes().get(0);
+ Dag.DagNode<JobExecutionPlan> dagNode = dag.getNodes().get(0);
long flowFinishDeadline = DagManagerUtils.getFlowSLA(dagNode);
long flowStartTime = DagManagerUtils.getFlowStartTime(dagNode);
// note that this condition should be true because the triggered dag
action has waited enough before reaching here
if (System.currentTimeMillis() > flowStartTime + flowFinishDeadline) {
- List<Dag.DagNode<JobExecutionPlan>> dagNodesToCancel =
dag.get().getNodes();
+ List<Dag.DagNode<JobExecutionPlan>> dagNodesToCancel = dag.getNodes();
log.info("Found {} DagNodes to cancel (DagId {}).",
dagNodesToCancel.size(), getDagId());
for (Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
DagProcUtils.cancelDagNode(dagNodeToCancel, dagManagementStateStore);
}
-
dag.get().setFlowEvent(TimingEvent.FlowTimings.FLOW_RUN_DEADLINE_EXCEEDED);
- dag.get().setMessage("Flow killed due to exceeding SLA of " +
flowFinishDeadline + " ms");
- dagManagementStateStore.checkpointDag(dag.get());
+ dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_RUN_DEADLINE_EXCEEDED);
+ dag.setMessage("Flow killed due to exceeding SLA of " +
flowFinishDeadline + " ms");
+ dagManagementStateStore.checkpointDag(dag);
} else {
log.error("EnforceFlowFinishDeadline dagAction received before due time.
flowStartTime {}, flowFinishDeadline {} ", flowStartTime, flowFinishDeadline);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java
index 8cf01be6b..15fa11c7e 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java
@@ -42,34 +42,13 @@ import static
org.apache.gobblin.service.ExecutionStatus.valueOf;
* {@link
org.apache.gobblin.service.modules.orchestration.DagManager#JOB_START_SLA_TIME}
time.
*/
@Slf4j
-public class EnforceJobStartDeadlineDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>> {
+public class EnforceJobStartDeadlineDagProc extends DeadlineEnforcementDagProc
{
public EnforceJobStartDeadlineDagProc(EnforceJobStartDeadlineDagTask
enforceJobStartDeadlineDagTask) {
super(enforceJobStartDeadlineDagTask);
}
- @Override
- protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore
dagManagementStateStore)
- throws IOException {
- return dagManagementStateStore.getDag(getDagId());
- }
-
- @Override
- protected void act(DagManagementStateStore dagManagementStateStore,
Optional<Dag<JobExecutionPlan>> dag)
- throws IOException {
- log.info("Request to enforce deadlines for dag {}", getDagId());
-
- if (!dag.isPresent()) {
- // todo - add a metric here
- log.error("Did not find Dag with id {}, it might be already
cancelled/finished and thus cleaned up from the store.",
- getDagId());
- return;
- }
-
- enforceJobStartDeadline(dagManagementStateStore, dag);
- }
-
- private void enforceJobStartDeadline(DagManagementStateStore
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
+ protected void enforceDeadline(DagManagementStateStore
dagManagementStateStore, Dag<JobExecutionPlan> dag)
throws IOException {
Pair<Optional<Dag.DagNode<JobExecutionPlan>>,
Optional<org.apache.gobblin.service.monitoring.JobStatus>>
dagNodeToCheckDeadline =
dagManagementStateStore.getDagNodeWithJobStatus(getDagNodeId());
@@ -96,9 +75,9 @@ public class EnforceJobStartDeadlineDagProc extends
DagProc<Optional<Dag<JobExec
DagManagerUtils.getJobName(dagNode), jobOrchestratedTime,
timeOutForJobStart);
dagManagementStateStore.getDagManagerMetrics().incrementCountsStartSlaExceeded(dagNode);
DagProcUtils.cancelDagNode(dagNode, dagManagementStateStore);
-
dag.get().setFlowEvent(TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED);
- dag.get().setMessage("Flow killed because no update received for " +
timeOutForJobStart + " ms after orchestration");
- dagManagementStateStore.checkpointDag(dag.get());
+ dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED);
+ dag.setMessage("Flow killed because no update received for " +
timeOutForJobStart + " ms after orchestration");
+ dagManagementStateStore.checkpointDag(dag);
}
}
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
index aaabf6a86..7cf2efcc8 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
@@ -135,7 +135,8 @@ public class MostlyMySqlDagManagementStateStoreTest {
URI specExecURI = new URI(TEST_SPEC_EXECUTOR_URI);
topologySpecMap.put(specExecURI, topologySpec);
MostlyMySqlDagManagementStateStore dagManagementStateStore =
- new MostlyMySqlDagManagementStateStore(config, null, null,
jobStatusRetriever, mock(DagActionStore.class));
+ new MostlyMySqlDagManagementStateStore(config, null, null,
jobStatusRetriever,
+
MysqlDagActionStoreTest.getTestDagActionStore(testMetastoreDatabase));
dagManagementStateStore.setTopologySpecMap(topologySpecMap);
return dagManagementStateStore;
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
index faf6cd28b..513001953 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
@@ -46,19 +46,12 @@ public class MysqlDagActionStoreTest {
private static final long flowExecutionId_2 = 12345678L;
private static final long flowExecutionId_3 = 12345679L;
private ITestMetastoreDatabase testDb;
- private MysqlDagActionStore mysqlDagActionStore;
+ private DagActionStore mysqlDagActionStore;
@BeforeClass
public void setUp() throws Exception {
this.testDb = TestMetastoreDatabaseFactory.get();
- Config config = ConfigBuilder.create()
- .addPrimitive("MysqlDagActionStore." +
ConfigurationKeys.STATE_STORE_DB_URL_KEY, this.testDb.getJdbcUrl())
- .addPrimitive("MysqlDagActionStore." +
ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
- .addPrimitive("MysqlDagActionStore." +
ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
- .addPrimitive("MysqlDagActionStore." +
ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
- .build();
-
- this.mysqlDagActionStore = new MysqlDagActionStore(config);
+ this.mysqlDagActionStore = getTestDagActionStore(this.testDb);
}
@AfterClass(alwaysRun = true)
@@ -67,6 +60,16 @@ public class MysqlDagActionStoreTest {
this.testDb.close();
}
+ public static DagActionStore getTestDagActionStore(ITestMetastoreDatabase
testDb) throws Exception {
+ Config config = ConfigBuilder.create()
+ .addPrimitive("MysqlDagActionStore." +
ConfigurationKeys.STATE_STORE_DB_URL_KEY, testDb.getJdbcUrl())
+ .addPrimitive("MysqlDagActionStore." +
ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
+ .addPrimitive("MysqlDagActionStore." +
ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
+ .addPrimitive("MysqlDagActionStore." +
ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
+ .build();
+ return new MysqlDagActionStore(config);
+ }
+
@Test
public void testAddAction() throws Exception {
this.mysqlDagActionStore.addJobDagAction(flowGroup, flowName,
flowExecutionId, jobName, DagActionStore.DagActionType.KILL);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
index ff05be247..41bdf3334 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
@@ -48,7 +48,6 @@ import
org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.monitoring.JobStatus;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
@@ -79,8 +78,9 @@ public class EnforceDeadlineDagProcsTest {
String flowName = "fn";
long flowExecutionId = System.currentTimeMillis();
MostlyMySqlDagManagementStateStore dagManagementStateStore =
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
- doNothing().when(dagManagementStateStore).tryAcquireQuota(any());
- doNothing().when(dagManagementStateStore).addDagNodeState(any(), any());
+ LaunchDagProcTest.mockDMSSCommonBehavior(dagManagementStateStore);
+ DagActionStore.DagAction dagAction = new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, "job0",
+ DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId,
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
5, "user5", ConfigFactory.empty()
.withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
@@ -94,6 +94,7 @@ public class EnforceDeadlineDagProcsTest {
doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
doReturn(Pair.of(Optional.of(dag.getStartNodes().get(0)),
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
dagManagementStateStore.checkpointDag(dag); // simulate having a dag that
has not yet started running
+ dagManagementStateStore.addDagAction(dagAction);
EnforceJobStartDeadlineDagProc enforceJobStartDeadlineDagProc = new
EnforceJobStartDeadlineDagProc(
new EnforceJobStartDeadlineDagTask(new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
@@ -106,6 +107,45 @@ public class EnforceDeadlineDagProcsTest {
.filter(a ->
a.getMethod().getName().equals("deleteDagNodeState")).count());
}
+ /*
+ This test simulates deletion of a dag action (by not adding it into DMSS).
+ Absence of dag action signals that deadline enforcement is not required and
hence the test verifies no further action
+ is taken by deadline dag proc.
+ */
+ @Test
+ public void enforceJobStartDeadlineTestWithMissingDagAction() throws
Exception {
+ String flowGroup = "fg";
+ String flowName = "fn";
+ long flowExecutionId = System.currentTimeMillis();
+ MostlyMySqlDagManagementStateStore dagManagementStateStore =
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+ LaunchDagProcTest.mockDMSSCommonBehavior(dagManagementStateStore);
+ DagActionStore.DagAction dagAction = new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, "job0",
+ DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
+ Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId,
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+ 5, "user5", ConfigFactory.empty()
+ .withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
+ .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT,
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
+ .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME,
ConfigValueFactory.fromAnyRef(1L))
+ .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
+
MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+ JobStatus jobStatus =
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0").flowExecutionId(flowExecutionId).
+ message("Test
message").eventName(ExecutionStatus.ORCHESTRATED.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+ doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
+ doReturn(Pair.of(Optional.of(dag.getStartNodes().get(0)),
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+ dagManagementStateStore.checkpointDag(dag); // simulate having a dag that
has not yet started running
+
+ EnforceJobStartDeadlineDagProc enforceJobStartDeadlineDagProc = new
EnforceJobStartDeadlineDagProc(
+ new EnforceJobStartDeadlineDagTask(new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
+ "job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE),
null, dagManagementStateStore));
+ enforceJobStartDeadlineDagProc.process(dagManagementStateStore);
+
+ int expectedNumOfDeleteDagNodeStates = 0; // no dag node because we
simulated (by not adding) missing dag action
+ Assert.assertEquals(expectedNumOfDeleteDagNodeStates,
+
Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
+ .filter(a ->
a.getMethod().getName().equals("deleteDagNodeState")).count());
+ }
+
/*
This test simulate submitting a dag with a very short flow finish deadline
that will definitely be breached,
resulting in the dag requiring to be killed
@@ -116,8 +156,9 @@ public class EnforceDeadlineDagProcsTest {
String flowName = "fn";
long flowExecutionId = System.currentTimeMillis();
MostlyMySqlDagManagementStateStore dagManagementStateStore =
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
- doNothing().when(dagManagementStateStore).tryAcquireQuota(any());
- doNothing().when(dagManagementStateStore).addDagNodeState(any(), any());
+ LaunchDagProcTest.mockDMSSCommonBehavior(dagManagementStateStore);
+ DagActionStore.DagAction dagAction = new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, "job0",
+ DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE);
int numOfDagNodes = 5;
Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId,
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
numOfDagNodes, "user5", ConfigFactory.empty()
@@ -132,10 +173,11 @@ public class EnforceDeadlineDagProcsTest {
doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
doReturn(Pair.of(Optional.of(dag.getStartNodes().get(0)),
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
dagManagementStateStore.checkpointDag(dag); // simulate having a dag that
is in running state
+ dagManagementStateStore.addDagAction(dagAction);
EnforceFlowFinishDeadlineDagProc enforceFlowFinishDeadlineDagProc = new
EnforceFlowFinishDeadlineDagProc(
new EnforceFlowFinishDeadlineDagTask(new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
- "job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE),
null, dagManagementStateStore));
+ "job0",
DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE), null,
dagManagementStateStore));
enforceFlowFinishDeadlineDagProc.process(dagManagementStateStore);
Assert.assertEquals(numOfDagNodes,
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
index e57e09589..6b5e7bbdd 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
@@ -119,7 +119,7 @@ public class LaunchDagProcTest {
@Test
public void launchDagWithMultipleParallelJobs() throws IOException,
InterruptedException, URISyntaxException {
String flowGroup = "fg";
- String flowName = "fn";
+ String flowName = "fn2";
long flowExecutionId = 12345L;
Dag<JobExecutionPlan> dag =
buildDagWithMultipleNodesAtDifferentLevels("1", flowExecutionId,
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),"user5",
ConfigFactory.empty()