rch9 opened a new issue #21341:
URL: https://github.com/apache/airflow/issues/21341


   ### Apache Airflow version
   
   2.2.3 (latest released)
   
   ### What happened
   
   SubDagOperator inherits from BaseSensorOperator, so it accepts the `mode` argument (`poke` or `reschedule`) when it is created.
   
   If I create the subdag with `mode=reschedule`, it keeps rerunning the failed task indefinitely (until the timeout). If the subdag is created with `mode=poke`, the subdag task fails after the first attempt to rerun the subdag's failed tasks. Retries are set to 0 (`retries=0`).
   
   Also, the database state seems to be inconsistent, judging by the following log:
   ```
   --------------------------------------------------------------------------------
   [2022-02-05, 15:04:52 UTC] {taskinstance.py:1239} INFO - Starting attempt 3 of 1
   [2022-02-05, 15:04:52 UTC] {taskinstance.py:1240} INFO - 
   --------------------------------------------------------------------------------
   ```
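
To make the inconsistency concrete, here is a minimal sketch of the check that this log line violates. It assumes the total in the message corresponds to `retries + 1`, which matches `retries=0` producing "of 1". The helper name `attempt_is_consistent` is hypothetical and not part of Airflow.

```python
import re

# Hypothetical helper, not part of Airflow: parses a "Starting attempt X of Y"
# log message and reports whether the attempt number stays within the total.
ATTEMPT_RE = re.compile(r"Starting attempt (\d+) of (\d+)")


def attempt_is_consistent(log_line: str) -> bool:
    match = ATTEMPT_RE.search(log_line)
    if match is None:
        raise ValueError("not a 'Starting attempt' log line")
    attempt, total = int(match.group(1)), int(match.group(2))
    # Assumption: with retries=0 the total is 1, so any attempt above 1 is unexpected.
    return 1 <= attempt <= total


line = "[2022-02-05, 15:04:52 UTC] {taskinstance.py:1239} INFO - Starting attempt 3 of 1"
print(attempt_is_consistent(line))  # False
```

For the log above the check returns `False`, which is why I suspect the try counter is being incremented when the task is rescheduled.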
   
   Please see screenshots below:
   
   
![image](https://user-images.githubusercontent.com/9168155/152647283-aa9c2b7a-85e7-468f-a9e1-2ac2df07ddcc.png)
   
![image](https://user-images.githubusercontent.com/9168155/152647298-147ab167-fa85-4440-9e26-439fe911f798.png)
   
![image](https://user-images.githubusercontent.com/9168155/152647335-4bd05f0d-7f3c-462d-9796-7994f2f029aa.png)
   
![image](https://user-images.githubusercontent.com/9168155/152647373-8ddb2187-aa2b-4113-b5b2-9f1036630dc7.png)
   
   
   
   ### What you expected to happen
   
   A subdag created with `mode=reschedule` should end up in the failed state after the first attempt to rerun the failed task.
   
   ### How to reproduce
   
   ```
   from airflow.models import DAG
   from airflow.operators.python import PythonOperator
   from airflow.operators.subdag import SubDagOperator
   
   from airflow.utils.dates import days_ago
   
   
   def get_subdag(
       parent_dag_id: str,
   ) -> DAG:
       with DAG(
           dag_id=f"{parent_dag_id}.subdag_op",
           start_date=days_ago(2),
       ) as subdag:
   
           def raise_exception():
               raise Exception("Something bad happened")
   
           PythonOperator(
               task_id="python_op",
               python_callable=raise_exception,
               retries=0
           )
   
           return subdag
   
   
   def main():
       with DAG(
           "1_main_dag",
           schedule_interval="@once",
           start_date=days_ago(2),
       ) as dag:
   
           SubDagOperator(
               task_id="subdag_op",
               subdag=get_subdag("1_main_dag"),
               mode="reschedule",  # reruns the failed task until the timeout
               # mode="poke",  # fails after the first attempt to rerun
               timeout=600,
               poke_interval=30,
               retries=0,
           )
   
           globals()[dag.dag_id] = dag
   
   
   main()
   ```
   
   ### Operating System
   
   Ubuntu 20.04.3 LTS
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   Please let me know whether this is an issue :)
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

