arjun4084346 commented on code in PR #3950:
URL: https://github.com/apache/gobblin/pull/3950#discussion_r1606105120
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -111,4 +122,52 @@ public static void
submitJobToExecutor(DagManagementStateStore dagManagementStat
throw new RuntimeException(e);
}
}
+
+ public static void cancelDagNode(Dag.DagNode<JobExecutionPlan>
dagNodeToCancel, DagManagementStateStore dagManagementStateStore) throws
IOException {
+ Properties props = new Properties();
+ DagManager.DagId dagId = DagManagerUtils.generateDagId(dagNodeToCancel);
+ if
(dagNodeToCancel.getValue().getJobSpec().getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY))
{
+ props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
+
dagNodeToCancel.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
+ }
+
+ try {
+ if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
+ Future future = dagNodeToCancel.getValue().getJobFuture().get();
+ String serializedFuture =
DagManagerUtils.getSpecProducer(dagNodeToCancel).serializeAddSpecResponse(future);
+ props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE,
serializedFuture);
+ sendCancellationEvent(dagNodeToCancel.getValue());
+ } else {
+ log.warn("No Job future when canceling DAG node (hence, not sending
cancellation event) - {}",
+ dagNodeToCancel.getValue().getJobSpec().getUri());
+ }
+
DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(),
props).get();
+ // todo - why was it not being cleaned up in DagManager?
+ dagManagementStateStore.deleteDagNodeState(dagId, dagNodeToCancel);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ public static void cancelDag(Dag<JobExecutionPlan> dag,
DagManagementStateStore dagManagementStateStore) throws IOException {
+ List<Dag.DagNode<JobExecutionPlan>> dagNodesToCancel = dag.getNodes();
+ log.info("Found {} DagNodes to cancel (DagId {}).",
dagNodesToCancel.size(), DagManagerUtils.generateDagId(dag));
+
+ for (Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
+ DagProcUtils.cancelDagNode(dagNodeToCancel, dagManagementStateStore);
+ }
+ }
+
+ public static void sendCancellationEvent(JobExecutionPlan jobExecutionPlan) {
+ Map<String, String> jobMetadata =
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+
DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_CANCEL).stop(jobMetadata);
+ jobExecutionPlan.setExecutionStatus(CANCELLED);
+ }
+
+ private static void
sendEnforceStartDeadlineDagAction(Dag.DagNode<JobExecutionPlan> dagNode)
Review Comment:
Sure, added it to enhance readability.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]