This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 178138f [GOBBLIN-917] kill orphan gaas jobs
178138f is described below
commit 178138f112242cd26a55b6b792aa8aff5d6ac5e4
Author: Arjun <[email protected]>
AuthorDate: Fri Oct 18 14:53:45 2019 -0700
[GOBBLIN-917] kill orphan gaas jobs
Closes #2769 from
arjun4084346/killJobWithUnknownStatus
---
.../gobblin/configuration/ConfigurationKeys.java | 4 +++
.../service/modules/orchestration/DagManager.java | 41 ++++++++++++++++------
.../modules/orchestration/DagManagerUtils.java | 16 +++++++++
.../modules/orchestration/DagManagerFlowTest.java | 41 +++++++++++++++++++++-
4 files changed, 91 insertions(+), 11 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index bdaa910..20a0d72 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -898,6 +898,10 @@ public class ConfigurationKeys {
public static final String GOBBLIN_FLOW_SLA_TIME = "gobblin.flow.sla.time";
public static final String GOBBLIN_FLOW_SLA_TIME_UNIT =
"gobblin.flow.sla.timeunit";
public static final String DEFAULT_GOBBLIN_FLOW_SLA_TIME_UNIT = "MINUTES";
+ public static final String GOBBLIN_FLOW_START_SLA_TIME =
"gobblin.flow.start.sla.time";
+ public static final String GOBBLIN_FLOW_START_SLA_TIME_UNIT =
"gobblin.flow.start.sla.timeunit";
+ public static final long DEFAULT_GOBBLIN_FLOW_START_SLA = 10L;
+ public static final String DEFAULT_GOBBLIN_FLOW_START_SLA_TIME_UNIT =
"MINUTES";
public static final String DATASET_SUBPATHS_KEY =
"gobblin.flow.dataset.subPaths";
public static final String DATASET_BASE_INPUT_PATH_KEY =
"gobblin.flow.dataset.baseInputPath";
public static final String DATASET_BASE_OUTPUT_PATH_KEY =
"gobblin.flow.dataset.baseOutputPath";
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 2bf3ca2..dcadc88 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -74,12 +74,7 @@ import
org.apache.gobblin.service.monitoring.KafkaJobStatusMonitorFactory;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
-import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
-import static org.apache.gobblin.service.ExecutionStatus.COMPLETE;
-import static org.apache.gobblin.service.ExecutionStatus.FAILED;
-import static org.apache.gobblin.service.ExecutionStatus.RUNNING;
-import static org.apache.gobblin.service.ExecutionStatus.PENDING;
-import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+import static org.apache.gobblin.service.ExecutionStatus.*;
/**
@@ -491,12 +486,14 @@ public class DagManager extends AbstractIdleService {
Map<String, Set<DagNode<JobExecutionPlan>>> nextSubmitted =
Maps.newHashMap();
List<DagNode<JobExecutionPlan>> nodesToCleanUp = Lists.newArrayList();
- for (DagNode<JobExecutionPlan> node: this.jobToDag.keySet()) {
+ for (DagNode<JobExecutionPlan> node : this.jobToDag.keySet()) {
boolean slaKilled = slaKillIfNeeded(node);
JobStatus jobStatus = pollJobStatus(node);
- ExecutionStatus status = getJobExecutionStatus(slaKilled, jobStatus);
+ boolean killOrphanFlow = killJobIfOrphaned(node, jobStatus);
+
+ ExecutionStatus status = getJobExecutionStatus(slaKilled,
killOrphanFlow, jobStatus);
JobExecutionPlan jobExecutionPlan =
DagManagerUtils.getJobExecutionPlan(node);
@@ -545,8 +542,32 @@ public class DagManager extends AbstractIdleService {
}
}
- private ExecutionStatus getJobExecutionStatus(boolean slaKilled, JobStatus
jobStatus) {
- if (slaKilled) {
+  /**
+   * Cancel the job if it has been "orphaned", i.e. it is still in ORCHESTRATED {@link ExecutionStatus} after the
+   * flow start SLA ({@link DagManagerUtils#getFlowStartSLA}) has elapsed since the flow execution id.
+   * @param node {@link DagNode} representing the job
+   * @param jobStatus current {@link JobStatus} of the job; when null, nothing is done
+   * @return true if the job was found orphaned and {@code cancelDagNode} was invoked, false otherwise
+   */
+  private boolean killJobIfOrphaned(DagNode<JobExecutionPlan> node, JobStatus jobStatus)
+      throws ExecutionException, InterruptedException {
+    if (jobStatus == null || valueOf(jobStatus.getEventName()) != ORCHESTRATED) {
+      return false;
+    }
+    // The flow execution id is used as the flow start time (epoch millis); SLA config is only read for ORCHESTRATED jobs.
+    long flowStartTime = jobStatus.getFlowExecutionId();
+    long timeoutForFlowStart = DagManagerUtils.getFlowStartSLA(node);
+
+    if (System.currentTimeMillis() - flowStartTime > timeoutForFlowStart) {
+      cancelDagNode(node);
+      return true;
+    }
+    return false;
+  }
+
+ private ExecutionStatus getJobExecutionStatus(boolean slaKilled, boolean
killOrphanFlow, JobStatus jobStatus) {
+ if (slaKilled || killOrphanFlow) {
return CANCELLED;
} else {
if (jobStatus == null) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 47caf6f..d489bed 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -238,6 +238,22 @@ public class DagManagerUtils {
: NO_SLA;
}
+  /**
+   * Get the flow start SLA from the dag node config. If the time unit is not provided it defaults to minutes, and
+   * if the time is not provided it defaults to {@value ConfigurationKeys#DEFAULT_GOBBLIN_FLOW_START_SLA} of that unit.
+   * @param dagNode dag node for which the flow start sla is to be retrieved
+   * @return flow start sla in milliseconds
+   */
+  static long getFlowStartSLA(DagNode<JobExecutionPlan> dagNode) {
+    Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
+    TimeUnit slaTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(
+        jobConfig, ConfigurationKeys.GOBBLIN_FLOW_START_SLA_TIME_UNIT, ConfigurationKeys.DEFAULT_GOBBLIN_FLOW_START_SLA_TIME_UNIT));
+    // The default is expressed in slaTimeUnit as well, so it must be converted to millis just like a configured value.
+    return slaTimeUnit.toMillis(jobConfig.hasPath(ConfigurationKeys.GOBBLIN_FLOW_START_SLA_TIME)
+        ? jobConfig.getLong(ConfigurationKeys.GOBBLIN_FLOW_START_SLA_TIME)
+        : ConfigurationKeys.DEFAULT_GOBBLIN_FLOW_START_SLA);
+  }
+
static int getDagQueueId(Dag<JobExecutionPlan> dag, int numThreads) {
return getDagQueueId(DagManagerUtils.getFlowExecId(dag), numThreads);
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index 60ac979..fde9b85 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -201,7 +201,6 @@ public class DagManagerFlowTest {
// check the SLA value
Assert.assertEquals(dagManager.dagManagerThreads[queue].dagToSLA.get(dagId).longValue(),
TimeUnit.SECONDS.toMillis(7L));
-
// check existence of dag in dagToJobs map
AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
assertTrue(input ->
dagManager.dagManagerThreads[queue].dagToJobs.containsKey(dagId),
ERROR_MESSAGE);
@@ -213,7 +212,47 @@ public class DagManagerFlowTest {
// check removal of dag from dagToSLA map
AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
assertTrue(input ->
!dagManager.dagManagerThreads[queue].dagToSLA.containsKey(dagId),
ERROR_MESSAGE);
+ }
+
+  @Test()
+  void testOrphanFlowKill() throws Exception {
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("6", 234567891L, "FINISH_RUNNING", 1);
+    String dagId = DagManagerUtils.generateDagId(dag);
+    int queue = DagManagerUtils.getDagQueueId(dag, dagNumThreads);
+
+    when(this.dagManager.getJobStatusRetriever().getLatestExecutionIdsForFlow(eq("flow6"), eq("group6"), anyInt()))
+        .thenReturn(Collections.singletonList(234567891L));
+
+    // set a small flow start sla (7 seconds); flow execution id 234567891L (used as start time) is far in the past
+    Config jobConfig = dag.getStartNodes().get(0).getValue().getJobSpec().getConfig();
+    jobConfig = jobConfig
+        .withValue(ConfigurationKeys.GOBBLIN_FLOW_START_SLA_TIME, ConfigValueFactory.fromAnyRef("7"))
+        .withValue(ConfigurationKeys.GOBBLIN_FLOW_START_SLA_TIME_UNIT, ConfigValueFactory.fromAnyRef(TimeUnit.SECONDS.name()));
+    dag.getStartNodes().get(0).getValue().getJobSpec().setConfig(jobConfig);
+
+    // mock add spec
+    dagManager.addDag(dag);
+    // check existence of dag in dagToSLA map
+    AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
+        assertTrue(input -> dagManager.dagManagerThreads[queue].dagToSLA.containsKey(dagId), ERROR_MESSAGE);
+
+    Mockito.doReturn(DagManagerTest.getMockJobStatus("flow6", "group6", 234567891L,
+        "group0", "job0", String.valueOf(ExecutionStatus.ORCHESTRATED)))
+        .when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow6", "group6",
+        234567891L, "job0", "group6");
+
+    // check existence of dag in dagToJobs map
+    AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
+        assertTrue(input -> dagManager.dagManagerThreads[queue].dagToJobs.containsKey(dagId), ERROR_MESSAGE);
+
+    // verify deleteSpec() of specProducer is called once
+    // which means job cancellation was triggered because the job stayed ORCHESTRATED past its flow start sla
+    AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).assertTrue(new DeletePredicate(dag), ERROR_MESSAGE);
+
+    // check removal of dag from dagToSLA map
+    AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
+        assertTrue(input -> !dagManager.dagManagerThreads[queue].dagToSLA.containsKey(dagId), ERROR_MESSAGE);
   }
@Test