[ 
https://issues.apache.org/jira/browse/GOBBLIN-2069?focusedWorklogId=920478&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-920478
 ]

ASF GitHub Bot logged work on GOBBLIN-2069:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 22/May/24 18:46
            Start Date: 22/May/24 18:46
    Worklog Time Spent: 10m 
      Work Description: umustafi commented on code in PR #3950:
URL: https://github.com/apache/gobblin/pull/3950#discussion_r1610441614


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -135,6 +151,37 @@ public DagTask next() {
       }
   }
 
+  private void createJobStartDeadlineTrigger(DagActionStore.DagAction 
dagAction) throws SchedulerException, IOException {
+    long timeOutForJobStart = 
DagManagerUtils.getJobStartSla(this.dagManagementStateStore.getDag(
+        dagAction.getDagId()).get().getNodes().get(0), 
DagProcessingEngine.getDefaultJobStartSlaTimeMillis());
+    // todo - this timestamp is just an approximation, the real job submission 
has happened in past, and that is when a

Review Comment:
   Is this approximation okay for us, or do you plan to update this in the 
future to use the actual SLA deadline from when job submission occurred?
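
   A minimal sketch of the non-approximate variant, assuming the job's 
submission (orchestration) timestamp is available when the trigger is created. 
The class and method names below are hypothetical and not part of the PR; the 
point is only that the delay is measured from the submission time rather than 
from "now".

```java
final class JobStartDeadlineSketch {
  /**
   * Hypothetical helper: how long to wait, starting at nowMillis, until the
   * deadline at (orchestratedMillis + slaMillis). Never returns a negative value.
   */
  static long remainingMillisUntilDeadline(long orchestratedMillis, long slaMillis, long nowMillis) {
    long deadlineMillis = orchestratedMillis + slaMillis;
    // If the deadline has already passed, the trigger should fire immediately.
    return Math.max(0L, deadlineMillis - nowMillis);
  }
}
```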



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceFlowFinishDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An implementation for {@link DagProc} that kills all the jobs if the dag 
does not finish in
+ * {@link 
org.apache.gobblin.configuration.ConfigurationKeys#GOBBLIN_FLOW_SLA_TIME} time.
+ */
+@Slf4j
+public class EnforceFlowFinishDeadlineDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>> {
+
+  public EnforceFlowFinishDeadlineDagProc(EnforceFlowFinishDeadlineDagTask 
enforceFlowFinishDeadlineDagTask) {
+    super(enforceFlowFinishDeadlineDagTask);
+  }
+
+  @Override
+  protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore 
dagManagementStateStore)
+      throws IOException {
+   return dagManagementStateStore.getDag(getDagId());
+  }
+
+  @Override
+  protected void act(DagManagementStateStore dagManagementStateStore, 
Optional<Dag<JobExecutionPlan>> dag)
+      throws IOException {
+    log.info("Request to enforce deadlines for dag {}", getDagId());
+
+    if (!dag.isPresent()) {
+      // todo - add a metric here
+      log.error("Did not find Dag with id {}, it might be already 
cancelled/finished and thus cleaned up from the store.",
+          getDagId());
+      return;
+    }
+
+    enforceFlowFinishDeadline(dagManagementStateStore, dag);
+  }
+
+  private void enforceFlowFinishDeadline(DagManagementStateStore 
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
+      throws IOException {
+    Dag.DagNode<JobExecutionPlan> dagNode = dag.get().getNodes().get(0);
+    long flowSla = DagManagerUtils.getFlowSLA(dagNode);
+    long flowStartTime = DagManagerUtils.getFlowStartTime(dagNode);
+
+    // note that this condition should be true because the triggered dag 
action has waited enough before reaching here
+    if (System.currentTimeMillis() > flowStartTime + flowSla) {
+      log.info("Dag {} exceeded the SLA of {} ms. Killing it now...", 
getDagId(), flowSla);
+      List<Dag.DagNode<JobExecutionPlan>> dagNodesToCancel = 
dag.get().getNodes();
+      log.info("Found {} DagNodes to cancel (DagId {}).", 
dagNodesToCancel.size(), getDagId());

Review Comment:
   Can we consolidate these logs into one so they are easier to track? Let's 
also make the message something easily searchable, for example:
   
   `Dag exceeded Flow Finish SLA. Killing all jobs associated with it. Dag: {} 
SLA {} Num nodes to kill {} DagID: {}`
   
   Then the info is all in one log that's easier to search.
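
   A rough sketch of the consolidated message, kept dependency-free by using 
`String.format` instead of the SLF4J `{}` placeholders the PR code uses. The 
helper name and exact field labels are assumptions for illustration only.

```java
import java.util.Locale;

final class FlowFinishSlaLogSketch {
  /**
   * Hypothetical helper: builds one searchable message with a fixed prefix
   * followed by all parameters, instead of two separate log lines.
   */
  static String flowFinishSlaExceededMessage(String dagId, long flowSlaMillis, int numNodesToKill) {
    return String.format(Locale.ROOT,
        "Dag exceeded Flow Finish SLA. Killing all jobs associated with it. "
            + "DagId: %s SLA: %d ms Num nodes to kill: %d",
        dagId, flowSlaMillis, numNodesToKill);
  }
}
```

   With SLF4J the same text would be passed once to `log.info(...)` with `{}` 
in place of the format specifiers.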



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.core.GobblinServiceManager;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
+import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceFlowFinishDeadlineDagTask;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceJobStartDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+
+public class EnforceDeadlineDagProcsTest {
+  private ITestMetastoreDatabase testMetastoreDatabase;
+  private final MockedStatic<GobblinServiceManager> 
mockedGobblinServiceManager = Mockito.mockStatic(GobblinServiceManager.class);
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+  }
+
+  @AfterClass(alwaysRun = true)
+  public void tearDown() throws Exception {
+    // `.close()` to avoid (in the aggregate, across multiple suites) - 
java.sql.SQLNonTransientConnectionException: Too many connections
+    this.testMetastoreDatabase.close();
+    this.mockedGobblinServiceManager.close();
+  }
+
+  /*
+    This test simulate submitting a dag with a very little job start deadline 
and then verifies that the starting job is
+     killed because not being able to start in the deadline time.
+   */
+  @Test
+  public void enforceJobStartDeadlineTest() throws Exception {
+    String flowGroup = "fg";
+    String flowName = "fn";
+    long flowExecutionId = System.currentTimeMillis();
+    MostlyMySqlDagManagementStateStore dagManagementStateStore = 
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+    doNothing().when(dagManagementStateStore).tryAcquireQuota(any());
+    doNothing().when(dagManagementStateStore).addDagNodeState(any(), any());
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId, 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+        5, "user5", ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT, 
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
+            .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME, 
ConfigValueFactory.fromAnyRef(1L)));
+    JobStatus jobStatus = 
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0").flowExecutionId(flowExecutionId).
+        message("Test 
message").eventName(ExecutionStatus.ORCHESTRATED.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+    doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
+    doReturn(Pair.of(Optional.of(dag.getStartNodes().get(0)), 
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+    this.mockedGobblinServiceManager.when(() -> 
GobblinServiceManager.getClass(DagActionStore.class)).thenReturn(mock(DagActionStore.class));
+    dagManagementStateStore.checkpointDag(dag);  // simulate having a dag that 
has not yet started running
+
+    EnforceJobStartDeadlineDagProc enforceJobStartDeadlineDagProc = new 
EnforceJobStartDeadlineDagProc(
+        new EnforceJobStartDeadlineDagTask(new 
DagActionStore.DagAction(flowGroup, flowName, String.valueOf(flowExecutionId),
+            "job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE), 
null, mock(DagActionStore.class)));
+    enforceJobStartDeadlineDagProc.process(dagManagementStateStore);
+
+    int expectedNumOfDeleteDagNodeStates = 1; // the one dag node 
corresponding to the EnforceStartDeadlineDagProc
+    Assert.assertEquals(expectedNumOfDeleteDagNodeStates,
+        
Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
+            .filter(a -> 
a.getMethod().getName().equals("deleteDagNodeState")).count());
+  }
+
+  /*
+   This test simulate submitting a dag with a very little flow finish deadline 
and then verifies that all of its job are

Review Comment:
   see above



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -124,9 +131,18 @@ public DagTask next() {
       while (true) {
         try {
           DagActionStore.DagAction dagAction = this.dagActionQueue.take();
-          LeaseAttemptStatus leaseAttemptStatus = 
retrieveLeaseStatus(dagAction);
-          if (leaseAttemptStatus instanceof 
LeaseAttemptStatus.LeaseObtainedStatus) {
-            return createDagTask(dagAction, 
(LeaseAttemptStatus.LeaseObtainedStatus) leaseAttemptStatus);
+          // create triggers for original (non-reminder) dag actions of type 
ENFORCE_JOB_START_DEADLINE and ENFORCE_FLOW_FINISH_DEADLINE
+          // reminder triggers are used to inform hosts once the deadline for 
ENFORCE_JOB_START and ENFORCE_FLOW_FINISH passed

Review Comment:
   Missing "_DEADLINE". Also, nit: let's use a multi-line comment /* ... */



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java:
##########
@@ -86,18 +88,18 @@ public static class ReminderJob implements Job {
     @Override
     public void execute(JobExecutionContext context) {
       // Get properties from the trigger to create a dagAction
-      JobDataMap jobDataMap = context.getTrigger().getJobDataMap();
+      JobDataMap jobDataMap = context.getMergedJobDataMap();
       String flowName = jobDataMap.getString(ConfigurationKeys.FLOW_NAME_KEY);
       String flowGroup = 
jobDataMap.getString(ConfigurationKeys.FLOW_GROUP_KEY);
       String jobName = jobDataMap.getString(ConfigurationKeys.JOB_NAME_KEY);
-      String flowId = 
jobDataMap.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+      String flowExecutionId = 
jobDataMap.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
       DagActionStore.DagActionType dagActionType = 
(DagActionStore.DagActionType) jobDataMap.get(FLOW_ACTION_TYPE_KEY);
 
       log.info("DagProc reminder triggered for (flowGroup: " + flowGroup + ", 
flowName: " + flowName
-          + ", flowExecutionId: " + flowId + ", jobName: " + jobName +")");
+          + ", flowExecutionId: " + flowExecutionId + ", jobName: " + jobName 
+ ", dagActionType: " + dagActionType + ")");
 
-      DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction(flowGroup, flowName, flowId, jobName,
-          dagActionType);
+      DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, jobName, 
dagActionType);
+      //dagAction.setReminder(true);

Review Comment:
   Actually, we have a better method: pass a flag to 
`createReminderJobTrigger` when the reminder is created to denote whether it's 
actually a reminder. For deadline triggers, the first time they are created we 
don't set this value; for any subsequent attempt we set reminder=True. When we 
create reminders for all other action types, we set reminder=True.
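
   A small sketch of that idea, using a plain `Map` as a stand-in for the 
trigger's job data so the example stays self-contained. The key name 
`isReminder` and both helper methods are hypothetical, not the actual 
`createReminderJobTrigger` signature.

```java
import java.util.HashMap;
import java.util.Map;

final class ReminderFlagSketch {
  // Hypothetical key under which the flag travels with the trigger.
  static final String IS_REMINDER_KEY = "isReminder";

  /** Stand-in for the data attached to a trigger when it is created. */
  static Map<String, Object> buildTriggerData(String flowGroup, String flowName, boolean isReminder) {
    Map<String, Object> data = new HashMap<>();
    data.put("flowGroup", flowGroup);
    data.put("flowName", flowName);
    // First-time deadline triggers pass false; every other caller passes true.
    data.put(IS_REMINDER_KEY, isReminder);
    return data;
  }

  /** Stand-in for what the fired job reads back; a missing flag counts as "not a reminder". */
  static boolean isReminder(Map<String, Object> data) {
    return Boolean.TRUE.equals(data.get(IS_REMINDER_KEY));
  }
}
```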



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java:
##########
@@ -86,18 +88,18 @@ public static class ReminderJob implements Job {
     @Override
     public void execute(JobExecutionContext context) {
       // Get properties from the trigger to create a dagAction
-      JobDataMap jobDataMap = context.getTrigger().getJobDataMap();
+      JobDataMap jobDataMap = context.getMergedJobDataMap();
       String flowName = jobDataMap.getString(ConfigurationKeys.FLOW_NAME_KEY);
       String flowGroup = 
jobDataMap.getString(ConfigurationKeys.FLOW_GROUP_KEY);
       String jobName = jobDataMap.getString(ConfigurationKeys.JOB_NAME_KEY);
-      String flowId = 
jobDataMap.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+      String flowExecutionId = 
jobDataMap.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
       DagActionStore.DagActionType dagActionType = 
(DagActionStore.DagActionType) jobDataMap.get(FLOW_ACTION_TYPE_KEY);
 
       log.info("DagProc reminder triggered for (flowGroup: " + flowGroup + ", 
flowName: " + flowName
-          + ", flowExecutionId: " + flowId + ", jobName: " + jobName +")");
+          + ", flowExecutionId: " + flowExecutionId + ", jobName: " + jobName 
+ ", dagActionType: " + dagActionType + ")");
 
-      DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction(flowGroup, flowName, flowId, jobName,
-          dagActionType);
+      DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, jobName, 
dagActionType);
+      //dagAction.setReminder(true);

Review Comment:
   Do we not always want to set the reminder flag for actions coming from the 
trigger (potentially other than the deadline actions, depending on how we 
decide to implement)?
   
   Option 1: set the reminder flag for all events
   - handle deadline events knowing that reminder = True is when we enforce
   
   Option 2: set the reminder flag for all events except deadline events, 
using some other job property such as "firstTimeFiring=True"
   - set firstTimeFiring=False so all subsequent deadline actions are known to 
be true "reminders"
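
   The two options can be sketched side by side as follows. The enum is a 
trimmed, hypothetical stand-in for `DagActionStore.DagActionType`, and 
`firstTimeFiring` is the illustrative property name from option 2, not an 
existing field.

```java
final class DeadlineReminderOptionsSketch {
  // Hypothetical, reduced set of action types for illustration.
  enum ActionType { LAUNCH, ENFORCE_JOB_START_DEADLINE, ENFORCE_FLOW_FINISH_DEADLINE }

  static boolean isDeadlineAction(ActionType type) {
    return type == ActionType.ENFORCE_JOB_START_DEADLINE
        || type == ActionType.ENFORCE_FLOW_FINISH_DEADLINE;
  }

  /** Option 1: every action emitted by the trigger is flagged as a reminder. */
  static boolean reminderFlagOption1(ActionType type) {
    return true;
  }

  /** Option 2: deadline actions are flagged as reminders only after their first firing. */
  static boolean reminderFlagOption2(ActionType type, boolean firstTimeFiring) {
    return !(isDeadlineAction(type) && firstTimeFiring);
  }
}
```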



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceFlowFinishDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An implementation for {@link DagProc} that kills all the jobs if the dag 
does not finish in
+ * {@link 
org.apache.gobblin.configuration.ConfigurationKeys#GOBBLIN_FLOW_SLA_TIME} time.
+ */
+@Slf4j
+public class EnforceFlowFinishDeadlineDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>> {
+
+  public EnforceFlowFinishDeadlineDagProc(EnforceFlowFinishDeadlineDagTask 
enforceFlowFinishDeadlineDagTask) {
+    super(enforceFlowFinishDeadlineDagTask);
+  }
+
+  @Override
+  protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore 
dagManagementStateStore)
+      throws IOException {
+   return dagManagementStateStore.getDag(getDagId());
+  }
+
+  @Override
+  protected void act(DagManagementStateStore dagManagementStateStore, 
Optional<Dag<JobExecutionPlan>> dag)
+      throws IOException {
+    log.info("Request to enforce deadlines for dag {}", getDagId());
+
+    if (!dag.isPresent()) {
+      // todo - add a metric here
+      log.error("Did not find Dag with id {}, it might be already 
cancelled/finished and thus cleaned up from the store.",
+          getDagId());
+      return;
+    }
+
+    enforceFlowFinishDeadline(dagManagementStateStore, dag);
+  }
+
+  private void enforceFlowFinishDeadline(DagManagementStateStore 
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
+      throws IOException {
+    Dag.DagNode<JobExecutionPlan> dagNode = dag.get().getNodes().get(0);
+    long flowSla = DagManagerUtils.getFlowSLA(dagNode);
+    long flowStartTime = DagManagerUtils.getFlowStartTime(dagNode);
+
+    // note that this condition should be true because the triggered dag 
action has waited enough before reaching here
+    if (System.currentTimeMillis() > flowStartTime + flowSla) {

Review Comment:
   Let's add a warn/error log and/or a metric (can be a TODO) for when this is 
not true.
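
   One possible shape for that else-branch, sketched with `java.util.logging` 
so it runs without the project's SLF4J/Lombok setup. The class and method names 
are hypothetical, and the metric is left as a TODO as suggested.

```java
import java.util.Locale;
import java.util.logging.Logger;

final class FlowDeadlineCheckSketch {
  private static final Logger LOG = Logger.getLogger(FlowDeadlineCheckSketch.class.getName());

  /**
   * Hypothetical helper: returns true when the flow has exceeded its SLA.
   * When it has not, the trigger fired earlier than expected, so warn about it.
   */
  static boolean shouldKill(long nowMillis, long flowStartMillis, long flowSlaMillis, String dagId) {
    if (nowMillis > flowStartMillis + flowSlaMillis) {
      return true;
    }
    // TODO: emit a metric here so this unexpected case can be alerted on.
    LOG.warning(String.format(Locale.ROOT,
        "Flow finish deadline trigger fired before SLA elapsed. DagId: %s startTime: %d SLA: %d ms now: %d",
        dagId, flowStartMillis, flowSlaMillis, nowMillis));
    return false;
  }
}
```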



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java:
##########
@@ -191,4 +196,16 @@ private void submitNextNodes(DagManagementStateStore 
dagManagementStateStore, Da
   private void handleMultipleJobs(Set<Dag.DagNode<JobExecutionPlan>> 
nextNodes) {
     throw new UnsupportedOperationException("More than one start job is not 
allowed");
   }
+
+  private void removeFlowFinishDeadlineTriggerAndDagAction() {
+    DagActionStore.DagAction enforceFlowFinishDeadlineDagAction = 
DagActionStore.DagAction.forFlow(getDagNodeId().getFlowGroup(),

Review Comment:
   Let's log when this function is used, in case it has unintended consequences.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceJobStartDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * An implementation for {@link DagProc} that marks the {@link Dag} as failed 
and cancel the job if it does not start in
+ * {@link 
org.apache.gobblin.service.modules.orchestration.DagManager#JOB_START_SLA_TIME} 
time.
+ */
+@Slf4j
+public class EnforceJobStartDeadlineDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>> {
+
+  public EnforceJobStartDeadlineDagProc(EnforceJobStartDeadlineDagTask 
enforceJobStartDeadlineDagTask) {
+    super(enforceJobStartDeadlineDagTask);
+  }
+
+  @Override
+  protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore 
dagManagementStateStore)
+      throws IOException {
+   return dagManagementStateStore.getDag(getDagId());
+  }
+
+  @Override
+  protected void act(DagManagementStateStore dagManagementStateStore, 
Optional<Dag<JobExecutionPlan>> dag)
+      throws IOException {
+    log.info("Request to enforce deadlines for dag {}", getDagId());
+
+    if (!dag.isPresent()) {
+      // todo - add a metric here
+      log.error("Did not find Dag with id {}, it might be already 
cancelled/finished and thus cleaned up from the store.",
+          getDagId());
+      return;
+    }
+
+    enforceJobStartDeadline(dagManagementStateStore, dag);
+  }
+
+  private void enforceJobStartDeadline(DagManagementStateStore 
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
+      throws IOException {
+    Pair<Optional<Dag.DagNode<JobExecutionPlan>>, 
Optional<org.apache.gobblin.service.monitoring.JobStatus>>
+        dagNodeToCheckDeadline = 
dagManagementStateStore.getDagNodeWithJobStatus(getDagNodeId());
+    if (!dagNodeToCheckDeadline.getLeft().isPresent()) {
+      // this should never happen; a job for which DEADLINE_ENFORCEMENT dag 
action is created must have a dag node in store
+      log.error("Dag node {} not found for EnforceJobStartDeadlineDagProc", 
getDagNodeId());
+      return;
+    }
+
+    Dag.DagNode<JobExecutionPlan> dagNode = 
dagNodeToCheckDeadline.getLeft().get();
+    long timeOutForJobStart = DagManagerUtils.getJobStartSla(dagNode, 
DagProcessingEngine.getDefaultJobStartSlaTimeMillis());
+    Optional<org.apache.gobblin.service.monitoring.JobStatus> jobStatus = 
dagNodeToCheckDeadline.getRight();
+    if (!jobStatus.isPresent()) {
+      return;
+    }
+
+    ExecutionStatus executionStatus = valueOf(jobStatus.get().getEventName());
+    long jobOrchestratedTime = jobStatus.get().getOrchestratedTime();
+    // note that second condition should be true because the triggered dag 
action has waited enough before reaching here
+    if (executionStatus == ORCHESTRATED && System.currentTimeMillis() > 
jobOrchestratedTime + timeOutForJobStart) {
+      log.info("Job {} of flow {} exceeded the job start SLA of {} ms. Killing 
the job now...",

Review Comment:
   I suggest a similar change to this log message: put a cohesive, easily 
searchable message first, then all the parameters at the end.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceJobStartDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * An implementation for {@link DagProc} that marks the {@link Dag} as failed 
and cancel the job if it does not start in
+ * {@link 
org.apache.gobblin.service.modules.orchestration.DagManager#JOB_START_SLA_TIME} 
time.
+ */
+@Slf4j
+public class EnforceJobStartDeadlineDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>> {
+
+  public EnforceJobStartDeadlineDagProc(EnforceJobStartDeadlineDagTask 
enforceJobStartDeadlineDagTask) {
+    super(enforceJobStartDeadlineDagTask);
+  }
+
+  @Override
+  protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore 
dagManagementStateStore)
+      throws IOException {
+   return dagManagementStateStore.getDag(getDagId());
+  }
+
+  @Override
+  protected void act(DagManagementStateStore dagManagementStateStore, 
Optional<Dag<JobExecutionPlan>> dag)
+      throws IOException {
+    log.info("Request to enforce deadlines for dag {}", getDagId());
+
+    if (!dag.isPresent()) {
+      // todo - add a metric here
+      log.error("Did not find Dag with id {}, it might be already 
cancelled/finished and thus cleaned up from the store.",
+          getDagId());
+      return;
+    }
+
+    enforceJobStartDeadline(dagManagementStateStore, dag);
+  }
+
+  private void enforceJobStartDeadline(DagManagementStateStore 
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
+      throws IOException {
+    Pair<Optional<Dag.DagNode<JobExecutionPlan>>, 
Optional<org.apache.gobblin.service.monitoring.JobStatus>>
+        dagNodeToCheckDeadline = 
dagManagementStateStore.getDagNodeWithJobStatus(getDagNodeId());
+    if (!dagNodeToCheckDeadline.getLeft().isPresent()) {
+      // this should never happen; a job for which DEADLINE_ENFORCEMENT dag 
action is created must have a dag node in store

Review Comment:
   Add a TODO to include a metric here that we will alert on.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -246,6 +256,19 @@ protected void 
processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
     }
   }
 
+  private void clearStartDeadlineTriggerAndDagAction(String flowGroup, String 
flowName, String flowExecutionId, String jobName) {
+

Review Comment:
   Same as above: let's log when this is used and add Javadoc describing when 
it should be called.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceFlowFinishDeadlineDagTask.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
+
+
+/**
+ * A {@link DagTask} responsible for killing jobs if they have not started in
+ * {@link 
org.apache.gobblin.service.modules.orchestration.DagManager#JOB_START_SLA_TIME} 
or does not finish in

Review Comment:
   Is this not only for the latter?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceJobStartDeadlineDagTask.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
+
+
+/**
+ * A {@link DagTask} responsible for killing jobs if they have not started in
+ * {@link 
org.apache.gobblin.service.modules.orchestration.DagManager#JOB_START_SLA_TIME} 
or does not finish in

Review Comment:
   This one is only for the former (the job start deadline), right?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.core.GobblinServiceManager;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
+import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceFlowFinishDeadlineDagTask;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceJobStartDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+
+public class EnforceDeadlineDagProcsTest {
+  private ITestMetastoreDatabase testMetastoreDatabase;
+  private final MockedStatic<GobblinServiceManager> 
mockedGobblinServiceManager = Mockito.mockStatic(GobblinServiceManager.class);
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+  }
+
+  @AfterClass(alwaysRun = true)
+  public void tearDown() throws Exception {
+    // `.close()` to avoid (in the aggregate, across multiple suites) - 
java.sql.SQLNonTransientConnectionException: Too many connections
+    this.testMetastoreDatabase.close();
+    this.mockedGobblinServiceManager.close();
+  }
+
+  /*
+    This test simulate submitting a dag with a very little job start deadline 
and then verifies that the starting job is
+     killed because not being able to start in the deadline time.
+   */
+  @Test
+  public void enforceJobStartDeadlineTest() throws Exception {
+    String flowGroup = "fg";
+    String flowName = "fn";
+    long flowExecutionId = System.currentTimeMillis();
+    MostlyMySqlDagManagementStateStore dagManagementStateStore = 
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+    doNothing().when(dagManagementStateStore).tryAcquireQuota(any());
+    doNothing().when(dagManagementStateStore).addDagNodeState(any(), any());
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId, 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+        5, "user5", ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT, 
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
+            .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME, 
ConfigValueFactory.fromAnyRef(1L)));
+    JobStatus jobStatus = 
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0").flowExecutionId(flowExecutionId).
+        message("Test 
message").eventName(ExecutionStatus.ORCHESTRATED.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+    doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
+    doReturn(Pair.of(Optional.of(dag.getStartNodes().get(0)), 
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+    this.mockedGobblinServiceManager.when(() -> 
GobblinServiceManager.getClass(DagActionStore.class)).thenReturn(mock(DagActionStore.class));
+    dagManagementStateStore.checkpointDag(dag);  // simulate having a dag that 
has not yet started running
+
+    EnforceJobStartDeadlineDagProc enforceJobStartDeadlineDagProc = new 
EnforceJobStartDeadlineDagProc(
+        new EnforceJobStartDeadlineDagTask(new 
DagActionStore.DagAction(flowGroup, flowName, String.valueOf(flowExecutionId),
+            "job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE), 
null, mock(DagActionStore.class)));
+    enforceJobStartDeadlineDagProc.process(dagManagementStateStore);
+
+    int expectedNumOfDeleteDagNodeStates = 1; // the one dag node 
corresponding to the EnforceStartDeadlineDagProc
+    Assert.assertEquals(expectedNumOfDeleteDagNodeStates,
+        
Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
+            .filter(a -> 
a.getMethod().getName().equals("deleteDagNodeState")).count());

Review Comment:
   Do both deadlines result in the same method being called?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceJobStartDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * An implementation for {@link DagProc} that marks the {@link Dag} as failed 
and cancel the job if it does not start in
+ * {@link 
org.apache.gobblin.service.modules.orchestration.DagManager#JOB_START_SLA_TIME} 
time.
+ */
+@Slf4j
+public class EnforceJobStartDeadlineDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>> {
+
+  public EnforceJobStartDeadlineDagProc(EnforceJobStartDeadlineDagTask 
enforceJobStartDeadlineDagTask) {
+    super(enforceJobStartDeadlineDagTask);
+  }
+
+  @Override
+  protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore 
dagManagementStateStore)
+      throws IOException {
+   return dagManagementStateStore.getDag(getDagId());
+  }
+
+  @Override
+  protected void act(DagManagementStateStore dagManagementStateStore, 
Optional<Dag<JobExecutionPlan>> dag)
+      throws IOException {
+    log.info("Request to enforce deadlines for dag {}", getDagId());
+
+    if (!dag.isPresent()) {
+      // todo - add a metric here
+      log.error("Did not find Dag with id {}, it might be already 
cancelled/finished and thus cleaned up from the store.",
+          getDagId());
+      return;
+    }
+
+    enforceJobStartDeadline(dagManagementStateStore, dag);
+  }
+
+  private void enforceJobStartDeadline(DagManagementStateStore 
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
+      throws IOException {
+    Pair<Optional<Dag.DagNode<JobExecutionPlan>>, 
Optional<org.apache.gobblin.service.monitoring.JobStatus>>
+        dagNodeToCheckDeadline = 
dagManagementStateStore.getDagNodeWithJobStatus(getDagNodeId());
+    if (!dagNodeToCheckDeadline.getLeft().isPresent()) {
+      // this should never happen; a job for which DEADLINE_ENFORCEMENT dag 
action is created must have a dag node in store
+      log.error("Dag node {} not found for EnforceJobStartDeadlineDagProc", 
getDagNodeId());
+      return;
+    }
+
+    Dag.DagNode<JobExecutionPlan> dagNode = 
dagNodeToCheckDeadline.getLeft().get();
+    long timeOutForJobStart = DagManagerUtils.getJobStartSla(dagNode, 
DagProcessingEngine.getDefaultJobStartSlaTimeMillis());
+    Optional<org.apache.gobblin.service.monitoring.JobStatus> jobStatus = 
dagNodeToCheckDeadline.getRight();
+    if (!jobStatus.isPresent()) {

Review Comment:
   Does this mean the job finished? If so, let's add a comment saying so.



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.core.GobblinServiceManager;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
+import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceFlowFinishDeadlineDagTask;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceJobStartDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+
+public class EnforceDeadlineDagProcsTest {
+  private ITestMetastoreDatabase testMetastoreDatabase;
+  private final MockedStatic<GobblinServiceManager> 
mockedGobblinServiceManager = Mockito.mockStatic(GobblinServiceManager.class);
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+  }
+
+  @AfterClass(alwaysRun = true)
+  public void tearDown() throws Exception {
+    // `.close()` to avoid (in the aggregate, across multiple suites) - 
java.sql.SQLNonTransientConnectionException: Too many connections
+    this.testMetastoreDatabase.close();
+    this.mockedGobblinServiceManager.close();
+  }
+
+  /*
+    This test simulate submitting a dag with a very little job start deadline 
and then verifies that the starting job is

Review Comment:
   "with a very short job start deadline that will definitely be breached, 
resulting in the job needing to be killed..." (or something to that effect)
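
   To make the "definitely breached" reasoning concrete, here is a rough 
sketch of the kind of check the test relies on. It is an assumption, not the 
logic in this PR: the class, the `shouldKill` helper, and its parameters are 
hypothetical, and it only illustrates why a 1 ms start SLA measured from the 
orchestrated time has already elapsed by the time the proc runs.

```java
import java.util.concurrent.TimeUnit;

final class JobStartDeadlineSketch {
  /**
   * Hypothetical helper: returns true when the job is still waiting to start
   * (modelled here as a boolean flag) and more than the start SLA has elapsed
   * since it was orchestrated.
   */
  static boolean shouldKill(boolean stillOrchestrated, long orchestratedTimeMillis,
      long slaValue, TimeUnit slaUnit, long nowMillis) {
    long slaMillis = slaUnit.toMillis(slaValue);
    return stillOrchestrated && (nowMillis - orchestratedTimeMillis) > slaMillis;
  }
}
```

   With an SLA of 1 in `TimeUnit.MILLISECONDS`, any run more than a millisecond 
after the orchestrated time satisfies the check, which is what the test comment 
should convey.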



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java:
##########
@@ -134,6 +146,12 @@ public void testOneNextJobToRun() throws Exception {
     // current job's state is deleted
     
Assert.assertEquals(Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
         .filter(a -> 
a.getMethod().getName().equals("deleteDagNodeState")).count(), 1);
+
+    
Assert.assertEquals(Mockito.mockingDetails(this.dagActionReminderScheduler).getInvocations().stream()
+        .filter(a -> 
a.getMethod().getName().equals("unscheduleReminderJob")).count(), 1);
+
+    
Assert.assertEquals(Mockito.mockingDetails(this.dagActionStore).getInvocations().stream()
+        .filter(a -> 
a.getMethod().getName().equals("deleteDagAction")).count(), 1);

Review Comment:
   Why are these called if the reminder fired and the action was taken? I would 
expect deleteDagAction, because it means the action has been completed 
(committed).



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -246,6 +256,19 @@ protected void 
processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
     }
   }
 
+  private void clearStartDeadlineTriggerAndDagAction(String flowGroup, String 
flowName, String flowExecutionId, String jobName) {
+

Review Comment:
   Also add a TODO for a metric here.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 920478)
    Time Spent: 3h 20m  (was: 3h 10m)

> implement EnforceStartDeadlineDagProc
> -------------------------------------
>
>                 Key: GOBBLIN-2069
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2069
>             Project: Apache Gobblin
>          Issue Type: Task
>            Reporter: Arjun Singh Bora
>            Priority: Major
>          Time Spent: 3h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
