[ https://issues.apache.org/jira/browse/GOBBLIN-2124?focusedWorklogId=928756&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-928756 ]
ASF GitHub Bot logged work on GOBBLIN-2124:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 05/Aug/24 17:44
Start Date: 05/Aug/24 17:44
Worklog Time Spent: 10m
Work Description: umustafi commented on code in PR #4016:
URL: https://github.com/apache/gobblin/pull/4016#discussion_r1704428022
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -55,14 +62,23 @@ public abstract class DagProc<T> {
@Getter protected final DagManager.DagId dagId;
@Getter protected final DagNodeId dagNodeId;
protected static final MetricContext metricContext =
Instrumented.getMetricContext(new State(), DagProc.class);
+ protected final List<Class<? extends Exception>> nonRetryableExceptions;
protected static final EventSubmitter eventSubmitter = new EventSubmitter.Builder(
metricContext, "org.apache.gobblin.service").build();
- public DagProc(DagTask dagTask) {
+ public DagProc(DagTask dagTask, Config config) {
this.dagTask = dagTask;
this.dagId = DagManagerUtils.generateDagId(this.dagTask.getDagAction().getFlowGroup(),
this.dagTask.getDagAction().getFlowName(),
this.dagTask.getDagAction().getFlowExecutionId());
this.dagNodeId = this.dagTask.getDagAction().getDagNodeId();
+ this.nonRetryableExceptions = ConfigUtils.getStringList(config, ServiceConfigKeys.DAG_PROC_ENGINE_NON_RETRYABLE_EXCEPTIONS_KEY)
Review Comment:
What happens if the config has an empty list?
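
To make the empty-list question concrete, here is a minimal sketch of one way a configured list of class names could be turned into exception classes. It is not the PR's code: the class `NonRetryableExceptionsSketch` and the method `resolve` are hypothetical names, and resolving names with `Class.forName` is an assumption. Under these assumptions an empty list yields an empty result, so no exception would be classified as non-retryable.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper for illustration only; not part of Gobblin.
final class NonRetryableExceptionsSketch {

  // Resolves fully qualified class names to exception classes.
  // An empty input produces an empty, immutable list.
  static List<Class<? extends Exception>> resolve(List<String> classNames) {
    List<Class<? extends Exception>> resolved = new ArrayList<>();
    for (String name : classNames) {
      try {
        Class<?> clazz = Class.forName(name.trim());
        // Keep only classes that really are exceptions.
        if (Exception.class.isAssignableFrom(clazz)) {
          resolved.add(clazz.asSubclass(Exception.class));
        }
      } catch (ClassNotFoundException e) {
        // Assumption: unknown names are skipped rather than failing startup.
      }
    }
    return Collections.unmodifiableList(resolved);
  }
}
```

Whether unknown class names should be skipped or should fail fast is a design choice this sketch does not settle.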
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -75,8 +91,19 @@ public final void process(DagManagementStateStore dagManagementStateStore,
dagProcEngineMetrics.markDagActionsInitialize(getDagActionType(), false);
throw e;
}
- act(dagManagementStateStore, state, dagProcEngineMetrics);
- log.info("{} concluded processing for dagId : {}", getClass().getSimpleName(), this.dagId);
+ try {
+ act(dagManagementStateStore, state, dagProcEngineMetrics);
+ } catch (Exception e) {
+ if (isTransientException(e)) {
+ log.info("Ignoring transient exception. DagTask {} will conclude and will not be retried. Exception - {} ",
+ getDagTask(), e);
+ dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.mark();
+ dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.mark();
+ } else {
+ throw e;
+ }
+ }
+ log.info("{} processed dagId : {}", getClass().getSimpleName(), this.dagId);
Review Comment:
Let's log this line only if there was no error.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -75,8 +91,19 @@ public final void process(DagManagementStateStore dagManagementStateStore,
dagProcEngineMetrics.markDagActionsInitialize(getDagActionType(), false);
throw e;
}
- act(dagManagementStateStore, state, dagProcEngineMetrics);
- log.info("{} concluded processing for dagId : {}", getClass().getSimpleName(), this.dagId);
+ try {
+ act(dagManagementStateStore, state, dagProcEngineMetrics);
+ } catch (Exception e) {
+ if (isTransientException(e)) {
Review Comment:
Isn't this the reverse of what we want? We _should_ retry transient
exceptions, not NON-transient ones.
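
A minimal sketch of the control flow this comment seems to ask for, assuming the configured list names the NON-retryable exceptions: those are swallowed and counted, while everything else is rethrown so the caller can retry. It also places the success log after the try block's normal completion, as requested in the earlier comment. The names `DagProcRetrySketch`, `process`, `isNonRetryable`, and `nonRetryableCount` are hypothetical, a plain counter stands in for the meters, and `Runnable` stands in for `act(...)`, which in the real code may throw checked exceptions.

```java
import java.util.List;

// Hypothetical illustration only; not the code in the PR.
final class DagProcRetrySketch {

  // Stand-in for dagProcessingNonRetryableExceptionMeter.
  static int nonRetryableCount = 0;

  // True if the exception is an instance of any configured non-retryable class.
  static boolean isNonRetryable(Exception e, List<Class<? extends Exception>> nonRetryable) {
    return nonRetryable.stream().anyMatch(c -> c.isInstance(e));
  }

  // Returns true only when the action completed without an exception.
  static boolean process(Runnable act, List<Class<? extends Exception>> nonRetryable) {
    try {
      act.run();
    } catch (RuntimeException e) {
      if (isNonRetryable(e, nonRetryable)) {
        // Retrying cannot help: record it and conclude the task.
        nonRetryableCount++;
        return false;
      }
      // Possibly transient: propagate so the caller can retry.
      throw e;
    }
    // Reached only on success, so a "processed dagId" log would go here.
    return true;
  }
}
```

Naming the predicate after what the list contains (non-retryable) rather than `isTransientException` avoids the inverted reading raised above.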
Issue Time Tracking
-------------------
Worklog Id: (was: 928756)
Time Spent: 1h 20m (was: 1h 10m)
> ignore non-retryable exceptions in dag proc engine
> --------------------------------------------------
>
> Key: GOBBLIN-2124
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2124
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Arjun Singh Bora
> Priority: Major
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>