dirrao commented on PR #32249:
URL: https://github.com/apache/airflow/pull/32249#issuecomment-1636640787
@hussein-awala
Thanks for sharing the example. The cause of this issue is that each scheduler
subscribes to its worker pod events using a label filter on its scheduler id,
i.e. (kwargs = {"label_selector": "airflow-worker=scheduler1"}). Whenever a
worker pod's label changes from airflow-worker=scheduler1 to
airflow-worker=scheduler2, that pod's events go out of scope for scheduler1.
That is why we see a DELETED event even though the worker pod is still running.
I have proposed subscribing to all worker pod events, i.e.
(kwargs = {"label_selector": "airflow-worker"}), so that every scheduler (both
scheduler1 and scheduler2) receives all worker pod events. In this specific
issue, scheduler1 now receives a MODIFIED event instead of a DELETED event
during the label change. To avoid processing the worker pod events of other
schedulers, each scheduler drops events whose scheduler id label does not match
its own, i.e. (labels.get("airflow-worker", None) != scheduler_job_id). I will
rebase my PR and update it soon.
Note: This issue happens for worker pods in all phases (PENDING, RUNNING,
etc.).
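To make the scoping effect concrete, here is a toy model (not Kubernetes or Airflow code) of which event type a label-filtered watch reports when a pod's labels change. The helper names `selector_equals`, `selector_exists`, and `observed_event` are hypothetical, and the model assumes the watch only compares whether the pod matched the selector before and after the change.

```python
def selector_equals(value):
    """Model of the selector airflow-worker=<value>."""
    return lambda labels: labels.get("airflow-worker") == value


def selector_exists(labels):
    """Model of the key-only selector airflow-worker."""
    return "airflow-worker" in labels


def observed_event(matches, old_labels, new_labels):
    """Event type seen by a watch whose selector is modelled by `matches`."""
    before, after = matches(old_labels), matches(new_labels)
    if before and after:
        return "MODIFIED"  # pod stays in scope
    if before:
        return "DELETED"   # pod leaves scope although it is still running
    if after:
        return "ADDED"     # pod enters scope
    return None            # pod was never in scope, so no event


old = {"airflow-worker": "scheduler1"}
new = {"airflow-worker": "scheduler2"}
print(observed_event(selector_equals("scheduler1"), old, new))
print(observed_event(selector_exists, old, new))
```

Under this model the scheduler1-scoped watch reports DELETED for the relabelled pod, while the key-only watch reports MODIFIED, which matches the behaviour described above.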
existing code
`kwargs = {"label_selector": f"airflow-worker={scheduler_job_id}"}`
proposed code
```
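# Subscribe to the events of all worker pods, whichever scheduler owns them.
kwargs = {"label_selector": "airflow-worker"}

# Inside the event loop: skip pods owned by another scheduler, but still
# record the resource version so the watch can resume from this point.
if labels.get("airflow-worker", None) != scheduler_job_id:
    last_resource_version = task.metadata.resource_version
    continue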
```
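As a rough illustration of how the proposed filter would behave inside an event loop, the sketch below runs it over fake events built with `SimpleNamespace` instead of a real watch stream. The function `process_events`, the pod names, and the resource versions are made up for illustration and are not taken from the PR.

```python
from types import SimpleNamespace


def make_event(event_type, name, owner, resource_version):
    """Build a fake watch event shaped like the ones the Kubernetes client yields."""
    metadata = SimpleNamespace(
        name=name,
        labels={"airflow-worker": owner},
        resource_version=resource_version,
    )
    return {"type": event_type, "object": SimpleNamespace(metadata=metadata)}


def process_events(events, scheduler_job_id):
    """Return the events this scheduler handles and the last resource version seen."""
    last_resource_version = None
    handled = []
    for event in events:
        task = event["object"]
        labels = task.metadata.labels or {}
        if labels.get("airflow-worker", None) != scheduler_job_id:
            # Another scheduler's pod: remember where we are, then skip it.
            last_resource_version = task.metadata.resource_version
            continue
        last_resource_version = task.metadata.resource_version
        handled.append((event["type"], task.metadata.name))
    return handled, last_resource_version


events = [
    make_event("ADDED", "test-pod", "scheduler1", "100"),
    make_event("MODIFIED", "test-pod", "scheduler2", "101"),  # after relabel
]
handled, version = process_events(events, "scheduler1")
print(handled, version)
```

Recording the resource version for skipped events as well means a restarted watch would not replay events that belong to other schedulers.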
Example
```
from kubernetes import client, config, watch

if __name__ == '__main__':
    config.load_kube_config(context="<context>")
    v1 = client.CoreV1Api()
    # Watch every worker pod, regardless of which scheduler owns it.
    kwargs = {"label_selector": "airflow-worker"}
    w = watch.Watch()
    for event in w.stream(v1.list_namespaced_pod, "<namespace>", **kwargs):
        pod = event['object']
        print("Event: %s %s %s" % (event['type'], pod.kind, pod.metadata.name))
        # Labels can be None on the pod object, so fall back to an empty dict.
        labels = pod.metadata.labels or {}
        if labels.get("airflow-worker", None) != "scheduler1":
            print("Skipping the event of other scheduler i.e scheduler2")
            continue
        else:
            print("Processing the event of current scheduler i.e scheduler1")
```
and I got:
```
Event: MODIFIED Pod test-pod
Skipping the event of other scheduler i.e scheduler2
```
when I patched the label with:
`kubectl --context <my context> --namespace <my namespace> label pod/test-pod airflow-worker=scheduler2 --overwrite`