This is an automated email from the ASF dual-hosted git repository.

husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 64b0872d92 Adding more information to kubernetes executor logs (#29929)
64b0872d92 is described below

commit 64b0872d92609e2df465989062e39357eeef9dab
Author: Amogh Desai <[email protected]>
AuthorDate: Thu May 25 14:10:18 2023 +0530

    Adding more information to kubernetes executor logs (#29929)
---
 airflow/config_templates/config.yml               |  7 ++++
 airflow/config_templates/default_airflow.cfg      |  3 ++
 airflow/executors/kubernetes_executor.py          | 35 ++++++++++++++------
 airflow/kubernetes/kubernetes_helper_functions.py | 15 +++++++++
 tests/executors/test_kubernetes_executor.py       | 39 ++++++++++++++++++++++-
 5 files changed, 89 insertions(+), 10 deletions(-)

diff --git a/airflow/config_templates/config.yml 
b/airflow/config_templates/config.yml
index 52b802a5c6..aeeb18c8e9 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2686,6 +2686,13 @@ kubernetes_executor:
       type: string
       example: '{ "total": 3, "backoff_factor": 0.5 }'
       default: ""
+    logs_task_metadata:
+      description: |
+        Flag to control the information added to kubernetes executor logs for 
better traceability
+      version_added: 2.7.0
+      type: boolean
+      example: ~
+      default: "False"
     pod_template_file:
       description: |
         Path to the YAML pod file that forms the basis for KubernetesExecutor 
workers.
diff --git a/airflow/config_templates/default_airflow.cfg 
b/airflow/config_templates/default_airflow.cfg
index ac350cf2a1..05bf0a25ad 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -1368,6 +1368,9 @@ verify_certs = True
 # Example: api_client_retry_configuration = {{ "total": 3, "backoff_factor": 
0.5 }}
 api_client_retry_configuration =
 
+# Flag to control the information added to kubernetes executor logs for better 
traceability
+logs_task_metadata = False
+
 # Path to the YAML pod file that forms the basis for KubernetesExecutor 
workers.
 pod_template_file =
 
diff --git a/airflow/executors/kubernetes_executor.py 
b/airflow/executors/kubernetes_executor.py
index 470df3101a..10aef9b532 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -44,7 +44,11 @@ from airflow.executors.base_executor import BaseExecutor
 from airflow.kubernetes import pod_generator
 from airflow.kubernetes.kube_client import get_kube_client
 from airflow.kubernetes.kube_config import KubeConfig
-from airflow.kubernetes.kubernetes_helper_functions import annotations_to_key, 
create_pod_id
+from airflow.kubernetes.kubernetes_helper_functions import (
+    annotations_for_logging_task_metadata,
+    annotations_to_key,
+    create_pod_id,
+)
 from airflow.kubernetes.pod_generator import PodGenerator
 from airflow.utils.event_scheduler import EventScheduler
 from airflow.utils.log.logging_mixin import LoggingMixin, remove_escape_codes
@@ -216,15 +220,16 @@ class KubernetesJobWatcher(multiprocessing.Process, 
LoggingMixin):
         resource_version: str,
         event: Any,
     ) -> None:
         """Process status response."""
+        annotations_string = annotations_for_logging_task_metadata(annotations)
         if status == "Pending":
             if event["type"] == "DELETED":
-                self.log.info("Event: Failed to start pod %s", pod_name)
+                self.log.info("Event: Failed to start pod %s, annotations: 
%s", pod_name, annotations_string)
                 self.watcher_queue.put((pod_name, namespace, State.FAILED, 
annotations, resource_version))
             else:
-                self.log.debug("Event: %s Pending", pod_name)
+                self.log.debug("Event: %s Pending, annotations: %s", pod_name, 
annotations_string)
         elif status == "Failed":
-            self.log.error("Event: %s Failed", pod_name)
+            self.log.error("Event: %s Failed, annotations: %s", pod_name, 
annotations_string)
             self.watcher_queue.put((pod_name, namespace, State.FAILED, 
annotations, resource_version))
         elif status == "Succeeded":
             # We get multiple events once the pod hits a terminal state, and 
we only want to
@@ -243,14 +248,18 @@ class KubernetesJobWatcher(multiprocessing.Process, 
LoggingMixin):
                     pod_name,
                 )
                 return
-            self.log.info("Event: %s Succeeded", pod_name)
+            self.log.info("Event: %s Succeeded, annotations: %s", pod_name, 
annotations_string)
             self.watcher_queue.put((pod_name, namespace, None, annotations, 
resource_version))
         elif status == "Running":
             if event["type"] == "DELETED":
