This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new a55c89c5e2 [GOBBLIN-2124] only retry transient exceptions (#4016)
a55c89c5e2 is described below
commit a55c89c5e22d7af77bae9cc361865e83494248bb
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Mon Aug 5 17:34:51 2024 -0700
[GOBBLIN-2124] only retry transient exceptions (#4016)
* only retry non-transient exceptions
* address review comment
---
.../apache/gobblin/service/ServiceConfigKeys.java | 2 +
.../apache/gobblin/metrics/ServiceMetricNames.java | 3 +
.../main/java/org/apache/gobblin/runtime/Task.java | 4 +-
.../java/org/apache/gobblin/runtime/fork/Fork.java | 4 +-
.../modules/orchestration/DagManagerMetrics.java | 7 ++
.../modules/orchestration/DagProcFactory.java | 18 +--
.../modules/orchestration/DagProcessingEngine.java | 3 +-
.../modules/orchestration/proc/DagProc.java | 37 +++++-
.../proc/DeadlineEnforcementDagProc.java | 6 +-
.../proc/EnforceFlowFinishDeadlineDagProc.java | 7 +-
.../proc/EnforceJobStartDeadlineDagProc.java | 6 +-
.../modules/orchestration/proc/KillDagProc.java | 6 +-
.../modules/orchestration/proc/LaunchDagProc.java | 7 +-
.../orchestration/proc/ReevaluateDagProc.java | 6 +-
.../modules/orchestration/proc/ResumeDagProc.java | 5 +-
.../modules/orchestration/task/DagTask.java | 6 +-
.../service/monitoring/KafkaJobStatusMonitor.java | 12 +-
.../monitoring/KafkaJobStatusMonitorFactory.java | 12 +-
.../DagManagementTaskStreamImplTest.java | 3 +-
.../orchestration/DagProcessingEngineTest.java | 126 ++++++++++++---------
.../modules/orchestration/OrchestratorTest.java | 4 +-
.../proc/EnforceDeadlineDagProcsTest.java | 8 +-
.../orchestration/proc/KillDagProcTest.java | 11 +-
.../orchestration/proc/LaunchDagProcTest.java | 4 +-
.../orchestration/proc/ReevaluateDagProcTest.java | 15 ++-
.../orchestration/proc/ResumeDagProcTest.java | 4 +-
.../org/apache/gobblin/util/ExceptionUtils.java | 24 +++-
.../apache/gobblin/util/ExceptionUtilsTest.java | 31 +++--
28 files changed, 248 insertions(+), 133 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index c37a7f5e95..5e3bab4009 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -207,6 +207,8 @@ public class ServiceConfigKeys {
public static final String GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX =
ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + "dagProcessingEngine.";
public static final String DAG_PROCESSING_ENGINE_ENABLED =
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "enabled";
public static final String NUM_DAG_PROC_THREADS_KEY =
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "numThreads";
+ public static final String DAG_PROC_ENGINE_NON_RETRYABLE_EXCEPTIONS_KEY =
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "nonRetryableExceptions";
+
public static final Integer DEFAULT_NUM_DAG_PROC_THREADS = 3;
public static final String GOBBLIN_SERVICE_MULTI_ACTIVE_EXECUTION_ENABLED =
GOBBLIN_SERVICE_PREFIX + "multiActiveExecutionEnabled";
}
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index cc80975766..ea7b3124e0 100644
---
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -80,6 +80,8 @@ public class ServiceMetricNames {
public static final String DAG_COUNT_FS_DAG_STATE_COUNT =
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "FsDagStateStore" + ".totalDagCount";
public static final String DAG_PROCESSING_EXCEPTION_METER =
"DagProcessingException";
+ public static final String
DAG_ACTIONS_CREATE_EXCEPTIONS_IN_JOB_STATUS_MONITOR =
"DagActionsCreateExceptionsInJobStatusMonitor";
+
/* DagProcessingEngine & Multi-active Execution Related Metrics
* Note: metrics ending with the delimiter '.' will be suffixed by the
specific {@link DagActionType} type for finer
* grained monitoring of each dagAction type in addition to the aggregation
of all types.
@@ -102,4 +104,5 @@ public class ServiceMetricNames {
public static final String DAG_ACTIONS_DELETE_SUCCEEDED =
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsDeleteSucceeded.";
public static final String DAG_ACTIONS_DELETE_FAILED =
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsDeleteFailed.";
public static final String DAG_ACTIONS_AVERAGE_PROCESSING_DELAY_MILLIS =
DAG_PROCESSING_ENGINE_PREFIX + "dagActionsAvgProcessingDelayMillis.";
+ public static final String DAG_PROCESSING_NON_RETRYABLE_EXCEPTION_METER =
"DagProcessingNonRetryableException";
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
index 7c315e3079..7ddb4f11f2 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
@@ -80,7 +80,7 @@ import org.apache.gobblin.runtime.fork.AsynchronousFork;
import org.apache.gobblin.runtime.fork.Fork;
import org.apache.gobblin.runtime.fork.SynchronousFork;
import org.apache.gobblin.runtime.task.TaskIFace;
-import org.apache.gobblin.runtime.util.ExceptionCleanupUtils;
+import org.apache.gobblin.util.ExceptionUtils;
import org.apache.gobblin.runtime.util.TaskMetrics;
import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.source.extractor.JobCommitPolicy;
@@ -566,7 +566,7 @@ public class Task implements TaskIFace {
}
protected void failTask(Throwable t) {
- Throwable cleanedException = ExceptionCleanupUtils.removeEmptyWrappers(t);
+ Throwable cleanedException = ExceptionUtils.removeEmptyWrappers(t);
LOG.error(String.format("Task %s failed", this.taskId), cleanedException);
this.taskState.setWorkingState(WorkUnitState.WorkingState.FAILED);
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
index 9c58e6cf57..ed83c0d538 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
@@ -56,7 +56,7 @@ import org.apache.gobblin.runtime.Task;
import org.apache.gobblin.runtime.TaskContext;
import org.apache.gobblin.runtime.TaskExecutor;
import org.apache.gobblin.runtime.TaskState;
-import org.apache.gobblin.runtime.util.ExceptionCleanupUtils;
+import org.apache.gobblin.util.ExceptionUtils;
import org.apache.gobblin.runtime.util.ForkMetrics;
import org.apache.gobblin.state.ConstructState;
import org.apache.gobblin.stream.ControlMessage;
@@ -266,7 +266,7 @@ public class Fork<S, D> implements Closeable, FinalState,
RecordStreamConsumer<S
compareAndSetForkState(ForkState.RUNNING, ForkState.SUCCEEDED);
} catch (Throwable t) {
- Throwable cleanedUpException =
ExceptionCleanupUtils.removeEmptyWrappers(t);
+ Throwable cleanedUpException = ExceptionUtils.removeEmptyWrappers(t);
// Set throwable to holder first because AsynchronousFork::putRecord can
pull the throwable when it detects ForkState.FAILED status.
ForkThrowableHolder holder = Task.getForkThrowableHolder(this.broker);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
index 8931bd75e2..d5bbcffb3f 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
@@ -63,6 +63,9 @@ public class DagManagerMetrics {
private ContextAwareMeter allSlaExceededMeter;
private ContextAwareMeter allStartSlaExceededMeter;
public ContextAwareMeter dagProcessingExceptionMeter;
+ public ContextAwareMeter dagProcessingNonRetryableExceptionMeter;
+ public ContextAwareMeter dagActionCreationExceptionsInJobStatusMonitor;
+
// Meters representing the flows in a given state per flowgroup
private final Map<String, ContextAwareMeter> groupSuccessfulMeters =
Maps.newConcurrentMap();
private final Map<String, ContextAwareMeter> groupFailureMeters =
Maps.newConcurrentMap();
@@ -103,6 +106,10 @@ public class DagManagerMetrics {
ServiceMetricNames.JOBS_SENT_TO_SPEC_EXECUTOR));
dagProcessingExceptionMeter =
metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
ServiceMetricNames.DAG_PROCESSING_EXCEPTION_METER));
+ dagProcessingNonRetryableExceptionMeter =
metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+ ServiceMetricNames.DAG_PROCESSING_NON_RETRYABLE_EXCEPTION_METER));
+ dagActionCreationExceptionsInJobStatusMonitor =
metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+
ServiceMetricNames.DAG_ACTIONS_CREATE_EXCEPTIONS_IN_JOB_STATUS_MONITOR));
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java
index 0503f2d0c0..10b9bf27a8 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service.modules.orchestration;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import com.typesafe.config.Config;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
@@ -37,7 +38,6 @@ import
org.apache.gobblin.service.modules.orchestration.task.ReevaluateDagTask;
import org.apache.gobblin.service.modules.orchestration.task.ResumeDagTask;
import
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
-
/**
* {@link DagTaskVisitor} for transforming a specific {@link DagTask} derived
class to its companion {@link DagProc} derived class.
* Each {@link DagTask} needs it own {@link DagProcFactory#meet} method
overload to create {@link DagProc} that is
@@ -48,41 +48,43 @@ import
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
@Singleton
public class DagProcFactory implements DagTaskVisitor<DagProc<?>> {
+ private final Config config;
private final FlowCompilationValidationHelper
flowCompilationValidationHelper;
@Inject
- public DagProcFactory(FlowCompilationValidationHelper
flowCompilationValidationHelper) {
+ public DagProcFactory(Config config, FlowCompilationValidationHelper
flowCompilationValidationHelper) {
+ this.config = config;
this.flowCompilationValidationHelper = flowCompilationValidationHelper;
}
@Override
public EnforceFlowFinishDeadlineDagProc
meet(EnforceFlowFinishDeadlineDagTask enforceFlowFinishDeadlineDagTask) {
- return new
EnforceFlowFinishDeadlineDagProc(enforceFlowFinishDeadlineDagTask);
+ return new
EnforceFlowFinishDeadlineDagProc(enforceFlowFinishDeadlineDagTask, this.config);
}
@Override
public EnforceJobStartDeadlineDagProc meet(EnforceJobStartDeadlineDagTask
enforceJobStartDeadlineDagTask) {
- return new EnforceJobStartDeadlineDagProc(enforceJobStartDeadlineDagTask);
+ return new EnforceJobStartDeadlineDagProc(enforceJobStartDeadlineDagTask,
this.config);
}
@Override
public LaunchDagProc meet(LaunchDagTask launchDagTask) {
- return new LaunchDagProc(launchDagTask,
this.flowCompilationValidationHelper);
+ return new LaunchDagProc(launchDagTask,
this.flowCompilationValidationHelper, this.config);
}
@Override
public ReevaluateDagProc meet(ReevaluateDagTask reEvaluateDagTask) {
- return new ReevaluateDagProc(reEvaluateDagTask);
+ return new ReevaluateDagProc(reEvaluateDagTask, this.config);
}
@Override
public KillDagProc meet(KillDagTask killDagTask) {
- return new KillDagProc(killDagTask);
+ return new KillDagProc(killDagTask, this.config);
}
@Override
public ResumeDagProc meet(ResumeDagTask resumeDagTask) {
- return new ResumeDagProc(resumeDagTask);
+ return new ResumeDagProc(resumeDagTask, this.config);
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
index 813c81b3d9..582281dec5 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
@@ -48,7 +48,7 @@ import org.apache.gobblin.util.ExecutorsUtils;
* Each {@link DagTask} returned from the {@link DagTaskStream} comes with a
time-limited lease conferring the exclusive
* right to perform the work of the task.
* The {@link DagProcFactory} transforms each {@link DagTask} into a specific,
concrete {@link DagProc}, which
- * encapsulates all processing inside {@link
DagProc#process(DagManagementStateStore)}
+ * encapsulates all processing inside {@link
DagProc#process(DagManagementStateStore, DagProcessingEngineMetrics)}
*/
@AllArgsConstructor
@@ -138,6 +138,7 @@ public class DagProcessingEngine extends
AbstractIdleService {
try {
dagProc.process(dagManagementStateStore, dagProcEngineMetrics);
dagTask.conclude();
+ log.info("Concluded dagTask : {}", dagTask);
} catch (Exception e) {
log.error("DagProcEngineThread encountered exception while
processing dag " + dagProc.getDagId(), e);
dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.mark();
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
index 3c8aadc6eb..da07b23bf0 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
@@ -18,6 +18,10 @@
package org.apache.gobblin.service.modules.orchestration.proc;
import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.typesafe.config.Config;
import lombok.Data;
import lombok.Getter;
@@ -28,6 +32,8 @@ import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.util.ExceptionUtils;
+import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
@@ -35,6 +41,7 @@ import
org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
/**
@@ -55,14 +62,23 @@ public abstract class DagProc<T> {
@Getter protected final DagManager.DagId dagId;
@Getter protected final DagNodeId dagNodeId;
protected static final MetricContext metricContext =
Instrumented.getMetricContext(new State(), DagProc.class);
+ protected final List<Class<? extends Exception>> nonRetryableExceptions;
protected static final EventSubmitter eventSubmitter = new
EventSubmitter.Builder(
metricContext, "org.apache.gobblin.service").build();
- public DagProc(DagTask dagTask) {
+ public DagProc(DagTask dagTask, Config config) {
this.dagTask = dagTask;
this.dagId =
DagManagerUtils.generateDagId(this.dagTask.getDagAction().getFlowGroup(),
this.dagTask.getDagAction().getFlowName(),
this.dagTask.getDagAction().getFlowExecutionId());
this.dagNodeId = this.dagTask.getDagAction().getDagNodeId();
+ this.nonRetryableExceptions = ConfigUtils.getStringList(config,
ServiceConfigKeys.DAG_PROC_ENGINE_NON_RETRYABLE_EXCEPTIONS_KEY)
+ .stream().map(className -> {
+ try {
+ return (Class<? extends Exception>) Class.forName(className);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }).collect(Collectors.toList());
}
public final void process(DagManagementStateStore dagManagementStateStore,
@@ -75,8 +91,19 @@ public abstract class DagProc<T> {
dagProcEngineMetrics.markDagActionsInitialize(getDagActionType(), false);
throw e;
}
- act(dagManagementStateStore, state, dagProcEngineMetrics);
- log.info("{} concluded processing for dagId : {}",
getClass().getSimpleName(), this.dagId);
+ try {
+ act(dagManagementStateStore, state, dagProcEngineMetrics);
+ log.info("{} processed dagId : {}", getClass().getSimpleName(),
this.dagId);
+ } catch (Exception e) {
+ if (isNonTransientException(e)) {
+ log.error("Ignoring non transient exception. DagTask {} will conclude
and will not be retried. Exception - {} ",
+ getDagTask(), e);
+
dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.mark();
+
dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.mark();
+ } else {
+ throw e;
+ }
+ }
}
protected abstract T initialize(DagManagementStateStore
dagManagementStateStore) throws IOException;
@@ -87,4 +114,8 @@ public abstract class DagProc<T> {
public DagActionStore.DagActionType getDagActionType() {
return this.dagTask.getDagAction().getDagActionType();
}
+
+ protected boolean isNonTransientException(Exception e) {
+ return ExceptionUtils.isExceptionInstanceOf(e,
this.nonRetryableExceptions);
+ }
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DeadlineEnforcementDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DeadlineEnforcementDagProc.java
index db41ecc206..e546544716 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DeadlineEnforcementDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DeadlineEnforcementDagProc.java
@@ -20,6 +20,8 @@ package org.apache.gobblin.service.modules.orchestration.proc;
import java.io.IOException;
import java.util.Optional;
+import com.typesafe.config.Config;
+
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.service.modules.flowgraph.Dag;
@@ -35,8 +37,8 @@ import
org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@Slf4j
abstract public class DeadlineEnforcementDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>> {
- public DeadlineEnforcementDagProc(DagTask dagTask) {
- super(dagTask);
+ public DeadlineEnforcementDagProc(DagTask dagTask, Config config) {
+ super(dagTask, config);
}
@Override
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java
index b9b5d179e9..f6d48a1450 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java
@@ -20,6 +20,8 @@ package org.apache.gobblin.service.modules.orchestration.proc;
import java.io.IOException;
import java.util.List;
+import com.typesafe.config.Config;
+
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.metrics.event.TimingEvent;
@@ -38,8 +40,9 @@ import
org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@Slf4j
public class EnforceFlowFinishDeadlineDagProc extends
DeadlineEnforcementDagProc {
- public EnforceFlowFinishDeadlineDagProc(EnforceFlowFinishDeadlineDagTask
enforceFlowFinishDeadlineDagTask) {
- super(enforceFlowFinishDeadlineDagTask);
+ public EnforceFlowFinishDeadlineDagProc(EnforceFlowFinishDeadlineDagTask
enforceFlowFinishDeadlineDagTask,
+ Config config) {
+ super(enforceFlowFinishDeadlineDagTask, config);
}
protected void enforceDeadline(DagManagementStateStore
dagManagementStateStore, Dag<JobExecutionPlan> dag,
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java
index 6eccc1ab1a..8a05ab4bfd 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java
@@ -22,6 +22,8 @@ import java.util.Optional;
import org.apache.commons.lang3.tuple.Pair;
+import com.typesafe.config.Config;
+
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.metrics.event.TimingEvent;
@@ -45,8 +47,8 @@ import static
org.apache.gobblin.service.ExecutionStatus.valueOf;
@Slf4j
public class EnforceJobStartDeadlineDagProc extends DeadlineEnforcementDagProc
{
- public EnforceJobStartDeadlineDagProc(EnforceJobStartDeadlineDagTask
enforceJobStartDeadlineDagTask) {
- super(enforceJobStartDeadlineDagTask);
+ public EnforceJobStartDeadlineDagProc(EnforceJobStartDeadlineDagTask
enforceJobStartDeadlineDagTask, Config config) {
+ super(enforceJobStartDeadlineDagTask, config);
}
protected void enforceDeadline(DagManagementStateStore
dagManagementStateStore, Dag<JobExecutionPlan> dag,
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java
index c429d754ab..c2a90dcc3b 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java
@@ -20,6 +20,8 @@ package org.apache.gobblin.service.modules.orchestration.proc;
import java.io.IOException;
import java.util.Optional;
+import com.typesafe.config.Config;
+
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.metrics.event.TimingEvent;
@@ -39,8 +41,8 @@ import
org.apache.gobblin.service.modules.spec.JobExecutionPlan;
public class KillDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>> {
private final boolean shouldKillSpecificJob;
- public KillDagProc(KillDagTask killDagTask) {
- super(killDagTask);
+ public KillDagProc(KillDagTask killDagTask, Config config) {
+ super(killDagTask, config);
this.shouldKillSpecificJob =
!getDagNodeId().getJobName().equals(DagActionStore.NO_JOB_NAME_DEFAULT);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
index c72c1fcf96..0efef83139 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
@@ -21,6 +21,8 @@ import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Optional;
+import com.typesafe.config.Config;
+
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -41,8 +43,9 @@ import
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>> {
private final FlowCompilationValidationHelper
flowCompilationValidationHelper;
- public LaunchDagProc(LaunchDagTask launchDagTask,
FlowCompilationValidationHelper flowCompilationValidationHelper) {
- super(launchDagTask);
+ public LaunchDagProc(LaunchDagTask launchDagTask,
FlowCompilationValidationHelper flowCompilationValidationHelper,
+ Config config) {
+ super(launchDagTask, config);
this.flowCompilationValidationHelper = flowCompilationValidationHelper;
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
index 8c7a8c8abd..04ea1258dc 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
@@ -22,6 +22,8 @@ import java.util.Optional;
import org.apache.commons.lang3.tuple.Pair;
+import com.typesafe.config.Config;
+
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.metrics.event.TimingEvent;
@@ -44,8 +46,8 @@ import org.apache.gobblin.service.monitoring.JobStatus;
@Slf4j
public class ReevaluateDagProc extends
DagProc<Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>>> {
- public ReevaluateDagProc(ReevaluateDagTask reEvaluateDagTask) {
- super(reEvaluateDagTask);
+ public ReevaluateDagProc(ReevaluateDagTask reEvaluateDagTask, Config config)
{
+ super(reEvaluateDagTask, config);
}
@Override
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
index 8a94c4ef12..8326d83f09 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
@@ -22,6 +22,7 @@ import java.util.Map;
import java.util.Optional;
import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
@@ -46,8 +47,8 @@ import static
org.apache.gobblin.service.ExecutionStatus.PENDING_RESUME;
@Slf4j
public class ResumeDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>> {
- public ResumeDagProc(ResumeDagTask resumeDagTask) {
- super(resumeDagTask);
+ public ResumeDagProc(ResumeDagTask resumeDagTask, Config config) {
+ super(resumeDagTask, config);
}
@Override
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
index 1d3ac86564..6a7f981183 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
@@ -63,11 +63,7 @@ public abstract class DagTask {
try {
this.dagManagementStateStore.deleteDagAction(this.dagAction);
boolean completedLease = this.leaseObtainedStatus.completeLease();
- if (completedLease) {
-
this.dagProcEngineMetrics.markDagActionsConclude(this.dagAction.getDagActionType(),
true);
- } else {
-
this.dagProcEngineMetrics.markDagActionsConclude(this.dagAction.getDagActionType(),
false);
- }
+
this.dagProcEngineMetrics.markDagActionsConclude(this.dagAction.getDagActionType(),
completedLease);
return completedLease;
} catch (IOException e) {
this.dagProcEngineMetrics.markDagActionsConclude(this.dagAction.getDagActionType(),
false);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index d5692be371..7061cd61f5 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -70,6 +70,7 @@ import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.proc.DagProcUtils;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExceptionUtils;
import org.apache.gobblin.util.retry.RetryerFactory;
import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_INTERVAL_MS;
@@ -236,9 +237,10 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
try {
this.dagManagementStateStore.addJobDagAction(flowGroup,
flowName, flowExecutionId, jobName, DagActionStore.DagActionType.REEVALUATE);
} catch (IOException e) {
- if (e.getCause() != null &&
isThrowableInstanceOf(e.getCause(), nonRetryableExceptions)) {
- // todo - add metrics
- log.warn("Duplicate REEVALUATE Dag Action is being created.
Ignoring... " + e.getMessage());
+ if (ExceptionUtils.isExceptionInstanceOf(e,
nonRetryableExceptions)) {
+
this.dagManagementStateStore.getDagManagerMetrics().dagActionCreationExceptionsInJobStatusMonitor.mark();
+ log.error("Could not add REEVALUATE dag action for flow
group - {}, flow name - {}, flowExecutionId - {}, "
+ + "jobName = {} due to {}. Ignoring...", flowGroup,
flowName, flowExecutionId, jobName, e.getMessage());
} else {
throw e;
}
@@ -425,8 +427,4 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
protected abstract GobblinTrackingEvent
deserializeEvent(DecodeableKafkaRecord<byte[],byte[]> message);
protected abstract org.apache.gobblin.configuration.State
parseJobStatus(GobblinTrackingEvent event);
-
- public static boolean isThrowableInstanceOf(Throwable exception,
List<Class<? extends Exception>> typesList) {
- return typesList.stream().anyMatch(e -> e.isInstance(exception));
- }
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
index 756e04f73e..4d0dbd45db 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
@@ -36,6 +36,7 @@ import
org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
import org.apache.gobblin.runtime.util.InjectionNames;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
@@ -57,12 +58,14 @@ public class KafkaJobStatusMonitorFactory implements
Provider<KafkaJobStatusMoni
@Inject
public KafkaJobStatusMonitorFactory(Config config, JobIssueEventHandler
jobIssueEventHandler, MultiContextIssueRepository issueRepository,
- GobblinInstanceEnvironment env, DagManagementStateStore
dagManagementStateStore, @Named(InjectionNames.DAG_PROC_ENGINE_ENABLED) boolean
dagProcEngineEnabled) {
- this(config, jobIssueEventHandler, issueRepository,
env.isInstrumentationEnabled(), dagManagementStateStore, dagProcEngineEnabled);
+ GobblinInstanceEnvironment env, DagManagementStateStore
dagManagementStateStore,
+ @Named(InjectionNames.DAG_PROC_ENGINE_ENABLED) boolean
dagProcEngineEnabled, DagProcessingEngineMetrics dagProcessingEngineMetrics) {
+ this(config, jobIssueEventHandler, issueRepository,
env.isInstrumentationEnabled(), dagManagementStateStore,
+ dagProcEngineEnabled, dagProcessingEngineMetrics);
}
public KafkaJobStatusMonitorFactory(Config config, JobIssueEventHandler
jobIssueEventHandler, MultiContextIssueRepository issueRepository,
- boolean instrumentationEnabled, DagManagementStateStore
dagManagementStateStore, boolean dagProcEngineEnabled) {
+ boolean instrumentationEnabled, DagManagementStateStore
dagManagementStateStore, boolean dagProcEngineEnabled,
DagProcessingEngineMetrics dagProcessingEngineMetrics) {
this.config = Objects.requireNonNull(config);
this.jobIssueEventHandler = Objects.requireNonNull(jobIssueEventHandler);
this.issueRepository = issueRepository;
@@ -103,7 +106,8 @@ public class KafkaJobStatusMonitorFactory implements
Provider<KafkaJobStatusMoni
observabilityEventProducerClassName,
ConfigUtils.configToState(config), this.issueRepository,
this.instrumentationEnabled);
return (KafkaJobStatusMonitor) GobblinConstructorUtils
- .invokeLongestConstructor(jobStatusMonitorClass, topic,
jobStatusConfig, numThreads, jobIssueEventHandler, observabilityEventProducer,
dagManagementStateStore);
+ .invokeLongestConstructor(jobStatusMonitorClass, topic,
jobStatusConfig, numThreads, jobIssueEventHandler, observabilityEventProducer,
+ dagManagementStateStore);
}
@Override
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
index db30b928bd..2244e7dd11 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
@@ -26,6 +26,7 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -70,7 +71,7 @@ public class DagManagementTaskStreamImplTest {
new DagManagementTaskStreamImpl(config,
Optional.of(mock(DagActionStore.class)),
mock(MultiActiveLeaseArbiter.class),
Optional.of(mock(DagActionReminderScheduler.class)),
false, mock(DagManagementStateStore.class),
mock(DagProcessingEngineMetrics.class));
- this.dagProcFactory = new DagProcFactory(null);
+ this.dagProcFactory = new DagProcFactory(ConfigFactory.empty(), null);
this.dagProcEngineThread = new DagProcessingEngine.DagProcEngineThread(
this.dagManagementTaskStream, this.dagProcFactory,
dagManagementStateStore, mock(DagProcessingEngineMetrics.class), 0);
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
index a5a4f168ec..e8688680b5 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
@@ -22,7 +22,6 @@ import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
-import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -30,15 +29,18 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.exception.NonTransientException;
import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.apache.gobblin.service.modules.orchestration.task.DagTask;
import org.apache.gobblin.testing.AssertWithBackoff;
@@ -52,9 +54,7 @@ public class DagProcessingEngineTest {
private static final String TEST_USER = "testUser";
private static final String TEST_PASSWORD = "testPassword";
private static final String TEST_TABLE = "quotas";
- private DagManagementTaskStreamImpl dagManagementTaskStream;
private DagTaskStream dagTaskStream;
- private DagProcFactory dagProcFactory;
// Field is static because it's used to instantiate every MockedDagTask
private static MySqlDagManagementStateStore dagManagementStateStore;
private static DagProcessingEngineMetrics mockedDagProcEngineMetrics;
@@ -70,32 +70,26 @@ public class DagProcessingEngineTest {
leaseObtainedStatus = mock(LeaseAttemptStatus.LeaseObtainedStatus.class);
doReturn(true).when(leaseObtainedStatus).completeLease();
- Config config;
ConfigBuilder configBuilder = ConfigBuilder.create();
configBuilder.addPrimitive(MySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
MysqlDagStateStoreTest.TestMysqlDagStateStore.class.getName())
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
testMetastoreDatabase.getJdbcUrl())
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
TEST_USER)
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
TEST_PASSWORD)
-
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
TEST_TABLE);
- config = configBuilder.build();
+
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
TEST_TABLE)
+
.addPrimitive(ServiceConfigKeys.DAG_PROC_ENGINE_NON_RETRYABLE_EXCEPTIONS_KEY,
NonTransientException.class.getName());
+
+ Config config = configBuilder.build();
dagManagementStateStore =
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(testMetastoreDatabase));
doReturn(true).when(dagActionStore).deleteDagAction(any());
- dagManagementTaskStream =
- new DagManagementTaskStreamImpl(config,
Optional.of(mock(DagActionStore.class)),
- mock(MultiActiveLeaseArbiter.class),
Optional.of(mock(DagActionReminderScheduler.class)),
- false, dagManagementStateStore,
Mockito.mock(DagProcessingEngineMetrics.class));
- this.dagProcFactory = new DagProcFactory(null);
-
- DagProcessingEngine.DagProcEngineThread dagProcEngineThread =
- new DagProcessingEngine.DagProcEngineThread(dagManagementTaskStream,
this.dagProcFactory,
- dagManagementStateStore, mock(DagProcessingEngineMetrics.class),
0);
+ DagProcFactory dagProcFactory = new DagProcFactory(ConfigFactory.empty(),
null);
+
this.dagTaskStream = spy(new MockedDagTaskStream());
DagProcessingEngine dagProcessingEngine =
- new DagProcessingEngine(config, Optional.ofNullable(dagTaskStream),
Optional.ofNullable(this.dagProcFactory),
+ new DagProcessingEngine(config, Optional.ofNullable(dagTaskStream),
Optional.of(dagProcFactory),
Optional.ofNullable(dagManagementStateStore), 100000L,
mock(DagProcessingEngineMetrics.class));
dagProcessingEngine.startAsync();
- this.mockedDagProcEngineMetrics = mock(DagProcessingEngineMetrics.class);
+ mockedDagProcEngineMetrics = mock(DagProcessingEngineMetrics.class);
}
@AfterClass(alwaysRun = true)
@@ -107,7 +101,9 @@ public class DagProcessingEngineTest {
static class MockedDagTaskStream implements DagTaskStream {
public static final int MAX_NUM_OF_TASKS = 1000;
public static final int FAILING_DAGS_FREQUENCY = 10;
- volatile int i=0;
+ // this number should be a multiple of FAILING_DAGS_FREQUENCY, so that all
non-retryable exceptions should also be exceptions
+ public static final int
FAILING_DAGS_WITH_NON_RETRYABLE_EXCEPTIONS_FREQUENCY = 30;
+ volatile int i = 0;
@Override
public boolean hasNext() {
@@ -115,56 +111,72 @@ public class DagProcessingEngineTest {
}
@Override
- public synchronized DagTask next() throws NoSuchElementException {
+ public synchronized DagTask next()
+ throws NoSuchElementException {
i++;
if (i > MAX_NUM_OF_TASKS) {
throw new RuntimeException("Simulating an exception to stop the
thread!");
}
- if (i % FAILING_DAGS_FREQUENCY == 0 ) {
- return new MockedDagTask(new DagActionStore.DagAction("fg-" + i, "fn-"
+ i, (1234L + i), "jn-" + i, DagActionStore.DagActionType.LAUNCH), true);
+ if (i % FAILING_DAGS_WITH_NON_RETRYABLE_EXCEPTIONS_FREQUENCY == 0) {
+ return new MockedDagTask(new DagActionStore.DagAction("fg-" + i, "fn-"
+ i, (1234L + i), "jn-" + i,
+ DagActionStore.DagActionType.LAUNCH),
ExceptionType.NON_RETRYABLE_EXCEPTION);
} else {
- return new MockedDagTask(new DagActionStore.DagAction("fg-" + i, "fn-"
+ i, (1234L + i), "jn-" + i, DagActionStore.DagActionType.LAUNCH), false);
+ if (i % FAILING_DAGS_FREQUENCY == 0) {
+ return new MockedDagTask(new DagActionStore.DagAction("fg-" + i,
"fn-" + i, (1234L + i), "jn-" + i,
+ DagActionStore.DagActionType.LAUNCH),
ExceptionType.RETRYABLE_EXCEPTION);
+ } else {
+ return new MockedDagTask(new DagActionStore.DagAction("fg-" + i,
"fn-" + i, (1234L + i), "jn-" + i,
+ DagActionStore.DagActionType.LAUNCH),
ExceptionType.NO_EXCEPTION);
+ }
}
}
- }
- static class MockedDagTask extends DagTask {
- private final boolean isBad;
+ static class MockedDagTask extends DagTask {
+ private final ExceptionType exceptionType;
- public MockedDagTask(DagActionStore.DagAction dagAction, boolean isBad) {
- super(dagAction, leaseObtainedStatus,
DagProcessingEngineTest.dagManagementStateStore, mockedDagProcEngineMetrics);
- this.isBad = isBad;
- }
+ public MockedDagTask(DagActionStore.DagAction dagAction, ExceptionType
exceptionType) {
+ super(dagAction, leaseObtainedStatus,
DagProcessingEngineTest.dagManagementStateStore,
+ mockedDagProcEngineMetrics);
+ this.exceptionType = exceptionType;
+ }
- @Override
- public <T> T host(DagTaskVisitor<T> visitor) {
- return (T) new MockedDagProc(this, this.isBad);
+ @Override
+ public <T> T host(DagTaskVisitor<T> visitor) {
+ return (T) new MockedDagProc(this, this.exceptionType);
+ }
}
- }
- static class MockedDagProc extends DagProc<Void> {
- private final boolean isBad;
+ static class MockedDagProc extends DagProc<Void> {
+ private final ExceptionType exceptionType;
- public MockedDagProc(MockedDagTask mockedDagTask, boolean isBad) {
- super(mockedDagTask);
- this.isBad = isBad;
- }
+ public MockedDagProc(MockedDagTask mockedDagTask, ExceptionType
exceptionType) {
+ super(mockedDagTask, ConfigBuilder.create()
+
.addPrimitive(ServiceConfigKeys.DAG_PROC_ENGINE_NON_RETRYABLE_EXCEPTIONS_KEY,
NonTransientException.class.getName())
+ .build());
+ this.exceptionType = exceptionType;
+ }
- @Override
- public DagManager.DagId getDagId() {
- return new DagManager.DagId("fg", "fn", 12345L);
- }
+ @Override
+ public DagManager.DagId getDagId() {
+ return new DagManager.DagId("fg", "fn", 12345L);
+ }
- @Override
- protected Void initialize(DagManagementStateStore dagManagementStateStore)
{
- return null;
- }
+ @Override
+ protected Void initialize(DagManagementStateStore
dagManagementStateStore) {
+ return null;
+ }
- @Override
- protected void act(DagManagementStateStore dagManagementStateStore, Void
state,
- DagProcessingEngineMetrics dagProcEngineMetrics) {
- if (this.isBad) {
- throw new RuntimeException("Simulating an exception!");
+ @Override
+ protected void act(DagManagementStateStore dagManagementStateStore, Void
state,
+ DagProcessingEngineMetrics dagProcEngineMetrics) {
+
+ switch (this.exceptionType) {
+ case NON_RETRYABLE_EXCEPTION:
+ throw new NonTransientException("Simulating a non retryable
exception!");
+ case RETRYABLE_EXCEPTION:
+ throw new RuntimeException("Simulating an exception!");
+ default:
+ }
}
}
}
@@ -172,13 +184,14 @@ public class DagProcessingEngineTest {
// This tests verifies that all the dag tasks entered to the dag task stream
are retrieved by dag proc engine threads
@Test
public void dagProcessingTest()
- throws InterruptedException, TimeoutException, IOException {
+ throws InterruptedException, TimeoutException {
// there are MAX_NUM_OF_TASKS dag tasks returned and then each thread
additionally call (infinitely) once to wait
// in this unit tests, it does not infinitely wait though, because the
mocked task stream throws an exception on
// (MAX_NUM_OF_TASKS + 1) th call
int expectedNumOfInvocations = MockedDagTaskStream.MAX_NUM_OF_TASKS +
ServiceConfigKeys.DEFAULT_NUM_DAG_PROC_THREADS;
int expectedExceptions = MockedDagTaskStream.MAX_NUM_OF_TASKS /
MockedDagTaskStream.FAILING_DAGS_FREQUENCY;
+ int expectedNonRetryableExceptions = MockedDagTaskStream.MAX_NUM_OF_TASKS
/ MockedDagTaskStream.FAILING_DAGS_WITH_NON_RETRYABLE_EXCEPTIONS_FREQUENCY;
AssertWithBackoff.assertTrue(input ->
Mockito.mockingDetails(this.dagTaskStream).getInvocations().size() ==
expectedNumOfInvocations,
10000L, "dagTaskStream was not called " + expectedNumOfInvocations + "
number of times. "
@@ -186,5 +199,12 @@ public class DagProcessingEngineTest {
log, 1, 1000L);
Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.getCount(),
expectedExceptions);
+
Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.getCount(),
expectedNonRetryableExceptions);
+ }
+
+ private enum ExceptionType {
+ NO_EXCEPTION,
+ RETRYABLE_EXCEPTION,
+ NON_RETRYABLE_EXCEPTION
}
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index df220b7ccc..47f3c2a4d3 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -207,7 +207,7 @@ public class OrchestratorTest {
SpecExecutor specExecutorInstance = new InMemorySpecExecutor(config);
TopologySpec.Builder topologySpecBuilder =
TopologySpec.builder(computeTopologySpecURI(SPEC_STORE_PARENT_DIR,
- TOPOLOGY_SPEC_STORE_DIR))
+ TOPOLOGY_SPEC_STORE_DIR))
.withConfig(config)
.withDescription(SPEC_DESCRIPTION)
.withVersion(SPEC_VERSION)
@@ -253,7 +253,7 @@ public class OrchestratorTest {
FlowSpec.Builder flowSpecBuilder = null;
try {
flowSpecBuilder =
FlowSpec.builder(computeTopologySpecURI(SPEC_STORE_PARENT_DIR,
- FLOW_SPEC_GROUP_DIR))
+ FLOW_SPEC_GROUP_DIR))
.withConfig(config)
.withDescription(SPEC_DESCRIPTION)
.withVersion(SPEC_VERSION)
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
index 2e64b71c40..470083154b 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
@@ -22,7 +22,6 @@ import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.Pair;
-import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.testng.annotations.AfterClass;
@@ -45,6 +44,7 @@ import
org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
import
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStore;
import
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStoreTest;
+import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import
org.apache.gobblin.service.modules.orchestration.task.EnforceFlowFinishDeadlineDagTask;
import
org.apache.gobblin.service.modules.orchestration.task.EnforceJobStartDeadlineDagTask;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -107,7 +107,7 @@ public class EnforceDeadlineDagProcsTest {
EnforceJobStartDeadlineDagProc enforceJobStartDeadlineDagProc = new
EnforceJobStartDeadlineDagProc(
new EnforceJobStartDeadlineDagTask(new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
"job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE),
null,
- dagManagementStateStore, mockedDagProcEngineMetrics));
+ dagManagementStateStore, mockedDagProcEngineMetrics),
ConfigFactory.empty());
enforceJobStartDeadlineDagProc.process(dagManagementStateStore,
mockedDagProcEngineMetrics);
int expectedNumOfDeleteDagNodeStates = 1; // the one dag node
corresponding to the EnforceStartDeadlineDagProc
@@ -150,7 +150,7 @@ public class EnforceDeadlineDagProcsTest {
EnforceJobStartDeadlineDagProc enforceJobStartDeadlineDagProc = new
EnforceJobStartDeadlineDagProc(
new EnforceJobStartDeadlineDagTask(new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
"job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE),
null,
- dagManagementStateStore, mockedDagProcEngineMetrics));
+ dagManagementStateStore, mockedDagProcEngineMetrics),
ConfigFactory.empty());
enforceJobStartDeadlineDagProc.process(dagManagementStateStore,
mockedDagProcEngineMetrics);
// no job cancelled because we simulated (by not adding) missing dag action
@@ -192,7 +192,7 @@ public class EnforceDeadlineDagProcsTest {
EnforceFlowFinishDeadlineDagProc enforceFlowFinishDeadlineDagProc = new
EnforceFlowFinishDeadlineDagProc(
new EnforceFlowFinishDeadlineDagTask(new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
"job0",
DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE), null,
- dagManagementStateStore, mockedDagProcEngineMetrics));
+ dagManagementStateStore, mockedDagProcEngineMetrics),
ConfigFactory.empty());
enforceFlowFinishDeadlineDagProc.process(dagManagementStateStore,
mockedDagProcEngineMetrics);
specProducers.forEach(sp -> Mockito.verify(sp,
Mockito.times(1)).cancelJob(any(), any()));
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
index bc53139b31..519a54f3e7 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
@@ -25,7 +25,6 @@ import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.ImmutablePair;
-import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -49,6 +48,7 @@ import
org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
import
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStore;
import
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStoreTest;
import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
+import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -96,7 +96,7 @@ public class KillDagProcTest {
LaunchDagProc launchDagProc = new LaunchDagProc(new LaunchDagTask(new
DagActionStore.DagAction("fg", "flow1",
flowExecutionId, MysqlDagActionStore.NO_JOB_NAME_DEFAULT,
DagActionStore.DagActionType.LAUNCH),
- null, this.dagManagementStateStore, mockedDagProcEngineMetrics),
flowCompilationValidationHelper);
+ null, this.dagManagementStateStore, mockedDagProcEngineMetrics),
flowCompilationValidationHelper, ConfigFactory.empty());
launchDagProc.process(this.dagManagementStateStore,
this.mockedDagProcEngineMetrics);
List<SpecProducer<Spec>> specProducers = dag.getNodes().stream().map(n -> {
@@ -109,7 +109,7 @@ public class KillDagProcTest {
KillDagProc killDagProc = new KillDagProc(new KillDagTask(new
DagActionStore.DagAction("fg", "flow1",
flowExecutionId, MysqlDagActionStore.NO_JOB_NAME_DEFAULT,
DagActionStore.DagActionType.KILL),
- null, this.dagManagementStateStore, mockedDagProcEngineMetrics));
+ null, this.dagManagementStateStore, mockedDagProcEngineMetrics),
ConfigFactory.empty());
killDagProc.process(this.dagManagementStateStore,
this.mockedDagProcEngineMetrics);
long cancelJobCount = specProducers.stream()
@@ -142,7 +142,8 @@ public class KillDagProcTest {
LaunchDagProc launchDagProc = new LaunchDagProc(new LaunchDagTask(new
DagActionStore.DagAction("fg", "flow2",
flowExecutionId, MysqlDagActionStore.NO_JOB_NAME_DEFAULT,
DagActionStore.DagActionType.LAUNCH),
- null, this.dagManagementStateStore, this.mockedDagProcEngineMetrics),
flowCompilationValidationHelper);
+ null, this.dagManagementStateStore, this.mockedDagProcEngineMetrics),
flowCompilationValidationHelper,
+ ConfigFactory.empty());
launchDagProc.process(this.dagManagementStateStore,
this.mockedDagProcEngineMetrics);
List<SpecProducer<Spec>> specProducers = dag.getNodes().stream().map(n -> {
@@ -155,7 +156,7 @@ public class KillDagProcTest {
KillDagProc killDagProc = new KillDagProc(new KillDagTask(new
DagActionStore.DagAction("fg", "flow2",
flowExecutionId, "job2", DagActionStore.DagActionType.KILL),
- null, this.dagManagementStateStore, this.mockedDagProcEngineMetrics));
+ null, this.dagManagementStateStore, this.mockedDagProcEngineMetrics),
ConfigFactory.empty());
killDagProc.process(this.dagManagementStateStore,
this.mockedDagProcEngineMetrics);
long cancelJobCount = specProducers.stream()
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
index 5b36851ebf..45caa295ea 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
@@ -115,7 +115,7 @@ public class LaunchDagProcTest {
new LaunchDagTask(new DagActionStore.DagAction(flowGroup, flowName,
flowExecutionId, "job0",
DagActionStore.DagActionType.LAUNCH), null,
this.dagManagementStateStore,
this.mockedDagProcEngineMetrics),
- flowCompilationValidationHelper);
+ flowCompilationValidationHelper, ConfigFactory.empty());
launchDagProc.process(this.dagManagementStateStore,
mockedDagProcEngineMetrics);
@@ -146,7 +146,7 @@ public class LaunchDagProcTest {
new LaunchDagTask(new DagActionStore.DagAction(flowGroup, flowName,
flowExecutionId,
"jn", DagActionStore.DagActionType.LAUNCH), null,
this.dagManagementStateStore,
this.mockedDagProcEngineMetrics),
- flowCompilationValidationHelper);
+ flowCompilationValidationHelper, ConfigFactory.empty());
launchDagProc.process(this.dagManagementStateStore,
mockedDagProcEngineMetrics);
int numOfLaunchedJobs = 3; // = number of start nodes
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
index 2d3d02461f..3f856c2085 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
@@ -118,7 +118,7 @@ public class ReevaluateDagProcTest {
ReevaluateDagProc
reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new
DagActionStore.DagAction(flowGroup, flowName,
flowExecutionId, "job0", DagActionStore.DagActionType.REEVALUATE),
null,
- dagManagementStateStore, mockedDagProcEngineMetrics));
+ dagManagementStateStore, mockedDagProcEngineMetrics),
ConfigFactory.empty());
reEvaluateDagProc.process(dagManagementStateStore,
mockedDagProcEngineMetrics);
// next job is sent to spec producer
Mockito.verify(specProducers.get(1), Mockito.times(1)).addSpec(any());
@@ -168,7 +168,7 @@ public class ReevaluateDagProcTest {
ReevaluateDagProc
reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new
DagActionStore.DagAction(flowGroup, flowName,
flowExecutionId, "job0", DagActionStore.DagActionType.REEVALUATE),
null,
- dagManagementStateStore, mockedDagProcEngineMetrics));
+ dagManagementStateStore, mockedDagProcEngineMetrics),
ConfigFactory.empty());
reEvaluateDagProc.process(dagManagementStateStore,
mockedDagProcEngineMetrics);
// no new job to launch for this one job flow
@@ -203,7 +203,7 @@ public class ReevaluateDagProcTest {
ReevaluateDagProc
reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new
DagActionStore.DagAction(flowGroup, flowName,
flowExecutionId, "job0", DagActionStore.DagActionType.REEVALUATE),
null,
- dagManagementStateStore, mockedDagProcEngineMetrics));
+ dagManagementStateStore, mockedDagProcEngineMetrics),
ConfigFactory.empty());
reEvaluateDagProc.process(dagManagementStateStore,
mockedDagProcEngineMetrics);
int numOfLaunchedJobs = 1; // only the current job
@@ -248,7 +248,7 @@ public class ReevaluateDagProcTest {
ReevaluateDagProc
reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new
DagActionStore.DagAction(flowGroup, flowName,
flowExecutionId, "job3", DagActionStore.DagActionType.REEVALUATE),
null,
- dagManagementStateStore, mockedDagProcEngineMetrics));
+ dagManagementStateStore, mockedDagProcEngineMetrics),
ConfigFactory.empty());
List<SpecProducer<Spec>> specProducers = getDagSpecProducers(dag);
// process 4th job
reEvaluateDagProc.process(dagManagementStateStore,
mockedDagProcEngineMetrics);
@@ -282,10 +282,9 @@ public class ReevaluateDagProcTest {
doReturn(new ImmutablePair<>(Optional.of(dag.getNodes().get(0)),
Optional.of(jobStatus)))
.when(dagManagementStateStore).getDagNodeWithJobStatus(any());
- ReevaluateDagProc
- reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new
DagActionStore.DagAction(flowGroup, flowName,
- flowExecutionId, "job0", DagActionStore.DagActionType.REEVALUATE),
null,
- dagManagementStateStore, mockedDagProcEngineMetrics));
+ ReevaluateDagProc reEvaluateDagProc = new ReevaluateDagProc(new
ReevaluateDagTask(new DagActionStore.DagAction(
+ flowGroup, flowName, flowExecutionId, "job0",
DagActionStore.DagActionType.REEVALUATE), null,
+ dagManagementStateStore, mockedDagProcEngineMetrics),
ConfigFactory.empty());
reEvaluateDagProc.process(dagManagementStateStore,
mockedDagProcEngineMetrics);
int numOfLaunchedJobs = 1; // only the current job
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
index 4de37fad54..20e47145b6 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
@@ -20,7 +20,6 @@ package org.apache.gobblin.service.modules.orchestration.proc;
import java.io.IOException;
import java.net.URISyntaxException;
-import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.mockito.Mockito;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -41,6 +40,7 @@ import
org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
import
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStore;
import
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStoreTest;
import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
+import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.apache.gobblin.service.modules.orchestration.task.ResumeDagTask;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -94,7 +94,7 @@ public class ResumeDagProcTest {
ResumeDagProc resumeDagProc = new ResumeDagProc(new ResumeDagTask(new
DagActionStore.DagAction(flowGroup, flowName,
flowExecutionId, MysqlDagActionStore.NO_JOB_NAME_DEFAULT,
DagActionStore.DagActionType.RESUME),
- null, this.dagManagementStateStore, mockedDagProcEngineMetrics));
+ null, this.dagManagementStateStore, mockedDagProcEngineMetrics),
ConfigFactory.empty());
resumeDagProc.process(this.dagManagementStateStore,
mockedDagProcEngineMetrics);
int expectedNumOfResumedJobs = 1; // = number of resumed nodes
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/ExceptionCleanupUtils.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/ExceptionUtils.java
similarity index 71%
rename from
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/ExceptionCleanupUtils.java
rename to
gobblin-utility/src/main/java/org/apache/gobblin/util/ExceptionUtils.java
index e5727e126e..ae29a5bf6b 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/ExceptionCleanupUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/ExceptionUtils.java
@@ -15,11 +15,13 @@
* limitations under the License.
*/
-package org.apache.gobblin.runtime.util;
+package org.apache.gobblin.util;
-public final class ExceptionCleanupUtils {
+import java.util.List;
- private ExceptionCleanupUtils() {
+public final class ExceptionUtils {
+
+ private ExceptionUtils() {
}
/**
@@ -42,4 +44,20 @@ public final class ExceptionCleanupUtils {
return exception;
}
+
+ /**
+ * Iterates through the exception chain and returns true if it finds
exception that is an instance of any exceptions
+ * in the provided exception list, false otherwise
+ */
+ public static boolean isExceptionInstanceOf(Throwable exception,
List<Class<? extends Exception>> exceptionsList) {
+ while (exception != null) {
+ Throwable finalException = exception;
+ if (exceptionsList.stream().anyMatch(e -> e.isInstance(finalException)))
{
+ return true;
+ } else {
+ exception = exception.getCause();
+ }
+ }
+ return false;
+ }
}
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/util/ExceptionCleanupUtilsTest.java
b/gobblin-utility/src/test/java/org/apache/gobblin/util/ExceptionUtilsTest.java
similarity index 57%
rename from
gobblin-runtime/src/test/java/org/apache/gobblin/runtime/util/ExceptionCleanupUtilsTest.java
rename to
gobblin-utility/src/test/java/org/apache/gobblin/util/ExceptionUtilsTest.java
index 979bb2841a..75e771bfae 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/util/ExceptionCleanupUtilsTest.java
+++
b/gobblin-utility/src/test/java/org/apache/gobblin/util/ExceptionUtilsTest.java
@@ -15,36 +15,53 @@
* limitations under the License.
*/
-package org.apache.gobblin.runtime.util;
+package org.apache.gobblin.util;
import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+import org.assertj.core.util.Lists;
+import org.testng.Assert;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
-@Test(groups = {"gobblin.runtime"})
-public class ExceptionCleanupUtilsTest {
+public class ExceptionUtilsTest {
+ @Test
+ public void testIsInstanceOf() {
+ List<Class<? extends Exception>> excpetionsList =
Lists.list(ArithmeticException.class, SQLException.class);
+ Exception e1 = new ArithmeticException();
+ Exception e2 = new IOException(e1);
+ Exception e3 = new RuntimeException(e2);
+ Exception e4 = new UnsupportedOperationException();
+ Exception e5 = new IOException(e4);
+ Assert.assertTrue(ExceptionUtils.isExceptionInstanceOf(e1,
excpetionsList));
+ Assert.assertTrue(ExceptionUtils.isExceptionInstanceOf(e2,
excpetionsList));
+ Assert.assertTrue(ExceptionUtils.isExceptionInstanceOf(e3,
excpetionsList));
+ Assert.assertFalse(ExceptionUtils.isExceptionInstanceOf(e4,
excpetionsList));
+ Assert.assertFalse(ExceptionUtils.isExceptionInstanceOf(e5,
excpetionsList));
+ }
@Test
public void canRemoveEmptyWrapper() {
Exception exception = new IOException(new IllegalArgumentException("root
cause"));
- Throwable rootCause = ExceptionCleanupUtils.removeEmptyWrappers(exception);
+ Throwable rootCause = ExceptionUtils.removeEmptyWrappers(exception);
assertEquals(rootCause.getClass(), IllegalArgumentException.class);
}
@Test
public void canRemoveMultipleEmptyWrappers() {
Exception exception = new IOException(new IOException(new
IllegalArgumentException("root cause")));
- Throwable unwrapped = ExceptionCleanupUtils.removeEmptyWrappers(exception);
+ Throwable unwrapped = ExceptionUtils.removeEmptyWrappers(exception);
assertEquals(unwrapped.getClass(), IllegalArgumentException.class);
}
@Test
public void willNotRemoveExceptionWithMessage() {
Exception exception = new IOException("test message", new
IllegalArgumentException("root cause"));
- Throwable unwrapped = ExceptionCleanupUtils.removeEmptyWrappers(exception);
+ Throwable unwrapped = ExceptionUtils.removeEmptyWrappers(exception);
assertEquals(unwrapped.getClass(), IOException.class);
}
-}
\ No newline at end of file
+}