This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 07b3c68  Add guard in DagManager for improperly formed SLA (#3449)
07b3c68 is described below

commit 07b3c68c5e7bbe9ef1f73bd491994ba72b35ca1e
Author: William Lo <[email protected]>
AuthorDate: Fri Jan 7 15:50:57 2022 -0800

    Add guard in DagManager for improperly formed SLA (#3449)
---
 .../service/modules/orchestration/DagManager.java  | 13 ++++++-
 .../modules/orchestration/DagManagerTest.java      | 43 ++++++++++++++++++++++
 2 files changed, 54 insertions(+), 2 deletions(-)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index e4a6293..d29fc5c 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -52,6 +52,7 @@ import com.google.common.eventbus.Subscribe;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigException;
 
 import javax.inject.Inject;
 import javax.inject.Singleton;
@@ -524,7 +525,7 @@ public class DagManager extends AbstractIdleService {
         cleanUp();
         log.debug("Clean up done");
       } catch (Exception e) {
-        log.error("Exception encountered in {}", getClass().getName(), e);
+        log.error(String.format("Exception encountered in %s", getClass().getName()), e);
       }
     }
 
@@ -829,7 +830,15 @@ public class DagManager extends AbstractIdleService {
       if (dagToSLA.containsKey(dagId)) {
         flowSla = dagToSLA.get(dagId);
       } else {
-        flowSla = DagManagerUtils.getFlowSLA(node);
+        try {
+          flowSla = DagManagerUtils.getFlowSLA(node);
+        } catch (ConfigException e) {
+          log.warn("Flow SLA for flowGroup: {}, flowName: {} is given in invalid format, using default SLA of {}",
+              node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY),
+              node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY),
+              DagManagerUtils.DEFAULT_FLOW_SLA_MILLIS);
+          flowSla = DagManagerUtils.DEFAULT_FLOW_SLA_MILLIS;
+        }
         dagToSLA.put(dagId, flowSla);
       }
 
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index 7180cf3..b072185 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -714,6 +714,49 @@ public class DagManagerTest {
     Assert.assertEquals(this.dagToJobs.size(), 0);
   }
 
+  @Test (dependsOnMethods = "testJobStartSLAKilledDag")
+  public void testDagManagerWithBadFlowSLAConfig() throws URISyntaxException, IOException {
+    long flowExecutionId = System.currentTimeMillis();
+    String flowGroup = "group0";
+    String flowName = "flow0";
+    String jobName = "job0";
+
+    // Create a config with an improperly formatted Flow SLA time e.g. "1h"
+    Config jobConfig = ConfigBuilder.create().
+        addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "group" + flowGroup).
+        addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flow" + flowName).
+        addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId).
+        addPrimitive(ConfigurationKeys.JOB_GROUP_KEY, flowGroup).
+        addPrimitive(ConfigurationKeys.JOB_NAME_KEY, jobName).
+        addPrimitive(ConfigurationKeys.FLOW_FAILURE_OPTION, "FINISH_RUNNING").
+        addPrimitive(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, "1h").build();
+
+    List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+    JobSpec js = JobSpec.builder("test_job" + jobName).withVersion("0").withConfig(jobConfig).
+        withTemplate(new URI(jobName)).build();
+    SpecExecutor specExecutor = MockedSpecExecutor.createDummySpecExecutor(new URI(jobName));
+    JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js, specExecutor);
+    jobExecutionPlans.add(jobExecutionPlan);
+    Dag<JobExecutionPlan> dag = (new JobExecutionPlanDagFactory()).createDag(jobExecutionPlans);
+    String dagId = DagManagerUtils.generateDagId(dag);
+
+    //Add a dag to the queue of dags
+    this.queue.offer(dag);
+    // Job should have been run normally without breaking on SLA check, so we can just mark as completed for status
+    Iterator<JobStatus> jobStatusIterator1 =
+        getMockJobStatus(flowName, flowGroup, flowExecutionId+1, jobName, flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+
+    Mockito.when(_jobStatusRetriever
+        .getJobStatusesForFlowExecution(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.anyString())).
+        thenReturn(jobStatusIterator1);
+
+    // Run the thread once. Job should run without crashing thread on SLA check and cleanup
+    this._dagManagerThread.run();
+    Assert.assertEquals(this.dags.size(), 0);
+    Assert.assertEquals(this.jobToDag.size(), 0);
+    Assert.assertEquals(this.dagToJobs.size(), 0);
+  }
+
 
   @AfterClass
   public void cleanUp() throws Exception {

Reply via email to