This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2bcd450e84 Add task parameter to set custom logger name (#34964)
2bcd450e84 is described below

commit 2bcd450e84426fd678b3fa2e4a15757af234e98a
Author: Joffrey Bienvenu <[email protected]>
AuthorDate: Fri Nov 3 18:55:32 2023 +0100

    Add task parameter to set custom logger name (#34964)
---
 airflow/hooks/base.py                              | 10 +++
 airflow/models/baseoperator.py                     | 15 +++--
 airflow/serialization/schema.json                  |  2 +
 airflow/utils/log/logging_mixin.py                 | 38 +++++++++++-
 .../advanced-logging-configuration.rst             | 72 +++++++++++++++++++++-
 .../logging-monitoring/logging-tasks.rst           |  3 +-
 tests/hooks/test_base.py                           | 34 ++++++++++
 tests/operators/test_python.py                     | 27 ++++++++
 tests/providers/docker/hooks/test_docker.py        |  2 +-
 tests/serialization/test_dag_serialization.py      |  5 +-
 tests/utils/test_logging_mixin.py                  | 38 +++++++++++-
 11 files changed, 233 insertions(+), 13 deletions(-)

diff --git a/airflow/hooks/base.py b/airflow/hooks/base.py
index 0974813c5a..6ec0a8938e 100644
--- a/airflow/hooks/base.py
+++ b/airflow/hooks/base.py
@@ -41,8 +41,18 @@ class BaseHook(LoggingMixin):
     object that can handle the connection and interaction to specific
     instances of these systems, and expose consistent methods to interact
     with them.
+
+    :param logger_name: Name of the logger used by the Hook to emit logs.
+        If set to `None` (default), the logger name will fall back to
+        `airflow.task.hooks.{class.__module__}.{class.__name__}` (e.g. DbApiHook will have
+        *airflow.task.hooks.airflow.providers.common.sql.hooks.sql.DbApiHook* as logger).
     """
 
+    def __init__(self, logger_name: str | None = None):
+        super().__init__()
+        self._log_config_logger_name = "airflow.task.hooks"
+        self._logger_name = logger_name
+
     @classmethod
     def get_connections(cls, conn_id: str) -> list[Connection]:
         """
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 0b05e6f6d8..61fb6e07de 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -260,6 +260,7 @@ def partial(
     doc_json: str | None | ArgNotSet = NOTSET,
     doc_yaml: str | None | ArgNotSet = NOTSET,
     doc_rst: str | None | ArgNotSet = NOTSET,
+    logger_name: str | None | ArgNotSet = NOTSET,
     **kwargs,
 ) -> OperatorPartial:
     from airflow.models.dag import DagContext
@@ -323,6 +324,7 @@ def partial(
         "doc_md": doc_md,
         "doc_rst": doc_rst,
         "doc_yaml": doc_yaml,
+        "logger_name": logger_name,
     }
 
     # Inject DAG-level default args into args provided to this function.
@@ -651,6 +653,10 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
         that is visible in Task Instance details View in the Webserver
     :param doc_yaml: Add documentation (in YAML format) or notes to your Task objects
         that is visible in Task Instance details View in the Webserver
+    :param logger_name: Name of the logger used by the Operator to emit logs.
+        If set to `None` (default), the logger name will fall back to
+        `airflow.task.operators.{class.__module__}.{class.__name__}` (e.g. SimpleHttpOperator will have
+        *airflow.task.operators.airflow.providers.http.operators.http.SimpleHttpOperator* as logger).
     """
 
     # Implementing Operator.
@@ -670,7 +676,6 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
         "user_defined_macros",
         "user_defined_filters",
         "params",
-        "_log",
     )
 
     # each operator should override this class attr for shallow copy attrs.
@@ -781,6 +786,7 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
         doc_json: str | None = None,
         doc_yaml: str | None = None,
         doc_rst: str | None = None,
+        logger_name: str | None = None,
         **kwargs,
     ):
         from airflow.models.dag import DagContext
@@ -931,7 +937,8 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
         if dag:
             self.dag = dag
 
-        self._log = logging.getLogger("airflow.task.operators")
+        self._log_config_logger_name = "airflow.task.operators"
+        self._logger_name = logger_name
 
         # Lineage
         self.inlets: list = []
@@ -1220,13 +1227,13 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
 
     def __getstate__(self):
         state = dict(self.__dict__)
-        del state["_log"]
+        if self._log:
+            del state["_log"]
 
         return state
 
     def __setstate__(self, state):
         self.__dict__ = state
-        self._log = logging.getLogger("airflow.task.operators")
 
     def render_template_fields(
         self,
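
A hedged sketch of the operator-side behaviour this enables. EmptyOperator is used
purely as a convenient example; any BaseOperator subclass that forwards kwargs
behaves the same way:

    import pickle

    from airflow.operators.empty import EmptyOperator

    op = EmptyOperator(task_id="t", logger_name="my.operator")
    assert op.log.name == "airflow.task.operators.my.operator"

    # __getstate__ now drops the cached _log and the logger is rebuilt lazily from
    # _logger_name, so the custom name survives a pickle round trip.
    restored = pickle.loads(pickle.dumps(op))
    assert restored.log.name == "airflow.task.operators.my.operator"
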
diff --git a/airflow/serialization/schema.json b/airflow/serialization/schema.json
index 13489fedc6..ae7121fd14 100644
--- a/airflow/serialization/schema.json
+++ b/airflow/serialization/schema.json
@@ -253,6 +253,8 @@
         "doc_json":  { "type": "string" },
         "doc_yaml":  { "type": "string" },
         "doc_rst":  { "type": "string" },
+        "_logger_name": { "type": "string" },
+        "_log_config_logger_name": { "type": "string" },
         "_is_mapped": { "const": true, "$comment": "only present when True" },
         "expand_input": { "type": "object" },
         "partial_kwargs": { "type": "object" }
diff --git a/airflow/utils/log/logging_mixin.py b/airflow/utils/log/logging_mixin.py
index 834237182c..15e920818b 100644
--- a/airflow/utils/log/logging_mixin.py
+++ b/airflow/utils/log/logging_mixin.py
@@ -69,13 +69,47 @@ class LoggingMixin:
 
     _log: logging.Logger | None = None
 
+    # Parent logger used by this class. It should match one of the loggers defined in the
+    # `logging_config_class`. By default, this attribute is used to create the final name of the logger, and
+    # will prefix the `_logger_name` with a separating dot.
+    _log_config_logger_name: str | None = None
+
+    _logger_name: str | None = None
+
     def __init__(self, context=None):
         self._set_context(context)
 
     @staticmethod
-    def _get_log(obj: Any, clazz: type[_T]) -> Logger:
+    def _create_logger_name(
+        logged_class: type[_T],
+        log_config_logger_name: str | None = None,
+        class_logger_name: str | None = None,
+    ) -> str:
+        """Generate a logger name for the given `logged_class`.
+
+        By default, this function returns the `class_logger_name` as logger name. If it is not provided,
+        `{class.__module__}.{class.__name__}` is returned instead. When a `log_config_logger_name` is
+        provided, it prefixes the logger name with a separating dot.
+        """
+        logger_name: str = (
+            class_logger_name
+            if class_logger_name is not None
+            else f"{logged_class.__module__}.{logged_class.__name__}"
+        )
+
+        if log_config_logger_name:
+            return f"{log_config_logger_name}.{logger_name}" if logger_name 
else log_config_logger_name
+        return logger_name
+
+    @classmethod
+    def _get_log(cls, obj: Any, clazz: type[_T]) -> Logger:
         if obj._log is None:
-            obj._log = logging.getLogger(f"{clazz.__module__}.{clazz.__name__}")
+            logger_name: str = cls._create_logger_name(
+                logged_class=clazz,
+                log_config_logger_name=obj._log_config_logger_name,
+                class_logger_name=obj._logger_name,
+            )
+            obj._log = logging.getLogger(logger_name)
         return obj._log
 
     @classmethod
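
The naming rules above, summarized in a small sketch (it calls the private helper
directly, purely for illustration):

    from airflow.utils.log.logging_mixin import LoggingMixin


    class Demo(LoggingMixin):
        pass


    # No overrides: fall back to "<module>.<class>".
    assert LoggingMixin._create_logger_name(Demo) == f"{Demo.__module__}.Demo"

    # A log-config prefix yields "<prefix>.<module>.<class>".
    assert (
        LoggingMixin._create_logger_name(Demo, log_config_logger_name="airflow.task.hooks")
        == f"airflow.task.hooks.{Demo.__module__}.Demo"
    )

    # A class logger name replaces the "<module>.<class>" part.
    assert (
        LoggingMixin._create_logger_name(Demo, "airflow.task.hooks", "my.hook")
        == "airflow.task.hooks.my.hook"
    )
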
diff --git a/docs/apache-airflow/administration-and-deployment/logging-monitoring/advanced-logging-configuration.rst b/docs/apache-airflow/administration-and-deployment/logging-monitoring/advanced-logging-configuration.rst
index 925013af8b..5ac3ad46c0 100644
--- a/docs/apache-airflow/administration-and-deployment/logging-monitoring/advanced-logging-configuration.rst
+++ b/docs/apache-airflow/administration-and-deployment/logging-monitoring/advanced-logging-configuration.rst
@@ -19,7 +19,7 @@
 .. _write-logs-advanced:
 
 Advanced logging configuration
-------------------------------
+==============================
 
 Not all configuration options are available from the ``airflow.cfg`` file. The config file describes
 how to configure logging for tasks, because the logs generated by tasks are not only logged in separate
@@ -27,8 +27,10 @@ files by default but has to be also accessible via the webserver.
 
 By default standard airflow component logs are written to the ``$AIRFLOW_HOME/logs`` directory, but you
 can also customize it and configure it as you want by overriding Python logger configuration that can
-be configured by providing custom logging configuration object. Some configuration options require
-that the logging config class be overwritten. You can do it by copying the default
+be configured by providing custom logging configuration object. You can also create and use logging configuration
+for specific operators and tasks.
+
+Some configuration options require that the logging config class be overwritten. You can do it by copying the default
 configuration of Airflow and modifying it to suit your needs. The default configuration can be seen in the
 `airflow_local_settings.py template <https://github.com/apache/airflow/blob/|airflow-version|/airflow/config_templates/airflow_local_settings.py>`_
 and you can see the loggers and handlers used there. Except the custom loggers and handlers configurable there
@@ -38,6 +40,9 @@ that Python objects log to loggers that follow naming convention of ``<package>.
 You can read more about standard python logging classes (Loggers, Handlers, Formatters) in the
 `Python logging documentation <https://docs.python.org/library/logging.html>`_.
 
+Create a custom logging class
+-----------------------------
+
 Configuring your logging classes can be done via the ``logging_config_class`` option in ``airflow.cfg`` file.
 This configuration should specify the import path to a configuration compatible with
 :func:`logging.config.dictConfig`. If your file is a standard import location, then you should set a
@@ -89,3 +94,64 @@ See :doc:`../modules_management` for details on how Python and Airflow manage mo
 .. note::
 
    You can override the way both standard logs of the components and "task" logs are handled.
+
+
+Custom logger for Operators, Hooks and Tasks
+--------------------------------------------
+
+You can create custom logging handlers and apply them to specific Operators, Hooks and tasks. By default, the Operators
+and Hooks loggers are children of the ``airflow.task`` logger: they follow the naming conventions
+``airflow.task.operators.<package>.<module_name>`` and ``airflow.task.hooks.<package>.<module_name>``, respectively. After
+:doc:`creating a custom logging class </administration-and-deployment/logging-monitoring/advanced-logging-configuration>`,
+you can assign specific loggers to them.
+
+Example of custom logging for the ``SQLExecuteQueryOperator`` and the ``HttpHook``:
+
+    .. code-block:: python
+
+      from copy import deepcopy
+      from pydantic.utils import deep_update
+      from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
+
+      LOGGING_CONFIG = deep_update(
+          deepcopy(DEFAULT_LOGGING_CONFIG),
+          {
+              "loggers": {
+                  "airflow.task.operators.airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator": {
+                      "handlers": ["task"],
+                      "level": "DEBUG",
+                      "propagate": True,
+                  },
+                  "airflow.task.hooks.airflow.providers.http.hooks.http.HttpHook": {
+                      "handlers": ["task"],
+                      "level": "WARNING",
+                      "propagate": False,
+                  },
+              }
+          },
+      )
+
+
+You can also set a custom logger name for a DAG's task with the ``logger_name`` attribute. This can be useful if multiple tasks
+use the same Operator, but you want to disable logging for some of them.
+
+Example of custom logger name:
+
+    .. code-block:: python
+
+      # In your DAG file
+      SQLExecuteQueryOperator(..., logger_name="sql.big_query")
+
+      # In your custom `log_config.py`
+      LOGGING_CONFIG = deep_update(
+          deepcopy(DEFAULT_LOGGING_CONFIG),
+          {
+              "loggers": {
+                  "airflow.task.operators.sql.big_query": {
+                      "handlers": ["task"],
+                      "level": "WARNING",
+                      "propagate": True,
+                  },
+              }
+          },
+      )
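
For context: a custom LOGGING_CONFIG such as the ones above still has to be activated
through the ``logging_config_class`` option described earlier, assuming the dict lives
in a log_config.py importable from the Python path:

    [logging]
    logging_config_class = log_config.LOGGING_CONFIG
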
diff --git a/docs/apache-airflow/administration-and-deployment/logging-monitoring/logging-tasks.rst b/docs/apache-airflow/administration-and-deployment/logging-monitoring/logging-tasks.rst
index c09501447a..f038004b71 100644
--- a/docs/apache-airflow/administration-and-deployment/logging-monitoring/logging-tasks.rst
+++ b/docs/apache-airflow/administration-and-deployment/logging-monitoring/logging-tasks.rst
@@ -121,7 +121,8 @@ Advanced configuration
 ----------------------
 
 You can configure :doc:`advanced features </administration-and-deployment/logging-monitoring/advanced-logging-configuration>`
-- including adding your own custom task log handlers (but also log handlers for all airflow components).
+- including adding your own custom task log handlers (but also log handlers for all airflow components), and creating
+custom log handlers per operator, hook and task.
 
 .. _serving-worker-trigger-logs:
 
diff --git a/tests/hooks/test_base.py b/tests/hooks/test_base.py
new file mode 100644
index 0000000000..273d1906a5
--- /dev/null
+++ b/tests/hooks/test_base.py
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from airflow.hooks.base import BaseHook
+
+
+class TestBaseHook:
+    def test_hook_has_default_logger_name(self):
+        hook = BaseHook()
+        assert hook.log.name == "airflow.task.hooks.airflow.hooks.base.BaseHook"
+
+    def test_custom_logger_name_is_correctly_set(self):
+        hook = BaseHook(logger_name="airflow.custom.logger")
+        assert hook.log.name == "airflow.task.hooks.airflow.custom.logger"
+
+    def test_empty_string_as_logger_name(self):
+        hook = BaseHook(logger_name="")
+        assert hook.log.name == "airflow.task.hooks"
diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py
index 52787150bd..ac9178c5af 100644
--- a/tests/operators/test_python.py
+++ b/tests/operators/test_python.py
@@ -20,12 +20,14 @@ from __future__ import annotations
 import copy
 import logging
 import os
+import pickle
 import re
 import sys
 import tempfile
 import warnings
 from collections import namedtuple
 from datetime import date, datetime, timedelta
+from functools import partial
 from subprocess import CalledProcessError
 from tempfile import TemporaryDirectory
 from typing import TYPE_CHECKING, Generator
@@ -310,6 +312,31 @@ class TestPythonOperator(BasePythonTest):
 
         assert python_operator.template_ext == ["test_ext"]
 
+    def test_python_operator_has_default_logger_name(self):
+        python_operator = PythonOperator(task_id="task", python_callable=partial(int, 2))
+
+        logger_name: str = "airflow.task.operators.airflow.operators.python.PythonOperator"
+        assert python_operator.log.name == logger_name
+
+    def test_custom_logger_name_is_correctly_set(self):
+        """
+        Ensure the custom logger name is correctly set when the Operator is created,
+        and when its state is resumed via __setstate__.
+        """
+        logger_name: str = "airflow.task.operators.custom.logger"
+
+        python_operator = PythonOperator(
+            task_id="task", python_callable=partial(int, 2), 
logger_name="custom.logger"
+        )
+        assert python_operator.log.name == logger_name
+
+        setstate_operator = pickle.loads(pickle.dumps(python_operator))
+        assert setstate_operator.log.name == logger_name
+
+    def test_custom_logger_name_can_be_empty_string(self):
+        python_operator = PythonOperator(task_id="task", python_callable=partial(int, 2), logger_name="")
+        assert python_operator.log.name == "airflow.task.operators"
+
 
 class TestBranchOperator(BasePythonTest):
     opcls = BranchPythonOperator
diff --git a/tests/providers/docker/hooks/test_docker.py b/tests/providers/docker/hooks/test_docker.py
index 090ea2c521..454aee3149 100644
--- a/tests/providers/docker/hooks/test_docker.py
+++ b/tests/providers/docker/hooks/test_docker.py
@@ -36,7 +36,7 @@ TEST_VERSION = "3.14"
 TEST_CONN = {"host": "some.docker.registry.com", "login": "some_user", "password": "some_p4$$w0rd"}
 MOCK_CONNECTION_NOT_EXIST_MSG = "Testing connection not exists"
 MOCK_CONNECTION_NOT_EXISTS_EX = AirflowNotFoundException(MOCK_CONNECTION_NOT_EXIST_MSG)
-HOOK_LOGGER_NAME = "airflow.providers.docker.hooks.docker.DockerHook"
+HOOK_LOGGER_NAME = "airflow.task.hooks.airflow.providers.docker.hooks.docker.DockerHook"
 
 
 @pytest.fixture
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index 3244f3b7be..4aecebcd1d 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -184,6 +184,7 @@ serialized_simple_dag_ground_truth = {
                     },
                 },
                 "doc_md": "### Task Tutorial Documentation",
+                "_log_config_logger_name": "airflow.task.operators",
             },
             {
                 "task_id": "custom_task",
@@ -206,6 +207,7 @@ serialized_simple_dag_ground_truth = {
                 "is_setup": False,
                 "is_teardown": False,
                 "on_failure_fail_dagrun": False,
+                "_log_config_logger_name": "airflow.task.operators",
             },
         ],
         "schedule_interval": {"__type": "timedelta", "__var": 86400.0},
@@ -1208,7 +1210,8 @@ class TestStringifiedDAGs:
         base_operator = BaseOperator(task_id="10")
         fields = {k: v for (k, v) in vars(base_operator).items() if k in BaseOperator.get_serialized_fields()}
         assert fields == {
-            "_log": base_operator.log,
+            "_logger_name": None,
+            "_log_config_logger_name": "airflow.task.operators",
             "_post_execute_hook": None,
             "_pre_execute_hook": None,
             "depends_on_past": False,
diff --git a/tests/utils/test_logging_mixin.py b/tests/utils/test_logging_mixin.py
index 70c0da9fc2..bcc7a40885 100644
--- a/tests/utils/test_logging_mixin.py
+++ b/tests/utils/test_logging_mixin.py
@@ -24,7 +24,7 @@ from unittest import mock
 
 import pytest
 
-from airflow.utils.log.logging_mixin import SetContextPropagate, StreamLogWriter, set_context
+from airflow.utils.log.logging_mixin import LoggingMixin, SetContextPropagate, StreamLogWriter, set_context
 
 
 @pytest.fixture
@@ -83,6 +83,42 @@ class TestLoggingMixin:
         handler1.set_context.assert_called_once_with(value)
         handler2.set_context.assert_called_once_with(value)
 
+    def test_default_logger_name(self):
+        """
+        Ensure that by default, the object's logger name equals its module and class path.
+        """
+
+        class DummyClass(LoggingMixin):
+            pass
+
+        assert DummyClass().log.name == "tests.utils.test_logging_mixin.DummyClass"
+
+    def test_logger_name_is_root_when_logger_name_is_empty_string(self):
+        """
+        Ensure that when `_logger_name` is set to an empty string, the resulting logger name is an empty
+        string too, which results in a logger named 'root'.
+        Note: Passing an empty string to `logging.getLogger` returns the 'root' logger.
+        """
+
+        class EmptyStringLogger(LoggingMixin):
+            _logger_name: str | None = ""
+
+        assert EmptyStringLogger().log.name == "root"
+
+    def test_log_config_logger_name_correctly_prefix_logger_name(self):
+        """
+        Ensure that when a class has `_log_config_logger_name`, it is used as a prefix in the final
+        logger name.
+        """
+
+        class ClassWithParentLogConfig(LoggingMixin):
+            _log_config_logger_name: str = "airflow.tasks"
+
+        assert (
+            ClassWithParentLogConfig().log.name
+            == "airflow.tasks.tests.utils.test_logging_mixin.ClassWithParentLogConfig"
+        )
+
     def teardown_method(self):
         warnings.resetwarnings()
 
