jedcunningham commented on code in PR #41220:
URL: https://github.com/apache/airflow/pull/41220#discussion_r1769030670


##########
airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -177,6 +177,37 @@ def _make_safe_label_value(self, input_value: str | 
datetime) -> str:
             return pod_generator.datetime_to_label_safe_datestring(input_value)
         return pod_generator.make_safe_label_value(input_value)
 
+    def get_pod_labels_combined_str_to_pod_map(self) -> dict[str, k8s.V1Pod]:
+        """
+        List the worker pods owned by this scheduler and create a map 
containing pod combined labels search str -> pod.
+
+        For every pod, it creates two below entries in the map
+        
dag_id={dag_id},task_id={task_id},airflow-worker={airflow_worker},<map_index={map_index}>,run_id={run_id}
+        """
+        # airflow worker label selector batch call
+        kwargs = {"label_selector": 
f"airflow-worker={self._make_safe_label_value(str(self.job_id))}"}
+        if self.kube_config.kube_client_request_args:
+            kwargs.update(self.kube_config.kube_client_request_args)
+        pod_list = self._list_pods(kwargs)
+
+        # create a set against pod query label fields
+        pod_labels_combined_str_to_pod_map = {}
+        for pod in pod_list:
+            dag_id = pod.metadata.labels.get("dag_id", None)
+            task_id = pod.metadata.labels.get("task_id", None)
+            airflow_worker = pod.metadata.labels.get("airflow-worker", None)
+            map_index = pod.metadata.labels.get("map_index", None)
+            run_id = pod.metadata.labels.get("run_id", None)
+            if dag_id is None or task_id is None or airflow_worker is None:
+                continue
+            label_search_base_str = 
f"dag_id={dag_id},task_id={task_id},airflow-worker={airflow_worker}"
+            if map_index is not None:
+                label_search_base_str += f",map_index={map_index}"
+            if run_id is not None:

Review Comment:
   Cool, if we are doing a major release, now is the time to remove any other 
deprecations we have stacked up. @dirrao, have at it :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to