This is an automated email from the ASF dual-hosted git repository.

phanikumv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 5946e60f439 AIP-84 Fix sqlite test lock error on logs (#47016)
5946e60f439 is described below

commit 5946e60f439cce89516b4ee55b64722b5641b02e
Author: Pierre Jeambrun <pierrejb...@gmail.com>
AuthorDate: Tue Feb 25 11:16:41 2025 +0100

    AIP-84 Fix sqlite test lock error on logs (#47016)
---
 .../core_api/routes/public/test_assets.py          | 11 ++--
 .../core_api/routes/public/test_backfills.py       |  6 +-
 .../core_api/routes/public/test_connections.py     |  7 ++-
 .../core_api/routes/public/test_dag_parsing.py     | 14 ++---
 .../core_api/routes/public/test_dag_run.py         |  2 +
 tests_common/test_utils/api_fastapi.py             | 68 ++++++++++++++++++++++
 6 files changed, 86 insertions(+), 22 deletions(-)

diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py 
b/tests/api_fastapi/core_api/routes/public/test_assets.py
index 044f94f96c3..8db3b0a1158 100644
--- a/tests/api_fastapi/core_api/routes/public/test_assets.py
+++ b/tests/api_fastapi/core_api/routes/public/test_assets.py
@@ -39,10 +39,10 @@ from airflow.utils.session import provide_session
 from airflow.utils.state import DagRunState
 from airflow.utils.types import DagRunType
 
+from tests_common.test_utils.api_fastapi import _check_last_log
 from tests_common.test_utils.asserts import assert_queries_count
-from tests_common.test_utils.db import clear_db_assets, clear_db_runs
+from tests_common.test_utils.db import clear_db_assets, clear_db_logs, 
clear_db_runs
 from tests_common.test_utils.format_datetime import 
from_datetime_to_zulu_without_ms
-from tests_common.test_utils.www import _check_last_log
 
 DEFAULT_DATE = datetime(2020, 6, 11, 18, 0, 0, tzinfo=timezone.utc)
 
@@ -189,10 +189,7 @@ class TestAssets:
     def setup(self) -> None:
         clear_db_assets()
         clear_db_runs()
-
-    def teardown_method(self) -> None:
-        clear_db_assets()
-        clear_db_runs()
+        clear_db_logs()
 
     @provide_session
     def create_assets(self, session, num: int = 2) -> list[AssetModel]:
@@ -1095,7 +1092,7 @@ class 
TestDeleteDagAssetQueuedEvent(TestQueuedEventEndpoint):
         assert response.status_code == 204
         adrq = session.query(AssetDagRunQueue).all()
         assert len(adrq) == 0
-        _check_last_log(session, dag_id=dag_id, 
event="delete_dag_asset_queued_events", logical_date=None)
+        _check_last_log(session, dag_id=dag_id, 
event="delete_dag_asset_queued_event", logical_date=None)
 
     def test_should_respond_404(self, test_client):
         dag_id = "not_exists"
diff --git a/tests/api_fastapi/core_api/routes/public/test_backfills.py 
b/tests/api_fastapi/core_api/routes/public/test_backfills.py
index ca7b3f7cf87..6860cf988c8 100644
--- a/tests/api_fastapi/core_api/routes/public/test_backfills.py
+++ b/tests/api_fastapi/core_api/routes/public/test_backfills.py
@@ -33,13 +33,14 @@ from airflow.utils import timezone
 from airflow.utils.session import provide_session
 from airflow.utils.state import DagRunState
 
+from tests_common.test_utils.api_fastapi import _check_last_log
 from tests_common.test_utils.db import (
     clear_db_backfills,
     clear_db_dags,
+    clear_db_logs,
     clear_db_runs,
     clear_db_serialized_dags,
 )
-from tests_common.test_utils.www import _check_last_log
 
 pytestmark = [pytest.mark.db_test, pytest.mark.need_serialized_dag]
 
@@ -55,13 +56,12 @@ def _clean_db():
     clear_db_runs()
     clear_db_dags()
     clear_db_serialized_dags()
+    clear_db_logs()
 
 
 @pytest.fixture(autouse=True)
 def clean_db():
     _clean_db()
-    yield
-    _clean_db()
 
 
 def make_dags():
diff --git a/tests/api_fastapi/core_api/routes/public/test_connections.py 
b/tests/api_fastapi/core_api/routes/public/test_connections.py
index f9cf0b8fd6e..a4ecdef94da 100644
--- a/tests/api_fastapi/core_api/routes/public/test_connections.py
+++ b/tests/api_fastapi/core_api/routes/public/test_connections.py
@@ -25,8 +25,8 @@ from airflow.models import Connection
 from airflow.secrets.environment_variables import CONN_ENV_PREFIX
 from airflow.utils.session import provide_session
 
-from tests_common.test_utils.db import clear_db_connections
-from tests_common.test_utils.www import _check_last_log
+from tests_common.test_utils.api_fastapi import _check_last_log
+from tests_common.test_utils.db import clear_db_connections, clear_db_logs
 
 pytestmark = pytest.mark.db_test
 
@@ -81,6 +81,7 @@ class TestConnectionEndpoint:
     @pytest.fixture(autouse=True)
     def setup(self) -> None:
         clear_db_connections(False)
+        clear_db_logs()
 
     def teardown_method(self) -> None:
         clear_db_connections()
@@ -582,7 +583,7 @@ class TestPatchConnection(TestConnectionEndpoint):
         response = test_client.patch(f"/public/connections/{TEST_CONN_ID}", 
json=body)
         assert response.status_code == 200
         assert response.json() == expected_response
-        _check_last_log(session, dag_id=None, event="post_connection", 
logical_date=None, check_masked=True)
+        _check_last_log(session, dag_id=None, event="patch_connection", 
logical_date=None, check_masked=True)
 
 
 class TestConnection(TestConnectionEndpoint):
diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_parsing.py 
b/tests/api_fastapi/core_api/routes/public/test_dag_parsing.py
index f5c79427f2e..c6aa9b5025d 100644
--- a/tests/api_fastapi/core_api/routes/public/test_dag_parsing.py
+++ b/tests/api_fastapi/core_api/routes/public/test_dag_parsing.py
@@ -23,10 +23,9 @@ from sqlalchemy import select
 
 from airflow.models import DagBag
 from airflow.models.dagbag import DagPriorityParsingRequest
-from airflow.utils.session import provide_session
 
-from tests_common.test_utils.db import clear_db_dag_parsing_requests, 
parse_and_sync_to_db
-from tests_common.test_utils.www import _check_last_log
+from tests_common.test_utils.api_fastapi import _check_last_log
+from tests_common.test_utils.db import clear_db_dag_parsing_requests, 
clear_db_logs, parse_and_sync_to_db
 
 pytestmark = pytest.mark.db_test
 
@@ -42,13 +41,10 @@ class TestDagParsingEndpoint:
     def clear_db():
         clear_db_dag_parsing_requests()
 
-    @provide_session
     @pytest.fixture(autouse=True)
-    def setup(self, session=None) -> None:
-        self.clear_db()
-
-    def teardown_method(self) -> None:
+    def setup(self, session) -> None:
         self.clear_db()
+        clear_db_logs()
 
     def test_201_and_400_requests(self, url_safe_serializer, session, 
test_client):
         parse_and_sync_to_db(self.EXAMPLE_DAG_FILE)
@@ -60,13 +56,13 @@ class TestDagParsingEndpoint:
         assert response.status_code == 201
         parsing_requests = 
session.scalars(select(DagPriorityParsingRequest)).all()
         assert parsing_requests[0].fileloc == test_dag.fileloc
+        _check_last_log(session, dag_id=None, event="reparse_dag_file", 
logical_date=None)
 
         # Duplicate file parsing request
         response = test_client.put(url, headers={"Accept": "application/json"})
         assert response.status_code == 409
         parsing_requests = 
session.scalars(select(DagPriorityParsingRequest)).all()
         assert parsing_requests[0].fileloc == test_dag.fileloc
-        _check_last_log(session, dag_id=None, event="reparse_dag_file", 
logical_date=None)
 
     def test_bad_file_request(self, url_safe_serializer, session, test_client):
         url = 
f"/public/parseDagFile/{url_safe_serializer.dumps('/some/random/file.py')}"
diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py 
b/tests/api_fastapi/core_api/routes/public/test_dag_run.py
index df4dc708d94..0176bef6248 100644
--- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -37,6 +37,7 @@ from airflow.utils.types import DagRunTriggeredByType, 
DagRunType
 
 from tests_common.test_utils.db import (
     clear_db_dags,
+    clear_db_logs,
     clear_db_runs,
     clear_db_serialized_dags,
 )
@@ -83,6 +84,7 @@ def setup(request, dag_maker, session=None):
     clear_db_runs()
     clear_db_dags()
     clear_db_serialized_dags()
+    clear_db_logs()
 
     if "no_setup" in request.keywords:
         return
diff --git a/tests_common/test_utils/api_fastapi.py 
b/tests_common/test_utils/api_fastapi.py
new file mode 100644
index 00000000000..cf99a412c5a
--- /dev/null
+++ b/tests_common/test_utils/api_fastapi.py
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+
+from airflow.models import Log
+from airflow.sdk.execution_time.secrets_masker import DEFAULT_SENSITIVE_FIELDS 
as sensitive_fields
+
+
+def _masked_value_check(data, sensitive_fields):
+    """
+    Recursively check if sensitive fields are properly masked.
+
+    :param data: JSON object (dict, list, or value)
+    :param sensitive_fields: Set of sensitive field names
+    """
+    if isinstance(data, dict):
+        for key, value in data.items():
+            if key in sensitive_fields:
+                assert value == "***", f"Expected masked value for {key}, but 
got {value}"
+            else:
+                _masked_value_check(value, sensitive_fields)
+    elif isinstance(data, list):
+        for item in data:
+            _masked_value_check(item, sensitive_fields)
+
+
+def _check_last_log(session, dag_id, event, logical_date, expected_extra=None, 
check_masked=False):
+    logs = (
+        session.query(
+            Log.dag_id,
+            Log.task_id,
+            Log.event,
+            Log.logical_date,
+            Log.owner,
+            Log.extra,
+        )
+        .filter(
+            Log.dag_id == dag_id,
+            Log.event == event,
+            Log.logical_date == logical_date,
+        )
+        .order_by(Log.dttm.desc())
+        .limit(1)
+        .all()
+    )
+    assert len(logs) == 1
+    assert logs[0].extra
+    if expected_extra:
+        assert json.loads(logs[0].extra) == expected_extra
+    if check_masked:
+        extra_json = json.loads(logs[0].extra)
+        _masked_value_check(extra_json, sensitive_fields)

Reply via email to