keomgak opened a new issue, #31491:
URL: https://github.com/apache/airflow/issues/31491

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   I'm using SubDAGs in Airflow 2. My subdag looks like this:
   
   ```
   parent_subdag_sample
   \child_subdag_start (success)
   \child_subdag_task_1 (success)
   \child_subdag_task_2 (success)
   \child_subdag_task_fail (fail)
   \child_subdag_task_3 (upstream_failed)
   \child_subdag_end (upstream_failed)
   ```
   When child_subdag_task_fail fails, I want to rerun from the first task (child_subdag_start). So I clear parent_subdag_sample with the recursive + downstream options selected (I did not select the failed option).
   
   In Airflow 1, everything from the child_subdag_start task onward is cleared. In Airflow 2.4.3, however, child_subdag_start, child_subdag_task_1, and child_subdag_task_2, which are in success state, are not cleared; only the tasks from child_subdag_task_fail onward are cleared.
   
   The recursive option is described as "Recursive - All the tasks in the child DAGs and parent DAGs".
   
   So I expected all of the child subdag's tasks to be cleared, as in Airflow 1.
   In Airflow 2.4.3, I wonder why the child subdag tasks that succeeded are not cleared.
   Is there any way to clear all of the child subdag's tasks?
   
   
   
   
   
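   To make the expected behavior concrete, here is a plain-Python sketch of the clear semantics I expect (no Airflow imports; the state table and the `clear` helper are illustrative models, not Airflow's actual implementation):
   
   ```
   # Toy model of the clear semantics I expect: with recursive + downstream and
   # WITHOUT the only-failed option, every task in the child DAG from the
   # selected root onward is reset, regardless of its previous state.
   states = {
       "child_subdag_start": "success",
       "child_subdag_task_1": "success",
       "child_subdag_task_2": "success",
       "child_subdag_task_fail": "failed",
       "child_subdag_task_3": "upstream_failed",
       "child_subdag_end": "upstream_failed",
   }
   
   def clear(states, only_failed=False):
       cleared = {}
       for task, state in states.items():
           if only_failed and state not in ("failed", "upstream_failed"):
               cleared[task] = state   # left untouched
           else:
               cleared[task] = None    # reset so the scheduler reruns it
       return cleared
   
   # Airflow 1 behavior (and what I expect): everything is cleared.
   assert all(s is None for s in clear(states).values())
   
   # Airflow 2.4.3 behavior I observe instead: only the failed chain is
   # cleared, as if the only-failed option were implied.
   observed = clear(states, only_failed=True)
   assert observed["child_subdag_start"] == "success"
   assert observed["child_subdag_task_fail"] is None
   ```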
   ### What you think should happen instead
   
   _No response_
   
   ### How to reproduce
   
   Here is my test code. First, the parent DAG (`subdag_test`):
   
   ```
   from time import sleep
   from pendulum.tz.timezone import Timezone
   from datetime import datetime, timedelta
   from airflow import DAG
   from airflow.operators.python import PythonOperator
   from airflow.operators.subdag import SubDagOperator
   import subdag.child_subdag as cs
   
   kst=Timezone('Asia/Seoul')
   
   default_args={
       'owner': 'test',
       'retries':0,
       'retry_delay':timedelta(minutes=1),
   }
   
   def dump() -> None:
       sleep(3)
   
   
   dag_id = 'subdag_test'
   
   with DAG(
       dag_id=dag_id,
       default_args=default_args,
       start_date=datetime(2023,3,11, tzinfo=kst),
       end_date=datetime(2023,3,11, tzinfo=kst),
       schedule_interval="@daily",
       tags=['test', 'subDAG']
   ) as dag:
       parent_task_start = PythonOperator(
           task_id='parent_task_start',
           python_callable=dump,
       )
   
       parent_subdag_sample = SubDagOperator(
           task_id='parent_subdag_sample',
           subdag=cs.child_subdag(parent_dag_name=dag_id,
                                  child_dag_name="parent_subdag_sample"),
       )
   
       parent_task_end= PythonOperator(
           task_id='parent_task_end',
           python_callable=dump,
       )
   
       parent_task_start >> parent_subdag_sample >> parent_task_end
   ```
   
   
   
   And the child subdag module (`subdag/child_subdag.py`):
   
   ```
   from time import sleep
   from datetime import datetime, timedelta
   from pendulum.tz.timezone import Timezone
   from airflow import DAG
   from airflow.exceptions import AirflowFailException
   from airflow.operators.python import PythonOperator
   from airflow.operators.dummy import DummyOperator
   
   kst = Timezone('Asia/Seoul')
   
   
   def dump() -> None:
       sleep(3)
   
   
   def dump_error() -> None:
       raise AirflowFailException("fail")
   
   
   default_args={
       'owner': 'test',
       'retries':0,
       'retry_delay':timedelta(minutes=1),
   }
   
   def child_subdag(parent_dag_name, child_dag_name) -> DAG:
       with DAG(
           dag_id=f"{parent_dag_name}.{child_dag_name}",
           default_args=default_args,
           schedule_interval="@daily",
           start_date=datetime(2023,3,11, tzinfo=kst),
           end_date=datetime(2023,3,11, tzinfo=kst),
           tags=['test', 'subdag', 'sample1'],
       ) as dag:
           child_subdag_start = PythonOperator(
               task_id='child_subdag_start',
               python_callable=dump
           )
   
           child_subdag_task_1 = PythonOperator(
               task_id='child_subdag_task_1',
               python_callable=dump
           )
   
           child_subdag_task_2 = PythonOperator(
               task_id='child_subdag_task_2',
               python_callable=dump
           )
   
           child_subdag_task_fail = PythonOperator(
               task_id='child_subdag_task_fail',
               python_callable=dump_error
           )
   
           child_subdag_task_3 = PythonOperator(
               task_id='child_subdag_task_3',
               python_callable=dump
           )
   
           child_subdag_end = DummyOperator(
               task_id='child_subdag_end'
           )
   
           child_subdag_start >> child_subdag_task_1 >> child_subdag_task_2 \
               >> child_subdag_task_fail >> child_subdag_task_3 >> child_subdag_end
   
           return dag
   ```
   
   
   
   ### Operating System
   
   Ubuntu 22.04
   
   ### Versions of Apache Airflow Providers
   
   composer-2.1.8
   airflow-2.4.3
   
   ### Deployment
   
   Google Cloud Composer
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

