jedcunningham commented on a change in pull request #19904:
URL: https://github.com/apache/airflow/pull/19904#discussion_r759698127



##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -473,7 +472,7 @@ def clear_not_launched_queued_tasks(self, session=None) -> None:
             base_label_selector = (
                 f"dag_id={pod_generator.make_safe_label_value(task.dag_id)},"
                 f"task_id={pod_generator.make_safe_label_value(task.task_id)},"
-                f"airflow-worker={pod_generator.make_safe_label_value(str(self.scheduler_job_id))}"
+                f"airflow-worker={pod_generator.make_safe_label_value(str(task.queued_by_job_id))}"

Review comment:
       Note: This uses the `queued_by_job_id` from the TI, which will match 
the label on the pod. Eventually the pod will be adopted by another (or the new) 
scheduler. However, this method can run before that happens, and it does run 
before adoption when a scheduler starts.
   
   Now, does it even matter? For better or worse, we use the job id to help 
determine which Airflow instance a task is part of. For example, we also watch 
for events based on the job id:
   
   
https://github.com/apache/airflow/blob/33a4502f68be6b25fbccb96ed1832c5b527bb02a/airflow/executors/kubernetes_executor.py#L129
   
   This becomes important if we consider a shared namespace that holds worker 
pods from multiple Airflow instances. It becomes even more important if those 
instances have the same dags/tasks/scheduled runs. There are certainly still 
issues here, but this at least keeps the status quo until we can properly fix 
everything.
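
   To illustrate the point about matching labels, here is a minimal, self-contained sketch. It does not import Airflow: `safe_label`, `TaskInstance`, `selector_for` and `matches` are hypothetical stand-ins (the real `pod_generator.make_safe_label_value` does more), and the job ids 7 and 12 are made up.

```python
import re
from dataclasses import dataclass


def safe_label(value: str) -> str:
    """Hypothetical, simplified stand-in for pod_generator.make_safe_label_value.

    Keeps only characters Kubernetes allows in a label value and truncates
    to 63 characters. The real helper does more than this.
    """
    return re.sub(r"[^A-Za-z0-9_.-]", "", value)[:63]


@dataclass
class TaskInstance:
    """Stand-in holding only the TI fields used by the selector."""
    dag_id: str
    task_id: str
    queued_by_job_id: int


def selector_for(task: TaskInstance, worker_id: int) -> str:
    """Build a label selector shaped like the one in the diff above."""
    return (
        f"dag_id={safe_label(task.dag_id)},"
        f"task_id={safe_label(task.task_id)},"
        f"airflow-worker={safe_label(str(worker_id))}"
    )


def matches(selector: str, labels: dict) -> bool:
    """Equality-based matching: every key=value term must be present."""
    terms = (term.split("=", 1) for term in selector.split(","))
    return all(labels.get(key) == value for key, value in terms)


# Assumption: the pod was launched by scheduler job 7 and has not been
# adopted yet, so it still carries that job id in its airflow-worker label.
pod_labels = {"dag_id": "example_dag", "task_id": "extract", "airflow-worker": "7"}
ti = TaskInstance("example_dag", "extract", queued_by_job_id=7)
new_scheduler_job_id = 12  # a scheduler that has just started

# Selector built from the TI's queued_by_job_id finds the pod.
print(matches(selector_for(ti, ti.queued_by_job_id), pod_labels))  # True

# Selector built from the new scheduler's own job id misses it.
print(matches(selector_for(ti, new_scheduler_job_id), pod_labels))  # False
```

   In this sketch, a freshly started scheduler that searched with its own job id would conclude the pod was never launched, which is the case the change is meant to avoid.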



