This is an automated email from the ASF dual-hosted git repository.
abhijain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 57d728a34d Update job status before adding REEVALUATE dag action
(#4128)
57d728a34d is described below
commit 57d728a34d96422f52c081dd65190bbccf6bb5de
Author: thisisArjit <[email protected]>
AuthorDate: Mon Aug 11 17:06:00 2025 +0530
Update job status before adding REEVALUATE dag action (#4128)
---
.../apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index c2afaa0498..be46b844a4 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -271,6 +271,11 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
this.eventProducer.emitObservabilityEvent(jobStatus);
}
+ // Update the state store before adding a dag action.
+ // Even if service dies before adding the dag action & after updating the job status, kafka offset will not be advanced
+ // hence, the event will be reprocessed and re-attempt the addition of dag action
+ stateStore.put(storeName, tableName, jobStatus);
+
if (DagProcUtils.isJobLevelStatus(jobName)) {
if (updatedJobStatus.getRight() == NewState.FINISHED) {
try {
@@ -292,9 +297,6 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
DagProcUtils.removeEnforceJobStartDeadlineDagAction(dagManagementStateStore,
flowGroup, flowName, flowExecutionId, jobName);
}
}
-
- // update the state store after adding a dag action to guaranty at-least-once adding of dag action
- stateStore.put(storeName, tableName, jobStatus);
}
return null;
});