This is an automated email from the ASF dual-hosted git repository.

taragolis pushed a commit to branch improve-warning-system
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 47322f1722798509ca2ffcadd83ff73dc244cd18
Author: Andrey Anshin <[email protected]>
AuthorDate: Mon Apr 8 16:18:02 2024 +0400

    Change capture warnings output format to the JSON
---
 tests/conftest.py                 | 243 +++++++++++++++++++-------------------
 tests/models/test_taskinstance.py |  12 +-
 2 files changed, 126 insertions(+), 129 deletions(-)

diff --git a/tests/conftest.py b/tests/conftest.py
index 6d102e7268..b38a512116 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -21,13 +21,14 @@ import json
 import os
 import platform
 import re
+import site
 import subprocess
 import sys
 import warnings
-from contextlib import ExitStack, suppress
+from contextlib import ExitStack, contextmanager, suppress
 from datetime import datetime, timedelta, timezone
 from pathlib import Path
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, NamedTuple
 
 import pytest
 import time_machine
@@ -68,8 +69,6 @@ for env_key in os.environ.copy():
         if not (ko := _KEEP_CONFIGS.get(section)) or not ("*" in ko or option 
in ko):
             del os.environ[env_key]
 
-DEFAULT_WARNING_OUTPUT_PATH = Path("warnings.txt")
-warning_output_path = DEFAULT_WARNING_OUTPUT_PATH
 SUPPORTED_DB_BACKENDS = ("sqlite", "postgres", "mysql")
 
 # A bit of a Hack - but we need to check args before they are parsed by pytest 
in order to
@@ -295,11 +294,22 @@ def pytest_addoption(parser):
         dest="db_cleanup",
         help="Disable DB clear before each test module.",
     )
+    group.addoption(
+        "--disable-capture-warnings",
+        action="store_true",
+        dest="disable_capture_warnings",
+        help="Disable internal capture warnings.",
+    )
     group.addoption(
         "--warning-output-path",
         action="store",
         dest="warning_output_path",
-        default=DEFAULT_WARNING_OUTPUT_PATH.resolve().as_posix(),
+        metavar="PATH",
+        help=(
+            "Path for resulting captured warnings. Absolute or relative to the 
`tests` directory. "
+            "If not provided or environment variable `CAPTURE_WARNINGS_OUTPUT` 
not set "
+            "then 'warnings.txt' will be used."
+        ),
     )
 
 
