This is an automated email from the ASF dual-hosted git repository.

kipk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new faecb35600 [GOBBLIN-2181] Ensure `DagTask::conclude` after a non-transient exception (#4084)
faecb35600 is described below

commit faecb356003bfb1909e026428372feee47f36d48
Author: vsinghal85 <[email protected]>
AuthorDate: Sat Dec 14 01:59:04 2024 +0530

    [GOBBLIN-2181] Ensure `DagTask::conclude` after a non-transient exception (#4084)
---
 .../modules/orchestration/DagProcessingEngine.java |  15 ++
 .../modules/orchestration/proc/DagProc.java        |  29 ----
 .../modules/orchestration/proc/DagProcUtils.java   |  12 +-
 .../orchestration/DagProcessingEngineTest.java     |  15 +-
 .../orchestration/proc/DagProcUtilsTest.java       | 156 +++++++++++++++++++++
 5 files changed, 190 insertions(+), 37 deletions(-)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
index 9ee21395b2..fbd77d8617 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
@@ -17,6 +17,8 @@
 
 package org.apache.gobblin.service.modules.orchestration;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -39,6 +41,7 @@ import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
 import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.apache.gobblin.service.modules.orchestration.task.DagTask;
 import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExceptionUtils;
 import org.apache.gobblin.util.ExecutorsUtils;
 
 
@@ -67,6 +70,8 @@ public class DagProcessingEngine extends AbstractIdleService {
   public static final String DEFAULT_JOB_START_DEADLINE_TIME_MS = 
"defaultJobStartDeadlineTimeMillis";
   @Getter static long defaultJobStartDeadlineTimeMillis;
   public static final String DEFAULT_FLOW_FAILURE_OPTION = 
FailureOption.FINISH_ALL_POSSIBLE.name();
+  // TODO Update to fetch list from config once transient exception handling is implemented and retryable exceptions defined
+  public static final List<Class<? extends Exception>> retryableExceptions = 
Collections.EMPTY_LIST;
 
   @Inject
   public DagProcessingEngine(Config config, DagTaskStream dagTaskStream, 
DagProcFactory dagProcFactory,
@@ -85,6 +90,10 @@ public class DagProcessingEngine extends AbstractIdleService {
     defaultJobStartDeadlineTimeMillis = deadlineTimeMs;
   }
 
+  public static boolean isTransientException(Exception e) {
+    return ExceptionUtils.isExceptionInstanceOf(e, retryableExceptions);
+  }
+
   @Override
   protected void startUp() {
     Integer numThreads = ConfigUtils.getInt
@@ -151,6 +160,12 @@ public class DagProcessingEngine extends AbstractIdleService {
         } catch (Exception e) {
           log.error("DagProcEngineThread: " + 
dagProc.contextualizeStatus("error"), e);
           
dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.mark();
+          if (!DagProcessingEngine.isTransientException(e)) {
+            log.warn(dagProc.contextualizeStatus("ignoring non-transient 
exception by concluding so no retries"));
+            
dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.mark();
+            dagTask.conclude();
+          }
+          // TODO add the else block for transient exceptions and add conclude task only if retry limit is not breached
         }
       }
     }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
index 6c9694c80f..c84d0dca8b 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
@@ -18,8 +18,6 @@
 package org.apache.gobblin.service.modules.orchestration.proc;
 
 import java.io.IOException;
-import java.util.List;
-import java.util.stream.Collectors;
 
 import com.typesafe.config.Config;
 
@@ -33,15 +31,12 @@ import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
-import org.apache.gobblin.util.ExceptionUtils;
-import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
 import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.DagUtils;
 import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.apache.gobblin.service.modules.orchestration.task.DagTask;
-import org.apache.gobblin.util.ConfigUtils;
 
 
 /**
@@ -62,7 +57,6 @@ public abstract class DagProc<T> {
   @Getter protected final Dag.DagId dagId;
   @Getter protected final DagNodeId dagNodeId;
   protected static final MetricContext metricContext = 
Instrumented.getMetricContext(new State(), DagProc.class);
-  protected final List<Class<? extends Exception>> nonRetryableExceptions;
   protected static final EventSubmitter eventSubmitter = new 
EventSubmitter.Builder(
       metricContext, "org.apache.gobblin.service").build();
 
@@ -71,14 +65,6 @@ public abstract class DagProc<T> {
     this.dagId = 
DagUtils.generateDagId(this.dagTask.getDagAction().getFlowGroup(),
         this.dagTask.getDagAction().getFlowName(), 
this.dagTask.getDagAction().getFlowExecutionId());
     this.dagNodeId = this.dagTask.getDagAction().getDagNodeId();
-    this.nonRetryableExceptions = ConfigUtils.getStringList(config, 
ServiceConfigKeys.DAG_PROC_ENGINE_NON_RETRYABLE_EXCEPTIONS_KEY)
-        .stream().map(className -> {
-          try {
-            return (Class<? extends Exception>) Class.forName(className);
-          } catch (ClassNotFoundException e) {
-            throw new RuntimeException(e);
-          }
-        }).collect(Collectors.toList());
   }
 
   public final void process(DagManagementStateStore dagManagementStateStore,
@@ -92,20 +78,9 @@ public abstract class DagProc<T> {
       dagProcEngineMetrics.markDagActionsInitialize(getDagActionType(), false);
       throw e;
     }
-    try {
       logContextualizedInfo("ready to process");
       act(dagManagementStateStore, state, dagProcEngineMetrics);
       logContextualizedInfo("processed");
-    } catch (Exception e) {
-      if (isNonTransientException(e)) {
-        log.error("Ignoring non transient exception. DagTask {} will conclude 
and will not be retried. Exception - {} ",
-            getDagTask(), e);
-        
dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.mark();
-        
dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.mark();
-      } else {
-        throw e;
-      }
-    }
   }
 
   protected abstract T initialize(DagManagementStateStore 
dagManagementStateStore) throws IOException;
@@ -126,8 +101,4 @@ public abstract class DagProc<T> {
   public void logContextualizedInfo(String message) {
     log.info(contextualizeStatus(message));
   }
-
-  protected boolean isNonTransientException(Exception e) {
-    return ExceptionUtils.isExceptionInstanceOf(e, 
this.nonRetryableExceptions);
-  }
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
index 346037f6d2..30bd89c98b 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
@@ -74,7 +74,6 @@ public class DagProcUtils {
   public static void submitNextNodes(DagManagementStateStore 
dagManagementStateStore, Dag<JobExecutionPlan> dag,
       Dag.DagId dagId) throws IOException {
     Set<Dag.DagNode<JobExecutionPlan>> nextNodes = DagUtils.getNext(dag);
-
     if (nextNodes.size() == 1) {
       Dag.DagNode<JobExecutionPlan> dagNode = nextNodes.iterator().next();
       DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode, 
dagId);
@@ -139,12 +138,15 @@ public class DagProcUtils {
       dagManagementStateStore.updateDagNode(dagNode);
       sendEnforceJobStartDeadlineDagAction(dagManagementStateStore, dagNode);
     } catch (Exception e) {
-      TimingEvent jobFailedTimer = 
DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED);
       String message = "Cannot submit job " + 
DagUtils.getFullyQualifiedJobName(dagNode) + " on executor " + specExecutorUri;
       log.error(message, e);
-      jobMetadata.put(TimingEvent.METADATA_MESSAGE, message + " due to " + 
e.getMessage());
-      if (jobFailedTimer != null) {
-        jobFailedTimer.stop(jobMetadata);
+      // Only mark the job as failed in case of non transient exceptions
+      if (!DagProcessingEngine.isTransientException(e)) {
+        TimingEvent jobFailedTimer = 
DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED);
+        jobMetadata.put(TimingEvent.METADATA_MESSAGE, message + " due to " + 
e.getMessage());
+        if (jobFailedTimer != null) {
+          jobFailedTimer.stop(jobMetadata);
+        }
       }
       try {
         // when there is no exception, quota will be released in job status monitor or re-evaluate dag proc
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
index 8fea5303ae..6e065fc1a7 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
@@ -190,15 +190,24 @@ public class DagProcessingEngineTest {
     // (MAX_NUM_OF_TASKS + 1) th call
     int expectedNumOfInvocations = MockedDagTaskStream.MAX_NUM_OF_TASKS + 
ServiceConfigKeys.DEFAULT_NUM_DAG_PROC_THREADS;
     int expectedExceptions = MockedDagTaskStream.MAX_NUM_OF_TASKS / 
MockedDagTaskStream.FAILING_DAGS_FREQUENCY;
-    int expectedNonRetryableExceptions = MockedDagTaskStream.MAX_NUM_OF_TASKS 
/ MockedDagTaskStream.FAILING_DAGS_WITH_NON_RETRYABLE_EXCEPTIONS_FREQUENCY;
 
     AssertWithBackoff.assertTrue(input -> 
Mockito.mockingDetails(this.dagTaskStream).getInvocations().size() == 
expectedNumOfInvocations,
         10000L, "dagTaskStream was not called " + expectedNumOfInvocations + " 
number of times. "
             + "Actual number of invocations " + 
Mockito.mockingDetails(this.dagTaskStream).getInvocations().size(),
         log, 1, 1000L);
-
+    // Currently we are treating all exceptions as non retryable and totalExceptionCount will be equal to count of non retryable exceptions
     
Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.getCount(),
  expectedExceptions);
-    
Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.getCount(),
  expectedNonRetryableExceptions);
+    
Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.getCount(),
  expectedExceptions);
+  }
+
+  @Test
+  public void isNonTransientExceptionTest(){
+    /*
+      These exceptions examples are solely for testing purpose, ultimately it would come down
+      to the config defined for the transient exceptions, when we implement retry logic
+     */
+    Assert.assertTrue(!DagProcessingEngine.isTransientException(new 
RuntimeException("Simulating a non retryable exception!")));
+    Assert.assertTrue(!DagProcessingEngine.isTransientException(new 
AzkabanClientException("Simulating a retryable exception!")));
   }
 
   private enum ExceptionType {
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java
new file mode 100644
index 0000000000..7b128f120c
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
+import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerMetrics;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.mockito.Mockito;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class DagProcUtilsTest {
+
+  DagManagementStateStore dagManagementStateStore;
+  SpecExecutor mockSpecExecutor;
+
+  @BeforeMethod
+  public void setUp() {
+    dagManagementStateStore = Mockito.mock(DagManagementStateStore.class);
+    mockSpecExecutor = new MockedSpecExecutor(Mockito.mock(Config.class));
+  }
+
+  @Test
+  public void testSubmitNextNodesSuccess() throws URISyntaxException, 
IOException {
+    Dag.DagId dagId = new Dag.DagId("testFlowGroup", "testFlowName", 2345678);
+    List<JobExecutionPlan> jobExecutionPlans = getJobExecutionPlans();
+    List<Dag.DagNode<JobExecutionPlan>> dagNodeList = 
jobExecutionPlans.stream()
+        .map(Dag.DagNode<JobExecutionPlan>::new)
+        .collect(Collectors.toList());
+    Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
+    
Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(),
 Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any());
+    DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
+    for (JobExecutionPlan jobExecutionPlan : jobExecutionPlans) {
+      Mockito.verify(dagManagementStateStore, Mockito.times(1))
+          .addJobDagAction(jobExecutionPlan.getFlowGroup(), 
jobExecutionPlan.getFlowName(),
+              jobExecutionPlan.getFlowExecutionId(), 
jobExecutionPlan.getJobName(),
+              DagActionStore.DagActionType.REEVALUATE);
+    }
+    Mockito.verifyNoMoreInteractions(dagManagementStateStore);
+  }
+
+  @Test
+  public void testWhenSubmitToExecutorSuccess() throws URISyntaxException, 
IOException {
+    Dag.DagId dagId = new Dag.DagId("flowGroup1", "flowName1", 2345680);
+    List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>();
+    JobExecutionPlan jobExecutionPlan = getJobExecutionPlans().get(0);
+    Dag.DagNode<JobExecutionPlan> dagNode = new 
Dag.DagNode<>(jobExecutionPlan);
+    dagNodeList.add(dagNode);
+    Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
+    DagManagerMetrics metrics = Mockito.mock(DagManagerMetrics.class);
+    
Mockito.when(dagManagementStateStore.getDagManagerMetrics()).thenReturn(metrics);
+    Mockito.doNothing().when(metrics).incrementRunningJobMetrics(dagNode);
+    Mockito.doNothing().when(metrics).incrementJobsSentToExecutor(dagNode);
+    DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(2)).getDagManagerMetrics();
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).tryAcquireQuota(Collections.singleton(dagNode));
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).updateDagNode(dagNode);
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).addDagAction(Mockito.any(DagActionStore.DagAction.class));
+
+    Mockito.verify(metrics, 
Mockito.times(1)).incrementRunningJobMetrics(dagNode);
+    Mockito.verify(metrics, 
Mockito.times(1)).incrementJobsSentToExecutor(dagNode);
+    Mockito.verifyNoMoreInteractions(dagManagementStateStore);
+  }
+
+  @Test(expectedExceptions = RuntimeException.class)
+  public void testWhenSubmitToExecutorGivesRuntimeException() throws 
URISyntaxException, IOException, ExecutionException, InterruptedException{
+    Dag.DagId dagId = new Dag.DagId("flowGroup3", "flowName3", 2345678);
+    List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>();
+    JobExecutionPlan jobExecutionPlan = getJobExecutionPlans().get(2);
+    Dag.DagNode<JobExecutionPlan> dagNode = new 
Dag.DagNode<>(jobExecutionPlan);
+    dagNodeList.add(dagNode);
+    Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
+    SpecProducer<Spec> mockedSpecProducer = 
mockSpecExecutor.getProducer().get();
+    
Mockito.doThrow(RuntimeException.class).when(mockedSpecProducer).addSpec(Mockito.any(JobSpec.class));
+    DagManagerMetrics metrics = Mockito.mock(DagManagerMetrics.class);
+    
Mockito.when(dagManagementStateStore.getDagManagerMetrics()).thenReturn(metrics);
+    Mockito.doNothing().when(metrics).incrementRunningJobMetrics(dagNode);
+    DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
+    Mockito.verify(mockedSpecProducer, 
Mockito.times(1)).addSpec(Mockito.any(JobSpec.class));
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).getDagManagerMetrics();
+    Mockito.verify(metrics, 
Mockito.times(1)).incrementRunningJobMetrics(dagNode);
+    Mockito.verifyNoMoreInteractions(dagManagementStateStore);
+  }
+
+  private List<JobExecutionPlan> getJobExecutionPlans() throws 
URISyntaxException {
+    Config flowConfig1 = 
ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, 
"flowName1")
+        .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup1").build();
+    Config flowConfig2 = 
ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, 
"flowName2")
+        .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup2").build();
+    Config flowConfig3 = 
ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, 
"flowName3")
+        .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup3").build();
+    List<Config> flowConfigs = Arrays.asList(flowConfig1, flowConfig2, 
flowConfig3);
+
+    Config jobConfig1 = 
ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job1")
+        .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, 
"source:destination:edgeName1").build();
+    Config jobConfig2 = 
ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job2")
+        .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, 
"source:destination:edgeName2").build();
+    Config jobConfig3 = 
ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job1")
+        .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, 
"source:destination:edgeName3").build();
+    List<Config> jobConfigs = Arrays.asList(jobConfig1, jobConfig2, 
jobConfig3);
+    List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      Config jobConfig = jobConfigs.get(i);
+      FlowSpec flowSpec = 
FlowSpec.builder("testFlowSpec").withConfig(flowConfigs.get(i)).build();
+      if (i == 2) {
+        jobExecutionPlans.add(new 
JobExecutionPlan.Factory().createPlan(flowSpec,
+            jobConfig.withValue(ConfigurationKeys.JOB_TEMPLATE_PATH, 
ConfigValueFactory.fromAnyRef("testUri")),
+            mockSpecExecutor, 0L, ConfigFactory.empty()));
+      } else {
+        jobExecutionPlans.add(new 
JobExecutionPlan.Factory().createPlan(flowSpec,
+            jobConfig.withValue(ConfigurationKeys.JOB_TEMPLATE_PATH, 
ConfigValueFactory.fromAnyRef("testUri")),
+            new InMemorySpecExecutor(ConfigFactory.empty()), 0L, 
ConfigFactory.empty()));
+      }
+    }
+    return jobExecutionPlans;
+  }
+}
\ No newline at end of file

Reply via email to