This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 178138f  [GOBBLIN-917] kill orphan gaas jobs
178138f is described below

commit 178138f112242cd26a55b6b792aa8aff5d6ac5e4
Author: Arjun <[email protected]>
AuthorDate: Fri Oct 18 14:53:45 2019 -0700

    [GOBBLIN-917] kill orphan gaas jobs
    
    Closes #2769 from
    arjun4084346/killJobWithUnknownStatus
---
 .../gobblin/configuration/ConfigurationKeys.java   |  4 +++
 .../service/modules/orchestration/DagManager.java  | 41 ++++++++++++++++------
 .../modules/orchestration/DagManagerUtils.java     | 16 +++++++++
 .../modules/orchestration/DagManagerFlowTest.java  | 41 +++++++++++++++++++++-
 4 files changed, 91 insertions(+), 11 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index bdaa910..20a0d72 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -898,6 +898,10 @@ public class ConfigurationKeys {
   public static final String GOBBLIN_FLOW_SLA_TIME = "gobblin.flow.sla.time";
   public static final String GOBBLIN_FLOW_SLA_TIME_UNIT = "gobblin.flow.sla.timeunit";
   public static final String DEFAULT_GOBBLIN_FLOW_SLA_TIME_UNIT = "MINUTES";
+  public static final String GOBBLIN_FLOW_START_SLA_TIME = "gobblin.flow.start.sla.time";
+  public static final String GOBBLIN_FLOW_START_SLA_TIME_UNIT = "gobblin.flow.start.sla.timeunit";
+  public static final long DEFAULT_GOBBLIN_FLOW_START_SLA = 10L;
+  public static final String DEFAULT_GOBBLIN_FLOW_START_SLA_TIME_UNIT = "MINUTES";
   public static final String DATASET_SUBPATHS_KEY = "gobblin.flow.dataset.subPaths";
   public static final String DATASET_BASE_INPUT_PATH_KEY = "gobblin.flow.dataset.baseInputPath";
   public static final String DATASET_BASE_OUTPUT_PATH_KEY = "gobblin.flow.dataset.baseOutputPath";
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 2bf3ca2..dcadc88 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -74,12 +74,13 @@ import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitorFactory;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
 import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
 import static org.apache.gobblin.service.ExecutionStatus.COMPLETE;
 import static org.apache.gobblin.service.ExecutionStatus.FAILED;
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
 import static org.apache.gobblin.service.ExecutionStatus.RUNNING;
 import static org.apache.gobblin.service.ExecutionStatus.PENDING;
 import static org.apache.gobblin.service.ExecutionStatus.valueOf;
 
 
 /**
@@ -491,12 +492,15 @@ public class DagManager extends AbstractIdleService {
       Map<String, Set<DagNode<JobExecutionPlan>>> nextSubmitted = Maps.newHashMap();
       List<DagNode<JobExecutionPlan>> nodesToCleanUp = Lists.newArrayList();
 
-      for (DagNode<JobExecutionPlan> node: this.jobToDag.keySet()) {
+      for (DagNode<JobExecutionPlan> node : this.jobToDag.keySet()) {
         boolean slaKilled = slaKillIfNeeded(node);
 
         JobStatus jobStatus = pollJobStatus(node);
 
-        ExecutionStatus status = getJobExecutionStatus(slaKilled, jobStatus);
+        // An SLA-killed job is reported CANCELLED regardless, so do not try to cancel it a second time as orphaned.
+        boolean killOrphanFlow = !slaKilled && killJobIfOrphaned(node, jobStatus);
+
+        ExecutionStatus status = getJobExecutionStatus(slaKilled, killOrphanFlow, jobStatus);
 
         JobExecutionPlan jobExecutionPlan = DagManagerUtils.getJobExecutionPlan(node);
 
@@ -545,8 +549,32 @@ public class DagManager extends AbstractIdleService {
       }
     }
 
-    private ExecutionStatus getJobExecutionStatus(boolean slaKilled, JobStatus jobStatus) {
-      if (slaKilled) {
+    /**
+     * Cancel the job if it is "orphaned": still in ORCHESTRATED {@link ExecutionStatus} once the flow start SLA
+     * ({@link DagManagerUtils#getFlowStartSLA}) has elapsed, counted from the flow execution id, not job orchestration.
+     * @param node {@link DagNode} representing the job
+     * @param jobStatus current {@link JobStatus} of the job; if null, nothing is done
+     * @return true if the job was cancelled for staying ORCHESTRATED beyond the flow start SLA
+     */
+    private boolean killJobIfOrphaned(DagNode<JobExecutionPlan> node, JobStatus jobStatus)
+        throws ExecutionException, InterruptedException {
+      if (jobStatus == null) {
+        return false;
+      }
+      ExecutionStatus executionStatus = valueOf(jobStatus.getEventName());
+      long timeoutForFlowStart = DagManagerUtils.getFlowStartSLA(node);
+      long flowStartTime = jobStatus.getFlowExecutionId();
+
+      if (executionStatus == ORCHESTRATED && System.currentTimeMillis() - flowStartTime > timeoutForFlowStart) {
+        cancelDagNode(node);
+        return true;
+      } else {
+        return false;
+      }
+    }
+
+    private ExecutionStatus getJobExecutionStatus(boolean slaKilled, boolean killOrphanFlow, JobStatus jobStatus) {
+      if (slaKilled || killOrphanFlow) {
         return CANCELLED;
       } else {
         if (jobStatus == null) {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 47caf6f..d489bed 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -238,6 +238,23 @@ public class DagManagerUtils {
         : NO_SLA;
   }
 
+  /**
+   * Get the flow start sla, in millis, from the dag node config.
+   * The configured time is read in {@link ConfigurationKeys#GOBBLIN_FLOW_START_SLA_TIME_UNIT}, defaulting to MINUTES.
+   * @param dagNode dag node for which the flow start sla is to be retrieved
+   * @return configured start sla in millis; otherwise DEFAULT_GOBBLIN_FLOW_START_SLA in its default unit, as millis
+   */
+  static long getFlowStartSLA(DagNode<JobExecutionPlan> dagNode) {
+    Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
+    TimeUnit slaTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(
+        jobConfig, ConfigurationKeys.GOBBLIN_FLOW_START_SLA_TIME_UNIT, ConfigurationKeys.DEFAULT_GOBBLIN_FLOW_START_SLA_TIME_UNIT));
+
+    // The default value is expressed in the default unit; convert it so callers always receive millis.
+    return jobConfig.hasPath(ConfigurationKeys.GOBBLIN_FLOW_START_SLA_TIME)
+        ? slaTimeUnit.toMillis(jobConfig.getLong(ConfigurationKeys.GOBBLIN_FLOW_START_SLA_TIME))
+        : TimeUnit.valueOf(ConfigurationKeys.DEFAULT_GOBBLIN_FLOW_START_SLA_TIME_UNIT).toMillis(ConfigurationKeys.DEFAULT_GOBBLIN_FLOW_START_SLA);
+  }
+
   static int getDagQueueId(Dag<JobExecutionPlan> dag, int numThreads) {
     return getDagQueueId(DagManagerUtils.getFlowExecId(dag), numThreads);
   }
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index 60ac979..fde9b85 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -201,7 +201,6 @@ public class DagManagerFlowTest {
     // check the SLA value
     Assert.assertEquals(dagManager.dagManagerThreads[queue].dagToSLA.get(dagId).longValue(), TimeUnit.SECONDS.toMillis(7L));
 
-
     // check existence of dag in dagToJobs map
     AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
         assertTrue(input -> dagManager.dagManagerThreads[queue].dagToJobs.containsKey(dagId), ERROR_MESSAGE);
@@ -213,7 +212,48 @@ public class DagManagerFlowTest {
     // check removal of dag from dagToSLA map
     AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
         assertTrue(input -> !dagManager.dagManagerThreads[queue].dagToSLA.containsKey(dagId), ERROR_MESSAGE);
+  }
+
+  @Test()
+  void testOrphanFlowKill() throws Exception {
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("6", 234567891L, "FINISH_RUNNING", 1);
+    String dagId = DagManagerUtils.generateDagId(dag);
+    int queue = DagManagerUtils.getDagQueueId(dag, dagNumThreads);
+
+    when(this.dagManager.getJobStatusRetriever().getLatestExecutionIdsForFlow(eq("flow6"), eq("group6"), anyInt()))
+        .thenReturn(Collections.singletonList(234567891L));
+
+    // change config to set a small flow start sla
+    Config jobConfig = dag.getStartNodes().get(0).getValue().getJobSpec().getConfig();
+    jobConfig = jobConfig
+        .withValue(ConfigurationKeys.GOBBLIN_FLOW_START_SLA_TIME, ConfigValueFactory.fromAnyRef("7"))
+        .withValue(ConfigurationKeys.GOBBLIN_FLOW_START_SLA_TIME_UNIT, ConfigValueFactory.fromAnyRef(TimeUnit.SECONDS.name()));
+    dag.getStartNodes().get(0).getValue().getJobSpec().setConfig(jobConfig);
+
+    // mock add spec
+    dagManager.addDag(dag);
 
+    // check existence of dag in dagToSLA map
+    AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
+        assertTrue(input -> dagManager.dagManagerThreads[queue].dagToSLA.containsKey(dagId), ERROR_MESSAGE);
+
+    // report the job as still ORCHESTRATED so that DagManager treats it as orphaned
+    Mockito.doReturn(DagManagerTest.getMockJobStatus("flow6", "group6", 234567891L,
+        "group0", "job0", String.valueOf(ExecutionStatus.ORCHESTRATED)))
+        .when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow6", "group6",
+        234567891L, "job0", "group6");
+
+    // check existence of dag in dagToJobs map
+    AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
+        assertTrue(input -> dagManager.dagManagerThreads[queue].dagToJobs.containsKey(dagId), ERROR_MESSAGE);
+
+    // verify deleteSpec() of specProducer is called once
+    // which means job cancellation was triggered
+    AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).assertTrue(new DeletePredicate(dag), ERROR_MESSAGE);
+
+    // check removal of dag from dagToSLA map
+    AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
+        assertTrue(input -> !dagManager.dagManagerThreads[queue].dagToSLA.containsKey(dagId), ERROR_MESSAGE);
   }
 
   @Test

Reply via email to