[ https://issues.apache.org/jira/browse/GOBBLIN-2069?focusedWorklogId=919961&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-919961 ]
ASF GitHub Bot logged work on GOBBLIN-2069:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 19/May/24 21:25
Start Date: 19/May/24 21:25
Worklog Time Spent: 10m
Work Description: arjun4084346 commented on code in PR #3950:
URL: https://github.com/apache/gobblin/pull/3950#discussion_r1606105120
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -111,4 +122,52 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat
throw new RuntimeException(e);
}
}
+
+ public static void cancelDagNode(Dag.DagNode<JobExecutionPlan>
dagNodeToCancel, DagManagementStateStore dagManagementStateStore) throws
IOException {
+ Properties props = new Properties();
+ DagManager.DagId dagId = DagManagerUtils.generateDagId(dagNodeToCancel);
+ if
(dagNodeToCancel.getValue().getJobSpec().getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY))
{
+ props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
+
dagNodeToCancel.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
+ }
+
+ try {
+ if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
+ Future future = dagNodeToCancel.getValue().getJobFuture().get();
+ String serializedFuture =
DagManagerUtils.getSpecProducer(dagNodeToCancel).serializeAddSpecResponse(future);
+ props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE,
serializedFuture);
+ sendCancellationEvent(dagNodeToCancel.getValue());
+ } else {
+ log.warn("No Job future when canceling DAG node (hence, not sending
cancellation event) - {}",
+ dagNodeToCancel.getValue().getJobSpec().getUri());
+ }
+
DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(),
props).get();
+ // todo - why was it not being cleaned up in DagManager?
+ dagManagementStateStore.deleteDagNodeState(dagId, dagNodeToCancel);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ public static void cancelDag(Dag<JobExecutionPlan> dag,
DagManagementStateStore dagManagementStateStore) throws IOException {
+ List<Dag.DagNode<JobExecutionPlan>> dagNodesToCancel = dag.getNodes();
+ log.info("Found {} DagNodes to cancel (DagId {}).",
dagNodesToCancel.size(), DagManagerUtils.generateDagId(dag));
+
+ for (Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
+ DagProcUtils.cancelDagNode(dagNodeToCancel, dagManagementStateStore);
+ }
+ }
+
+ public static void sendCancellationEvent(JobExecutionPlan jobExecutionPlan) {
+ Map<String, String> jobMetadata =
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+
DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_CANCEL).stop(jobMetadata);
+ jobExecutionPlan.setExecutionStatus(CANCELLED);
+ }
+
+ private static void
sendEnforceStartDeadlineDagAction(Dag.DagNode<JobExecutionPlan> dagNode)
Review Comment:
Sure, added it to enhance readability.
Issue Time Tracking
-------------------
Worklog Id: (was: 919961)
Time Spent: 2h (was: 1h 50m)
> implement EnforceStartDeadlineDagProc
> -------------------------------------
>
> Key: GOBBLIN-2069
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2069
> Project: Apache Gobblin
> Issue Type: Task
> Reporter: Arjun Singh Bora
> Priority: Major
> Time Spent: 2h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)