arkadiuszbach opened a new issue, #58441:
URL: https://github.com/apache/airflow/issues/58441

   ### Apache Airflow version
   
   3.1.3
   
   ### If "Other Airflow 2/3 version" selected, which one?
   
   3.1.1
   
   ### What happened?
   
   ## Issue
   
   When a task is in the running state and the broker redelivers the same task, the task is marked as failed.
    
   
   ### Redelivery
   Redis redelivery:
   - after visibility_timeout (default 6 hours) if the task hasn't been acked. After a connection restart to Redis, Celery will not ack the task even though it succeeded in Airflow (how visibility_timeout is configured is sketched below)
   
   - if the broker exited abruptly and didn't get a chance to dump a snapshot to disk (by default it creates a snapshot every 300 operations or 5 minutes, as I recall); see the scenario below:
     1. the task was pushed by the scheduler to the Celery queue in Redis
     2. Redis wrote a snapshot of the in-memory DB to disk
     3. a worker pulled and started the task
     4. Redis crashed
     5. Redis restarted and loaded the state from step 2, so the task is back in the Celery queue instead of in unacked_index
     6. the task is redelivered / picked up by another worker
   
      PS: this can currently happen in the Helm chart on pod restart, as SIGTERM is not propagated correctly to Redis; see https://github.com/apache/airflow/pull/58432
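
   A minimal sketch of where that redelivery window lives (the option name comes from the Celery Redis transport; in Airflow deployments it is typically set via the `[celery_broker_transport_options]` config section, which is an assumption about the setup here):

   ```python
   # Sketch only: the Redis transport redelivers any message that stays unacked
   # longer than visibility_timeout, which is the behaviour the first bullet above describes.
   from celery import Celery

   app = Celery("probe", broker="redis://airflow-redis:6379/0")  # broker URL taken from the worker logs below
   app.conf.broker_transport_options = {
       "visibility_timeout": 21600,  # seconds; 6 hours, matching the default mentioned above
   }
   ```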
   
   RabbitMQ:
   - As far as I recall, it redelivers the task after a connection restart without any delay; in k8s this happens on pod restart
   
   ### Scenario
   1. A task is running on some worker
   2. Redelivery happens and the supervisor calls: https://github.com/apache/airflow/blob/b4b941891a35a95c3e33688877ca5fc620d6e788/task-sdk/src/airflow/sdk/execution_time/supervisor.py#L973
   3. task_instances.start throws ServerResponseError due to a 409 HTTP status code, as the task is already running (a rough sketch of this request follows the list): https://github.com/apache/airflow/blob/b4b941891a35a95c3e33688877ca5fc620d6e788/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py#L195
   4. The CeleryExecutor in the scheduler process gets the failure event from Celery
   5. The scheduler processes the events from the executor within `process_executor_events`; the task instance fulfills the condition there (ti.state == TaskInstanceState.RUNNING) and is marked as failed: https://github.com/apache/airflow/blob/b4b941891a35a95c3e33688877ca5fc620d6e788/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L923
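
   For illustration, a rough sketch of the call from steps 2-3 and the conflict it produces. The relative path `task-instances/{id}/run` matches the traceback below; the base URL, token and payload field names are assumptions for illustration only (the real call goes through `airflow.sdk.api.client`):

   ```python
   # Hypothetical, self-contained approximation of the supervisor's "start" call.
   import httpx

   resp = httpx.patch(
       "http://airflow-api-server:8080/execution/task-instances/<ti-id>/run",  # assumed base URL / path prefix
       headers={"Authorization": "Bearer <task-token>"},  # assumed token plumbing
       json={  # assumed payload shape: details of the new attempt
           "state": "running",
           "hostname": "airflow-worker-78d74bfcff-77qj2",
           "unixname": "airflow",
           "pid": 86,
           "start_date": "2025-11-18T18:08:57+00:00",
       },
   )
   # For a TI that is already RUNNING, the API responds with a conflict whose body
   # carries reason "invalid_state" / previous_state "running"; the SDK client
   # raises it as ServerResponseError, which is what the worker log shows.
   print(resp.status_code, resp.json())
   ```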
     
   In Airflow < 3 this scenario did not cause task failures, as only TaskInstanceState.QUEUED was considered:
   
   https://github.com/apache/airflow/blob/b93c3db6b1641b0840bd15ac7d05bc58ff2cccbf/airflow/jobs/scheduler_job_runner.py#L904
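
   A paraphrased sketch of that guard difference (not the verbatim Airflow source; see the two scheduler_job_runner.py links above):

   ```python
   # Paraphrased from the linked scheduler_job_runner.py snippets: which TI states
   # make the scheduler fail the task when the executor reports a failure event.
   from airflow.utils.state import TaskInstanceState

   def fails_ti_on_executor_failure(ti_state: TaskInstanceState, airflow_major: int) -> bool:
       if airflow_major < 3:
           # Airflow 2.x: only a TI still stuck in QUEUED was failed.
           return ti_state == TaskInstanceState.QUEUED
       # Airflow 3.x: a RUNNING TI is failed as well, so a failed duplicate
       # (redelivered) Celery task takes down the original, still-running attempt.
       return ti_state in (TaskInstanceState.QUEUED, TaskInstanceState.RUNNING)
   ```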
   
   
   ### Logs
   Worker:
   ```
   
   2025-11-18T18:05:33.195572Z [info     ] starting stale bundle cleanup 
process [airflow.providers.celery.cli.celery_command] loc=celery_command.py:141
   2025-11-18T18:05:33.198872Z [info     ] Starting log server on 
http://[::]:8793 [airflow.utils.serve_logs.core] loc=core.py:50
   WARNING:  ASGI app factory detected. Using it, but please consider setting 
the --factory flag explicitly.
   INFO:     Started server process [15]
   INFO:     Waiting for application startup.
   INFO:     Application startup complete.
   INFO:     Uvicorn running on http://:8793 (Press CTRL+C to quit)
    
    -------------- celery@airflow-worker-78d74bfcff-77qj2 v5.5.3 (immunity)
   --- ***** ----- 
   -- ******* ---- Linux-5.15.0-1097-azure-x86_64-with-glibc2.36 2025-11-18 
18:05:34
   - *** --- * --- 
   - ** ---------- [config]
   - ** ---------- .> app:         
airflow.providers.celery.executors.celery_executor:0x7f86fdf94fe0
   - ** ---------- .> transport:   redis://:**@airflow-redis:6379/0
   - ** ---------- .> results:     
postgresql://pgadmin:**@airflow-pgbouncer.airflow-ap-d1:6543/airflow-metadata
   - *** --- * --- .> concurrency: 16 (prefork)
   -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this 
worker)
   --- ***** ----- 
    -------------- [queues]
                   .> default          exchange=default(direct) key=default
                   
   
   [tasks]
     . execute_workload
   
   2025-11-18T18:05:37.512920Z [info     ] Connected to 
redis://:**@airflow-redis:6379/0 [celery.worker.consumer.connection] 
loc=connection.py:22
   2025-11-18T18:05:37.541429Z [info     ] 
celery@airflow-worker-78d74bfcff-77qj2 ready. [celery.apps.worker] 
loc=worker.py:176
   2025-11-18T18:05:38.617744Z [info     ] Events of group {task} enabled by 
remote. [celery.worker.control] loc=control.py:343
   2025-11-18T18:07:05.324375Z [info     ] Task 
execute_workload[5fc1cd6a-2920-4a70-bea1-696704f17a75] received 
[celery.worker.strategy] loc=strategy.py:161
   2025-11-18T18:07:05.423828Z [info     ] 
[5fc1cd6a-2920-4a70-bea1-696704f17a75] Executing workload in Celery: 
token='eyJ***' ti=TaskInstance(id=UUID('019a9826-13fa-7352-abcc-138aee8aaa86'), 
dag_version_id=UUID('019a54e9-057e-7849-bcc9-ccf1af83c6d3'), task_id='test_4', 
dag_id='test', run_id='scheduled__2025-11-18T18:00:00+00:00', try_number=2, 
map_index=-1, pool_slots=1, queue='default', priority_weight=1, 
executor_config=None, parent_context_carrier={}, context_carrier={}) 
dag_rel_path=PurePosixPath('test_dag.py') 
bundle_info=BundleInfo(name='dags-folder', version=None) 
log_path='dag_id=test/run_id=scheduled__2025-11-18T18:00:00+00:00/task_id=test_4/attempt=2.log'
 type='ExecuteTask' [airflow.providers.celery.executors.celery_executor_utils] 
loc=celery_executor_utils.py:166
   2025-11-18T18:07:05.467438Z [info     ] Secrets backends loaded for worker 
[supervisor] backend_classes=['EnvironmentVariablesBackend'] count=1 
loc=supervisor.py:1917
   2025-11-18T18:08:57.547218Z [info     ] Task 
execute_workload[5fc1cd6a-2920-4a70-bea1-696704f17a75] received 
[celery.worker.strategy] loc=strategy.py:161
   2025-11-18T18:08:57.610058Z [info     ] 
[5fc1cd6a-2920-4a70-bea1-696704f17a75] Executing workload in Celery: 
token='eyJ***' ti=TaskInstance(id=UUID('019a9826-13fa-7352-abcc-138aee8aaa86'), 
dag_version_id=UUID('019a54e9-057e-7849-bcc9-ccf1af83c6d3'), task_id='test_4', 
dag_id='test', run_id='scheduled__2025-11-18T18:00:00+00:00', try_number=2, 
map_index=-1, pool_slots=1, queue='default', priority_weight=1, 
executor_config=None, parent_context_carrier={}, context_carrier={}) 
dag_rel_path=PurePosixPath('test_dag.py') 
bundle_info=BundleInfo(name='dags-folder', version=None) 
log_path='dag_id=test/run_id=scheduled__2025-11-18T18:00:00+00:00/task_id=test_4/attempt=2.log'
 type='ExecuteTask' [airflow.providers.celery.executors.celery_executor_utils] 
loc=celery_executor_utils.py:166
   2025-11-18T18:08:57.652183Z [info     ] Secrets backends loaded for worker 
[supervisor] backend_classes=['EnvironmentVariablesBackend'] count=1 
loc=supervisor.py:1917
   2025-11-18T18:08:57.678714Z [warning  ] Server error                   
[airflow.sdk.api.client] detail={'detail': {'reason': 'invalid_state', 
'message': 'TI was not in a state where it could be marked as running', 
'previous_state': 'running'}} loc=client.py:171
   2025-11-18T18:08:57.686821Z [info     ] Process exited                 
[supervisor] exit_code=-9 loc=supervisor.py:708 pid=86 signal_sent=SIGKILL
   2025-11-18T18:08:57.705082Z [error    ] Task 
execute_workload[5fc1cd6a-2920-4a70-bea1-696704f17a75] raised unexpected: 
ServerResponseError('Server returned error') [celery.app.trace] loc=trace.py:267
   Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.12/site-packages/celery/app/trace.py", line 
453, in trace_task
       R = retval = fun(*args, **kwargs)
                    ^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/celery/app/trace.py", line 
736, in __protected_call__
       return self.run(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/celery/executors/celery_executor_utils.py",
 line 174, in execute_workload
       supervise(
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/supervisor.py",
 line 1926, in supervise
       process = ActivitySubprocess.start(
                 ^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/supervisor.py",
 line 950, in start
       proc._on_child_started(ti=what, dag_rel_path=dag_rel_path, 
bundle_info=bundle_info)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/supervisor.py",
 line 961, in _on_child_started
       ti_context = self.client.task_instances.start(ti.id, self.pid, 
start_date)
                    
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/api/client.py", 
line 211, in start
       resp = self.client.patch(f"task-instances/{id}/run", 
content=body.model_dump_json())
              
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/httpx/_client.py", 
line 1218, in patch
       return self.request(
              ^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 
338, in wrapped_f
       return copy(f, *args, **kw)
              ^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 
477, in __call__
       do = self.iter(retry_state=retry_state)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 
378, in iter
       result = action(retry_state)
                ^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 
400, in <lambda>
       self._add_action_func(lambda rs: rs.outcome.result())
                                        ^^^^^^^^^^^^^^^^^^^
     File "/usr/python/lib/python3.12/concurrent/futures/_base.py", line 449, 
in result
       return self.__get_result()
              ^^^^^^^^^^^^^^^^^^^
     File "/usr/python/lib/python3.12/concurrent/futures/_base.py", line 401, 
in __get_result
       raise self._exception
     File 
"/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 
480, in __call__
       result = fn(*args, **kwargs)
                ^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/api/client.py", 
line 867, in request
       return super().request(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/httpx/_client.py", 
line 825, in request
       return self.send(request, auth=auth, follow_redirects=follow_redirects)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/httpx/_client.py", 
line 914, in send
       response = self._send_handling_auth(
                  ^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/httpx/_client.py", 
line 942, in _send_handling_auth
       response = self._send_handling_redirects(
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/httpx/_client.py", 
line 999, in _send_handling_redirects
       raise exc
     File "/home/airflow/.local/lib/python3.12/site-packages/httpx/_client.py", 
line 982, in _send_handling_redirects
       hook(response)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/api/client.py", 
line 182, in raise_on_4xx_5xx_with_note
       return get_json_error(response) or response.raise_for_status()
              ^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/api/client.py", 
line 172, in get_json_error
       raise err
   airflow.sdk.api.client.ServerResponseError: Server returned error
   Correlation-id=019a9827-d13d-7175-b0fb-b69a017a9ffd
   INFO:     10.0.49.54:40460 - "GET 
/log/dag_id%3Dtest/run_id%3Dscheduled__2025-11-18T18%3A00%3A00%2B00%3A00/task_id%3Dtest_4/attempt%3D2.log
 HTTP/1.1" 200 OK
   2025-11-18T18:09:01.229047Z [warning  ] Server error                   
[airflow.sdk.api.client] detail={'detail': {'reason': 'not_running', 'message': 
'TI is no longer in the running state and task should terminate', 
'current_state': 'failed'}} loc=client.py:171
   2025-11-18T18:09:01.229556Z [error    ] Server indicated the task shouldn't 
be running anymore [supervisor] detail={'detail': {'reason': 'not_running', 
'message': 'TI is no longer in the running state and task should terminate', 
'current_state': 'failed'}} loc=supervisor.py:1109 status_code=409 
ti_id=UUID('019a9826-13fa-7352-abcc-138aee8aaa86')
   2025-11-18T18:09:06.235061Z [warning  ] Process did not terminate in time; 
escalating [supervisor] loc=supervisor.py:716 pid=64 signal=SIGTERM
   2025-11-18T18:09:06.247180Z [info     ] Process exited                 
[supervisor] exit_code=-9 loc=supervisor.py:708 pid=64 signal_sent=SIGKILL
   2025-11-18T18:09:06.247710Z [info     ] Task finished                  
[supervisor] duration=120.79638931999943 exit_code=-9 
final_state=SERVER_TERMINATED loc=supervisor.py:1937 
task_instance_id=019a9826-13fa-7352-abcc-138aee8aaa86
   2025-11-18T18:09:06.263830Z [info     ] Task 
execute_workload[5fc1cd6a-2920-4a70-bea1-696704f17a75] succeeded in 
120.9372150579984s: None [celery.app.trace] loc=trace.py:128
   ```
   
   Scheduler:
   ```
   
     ____________       _____________
    ____    |__( )_________  __/__  /________      __
   ____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
   ___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
    _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
   2025-11-18T18:06:18.629850Z [info     ] Starting the scheduler         
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:1054
   2025-11-18T18:06:19.812843Z [info     ] Loaded executor: :CeleryExecutor: 
[airflow.executors.executor_loader] loc=executor_loader.py:281
   2025-11-18T18:06:19.921818Z [info     ] Loaded executor: 
:KubernetesExecutor: [airflow.executors.executor_loader] 
loc=executor_loader.py:281
   2025-11-18T18:06:19.922646Z [info     ] Start Kubernetes executor      
[airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor]
 loc=kubernetes_executor.py:238
   2025-11-18T18:06:20.013982Z [info     ] Adopting or resetting orphaned 
tasks for active dag runs 
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:2276
   2025-11-18T18:06:20.020902Z [info     ] Event: and now my watch begins 
starting at resource_version: 0 
[airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher]
 loc=kubernetes_executor_utils.py:132
   2025-11-18T18:06:20.025287Z [info     ] Marked 1 SchedulerJob instances as 
failed [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:2299
   2025-11-18T18:07:04.809254Z [info     ] 1 tasks up for execution:
       <TaskInstance: test.test_4 scheduled__2025-11-18T18:00:00+00:00 
[scheduled]> [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:444
   2025-11-18T18:07:04.809453Z [info     ] DAG test has 0/16 running and 
queued tasks [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:516
   2025-11-18T18:07:04.810400Z [info     ] Setting the following tasks to 
queued state:
       <TaskInstance: test.test_4 scheduled__2025-11-18T18:00:00+00:00 
[scheduled]> [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:655
   2025-11-18T18:07:04.813201Z [info     ] Trying to enqueue tasks: 
[<TaskInstance: test.test_4 scheduled__2025-11-18T18:00:00+00:00 [scheduled]>] 
for executor: CeleryExecutor(parallelism=32) 
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:740
   2025-11-18T18:07:05.348969Z [info     ] Received executor event with state 
queued for task instance TaskInstanceKey(dag_id='test', task_id='test_4', 
run_id='scheduled__2025-11-18T18:00:00+00:00', try_number=2, map_index=-1) 
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:818
   2025-11-18T18:07:05.369501Z [info     ] Setting external_executor_id for 
<TaskInstance: test.test_4 scheduled__2025-11-18T18:00:00+00:00 [queued]> to 
5fc1cd6a-2920-4a70-bea1-696704f17a75 
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:854
   2025-11-18T18:08:58.283462Z [info     ] Received executor event with state 
failed for task instance TaskInstanceKey(dag_id='test', task_id='test_4', 
run_id='scheduled__2025-11-18T18:00:00+00:00', try_number=2, map_index=-1) 
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:818
   2025-11-18T18:08:58.294573Z [info     ] TaskInstance Finished: dag_id=test, 
task_id=test_4, run_id=scheduled__2025-11-18T18:00:00+00:00, map_index=-1, 
run_start_date=2025-11-18 18:07:05.473530+00:00, run_end_date=None, 
run_duration=262.232856, state=running, 
executor=CeleryExecutor(parallelism=32), executor_state=failed, try_number=2, 
max_tries=1, pool=default_pool, queue=default, priority_weight=1, 
operator=PythonOperator, queued_dttm=2025-11-18 18:07:04.810721+00:00, 
scheduled_dttm=2025-11-18 18:07:04.781059+00:00,queued_by_job_id=215, pid=64 
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:864
   2025-11-18T18:08:58.297131Z [error    ] Executor 
CeleryExecutor(parallelism=32) reported that the task instance <TaskInstance: 
test.test_4 scheduled__2025-11-18T18:00:00+00:00 [running]> finished with 
state failed, but the task instance's state attribute is running. Learn more: 
https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally
 [airflow.task] loc=taskinstance.py:1505
   2025-11-18T18:08:58.301159Z [info     ] Marking task as FAILED. 
dag_id=test, task_id=test_4, run_id=scheduled__2025-11-18T18:00:00+00:00, 
logical_date=20251118T180000, start_date=20251118T180705, 
end_date=20251118T180858 [airflow.models.taskinstance] loc=taskinstance.py:1595
   2025-11-18T18:08:59.374603Z [info     ] Marking run <DagRun test @ 
2025-11-18 18:00:00+00:00: scheduled__2025-11-18T18:00:00+00:00, state:running, 
queued_at: 2025-11-18 18:07:03.683680+00:00. run_type: scheduled> failed 
[airflow.models.dagrun.DagRun] loc=dagrun.py:1171
   2025-11-18T18:08:59.374813Z [info     ] DagRun Finished: dag_id=test, 
logical_date=2025-11-18 18:00:00+00:00, 
run_id=scheduled__2025-11-18T18:00:00+00:00, run_start_date=2025-11-18 
18:07:04.747229+00:00, run_end_date=2025-11-18 18:08:59.374738+00:00, 
run_duration=114.627509, state=failed, run_type=scheduled, 
data_interval_start=2025-11-18 18:00:00+00:00, data_interval_end=2025-11-18 
18:00:00+00:00, [airflow.models.dagrun.DagRun] loc=dagrun.py:1274
   2025-11-18T18:08:59.381120Z [info     ] Setting next_dagrun for test to 
2025-11-18 19:00:00+00:00, run_after=2025-11-18 19:00:00+00:00 
[airflow.models.dag] loc=dag.py:688
   ```
   
   
   ### What you think should happen instead?
   
   _No response_
   
   ### How to reproduce
   
   With redis:
   1. Start a dummy task that sleeps for 10 minutes
   2. Log into redis-cli
   3. Get the id from `zrange unacked_index 0 -1 withscores`
   4. Run `ZINCRBY unacked_index -{{visibility_timeout}} {{id from previous command}}` to force redelivery, e.g. `ZINCRBY unacked_index -86400 38ff5954-4edd-46fb-aca4-7749b8702977` (a Python equivalent is sketched after this list)
   5. Wait; a worker should receive the same message again shortly, and the running task will be marked as failed
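
   Steps 2-4 can also be scripted (a sketch using the redis-py client; the DB number and the `unacked_index` key follow the defaults of the Celery Redis transport, and the member id is just the example from step 4):

   ```python
   # Sketch of the redis-cli steps above using redis-py.
   import redis

   r = redis.Redis(host="airflow-redis", port=6379, db=0)

   # Step 3: list delivery tags of currently unacked messages and their timestamps.
   for member, score in r.zrange("unacked_index", 0, -1, withscores=True):
       print(member.decode(), score)

   # Step 4: push one entry's timestamp back past visibility_timeout (86400 s here)
   # so the transport treats it as expired and redelivers the message.
   r.zincrby("unacked_index", -86400, "38ff5954-4edd-46fb-aca4-7749b8702977")
   ```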
   
   With RabbitMQ:
   1. Restarting RabbitMQ while a task is running should be sufficient in theory, but I haven't tested this
   
   ### Operating System
   
   Kubernetes
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

