This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 99bd5cdacc Migrate integrations tests to `pytest` (#28220)
99bd5cdacc is described below
commit 99bd5cdacc8299b73e64fe95c8528c4d23f4234d
Author: Andrey Anshin <[email protected]>
AuthorDate: Thu Dec 8 17:17:44 2022 +0400
Migrate integrations tests to `pytest` (#28220)
---
.../integration/executors/test_celery_executor.py | 49 ++++++++++------------
.../google/cloud/transfers/test_presto_to_gcs.py | 3 +-
.../google/cloud/transfers/test_trino_to_gcs.py | 3 +-
3 files changed, 23 insertions(+), 32 deletions(-)
diff --git a/tests/integration/executors/test_celery_executor.py b/tests/integration/executors/test_celery_executor.py
index 2c0e578750..6bf7e3af46 100644
--- a/tests/integration/executors/test_celery_executor.py
+++ b/tests/integration/executors/test_celery_executor.py
@@ -22,7 +22,6 @@ import json
import logging
import os
import sys
-import unittest
from datetime import datetime, timedelta
from unittest import mock
@@ -34,7 +33,6 @@ from celery.backends.base import BaseBackend, BaseKeyValueStoreBackend
from celery.backends.database import DatabaseBackend
from celery.contrib.testing.worker import start_worker
from kombu.asynchronous import set_event_loop
-from parameterized import parameterized
from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowTaskTimeout
@@ -49,8 +47,8 @@ from tests.test_utils import db
def _prepare_test_bodies():
if "CELERY_BROKER_URLS" in os.environ:
- return [(url,) for url in os.environ["CELERY_BROKER_URLS"].split(",")]
- return [(conf.get("celery", "BROKER_URL"))]
+ return os.environ["CELERY_BROKER_URLS"].split(",")
+ return [conf.get("celery", "BROKER_URL")]
class FakeCeleryResult:
@@ -104,7 +102,7 @@ class TestCeleryExecutor:
db.clear_db_runs()
db.clear_db_jobs()
- @parameterized.expand(_prepare_test_bodies())
+ @pytest.mark.parametrize("broker_url", _prepare_test_bodies())
def test_celery_integration(self, broker_url):
success_command = ["airflow", "tasks", "run", "true", "some_parameter"]
fail_command = ["airflow", "version"]
@@ -261,17 +259,19 @@ class ClassWithCustomAttributes:
@pytest.mark.integration("celery")
@pytest.mark.backend("mysql", "postgres")
-class TestBulkStateFetcher(unittest.TestCase):
+class TestBulkStateFetcher:
+ bulk_state_fetcher_logger = "airflow.executors.celery_executor.BulkStateFetcher"
+
@mock.patch(
"celery.backends.base.BaseKeyValueStoreBackend.mget",
return_value=[json.dumps({"status": "SUCCESS", "task_id": "123"})],
)
- def test_should_support_kv_backend(self, mock_mget):
+ def test_should_support_kv_backend(self, mock_mget, caplog):
+ caplog.set_level(logging.DEBUG, logger=self.bulk_state_fetcher_logger)
with _prepare_app():
mock_backend = BaseKeyValueStoreBackend(app=celery_executor.app)
- with mock.patch(
- "airflow.executors.celery_executor.Celery.backend", mock_backend
- ), self.assertLogs("airflow.executors.celery_executor.BulkStateFetcher", level="DEBUG") as cm:
+ with mock.patch("airflow.executors.celery_executor.Celery.backend", mock_backend):
+ caplog.clear()
fetcher = BulkStateFetcher()
result = fetcher.get_many(
[
@@ -286,18 +286,15 @@ class TestBulkStateFetcher(unittest.TestCase):
mock_mget.assert_called_once_with(mock.ANY)
assert result == {"123": ("SUCCESS", None), "456": ("PENDING", None)}
- assert [
- "DEBUG:airflow.executors.celery_executor.BulkStateFetcher:Fetched 2 state(s) for 2 task(s)"
- ] == cm.output
+ assert caplog.messages == ["Fetched 2 state(s) for 2 task(s)"]
@mock.patch("celery.backends.database.DatabaseBackend.ResultSession")
- def test_should_support_db_backend(self, mock_session):
+ def test_should_support_db_backend(self, mock_session, caplog):
+ caplog.set_level(logging.DEBUG, logger=self.bulk_state_fetcher_logger)
with _prepare_app():
mock_backend = DatabaseBackend(app=celery_executor.app, url="sqlite3://")
-
- with mock.patch(
- "airflow.executors.celery_executor.Celery.backend", mock_backend
- ), self.assertLogs("airflow.executors.celery_executor.BulkStateFetcher", level="DEBUG") as cm:
+ with mock.patch("airflow.executors.celery_executor.Celery.backend", mock_backend):
+ caplog.clear()
mock_session = mock_backend.ResultSession.return_value
mock_session.query.return_value.filter.return_value.all.return_value = [
mock.MagicMock(**{"to_dict.return_value": {"status": "SUCCESS", "task_id": "123"}})
@@ -312,17 +309,15 @@ class TestBulkStateFetcher(unittest.TestCase):
)
assert result == {"123": ("SUCCESS", None), "456": ("PENDING", None)}
- assert [
- "DEBUG:airflow.executors.celery_executor.BulkStateFetcher:Fetched 2 state(s) for 2 task(s)"
- ] == cm.output
+ assert caplog.messages == ["Fetched 2 state(s) for 2 task(s)"]
- def test_should_support_base_backend(self):
+ def test_should_support_base_backend(self, caplog):
+ caplog.set_level(logging.DEBUG, logger=self.bulk_state_fetcher_logger)
with _prepare_app():
mock_backend = mock.MagicMock(autospec=BaseBackend)
- with mock.patch(
- "airflow.executors.celery_executor.Celery.backend", mock_backend
- ), self.assertLogs("airflow.executors.celery_executor.BulkStateFetcher", level="DEBUG") as cm:
+ with mock.patch("airflow.executors.celery_executor.Celery.backend", mock_backend):
+ caplog.clear()
fetcher = BulkStateFetcher(1)
result = fetcher.get_many(
[
@@ -332,6 +327,4 @@ class TestBulkStateFetcher(unittest.TestCase):
)
assert result == {"123": ("SUCCESS", None), "456": ("PENDING", None)}
- assert [
- "DEBUG:airflow.executors.celery_executor.BulkStateFetcher:Fetched 2 state(s) for 2 task(s)"
- ] == cm.output
+ assert caplog.messages == ["Fetched 2 state(s) for 2 task(s)"]
diff --git a/tests/integration/providers/google/cloud/transfers/test_presto_to_gcs.py b/tests/integration/providers/google/cloud/transfers/test_presto_to_gcs.py
index 3a4160e7cd..9f019b14b7 100644
--- a/tests/integration/providers/google/cloud/transfers/test_presto_to_gcs.py
+++ b/tests/integration/providers/google/cloud/transfers/test_presto_to_gcs.py
@@ -17,7 +17,6 @@
# under the License.
from __future__ import annotations
-import unittest
from unittest.mock import patch
import pytest
@@ -48,7 +47,7 @@ SCHEMA_JSON = b'[{"name": "some_num", "type": "INT64"}, {"name": "some_str", "ty
@pytest.mark.integration("presto")
-class TestPrestoToGCSOperator(unittest.TestCase):
+class TestPrestoToGCSOperator:
def test_init(self):
"""Test PrestoToGCSOperator instance is properly initialized."""
op = PrestoToGCSOperator(
diff --git a/tests/integration/providers/google/cloud/transfers/test_trino_to_gcs.py b/tests/integration/providers/google/cloud/transfers/test_trino_to_gcs.py
index a5771f3b67..24e659e85c 100644
--- a/tests/integration/providers/google/cloud/transfers/test_trino_to_gcs.py
+++ b/tests/integration/providers/google/cloud/transfers/test_trino_to_gcs.py
@@ -17,7 +17,6 @@
# under the License.
from __future__ import annotations
-import unittest
from unittest.mock import patch
import pytest
@@ -48,7 +47,7 @@ SCHEMA_JSON = b'[{"name": "some_num", "type": "INT64"}, {"name": "some_str", "ty
@pytest.mark.integration("trino")
-class TestTrinoToGCSOperator(unittest.TestCase):
+class TestTrinoToGCSOperator:
def test_init(self):
"""Test TrinoToGCSOperator instance is properly initialized."""
op = TrinoToGCSOperator(