This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c22fc000b6 Use labels instead of pod name for pod log read in k8s exec 
(#28546)
c22fc000b6 is described below

commit c22fc000b6c0075429b9d1e51c9ee3d384141ff3
Author: Daniel Standish <[email protected]>
AuthorDate: Fri Dec 30 15:11:16 2022 -0800

    Use labels instead of pod name for pod log read in k8s exec (#28546)
    
    This means we don't have to use ti.hostname as a proxy for pod name, and 
allows us to lift the 63 character limit, which was a consequence of getting pod 
name through hostname.
---
 airflow/kubernetes/kubernetes_helper_functions.py  |  9 +-
 airflow/kubernetes/pod_generator.py                | 99 +++++++++++++++++-----
 .../cncf/kubernetes/operators/kubernetes_pod.py    |  2 +-
 airflow/utils/log/file_task_handler.py             | 38 +++++++--
 .../kubernetes/test_kubernetes_helper_functions.py |  8 +-
 tests/kubernetes/test_pod_generator.py             | 51 ++++++++++-
 tests/utils/test_log_handlers.py                   | 50 +++++++----
 7 files changed, 199 insertions(+), 58 deletions(-)

diff --git a/airflow/kubernetes/kubernetes_helper_functions.py 
b/airflow/kubernetes/kubernetes_helper_functions.py
index ebe469342a..7965e405ac 100644
--- a/airflow/kubernetes/kubernetes_helper_functions.py
+++ b/airflow/kubernetes/kubernetes_helper_functions.py
@@ -48,16 +48,15 @@ def create_pod_id(
     dag_id: str | None = None,
     task_id: str | None = None,
     *,
-    max_length: int = 63,  # must be 63 for now, see below
+    max_length: int = 80,
     unique: bool = True,
 ) -> str:
     """
     Generates unique pod ID given a dag_id and / or task_id.
 
-    Because of the way that the task log handler reads from running k8s 
executor pods,
-    we must keep pod name <= 63 characters.  The handler gets pod name from 
ti.hostname.
-    TI hostname is derived from the container hostname, which is truncated to 
63 characters.
-    We could lift this limit by using label selectors instead of pod name to 
find the pod.
+    The default of 80 for max length is somewhat arbitrary, mainly a balance 
between
+    content and not overwhelming terminal windows of reasonable width. The true
+    upper limit is 253, and this is enforced in construct_pod.
 
     :param dag_id: DAG ID
     :param task_id: Task ID
diff --git a/airflow/kubernetes/pod_generator.py 
b/airflow/kubernetes/pod_generator.py
index 64b7965a4c..f27dba7e95 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -339,8 +339,9 @@ class PodGenerator:
             base_containers[1:], client_containers[1:]
         )
 
-    @staticmethod
+    @classmethod
     def construct_pod(
+        cls,
         dag_id: str,
         task_id: str,
         pod_id: str,
@@ -370,15 +371,6 @@ class PodGenerator:
                 "pod_id supplied is longer than 253 characters; truncating and 
adding unique suffix."
             )
             pod_id = add_pod_suffix(pod_name=pod_id, max_len=253)
-        if len(pod_id) > 63:
-            # because in task handler we get pod name from ti hostname (which 
truncates
-            # pod_id to 63 characters) we won't be able to find the pod unless 
it is <= 63 characters.
-            # our code creates pod names shorter than this so this warning 
should not normally be triggered.
-            warnings.warn(
-                "Supplied pod_id is longer than 63 characters. Due to 
implementation details, the webserver "
-                "may not be able to stream logs while task is running. Please 
choose a shorter pod name."
-            )
-
         try:
             image = pod_override_object.spec.containers[0].image  # type: 
ignore
             if not image:
@@ -391,30 +383,27 @@ class PodGenerator:
             "task_id": task_id,
             "try_number": str(try_number),
         }
-        labels = {
-            "airflow-worker": make_safe_label_value(scheduler_job_id),
-            "dag_id": make_safe_label_value(dag_id),
-            "task_id": make_safe_label_value(task_id),
-            "try_number": str(try_number),
-            "airflow_version": airflow_version.replace("+", "-"),
-            "kubernetes_executor": "True",
-        }
         if map_index >= 0:
             annotations["map_index"] = str(map_index)
-            labels["map_index"] = str(map_index)
         if date:
             annotations["execution_date"] = date.isoformat()
-            labels["execution_date"] = datetime_to_label_safe_datestring(date)
         if run_id:
             annotations["run_id"] = run_id
-            labels["run_id"] = make_safe_label_value(run_id)
 
         dynamic_pod = k8s.V1Pod(
             metadata=k8s.V1ObjectMeta(
                 namespace=namespace,
                 annotations=annotations,
                 name=pod_id,
-                labels=labels,
+                labels=cls.build_labels_for_k8s_executor_pod(
+                    dag_id=dag_id,
+                    task_id=task_id,
+                    try_number=try_number,
+                    airflow_worker=scheduler_job_id,
+                    map_index=map_index,
+                    execution_date=date,
+                    run_id=run_id,
+                ),
             ),
             spec=k8s.V1PodSpec(
                 containers=[
@@ -447,6 +436,72 @@ class PodGenerator:
 
         return pod
 
+    @classmethod
+    def build_selector_for_k8s_executor_pod(
+        cls,
+        *,
+        dag_id,
+        task_id,
+        try_number,
+        map_index=None,
+        execution_date=None,
+        run_id=None,
+        airflow_worker=None,
+    ):
+        """
+        Generate selector for kubernetes executor pod
+
+        :meta private:
+        """
+        labels = cls.build_labels_for_k8s_executor_pod(
+            dag_id=dag_id,
+            task_id=task_id,
+            try_number=try_number,
+            map_index=map_index,
+            execution_date=execution_date,
+            run_id=run_id,
+            airflow_worker=airflow_worker,
+        )
+        label_strings = [f"{label_id}={label}" for label_id, label in 
sorted(labels.items())]
+        selector = ",".join(label_strings)
+        if not airflow_worker:  # this filters out KPO pods even when we don't 
know the scheduler job id
+            selector += ",airflow-worker"
+        return selector
+
+    @classmethod
+    def build_labels_for_k8s_executor_pod(
+        cls,
+        *,
+        dag_id,
+        task_id,
+        try_number,
+        airflow_worker=None,
+        map_index=None,
+        execution_date=None,
+        run_id=None,
+    ):
+        """
+        Generate labels for kubernetes executor pod
+
+        :meta private:
+        """
+        labels = {
+            "dag_id": make_safe_label_value(dag_id),
+            "task_id": make_safe_label_value(task_id),
+            "try_number": str(try_number),
+            "kubernetes_executor": "True",
+            "airflow_version": airflow_version.replace("+", "-"),
+        }
+        if airflow_worker is not None:
+            labels["airflow-worker"] = 
make_safe_label_value(str(airflow_worker))
+        if map_index is not None and map_index >= 0:
+            labels["map_index"] = str(map_index)
+        if execution_date:
+            labels["execution_date"] = 
datetime_to_label_safe_datestring(execution_date)
+        if run_id:
+            labels["run_id"] = make_safe_label_value(run_id)
+        return labels
+
     @staticmethod
     def serialize_pod(pod: k8s.V1Pod) -> dict:
         """
diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py 
b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index 62d4262a84..c34fc5c02b 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -92,7 +92,7 @@ def _create_pod_id(
     dag_id: str | None = None,
     task_id: str | None = None,
     *,
-    max_length: int = 63,
+    max_length: int = 80,
     unique: bool = True,
 ) -> str:
     """
diff --git a/airflow/utils/log/file_task_handler.py 
b/airflow/utils/log/file_task_handler.py
index b8feb2997b..cce6b39984 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -21,6 +21,7 @@ from __future__ import annotations
 import logging
 import os
 import warnings
+from contextlib import suppress
 from pathlib import Path
 from typing import TYPE_CHECKING, Any
 from urllib.parse import urljoin
@@ -191,19 +192,32 @@ class FileTaskHandler(logging.Handler):
                 log += f"*** {str(e)}\n"
                 return log, {"end_of_log": True}
         elif self._should_check_k8s(ti.queue):
-            pod_override = ti.executor_config.get("pod_override")
-            if pod_override and pod_override.metadata and 
pod_override.metadata.namespace:
-                namespace = pod_override.metadata.namespace
-            else:
-                namespace = conf.get("kubernetes_executor", "namespace")
             try:
                 from airflow.kubernetes.kube_client import get_kube_client
+                from airflow.kubernetes.pod_generator import PodGenerator
 
-                kube_client = get_kube_client()
+                client = get_kube_client()
 
                 log += f"*** Trying to get logs (last 100 lines) from worker 
pod {ti.hostname} ***\n\n"
-                res = kube_client.read_namespaced_pod_log(
-                    name=ti.hostname,
+                selector = PodGenerator.build_selector_for_k8s_executor_pod(
+                    dag_id=ti.dag_id,
+                    task_id=ti.task_id,
+                    try_number=ti.try_number,
+                    map_index=ti.map_index,
+                    run_id=ti.run_id,
+                    airflow_worker=ti.queued_by_job_id,
+                )
+                namespace = self._get_pod_namespace(ti)
+                pod_list = client.list_namespaced_pod(
+                    namespace=namespace,
+                    label_selector=selector,
+                ).items
+                if not pod_list:
+                    raise RuntimeError("Cannot find pod for ti %s", ti)
+                elif len(pod_list) > 1:
+                    raise RuntimeError("Found multiple pods for ti %s: %s", 
ti, pod_list)
+                res = client.read_namespaced_pod_log(
+                    name=pod_list[0].metadata.name,
                     namespace=namespace,
                     container="base",
                     follow=False,
@@ -272,6 +286,14 @@ class FileTaskHandler(logging.Handler):
 
         return log, {"end_of_log": end_of_log, "log_pos": log_pos}
 
+    @staticmethod
+    def _get_pod_namespace(ti: TaskInstance):
+        pod_override = ti.executor_config.get("pod_override")
+        namespace = None
+        with suppress(Exception):
+            namespace = pod_override.metadata.namespace
+        return namespace or conf.get("kubernetes_executor", "namespace", 
fallback="default")
+
     @staticmethod
     def _get_log_retrieval_url(ti: TaskInstance, log_relative_path: str) -> 
str:
         url = urljoin(
diff --git a/tests/kubernetes/test_kubernetes_helper_functions.py 
b/tests/kubernetes/test_kubernetes_helper_functions.py
index 52a453e2e8..76512c657a 100644
--- a/tests/kubernetes/test_kubernetes_helper_functions.py
+++ b/tests/kubernetes/test_kubernetes_helper_functions.py
@@ -88,14 +88,14 @@ class TestCreatePodId:
 
     def test_create_pod_id_dag_too_long_with_suffix(self, create_pod_id):
         actual = create_pod_id("0" * 254)
-        assert len(actual) == 63
-        assert re.match(r"0{54}-[a-z0-9]{8}", actual)
+        assert len(actual) == 80
+        assert re.match(r"0{71}-[a-z0-9]{8}", actual)
         assert re.match(pod_name_regex, actual)
 
     def test_create_pod_id_dag_too_long_non_unique(self, create_pod_id):
         actual = create_pod_id("0" * 254, unique=False)
-        assert len(actual) == 63
-        assert re.match(r"0{63}", actual)
+        assert len(actual) == 80
+        assert re.match(r"0{80}", actual)
         assert re.match(pod_name_regex, actual)
 
     @pytest.mark.parametrize("unique", [True, False])
diff --git a/tests/kubernetes/test_pod_generator.py 
b/tests/kubernetes/test_pod_generator.py
index 1314f763e8..578e8e5edd 100644
--- a/tests/kubernetes/test_pod_generator.py
+++ b/tests/kubernetes/test_pod_generator.py
@@ -22,6 +22,7 @@ import sys
 from unittest import mock
 from unittest.mock import MagicMock
 
+import pendulum
 import pytest
 from dateutil import parser
 from kubernetes.client import ApiClient, models as k8s
@@ -38,6 +39,8 @@ from airflow.kubernetes.pod_generator import (
 )
 from airflow.kubernetes.secret import Secret
 
+now = pendulum.now("UTC")
+
 
 class TestPodGenerator:
     def setup_method(self):
@@ -476,7 +479,7 @@ class TestPodGenerator:
         result_dict = self.k8s_client.sanitize_for_serialization(result)
         expected_dict = self.k8s_client.sanitize_for_serialization(expected)
 
-        assert expected_dict == result_dict
+        assert result_dict == expected_dict
 
     def test_construct_pod_empty_executor_config(self):
         path = sys.path[0] + 
"/tests/kubernetes/pod_generator_base_with_secrets.yaml"
@@ -772,3 +775,49 @@ class TestPodGenerator:
             PodGenerator()
         PodGenerator(pod_template_file="tests/kubernetes/pod.yaml")
         PodGenerator(pod=k8s.V1Pod())
+
+    @pytest.mark.parametrize(
+        "extra, extra_expected",
+        [
+            param(dict(), {}, id="base"),
+            param(dict(airflow_worker=2), {"airflow-worker": "2"}, 
id="worker"),
+            param(dict(map_index=2), {"map_index": "2"}, id="map_index"),
+            param(dict(run_id="2"), {"run_id": "2"}, id="run_id"),
+            param(
+                dict(execution_date=now),
+                {"execution_date": datetime_to_label_safe_datestring(now)},
+                id="date",
+            ),
+            param(
+                dict(airflow_worker=2, map_index=2, run_id="2", 
execution_date=now),
+                {
+                    "airflow-worker": "2",
+                    "map_index": "2",
+                    "run_id": "2",
+                    "execution_date": datetime_to_label_safe_datestring(now),
+                },
+                id="all",
+            ),
+        ],
+    )
+    def test_build_labels_for_k8s_executor_pod(self, extra, extra_expected):
+        from airflow.version import version as airflow_version
+
+        kwargs = dict(
+            dag_id="dag*",
+            task_id="task*",
+            try_number=1,
+        )
+        expected = dict(
+            dag_id="dag-6b24921d4",
+            task_id="task-b6aca8991",
+            try_number="1",
+            airflow_version=airflow_version,
+            kubernetes_executor="True",
+        )
+        labels = PodGenerator.build_labels_for_k8s_executor_pod(**kwargs, 
**extra)
+        assert labels == {**expected, **extra_expected}
+        exp_selector = ",".join([f"{k}={v}" for k, v in 
sorted(labels.items())])
+        if "airflow_worker" not in extra:
+            exp_selector += ",airflow-worker"
+        assert PodGenerator.build_selector_for_k8s_executor_pod(**kwargs, 
**extra) == exp_selector
diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py
index 8b7f0145de..3a49a47f6b 100644
--- a/tests/utils/test_log_handlers.py
+++ b/tests/utils/test_log_handlers.py
@@ -21,7 +21,7 @@ import logging
 import logging.config
 import os
 import re
-from unittest.mock import MagicMock, patch
+from unittest.mock import patch
 
 import pytest
 from kubernetes.client import models as k8s
@@ -235,42 +235,58 @@ class TestFileTaskLogHandler:
     def test_read_from_k8s_under_multi_namespace_mode(
         self, mock_kube_client, pod_override, namespace_to_call
     ):
-        mock_read_namespaced_pod_log = MagicMock()
-        mock_kube_client.return_value.read_namespaced_pod_log = 
mock_read_namespaced_pod_log
+        mock_read_log = mock_kube_client.return_value.read_namespaced_pod_log
+        mock_list_pod = mock_kube_client.return_value.list_namespaced_pod
 
         def task_callable(ti):
             ti.log.info("test")
 
-        dag = DAG("dag_for_testing_file_task_handler", start_date=DEFAULT_DATE)
+        with DAG("dag_for_testing_file_task_handler", start_date=DEFAULT_DATE) 
as dag:
+            task = PythonOperator(
+                task_id="task_for_testing_file_log_handler",
+                python_callable=task_callable,
+                executor_config={"pod_override": pod_override},
+            )
         dagrun = dag.create_dagrun(
             run_type=DagRunType.MANUAL,
             state=State.RUNNING,
             execution_date=DEFAULT_DATE,
         )
-        executor_config_pod = pod_override
-        task = PythonOperator(
-            task_id="task_for_testing_file_log_handler",
-            dag=dag,
-            python_callable=task_callable,
-            executor_config={"pod_override": executor_config_pod},
-        )
         ti = TaskInstance(task=task, run_id=dagrun.run_id)
         ti.try_number = 3
 
         logger = ti.log
         ti.log.disabled = False
 
-        file_handler = next(
-            (handler for handler in logger.handlers if handler.name == 
FILE_TASK_HANDLER), None
-        )
+        file_handler = next((h for h in logger.handlers if h.name == 
FILE_TASK_HANDLER), None)
         set_context(logger, ti)
         ti.run(ignore_ti_state=True)
 
         file_handler.read(ti, 3)
 
-        # Check if kube_client.read_namespaced_pod_log() is called with the 
namespace we expect
-        mock_read_namespaced_pod_log.assert_called_once_with(
-            name=ti.hostname,
+        # first we find pod name
+        mock_list_pod.assert_called_once()
+        actual_kwargs = mock_list_pod.call_args[1]
+        assert actual_kwargs["namespace"] == namespace_to_call
+        actual_selector = actual_kwargs["label_selector"]
+        assert re.match(
+            ",".join(
+                [
+                    "airflow_version=.+?",
+                    "dag_id=dag_for_testing_file_task_handler",
+                    "kubernetes_executor=True",
+                    "run_id=manual__2016-01-01T0000000000-2b88d1d57",
+                    "task_id=task_for_testing_file_log_handler",
+                    "try_number=.+?",
+                    "airflow-worker",
+                ]
+            ),
+            actual_selector,
+        )
+
+        # then we read log
+        mock_read_log.assert_called_once_with(
+            name=mock_list_pod.return_value.items[0].metadata.name,
             namespace=namespace_to_call,
             container="base",
             follow=False,

Reply via email to