This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 07b3c68 Add guard in DagManager for improperly formed SLA (#3449)
07b3c68 is described below
commit 07b3c68c5e7bbe9ef1f73bd491994ba72b35ca1e
Author: William Lo <[email protected]>
AuthorDate: Fri Jan 7 15:50:57 2022 -0800
Add guard in DagManager for improperly formed SLA (#3449)
---
.../service/modules/orchestration/DagManager.java | 13 ++++++-
.../modules/orchestration/DagManagerTest.java | 43 ++++++++++++++++++++++
2 files changed, 54 insertions(+), 2 deletions(-)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index e4a6293..d29fc5c 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -52,6 +52,7 @@ import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.AbstractIdleService;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigException;
import javax.inject.Inject;
import javax.inject.Singleton;
@@ -524,7 +525,7 @@ public class DagManager extends AbstractIdleService {
cleanUp();
log.debug("Clean up done");
} catch (Exception e) {
- log.error("Exception encountered in {}", getClass().getName(), e);
+ log.error(String.format("Exception encountered in %s", getClass().getName()), e);
}
}
@@ -829,7 +830,15 @@ public class DagManager extends AbstractIdleService {
if (dagToSLA.containsKey(dagId)) {
flowSla = dagToSLA.get(dagId);
} else {
- flowSla = DagManagerUtils.getFlowSLA(node);
+ try {
+ flowSla = DagManagerUtils.getFlowSLA(node);
+ } catch (ConfigException e) {
+ log.warn("Flow SLA for flowGroup: {}, flowName: {} is given in invalid format, using default SLA of {}",
+ node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY),
+ node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY),
+ DagManagerUtils.DEFAULT_FLOW_SLA_MILLIS);
+ flowSla = DagManagerUtils.DEFAULT_FLOW_SLA_MILLIS;
+ }
dagToSLA.put(dagId, flowSla);
}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index 7180cf3..b072185 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -714,6 +714,49 @@ public class DagManagerTest {
Assert.assertEquals(this.dagToJobs.size(), 0);
}
+ @Test (dependsOnMethods = "testJobStartSLAKilledDag")
+ public void testDagManagerWithBadFlowSLAConfig() throws URISyntaxException, IOException {
+ long flowExecutionId = System.currentTimeMillis();
+ String flowGroup = "group0";
+ String flowName = "flow0";
+ String jobName = "job0";
+
+ // Create a config with an improperly formatted Flow SLA time e.g. "1h"
+ Config jobConfig = ConfigBuilder.create().
+ addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "group" + flowGroup).
+ addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flow" + flowName).
+ addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId).
+ addPrimitive(ConfigurationKeys.JOB_GROUP_KEY, flowGroup).
+ addPrimitive(ConfigurationKeys.JOB_NAME_KEY, jobName).
+ addPrimitive(ConfigurationKeys.FLOW_FAILURE_OPTION, "FINISH_RUNNING").
+ addPrimitive(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, "1h").build();
+
+ List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+ JobSpec js = JobSpec.builder("test_job" + jobName).withVersion("0").withConfig(jobConfig).
+ withTemplate(new URI(jobName)).build();
+ SpecExecutor specExecutor = MockedSpecExecutor.createDummySpecExecutor(new URI(jobName));
+ JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js, specExecutor);
+ jobExecutionPlans.add(jobExecutionPlan);
+ Dag<JobExecutionPlan> dag = (new JobExecutionPlanDagFactory()).createDag(jobExecutionPlans);
+ String dagId = DagManagerUtils.generateDagId(dag);
+
+ //Add a dag to the queue of dags
+ this.queue.offer(dag);
+ // Job should have been run normally without breaking on SLA check, so we can just mark as completed for status
+ Iterator<JobStatus> jobStatusIterator1 =
+ getMockJobStatus(flowName, flowGroup, flowExecutionId+1, jobName, flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+
+ Mockito.when(_jobStatusRetriever
+ .getJobStatusesForFlowExecution(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.anyString())).
+ thenReturn(jobStatusIterator1);
+
+ // Run the thread once. Job should run without crashing thread on SLA check and cleanup
+ this._dagManagerThread.run();
+ Assert.assertEquals(this.dags.size(), 0);
+ Assert.assertEquals(this.jobToDag.size(), 0);
+ Assert.assertEquals(this.dagToJobs.size(), 0);
+ }
+
@AfterClass
public void cleanUp() throws Exception {