amoghrajesh opened a new pull request, #67473:
URL: https://github.com/apache/airflow/pull/67473
<!-- SPDX-License-Identifier: Apache-2.0
https://www.apache.org/licenses/LICENSE-2.0 -->
---
##### Was generative AI tooling used to co-author this PR?
- [x] Yes: Claude Sonnet 4.6
Built on top of https://github.com/apache/airflow/pull/67118 and
https://github.com/apache/airflow/pull/65991.
### What problem are we solving?
In YARN cluster mode, `SparkSubmitOperator` submits the Spark driver to run
inside a YARN ApplicationMaster (AM), which runs independently on the cluster.
If the Airflow worker dies mid-poll, the YARN application keeps running but
Airflow loses track of it. Today the only recovery is to submit a brand-new
application, which wastes the compute already spent or causes conflicts if the
job is not idempotent.
### Proposed change
Extends `ResumableJobMixin` crash recovery (introduced in #67118 for Spark
standalone cluster mode) to YARN cluster mode.
**On first run:**
- Injects `spark.yarn.submit.waitAppCompletion=false` so spark-submit exits as
soon as YARN accepts the application
- Parses the YARN application ID from the spark-submit output and persists it
to `task_state`
- Polls the YARN ResourceManager REST API (`GET
/ws/v1/cluster/apps/{appId}`) until the application reaches a terminal state,
building on https://github.com/apache/airflow/pull/65991
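The two first-run steps that happen before polling can be sketched as follows. This is a minimal illustration, not the PR's implementation: the helper names `with_fire_and_forget` and `parse_yarn_app_id` are hypothetical, and it assumes spark-submit's YARN client prints the ID in a line such as `Submitted application application_<clusterTimestamp>_<sequence>`.

```python
import re
from collections.abc import Iterable
from typing import Optional

# YARN application IDs look like "application_<clusterTimestamp>_<sequence>".
YARN_APP_ID_RE = re.compile(r"application_\d+_\d+")


def with_fire_and_forget(conf: dict[str, str]) -> dict[str, str]:
    """Return a copy of the Spark conf that makes spark-submit exit once YARN accepts the app."""
    updated = dict(conf)
    updated["spark.yarn.submit.waitAppCompletion"] = "false"
    return updated


def parse_yarn_app_id(log_lines: Iterable[str]) -> Optional[str]:
    """Return the first YARN application ID found in spark-submit output, or None."""
    for line in log_lines:
        match = YARN_APP_ID_RE.search(line)
        if match:
            return match.group(0)
    return None


if __name__ == "__main__":
    sample_output = [
        "INFO Client: Requesting a new application from cluster with 1 NodeManagers",
        "INFO Client: Submitted application application_1700000000000_0001",
    ]
    print(with_fire_and_forget({"spark.executor.memory": "512m"}))
    print(parse_yarn_app_id(sample_output))
```

Returning `None` instead of raising leaves it to the caller to decide whether a missing ID should fail the task before anything is written to `task_state`.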
**On retry (after a worker crash):**
- Reads the application ID from `task_state`
- Queries the YARN ResourceManager for the current status: reconnects and keeps
polling if the app is still running, skips polling if it has already succeeded,
and resubmits only if it failed or was killed
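The retry decision can be expressed as a small pure function. This sketch models `task_state` as a plain mapping, uses a hypothetical key name `yarn_application_id`, and assumes `get_status` returns one of the synthesized statuses described under **Status mapping**; it is an illustration, not the mixin's actual code.

```python
from collections.abc import Callable, Mapping
from enum import Enum
from typing import Optional, Tuple

# Hypothetical task_state key; the PR does not document the key it uses.
APP_ID_KEY = "yarn_application_id"


class RetryAction(Enum):
    SUBMIT = "submit"        # no persisted ID: behave like a first run
    RECONNECT = "reconnect"  # app still active: resume polling it
    SKIP = "skip"            # app already succeeded: return without polling
    RESUBMIT = "resubmit"    # app failed or was killed: submit a new one


def decide_on_retry(
    task_state: Mapping[str, str],
    get_status: Callable[[str], str],
) -> Tuple[RetryAction, Optional[str]]:
    """Pick the retry action from the persisted app ID and its synthesized status."""
    app_id = task_state.get(APP_ID_KEY)
    if app_id is None:
        return RetryAction.SUBMIT, None
    status = get_status(app_id)
    if status == "SUCCEEDED":
        return RetryAction.SKIP, app_id
    if status == "FAILED":
        return RetryAction.RESUBMIT, app_id
    # NEW, NEW_SAVING, SUBMITTED, ACCEPTED or RUNNING: still in flight.
    return RetryAction.RECONNECT, app_id
```

Keeping the ResourceManager call behind `get_status` makes every branch testable without a live cluster.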
**Status mapping:**
The hook queries `GET /ws/v1/cluster/apps/{appId}` and collapses the two-field
YARN response (`state` and `finalStatus`) into a single status string for the
mixin interface:
| YARN `state` | `finalStatus` | Synthesized status | Mixin behaviour |
|---|---|---|---|
| `NEW` / `NEW_SAVING` / `SUBMITTED` / `ACCEPTED` / `RUNNING` | `UNDEFINED` | `state` as-is | Poll again |
| `FINISHED` | `SUCCEEDED` | `SUCCEEDED` | Return result |
| `FINISHED` | `FAILED` / `KILLED` / `UNDEFINED` | `FAILED` | Resubmit |
| `FAILED` | `FAILED` / `KILLED` | `FAILED` | Resubmit |
| `KILLED` | `KILLED` | `FAILED` | Resubmit |
`state` determines whether the app is still running; `finalStatus` resolves
the outcome when `state=FINISHED`. Using `finalStatus` alone would be
unreliable because YARN can report `UNDEFINED` for a dead app if the RM
recovered from a crash and lost the final status.
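The status mapping table translates almost line for line into code. The sketch below assumes the ResourceManager wraps the application in a top-level `"app"` object, as the Cluster Application API does; `synthesize_status` is a hypothetical name, not necessarily the hook's method.

```python
from typing import Any

# YARN states in which the application has not reached an outcome yet.
ACTIVE_STATES = frozenset({"NEW", "NEW_SAVING", "SUBMITTED", "ACCEPTED", "RUNNING"})


def synthesize_status(response: dict[str, Any]) -> str:
    """Collapse a GET /ws/v1/cluster/apps/{appId} response into one status string."""
    app = response["app"]
    state = app["state"]
    final_status = app.get("finalStatus", "UNDEFINED")
    if state in ACTIVE_STATES:
        # Still in flight: report the YARN state as-is so the caller polls again.
        return state
    if state == "FINISHED" and final_status == "SUCCEEDED":
        return "SUCCEEDED"
    # FINISHED with FAILED/KILLED/UNDEFINED, or state FAILED/KILLED: treat as failure.
    return "FAILED"
```

Checking `state` first is what stops a dead application reporting `finalStatus=UNDEFINED` from being treated as still running.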
**Observability:** Logs status transitions such as ACCEPTED → RUNNING and a
heartbeat every 10 polls so users can see the job is alive without log spam.
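A polling loop with this logging behaviour might look like the following sketch. The poll interval, logger name and message wording are assumptions; only the "log transitions, heartbeat every 10 polls" behaviour comes from the description. `fetch_status` is expected to return a synthesized status.

```python
import logging
import time
from collections.abc import Callable
from typing import Optional

log = logging.getLogger("yarn_poll_sketch")

TERMINAL_STATUSES = frozenset({"SUCCEEDED", "FAILED"})


def poll_until_terminal(
    fetch_status: Callable[[], str],
    interval_seconds: float = 5.0,
    heartbeat_every: int = 10,
) -> str:
    """Poll until a terminal synthesized status, logging transitions and heartbeats."""
    previous: Optional[str] = None
    polls = 0
    while True:
        status = fetch_status()
        polls += 1
        if status != previous:
            # Log only when the status changes, e.g. ACCEPTED -> RUNNING.
            log.info("YARN application status: %s -> %s", previous or "<none>", status)
            previous = status
        elif polls % heartbeat_every == 0:
            # Unchanged status: emit one heartbeat line every `heartbeat_every` polls.
            log.info("YARN application still %s after %d polls", status, polls)
        if status in TERMINAL_STATUSES:
            return status
        time.sleep(interval_seconds)
```

A long `RUNNING` phase therefore produces one log line per `heartbeat_every` polls instead of one per poll.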
**`on_kill`:** Because spark-submit has already exited
(`waitAppCompletion=false`), the hook's CLI kill has nothing to terminate. In YARN
cluster mode, `on_kill` therefore kills the application through the
ResourceManager REST API instead (`PUT /ws/v1/cluster/apps/{appId}/state`).
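The REST kill amounts to a single `PUT` with a JSON body of `{"state": "KILLED"}`. This standard-library sketch assumes a ResourceManager that accepts the request without Kerberos/SPNEGO authentication or ACL checks; `build_kill_request` and `kill_yarn_app` are hypothetical helpers, not the hook's API.

```python
import json
import urllib.request


def build_kill_request(rm_address: str, app_id: str) -> urllib.request.Request:
    """Build the ResourceManager request that moves an application to KILLED."""
    url = f"{rm_address.rstrip('/')}/ws/v1/cluster/apps/{app_id}/state"
    body = json.dumps({"state": "KILLED"}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


def kill_yarn_app(rm_address: str, app_id: str, timeout: float = 10.0) -> int:
    """Send the kill request and return the HTTP status code."""
    with urllib.request.urlopen(build_kill_request(rm_address, app_id), timeout=timeout) as resp:
        return resp.status
```

Splitting request construction from sending keeps the URL and payload checkable without a running ResourceManager.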
### What changes from the standalone PR (#67118)
The standalone PR covered `spark://` masters only. This PR adds the parallel
YARN path. The two paths are kept as separate `if` branches in `execute()` for
readability.
### User impact and backcompat
#### New behaviour (on by default):
- Any `SparkSubmitOperator` with `--master yarn --deploy-mode cluster` now
gets crash recovery automatically, because `reconnect_on_retry=True` is the
default.
- Requires one new connection extra, `yarn_resourcemanager_webapp_address`
(e.g. `http://rm.example.com:8088`). Without it, the resumable path raises a
`ValueError` with a clear message at submit time.
- Trade-off to be aware of: full spark-submit log streaming (the
infrastructure-level YARN orchestrator logs) is replaced by RM REST API
polling. The actual driver and executor logs are unaffected; they remain in YARN
log aggregation (`yarn logs -applicationId ...`). Most users will not notice,
but users relying on orchestrator log streaming should set
`reconnect_on_retry=False`.
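Failing fast on the missing extra could look like this sketch. The connection extra is modelled as a plain dict; `resolve_rm_address`, `app_status_url` and the error wording are hypothetical, not the provider's actual code or message.

```python
from collections.abc import Mapping
from typing import Any

RM_EXTRA_KEY = "yarn_resourcemanager_webapp_address"


def resolve_rm_address(extra: Mapping[str, Any]) -> str:
    """Return the RM web address from the connection extra, or fail at submit time."""
    address = extra.get(RM_EXTRA_KEY)
    if not address:
        raise ValueError(
            f"Connection extra '{RM_EXTRA_KEY}' is required for YARN cluster mode "
            "crash recovery, e.g. 'http://rm.example.com:8088'."
        )
    return str(address).rstrip("/")


def app_status_url(extra: Mapping[str, Any], app_id: str) -> str:
    """Build the Cluster Application API URL polled for a given application."""
    return f"{resolve_rm_address(extra)}/ws/v1/cluster/apps/{app_id}"
```

Validating before spark-submit runs means a misconfigured connection never leaves behind an application that Airflow cannot poll.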
#### Unaffected paths:
- YARN client mode: unaffected, still uses the blocking `hook.submit()` path
- Kubernetes: unaffected, still raises `NotImplementedError` (covered by a
sister PR)
- `reconnect_on_retry=False`: skips crash recovery, submitting and polling
without `task_state` persistence
### Testing Details
#### Setup
This Docker Compose setup spins up a four-container Hadoop 3.2.1 cluster:
NameNode, DataNode, ResourceManager and NodeManager.
```yaml
services:
  namenode:
    image: bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8
    container_name: yarn-namenode
    ports:
      - "9870:9870"
      - "9000:9000"
    volumes:
      - namenode_data:/hadoop/dfs/name
    environment:
      - CLUSTER_NAME=yarn-test
    env_file:
      - hadoop.env
    networks:
      - yarn-net
  datanode:
    image: bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8
    container_name: yarn-datanode
    volumes:
      - datanode_data:/hadoop/dfs/data
    env_file:
      - hadoop.env
    depends_on:
      - namenode
    networks:
      - yarn-net
  resourcemanager:
    image: bde2020/hadoop-resourcemanager:2.0.0-hadoop3.2.1-java8
    container_name: yarn-resourcemanager
    ports:
      - "8088:8088"
      - "8030:8030"
      - "8031:8031"
      - "8032:8032"
    env_file:
      - hadoop.env
    depends_on:
      - namenode
      - datanode
    networks:
      - yarn-net
  nodemanager:
    image: bde2020/hadoop-nodemanager:2.0.0-hadoop3.2.1-java8
    container_name: yarn-nodemanager
    env_file:
      - hadoop.env
    depends_on:
      - resourcemanager
    networks:
      - yarn-net
volumes:
  namenode_data:
  datanode_data:
networks:
  yarn-net:
    driver: bridge
```
Prepared a script that bootstraps the cluster for running Spark on Hadoop: it
sets up Java, stages the Spark JARs in HDFS, writes the `core-site.xml` and
`yarn-site.xml` configs, and registers the `spark_yarn` Airflow connection with
the right `yarn_resourcemanager_webapp_address`.
Connection:
```shell
airflow connections add spark_yarn \
--conn-type spark \
--conn-host yarn \
--conn-extra '{"deploy-mode": "cluster",
"yarn_resourcemanager_webapp_address": "http://host.docker.internal:8088"}'
```
Using this DAG:
```python
from datetime import datetime

from airflow.sdk import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG(
    dag_id="spark_yarn_mode_repro",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
    tags=["aip-103", "yarn", "repro"],
) as dag:
    SparkSubmitOperator(
        task_id="submit_long_running_job",
        conn_id="spark_yarn",  # master "yarn", deploy-mode "cluster" via connection extra
        application="/opt/airflow/dev/spark-cluster-yarn/spark-examples.jar",
        java_class="org.apache.spark.examples.SparkPi",
        application_args=["100"],
        conf={
            "spark.yarn.archive": "hdfs:///spark-jars.zip",
            "spark.executor.memory": "512m",
            "spark.driver.memory": "512m",
            "spark.executor.cores": "1",
        },
        env_vars={
            "HADOOP_CONF_DIR": "/tmp/hadoop-conf",
            "HADOOP_USER_NAME": "root",
        },
        num_executors=1,
        retries=2,  # retries exercise the reconnect path after a worker crash
        retry_delay=5,  # seconds
    )
```
The DAG submits SparkPi (`application_args=["100"]`, about 2 minutes of
runtime), long enough to kill the worker mid-poll and verify reconnection on
retry. Check the YARN UI to confirm that only one application was submitted
across multiple Airflow task attempts.
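The same check can be scripted against the ResourceManager's Cluster Applications API instead of the UI. This is a sketch: the helper names are hypothetical, it assumes an unauthenticated RM at the address used in the connection, and it assumes the application name is the operator's default `arrow-spark` (adjust it if you set `name`).

```python
import json
import urllib.request
from typing import Any


def count_apps_named(apps_response: dict[str, Any], app_name: str) -> int:
    """Count applications with the given name in a GET /ws/v1/cluster/apps response."""
    # The RM returns {"apps": null} when no application matches the query.
    apps = (apps_response.get("apps") or {}).get("app", [])
    return sum(1 for app in apps if app.get("name") == app_name)


def fetch_spark_apps(rm_address: str, timeout: float = 10.0) -> dict[str, Any]:
    """Fetch all SPARK-type applications known to the ResourceManager."""
    url = f"{rm_address.rstrip('/')}/ws/v1/cluster/apps?applicationTypes=SPARK"
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.load(resp)


if __name__ == "__main__":
    apps = fetch_spark_apps("http://localhost:8088")
    print(count_apps_named(apps, "arrow-spark"))
```

On a fresh cluster, a count of 1 after the worker was killed and the task retried indicates that the retry reconnected instead of resubmitting.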
#### Steps to verify reconnect behaviour
1. Trigger the DAG and wait for the YARN application to start
<img width="2551" height="1344" alt="image"
src="https://github.com/user-attachments/assets/ebe3565a-21e5-4de1-8204-ebffb98e6103"
/>
<img width="1723" height="1026" alt="image"
src="https://github.com/user-attachments/assets/4567463f-fd2f-44f4-bcfc-8f557138cb9d"
/>
2. Kill the worker midway and watch the logs
<img width="1723" height="1026" alt="image"
src="https://github.com/user-attachments/assets/ca60e709-fa0b-45df-b19b-3bb1754a3c3a"
/>
3. In the YARN UI, observe that the application keeps running and completes
<img width="1723" height="1026" alt="image"
src="https://github.com/user-attachments/assets/af83ce81-6f0f-41c1-983c-219f4e76a067"
/>
4. Restart the worker once the YARN application has completed and observe that
it is not resubmitted
<img width="1723" height="1026" alt="image"
src="https://github.com/user-attachments/assets/f0ce61f7-b9f4-43be-a7a5-63dee1c31d92"
/>