This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 81a82d8481 Run unit tests for Providers with airflow installed as package. (#39513)
81a82d8481 is described below
commit 81a82d848100acf95fc4764030f02bbdde9832fd
Author: Jarek Potiuk <[email protected]>
AuthorDate: Wed May 15 16:16:09 2024 -0400
Run unit tests for Providers with airflow installed as package. (#39513)
This PR adds the option of running unit tests for providers against
a specific Airflow version (for example, a released version from PyPI)
and enables it for back-compatibility testing against 2.9.1. In the
future it could also be used to run forward-compatibility testing with
Airflow 3.
---
.github/workflows/check-providers.yml | 23 +-
.github/workflows/ci.yml | 3 +
Dockerfile.ci | 2 +
airflow/providers/openlineage/conf.py | 1 -
.../providers/openlineage/plugins/openlineage.py | 3 +
contributing-docs/testing/unit_tests.rst | 607 +++++++++++++--------
dev/breeze/src/airflow_breeze/global_constants.py | 8 +
.../src/airflow_breeze/utils/selective_checks.py | 13 +-
dev/breeze/tests/test_selective_checks.py | 28 +-
pyproject.toml | 3 +
scripts/docker/entrypoint_ci.sh | 2 +
scripts/in_container/run_ci_tests.sh | 9 +
.../endpoints/test_import_error_endpoint.py | 2 +-
tests/api_connexion/schemas/test_error_schema.py | 2 +-
tests/api_experimental/common/test_delete_dag.py | 2 +-
tests/conftest.py | 20 +-
tests/dag_processing/test_job_runner.py | 2 +-
tests/dag_processing/test_processor.py | 2 +-
tests/listeners/class_listener.py | 92 +++-
.../amazon/aws/auth_manager/avp/test_facade.py | 23 +-
.../aws/executors/batch/test_batch_executor.py | 6 +
.../amazon/aws/executors/ecs/test_ecs_executor.py | 15 +-
tests/providers/amazon/aws/hooks/test_dynamodb.py | 2 +-
.../amazon/aws/hooks/test_hooks_signature.py | 4 +-
tests/providers/amazon/aws/links/test_base_aws.py | 5 +-
.../amazon/aws/operators/test_emr_serverless.py | 5 +-
.../amazon/aws/utils/test_eks_get_token.py | 6 +-
.../providers/apache/iceberg/hooks/test_iceberg.py | 34 +-
.../cncf/kubernetes/operators/test_pod.py | 15 +-
.../cncf/kubernetes/test_template_rendering.py | 4 +-
.../google/cloud/operators/test_bigquery.py | 23 +-
.../google/cloud/operators/test_dataproc.py | 75 ++-
.../providers/openlineage/plugins/test_listener.py | 48 +-
.../openlineage/plugins/test_openlineage.py | 4 +
tests/providers/smtp/notifications/test_smtp.py | 6 +-
tests/test_utils/compat.py | 60 ++
tests/test_utils/db.py | 5 +-
37 files changed, 766 insertions(+), 398 deletions(-)
diff --git a/.github/workflows/check-providers.yml
b/.github/workflows/check-providers.yml
index d71b8d678d..8bac79f6fd 100644
--- a/.github/workflows/check-providers.yml
+++ b/.github/workflows/check-providers.yml
@@ -43,7 +43,11 @@ on: # yamllint disable-line rule:truthy
providers-compatibility-checks:
description: >
JSON-formatted array of providers compatibility checks in the form
of array of dicts
- (airflow-version, python-versions, remove-providers)
+ (airflow-version, python-versions, remove-providers, run-tests)
+ required: true
+ type: string
+ providers-test-types-list-as-string:
+ description: "List of parallel provider test types as string"
required: true
type: string
skip-provider-tests:
@@ -237,6 +241,9 @@ jobs:
- name: >
Install and verify all provider packages and airflow on
Airflow ${{ matrix.airflow-version }}:Python ${{
matrix.python-version }}
+ # We do not need to run the import check if we run tests - the tests
+ # cover all the import checks automatically
+ if: matrix.run-tests != 'true'
run: >
breeze release-management verify-provider-packages
--use-packages-from-dist
@@ -245,3 +252,17 @@ jobs:
--airflow-constraints-reference
constraints-${{matrix.airflow-version}}
--providers-skip-constraints
--install-airflow-with-constraints
+ - name: >
+ Run provider unit tests on
+ Airflow ${{ matrix.airflow-version }}:Python ${{
matrix.python-version }}
+ if: matrix.run-tests == 'true'
+ run: >
+ breeze testing tests --run-in-parallel
+ --parallel-test-types "${{
inputs.providers-test-types-list-as-string }}"
+ --use-packages-from-dist
+ --package-format wheel
+ --use-airflow-version "${{ matrix.airflow-version }}"
+ --airflow-constraints-reference
constraints-${{matrix.airflow-version}}
+ --install-airflow-with-constraints
+ --providers-skip-constraints
+ --skip-providers "${{ matrix.remove-providers }}"
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 00155070d1..0502df3368 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -77,6 +77,8 @@ jobs:
full-tests-needed: ${{ steps.selective-checks.outputs.full-tests-needed
}}
parallel-test-types-list-as-string: >-
${{ steps.selective-checks.outputs.parallel-test-types-list-as-string
}}
+ providers-test-types-list-as-string: >-
+ ${{ steps.selective-checks.outputs.providers-test-types-list-as-string
}}
include-success-outputs: ${{
steps.selective-checks.outputs.include-success-outputs }}
postgres-exclude: ${{ steps.selective-checks.outputs.postgres-exclude }}
mysql-exclude: ${{ steps.selective-checks.outputs.mysql-exclude }}
@@ -315,6 +317,7 @@ jobs:
providers-compatibility-checks: ${{
needs.build-info.outputs.providers-compatibility-checks }}
skip-provider-tests: ${{ needs.build-info.outputs.skip-provider-tests }}
python-versions: ${{ needs.build-info.outputs.python-versions }}
+ providers-test-types-list-as-string: ${{
needs.build-info.outputs.providers-test-types-list-as-string }}
tests-helm:
name: "Helm tests"
diff --git a/Dockerfile.ci b/Dockerfile.ci
index 7b9392f291..ff95970371 100644
--- a/Dockerfile.ci
+++ b/Dockerfile.ci
@@ -984,6 +984,8 @@ function determine_airflow_to_use() {
mkdir -p "${AIRFLOW_SOURCES}"/tmp/
else
python "${IN_CONTAINER_DIR}/install_airflow_and_providers.py"
+ # Some packages might leave a legacy typing module behind, which causes test issues
+ pip uninstall -y typing || true
fi
if [[ "${USE_AIRFLOW_VERSION}" =~ ^2\.2\..*|^2\.1\..*|^2\.0\..* &&
"${AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=}" != "" ]]; then
diff --git a/airflow/providers/openlineage/conf.py
b/airflow/providers/openlineage/conf.py
index 23e663f67e..f79511a22d 100644
--- a/airflow/providers/openlineage/conf.py
+++ b/airflow/providers/openlineage/conf.py
@@ -100,7 +100,6 @@ def is_disabled() -> bool:
option = os.getenv("OPENLINEAGE_DISABLED", "")
if _is_true(option):
return True
-
# Check if both 'transport' and 'config_path' are not present and also
# if legacy 'OPENLINEAGE_URL' environment variables is not set
return transport() == {} and config_path(True) == "" and
os.getenv("OPENLINEAGE_URL", "") == ""
diff --git a/airflow/providers/openlineage/plugins/openlineage.py
b/airflow/providers/openlineage/plugins/openlineage.py
index 5927929588..5b4f2c3dbf 100644
--- a/airflow/providers/openlineage/plugins/openlineage.py
+++ b/airflow/providers/openlineage/plugins/openlineage.py
@@ -39,3 +39,6 @@ class OpenLineageProviderPlugin(AirflowPlugin):
if not conf.is_disabled():
macros = [lineage_job_namespace, lineage_job_name, lineage_run_id,
lineage_parent_id]
listeners = [get_openlineage_listener()]
+ else:
+ macros = []
+ listeners = []
diff --git a/contributing-docs/testing/unit_tests.rst
b/contributing-docs/testing/unit_tests.rst
index e9a7263a4e..c7eaed99e2 100644
--- a/contributing-docs/testing/unit_tests.rst
+++ b/contributing-docs/testing/unit_tests.rst
@@ -427,75 +427,75 @@ You can see details about the limitation `here
<https://pytest-xdist.readthedocs
The error in this case will look similar to:
- .. code-block::
+.. code-block::
- Different tests were collected between gw0 and gw7. The difference is:
+ Different tests were collected between gw0 and gw7. The difference is:
The fix for that is to sort the parameters in ``parametrize``. For example
instead of this:
- .. code-block:: python
+.. code-block:: python
- @pytest.mark.parametrize("status", ALL_STATES)
- def test_method():
- ...
+ @pytest.mark.parametrize("status", ALL_STATES)
+ def test_method():
+ ...
do that:
- .. code-block:: python
+.. code-block:: python
- @pytest.mark.parametrize("status", sorted(ALL_STATES))
- def test_method():
- ...
+ @pytest.mark.parametrize("status", sorted(ALL_STATES))
+ def test_method():
+ ...
Similarly if your parameters are defined as result of utcnow() or other
dynamic method - you should
avoid that, or assign unique IDs for those parametrized tests. Instead of this:
- .. code-block:: python
-
- @pytest.mark.parametrize(
- "url, expected_dag_run_ids",
- [
- (
- f"api/v1/dags/TEST_DAG_ID/dagRuns?end_date_gte="
- f"{urllib.parse.quote((timezone.utcnow() +
timedelta(days=1)).isoformat())}",
- [],
- ),
- (
- f"api/v1/dags/TEST_DAG_ID/dagRuns?end_date_lte="
- f"{urllib.parse.quote((timezone.utcnow() +
timedelta(days=1)).isoformat())}",
- ["TEST_DAG_RUN_ID_1", "TEST_DAG_RUN_ID_2"],
- ),
- ],
- )
- def test_end_date_gte_lte(url, expected_dag_run_ids):
- ...
+.. code-block:: python
+
+ @pytest.mark.parametrize(
+ "url, expected_dag_run_ids",
+ [
+ (
+ f"api/v1/dags/TEST_DAG_ID/dagRuns?end_date_gte="
+ f"{urllib.parse.quote((timezone.utcnow() +
timedelta(days=1)).isoformat())}",
+ [],
+ ),
+ (
+ f"api/v1/dags/TEST_DAG_ID/dagRuns?end_date_lte="
+ f"{urllib.parse.quote((timezone.utcnow() +
timedelta(days=1)).isoformat())}",
+ ["TEST_DAG_RUN_ID_1", "TEST_DAG_RUN_ID_2"],
+ ),
+ ],
+ )
+ def test_end_date_gte_lte(url, expected_dag_run_ids):
+ ...
Do this:
- .. code-block:: python
-
- @pytest.mark.parametrize(
- "url, expected_dag_run_ids",
- [
- pytest.param(
- f"api/v1/dags/TEST_DAG_ID/dagRuns?end_date_gte="
- f"{urllib.parse.quote((timezone.utcnow() +
timedelta(days=1)).isoformat())}",
- [],
- id="end_date_gte",
- ),
- pytest.param(
- f"api/v1/dags/TEST_DAG_ID/dagRuns?end_date_lte="
- f"{urllib.parse.quote((timezone.utcnow() +
timedelta(days=1)).isoformat())}",
- ["TEST_DAG_RUN_ID_1", "TEST_DAG_RUN_ID_2"],
- id="end_date_lte",
- ),
- ],
- )
- def test_end_date_gte_lte(url, expected_dag_run_ids):
- ...
+.. code-block:: python
+
+ @pytest.mark.parametrize(
+ "url, expected_dag_run_ids",
+ [
+ pytest.param(
+ f"api/v1/dags/TEST_DAG_ID/dagRuns?end_date_gte="
+ f"{urllib.parse.quote((timezone.utcnow() +
timedelta(days=1)).isoformat())}",
+ [],
+ id="end_date_gte",
+ ),
+ pytest.param(
+ f"api/v1/dags/TEST_DAG_ID/dagRuns?end_date_lte="
+ f"{urllib.parse.quote((timezone.utcnow() +
timedelta(days=1)).isoformat())}",
+ ["TEST_DAG_RUN_ID_1", "TEST_DAG_RUN_ID_2"],
+ id="end_date_lte",
+ ),
+ ],
+ )
+ def test_end_date_gte_lte(url, expected_dag_run_ids):
+ ...
@@ -515,107 +515,107 @@ Moving object creation from top-level to inside tests.
This code will break coll
the test is marked as DB test:
- .. code-block:: python
-
- TI = TaskInstance(
- task=BashOperator(task_id="test", bash_command="true",
dag=DAG(dag_id="id"), start_date=datetime.now()),
- run_id="fake_run",
- state=State.RUNNING,
- )
-
-
- class TestCallbackRequest:
- @pytest.mark.parametrize(
- "input,request_class",
- [
- (CallbackRequest(full_filepath="filepath",
msg="task_failure"), CallbackRequest),
- (
- TaskCallbackRequest(
- full_filepath="filepath",
-
simple_task_instance=SimpleTaskInstance.from_ti(ti=TI),
- processor_subdir="/test_dir",
- is_failure_callback=True,
- ),
- TaskCallbackRequest,
- ),
- (
- DagCallbackRequest(
- full_filepath="filepath",
- dag_id="fake_dag",
- run_id="fake_run",
- processor_subdir="/test_dir",
- is_failure_callback=False,
- ),
- DagCallbackRequest,
- ),
- (
- SlaCallbackRequest(
- full_filepath="filepath",
- dag_id="fake_dag",
- processor_subdir="/test_dir",
- ),
- SlaCallbackRequest,
- ),
- ],
- )
- def test_from_json(self, input, request_class):
- ...
+.. code-block:: python
+
+ TI = TaskInstance(
+ task=BashOperator(task_id="test", bash_command="true",
dag=DAG(dag_id="id"), start_date=datetime.now()),
+ run_id="fake_run",
+ state=State.RUNNING,
+ )
+
+
+ class TestCallbackRequest:
+ @pytest.mark.parametrize(
+ "input,request_class",
+ [
+ (CallbackRequest(full_filepath="filepath", msg="task_failure"),
CallbackRequest),
+ (
+ TaskCallbackRequest(
+ full_filepath="filepath",
+ simple_task_instance=SimpleTaskInstance.from_ti(ti=TI),
+ processor_subdir="/test_dir",
+ is_failure_callback=True,
+ ),
+ TaskCallbackRequest,
+ ),
+ (
+ DagCallbackRequest(
+ full_filepath="filepath",
+ dag_id="fake_dag",
+ run_id="fake_run",
+ processor_subdir="/test_dir",
+ is_failure_callback=False,
+ ),
+ DagCallbackRequest,
+ ),
+ (
+ SlaCallbackRequest(
+ full_filepath="filepath",
+ dag_id="fake_dag",
+ processor_subdir="/test_dir",
+ ),
+ SlaCallbackRequest,
+ ),
+ ],
+ )
+ def test_from_json(self, input, request_class):
+ ...
Instead - this will not break collection. The TaskInstance is not initialized
when the module is parsed,
it will only be initialized when the test gets executed because we moved
initialization of it from
top level / parametrize to inside the test:
- .. code-block:: python
+.. code-block:: python
- pytestmark = pytest.mark.db_test
+ pytestmark = pytest.mark.db_test
- class TestCallbackRequest:
- @pytest.mark.parametrize(
- "input,request_class",
- [
- (CallbackRequest(full_filepath="filepath",
msg="task_failure"), CallbackRequest),
- (
- None, # to be generated when test is run
- TaskCallbackRequest,
- ),
- (
- DagCallbackRequest(
- full_filepath="filepath",
- dag_id="fake_dag",
- run_id="fake_run",
- processor_subdir="/test_dir",
- is_failure_callback=False,
- ),
- DagCallbackRequest,
- ),
- (
- SlaCallbackRequest(
- full_filepath="filepath",
- dag_id="fake_dag",
- processor_subdir="/test_dir",
- ),
- SlaCallbackRequest,
- ),
- ],
- )
- def test_from_json(self, input, request_class):
- if input is None:
- ti = TaskInstance(
- task=BashOperator(
- task_id="test", bash_command="true",
dag=DAG(dag_id="id"), start_date=datetime.now()
- ),
- run_id="fake_run",
- state=State.RUNNING,
- )
-
- input = TaskCallbackRequest(
- full_filepath="filepath",
- simple_task_instance=SimpleTaskInstance.from_ti(ti=ti),
- processor_subdir="/test_dir",
- is_failure_callback=True,
- )
+ class TestCallbackRequest:
+ @pytest.mark.parametrize(
+ "input,request_class",
+ [
+ (CallbackRequest(full_filepath="filepath", msg="task_failure"),
CallbackRequest),
+ (
+ None, # to be generated when test is run
+ TaskCallbackRequest,
+ ),
+ (
+ DagCallbackRequest(
+ full_filepath="filepath",
+ dag_id="fake_dag",
+ run_id="fake_run",
+ processor_subdir="/test_dir",
+ is_failure_callback=False,
+ ),
+ DagCallbackRequest,
+ ),
+ (
+ SlaCallbackRequest(
+ full_filepath="filepath",
+ dag_id="fake_dag",
+ processor_subdir="/test_dir",
+ ),
+ SlaCallbackRequest,
+ ),
+ ],
+ )
+ def test_from_json(self, input, request_class):
+ if input is None:
+ ti = TaskInstance(
+ task=BashOperator(
+ task_id="test", bash_command="true",
dag=DAG(dag_id="id"), start_date=datetime.now()
+ ),
+ run_id="fake_run",
+ state=State.RUNNING,
+ )
+
+ input = TaskCallbackRequest(
+ full_filepath="filepath",
+ simple_task_instance=SimpleTaskInstance.from_ti(ti=ti),
+ processor_subdir="/test_dir",
+ is_failure_callback=True,
+ )
Sometimes it is difficult to rewrite the tests, so you might add conditional
handling and mock out some
@@ -624,119 +624,119 @@ will hit the Database while parsing the tests, because
this is what Variable.set
parametrize specification is being parsed - even if test is marked as DB test.
- .. code-block:: python
-
- from airflow.models.variable import Variable
-
- pytestmark = pytest.mark.db_test
+.. code-block:: python
- initial_db_init()
+ from airflow.models.variable import Variable
+ pytestmark = pytest.mark.db_test
- @pytest.mark.parametrize(
- "env, expected",
- [
- pytest.param(
- {"plain_key": "plain_value"},
- "{'plain_key': 'plain_value'}",
- id="env-plain-key-val",
- ),
- pytest.param(
- {"plain_key": Variable.setdefault("plain_var", "banana")},
- "{'plain_key': 'banana'}",
- id="env-plain-key-plain-var",
- ),
- pytest.param(
- {"plain_key": Variable.setdefault("secret_var", "monkey")},
- "{'plain_key': '***'}",
- id="env-plain-key-sensitive-var",
- ),
- pytest.param(
- {"plain_key": "{{ var.value.plain_var }}"},
- "{'plain_key': '{{ var.value.plain_var }}'}",
- id="env-plain-key-plain-tpld-var",
- ),
- ],
- )
- def test_rendered_task_detail_env_secret(patch_app, admin_client,
request, env, expected):
- ...
+ initial_db_init()
+
+
+ @pytest.mark.parametrize(
+ "env, expected",
+ [
+ pytest.param(
+ {"plain_key": "plain_value"},
+ "{'plain_key': 'plain_value'}",
+ id="env-plain-key-val",
+ ),
+ pytest.param(
+ {"plain_key": Variable.setdefault("plain_var", "banana")},
+ "{'plain_key': 'banana'}",
+ id="env-plain-key-plain-var",
+ ),
+ pytest.param(
+ {"plain_key": Variable.setdefault("secret_var", "monkey")},
+ "{'plain_key': '***'}",
+ id="env-plain-key-sensitive-var",
+ ),
+ pytest.param(
+ {"plain_key": "{{ var.value.plain_var }}"},
+ "{'plain_key': '{{ var.value.plain_var }}'}",
+ id="env-plain-key-plain-tpld-var",
+ ),
+ ],
+ )
+ def test_rendered_task_detail_env_secret(patch_app, admin_client, request,
env, expected):
+ ...
You can make the code conditional and mock out the Variable to avoid hitting
the database.
- .. code-block:: python
-
- from airflow.models.variable import Variable
-
- pytestmark = pytest.mark.db_test
+.. code-block:: python
+ from airflow.models.variable import Variable
- if os.environ.get("_AIRFLOW_SKIP_DB_TESTS") == "true":
- # Handle collection of the test by non-db case
- Variable = mock.MagicMock() # type: ignore[misc] # noqa: F811
- else:
- initial_db_init()
+ pytestmark = pytest.mark.db_test
- @pytest.mark.parametrize(
- "env, expected",
- [
- pytest.param(
- {"plain_key": "plain_value"},
- "{'plain_key': 'plain_value'}",
- id="env-plain-key-val",
- ),
- pytest.param(
- {"plain_key": Variable.setdefault("plain_var", "banana")},
- "{'plain_key': 'banana'}",
- id="env-plain-key-plain-var",
- ),
- pytest.param(
- {"plain_key": Variable.setdefault("secret_var", "monkey")},
- "{'plain_key': '***'}",
- id="env-plain-key-sensitive-var",
- ),
- pytest.param(
- {"plain_key": "{{ var.value.plain_var }}"},
- "{'plain_key': '{{ var.value.plain_var }}'}",
- id="env-plain-key-plain-tpld-var",
- ),
- ],
- )
- def test_rendered_task_detail_env_secret(patch_app, admin_client,
request, env, expected):
- ...
+ if os.environ.get("_AIRFLOW_SKIP_DB_TESTS") == "true":
+ # Handle collection of the test by non-db case
+ Variable = mock.MagicMock() # type: ignore[misc] # noqa: F811
+ else:
+ initial_db_init()
+
+
+ @pytest.mark.parametrize(
+ "env, expected",
+ [
+ pytest.param(
+ {"plain_key": "plain_value"},
+ "{'plain_key': 'plain_value'}",
+ id="env-plain-key-val",
+ ),
+ pytest.param(
+ {"plain_key": Variable.setdefault("plain_var", "banana")},
+ "{'plain_key': 'banana'}",
+ id="env-plain-key-plain-var",
+ ),
+ pytest.param(
+ {"plain_key": Variable.setdefault("secret_var", "monkey")},
+ "{'plain_key': '***'}",
+ id="env-plain-key-sensitive-var",
+ ),
+ pytest.param(
+ {"plain_key": "{{ var.value.plain_var }}"},
+ "{'plain_key': '{{ var.value.plain_var }}'}",
+ id="env-plain-key-plain-tpld-var",
+ ),
+ ],
+ )
+ def test_rendered_task_detail_env_secret(patch_app, admin_client, request,
env, expected):
+ ...
You can also use fixture to create object that needs database just like this.
- .. code-block:: python
+.. code-block:: python
- from airflow.models import Connection
+ from airflow.models import Connection
- pytestmark = pytest.mark.db_test
+ pytestmark = pytest.mark.db_test
- @pytest.fixture()
- def get_connection1():
- return Connection()
+ @pytest.fixture()
+ def get_connection1():
+ return Connection()
- @pytest.fixture()
- def get_connection2():
- return Connection(host="apache.org", extra={})
+ @pytest.fixture()
+ def get_connection2():
+ return Connection(host="apache.org", extra={})
- @pytest.mark.parametrize(
- "conn",
- [
- "get_connection1",
- "get_connection2",
- ],
- )
- def test_as_json_from_connection(self, conn: Connection):
- conn = request.getfixturevalue(conn)
- ...
+ @pytest.mark.parametrize(
+ "conn",
+ [
+ "get_connection1",
+ "get_connection2",
+ ],
+ )
+ def test_as_json_from_connection(self, conn: Connection):
+ conn = request.getfixturevalue(conn)
+ ...
Running Unit tests
@@ -1061,7 +1061,7 @@ Those tests are marked with ``@pytest.mark.quarantined``
annotation.
Those tests are skipped by default. You can enable them with
``--include-quarantined`` flag. You
can also decide to only run tests with ``-m quarantined`` flag to run only
those tests.
-Running Tests with provider packages
+Running provider compatibility tests
....................................
Airflow 2.0 introduced the concept of splitting the monolithic Airflow package
into separate
@@ -1108,6 +1108,129 @@ This prepares airflow .whl package in the dist folder.
breeze --use-airflow-version wheel --use-packages-from-dist
--mount-sources skip
+Provider compatibility unit tests against older Airflow releases
+................................................................
+
+Our CI runs provider tests against previous compatible Airflow releases. This allows us to check
+whether the providers still work when installed with older Airflow versions.
+
+.. note::
+
+   For now, this is done only for Airflow 2.9.1.
+
+These tests can be used to check compatibility of the providers with past and future releases of Airflow.
+For example, they could be used to run the latest provider versions against a released or main
+Airflow 3, if the providers are developed independently.
+
+The tests use the current source version of the ``tests`` folder, so care should be taken that the tests
+implemented for providers in the sources can be run against previous versions of Airflow and
+against Airflow installed from a package rather than from the sources.
+
+This can be reproduced locally by building providers from a tag/commit of the Airflow repository.
+
+1. Make sure to build the latest Breeze CI image:
+
+.. code-block:: bash
+
+ breeze ci-image build --python 3.8
+
+2. Build providers from latest sources:
+
+.. code-block:: bash
+
+ rm dist/*
+ breeze release-management prepare-provider-packages
--include-not-ready-providers \
+ --version-suffix-for-pypi dev0 --package-format wheel
+
+3. Prepare provider constraints:
+
+.. code-block:: bash
+
+ breeze release-management generate-constraints --airflow-constraints-mode
constraints-source-providers --answer yes
+
+4. Enter the breeze environment, installing the selected Airflow version and the provider packages
+   prepared from main:
+
+.. code-block:: bash
+
+   breeze shell --use-packages-from-dist --package-format wheel \
+     --use-airflow-version 2.9.1 --airflow-constraints-reference constraints-2.9.1 \
+     --install-airflow-with-constraints \
+     --providers-skip-constraints \
+     --mount-sources tests
+
+5. You can then run tests as usual:
+
+.. code-block:: bash
+
+ pytest tests/providers/<provider>/test.py
+
+6. Iterate on the tests.
+
+The tests are run using:
+
+* airflow installed from PyPI
+* tests coming from the current airflow sources (they are mounted inside the
breeze image)
+* provider packages built from the current airflow sources and placed in dist
+
+This means that you can modify the tests and re-run them, but if you want to modify the provider code
+you need to exit breeze, rebuild the provider package, and restart breeze using the command above.
+
+Rebuilding a single provider package can be done using this command:
+
+.. code-block:: bash
+
+ breeze release-management prepare-provider-packages \
+ --version-suffix-for-pypi dev0 --package-format wheel <provider>
+
+
+Note that some of the tests, if written without compatibility in mind, might not work with older
+versions of Airflow - because of refactorings, renames, and tests relying on internals of Airflow that
+are not part of the public API. We deal with this in one of the following ways:
+
+1) If the whole provider is supposed to work only with a later Airflow version, we remove the whole provider
+   by excluding it from the compatibility test configuration (see below).
+
+2) Some compatibility shims are defined in ``tests/test_utils/compat.py`` and they can be used to make the
+   tests compatible - for example, importing ``ParseImportError`` (the exception was renamed from
+   ``ImportError``) would fail in Airflow 2.9, but ``compat.py`` contains a fallback import that
+   falls back to the old import automatically, so all tests expecting ``ParseImportError`` should import
+   it from the ``tests.test_utils.compat`` module. There are a few other compatibility shims defined there and
+   you can add more if needed in a similar way.
+
+3) If only some tests are not compatible and use features that are available only in a newer Airflow version,
+   we can mark those tests with the appropriate ``AIRFLOW_V_2_X_PLUS`` boolean constant defined in ``compat.py``.
+   For example:
+
+.. code-block:: python
+
+    from tests.test_utils.compat import AIRFLOW_V_2_7_PLUS
+
+
+    @pytest.mark.skipif(not AIRFLOW_V_2_7_PLUS, reason="The test should be skipped for Airflow < 2.7")
+    def some_test_that_only_works_for_airflow_2_7_plus():
+        pass
+
+4) Sometimes, the tests should only be run when Airflow is installed from the sources. In this case you can
+   add a conditional ``skipif`` marker for ``RUNNING_TESTS_AGAINST_AIRFLOW_PACKAGES`` to the test. For example:
+
+.. code-block:: python
+
+    from tests.conftest import RUNNING_TESTS_AGAINST_AIRFLOW_PACKAGES
+
+
+    @pytest.mark.skipif(
+        RUNNING_TESTS_AGAINST_AIRFLOW_PACKAGES,
+        reason="Plugin initialization is done early in case of packages",
+    )
+    def test_plugin():
+        pass
+
+
+How provider compatibility tests are run in CI
+-------------------------------------------------
+
+We run a set of back-compatibility tests based on the configuration specified in the
+``BASE_PROVIDERS_COMPATIBILITY_CHECKS`` constant in the ``./dev/breeze/src/airflow_breeze/global_constants.py``
+file, where we specify:
+
+* Python version
+* Airflow version
+* which providers should be removed (exclusions)
+* whether to run tests
+
Other Settings
--------------
@@ -1169,7 +1292,7 @@ to **ignore**, e.g. set ``PYTHONWARNINGS`` environment
variable to ``ignore``.
.. code-block:: bash
- pytest tests/core/ --disable-capture-warnings
+ pytest tests/core/ --disable-capture-warnings
Code Coverage
-------------
@@ -1186,19 +1309,19 @@ a. Initiate a breeze shell.
b. Execute one of the commands below based on the desired coverage area:
- - **Core:** ``python scripts/cov/core_coverage.py``
- - **REST API:** ``python scripts/cov/restapi_coverage.py``
- - **CLI:** ``python scripts/cov/cli_coverage.py``
- - **Webserver:** ``python scripts/cov/www_coverage.py``
+- **Core:** ``python scripts/cov/core_coverage.py``
+- **REST API:** ``python scripts/cov/restapi_coverage.py``
+- **CLI:** ``python scripts/cov/cli_coverage.py``
+- **Webserver:** ``python scripts/cov/www_coverage.py``
c. After execution, the coverage report will be available at:
http://localhost:28000/dev/coverage/index.html.
- .. note::
+.. note::
- In order to see the coverage report, you must start webserver first in
breeze environment via the
- `airflow webserver`. Once you enter `breeze`, you can start `tmux`
(terminal multiplexer) and
- split the terminal (by pressing `ctrl-B "` for example) to continue
testing and run the webserver
- in one terminal and run tests in the second one (you can switch between
the terminals with `ctrl-B <arrow>`).
+ In order to see the coverage report, you must start webserver first in
breeze environment via the
+ ``airflow webserver``. Once you enter ``breeze``, you can start ``tmux``
(terminal multiplexer) and
+ split the terminal (by pressing ``ctrl-B "`` for example) to continue
testing and run the webserver
+ in one terminal and run tests in the second one (you can switch between the
terminals with ``ctrl-B <arrow>``).
Modules Not Fully Covered:
..........................
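The new ``tests/test_utils/compat.py`` helper described in the documentation above is not included in this excerpt. A minimal sketch of the fallback-import pattern it provides (assuming the ``ParseImportError`` rename described above; the real module may differ) could look like this:

    # Sketch of the compatibility shim pattern - not the actual compat.py contents.
    from packaging.version import Version

    from airflow import __version__ as airflow_version

    # Version gates such as AIRFLOW_V_2_7_PLUS / AIRFLOW_V_2_10_PLUS used in skipif markers
    AIRFLOW_V_2_10_PLUS = Version(Version(airflow_version).base_version) >= Version("2.10.0")

    try:
        # Newer Airflow releases expose the renamed exception
        from airflow.models.errors import ParseImportError
    except ImportError:
        # Older releases still use the original name, so fall back to it
        from airflow.models.errors import ImportError as ParseImportError  # type: ignore[no-redef]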
diff --git a/dev/breeze/src/airflow_breeze/global_constants.py
b/dev/breeze/src/airflow_breeze/global_constants.py
index 66cde22e70..8442450565 100644
--- a/dev/breeze/src/airflow_breeze/global_constants.py
+++ b/dev/breeze/src/airflow_breeze/global_constants.py
@@ -476,11 +476,19 @@ BASE_PROVIDERS_COMPATIBILITY_CHECKS: list[dict[str, str]]
= [
"python-version": "3.8",
"airflow-version": "2.7.1",
"remove-providers": _exclusion(["common.io", "fab"]),
+ "run-tests": "false",
},
{
"python-version": "3.8",
"airflow-version": "2.8.0",
"remove-providers": _exclusion(["fab"]),
+ "run-tests": "false",
+ },
+ {
+ "python-version": "3.8",
+ "airflow-version": "2.9.1",
+ "remove-providers": _exclusion([]),
+ "run-tests": "true",
},
]
diff --git a/dev/breeze/src/airflow_breeze/utils/selective_checks.py
b/dev/breeze/src/airflow_breeze/utils/selective_checks.py
index 6b42d91346..193271f34c 100644
--- a/dev/breeze/src/airflow_breeze/utils/selective_checks.py
+++ b/dev/breeze/src/airflow_breeze/utils/selective_checks.py
@@ -83,6 +83,8 @@ ALL_CI_SELECTIVE_TEST_TYPES = (
"PythonVenv Serialization WWW"
)
+ALL_PROVIDERS_SELECTIVE_TEST_TYPES = "Providers[-amazon,google]
Providers[amazon] Providers[google]"
+
class FileGroupForCi(Enum):
ENVIRONMENT_FILES = "environment_files"
@@ -816,6 +818,15 @@ class SelectiveChecks:
self._extract_long_provider_tests(current_test_types)
return " ".join(sorted(current_test_types))
+ @cached_property
+ def providers_test_types_list_as_string(self) -> str | None:
+ all_test_types = self.parallel_test_types_list_as_string
+ if all_test_types is None:
+ return None
+ return " ".join(
+ test_type for test_type in all_test_types.split(" ") if
test_type.startswith("Providers")
+ )
+
@cached_property
def include_success_outputs(
self,
@@ -828,7 +839,7 @@ class SelectiveChecks:
@staticmethod
def _print_diff(old_lines: list[str], new_lines: list[str]):
- diff = "\n".join([line for line in difflib.ndiff(old_lines, new_lines)
if line and line[0] in "+-?"])
+ diff = "\n".join(line for line in difflib.ndiff(old_lines, new_lines)
if line and line[0] in "+-?")
get_console().print(diff)
@cached_property
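For illustration, the filtering performed by the new ``providers_test_types_list_as_string`` property can be sketched standalone (the input string below is only an example, mirroring the test expectations further down):

    # Standalone sketch of the filter added above: keep only the Providers[...] entries.
    all_test_types = "API Always Providers[fab] Providers[google]"

    providers_test_types = " ".join(
        test_type for test_type in all_test_types.split(" ") if test_type.startswith("Providers")
    )

    print(providers_test_types)  # Providers[fab] Providers[google]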
diff --git a/dev/breeze/tests/test_selective_checks.py
b/dev/breeze/tests/test_selective_checks.py
index 964c8c3487..81789de714 100644
--- a/dev/breeze/tests/test_selective_checks.py
+++ b/dev/breeze/tests/test_selective_checks.py
@@ -30,7 +30,11 @@ from airflow_breeze.global_constants import (
DEFAULT_PYTHON_MAJOR_MINOR_VERSION,
GithubEvents,
)
-from airflow_breeze.utils.selective_checks import ALL_CI_SELECTIVE_TEST_TYPES,
SelectiveChecks
+from airflow_breeze.utils.selective_checks import (
+ ALL_CI_SELECTIVE_TEST_TYPES,
+ ALL_PROVIDERS_SELECTIVE_TEST_TYPES,
+ SelectiveChecks,
+)
ANSI_COLORS_MATCHER = re.compile(r"(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]")
@@ -114,6 +118,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str,
str], stderr: str):
"mypy-docs,mypy-providers,ts-compile-format-lint-www",
"upgrade-to-newer-dependencies": "false",
"parallel-test-types-list-as-string": None,
+ "providers-test-types-list-as-string": None,
"needs-mypy": "false",
"mypy-folders": "[]",
},
@@ -139,6 +144,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str,
str], stderr: str):
"mypy-docs,mypy-providers,ts-compile-format-lint-www",
"upgrade-to-newer-dependencies": "false",
"parallel-test-types-list-as-string": "API Always
Providers[fab]",
+ "providers-test-types-list-as-string": "Providers[fab]",
"needs-mypy": "true",
"mypy-folders": "['airflow']",
},
@@ -164,6 +170,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str,
str], stderr: str):
"mypy-docs,mypy-providers,ts-compile-format-lint-www",
"upgrade-to-newer-dependencies": "false",
"parallel-test-types-list-as-string": "Always Operators",
+ "providers-test-types-list-as-string": "",
"needs-mypy": "true",
"mypy-folders": "['airflow']",
},
@@ -190,6 +197,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str,
str], stderr: str):
"upgrade-to-newer-dependencies": "false",
"parallel-test-types-list-as-string": "Always
BranchExternalPython BranchPythonVenv "
"ExternalPython Operators PythonVenv",
+ "providers-test-types-list-as-string": "",
"needs-mypy": "true",
"mypy-folders": "['airflow']",
},
@@ -215,6 +223,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str,
str], stderr: str):
"mypy-docs,mypy-providers,ts-compile-format-lint-www",
"upgrade-to-newer-dependencies": "false",
"parallel-test-types-list-as-string": "Always
Serialization",
+ "providers-test-types-list-as-string": "",
"needs-mypy": "true",
"mypy-folders": "['airflow']",
},
@@ -245,6 +254,8 @@ def assert_outputs_are_printed(expected_outputs: dict[str,
str], stderr: str):
"upgrade-to-newer-dependencies": "false",
"parallel-test-types-list-as-string": "API Always
Providers[amazon] "
"Providers[common.sql,fab,openlineage,pgvector,postgres]
Providers[google]",
+ "providers-test-types-list-as-string": "Providers[amazon] "
+ "Providers[common.sql,fab,openlineage,pgvector,postgres]
Providers[google]",
"needs-mypy": "true",
"mypy-folders": "['airflow', 'providers']",
},
@@ -271,6 +282,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str,
str], stderr: str):
"run-kubernetes-tests": "false",
"upgrade-to-newer-dependencies": "false",
"parallel-test-types-list-as-string": "Always
Providers[apache.beam] Providers[google]",
+ "providers-test-types-list-as-string":
"Providers[apache.beam] Providers[google]",
"needs-mypy": "true",
"mypy-folders": "['providers']",
},
@@ -297,6 +309,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str,
str], stderr: str):
"run-kubernetes-tests": "false",
"upgrade-to-newer-dependencies": "false",
"parallel-test-types-list-as-string": None,
+ "providers-test-types-list-as-string": None,
"needs-mypy": "false",
"mypy-folders": "[]",
},
@@ -327,6 +340,8 @@ def assert_outputs_are_printed(expected_outputs: dict[str,
str], stderr: str):
"upgrade-to-newer-dependencies": "false",
"parallel-test-types-list-as-string": "Always
Providers[amazon] "
"Providers[common.sql,openlineage,pgvector,postgres]
Providers[google]",
+ "providers-test-types-list-as-string": "Providers[amazon] "
+ "Providers[common.sql,openlineage,pgvector,postgres]
Providers[google]",
"needs-mypy": "true",
"mypy-folders": "['providers']",
},
@@ -359,6 +374,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str,
str], stderr: str):
"upgrade-to-newer-dependencies": "false",
"parallel-test-types-list-as-string": "Always "
"Providers[airbyte,apache.livy,dbt.cloud,dingding,discord,http]
Providers[amazon]",
+ "providers-test-types-list-as-string":
"Providers[airbyte,apache.livy,dbt.cloud,dingding,discord,http]
Providers[amazon]",
"needs-mypy": "true",
"mypy-folders": "['providers']",
},
@@ -389,6 +405,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str,
str], stderr: str):
"run-kubernetes-tests": "true",
"upgrade-to-newer-dependencies": "false",
"parallel-test-types-list-as-string": "Always
Providers[airbyte,http]",
+ "providers-test-types-list-as-string":
"Providers[airbyte,http]",
"needs-mypy": "true",
"mypy-folders": "['providers']",
},
@@ -420,6 +437,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str,
str], stderr: str):
"run-kubernetes-tests": "true",
"upgrade-to-newer-dependencies": "false",
"parallel-test-types-list-as-string": "Always",
+ "providers-test-types-list-as-string": "",
"needs-mypy": "true",
"mypy-folders": "['airflow']",
},
@@ -446,6 +464,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str,
str], stderr: str):
"skip-pre-commits":
"identity,mypy-airflow,mypy-dev,mypy-docs,mypy-providers",
"upgrade-to-newer-dependencies": "true",
"parallel-test-types-list-as-string":
ALL_CI_SELECTIVE_TEST_TYPES,
+ "providers-test-types-list-as-string":
ALL_PROVIDERS_SELECTIVE_TEST_TYPES,
"needs-mypy": "true",
"mypy-folders": "['airflow', 'providers', 'docs', 'dev']",
},
@@ -472,6 +491,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str,
str], stderr: str):
"skip-pre-commits":
"identity,mypy-airflow,mypy-dev,mypy-docs,mypy-providers",
"upgrade-to-newer-dependencies": "true",
"parallel-test-types-list-as-string":
ALL_CI_SELECTIVE_TEST_TYPES,
+ "providers-test-types-list-as-string":
ALL_PROVIDERS_SELECTIVE_TEST_TYPES,
"needs-mypy": "true",
"mypy-folders": "['airflow', 'providers', 'docs', 'dev']",
},
@@ -778,6 +798,7 @@ def test_full_test_needed_when_scripts_changes(files:
tuple[str, ...], expected_
"skip-pre-commits":
"identity,mypy-airflow,mypy-dev,mypy-docs,mypy-providers",
"upgrade-to-newer-dependencies": "false",
"parallel-test-types-list-as-string":
ALL_CI_SELECTIVE_TEST_TYPES,
+ "providers-test-types-list-as-string":
ALL_PROVIDERS_SELECTIVE_TEST_TYPES,
"needs-mypy": "true",
"mypy-folders": "['airflow', 'providers', 'docs', 'dev']",
},
@@ -811,6 +832,7 @@ def test_full_test_needed_when_scripts_changes(files:
tuple[str, ...], expected_
"skip-pre-commits":
"identity,mypy-airflow,mypy-dev,mypy-docs,mypy-providers",
"upgrade-to-newer-dependencies": "false",
"parallel-test-types-list-as-string":
ALL_CI_SELECTIVE_TEST_TYPES,
+ "providers-test-types-list-as-string":
ALL_PROVIDERS_SELECTIVE_TEST_TYPES,
"needs-mypy": "true",
"mypy-folders": "['airflow', 'providers', 'docs', 'dev']",
},
@@ -844,6 +866,7 @@ def test_full_test_needed_when_scripts_changes(files:
tuple[str, ...], expected_
"skip-pre-commits":
"identity,mypy-airflow,mypy-dev,mypy-docs,mypy-providers",
"upgrade-to-newer-dependencies": "false",
"parallel-test-types-list-as-string":
ALL_CI_SELECTIVE_TEST_TYPES,
+ "providers-test-types-list-as-string":
ALL_PROVIDERS_SELECTIVE_TEST_TYPES,
"needs-mypy": "true",
"mypy-folders": "['airflow', 'providers', 'docs', 'dev']",
},
@@ -878,6 +901,7 @@ def test_full_test_needed_when_scripts_changes(files:
tuple[str, ...], expected_
"skip-pre-commits":
"identity,mypy-airflow,mypy-dev,mypy-docs,mypy-providers",
"upgrade-to-newer-dependencies": "false",
"parallel-test-types-list-as-string":
ALL_CI_SELECTIVE_TEST_TYPES,
+ "providers-test-types-list-as-string":
ALL_PROVIDERS_SELECTIVE_TEST_TYPES,
"needs-mypy": "true",
"mypy-folders": "['airflow', 'providers', 'docs', 'dev']",
},
@@ -912,6 +936,7 @@ def test_full_test_needed_when_scripts_changes(files:
tuple[str, ...], expected_
"skip-pre-commits":
"identity,mypy-airflow,mypy-dev,mypy-docs,mypy-providers",
"upgrade-to-newer-dependencies": "false",
"parallel-test-types-list-as-string":
ALL_CI_SELECTIVE_TEST_TYPES,
+ "providers-test-types-list-as-string":
ALL_PROVIDERS_SELECTIVE_TEST_TYPES,
"needs-mypy": "true",
"mypy-folders": "['airflow', 'providers', 'docs', 'dev']",
},
@@ -943,6 +968,7 @@ def test_full_test_needed_when_scripts_changes(files:
tuple[str, ...], expected_
"skip-pre-commits":
"identity,mypy-airflow,mypy-dev,mypy-docs,mypy-providers",
"upgrade-to-newer-dependencies": "false",
"parallel-test-types-list-as-string":
ALL_CI_SELECTIVE_TEST_TYPES,
+ "providers-test-types-list-as-string":
ALL_PROVIDERS_SELECTIVE_TEST_TYPES,
"needs-mypy": "true",
"mypy-folders": "['airflow', 'providers', 'docs', 'dev']",
},
diff --git a/pyproject.toml b/pyproject.toml
index fadfa13b46..e1bd7e846d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -410,6 +410,9 @@ combine-as-imports = true
# https://github.com/apache/airflow/issues/39252
"airflow/providers/amazon/aws/hooks/eks.py" = ["W605"]
+# The test compat module uses banned imports to allow testing against older Airflow versions
+"tests/test_utils/compat.py" = ["TID251", "F401"]
+
[tool.ruff.lint.flake8-tidy-imports]
# Disallow all relative imports.
ban-relative-imports = "all"
diff --git a/scripts/docker/entrypoint_ci.sh b/scripts/docker/entrypoint_ci.sh
index a0738dc1d6..3c1699acbc 100755
--- a/scripts/docker/entrypoint_ci.sh
+++ b/scripts/docker/entrypoint_ci.sh
@@ -205,6 +205,8 @@ function determine_airflow_to_use() {
mkdir -p "${AIRFLOW_SOURCES}"/tmp/
else
python "${IN_CONTAINER_DIR}/install_airflow_and_providers.py"
+ # Some packages might leave a legacy typing module behind, which causes test issues
+ pip uninstall -y typing || true
fi
if [[ "${USE_AIRFLOW_VERSION}" =~ ^2\.2\..*|^2\.1\..*|^2\.0\..* &&
"${AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=}" != "" ]]; then
diff --git a/scripts/in_container/run_ci_tests.sh
b/scripts/in_container/run_ci_tests.sh
index 3d6b5a6a68..98ff9ddef9 100755
--- a/scripts/in_container/run_ci_tests.sh
+++ b/scripts/in_container/run_ci_tests.sh
@@ -60,6 +60,15 @@ if [[ ${TEST_TYPE:=} == "Quarantined" ]]; then
fi
fi
+if [[ ${CI:="false"} == "true" && ${RES} != "0" && ${USE_AIRFLOW_VERSION=} !=
"" ]]; then
+ echo
+ echo "${COLOR_YELLOW}Failing compatibility test of providers for for
${USE_AIRFLOW_VERSION} Airflow and you need to make sure it passes for it as
well or deal with compatibility.${COLOR_RESET}"
+ echo
+ echo "${COLOR_BLUE}Read more on how to run the test locally and how to
deal with Provider's compatibility with older Airflow versions
at:${COLOR_RESET}"
+ echo
"https://github.com/apache/airflow/blob/main/contributing-docs/testing/unit_tests.rst#running-provider-compatibility-tests"
+ echo
+fi
+
if [[ ${CI:="false"} == "true" || ${CI} == "True" ]]; then
if [[ ${RES} != "0" ]]; then
echo
diff --git a/tests/api_connexion/endpoints/test_import_error_endpoint.py
b/tests/api_connexion/endpoints/test_import_error_endpoint.py
index ce084165d2..4549b74ae9 100644
--- a/tests/api_connexion/endpoints/test_import_error_endpoint.py
+++ b/tests/api_connexion/endpoints/test_import_error_endpoint.py
@@ -22,11 +22,11 @@ import pytest
from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP
from airflow.models.dag import DagModel
-from airflow.models.errors import ParseImportError
from airflow.security import permissions
from airflow.utils import timezone
from airflow.utils.session import provide_session
from tests.test_utils.api_connexion_utils import assert_401, create_user,
delete_user
+from tests.test_utils.compat import ParseImportError
from tests.test_utils.config import conf_vars
from tests.test_utils.db import clear_db_dags, clear_db_import_errors
diff --git a/tests/api_connexion/schemas/test_error_schema.py
b/tests/api_connexion/schemas/test_error_schema.py
index 7056417e66..8604bee516 100644
--- a/tests/api_connexion/schemas/test_error_schema.py
+++ b/tests/api_connexion/schemas/test_error_schema.py
@@ -23,9 +23,9 @@ from airflow.api_connexion.schemas.error_schema import (
import_error_collection_schema,
import_error_schema,
)
-from airflow.models.errors import ParseImportError
from airflow.utils import timezone
from airflow.utils.session import provide_session
+from tests.test_utils.compat import ParseImportError
from tests.test_utils.db import clear_db_import_errors
pytestmark = pytest.mark.db_test
diff --git a/tests/api_experimental/common/test_delete_dag.py
b/tests/api_experimental/common/test_delete_dag.py
index 961d04dd37..4dc5f9b00f 100644
--- a/tests/api_experimental/common/test_delete_dag.py
+++ b/tests/api_experimental/common/test_delete_dag.py
@@ -23,7 +23,6 @@ from airflow.api.common.delete_dag import delete_dag
from airflow.exceptions import AirflowException, DagNotFound
from airflow.models.dag import DAG, DagModel
from airflow.models.dagrun import DagRun as DR
-from airflow.models.errors import ParseImportError as IE
from airflow.models.log import Log
from airflow.models.taskfail import TaskFail
from airflow.models.taskinstance import TaskInstance as TI
@@ -33,6 +32,7 @@ from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.state import State
from airflow.utils.types import DagRunType
+from tests.test_utils.compat import ParseImportError as IE
from tests.test_utils.db import clear_db_dags, clear_db_runs
pytestmark = pytest.mark.db_test
diff --git a/tests/conftest.py b/tests/conftest.py
index efff93f58b..63e5e40018 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -334,8 +334,12 @@ def initial_db_init():
from airflow.utils import db
from airflow.www.extensions.init_appbuilder import init_appbuilder
from airflow.www.extensions.init_auth_manager import get_auth_manager
+ from tests.test_utils.compat import AIRFLOW_V_2_10_PLUS
- db.resetdb(use_migration_files=True)
+ if AIRFLOW_V_2_10_PLUS:
+ db.resetdb(use_migration_files=True)
+ else:
+ db.resetdb()
db.bootstrap_dagbag()
# minimal app to add roles
flask_app = Flask(__name__)
@@ -1292,6 +1296,20 @@ def _disable_redact(request: pytest.FixtureRequest,
mocker):
return
[email protected]
+def airflow_root_path() -> Path:
+ import airflow
+
+ return Path(airflow.__path__[0]).parent
+
+
+# This constant is set to True if tests are run with Airflow installed from packages rather than from
+# the Airflow sources. While most tests in CI are run using Airflow sources, there are
+# also compatibility tests that only use the `tests` package and run against installed Airflow packages
+# for supported Airflow versions.
+RUNNING_TESTS_AGAINST_AIRFLOW_PACKAGES = not (Path(__file__).parents[1] /
"airflow" / "__init__.py").exists()
+
+
if TYPE_CHECKING:
# Static checkers do not know about pytest fixtures' types and return,
# In case if them distributed through third party packages.
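Provider test modules can then gate individual tests on this constant, as several of the provider test changes below do; a minimal sketch of the pattern (the test name and reason string here are illustrative):

    import pytest

    from tests.conftest import RUNNING_TESTS_AGAINST_AIRFLOW_PACKAGES


    @pytest.mark.skipif(
        RUNNING_TESTS_AGAINST_AIRFLOW_PACKAGES,
        reason="Relies on files that are only present in the Airflow sources",
    )
    def test_something_that_needs_airflow_sources():
        assert True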
diff --git a/tests/dag_processing/test_job_runner.py
b/tests/dag_processing/test_job_runner.py
index 0ebb6466aa..8e2bfbde62 100644
--- a/tests/dag_processing/test_job_runner.py
+++ b/tests/dag_processing/test_job_runner.py
@@ -54,13 +54,13 @@ from airflow.jobs.dag_processor_job_runner import
DagProcessorJobRunner
from airflow.jobs.job import Job
from airflow.models import DagBag, DagModel, DbCallbackRequest
from airflow.models.dagcode import DagCode
-from airflow.models.errors import ParseImportError
from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils import timezone
from airflow.utils.net import get_hostname
from airflow.utils.session import create_session
from tests.core.test_logging_config import SETTINGS_FILE_VALID,
settings_context
from tests.models import TEST_DAGS_FOLDER
+from tests.test_utils.compat import ParseImportError
from tests.test_utils.config import conf_vars
from tests.test_utils.db import clear_db_callbacks, clear_db_dags,
clear_db_runs, clear_db_serialized_dags
diff --git a/tests/dag_processing/test_processor.py
b/tests/dag_processing/test_processor.py
index 98640aaf3d..b79095994a 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -32,7 +32,6 @@ from airflow.configuration import TEST_DAGS_FOLDER, conf
from airflow.dag_processing.manager import DagFileProcessorAgent
from airflow.dag_processing.processor import DagFileProcessor,
DagFileProcessorProcess
from airflow.models import DagBag, DagModel, SlaMiss, TaskInstance
-from airflow.models.errors import ParseImportError
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import SimpleTaskInstance
from airflow.operators.empty import EmptyOperator
@@ -40,6 +39,7 @@ from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.state import State
from airflow.utils.types import DagRunType
+from tests.test_utils.compat import ParseImportError
from tests.test_utils.config import conf_vars, env_vars
from tests.test_utils.db import (
clear_db_dags,
diff --git a/tests/listeners/class_listener.py
b/tests/listeners/class_listener.py
index a719d372bd..ececa85321 100644
--- a/tests/listeners/class_listener.py
+++ b/tests/listeners/class_listener.py
@@ -19,38 +19,70 @@ from __future__ import annotations
from airflow.listeners import hookimpl
from airflow.utils.state import DagRunState, TaskInstanceState
+from tests.test_utils.compat import AIRFLOW_V_2_10_PLUS
+if AIRFLOW_V_2_10_PLUS:
-class ClassBasedListener:
- def __init__(self):
- self.started_component = None
- self.stopped_component = None
- self.state = []
-
- @hookimpl
- def on_starting(self, component):
- self.started_component = component
- self.state.append(DagRunState.RUNNING)
-
- @hookimpl
- def before_stopping(self, component):
- global stopped_component
- stopped_component = component
- self.state.append(DagRunState.SUCCESS)
-
- @hookimpl
- def on_task_instance_running(self, previous_state, task_instance, session):
- self.state.append(TaskInstanceState.RUNNING)
-
- @hookimpl
- def on_task_instance_success(self, previous_state, task_instance, session):
- self.state.append(TaskInstanceState.SUCCESS)
-
- @hookimpl
- def on_task_instance_failed(
- self, previous_state, task_instance, error: None | str |
BaseException, session
- ):
- self.state.append(TaskInstanceState.FAILED)
+ class ClassBasedListener:
+ def __init__(self):
+ self.started_component = None
+ self.stopped_component = None
+ self.state = []
+
+ @hookimpl
+ def on_starting(self, component):
+ self.started_component = component
+ self.state.append(DagRunState.RUNNING)
+
+ @hookimpl
+ def before_stopping(self, component):
+ global stopped_component
+ stopped_component = component
+ self.state.append(DagRunState.SUCCESS)
+
+ @hookimpl
+ def on_task_instance_running(self, previous_state, task_instance,
session):
+ self.state.append(TaskInstanceState.RUNNING)
+
+ @hookimpl
+ def on_task_instance_success(self, previous_state, task_instance,
session):
+ self.state.append(TaskInstanceState.SUCCESS)
+
+ @hookimpl
+ def on_task_instance_failed(
+ self, previous_state, task_instance, error: None | str |
BaseException, session
+ ):
+ self.state.append(TaskInstanceState.FAILED)
+else:
+
+ class ClassBasedListener: # type: ignore[no-redef]
+ def __init__(self):
+ self.started_component = None
+ self.stopped_component = None
+ self.state = []
+
+ @hookimpl
+ def on_starting(self, component):
+ self.started_component = component
+ self.state.append(DagRunState.RUNNING)
+
+ @hookimpl
+ def before_stopping(self, component):
+ global stopped_component
+ stopped_component = component
+ self.state.append(DagRunState.SUCCESS)
+
+ @hookimpl
+ def on_task_instance_running(self, previous_state, task_instance,
session):
+ self.state.append(TaskInstanceState.RUNNING)
+
+ @hookimpl
+ def on_task_instance_success(self, previous_state, task_instance,
session):
+ self.state.append(TaskInstanceState.SUCCESS)
+
+ @hookimpl
+ def on_task_instance_failed(self, previous_state, task_instance,
session):
+ self.state.append(TaskInstanceState.FAILED)
def clear():
diff --git a/tests/providers/amazon/aws/auth_manager/avp/test_facade.py
b/tests/providers/amazon/aws/auth_manager/avp/test_facade.py
index 0daae8811e..5c632ac1ba 100644
--- a/tests/providers/amazon/aws/auth_manager/avp/test_facade.py
+++ b/tests/providers/amazon/aws/auth_manager/avp/test_facade.py
@@ -17,7 +17,6 @@
from __future__ import annotations
import json
-from pathlib import Path
from typing import TYPE_CHECKING
from unittest.mock import Mock
@@ -312,13 +311,10 @@ class TestAwsAuthManagerAmazonVerifiedPermissionsFacade:
user=test_user,
)
- def test_is_policy_store_schema_up_to_date_when_schema_up_to_date(self,
facade):
- schema_path = (
- Path(__file__)
- .parents[6]
- .joinpath("airflow", "providers", "amazon", "aws", "auth_manager",
"avp", "schema.json")
- .resolve()
- )
+ def test_is_policy_store_schema_up_to_date_when_schema_up_to_date(self,
facade, airflow_root_path):
+ schema_path = airflow_root_path.joinpath(
+ "airflow", "providers", "amazon", "aws", "auth_manager", "avp",
"schema.json"
+ ).resolve()
with open(schema_path) as schema_file:
avp_response = {"schema": schema_file.read()}
mock_get_schema = Mock(return_value=avp_response)
@@ -326,13 +322,10 @@ class TestAwsAuthManagerAmazonVerifiedPermissionsFacade:
assert facade.is_policy_store_schema_up_to_date()
- def test_is_policy_store_schema_up_to_date_when_schema_is_modified(self,
facade):
- schema_path = (
- Path(__file__)
- .parents[6]
- .joinpath("airflow", "providers", "amazon", "aws", "auth_manager",
"avp", "schema.json")
- .resolve()
- )
+ def test_is_policy_store_schema_up_to_date_when_schema_is_modified(self,
facade, airflow_root_path):
+ schema_path = airflow_root_path.joinpath(
+ "airflow", "providers", "amazon", "aws", "auth_manager", "avp",
"schema.json"
+ ).resolve()
with open(schema_path) as schema_file:
schema = json.loads(schema_file.read())
schema["new_field"] = "new_value"
diff --git a/tests/providers/amazon/aws/executors/batch/test_batch_executor.py
b/tests/providers/amazon/aws/executors/batch/test_batch_executor.py
index 8e0d20653b..cd6c2c87ac 100644
--- a/tests/providers/amazon/aws/executors/batch/test_batch_executor.py
+++ b/tests/providers/amazon/aws/executors/batch/test_batch_executor.py
@@ -42,6 +42,7 @@ from airflow.providers.amazon.aws.executors.batch.utils
import (
)
from airflow.utils.helpers import convert_camel_to_snake
from airflow.utils.state import State
+from tests.conftest import RUNNING_TESTS_AGAINST_AIRFLOW_PACKAGES
from tests.test_utils.config import conf_vars
ARN1 = "arn1"
@@ -655,6 +656,11 @@ class TestBatchExecutorConfig:
def teardown_method(self) -> None:
self._unset_conf()
+ @pytest.mark.skipif(
+ RUNNING_TESTS_AGAINST_AIRFLOW_PACKAGES,
+ reason="Config defaults are validated against provider.yaml so this
test "
+ "should only run when tests are run from sources",
+ )
def test_validate_config_defaults(self):
"""Assert that the defaults stated in the config.yml file match those
in utils.CONFIG_DEFAULTS."""
curr_dir = os.path.dirname(os.path.abspath(__file__))
diff --git a/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py
b/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py
index 6c6bef5f7c..b547f39833 100644
--- a/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py
+++ b/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py
@@ -54,6 +54,8 @@ from airflow.providers.amazon.aws.hooks.ecs import EcsHook
from airflow.utils.helpers import convert_camel_to_snake
from airflow.utils.state import State, TaskInstanceState
from airflow.utils.timezone import utcnow
+from tests.conftest import RUNNING_TESTS_AGAINST_AIRFLOW_PACKAGES
+from tests.test_utils.compat import AIRFLOW_V_2_10_PLUS
from tests.test_utils.config import conf_vars
pytestmark = pytest.mark.db_test
@@ -367,6 +369,7 @@ class TestEcsExecutorTask:
class TestAwsEcsExecutor:
"""Tests the AWS ECS Executor."""
+ @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Test requires Airflow
2.10+")
@mock.patch("airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor.change_state")
def test_execute(self, change_state_mock, mock_airflow_key, mock_executor):
"""Test execution from end-to-end."""
@@ -1205,8 +1208,18 @@ class TestEcsExecutorConfig:
nested_dict = {"a": "a", "b": "b", "c": {"d": "d"}}
assert _recursive_flatten_dict(nested_dict) == {"a": "a", "b": "b",
"d": "d"}
+ @pytest.mark.skipif(
+ RUNNING_TESTS_AGAINST_AIRFLOW_PACKAGES,
+ reason="Config defaults are validated against provider.yaml so this
test "
+ "should only run when tests are run from sources",
+ )
def test_validate_config_defaults(self):
- """Assert that the defaults stated in the config.yml file match those
in utils.CONFIG_DEFAULTS."""
+ """Assert that the defaults stated in the config.yml file match those
in utils.CONFIG_DEFAULTS.
+
+        This test should only be run to verify that configuration defaults are the same when it is run from
+        Airflow sources, not when Airflow is installed from packages, because Airflow installed from packages
+        will not have the provider.yaml file.
+ """
curr_dir = os.path.dirname(os.path.abspath(__file__))
executor_path = "aws/executors/ecs"
config_filename = curr_dir.replace("tests",
"airflow").replace(executor_path, "provider.yaml")
diff --git a/tests/providers/amazon/aws/hooks/test_dynamodb.py
b/tests/providers/amazon/aws/hooks/test_dynamodb.py
index f3baba8b69..4e3e96c0dd 100644
--- a/tests/providers/amazon/aws/hooks/test_dynamodb.py
+++ b/tests/providers/amazon/aws/hooks/test_dynamodb.py
@@ -60,4 +60,4 @@ class TestDynamoDBHook:
def test_waiter_path_generated_from_resource_type(self, _):
hook = DynamoDBHook(aws_conn_id="aws_default")
path = hook.waiter_path
- assert
path.as_uri().endswith("/airflow/airflow/providers/amazon/aws/waiters/dynamodb.json")
+ assert
path.as_uri().endswith("/airflow/providers/amazon/aws/waiters/dynamodb.json")
diff --git a/tests/providers/amazon/aws/hooks/test_hooks_signature.py
b/tests/providers/amazon/aws/hooks/test_hooks_signature.py
index a6530f45a6..f253722618 100644
--- a/tests/providers/amazon/aws/hooks/test_hooks_signature.py
+++ b/tests/providers/amazon/aws/hooks/test_hooks_signature.py
@@ -65,7 +65,9 @@ ALLOWED_THICK_HOOKS_PARAMETERS: dict[str, set[str]] = {
def get_aws_hooks_modules():
"""Parse Amazon Provider metadata and find all hooks based on
`AwsGenericHook` and return it."""
- hooks_dir = Path(__file__).absolute().parents[5] / "airflow" / "providers"
/ "amazon" / "aws" / "hooks"
+ import airflow.providers.amazon.aws.hooks as aws_hooks
+
+ hooks_dir = Path(aws_hooks.__path__[0])
if not hooks_dir.exists():
msg = f"Amazon Provider hooks directory not found:
{hooks_dir.__fspath__()!r}"
raise FileNotFoundError(msg)
diff --git a/tests/providers/amazon/aws/links/test_base_aws.py
b/tests/providers/amazon/aws/links/test_base_aws.py
index 222be31458..446d584edf 100644
--- a/tests/providers/amazon/aws/links/test_base_aws.py
+++ b/tests/providers/amazon/aws/links/test_base_aws.py
@@ -203,9 +203,10 @@ class BaseAwsLinksTestCase:
"""Test: Operator links should exist for serialized DAG."""
self.create_op_and_ti(self.link_class, dag_id="test_link_serialize", task_id=self.task_id)
serialized_dag = self.dag_maker.get_serialized_data()
- operator_extra_link = serialized_dag["dag"]["tasks"][0]["__var"]["_operator_extra_links"]
+ deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"])
+ operator_extra_link = deserialized_dag.tasks[0].operator_extra_links[0]
error_message = "Operator links should exist for serialized DAG"
- assert operator_extra_link == [{self.full_qualname: {}}], error_message
+ assert operator_extra_link.name == self.link_class.name, error_message
def test_empty_xcom(self):
"""Test: Operator links should return empty string if no XCom value."""
diff --git a/tests/providers/amazon/aws/operators/test_emr_serverless.py
b/tests/providers/amazon/aws/operators/test_emr_serverless.py
index be42cf63ba..b1344521b9 100644
--- a/tests/providers/amazon/aws/operators/test_emr_serverless.py
+++ b/tests/providers/amazon/aws/operators/test_emr_serverless.py
@@ -41,6 +41,7 @@ from airflow.serialization.serialized_objects import (
BaseSerialization,
)
from airflow.utils.types import NOTSET
+from tests.test_utils.compat import deserialize_operator
if TYPE_CHECKING:
from unittest.mock import MagicMock
@@ -1119,7 +1120,7 @@ class TestEmrServerlessStartJobOperator:
)
ser_operator = BaseSerialization.serialize(operator)
- deser_operator = BaseSerialization.deserialize(ser_operator)
+ deser_operator = deserialize_operator(ser_operator)
assert deser_operator.operator_extra_links == [
EmrServerlessS3LogsLink(),
@@ -1140,7 +1141,7 @@ class TestEmrServerlessStartJobOperator:
)
ser_operator = BaseSerialization.serialize(operator)
- deser_operator = BaseSerialization.deserialize(ser_operator)
+ deser_operator = deserialize_operator(ser_operator)
assert deser_operator.operator_extra_links == [
EmrServerlessS3LogsLink(),
diff --git a/tests/providers/amazon/aws/utils/test_eks_get_token.py
b/tests/providers/amazon/aws/utils/test_eks_get_token.py
index 8825c6c218..672ccfc9b3 100644
--- a/tests/providers/amazon/aws/utils/test_eks_get_token.py
+++ b/tests/providers/amazon/aws/utils/test_eks_get_token.py
@@ -25,8 +25,6 @@ from unittest import mock
import pytest
import time_machine
-from tests.test_utils import AIRFLOW_MAIN_FOLDER
-
class TestGetEksToken:
@mock.patch("airflow.providers.amazon.aws.hooks.eks.EksHook")
@@ -65,13 +63,13 @@ class TestGetEksToken:
],
],
)
- def test_run(self, mock_eks_hook, args, expected_aws_conn_id, expected_region_name):
+ def test_run(self, mock_eks_hook, args, expected_aws_conn_id, expected_region_name, airflow_root_path):
(
mock_eks_hook.return_value.fetch_access_token_for_cluster.return_value
) = "k8s-aws-v1.aHR0cDovL2V4YW1wbGUuY29t"
with mock.patch("sys.argv", args), contextlib.redirect_stdout(StringIO()) as temp_stdout:
- os.chdir(AIRFLOW_MAIN_FOLDER)
+ os.chdir(airflow_root_path)
# We are not using run_module because of https://github.com/pytest-dev/pytest/issues/9007
runpy.run_path("airflow/providers/amazon/aws/utils/eks_get_token.py", run_name="__main__")
output = temp_stdout.getvalue()
diff --git a/tests/providers/apache/iceberg/hooks/test_iceberg.py
b/tests/providers/apache/iceberg/hooks/test_iceberg.py
index af58bb55b0..f11e1d4ea6 100644
--- a/tests/providers/apache/iceberg/hooks/test_iceberg.py
+++ b/tests/providers/apache/iceberg/hooks/test_iceberg.py
@@ -17,6 +17,8 @@
from __future__ import annotations
+from unittest.mock import Mock, patch
+
import pytest
import requests_mock
@@ -27,16 +29,22 @@ pytestmark = pytest.mark.db_test
def test_iceberg_hook():
access_token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJSU"
- with requests_mock.Mocker() as m:
- m.post(
- "https://api.iceberg.io/ws/v1/oauth/tokens",
- json={
- "access_token": access_token,
- "token_type": "Bearer",
- "expires_in": 86400,
- "warehouse_id": "fadc4c31-e81f-48cd-9ce8-64cd5ce3fa5d",
- "region": "us-west-2",
- "catalog_url": "warehouses/fadc4c31-e81f-48cd-9ce8-64cd5ce3fa5d",
- },
- )
- assert IcebergHook().get_conn() == access_token
+ with patch("airflow.models.Connection.get_connection_from_secrets") as mock_get_connection:
+ mock_conn = Mock()
+ mock_conn.conn_id = "iceberg_default"
+ mock_conn.host = "https://api.iceberg.io/ws/v1"
+ mock_conn.extra_dejson = {}
+ mock_get_connection.return_value = mock_conn
+ with requests_mock.Mocker() as m:
+ m.post(
+ "https://api.iceberg.io/ws/v1/oauth/tokens",
+ json={
+ "access_token": access_token,
+ "token_type": "Bearer",
+ "expires_in": 86400,
+ "warehouse_id": "fadc4c31-e81f-48cd-9ce8-64cd5ce3fa5d",
+ "region": "us-west-2",
+ "catalog_url": "warehouses/fadc4c31-e81f-48cd-9ce8-64cd5ce3fa5d",
+ },
+ )
+ assert IcebergHook().get_conn() == access_token
diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py
b/tests/providers/cncf/kubernetes/operators/test_pod.py
index 224824edbc..8b7c238e9c 100644
--- a/tests/providers/cncf/kubernetes/operators/test_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_pod.py
@@ -358,7 +358,8 @@ class TestKubernetesPodOperator:
"dag_id": "dag",
"kubernetes_pod_operator": "True",
"task_id": "task",
- "try_number": "0",
+ # Try number behaves differently on different versions of Airflow
+ "try_number": mock.ANY,
"airflow_version": mock.ANY,
"run_id": "test",
"airflow_kpo_in_cluster": str(in_cluster),
@@ -374,7 +375,7 @@ class TestKubernetesPodOperator:
"dag_id": "dag",
"kubernetes_pod_operator": "True",
"task_id": "task",
- "try_number": "0",
+ "try_number": mock.ANY,
"airflow_version": mock.ANY,
"run_id": "test",
"map_index": "10",
@@ -884,7 +885,7 @@ class TestKubernetesPodOperator:
"dag_id": "dag",
"kubernetes_pod_operator": "True",
"task_id": "task",
- "try_number": "0",
+ "try_number": mock.ANY,
"airflow_version": mock.ANY,
"airflow_kpo_in_cluster": str(k.hook.is_in_cluster),
"run_id": "test",
@@ -920,7 +921,7 @@ class TestKubernetesPodOperator:
"dag_id": "dag",
"kubernetes_pod_operator": "True",
"task_id": "task",
- "try_number": "0",
+ "try_number": mock.ANY,
"airflow_version": mock.ANY,
"airflow_kpo_in_cluster": str(k.hook.is_in_cluster),
"run_id": "test",
@@ -991,7 +992,7 @@ class TestKubernetesPodOperator:
"dag_id": "dag",
"kubernetes_pod_operator": "True",
"task_id": "task",
- "try_number": "0",
+ "try_number": mock.ANY,
"airflow_version": mock.ANY,
"airflow_kpo_in_cluster": str(k.hook.is_in_cluster),
"run_id": "test",
@@ -1061,7 +1062,7 @@ class TestKubernetesPodOperator:
"dag_id": "dag",
"kubernetes_pod_operator": "True",
"task_id": "task",
- "try_number": "0",
+ "try_number": mock.ANY,
"airflow_version": mock.ANY,
"airflow_kpo_in_cluster": str(k.hook.is_in_cluster),
"run_id": "test",
@@ -1112,7 +1113,7 @@ class TestKubernetesPodOperator:
"dag_id": "dag",
"kubernetes_pod_operator": "True",
"task_id": "task",
- "try_number": "0",
+ "try_number": mock.ANY,
"airflow_version": mock.ANY,
"airflow_kpo_in_cluster": str(k.hook.is_in_cluster),
"run_id": "test",
diff --git a/tests/providers/cncf/kubernetes/test_template_rendering.py
b/tests/providers/cncf/kubernetes/test_template_rendering.py
index f3e61101ea..98764a2f1f 100644
--- a/tests/providers/cncf/kubernetes/test_template_rendering.py
+++ b/tests/providers/cncf/kubernetes/test_template_rendering.py
@@ -48,7 +48,7 @@ def test_render_k8s_pod_yaml(pod_mutation_hook,
create_task_instance):
"dag_id": "test_render_k8s_pod_yaml",
"run_id": "test_run_id",
"task_id": "op1",
- "try_number": "0",
+ "try_number": mock.ANY,
},
"labels": {
"airflow-worker": "0",
@@ -57,7 +57,7 @@ def test_render_k8s_pod_yaml(pod_mutation_hook,
create_task_instance):
"run_id": "test_run_id",
"kubernetes_executor": "True",
"task_id": "op1",
- "try_number": "0",
+ "try_number": mock.ANY,
},
"name": mock.ANY,
"namespace": "default",
diff --git a/tests/providers/google/cloud/operators/test_bigquery.py
b/tests/providers/google/cloud/operators/test_bigquery.py
index d84218f8bd..4cfcb7fe87 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -731,7 +731,8 @@ class TestBigQueryOperator:
sql="SELECT * FROM test_table",
)
serialized_dag = dag_maker.get_serialized_data()
- assert "sql" in serialized_dag["dag"]["tasks"][0]["__var"]
+ deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"])
+ assert hasattr(deserialized_dag.tasks[0], "sql")
dag = SerializedDAG.from_dict(serialized_dag)
simple_task = dag.task_dict[TASK_ID]
@@ -740,11 +741,8 @@ class TestBigQueryOperator:
#########################################################
# Verify Operator Links work with Serialized Operator
#########################################################
-
- # Check Serialized version of operator link
- assert serialized_dag["dag"]["tasks"][0]["__var"]["_operator_extra_links"] == [
- {"airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleLink": {}}
- ]
+ deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"])
+ assert deserialized_dag.tasks[0].operator_extra_links[0].name == "BigQuery Console"
# Check DeSerialized version of operator link
assert isinstance(next(iter(simple_task.operator_extra_links)), BigQueryConsoleLink)
@@ -768,7 +766,8 @@ class TestBigQueryOperator:
sql=["SELECT * FROM test_table", "SELECT * FROM test_table2"],
)
serialized_dag = dag_maker.get_serialized_data()
- assert "sql" in serialized_dag["dag"]["tasks"][0]["__var"]
+ deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"])
+ assert hasattr(deserialized_dag.tasks[0], "sql")
dag = SerializedDAG.from_dict(serialized_dag)
simple_task = dag.task_dict[TASK_ID]
@@ -777,12 +776,10 @@ class TestBigQueryOperator:
#########################################################
# Verify Operator Links work with Serialized Operator
#########################################################
-
- # Check Serialized version of operator link
- assert serialized_dag["dag"]["tasks"][0]["__var"]["_operator_extra_links"] == [
- {"airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink": {"index": 0}},
- {"airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink": {"index": 1}},
- ]
+ deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"])
+ operator_extra_links = deserialized_dag.tasks[0].operator_extra_links
+ assert operator_extra_links[0].name == "BigQuery Console #1"
+ assert operator_extra_links[1].name == "BigQuery Console #2"
# Check DeSerialized version of operator link
assert isinstance(next(iter(simple_task.operator_extra_links)), BigQueryConsoleIndexableLink)
diff --git a/tests/providers/google/cloud/operators/test_dataproc.py
b/tests/providers/google/cloud/operators/test_dataproc.py
index 0d97cfe36c..c3d945c808 100644
--- a/tests/providers/google/cloud/operators/test_dataproc.py
+++ b/tests/providers/google/cloud/operators/test_dataproc.py
@@ -79,12 +79,12 @@ from airflow.providers.google.cloud.triggers.dataproc
import (
from airflow.providers.google.common.consts import GOOGLE_DEFAULT_DEFERRABLE_METHOD_NAME
from airflow.serialization.serialized_objects import SerializedDAG
from airflow.utils.timezone import datetime
-from airflow.version import version as airflow_version
+from tests.test_utils.compat import AIRFLOW_VERSION
from tests.test_utils.db import clear_db_runs, clear_db_xcom
-cluster_params = inspect.signature(ClusterGenerator.__init__).parameters
+AIRFLOW_VERSION_LABEL = "v" + str(AIRFLOW_VERSION).replace(".", "-").replace("+", "-")
-AIRFLOW_VERSION = "v" + airflow_version.replace(".", "-").replace("+", "-")
+cluster_params = inspect.signature(ClusterGenerator.__init__).parameters
DATAPROC_PATH = "airflow.providers.google.cloud.operators.dataproc.{}"
DATAPROC_TRIGGERS_PATH = "airflow.providers.google.cloud.triggers.dataproc.{}"
@@ -325,9 +325,9 @@ CONFIG_WITH_GPU_ACCELERATOR = {
"endpoint_config": {},
}
-LABELS = {"labels": "data", "airflow-version": AIRFLOW_VERSION}
+LABELS = {"labels": "data", "airflow-version": AIRFLOW_VERSION_LABEL}
-LABELS.update({"airflow-version": "v" + airflow_version.replace(".", "-").replace("+", "-")})
+LABELS.update({"airflow-version": AIRFLOW_VERSION_LABEL})
CLUSTER = {"project_id": "project_id", "cluster_name": CLUSTER_NAME, "config": CONFIG, "labels": LABELS}
@@ -1064,11 +1064,10 @@ def test_create_cluster_operator_extra_links(dag_maker,
create_task_instance_of_
serialized_dag = dag_maker.get_serialized_data()
deserialized_dag = SerializedDAG.from_dict(serialized_dag)
deserialized_task = deserialized_dag.task_dict[TASK_ID]
-
# Assert operator links for serialized DAG
- assert serialized_dag["dag"]["tasks"][0]["__var"]["_operator_extra_links"] == [
- {"airflow.providers.google.cloud.links.dataproc.DataprocClusterLink": {}}
- ]
+ deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"])
+ operator_extra_link = deserialized_dag.tasks[0].operator_extra_links[0]
+ assert operator_extra_link.name == "Dataproc Cluster"
# Assert operator link types are preserved during deserialization
assert isinstance(deserialized_task.operator_extra_links[0], DataprocClusterLink)
@@ -1168,9 +1167,9 @@ def test_scale_cluster_operator_extra_links(dag_maker,
create_task_instance_of_o
deserialized_task = deserialized_dag.task_dict[TASK_ID]
# Assert operator links for serialized DAG
- assert serialized_dag["dag"]["tasks"][0]["__var"]["_operator_extra_links"] == [
- {"airflow.providers.google.cloud.links.dataproc.DataprocLink": {}}
- ]
+ deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"])
+ operator_extra_link = deserialized_dag.tasks[0].operator_extra_links[0]
+ assert operator_extra_link.name == "Dataproc resource"
# Assert operator link types are preserved during deserialization
assert isinstance(deserialized_task.operator_extra_links[0], DataprocLink)
@@ -1562,10 +1561,10 @@ def test_submit_job_operator_extra_links(mock_hook,
dag_maker, create_task_insta
deserialized_dag = SerializedDAG.from_dict(serialized_dag)
deserialized_task = deserialized_dag.task_dict[TASK_ID]
- # Assert operator links for serialized_dag
- assert serialized_dag["dag"]["tasks"][0]["__var"]["_operator_extra_links"] == [
- {"airflow.providers.google.cloud.links.dataproc.DataprocJobLink": {}}
- ]
+ # Assert operator links for serialized DAG
+ deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"])
+ operator_extra_link = deserialized_dag.tasks[0].operator_extra_links[0]
+ assert operator_extra_link.name == "Dataproc Job"
# Assert operator link types are preserved during deserialization
assert isinstance(deserialized_task.operator_extra_links[0], DataprocJobLink)
@@ -1767,10 +1766,10 @@ def test_update_cluster_operator_extra_links(dag_maker,
create_task_instance_of_
deserialized_dag = SerializedDAG.from_dict(serialized_dag)
deserialized_task = deserialized_dag.task_dict[TASK_ID]
- # Assert operator links for serialized_dag
- assert serialized_dag["dag"]["tasks"][0]["__var"]["_operator_extra_links"] == [
- {"airflow.providers.google.cloud.links.dataproc.DataprocClusterLink": {}}
- ]
+ # Assert operator links for serialized DAG
+ deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"])
+ operator_extra_link = deserialized_dag.tasks[0].operator_extra_links[0]
+ assert operator_extra_link.name == "Dataproc Cluster"
# Assert operator link types are preserved during deserialization
assert isinstance(deserialized_task.operator_extra_links[0], DataprocClusterLink)
@@ -1989,10 +1988,10 @@ def
test_instantiate_workflow_operator_extra_links(mock_hook, dag_maker, create_
deserialized_dag = SerializedDAG.from_dict(serialized_dag)
deserialized_task = deserialized_dag.task_dict[TASK_ID]
- # Assert operator links for serialized_dag
- assert serialized_dag["dag"]["tasks"][0]["__var"]["_operator_extra_links"] == [
- {"airflow.providers.google.cloud.links.dataproc.DataprocWorkflowLink": {}}
- ]
+ # Assert operator links for serialized DAG
+ deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"])
+ operator_extra_link = deserialized_dag.tasks[0].operator_extra_links[0]
+ assert operator_extra_link.name == "Dataproc Workflow"
# Assert operator link types are preserved during deserialization
assert isinstance(deserialized_task.operator_extra_links[0], DataprocWorkflowLink)
@@ -2151,10 +2150,10 @@ def
test_instantiate_inline_workflow_operator_extra_links(
deserialized_dag = SerializedDAG.from_dict(serialized_dag)
deserialized_task = deserialized_dag.task_dict[TASK_ID]
- # Assert operator links for serialized_dag
- assert serialized_dag["dag"]["tasks"][0]["__var"]["_operator_extra_links"] == [
- {"airflow.providers.google.cloud.links.dataproc.DataprocWorkflowLink": {}}
- ]
+ # Assert operator links for serialized DAG
+ deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"])
+ operator_extra_link = deserialized_dag.tasks[0].operator_extra_links[0]
+ assert operator_extra_link.name == "Dataproc Workflow"
# Assert operator link types are preserved during deserialization
assert isinstance(deserialized_task.operator_extra_links[0], DataprocWorkflowLink)
@@ -2182,7 +2181,7 @@ class TestDataProcHiveOperator:
job = {
"reference": {"project_id": GCP_PROJECT, "job_id":
f"{job_name}_{job_id}"},
"placement": {"cluster_name": "cluster-1"},
- "labels": {"airflow-version": AIRFLOW_VERSION},
+ "labels": {"airflow-version": AIRFLOW_VERSION_LABEL},
"hive_job": {"query_list": {"queries": [query]}, "script_variables":
variables},
}
@@ -2244,7 +2243,7 @@ class TestDataProcPigOperator:
job = {
"reference": {"project_id": GCP_PROJECT, "job_id":
f"{job_name}_{job_id}"},
"placement": {"cluster_name": "cluster-1"},
- "labels": {"airflow-version": AIRFLOW_VERSION},
+ "labels": {"airflow-version": AIRFLOW_VERSION_LABEL},
"pig_job": {"query_list": {"queries": [query]}, "script_variables":
variables},
}
@@ -2306,13 +2305,13 @@ class TestDataProcSparkSqlOperator:
job = {
"reference": {"project_id": GCP_PROJECT, "job_id":
f"{job_name}_{job_id}"},
"placement": {"cluster_name": "cluster-1"},
- "labels": {"airflow-version": AIRFLOW_VERSION},
+ "labels": {"airflow-version": AIRFLOW_VERSION_LABEL},
"spark_sql_job": {"query_list": {"queries": [query]},
"script_variables": variables},
}
other_project_job = {
"reference": {"project_id": "other-project", "job_id":
f"{job_name}_{job_id}"},
"placement": {"cluster_name": "cluster-1"},
- "labels": {"airflow-version": AIRFLOW_VERSION},
+ "labels": {"airflow-version": AIRFLOW_VERSION_LABEL},
"spark_sql_job": {"query_list": {"queries": [query]},
"script_variables": variables},
}
@@ -2410,7 +2409,7 @@ class TestDataProcSparkOperator(DataprocJobTestBase):
"job_id": f"{job_name}_{TEST_JOB_ID}",
},
"placement": {"cluster_name": "cluster-1"},
- "labels": {"airflow-version": AIRFLOW_VERSION},
+ "labels": {"airflow-version": AIRFLOW_VERSION_LABEL},
"spark_job": {"jar_file_uris": jars, "main_class": main_class},
}
@@ -2473,9 +2472,9 @@ def test_submit_spark_job_operator_extra_links(mock_hook,
dag_maker, create_task
deserialized_task = deserialized_dag.task_dict[TASK_ID]
# Assert operator links for serialized DAG
- assert serialized_dag["dag"]["tasks"][0]["__var"]["_operator_extra_links"] == [
- {"airflow.providers.google.cloud.links.dataproc.DataprocLink": {}}
- ]
+ deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"])
+ operator_extra_link = deserialized_dag.tasks[0].operator_extra_links[0]
+ assert operator_extra_link.name == "Dataproc resource"
# Assert operator link types are preserved during deserialization
assert isinstance(deserialized_task.operator_extra_links[0], DataprocLink)
@@ -2504,7 +2503,7 @@ class TestDataProcHadoopOperator:
job = {
"reference": {"project_id": GCP_PROJECT, "job_id":
f"{job_name}_{job_id}"},
"placement": {"cluster_name": "cluster-1"},
- "labels": {"airflow-version": AIRFLOW_VERSION},
+ "labels": {"airflow-version": AIRFLOW_VERSION_LABEL},
"hadoop_job": {"main_jar_file_uri": jar, "args": args},
}
@@ -2542,7 +2541,7 @@ class TestDataProcPySparkOperator:
job = {
"reference": {"project_id": GCP_PROJECT, "job_id":
f"{job_name}_{job_id}"},
"placement": {"cluster_name": "cluster-1"},
- "labels": {"airflow-version": AIRFLOW_VERSION},
+ "labels": {"airflow-version": AIRFLOW_VERSION_LABEL},
"pyspark_job": {"main_python_file_uri": uri},
}
diff --git a/tests/providers/openlineage/plugins/test_listener.py
b/tests/providers/openlineage/plugins/test_listener.py
index d9fbb0dfd3..65ac9657c0 100644
--- a/tests/providers/openlineage/plugins/test_listener.py
+++ b/tests/providers/openlineage/plugins/test_listener.py
@@ -33,10 +33,20 @@ from airflow.providers.openlineage import conf
from airflow.providers.openlineage.plugins.listener import OpenLineageListener
from airflow.providers.openlineage.utils.selective_enable import disable_lineage, enable_lineage
from airflow.utils.state import State
+from tests.test_utils.compat import AIRFLOW_V_2_10_PLUS
from tests.test_utils.config import conf_vars
pytestmark = pytest.mark.db_test
+EXPECTED_TRY_NUMBER_1 = 1 if AIRFLOW_V_2_10_PLUS else 0
+EXPECTED_TRY_NUMBER_2 = 2 if AIRFLOW_V_2_10_PLUS else 1
+
+TRY_NUMBER_BEFORE_EXECUTION = 0 if AIRFLOW_V_2_10_PLUS else 1
+TRY_NUMBER_RUNNING = 0 if AIRFLOW_V_2_10_PLUS else 1
+TRY_NUMBER_FAILED = 0 if AIRFLOW_V_2_10_PLUS else 1
+TRY_NUMBER_SUCCESS = 0 if AIRFLOW_V_2_10_PLUS else 2
+TRY_NUMBER_AFTER_EXECUTION = 0 if AIRFLOW_V_2_10_PLUS else 2
+
class TemplateOperator(BaseOperator):
template_fields = ["df"]
@@ -304,7 +314,7 @@ def
test_adapter_complete_task_is_called_with_proper_arguments(
job_name="job_name",
parent_job_name="dag_id",
parent_run_id="dag_id.dag_run_run_id",
- run_id="dag_id.task_id.execution_date.1",
+ run_id=f"dag_id.task_id.execution_date.{EXPECTED_TRY_NUMBER_1}",
task=listener.extractor_manager.extract_metadata(),
)
@@ -319,7 +329,7 @@ def
test_adapter_complete_task_is_called_with_proper_arguments(
job_name="job_name",
parent_job_name="dag_id",
parent_run_id="dag_id.dag_run_run_id",
- run_id="dag_id.task_id.execution_date.2",
+ run_id=f"dag_id.task_id.execution_date.{EXPECTED_TRY_NUMBER_2}",
task=listener.extractor_manager.extract_metadata(),
)
@@ -334,7 +344,9 @@ def
test_run_id_is_constant_across_all_methods(mocked_adapter):
"""
def mock_task_id(dag_id, task_id, execution_date, try_number):
- return f"{dag_id}.{task_id}.{execution_date}.{try_number}"
+ returned_try_number = try_number if AIRFLOW_V_2_10_PLUS else max(try_number - 1, 1)
+
+ return f"{dag_id}.{task_id}.{execution_date}.{returned_try_number}"
listener, task_instance = _create_listener_and_task_instance()
mocked_adapter.build_task_instance_run_id.side_effect = mock_task_id
@@ -344,7 +356,11 @@ def
test_run_id_is_constant_across_all_methods(mocked_adapter):
assert listener.adapter.start_task.call_args.kwargs["run_id"] == expected_run_id_1
listener.on_task_instance_failed(None, task_instance, None)
- assert listener.adapter.fail_task.call_args.kwargs["run_id"] == expected_run_id_1
+ assert (
+ listener.adapter.fail_task.call_args.kwargs["run_id"] == expected_run_id_1
+ if AIRFLOW_V_2_10_PLUS
+ else expected_run_id_2
+ )
# This run_id will not be different as we did NOT simulate increase of the try_number attribute,
listener.on_task_instance_success(None, task_instance, None)
@@ -354,7 +370,11 @@ def
test_run_id_is_constant_across_all_methods(mocked_adapter):
# This is how airflow works, and that's why we expect the run_id to remain constant across all methods.
task_instance.try_number += 1
listener.on_task_instance_success(None, task_instance, None)
- assert listener.adapter.complete_task.call_args.kwargs["run_id"] == expected_run_id_2
+ assert (
+ listener.adapter.complete_task.call_args.kwargs["run_id"] == expected_run_id_2
+ if AIRFLOW_V_2_10_PLUS
+ else expected_run_id_1
+ )
def test_running_task_correctly_calls_openlineage_adapter_run_id_method():
@@ -406,7 +426,7 @@ def
test_successful_task_correctly_calls_openlineage_adapter_run_id_method(mock_
dag_id="dag_id",
task_id="task_id",
execution_date="execution_date",
- try_number=1,
+ try_number=EXPECTED_TRY_NUMBER_1,
)
@@ -431,16 +451,16 @@ def
test_listener_on_task_instance_failed_is_called_before_try_number_increment(
_, task_instance = _create_test_dag_and_task(fail_callable, "failure")
# try_number before execution
- assert task_instance.try_number == 0
+ assert task_instance.try_number == TRY_NUMBER_BEFORE_EXECUTION
with suppress(CustomError):
task_instance.run()
# try_number at the moment of function being called
- assert captured_try_numbers["running"] == 0
- assert captured_try_numbers["failed"] == 0
+ assert captured_try_numbers["running"] == TRY_NUMBER_RUNNING
+ assert captured_try_numbers["failed"] == TRY_NUMBER_FAILED
# try_number after task has been executed
- assert task_instance.try_number == 0
+ assert task_instance.try_number == TRY_NUMBER_AFTER_EXECUTION
@mock.patch("airflow.models.taskinstance.get_listener_manager")
@@ -460,15 +480,15 @@ def
test_listener_on_task_instance_success_is_called_after_try_number_increment(
_, task_instance = _create_test_dag_and_task(success_callable, "success")
# try_number before execution
- assert task_instance.try_number == 0
+ assert task_instance.try_number == TRY_NUMBER_BEFORE_EXECUTION
task_instance.run()
# try_number at the moment of function being called
- assert captured_try_numbers["running"] == 0
- assert captured_try_numbers["success"] == 0
+ assert captured_try_numbers["running"] == TRY_NUMBER_RUNNING
+ assert captured_try_numbers["success"] == TRY_NUMBER_SUCCESS
# try_number after task has been executed
- assert task_instance.try_number == 0
+ assert task_instance.try_number == TRY_NUMBER_AFTER_EXECUTION
@mock.patch("airflow.providers.openlineage.plugins.listener.is_operator_disabled")
diff --git a/tests/providers/openlineage/plugins/test_openlineage.py
b/tests/providers/openlineage/plugins/test_openlineage.py
index 409c8461e8..8d9cc9602b 100644
--- a/tests/providers/openlineage/plugins/test_openlineage.py
+++ b/tests/providers/openlineage/plugins/test_openlineage.py
@@ -24,9 +24,13 @@ from unittest.mock import patch
import pytest
from airflow.providers.openlineage.conf import config_path, is_disabled, transport
+from tests.conftest import RUNNING_TESTS_AGAINST_AIRFLOW_PACKAGES
from tests.test_utils.config import conf_vars
[email protected](
+ RUNNING_TESTS_AGAINST_AIRFLOW_PACKAGES, reason="Plugin initialization is done early in case of packages"
+)
class TestOpenLineageProviderPlugin:
def setup_method(self):
is_disabled.cache_clear()
diff --git a/tests/providers/smtp/notifications/test_smtp.py
b/tests/providers/smtp/notifications/test_smtp.py
index b19cc4baa8..98fd7387e7 100644
--- a/tests/providers/smtp/notifications/test_smtp.py
+++ b/tests/providers/smtp/notifications/test_smtp.py
@@ -31,6 +31,7 @@ from airflow.providers.smtp.notifications.smtp import (
send_smtp_notification,
)
from airflow.utils import timezone
+from tests.test_utils.compat import AIRFLOW_V_2_10_PLUS
from tests.test_utils.config import conf_vars
pytestmark = pytest.mark.db_test
@@ -38,6 +39,9 @@ pytestmark = pytest.mark.db_test
SMTP_API_DEFAULT_CONN_ID = SmtpHook.default_conn_name
+NUM_TRY = 0 if AIRFLOW_V_2_10_PLUS else 1
+
+
class TestSmtpNotifier:
@mock.patch("airflow.providers.smtp.notifications.smtp.SmtpHook")
def test_notifier(self, mock_smtphook_hook, dag_maker):
@@ -129,7 +133,7 @@ class TestSmtpNotifier:
from_email=conf.get("smtp", "smtp_mail_from"),
to="[email protected]",
subject="DAG dag - Task op - Run ID test in State None",
- html_content="""<!DOCTYPE html>\n<html>\n <head>\n <meta
http-equiv="Content-Type" content="text/html; charset=utf-8" />\n <meta
name="viewport" content="width=device-width">\n </head>\n<body>\n <table
role="presentation">\n \n <tr>\n <td>Run ID:</td>\n
<td>test</td>\n </tr>\n <tr>\n <td>Try:</td>\n
<td>0 of 1</td>\n </tr>\n <tr>\n <td>Task
State:</td>\n [...]
+ html_content=f"""<!DOCTYPE html>\n<html>\n <head>\n
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />\n
<meta name="viewport" content="width=device-width">\n </head>\n<body>\n
<table role="presentation">\n \n <tr>\n <td>Run
ID:</td>\n <td>test</td>\n </tr>\n <tr>\n
<td>Try:</td>\n <td>{NUM_TRY} of 1</td>\n </tr>\n
<tr>\n <td>Task State:</ [...]
smtp_conn_id="smtp_default",
files=None,
cc=None,
diff --git a/tests/test_utils/compat.py b/tests/test_utils/compat.py
new file mode 100644
index 0000000000..5724ec1d4e
--- /dev/null
+++ b/tests/test_utils/compat.py
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from importlib.metadata import version
+from typing import TYPE_CHECKING, Any, cast
+
+from packaging.version import Version
+
+from airflow.models import Operator
+
+try:
+ # ImportError has been renamed to ParseImportError in airflow 2.10.0, and since our provider tests should
+ # run on all supported versions of Airflow, this compatibility shim falls back to the old ImportError so
+ # that tests can import it from here and use it in their code and run against older versions of Airflow
+ # This import can be removed (and all tests switched to import ParseImportError directly) as soon as
+ # all providers are updated to airflow 2.10+.
+ from airflow.models.errors import ParseImportError
+except ImportError:
+ from airflow.models.errors import ImportError as ParseImportError # type: ignore[no-redef]
+
+from airflow import __version__ as airflow_version
+
+AIRFLOW_VERSION = Version(airflow_version)
+AIRFLOW_V_2_7_PLUS = Version(AIRFLOW_VERSION.base_version) >= Version("2.7.0")
+AIRFLOW_V_2_8_PLUS = Version(AIRFLOW_VERSION.base_version) >= Version("2.8.0")
+AIRFLOW_V_2_9_PLUS = Version(AIRFLOW_VERSION.base_version) >= Version("2.9.0")
+AIRFLOW_V_2_10_PLUS = Version(AIRFLOW_VERSION.base_version) >= Version("2.10.0")
+
+
+def deserialize_operator(serialized_operator: dict[str, Any]) -> Operator:
+ if AIRFLOW_V_2_10_PLUS:
+ # In airflow 2.10+ we can deserialize the operator using the regular deserialize method.
+ # We do not need to use the deserialize_operator method explicitly, but some tests deserialize the
+ # operator and in the future they could use the regular ``deserialize`` method. This method is a shim
+ # that makes deserialization of operators work for tests run against older Airflow versions, and tests
+ # should use it instead of calling ``BaseSerialization.deserialize`` directly.
+ # We can remove this method and switch to the regular ``deserialize`` method as soon as all providers
+ # are updated to airflow 2.10+.
+ from airflow.serialization.serialized_objects import BaseSerialization
+
+ return cast(Operator, BaseSerialization.deserialize(serialized_operator))
+ else:
+ from airflow.serialization.serialized_objects import SerializedBaseOperator
+
+ return SerializedBaseOperator.deserialize_operator(serialized_operator)
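A hedged sketch of how a provider test might consume this new compat module; the helpers below are illustrative only and not part of this commit:

from airflow.serialization.serialized_objects import BaseSerialization

from tests.test_utils.compat import ParseImportError, deserialize_operator


def count_import_errors(session) -> int:
    # ParseImportError resolves to airflow.models.errors.ParseImportError on 2.10+
    # and to the legacy ImportError model on older Airflow versions.
    return session.query(ParseImportError).count()


def roundtrip_operator(operator):
    # Serialize with BaseSerialization and deserialize through the shim so the same
    # test passes against released Airflow packages and against main.
    return deserialize_operator(BaseSerialization.serialize(operator))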
diff --git a/tests/test_utils/db.py b/tests/test_utils/db.py
index 783362b8b1..1c2b871b19 100644
--- a/tests/test_utils/db.py
+++ b/tests/test_utils/db.py
@@ -36,7 +36,6 @@ from airflow.models import (
XCom,
)
from airflow.models.dag import DagOwnerAttributes
-from airflow.models.dagbag import DagPriorityParsingRequest
from airflow.models.dagcode import DagCode
from airflow.models.dagwarning import DagWarning
from airflow.models.dataset import (
@@ -46,12 +45,12 @@ from airflow.models.dataset import (
DatasetModel,
TaskOutletDatasetReference,
)
-from airflow.models.errors import ParseImportError
from airflow.models.serialized_dag import SerializedDagModel
from airflow.providers.fab.auth_manager.models import Permission, Resource, assoc_permission_role
from airflow.security.permissions import RESOURCE_DAG_PREFIX
from airflow.utils.db import add_default_pool_if_not_exists, create_default_connections, reflect_tables
from airflow.utils.session import create_session
+from tests.test_utils.compat import ParseImportError
def clear_db_runs():
@@ -172,6 +171,8 @@ def clear_db_task_reschedule():
def clear_db_dag_parsing_requests():
with create_session() as session:
+ from airflow.models.dagbag import DagPriorityParsingRequest
+
session.query(DagPriorityParsingRequest).delete()