This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 579ce065c5 Override base log folder by using task handler's
base_log_folder (#32781)
579ce065c5 is described below
commit 579ce065c5b0865b4aea580ee2d05c89680b7a8b
Author: Owen Leung <[email protected]>
AuthorDate: Thu Jul 27 01:46:45 2023 +0800
Override base log folder by using task handler's base_log_folder (#32781)
* Override base log folder by using task handler's base_log_folder
---------
Co-authored-by: Tzu-ping Chung <[email protected]>
---
airflow/utils/serve_logs.py | 25 ++++++++++++++++++++++++-
tests/utils/test_serve_logs.py | 40 +++++++++++++++++++++++++++++++++++-----
2 files changed, 59 insertions(+), 6 deletions(-)
diff --git a/airflow/utils/serve_logs.py b/airflow/utils/serve_logs.py
index 8c1d62722a..bae8a97da9 100644
--- a/airflow/utils/serve_logs.py
+++ b/airflow/utils/serve_logs.py
@@ -36,6 +36,7 @@ from setproctitle import setproctitle
from airflow.configuration import conf
from airflow.utils.docs import get_docs_url
from airflow.utils.jwt_signer import JWTSigner
+from airflow.utils.module_loading import import_string
logger = logging.getLogger(__name__)
@@ -44,7 +45,29 @@ def create_app():
flask_app = Flask(__name__, static_folder=None)
expiration_time_in_seconds = conf.getint("webserver",
"log_request_clock_grace", fallback=30)
log_directory = os.path.expanduser(conf.get("logging", "BASE_LOG_FOLDER"))
-
+ log_config_class = conf.get("logging", "logging_config_class")
+ if log_config_class:
+ logger.info("Detected user-defined logging config. Attempting to load
%s", log_config_class)
+ try:
+ logging_config = import_string(log_config_class)
+ try:
+ base_log_folder =
logging_config["handlers"]["task"]["base_log_folder"]
+ except KeyError:
+ base_log_folder = None
+ if base_log_folder is not None:
+ log_directory = base_log_folder
+ logger.info(
+ "Successfully imported user-defined logging config. Flask
App will serve log from %s",
+ log_directory,
+ )
+ else:
+ logger.warning(
+ "User-defined logging config does not specify
'base_log_folder'. "
+ "Flask App will use default log directory %s",
+ base_log_folder,
+ )
+ except Exception as e:
+ raise ImportError(f"Unable to load {log_config_class} due to
error: {e}")
signer = JWTSigner(
secret_key=conf.get("webserver", "secret_key"),
expiration_time_in_seconds=expiration_time_in_seconds,
diff --git a/tests/utils/test_serve_logs.py b/tests/utils/test_serve_logs.py
index 5288e646ed..5b751312d3 100644
--- a/tests/utils/test_serve_logs.py
+++ b/tests/utils/test_serve_logs.py
@@ -17,12 +17,14 @@
from __future__ import annotations
import datetime
+from pathlib import Path
from typing import TYPE_CHECKING
import jwt
import pytest
import time_machine
+from airflow.config_templates.airflow_local_settings import
DEFAULT_LOGGING_CONFIG
from airflow.utils.jwt_signer import JWTSigner
from airflow.utils.serve_logs import create_app
from tests.test_utils.config import conf_vars
@@ -34,18 +36,46 @@ LOG_DATA = "Airflow log data" * 20
@pytest.fixture
-def client(tmpdir):
- with conf_vars({("logging", "base_log_folder"): str(tmpdir)}):
+def client_without_config(tmp_path):
+ with conf_vars({("logging", "base_log_folder"): tmp_path.as_posix()}):
app = create_app()
yield app.test_client()
@pytest.fixture
-def sample_log(tmpdir):
- f = tmpdir / "sample.log"
- f.write(LOG_DATA.encode())
+def client_with_config():
+ with conf_vars(
+ {
+ (
+ "logging",
+ "logging_config_class",
+ ):
"airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG"
+ }
+ ):
+ app = create_app()
+
+ yield app.test_client()
+
[email protected](params=["client_without_config", "client_with_config"])
+def client(request):
+ return request.getfixturevalue(request.param)
+
+
[email protected]
+def sample_log(request, tmp_path):
+ client = request.getfixturevalue("client")
+
+ if client == request.getfixturevalue("client_without_config"):
+ base_log_dir = tmp_path
+ elif client == request.getfixturevalue("client_with_config"):
+ base_log_dir =
Path(DEFAULT_LOGGING_CONFIG["handlers"]["task"]["base_log_folder"])
+ else:
+ raise ValueError(f"Unknown client fixture: {client}")
+
+ f = base_log_dir.joinpath("sample.log")
+ f.write_bytes(LOG_DATA.encode())
return f