This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new d8d8e585e [GOBBLIN-2081] do not run dag proc related code when dag
proc engine is not enabled (#3964)
d8d8e585e is described below
commit d8d8e585e7297b0c2c20d073a686dd53e29ef3fc
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Thu Jun 6 09:47:53 2024 -0700
[GOBBLIN-2081] do not run dag proc related code when dag proc engine is not
enabled (#3964)
---
.../apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index d6ff2b651..6e60392ef 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -226,12 +226,15 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
if (updatedJobStatus.getRight() == NewState.FINISHED) {
this.eventProducer.emitObservabilityEvent(jobStatus);
- if (this.dagProcEngineEnabled) {
+ }
+
+ if (this.dagProcEngineEnabled) {
+ if (updatedJobStatus.getRight() == NewState.FINISHED) {
// todo - retried/resumed jobs *may* not be handled here, we may
want to create their dag action elsewhere
this.dagManagementStateStore.addJobDagAction(flowGroup,
flowName, flowExecutionId, jobName, DagActionStore.DagActionType.REEVALUATE);
+ } else if (updatedJobStatus.getRight() == NewState.RUNNING) {
+ removeStartDeadlineTriggerAndDagAction(dagManagementStateStore,
flowGroup, flowName, flowExecutionId, jobName);
}
- } else if (updatedJobStatus.getRight() == NewState.RUNNING) {
- removeStartDeadlineTriggerAndDagAction(dagManagementStateStore,
flowGroup, flowName, flowExecutionId, jobName);
}
// update the state store after adding a dag action to guaranty
at-least-once adding of dag action