This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new dd34ba9cd7 Mark dagActionsConclude correctly (#4010)
dd34ba9cd7 is described below

commit dd34ba9cd7820c5ea40a62898963534e1485c4ec
Author: umustafi <[email protected]>
AuthorDate: Wed Jul 24 10:23:08 2024 -0700

    Mark dagActionsConclude correctly (#4010)
    
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 .../modules/orchestration/proc/DeadlineEnforcementDagProc.java      | 4 ++--
 .../apache/gobblin/service/modules/orchestration/task/DagTask.java  | 6 +++++-
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DeadlineEnforcementDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DeadlineEnforcementDagProc.java
index 0d3369a3fd..231370004d 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DeadlineEnforcementDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DeadlineEnforcementDagProc.java
@@ -48,14 +48,14 @@ abstract public class DeadlineEnforcementDagProc extends 
DagProc<Optional<Dag<Jo
   @Override
   protected void act(DagManagementStateStore dagManagementStateStore, 
Optional<Dag<JobExecutionPlan>> dag,
       DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException {
-    if (validate(dag, dagManagementStateStore)) {
+    if (isDagStillPresent(dag, dagManagementStateStore)) {
       enforceDeadline(dagManagementStateStore, dag.get(), 
dagProcEngineMetrics);
     } else {
     dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
   }
   }
 
-  protected boolean validate(Optional<Dag<JobExecutionPlan>> dag, 
DagManagementStateStore dagManagementStateStore)
+  protected boolean isDagStillPresent(Optional<Dag<JobExecutionPlan>> dag, 
DagManagementStateStore dagManagementStateStore)
       throws IOException {
     log.info("Request to enforce deadlines for dag {}", getDagId());
     DagActionStore.DagAction dagAction = getDagTask().getDagAction();
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
index 3fb6110e10..1d3ac86564 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
@@ -63,7 +63,11 @@ public abstract class DagTask {
     try {
       this.dagManagementStateStore.deleteDagAction(this.dagAction);
       boolean completedLease = this.leaseObtainedStatus.completeLease();
-      
this.dagProcEngineMetrics.markDagActionsConclude(this.dagAction.getDagActionType(),
 true);
+      if (completedLease) {
+        
this.dagProcEngineMetrics.markDagActionsConclude(this.dagAction.getDagActionType(),
 true);
+      } else {
+        
this.dagProcEngineMetrics.markDagActionsConclude(this.dagAction.getDagActionType(),
 false);
+      }
       return completedLease;
     } catch (IOException e) {
       
this.dagProcEngineMetrics.markDagActionsConclude(this.dagAction.getDagActionType(),
 false);

Reply via email to