Gollum999 opened a new issue, #43484:
URL: https://github.com/apache/airflow/issues/43484

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   2.7.1
   
   ### What happened?
   
   I have a task with multiple upstream dependencies, one of which produces a 
now-deprecated dataset.  To retire the dataset, I set the producing task's 
`end_date` to the last day the data was available, and I updated the code for 
the downstream task to stop looking for that data.  Now the downstream task is 
stuck in "No status" and must be started manually from the command line.
   
   The only `trigger_rule`s that work around this issue have the wrong 
semantics for my task (e.g. `always`).  I also want to be able to backfill the 
deprecated dataset if needed, so the DAG structure and dependencies must stay 
backward compatible.
   
   ### What you think should happen instead?
   
   In descending order of personal preference:
   
   1. The downstream task should only be prevented from running if _all_ of its 
upstream dependencies are outside of their `start_date`/`end_date` window, not 
just one.
   2. A task outside of its date window should behave like a "skipped" task, 
propagating down to children but allowing downstream behavior to be controlled 
via `trigger_rule`s.
   3. If the scheduling behavior cannot be changed, perhaps a workaround would 
be to expose the `logical_date` at the DAG level via `AirflowParsingContext`, 
so that dependencies could be conditionally changed:
   ```
   if deprecated_task.start_date <= get_parsing_context().logical_date <= deprecated_task.end_date:
       deprecated_task >> downstream_task
   ```
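
   For illustration, the semantics proposed in option 1 could be sketched in 
plain Python.  This is only a sketch of the desired behavior, not Airflow 
internals; `TaskWindow` and `blocked_by_upstream_windows` are hypothetical 
names invented for the example:

```python
import datetime
from dataclasses import dataclass
from typing import Optional


@dataclass
class TaskWindow:
    # Hypothetical stand-in for a task's scheduling window; not Airflow API.
    start_date: Optional[datetime.datetime] = None
    end_date: Optional[datetime.datetime] = None

    def contains(self, logical_date: datetime.datetime) -> bool:
        # A date is inside the window unless it falls before start_date
        # or after end_date (unset bounds are treated as open-ended).
        if self.start_date and logical_date < self.start_date:
            return False
        if self.end_date and logical_date > self.end_date:
            return False
        return True


def blocked_by_upstream_windows(upstreams: list[TaskWindow],
                                logical_date: datetime.datetime) -> bool:
    # Option 1: block the downstream task only if *all* upstream tasks
    # are outside their start_date/end_date window, not just one.
    return all(not u.contains(logical_date) for u in upstreams)
```

   Under these semantics, a single expired upstream (as in the reproduction 
below) would no longer block the downstream task, because its other upstreams 
are still inside their windows.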
   
   ### How to reproduce
   
   ```
   #!/usr/bin/env python3
   import datetime
   import logging
   
   from airflow.decorators import dag
   from airflow.models.baseoperator import cross_downstream
   from airflow.operators.empty import EmptyOperator
   from airflow.utils.task_group import TaskGroup
   
   
   logger = logging.getLogger(__name__)
   
   
   @dag(
       schedule='@daily',
       start_date=datetime.datetime(2024, 10, 21),
   )
   def test_task_end_date():
       with TaskGroup(group_id='single_dep'):
           (
               EmptyOperator(task_id='upstream')
            >> EmptyOperator(task_id='expired', end_date=datetime.datetime(2024, 10, 24))
               >> EmptyOperator(task_id='downstream')
           )
   
       with TaskGroup(group_id='multiple_deps'):
           [
               EmptyOperator(task_id='upstream_1'),
               EmptyOperator(task_id='upstream_2'),
            EmptyOperator(task_id='expired', end_date=datetime.datetime(2024, 10, 24)),
           ] >> EmptyOperator(task_id='downstream')
   
       with TaskGroup(group_id='trigger_rules'):
           cross_downstream([
               EmptyOperator(task_id='upstream_1'),
               EmptyOperator(task_id='upstream_2'),
            EmptyOperator(task_id='expired', end_date=datetime.datetime(2024, 10, 24)),
           ], [
               EmptyOperator(task_id='all_success'),
               EmptyOperator(task_id='all_done', trigger_rule='all_done'),
               EmptyOperator(task_id='one_success', trigger_rule='one_success'),
               EmptyOperator(task_id='none_failed', trigger_rule='none_failed'),
            EmptyOperator(task_id='none_failed_min_one_success', trigger_rule='none_failed_min_one_success'),
            EmptyOperator(task_id='none_skipped', trigger_rule='none_skipped'),
               EmptyOperator(task_id='always', trigger_rule='always'),
           ])
   
   
   dag = test_task_end_date()
   
   
   if __name__ == '__main__':
       dag.cli()
   ```
   
   ![Screenshot from 2024-10-29 09-59-27](https://github.com/user-attachments/assets/510bae83-50b8-41a8-8443-b943bcd85986)
   
   
   ### Operating System
   
   CentOS Stream 8
   
   ### Versions of Apache Airflow Providers
   
   N/A
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   Self-hosted/standalone
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

