This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 99bd5cdacc Migrate integrations tests to `pytest` (#28220)
99bd5cdacc is described below

commit 99bd5cdacc8299b73e64fe95c8528c4d23f4234d
Author: Andrey Anshin <[email protected]>
AuthorDate: Thu Dec 8 17:17:44 2022 +0400

    Migrate integrations tests to `pytest` (#28220)
---
 .../integration/executors/test_celery_executor.py  | 49 ++++++++++------------
 .../google/cloud/transfers/test_presto_to_gcs.py   |  3 +-
 .../google/cloud/transfers/test_trino_to_gcs.py    |  3 +-
 3 files changed, 23 insertions(+), 32 deletions(-)

diff --git a/tests/integration/executors/test_celery_executor.py b/tests/integration/executors/test_celery_executor.py
index 2c0e578750..6bf7e3af46 100644
--- a/tests/integration/executors/test_celery_executor.py
+++ b/tests/integration/executors/test_celery_executor.py
@@ -22,7 +22,6 @@ import json
 import logging
 import os
 import sys
-import unittest
 from datetime import datetime, timedelta
 from unittest import mock
 
@@ -34,7 +33,6 @@ from celery.backends.base import BaseBackend, BaseKeyValueStoreBackend
 from celery.backends.database import DatabaseBackend
 from celery.contrib.testing.worker import start_worker
 from kombu.asynchronous import set_event_loop
-from parameterized import parameterized
 
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException, AirflowTaskTimeout
@@ -49,8 +47,8 @@ from tests.test_utils import db
 
 def _prepare_test_bodies():
     if "CELERY_BROKER_URLS" in os.environ:
-        return [(url,) for url in os.environ["CELERY_BROKER_URLS"].split(",")]
-    return [(conf.get("celery", "BROKER_URL"))]
+        return os.environ["CELERY_BROKER_URLS"].split(",")
+    return [conf.get("celery", "BROKER_URL")]
 
 
 class FakeCeleryResult:
@@ -104,7 +102,7 @@ class TestCeleryExecutor:
         db.clear_db_runs()
         db.clear_db_jobs()
 
-    @parameterized.expand(_prepare_test_bodies())
+    @pytest.mark.parametrize("broker_url", _prepare_test_bodies())
     def test_celery_integration(self, broker_url):
         success_command = ["airflow", "tasks", "run", "true", "some_parameter"]
         fail_command = ["airflow", "version"]
@@ -261,17 +259,19 @@ class ClassWithCustomAttributes:
 
 @pytest.mark.integration("celery")
 @pytest.mark.backend("mysql", "postgres")
-class TestBulkStateFetcher(unittest.TestCase):
+class TestBulkStateFetcher:
+    bulk_state_fetcher_logger = "airflow.executors.celery_executor.BulkStateFetcher"
+
     @mock.patch(
         "celery.backends.base.BaseKeyValueStoreBackend.mget",
         return_value=[json.dumps({"status": "SUCCESS", "task_id": "123"})],
     )
-    def test_should_support_kv_backend(self, mock_mget):
+    def test_should_support_kv_backend(self, mock_mget, caplog):
+        caplog.set_level(logging.DEBUG, logger=self.bulk_state_fetcher_logger)
         with _prepare_app():
             mock_backend = BaseKeyValueStoreBackend(app=celery_executor.app)
-            with mock.patch(
-                "airflow.executors.celery_executor.Celery.backend", mock_backend
-            ), self.assertLogs("airflow.executors.celery_executor.BulkStateFetcher", level="DEBUG") as cm:
+            with mock.patch("airflow.executors.celery_executor.Celery.backend", mock_backend):
+                caplog.clear()
                 fetcher = BulkStateFetcher()
                 result = fetcher.get_many(
                     [
@@ -286,18 +286,15 @@ class TestBulkStateFetcher(unittest.TestCase):
         mock_mget.assert_called_once_with(mock.ANY)
 
         assert result == {"123": ("SUCCESS", None), "456": ("PENDING", None)}
-        assert [
-            "DEBUG:airflow.executors.celery_executor.BulkStateFetcher:Fetched 2 state(s) for 2 task(s)"
-        ] == cm.output
+        assert caplog.messages == ["Fetched 2 state(s) for 2 task(s)"]
 
     @mock.patch("celery.backends.database.DatabaseBackend.ResultSession")
-    def test_should_support_db_backend(self, mock_session):
+    def test_should_support_db_backend(self, mock_session, caplog):
+        caplog.set_level(logging.DEBUG, logger=self.bulk_state_fetcher_logger)
         with _prepare_app():
             mock_backend = DatabaseBackend(app=celery_executor.app, url="sqlite3://")
-
-            with mock.patch(
-                "airflow.executors.celery_executor.Celery.backend", mock_backend
-            ), self.assertLogs("airflow.executors.celery_executor.BulkStateFetcher", level="DEBUG") as cm:
+            with mock.patch("airflow.executors.celery_executor.Celery.backend", mock_backend):
+                caplog.clear()
                 mock_session = mock_backend.ResultSession.return_value
                 
mock_session.query.return_value.filter.return_value.all.return_value = [
                     mock.MagicMock(**{"to_dict.return_value": {"status": "SUCCESS", "task_id": "123"}})
@@ -312,17 +309,15 @@ class TestBulkStateFetcher(unittest.TestCase):
                 )
 
         assert result == {"123": ("SUCCESS", None), "456": ("PENDING", None)}
-        assert [
-            "DEBUG:airflow.executors.celery_executor.BulkStateFetcher:Fetched 2 state(s) for 2 task(s)"
-        ] == cm.output
+        assert caplog.messages == ["Fetched 2 state(s) for 2 task(s)"]
+        assert caplog.messages == ["Fetched 2 state(s) for 2 task(s)"]
 
-    def test_should_support_base_backend(self):
+    def test_should_support_base_backend(self, caplog):
+        caplog.set_level(logging.DEBUG, logger=self.bulk_state_fetcher_logger)
         with _prepare_app():
             mock_backend = mock.MagicMock(autospec=BaseBackend)
 
-            with mock.patch(
-                "airflow.executors.celery_executor.Celery.backend", mock_backend
-            ), self.assertLogs("airflow.executors.celery_executor.BulkStateFetcher", level="DEBUG") as cm:
+            with mock.patch("airflow.executors.celery_executor.Celery.backend", mock_backend):
+                caplog.clear()
                 fetcher = BulkStateFetcher(1)
                 result = fetcher.get_many(
                     [
@@ -332,6 +327,4 @@ class TestBulkStateFetcher(unittest.TestCase):
                 )
 
         assert result == {"123": ("SUCCESS", None), "456": ("PENDING", None)}
-        assert [
-            "DEBUG:airflow.executors.celery_executor.BulkStateFetcher:Fetched 2 state(s) for 2 task(s)"
-        ] == cm.output
+        assert caplog.messages == ["Fetched 2 state(s) for 2 task(s)"]
diff --git a/tests/integration/providers/google/cloud/transfers/test_presto_to_gcs.py b/tests/integration/providers/google/cloud/transfers/test_presto_to_gcs.py
index 3a4160e7cd..9f019b14b7 100644
--- a/tests/integration/providers/google/cloud/transfers/test_presto_to_gcs.py
+++ b/tests/integration/providers/google/cloud/transfers/test_presto_to_gcs.py
@@ -17,7 +17,6 @@
 # under the License.
 from __future__ import annotations
 
-import unittest
 from unittest.mock import patch
 
 import pytest
@@ -48,7 +47,7 @@ SCHEMA_JSON = b'[{"name": "some_num", "type": "INT64"}, {"name": "some_str", "ty
 
 
 @pytest.mark.integration("presto")
-class TestPrestoToGCSOperator(unittest.TestCase):
+class TestPrestoToGCSOperator:
     def test_init(self):
         """Test PrestoToGCSOperator instance is properly initialized."""
         op = PrestoToGCSOperator(
diff --git a/tests/integration/providers/google/cloud/transfers/test_trino_to_gcs.py b/tests/integration/providers/google/cloud/transfers/test_trino_to_gcs.py
index a5771f3b67..24e659e85c 100644
--- a/tests/integration/providers/google/cloud/transfers/test_trino_to_gcs.py
+++ b/tests/integration/providers/google/cloud/transfers/test_trino_to_gcs.py
@@ -17,7 +17,6 @@
 # under the License.
 from __future__ import annotations
 
-import unittest
 from unittest.mock import patch
 
 import pytest
@@ -48,7 +47,7 @@ SCHEMA_JSON = b'[{"name": "some_num", "type": "INT64"}, {"name": "some_str", "ty
 
 
 @pytest.mark.integration("trino")
-class TestTrinoToGCSOperator(unittest.TestCase):
+class TestTrinoToGCSOperator:
     def test_init(self):
         """Test TrinoToGCSOperator instance is properly initialized."""
         op = TrinoToGCSOperator(

Reply via email to