This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 079746870 [GOBBLIN-2094] verify dag action exist in deadline dag procs 
(#3979)
079746870 is described below

commit 0797468702e9a931e464980f8cf2f329ab892177
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Tue Jun 18 14:40:52 2024 -0700

    [GOBBLIN-2094] verify dag action exist in deadline dag procs (#3979)
    
    * verify dag action exist in deadline dag procs
    * fix test
    * address review comment
---
 .../modules/orchestration/DagActionStore.java      |  2 +-
 .../orchestration/DagManagementStateStore.java     |  2 +-
 .../modules/orchestration/DagProcFactory.java      |  8 +--
 .../MostlyMySqlDagManagementStateStore.java        |  2 +-
 .../modules/orchestration/MysqlDagActionStore.java | 46 +++++--------
 .../proc/DeadlineEnforcementDagProc.java           | 76 ++++++++++++++++++++++
 .../proc/EnforceFlowFinishDeadlineDagProc.java     | 36 ++--------
 .../proc/EnforceJobStartDeadlineDagProc.java       | 31 ++-------
 .../MostlyMySqlDagManagementStateStoreTest.java    |  3 +-
 .../orchestration/MysqlDagActionStoreTest.java     | 21 +++---
 .../proc/EnforceDeadlineDagProcsTest.java          | 54 +++++++++++++--
 .../orchestration/proc/LaunchDagProcTest.java      |  2 +-
 12 files changed, 175 insertions(+), 108 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
index 54f9a65f5..57b40a1d6 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
@@ -106,7 +106,7 @@ public interface DagActionStore {
    * @param dagActionType the value of the dag action
    * @throws IOException
    */
-  boolean exists(String flowGroup, String flowName, long flowExecutionId, 
String jobName, DagActionType dagActionType) throws IOException, SQLException;
+  boolean exists(String flowGroup, String flowName, long flowExecutionId, 
String jobName, DagActionType dagActionType) throws IOException;
 
   /**
    * Check if an action exists in dagAction store by flow group, flow name, 
and flow execution id, it assumes jobName is
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
index 7cfe0ff94..2367a810a 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
@@ -207,7 +207,7 @@ public interface DagManagementStateStore {
    * @throws IOException
    */
   boolean existsJobDagAction(String flowGroup, String flowName, long 
flowExecutionId, String jobName,
-      DagActionStore.DagActionType dagActionType) throws IOException, 
SQLException;
+      DagActionStore.DagActionType dagActionType) throws IOException;
 
   /**
    * Check if an action exists in dagAction store by flow group, flow name, 
and flow execution id, it assumes jobName is
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java
index 51f3c6ad2..0503f2d0c 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java
@@ -46,7 +46,7 @@ import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
 
 @Alpha
 @Singleton
-public class DagProcFactory implements DagTaskVisitor<DagProc> {
+public class DagProcFactory implements DagTaskVisitor<DagProc<?>> {
 
   private final FlowCompilationValidationHelper 
flowCompilationValidationHelper;
 
@@ -56,16 +56,15 @@ public class DagProcFactory implements 
DagTaskVisitor<DagProc> {
   }
 
   @Override
-  public DagProc meet(EnforceFlowFinishDeadlineDagTask 
enforceFlowFinishDeadlineDagTask) {
+  public EnforceFlowFinishDeadlineDagProc 
meet(EnforceFlowFinishDeadlineDagTask enforceFlowFinishDeadlineDagTask) {
     return new 
EnforceFlowFinishDeadlineDagProc(enforceFlowFinishDeadlineDagTask);
   }
 
   @Override
-  public DagProc meet(EnforceJobStartDeadlineDagTask 
enforceJobStartDeadlineDagTask) {
+  public EnforceJobStartDeadlineDagProc meet(EnforceJobStartDeadlineDagTask 
enforceJobStartDeadlineDagTask) {
     return new EnforceJobStartDeadlineDagProc(enforceJobStartDeadlineDagTask);
   }
 
-
   @Override
   public LaunchDagProc meet(LaunchDagTask launchDagTask) {
     return new LaunchDagProc(launchDagTask, 
this.flowCompilationValidationHelper);
@@ -85,6 +84,5 @@ public class DagProcFactory implements 
DagTaskVisitor<DagProc> {
   public ResumeDagProc meet(ResumeDagTask resumeDagTask) {
     return new ResumeDagProc(resumeDagTask);
   }
-  //todo - overload meet method for other dag tasks
 }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
index 82504c729..b69f7d740 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
@@ -277,7 +277,7 @@ public class MostlyMySqlDagManagementStateStore implements 
DagManagementStateSto
 
   @Override
   public boolean existsJobDagAction(String flowGroup, String flowName, long 
flowExecutionId, String jobName,
-      DagActionStore.DagActionType dagActionType) throws IOException, 
SQLException {
+      DagActionStore.DagActionType dagActionType) throws IOException {
     return this.dagActionStore.exists(flowGroup, flowName, flowExecutionId, 
jobName, dagActionType);
   }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
index 12d786966..5ba32539f 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
@@ -97,26 +97,15 @@ public class MysqlDagActionStore implements DagActionStore {
   }
 
   @Override
-  public boolean exists(String flowGroup, String flowName, long 
flowExecutionId, String jobName, DagActionType dagActionType) throws 
IOException, SQLException {
+  public boolean exists(String flowGroup, String flowName, long 
flowExecutionId, String jobName, DagActionType dagActionType) throws 
IOException {
     return 
dbStatementExecutor.withPreparedStatement(String.format(EXISTS_STATEMENT, 
tableName), existStatement -> {
-      int i = 0;
-      existStatement.setString(++i, flowGroup);
-      existStatement.setString(++i, flowName);
-      existStatement.setString(++i, String.valueOf(flowExecutionId));
-      existStatement.setString(++i, jobName);
-      existStatement.setString(++i, dagActionType.toString());
-      ResultSet rs = null;
-      try {
-        rs = existStatement.executeQuery();
+      fillPreparedStatement(flowGroup, flowName, flowExecutionId, jobName, 
dagActionType, existStatement);
+      try (ResultSet rs = existStatement.executeQuery()) {
         rs.next();
         return rs.getBoolean(1);
       } catch (SQLException e) {
         throw new IOException(String.format("Failure checking existence of 
DagAction: %s in table %s",
             new DagAction(flowGroup, flowName, flowExecutionId, jobName, 
dagActionType), tableName), e);
-      } finally {
-        if (rs != null) {
-          rs.close();
-        }
       }
     }, true);
   }
@@ -131,12 +120,7 @@ public class MysqlDagActionStore implements DagActionStore 
{
       throws IOException {
     dbStatementExecutor.withPreparedStatement(String.format(INSERT_STATEMENT, 
tableName), insertStatement -> {
     try {
-      int i = 0;
-      insertStatement.setString(++i, flowGroup);
-      insertStatement.setString(++i, flowName);
-      insertStatement.setString(++i, String.valueOf(flowExecutionId));
-      insertStatement.setString(++i, jobName);
-      insertStatement.setString(++i, dagActionType.toString());
+      fillPreparedStatement(flowGroup, flowName, flowExecutionId, jobName, 
dagActionType, insertStatement);
       return insertStatement.executeUpdate();
     } catch (SQLException e) {
       throw new IOException(String.format("Failure adding action for 
DagAction: %s in table %s",
@@ -148,14 +132,9 @@ public class MysqlDagActionStore implements DagActionStore 
{
   public boolean deleteDagAction(DagAction dagAction) throws IOException {
     return 
dbStatementExecutor.withPreparedStatement(String.format(DELETE_STATEMENT, 
tableName), deleteStatement -> {
     try {
-      int i = 0;
-      deleteStatement.setString(++i, dagAction.getFlowGroup());
-      deleteStatement.setString(++i, dagAction.getFlowName());
-      deleteStatement.setString(++i, 
String.valueOf(dagAction.getFlowExecutionId()));
-      deleteStatement.setString(++i, dagAction.getJobName());
-      deleteStatement.setString(++i, dagAction.getDagActionType().toString());
-      int result = deleteStatement.executeUpdate();
-      return result != 0;
+      fillPreparedStatement(dagAction.getFlowGroup(), dagAction.getFlowName(), 
dagAction.getFlowExecutionId(),
+          dagAction.getJobName(), dagAction.getDagActionType(), 
deleteStatement);
+      return deleteStatement.executeUpdate() != 0;
     } catch (SQLException e) {
       throw new IOException(String.format("Failure deleting action for 
DagAction: %s in table %s", dagAction,
           tableName), e);
@@ -202,4 +181,15 @@ public class MysqlDagActionStore implements DagActionStore 
{
       }
     }, true);
   }
+
+  private static void fillPreparedStatement(String flowGroup, String flowName, 
long flowExecutionId, String jobName,
+      DagActionType dagActionType, PreparedStatement statement)
+      throws SQLException {
+    int i = 0;
+    statement.setString(++i, flowGroup);
+    statement.setString(++i, flowName);
+    statement.setString(++i, String.valueOf(flowExecutionId));
+    statement.setString(++i, jobName);
+    statement.setString(++i, dagActionType.toString());
+  }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DeadlineEnforcementDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DeadlineEnforcementDagProc.java
new file mode 100644
index 000000000..032a26df6
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DeadlineEnforcementDagProc.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+/**
+ * An abstract implementation for {@link DagProc} that enforces deadline for 
jobs.
+ */
+@Slf4j
+abstract public class DeadlineEnforcementDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>> {
+
+  public DeadlineEnforcementDagProc(DagTask dagTask) {
+    super(dagTask);
+  }
+
+  @Override
+  protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore 
dagManagementStateStore)
+      throws IOException {
+    return dagManagementStateStore.getDag(getDagId());
+  }
+
+  @Override
+  protected void act(DagManagementStateStore dagManagementStateStore, 
Optional<Dag<JobExecutionPlan>> dag)
+      throws IOException {
+    if (validate(dag, dagManagementStateStore)) {
+      enforceDeadline(dagManagementStateStore, dag.get());
+    }
+  }
+
+  private boolean validate(Optional<Dag<JobExecutionPlan>> dag, 
DagManagementStateStore dagManagementStateStore) throws IOException {
+    log.info("Request to enforce deadlines for dag {}", getDagId());
+    DagActionStore.DagAction dagAction = getDagTask().getDagAction();
+
+    if (!dag.isPresent()) {
+      // todo - add a metric here
+      log.error("Dag not present when validating {}. It may already have 
cancelled/finished. Dag {}",
+          getDagId(), dagAction);
+      return false;
+    }
+
+    if (!dagManagementStateStore.existsJobDagAction(dagAction.getFlowGroup(), 
dagAction.getFlowName(),
+        dagAction.getFlowExecutionId(), dagAction.getJobName(), 
dagAction.getDagActionType())) {
+      log.warn("Dag action {} is cleaned up from DMSS. No further action is 
required.", dagAction);
+      return false;
+    }
+
+    return true;
+  }
+
+  abstract void enforceDeadline(DagManagementStateStore 
dagManagementStateStore, Dag<JobExecutionPlan> dag) throws IOException;
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java
index 3b1ac2886..a7d388b9b 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.service.modules.orchestration.proc;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Optional;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -36,51 +35,30 @@ import 
org.apache.gobblin.service.modules.spec.JobExecutionPlan;
  * {@link 
org.apache.gobblin.configuration.ConfigurationKeys#GOBBLIN_FLOW_SLA_TIME} time.
  */
 @Slf4j
-public class EnforceFlowFinishDeadlineDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>> {
+public class EnforceFlowFinishDeadlineDagProc extends 
DeadlineEnforcementDagProc {
 
   public EnforceFlowFinishDeadlineDagProc(EnforceFlowFinishDeadlineDagTask 
enforceFlowFinishDeadlineDagTask) {
     super(enforceFlowFinishDeadlineDagTask);
   }
 
-  @Override
-  protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore 
dagManagementStateStore)
+  protected void enforceDeadline(DagManagementStateStore 
dagManagementStateStore, Dag<JobExecutionPlan> dag)
       throws IOException {
-   return dagManagementStateStore.getDag(getDagId());
-  }
-
-  @Override
-  protected void act(DagManagementStateStore dagManagementStateStore, 
Optional<Dag<JobExecutionPlan>> dag)
-      throws IOException {
-    log.info("Request to enforce deadlines for dag {}", getDagId());
-
-    if (!dag.isPresent()) {
-      // todo - add a metric here
-      log.error("Did not find Dag with id {}, it might be already 
cancelled/finished and thus cleaned up from the store.",
-          getDagId());
-      return;
-    }
-
-    enforceFlowFinishDeadline(dagManagementStateStore, dag);
-  }
-
-  private void enforceFlowFinishDeadline(DagManagementStateStore 
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
-      throws IOException {
-    Dag.DagNode<JobExecutionPlan> dagNode = dag.get().getNodes().get(0);
+    Dag.DagNode<JobExecutionPlan> dagNode = dag.getNodes().get(0);
     long flowFinishDeadline = DagManagerUtils.getFlowSLA(dagNode);
     long flowStartTime = DagManagerUtils.getFlowStartTime(dagNode);
 
     // note that this condition should be true because the triggered dag 
action has waited enough before reaching here
     if (System.currentTimeMillis() > flowStartTime + flowFinishDeadline) {
-      List<Dag.DagNode<JobExecutionPlan>> dagNodesToCancel = 
dag.get().getNodes();
+      List<Dag.DagNode<JobExecutionPlan>> dagNodesToCancel = dag.getNodes();
       log.info("Found {} DagNodes to cancel (DagId {}).", 
dagNodesToCancel.size(), getDagId());
 
       for (Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
         DagProcUtils.cancelDagNode(dagNodeToCancel, dagManagementStateStore);
       }
 
-      
dag.get().setFlowEvent(TimingEvent.FlowTimings.FLOW_RUN_DEADLINE_EXCEEDED);
-      dag.get().setMessage("Flow killed due to exceeding SLA of " + 
flowFinishDeadline + " ms");
-      dagManagementStateStore.checkpointDag(dag.get());
+      dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_RUN_DEADLINE_EXCEEDED);
+      dag.setMessage("Flow killed due to exceeding SLA of " + 
flowFinishDeadline + " ms");
+      dagManagementStateStore.checkpointDag(dag);
     } else {
       log.error("EnforceFlowFinishDeadline dagAction received before due time. 
flowStartTime {}, flowFinishDeadline {} ", flowStartTime, flowFinishDeadline);
     }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java
index 8cf01be6b..15fa11c7e 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java
@@ -42,34 +42,13 @@ import static 
org.apache.gobblin.service.ExecutionStatus.valueOf;
  * {@link 
org.apache.gobblin.service.modules.orchestration.DagManager#JOB_START_SLA_TIME} 
time.
  */
 @Slf4j
-public class EnforceJobStartDeadlineDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>> {
+public class EnforceJobStartDeadlineDagProc extends DeadlineEnforcementDagProc 
{
 
   public EnforceJobStartDeadlineDagProc(EnforceJobStartDeadlineDagTask 
enforceJobStartDeadlineDagTask) {
     super(enforceJobStartDeadlineDagTask);
   }
 
-  @Override
-  protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore 
dagManagementStateStore)
-      throws IOException {
-   return dagManagementStateStore.getDag(getDagId());
-  }
-
-  @Override
-  protected void act(DagManagementStateStore dagManagementStateStore, 
Optional<Dag<JobExecutionPlan>> dag)
-      throws IOException {
-    log.info("Request to enforce deadlines for dag {}", getDagId());
-
-    if (!dag.isPresent()) {
-      // todo - add a metric here
-      log.error("Did not find Dag with id {}, it might be already 
cancelled/finished and thus cleaned up from the store.",
-          getDagId());
-      return;
-    }
-
-    enforceJobStartDeadline(dagManagementStateStore, dag);
-  }
-
-  private void enforceJobStartDeadline(DagManagementStateStore 
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
+  protected void enforceDeadline(DagManagementStateStore 
dagManagementStateStore, Dag<JobExecutionPlan> dag)
       throws IOException {
     Pair<Optional<Dag.DagNode<JobExecutionPlan>>, 
Optional<org.apache.gobblin.service.monitoring.JobStatus>>
         dagNodeToCheckDeadline = 
dagManagementStateStore.getDagNodeWithJobStatus(getDagNodeId());
@@ -96,9 +75,9 @@ public class EnforceJobStartDeadlineDagProc extends 
DagProc<Optional<Dag<JobExec
           DagManagerUtils.getJobName(dagNode), jobOrchestratedTime, 
timeOutForJobStart);
       
dagManagementStateStore.getDagManagerMetrics().incrementCountsStartSlaExceeded(dagNode);
       DagProcUtils.cancelDagNode(dagNode, dagManagementStateStore);
-      
dag.get().setFlowEvent(TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED);
-      dag.get().setMessage("Flow killed because no update received for " + 
timeOutForJobStart + " ms after orchestration");
-      dagManagementStateStore.checkpointDag(dag.get());
+      dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED);
+      dag.setMessage("Flow killed because no update received for " + 
timeOutForJobStart + " ms after orchestration");
+      dagManagementStateStore.checkpointDag(dag);
     }
   }
 }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
index aaabf6a86..7cf2efcc8 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
@@ -135,7 +135,8 @@ public class MostlyMySqlDagManagementStateStoreTest {
     URI specExecURI = new URI(TEST_SPEC_EXECUTOR_URI);
     topologySpecMap.put(specExecURI, topologySpec);
     MostlyMySqlDagManagementStateStore dagManagementStateStore =
-        new MostlyMySqlDagManagementStateStore(config, null, null, 
jobStatusRetriever, mock(DagActionStore.class));
+        new MostlyMySqlDagManagementStateStore(config, null, null, 
jobStatusRetriever,
+            
MysqlDagActionStoreTest.getTestDagActionStore(testMetastoreDatabase));
     dagManagementStateStore.setTopologySpecMap(topologySpecMap);
     return dagManagementStateStore;
   }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
index faf6cd28b..513001953 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
@@ -46,19 +46,12 @@ public class MysqlDagActionStoreTest {
   private static final long flowExecutionId_2 = 12345678L;
   private static final long flowExecutionId_3 = 12345679L;
   private ITestMetastoreDatabase testDb;
-  private MysqlDagActionStore mysqlDagActionStore;
+  private DagActionStore mysqlDagActionStore;
 
   @BeforeClass
   public void setUp() throws Exception {
     this.testDb = TestMetastoreDatabaseFactory.get();
-    Config config = ConfigBuilder.create()
-        .addPrimitive("MysqlDagActionStore." + 
ConfigurationKeys.STATE_STORE_DB_URL_KEY, this.testDb.getJdbcUrl())
-        .addPrimitive("MysqlDagActionStore." + 
ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
-        .addPrimitive("MysqlDagActionStore." + 
ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
-        .addPrimitive("MysqlDagActionStore." + 
ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
-        .build();
-
-    this.mysqlDagActionStore = new MysqlDagActionStore(config);
+    this.mysqlDagActionStore = getTestDagActionStore(this.testDb);
   }
 
   @AfterClass(alwaysRun = true)
@@ -67,6 +60,16 @@ public class MysqlDagActionStoreTest {
     this.testDb.close();
   }
 
+  public static DagActionStore getTestDagActionStore(ITestMetastoreDatabase 
testDb) throws Exception {
+    Config config = ConfigBuilder.create()
+        .addPrimitive("MysqlDagActionStore." + 
ConfigurationKeys.STATE_STORE_DB_URL_KEY, testDb.getJdbcUrl())
+        .addPrimitive("MysqlDagActionStore." + 
ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
+        .addPrimitive("MysqlDagActionStore." + 
ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
+        .addPrimitive("MysqlDagActionStore." + 
ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
+        .build();
+    return new MysqlDagActionStore(config);
+  }
+
   @Test
   public void testAddAction() throws Exception {
     this.mysqlDagActionStore.addJobDagAction(flowGroup, flowName, 
flowExecutionId, jobName, DagActionStore.DagActionType.KILL);
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
index ff05be247..41bdf3334 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
@@ -48,7 +48,6 @@ import 
org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.monitoring.JobStatus;
 
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 
@@ -79,8 +78,9 @@ public class EnforceDeadlineDagProcsTest {
     String flowName = "fn";
     long flowExecutionId = System.currentTimeMillis();
     MostlyMySqlDagManagementStateStore dagManagementStateStore = 
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
-    doNothing().when(dagManagementStateStore).tryAcquireQuota(any());
-    doNothing().when(dagManagementStateStore).addDagNodeState(any(), any());
+    LaunchDagProcTest.mockDMSSCommonBehavior(dagManagementStateStore);
+    DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, "job0",
+        DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
     Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId, 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
         5, "user5", ConfigFactory.empty()
             .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
@@ -94,6 +94,7 @@ public class EnforceDeadlineDagProcsTest {
     doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
     doReturn(Pair.of(Optional.of(dag.getStartNodes().get(0)), 
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
     dagManagementStateStore.checkpointDag(dag);  // simulate having a dag that 
has not yet started running
+    dagManagementStateStore.addDagAction(dagAction);
 
     EnforceJobStartDeadlineDagProc enforceJobStartDeadlineDagProc = new 
EnforceJobStartDeadlineDagProc(
         new EnforceJobStartDeadlineDagTask(new 
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
@@ -106,6 +107,45 @@ public class EnforceDeadlineDagProcsTest {
             .filter(a -> 
a.getMethod().getName().equals("deleteDagNodeState")).count());
   }
 
+  /*
+  This test simulate deletion of a dag action (by not adding it into DMSS).
+  Absence of dag action signals that deadline enforcement is not required and 
hence the test verifies no further action
+  is taken by deadline dag proc.
+   */
+  @Test
+  public void enforceJobStartDeadlineTestWithMissingDagAction() throws 
Exception {
+    String flowGroup = "fg";
+    String flowName = "fn";
+    long flowExecutionId = System.currentTimeMillis();
+    MostlyMySqlDagManagementStateStore dagManagementStateStore = 
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+    LaunchDagProcTest.mockDMSSCommonBehavior(dagManagementStateStore);
+    DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, "job0",
+        DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId, 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+        5, "user5", ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT, 
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
+            .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME, 
ConfigValueFactory.fromAnyRef(1L))
+            .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
+                
MostlyMySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+    JobStatus jobStatus = 
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0").flowExecutionId(flowExecutionId).
+        message("Test 
message").eventName(ExecutionStatus.ORCHESTRATED.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+    doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
+    doReturn(Pair.of(Optional.of(dag.getStartNodes().get(0)), 
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+    dagManagementStateStore.checkpointDag(dag);  // simulate having a dag that 
has not yet started running
+
+    EnforceJobStartDeadlineDagProc enforceJobStartDeadlineDagProc = new 
EnforceJobStartDeadlineDagProc(
+        new EnforceJobStartDeadlineDagTask(new 
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
+            "job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE), 
null, dagManagementStateStore));
+    enforceJobStartDeadlineDagProc.process(dagManagementStateStore);
+
+    int expectedNumOfDeleteDagNodeStates = 0; // no dag node because we 
simulated (by not adding) missing dag action
+    Assert.assertEquals(expectedNumOfDeleteDagNodeStates,
+        
Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
+            .filter(a -> 
a.getMethod().getName().equals("deleteDagNodeState")).count());
+  }
+
   /*
    This test simulate submitting a dag with a very short flow finish deadline 
that will definitely be breached,
     resulting in the dag requiring to be killed
@@ -116,8 +156,9 @@ public class EnforceDeadlineDagProcsTest {
     String flowName = "fn";
     long flowExecutionId = System.currentTimeMillis();
     MostlyMySqlDagManagementStateStore dagManagementStateStore = 
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
-    doNothing().when(dagManagementStateStore).tryAcquireQuota(any());
-    doNothing().when(dagManagementStateStore).addDagNodeState(any(), any());
+    LaunchDagProcTest.mockDMSSCommonBehavior(dagManagementStateStore);
+    DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, "job0",
+        DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE);
     int numOfDagNodes = 5;
     Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId, 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
         numOfDagNodes, "user5", ConfigFactory.empty()
@@ -132,10 +173,11 @@ public class EnforceDeadlineDagProcsTest {
     doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
     doReturn(Pair.of(Optional.of(dag.getStartNodes().get(0)), 
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
     dagManagementStateStore.checkpointDag(dag);  // simulate having a dag that 
is in running state
+    dagManagementStateStore.addDagAction(dagAction);
 
     EnforceFlowFinishDeadlineDagProc enforceFlowFinishDeadlineDagProc = new 
EnforceFlowFinishDeadlineDagProc(
         new EnforceFlowFinishDeadlineDagTask(new 
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
-            "job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE), 
null, dagManagementStateStore));
+            "job0", 
DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE), null, 
dagManagementStateStore));
     enforceFlowFinishDeadlineDagProc.process(dagManagementStateStore);
 
     Assert.assertEquals(numOfDagNodes,
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
index e57e09589..6b5e7bbdd 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
@@ -119,7 +119,7 @@ public class LaunchDagProcTest {
   @Test
   public void launchDagWithMultipleParallelJobs() throws IOException, 
InterruptedException, URISyntaxException {
     String flowGroup = "fg";
-    String flowName = "fn";
+    String flowName = "fn2";
     long flowExecutionId = 12345L;
     Dag<JobExecutionPlan> dag = 
buildDagWithMultipleNodesAtDifferentLevels("1", flowExecutionId,
         DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),"user5", 
ConfigFactory.empty()

Reply via email to