This is an automated email from the ASF dual-hosted git repository.
husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 64b0872d92 Adding more information to kubernetes executor logs (#29929)
64b0872d92 is described below
commit 64b0872d92609e2df465989062e39357eeef9dab
Author: Amogh Desai <[email protected]>
AuthorDate: Thu May 25 14:10:18 2023 +0530
Adding more information to kubernetes executor logs (#29929)
---
airflow/config_templates/config.yml | 7 ++++
airflow/config_templates/default_airflow.cfg | 3 ++
airflow/executors/kubernetes_executor.py | 35 ++++++++++++++------
airflow/kubernetes/kubernetes_helper_functions.py | 15 +++++++++
tests/executors/test_kubernetes_executor.py | 39 ++++++++++++++++++++++-
5 files changed, 89 insertions(+), 10 deletions(-)
diff --git a/airflow/config_templates/config.yml
b/airflow/config_templates/config.yml
index 52b802a5c6..aeeb18c8e9 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2686,6 +2686,13 @@ kubernetes_executor:
type: string
example: '{ "total": 3, "backoff_factor": 0.5 }'
default: ""
+ logs_task_metadata:
+ description: |
+ Flag to control the information added to kubernetes executor logs for
better traceability
+ version_added: 2.7.0
+ type: boolean
+ example: ~
+ default: "False"
pod_template_file:
description: |
Path to the YAML pod file that forms the basis for KubernetesExecutor
workers.
diff --git a/airflow/config_templates/default_airflow.cfg
b/airflow/config_templates/default_airflow.cfg
index ac350cf2a1..05bf0a25ad 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -1368,6 +1368,9 @@ verify_certs = True
# Example: api_client_retry_configuration = {{ "total": 3, "backoff_factor":
0.5 }}
api_client_retry_configuration =
+# Flag to control the information added to kubernetes executor logs for better
traceability
+logs_task_metadata = False
+
# Path to the YAML pod file that forms the basis for KubernetesExecutor
workers.
pod_template_file =
diff --git a/airflow/executors/kubernetes_executor.py
b/airflow/executors/kubernetes_executor.py
index 470df3101a..10aef9b532 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -44,7 +44,11 @@ from airflow.executors.base_executor import BaseExecutor
from airflow.kubernetes import pod_generator
from airflow.kubernetes.kube_client import get_kube_client
from airflow.kubernetes.kube_config import KubeConfig
-from airflow.kubernetes.kubernetes_helper_functions import annotations_to_key,
create_pod_id
+from airflow.kubernetes.kubernetes_helper_functions import (
+ annotations_for_logging_task_metadata,
+ annotations_to_key,
+ create_pod_id,
+)
from airflow.kubernetes.pod_generator import PodGenerator
from airflow.utils.event_scheduler import EventScheduler
from airflow.utils.log.logging_mixin import LoggingMixin, remove_escape_codes
@@ -216,15 +220,16 @@ class KubernetesJobWatcher(multiprocessing.Process,
LoggingMixin):
resource_version: str,
event: Any,
) -> None:
         """Process status response."""
