dirrao commented on PR #32249:
URL: https://github.com/apache/airflow/pull/32249#issuecomment-1636640787

   @hussein-awala 
   Thanks for sharing the example. The cause of this issue is that each 
scheduler subscribes only to its own worker pod events, using a label filter on 
the scheduler id, i.e. `kwargs = {"label_selector": "airflow-worker=scheduler1"}`. 
When a worker pod's label changes from `airflow-worker=scheduler1` to 
`airflow-worker=scheduler2`, that pod's events go out of scope for scheduler1. 
That is why we see a DELETED event even though the worker pod is still running. 
I have proposed subscribing to all worker pod events instead, i.e. 
`kwargs = {"label_selector": "airflow-worker"}`, so that every scheduler 
(both scheduler1 and scheduler2) receives the events of all worker pods. In this 
specific case, scheduler1 then receives a MODIFIED event instead of a DELETED 
event when the label changes. To avoid processing the worker pod events of 
other schedulers, each scheduler drops events based on the scheduler id label, 
i.e. when `labels.get("airflow-worker", None) != scheduler_job_id`. I will 
rebase my PR and update it soon.
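
To make the reasoning above concrete, here is a minimal pure-Python sketch that simulates which event type a watcher sees when a pod's labels change. It does not talk to a cluster. The helper names (`matches`, `event_seen_by_watcher`, `should_process`) are hypothetical, and the selector parsing covers only the two forms used in this comment (`key` and `key=value`), so treat it as an illustration rather than the executor's actual logic.

```python
from typing import Dict, Optional


def matches(selector: str, labels: Dict[str, str]) -> bool:
    """Evaluate a tiny subset of label selectors: "key" or "key=value"."""
    if "=" in selector:
        key, value = selector.split("=", 1)
        return labels.get(key) == value
    # A bare key only requires the label to exist, whatever its value.
    return selector in labels


def event_seen_by_watcher(
    selector: str, old_labels: Dict[str, str], new_labels: Dict[str, str]
) -> Optional[str]:
    """Event type a selector-scoped watch reports for a label change."""
    before = matches(selector, old_labels)
    after = matches(selector, new_labels)
    if before and after:
        return "MODIFIED"
    if before:
        return "DELETED"  # the pod left the selection but may still be running
    if after:
        return "ADDED"  # the pod entered the selection
    return None  # the pod was never visible to this watcher


def should_process(labels: Dict[str, str], scheduler_job_id: str) -> bool:
    """Client-side filter: keep only the pods owned by this scheduler."""
    return labels.get("airflow-worker") == scheduler_job_id


old = {"airflow-worker": "scheduler1"}
new = {"airflow-worker": "scheduler2"}

# scheduler1 watching with the per-scheduler selector vs. the existence selector.
narrow = event_seen_by_watcher("airflow-worker=scheduler1", old, new)
broad = event_seen_by_watcher("airflow-worker", old, new)
print(narrow, broad, should_process(new, "scheduler1"))
```

With the per-scheduler selector the relabelled pod shows up as DELETED for scheduler1. With the existence selector it shows up as MODIFIED, and the client-side check then skips it.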
   
   Note: This issue affects worker pods in all phases (Pending, Running, 
etc.).
   
   existing code
   
   `kwargs = {"label_selector": f"airflow-worker={scheduler_job_id}"}`
   
   proposed code
   
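   ```
   # Subscribe to the events of all worker pods, whichever scheduler owns them.
   kwargs = {"label_selector": "airflow-worker"}
   
   # Inside the event loop: skip pods that belong to another scheduler,
   # but still record the resource version of the skipped event.
   if labels.get("airflow-worker", None) != scheduler_job_id:
       last_resource_version = task.metadata.resource_version
       continue
   ```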
   
   Example
   
   ```
   from kubernetes import client, config, watch
   
   if __name__ == '__main__':
       config.load_kube_config(context="<context>")
       v1 = client.CoreV1Api()
   
       # Watch every pod that carries the airflow-worker label, whatever its value.
       kwargs = {"label_selector": "airflow-worker"}
   
       w = watch.Watch()
       for event in w.stream(v1.list_namespaced_pod, "<namespace>", **kwargs):
           pod = event['object']
           print("Event: %s %s %s" % (event['type'], pod.kind, pod.metadata.name))
           # Labels of the pod in this event (empty dict if none are set).
           labels = pod.metadata.labels or {}
           if labels.get("airflow-worker", None) != "scheduler1":
               print("Skipping the event of other scheduler i.e scheduler2")
               continue
           else:
               print("Processing the event of current scheduler i.e scheduler1")
   ```
   
   and I got:
   
   ```
   Event: MODIFIED Pod test-pod
   Skipping the event of other scheduler i.e scheduler2
   ```
   
   when I patched the label with:
   
   `kubectl --context <my context> --namespace <my namespace> label pod/test-pod airflow-worker=scheduler2 --overwrite`

