jedcunningham commented on code in PR #28546:
URL: https://github.com/apache/airflow/pull/28546#discussion_r1055921589
##########
airflow/kubernetes/pod_generator.py:
##########
@@ -430,6 +419,70 @@ def construct_pod(
except Exception as e:
raise PodReconciliationError from e
+ @classmethod
+ def build_selector_for_k8s_executor_pod(
+ cls,
+ *,
+ dag_id,
+ task_id,
+ try_number,
+ map_index=None,
+ date=None,
+ run_id=None,
+ ):
+ """
+ Generate selector for kubernetes executor pod
+
+ :meta private:
+ """
+ labels = cls.build_labels_for_k8s_executor_pod(
+ dag_id=dag_id,
+ task_id=task_id,
+ try_number=try_number,
+ map_index=map_index,
+ date=date,
+ run_id=run_id,
+ )
+ label_strings = [f"{label_id}={label}" for label_id, label in
sorted(labels.items())]
+ selector = ",".join(label_strings)
+ selector += ",airflow-worker"
+ return selector
+
+ @classmethod
+ def build_labels_for_k8s_executor_pod(
+ cls,
+ *,
+ dag_id,
+ task_id,
+ try_number,
+ airflow_worker=None,
+ map_index=None,
+ date=None,
+ run_id=None,
+ ):
+ """
+ Generate labels for kubernetes executor pod
+
+ :meta private:
+ """
+ labels = {
+ "dag_id": make_safe_label_value(dag_id),
+ "task_id": make_safe_label_value(task_id),
+ "try_number": str(try_number),
+ "kubernetes_executor": "True",
+ }
+ if airflow_worker is not None:
Review Comment:
I wonder if we should have `build_selector_for_k8s_executor_pod` build its
own dict so we don't have to do all these conditional checks - it took me a
second to figure out why it was this way. Plus, it'll ensure we set everything
we actually want on the pod (e.g. airflow_version).
##########
airflow/utils/log/file_task_handler.py:
##########
@@ -191,19 +191,29 @@ def _read(self, ti: TaskInstance, try_number: int,
metadata: dict[str, Any] | No
log += f"*** {str(e)}\n"
return log, {"end_of_log": True}
elif self._should_check_k8s(ti.queue):
- pod_override = ti.executor_config.get("pod_override")
- if pod_override and pod_override.metadata and
pod_override.metadata.namespace:
- namespace = pod_override.metadata.namespace
- else:
- namespace = conf.get("kubernetes_executor", "namespace")
try:
from airflow.kubernetes.kube_client import get_kube_client
+ from airflow.kubernetes.pod_generator import PodGenerator
- kube_client = get_kube_client()
+ client = get_kube_client()
log += f"*** Trying to get logs (last 100 lines) from worker
pod {ti.hostname} ***\n\n"
- res = kube_client.read_namespaced_pod_log(
- name=ti.hostname,
+ selector = PodGenerator.build_selector_for_k8s_executor_pod(
Review Comment:
Is it easy to get scheduler_job_id here? That would make the selector a little
more resilient when more than one Airflow instance runs in a single namespace.
##########
airflow/kubernetes/pod_generator.py:
##########
@@ -430,6 +419,70 @@ def construct_pod(
except Exception as e:
raise PodReconciliationError from e
+ @classmethod
+ def build_selector_for_k8s_executor_pod(
+ cls,
+ *,
+ dag_id,
+ task_id,
+ try_number,
+ map_index=None,
+ date=None,
+ run_id=None,
+ ):
+ """
+ Generate selector for kubernetes executor pod
+
+ :meta private:
+ """
+ labels = cls.build_labels_for_k8s_executor_pod(
+ dag_id=dag_id,
+ task_id=task_id,
+ try_number=try_number,
+ map_index=map_index,
+ date=date,
Review Comment:
Do we need both execution_date and run_id on the selector?
##########
airflow/utils/log/file_task_handler.py:
##########
@@ -191,19 +191,29 @@ def _read(self, ti: TaskInstance, try_number: int,
metadata: dict[str, Any] | No
log += f"*** {str(e)}\n"
return log, {"end_of_log": True}
elif self._should_check_k8s(ti.queue):
- pod_override = ti.executor_config.get("pod_override")
- if pod_override and pod_override.metadata and
pod_override.metadata.namespace:
- namespace = pod_override.metadata.namespace
- else:
- namespace = conf.get("kubernetes_executor", "namespace")
try:
from airflow.kubernetes.kube_client import get_kube_client
+ from airflow.kubernetes.pod_generator import PodGenerator
- kube_client = get_kube_client()
+ client = get_kube_client()
log += f"*** Trying to get logs (last 100 lines) from worker
pod {ti.hostname} ***\n\n"
- res = kube_client.read_namespaced_pod_log(
- name=ti.hostname,
+ selector = PodGenerator.build_selector_for_k8s_executor_pod(
+ dag_id=ti.dag_id,
+ task_id=ti.task_id,
+ try_number=ti.try_number,
+ map_index=ti.map_index,
+ run_id=ti.run_id,
+ )
+ namespace = self._get_pod_namespace(ti)
+ pod_list = client.list_namespaced_pod(
+ namespace=namespace,
+ label_selector=selector,
+ ).items
+ if not pod_list:
+ raise RuntimeError("Cannot find pod for ti %s", ti)
+ res = client.read_namespaced_pod_log(
+ name=pod_list[0].metadata.name,
Review Comment:
If we find more than one matching pod, we should probably log a warning and/or bail out.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]