This is an automated email from the ASF dual-hosted git repository.

phanikumv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new 5946e60f439  AIP-84 Fix sqlite test lock error on logs (#47016)
5946e60f439 is described below

commit 5946e60f439cce89516b4ee55b64722b5641b02e
Author: Pierre Jeambrun <pierrejb...@gmail.com>
AuthorDate: Tue Feb 25 11:16:41 2025 +0100

    AIP-84 Fix sqlite test lock error on logs (#47016)
---
 .../core_api/routes/public/test_assets.py         | 11 ++--
 .../core_api/routes/public/test_backfills.py      |  6 +-
 .../core_api/routes/public/test_connections.py    |  7 ++-
 .../core_api/routes/public/test_dag_parsing.py    | 14 ++---
 .../core_api/routes/public/test_dag_run.py        |  2 +
 tests_common/test_utils/api_fastapi.py            | 68 ++++++++++++++++++++++
 6 files changed, 86 insertions(+), 22 deletions(-)

diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py b/tests/api_fastapi/core_api/routes/public/test_assets.py
index 044f94f96c3..8db3b0a1158 100644
--- a/tests/api_fastapi/core_api/routes/public/test_assets.py
+++ b/tests/api_fastapi/core_api/routes/public/test_assets.py
@@ -39,10 +39,10 @@ from airflow.utils.session import provide_session
 from airflow.utils.state import DagRunState
 from airflow.utils.types import DagRunType
 
+from tests_common.test_utils.api_fastapi import _check_last_log
 from tests_common.test_utils.asserts import assert_queries_count
-from tests_common.test_utils.db import clear_db_assets, clear_db_runs
+from tests_common.test_utils.db import clear_db_assets, clear_db_logs, clear_db_runs
 from tests_common.test_utils.format_datetime import from_datetime_to_zulu_without_ms
-from tests_common.test_utils.www import _check_last_log
 
 DEFAULT_DATE = datetime(2020, 6, 11, 18, 0, 0, tzinfo=timezone.utc)
 
@@ -189,10 +189,7 @@ class TestAssets:
     def setup(self) -> None:
         clear_db_assets()
         clear_db_runs()
-
-    def teardown_method(self) -> None:
-        clear_db_assets()
-        clear_db_runs()
+        clear_db_logs()
 
     @provide_session
     def create_assets(self, session, num: int = 2) -> list[AssetModel]:
@@ -1095,7 +1092,7 @@ class TestDeleteDagAssetQueuedEvent(TestQueuedEventEndpoint):
         assert response.status_code == 204
         adrq = session.query(AssetDagRunQueue).all()
         assert len(adrq) == 0
-        _check_last_log(session, dag_id=dag_id, event="delete_dag_asset_queued_events", logical_date=None)
+        _check_last_log(session, dag_id=dag_id, event="delete_dag_asset_queued_event", logical_date=None)
 
     def test_should_respond_404(self, test_client):
         dag_id = "not_exists"
diff --git a/tests/api_fastapi/core_api/routes/public/test_backfills.py b/tests/api_fastapi/core_api/routes/public/test_backfills.py
index ca7b3f7cf87..6860cf988c8 100644
--- a/tests/api_fastapi/core_api/routes/public/test_backfills.py
+++ b/tests/api_fastapi/core_api/routes/public/test_backfills.py
@@ -33,13 +33,14 @@ from airflow.utils import timezone
 from airflow.utils.session import provide_session
 from airflow.utils.state import DagRunState
 
+from tests_common.test_utils.api_fastapi import _check_last_log
 from tests_common.test_utils.db import (
     clear_db_backfills,
     clear_db_dags,
+    clear_db_logs,
     clear_db_runs,
     clear_db_serialized_dags,
 )
-from tests_common.test_utils.www import _check_last_log
 
 pytestmark = [pytest.mark.db_test, pytest.mark.need_serialized_dag]
 
@@ -55,13 +56,12 @@ def _clean_db():
     clear_db_runs()
     clear_db_dags()
     clear_db_serialized_dags()
+    clear_db_logs()
 
 
 @pytest.fixture(autouse=True)
 def clean_db():
     _clean_db()
-    yield
-    _clean_db()
 
 
 def make_dags():
diff --git a/tests/api_fastapi/core_api/routes/public/test_connections.py b/tests/api_fastapi/core_api/routes/public/test_connections.py
index f9cf0b8fd6e..a4ecdef94da 100644
--- a/tests/api_fastapi/core_api/routes/public/test_connections.py
+++ b/tests/api_fastapi/core_api/routes/public/test_connections.py
@@ -25,8 +25,8 @@ from airflow.models import Connection
 from airflow.secrets.environment_variables import CONN_ENV_PREFIX
 from airflow.utils.session import provide_session
 
-from tests_common.test_utils.db import clear_db_connections
-from tests_common.test_utils.www import _check_last_log
+from tests_common.test_utils.api_fastapi import _check_last_log
+from tests_common.test_utils.db import clear_db_connections, clear_db_logs
 
 pytestmark = pytest.mark.db_test
 
@@ -81,6 +81,7 @@ class TestConnectionEndpoint:
     @pytest.fixture(autouse=True)
     def setup(self) -> None:
         clear_db_connections(False)
+        clear_db_logs()
 
     def teardown_method(self) -> None:
         clear_db_connections()
@@ -582,7 +583,7 @@ class TestPatchConnection(TestConnectionEndpoint):
         response = test_client.patch(f"/public/connections/{TEST_CONN_ID}", json=body)
         assert response.status_code == 200
         assert response.json() == expected_response
-        _check_last_log(session, dag_id=None, event="post_connection", logical_date=None, check_masked=True)
+        _check_last_log(session, dag_id=None, event="patch_connection", logical_date=None, check_masked=True)
 
 
 class TestConnection(TestConnectionEndpoint):
diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_parsing.py b/tests/api_fastapi/core_api/routes/public/test_dag_parsing.py
index f5c79427f2e..c6aa9b5025d 100644
--- a/tests/api_fastapi/core_api/routes/public/test_dag_parsing.py
+++ b/tests/api_fastapi/core_api/routes/public/test_dag_parsing.py
@@ -23,10 +23,9 @@ from sqlalchemy import select
 
 from airflow.models import DagBag
 from airflow.models.dagbag import DagPriorityParsingRequest
-from airflow.utils.session import provide_session
 
-from tests_common.test_utils.db import clear_db_dag_parsing_requests, parse_and_sync_to_db
-from tests_common.test_utils.www import _check_last_log
+from tests_common.test_utils.api_fastapi import _check_last_log
+from tests_common.test_utils.db import clear_db_dag_parsing_requests, clear_db_logs, parse_and_sync_to_db
 
 pytestmark = pytest.mark.db_test
 
@@ -42,13 +41,10 @@ class TestDagParsingEndpoint:
     def clear_db():
         clear_db_dag_parsing_requests()
 
-    @provide_session
     @pytest.fixture(autouse=True)
-    def setup(self, session=None) -> None:
-        self.clear_db()
-
-    def teardown_method(self) -> None:
+    def setup(self, session) -> None:
         self.clear_db()
+        clear_db_logs()
 
     def test_201_and_400_requests(self, url_safe_serializer, session, test_client):
         parse_and_sync_to_db(self.EXAMPLE_DAG_FILE)
@@ -60,13 +56,13 @@ class TestDagParsingEndpoint:
         assert response.status_code == 201
         parsing_requests = session.scalars(select(DagPriorityParsingRequest)).all()
         assert parsing_requests[0].fileloc == test_dag.fileloc
+        _check_last_log(session, dag_id=None, event="reparse_dag_file", logical_date=None)
 
         # Duplicate file parsing request
         response = test_client.put(url, headers={"Accept": "application/json"})
         assert response.status_code == 409
         parsing_requests = session.scalars(select(DagPriorityParsingRequest)).all()
         assert parsing_requests[0].fileloc == test_dag.fileloc
-        _check_last_log(session, dag_id=None, event="reparse_dag_file", logical_date=None)
 
     def test_bad_file_request(self, url_safe_serializer, session, test_client):
         url = f"/public/parseDagFile/{url_safe_serializer.dumps('/some/random/file.py')}"
diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py
index df4dc708d94..0176bef6248 100644
--- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -37,6 +37,7 @@ from airflow.utils.types import DagRunTriggeredByType, DagRunType
 
 from tests_common.test_utils.db import (
     clear_db_dags,
+    clear_db_logs,
     clear_db_runs,
     clear_db_serialized_dags,
 )
@@ -83,6 +84,7 @@ def setup(request, dag_maker, session=None):
     clear_db_runs()
     clear_db_dags()
     clear_db_serialized_dags()
+    clear_db_logs()
 
     if "no_setup" in request.keywords:
         return
diff --git a/tests_common/test_utils/api_fastapi.py b/tests_common/test_utils/api_fastapi.py
new file mode 100644
index 00000000000..cf99a412c5a
--- /dev/null
+++ b/tests_common/test_utils/api_fastapi.py
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+
+from airflow.models import Log
+from airflow.sdk.execution_time.secrets_masker import DEFAULT_SENSITIVE_FIELDS as sensitive_fields
+
+
+def _masked_value_check(data, sensitive_fields):
+    """
+    Recursively check if sensitive fields are properly masked.
+
+    :param data: JSON object (dict, list, or value)
+    :param sensitive_fields: Set of sensitive field names
+    """
+    if isinstance(data, dict):
+        for key, value in data.items():
+            if key in sensitive_fields:
+                assert value == "***", f"Expected masked value for {key}, but got {value}"
+            else:
+                _masked_value_check(value, sensitive_fields)
+    elif isinstance(data, list):
+        for item in data:
+            _masked_value_check(item, sensitive_fields)
+
+
+def _check_last_log(session, dag_id, event, logical_date, expected_extra=None, check_masked=False):
+    logs = (
+        session.query(
+            Log.dag_id,
+            Log.task_id,
+            Log.event,
+            Log.logical_date,
+            Log.owner,
+            Log.extra,
+        )
+        .filter(
+            Log.dag_id == dag_id,
+            Log.event == event,
+            Log.logical_date == logical_date,
+        )
+        .order_by(Log.dttm.desc())
+        .limit(1)
+        .all()
+    )
+    assert len(logs) == 1
+    assert logs[0].extra
+    if expected_extra:
+        assert json.loads(logs[0].extra) == expected_extra
+    if check_masked:
+        extra_json = json.loads(logs[0].extra)
+        _masked_value_check(extra_json, sensitive_fields)
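A minimal usage sketch of the new `_check_last_log` helper, assuming the `test_client` and `session` pytest fixtures that the Airflow FastAPI route tests already provide; the connection id, payload, and test name are illustrative only (the connection is assumed to exist before the PATCH) and are not part of this commit:

    import pytest

    from tests_common.test_utils.api_fastapi import _check_last_log
    from tests_common.test_utils.db import clear_db_logs

    pytestmark = pytest.mark.db_test


    def test_patch_connection_is_audit_logged(test_client, session):
        # Start from a clean Log table, as the updated setup fixtures in this commit do.
        clear_db_logs()
        body = {"connection_id": "test_conn", "conn_type": "http", "password": "secret"}
        response = test_client.patch("/public/connections/test_conn", json=body)
        assert response.status_code == 200
        # The helper asserts exactly one Log row with this event/dag_id/logical_date exists,
        # and with check_masked=True also verifies that sensitive keys (e.g. "password")
        # in the Log.extra JSON payload have been masked as "***".
        _check_last_log(session, dag_id=None, event="patch_connection", logical_date=None, check_masked=True)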