juditnovak commented on issue #59074: URL: https://github.com/apache/airflow/issues/59074#issuecomment-3632595440
I'm experiencing the same issue, as I also heavily rely on `dag.test(use_executor=True)` (via Airflow System Tests). I'm using the Airflow Pytest Plugin (which invokes `dag.test(use_executor=True)`) on pipelines testing the custom executor implemented in the [Nomad Airflow Provider](https://github.com/juditnovak/airflow-provider-nomad) project. There is no integration with `breeze`, just plain `pytest` calls. I run the tests against an Airflow API server that is started up "manually" before the test run. (*)

Investigating the issue lately, my impression is that there is a race condition across the following events:

1. `LocalExecutor` executes workloads via the [supervisor](https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/execution_time/supervisor.py).
2. At the end of the task execution, the `supervisor` sends a call to the Airflow API `/execution/task-instances/` endpoint. This call is responsible for updating the database with the final state.
3. The `supervisor` then returns the result to the Executor, which registers it in its Event Buffer.
4. NOTE: steps 1-3 happen as part of the Executor's `heartbeat()`/`sync()` calls.
5. In the particularly intense `dag.test()` polling workflow, `SchedulerJobRunner.process_executor_events()` is invoked right after, rapidly processing the contents of the Executor's Event Buffer.
6. However, at this point the API call from step 2 has not yet finished: the `supervisor`'s attempt to record the state in the database is not yet done.

This results in an inconsistent state between the Executor's Event Buffer and the database, reflected by errors such as the ones below (see also the minimal sketch at the end of this comment).

Executor (`dag.test(use_executor=True)`) side:

```
2025-12-09 15:05:04 [info ] Task finished [supervisor] duration=10.164723910973407 exit_code=0 final_state=success
[2025-12-09T15:05:05.738+0100] {scheduler_job_runner.py:835} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='test-nomad-job-operator-localexecutor', task_id='nomad_job_localexecutor', run_id='manual__2025-12-09T14:04:54.434758+00:00', try_number=1, map_index=-1)
[2025-12-09T15:05:05.746+0100] {scheduler_job_runner.py:879} INFO - TaskInstance Finished: dag_id=test-nomad-job-operator-localexecutor, task_id=nomad_job_localexecutor, run_id=manual__2025-12-09T14:04:54.434758+00:00, map_index=-1, run_start_date=2025-12-09 14:04:54.605744+00:00, run_end_date=None, run_duration=None, state=running, executor=LocalExecutor(parallelism=32), executor_state=success, try_number=1, max_tries=0, pool=default_pool, queue=default, priority_weight=1, operator=NomadJobOperator, queued_dttm=None, scheduled_dttm=2025-12-09 14:04:54.449045+00:00,queued_by_job_id=None, pid=252308
[2025-12-09T15:05:05.753+0100] {scheduler_job_runner.py:952} ERROR - DAG 'test-nomad-job-operator-localexecutor' for task instance <TaskInstance: test-nomad-job-operator-localexecutor.nomad_job_localexecutor manual__2025-12-09T14:04:54.434758+00:00 [running]> not found in serialized_dag table
[2025-12-09T15:05:05.753+0100] {taskinstance.py:1888} ERROR - Executor LocalExecutor(parallelism=32) reported that the task instance <TaskInstance: test-nomad-job-operator-localexecutor.nomad_job_localexecutor manual__2025-12-09T14:04:54.434758+00:00 [running]> finished with state success, but the task instance's state attribute is running. Learn more: https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally
[2025-12-09T15:05:05.758+0100] {taskinstance.py:2011} INFO - Marking task as FAILED.
dag_id=test-nomad-job-operator-localexecutor, task_id=nomad_job_localexecutor, run_id=manual__2025-12-09T14:04:54.434758+00:00, logical_date=20251209T140454, start_date=20251209T140454, end_date=20251209T140505
```

API server side (all good):

```
INFO: 127.0.0.1:54378 - "PUT /execution/task-instances/019b036d-ed2d-7948-a320-7ff515217e41/heartbeat HTTP/1.1" 204 No Content
INFO: 127.0.0.1:54378 - "POST /execution/xcoms/test-nomad-job-operator-localexecutor/manual__2025-12-09T14%3A04%3A54.434758%2B00%3A00/nomad_job_localexecutor/return_value HTTP/1.1" 201 Created
2025-12-09 15:05:04 [debug ] Updating task instance state new_state=success ti_id=019b036d-ed2d-7948-a320-7ff515217e41
2025-12-09 15:05:04 [debug ] Retrieved current task instance state max_tries=0 previous_state=running ti_id=019b036d-ed2d-7948-a320-7ff515217e41 try_number=1
2025-12-09 15:05:04 [info ] Task instance state updated new_state=success rows_affected=1 ti_id=019b036d-ed2d-7948-a320-7ff515217e41
INFO: 127.0.0.1:54378 - "PATCH /execution/task-instances/019b036d-ed2d-7948-a320-7ff515217e41/state HTTP/1.1" 204 No Content
```

The errors above are from Airflow 3.0.6, but I've been observing similar behavior on 3.1.3.

Of course I may be mistaken; I'd be glad to get guidance on any part of the workflow I may have misunderstood. I'm happy to offer a PR or to contribute to ongoing efforts.

(*) NOTE: Programming errors on the `LocalExecutor` pipeline have recently been identified; a fix is coming very soon.
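For context, the invocation on my side boils down to something like the sketch below. This is a hypothetical, simplified example, not my actual test code: the real system tests exercise `NomadJobOperator` from the provider, and the import path assumes Airflow 3.x.

```python
# Hypothetical, simplified shape of the tests that trigger this behavior.
# Assumes Airflow 3.x import paths and a reachable API server / executor setup.
from airflow.sdk import DAG, task


def test_dag_with_executor():
    with DAG(dag_id="example_use_executor") as dag:

        @task
        def hello():
            print("hello")

        hello()

    # Run the DAG through the configured executor (LocalExecutor in my case)
    # instead of executing the tasks in-process.
    dag.test(use_executor=True)
```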
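To make the suspected ordering concrete, here is a minimal, self-contained sketch of the race described in steps 1-6. This is not Airflow code: `db`, `event_buffer`, the function names and the timings are all invented purely for illustration.

```python
# Toy model of the suspected race: the executor's event buffer learns about the
# final state before the (slower) API call has persisted it to the database.
import queue
import threading
import time

db = {"ti_state": "running"}   # stands in for the task instance row in the metadata DB
event_buffer = queue.Queue()   # stands in for the executor's event buffer


def supervisor_finishes_task():
    """Steps 2-3: hand the result back to the executor while the API call that
    persists the final state is still in flight."""
    event_buffer.put("success")  # result registered in the executor's event buffer

    def slow_state_patch():
        time.sleep(0.5)               # simulated latency of the state-update API call
        db["ti_state"] = "success"    # DB only reflects the final state once the call lands

    threading.Thread(target=slow_state_patch, daemon=True).start()


def process_executor_events():
    """Step 5: the dag.test() polling loop drains the event buffer right away
    and compares it with what the DB says."""
    executor_state = event_buffer.get()
    if db["ti_state"] != executor_state:
        print(f"MISMATCH: executor reports {executor_state!r}, DB still shows {db['ti_state']!r}")
    else:
        print("states agree")


supervisor_finishes_task()
process_executor_events()   # runs immediately (steps 5-6) -> prints the MISMATCH
time.sleep(1)               # once the simulated state update lands...
print(db["ti_state"])       # ...the DB catches up and shows 'success'
```

In the real workflow, the "slow patch" corresponds to the supervisor's state-update call from step 2, and the premature check corresponds to `SchedulerJobRunner.process_executor_events()` from step 5.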
