This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 05f5640 [GOBBLIN-1252] Provide a default flow SLA for Gobblin Service flows[]
05f5640 is described below
commit 05f5640d557c4d6c4a9ddf4d986dcad6563e7a2f
Author: sv2000 <[email protected]>
AuthorDate: Sat Aug 29 18:03:54 2020 -0700
[GOBBLIN-1252] Provide a default flow SLA for Gobblin Service flows[]
Closes #3093 from sv2000/flowSla
---
.../apache/gobblin/service/ServiceConfigKeys.java | 4 ++
.../service/modules/orchestration/DagManager.java | 2 +-
.../modules/orchestration/DagManagerUtils.java | 6 +--
.../modules/orchestration/Orchestrator.java | 6 ++-
.../modules/orchestration/DagManagerFlowTest.java | 44 +++++++++++-----------
5 files changed, 36 insertions(+), 26 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index 9b458ee..e161b78 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -118,4 +118,8 @@ public class ServiceConfigKeys {
// Prefix for config to ServiceBasedAppLauncher that will only be used by
GaaS and not orchestrated jobs
public static final String GOBBLIN_SERVICE_APP_LAUNCHER_PREFIX =
"gobblinServiceAppLauncher";
+
+ //Flow concurrency config key to control default service behavior.
+ public static final String FLOW_CONCURRENCY_ALLOWED = GOBBLIN_SERVICE_PREFIX
+ "flowConcurrencyAllowed";
+ public static final Boolean DEFAULT_FLOW_CONCURRENCY_ALLOWED = true;
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index db1145a..906251d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -686,7 +686,7 @@ public class DagManager extends AbstractIdleService {
dagToSLA.put(dagId, flowSla);
}
- if (flowSla != DagManagerUtils.NO_SLA && currentTime > flowStartTime +
flowSla) {
+ if (currentTime > flowStartTime + flowSla) {
log.info("Flow {} exceeded the SLA of {} ms. Killing the job {}
now...",
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY),
flowSla,
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 8ca8966..248fe4d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -45,7 +45,7 @@ import org.apache.gobblin.util.ConfigUtils;
public class DagManagerUtils {
- static long NO_SLA = -1L;
+ static long DEFAULT_FLOW_SLA_MILLIS = TimeUnit.HOURS.toMillis(24);
static String QUOTA_KEY_SEPERATOR = ",";
static FlowId getFlowId(Dag<JobExecutionPlan> dag) {
@@ -255,7 +255,7 @@ public class DagManagerUtils {
* get the sla from the dag node config.
* if time unit is not provided, it assumes time unit is minute.
* @param dagNode dag node for which sla is to be retrieved
- * @return sla if it is provided, {@value NO_SLA} otherwise
+ * @return sla if it is provided, DEFAULT_FLOW_SLA_MILLIS otherwise
*/
static long getFlowSLA(DagNode<JobExecutionPlan> dagNode) {
Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
@@ -264,7 +264,7 @@ public class DagManagerUtils {
return jobConfig.hasPath(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME)
?
slaTimeUnit.toMillis(jobConfig.getLong(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME))
- : NO_SLA;
+ : DEFAULT_FLOW_SLA_MILLIS;
}
/**
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index ecd16de..ae0daea 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -90,6 +90,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
protected final MetricContext metricContext;
protected final Optional<EventSubmitter> eventSubmitter;
+ private final boolean flowConcurrencyFlag;
@Getter
private Optional<Meter> flowOrchestrationSuccessFulMeter;
@Getter
@@ -141,6 +142,8 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
this.flowOrchestrationTimer = Optional.absent();
this.eventSubmitter = Optional.absent();
}
+ this.flowConcurrencyFlag = ConfigUtils.getBoolean(config,
ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
+ ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED);
}
public Orchestrator(Config config, Optional<TopologyCatalog>
topologyCatalog, Optional<DagManager> dagManager, Optional<Logger> log) {
@@ -237,7 +240,8 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
//If the FlowSpec disallows concurrent executions, then check if another
instance of the flow is already
//running. If so, return immediately.
- boolean allowConcurrentExecution = ConfigUtils.getBoolean(flowConfig,
ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, true);
+ boolean allowConcurrentExecution = ConfigUtils
+ .getBoolean(flowConfig,
ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, this.flowConcurrencyFlag);
if (!canRun(flowName, flowGroup, allowConcurrentExecution)) {
_log.warn("Another instance of flowGroup: {}, flowName: {} running;
Skipping flow execution since "
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index 85b8bca..ecd90fa 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -70,9 +70,13 @@ public class DagManagerFlowTest {
@Test
void testAddDeleteSpec() throws Exception {
- Dag<JobExecutionPlan> dag1 = DagManagerTest.buildDag("0", 123456780L,
"FINISH_RUNNING", 1);
- Dag<JobExecutionPlan> dag2 = DagManagerTest.buildDag("1", 123456781L,
"FINISH_RUNNING", 1);
- Dag<JobExecutionPlan> dag3 = DagManagerTest.buildDag("2", 123456782L,
"FINISH_RUNNING", 1);
+ Long flowExecutionId1 = System.currentTimeMillis();
+ Long flowExecutionId2 = flowExecutionId1 + 1;
+ Long flowExecutionId3 = flowExecutionId1 + 2;
+
+ Dag<JobExecutionPlan> dag1 = DagManagerTest.buildDag("0",
flowExecutionId1, "FINISH_RUNNING", 1);
+ Dag<JobExecutionPlan> dag2 = DagManagerTest.buildDag("1",
flowExecutionId2, "FINISH_RUNNING", 1);
+ Dag<JobExecutionPlan> dag3 = DagManagerTest.buildDag("2",
flowExecutionId3, "FINISH_RUNNING", 1);
String dagId1 = DagManagerUtils.generateDagId(dag1);
String dagId2 = DagManagerUtils.generateDagId(dag2);
@@ -83,11 +87,11 @@ public class DagManagerFlowTest {
int queue3 = DagManagerUtils.getDagQueueId(dag3, dagNumThreads);
when(this.dagManager.getJobStatusRetriever().getLatestExecutionIdsForFlow(eq("flow0"),
eq("group0"), anyInt()))
- .thenReturn(Collections.singletonList(123456780L));
+ .thenReturn(Collections.singletonList(flowExecutionId1));
when(this.dagManager.getJobStatusRetriever().getLatestExecutionIdsForFlow(eq("flow1"),
eq("group1"), anyInt()))
- .thenReturn(Collections.singletonList(123456781L));
+ .thenReturn(Collections.singletonList(flowExecutionId2));
when(this.dagManager.getJobStatusRetriever().getLatestExecutionIdsForFlow(eq("flow2"),
eq("group2"), anyInt()))
- .thenReturn(Collections.singletonList(123456782L));
+ .thenReturn(Collections.singletonList(flowExecutionId3));
// mock add spec
dagManager.addDag(dag1, true);
@@ -113,20 +117,20 @@ public class DagManagerFlowTest {
AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).assertTrue(new
DeletePredicate(dag3), ERROR_MESSAGE);
// mock flow cancellation tracking event
- Mockito.doReturn(DagManagerTest.getMockJobStatus("flow0", "group0",
123456780L,
+ Mockito.doReturn(DagManagerTest.getMockJobStatus("flow0", "group0",
flowExecutionId1,
"group0", "job0", String.valueOf(ExecutionStatus.CANCELLED)))
.when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow0",
"group0",
- 123456780L, "job0", "group0");
+ flowExecutionId1, "job0", "group0");
- Mockito.doReturn(DagManagerTest.getMockJobStatus("flow1", "group1",
123456781L,
+ Mockito.doReturn(DagManagerTest.getMockJobStatus("flow1", "group1",
flowExecutionId2,
"group1", "job0", String.valueOf(ExecutionStatus.CANCELLED)))
.when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow1",
"group1",
- 123456781L, "job0", "group1");
+ flowExecutionId2, "job0", "group1");
- Mockito.doReturn(DagManagerTest.getMockJobStatus("flow2", "group2",
123456782L,
+ Mockito.doReturn(DagManagerTest.getMockJobStatus("flow2", "group2",
flowExecutionId3,
"group2", "job0", String.valueOf(ExecutionStatus.CANCELLED)))
.when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow2",
"group2",
- 123456782L, "job0", "group2");
+ flowExecutionId3, "job0", "group2");
// check removal of dag in dagToJobs map
AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
@@ -159,7 +163,7 @@ public class DagManagerFlowTest {
assertTrue(input ->
dagManager.dagManagerThreads[queue].dagToSLA.containsKey(dagId), ERROR_MESSAGE);
// check the SLA value
-
Assert.assertEquals(dagManager.dagManagerThreads[queue].dagToSLA.get(dagId).longValue(),
DagManagerUtils.NO_SLA);
+
Assert.assertEquals(dagManager.dagManagerThreads[queue].dagToSLA.get(dagId).longValue(),
DagManagerUtils.DEFAULT_FLOW_SLA_MILLIS);
// verify deleteSpec() of the specProducer is not called once
// which means job cancellation was triggered
@@ -216,13 +220,11 @@ public class DagManagerFlowTest {
@Test()
void testOrphanFlowKill() throws Exception {
- Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("6", 234567891L,
"FINISH_RUNNING", 1);
+ Long flowExecutionId = System.currentTimeMillis() -
TimeUnit.SECONDS.toMillis(10);
+ Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("6", flowExecutionId,
"FINISH_RUNNING", 1);
String dagId = DagManagerUtils.generateDagId(dag);
int queue = DagManagerUtils.getDagQueueId(dag, dagNumThreads);
-
when(this.dagManager.getJobStatusRetriever().getLatestExecutionIdsForFlow(eq("flow4"),
eq("group4"), anyInt()))
- .thenReturn(Collections.singletonList(234567891L));
-
// change config to set a small sla
Config jobConfig =
dag.getStartNodes().get(0).getValue().getJobSpec().getConfig();
jobConfig = jobConfig
@@ -237,10 +239,10 @@ public class DagManagerFlowTest {
AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
assertTrue(input ->
dagManager.dagManagerThreads[queue].dagToSLA.containsKey(dagId), ERROR_MESSAGE);
- Mockito.doReturn(DagManagerTest.getMockJobStatus("flow6", "group6",
234567891L,
- "group0", "job0", String.valueOf(ExecutionStatus.ORCHESTRATED)))
+ Mockito.doReturn(DagManagerTest.getMockJobStatus("flow6", "group6",
flowExecutionId,
+ "group6", "job0", String.valueOf(ExecutionStatus.ORCHESTRATED)))
.when(dagManager.getJobStatusRetriever()).getJobStatusesForFlowExecution("flow6",
"group6",
- 234567891L, "job0", "group6");
+ flowExecutionId, "job0", "group6");
// check existence of dag in dagToJobs map
AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
@@ -258,7 +260,7 @@ public class DagManagerFlowTest {
@Test
void slaConfigCheck() throws Exception {
Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("5", 123456783L,
"FINISH_RUNNING", 1);
-
Assert.assertEquals(DagManagerUtils.getFlowSLA(dag.getStartNodes().get(0)),
-1L);
+
Assert.assertEquals(DagManagerUtils.getFlowSLA(dag.getStartNodes().get(0)),
DagManagerUtils.DEFAULT_FLOW_SLA_MILLIS);
Config jobConfig =
dag.getStartNodes().get(0).getValue().getJobSpec().getConfig();
jobConfig = jobConfig