Gollum999 opened a new issue, #43484:
URL: https://github.com/apache/airflow/issues/43484
### Apache Airflow version
Other Airflow 2 version (please specify below)
### If "Other Airflow 2 version" selected, which one?
2.7.1
### What happened?
I have a task with multiple upstream dependencies, one of which is a
deprecated dataset. To retire this dataset, I set its task's `end_date` to the
last day that the data was available, and I updated the code for the downstream
task to stop looking for that data. Now, the downstream task is stuck in "No
status" and must be manually started from the command line.
The only `trigger_rule`s that work around this issue are ones that have the
wrong semantics for my task (e.g. `always`). I also want to make sure we can
still backfill the deprecated dataset if needed, so I want to keep the DAG
structure and dependencies backward compatible.
### What you think should happen instead?
In descending order of personal preference:
1. The downstream task should only be prevented from running if _all_ of its
upstream dependencies are outside of their `start_date`/`end_date` window, not
just one.
2. A task outside of its date window should behave like a "skipped" task,
propagating down to children but allowing downstream behavior to be controlled
via `trigger_rule`s.
3. If the scheduling behavior cannot be changed, perhaps a workaround would
be to expose the `logical_date` at the DAG level via `AirflowParsingContext`,
so that dependencies could be conditionally changed:
```python
if deprecated_task.start_date <= get_parsing_context().logical_date <= deprecated_task.end_date:
    deprecated_task >> downstream_task
```
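
The date-window check in option 3 reduces to a plain interval test. Here is a minimal sketch of that logic in pure Python, with the proposed `logical_date` value passed in directly (the attribute does not exist on `AirflowParsingContext` today, so the helper name and signature are hypothetical):

```python
import datetime


def should_wire_dependency(logical_date, start_date, end_date):
    """Return True if the deprecated task is inside its date window,
    i.e. its dependency edge should still be declared at parse time.
    Hypothetical helper; not part of Airflow's API."""
    if start_date is not None and logical_date < start_date:
        return False
    if end_date is not None and logical_date > end_date:
        return False
    return True


start = datetime.datetime(2024, 10, 21)  # deprecated task's start_date
end = datetime.datetime(2024, 10, 24)    # deprecated task's end_date

# Inside the window: keep the edge so backfills still work.
print(should_wire_dependency(datetime.datetime(2024, 10, 22), start, end))  # True
# After end_date: drop the edge so downstream is not blocked.
print(should_wire_dependency(datetime.datetime(2024, 10, 25), start, end))  # False
```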
### How to reproduce
```python
#!/usr/bin/env python3
import datetime
import logging

from airflow.decorators import dag
from airflow.models.baseoperator import cross_downstream
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup

logger = logging.getLogger(__name__)


@dag(
    schedule='@daily',
    start_date=datetime.datetime(2024, 10, 21),
)
def test_task_end_date():
    with TaskGroup(group_id='single_dep'):
        (
            EmptyOperator(task_id='upstream')
            >> EmptyOperator(task_id='expired', end_date=datetime.datetime(2024, 10, 24))
            >> EmptyOperator(task_id='downstream')
        )

    with TaskGroup(group_id='multiple_deps'):
        [
            EmptyOperator(task_id='upstream_1'),
            EmptyOperator(task_id='upstream_2'),
            EmptyOperator(task_id='expired', end_date=datetime.datetime(2024, 10, 24)),
        ] >> EmptyOperator(task_id='downstream')

    with TaskGroup(group_id='trigger_rules'):
        cross_downstream([
            EmptyOperator(task_id='upstream_1'),
            EmptyOperator(task_id='upstream_2'),
            EmptyOperator(task_id='expired', end_date=datetime.datetime(2024, 10, 24)),
        ], [
            EmptyOperator(task_id='all_success'),
            EmptyOperator(task_id='all_done', trigger_rule='all_done'),
            EmptyOperator(task_id='one_success', trigger_rule='one_success'),
            EmptyOperator(task_id='none_failed', trigger_rule='none_failed'),
            EmptyOperator(task_id='none_failed_min_one_success', trigger_rule='none_failed_min_one_success'),
            EmptyOperator(task_id='none_skipped', trigger_rule='none_skipped'),
            EmptyOperator(task_id='always', trigger_rule='always'),
        ])


dag = test_task_end_date()

if __name__ == '__main__':
    dag.cli()
```
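
For reference, with the repro's `@daily` schedule the `expired` tasks should only be in-window for logical dates 2024-10-21 through 2024-10-24; from the next run onward they get no status and the `downstream` tasks hang. A quick enumeration of the affected runs (plain Python, no Airflow needed):

```python
import datetime

dag_start = datetime.date(2024, 10, 21)    # DAG start_date
expired_end = datetime.date(2024, 10, 24)  # end_date on the 'expired' tasks

# Logical dates for which the 'expired' tasks are inside their window.
in_window = [
    (dag_start + datetime.timedelta(days=i)).isoformat()
    for i in range((expired_end - dag_start).days + 1)
]
# First run where 'expired' is out of window and 'downstream' gets stuck.
first_stuck = (expired_end + datetime.timedelta(days=1)).isoformat()

print(in_window)    # ['2024-10-21', '2024-10-22', '2024-10-23', '2024-10-24']
print(first_stuck)  # '2024-10-25'
```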

### Operating System
CentOS Stream 8
### Versions of Apache Airflow Providers
N/A
### Deployment
Other
### Deployment details
Self-hosted/standalone
### Anything else?
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)