This is an automated email from the ASF dual-hosted git repository. taragolis pushed a commit to branch improve-warning-system in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 47322f1722798509ca2ffcadd83ff73dc244cd18 Author: Andrey Anshin <[email protected]> AuthorDate: Mon Apr 8 16:18:02 2024 +0400 Change capture warnings output format to the JSON --- tests/conftest.py | 243 +++++++++++++++++++------------------- tests/models/test_taskinstance.py | 12 +- 2 files changed, 126 insertions(+), 129 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 6d102e7268..b38a512116 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -21,13 +21,14 @@ import json import os import platform import re +import site import subprocess import sys import warnings -from contextlib import ExitStack, suppress +from contextlib import ExitStack, contextmanager, suppress from datetime import datetime, timedelta, timezone from pathlib import Path -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, NamedTuple import pytest import time_machine @@ -68,8 +69,6 @@ for env_key in os.environ.copy(): if not (ko := _KEEP_CONFIGS.get(section)) or not ("*" in ko or option in ko): del os.environ[env_key] -DEFAULT_WARNING_OUTPUT_PATH = Path("warnings.txt") -warning_output_path = DEFAULT_WARNING_OUTPUT_PATH SUPPORTED_DB_BACKENDS = ("sqlite", "postgres", "mysql") # A bit of a Hack - but we need to check args before they are parsed by pytest in order to @@ -295,11 +294,22 @@ def pytest_addoption(parser): dest="db_cleanup", help="Disable DB clear before each test module.", ) + group.addoption( + "--disable-capture-warnings", + action="store_true", + dest="disable_capture_warnings", + help="Disable internal capture warnings.", + ) group.addoption( "--warning-output-path", action="store", dest="warning_output_path", - default=DEFAULT_WARNING_OUTPUT_PATH.resolve().as_posix(), + metavar="PATH", + help=( + "Path for resulting captured warnings. Absolute or relative to the `tests` directory. " + "If not provided or environment variable `CAPTURE_WARNINGS_OUTPUT` not set " + "then 'warnings.txt' will be used." 
+ ), ) @@ -415,7 +425,25 @@ def pytest_configure(config: pytest.Config) -> None: config.addinivalue_line("markers", "enable_redact: do not mock redact secret masker") os.environ["_AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK"] = "1" - configure_warning_output(config) + + # Setup capture warnings + if not (warning_output_path := config.getoption("warning_output_path", default=None)): + if not (warning_output_path := os.environ.get("CAPTURE_WARNINGS_OUTPUT")): + warning_output_path = "warnings.txt" + warning_output_path = Path(warning_output_path) + if not warning_output_path.is_absolute(): + warning_output_path = Path(__file__).resolve().parent.joinpath(warning_output_path) + try: + warning_output_path.parent.resolve(strict=True) + except Exception as ex: + msg = ( + f"Unable resolve {os.fspath(warning_output_path.parent)!r} path for store warnings outputs. " + f"Original Error:\n {type(ex).__name__}: {ex}." + ) + pytest.exit(msg, returncode=6) + config.option.warning_output_path = warning_output_path + if "ignore" in sys.warnoptions: + config.option.disable_capture_warnings = True def pytest_unconfigure(config): @@ -1264,139 +1292,114 @@ def _disable_redact(request: pytest.FixtureRequest, mocker): return -# The code below is a modified version of capture-warning code from -# https://github.com/athinkingape/pytest-capture-warnings +@functools.lru_cache(maxsize=None) +def _sites_locations() -> tuple[str, ...]: + return tuple([*site.getsitepackages(), site.getusersitepackages()]) -# MIT License -# -# Portions Copyright (c) 2022 A Thinking Ape Entertainment Ltd. 
-# Portions Copyright (c) 2022 Pyschojoker (Github) -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. - -captured_warnings: dict[tuple[str, int, type[Warning], str], warnings.WarningMessage] = {} -captured_warnings_count: dict[tuple[str, int, type[Warning], str], int] = {} -# By set ``_ispytest=True`` in WarningsRecorder we suppress annoying warnings: -# PytestDeprecationWarning: A private pytest class or function was used. 
-warnings_recorder = pytest.WarningsRecorder(_ispytest=True) -default_formatwarning = warnings_recorder._module.formatwarning # type: ignore[attr-defined] -default_showwarning = warnings_recorder._module.showwarning # type: ignore[attr-defined] +@functools.lru_cache(maxsize=None) +def _resolve_warning_filepath(path: str, rootpath: str): + if path.startswith(_sites_locations()): + for site_loc in _sites_locations(): + if path.startswith(site_loc): + return path[len(site_loc) :].lstrip(os.sep) + elif path.startswith(rootpath): + return path[len(rootpath) :].lstrip(os.sep) + return path + + +class RecordedWarning(NamedTuple): + category: str + message: str + node_id: str + filename: str + lineno: int + + @classmethod + def from_record( + cls, warning_message: warnings.WarningMessage, node_id: str, root_path: Path + ) -> RecordedWarning: + category = warning_message.category.__name__ + if (category_module := warning_message.category.__module__) != "builtins": + category = f"{category_module}.{category}" + node_id, *_ = node_id.partition("[") + return cls( + category=category, + message=str(warning_message.message), + node_id=node_id, + filename=_resolve_warning_filepath(warning_message.filename, os.fspath(root_path)), + lineno=warning_message.lineno, + ) + + @property + def uniq_key(self): + return self.category, self.message, self.lineno, self.lineno + + def as_dict(self) -> dict: + return { + "category": self.category, + "message": self.message, + "node_id": self.node_id, + "filename": self.filename, + "lineno": self.lineno, + } -@pytest.hookimpl(hookwrapper=True) -def pytest_runtest_call(item): - """ - Needed to grab the item.location information - """ - if os.environ.get("PYTHONWARNINGS") == "ignore": - yield - return +captured_warnings: dict[RecordedWarning, int] = {} + + +@contextmanager +def _capture_warnings(node_id: str, root_path: Path, storage: dict[RecordedWarning, int]): with warnings.catch_warnings(record=True) as records: yield for record in records: - 
quadruplet: tuple[str, int, type[Warning], str] = ( - record.filename, - record.lineno, - record.category, - str(record.message), - ) - if quadruplet in captured_warnings: - captured_warnings_count[quadruplet] += 1 - continue + cap_warning = RecordedWarning.from_record(record, node_id, root_path=root_path) + if cap_warning not in storage: + storage[cap_warning] = 1 else: - captured_warnings[quadruplet] = record - captured_warnings_count[quadruplet] = 1 + storage[cap_warning] += 1 @pytest.hookimpl(hookwrapper=True) -def pytest_terminal_summary(terminalreporter, exitstatus, config=None): - pwd = os.path.realpath(os.curdir) +def pytest_runtest_call(item: pytest.Item): + config = item.config + if config.option.disable_capture_warnings: + yield + return - def cut_path(path): - if path.startswith(pwd): - path = path[len(pwd) + 1 :] - if "/site-packages/" in path: - path = path.split("/site-packages/")[1] - return path + with _capture_warnings(item.nodeid, config.rootpath, storage=captured_warnings): + yield - def format_test_function_location(item): - return f"{item.location[0]}::{item.location[2]}:{item.location[1]}" +@pytest.hookimpl(hookwrapper=True) +def pytest_terminal_summary(terminalreporter, exitstatus: int, config: pytest.Config): yield + if config.option.disable_capture_warnings: + return - if captured_warnings: - terminalreporter.section( - f"Warning summary. 
Total: {sum(captured_warnings_count.values())}, " - f"Unique: {len(captured_warnings.values())}", - yellow=True, - bold=True, - ) - warnings_as_json = [] - - for warning in captured_warnings.values(): - serialized_warning = { - x: str(getattr(warning.message, x)) for x in dir(warning.message) if not x.startswith("__") - } - - serialized_warning.update( - { - "path": cut_path(warning.filename), - "lineno": warning.lineno, - "count": 1, - "warning_message": str(warning.message), - } - ) - - # How we format the warnings: pylint parseable format - # {path}:{line}: [{msg_id}({symbol}), {obj}] {msg} - # Always: - # {path}:{line}: [W0513(warning), ] {msg} - - if "with_traceback" in serialized_warning: - del serialized_warning["with_traceback"] - warnings_as_json.append(serialized_warning) - - with warning_output_path.open("w") as f: - for i in warnings_as_json: - f.write(f'{i["path"]}:{i["lineno"]}: [W0513(warning), ] {i["warning_message"]}') - f.write("\n") - terminalreporter.write("Warnings saved into ") - terminalreporter.write(os.fspath(warning_output_path), yellow=True) - terminalreporter.write(" file.\n") - else: - # nothing, clear file - with warning_output_path.open("w") as f: - pass - + warning_output_path = config.option.warning_output_path + if warning_output_path.exists(): + warning_output_path.open("w").close() -def configure_warning_output(config): - global warning_output_path - warning_output_path = Path(config.getoption("warning_output_path")) - if ( - "CAPTURE_WARNINGS_OUTPUT" in os.environ - and warning_output_path.resolve() != DEFAULT_WARNING_OUTPUT_PATH.resolve() - ): - warning_output_path = os.environ["CAPTURE_WARNINGS_OUTPUT"] + if not captured_warnings: + return + terminalreporter.section( + f"Warning summary. 
Total: {sum(captured_warnings.values())}, " + f"Unique: {len({cw.uniq_key for cw in captured_warnings})}", + yellow=True, + bold=True, + ) + with warning_output_path.open("w") as f: + for warn, warn_count in captured_warnings.items(): + record = warn.as_dict() + record["count"] = warn_count + f.write(json.dumps(record)) + f.write("\n") + terminalreporter.write("Warnings saved into ") + terminalreporter.write(os.fspath(warning_output_path), yellow=True) + terminalreporter.write(" file.\n") -# End of modified code from https://github.com/athinkingape/pytest-capture-warnings if TYPE_CHECKING: # Static checkers do not know about pytest fixtures' types and return, diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 35c4b745b7..10546afd8c 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -87,7 +87,6 @@ from airflow.ti_deps.deps.base_ti_dep import TIDepStatus from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep, _UpstreamTIStates from airflow.utils import timezone from airflow.utils.db import merge_conn -from airflow.utils.module_loading import qualname from airflow.utils.session import create_session, provide_session from airflow.utils.state import DagRunState, State, TaskInstanceState from airflow.utils.task_group import TaskGroup @@ -2855,10 +2854,7 @@ class TestTaskInstance: State.FAILED, ], ) - @patch("logging.Logger.exception") - def test_finished_callbacks_handle_and_log_exception( - self, mock_log, finished_state, create_task_instance - ): + def test_finished_callbacks_handle_and_log_exception(self, finished_state, create_task_instance, caplog): called = completed = False def on_finish_callable(context): @@ -2868,14 +2864,12 @@ class TestTaskInstance: completed = True for callback_input in [[on_finish_callable], on_finish_callable]: + caplog.clear() _run_finished_callback(callbacks=callback_input, context={}) assert called assert not completed - callback_name = 
callback_input[0] if isinstance(callback_input, list) else callback_input - callback_name = qualname(callback_name).split(".")[-1] - expected_message = "Error when executing %s callback" - mock_log.assert_called_with(expected_message, callback_name) + assert "Error when executing on_finish_callable callback" in caplog.text @provide_session def test_handle_failure(self, create_dummy_dag, session=None):
