benbuckman opened a new issue, #34023:
URL: https://github.com/apache/airflow/issues/34023
### Apache Airflow version
2.7.0
### What happened
I have the following DAG:
```python
from datetime import datetime

from airflow.decorators import dag, task, task_group
from airflow.exceptions import AirflowSensorTimeout
from airflow.utils.trigger_rule import TriggerRule


@task
def get_records() -> list[str]:
    return ["a", "b", "c"]


@task
def submit_job(record: str) -> None:
    pass


@task
def fake_sensor(record: str) -> bool:
    raise RuntimeError("boo")


@task
def deliver_record(record: str) -> None:
    pass


@task(trigger_rule=TriggerRule.ONE_FAILED)
def handle_failed_delivery(record: str) -> None:
    pass


@task_group(group_id="deliver_records")
def deliver_record_task_group(record: str) -> None:
    (
        submit_job(record=record)
        >> fake_sensor(record=record)
        >> deliver_record(record=record)
        >> handle_failed_delivery(record=record)
    )


@dag(
    dag_id="demo_trigger_one_failed",
    schedule=None,
    start_date=datetime(2023, 1, 1),
)
def demo_trigger_one_failed() -> None:
    records = get_records()
    deliver_record_task_group.expand(record=records)


demo_trigger_one_failed()
```
- `fake_sensor` simulates a task that raises an exception. (It could just as well be a `@task.sensor` raising an `AirflowSensorTimeout`; the behavior is the same. See the sketch after this list.)
- `handle_failed_delivery`'s `TriggerRule.ONE_FAILED` means **it is supposed to run whenever any upstream task fails.** So when `fake_sensor` fails, `handle_failed_delivery` should run.
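For reference, a minimal sketch of that sensor variant (the `poke_interval`/`timeout` values here are arbitrary; the poke condition never becomes true, so the sensor times out and fails the same way):
```python
@task.sensor(poke_interval=5, timeout=15)
def fake_sensor(record: str) -> bool:
    # Never true, so after `timeout` seconds the sensor raises
    # AirflowSensorTimeout and the task fails, like the RuntimeError above.
    return False
```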
But this does not work. `handle_failed_delivery` is skipped, and (judging by the UI) it is skipped very early, before the upstream tasks have finished, so it cannot yet know whether they succeeded or failed.
Here's what I see, progressively (see `How to reproduce` below for how I got this):
| started ... | skipped too early ... | fake sensor about to fail ... | ... done, didn't run |
|--------|--------|--------|--------|
| <img width="312" alt="Screenshot 2023-09-01 at 3 26 49 PM" src="https://github.com/apache/airflow/assets/354655/2a9bb897-dd02-4c03-a381-2deb774d1072"> | <img width="310" alt="Screenshot 2023-09-01 at 3 26 50 PM" src="https://github.com/apache/airflow/assets/354655/11d0f8c5-c7c0-400f-95dd-4ed3992701d0"> | <img width="308" alt="Screenshot 2023-09-01 at 3 26 53 PM" src="https://github.com/apache/airflow/assets/354655/dd81e42e-ca24-45fa-a18d-df2b435c3d82"> | <img width="309" alt="Screenshot 2023-09-01 at 3 26 56 PM" src="https://github.com/apache/airflow/assets/354655/d3a3303c-91d9-498a-88c3-f1aa1e8580b6"> |
If I remove the task group and instead do,
```python
@dag(
    dag_id="demo_trigger_one_failed",
    schedule=None,
    start_date=datetime(2023, 1, 1),
)
def demo_trigger_one_failed() -> None:
    records = get_records()
    (
        submit_job.expand(record=records)
        >> fake_sensor.expand(record=records)
        >> deliver_record.expand(record=records)
        >> handle_failed_delivery.expand(record=records)
    )
```
then it does the right thing:
| started ... | waiting ... | ... done, triggered correctly |
|--------|--------|--------|
| <img width="301" alt="Screenshot 2023-09-01 at 3 46 48 PM" src="https://github.com/apache/airflow/assets/354655/7e52979b-0161-4469-b284-3411a0b1b1c4"> | <img width="306" alt="Screenshot 2023-09-01 at 3 46 50 PM" src="https://github.com/apache/airflow/assets/354655/733654f3-8cb0-4181-b6b7-bad02994469d"> | <img width="304" alt="Screenshot 2023-09-01 at 3 46 53 PM" src="https://github.com/apache/airflow/assets/354655/13ffb46f-d5ca-4e7a-8d60-caad2e4a7827"> |
### What you think should happen instead
The behavior with the task group should be the same as without the task group: the `handle_failed_delivery` task with `trigger_rule=TriggerRule.ONE_FAILED` should run when the upstream `fake_sensor` task fails.
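For what it's worth, this should also be checkable without a full deployment (a sketch using `dag.test()`, available since Airflow 2.5; assumes the DAG file above is importable as `demo_trigger_one_failed`):
```python
# run_test.py (hypothetical helper): executes the whole DAG in-process;
# the terminal state of each handle_failed_delivery instance shows up
# in the log output.
from demo_trigger_one_failed import demo_trigger_one_failed

if __name__ == "__main__":
    demo_trigger_one_failed().test()
```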
### How to reproduce
1. Put the above DAG at a local path, `/tmp/dags/demo_trigger_one_failed.py`.
2. `docker run -it --rm --mount type=bind,source="/tmp/dags",target=/opt/airflow/dags -p 8080:8080 apache/airflow:2.7.0-python3.10 bash`
3. In the container:
```
airflow db init
airflow users create --role Admin --username airflow --email airflow --firstname airflow --lastname airflow --password airflow
airflow scheduler --daemon
airflow webserver
```
4. Open `http://localhost:8080` on the host, log in with `airflow` / `airflow`, and run the DAG.
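As a possibly simpler alternative, the `airflow standalone` subcommand in 2.x initializes the database, creates an `admin` user whose generated password is printed in the output, and runs the scheduler and webserver in one process:
```
airflow standalone
```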
I tested this with:
- `apache/airflow:2.6.2-python3.10`
- `apache/airflow:2.6.3-python3.10`
- `apache/airflow:2.7.0-python3.10`
### Operating System
Debian GNU/Linux 11 (bullseye)
### Versions of Apache Airflow Providers
n/a
### Deployment
Other Docker-based deployment
### Deployment details
This can be reproduced using standalone Docker images, see Repro steps above.
### Anything else
I wonder if this is related to (or fixed by?)
https://github.com/apache/airflow/issues/33446 ->
https://github.com/apache/airflow/pull/33732
(The latter was "added to the `Airflow 2.7.1` milestone 3 days ago." I can
try to install that pre-release code in the container and see if it's fixed.)
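For the record, one way to try that inside the container (a sketch, assuming a 2.7.1 release candidate has actually been published to PyPI; the exact rc number may differ):
```
pip install --pre --upgrade "apache-airflow>=2.7.1rc0,<2.7.2"
```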
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)