jj-ian opened a new pull request #4083: Airflow 3211
URL: https://github.com/apache/incubator-airflow/pull/4083
 
 
   This change allows Airflow to reattach to existing Dataproc jobs after a 
scheduler restart. Previously, if the Airflow scheduler restarted while a job 
was running on GCP Dataproc, it would lose track of that job, mark the task as 
failed, and eventually retry it. However, the job may still be running on 
Dataproc and may even finish successfully, so when Airflow retries and reruns 
the task, the same job runs twice. This can cause delayed workflows, increased 
costs, and duplicate data. 
   
   Make sure you have checked _all_ steps below.
   
   ### Jira
   
   - [x] My PR addresses the following [Airflow 
Jira](https://issues.apache.org/jira/browse/AIRFLOW/) issues and references 
them in the PR title. 
     - https://issues.apache.org/jira/browse/AIRFLOW-3211
   
   ### Description
   
   - [x] Here are some details about my PR, including screenshots of any UI 
changes:
   
   My change has Airflow query the Dataproc API before submitting a job to see 
whether the job is already running on the cluster. If a job with a matching 
task ID is already on the cluster AND is in a recoverable state (such as 
RUNNING or DONE), then Airflow reattaches itself to the existing job on 
Dataproc instead of submitting a new one. If the job on the cluster is in an 
irrecoverable state, such as ERROR, Airflow resubmits the job.
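   The decision logic can be sketched roughly as below. The function name, the 
exact set of recoverable states, and the job-dict shape are illustrative 
assumptions for this sketch, not the hook's actual API; the dicts mimic the 
shape of the Dataproc `jobs.list` response.
   
   ```python
   # Illustrative sketch of the reattach decision, not the actual hook code.
   # States treated as safe to reattach to (an assumption for this sketch).
   RECOVERABLE_STATES = {"PENDING", "RUNNING", "DONE"}
   
   def find_reattachable_job(cluster_jobs, task_id):
       """Return an existing job to reattach to, or None to submit a new one."""
       for job in cluster_jobs:
           job_id = job["reference"]["jobId"]
           state = job["status"]["state"]
           # Match on the task ID, and skip past jobs in irrecoverable states
           # (e.g. ERROR) in case a later attempt is still recoverable.
           if task_id in job_id and state in RECOVERABLE_STATES:
               return job
       return None
   ```
   
   If no recoverable match is found, the caller falls through to submitting a 
fresh job, which is also what happens on a clean first run.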
   
   To see this change in action:
   
   Setup:
   1. Set up a GCP Project with the Dataproc API enabled
   2. Install Airflow.
   3. On the machine running Airflow, run `pip install google-api-python-client 
oauth2client`.
   4. Start the Airflow webserver. In the Airflow UI, go to Admin -> 
Connections, edit the `google_cloud_default` connection, and fill in the 
Project Id field with your project ID.
   
   To reproduce:
   
   1. Install this DAG in the Airflow instance: 
https://github.com/GoogleCloudPlatform/python-docs-samples/blob/b80895ed88ba86fce223df27a48bf481007ca708/composer/workflows/quickstart.py 
Then set up the Airflow variables as instructed at the top of the file.
   2. Start the Airflow scheduler and webserver if they're not running already. 
Kick off a run of the above DAG through the Airflow UI. Wait for the cluster to 
spin up and the job to start running on Dataproc.
   3. While the job is running, kill the scheduler. Wait about 5 seconds, then 
start it back up.
   4. Airflow will retry the task and reattach to the existing job already on 
Dataproc. Look at the Airflow logs to observe "Reattaching to 
previously-started DataProc job [JOB NAME HERE] (in state RUNNING)." Click on 
the cluster in Dataproc to confirm that only the single job is running and that 
no duplicate job has been submitted.
   5. Observe that, when the job finishes, Airflow detects the completion 
successfully and runs the downstream cluster delete operation.
   
   ### Tests
   
   - [x] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason:
   
   Added the following tests to `tests/contrib/hooks/test_gcp_dataproc_hook.py`:
   
   When submitting a new job to Dataproc:
   - If a job with the same task ID is already running on the cluster, don't 
resubmit the job.
   - If the first matching job found on the cluster is in an irrecoverable 
state, keep looking for a matching job in a recoverable state to reattach to. 
This ensures that Airflow prioritizes recoverable jobs when looking for jobs 
to reattach to on the cluster.
   - If there are jobs running on the cluster, but none of them have the same 
task ID as the job we're about to submit, then submit the new job.
   - If there are no other jobs already running on the cluster, then submit the 
job.
   - If a job with the same task ID finished with error on the cluster, then 
resubmit the job for retry.
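   
   A test along these lines might look like the following sketch, which uses 
`unittest.mock` to stub out the Dataproc API. The `SubmitJobStub` class and its 
method names are hypothetical stand-ins for the real hook under test; the 
actual tests live in `tests/contrib/hooks/test_gcp_dataproc_hook.py`.
   
   ```python
   import unittest
   from unittest import mock
   
   class SubmitJobStub:
       """Hypothetical stand-in for the hook's submit path (not the real class)."""
   
       def __init__(self, dataproc_api):
           self.api = dataproc_api
   
       def submit(self, task_id, job):
           # Reattach to a matching, non-errored job instead of resubmitting.
           for existing in self.api.list_jobs():
               if task_id in existing["reference"]["jobId"] \
                       and existing["status"]["state"] != "ERROR":
                   return existing
           return self.api.submit_job(job)
   
   class ReattachTest(unittest.TestCase):
       def test_running_job_is_not_resubmitted(self):
           api = mock.Mock()
           running = {"reference": {"jobId": "task1_abc"},
                      "status": {"state": "RUNNING"}}
           api.list_jobs.return_value = [running]
   
           result = SubmitJobStub(api).submit("task1", job={})
   
           # Airflow reattached to the running job; no new job was submitted.
           self.assertIs(result, running)
           api.submit_job.assert_not_called()
   
       def test_errored_job_is_resubmitted(self):
           api = mock.Mock()
           api.list_jobs.return_value = [
               {"reference": {"jobId": "task1_abc"},
                "status": {"state": "ERROR"}}]
   
           SubmitJobStub(api).submit("task1", job={})
   
           # The matching job errored, so a new job must be submitted.
           api.submit_job.assert_called_once_with({})
   ```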
   
   ### Commits
   
   - [x] My commits all reference Jira issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
     1. Subject is separated from body by a blank line
     1. Subject is limited to 50 characters (not including Jira issue reference)
     1. Subject does not end with a period
     1. Subject uses the imperative mood ("add", not "adding")
     1. Body wraps at 72 characters
     1. Body explains "what" and "why", not "how"
   
   ### Documentation
   
   - [x] In case of new functionality, my PR adds documentation that describes 
how to use it.
     - When adding new operators/hooks/sensors, the autoclass documentation 
generation needs to be added.
   
   ### Code Quality
   
   - [x] Passes `flake8`
   