miroslav-trifonov opened a new issue, #41972:
URL: https://github.com/apache/airflow/issues/41972

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   2.7.2
   
   ### What happened?
   
   We are trying to use the `test` method of a DAG in our integration 
tests. However, when one of the tasks fails, it is retried indefinitely. 
We are using pytest, and in a single test we create and run a DAG like this:
   ```
   dag = create_incremental_dag(
       ...
   )
   print("Testing integration dag execution")
   dag.test()
   ```
   
   In this example `create_incremental_dag` is a custom function that creates 
a DAG with a certain configuration. This DAG contains a step that launches an 
ECS task containing our business logic. If a problem in the business logic 
causes the ECS task to exit with an error, the Airflow step retries it 
until we manually kill the execution. We set the number of retries to 0 
as part of the configuration we pass to the DAG (and even if we didn't, it 
should still be 0 under the default Airflow config).
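   
   A minimal sketch of a `default_args` dict that disables retries. The actual 
contents of `common.tagging.default_args` are not shown in this report, so the 
keys and values below are illustrative assumptions only:

```python
from datetime import timedelta

# Hypothetical stand-in for `common.tagging.default_args` (not shown in this
# report). `retries` and `retry_delay` are standard operator arguments.
default_args = {
    "owner": "airflow",  # assumed value, for illustration only
    "retries": 0,  # a failed task should be attempted once and not retried
    "retry_delay": timedelta(minutes=5),  # should be irrelevant when retries is 0
}
```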
   
   ### What you think should happen instead?
   
   The step should not be retried, and the DAG run should fail immediately 
with an error.
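   
   The expected semantics can be sketched in plain Python: a task configured 
with `retries=N` is attempted at most N + 1 times, so `retries=0` means exactly 
one attempt. `run_with_retries` and `failing_task` are hypothetical names used 
for illustration; this is not Airflow's scheduler or `dag.test()` code.

```python
def run_with_retries(task, retries=0):
    """Call `task` at most `retries + 1` times, re-raising the last error."""
    attempts = 0
    while True:
        attempts += 1
        try:
            return task()
        except Exception:
            # Out of retries: propagate the failure instead of looping forever.
            if attempts > retries:
                raise


calls = []


def failing_task():
    """Hypothetical task that always fails, recording each attempt."""
    calls.append("attempt")
    raise RuntimeError("Failing")


try:
    run_with_retries(failing_task, retries=0)
except RuntimeError:
    pass  # expected: the single attempt fails and the error surfaces

print(len(calls))
```

   With `retries=0` the task body runs once and the error reaches the caller, 
which is the behaviour we expected from `dag.test()`.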
   
   ### How to reproduce
   
   import datetime
   
   from airflow import DAG
   from airflow.exceptions import AirflowException
   from airflow.decorators import task
   from airflow.operators.empty import EmptyOperator
   
   from common.tagging import default_args
   
   
   @task(task_id="test_task")
   def test_task():
       print("Failing Task")
       raise AirflowException("Failing")
   
   
   def create_dag():
       with DAG(
           "test1",
           schedule=None,
           start_date=datetime.datetime(2021, 1, 1),
           tags=["test"],
           catchup=False,
           max_active_runs=1,
           default_args=default_args
       ) as dag:
           start = EmptyOperator(
               task_id="start"
           )
   
           test = test_task()
   
           end = EmptyOperator(
               task_id="end"
           )
   
           start >> test >> end
   
           return dag
   
   
   def test_incremental_tagging_integration():
       dag = create_dag()
       print("Testing integration dag execution with updated data")
       dag.test()
   
   ### Operating System
   
   macOS 14.6.1
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

