[
https://issues.apache.org/jira/browse/GOBBLIN-1784?focusedWorklogId=845692&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-845692
]
ASF GitHub Bot logged work on GOBBLIN-1784:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 15/Feb/23 17:26
Start Date: 15/Feb/23 17:26
Worklog Time Spent: 10m
Work Description: Will-Lo commented on code in PR #3641:
URL: https://github.com/apache/gobblin/pull/3641#discussion_r1107255530
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -1117,67 +1107,90 @@ private boolean hasRunningJobs(String dagId) {
* Perform clean up. Remove a dag from the dagstore if the dag is complete
and update internal state.
*/
private void cleanUp() {
- List<String> dagIdstoClean = new ArrayList<>();
- //Clean up failed dags
- for (String dagId : this.failedDagIdsFinishRunning) {
- //Skip monitoring of any other jobs of the failed dag.
- LinkedList<DagNode<JobExecutionPlan>> dagNodeList =
this.dagToJobs.get(dagId);
- while (!dagNodeList.isEmpty()) {
- DagNode<JobExecutionPlan> dagNode = dagNodeList.poll();
- deleteJobState(dagId, dagNode);
- }
- Dag<JobExecutionPlan> dag = this.dags.get(dagId);
- String status = TimingEvent.FlowTimings.FLOW_FAILED;
- addFailedDag(dagId, dag);
- log.info("Dag {} has finished with status {}; Cleaning up dag from the
state store.", dagId, status);
- // send an event before cleaning up dag
- DagManagerUtils.emitFlowEvent(this.eventSubmitter,
this.dags.get(dagId), status);
- dagIdstoClean.add(dagId);
- }
+ // Approximate the time when the flow events are emitted to account for
delay when the flow event is received by the job monitor
+ long cleanUpProcessingTime = System.currentTimeMillis();
// Remove dags that are finished and emit their appropriate metrics
for (Map.Entry<String, Dag<JobExecutionPlan>> dagIdKeyPair :
this.dags.entrySet()) {
String dagId = dagIdKeyPair.getKey();
+ // On service restart, we repopulate the dags that are waiting to be
cleaned up
+ if (dagIdstoClean.contains(dagId)) {
+ continue;
+ }
Dag<JobExecutionPlan> dag = dagIdKeyPair.getValue();
- if (!hasRunningJobs(dagId) &&
!this.failedDagIdsFinishRunning.contains(dagId)) {
- String status = TimingEvent.FlowTimings.FLOW_SUCCEEDED;
- if (this.failedDagIdsFinishAllPossible.contains(dagId)) {
- status = TimingEvent.FlowTimings.FLOW_FAILED;
- addFailedDag(dagId, dag);
- this.failedDagIdsFinishAllPossible.remove(dagId);
+ if ((TimingEvent.FlowTimings.FLOW_FAILED.equals(dag.getFlowEvent()) ||
TimingEvent.FlowTimings.FLOW_CANCELLED.equals(dag.getFlowEvent())) &&
+ DagManagerUtils.getFailureOption(dag) ==
FailureOption.FINISH_RUNNING) {
+ //Skip monitoring of any other jobs of the failed dag.
+ LinkedList<DagNode<JobExecutionPlan>> dagNodeList =
this.dagToJobs.get(dagId);
+ while (!dagNodeList.isEmpty()) {
+ DagNode<JobExecutionPlan> dagNode = dagNodeList.poll();
+ deleteJobState(dagId, dagNode);
+ }
+ }
+ if (!hasRunningJobs(dagId)) {
+ // Collect all the dagIds that are finished
+ this.dagIdstoClean.add(dagId);
+ if (dag.getFlowEvent() == null) {
+ // If the dag flow event is not set, then it is successful
+ dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_SUCCEEDED);
} else {
-
dagManagerMetrics.emitFlowSuccessMetrics(DagManagerUtils.getFlowId(this.dags.get(dagId)));
+ addFailedDag(dagId, dag);
}
- log.info("Dag {} has finished with status {}; Cleaning up dag from
the state store.", dagId, status);
+ log.info("Dag {} has finished with status {}; Cleaning up dag from
the state store.", dagId, dag.getFlowEvent());
// send an event before cleaning up dag
- DagManagerUtils.emitFlowEvent(this.eventSubmitter,
this.dags.get(dagId), status);
- dagIdstoClean.add(dagId);
+ DagManagerUtils.emitFlowEvent(this.eventSubmitter,
this.dags.get(dagId), dag.getFlowEvent());
+ dag.setEventEmittedTimeMillis(cleanUpProcessingTime);
}
}
- for (String dagId: dagIdstoClean) {
- cleanUpDag(dagId);
+ // Only clean up dags after the job status monitor processed the flow
event
+ Set<String> cleanedDags = new HashSet<>();
+ for (String dagId: this.dagIdstoClean) {
+ Dag<JobExecutionPlan> dag = this.dags.get(dagId);
+ JobStatus flowStatus = pollFlowStatus(dag);
+ if (flowStatus != null &&
FlowStatusGenerator.FINISHED_STATUSES.contains(flowStatus.getEventName())) {
+ FlowId flowId = DagManagerUtils.getFlowId(dag);
+ switch(dag.getFlowEvent()) {
+ case TimingEvent.FlowTimings.FLOW_SUCCEEDED:
+ this.dagManagerMetrics.emitFlowSuccessMetrics(flowId);
+ this.dagManagerMetrics.conditionallyMarkFlowAsState(flowId,
FlowState.SUCCESSFUL);
+ break;
+ case TimingEvent.FlowTimings.FLOW_FAILED:
+ this.dagManagerMetrics.emitFlowFailedMetrics(flowId);
+ this.dagManagerMetrics.conditionallyMarkFlowAsState(flowId,
FlowState.FAILED);
+ break;
+ case TimingEvent.FlowTimings.FLOW_CANCELLED:
+ this.dagManagerMetrics.emitFlowSlaExceededMetrics(flowId);
Review Comment:
Ah, I realize there's a bug with how we handle start SLA exceeded and
manually cancelled retries; I will put up a PR later to fix that.
Issue Time Tracking
-------------------
Worklog Id: (was: 845692)
Time Spent: 1.5h (was: 1h 20m)
> Race condition where on service restart DagManager will lose track of dags
> --------------------------------------------------------------------------
>
> Key: GOBBLIN-1784
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1784
> Project: Apache Gobblin
> Issue Type: Bug
> Components: gobblin-service
> Reporter: William Lo
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
> Gobblin-as-a-Service has a bug where on restart, the DagManager will clean up
> dags but a flow event is never sent.
> This leads to a scenario where, if the event is never sent by the underlying
> notification system, the dag will already have been cleaned up and thus the
> job status will be permanently stuck in a running state.
> The DagManager should therefore only clean up its own reference to a dag
> after it reads that the job status monitor has properly saved the final flow
> status. If a status hasn't been received within some timeout (e.g. 5 minutes),
> the DagManager will re-emit the event in case it was lost.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)