This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new a6f648b9e [GOBBLIN-2077] create flow finish deadline dag action after 
the launch of first job (#3960)
a6f648b9e is described below

commit a6f648b9e9521dceca1c0b6c2477e0a0f9072cd6
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Mon Jun 3 09:58:03 2024 -0700

    [GOBBLIN-2077] create flow finish deadline dag action after the launch of 
first job (#3960)
    
    * send flow finish deadline dag action after the launch of first job
    * address review comment
---
 .../java/org/apache/gobblin/configuration/ConfigurationKeys.java     | 2 +-
 .../service/modules/orchestration/DagManagementTaskStreamImpl.java   | 5 +++--
 .../gobblin/service/modules/orchestration/DagProcessingEngine.java   | 2 +-
 .../gobblin/service/modules/orchestration/FlowLaunchHandler.java     | 2 --
 .../gobblin/service/modules/orchestration/proc/LaunchDagProc.java    | 3 ++-
 .../service/modules/orchestration/proc/ReevaluateDagProc.java        | 2 +-
 .../gobblin/service/modules/orchestration/proc/ResumeDagProc.java    | 2 ++
 .../service/modules/orchestration/proc/LaunchDagProcTest.java        | 4 ++++
 8 files changed, 14 insertions(+), 8 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 309a451f1..0f4dbd10c 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -1068,7 +1068,7 @@ public class ConfigurationKeys {
   public static final String FLOW_RUN_IMMEDIATELY = "flow.runImmediately";
   public static final String GOBBLIN_FLOW_SLA_TIME = "gobblin.flow.sla.time";
   public static final String GOBBLIN_FLOW_SLA_TIME_UNIT = 
"gobblin.flow.sla.timeunit";
-  public static final String DEFAULT_GOBBLIN_FLOW_SLA_TIME_UNIT = "MINUTES";
+  public static final String DEFAULT_GOBBLIN_FLOW_SLA_TIME_UNIT = 
TimeUnit.MINUTES.name();
   public static final String GOBBLIN_JOB_START_SLA_TIME = 
"gobblin.job.start.sla.time";
   public static final String GOBBLIN_JOB_START_SLA_TIME_UNIT = 
"gobblin.job.start.sla.timeunit";
   public static final long FALLBACK_GOBBLIN_JOB_START_SLA_TIME = 10L;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
index 0d4a229d0..3f93837d3 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
@@ -125,8 +125,9 @@ public class DagManagementTaskStreamImpl implements 
DagManagement, DagTaskStream
   @Override
   public DagTask next() {
       while (true) {
+        DagActionStore.DagAction dagAction = null;
         try {
-          DagActionStore.DagAction dagAction = this.dagActionQueue.take();
+          dagAction = this.dagActionQueue.take();
           /* Create triggers for original (non-reminder) dag actions of type 
ENFORCE_JOB_START_DEADLINE and ENFORCE_FLOW_FINISH_DEADLINE.
              Reminder triggers are used to inform hosts once the job start 
deadline and flow finish deadline are passed;
              then only is lease arbitration done to enforce the deadline 
violation and fail the job or flow if needed */
@@ -146,7 +147,7 @@ public class DagManagementTaskStreamImpl implements 
DagManagement, DagTaskStream
           }
         } catch (Exception e) {
           //TODO: need to handle exceptions gracefully
-          log.error("Exception getting DagAction from the queue / creating 
DagTask", e);
+          log.error("Exception getting DagAction from the queue or creating 
DagTask. dagAction - {}", dagAction == null ? "<null>" : dagAction, e);
         }
       }
   }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
index 611628e3f..00ac9eba0 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
@@ -129,7 +129,7 @@ public class DagProcessingEngine extends 
AbstractIdleService {
           log.warn("Received a null dag task, ignoring.");
           continue;
         }
-        DagProc dagProc = dagTask.host(dagProcFactory);
+        DagProc<?> dagProc = dagTask.host(dagProcFactory);
         try {
           // todo - add retries
           dagProc.process(dagManagementStateStore);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
index 864fae944..06551ce14 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
@@ -50,7 +50,6 @@ import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.scheduler.JobScheduler;
 import org.apache.gobblin.scheduler.SchedulerService;
-import org.apache.gobblin.service.modules.orchestration.proc.DagProcUtils;
 import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
 import org.apache.gobblin.util.ConfigUtils;
 
@@ -141,7 +140,6 @@ public class FlowLaunchHandler {
     DagActionStore.DagAction launchDagAction = 
leaseStatus.getConsensusDagAction();
     try {
       this.dagManagementStateStore.addDagAction(launchDagAction);
-      
DagProcUtils.sendEnforceFlowFinishDeadlineDagAction(dagManagementStateStore, 
launchDagAction);
       this.numFlowsSubmitted.mark();
       // after successfully persisting, close the lease
       return this.multiActiveLeaseArbiter.recordLeaseSuccess(leaseStatus);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
index b52eb3435..d3a7614cc 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
@@ -81,8 +81,9 @@ public class LaunchDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>> {
       // todo - add metrics
     } else {
       submitNextNodes(dagManagementStateStore, dag.get());
-      //Checkpoint the dag state, it should have an updated value of dag nodes
+      // Checkpoint the dag state, it should have an updated value of dag nodes
       dagManagementStateStore.checkpointDag(dag.get());
+      
DagProcUtils.sendEnforceFlowFinishDeadlineDagAction(dagManagementStateStore, 
getDagTask().getDagAction());
       orchestrationDelayCounter.set(System.currentTimeMillis() - 
DagManagerUtils.getFlowExecId(dag.get()));
     }
   }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
index bc786b91f..6fe42fcc4 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
@@ -171,7 +171,7 @@ public class ReevaluateDagProc extends 
DagProc<Pair<Optional<Dag.DagNode<JobExec
         log.warn("It should not reach here. Job status {} is unexpected.", 
executionStatus);
     }
 
-    //Checkpoint the dag state, it should have an updated value of dag fields
+    // Checkpoint the dag state, it should have an updated value of dag fields
     dagManagementStateStore.checkpointDag(dag);
     dagManagementStateStore.deleteDagNodeState(getDagId(), dagNode);
   }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
index 00e8809eb..b3d077c46 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
@@ -93,6 +93,8 @@ public class ResumeDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>> {
     dagManagementStateStore.deleteFailedDag(failedDag.get());
 
     resumeDag(dagManagementStateStore, failedDag.get());
+
+    
DagProcUtils.sendEnforceFlowFinishDeadlineDagAction(dagManagementStateStore, 
getDagTask().getDagAction());
   }
 
   private void resumeDag(DagManagementStateStore dagManagementStateStore, 
Dag<JobExecutionPlan> dag) {
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
index 34a323914..6e723a716 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
@@ -55,6 +55,7 @@ import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
 import org.apache.gobblin.util.ConfigUtils;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -97,6 +98,9 @@ public class LaunchDagProcTest {
     Assert.assertEquals(expectedNumOfSavingDagNodeStates,
         
Mockito.mockingDetails(this.dagManagementStateStore).getInvocations().stream()
             .filter(a -> 
a.getMethod().getName().equals("addDagNodeState")).count());
+
+    Mockito.verify(this.dagManagementStateStore, Mockito.times(1))
+        .addFlowDagAction(any(), any(), any(), 
eq(DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE));
   }
 
   // This creates a dag like this

Reply via email to