phet commented on code in PR #4012:
URL: https://github.com/apache/gobblin/pull/4012#discussion_r1702273840
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -140,6 +153,11 @@ public void run() {
dagTask.conclude();
} catch (Exception e) {
log.error("DagProcEngineThread encountered exception while
processing dag " + dagProc.getDagId(), e);
+ if (KafkaJobStatusMonitor.isThrowableInstanceOf(e,
this.nonRetryableExceptions)) {
+ // conclude the lease so that it is not retried, if the dag proc
fails with non-transient exception
+ dagTask.conclude();
+
dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.mark();
Review Comment:
good point. really I'm just seeking better encapsulation within `DagProc`,
so the DPE remains blissfully unaware of what they're doing and hence what
exceptions they might throw.
most ideal might be each `DagProc` class to know which of its exceptions are
transient. e.g. a base class method `boolean isTransientException(Throwable
t)`, with a common base class version and any derived class extending beyond
that (while delegating to the base class impl as a fallback).
if for expedience, you want a small step in the right direction that would
be for the factory to be initialized w/ a predicate as I suggested - given it's
the one who decides which specific `DagProc` derived class impls to use.
but after considering further, I really think
`DagProc::isTransientException` is preferable.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]