This is an automated email from the ASF dual-hosted git repository.

abhijain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 57d728a34d Update job status before adding REEVALUATE dag action 
(#4128)
57d728a34d is described below

commit 57d728a34d96422f52c081dd65190bbccf6bb5de
Author: thisisArjit <[email protected]>
AuthorDate: Mon Aug 11 17:06:00 2025 +0530

    Update job status before adding REEVALUATE dag action (#4128)
---
 .../apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java  | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index c2afaa0498..be46b844a4 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -271,6 +271,11 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
             this.eventProducer.emitObservabilityEvent(jobStatus);
           }
 
+          // Update the state store before adding a dag action.
+          // Even if service dies before adding the dag action & after 
updating the job status, kafka offset will not be advanced
+          // hence, the event will be reprocessed and re-attempt the addition 
of dag action
+          stateStore.put(storeName, tableName, jobStatus);
+
           if (DagProcUtils.isJobLevelStatus(jobName)) {
             if (updatedJobStatus.getRight() == NewState.FINISHED) {
               try {
@@ -292,9 +297,6 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
               
DagProcUtils.removeEnforceJobStartDeadlineDagAction(dagManagementStateStore, 
flowGroup, flowName, flowExecutionId, jobName);
             }
           }
-
-          // update the state store after adding a dag action to guaranty 
at-least-once adding of dag action
-          stateStore.put(storeName, tableName, jobStatus);
         }
         return null;
       });

Reply via email to