This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new a55c89c5e2 [GOBBLIN-2124] only retry transient exceptions (#4016)
a55c89c5e2 is described below

commit a55c89c5e22d7af77bae9cc361865e83494248bb
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Mon Aug 5 17:34:51 2024 -0700

    [GOBBLIN-2124] only retry transient exceptions (#4016)
    
    * only retry transient exceptions; exceptions listed as non-retryable in config are logged and not retried
    * address review comment
---
 .../apache/gobblin/service/ServiceConfigKeys.java  |   2 +
 .../apache/gobblin/metrics/ServiceMetricNames.java |   3 +
 .../main/java/org/apache/gobblin/runtime/Task.java |   4 +-
 .../java/org/apache/gobblin/runtime/fork/Fork.java |   4 +-
 .../modules/orchestration/DagManagerMetrics.java   |   7 ++
 .../modules/orchestration/DagProcFactory.java      |  18 +--
 .../modules/orchestration/DagProcessingEngine.java |   3 +-
 .../modules/orchestration/proc/DagProc.java        |  37 +++++-
 .../proc/DeadlineEnforcementDagProc.java           |   6 +-
 .../proc/EnforceFlowFinishDeadlineDagProc.java     |   7 +-
 .../proc/EnforceJobStartDeadlineDagProc.java       |   6 +-
 .../modules/orchestration/proc/KillDagProc.java    |   6 +-
 .../modules/orchestration/proc/LaunchDagProc.java  |   7 +-
 .../orchestration/proc/ReevaluateDagProc.java      |   6 +-
 .../modules/orchestration/proc/ResumeDagProc.java  |   5 +-
 .../modules/orchestration/task/DagTask.java        |   6 +-
 .../service/monitoring/KafkaJobStatusMonitor.java  |  12 +-
 .../monitoring/KafkaJobStatusMonitorFactory.java   |  12 +-
 .../DagManagementTaskStreamImplTest.java           |   3 +-
 .../orchestration/DagProcessingEngineTest.java     | 126 ++++++++++++---------
 .../modules/orchestration/OrchestratorTest.java    |   4 +-
 .../proc/EnforceDeadlineDagProcsTest.java          |   8 +-
 .../orchestration/proc/KillDagProcTest.java        |  11 +-
 .../orchestration/proc/LaunchDagProcTest.java      |   4 +-
 .../orchestration/proc/ReevaluateDagProcTest.java  |  15 ++-
 .../orchestration/proc/ResumeDagProcTest.java      |   4 +-
 .../org/apache/gobblin/util/ExceptionUtils.java    |  24 +++-
 .../apache/gobblin/util/ExceptionUtilsTest.java    |  31 +++--
 28 files changed, 248 insertions(+), 133 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java 
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index c37a7f5e95..5e3bab4009 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -207,6 +207,8 @@ public class ServiceConfigKeys {
   public static final String GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX = 
ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + "dagProcessingEngine.";
   public static final String DAG_PROCESSING_ENGINE_ENABLED = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "enabled";
   public static final String NUM_DAG_PROC_THREADS_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "numThreads";
+  public static final String DAG_PROC_ENGINE_NON_RETRYABLE_EXCEPTIONS_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "nonRetryableExceptions";
+
   public static final Integer DEFAULT_NUM_DAG_PROC_THREADS = 3;
   public static final String GOBBLIN_SERVICE_MULTI_ACTIVE_EXECUTION_ENABLED = 
GOBBLIN_SERVICE_PREFIX + "multiActiveExecutionEnabled";
 }
diff --git 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index cc80975766..ea7b3124e0 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -80,6 +80,8 @@ public class ServiceMetricNames {
   public static final String DAG_COUNT_FS_DAG_STATE_COUNT = 
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "FsDagStateStore" + ".totalDagCount";
 
   public static final String DAG_PROCESSING_EXCEPTION_METER = 
"DagProcessingException";
+  public static final String 
DAG_ACTIONS_CREATE_EXCEPTIONS_IN_JOB_STATUS_MONITOR = 
"DagActionsCreateExceptionsInJobStatusMonitor";
+
   /* DagProcessingEngine & Multi-active Execution Related Metrics
   * Note: metrics ending with the delimiter '.' will be suffixed by the 
specific {@link DagActionType} type for finer
   * grained monitoring of each dagAction type in addition to the aggregation 
of all types.
@@ -102,4 +104,5 @@ public class ServiceMetricNames {
   public static final String DAG_ACTIONS_DELETE_SUCCEEDED = 
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsDeleteSucceeded.";
   public static final String DAG_ACTIONS_DELETE_FAILED = 
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsDeleteFailed.";
   public static final String DAG_ACTIONS_AVERAGE_PROCESSING_DELAY_MILLIS = 
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsAvgProcessingDelayMillis.";
+  public static final String DAG_PROCESSING_NON_RETRYABLE_EXCEPTION_METER = 
"DagProcessingNonRetryableException";
 }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
index 7c315e3079..7ddb4f11f2 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
@@ -80,7 +80,7 @@ import org.apache.gobblin.runtime.fork.AsynchronousFork;
 import org.apache.gobblin.runtime.fork.Fork;
 import org.apache.gobblin.runtime.fork.SynchronousFork;
 import org.apache.gobblin.runtime.task.TaskIFace;
-import org.apache.gobblin.runtime.util.ExceptionCleanupUtils;
+import org.apache.gobblin.util.ExceptionUtils;
 import org.apache.gobblin.runtime.util.TaskMetrics;
 import org.apache.gobblin.source.extractor.Extractor;
 import org.apache.gobblin.source.extractor.JobCommitPolicy;
@@ -566,7 +566,7 @@ public class Task implements TaskIFace {
   }
 
   protected void failTask(Throwable t) {
-    Throwable cleanedException = ExceptionCleanupUtils.removeEmptyWrappers(t);
+    Throwable cleanedException = ExceptionUtils.removeEmptyWrappers(t);
 
     LOG.error(String.format("Task %s failed", this.taskId), cleanedException);
     this.taskState.setWorkingState(WorkUnitState.WorkingState.FAILED);
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
index 9c58e6cf57..ed83c0d538 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
@@ -56,7 +56,7 @@ import org.apache.gobblin.runtime.Task;
 import org.apache.gobblin.runtime.TaskContext;
 import org.apache.gobblin.runtime.TaskExecutor;
 import org.apache.gobblin.runtime.TaskState;
-import org.apache.gobblin.runtime.util.ExceptionCleanupUtils;
+import org.apache.gobblin.util.ExceptionUtils;
 import org.apache.gobblin.runtime.util.ForkMetrics;
 import org.apache.gobblin.state.ConstructState;
 import org.apache.gobblin.stream.ControlMessage;
@@ -266,7 +266,7 @@ public class Fork<S, D> implements Closeable, FinalState, 
RecordStreamConsumer<S
 
       compareAndSetForkState(ForkState.RUNNING, ForkState.SUCCEEDED);
     } catch (Throwable t) {
-      Throwable cleanedUpException = 
ExceptionCleanupUtils.removeEmptyWrappers(t);
+      Throwable cleanedUpException = ExceptionUtils.removeEmptyWrappers(t);
 
       // Set throwable to holder first because AsynchronousFork::putRecord can 
pull the throwable when it detects ForkState.FAILED status.
       ForkThrowableHolder holder = Task.getForkThrowableHolder(this.broker);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
index 8931bd75e2..d5bbcffb3f 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
@@ -63,6 +63,9 @@ public class DagManagerMetrics {
   private ContextAwareMeter allSlaExceededMeter;
   private ContextAwareMeter allStartSlaExceededMeter;
   public ContextAwareMeter dagProcessingExceptionMeter;
+  public ContextAwareMeter dagProcessingNonRetryableExceptionMeter;
+  public ContextAwareMeter dagActionCreationExceptionsInJobStatusMonitor;
+
   // Meters representing the flows in a given state per flowgroup
   private final Map<String, ContextAwareMeter> groupSuccessfulMeters = 
Maps.newConcurrentMap();
   private final Map<String, ContextAwareMeter> groupFailureMeters = 
Maps.newConcurrentMap();
@@ -103,6 +106,10 @@ public class DagManagerMetrics {
           ServiceMetricNames.JOBS_SENT_TO_SPEC_EXECUTOR));
       dagProcessingExceptionMeter = 
metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
           ServiceMetricNames.DAG_PROCESSING_EXCEPTION_METER));
+      dagProcessingNonRetryableExceptionMeter = 
metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+          ServiceMetricNames.DAG_PROCESSING_NON_RETRYABLE_EXCEPTION_METER));
+      dagActionCreationExceptionsInJobStatusMonitor = 
metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+          
ServiceMetricNames.DAG_ACTIONS_CREATE_EXCEPTIONS_IN_JOB_STATUS_MONITOR));
     }
   }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java
index 0503f2d0c0..10b9bf27a8 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service.modules.orchestration;
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import com.typesafe.config.Config;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
@@ -37,7 +38,6 @@ import 
org.apache.gobblin.service.modules.orchestration.task.ReevaluateDagTask;
 import org.apache.gobblin.service.modules.orchestration.task.ResumeDagTask;
 import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
 
-
 /**
  * {@link DagTaskVisitor} for transforming a specific {@link DagTask} derived 
class to its companion {@link DagProc} derived class.
  * Each {@link DagTask} needs it own {@link DagProcFactory#meet} method 
overload to create {@link DagProc} that is
@@ -48,41 +48,43 @@ import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
 @Singleton
 public class DagProcFactory implements DagTaskVisitor<DagProc<?>> {
 
+  private final Config config;
   private final FlowCompilationValidationHelper 
flowCompilationValidationHelper;
 
   @Inject
-  public DagProcFactory(FlowCompilationValidationHelper 
flowCompilationValidationHelper) {
+  public DagProcFactory(Config config, FlowCompilationValidationHelper 
flowCompilationValidationHelper) {
+    this.config = config;
     this.flowCompilationValidationHelper = flowCompilationValidationHelper;
   }
 
   @Override
   public EnforceFlowFinishDeadlineDagProc 
meet(EnforceFlowFinishDeadlineDagTask enforceFlowFinishDeadlineDagTask) {
-    return new 
EnforceFlowFinishDeadlineDagProc(enforceFlowFinishDeadlineDagTask);
+    return new 
EnforceFlowFinishDeadlineDagProc(enforceFlowFinishDeadlineDagTask, this.config);
   }
 
   @Override
   public EnforceJobStartDeadlineDagProc meet(EnforceJobStartDeadlineDagTask 
enforceJobStartDeadlineDagTask) {
-    return new EnforceJobStartDeadlineDagProc(enforceJobStartDeadlineDagTask);
+    return new EnforceJobStartDeadlineDagProc(enforceJobStartDeadlineDagTask, 
this.config);
   }
 
   @Override
   public LaunchDagProc meet(LaunchDagTask launchDagTask) {
-    return new LaunchDagProc(launchDagTask, 
this.flowCompilationValidationHelper);
+    return new LaunchDagProc(launchDagTask, 
this.flowCompilationValidationHelper, this.config);
   }
 
   @Override
   public ReevaluateDagProc meet(ReevaluateDagTask reEvaluateDagTask) {
-    return new ReevaluateDagProc(reEvaluateDagTask);
+    return new ReevaluateDagProc(reEvaluateDagTask, this.config);
   }
 
   @Override
   public KillDagProc meet(KillDagTask killDagTask) {
-    return new KillDagProc(killDagTask);
+    return new KillDagProc(killDagTask, this.config);
   }
 
   @Override
   public ResumeDagProc meet(ResumeDagTask resumeDagTask) {
-    return new ResumeDagProc(resumeDagTask);
+    return new ResumeDagProc(resumeDagTask, this.config);
   }
 }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
index 813c81b3d9..582281dec5 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
@@ -48,7 +48,7 @@ import org.apache.gobblin.util.ExecutorsUtils;
  * Each {@link DagTask} returned from the {@link DagTaskStream} comes with a 
time-limited lease conferring the exclusive
  * right to perform the work of the task.
  * The {@link DagProcFactory} transforms each {@link DagTask} into a specific, 
concrete {@link DagProc}, which
- * encapsulates all processing inside {@link 
DagProc#process(DagManagementStateStore)}
+ * encapsulates all processing inside {@link 
DagProc#process(DagManagementStateStore, DagProcessingEngineMetrics)}
  */
 
 @AllArgsConstructor
@@ -138,6 +138,7 @@ public class DagProcessingEngine extends 
AbstractIdleService {
         try {
           dagProc.process(dagManagementStateStore, dagProcEngineMetrics);
           dagTask.conclude();
+          log.info("Concluded dagTask : {}", dagTask);
         } catch (Exception e) {
           log.error("DagProcEngineThread encountered exception while 
processing dag " + dagProc.getDagId(), e);
           
dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.mark();
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
index 3c8aadc6eb..da07b23bf0 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
@@ -18,6 +18,10 @@
 package org.apache.gobblin.service.modules.orchestration.proc;
 
 import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.typesafe.config.Config;
 
 import lombok.Data;
 import lombok.Getter;
@@ -28,6 +32,8 @@ import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.util.ExceptionUtils;
+import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
 import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
@@ -35,6 +41,7 @@ import 
org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
 import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
 
 
 /**
@@ -55,14 +62,23 @@ public abstract class DagProc<T> {
   @Getter protected final DagManager.DagId dagId;
   @Getter protected final DagNodeId dagNodeId;
   protected static final MetricContext metricContext = 
Instrumented.getMetricContext(new State(), DagProc.class);
+  protected final List<Class<? extends Exception>> nonRetryableExceptions;
   protected static final EventSubmitter eventSubmitter = new 
EventSubmitter.Builder(
       metricContext, "org.apache.gobblin.service").build();
 
-  public DagProc(DagTask dagTask) {
+  public DagProc(DagTask dagTask, Config config) {
     this.dagTask = dagTask;
     this.dagId = 
DagManagerUtils.generateDagId(this.dagTask.getDagAction().getFlowGroup(),
         this.dagTask.getDagAction().getFlowName(), 
this.dagTask.getDagAction().getFlowExecutionId());
     this.dagNodeId = this.dagTask.getDagAction().getDagNodeId();
+    this.nonRetryableExceptions = ConfigUtils.getStringList(config, 
ServiceConfigKeys.DAG_PROC_ENGINE_NON_RETRYABLE_EXCEPTIONS_KEY)
+        .stream().map(className -> {
+          try {
+            return (Class<? extends Exception>) Class.forName(className);
+          } catch (ClassNotFoundException e) {
+            throw new RuntimeException(e);
+          }
+        }).collect(Collectors.toList());
   }
 
   public final void process(DagManagementStateStore dagManagementStateStore,
@@ -75,8 +91,19 @@ public abstract class DagProc<T> {
       dagProcEngineMetrics.markDagActionsInitialize(getDagActionType(), false);
       throw e;
     }
-    act(dagManagementStateStore, state, dagProcEngineMetrics);
-    log.info("{} concluded processing for dagId : {}", 
getClass().getSimpleName(), this.dagId);
+    try {
+      act(dagManagementStateStore, state, dagProcEngineMetrics);
+      log.info("{} processed dagId : {}", getClass().getSimpleName(), 
this.dagId);
+    } catch (Exception e) {
+      if (isNonTransientException(e)) {
+        log.error("Ignoring non transient exception. DagTask {} will conclude 
and will not be retried. Exception - {} ",
+            getDagTask(), e);
+        
dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.mark();
+        
dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.mark();
+      } else {
+        throw e;
+      }
+    }
   }
 
   protected abstract T initialize(DagManagementStateStore 
dagManagementStateStore) throws IOException;
@@ -87,4 +114,8 @@ public abstract class DagProc<T> {
   public DagActionStore.DagActionType getDagActionType() {
     return this.dagTask.getDagAction().getDagActionType();
   }
+
+  protected boolean isNonTransientException(Exception e) {
+    return ExceptionUtils.isExceptionInstanceOf(e, 
this.nonRetryableExceptions);
+  }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DeadlineEnforcementDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DeadlineEnforcementDagProc.java
index db41ecc206..e546544716 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DeadlineEnforcementDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DeadlineEnforcementDagProc.java
@@ -20,6 +20,8 @@ package org.apache.gobblin.service.modules.orchestration.proc;
 import java.io.IOException;
 import java.util.Optional;
 
+import com.typesafe.config.Config;
+
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.service.modules.flowgraph.Dag;
@@ -35,8 +37,8 @@ import 
org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 @Slf4j
 abstract public class DeadlineEnforcementDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>> {
 
-  public DeadlineEnforcementDagProc(DagTask dagTask) {
-    super(dagTask);
+  public DeadlineEnforcementDagProc(DagTask dagTask, Config config) {
+    super(dagTask, config);
   }
 
   @Override
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java
index b9b5d179e9..f6d48a1450 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java
@@ -20,6 +20,8 @@ package org.apache.gobblin.service.modules.orchestration.proc;
 import java.io.IOException;
 import java.util.List;
 
+import com.typesafe.config.Config;
+
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.metrics.event.TimingEvent;
@@ -38,8 +40,9 @@ import 
org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 @Slf4j
 public class EnforceFlowFinishDeadlineDagProc extends 
DeadlineEnforcementDagProc {
 
-  public EnforceFlowFinishDeadlineDagProc(EnforceFlowFinishDeadlineDagTask 
enforceFlowFinishDeadlineDagTask) {
-    super(enforceFlowFinishDeadlineDagTask);
+  public EnforceFlowFinishDeadlineDagProc(EnforceFlowFinishDeadlineDagTask 
enforceFlowFinishDeadlineDagTask,
+      Config config) {
+    super(enforceFlowFinishDeadlineDagTask, config);
   }
 
   protected void enforceDeadline(DagManagementStateStore 
dagManagementStateStore, Dag<JobExecutionPlan> dag,
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java
index 6eccc1ab1a..8a05ab4bfd 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java
@@ -22,6 +22,8 @@ import java.util.Optional;
 
 import org.apache.commons.lang3.tuple.Pair;
 
+import com.typesafe.config.Config;
+
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.metrics.event.TimingEvent;
@@ -45,8 +47,8 @@ import static 
org.apache.gobblin.service.ExecutionStatus.valueOf;
 @Slf4j
 public class EnforceJobStartDeadlineDagProc extends DeadlineEnforcementDagProc 
{
 
-  public EnforceJobStartDeadlineDagProc(EnforceJobStartDeadlineDagTask 
enforceJobStartDeadlineDagTask) {
-    super(enforceJobStartDeadlineDagTask);
+  public EnforceJobStartDeadlineDagProc(EnforceJobStartDeadlineDagTask 
enforceJobStartDeadlineDagTask, Config config) {
+    super(enforceJobStartDeadlineDagTask, config);
   }
 
   protected void enforceDeadline(DagManagementStateStore 
dagManagementStateStore, Dag<JobExecutionPlan> dag,
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java
index c429d754ab..c2a90dcc3b 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java
@@ -20,6 +20,8 @@ package org.apache.gobblin.service.modules.orchestration.proc;
 import java.io.IOException;
 import java.util.Optional;
 
+import com.typesafe.config.Config;
+
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.metrics.event.TimingEvent;
@@ -39,8 +41,8 @@ import 
org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 public class KillDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>> {
   private final boolean shouldKillSpecificJob;
 
-  public KillDagProc(KillDagTask killDagTask) {
-    super(killDagTask);
+  public KillDagProc(KillDagTask killDagTask, Config config) {
+    super(killDagTask, config);
     this.shouldKillSpecificJob = 
!getDagNodeId().getJobName().equals(DagActionStore.NO_JOB_NAME_DEFAULT);
   }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
index c72c1fcf96..0efef83139 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.Optional;
 
+import com.typesafe.config.Config;
+
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -41,8 +43,9 @@ import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
 public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>> {
   private final FlowCompilationValidationHelper 
flowCompilationValidationHelper;
 
-  public LaunchDagProc(LaunchDagTask launchDagTask, 
FlowCompilationValidationHelper flowCompilationValidationHelper) {
-    super(launchDagTask);
+  public LaunchDagProc(LaunchDagTask launchDagTask, 
FlowCompilationValidationHelper flowCompilationValidationHelper,
+      Config config) {
+    super(launchDagTask, config);
     this.flowCompilationValidationHelper = flowCompilationValidationHelper;
   }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
index 8c7a8c8abd..04ea1258dc 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
@@ -22,6 +22,8 @@ import java.util.Optional;
 
 import org.apache.commons.lang3.tuple.Pair;
 
+import com.typesafe.config.Config;
+
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.metrics.event.TimingEvent;
@@ -44,8 +46,8 @@ import org.apache.gobblin.service.monitoring.JobStatus;
 @Slf4j
 public class ReevaluateDagProc extends 
DagProc<Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>>> {
 
-  public ReevaluateDagProc(ReevaluateDagTask reEvaluateDagTask) {
-    super(reEvaluateDagTask);
+  public ReevaluateDagProc(ReevaluateDagTask reEvaluateDagTask, Config config) 
{
+    super(reEvaluateDagTask, config);
   }
 
   @Override
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
index 8a94c4ef12..8326d83f09 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import java.util.Optional;
 
 import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -46,8 +47,8 @@ import static 
org.apache.gobblin.service.ExecutionStatus.PENDING_RESUME;
 @Slf4j
 public class ResumeDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>> {
 
-  public ResumeDagProc(ResumeDagTask resumeDagTask) {
-    super(resumeDagTask);
+  public ResumeDagProc(ResumeDagTask resumeDagTask, Config config) {
+    super(resumeDagTask, config);
   }
 
   @Override
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
index 1d3ac86564..6a7f981183 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
@@ -63,11 +63,7 @@ public abstract class DagTask {
     try {
       this.dagManagementStateStore.deleteDagAction(this.dagAction);
       boolean completedLease = this.leaseObtainedStatus.completeLease();
-      if (completedLease) {
-        
this.dagProcEngineMetrics.markDagActionsConclude(this.dagAction.getDagActionType(),
 true);
-      } else {
-        
this.dagProcEngineMetrics.markDagActionsConclude(this.dagAction.getDagActionType(),
 false);
-      }
+      
this.dagProcEngineMetrics.markDagActionsConclude(this.dagAction.getDagActionType(),
 completedLease);
       return completedLease;
     } catch (IOException e) {
       
this.dagProcEngineMetrics.markDagActionsConclude(this.dagAction.getDagActionType(),
 false);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index d5692be371..7061cd61f5 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -70,6 +70,7 @@ import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.proc.DagProcUtils;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExceptionUtils;
 import org.apache.gobblin.util.retry.RetryerFactory;
 
 import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_INTERVAL_MS;
@@ -236,9 +237,10 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
               try {
                 this.dagManagementStateStore.addJobDagAction(flowGroup, 
flowName, flowExecutionId, jobName, DagActionStore.DagActionType.REEVALUATE);
               } catch (IOException e) {
-                if (e.getCause() != null && 
isThrowableInstanceOf(e.getCause(), nonRetryableExceptions)) {
-                  // todo - add metrics
-                  log.warn("Duplicate REEVALUATE Dag Action is being created. 
Ignoring... " + e.getMessage());
+                if (ExceptionUtils.isExceptionInstanceOf(e, 
nonRetryableExceptions)) {
+                  
this.dagManagementStateStore.getDagManagerMetrics().dagActionCreationExceptionsInJobStatusMonitor.mark();
+                  log.error("Could not add REEVALUATE dag action for flow 
group - {}, flow name - {}, flowExecutionId - {}, "
+                      + "jobName = {} due to {}. Ignoring...", flowGroup, 
flowName, flowExecutionId, jobName, e.getMessage());
                 } else {
                   throw e;
                 }
@@ -425,8 +427,4 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
   protected abstract GobblinTrackingEvent 
deserializeEvent(DecodeableKafkaRecord<byte[],byte[]> message);
 
   protected abstract org.apache.gobblin.configuration.State 
parseJobStatus(GobblinTrackingEvent event);
-
-  public static boolean isThrowableInstanceOf(Throwable exception, 
List<Class<? extends Exception>> typesList) {
-    return typesList.stream().anyMatch(e -> e.isInstance(exception));
-  }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
index 756e04f73e..4d0dbd45db 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
@@ -36,6 +36,7 @@ import 
org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
 import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
 import org.apache.gobblin.runtime.util.InjectionNames;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
@@ -57,12 +58,14 @@ public class KafkaJobStatusMonitorFactory implements 
Provider<KafkaJobStatusMoni
 
   @Inject
   public KafkaJobStatusMonitorFactory(Config config, JobIssueEventHandler 
jobIssueEventHandler, MultiContextIssueRepository issueRepository,
-      GobblinInstanceEnvironment env, DagManagementStateStore 
dagManagementStateStore, @Named(InjectionNames.DAG_PROC_ENGINE_ENABLED) boolean 
dagProcEngineEnabled) {
-    this(config, jobIssueEventHandler, issueRepository, 
env.isInstrumentationEnabled(), dagManagementStateStore, dagProcEngineEnabled);
+      GobblinInstanceEnvironment env, DagManagementStateStore 
dagManagementStateStore,
+      @Named(InjectionNames.DAG_PROC_ENGINE_ENABLED) boolean 
dagProcEngineEnabled, DagProcessingEngineMetrics dagProcessingEngineMetrics) {
+    this(config, jobIssueEventHandler, issueRepository, 
env.isInstrumentationEnabled(), dagManagementStateStore,
+        dagProcEngineEnabled, dagProcessingEngineMetrics);
   }
 
   public KafkaJobStatusMonitorFactory(Config config, JobIssueEventHandler 
jobIssueEventHandler, MultiContextIssueRepository issueRepository,
-      boolean instrumentationEnabled, DagManagementStateStore 
dagManagementStateStore, boolean dagProcEngineEnabled) {
+      boolean instrumentationEnabled, DagManagementStateStore 
dagManagementStateStore, boolean dagProcEngineEnabled, 
DagProcessingEngineMetrics dagProcessingEngineMetrics) {
     this.config = Objects.requireNonNull(config);
     this.jobIssueEventHandler = Objects.requireNonNull(jobIssueEventHandler);
     this.issueRepository = issueRepository;
@@ -103,7 +106,8 @@ public class KafkaJobStatusMonitorFactory implements 
Provider<KafkaJobStatusMoni
         observabilityEventProducerClassName, 
ConfigUtils.configToState(config), this.issueRepository, 
this.instrumentationEnabled);
 
     return (KafkaJobStatusMonitor) GobblinConstructorUtils
-        .invokeLongestConstructor(jobStatusMonitorClass, topic, 
jobStatusConfig, numThreads, jobIssueEventHandler, observabilityEventProducer, 
dagManagementStateStore);
+        .invokeLongestConstructor(jobStatusMonitorClass, topic, 
jobStatusConfig, numThreads, jobIssueEventHandler, observabilityEventProducer,
+            dagManagementStateStore);
   }
 
   @Override
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
index db30b928bd..2244e7dd11 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
@@ -26,6 +26,7 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -70,7 +71,7 @@ public class DagManagementTaskStreamImplTest {
         new DagManagementTaskStreamImpl(config, 
Optional.of(mock(DagActionStore.class)),
             mock(MultiActiveLeaseArbiter.class), 
Optional.of(mock(DagActionReminderScheduler.class)),
             false, mock(DagManagementStateStore.class), 
mock(DagProcessingEngineMetrics.class));
-    this.dagProcFactory = new DagProcFactory(null);
+    this.dagProcFactory = new DagProcFactory(ConfigFactory.empty(), null);
     this.dagProcEngineThread = new DagProcessingEngine.DagProcEngineThread(
         this.dagManagementTaskStream, this.dagProcFactory, 
dagManagementStateStore, mock(DagProcessingEngineMetrics.class), 0);
   }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
index a5a4f168ec..e8688680b5 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
@@ -22,7 +22,6 @@ import java.util.NoSuchElementException;
 import java.util.Optional;
 import java.util.concurrent.TimeoutException;
 
-import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -30,15 +29,18 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.exception.NonTransientException;
 import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.apache.gobblin.service.modules.orchestration.task.DagTask;
 import org.apache.gobblin.testing.AssertWithBackoff;
 
@@ -52,9 +54,7 @@ public class DagProcessingEngineTest {
   private static final String TEST_USER = "testUser";
   private static final String TEST_PASSWORD = "testPassword";
   private static final String TEST_TABLE = "quotas";
-  private DagManagementTaskStreamImpl dagManagementTaskStream;
   private DagTaskStream dagTaskStream;
-  private DagProcFactory dagProcFactory;
   // Field is static because it's used to instantiate every MockedDagTask
   private static MySqlDagManagementStateStore dagManagementStateStore;
   private static DagProcessingEngineMetrics mockedDagProcEngineMetrics;
@@ -70,32 +70,26 @@ public class DagProcessingEngineTest {
     leaseObtainedStatus = mock(LeaseAttemptStatus.LeaseObtainedStatus.class);
     doReturn(true).when(leaseObtainedStatus).completeLease();
 
-    Config config;
     ConfigBuilder configBuilder = ConfigBuilder.create();
     
configBuilder.addPrimitive(MySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
 MysqlDagStateStoreTest.TestMysqlDagStateStore.class.getName())
         
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
 testMetastoreDatabase.getJdbcUrl())
         
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
 TEST_USER)
         
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
 TEST_PASSWORD)
-        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
 TEST_TABLE);
-    config = configBuilder.build();
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
 TEST_TABLE)
+        
.addPrimitive(ServiceConfigKeys.DAG_PROC_ENGINE_NON_RETRYABLE_EXCEPTIONS_KEY, 
NonTransientException.class.getName());
+
+    Config config = configBuilder.build();
 
     dagManagementStateStore = 
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(testMetastoreDatabase));
     doReturn(true).when(dagActionStore).deleteDagAction(any());
-    dagManagementTaskStream =
-        new DagManagementTaskStreamImpl(config, 
Optional.of(mock(DagActionStore.class)),
-            mock(MultiActiveLeaseArbiter.class), 
Optional.of(mock(DagActionReminderScheduler.class)),
-            false, dagManagementStateStore, 
Mockito.mock(DagProcessingEngineMetrics.class));
-    this.dagProcFactory = new DagProcFactory(null);
-
-    DagProcessingEngine.DagProcEngineThread dagProcEngineThread =
-        new DagProcessingEngine.DagProcEngineThread(dagManagementTaskStream, 
this.dagProcFactory,
-            dagManagementStateStore, mock(DagProcessingEngineMetrics.class), 
0);
+    DagProcFactory dagProcFactory = new DagProcFactory(ConfigFactory.empty(), 
null);
+
     this.dagTaskStream = spy(new MockedDagTaskStream());
     DagProcessingEngine dagProcessingEngine =
-        new DagProcessingEngine(config, Optional.ofNullable(dagTaskStream), 
Optional.ofNullable(this.dagProcFactory),
+        new DagProcessingEngine(config, Optional.ofNullable(dagTaskStream), 
Optional.of(dagProcFactory),
             Optional.ofNullable(dagManagementStateStore), 100000L, 
mock(DagProcessingEngineMetrics.class));
     dagProcessingEngine.startAsync();
-    this.mockedDagProcEngineMetrics = mock(DagProcessingEngineMetrics.class);
+    mockedDagProcEngineMetrics = mock(DagProcessingEngineMetrics.class);
   }
 
   @AfterClass(alwaysRun = true)
@@ -107,7 +101,9 @@ public class DagProcessingEngineTest {
   static class MockedDagTaskStream implements DagTaskStream {
     public static final int MAX_NUM_OF_TASKS = 1000;
     public static final int FAILING_DAGS_FREQUENCY = 10;
-    volatile int i=0;
+    // this number should be a multiple of FAILING_DAGS_FREQUENCY, so that the tasks failing with non-retryable exceptions are a subset of all failing tasks
+    public static final int 
FAILING_DAGS_WITH_NON_RETRYABLE_EXCEPTIONS_FREQUENCY = 30;
+    volatile int i = 0;
 
     @Override
     public boolean hasNext() {
@@ -115,56 +111,72 @@ public class DagProcessingEngineTest {
     }
 
     @Override
-    public synchronized DagTask next() throws NoSuchElementException {
+    public synchronized DagTask next()
+        throws NoSuchElementException {
       i++;
       if (i > MAX_NUM_OF_TASKS) {
         throw new RuntimeException("Simulating an exception to stop the 
thread!");
       }
-      if (i % FAILING_DAGS_FREQUENCY == 0 ) {
-        return new MockedDagTask(new DagActionStore.DagAction("fg-" + i, "fn-" 
+ i, (1234L + i), "jn-" + i, DagActionStore.DagActionType.LAUNCH), true);
+      if (i % FAILING_DAGS_WITH_NON_RETRYABLE_EXCEPTIONS_FREQUENCY == 0) {
+        return new MockedDagTask(new DagActionStore.DagAction("fg-" + i, "fn-" 
+ i, (1234L + i), "jn-" + i,
+            DagActionStore.DagActionType.LAUNCH), 
ExceptionType.NON_RETRYABLE_EXCEPTION);
       } else {
-        return new MockedDagTask(new DagActionStore.DagAction("fg-" + i, "fn-" 
+ i, (1234L + i), "jn-" + i, DagActionStore.DagActionType.LAUNCH), false);
+        if (i % FAILING_DAGS_FREQUENCY == 0) {
+          return new MockedDagTask(new DagActionStore.DagAction("fg-" + i, 
"fn-" + i, (1234L + i), "jn-" + i,
+              DagActionStore.DagActionType.LAUNCH), 
ExceptionType.RETRYABLE_EXCEPTION);
+        } else {
+          return new MockedDagTask(new DagActionStore.DagAction("fg-" + i, 
"fn-" + i, (1234L + i), "jn-" + i,
+              DagActionStore.DagActionType.LAUNCH), 
ExceptionType.NO_EXCEPTION);
+        }
       }
     }
-  }
 
-  static class MockedDagTask extends DagTask {
-    private final boolean isBad;
+    static class MockedDagTask extends DagTask {
+      private final ExceptionType exceptionType;
 
-    public MockedDagTask(DagActionStore.DagAction dagAction, boolean isBad) {
-      super(dagAction, leaseObtainedStatus, 
DagProcessingEngineTest.dagManagementStateStore, mockedDagProcEngineMetrics);
-      this.isBad = isBad;
-    }
+      public MockedDagTask(DagActionStore.DagAction dagAction, ExceptionType 
exceptionType) {
+        super(dagAction, leaseObtainedStatus, 
DagProcessingEngineTest.dagManagementStateStore,
+            mockedDagProcEngineMetrics);
+        this.exceptionType = exceptionType;
+      }
 
-    @Override
-    public <T> T host(DagTaskVisitor<T> visitor) {
-      return (T) new MockedDagProc(this, this.isBad);
+      @Override
+      public <T> T host(DagTaskVisitor<T> visitor) {
+        return (T) new MockedDagProc(this, this.exceptionType);
+      }
     }
-  }
 
-  static class MockedDagProc extends DagProc<Void> {
-    private final boolean isBad;
+    static class MockedDagProc extends DagProc<Void> {
+      private final ExceptionType exceptionType;
 
-    public MockedDagProc(MockedDagTask mockedDagTask, boolean isBad) {
-      super(mockedDagTask);
-      this.isBad = isBad;
-    }
+      public MockedDagProc(MockedDagTask mockedDagTask, ExceptionType 
exceptionType) {
+        super(mockedDagTask, ConfigBuilder.create()
+            
.addPrimitive(ServiceConfigKeys.DAG_PROC_ENGINE_NON_RETRYABLE_EXCEPTIONS_KEY, 
NonTransientException.class.getName())
+            .build());
+        this.exceptionType = exceptionType;
+      }
 
-    @Override
-    public DagManager.DagId getDagId() {
-      return new DagManager.DagId("fg", "fn", 12345L);
-    }
+      @Override
+      public DagManager.DagId getDagId() {
+        return new DagManager.DagId("fg", "fn", 12345L);
+      }
 
-    @Override
-    protected Void initialize(DagManagementStateStore dagManagementStateStore) 
{
-      return null;
-    }
+      @Override
+      protected Void initialize(DagManagementStateStore 
dagManagementStateStore) {
+        return null;
+      }
 
-    @Override
-    protected void act(DagManagementStateStore dagManagementStateStore, Void 
state,
-        DagProcessingEngineMetrics dagProcEngineMetrics) {
-      if (this.isBad) {
-        throw new RuntimeException("Simulating an exception!");
+      @Override
+      protected void act(DagManagementStateStore dagManagementStateStore, Void 
state,
+          DagProcessingEngineMetrics dagProcEngineMetrics) {
+
+        switch (this.exceptionType) {
+          case NON_RETRYABLE_EXCEPTION:
+            throw new NonTransientException("Simulating a non retryable 
exception!");
+          case RETRYABLE_EXCEPTION:
+            throw new RuntimeException("Simulating an exception!");
+          default:
+        }
       }
     }
   }
@@ -172,13 +184,14 @@ public class DagProcessingEngineTest {
   // This tests verifies that all the dag tasks entered to the dag task stream 
are retrieved by dag proc engine threads
   @Test
   public void dagProcessingTest()
-      throws InterruptedException, TimeoutException, IOException {
+      throws InterruptedException, TimeoutException {
 
     // there are MAX_NUM_OF_TASKS dag tasks returned and then each thread 
additionally call (infinitely) once to wait
     // in this unit tests, it does not infinitely wait though, because the 
mocked task stream throws an exception on
     // (MAX_NUM_OF_TASKS + 1) th call
     int expectedNumOfInvocations = MockedDagTaskStream.MAX_NUM_OF_TASKS + 
ServiceConfigKeys.DEFAULT_NUM_DAG_PROC_THREADS;
     int expectedExceptions = MockedDagTaskStream.MAX_NUM_OF_TASKS / 
MockedDagTaskStream.FAILING_DAGS_FREQUENCY;
+    int expectedNonRetryableExceptions = MockedDagTaskStream.MAX_NUM_OF_TASKS 
/ MockedDagTaskStream.FAILING_DAGS_WITH_NON_RETRYABLE_EXCEPTIONS_FREQUENCY;
 
     AssertWithBackoff.assertTrue(input -> 
Mockito.mockingDetails(this.dagTaskStream).getInvocations().size() == 
expectedNumOfInvocations,
         10000L, "dagTaskStream was not called " + expectedNumOfInvocations + " 
number of times. "
@@ -186,5 +199,12 @@ public class DagProcessingEngineTest {
         log, 1, 1000L);
 
     
Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.getCount(),
  expectedExceptions);
+    
Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.getCount(),
  expectedNonRetryableExceptions);
+  }
+
+  private enum ExceptionType {
+    NO_EXCEPTION,
+    RETRYABLE_EXCEPTION,
+    NON_RETRYABLE_EXCEPTION
   }
 }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index df220b7ccc..47f3c2a4d3 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -207,7 +207,7 @@ public class OrchestratorTest {
     SpecExecutor specExecutorInstance = new InMemorySpecExecutor(config);
 
     TopologySpec.Builder topologySpecBuilder = 
TopologySpec.builder(computeTopologySpecURI(SPEC_STORE_PARENT_DIR,
-        TOPOLOGY_SPEC_STORE_DIR))
+            TOPOLOGY_SPEC_STORE_DIR))
         .withConfig(config)
         .withDescription(SPEC_DESCRIPTION)
         .withVersion(SPEC_VERSION)
@@ -253,7 +253,7 @@ public class OrchestratorTest {
     FlowSpec.Builder flowSpecBuilder = null;
     try {
       flowSpecBuilder = 
FlowSpec.builder(computeTopologySpecURI(SPEC_STORE_PARENT_DIR,
-          FLOW_SPEC_GROUP_DIR))
+              FLOW_SPEC_GROUP_DIR))
           .withConfig(config)
           .withDescription(SPEC_DESCRIPTION)
           .withVersion(SPEC_VERSION)
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
index 2e64b71c40..470083154b 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
@@ -22,7 +22,6 @@ import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang3.tuple.Pair;
-import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 import org.testng.annotations.AfterClass;
@@ -45,6 +44,7 @@ import 
org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
 import 
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStore;
 import 
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStoreTest;
+import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import 
org.apache.gobblin.service.modules.orchestration.task.EnforceFlowFinishDeadlineDagTask;
 import 
org.apache.gobblin.service.modules.orchestration.task.EnforceJobStartDeadlineDagTask;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -107,7 +107,7 @@ public class EnforceDeadlineDagProcsTest {
     EnforceJobStartDeadlineDagProc enforceJobStartDeadlineDagProc = new 
EnforceJobStartDeadlineDagProc(
         new EnforceJobStartDeadlineDagTask(new 
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
             "job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE), 
null,
-            dagManagementStateStore, mockedDagProcEngineMetrics));
+            dagManagementStateStore, mockedDagProcEngineMetrics), 
ConfigFactory.empty());
     enforceJobStartDeadlineDagProc.process(dagManagementStateStore, 
mockedDagProcEngineMetrics);
 
     int expectedNumOfDeleteDagNodeStates = 1; // the one dag node 
corresponding to the EnforceStartDeadlineDagProc
@@ -150,7 +150,7 @@ public class EnforceDeadlineDagProcsTest {
     EnforceJobStartDeadlineDagProc enforceJobStartDeadlineDagProc = new 
EnforceJobStartDeadlineDagProc(
         new EnforceJobStartDeadlineDagTask(new 
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
             "job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE), 
null,
-            dagManagementStateStore, mockedDagProcEngineMetrics));
+            dagManagementStateStore, mockedDagProcEngineMetrics), 
ConfigFactory.empty());
     enforceJobStartDeadlineDagProc.process(dagManagementStateStore, 
mockedDagProcEngineMetrics);
 
     // no job cancelled because we simulated (by not adding) missing dag action
@@ -192,7 +192,7 @@ public class EnforceDeadlineDagProcsTest {
     EnforceFlowFinishDeadlineDagProc enforceFlowFinishDeadlineDagProc = new 
EnforceFlowFinishDeadlineDagProc(
         new EnforceFlowFinishDeadlineDagTask(new 
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
             "job0", 
DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE), null,
-            dagManagementStateStore, mockedDagProcEngineMetrics));
+            dagManagementStateStore, mockedDagProcEngineMetrics), 
ConfigFactory.empty());
     enforceFlowFinishDeadlineDagProc.process(dagManagementStateStore, 
mockedDagProcEngineMetrics);
 
     specProducers.forEach(sp -> Mockito.verify(sp, 
Mockito.times(1)).cancelJob(any(), any()));
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
index bc53139b31..519a54f3e7 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
@@ -25,7 +25,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
-import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -49,6 +48,7 @@ import 
org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
 import 
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStore;
 import 
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStoreTest;
 import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
+import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
 import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -96,7 +96,7 @@ public class KillDagProcTest {
 
     LaunchDagProc launchDagProc = new LaunchDagProc(new LaunchDagTask(new 
DagActionStore.DagAction("fg", "flow1",
         flowExecutionId, MysqlDagActionStore.NO_JOB_NAME_DEFAULT, 
DagActionStore.DagActionType.LAUNCH),
-        null, this.dagManagementStateStore, mockedDagProcEngineMetrics), 
flowCompilationValidationHelper);
+        null, this.dagManagementStateStore, mockedDagProcEngineMetrics), 
flowCompilationValidationHelper, ConfigFactory.empty());
     launchDagProc.process(this.dagManagementStateStore, 
this.mockedDagProcEngineMetrics);
 
     List<SpecProducer<Spec>> specProducers = dag.getNodes().stream().map(n -> {
@@ -109,7 +109,7 @@ public class KillDagProcTest {
 
     KillDagProc killDagProc = new KillDagProc(new KillDagTask(new 
DagActionStore.DagAction("fg", "flow1",
        flowExecutionId, MysqlDagActionStore.NO_JOB_NAME_DEFAULT, 
DagActionStore.DagActionType.KILL),
-        null, this.dagManagementStateStore, mockedDagProcEngineMetrics));
+        null, this.dagManagementStateStore, mockedDagProcEngineMetrics), 
ConfigFactory.empty());
     killDagProc.process(this.dagManagementStateStore, 
this.mockedDagProcEngineMetrics);
 
     long cancelJobCount = specProducers.stream()
@@ -142,7 +142,8 @@ public class KillDagProcTest {
 
     LaunchDagProc launchDagProc = new LaunchDagProc(new LaunchDagTask(new 
DagActionStore.DagAction("fg", "flow2",
         flowExecutionId, MysqlDagActionStore.NO_JOB_NAME_DEFAULT, 
DagActionStore.DagActionType.LAUNCH),
-        null, this.dagManagementStateStore, this.mockedDagProcEngineMetrics), 
flowCompilationValidationHelper);
+        null, this.dagManagementStateStore, this.mockedDagProcEngineMetrics), 
flowCompilationValidationHelper,
+        ConfigFactory.empty());
     launchDagProc.process(this.dagManagementStateStore, 
this.mockedDagProcEngineMetrics);
 
     List<SpecProducer<Spec>> specProducers = dag.getNodes().stream().map(n -> {
@@ -155,7 +156,7 @@ public class KillDagProcTest {
 
     KillDagProc killDagProc = new KillDagProc(new KillDagTask(new 
DagActionStore.DagAction("fg", "flow2",
         flowExecutionId, "job2", DagActionStore.DagActionType.KILL),
-        null, this.dagManagementStateStore, this.mockedDagProcEngineMetrics));
+        null, this.dagManagementStateStore, this.mockedDagProcEngineMetrics), 
ConfigFactory.empty());
     killDagProc.process(this.dagManagementStateStore, 
this.mockedDagProcEngineMetrics);
 
     long cancelJobCount = specProducers.stream()
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
index 5b36851ebf..45caa295ea 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
@@ -115,7 +115,7 @@ public class LaunchDagProcTest {
         new LaunchDagTask(new DagActionStore.DagAction(flowGroup, flowName, 
flowExecutionId, "job0",
             DagActionStore.DagActionType.LAUNCH), null, 
this.dagManagementStateStore,
             this.mockedDagProcEngineMetrics),
-        flowCompilationValidationHelper);
+        flowCompilationValidationHelper, ConfigFactory.empty());
 
     launchDagProc.process(this.dagManagementStateStore, 
mockedDagProcEngineMetrics);
 
@@ -146,7 +146,7 @@ public class LaunchDagProcTest {
         new LaunchDagTask(new DagActionStore.DagAction(flowGroup, flowName, 
flowExecutionId,
             "jn", DagActionStore.DagActionType.LAUNCH), null, 
this.dagManagementStateStore,
             this.mockedDagProcEngineMetrics),
-        flowCompilationValidationHelper);
+        flowCompilationValidationHelper, ConfigFactory.empty());
 
     launchDagProc.process(this.dagManagementStateStore, 
mockedDagProcEngineMetrics);
     int numOfLaunchedJobs = 3; // = number of start nodes
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
index 2d3d02461f..3f856c2085 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
@@ -118,7 +118,7 @@ public class ReevaluateDagProcTest {
     ReevaluateDagProc
         reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new 
DagActionStore.DagAction(flowGroup, flowName,
         flowExecutionId, "job0", DagActionStore.DagActionType.REEVALUATE), 
null,
-        dagManagementStateStore, mockedDagProcEngineMetrics));
+        dagManagementStateStore, mockedDagProcEngineMetrics), 
ConfigFactory.empty());
     reEvaluateDagProc.process(dagManagementStateStore, 
mockedDagProcEngineMetrics);
     // next job is sent to spec producer
     Mockito.verify(specProducers.get(1), Mockito.times(1)).addSpec(any());
@@ -168,7 +168,7 @@ public class ReevaluateDagProcTest {
     ReevaluateDagProc
         reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new 
DagActionStore.DagAction(flowGroup, flowName,
         flowExecutionId, "job0", DagActionStore.DagActionType.REEVALUATE), 
null,
-        dagManagementStateStore, mockedDagProcEngineMetrics));
+        dagManagementStateStore, mockedDagProcEngineMetrics), 
ConfigFactory.empty());
     reEvaluateDagProc.process(dagManagementStateStore, 
mockedDagProcEngineMetrics);
 
     // no new job to launch for this one job flow
@@ -203,7 +203,7 @@ public class ReevaluateDagProcTest {
     ReevaluateDagProc
         reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new 
DagActionStore.DagAction(flowGroup, flowName,
         flowExecutionId, "job0", DagActionStore.DagActionType.REEVALUATE), 
null,
-        dagManagementStateStore, mockedDagProcEngineMetrics));
+        dagManagementStateStore, mockedDagProcEngineMetrics), 
ConfigFactory.empty());
     reEvaluateDagProc.process(dagManagementStateStore, 
mockedDagProcEngineMetrics);
 
     int numOfLaunchedJobs = 1; // only the current job
@@ -248,7 +248,7 @@ public class ReevaluateDagProcTest {
     ReevaluateDagProc
         reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new 
DagActionStore.DagAction(flowGroup, flowName,
         flowExecutionId, "job3", DagActionStore.DagActionType.REEVALUATE), 
null,
-        dagManagementStateStore, mockedDagProcEngineMetrics));
+        dagManagementStateStore, mockedDagProcEngineMetrics), 
ConfigFactory.empty());
     List<SpecProducer<Spec>> specProducers = getDagSpecProducers(dag);
     // process 4th job
     reEvaluateDagProc.process(dagManagementStateStore, 
mockedDagProcEngineMetrics);
@@ -282,10 +282,9 @@ public class ReevaluateDagProcTest {
     doReturn(new ImmutablePair<>(Optional.of(dag.getNodes().get(0)), 
Optional.of(jobStatus)))
         .when(dagManagementStateStore).getDagNodeWithJobStatus(any());
 
-    ReevaluateDagProc
-        reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new 
DagActionStore.DagAction(flowGroup, flowName,
-        flowExecutionId, "job0", DagActionStore.DagActionType.REEVALUATE), 
null,
-        dagManagementStateStore, mockedDagProcEngineMetrics));
+    ReevaluateDagProc reEvaluateDagProc = new ReevaluateDagProc(new 
ReevaluateDagTask(new DagActionStore.DagAction(
+        flowGroup, flowName, flowExecutionId, "job0", 
DagActionStore.DagActionType.REEVALUATE), null,
+        dagManagementStateStore, mockedDagProcEngineMetrics), 
ConfigFactory.empty());
     reEvaluateDagProc.process(dagManagementStateStore, 
mockedDagProcEngineMetrics);
 
     int numOfLaunchedJobs = 1; // only the current job
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
index 4de37fad54..20e47145b6 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
@@ -20,7 +20,6 @@ package org.apache.gobblin.service.modules.orchestration.proc;
 import java.io.IOException;
 import java.net.URISyntaxException;
 
-import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.mockito.Mockito;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -41,6 +40,7 @@ import 
org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
 import 
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStore;
 import 
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStoreTest;
 import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
+import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
 import org.apache.gobblin.service.modules.orchestration.task.ResumeDagTask;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 
@@ -94,7 +94,7 @@ public class ResumeDagProcTest {
 
     ResumeDagProc resumeDagProc = new ResumeDagProc(new ResumeDagTask(new 
DagActionStore.DagAction(flowGroup, flowName,
         flowExecutionId, MysqlDagActionStore.NO_JOB_NAME_DEFAULT, 
DagActionStore.DagActionType.RESUME),
-        null, this.dagManagementStateStore, mockedDagProcEngineMetrics));
+        null, this.dagManagementStateStore, mockedDagProcEngineMetrics), 
ConfigFactory.empty());
     resumeDagProc.process(this.dagManagementStateStore, 
mockedDagProcEngineMetrics);
 
     int expectedNumOfResumedJobs = 1; // = number of resumed nodes
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/ExceptionCleanupUtils.java
 b/gobblin-utility/src/main/java/org/apache/gobblin/util/ExceptionUtils.java
similarity index 71%
rename from 
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/ExceptionCleanupUtils.java
rename to 
gobblin-utility/src/main/java/org/apache/gobblin/util/ExceptionUtils.java
index e5727e126e..ae29a5bf6b 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/ExceptionCleanupUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/ExceptionUtils.java
@@ -15,11 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.runtime.util;
+package org.apache.gobblin.util;
 
-public final class ExceptionCleanupUtils {
+import java.util.List;
 
-  private ExceptionCleanupUtils() {
+public final class ExceptionUtils {
+
+  private ExceptionUtils() {
   }
 
   /**
@@ -42,4 +44,20 @@ public final class ExceptionCleanupUtils {
 
     return exception;
   }
+
+  /**
+   * Walks {@code exception} and its getCause() chain, returning true as soon as a link is an instance (subclasses
+   * included) of any class in {@code exceptionsList}; returns false once the chain ends, including for a null input.
+   */
+  public static boolean isExceptionInstanceOf(Throwable exception, 
List<Class<? extends Exception>> exceptionsList) {
+    while (exception != null) { // NOTE(review): a cause cycle (A -> B -> A) would loop forever; initCause only rejects self-causation
+      Throwable finalException = exception; // effectively-final copy: the lambda below cannot capture the reassigned 'exception'
+      if (exceptionsList.stream().anyMatch(e -> e.isInstance(finalException))) 
{ // Class.isInstance matches the listed class or any of its subclasses
+        return true;
+      } else {
+        exception = exception.getCause(); // advance down the cause chain; a null cause terminates the loop
+      }
+    }
+    return false;
+  }
 }
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/util/ExceptionCleanupUtilsTest.java
 b/gobblin-utility/src/test/java/org/apache/gobblin/util/ExceptionUtilsTest.java
similarity index 57%
rename from 
gobblin-runtime/src/test/java/org/apache/gobblin/runtime/util/ExceptionCleanupUtilsTest.java
rename to 
gobblin-utility/src/test/java/org/apache/gobblin/util/ExceptionUtilsTest.java
index 979bb2841a..75e771bfae 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/util/ExceptionCleanupUtilsTest.java
+++ 
b/gobblin-utility/src/test/java/org/apache/gobblin/util/ExceptionUtilsTest.java
@@ -15,36 +15,53 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.runtime.util;
+package org.apache.gobblin.util;
 
 import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
 
+import org.assertj.core.util.Lists;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
 
 
-@Test(groups = {"gobblin.runtime"})
-public class ExceptionCleanupUtilsTest {
+public class ExceptionUtilsTest {
+  @Test
+  public void testIsInstanceOf() { // covers a direct match, matches found via the cause chain, and non-matching chains
+    List<Class<? extends Exception>> excpetionsList = 
Lists.list(ArithmeticException.class, SQLException.class); // NOTE(review): identifier typo, "excpetionsList" should read "exceptionsList"
+    Exception e1 = new ArithmeticException(); // itself one of the listed types
+    Exception e2 = new IOException(e1); // listed type is the direct cause
+    Exception e3 = new RuntimeException(e2); // listed type sits two causes down
+    Exception e4 = new UnsupportedOperationException(); // unlisted type with no cause
+    Exception e5 = new IOException(e4); // chain contains no listed type
+    Assert.assertTrue(ExceptionUtils.isExceptionInstanceOf(e1, 
excpetionsList));
+    Assert.assertTrue(ExceptionUtils.isExceptionInstanceOf(e2, 
excpetionsList));
+    Assert.assertTrue(ExceptionUtils.isExceptionInstanceOf(e3, 
excpetionsList));
+    Assert.assertFalse(ExceptionUtils.isExceptionInstanceOf(e4, 
excpetionsList));
+    Assert.assertFalse(ExceptionUtils.isExceptionInstanceOf(e5, 
excpetionsList));
+  }
 
   @Test
   public void canRemoveEmptyWrapper() {
     Exception exception = new IOException(new IllegalArgumentException("root 
cause"));
-    Throwable rootCause = ExceptionCleanupUtils.removeEmptyWrappers(exception);
+    Throwable rootCause = ExceptionUtils.removeEmptyWrappers(exception);
     assertEquals(rootCause.getClass(), IllegalArgumentException.class);
   }
 
   @Test
   public void canRemoveMultipleEmptyWrappers() {
     Exception exception = new IOException(new IOException(new 
IllegalArgumentException("root cause")));
-    Throwable unwrapped = ExceptionCleanupUtils.removeEmptyWrappers(exception);
+    Throwable unwrapped = ExceptionUtils.removeEmptyWrappers(exception);
     assertEquals(unwrapped.getClass(), IllegalArgumentException.class);
   }
 
   @Test
   public void willNotRemoveExceptionWithMessage() {
     Exception exception = new IOException("test message", new 
IllegalArgumentException("root cause"));
-    Throwable unwrapped = ExceptionCleanupUtils.removeEmptyWrappers(exception);
+    Throwable unwrapped = ExceptionUtils.removeEmptyWrappers(exception);
     assertEquals(unwrapped.getClass(), IOException.class);
   }
-}
\ No newline at end of file
+}

Reply via email to