r-richmond opened a new issue, #53797:
URL: https://github.com/apache/airflow/issues/53797

   ### Apache Airflow version
   
   3.0.3
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   Will trying to reproduce https://github.com/apache/airflow/issues/52916 
locally I made the following dag.
   
   <details>
   <summary>Dag Code</summary>
     
   ```python
   import logging
   from datetime import timedelta
   
   from airflow.models import DAG
   from airflow.providers.standard.operators.python import PythonOperator
   from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance
   from airflow.timetables.trigger import CronTriggerTimetable
   from pendulum.tz.timezone import Timezone
   
   
   def generate_query(line_length: int) -> str:
       query = "SELECT 1 AS col /* start of querey */"
       for i in range(line_length - 1):
           query += f"\nUNION ALL SELECT {i + 2} AS col /* a comment to make 
the query longer */"
       return query
   
   
   def generic_python(ti: RuntimeTaskInstance) -> None:
       logger = logging.getLogger(__name__)
       logger.info("Executing generic_python function")
       for batch in range(3):
           query = generate_query(250)  # random quess for how long the query 
should be
           logger.info(f"Running query for batch {batch + 1}:")
           for line in query.splitlines():
               # custom dbt operator that is a subclasses of bash operator 
spits out sql logs line by line in airflow 3 (not 2); this replicates it
               logger.info(line)
       logger.info("Finished executing generic_python function")
   
   
   dag = DAG(
       dag_id="large_logs",
       schedule=CronTriggerTimetable("0 0 * * *", timezone=Timezone("Etc/UTC")),
       dagrun_timeout=timedelta(seconds=600),
       max_active_runs=1,
       catchup=False,
       default_args={},
   )
   
   # upstream dag with the issue has 246 tasks
   levels = [40, 5, 40, 5, 10, 5, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 
10, 10, 10, 10, 10, 10, 10, 10]
   
   # integer for level, 1, 2, 3, 4, 10, 6, 7, 8
   task_map: dict[int, list[PythonOperator]] = {}  # to hold the dynamically 
created tasks
   
   for level, size_of_level in enumerate(levels, start=1):
       for i in range(size_of_level):
           task_id = f"t_generic_task_l_{level}_t_{i + 1}"
           task = PythonOperator(
               task_id=task_id,
               python_callable=generic_python,
               trigger_rule="all_done",  # Ensure all tasks run even if some 
fail
               dag=dag,
           )
           if level not in task_map:
               task_map[level] = []
           task_map[level].append(task)
           if level > 1:
               # Set dependencies to the previous level
               for prev_task in task_map[level - 1]:
                   prev_task >> task
   ```
   </details>
   
   When running this locally using standalone about 5-10% of the tasks failed 
with no messages in the task logs. However I did see in the audit/event log 
that there was a state mismatch running != failed.
   
   
   
   ### What you think should happen instead?
   
   100% of the tasks should succeed.
   
   ### How to reproduce
   
   Spin up 3.0.3 locally using standalone. And run the dag provided above a 
couple times.
   
   ### Operating System
   
   docker
   
   ### Versions of Apache Airflow Providers
   
   latest
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   This was tested using the official docker image in standalone mode.
   
   ### Anything else?
   
   I was able to reproduce some failures on every dag run.
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to