This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b6c5189dad Reduce log verbosity in KubernetesExecutor. (#26582)
b6c5189dad is described below
commit b6c5189dadb9c09967ec53c8bca1832852c5500e
Author: HTErik <[email protected]>
AuthorDate: Thu Sep 29 05:14:25 2022 +0200
Reduce log verbosity in KubernetesExecutor. (#26582)
---
airflow/executors/kubernetes_executor.py | 17 +++++++++++------
1 file changed, 11 insertions(+), 6 deletions(-)
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index a718aa1907..3d7876d9e5 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -25,6 +25,7 @@ from __future__ import annotations
import functools
import json
+import logging
import multiprocessing
import time
from datetime import timedelta
@@ -147,7 +148,7 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
)
for event in list_worker_pods():
task = event['object']
- self.log.info('Event: %s had an event of type %s',
task.metadata.name, event['type'])
+ self.log.debug('Event: %s had an event of type %s',
task.metadata.name, event['type'])
if event['type'] == 'ERROR':
return self.process_error(event)
annotations = task.metadata.annotations
@@ -204,7 +205,7 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
self.log.info('Event: Failed to start pod %s', pod_id)
self.watcher_queue.put((pod_id, namespace, State.FAILED,
annotations, resource_version))
else:
- self.log.info('Event: %s Pending', pod_id)
+ self.log.debug('Event: %s Pending', pod_id)
elif status == 'Failed':
self.log.error('Event: %s Failed', pod_id)
self.watcher_queue.put((pod_id, namespace, State.FAILED,
annotations, resource_version))
@@ -302,7 +303,6 @@ class AirflowKubernetesScheduler(LoggingMixin):
status
"""
key, command, kube_executor_config, pod_template_file = next_job
- self.log.info('Kubernetes job is %s', key)
dag_id, task_id, run_id, try_number, map_index = key
@@ -333,6 +333,7 @@ class AirflowKubernetesScheduler(LoggingMixin):
)
# Reconcile the pod generated by the Operator and the Pod
# generated by the .cfg file
+ self.log.info('Creating kubernetes pod for job is %s, with pod name
%s', key, pod.metadata.name)
self.log.debug("Kubernetes running for command %s", command)
self.log.debug("Kubernetes launching image %s",
pod.spec.containers[0].image)
@@ -380,7 +381,7 @@ class AirflowKubernetesScheduler(LoggingMixin):
def process_watcher_task(self, task: KubernetesWatchType) -> None:
"""Process the task by watcher."""
pod_id, namespace, state, annotations, resource_version = task
- self.log.info(
+ self.log.debug(
'Attempting to finish pod; pod_id: %s; state: %s; annotations:
%s', pod_id, state, annotations
)
key = annotations_to_key(annotations=annotations)
@@ -548,11 +549,15 @@ class KubernetesExecutor(BaseExecutor):
executor_config: Any | None = None,
) -> None:
"""Executes task asynchronously"""
- self.log.info('Add task %s with command %s with executor_config %s',
key, command, executor_config)
+ if self.log.isEnabledFor(logging.DEBUG):
+ self.log.debug('Add task %s with command %s, executor_config %s',
key, command, executor_config)
+ else:
+ self.log.info('Add task %s with command %s', key, command)
+
try:
kube_executor_config = PodGenerator.from_obj(executor_config)
except Exception:
- self.log.error("Invalid executor_config for %s", key)
+ self.log.error("Invalid executor_config for %s. Executor_config:
%s", key, executor_config)
self.fail(key=key, info="Invalid executor_config passed")
return