This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new a6f648b9e [GOBBLIN-2077] create flow finish deadline dag action after
the launch of first job (#3960)
a6f648b9e is described below
commit a6f648b9e9521dceca1c0b6c2477e0a0f9072cd6
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Mon Jun 3 09:58:03 2024 -0700
[GOBBLIN-2077] create flow finish deadline dag action after the launch of
first job (#3960)
* send flow finish deadline dag action after the launch of first job
* address review comment
---
.../java/org/apache/gobblin/configuration/ConfigurationKeys.java | 2 +-
.../service/modules/orchestration/DagManagementTaskStreamImpl.java | 5 +++--
.../gobblin/service/modules/orchestration/DagProcessingEngine.java | 2 +-
.../gobblin/service/modules/orchestration/FlowLaunchHandler.java | 2 --
.../gobblin/service/modules/orchestration/proc/LaunchDagProc.java | 3 ++-
.../service/modules/orchestration/proc/ReevaluateDagProc.java | 2 +-
.../gobblin/service/modules/orchestration/proc/ResumeDagProc.java | 2 ++
.../service/modules/orchestration/proc/LaunchDagProcTest.java | 4 ++++
8 files changed, 14 insertions(+), 8 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 309a451f1..0f4dbd10c 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -1068,7 +1068,7 @@ public class ConfigurationKeys {
public static final String FLOW_RUN_IMMEDIATELY = "flow.runImmediately";
public static final String GOBBLIN_FLOW_SLA_TIME = "gobblin.flow.sla.time";
public static final String GOBBLIN_FLOW_SLA_TIME_UNIT =
"gobblin.flow.sla.timeunit";
- public static final String DEFAULT_GOBBLIN_FLOW_SLA_TIME_UNIT = "MINUTES";
+ public static final String DEFAULT_GOBBLIN_FLOW_SLA_TIME_UNIT =
TimeUnit.MINUTES.name();
public static final String GOBBLIN_JOB_START_SLA_TIME =
"gobblin.job.start.sla.time";
public static final String GOBBLIN_JOB_START_SLA_TIME_UNIT =
"gobblin.job.start.sla.timeunit";
public static final long FALLBACK_GOBBLIN_JOB_START_SLA_TIME = 10L;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
index 0d4a229d0..3f93837d3 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
@@ -125,8 +125,9 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
@Override
public DagTask next() {
while (true) {
+ DagActionStore.DagAction dagAction = null;
try {
- DagActionStore.DagAction dagAction = this.dagActionQueue.take();
+ dagAction = this.dagActionQueue.take();
/* Create triggers for original (non-reminder) dag actions of type
ENFORCE_JOB_START_DEADLINE and ENFORCE_FLOW_FINISH_DEADLINE.
Reminder triggers are used to inform hosts once the job start
deadline and flow finish deadline are passed;
then only is lease arbitration done to enforce the deadline
violation and fail the job or flow if needed */
@@ -146,7 +147,7 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
}
} catch (Exception e) {
//TODO: need to handle exceptions gracefully
- log.error("Exception getting DagAction from the queue / creating
DagTask", e);
+ log.error("Exception getting DagAction from the queue or creating
DagTask. dagAction - {}", dagAction == null ? "<null>" : dagAction, e);
}
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
index 611628e3f..00ac9eba0 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
@@ -129,7 +129,7 @@ public class DagProcessingEngine extends
AbstractIdleService {
log.warn("Received a null dag task, ignoring.");
continue;
}
- DagProc dagProc = dagTask.host(dagProcFactory);
+ DagProc<?> dagProc = dagTask.host(dagProcFactory);
try {
// todo - add retries
dagProc.process(dagManagementStateStore);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
index 864fae944..06551ce14 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
@@ -50,7 +50,6 @@ import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.scheduler.JobScheduler;
import org.apache.gobblin.scheduler.SchedulerService;
-import org.apache.gobblin.service.modules.orchestration.proc.DagProcUtils;
import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
import org.apache.gobblin.util.ConfigUtils;
@@ -141,7 +140,6 @@ public class FlowLaunchHandler {
DagActionStore.DagAction launchDagAction =
leaseStatus.getConsensusDagAction();
try {
this.dagManagementStateStore.addDagAction(launchDagAction);
-
DagProcUtils.sendEnforceFlowFinishDeadlineDagAction(dagManagementStateStore,
launchDagAction);
this.numFlowsSubmitted.mark();
// after successfully persisting, close the lease
return this.multiActiveLeaseArbiter.recordLeaseSuccess(leaseStatus);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
index b52eb3435..d3a7614cc 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
@@ -81,8 +81,9 @@ public class LaunchDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>> {
// todo - add metrics
} else {
submitNextNodes(dagManagementStateStore, dag.get());
- //Checkpoint the dag state, it should have an updated value of dag nodes
+ // Checkpoint the dag state, it should have an updated value of dag nodes
dagManagementStateStore.checkpointDag(dag.get());
+
DagProcUtils.sendEnforceFlowFinishDeadlineDagAction(dagManagementStateStore,
getDagTask().getDagAction());
orchestrationDelayCounter.set(System.currentTimeMillis() -
DagManagerUtils.getFlowExecId(dag.get()));
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
index bc786b91f..6fe42fcc4 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
@@ -171,7 +171,7 @@ public class ReevaluateDagProc extends
DagProc<Pair<Optional<Dag.DagNode<JobExec
log.warn("It should not reach here. Job status {} is unexpected.",
executionStatus);
}
- //Checkpoint the dag state, it should have an updated value of dag fields
+ // Checkpoint the dag state, it should have an updated value of dag fields
dagManagementStateStore.checkpointDag(dag);
dagManagementStateStore.deleteDagNodeState(getDagId(), dagNode);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
index 00e8809eb..b3d077c46 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
@@ -93,6 +93,8 @@ public class ResumeDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>> {
dagManagementStateStore.deleteFailedDag(failedDag.get());
resumeDag(dagManagementStateStore, failedDag.get());
+
+
DagProcUtils.sendEnforceFlowFinishDeadlineDagAction(dagManagementStateStore,
getDagTask().getDagAction());
}
private void resumeDag(DagManagementStateStore dagManagementStateStore,
Dag<JobExecutionPlan> dag) {
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
index 34a323914..6e723a716 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
@@ -55,6 +55,7 @@ import
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
import org.apache.gobblin.util.ConfigUtils;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -97,6 +98,9 @@ public class LaunchDagProcTest {
Assert.assertEquals(expectedNumOfSavingDagNodeStates,
Mockito.mockingDetails(this.dagManagementStateStore).getInvocations().stream()
.filter(a ->
a.getMethod().getName().equals("addDagNodeState")).count());
+
+ Mockito.verify(this.dagManagementStateStore, Mockito.times(1))
+ .addFlowDagAction(any(), any(), any(),
eq(DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE));
}
// This creates a dag like this