jedcunningham commented on a change in pull request #19904:
URL: https://github.com/apache/airflow/pull/19904#discussion_r759698127
##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -473,7 +472,7 @@ def clear_not_launched_queued_tasks(self, session=None) -> None:
             base_label_selector = (
                 f"dag_id={pod_generator.make_safe_label_value(task.dag_id)},"
                 f"task_id={pod_generator.make_safe_label_value(task.task_id)},"
-                f"airflow-worker={pod_generator.make_safe_label_value(str(self.scheduler_job_id))}"
+                f"airflow-worker={pod_generator.make_safe_label_value(str(task.queued_by_job_id))}"
Review comment:
Note: This uses the `queued_by_job_id` from the TI, which matches the
label on the pod. The pod will eventually be adopted by another (or the new)
scheduler, but this method can run before that happens, and it does run before
adoption when a scheduler starts.
Now, does it even matter? For better or worse, we use the job id to help
determine which Airflow instance a task belongs to. For example, we also watch
for events based on the job id:
https://github.com/apache/airflow/blob/33a4502f68be6b25fbccb96ed1832c5b527bb02a/airflow/executors/kubernetes_executor.py#L129
This becomes important in a shared namespace that holds worker pods from
multiple Airflow instances, and even more so if those instances have the same
dags/tasks/scheduled runs. There are certainly still issues here, but this at
least preserves the status quo until we can properly fix everything.
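To illustrate why the selector has to use the TI's `queued_by_job_id`, here is a rough standalone sketch. It does not import Airflow: `safe_label` is a simplified, hypothetical stand-in for `pod_generator.make_safe_label_value`, and `build_selector`, `pod_matches`, and the sample ids are made up for the example.

```python
import re

MAX_LABEL_LEN = 63  # Kubernetes limits label values to 63 characters


def safe_label(value: str) -> str:
    """Simplified, hypothetical stand-in for make_safe_label_value."""
    # Replace characters that are not allowed in a label value.
    cleaned = re.sub(r"[^a-zA-Z0-9._-]", "_", value)[:MAX_LABEL_LEN]
    # Label values must start and end with an alphanumeric character.
    return re.sub(r"^[^a-zA-Z0-9]+|[^a-zA-Z0-9]+$", "", cleaned)


def build_selector(dag_id: str, task_id: str, job_id: int) -> str:
    """Build an equality-based label selector like the one in the diff."""
    return ",".join(
        [
            f"dag_id={safe_label(dag_id)}",
            f"task_id={safe_label(task_id)}",
            f"airflow-worker={safe_label(str(job_id))}",
        ]
    )


def pod_matches(selector: str, labels: dict) -> bool:
    """Hypothetical helper: check pod labels against the selector."""
    wanted = dict(pair.split("=", 1) for pair in selector.split(","))
    return all(labels.get(key) == val for key, val in wanted.items())


# A pod launched by scheduler job 7 and not yet adopted (assumed sample data).
pod_labels = {"dag_id": "example_dag", "task_id": "run_this", "airflow-worker": "7"}

# A new scheduler (job 9) starts and runs the check before adopting the pod.
old_selector = build_selector("example_dag", "run_this", 9)  # self.scheduler_job_id
new_selector = build_selector("example_dag", "run_this", 7)  # task.queued_by_job_id

print(pod_matches(old_selector, pod_labels))
print(pod_matches(new_selector, pod_labels))
```

With the old selector the pod is not found, so the task would look like it was never launched; with the new selector the still-unadopted pod is found.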