This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b6c5189dad Reduce log verbosity in KubernetesExecutor. (#26582)
b6c5189dad is described below

commit b6c5189dadb9c09967ec53c8bca1832852c5500e
Author: HTErik <[email protected]>
AuthorDate: Thu Sep 29 05:14:25 2022 +0200

    Reduce log verbosity in KubernetesExecutor. (#26582)
---
 airflow/executors/kubernetes_executor.py | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)

diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index a718aa1907..3d7876d9e5 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -25,6 +25,7 @@ from __future__ import annotations
 
 import functools
 import json
+import logging
 import multiprocessing
 import time
 from datetime import timedelta
@@ -147,7 +148,7 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
             )
         for event in list_worker_pods():
             task = event['object']
-            self.log.info('Event: %s had an event of type %s', 
task.metadata.name, event['type'])
+            self.log.debug('Event: %s had an event of type %s', 
task.metadata.name, event['type'])
             if event['type'] == 'ERROR':
                 return self.process_error(event)
             annotations = task.metadata.annotations
@@ -204,7 +205,7 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
                 self.log.info('Event: Failed to start pod %s', pod_id)
                 self.watcher_queue.put((pod_id, namespace, State.FAILED, 
annotations, resource_version))
             else:
-                self.log.info('Event: %s Pending', pod_id)
+                self.log.debug('Event: %s Pending', pod_id)
         elif status == 'Failed':
             self.log.error('Event: %s Failed', pod_id)
             self.watcher_queue.put((pod_id, namespace, State.FAILED, 
annotations, resource_version))
@@ -302,7 +303,6 @@ class AirflowKubernetesScheduler(LoggingMixin):
         status
         """
         key, command, kube_executor_config, pod_template_file = next_job
-        self.log.info('Kubernetes job is %s', key)
 
         dag_id, task_id, run_id, try_number, map_index = key
 
@@ -333,6 +333,7 @@ class AirflowKubernetesScheduler(LoggingMixin):
         )
         # Reconcile the pod generated by the Operator and the Pod
         # generated by the .cfg file
+        self.log.info('Creating kubernetes pod for job is %s, with pod name 
%s', key, pod.metadata.name)
         self.log.debug("Kubernetes running for command %s", command)
         self.log.debug("Kubernetes launching image %s", 
pod.spec.containers[0].image)
 
@@ -380,7 +381,7 @@ class AirflowKubernetesScheduler(LoggingMixin):
     def process_watcher_task(self, task: KubernetesWatchType) -> None:
         """Process the task by watcher."""
         pod_id, namespace, state, annotations, resource_version = task
-        self.log.info(
+        self.log.debug(
             'Attempting to finish pod; pod_id: %s; state: %s; annotations: 
%s', pod_id, state, annotations
         )
         key = annotations_to_key(annotations=annotations)
@@ -548,11 +549,15 @@ class KubernetesExecutor(BaseExecutor):
         executor_config: Any | None = None,
     ) -> None:
         """Executes task asynchronously"""
-        self.log.info('Add task %s with command %s with executor_config %s', 
key, command, executor_config)
+        if self.log.isEnabledFor(logging.DEBUG):
+            self.log.debug('Add task %s with command %s, executor_config %s', 
key, command, executor_config)
+        else:
+            self.log.info('Add task %s with command %s', key, command)
+
         try:
             kube_executor_config = PodGenerator.from_obj(executor_config)
         except Exception:
-            self.log.error("Invalid executor_config for %s", key)
+            self.log.error("Invalid executor_config for %s. Executor_config: 
%s", key, executor_config)
             self.fail(key=key, info="Invalid executor_config passed")
             return
 

Reply via email to