+        annotations_string = annotations_for_logging_task_metadata(annotations)
if status == "Pending":
if event["type"] == "DELETED":
- self.log.info("Event: Failed to start pod %s", pod_name)
+ self.log.info("Event: Failed to start pod %s, annotations:
%s", pod_name, annotations_string)
self.watcher_queue.put((pod_name, namespace, State.FAILED,
annotations, resource_version))
else:
- self.log.debug("Event: %s Pending", pod_name)
+ self.log.debug("Event: %s Pending, annotations: %s", pod_name,
annotations_string)
elif status == "Failed":
- self.log.error("Event: %s Failed", pod_name)
+ self.log.error("Event: %s Failed, annotations: %s", pod_name,
annotations_string)
self.watcher_queue.put((pod_name, namespace, State.FAILED,
annotations, resource_version))
elif status == "Succeeded":
# We get multiple events once the pod hits a terminal state, and
we only want to
@@ -243,14 +248,18 @@ class KubernetesJobWatcher(multiprocessing.Process,
LoggingMixin):
pod_name,
)
return
- self.log.info("Event: %s Succeeded", pod_name)
+ self.log.info("Event: %s Succeeded, annotations: %s", pod_name,
annotations_string)
self.watcher_queue.put((pod_name, namespace, None, annotations,
resource_version))
elif status == "Running":
if event["type"] == "DELETED":
- self.log.info("Event: Pod %s deleted before it could
complete", pod_name)
+ self.log.info(
+ "Event: Pod %s deleted before it could complete,
annotations: %s",
+ pod_name,
+ annotations_string,
+ )
self.watcher_queue.put((pod_name, namespace, State.FAILED,
annotations, resource_version))
else:
- self.log.info("Event: %s is Running", pod_name)
+ self.log.info("Event: %s is Running, annotations: %s",
pod_name, annotations_string)
else:
self.log.warning(
"Event: Invalid state: %s on pod: %s in namespace %s with
annotations: %s with "
@@ -377,7 +386,12 @@ class AirflowKubernetesScheduler(LoggingMixin):
)
# Reconcile the pod generated by the Operator and the Pod
# generated by the .cfg file
- self.log.info("Creating kubernetes pod for job is %s, with pod name
%s", key, pod.metadata.name)
+ self.log.info(
+ "Creating kubernetes pod for job is %s, with pod name %s,
annotations: %s",
+ key,
+ pod.metadata.name,
+ annotations_for_logging_task_metadata(pod.metadata.annotations),
+ )
self.log.debug("Kubernetes running for command %s", command)
self.log.debug("Kubernetes launching image %s",
pod.spec.containers[0].image)
@@ -438,7 +452,10 @@ class AirflowKubernetesScheduler(LoggingMixin):
"""Process the task by watcher."""
pod_name, namespace, state, annotations, resource_version = task
self.log.debug(
- "Attempting to finish pod; pod_name: %s; state: %s; annotations:
%s", pod_name, state, annotations
+ "Attempting to finish pod; pod_name: %s; state: %s; annotations:
%s",
+ pod_name,
+ state,
+ annotations_for_logging_task_metadata(annotations),
)
key = annotations_to_key(annotations=annotations)
if key:
diff --git a/airflow/kubernetes/kubernetes_helper_functions.py
b/airflow/kubernetes/kubernetes_helper_functions.py
index 390eb0edb7..fdb76b0aa8 100644
--- a/airflow/kubernetes/kubernetes_helper_functions.py
+++ b/airflow/kubernetes/kubernetes_helper_functions.py
@@ -23,6 +23,8 @@ import string
import pendulum
from slugify import slugify
+from airflow.compat.functools import cache
+from airflow.configuration import conf
from airflow.models.taskinstancekey import TaskInstanceKey
log = logging.getLogger(__name__)
@@ -119,3 +121,16 @@ def annotations_to_key(annotations: dict[str, str]) ->
TaskInstanceKey:
try_number=try_number,
map_index=map_index,
)
+
+
+@cache
+def get_logs_task_metadata() -> bool:
+ return conf.getboolean("kubernetes_executor", "logs_task_metadata",
fallback=False)
+
+
+def annotations_for_logging_task_metadata(annotation_set):
+ if get_logs_task_metadata():
+ annotations_for_logging = annotation_set
+ else:
+ annotations_for_logging = "<omitted>"
+ return annotations_for_logging
diff --git a/tests/executors/test_kubernetes_executor.py
b/tests/executors/test_kubernetes_executor.py
index 9e3ee938b0..fc9daaacde 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -50,7 +50,11 @@ try:
get_base_pod_from_template,
)
from airflow.kubernetes import pod_generator
- from airflow.kubernetes.kubernetes_helper_functions import
annotations_to_key
+ from airflow.kubernetes.kubernetes_helper_functions import (
+ annotations_for_logging_task_metadata,
+ annotations_to_key,
+ get_logs_task_metadata,
+ )
from airflow.kubernetes.pod_generator import PodGenerator
except ImportError:
AirflowKubernetesScheduler = None # type: ignore
@@ -1162,6 +1166,39 @@ class TestKubernetesExecutor:
def test_supports_sentry(self):
assert not KubernetesExecutor.supports_sentry
+ def test_annotations_for_logging_task_metadata(self):
+ annotations_test = {
+ "dag_id": "dag",
+ "run_id": "run_id",
+ "task_id": "task",
+ "try_number": "1",
+ }
+ get_logs_task_metadata.cache_clear()
+        with conf_vars({("kubernetes_executor", "logs_task_metadata"): "True"}):
+ expected_annotations = {
+ "dag_id": "dag",
+ "run_id": "run_id",
+ "task_id": "task",
+ "try_number": "1",
+ }
+ annotations_actual =
annotations_for_logging_task_metadata(annotations_test)
+ assert annotations_actual == expected_annotations
+ get_logs_task_metadata.cache_clear()
+
+ def test_annotations_for_logging_task_metadata_fallback(self):
+ annotations_test = {
+ "dag_id": "dag",
+ "run_id": "run_id",
+ "task_id": "task",
+ "try_number": "1",
+ }
+ get_logs_task_metadata.cache_clear()
+        with conf_vars({("kubernetes_executor", "logs_task_metadata"): "False"}):
+ expected_annotations = "<omitted>"
+ annotations_actual =
annotations_for_logging_task_metadata(annotations_test)
+ assert annotations_actual == expected_annotations
+ get_logs_task_metadata.cache_clear()
+
class TestKubernetesJobWatcher:
test_namespace = "airflow"