[
https://issues.apache.org/jira/browse/GOBBLIN-847?focusedWorklogId=292816&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-292816
]
ASF GitHub Bot logged work on GOBBLIN-847:
------------------------------------------
Author: ASF GitHub Bot
Created on: 12/Aug/19 02:46
Start Date: 12/Aug/19 02:46
Worklog Time Spent: 10m
Work Description: sv2000 commented on pull request #2702: [GOBBLIN-847]
Flow level sla
URL: https://github.com/apache/incubator-gobblin/pull/2702#discussion_r312770018
##########
File path:
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -414,30 +415,49 @@ public void run() {
}
}
+ /**
+ * Cancels the dag and sends a cancellation tracking event.
+ * @param dagToCancel dag node to cancel
+ * @throws ExecutionException executionException
+ * @throws InterruptedException interruptedException
+ */
private void cancelDag(String dagToCancel) throws ExecutionException,
InterruptedException {
log.info("Cancel flow with DagId {}", dagToCancel);
if (this.dagToJobs.containsKey(dagToCancel)) {
List<DagNode<JobExecutionPlan>> dagNodesToCancel =
this.dagToJobs.get(dagToCancel);
log.info("Found {} DagNodes to cancel.", dagNodesToCancel.size());
for (DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
- Properties props = new Properties();
- if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
- Future future = dagNodeToCancel.getValue().getJobFuture().get();
- if (future instanceof CompletableFuture &&
- future.get() instanceof AzkabanExecuteFlowStatus.ExecuteId) {
- CompletableFuture<AzkabanExecuteFlowStatus.ExecuteId>
completableFuture = (CompletableFuture) future;
- String azkabanExecId = completableFuture.get().getExecId();
- props.put(ConfigurationKeys.AZKABAN_EXEC_ID, azkabanExecId);
- log.info("Cancel job with azkaban exec id {}.", azkabanExecId);
- }
- }
- DagManagerUtils.getSpecProducer(dagNodeToCancel).deleteSpec(null,
props);
+ cancelDag(dagNodeToCancel);
}
} else {
log.warn("Did not find Dag with id {}, it might be already
cancelled.", dagToCancel);
}
}
+ private void cancelDag(DagNode<JobExecutionPlan> dagNodeToCancel) throws
ExecutionException, InterruptedException {
+ Properties props = new Properties();
+ if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
+ Future future = dagNodeToCancel.getValue().getJobFuture().get();
+ if (future instanceof CompletableFuture &&
+ future.get() instanceof AzkabanExecuteFlowStatus.ExecuteId) {
Review comment:
Aren't we leaking Azkaban related details into the DagManager? Why can't
this be abstracted away?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 292816)
Time Spent: 2h 20m (was: 2h 10m)
> add a flow level sla in gaas flows
> ----------------------------------
>
> Key: GOBBLIN-847
> URL: https://issues.apache.org/jira/browse/GOBBLIN-847
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Arjun Singh Bora
> Priority: Major
> Time Spent: 2h 20m
> Remaining Estimate: 0h
>
> add a flow level sla in gaas flows, because sometimes azkaban jobs may not
> start and hence send any tracking event, or azkaban maybe down. in all those
> cases, we might have to kill the job so we can start a new job
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)