ralichkov commented on issue #39791:
URL: https://github.com/apache/airflow/issues/39791#issuecomment-3361008628
Hi @shahar1! Thank you for taking the time to look into this, and I'm glad
that some additional good has come out of this as well.
Before I begin, please don't mind the wall of text. I promise it's a rather
light read.
I spun up the official Airflow Helm chart on my local machine to test this
out, but unfortunately my pod still died unannounced.
Here is my `values.yml`; the repo referenced for the Dag is public.
<details>
```yml
defaultAirflowTag: "3.1.0"
airflowVersion: "3.1.0"
executor: "LocalExecutor"
# Exact hostname depends on your setup.
airflowLocalSettings: |-
  def k8s_hostname_callable():
      return "airflowhelm-scheduler.airflow.svc.cluster.local"
env:
  - name: "AIRFLOW__CORE__HOSTNAME_CALLABLE"
    value: "airflow_local_settings.k8s_hostname_callable"
  - name: "AIRFLOW_CONN_KUBERNETES_DEFAULT"
    value: '{"conn_type": "kubernetes", "extra": {"in_cluster": true, "namespace": "airflow"}}'
workers:
  replicas: 0 # Number of Airflow Celery workers
postgresql:
  image:
    # Broadcom deprecated the old repo
    repository: bitnamilegacy/postgresql
    tag: 16.1.0-debian-11-r15
dags:
  gitSync:
    enabled: true
    repo: https://github.com/ralichkov/dag-tests.git
    branch: main
    rev: HEAD
    ref: main
    depth: 1
    maxFailures: 0
    subPath: ""
```
</details>
After triggering the Dag and letting it run for a bit, I killed the
scheduler pod (not forced). Below are logs from the scheduler pod - nothing too
interesting IMO:
<details>
```
[info ] 1 tasks up for execution:
<TaskInstance: kpo.idle manual__2025-10-01T12:27:44.488750+00:00
[scheduled]> [airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
loc=scheduler_job_runner.py:419
[info ] DAG kpo has 0/16 running and queued tasks
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
loc=scheduler_job_runner.py:491
[info ] Setting the following tasks to queued state:
<TaskInstance: kpo.idle manual__2025-10-01T12:27:44.488750+00:00
[scheduled]> [airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
loc=scheduler_job_runner.py:630
[info ] Trying to enqueue tasks: [<TaskInstance: kpo.idle
manual__2025-10-01T12:27:44.488750+00:00 [scheduled]>] for executor:
LocalExecutor(parallelism=32)
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
loc=scheduler_job_runner.py:715
[info ] Worker starting up pid=40
[airflow.executors.local_executor.LocalExecutor] loc=local_executor.py:65
[info ] Worker starting up pid=41
[airflow.executors.local_executor.LocalExecutor] loc=local_executor.py:65
[info ] Secrets backends loaded for worker [supervisor]
backend_classes=['EnvironmentVariablesBackend'] count=1 loc=supervisor.py:1870
/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/supervisor.py:476
DeprecationWarning: This process (pid=40) is multi-threaded, use of fork() may
lead to deadlocks in the child.
[info ] Task finished [supervisor]
duration=52.103339167999366 exit_code=0 final_state=failed
loc=supervisor.py:1889
[info ] Marking run <DagRun kpo @ 2025-10-01 12:27:42+00:00:
manual__2025-10-01T12:27:44.488750+00:00, state:running, queued_at: 2025-10-01
12:27:44.492154+00:00. run_type: manual> failed [airflow.models.dagrun.DagRun]
loc=dagrun.py:1171
[info ] DagRun Finished: dag_id=kpo, logical_date=2025-10-01
12:27:42+00:00, run_id=manual__2025-10-01T12:27:44.488750+00:00,
run_start_date=2025-10-01 12:27:45.021312+00:00, run_end_date=2025-10-01
12:38:47.057662+00:00, run_duration=662.03635, state=failed, run_type=manual,
data_interval_start=2025-10-01 12:27:42+00:00, data_interval_end=2025-10-01
12:27:42+00:00, [airflow.models.dagrun.DagRun] loc=dagrun.py:1274
[info ] Received executor event with state success for task instance
TaskInstanceKey(dag_id='kpo', task_id='idle',
run_id='manual__2025-10-01T12:27:44.488750+00:00', try_number=3, map_index=-1)
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
loc=scheduler_job_runner.py:793
[info ] TaskInstance Finished: dag_id=kpo, task_id=idle,
run_id=manual__2025-10-01T12:27:44.488750+00:00, map_index=-1,
run_start_date=2025-10-01 12:37:53.953693+00:00, run_end_date=2025-10-01
12:38:46.034291+00:00, run_duration=52.080598, state=failed,
executor=LocalExecutor(parallelism=32), executor_state=success, try_number=3,
max_tries=0, pool=default_pool, queue=default, priority_weight=1,
operator=KubernetesPodOperator, queued_dttm=2025-10-01 12:37:53.890130+00:00,
scheduled_dttm=2025-10-01 12:37:53.880571+00:00,queued_by_job_id=24, pid=42
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
loc=scheduler_job_runner.py:838
[info ] 1 tasks up for execution:
<TaskInstance: kpo.idle manual__2025-10-01T12:40:42.496552+00:00
[scheduled]> [airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
loc=scheduler_job_runner.py:419
[info ] DAG kpo has 0/16 running and queued tasks
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
loc=scheduler_job_runner.py:491
[info ] Setting the following tasks to queued state:
<TaskInstance: kpo.idle manual__2025-10-01T12:40:42.496552+00:00
[scheduled]> [airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
loc=scheduler_job_runner.py:630
[info ] Trying to enqueue tasks: [<TaskInstance: kpo.idle
manual__2025-10-01T12:40:42.496552+00:00 [scheduled]>] for executor:
LocalExecutor(parallelism=32)
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
loc=scheduler_job_runner.py:715
[info ] Worker starting up pid=84
[airflow.executors.local_executor.LocalExecutor] loc=local_executor.py:65
[info ] Secrets backends loaded for worker [supervisor]
backend_classes=['EnvironmentVariablesBackend'] count=1 loc=supervisor.py:1870
/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/supervisor.py:476
DeprecationWarning: This process (pid=41) is multi-threaded, use of fork() may
lead to deadlocks in the child.
[info ] Exiting gracefully upon receiving signal 15
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
loc=scheduler_job_runner.py:256
[info ] Shutting down LocalExecutor; waiting for running tasks to
finish. Signal again if you don't want to wait.
[airflow.executors.local_executor.LocalExecutor] loc=local_executor.py:226
INFO: Shutting down
INFO: Waiting for application shutdown.
INFO: Application shutdown complete.
INFO: Finished server process [34]
```
</details>
Immediately, the operator's pod went into the Terminating state as well, but
it kept ticking away for the duration configured in `termination_grace_period`.
No logs related to termination appeared; the log stream simply ended.
Now, here's the kicker: if you compare the operator pod's metadata
immediately before and after killing the scheduler (worker), this is what
changes:
```diff
 metadata:
   creationTimestamp: "2025-10-01T12:56:44Z"
-  generation: 1
+  deletionGracePeriodSeconds: 60
+  deletionTimestamp: "2025-10-01T12:58:24Z"
+  generation: 2
```
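In case anyone wants to reproduce the comparison, a minimal sketch using the
Kubernetes Python client could look like the following (the pod name is a
placeholder, and loading kubeconfig from outside the cluster is an assumption):

```python
# Sketch: read the operator pod's metadata before/after killing the scheduler.
# The pod name below is a placeholder; adjust namespace/config to your setup.
from kubernetes import client, config

config.load_kube_config()  # or config.load_incluster_config() inside the cluster
v1 = client.CoreV1Api()

pod = v1.read_namespaced_pod(name="kpo-idle-xxxxx", namespace="airflow")  # placeholder name
meta = pod.metadata
print(meta.generation, meta.deletion_grace_period_seconds, meta.deletion_timestamp)
```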
Indeed, [this is part of the on_kill call inside the
operator](https://github.com/apache/airflow/blob/3.1.0/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py#L1223).
It actually sets the `deletionGracePeriodSeconds` to the value of
`termination_grace_period` from the operator kwargs, then kills the pod. (I
believe this kwarg shouldn't be used for this, but I'll avoid delving into
that now.)
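For readers who don't want to follow the link, the relevant part boils down to
roughly this (a paraphrase from my reading of the provider, not the verbatim
code):

```python
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator


# Rough paraphrase of what KubernetesPodOperator.on_kill() does today (see the
# link above for the real code): the pod is always deleted, with the operator's
# termination_grace_period becoming the pod's deletionGracePeriodSeconds.
class _IllustrativeKPO(KubernetesPodOperator):
    def on_kill(self) -> None:
        if self.pod:
            kwargs = {
                "name": self.pod.metadata.name,
                "namespace": self.pod.metadata.namespace,
            }
            if self.termination_grace_period is not None:
                kwargs["grace_period_seconds"] = self.termination_grace_period
            self.client.delete_namespaced_pod(**kwargs)
```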
Anyway. What I did to work around this was to subclass the KPO and override
the `on_kill` method so it only prints a message and returns, roughly as
sketched below.
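(A minimal sketch of that workaround; the class name is arbitrary, and the
real version lives in the dag-tests repo linked below.)

```python
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator


class NoKillKPO(KubernetesPodOperator):
    """KPO that ignores on_kill so the worker dying doesn't delete the pod."""

    def on_kill(self) -> None:
        # Skip the parent's pod deletion entirely; just log and return.
        self.log.warning("Operator killed.")
```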
That appears to have fixed my issue. Detailed logs from the ["fixed" version
of the
Dag](https://github.com/ralichkov/dag-tests/blob/e6abbc8c8f96d918e94450a28d94a7fb48ed1b57/kpo.py#L5-L11):
<details>
```
INFO - [base] tick 42
WARNING - Operator killed.
INFO - func=<module>, file=/home/airflow/.local/bin/airflow, line=7
INFO - func=main,
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/__main__.py,
line=55
INFO - func=command,
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/cli_config.py,
line=49
INFO - func=wrapper,
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/cli.py,
line=114
INFO - func=wrapped_function,
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/providers_configuration_loader.py,
line=54
INFO - func=scheduler,
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py,
line=52
INFO - func=run_command_with_daemon_option,
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/daemon_utils.py,
line=86
INFO - func=<lambda>,
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py,
line=55
INFO - func=_run_scheduler_job,
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py,
line=43
INFO - func=wrapper,
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py,
line=100
INFO - func=run_job,
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py,
line=368
INFO - func=execute_job,
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py,
line=397
INFO - func=_execute,
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py,
line=1042
INFO - func=_run_scheduler_loop,
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py,
line=1340
INFO - func=wrapper,
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/traces/tracer.py,
line=58
INFO - func=heartbeat,
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/executors/base_executor.py,
line=253
INFO - func=wrapper,
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/traces/tracer.py,
line=58
INFO - func=trigger_tasks,
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/executors/base_executor.py,
line=389
INFO - func=_process_workloads,
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/executors/local_executor.py,
line=259
INFO - func=_check_workers,
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/executors/local_executor.py,
line=196
INFO - func=_spawn_worker,
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/executors/local_executor.py,
line=208
INFO - func=start,
file=/usr/python/lib/python3.12/multiprocessing/process.py, line=121
INFO - func=_Popen,
file=/usr/python/lib/python3.12/multiprocessing/context.py, line=224
INFO - func=_Popen,
file=/usr/python/lib/python3.12/multiprocessing/context.py, line=282
INFO - func=__init__,
file=/usr/python/lib/python3.12/multiprocessing/popen_fork.py, line=19
INFO - func=_launch,
file=/usr/python/lib/python3.12/multiprocessing/popen_fork.py, line=71
INFO - func=_bootstrap,
file=/usr/python/lib/python3.12/multiprocessing/process.py, line=314
INFO - func=run, file=/usr/python/lib/python3.12/multiprocessing/process.py,
line=108
INFO - func=_run_worker,
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/executors/local_executor.py,
line=96
INFO - func=_execute_work,
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/executors/local_executor.py,
line=124
INFO - func=supervise,
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/supervisor.py,
line=1878
INFO - func=start,
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/supervisor.py,
line=938
INFO - func=start,
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/supervisor.py,
line=488
INFO - func=_fork_main,
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/supervisor.py,
line=387
INFO - func=_subprocess_main,
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/supervisor.py,
line=205
INFO - func=main,
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py,
line=1450
INFO - func=run,
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py,
line=920
INFO - func=_execute_task,
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py,
line=1307
INFO - func=wrapper,
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/bases/operator.py,
line=416
INFO - func=execute,
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py,
line=657
INFO - func=execute_sync,
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py,
line=709
INFO - func=wrapped_f,
file=/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py,
line=338
INFO - func=__call__,
file=/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py,
line=480
INFO - func=await_pod_completion,
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py,
line=782
INFO - func=fetch_requested_container_logs,
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py,
line=728
INFO - func=fetch_container_logs,
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py,
line=611
INFO - func=consume_logs,
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py,
line=546
INFO - func=__iter__,
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py,
line=269
INFO - func=stream,
file=/home/airflow/.local/lib/python3.12/site-packages/urllib3/response.py,
line=1088
INFO - func=read_chunked,
file=/home/airflow/.local/lib/python3.12/site-packages/urllib3/response.py,
line=1248
INFO - func=_update_chunk_length,
file=/home/airflow/.local/lib/python3.12/site-packages/urllib3/response.py,
line=1167
INFO - func=readinto, file=/usr/python/lib/python3.12/socket.py, line=720
INFO - func=recv_into, file=/usr/python/lib/python3.12/ssl.py, line=1251
INFO - func=read, file=/usr/python/lib/python3.12/ssl.py, line=1103
INFO - func=_on_term,
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py,
line=890
INFO - func=on_kill, file=/opt/airflow/dags/repo/kpo.py, line=9
INFO - [base] tick 43
INFO - [base] tick 44
INFO - [base] tick 45
```
</details>
Of course, this "hotfix" breaks other expected functionality, such as the pod
being interrupted after "mark failed", etc.
On my first few attempts it would actually not resume the task properly,
leading to weird stuff like this:
<img width="165" height="121" alt="Image"
src="https://github.com/user-attachments/assets/c0ced136-cd0c-4c79-b8c7-916b863dcad0"
/>
Whether this is something on my end or another bug, I don't know; I just
wanted to put it out there.
Overall, I don't see how this reattachment behavior is supposed to work in
the real world if `on_kill` is always called when the worker dies, since that
always kills the pod. What is it supposed to reattach to? I don't understand
how your breeze setup "works". Maybe that's how it was rolled out as well.
What I would propose is the following: modify `_on_term()` to propagate the
trapped signal or frame (or both?) down to `ti.task.on_kill()`. Then the KPO
can decide **not** to kill the running pod if `reattach_on_restart` is
enabled; a rough sketch of how that could look on the operator side is below.
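(Hand-wavy sketch only: the extended `on_kill` signature below does not exist
today, and the signal plumbing in `_on_term()` is the part that would actually
need to change.)

```python
import signal

from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator


class ReattachFriendlyKPO(KubernetesPodOperator):
    # Hypothetical signature: assumes _on_term() forwards the trapped signal.
    def on_kill(self, signum: int | None = None) -> None:
        if signum == signal.SIGTERM and self.reattach_on_restart:
            # The worker is merely being terminated; leave the pod running so a
            # new worker can reattach to it instead of starting a fresh try.
            self.log.warning("Worker terminating; leaving pod alive for reattach.")
            return
        # Otherwise keep the current behavior (delete the pod).
        super().on_kill()
```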
I will look into setting up breeze locally and see what I can do about this.