This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new dd34ba9cd7 Mark dagActionsConclude correctly (#4010)
dd34ba9cd7 is described below
commit dd34ba9cd7820c5ea40a62898963534e1485c4ec
Author: umustafi <[email protected]>
AuthorDate: Wed Jul 24 10:23:08 2024 -0700
Mark dagActionsConclude correctly (#4010)
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../modules/orchestration/proc/DeadlineEnforcementDagProc.java | 4 ++--
.../apache/gobblin/service/modules/orchestration/task/DagTask.java | 6 +++++-
2 files changed, 7 insertions(+), 3 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DeadlineEnforcementDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DeadlineEnforcementDagProc.java
index 0d3369a3fd..231370004d 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DeadlineEnforcementDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DeadlineEnforcementDagProc.java
@@ -48,14 +48,14 @@ abstract public class DeadlineEnforcementDagProc extends
DagProc<Optional<Dag<Jo
@Override
protected void act(DagManagementStateStore dagManagementStateStore,
Optional<Dag<JobExecutionPlan>> dag,
DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException {
- if (validate(dag, dagManagementStateStore)) {
+ if (isDagStillPresent(dag, dagManagementStateStore)) {
enforceDeadline(dagManagementStateStore, dag.get(),
dagProcEngineMetrics);
} else {
dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
}
}
- protected boolean validate(Optional<Dag<JobExecutionPlan>> dag,
DagManagementStateStore dagManagementStateStore)
+ protected boolean isDagStillPresent(Optional<Dag<JobExecutionPlan>> dag,
DagManagementStateStore dagManagementStateStore)
throws IOException {
log.info("Request to enforce deadlines for dag {}", getDagId());
DagActionStore.DagAction dagAction = getDagTask().getDagAction();
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
index 3fb6110e10..1d3ac86564 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
@@ -63,7 +63,11 @@ public abstract class DagTask {
try {
this.dagManagementStateStore.deleteDagAction(this.dagAction);
boolean completedLease = this.leaseObtainedStatus.completeLease();
-
this.dagProcEngineMetrics.markDagActionsConclude(this.dagAction.getDagActionType(),
true);
+ if (completedLease) {
+
this.dagProcEngineMetrics.markDagActionsConclude(this.dagAction.getDagActionType(),
true);
+ } else {
+
this.dagProcEngineMetrics.markDagActionsConclude(this.dagAction.getDagActionType(),
false);
+ }
return completedLease;
} catch (IOException e) {
this.dagProcEngineMetrics.markDagActionsConclude(this.dagAction.getDagActionType(),
false);