jj-ian opened a new pull request #4083: Airflow 3211
URL: https://github.com/apache/incubator-airflow/pull/4083

This change allows Airflow to reattach to existing Dataproc jobs upon scheduler restart. Previously, if the Airflow scheduler restarted while it was running a job on GCP Dataproc, it would lose track of that job, mark the task as failed, and eventually retry. However, the job might still be running on Dataproc and might even finish successfully. So when Airflow retried and reran the job, the same job would run twice. This can result in issues like delayed workflows, increased costs, and duplicate data.

Make sure you have checked _all_ steps below.

### Jira

- [x] My PR addresses the following [Airflow Jira](https://issues.apache.org/jira/browse/AIRFLOW/) issues and references them in the PR title.
  - https://issues.apache.org/jira/browse/AIRFLOW-3211

### Description

- [x] Here are some details about my PR, including screenshots of any UI changes:

My change has Airflow query the Dataproc API before submitting a job to see whether the job is already running on the cluster. If a job with a matching task ID is already running on the cluster AND is in a recoverable state (like RUNNING or DONE), then Airflow will reattach itself to the existing job on Dataproc instead of resubmitting a new job to the cluster. If the job on the cluster is in an irrecoverable state like ERROR, Airflow will resubmit the job.

To see this change in action:

Setup:
1. Set up a GCP project with the Dataproc API enabled.
2. Install Airflow.
3. On the machine that's running Airflow, `pip install google-api-python-client oauth2client`.
4. Start the Airflow webserver. In the Airflow UI, go to Admin -> Connections, edit the `google_cloud_default` connection, and fill in the Project Id field with your project ID.

To reproduce:
1. Install this DAG in the Airflow instance: https://github.com/GoogleCloudPlatform/python-docs-samples/blob/b80895ed88ba86fce223df27a48bf481007ca708/composer/workflows/quickstart.py Set up the Airflow variables as instructed at the top of the file.
2. Start the Airflow scheduler and webserver if they're not already running. Kick off a run of the above DAG through the Airflow UI. Wait for the cluster to spin up and the job to start running on Dataproc.
3. While the job is running, kill the scheduler. Wait 5 seconds or so, and then start it back up.
4. Airflow will retry the task and reattach to the existing task already on Dataproc. Look at the Airflow logs to observe "Reattaching to previously-started DataProc job [JOB NAME HERE] (in state RUNNING)." Click on the cluster in Dataproc to observe that only the single job is running; a duplicate job has not been submitted.
5. Observe that, when the job finishes, Airflow detects the completion successfully and runs the downstream cluster delete operation.

### Tests

- [x] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:

Added the following tests to `tests/contrib/hooks/test_gcp_dataproc_hook.py`. When submitting a new job to Dataproc:
- If a job with the same task ID is already running on the cluster, don't resubmit the job.
- If the first matching job found on the cluster is in an irrecoverable state, keep looking for a job in a recoverable state to reattach to on the cluster. This ensures that Airflow prioritizes recoverable jobs when looking for jobs to reattach to.
- If there are jobs running on the cluster, but none of them has the same task ID as the job we're about to submit, then submit the new job.
- If there are no other jobs already running on the cluster, then submit the job.
- If a job with the same task ID finished with an error on the cluster, then resubmit the job for retry.
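The reattach-or-resubmit decision described above can be sketched roughly as follows. This is a minimal illustration, not the actual hook implementation: the function names, the job-dict shape, and the exact set of recoverable states are assumptions made for the example.

```python
# Illustrative sketch of the reattach-or-resubmit logic. All names here
# (find_reattachable_job, submit_or_reattach, the job-dict fields) are
# hypothetical; they do not mirror the real Airflow DataProcHook API.

# Assumption: these are the states treated as "recoverable".
RECOVERABLE_STATES = {"PENDING", "RUNNING", "DONE"}


def find_reattachable_job(existing_jobs, task_id):
    """Return a cluster job with a matching task ID in a recoverable state.

    Keeps scanning past irrecoverable matches (e.g. ERROR), so a recoverable
    duplicate is preferred if one exists; returns None if there is none.
    """
    for job in existing_jobs:
        if job["task_id"] == task_id and job["state"] in RECOVERABLE_STATES:
            return job
    return None


def submit_or_reattach(existing_jobs, task_id, submit_fn):
    """Reattach to a recoverable matching job, or submit a new one."""
    job = find_reattachable_job(existing_jobs, task_id)
    if job is not None:
        print("Reattaching to previously-started DataProc job %s (in state %s)"
              % (job["name"], job["state"]))
        return job
    # No recoverable match on the cluster (no match at all, or only
    # jobs that ended in ERROR): submit a fresh job.
    return submit_fn(task_id)
```

Under these assumptions, a job that errored out does not block resubmission, while a still-running duplicate is reused instead of being submitted twice.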
### Commits

- [x] My commits all reference Jira issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
  1. Subject is separated from body by a blank line
  1. Subject is limited to 50 characters (not including Jira issue reference)
  1. Subject does not end with a period
  1. Subject uses the imperative mood ("add", not "adding")
  1. Body wraps at 72 characters
  1. Body explains "what" and "why", not "how"

### Documentation

- [x] In case of new functionality, my PR adds documentation that describes how to use it.
  - When adding new operators/hooks/sensors, the autoclass documentation generation needs to be added.

### Code Quality

- [x] Passes `flake8`
----------------------------------------------------------------
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: [email protected]

With regards,
Apache Git Services