@@ -415,7 +425,25 @@ def pytest_configure(config: pytest.Config) -> None:
     config.addinivalue_line("markers", "enable_redact: do not mock redact 
secret masker")
 
     os.environ["_AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK"] = "1"
-    configure_warning_output(config)
+
+    # Setup capture warnings
+    if not (warning_output_path := config.getoption("warning_output_path", 
default=None)):
+        if not (warning_output_path := 
os.environ.get("CAPTURE_WARNINGS_OUTPUT")):
+            warning_output_path = "warnings.txt"
+    warning_output_path = Path(warning_output_path)
+    if not warning_output_path.is_absolute():
+        warning_output_path = 
Path(__file__).resolve().parent.joinpath(warning_output_path)
+    try:
+        warning_output_path.parent.resolve(strict=True)
+    except Exception as ex:
+        msg = (
+            f"Unable resolve {os.fspath(warning_output_path.parent)!r} path 
for store warnings outputs. "
+            f"Original Error:\n {type(ex).__name__}: {ex}."
+        )
+        pytest.exit(msg, returncode=6)
+    config.option.warning_output_path = warning_output_path
+    if "ignore" in sys.warnoptions:
+        config.option.disable_capture_warnings = True
 
 
 def pytest_unconfigure(config):
@@ -1264,139 +1292,114 @@ def _disable_redact(request: pytest.FixtureRequest, 
mocker):
     return
 
 
-# The code below is a modified version of capture-warning code from
-# https://github.com/athinkingape/pytest-capture-warnings
+@functools.lru_cache(maxsize=None)
+def _sites_locations() -> tuple[str, ...]:
+    """Return the site-packages directories reported by the ``site`` module.
+
+    The global site-packages directories come first, followed by the per-user
+    one. The result is cached, so the directories are computed only once.
+    """
+    return (*site.getsitepackages(), site.getusersitepackages())
 
-# MIT License
-#
-# Portions Copyright (c) 2022 A Thinking Ape Entertainment Ltd.
-# Portions Copyright (c) 2022 Pyschojoker (Github)
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in 
all
-# copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-# SOFTWARE.
-
-captured_warnings: dict[tuple[str, int, type[Warning], str], 
warnings.WarningMessage] = {}
-captured_warnings_count: dict[tuple[str, int, type[Warning], str], int] = {}
-# By set ``_ispytest=True`` in WarningsRecorder we suppress annoying warnings:
-# PytestDeprecationWarning: A private pytest class or function was used.
-warnings_recorder = pytest.WarningsRecorder(_ispytest=True)
-default_formatwarning = warnings_recorder._module.formatwarning  # type: 
ignore[attr-defined]
-default_showwarning = warnings_recorder._module.showwarning  # type: 
ignore[attr-defined]
 
+@functools.lru_cache(maxsize=None)
+def _resolve_warning_filepath(path: str, rootpath: str) -> str:
+    """Shorten ``path`` by stripping a known directory prefix.
+
+    Site-packages directories are tried first, in the order returned by
+    ``_sites_locations()``, and ``rootpath`` is tried last. The first prefix
+    that matches is removed together with any leading path separators.
+    If nothing matches, ``path`` is returned unchanged.
+    """
+    # NOTE(review): this is a plain string-prefix match, so a sibling directory
+    # sharing the same textual prefix would also match - confirm this is acceptable.
+    for location in (*_sites_locations(), rootpath):
+        if path.startswith(location):
+            return path[len(location) :].lstrip(os.sep)
+    return path
+
+
+class RecordedWarning(NamedTuple):
+    """Hashable record describing one captured warning.
+
+    Being a ``NamedTuple``, instances compare and hash by all five fields, which
+    lets them serve as dictionary keys when counting repeated warnings.
+    """
+
+    category: str  # warning class name, module-qualified unless defined in ``builtins``
+    message: str  # ``str()`` of the original warning message
+    node_id: str  # node id supplied by the caller, cut at the first "["
+    filename: str  # triggering file, shortened by ``_resolve_warning_filepath``
+    lineno: int  # line number reported by the warning
+
+    @classmethod
+    def from_record(
+        cls, warning_message: warnings.WarningMessage, node_id: str, root_path: Path
+    ) -> RecordedWarning:
+        """Build a record from a ``warnings.WarningMessage``.
+
+        :param warning_message: warning captured by ``warnings.catch_warnings(record=True)``.
+        :param node_id: node id of the item that emitted the warning; everything from
+            the first ``[`` onwards is dropped.
+        :param root_path: directory whose prefix is stripped from the warning filename
+            (site-packages prefixes are stripped as well).
+        """
+        category = warning_message.category.__name__
+        if (category_module := warning_message.category.__module__) != "builtins":
+            category = f"{category_module}.{category}"
+        # Keep only the part before "[" so all parametrized variants share one node id.
+        node_id, *_ = node_id.partition("[")
+        return cls(
+            category=category,
+            message=str(warning_message.message),
+            node_id=node_id,
+            filename=_resolve_warning_filepath(warning_message.filename, os.fspath(root_path)),
+            lineno=warning_message.lineno,
+        )
+
+    @property
+    def uniq_key(self) -> tuple[str, str, str, int]:
+        """Key identifying the warning regardless of which node emitted it.
+
+        The filename is part of the key so that warnings with the same category,
+        message and line number raised from different files stay distinct.
+        """
+        return self.category, self.message, self.filename, self.lineno
+
+    def as_dict(self) -> dict:
+        """Return the record as a plain mapping of field name to value."""
+        return self._asdict()
 
-@pytest.hookimpl(hookwrapper=True)
-def pytest_runtest_call(item):
-    """
-    Needed to grab the item.location information
-    """
-    if os.environ.get("PYTHONWARNINGS") == "ignore":
-        yield
-        return
 
+captured_warnings: dict[RecordedWarning, int] = {}
+
+
+@contextmanager
+def _capture_warnings(node_id: str, root_path: Path, storage: 
dict[RecordedWarning, int]):
     with warnings.catch_warnings(record=True) as records:
         yield
     for record in records:
-        quadruplet: tuple[str, int, type[Warning], str] = (
-            record.filename,
-            record.lineno,
-            record.category,
-            str(record.message),
-        )
-        if quadruplet in captured_warnings:
-            captured_warnings_count[quadruplet] += 1
-            continue
+        cap_warning = RecordedWarning.from_record(record, node_id, 
root_path=root_path)
+        if cap_warning not in storage:
+            storage[cap_warning] = 1
         else:
-            captured_warnings[quadruplet] = record
-            captured_warnings_count[quadruplet] = 1
+            storage[cap_warning] += 1
 
 
 @pytest.hookimpl(hookwrapper=True)
-def pytest_terminal_summary(terminalreporter, exitstatus, config=None):
-    pwd = os.path.realpath(os.curdir)
+def pytest_runtest_call(item: pytest.Item):
+    config = item.config
+    if config.option.disable_capture_warnings:
+        yield
+        return
 
-    def cut_path(path):
-        if path.startswith(pwd):
-            path = path[len(pwd) + 1 :]
-        if "/site-packages/" in path:
-            path = path.split("/site-packages/")[1]
-        return path
+    with _capture_warnings(item.nodeid, config.rootpath, 
storage=captured_warnings):
+        yield
 
-    def format_test_function_location(item):
-        return f"{item.location[0]}::{item.location[2]}:{item.location[1]}"
 
+@pytest.hookimpl(hookwrapper=True)
+def pytest_terminal_summary(terminalreporter, exitstatus: int, config: 
pytest.Config):
     yield
+    if config.option.disable_capture_warnings:
+        return
 
-    if captured_warnings:
-        terminalreporter.section(
-            f"Warning summary. Total: {sum(captured_warnings_count.values())}, 
"
-            f"Unique: {len(captured_warnings.values())}",
-            yellow=True,
-            bold=True,
-        )
-        warnings_as_json = []
-
-        for warning in captured_warnings.values():
-            serialized_warning = {
-                x: str(getattr(warning.message, x)) for x in 
dir(warning.message) if not x.startswith("__")
-            }
-
-            serialized_warning.update(
-                {
-                    "path": cut_path(warning.filename),
-                    "lineno": warning.lineno,
-                    "count": 1,
-                    "warning_message": str(warning.message),
-                }
-            )
-
-            # How we format the warnings: pylint parseable format
-            # {path}:{line}: [{msg_id}({symbol}), {obj}] {msg}
-            # Always:
-            # {path}:{line}: [W0513(warning), ] {msg}
-
-            if "with_traceback" in serialized_warning:
-                del serialized_warning["with_traceback"]
-            warnings_as_json.append(serialized_warning)
-
-        with warning_output_path.open("w") as f:
-            for i in warnings_as_json:
-                f.write(f'{i["path"]}:{i["lineno"]}: [W0513(warning), ] 
{i["warning_message"]}')
-                f.write("\n")
-        terminalreporter.write("Warnings saved into ")
-        terminalreporter.write(os.fspath(warning_output_path), yellow=True)
-        terminalreporter.write(" file.\n")
-    else:
-        # nothing, clear file
-        with warning_output_path.open("w") as f:
-            pass
-
+    warning_output_path = config.option.warning_output_path
+    if warning_output_path.exists():
+        warning_output_path.open("w").close()
 
-def configure_warning_output(config):
-    global warning_output_path
-    warning_output_path = Path(config.getoption("warning_output_path"))
-    if (
-        "CAPTURE_WARNINGS_OUTPUT" in os.environ
-        and warning_output_path.resolve() != 
DEFAULT_WARNING_OUTPUT_PATH.resolve()
-    ):
-        warning_output_path = os.environ["CAPTURE_WARNINGS_OUTPUT"]
+    if not captured_warnings:
+        return
 
+    terminalreporter.section(
+        f"Warning summary. Total: {sum(captured_warnings.values())}, "
+        f"Unique: {len({cw.uniq_key for cw in captured_warnings})}",
+        yellow=True,
+        bold=True,
+    )
+    with warning_output_path.open("w") as f:
+        for warn, warn_count in captured_warnings.items():
+            record = warn.as_dict()
+            record["count"] = warn_count
+            f.write(json.dumps(record))
+            f.write("\n")
+    terminalreporter.write("Warnings saved into ")
+    terminalreporter.write(os.fspath(warning_output_path), yellow=True)
+    terminalreporter.write(" file.\n")
 
-# End of modified code from  
https://github.com/athinkingape/pytest-capture-warnings
 
 if TYPE_CHECKING:
     # Static checkers do not know about pytest fixtures' types and return,
diff --git a/tests/models/test_taskinstance.py 
b/tests/models/test_taskinstance.py
index 35c4b745b7..10546afd8c 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -87,7 +87,6 @@ from airflow.ti_deps.deps.base_ti_dep import TIDepStatus
 from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep, 
_UpstreamTIStates
 from airflow.utils import timezone
 from airflow.utils.db import merge_conn
-from airflow.utils.module_loading import qualname
 from airflow.utils.session import create_session, provide_session
 from airflow.utils.state import DagRunState, State, TaskInstanceState
 from airflow.utils.task_group import TaskGroup
@@ -2855,10 +2854,7 @@ class TestTaskInstance:
             State.FAILED,
         ],
     )
-    @patch("logging.Logger.exception")
-    def test_finished_callbacks_handle_and_log_exception(
-        self, mock_log, finished_state, create_task_instance
-    ):
+    def test_finished_callbacks_handle_and_log_exception(self, finished_state, 
create_task_instance, caplog):
         called = completed = False
 
         def on_finish_callable(context):
@@ -2868,14 +2864,12 @@ class TestTaskInstance:
             completed = True
 
         for callback_input in [[on_finish_callable], on_finish_callable]:
+            caplog.clear()
             _run_finished_callback(callbacks=callback_input, context={})
 
             assert called
             assert not completed
-            callback_name = callback_input[0] if isinstance(callback_input, 
list) else callback_input
-            callback_name = qualname(callback_name).split(".")[-1]
-            expected_message = "Error when executing %s callback"
-            mock_log.assert_called_with(expected_message, callback_name)
+            assert "Error when executing on_finish_callable callback" in 
caplog.text
 
     @provide_session
     def test_handle_failure(self, create_dummy_dag, session=None):

Reply via email to