-                self.log.info("Event: Pod %s deleted before it could 
complete", pod_name)
+                self.log.info(
+                    "Event: Pod %s deleted before it could complete, 
annotations: %s",
+                    pod_name,
+                    annotations_string,
+                )
                 self.watcher_queue.put((pod_name, namespace, State.FAILED, 
annotations, resource_version))
             else:
-                self.log.info("Event: %s is Running", pod_name)
+                self.log.info("Event: %s is Running, annotations: %s", 
pod_name, annotations_string)
         else:
             self.log.warning(
                 "Event: Invalid state: %s on pod: %s in namespace %s with 
annotations: %s with "
@@ -377,7 +386,12 @@ class AirflowKubernetesScheduler(LoggingMixin):
         )
         # Reconcile the pod generated by the Operator and the Pod
         # generated by the .cfg file
-        self.log.info("Creating kubernetes pod for job is %s, with pod name 
%s", key, pod.metadata.name)
+        self.log.info(
+            "Creating kubernetes pod for job is %s, with pod name %s, 
annotations: %s",
+            key,
+            pod.metadata.name,
+            annotations_for_logging_task_metadata(pod.metadata.annotations),
+        )
         self.log.debug("Kubernetes running for command %s", command)
         self.log.debug("Kubernetes launching image %s", 
pod.spec.containers[0].image)
 
@@ -438,7 +452,10 @@ class AirflowKubernetesScheduler(LoggingMixin):
         """Process the task by watcher."""
         pod_name, namespace, state, annotations, resource_version = task
         self.log.debug(
-            "Attempting to finish pod; pod_name: %s; state: %s; annotations: 
%s", pod_name, state, annotations
+            "Attempting to finish pod; pod_name: %s; state: %s; annotations: 
%s",
+            pod_name,
+            state,
+            annotations_for_logging_task_metadata(annotations),
         )
         key = annotations_to_key(annotations=annotations)
         if key:
diff --git a/airflow/kubernetes/kubernetes_helper_functions.py 
b/airflow/kubernetes/kubernetes_helper_functions.py
index 390eb0edb7..fdb76b0aa8 100644
--- a/airflow/kubernetes/kubernetes_helper_functions.py
+++ b/airflow/kubernetes/kubernetes_helper_functions.py
@@ -23,6 +23,8 @@ import string
 import pendulum
 from slugify import slugify
 
+from airflow.compat.functools import cache
+from airflow.configuration import conf
 from airflow.models.taskinstancekey import TaskInstanceKey
 
 log = logging.getLogger(__name__)
@@ -119,3 +121,16 @@ def annotations_to_key(annotations: dict[str, str]) -> 
TaskInstanceKey:
         try_number=try_number,
         map_index=map_index,
     )
+
+
+@cache
+def get_logs_task_metadata() -> bool:
+    return conf.getboolean("kubernetes_executor", "logs_task_metadata", 
fallback=False)
+
+
+def annotations_for_logging_task_metadata(annotation_set):
+    if get_logs_task_metadata():
+        annotations_for_logging = annotation_set
+    else:
+        annotations_for_logging = "<omitted>"
+    return annotations_for_logging
diff --git a/tests/executors/test_kubernetes_executor.py 
b/tests/executors/test_kubernetes_executor.py
index 9e3ee938b0..fc9daaacde 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -50,7 +50,11 @@ try:
         get_base_pod_from_template,
     )
     from airflow.kubernetes import pod_generator
-    from airflow.kubernetes.kubernetes_helper_functions import 
annotations_to_key
+    from airflow.kubernetes.kubernetes_helper_functions import (
+        annotations_for_logging_task_metadata,
+        annotations_to_key,
+        get_logs_task_metadata,
+    )
     from airflow.kubernetes.pod_generator import PodGenerator
 except ImportError:
     AirflowKubernetesScheduler = None  # type: ignore
@@ -1162,6 +1166,39 @@ class TestKubernetesExecutor:
     def test_supports_sentry(self):
         assert not KubernetesExecutor.supports_sentry
 
+    def test_annotations_for_logging_task_metadata(self):
+        annotations_test = {
+            "dag_id": "dag",
+            "run_id": "run_id",
+            "task_id": "task",
+            "try_number": "1",
+        }
+        get_logs_task_metadata.cache_clear()
+        with conf_vars({("kubernetes_executor", "logs_task_metadata"): "True"}):
+            expected_annotations = {
+                "dag_id": "dag",
+                "run_id": "run_id",
+                "task_id": "task",
+                "try_number": "1",
+            }
+            annotations_actual = 
annotations_for_logging_task_metadata(annotations_test)
+            assert annotations_actual == expected_annotations
+        get_logs_task_metadata.cache_clear()
+
+    def test_annotations_for_logging_task_metadata_fallback(self):
+        annotations_test = {
+            "dag_id": "dag",
+            "run_id": "run_id",
+            "task_id": "task",
+            "try_number": "1",
+        }
+        get_logs_task_metadata.cache_clear()
+        with conf_vars({("kubernetes_executor", "logs_task_metadata"): "False"}):
+            expected_annotations = "<omitted>"
+            annotations_actual = 
annotations_for_logging_task_metadata(annotations_test)
+            assert annotations_actual == expected_annotations
+        get_logs_task_metadata.cache_clear()
+
 
 class TestKubernetesJobWatcher:
     test_namespace = "airflow"

Reply via email to