juditnovak commented on issue #59074:
URL: https://github.com/apache/airflow/issues/59074#issuecomment-3632595440

   I'm experiencing the same issue, as I also heavily rely on 
`dag.test(use_executor=True)` (via Airflow System Tests).
   
   I'm using the Airflow Pytest Plugin (which invokes `dag.test(use_executor=True)`) on pipelines testing the custom executor implemented in the [Nomad Airflow Provider](https://github.com/juditnovak/airflow-provider-nomad) project. There is no integration with `breeze`, just plain `pytest` calls. I run the tests against an Airflow API server started up "manually" before the test run. (*)
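
   For reference, this is roughly the shape of the test run (a minimal sketch, not the plugin's actual code: the DAG folder, dag_id and pytest marker are placeholders from my setup; the essential part is the plain `dag.test(use_executor=True)` call, with the API server started separately and no `breeze`):

   ```python
   # Hypothetical sketch of the pytest-based system test described above.
   # The dag_folder path, dag_id and marker are placeholders; the Airflow API
   # server is assumed to be already running before pytest is invoked.
   import pytest

   from airflow.models.dagbag import DagBag


   @pytest.mark.system
   def test_nomad_job_operator_localexecutor():
       # Load the example DAG shipped with the provider (path is illustrative).
       dagbag = DagBag(dag_folder="tests/system/dags", include_examples=False)
       dag = dagbag.get_dag("test-nomad-job-operator-localexecutor")
       assert dag is not None

       # Run the DAG through a real executor instead of in-process execution;
       # this is the code path where the behavior below shows up.
       dag_run = dag.test(use_executor=True)
       assert dag_run.state == "success"
   ```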
   
   Investigating the issue lately, my impression is that there is a race condition across the following sequence of events:
    1. `LocalExecutor` executes workloads via the [supervisor](https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/execution_time/supervisor.py)
    2. at the end of the task execution, the `supervisor` sends a call to the Airflow API `/execution/task-instances/` endpoint. This call is responsible for updating the database with the final state
    3. then the `supervisor` returns the results to the Executor, which registers them in its Event Buffer
    4. NOTE: 1.)-3.) are part of the Executor's `heartbeat()`/`sync()` calls
    5. in the particularly intense `dag.test()` polling workflow, `SchedulerJobRunner.process_executor_events()` is invoked right after, rapidly processing the contents of the Executor's Event Buffer
    6. however, at this point the API call from 2.) may not yet have finished, i.e. the `supervisor`'s attempt to record the final state in the database is not yet done (see the toy sketch after this list)
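
   To make the suspected interleaving concrete, here is a toy sketch (plain Python, not actual Airflow code; all names are illustrative) of the ordering I believe I'm seeing:

   ```python
   # Toy model of the suspected race: the final-state PATCH to the API is still
   # in flight while the task result already sits in the executor's event buffer,
   # and the scheduler loop drains the buffer before the DB write has landed.
   import threading
   import time

   db = {"ti_state": "running"}   # stands in for the task instance row in the metadata DB
   event_buffer = {}              # stands in for the Executor's Event Buffer


   def patch_state_endpoint():
       """Step 2: PATCH /execution/task-instances/{id}/state, with some latency."""
       time.sleep(0.05)
       db["ti_state"] = "success"


   def supervisor_finishes_task():
       threading.Thread(target=patch_state_endpoint).start()  # step 2: issued, still in flight
       event_buffer["ti_key"] = "success"                      # step 3: result reaches the Executor


   supervisor_finishes_task()

   # Step 5: right after heartbeat()/sync(), process_executor_events() drains the
   # buffer, before the PATCH from step 2 has been committed.
   if event_buffer.get("ti_key") == "success" and db["ti_state"] == "running":
       print("Executor reports success, but the DB row still says:", db["ti_state"])
   ```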
   
   This results in an inconsistent state between the Executor's Event Buffer and the database, reflected by errors such as the ones below.
   
   Executor (`dag.test(use_executor=True)`) side:
   
   ```
    2025-12-09 15:05:04 [info     ] Task finished                  [supervisor] 
duration=10.164723910973407 exit_code=0 final_state=success
   [2025-12-09T15:05:05.738+0100] {scheduler_job_runner.py:835} INFO - Received 
executor event with state success for task instance 
TaskInstanceKey(dag_id='test-nomad-job-operator-localexecutor', 
task_id='nomad_job_localexecutor', 
run_id='manual__2025-12-09T14:04:54.434758+00:00', try_number=1, map_index=-1)
   [2025-12-09T15:05:05.746+0100] {scheduler_job_runner.py:879} INFO - 
TaskInstance Finished: dag_id=test-nomad-job-operator-localexecutor, 
task_id=nomad_job_localexecutor, 
run_id=manual__2025-12-09T14:04:54.434758+00:00, map_index=-1, 
run_start_date=2025-12-09 14:04:54.605744+00:00, run_end_date=None, 
run_duration=None, state=running, executor=LocalExecutor(parallelism=32), 
executor_state=success, try_number=1, max_tries=0, pool=default_pool, 
queue=default, priority_weight=1, operator=NomadJobOperator, queued_dttm=None, 
scheduled_dttm=2025-12-09 14:04:54.449045+00:00,queued_by_job_id=None, 
pid=252308
   [2025-12-09T15:05:05.753+0100] {scheduler_job_runner.py:952} ERROR - DAG 
'test-nomad-job-operator-localexecutor' for task instance <TaskInstance: 
test-nomad-job-operator-localexecutor.nomad_job_localexecutor 
manual__2025-12-09T14:04:54.434758+00:00 [running]> not found in serialized_dag 
table
   [2025-12-09T15:05:05.753+0100] {taskinstance.py:1888} ERROR - Executor 
LocalExecutor(parallelism=32) reported that the task instance <TaskInstance: 
test-nomad-job-operator-localexecutor.nomad_job_localexecutor 
manual__2025-12-09T14:04:54.434758+00:00 [running]> finished with state 
success, but the task instance's state attribute is running. Learn more: 
https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally
   [2025-12-09T15:05:05.758+0100] {taskinstance.py:2011} INFO - Marking task as 
FAILED. dag_id=test-nomad-job-operator-localexecutor, 
task_id=nomad_job_localexecutor, 
run_id=manual__2025-12-09T14:04:54.434758+00:00, logical_date=20251209T140454, 
start_date=20251209T140454, end_date=20251209T140505      
   ```
   API server side (all good):
   ```
   INFO:     127.0.0.1:54378 - "PUT 
/execution/task-instances/019b036d-ed2d-7948-a320-7ff515217e41/heartbeat 
HTTP/1.1" 204 No Content
   INFO:     127.0.0.1:54378 - "POST 
/execution/xcoms/test-nomad-job-operator-localexecutor/manual__2025-12-09T14%3A04%3A54.434758%2B00%3A00/nomad_job_localexecutor/return_value
 HTTP/1.1" 201 Created
   2025-12-09 15:05:04 [debug    ] Updating task instance state   
new_state=success ti_id=019b036d-ed2d-7948-a320-7ff515217e41
   2025-12-09 15:05:04 [debug    ] Retrieved current task instance state 
max_tries=0 previous_state=running ti_id=019b036d-ed2d-7948-a320-7ff515217e41 
try_number=1
   2025-12-09 15:05:04 [info     ] Task instance state updated    
new_state=success rows_affected=1 ti_id=019b036d-ed2d-7948-a320-7ff515217e41
   INFO:     127.0.0.1:54378 - "PATCH 
/execution/task-instances/019b036d-ed2d-7948-a320-7ff515217e41/state HTTP/1.1" 
204 No Content
   ```
   The errors above are from Airflow 3.0.6, but I've been observing similar behavior on 3.1.3.
   
   Of course I may be mistaken. I'd be glad to get guidance on any part of the workflow I may have misunderstood.
   
   I'm happy to offer a PR or to contribute to ongoing efforts.
   
   (*) NOTE: Programming errors in the `LocalExecutor` pipeline have recently been identified; a fix is coming very soon.

