This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 55d662f  [GOBBLIN-1583] Add System level job start SLA (#3437)
55d662f is described below

commit 55d662f46940c3604940f359be6739e7c9bab833
Author: William Lo <[email protected]>
AuthorDate: Tue Dec 21 11:28:24 2021 -0800

    [GOBBLIN-1583] Add System level job start SLA (#3437)
    
    * Add System level job start SLA
    
    * Address review: make the time unit configurable, improve naming, and add comments to tests
    
    * Address review
---
 .../gobblin/configuration/ConfigurationKeys.java   |  4 +-
 .../service/modules/orchestration/DagManager.java  | 17 +++--
 .../modules/orchestration/DagManagerUtils.java     | 11 +--
 .../modules/orchestration/DagManagerTest.java      | 78 ++++++++++++++++++++--
 4 files changed, 93 insertions(+), 17 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 36bc680..9368521 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -1007,8 +1007,8 @@ public class ConfigurationKeys {
   public static final String DEFAULT_GOBBLIN_FLOW_SLA_TIME_UNIT = "MINUTES";
   public static final String GOBBLIN_JOB_START_SLA_TIME = 
"gobblin.job.start.sla.time";
   public static final String GOBBLIN_JOB_START_SLA_TIME_UNIT = 
"gobblin.job.start.sla.timeunit";
-  public static final long DEFAULT_GOBBLIN_JOB_START_SLA_TIME = 10L;
-  public static final String DEFAULT_GOBBLIN_JOB_START_SLA_TIME_UNIT = 
"MINUTES";
+  public static final long FALLBACK_GOBBLIN_JOB_START_SLA_TIME = 10L;
+  public static final String FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT = 
"MINUTES";
   public static final String DATASET_SUBPATHS_KEY = 
"gobblin.flow.dataset.subPaths";
   public static final String DATASET_BASE_INPUT_PATH_KEY = 
"gobblin.flow.dataset.baseInputPath";
   public static final String DATASET_BASE_OUTPUT_PATH_KEY = 
"gobblin.flow.dataset.baseOutputPath";
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 7986dce..e4a6293 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -138,6 +138,9 @@ public class DagManager extends AbstractIdleService {
   private static final String USER_JOB_QUOTA_KEY = DAG_MANAGER_PREFIX + 
"defaultJobQuota";
   private static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
   private static final String PER_USER_QUOTA = DAG_MANAGER_PREFIX + 
"perUserQuota";
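+  // System-level default job start SLA, used for jobs that do not set their own. The value (JOB_START_SLA_TIME) is interpreted in the unit given by JOB_START_SLA_UNITS; if unset, the fallback is 10 MINUTES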
+  private static final String JOB_START_SLA_TIME = DAG_MANAGER_PREFIX + 
ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME;
+  private static final String JOB_START_SLA_UNITS = DAG_MANAGER_PREFIX + 
ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT;
 
   private static final String QUOTA_SEPERATOR = ":";
 
@@ -180,6 +183,7 @@ public class DagManager extends AbstractIdleService {
   private final Integer numThreads;
   private final Integer pollingInterval;
   private final Integer retentionPollingInterval;
+  protected final Long defaultJobStartSlaTimeMillis;
   @Getter
   private final JobStatusRetriever jobStatusRetriever;
   private final Config config;
@@ -208,7 +212,8 @@ public class DagManager extends AbstractIdleService {
     }
 
     this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY, 
DEFAULT_USER_JOB_QUOTA);
-
+    TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, 
JOB_START_SLA_UNITS, 
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
+    this.defaultJobStartSlaTimeMillis = 
jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME, 
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
     ImmutableMap.Builder<String, Integer> mapBuilder = ImmutableMap.builder();
     for (String userQuota : ConfigUtils.getStringList(config, PER_USER_QUOTA)) 
{
       mapBuilder.put(userQuota.split(QUOTA_SEPERATOR)[0], 
Integer.parseInt(userQuota.split(QUOTA_SEPERATOR)[1]));
@@ -379,7 +384,7 @@ public class DagManager extends AbstractIdleService {
         for (int i = 0; i < numThreads; i++) {
           DagManagerThread dagManagerThread = new 
DagManagerThread(jobStatusRetriever, dagStateStore, failedDagStateStore,
               runQueue[i], cancelQueue[i], resumeQueue[i], 
instrumentationEnabled, defaultQuota, perUserQuota, failedDagIds,
-              allSuccessfulMeter, allFailedMeter);
+              allSuccessfulMeter, allFailedMeter, 
this.defaultJobStartSlaTimeMillis);
           this.dagManagerThreads[i] = dagManagerThread;
           this.scheduledExecutorPool.scheduleAtFixedRate(dagManagerThread, 0, 
this.pollingInterval, TimeUnit.SECONDS);
         }
@@ -443,14 +448,14 @@ public class DagManager extends AbstractIdleService {
     private final BlockingQueue<Dag<JobExecutionPlan>> queue;
     private final BlockingQueue<String> cancelQueue;
     private final BlockingQueue<String> resumeQueue;
-
+    private final Long defaultJobStartSlaTimeMillis;
     /**
      * Constructor.
      */
     DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore 
dagStateStore, DagStateStore failedDagStateStore,
         BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String> 
cancelQueue, BlockingQueue<String> resumeQueue,
         boolean instrumentationEnabled, int defaultQuota, Map<String, Integer> 
perUserQuota, Set<String> failedDagIds,
-        ContextAwareMeter allSuccessfulMeter, ContextAwareMeter 
allFailedMeter) {
+        ContextAwareMeter allSuccessfulMeter, ContextAwareMeter 
allFailedMeter, Long defaultJobStartSla) {
       this.jobStatusRetriever = jobStatusRetriever;
       this.dagStateStore = dagStateStore;
       this.failedDagStateStore = failedDagStateStore;
@@ -462,6 +467,7 @@ public class DagManager extends AbstractIdleService {
       this.perUserQuota = perUserQuota;
       this.allSuccessfulMeter = allSuccessfulMeter;
       this.allFailedMeter = allFailedMeter;
+      this.defaultJobStartSlaTimeMillis = defaultJobStartSla;
       if (instrumentationEnabled) {
         this.metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
         this.eventSubmitter = Optional.of(new 
EventSubmitter.Builder(this.metricContext, 
"org.apache.gobblin.service").build());
@@ -774,9 +780,8 @@ public class DagManager extends AbstractIdleService {
         return false;
       }
       ExecutionStatus executionStatus = valueOf(jobStatus.getEventName());
-      long timeOutForJobStart = DagManagerUtils.getJobStartSla(node);
+      long timeOutForJobStart = DagManagerUtils.getJobStartSla(node, 
this.defaultJobStartSlaTimeMillis);
       long jobOrchestratedTime = jobStatus.getOrchestratedTime();
-
       if (executionStatus == ORCHESTRATED && System.currentTimeMillis() - 
jobOrchestratedTime > timeOutForJobStart) {
         log.info("Job {} of flow {} exceeded the job start SLA of {} ms. 
Killing the job now...",
             DagManagerUtils.getJobName(node),
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 5afa128..8e52784 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -285,14 +285,15 @@ public class DagManagerUtils {
    * @param dagNode dag node for which flow start sla is to be retrieved
    * @return job start sla in ms
    */
-  static long getJobStartSla(DagNode<JobExecutionPlan> dagNode) {
+  static long getJobStartSla(DagNode<JobExecutionPlan> dagNode, Long 
defaultJobStartSla) {
     Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
     TimeUnit slaTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(
-        jobConfig, ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT, 
ConfigurationKeys.DEFAULT_GOBBLIN_JOB_START_SLA_TIME_UNIT));
+        jobConfig, ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT, 
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
 
-    return 
slaTimeUnit.toMillis(jobConfig.hasPath(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME)
-        ? jobConfig.getLong(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME)
-        : ConfigurationKeys.DEFAULT_GOBBLIN_JOB_START_SLA_TIME);
+
+    return jobConfig.hasPath(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME)
+        ? 
slaTimeUnit.toMillis(jobConfig.getLong(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME))
+        : defaultJobStartSla;
   }
 
   static int getDagQueueId(Dag<JobExecutionPlan> dag, int numThreads) {
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index ea0af64..7180cf3 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -74,6 +74,7 @@ public class DagManagerTest {
   private Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs;
   private Map<String, Dag<JobExecutionPlan>> dags;
   private Set<String> failedDagIds;
+  private static long START_SLA_DEFAULT = 15 * 60 * 1000;
 
   @BeforeClass
   public void setUp() throws Exception {
@@ -90,7 +91,7 @@ public class DagManagerTest {
     MetricContext metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
     this._dagManagerThread = new 
DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore, 
failedDagStateStore, queue, cancelQueue,
         resumeQueue, true, 5, new HashMap<>(), new HashSet<>(), 
metricContext.contextAwareMeter("successMeter"),
-        metricContext.contextAwareMeter("failedMeter"));
+        metricContext.contextAwareMeter("failedMeter"), START_SLA_DEFAULT);
 
     Field jobToDagField = 
DagManager.DagManagerThread.class.getDeclaredField("jobToDag");
     jobToDagField.setAccessible(true);
@@ -149,12 +150,15 @@ public class DagManagerTest {
   }
 
   static Iterator<JobStatus> getMockJobStatus(String flowName, String 
flowGroup, Long flowExecutionId, String jobGroup, String jobName, String 
eventName) {
-    return getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup, 
jobName, eventName, false);
+    return getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup, 
jobName, eventName, false, flowExecutionId + 10);
   }
 
-  private static Iterator<JobStatus> getMockJobStatus(String flowName, String 
flowGroup,  Long flowExecutionId, String jobGroup, String jobName, String 
eventName, boolean shouldRetry) {
+  private static Iterator<JobStatus> getMockJobStatus(String flowName, String 
flowGroup, Long flowExecutionId, String jobGroup, String jobName, String 
eventName, boolean shouldRetry) {
+    return getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup, 
jobName, eventName, shouldRetry, flowExecutionId + 10);
+  }
+  private static Iterator<JobStatus> getMockJobStatus(String flowName, String 
flowGroup,  Long flowExecutionId, String jobGroup, String jobName, String 
eventName, boolean shouldRetry, Long orchestratedTime) {
     return 
Iterators.singletonIterator(JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(jobGroup).jobName(jobName).flowExecutionId(flowExecutionId).
-        message("Test message").eventName(eventName).startTime(flowExecutionId 
+ 10).shouldRetry(shouldRetry).build());
+        message("Test message").eventName(eventName).startTime(flowExecutionId 
+ 10).shouldRetry(shouldRetry).orchestratedTime(orchestratedTime).build());
   }
 
   @Test
@@ -645,6 +649,72 @@ public class DagManagerTest {
     Assert.assertFalse(this.dags.containsKey(dagId));
   }
 
+  @Test (dependsOnMethods = "testResumeCancelledDag")
+  public void testJobStartSLAKilledDag() throws URISyntaxException, 
IOException {
+    long flowExecutionId = System.currentTimeMillis();
+    String flowGroupId = "0";
+    String flowGroup = "group" + flowGroupId;
+    String flowName = "flow" + flowGroupId;
+    String jobName0 = "job0";
+    String flowGroupId1 = "1";
+    String flowGroup1 = "group" + flowGroupId1;
+    String flowName1 = "flow" + flowGroupId1;
+
+    Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId, 
"FINISH_RUNNING", false);
+    Dag<JobExecutionPlan> dag1 = buildDag(flowGroupId1, flowExecutionId+1, 
"FINISH_RUNNING", false);
+
+    String dagId = DagManagerUtils.generateDagId(dag);
+    String dagId1 = DagManagerUtils.generateDagId(dag1);
+
+
+    // Add the first dag to the queue of dags. NOTE(review): the getMockJobStatus calls below pass jobName0 and the flow group in the jobGroup/jobName parameter positions (swapped); this looks harmless because the retriever is stubbed with anyString() matchers - confirm
+    this.queue.offer(dag);
+    // The first job's orchestrated time is 16 minutes before flowExecutionId, which should exceed the 15-minute START_SLA_DEFAULT given to the thread (assuming buildDag sets no job-level SLA), so the job should be cancelled
+    Iterator<JobStatus> jobStatusIterator1 =
+        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0, 
flowGroup, String.valueOf(ExecutionStatus.ORCHESTRATED),
+            false, flowExecutionId - 16 * 60 * 1000);
+    // The second dag's job was orchestrated only 10 minutes before its flowExecutionId, which is within the SLA, so it should be scheduled normally
+    Iterator<JobStatus> jobStatusIterator2 =
+        getMockJobStatus(flowName1, flowGroup1, flowExecutionId+1, jobName0, 
flowGroup1, String.valueOf(ExecutionStatus.ORCHESTRATED),
+            false, flowExecutionId - 10 * 60 * 1000);
+    // Report the first job as cancelled due to the SLA kill on start, so that it gets cleaned up
+    Iterator<JobStatus> jobStatusIterator3 =
+        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0, 
flowGroup, String.valueOf(ExecutionStatus.CANCELLED),
+            false, flowExecutionId - 16 * 60 * 1000);
+    // Report the normally scheduled job as complete so that it can be cleaned up
+    Iterator<JobStatus> jobStatusIterator4 =
+        getMockJobStatus(flowName1, flowGroup1, flowExecutionId+1, jobName0, 
flowGroup1, String.valueOf(ExecutionStatus.COMPLETE));
+
+    Mockito.when(_jobStatusRetriever
+        .getJobStatusesForFlowExecution(Mockito.anyString(), 
Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), 
Mockito.anyString())).
+        thenReturn(jobStatusIterator1).
+        thenReturn(jobStatusIterator2).
+        thenReturn(jobStatusIterator3).
+        thenReturn(jobStatusIterator4);
+
+    // Run the thread once. The first job has exceeded its start SLA, so its dag should no longer be tracked
+    this._dagManagerThread.run();
+    Assert.assertEquals(this.dags.size(), 0);
+    // The SLA-killed dag should be recorded as failed
+    Assert.assertTrue(this.failedDagIds.contains(dagId));
+
+    // The second dag stays within the start SLA, so it should still be tracked after the next run
+    this.queue.offer(dag1);
+    this._dagManagerThread.run();
+    Assert.assertEquals(this.dags.size(), 1);
+    Assert.assertEquals(this.jobToDag.size(), 1);
+    Assert.assertEquals(this.dagToJobs.size(), 1);
+    Assert.assertTrue(this.dags.containsKey(dagId1));
+
+    // Cleanup: after two more runs the second dag should be finished and all tracking maps empty
+    this._dagManagerThread.run();
+    this._dagManagerThread.run();
+    Assert.assertEquals(this.dags.size(), 0);
+    Assert.assertEquals(this.jobToDag.size(), 0);
+    Assert.assertEquals(this.dagToJobs.size(), 0);
+  }
+
+
   @AfterClass
   public void cleanUp() throws Exception {
     FileUtils.deleteDirectory(new File(this.dagStateStoreDir));

Reply via email to