ralichkov commented on issue #39791:
URL: https://github.com/apache/airflow/issues/39791#issuecomment-3361008628

   Hi @shahar1! Thank you for taking the time to look into this, and I'm glad that some additional good has come out of this as well.
   
   Before I begin, please don't mind the wall of text. I promise it's a rather 
light read.
   
   I spun up the official Airflow Helm chart on my local machine to test this 
out, but unfortunately my pod still died unannounced.
   Here is my `values.yml`. The repo referenced for the Dag is public.
   <details>
   
   ```yml
   defaultAirflowTag: "3.1.0"
   airflowVersion: "3.1.0"
   
   executor: "LocalExecutor"
   
   # Exact hostname depends on your setup.
   airflowLocalSettings: |-
     def k8s_hostname_callable():
       return "airflowhelm-scheduler.airflow.svc.cluster.local"
   
   env:
   - name: "AIRFLOW__CORE__HOSTNAME_CALLABLE"
     value: "airflow_local_settings.k8s_hostname_callable"
   - name: "AIRFLOW_CONN_KUBERNETES_DEFAULT"
     value: '{"conn_type": "kubernetes", "extra": {"in_cluster": true, 
"namespace": "airflow"}}'
   
   workers:
     replicas: 0 # Number of Airflow Celery workers
   
   postgresql:
     image:
       # Broadcom deprecated the old repo
       repository: bitnamilegacy/postgresql
       tag: 16.1.0-debian-11-r15
   
   dags:
     gitSync:
       enabled: true
       repo: https://github.com/ralichkov/dag-tests.git
       branch: main
       rev: HEAD
       ref: main
       depth: 1
       maxFailures: 0
       subPath: ""
   ```
   
   </details>
   
   After triggering the Dag and letting it run for a bit, I killed the scheduler pod (a regular, non-forced delete). Below are the logs from the scheduler pod - nothing too interesting IMO:
   <details>
   
   ```
   [info     ] 1 tasks up for execution:
           <TaskInstance: kpo.idle manual__2025-10-01T12:27:44.488750+00:00 
[scheduled]> [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:419
   [info     ] DAG kpo has 0/16 running and queued tasks 
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:491
   [info     ] Setting the following tasks to queued state:
           <TaskInstance: kpo.idle manual__2025-10-01T12:27:44.488750+00:00 
[scheduled]> [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:630
   [info     ] Trying to enqueue tasks: [<TaskInstance: kpo.idle 
manual__2025-10-01T12:27:44.488750+00:00 [scheduled]>] for executor: 
LocalExecutor(parallelism=32) 
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:715
   [info     ] Worker starting up pid=40      
[airflow.executors.local_executor.LocalExecutor] loc=local_executor.py:65
   [info     ] Worker starting up pid=41      
[airflow.executors.local_executor.LocalExecutor] loc=local_executor.py:65
   [info     ] Secrets backends loaded for worker [supervisor] 
backend_classes=['EnvironmentVariablesBackend'] count=1 loc=supervisor.py:1870
   
/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/supervisor.py:476
 DeprecationWarning: This process (pid=40) is multi-threaded, use of fork() may 
lead to deadlocks in the child.
   [info     ] Task finished                  [supervisor] 
duration=52.103339167999366 exit_code=0 final_state=failed 
loc=supervisor.py:1889
   [info     ] Marking run <DagRun kpo @ 2025-10-01 12:27:42+00:00: 
manual__2025-10-01T12:27:44.488750+00:00, state:running, queued_at: 2025-10-01 
12:27:44.492154+00:00. run_type: manual> failed [airflow.models.dagrun.DagRun] 
loc=dagrun.py:1171
   [info     ] DagRun Finished: dag_id=kpo, logical_date=2025-10-01 
12:27:42+00:00, run_id=manual__2025-10-01T12:27:44.488750+00:00, 
run_start_date=2025-10-01 12:27:45.021312+00:00, run_end_date=2025-10-01 
12:38:47.057662+00:00, run_duration=662.03635, state=failed, run_type=manual, 
data_interval_start=2025-10-01 12:27:42+00:00, data_interval_end=2025-10-01 
12:27:42+00:00, [airflow.models.dagrun.DagRun] loc=dagrun.py:1274
   [info     ] Received executor event with state success for task instance 
TaskInstanceKey(dag_id='kpo', task_id='idle', 
run_id='manual__2025-10-01T12:27:44.488750+00:00', try_number=3, map_index=-1) 
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:793
   [info     ] TaskInstance Finished: dag_id=kpo, task_id=idle, 
run_id=manual__2025-10-01T12:27:44.488750+00:00, map_index=-1, 
run_start_date=2025-10-01 12:37:53.953693+00:00, run_end_date=2025-10-01 
12:38:46.034291+00:00, run_duration=52.080598, state=failed, 
executor=LocalExecutor(parallelism=32), executor_state=success, try_number=3, 
max_tries=0, pool=default_pool, queue=default, priority_weight=1, 
operator=KubernetesPodOperator, queued_dttm=2025-10-01 12:37:53.890130+00:00, 
scheduled_dttm=2025-10-01 12:37:53.880571+00:00,queued_by_job_id=24, pid=42 
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:838
   [info     ] 1 tasks up for execution:
           <TaskInstance: kpo.idle manual__2025-10-01T12:40:42.496552+00:00 
[scheduled]> [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:419
   [info     ] DAG kpo has 0/16 running and queued tasks 
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:491
   [info     ] Setting the following tasks to queued state:
           <TaskInstance: kpo.idle manual__2025-10-01T12:40:42.496552+00:00 
[scheduled]> [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:630
   [info     ] Trying to enqueue tasks: [<TaskInstance: kpo.idle 
manual__2025-10-01T12:40:42.496552+00:00 [scheduled]>] for executor: 
LocalExecutor(parallelism=32) 
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:715
   [info     ] Worker starting up pid=84      
[airflow.executors.local_executor.LocalExecutor] loc=local_executor.py:65
   [info     ] Secrets backends loaded for worker [supervisor] 
backend_classes=['EnvironmentVariablesBackend'] count=1 loc=supervisor.py:1870
   
/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/supervisor.py:476
 DeprecationWarning: This process (pid=41) is multi-threaded, use of fork() may 
lead to deadlocks in the child.
   [info     ] Exiting gracefully upon receiving signal 15 
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:256
   [info     ] Shutting down LocalExecutor; waiting for running tasks to 
finish.  Signal again if you don't want to wait. 
[airflow.executors.local_executor.LocalExecutor] loc=local_executor.py:226
   INFO:     Shutting down
   INFO:     Waiting for application shutdown.
   INFO:     Application shutdown complete.
   INFO:     Finished server process [34]
   ```
   
   </details>
   
   Immediately, the operator's pod went into the Terminating state as well, but it kept ticking away for the duration configured in `termination_grace_period`. No logs related to termination appeared; the log stream simply ended.
   
   Now, here's the kicker: if you compare the operator pod's metadata immediately before and after killing the scheduler (worker), this is what changes:
   ```diff
    metadata:
      creationTimestamp: "2025-10-01T12:56:44Z"
   -  generation: 1
   +  deletionGracePeriodSeconds: 60
   +  deletionTimestamp: "2025-10-01T12:58:24Z"
   +  generation: 2
   ```
   Indeed, [this is part of the `on_kill` call inside the operator](https://github.com/apache/airflow/blob/3.1.0/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py#L1223). It sets `deletionGracePeriodSeconds` to the value of the `termination_grace_period` operator kwarg, then deletes the pod. (I feel like this kwarg shouldn't be used for this, but I'll avoid delving into that now.)
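
   For reference, the relevant part of `on_kill` looks roughly like this (paraphrased and abridged from the linked source, so details may differ slightly):

   ```python
   # Paraphrased/abridged from KubernetesPodOperator.on_kill; not a verbatim copy.
   def on_kill(self) -> None:
       if self.pod:
           kwargs = {
               "name": self.pod.metadata.name,
               "namespace": self.pod.metadata.namespace,
           }
           if self.termination_grace_period is not None:
               # This is what shows up as deletionGracePeriodSeconds on the pod.
               kwargs.update(grace_period_seconds=self.termination_grace_period)
           # The pod is deleted unconditionally - reattach_on_restart is not consulted here.
           self.client.delete_namespaced_pod(**kwargs)
   ```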
   
   Anyway. What I did to work around this was to subclass the KPO and override the `on_kill` method to only print a message and return.
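   Roughly along these lines (an approximation of what is in the linked repo, not a verbatim copy; the class name is arbitrary):

   ```python
   from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator


   class NoKillKPO(KubernetesPodOperator):
       """KPO variant whose on_kill is a no-op, so the remote pod survives a worker shutdown."""

       def on_kill(self) -> None:
           # Deliberately do NOT delete the pod; just note that we were asked to stop.
           self.log.warning("Operator killed.")
   ```
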
   That appears to have fixed my issue. Detailed logs from the ["fixed" version 
of the 
Dag](https://github.com/ralichkov/dag-tests/blob/e6abbc8c8f96d918e94450a28d94a7fb48ed1b57/kpo.py#L5-L11):
   
   <details>
   
   ```
   INFO - [base] tick 42
   WARNING - Operator killed.
   INFO - func=<module>, file=/home/airflow/.local/bin/airflow, line=7
   INFO - func=main, 
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/__main__.py, 
line=55
   INFO - func=command, 
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/cli_config.py,
 line=49
   INFO - func=wrapper, 
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/cli.py, 
line=114
   INFO - func=wrapped_function, 
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/providers_configuration_loader.py,
 line=54
   INFO - func=scheduler, 
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py,
 line=52
   INFO - func=run_command_with_daemon_option, 
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/daemon_utils.py,
 line=86
   INFO - func=<lambda>, 
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py,
 line=55
   INFO - func=_run_scheduler_job, 
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py,
 line=43
   INFO - func=wrapper, 
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py,
 line=100
   INFO - func=run_job, 
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py, 
line=368
   INFO - func=execute_job, 
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py, 
line=397
   INFO - func=_execute, 
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py,
 line=1042
   INFO - func=_run_scheduler_loop, 
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py,
 line=1340
   INFO - func=wrapper, 
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/traces/tracer.py,
 line=58
   INFO - func=heartbeat, 
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/executors/base_executor.py,
 line=253
   INFO - func=wrapper, 
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/traces/tracer.py,
 line=58
   INFO - func=trigger_tasks, 
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/executors/base_executor.py,
 line=389
   INFO - func=_process_workloads, 
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/executors/local_executor.py,
 line=259
   INFO - func=_check_workers, 
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/executors/local_executor.py,
 line=196
   INFO - func=_spawn_worker, 
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/executors/local_executor.py,
 line=208
   INFO - func=start, 
file=/usr/python/lib/python3.12/multiprocessing/process.py, line=121
   INFO - func=_Popen, 
file=/usr/python/lib/python3.12/multiprocessing/context.py, line=224
   INFO - func=_Popen, 
file=/usr/python/lib/python3.12/multiprocessing/context.py, line=282
   INFO - func=__init__, 
file=/usr/python/lib/python3.12/multiprocessing/popen_fork.py, line=19
   INFO - func=_launch, 
file=/usr/python/lib/python3.12/multiprocessing/popen_fork.py, line=71
   INFO - func=_bootstrap, 
file=/usr/python/lib/python3.12/multiprocessing/process.py, line=314
   INFO - func=run, file=/usr/python/lib/python3.12/multiprocessing/process.py, 
line=108
   INFO - func=_run_worker, 
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/executors/local_executor.py,
 line=96
   INFO - func=_execute_work, 
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/executors/local_executor.py,
 line=124
   INFO - func=supervise, 
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/supervisor.py,
 line=1878
   INFO - func=start, 
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/supervisor.py,
 line=938
   INFO - func=start, 
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/supervisor.py,
 line=488
   INFO - func=_fork_main, 
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/supervisor.py,
 line=387
   INFO - func=_subprocess_main, 
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/supervisor.py,
 line=205
   INFO - func=main, 
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py,
 line=1450
   INFO - func=run, 
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py,
 line=920
   INFO - func=_execute_task, 
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py,
 line=1307
   INFO - func=wrapper, 
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/bases/operator.py,
 line=416
   INFO - func=execute, 
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py,
 line=657
   INFO - func=execute_sync, 
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py,
 line=709
   INFO - func=wrapped_f, 
file=/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py, 
line=338
   INFO - func=__call__, 
file=/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py, 
line=480
   INFO - func=await_pod_completion, 
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py,
 line=782
   INFO - func=fetch_requested_container_logs, 
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py,
 line=728
   INFO - func=fetch_container_logs, 
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py,
 line=611
   INFO - func=consume_logs, 
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py,
 line=546
   INFO - func=__iter__, 
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py,
 line=269
   INFO - func=stream, 
file=/home/airflow/.local/lib/python3.12/site-packages/urllib3/response.py, 
line=1088
   INFO - func=read_chunked, 
file=/home/airflow/.local/lib/python3.12/site-packages/urllib3/response.py, 
line=1248
   INFO - func=_update_chunk_length, 
file=/home/airflow/.local/lib/python3.12/site-packages/urllib3/response.py, 
line=1167
   INFO - func=readinto, file=/usr/python/lib/python3.12/socket.py, line=720
   INFO - func=recv_into, file=/usr/python/lib/python3.12/ssl.py, line=1251
   INFO - func=read, file=/usr/python/lib/python3.12/ssl.py, line=1103
   INFO - func=_on_term, 
file=/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py,
 line=890
   INFO - func=on_kill, file=/opt/airflow/dags/repo/kpo.py, line=9
   INFO - [base] tick 43
   INFO - [base] tick 44
   INFO - [base] tick 45
   ```
   
   </details>
   
   Of course, this "hotfix" breaks other expected functionality, like the pod being interrupted when the task is marked as failed, etc.
   
   On my first few attempts it would actually not resume the task properly, leading to weird stuff like this:
   <img width="165" height="121" alt="Image" src="https://github.com/user-attachments/assets/c0ced136-cd0c-4c79-b8c7-916b863dcad0" />
   Whether this is something on my end or another bug, I don't know; I just wanted to put it out there.
   
   Overall, I don't see how this reattachment behavior is supposed to work in the real world if `on_kill` is always called when the worker dies - that call always kills the pod. What is it supposed to reattach to? I don't understand how your Breeze setup "works"; maybe that's also how this behavior made it into a release.
   
   What I would propose is the following: modify `_on_term()` to propagate the trapped signal or frame (or both?), then pass that down to `ti.task.on_kill()`. The KPO could then decide **not** to kill the running pod if `reattach_on_restart` is enabled. A rough sketch of the idea follows.
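
   This is purely illustrative - none of these signatures exist today, and all the names here (`ReattachAwareKPO`, `_delete_pod`, the `signum` parameter on `on_kill`) are made up just to show the shape of the change:

   ```python
   # Hypothetical sketch of the proposal; not the actual SDK or provider API.
   import signal


   class ReattachAwareKPO:  # stand-in for KubernetesPodOperator
       reattach_on_restart = True

       def on_kill(self, signum: int | None = None) -> None:
           # If the worker is going away (SIGTERM) and reattachment is enabled,
           # leave the pod alone so the rescheduled task instance can reattach to it.
           if signum == signal.SIGTERM and self.reattach_on_restart:
               print("Worker shutting down; leaving pod running for reattachment.")
               return
           self._delete_pod()  # fall back to today's behavior (delete with grace period)

       def _delete_pod(self) -> None:
           print("Deleting pod (placeholder for the real deletion logic).")


   # Conceptually, _on_term() in the task runner would forward the trapped signal:
   def _on_term(task: ReattachAwareKPO, signum: int, frame: object | None) -> None:
       task.on_kill(signum=signum)
   ```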
   
   I will look into setting up Breeze locally and see what I can do about this.

