arkadiuszbach opened a new issue, #58441:
URL: https://github.com/apache/airflow/issues/58441
### Apache Airflow version
3.1.3
### If "Other Airflow 2/3 version" selected, which one?
3.1.1
### What happened?
## Issue
When a task is in the running state and the broker redelivers the same task, the task instance is marked as failed.
### Redelivery
Redis redelivery:
- after `visibility_timeout` (default 6 hours) if the task hasn't been acked. After a connection restart to Redis, Celery will not ack the task even though it succeeded in Airflow (see the config sketch after this section)
- if the broker exited abruptly and didn't get a chance to dump the snapshot to disk (by default it creates a snapshot every 300 operations or 5 minutes, as I recall), see the scenario below:
1. task was pushed by the scheduler to celery queue in redis
2. redis wrote snapshot of in memory db into disk
3. worker pulled and started the task
4. redis crashed
5. redis started and loaded the state saved in step 2, so the task is back in the celery queue instead of in unacked_index
6. task is redelivered / picked by another worker
PS: this may currently happen in the helm chart on pod restart, as SIGTERM is not propagated correctly to Redis, see
https://github.com/apache/airflow/pull/58432
RabbitMQ:
- As I recall, it redelivers the task after a connection restart without any delay; in k8s this will happen on pod restart
### Scenario
1. Task is running on some worker
2. Redelivery happens and the supervisor calls:
https://github.com/apache/airflow/blob/b4b941891a35a95c3e33688877ca5fc620d6e788/task-sdk/src/airflow/sdk/execution_time/supervisor.py#L973
3. `task_instances.start` throws `ServerResponseError` due to the 409 HTTP code, as the task is already running:
https://github.com/apache/airflow/blob/b4b941891a35a95c3e33688877ca5fc620d6e788/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py#L195
4. CeleryExecutor in the Scheduler process gets the failure event from Celery
5. Scheduler processes the events from the executor within `process_executor_events`; the task instance fulfills the condition below (ti.state == TaskInstanceState.RUNNING) and is marked as failed:
https://github.com/apache/airflow/blob/b4b941891a35a95c3e33688877ca5fc620d6e788/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L923
In Airflow < 3 this scenario did not cause task failures, as only TaskInstanceState.QUEUED was considered (see the paraphrase below):
https://github.com/apache/airflow/blob/b93c3db6b1641b0840bd15ac7d05bc58ff2cccbf/airflow/jobs/scheduler_job_runner.py#L904
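To make the Airflow 2 vs 3 difference concrete, here is a rough paraphrase of that check (`is_externally_changed` is a made-up helper, not the actual scheduler code): in Airflow 2 only QUEUED task instances were acted on when the executor reported a terminal state, while in Airflow 3 RUNNING ones are failed as well, which is what turns the rejected redelivery into a task failure.
```python
from airflow.utils.state import TaskInstanceState

def is_externally_changed(executor_state, ti_state, airflow3=True):
    """Rough paraphrase of the check in process_executor_events; made-up helper, not the real API."""
    terminal = {TaskInstanceState.FAILED, TaskInstanceState.SUCCESS}
    # Airflow < 3 only acted on QUEUED task instances; Airflow 3 also acts on RUNNING ones.
    watched = (
        {TaskInstanceState.QUEUED, TaskInstanceState.RUNNING}
        if airflow3
        else {TaskInstanceState.QUEUED}
    )
    return executor_state in terminal and ti_state in watched

# The redelivered-and-rejected Celery message reports a terminal "failed" state
# while the real task attempt is still RUNNING:
assert is_externally_changed(TaskInstanceState.FAILED, TaskInstanceState.RUNNING, airflow3=True)
assert not is_externally_changed(TaskInstanceState.FAILED, TaskInstanceState.RUNNING, airflow3=False)
```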
### Logs
Worker:
```
2025-11-18T18:05:33.195572Z [info [] starting stale bundle cleanup
process [airflow.providers.celery.cli.celery_command] loc=celery_command.py:141
2025-11-18T18:05:33.198872Z [info [] Starting log server on
http://[::[]:8793 [airflow.utils.serve_logs.core] loc=core.py:50
WARNING: ASGI app factory detected. Using it, but please consider setting
the --factory flag explicitly.
INFO: Started server process [15]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://:8793 (Press CTRL+C to quit)
-------------- celery@airflow-worker-78d74bfcff-77qj2 v5.5.3 (immunity)
--- ***** -----
-- ******* ---- Linux-5.15.0-1097-azure-x86_64-with-glibc2.36 2025-11-18
18:05:34
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:
airflow.providers.celery.executors.celery_executor:0x7f86fdf94fe0
- ** ---------- .> transport: redis://:**@airflow-redis:6379/0
- ** ---------- .> results:
postgresql://pgadmin:**@airflow-pgbouncer.airflow-ap-d1:6543/airflow-metadata
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this
worker)
--- ***** -----
-------------- [queues]
.> default exchange=default(direct) key=default
[tasks]
. execute_workload
2025-11-18T18:05:37.512920Z [info [] Connected to
redis://:**@airflow-redis:6379/0 [celery.worker.consumer.connection]
loc=connection.py:22
2025-11-18T18:05:37.541429Z [info []
celery@airflow-worker-78d74bfcff-77qj2 ready. [celery.apps.worker]
loc=worker.py:176
2025-11-18T18:05:38.617744Z [info [] Events of group {task} enabled by
remote. [celery.worker.control] loc=control.py:343
2025-11-18T18:07:05.324375Z [info [] Task
execute_workload[5fc1cd6a-2920-4a70-bea1-696704f17a75[] received
[celery.worker.strategy] loc=strategy.py:161
2025-11-18T18:07:05.423828Z [info []
[5fc1cd6a-2920-4a70-bea1-696704f17a75[] Executing workload in Celery:
token='eyJ***' ti=TaskInstance(id=UUID('019a9826-13fa-7352-abcc-138aee8aaa86'),
dag_version_id=UUID('019a54e9-057e-7849-bcc9-ccf1af83c6d3'), task_id='test_4',
dag_id='test', run_id='scheduled__2025-11-18T18:00:00+00:00', try_number=2,
map_index=-1, pool_slots=1, queue='default', priority_weight=1,
executor_config=None, parent_context_carrier={}, context_carrier={})
dag_rel_path=PurePosixPath('test_dag.py')
bundle_info=BundleInfo(name='dags-folder', version=None)
log_path='dag_id=test/run_id=scheduled__2025-11-18T18:00:00+00:00/task_id=test_4/attempt=2.log'
type='ExecuteTask' [airflow.providers.celery.executors.celery_executor_utils]
loc=celery_executor_utils.py:166
2025-11-18T18:07:05.467438Z [info [] Secrets backends loaded for worker
[supervisor] backend_classes=['EnvironmentVariablesBackend'] count=1
loc=supervisor.py:1917
2025-11-18T18:08:57.547218Z [info [] Task
execute_workload[5fc1cd6a-2920-4a70-bea1-696704f17a75[] received
[celery.worker.strategy] loc=strategy.py:161
2025-11-18T18:08:57.610058Z [info []
[5fc1cd6a-2920-4a70-bea1-696704f17a75[] Executing workload in Celery:
token='eyJ***' ti=TaskInstance(id=UUID('019a9826-13fa-7352-abcc-138aee8aaa86'),
dag_version_id=UUID('019a54e9-057e-7849-bcc9-ccf1af83c6d3'), task_id='test_4',
dag_id='test', run_id='scheduled__2025-11-18T18:00:00+00:00', try_number=2,
map_index=-1, pool_slots=1, queue='default', priority_weight=1,
executor_config=None, parent_context_carrier={}, context_carrier={})
dag_rel_path=PurePosixPath('test_dag.py')
bundle_info=BundleInfo(name='dags-folder', version=None)
log_path='dag_id=test/run_id=scheduled__2025-11-18T18:00:00+00:00/task_id=test_4/attempt=2.log'
type='ExecuteTask' [airflow.providers.celery.executors.celery_executor_utils]
loc=celery_executor_utils.py:166
2025-11-18T18:08:57.652183Z [info [] Secrets backends loaded for worker
[supervisor] backend_classes=['EnvironmentVariablesBackend'] count=1
loc=supervisor.py:1917
2025-11-18T18:08:57.678714Z [warning [] Server error
[airflow.sdk.api.client] detail={'detail': {'reason': 'invalid_state',
'message': 'TI was not in a state where it could be marked as running',
'previous_state': 'running'}} loc=client.py:171
2025-11-18T18:08:57.686821Z [info [] Process exited
[supervisor] exit_code=-9 loc=supervisor.py:708 pid=86 signal_sent=SIGKILL
2025-11-18T18:08:57.705082Z [error [] Task
execute_workload[5fc1cd6a-2920-4a70-bea1-696704f17a75[] raised unexpected:
ServerResponseError('Server returned error') [celery.app.trace] loc=trace.py:267
Traceback (most recent call last):
File
"/home/airflow/.local/lib/python3.12/site-packages/celery/app/trace.py", line
453, in trace_task
R = retval = fun(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/celery/app/trace.py", line
736, in __protected_call__
return self.run(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/celery/executors/celery_executor_utils.py",
line 174, in execute_workload
supervise(
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/supervisor.py",
line 1926, in supervise
process = ActivitySubprocess.start(
^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/supervisor.py",
line 950, in start
proc._on_child_started(ti=what, dag_rel_path=dag_rel_path,
bundle_info=bundle_info)
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/supervisor.py",
line 961, in _on_child_started
ti_context = self.client.task_instances.start(ti.id, self.pid,
start_date)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/api/client.py",
line 211, in start
resp = self.client.patch(f"task-instances/{id}/run",
content=body.model_dump_json())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/httpx/_client.py",
line 1218, in patch
return self.request(
^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line
338, in wrapped_f
return copy(f, *args, **kw)
^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line
477, in __call__
do = self.iter(retry_state=retry_state)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line
378, in iter
result = action(retry_state)
^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line
400, in <lambda>
self._add_action_func(lambda rs: rs.outcome.result())
^^^^^^^^^^^^^^^^^^^
File "/usr/python/lib/python3.12/concurrent/futures/_base.py", line 449,
in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/python/lib/python3.12/concurrent/futures/_base.py", line 401,
in __get_result
raise self._exception
File
"/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line
480, in __call__
result = fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/api/client.py",
line 867, in request
return super().request(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/httpx/_client.py",
line 825, in request
return self.send(request, auth=auth, follow_redirects=follow_redirects)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/httpx/_client.py",
line 914, in send
response = self._send_handling_auth(
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/httpx/_client.py",
line 942, in _send_handling_auth
response = self._send_handling_redirects(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/httpx/_client.py",
line 999, in _send_handling_redirects
raise exc
File "/home/airflow/.local/lib/python3.12/site-packages/httpx/_client.py",
line 982, in _send_handling_redirects
hook(response)
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/api/client.py",
line 182, in raise_on_4xx_5xx_with_note
return get_json_error(response) or response.raise_for_status()
^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/api/client.py",
line 172, in get_json_error
raise err
airflow.sdk.api.client.ServerResponseError: Server returned error
Correlation-id=019a9827-d13d-7175-b0fb-b69a017a9ffd
INFO: 10.0.49.54:40460 - "GET
/log/dag_id%3Dtest/run_id%3Dscheduled__2025-11-18T18%3A00%3A00%2B00%3A00/task_id%3Dtest_4/attempt%3D2.log
HTTP/1.1" 200 OK
2025-11-18T18:09:01.229047Z [warning [] Server error
[airflow.sdk.api.client] detail={'detail': {'reason': 'not_running', 'message':
'TI is no longer in the running state and task should terminate',
'current_state': 'failed'}} loc=client.py:171
2025-11-18T18:09:01.229556Z [error [] Server indicated the task shouldn't
be running anymore [supervisor] detail={'detail': {'reason': 'not_running',
'message': 'TI is no longer in the running state and task should terminate',
'current_state': 'failed'}} loc=supervisor.py:1109 status_code=409
ti_id=UUID('019a9826-13fa-7352-abcc-138aee8aaa86')
2025-11-18T18:09:06.235061Z [warning [] Process did not terminate in time;
escalating [supervisor] loc=supervisor.py:716 pid=64 signal=SIGTERM
2025-11-18T18:09:06.247180Z [info [] Process exited
[supervisor] exit_code=-9 loc=supervisor.py:708 pid=64 signal_sent=SIGKILL
2025-11-18T18:09:06.247710Z [info [] Task finished
[supervisor] duration=120.79638931999943 exit_code=-9
final_state=SERVER_TERMINATED loc=supervisor.py:1937
task_instance_id=019a9826-13fa-7352-abcc-138aee8aaa86
2025-11-18T18:09:06.263830Z [info [] Task
execute_workload[5fc1cd6a-2920-4a70-bea1-696704f17a75[] succeeded in
120.9372150579984s: None [celery.app.trace] loc=trace.py:128
```
Scheduler:
```
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/
2025-11-18T18:06:18.629850Z [info [] Starting the scheduler
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
loc=scheduler_job_runner.py:1054
2025-11-18T18:06:19.812843Z [info [] Loaded executor: :CeleryExecutor:
[airflow.executors.executor_loader] loc=executor_loader.py:281
2025-11-18T18:06:19.921818Z [info [] Loaded executor:
:KubernetesExecutor: [airflow.executors.executor_loader]
loc=executor_loader.py:281
2025-11-18T18:06:19.922646Z [info [] Start Kubernetes executor
[airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor]
loc=kubernetes_executor.py:238
2025-11-18T18:06:20.013982Z [info [] Adopting or resetting orphaned
tasks for active dag runs
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
loc=scheduler_job_runner.py:2276
2025-11-18T18:06:20.020902Z [info [] Event: and now my watch begins
starting at resource_version: 0
[airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher]
loc=kubernetes_executor_utils.py:132
2025-11-18T18:06:20.025287Z [info [] Marked 1 SchedulerJob instances as
failed [airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
loc=scheduler_job_runner.py:2299
2025-11-18T18:07:04.809254Z [info ] 1 tasks up for execution:
<TaskInstance: test.test_4 scheduled__2025-11-18T18:00:00+00:00
[scheduled[]> [airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
loc=scheduler_job_runner.py:444
2025-11-18T18:07:04.809453Z [info [] DAG test has 0/16 running and
queued tasks [airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
loc=scheduler_job_runner.py:516
2025-11-18T18:07:04.810400Z [info ] Setting the following tasks to
queued state:
<TaskInstance: test.test_4 scheduled__2025-11-18T18:00:00+00:00
[scheduled[]> [airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
loc=scheduler_job_runner.py:655
2025-11-18T18:07:04.813201Z [info [] Trying to enqueue tasks:
[<TaskInstance: test.test_4 scheduled__2025-11-18T18:00:00+00:00 [scheduled[]>]
for executor: CeleryExecutor(parallelism=32)
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
loc=scheduler_job_runner.py:740
2025-11-18T18:07:05.348969Z [info [] Received executor event with state
queued for task instance TaskInstanceKey(dag_id='test', task_id='test_4',
run_id='scheduled__2025-11-18T18:00:00+00:00', try_number=2, map_index=-1)
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
loc=scheduler_job_runner.py:818
2025-11-18T18:07:05.369501Z [info [] Setting external_executor_id for
<TaskInstance: test.test_4 scheduled__2025-11-18T18:00:00+00:00 [queued[]> to
5fc1cd6a-2920-4a70-bea1-696704f17a75
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
loc=scheduler_job_runner.py:854
2025-11-18T18:08:58.283462Z [info [] Received executor event with state
failed for task instance TaskInstanceKey(dag_id='test', task_id='test_4',
run_id='scheduled__2025-11-18T18:00:00+00:00', try_number=2, map_index=-1)
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
loc=scheduler_job_runner.py:818
2025-11-18T18:08:58.294573Z [info [] TaskInstance Finished: dag_id=test,
task_id=test_4, run_id=scheduled__2025-11-18T18:00:00+00:00, map_index=-1,
run_start_date=2025-11-18 18:07:05.473530+00:00, run_end_date=None,
run_duration=262.232856, state=running,
executor=CeleryExecutor(parallelism=32), executor_state=failed, try_number=2,
max_tries=1, pool=default_pool, queue=default, priority_weight=1,
operator=PythonOperator, queued_dttm=2025-11-18 18:07:04.810721+00:00,
scheduled_dttm=2025-11-18 18:07:04.781059+00:00,queued_by_job_id=215, pid=64
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
loc=scheduler_job_runner.py:864
2025-11-18T18:08:58.297131Z [error [] Executor
CeleryExecutor(parallelism=32) reported that the task instance <TaskInstance:
test.test_4 scheduled__2025-11-18T18:00:00+00:00 [running[]> finished with
state failed, but the task instance's state attribute is running. Learn more:
https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally
[airflow.task] loc=taskinstance.py:1505
2025-11-18T18:08:58.301159Z [info [] Marking task as FAILED.
dag_id=test, task_id=test_4, run_id=scheduled__2025-11-18T18:00:00+00:00,
logical_date=20251118T180000, start_date=20251118T180705,
end_date=20251118T180858 [airflow.models.taskinstance] loc=taskinstance.py:1595
2025-11-18T18:08:59.374603Z [info [] Marking run <DagRun test @
2025-11-18 18:00:00+00:00: scheduled__2025-11-18T18:00:00+00:00, state:running,
queued_at: 2025-11-18 18:07:03.683680+00:00. run_type: scheduled> failed
[airflow.models.dagrun.DagRun] loc=dagrun.py:1171
2025-11-18T18:08:59.374813Z [info [] DagRun Finished: dag_id=test,
logical_date=2025-11-18 18:00:00+00:00,
run_id=scheduled__2025-11-18T18:00:00+00:00, run_start_date=2025-11-18
18:07:04.747229+00:00, run_end_date=2025-11-18 18:08:59.374738+00:00,
run_duration=114.627509, state=failed, run_type=scheduled,
data_interval_start=2025-11-18 18:00:00+00:00, data_interval_end=2025-11-18
18:00:00+00:00, [airflow.models.dagrun.DagRun] loc=dagrun.py:1274
2025-11-18T18:08:59.381120Z [info [] Setting next_dagrun for test to
2025-11-18 19:00:00+00:00, run_after=2025-11-18 19:00:00+00:00
[airflow.models.dag] loc=dag.py:688
```
### What you think should happen instead?
_No response_
### How to reproduce
With Redis:
1. Start some dummy sleep task that runs for 10 minutes
2. Log into redis-cli
3. Get the id from `zrange unacked_index 0 -1 withscores`
4. Run `ZINCRBY unacked_index -{{visibility_timeout}} {{id from previous command}}` to force redelivery, e.g. `ZINCRBY unacked_index -86400 38ff5954-4edd-46fb-aca4-7749b8702977`
5. Wait; the worker should receive the same message again shortly and the running task will be marked as failed (a scripted version of steps 2-4 is sketched below)
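A scripted version of steps 2-4, assuming the redis-py package and a broker reachable at redis-host:6379 (both placeholders):
```python
# Sketch of steps 2-4 above using redis-py; the host/port and the fact that a task
# is currently reserved by a worker are assumptions.
import redis

r = redis.Redis(host="redis-host", port=6379, db=0)

# Step 3: list reserved-but-unacked deliveries together with their visibility deadlines.
for member, score in r.zrange("unacked_index", 0, -1, withscores=True):
    delivery_tag = member.decode()
    print(delivery_tag, score)
    # Step 4: move the deadline far into the past to force an immediate redelivery.
    r.zincrby("unacked_index", -86400, delivery_tag)
```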
With RabbitMQ:
1. Restarting RabbitMQ while the task is running should be sufficient in theory, but I didn't test this
### Operating System
Kubernetes
### Versions of Apache Airflow Providers
_No response_
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
_No response_
### Anything else?
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)