This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new d8d8e585e [GOBBLIN-2081] do not run dag proc related code when dag 
proc engine is not enabled (#3964)
d8d8e585e is described below

commit d8d8e585e7297b0c2c20d073a686dd53e29ef3fc
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Thu Jun 6 09:47:53 2024 -0700

    [GOBBLIN-2081] do not run dag proc related code when dag proc engine is not 
enabled (#3964)
---
 .../apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index d6ff2b651..6e60392ef 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -226,12 +226,15 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
 
           if (updatedJobStatus.getRight() == NewState.FINISHED) {
             this.eventProducer.emitObservabilityEvent(jobStatus);
-            if (this.dagProcEngineEnabled) {
+          }
+
+          if (this.dagProcEngineEnabled) {
+            if (updatedJobStatus.getRight() == NewState.FINISHED) {
               // todo - retried/resumed jobs *may* not be handled here, we may 
want to create their dag action elsewhere
               this.dagManagementStateStore.addJobDagAction(flowGroup, 
flowName, flowExecutionId, jobName, DagActionStore.DagActionType.REEVALUATE);
+            } else if (updatedJobStatus.getRight() == NewState.RUNNING) {
+              removeStartDeadlineTriggerAndDagAction(dagManagementStateStore, 
flowGroup, flowName, flowExecutionId, jobName);
             }
-          } else if (updatedJobStatus.getRight() == NewState.RUNNING) {
-            removeStartDeadlineTriggerAndDagAction(dagManagementStateStore, 
flowGroup, flowName, flowExecutionId, jobName);
           }
 
           // update the state store after adding a dag action to guaranty 
at-least-once adding of dag action

Reply via email to