umustafi commented on code in PR #4016:
URL: https://github.com/apache/gobblin/pull/4016#discussion_r1704428022


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -55,14 +62,23 @@ public abstract class DagProc<T> {
   @Getter protected final DagManager.DagId dagId;
   @Getter protected final DagNodeId dagNodeId;
   protected static final MetricContext metricContext = 
Instrumented.getMetricContext(new State(), DagProc.class);
+  protected final List<Class<? extends Exception>> nonRetryableExceptions;
   protected static final EventSubmitter eventSubmitter = new 
EventSubmitter.Builder(
       metricContext, "org.apache.gobblin.service").build();
 
-  public DagProc(DagTask dagTask) {
+  public DagProc(DagTask dagTask, Config config) {
     this.dagTask = dagTask;
     this.dagId = 
DagManagerUtils.generateDagId(this.dagTask.getDagAction().getFlowGroup(),
         this.dagTask.getDagAction().getFlowName(), 
this.dagTask.getDagAction().getFlowExecutionId());
     this.dagNodeId = this.dagTask.getDagAction().getDagNodeId();
+    this.nonRetryableExceptions = ConfigUtils.getStringList(config, 
ServiceConfigKeys.DAG_PROC_ENGINE_NON_RETRYABLE_EXCEPTIONS_KEY)

Review Comment:
   what happens if the config has an empty list?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -75,8 +91,19 @@ public final void process(DagManagementStateStore 
dagManagementStateStore,
       dagProcEngineMetrics.markDagActionsInitialize(getDagActionType(), false);
       throw e;
     }
-    act(dagManagementStateStore, state, dagProcEngineMetrics);
-    log.info("{} concluded processing for dagId : {}", 
getClass().getSimpleName(), this.dagId);
+    try {
+      act(dagManagementStateStore, state, dagProcEngineMetrics);
+    } catch (Exception e) {
+      if (isTransientException(e)) {
+        log.info("Ignoring transient exception. DagTask {} will conclude and 
will not be retried. Exception - {} ",
+            getDagTask(), e);
+        
dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.mark();
+        
dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.mark();
+      } else {
+        throw e;
+      }
+    }
+    log.info("{} processed dagId : {}", getClass().getSimpleName(), 
this.dagId);

Review Comment:
   Let's log this line only if there was no error.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -75,8 +91,19 @@ public final void process(DagManagementStateStore 
dagManagementStateStore,
       dagProcEngineMetrics.markDagActionsInitialize(getDagActionType(), false);
       throw e;
     }
-    act(dagManagementStateStore, state, dagProcEngineMetrics);
-    log.info("{} concluded processing for dagId : {}", 
getClass().getSimpleName(), this.dagId);
+    try {
+      act(dagManagementStateStore, state, dagProcEngineMetrics);
+    } catch (Exception e) {
+      if (isTransientException(e)) {

Review Comment:
   Isn't this the reverse of what we want? We _should_ retry transient
exceptions, not NON-transient ones.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to