GitHub user yengkhoo created a discussion: Airflow 3 - tasks triggered twice at 
a time when scheduler replica is 2

Hi all, I recently upgraded from Airflow 2.11.0 to 3.0.4, using Helm chart 
v1.18.0. I am on EKS, with PostgreSQL 14 as the database and the Kubernetes 
executor.

After the upgrade, I observed that when I have 2 scheduler replicas running, 
a task sometimes gets triggered twice at the same time, and one of the pods 
then fails with the error below. This issue also sometimes leads to missing 
logs (external logging to S3 is enabled) for the tasks that were triggered 
twice: no logs are found at all, even in the S3 bucket. My scheduler pod(s) 
also restart intermittently with liveness probe failures, but the restarts 
only begin once the worker pods start failing.

Once I reduced the scheduler replicas to 1, I no longer saw worker pods 
failing or logs going missing, but the scheduler pod still restarts 
intermittently.

I also:
- Tested with Airflow 3.0.0 and 3.0.6 and hit the same problem.
- Double-checked that the `use_row_level_locking` config is set to true.
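
For anyone wanting to repeat the `use_row_level_locking` check, here is a minimal sketch of how the effective value could be resolved outside of Airflow. It relies on Airflow's `AIRFLOW__{SECTION}__{KEY}` environment variable convention, which takes precedence over `airflow.cfg`. The function name `effective_row_level_locking` is hypothetical, and the set of accepted "true" strings is my assumption about how Airflow parses booleans.

```python
import configparser
import os

# Airflow maps the env var AIRFLOW__{SECTION}__{KEY} onto "[section] key".
ENV_NAME = "AIRFLOW__SCHEDULER__USE_ROW_LEVEL_LOCKING"


def effective_row_level_locking(cfg_text, env=None):
    """Resolve [scheduler] use_row_level_locking from env, then airflow.cfg text.

    Returns True or False when the option is set somewhere, or None when it is
    set in neither place (Airflow then falls back to its built-in default).
    """
    env = os.environ if env is None else env
    raw = env.get(ENV_NAME)
    if raw is None:
        # interpolation=None so that '%' characters elsewhere in the file
        # do not trip configparser.
        parser = configparser.ConfigParser(interpolation=None)
        parser.read_string(cfg_text)
        raw = parser.get("scheduler", "use_row_level_locking", fallback=None)
    if raw is None:
        return None
    # Assumption: these are the strings treated as boolean true.
    return raw.strip().lower() in {"t", "true", "1"}
```

Running this against the contents of the rendered `airflow.cfg` inside each scheduler pod should show whether both replicas see the same value.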

Can I get some advice on what I may have done wrong or what else I should 
check? I have included the worker log and parts of the scheduler and 
api-server logs below 🙏 

1. Worker pod failure log:
```
{"timestamp":"2025-09-24T03:11:24.401447Z","level":"info","event":"Executing 
workload","workload":"ExecuteTask(token='xx.xx.x-xx', 
ti=TaskInstance(id=UUID('0199798e-6c60-7548-a4bf-e687e4b95744'), 
task_id='dag.tasl', dag_id='dag', 
run_id='scheduled__2025-09-24T02:30:00+00:00', try_number=3, map_index=-1, 
pool_slots=1, queue='default', priority_weight=5, executor_config=None, 
parent_context_carrier={}, context_carrier={}, 
queued_dttm=datetime.datetime(2025, 9, 24, 3, 2, 51, 887212, 
tzinfo=TzInfo(UTC))), dag_rel_path=PurePosixPath('dag/file.py'), 
bundle_info=BundleInfo(name='airflow-dags', version='xx'), 
log_path='dag_id=dag/run_id=scheduled__2025-09-24T02:30:00+00:00/task_id=dag.task/attempt=3.log',
 type='ExecuteTask')","logger":"__main__"}
{"timestamp":"2025-09-24T03:11:24.761111Z","level":"info","event":"Connecting 
to 
server:","server":"http://airflow-api-server:8080/execution/","logger":"__main__"}
{"timestamp":"2025-09-24T03:11:24.816791Z","level":"info","event":"Secrets 
backends loaded for 
worker","count":1,"backend_classes":["EnvironmentVariablesBackend"],"logger":"supervisor"}
{"timestamp":"2025-09-24T03:11:24.840334Z","level":"warning","event":"Server 
error","detail":{"detail":{"reason":"invalid_state","message":"TI was not in a 
state where it could be marked as 
running","previous_state":"success"}},"logger":"airflow.sdk.api.client"}
{"timestamp":"2025-09-24T03:11:24.849299Z","level":"info","event":"Process 
exited","pid":14,"exit_code":-9,"signal_sent":"SIGKILL","logger":"supervisor"}
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/local/lib/python3.10/runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/execution_time/execute_workload.py",
 line 125, in <module>
    main()
  File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/execution_time/execute_workload.py",
 line 121, in main
    execute_workload(workload)
  File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/execution_time/execute_workload.py",
 line 66, in execute_workload
    supervise(
  File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/execution_time/supervisor.py",
 line 1829, in supervise
    process = ActivitySubprocess.start(
  File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/execution_time/supervisor.py",
 line 933, in start
    proc._on_child_started(ti=what, dag_rel_path=dag_rel_path, 
bundle_info=bundle_info)
  File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/execution_time/supervisor.py",
 line 944, in _on_child_started
    ti_context = self.client.task_instances.start(ti.id, self.pid, start_date)
  File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/api/client.py", 
line 152, in start
    resp = self.client.patch(f"task-instances/{id}/run", 
content=body.model_dump_json())
  File "/home/airflow/.local/lib/python3.10/site-packages/httpx/_client.py", 
line 1218, in patch
    return self.request(
  File 
"/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 
338, in wrapped_f
    return copy(f, *args, **kw)
  File 
"/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 
477, in __call__
    do = self.iter(retry_state=retry_state)
  File 
"/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 
378, in iter
    result = action(retry_state)
  File 
"/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 
400, in <lambda>
    self._add_action_func(lambda rs: rs.outcome.result())
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 451, in 
result
    return self.__get_result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in 
__get_result
    raise self._exception
  File 
"/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 
480, in __call__
    result = fn(*args, **kwargs)
  File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/api/client.py", 
line 735, in request
    return super().request(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/httpx/_client.py", 
line 825, in request
    return self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/home/airflow/.local/lib/python3.10/site-packages/httpx/_client.py", 
line 914, in send
    response = self._send_handling_auth(
  File "/home/airflow/.local/lib/python3.10/site-packages/httpx/_client.py", 
line 942, in _send_handling_auth
    response = self._send_handling_redirects(
  File "/home/airflow/.local/lib/python3.10/site-packages/httpx/_client.py", 
line 999, in _send_handling_redirects
    raise exc
  File "/home/airflow/.local/lib/python3.10/site-packages/httpx/_client.py", 
line 982, in _send_handling_redirects
    hook(response)
  File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/api/client.py", 
line 117, in raise_on_4xx_5xx
    return get_json_error(response) or response.raise_for_status()
  File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/api/client.py", 
line 113, in get_json_error
    raise err
airflow.sdk.api.client.ServerResponseError: Server returned error
```
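
To pick out these rejections from a longer worker log, something like the sketch below may help. It assumes the raw log has one JSON object per line (the lines above are wrapped by the email) and that the rejection always appears as a `Server error` event with `reason` set to `invalid_state`, as in this sample. The function name `find_invalid_state_errors` is hypothetical.

```python
import json


def find_invalid_state_errors(lines):
    """Return (timestamp, previous_state) for each rejected task start.

    Lines that are not JSON objects (for example the traceback) are skipped.
    """
    hits = []
    for line in lines:
        line = line.strip()
        if not line.startswith("{"):
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue
        if record.get("event") != "Server error":
            continue
        # The error payload is nested as detail -> detail in the sample log.
        outer = record.get("detail")
        inner = outer.get("detail") if isinstance(outer, dict) else None
        if isinstance(inner, dict) and inner.get("reason") == "invalid_state":
            hits.append((record.get("timestamp"), inner.get("previous_state")))
    return hits
```

A `previous_state` of `success` here suggests that another pod had already finished the same task instance before this one tried to start it.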

2. Scheduler log when I grep for the name of one of the failed worker pods:
```
[2025-10-22T07:55:51.037+0000] {kubernetes_executor_utils.py:428} INFO - 
Creating kubernetes pod for job is 
TaskInstanceKey(dag_id='nametest_report_imports_v3', 
task_id='marketing_attribution.load_to_table_marketing_attribution', 
run_id='manual__2025-10-21T14:56:38.949888+00:00', try_number=1, map_index=-1), 
with pod nametest-report-imports-v3-marketing-axuuqhz0, annotations: <omitted>
[2025-10-22T07:56:01.124+0000] {kubernetes_executor_utils.py:268} INFO - 
Event:test-report-imports-v3-marketing-axuuqhz0 Pending, annotations: <omitted>
[2025-10-22T07:56:01.142+0000] {kubernetes_executor_utils.py:268} INFO - 
Event:test-report-imports-v3-marketing-axuuqhz0 Pending, annotations: <omitted>
[2025-10-22T07:56:01.181+0000] {kubernetes_executor_utils.py:268} INFO - 
Event:test-report-imports-v3-marketing-axuuqhz0 Pending, annotations: <omitted>
[2025-10-22T07:56:01.880+0000] {kubernetes_executor_utils.py:268} INFO - 
Event:test-report-imports-v3-marketing-axuuqhz0 Pending, annotations: <omitted>
[2025-10-22T07:56:02.684+0000] {kubernetes_executor_utils.py:292} INFO - 
Event:test-report-imports-v3-marketing-axuuqhz0 is Running, annotations: 
<omitted>
[2025-10-22T07:56:12.712+0000] {kubernetes_executor_utils.py:292} INFO - 
Event:test-report-imports-v3-marketing-axuuqhz0 is Running, annotations: 
<omitted>
[2025-10-22T07:56:13.900+0000] {kubernetes_executor_utils.py:292} INFO - 
Event:test-report-imports-v3-marketing-axuuqhz0 is Running, annotations: 
<omitted>
[2025-10-22T07:56:13.942+0000] {kubernetes_executor_utils.py:272} ERROR - 
Event:test-report-imports-v3-marketing-axuuqhz0 Failed, annotations: <omitted>
[2025-10-22T07:56:31.408+0000] {kubernetes_executor.py:335} INFO - Changing 
state of (TaskInstanceKey(dag_id='nametest_report_imports_v3', 
task_id='marketing_attribution.load_to_table_marketing_attribution', 
run_id='manual__2025-10-21T14:56:38.949888+00:00', try_number=1, map_index=-1), 
<TaskInstanceState.FAILED: 'failed'>, 
'nametest-report-imports-v3-marketing-axuuqhz0', 'airflow-sbx', '1538631248') 
to failed
```
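
One way to confirm the double trigger would be to combine the logs of both scheduler replicas and look for a task instance key that had more than one pod created for it. The sketch below does that. It assumes unwrapped log lines in the format shown above, and the function name `duplicate_pod_creations` is hypothetical.

```python
import re
from collections import defaultdict

# Matches the "Creating kubernetes pod for job is TaskInstanceKey(...)" lines.
CREATE_RE = re.compile(
    r"Creating kubernetes pod for job is "
    r"TaskInstanceKey\(dag_id='(?P<dag_id>[^']+)', "
    r"task_id='(?P<task_id>[^']+)', "
    r"run_id='(?P<run_id>[^']+)', "
    r"try_number=(?P<try_number>\d+), "
    r"map_index=(?P<map_index>-?\d+)\), "
    r"with pod (?P<pod>[^,\s]+)"
)


def duplicate_pod_creations(lines):
    """Map each task instance key to its pods, keeping only keys with 2+ pods."""
    pods_by_key = defaultdict(list)
    for line in lines:
        match = CREATE_RE.search(line)
        if match is None:
            continue
        key = (
            match["dag_id"],
            match["task_id"],
            match["run_id"],
            int(match["try_number"]),
            int(match["map_index"]),
        )
        pods_by_key[key].append(match["pod"])
    return {key: pods for key, pods in pods_by_key.items() if len(pods) > 1}
```

Feeding it the concatenated output of both scheduler pods should list every task instance that was launched more than once with the same `try_number`.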

4. Before the scheduler container is restarted, it logs this warning:
`[2025-10-22T07:09:59.561+0000] {kubernetes_executor.py:686} WARNING - Executor 
shutting down, will NOT run 
task=(TaskInstanceKey(dag_id='test_report_imports_v3', 
task_id='test_task.create_table', 
run_id='manual__2025-10-21T14:56:38.949888+00:00', try_number=2, map_index=-1), 
[ExecuteTask(token='eyJxxx.xxx.xxxx', 
ti=TaskInstance(id=UUID('019a0745-b259-72d1-b08a-18b0c3259c2f'), 
task_id='test_task.create_table', dag_id='test_dag', 
run_id='manual__2025-10-21T14:56:38.949888+00:00', try_number=2, map_index=-1, 
pool_slots=1, queue='default', priority_weight=3, 
executor_config={'pod_override': {'api_version': None, ......`

5. The api-server sometimes shows this for some of the failed pods:
```
INFO:     10.4.133.79:57534 - "GET /execution/connections/test_conn HTTP/1.1" 
200 OK
INFO:     10.4.133.79:57534 - "GET /execution/connections/test_conn HTTP/1.1" 
200 OK
INFO:     10.4.132.6:34854 - "GET /execution/connections/aws_s3_logging 
HTTP/1.1" 200 OK
2025-10-22 08:02:53 [debug    ] Starting task instance run     
hostname=nametest-report-imports-v3-google-search-lfk4zj7c pid=14 
ti_id=019a0745-b283-7033-ab53-3dc0cc605362 unixname=airflow
2025-10-22 08:02:53 [debug    ] Retrieved task instance details 
dag_id=nametest_report_imports_v3 state=success 
task_id=testtask.load_to_prep_table_op_testtask 
ti_id=019a0745-b283-7033-ab53-3dc0cc605362
2025-10-22 08:02:53 [warning  ] Cannot start Task Instance in invalid state 
previous_state=success ti_id=019a0745-b283-7033-ab53-3dc0cc605362
INFO:     10.4.133.79:57534 - "GET /execution/connections/test_conn HTTP/1.1" 
200 OK
INFO:     10.4.132.6:34854 - "PATCH 
/execution/task-instances/019a0745-b283-7033-ab53-3dc0cc605362/run HTTP/1.1" 
409 Conflict
INFO:     10.4.133.79:57534 - "GET /execution/connections/test_conn HTTP/1.1" 
200 OK
INFO:     10.4.133.79:57534 - "GET /execution/connections/test_conn HTTP/1.1" 
200 OK
```
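
To list which task instances were rejected on the api-server side, a sketch like the following could pair each `409 Conflict` on the `/run` endpoint with the `previous_state` from the matching warning line. It assumes unwrapped log lines in the format shown above, and the function name `rejected_starts` is hypothetical.

```python
import re

# Access-log line for a rejected "start task instance" call.
CONFLICT_RE = re.compile(
    r'"PATCH /execution/task-instances/(?P<ti_id>[0-9a-f-]+)/run HTTP/1\.1" 409'
)
# Structured warning emitted when the task instance is not in a startable state.
WARNING_RE = re.compile(
    r"Cannot start Task Instance in invalid state\s+"
    r"previous_state=(?P<state>\w+)\s+ti_id=(?P<ti_id>[0-9a-f-]+)"
)


def rejected_starts(lines):
    """Return (ti_id, previous_state) for every 409 on the /run endpoint.

    previous_state is None when no matching warning line was found.
    """
    states = {}
    conflicts = []
    for line in lines:
        warning = WARNING_RE.search(line)
        if warning:
            states[warning["ti_id"]] = warning["state"]
        conflict = CONFLICT_RE.search(line)
        if conflict:
            conflicts.append(conflict["ti_id"])
    return [(ti_id, states.get(ti_id)) for ti_id in conflicts]
```

The returned `ti_id` values can then be matched against the `id=UUID(...)` field in the worker logs to find the pods that lost the race.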

GitHub link: https://github.com/apache/airflow/discussions/57041
