This is an automated email from the ASF dual-hosted git repository.
kipk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new faecb35600 [GOBBLIN-2181] Ensure `DagTask::conclude` after a
non-transient exception (#4084)
faecb35600 is described below
commit faecb356003bfb1909e026428372feee47f36d48
Author: vsinghal85 <[email protected]>
AuthorDate: Sat Dec 14 01:59:04 2024 +0530
[GOBBLIN-2181] Ensure `DagTask::conclude` after a non-transient exception
(#4084)
---
.../modules/orchestration/DagProcessingEngine.java | 15 ++
.../modules/orchestration/proc/DagProc.java | 29 ----
.../modules/orchestration/proc/DagProcUtils.java | 12 +-
.../orchestration/DagProcessingEngineTest.java | 15 +-
.../orchestration/proc/DagProcUtilsTest.java | 156 +++++++++++++++++++++
5 files changed, 190 insertions(+), 37 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
index 9ee21395b2..fbd77d8617 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
@@ -17,6 +17,8 @@
package org.apache.gobblin.service.modules.orchestration;
+import java.util.Collections;
+import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -39,6 +41,7 @@ import
org.apache.gobblin.service.modules.orchestration.proc.DagProc;
import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.apache.gobblin.service.modules.orchestration.task.DagTask;
import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExceptionUtils;
import org.apache.gobblin.util.ExecutorsUtils;
@@ -67,6 +70,8 @@ public class DagProcessingEngine extends AbstractIdleService {
public static final String DEFAULT_JOB_START_DEADLINE_TIME_MS =
"defaultJobStartDeadlineTimeMillis";
@Getter static long defaultJobStartDeadlineTimeMillis;
public static final String DEFAULT_FLOW_FAILURE_OPTION =
FailureOption.FINISH_ALL_POSSIBLE.name();
+ // TODO Update to fetch list from config once transient exception handling is implemented and retryable exceptions defined
+ public static final List<Class<? extends Exception>> retryableExceptions =
Collections.EMPTY_LIST;
@Inject
public DagProcessingEngine(Config config, DagTaskStream dagTaskStream,
DagProcFactory dagProcFactory,
@@ -85,6 +90,10 @@ public class DagProcessingEngine extends AbstractIdleService
{
defaultJobStartDeadlineTimeMillis = deadlineTimeMs;
}
+ public static boolean isTransientException(Exception e) {
+ return ExceptionUtils.isExceptionInstanceOf(e, retryableExceptions);
+ }
+
@Override
protected void startUp() {
Integer numThreads = ConfigUtils.getInt
@@ -151,6 +160,12 @@ public class DagProcessingEngine extends
AbstractIdleService {
} catch (Exception e) {
log.error("DagProcEngineThread: " +
dagProc.contextualizeStatus("error"), e);
dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.mark();
+ if (!DagProcessingEngine.isTransientException(e)) {
+ log.warn(dagProc.contextualizeStatus("ignoring non-transient
exception by concluding so no retries"));
+
dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.mark();
+ dagTask.conclude();
+ }
+ // TODO add the else block for transient exceptions and add conclude task only if retry limit is not breached
}
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
index 6c9694c80f..c84d0dca8b 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
@@ -18,8 +18,6 @@
package org.apache.gobblin.service.modules.orchestration.proc;
import java.io.IOException;
-import java.util.List;
-import java.util.stream.Collectors;
import com.typesafe.config.Config;
@@ -33,15 +31,12 @@ import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.service.modules.flowgraph.Dag;
-import org.apache.gobblin.util.ExceptionUtils;
-import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagUtils;
import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.apache.gobblin.service.modules.orchestration.task.DagTask;
-import org.apache.gobblin.util.ConfigUtils;
/**
@@ -62,7 +57,6 @@ public abstract class DagProc<T> {
@Getter protected final Dag.DagId dagId;
@Getter protected final DagNodeId dagNodeId;
protected static final MetricContext metricContext =
Instrumented.getMetricContext(new State(), DagProc.class);
- protected final List<Class<? extends Exception>> nonRetryableExceptions;
protected static final EventSubmitter eventSubmitter = new
EventSubmitter.Builder(
metricContext, "org.apache.gobblin.service").build();
@@ -71,14 +65,6 @@ public abstract class DagProc<T> {
this.dagId =
DagUtils.generateDagId(this.dagTask.getDagAction().getFlowGroup(),
this.dagTask.getDagAction().getFlowName(),
this.dagTask.getDagAction().getFlowExecutionId());
this.dagNodeId = this.dagTask.getDagAction().getDagNodeId();
- this.nonRetryableExceptions = ConfigUtils.getStringList(config,
ServiceConfigKeys.DAG_PROC_ENGINE_NON_RETRYABLE_EXCEPTIONS_KEY)
- .stream().map(className -> {
- try {
- return (Class<? extends Exception>) Class.forName(className);
- } catch (ClassNotFoundException e) {
- throw new RuntimeException(e);
- }
- }).collect(Collectors.toList());
}
public final void process(DagManagementStateStore dagManagementStateStore,
@@ -92,20 +78,9 @@ public abstract class DagProc<T> {
dagProcEngineMetrics.markDagActionsInitialize(getDagActionType(), false);
throw e;
}
- try {
logContextualizedInfo("ready to process");
act(dagManagementStateStore, state, dagProcEngineMetrics);
logContextualizedInfo("processed");
- } catch (Exception e) {
- if (isNonTransientException(e)) {
- log.error("Ignoring non transient exception. DagTask {} will conclude
and will not be retried. Exception - {} ",
- getDagTask(), e);
-
dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.mark();
-
dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.mark();
- } else {
- throw e;
- }
- }
}
protected abstract T initialize(DagManagementStateStore
dagManagementStateStore) throws IOException;
@@ -126,8 +101,4 @@ public abstract class DagProc<T> {
public void logContextualizedInfo(String message) {
log.info(contextualizeStatus(message));
}
-
- protected boolean isNonTransientException(Exception e) {
- return ExceptionUtils.isExceptionInstanceOf(e,
this.nonRetryableExceptions);
- }
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
index 346037f6d2..30bd89c98b 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
@@ -74,7 +74,6 @@ public class DagProcUtils {
public static void submitNextNodes(DagManagementStateStore
dagManagementStateStore, Dag<JobExecutionPlan> dag,
Dag.DagId dagId) throws IOException {
Set<Dag.DagNode<JobExecutionPlan>> nextNodes = DagUtils.getNext(dag);
-
if (nextNodes.size() == 1) {
Dag.DagNode<JobExecutionPlan> dagNode = nextNodes.iterator().next();
DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode,
dagId);
@@ -139,12 +138,15 @@ public class DagProcUtils {
dagManagementStateStore.updateDagNode(dagNode);
sendEnforceJobStartDeadlineDagAction(dagManagementStateStore, dagNode);
} catch (Exception e) {
- TimingEvent jobFailedTimer =
DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED);
String message = "Cannot submit job " +
DagUtils.getFullyQualifiedJobName(dagNode) + " on executor " + specExecutorUri;
log.error(message, e);
- jobMetadata.put(TimingEvent.METADATA_MESSAGE, message + " due to " +
e.getMessage());
- if (jobFailedTimer != null) {
- jobFailedTimer.stop(jobMetadata);
+ // Only mark the job as failed in case of non transient exceptions
+ if (!DagProcessingEngine.isTransientException(e)) {
+ TimingEvent jobFailedTimer =
DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED);
+ jobMetadata.put(TimingEvent.METADATA_MESSAGE, message + " due to " +
e.getMessage());
+ if (jobFailedTimer != null) {
+ jobFailedTimer.stop(jobMetadata);
+ }
}
try {
// when there is no exception, quota will be released in job status monitor or re-evaluate dag proc
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
index 8fea5303ae..6e065fc1a7 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
@@ -190,15 +190,24 @@ public class DagProcessingEngineTest {
// (MAX_NUM_OF_TASKS + 1) th call
int expectedNumOfInvocations = MockedDagTaskStream.MAX_NUM_OF_TASKS +
ServiceConfigKeys.DEFAULT_NUM_DAG_PROC_THREADS;
int expectedExceptions = MockedDagTaskStream.MAX_NUM_OF_TASKS /
MockedDagTaskStream.FAILING_DAGS_FREQUENCY;
- int expectedNonRetryableExceptions = MockedDagTaskStream.MAX_NUM_OF_TASKS
/ MockedDagTaskStream.FAILING_DAGS_WITH_NON_RETRYABLE_EXCEPTIONS_FREQUENCY;
AssertWithBackoff.assertTrue(input ->
Mockito.mockingDetails(this.dagTaskStream).getInvocations().size() ==
expectedNumOfInvocations,
10000L, "dagTaskStream was not called " + expectedNumOfInvocations + "
number of times. "
+ "Actual number of invocations " +
Mockito.mockingDetails(this.dagTaskStream).getInvocations().size(),
log, 1, 1000L);
-
+ // Currently we are treating all exceptions as non retryable and totalExceptionCount will be equal to count of non retryable exceptions
Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.getCount(),
expectedExceptions);
-
Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.getCount(),
expectedNonRetryableExceptions);
+
Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.getCount(),
expectedExceptions);
+ }
+
+ @Test
+ public void isNonTransientExceptionTest(){
+ /*
+ These exceptions examples are solely for testing purpose, ultimately it would come down
+ to the config defined for the transient exceptions, when we implement retry logic
+ */
+ Assert.assertTrue(!DagProcessingEngine.isTransientException(new
RuntimeException("Simulating a non retryable exception!")));
+ Assert.assertTrue(!DagProcessingEngine.isTransientException(new
AzkabanClientException("Simulating a retryable exception!")));
}
private enum ExceptionType {
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java
new file mode 100644
index 0000000000..7b128f120c
--- /dev/null
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
+import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerMetrics;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.mockito.Mockito;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class DagProcUtilsTest {
+
+ DagManagementStateStore dagManagementStateStore;
+ SpecExecutor mockSpecExecutor;
+
+ @BeforeMethod
+ public void setUp() {
+ dagManagementStateStore = Mockito.mock(DagManagementStateStore.class);
+ mockSpecExecutor = new MockedSpecExecutor(Mockito.mock(Config.class));
+ }
+
+ @Test
+ public void testSubmitNextNodesSuccess() throws URISyntaxException,
IOException {
+ Dag.DagId dagId = new Dag.DagId("testFlowGroup", "testFlowName", 2345678);
+ List<JobExecutionPlan> jobExecutionPlans = getJobExecutionPlans();
+ List<Dag.DagNode<JobExecutionPlan>> dagNodeList =
jobExecutionPlans.stream()
+ .map(Dag.DagNode<JobExecutionPlan>::new)
+ .collect(Collectors.toList());
+ Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
+
Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(),
Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any());
+ DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
+ for (JobExecutionPlan jobExecutionPlan : jobExecutionPlans) {
+ Mockito.verify(dagManagementStateStore, Mockito.times(1))
+ .addJobDagAction(jobExecutionPlan.getFlowGroup(),
jobExecutionPlan.getFlowName(),
+ jobExecutionPlan.getFlowExecutionId(),
jobExecutionPlan.getJobName(),
+ DagActionStore.DagActionType.REEVALUATE);
+ }
+ Mockito.verifyNoMoreInteractions(dagManagementStateStore);
+ }
+
+ @Test
+ public void testWhenSubmitToExecutorSuccess() throws URISyntaxException,
IOException {
+ Dag.DagId dagId = new Dag.DagId("flowGroup1", "flowName1", 2345680);
+ List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>();
+ JobExecutionPlan jobExecutionPlan = getJobExecutionPlans().get(0);
+ Dag.DagNode<JobExecutionPlan> dagNode = new
Dag.DagNode<>(jobExecutionPlan);
+ dagNodeList.add(dagNode);
+ Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
+ DagManagerMetrics metrics = Mockito.mock(DagManagerMetrics.class);
+
Mockito.when(dagManagementStateStore.getDagManagerMetrics()).thenReturn(metrics);
+ Mockito.doNothing().when(metrics).incrementRunningJobMetrics(dagNode);
+ Mockito.doNothing().when(metrics).incrementJobsSentToExecutor(dagNode);
+ DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
+ Mockito.verify(dagManagementStateStore,
Mockito.times(2)).getDagManagerMetrics();
+ Mockito.verify(dagManagementStateStore,
Mockito.times(1)).tryAcquireQuota(Collections.singleton(dagNode));
+ Mockito.verify(dagManagementStateStore,
Mockito.times(1)).updateDagNode(dagNode);
+ Mockito.verify(dagManagementStateStore,
Mockito.times(1)).addDagAction(Mockito.any(DagActionStore.DagAction.class));
+
+ Mockito.verify(metrics,
Mockito.times(1)).incrementRunningJobMetrics(dagNode);
+ Mockito.verify(metrics,
Mockito.times(1)).incrementJobsSentToExecutor(dagNode);
+ Mockito.verifyNoMoreInteractions(dagManagementStateStore);
+ }
+
+ @Test(expectedExceptions = RuntimeException.class)
+ public void testWhenSubmitToExecutorGivesRuntimeException() throws
URISyntaxException, IOException, ExecutionException, InterruptedException{
+ Dag.DagId dagId = new Dag.DagId("flowGroup3", "flowName3", 2345678);
+ List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>();
+ JobExecutionPlan jobExecutionPlan = getJobExecutionPlans().get(2);
+ Dag.DagNode<JobExecutionPlan> dagNode = new
Dag.DagNode<>(jobExecutionPlan);
+ dagNodeList.add(dagNode);
+ Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
+ SpecProducer<Spec> mockedSpecProducer =
mockSpecExecutor.getProducer().get();
+
Mockito.doThrow(RuntimeException.class).when(mockedSpecProducer).addSpec(Mockito.any(JobSpec.class));
+ DagManagerMetrics metrics = Mockito.mock(DagManagerMetrics.class);
+
Mockito.when(dagManagementStateStore.getDagManagerMetrics()).thenReturn(metrics);
+ Mockito.doNothing().when(metrics).incrementRunningJobMetrics(dagNode);
+ DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
+ Mockito.verify(mockedSpecProducer,
Mockito.times(1)).addSpec(Mockito.any(JobSpec.class));
+ Mockito.verify(dagManagementStateStore,
Mockito.times(1)).getDagManagerMetrics();
+ Mockito.verify(metrics,
Mockito.times(1)).incrementRunningJobMetrics(dagNode);
+ Mockito.verifyNoMoreInteractions(dagManagementStateStore);
+ }
+
+ private List<JobExecutionPlan> getJobExecutionPlans() throws
URISyntaxException {
+ Config flowConfig1 =
ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY,
"flowName1")
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup1").build();
+ Config flowConfig2 =
ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY,
"flowName2")
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup2").build();
+ Config flowConfig3 =
ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY,
"flowName3")
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup3").build();
+ List<Config> flowConfigs = Arrays.asList(flowConfig1, flowConfig2,
flowConfig3);
+
+ Config jobConfig1 =
ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job1")
+ .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY,
"source:destination:edgeName1").build();
+ Config jobConfig2 =
ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job2")
+ .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY,
"source:destination:edgeName2").build();
+ Config jobConfig3 =
ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job1")
+ .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY,
"source:destination:edgeName3").build();
+ List<Config> jobConfigs = Arrays.asList(jobConfig1, jobConfig2,
jobConfig3);
+ List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+ for (int i = 0; i < 3; i++) {
+ Config jobConfig = jobConfigs.get(i);
+ FlowSpec flowSpec =
FlowSpec.builder("testFlowSpec").withConfig(flowConfigs.get(i)).build();
+ if (i == 2) {
+ jobExecutionPlans.add(new
JobExecutionPlan.Factory().createPlan(flowSpec,
+ jobConfig.withValue(ConfigurationKeys.JOB_TEMPLATE_PATH,
ConfigValueFactory.fromAnyRef("testUri")),
+ mockSpecExecutor, 0L, ConfigFactory.empty()));
+ } else {
+ jobExecutionPlans.add(new
JobExecutionPlan.Factory().createPlan(flowSpec,
+ jobConfig.withValue(ConfigurationKeys.JOB_TEMPLATE_PATH,
ConfigValueFactory.fromAnyRef("testUri")),
+ new InMemorySpecExecutor(ConfigFactory.empty()), 0L,
ConfigFactory.empty()));
+ }
+ }
+ return jobExecutionPlans;
+ }
+}
\ No newline at end of file