r-richmond opened a new issue, #53797: URL: https://github.com/apache/airflow/issues/53797
### Apache Airflow version

3.0.3

### If "Other Airflow 2 version" selected, which one?

_No response_

### What happened?

While trying to reproduce https://github.com/apache/airflow/issues/52916 locally, I made the following dag.

<details>
<summary>Dag Code</summary>

```python
import logging
from datetime import timedelta

from airflow.models import DAG
from airflow.providers.standard.operators.python import PythonOperator
from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance
from airflow.timetables.trigger import CronTriggerTimetable
from pendulum.tz.timezone import Timezone


def generate_query(line_length: int) -> str:
    query = "SELECT 1 AS col /* start of query */"
    for i in range(line_length - 1):
        query += f"\nUNION ALL SELECT {i + 2} AS col /* a comment to make the query longer */"
    return query


def generic_python(ti: RuntimeTaskInstance) -> None:
    logger = logging.getLogger(__name__)
    logger.info("Executing generic_python function")
    for batch in range(3):
        query = generate_query(250)  # random guess for how long the query should be
        logger.info(f"Running query for batch {batch + 1}:")
        for line in query.splitlines():
            # a custom dbt operator that is a subclass of BashOperator spits out
            # sql logs line by line in airflow 3 (not 2); this replicates it
            logger.info(line)
    logger.info("Finished executing generic_python function")


dag = DAG(
    dag_id="large_logs",
    schedule=CronTriggerTimetable("0 0 * * *", timezone=Timezone("Etc/UTC")),
    dagrun_timeout=timedelta(seconds=600),
    max_active_runs=1,
    catchup=False,
    default_args={},
)

# upstream dag with the issue has 246 tasks
levels = [40, 5, 40, 5, 10, 5, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10]

task_map: dict[int, list[PythonOperator]] = {}  # to hold the dynamically created tasks

for level, size_of_level in enumerate(levels, start=1):
    for i in range(size_of_level):
        task_id = f"t_generic_task_l_{level}_t_{i + 1}"
        task = PythonOperator(
            task_id=task_id,
            python_callable=generic_python,
            trigger_rule="all_done",  # ensure all tasks run even if some fail
            dag=dag,
        )
        if level not in task_map:
            task_map[level] = []
        task_map[level].append(task)
        if level > 1:
            # set dependencies to the previous level
            for prev_task in task_map[level - 1]:
                prev_task >> task
```

</details>

When running this locally using standalone, about 5-10% of the tasks failed with no messages in the task logs. However, I did see in the audit/event log that there was a state mismatch: `running != failed`.

### What you think should happen instead?

100% of the tasks should succeed.

### How to reproduce

Spin up 3.0.3 locally using standalone, and run the dag provided above a couple of times.

### Operating System

docker

### Versions of Apache Airflow Providers

latest

### Deployment

Official Apache Airflow Helm Chart

### Deployment details

This was tested using the official docker image in standalone mode.

### Anything else?

I was able to reproduce some failures on every dag run.

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
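For context on the scale involved, here is a quick back-of-envelope sketch (my own arithmetic, not from the issue, reusing `generate_query` and the `levels` list from the repro dag) of how many tasks the dag creates and how many SQL log lines each task emits:

```python
# Rough scale estimate for the reproduction dag above.
# generate_query is copied from the dag; the totals are my own arithmetic.

def generate_query(line_length: int) -> str:
    query = "SELECT 1 AS col /* start of query */"
    for i in range(line_length - 1):
        query += f"\nUNION ALL SELECT {i + 2} AS col /* a comment to make the query longer */"
    return query

# same 25 levels as the dag: [40, 5, 40, 5, 10, 5, 10, 10, ..., 10]
levels = [40, 5, 40, 5, 10, 5] + [10] * 19
total_tasks = sum(levels)

# each task runs 3 batches, each logging a 250-line query line by line
batches = 3
lines_per_task = batches * len(generate_query(250).splitlines())

print(total_tasks)     # 295 tasks in the repro dag
print(lines_per_task)  # 750 SQL log lines per task, plus a few status lines
```

So every run pushes a few hundred thousand log lines through the task runner, which may be relevant to why only a fraction of tasks hit the `running != failed` mismatch.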
