This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-9-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 06296f705270b60f595221f2539bde7a5ecb271d Author: Alexis BRENON <[email protected]> AuthorDate: Wed Apr 3 16:19:09 2024 +0200 fix(google,log): Avoid log name overriding (#38071) Avoid the use of the generic `name` attribute which is overridden by the dict configurator. (cherry picked from commit 091d5e62678de943adfce1b8a8cd94dccbbfa98b) --- airflow/config_templates/airflow_local_settings.py | 2 +- .../google/cloud/log/stackdriver_task_handler.py | 21 ++++++++++++---- .../logging/stackdriver.rst | 4 ++-- newsfragments/38071.significant.rst | 26 ++++++++++++++++++++ .../cloud/log/test_stackdriver_task_handler.py | 28 ++++++++++++++++++++++ 5 files changed, 74 insertions(+), 7 deletions(-) diff --git a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py index 02cbfdcfa1..bb0812ca10 100644 --- a/airflow/config_templates/airflow_local_settings.py +++ b/airflow/config_templates/airflow_local_settings.py @@ -286,7 +286,7 @@ if REMOTE_LOGGING: "task": { "class": "airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler", "formatter": "airflow", - "name": log_name, + "gcp_log_name": log_name, "gcp_key_path": key_path, } } diff --git a/airflow/providers/google/cloud/log/stackdriver_task_handler.py b/airflow/providers/google/cloud/log/stackdriver_task_handler.py index bfeee9f12a..f5e9beb05f 100644 --- a/airflow/providers/google/cloud/log/stackdriver_task_handler.py +++ b/airflow/providers/google/cloud/log/stackdriver_task_handler.py @@ -19,6 +19,7 @@ from __future__ import annotations import logging +import warnings from functools import cached_property from typing import TYPE_CHECKING, Collection from urllib.parse import urlencode @@ -29,9 +30,11 @@ from google.cloud.logging.handlers.transports import BackgroundThreadTransport, from google.cloud.logging_v2.services.logging_service_v2 import LoggingServiceV2Client from google.cloud.logging_v2.types import ListLogEntriesRequest, 
ListLogEntriesResponse +from airflow.exceptions import RemovedInAirflow3Warning from airflow.providers.google.cloud.utils.credentials_provider import get_credentials_and_project_id from airflow.providers.google.common.consts import CLIENT_INFO from airflow.utils.log.trigger_handler import ctx_indiv_trigger +from airflow.utils.types import NOTSET, ArgNotSet if TYPE_CHECKING: from google.auth.credentials import Credentials @@ -92,15 +95,25 @@ class StackdriverTaskHandler(logging.Handler): self, gcp_key_path: str | None = None, scopes: Collection[str] | None = _DEFAULT_SCOPESS, - name: str = DEFAULT_LOGGER_NAME, + name: str | ArgNotSet = NOTSET, transport: type[Transport] = BackgroundThreadTransport, resource: Resource = _GLOBAL_RESOURCE, labels: dict[str, str] | None = None, + gcp_log_name: str = DEFAULT_LOGGER_NAME, ): + if name is not NOTSET: + warnings.warn( + "Param `name` is deprecated and will be removed in a future release. " + "Please use `gcp_log_name` instead. ", + RemovedInAirflow3Warning, + stacklevel=2, + ) + gcp_log_name = str(name) + super().__init__() self.gcp_key_path: str | None = gcp_key_path self.scopes: Collection[str] | None = scopes - self.name: str = name + self.gcp_log_name: str = gcp_log_name self.transport_type: type[Transport] = transport self.resource: Resource = resource self.labels: dict[str, str] | None = labels @@ -140,7 +153,7 @@ class StackdriverTaskHandler(logging.Handler): """Object responsible for sending data to Stackdriver.""" # The Transport object is badly defined (no init) but in the docs client/name as constructor # arguments are a requirement for any class that derives from Transport class, hence ignore: - return self.transport_type(self._client, self.name) # type: ignore[call-arg] + return self.transport_type(self._client, self.gcp_log_name) # type: ignore[call-arg] def _get_labels(self, task_instance=None): if task_instance: @@ -245,7 +258,7 @@ class StackdriverTaskHandler(logging.Handler): _, project = 
self._credentials_and_project log_filters = [ f"resource.type={escale_label_value(self.resource.type)}", - f'logName="projects/{project}/logs/{self.name}"', + f'logName="projects/{project}/logs/{self.gcp_log_name}"', ] for key, value in self.resource.labels.items(): diff --git a/docs/apache-airflow-providers-google/logging/stackdriver.rst b/docs/apache-airflow-providers-google/logging/stackdriver.rst index 60943bda70..5a7634fc00 100644 --- a/docs/apache-airflow-providers-google/logging/stackdriver.rst +++ b/docs/apache-airflow-providers-google/logging/stackdriver.rst @@ -33,7 +33,7 @@ example: # location. If remote_logging is set to true, see UPDATING.md for additional # configuration requirements. remote_logging = True - remote_base_log_folder = stackdriver://logs-name + remote_base_log_folder = stackdriver:///logs-name All configuration options are in the ``[logging]`` section. @@ -50,7 +50,7 @@ Turning this option off will result in data not being sent to Stackdriver. The ``remote_base_log_folder`` option contains the URL that specifies the type of handler to be used. For integration with Stackdriver, this option should start with ``stackdriver://``. -The path section of the URL specifies the name of the log e.g. ``stackdriver://airflow-tasks`` writes +The path section of the URL specifies the name of the log e.g. ``stackdriver:///airflow-tasks`` writes logs under the name ``airflow-tasks``. You can set ``google_key_path`` option in the ``[logging]`` section to specify the path to `the service diff --git a/newsfragments/38071.significant.rst b/newsfragments/38071.significant.rst new file mode 100644 index 0000000000..6193de613f --- /dev/null +++ b/newsfragments/38071.significant.rst @@ -0,0 +1,26 @@ +Rename the ``name`` attribute of the StackdriverTaskHandler to ``gcp_log_name`` to avoid name overriding by the ``DictConfigurator``. 
+ +Airflow relies on the ``logging.config.dictConfig`` (`documentation <https://docs.python.org/3/library/logging.config.html>`_) method +to `set up the logging stack <https://github.com/apache/airflow/blob/a58441ca1b263cae61a5bb653e6839f0dd29b08e/airflow/logging_config.py#L69>`_. +However, during this setup, it iterates through the handlers and +`explicitly sets their name <https://github.com/python/cpython/blob/2a4cbf17af19a01d942f9579342f77c39fbd23c4/Lib/logging/config.py#L578>`_: + +.. code-block:: python + + for name in sorted(handlers): + try: + handler = self.configure_handler(handlers[name]) + handler.name = name + handlers[name] = handler + except Exception as e: + # [...] + pass + +So, before this fix: + +#. You set up the remote logging through the environment variables ``AIRFLOW__LOGGING__REMOTE_LOGGING="true"`` and ``AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER="stackdriver://host/path"``. +#. Airflow instantiates a ``StackdriverTaskHandler`` with the name of ``"path"`` +#. **BUT** the ``dictConfig`` call overrides the name of the handler with the key of the handlers configuration (i.e. `task <https://github.com/apache/airflow/blob/a58441ca1b263cae61a5bb653e6839f0dd29b08e/airflow/config_templates/airflow_local_settings.py#L350>`_). +#. Hence, the next calls to the ``emit`` method of the handler will generate logs to the wrong destination (``task`` instead of ``path``). + +Changing the field from ``name`` to ``gcp_log_name`` prevents the overriding from the dictConfig. 
diff --git a/tests/providers/google/cloud/log/test_stackdriver_task_handler.py b/tests/providers/google/cloud/log/test_stackdriver_task_handler.py index b3acbd0afa..5c01b9ff52 100644 --- a/tests/providers/google/cloud/log/test_stackdriver_task_handler.py +++ b/tests/providers/google/cloud/log/test_stackdriver_task_handler.py @@ -67,6 +67,34 @@ def test_should_pass_message_to_client(mock_client, mock_get_creds_and_project_i mock_client.assert_called_once_with(credentials="creds", client_info=mock.ANY, project="project_id") [email protected]("clean_stackdriver_handlers") [email protected]("airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id") [email protected]("airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client") +def test_should_use_configured_log_name(mock_client, mock_get_creds_and_project_id): + mock_get_creds_and_project_id.return_value = ("creds", "project_id") + + with mock.patch.dict( + "os.environ", + AIRFLOW__LOGGING__REMOTE_LOGGING="true", + AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER="stackdriver://host/path", + ): + import importlib + import logging + + from airflow import settings + from airflow.config_templates import airflow_local_settings + + importlib.reload(airflow_local_settings) + settings.configure_logging() + + logger = logging.getLogger("airflow.task") + handler = logger.handlers[0] + assert isinstance(handler, StackdriverTaskHandler) + with mock.patch.object(handler, "transport_type") as transport_type_mock: + logger.error("foo") + transport_type_mock.assert_called_once_with(mock_client.return_value, "path") + + @pytest.mark.db_test class TestStackdriverLoggingHandlerTask: DAG_ID = "dag_for_testing_stackdriver_file_task_handler"
