This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 55d662f [GOBBLIN-1583] Add System level job start SLA (#3437)
55d662f is described below
commit 55d662f46940c3604940f359be6739e7c9bab833
Author: William Lo <[email protected]>
AuthorDate: Tue Dec 21 11:28:24 2021 -0800
[GOBBLIN-1583] Add System level job start SLA (#3437)
* Add System level job start SLA
* Address review and make time unit configurable, improve naming, add
comments to tests
* Address review
---
.../gobblin/configuration/ConfigurationKeys.java | 4 +-
.../service/modules/orchestration/DagManager.java | 17 +++--
.../modules/orchestration/DagManagerUtils.java | 11 +--
.../modules/orchestration/DagManagerTest.java | 78 ++++++++++++++++++++--
4 files changed, 93 insertions(+), 17 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 36bc680..9368521 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -1007,8 +1007,8 @@ public class ConfigurationKeys {
public static final String DEFAULT_GOBBLIN_FLOW_SLA_TIME_UNIT = "MINUTES";
public static final String GOBBLIN_JOB_START_SLA_TIME =
"gobblin.job.start.sla.time";
public static final String GOBBLIN_JOB_START_SLA_TIME_UNIT =
"gobblin.job.start.sla.timeunit";
- public static final long DEFAULT_GOBBLIN_JOB_START_SLA_TIME = 10L;
- public static final String DEFAULT_GOBBLIN_JOB_START_SLA_TIME_UNIT =
"MINUTES";
+ public static final long FALLBACK_GOBBLIN_JOB_START_SLA_TIME = 10L;
+ public static final String FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT =
"MINUTES";
public static final String DATASET_SUBPATHS_KEY =
"gobblin.flow.dataset.subPaths";
public static final String DATASET_BASE_INPUT_PATH_KEY =
"gobblin.flow.dataset.baseInputPath";
public static final String DATASET_BASE_OUTPUT_PATH_KEY =
"gobblin.flow.dataset.baseOutputPath";
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 7986dce..e4a6293 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -138,6 +138,9 @@ public class DagManager extends AbstractIdleService {
private static final String USER_JOB_QUOTA_KEY = DAG_MANAGER_PREFIX +
"defaultJobQuota";
private static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
private static final String PER_USER_QUOTA = DAG_MANAGER_PREFIX +
"perUserQuota";
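+ // System-wide default job start SLA, used for jobs that do not configure their own. The time is interpreted in the unit given by JOB_START_SLA_UNITS; if neither key is set, the default is 10 minutes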
+ private static final String JOB_START_SLA_TIME = DAG_MANAGER_PREFIX +
ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME;
+ private static final String JOB_START_SLA_UNITS = DAG_MANAGER_PREFIX +
ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT;
private static final String QUOTA_SEPERATOR = ":";
@@ -180,6 +183,7 @@ public class DagManager extends AbstractIdleService {
private final Integer numThreads;
private final Integer pollingInterval;
private final Integer retentionPollingInterval;
+ protected final Long defaultJobStartSlaTimeMillis;
@Getter
private final JobStatusRetriever jobStatusRetriever;
private final Config config;
@@ -208,7 +212,8 @@ public class DagManager extends AbstractIdleService {
}
this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY,
DEFAULT_USER_JOB_QUOTA);
-
+ TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config,
JOB_START_SLA_UNITS,
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
+ this.defaultJobStartSlaTimeMillis =
jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME,
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
ImmutableMap.Builder<String, Integer> mapBuilder = ImmutableMap.builder();
for (String userQuota : ConfigUtils.getStringList(config, PER_USER_QUOTA))
{
mapBuilder.put(userQuota.split(QUOTA_SEPERATOR)[0],
Integer.parseInt(userQuota.split(QUOTA_SEPERATOR)[1]));
@@ -379,7 +384,7 @@ public class DagManager extends AbstractIdleService {
for (int i = 0; i < numThreads; i++) {
DagManagerThread dagManagerThread = new
DagManagerThread(jobStatusRetriever, dagStateStore, failedDagStateStore,
runQueue[i], cancelQueue[i], resumeQueue[i],
instrumentationEnabled, defaultQuota, perUserQuota, failedDagIds,
- allSuccessfulMeter, allFailedMeter);
+ allSuccessfulMeter, allFailedMeter,
this.defaultJobStartSlaTimeMillis);
this.dagManagerThreads[i] = dagManagerThread;
this.scheduledExecutorPool.scheduleAtFixedRate(dagManagerThread, 0,
this.pollingInterval, TimeUnit.SECONDS);
}
@@ -443,14 +448,14 @@ public class DagManager extends AbstractIdleService {
private final BlockingQueue<Dag<JobExecutionPlan>> queue;
private final BlockingQueue<String> cancelQueue;
private final BlockingQueue<String> resumeQueue;
-
+ private final Long defaultJobStartSlaTimeMillis;
/**
* Constructor.
*/
DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore
dagStateStore, DagStateStore failedDagStateStore,
BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String>
cancelQueue, BlockingQueue<String> resumeQueue,
boolean instrumentationEnabled, int defaultQuota, Map<String, Integer>
perUserQuota, Set<String> failedDagIds,
- ContextAwareMeter allSuccessfulMeter, ContextAwareMeter
allFailedMeter) {
+ ContextAwareMeter allSuccessfulMeter, ContextAwareMeter
allFailedMeter, Long defaultJobStartSla) {
this.jobStatusRetriever = jobStatusRetriever;
this.dagStateStore = dagStateStore;
this.failedDagStateStore = failedDagStateStore;
@@ -462,6 +467,7 @@ public class DagManager extends AbstractIdleService {
this.perUserQuota = perUserQuota;
this.allSuccessfulMeter = allSuccessfulMeter;
this.allFailedMeter = allFailedMeter;
+ this.defaultJobStartSlaTimeMillis = defaultJobStartSla;
if (instrumentationEnabled) {
this.metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()),
getClass());
this.eventSubmitter = Optional.of(new
EventSubmitter.Builder(this.metricContext,
"org.apache.gobblin.service").build());
@@ -774,9 +780,8 @@ public class DagManager extends AbstractIdleService {
return false;
}
ExecutionStatus executionStatus = valueOf(jobStatus.getEventName());
- long timeOutForJobStart = DagManagerUtils.getJobStartSla(node);
+ long timeOutForJobStart = DagManagerUtils.getJobStartSla(node,
this.defaultJobStartSlaTimeMillis);
long jobOrchestratedTime = jobStatus.getOrchestratedTime();
-
if (executionStatus == ORCHESTRATED && System.currentTimeMillis() -
jobOrchestratedTime > timeOutForJobStart) {
log.info("Job {} of flow {} exceeded the job start SLA of {} ms.
Killing the job now...",
DagManagerUtils.getJobName(node),
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 5afa128..8e52784 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -285,14 +285,15 @@ public class DagManagerUtils {
* @param dagNode dag node for which flow start sla is to be retrieved
* @return job start sla in ms
*/
- static long getJobStartSla(DagNode<JobExecutionPlan> dagNode) {
+ static long getJobStartSla(DagNode<JobExecutionPlan> dagNode, Long
defaultJobStartSla) {
Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
TimeUnit slaTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(
- jobConfig, ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT,
ConfigurationKeys.DEFAULT_GOBBLIN_JOB_START_SLA_TIME_UNIT));
+ jobConfig, ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT,
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
- return
slaTimeUnit.toMillis(jobConfig.hasPath(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME)
- ? jobConfig.getLong(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME)
- : ConfigurationKeys.DEFAULT_GOBBLIN_JOB_START_SLA_TIME);
+ // A job-level SLA time overrides the system default and is read in the job-level unit (MINUTES if unset); a job-level unit without a job-level time is ignored. defaultJobStartSla is returned unconverted, so callers must pass it in ms; a null value would NPE when unboxed to the long result.
+ return jobConfig.hasPath(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME)
+ ?
slaTimeUnit.toMillis(jobConfig.getLong(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME))
+ : defaultJobStartSla;
}
static int getDagQueueId(Dag<JobExecutionPlan> dag, int numThreads) {
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index ea0af64..7180cf3 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -74,6 +74,7 @@ public class DagManagerTest {
private Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs;
private Map<String, Dag<JobExecutionPlan>> dags;
private Set<String> failedDagIds;
+ private static long START_SLA_DEFAULT = 15 * 60 * 1000;
@BeforeClass
public void setUp() throws Exception {
@@ -90,7 +91,7 @@ public class DagManagerTest {
MetricContext metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()),
getClass());
this._dagManagerThread = new
DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore,
failedDagStateStore, queue, cancelQueue,
resumeQueue, true, 5, new HashMap<>(), new HashSet<>(),
metricContext.contextAwareMeter("successMeter"),
- metricContext.contextAwareMeter("failedMeter"));
+ metricContext.contextAwareMeter("failedMeter"), START_SLA_DEFAULT);
Field jobToDagField =
DagManager.DagManagerThread.class.getDeclaredField("jobToDag");
jobToDagField.setAccessible(true);
@@ -149,12 +150,15 @@ public class DagManagerTest {
}
static Iterator<JobStatus> getMockJobStatus(String flowName, String
flowGroup, Long flowExecutionId, String jobGroup, String jobName, String
eventName) {
- return getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup,
jobName, eventName, false);
+ return getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup,
jobName, eventName, false, flowExecutionId + 10);
}
- private static Iterator<JobStatus> getMockJobStatus(String flowName, String
flowGroup, Long flowExecutionId, String jobGroup, String jobName, String
eventName, boolean shouldRetry) {
+ private static Iterator<JobStatus> getMockJobStatus(String flowName, String
flowGroup, Long flowExecutionId, String jobGroup, String jobName, String
eventName, boolean shouldRetry) {
+ return getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup,
jobName, eventName, shouldRetry, flowExecutionId + 10);
+ }
+ // Builds a single-element iterator over a mock JobStatus. orchestratedTime (epoch ms) is stored verbatim; the shorter overloads default it to flowExecutionId + 10, the same value used for startTime.
+ private static Iterator<JobStatus> getMockJobStatus(String flowName, String
flowGroup, Long flowExecutionId, String jobGroup, String jobName, String
eventName, boolean shouldRetry, Long orchestratedTime) {
return
Iterators.singletonIterator(JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(jobGroup).jobName(jobName).flowExecutionId(flowExecutionId).
- message("Test message").eventName(eventName).startTime(flowExecutionId
+ 10).shouldRetry(shouldRetry).build());
+ message("Test message").eventName(eventName).startTime(flowExecutionId
+ 10).shouldRetry(shouldRetry).orchestratedTime(orchestratedTime).build());
}
@Test
@@ -645,6 +649,72 @@ public class DagManagerTest {
Assert.assertFalse(this.dags.containsKey(dagId));
}
+ @Test (dependsOnMethods = "testResumeCancelledDag")
+ public void testJobStartSLAKilledDag() throws URISyntaxException,
IOException {
+ long flowExecutionId = System.currentTimeMillis();
+ String flowGroupId = "0";
+ String flowGroup = "group" + flowGroupId;
+ String flowName = "flow" + flowGroupId;
+ String jobName0 = "job0";
+ String flowGroupId1 = "1";
+ String flowGroup1 = "group" + flowGroupId1;
+ String flowName1 = "flow" + flowGroupId1;
+
+ Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId,
"FINISH_RUNNING", false);
+ Dag<JobExecutionPlan> dag1 = buildDag(flowGroupId1, flowExecutionId+1,
"FINISH_RUNNING", false);
+
+ String dagId = DagManagerUtils.generateDagId(dag);
+ String dagId1 = DagManagerUtils.generateDagId(dag1);
+
+
+ // Enqueue the first DAG only; the second DAG is enqueued after the first run
+ this.queue.offer(dag);
+ // First flow: its job was orchestrated 16 minutes before flowExecutionId (~now), past the 15-minute START_SLA_DEFAULT given to the thread, so it should be killed. NOTE(review): these calls pass jobName0/flowGroup in the jobGroup/jobName parameter positions; harmless only because the stub below matches Mockito.anyString()
+ Iterator<JobStatus> jobStatusIterator1 =
+ getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0,
flowGroup, String.valueOf(ExecutionStatus.ORCHESTRATED),
+ false, flowExecutionId - 16 * 60 * 1000);
+ // Second flow: its job was orchestrated 10 minutes before flowExecutionId, within the 15-minute SLA, so it should keep running normally
+ Iterator<JobStatus> jobStatusIterator2 =
+ getMockJobStatus(flowName1, flowGroup1, flowExecutionId+1, jobName0,
flowGroup1, String.valueOf(ExecutionStatus.ORCHESTRATED),
+ false, flowExecutionId - 10 * 60 * 1000);
+ // CANCELLED status for the first flow's job, as it would be reported after the start-SLA kill
+ Iterator<JobStatus> jobStatusIterator3 =
+ getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0,
flowGroup, String.valueOf(ExecutionStatus.CANCELLED),
+ false, flowExecutionId - 16 * 60 * 1000);
+ // COMPLETE status for the second flow's job, so the DAG that ran normally can be cleaned up
+ Iterator<JobStatus> jobStatusIterator4 =
+ getMockJobStatus(flowName1, flowGroup1, flowExecutionId+1, jobName0,
flowGroup1, String.valueOf(ExecutionStatus.COMPLETE));
+ // Successive getJobStatusesForFlowExecution calls return the iterators above in order (Mockito consecutive stubbing)
+ Mockito.when(_jobStatusRetriever
+ .getJobStatusesForFlowExecution(Mockito.anyString(),
Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(),
Mockito.anyString())).
+ thenReturn(jobStatusIterator1).
+ thenReturn(jobStatusIterator2).
+ thenReturn(jobStatusIterator3).
+ thenReturn(jobStatusIterator4);
+
+ // Run the thread once. The first DAG's job exceeds the start SLA, so the DAG should be killed and no longer tracked
+ this._dagManagerThread.run();
+ Assert.assertEquals(this.dags.size(), 0);
+ // The killed DAG should be recorded as failed
+ Assert.assertTrue(this.failedDagIds.contains(dagId));
+
+ // The second DAG's job is within the start SLA, so the DAG should be accepted and remain tracked
+ this.queue.offer(dag1);
+ this._dagManagerThread.run();
+ Assert.assertEquals(this.dags.size(), 1);
+ Assert.assertEquals(this.jobToDag.size(), 1);
+ Assert.assertEquals(this.dagToJobs.size(), 1);
+ Assert.assertTrue(this.dags.containsKey(dagId1));
+
+ // Cleanup: after two more runs the second DAG should be finished and nothing should remain tracked
+ this._dagManagerThread.run();
+ this._dagManagerThread.run();
+ Assert.assertEquals(this.dags.size(), 0);
+ Assert.assertEquals(this.jobToDag.size(), 0);
+ Assert.assertEquals(this.dagToJobs.size(), 0);
+ }
+
+
@AfterClass
public void cleanUp() throws Exception {
FileUtils.deleteDirectory(new File(this.dagStateStoreDir));