kaxil commented on code in PR #48491:
URL: https://github.com/apache/airflow/pull/48491#discussion_r2021524915
##########
airflow-core/src/airflow/logging_config.py:
##########
@@ -19,40 +19,65 @@
import logging
import warnings
+from contextlib import suppress
from logging.config import dictConfig
+from typing import TYPE_CHECKING, Any
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException
from airflow.utils.module_loading import import_string
+if TYPE_CHECKING:
+ from airflow.logging.remote import RemoteLogIO
+
log = logging.getLogger(__name__)
-def configure_logging():
+REMOTE_TASK_LOG: RemoteLogIO | None
+
+
+def __getattr__(name: str):
+ if name == "REMOTE_TASK_LOG":
+ global REMOTE_TASK_LOG
+ load_logging_config()
+ return REMOTE_TASK_LOG
+
+
+def load_logging_config() -> tuple[dict[str, Any], str]:
"""Configure & Validate Airflow Logging."""
- logging_class_path = ""
- try:
- logging_class_path = conf.get("logging", "logging_config_class")
- except AirflowConfigException:
- log.debug("Could not find key logging_config_class in config")
+ global REMOTE_TASK_LOG
+ logging_class_path = conf.get("logging", "logging_config_class", fallback=None)
- if logging_class_path:
- try:
- logging_config = import_string(logging_class_path)
+ user_defined = logging_class_path is not None
- # Make sure that the variable is in scope
- if not isinstance(logging_config, dict):
- raise ValueError("Logging Config should be of dict type")
+ logging_class_path = (
+ logging_class_path or "airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG"
Review Comment:
Why not pass this default as the `fallback` argument to `conf.get(...)` on L49 instead?
##########
airflow-core/src/airflow/logging_config.py:
##########
@@ -19,40 +19,65 @@
import logging
import warnings
+from contextlib import suppress
from logging.config import dictConfig
+from typing import TYPE_CHECKING, Any
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException
from airflow.utils.module_loading import import_string
+if TYPE_CHECKING:
+ from airflow.logging.remote import RemoteLogIO
+
log = logging.getLogger(__name__)
-def configure_logging():
+REMOTE_TASK_LOG: RemoteLogIO | None
+
+
+def __getattr__(name: str):
+ if name == "REMOTE_TASK_LOG":
+ global REMOTE_TASK_LOG
+ load_logging_config()
+ return REMOTE_TASK_LOG
+
+
+def load_logging_config() -> tuple[dict[str, Any], str]:
"""Configure & Validate Airflow Logging."""
- logging_class_path = ""
- try:
- logging_class_path = conf.get("logging", "logging_config_class")
- except AirflowConfigException:
- log.debug("Could not find key logging_config_class in config")
+ global REMOTE_TASK_LOG
+ logging_class_path = conf.get("logging", "logging_config_class", fallback=None)
- if logging_class_path:
- try:
- logging_config = import_string(logging_class_path)
+ user_defined = logging_class_path is not None
- # Make sure that the variable is in scope
- if not isinstance(logging_config, dict):
- raise ValueError("Logging Config should be of dict type")
+ logging_class_path = (
+ logging_class_path or "airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG"
+ )
+ try:
+ logging_config = import_string(logging_class_path)
+ # Make sure that the variable is in scope
+ if not isinstance(logging_config, dict):
+ raise ValueError("Logging Config should be of dict type")
+
+ if user_defined:
log.info("Successfully imported user-defined logging config from %s", logging_class_path)
- except Exception as err:
- # Import default logging configurations.
- raise ImportError(f"Unable to load custom logging from {logging_class_path} due to {err}")
+
+ except Exception as err:
+ # Import default logging configurations.
+ raise ImportError(
+ f"Unable to load {'custom ' if user_defined else ''} logging config from {logging_class_path} due to {err}"
+ )
else:
- logging_class_path = "airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG"
- logging_config = import_string(logging_class_path)
- log.debug("Unable to load custom logging, using default config instead")
+ mod = logging_class_path.rsplit(".", 1)[0]
+ with suppress(ImportError):
Review Comment:
Can we remove this `suppress(ImportError)`?
##########
airflow-core/src/airflow/logging/remote.py:
##########
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import os
+from typing import TYPE_CHECKING, Protocol
+
+import attrs
+
+if TYPE_CHECKING:
+ import structlog.typing
+
+ from airflow.utils.log.file_task_handler import LogMessages, LogSourceInfo
+
+
+@attrs.define
+class RemoteLogIO(Protocol):
+ """Interface for remote task loggers."""
+
+ @property
+ def processors(self) -> tuple[structlog.typing.Processor, ...]: ...
+
+ """
+ List of structlog processors to install in the task write path.
+
+ This is useful if a remote logging provider wants to either transform the structured log messages as they
+ are being written to a file, or if you want to upload messages as they are generated.
+ """
+
+ def upload(self, path: os.PathLike | str):
+ """Upload the given log path to the remote storage."""
+
+ def read(self, relative_path: str) -> tuple[LogSourceInfo, LogMessages | None]:
+ """Read logs from the given remote log path."""
+ raise NotImplementedError()
Review Comment:
Since this is a protocol, do we need `raise NotImplementedError()`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]