This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 81a82d8481 Run unit tests for Providers with airflow installed as package. (#39513)
81a82d8481 is described below

commit 81a82d848100acf95fc4764030f02bbdde9832fd
Author: Jarek Potiuk <[email protected]>
AuthorDate: Wed May 15 16:16:09 2024 -0400

    Run unit tests for Providers with airflow installed as package. (#39513)
    
    This PR adds the option of running unit tests for providers against
    a specific Airflow version (for example, a released version from PyPI)
    and enables it for back-compatibility testing against 2.9.1. In the
    future it could also be used to run forward-compatibility testing
    with Airflow 3.
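
    For reference, the provider compatibility run enabled by this change boils
    down to a breeze invocation along these lines (assembled from the workflow
    change below; the Airflow version, test types and skipped providers come
    from the CI matrix and are only illustrative here):

        breeze testing tests --run-in-parallel \
            --parallel-test-types "Providers[-amazon,google] Providers[amazon] Providers[google]" \
            --use-packages-from-dist \
            --package-format wheel \
            --use-airflow-version "2.9.1" \
            --airflow-constraints-reference constraints-2.9.1 \
            --install-airflow-with-constraints \
            --providers-skip-constraints \
            --skip-providers ""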
---
 .github/workflows/check-providers.yml              |  23 +-
 .github/workflows/ci.yml                           |   3 +
 Dockerfile.ci                                      |   2 +
 airflow/providers/openlineage/conf.py              |   1 -
 .../providers/openlineage/plugins/openlineage.py   |   3 +
 contributing-docs/testing/unit_tests.rst           | 607 +++++++++++++--------
 dev/breeze/src/airflow_breeze/global_constants.py  |   8 +
 .../src/airflow_breeze/utils/selective_checks.py   |  13 +-
 dev/breeze/tests/test_selective_checks.py          |  28 +-
 pyproject.toml                                     |   3 +
 scripts/docker/entrypoint_ci.sh                    |   2 +
 scripts/in_container/run_ci_tests.sh               |   9 +
 .../endpoints/test_import_error_endpoint.py        |   2 +-
 tests/api_connexion/schemas/test_error_schema.py   |   2 +-
 tests/api_experimental/common/test_delete_dag.py   |   2 +-
 tests/conftest.py                                  |  20 +-
 tests/dag_processing/test_job_runner.py            |   2 +-
 tests/dag_processing/test_processor.py             |   2 +-
 tests/listeners/class_listener.py                  |  92 +++-
 .../amazon/aws/auth_manager/avp/test_facade.py     |  23 +-
 .../aws/executors/batch/test_batch_executor.py     |   6 +
 .../amazon/aws/executors/ecs/test_ecs_executor.py  |  15 +-
 tests/providers/amazon/aws/hooks/test_dynamodb.py  |   2 +-
 .../amazon/aws/hooks/test_hooks_signature.py       |   4 +-
 tests/providers/amazon/aws/links/test_base_aws.py  |   5 +-
 .../amazon/aws/operators/test_emr_serverless.py    |   5 +-
 .../amazon/aws/utils/test_eks_get_token.py         |   6 +-
 .../providers/apache/iceberg/hooks/test_iceberg.py |  34 +-
 .../cncf/kubernetes/operators/test_pod.py          |  15 +-
 .../cncf/kubernetes/test_template_rendering.py     |   4 +-
 .../google/cloud/operators/test_bigquery.py        |  23 +-
 .../google/cloud/operators/test_dataproc.py        |  75 ++-
 .../providers/openlineage/plugins/test_listener.py |  48 +-
 .../openlineage/plugins/test_openlineage.py        |   4 +
 tests/providers/smtp/notifications/test_smtp.py    |   6 +-
 tests/test_utils/compat.py                         |  60 ++
 tests/test_utils/db.py                             |   5 +-
 37 files changed, 766 insertions(+), 398 deletions(-)

diff --git a/.github/workflows/check-providers.yml b/.github/workflows/check-providers.yml
index d71b8d678d..8bac79f6fd 100644
--- a/.github/workflows/check-providers.yml
+++ b/.github/workflows/check-providers.yml
@@ -43,7 +43,11 @@ on:  # yamllint disable-line rule:truthy
       providers-compatibility-checks:
         description: >
          JSON-formatted array of providers compatibility checks in the form of array of dicts
-          (airflow-version, python-versions, remove-providers)
+          (airflow-version, python-versions, remove-providers, run-tests)
+        required: true
+        type: string
+      providers-test-types-list-as-string:
+        description: "List of parallel provider test types as string"
         required: true
         type: string
       skip-provider-tests:
@@ -237,6 +241,9 @@ jobs:
       - name: >
           Install and verify all provider packages and airflow on
          Airflow ${{ matrix.airflow-version }}:Python ${{ matrix.python-version }}
+        # We do not need to run import check if we run tests, the tests should cover all the import checks
+        # automatically
+        if: matrix.run-tests != 'true'
         run: >
           breeze release-management verify-provider-packages
           --use-packages-from-dist
@@ -245,3 +252,17 @@ jobs:
          --airflow-constraints-reference constraints-${{matrix.airflow-version}}
           --providers-skip-constraints
           --install-airflow-with-constraints
+      - name: >
+          Run provider unit tests on
+          Airflow ${{ matrix.airflow-version }}:Python ${{ matrix.python-version }}
+        if: matrix.run-tests == 'true'
+        run: >
+          breeze testing tests --run-in-parallel
+          --parallel-test-types "${{ inputs.providers-test-types-list-as-string }}"
+          --use-packages-from-dist
+          --package-format wheel
+          --use-airflow-version "${{ matrix.airflow-version }}"
+          --airflow-constraints-reference constraints-${{matrix.airflow-version}}
+          --install-airflow-with-constraints
+          --providers-skip-constraints
+          --skip-providers "${{ matrix.remove-providers }}"
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 00155070d1..0502df3368 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -77,6 +77,8 @@ jobs:
      full-tests-needed: ${{ steps.selective-checks.outputs.full-tests-needed }}
       parallel-test-types-list-as-string: >-
        ${{ steps.selective-checks.outputs.parallel-test-types-list-as-string }}
+      providers-test-types-list-as-string: >-
+        ${{ steps.selective-checks.outputs.providers-test-types-list-as-string }}
      include-success-outputs: ${{ steps.selective-checks.outputs.include-success-outputs }}
       postgres-exclude: ${{ steps.selective-checks.outputs.postgres-exclude }}
       mysql-exclude: ${{ steps.selective-checks.outputs.mysql-exclude }}
@@ -315,6 +317,7 @@ jobs:
      providers-compatibility-checks: ${{ needs.build-info.outputs.providers-compatibility-checks }}
       skip-provider-tests: ${{ needs.build-info.outputs.skip-provider-tests }}
       python-versions: ${{ needs.build-info.outputs.python-versions }}
+      providers-test-types-list-as-string: ${{ needs.build-info.outputs.providers-test-types-list-as-string }}
 
   tests-helm:
     name: "Helm tests"
diff --git a/Dockerfile.ci b/Dockerfile.ci
index 7b9392f291..ff95970371 100644
--- a/Dockerfile.ci
+++ b/Dockerfile.ci
@@ -984,6 +984,8 @@ function determine_airflow_to_use() {
         mkdir -p "${AIRFLOW_SOURCES}"/tmp/
     else
         python "${IN_CONTAINER_DIR}/install_airflow_and_providers.py"
+        # Some packages might leave legacy typing module which causes test issues
+        pip uninstall -y typing || true
     fi
 
    if [[ "${USE_AIRFLOW_VERSION}" =~ ^2\.2\..*|^2\.1\..*|^2\.0\..* && "${AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=}" != "" ]]; then
diff --git a/airflow/providers/openlineage/conf.py b/airflow/providers/openlineage/conf.py
index 23e663f67e..f79511a22d 100644
--- a/airflow/providers/openlineage/conf.py
+++ b/airflow/providers/openlineage/conf.py
@@ -100,7 +100,6 @@ def is_disabled() -> bool:
     option = os.getenv("OPENLINEAGE_DISABLED", "")
     if _is_true(option):
         return True
-
     # Check if both 'transport' and 'config_path' are not present and also
     # if legacy 'OPENLINEAGE_URL' environment variables is not set
     return transport() == {} and config_path(True) == "" and os.getenv("OPENLINEAGE_URL", "") == ""
diff --git a/airflow/providers/openlineage/plugins/openlineage.py b/airflow/providers/openlineage/plugins/openlineage.py
index 5927929588..5b4f2c3dbf 100644
--- a/airflow/providers/openlineage/plugins/openlineage.py
+++ b/airflow/providers/openlineage/plugins/openlineage.py
@@ -39,3 +39,6 @@ class OpenLineageProviderPlugin(AirflowPlugin):
     if not conf.is_disabled():
         macros = [lineage_job_namespace, lineage_job_name, lineage_run_id, lineage_parent_id]
         listeners = [get_openlineage_listener()]
+    else:
+        macros = []
+        listeners = []
diff --git a/contributing-docs/testing/unit_tests.rst b/contributing-docs/testing/unit_tests.rst
index e9a7263a4e..c7eaed99e2 100644
--- a/contributing-docs/testing/unit_tests.rst
+++ b/contributing-docs/testing/unit_tests.rst
@@ -427,75 +427,75 @@ You can see details about the limitation `here <https://pytest-xdist.readthedocs
 
 The error in this case will look similar to:
 
-  .. code-block::
+.. code-block::
 
-     Different tests were collected between gw0 and gw7. The difference is:
+   Different tests were collected between gw0 and gw7. The difference is:
 
 
 The fix for that is to sort the parameters in ``parametrize``. For example instead of this:
 
-  .. code-block:: python
+.. code-block:: python
 
-     @pytest.mark.parametrize("status", ALL_STATES)
-     def test_method():
-         ...
+   @pytest.mark.parametrize("status", ALL_STATES)
+   def test_method():
+       ...
 
 
 do that:
 
 
-  .. code-block:: python
+.. code-block:: python
 
-     @pytest.mark.parametrize("status", sorted(ALL_STATES))
-     def test_method():
-         ...
+   @pytest.mark.parametrize("status", sorted(ALL_STATES))
+   def test_method():
+       ...
 
 Similarly if your parameters are defined as result of utcnow() or other dynamic method - you should
 avoid that, or assign unique IDs for those parametrized tests. Instead of this:
 
-  .. code-block:: python
-
-     @pytest.mark.parametrize(
-         "url, expected_dag_run_ids",
-         [
-             (
-                 f"api/v1/dags/TEST_DAG_ID/dagRuns?end_date_gte="
-                 f"{urllib.parse.quote((timezone.utcnow() + timedelta(days=1)).isoformat())}",
-                 [],
-             ),
-             (
-                 f"api/v1/dags/TEST_DAG_ID/dagRuns?end_date_lte="
-                 f"{urllib.parse.quote((timezone.utcnow() + timedelta(days=1)).isoformat())}",
-                 ["TEST_DAG_RUN_ID_1", "TEST_DAG_RUN_ID_2"],
-             ),
-         ],
-     )
-     def test_end_date_gte_lte(url, expected_dag_run_ids):
-         ...
+.. code-block:: python
+
+   @pytest.mark.parametrize(
+       "url, expected_dag_run_ids",
+       [
+           (
+               f"api/v1/dags/TEST_DAG_ID/dagRuns?end_date_gte="
+               f"{urllib.parse.quote((timezone.utcnow() + timedelta(days=1)).isoformat())}",
+               [],
+           ),
+           (
+               f"api/v1/dags/TEST_DAG_ID/dagRuns?end_date_lte="
+               f"{urllib.parse.quote((timezone.utcnow() + timedelta(days=1)).isoformat())}",
+               ["TEST_DAG_RUN_ID_1", "TEST_DAG_RUN_ID_2"],
+           ),
+       ],
+   )
+   def test_end_date_gte_lte(url, expected_dag_run_ids):
+       ...
 
 Do this:
 
-  .. code-block:: python
-
-     @pytest.mark.parametrize(
-         "url, expected_dag_run_ids",
-         [
-             pytest.param(
-                 f"api/v1/dags/TEST_DAG_ID/dagRuns?end_date_gte="
-                 f"{urllib.parse.quote((timezone.utcnow() + timedelta(days=1)).isoformat())}",
-                 [],
-                 id="end_date_gte",
-             ),
-             pytest.param(
-                 f"api/v1/dags/TEST_DAG_ID/dagRuns?end_date_lte="
-                 f"{urllib.parse.quote((timezone.utcnow() + timedelta(days=1)).isoformat())}",
-                 ["TEST_DAG_RUN_ID_1", "TEST_DAG_RUN_ID_2"],
-                 id="end_date_lte",
-             ),
-         ],
-     )
-     def test_end_date_gte_lte(url, expected_dag_run_ids):
-         ...
+.. code-block:: python
+
+   @pytest.mark.parametrize(
+       "url, expected_dag_run_ids",
+       [
+           pytest.param(
+               f"api/v1/dags/TEST_DAG_ID/dagRuns?end_date_gte="
+               f"{urllib.parse.quote((timezone.utcnow() + timedelta(days=1)).isoformat())}",
+               [],
+               id="end_date_gte",
+           ),
+           pytest.param(
+               f"api/v1/dags/TEST_DAG_ID/dagRuns?end_date_lte="
+               f"{urllib.parse.quote((timezone.utcnow() + timedelta(days=1)).isoformat())}",
+               ["TEST_DAG_RUN_ID_1", "TEST_DAG_RUN_ID_2"],
+               id="end_date_lte",
+           ),
+       ],
+   )
+   def test_end_date_gte_lte(url, expected_dag_run_ids):
+       ...
 
 
 
@@ -515,107 +515,107 @@ Moving object creation from top-level to inside tests. This code will break coll
 the test is marked as DB test:
 
 
-  .. code-block:: python
-
-     TI = TaskInstance(
-         task=BashOperator(task_id="test", bash_command="true", dag=DAG(dag_id="id"), start_date=datetime.now()),
-         run_id="fake_run",
-         state=State.RUNNING,
-     )
-
-
-     class TestCallbackRequest:
-         @pytest.mark.parametrize(
-             "input,request_class",
-             [
-                 (CallbackRequest(full_filepath="filepath", msg="task_failure"), CallbackRequest),
-                 (
-                     TaskCallbackRequest(
-                         full_filepath="filepath",
-                         simple_task_instance=SimpleTaskInstance.from_ti(ti=TI),
-                         processor_subdir="/test_dir",
-                         is_failure_callback=True,
-                     ),
-                     TaskCallbackRequest,
-                 ),
-                 (
-                     DagCallbackRequest(
-                         full_filepath="filepath",
-                         dag_id="fake_dag",
-                         run_id="fake_run",
-                         processor_subdir="/test_dir",
-                         is_failure_callback=False,
-                     ),
-                     DagCallbackRequest,
-                 ),
-                 (
-                     SlaCallbackRequest(
-                         full_filepath="filepath",
-                         dag_id="fake_dag",
-                         processor_subdir="/test_dir",
-                     ),
-                     SlaCallbackRequest,
-                 ),
-             ],
-         )
-         def test_from_json(self, input, request_class):
-             ...
+.. code-block:: python
+
+   TI = TaskInstance(
+       task=BashOperator(task_id="test", bash_command="true", dag=DAG(dag_id="id"), start_date=datetime.now()),
+       run_id="fake_run",
+       state=State.RUNNING,
+   )
+
+
+   class TestCallbackRequest:
+       @pytest.mark.parametrize(
+           "input,request_class",
+           [
+               (CallbackRequest(full_filepath="filepath", msg="task_failure"), CallbackRequest),
+               (
+                   TaskCallbackRequest(
+                       full_filepath="filepath",
+                       simple_task_instance=SimpleTaskInstance.from_ti(ti=TI),
+                       processor_subdir="/test_dir",
+                       is_failure_callback=True,
+                   ),
+                   TaskCallbackRequest,
+               ),
+               (
+                   DagCallbackRequest(
+                       full_filepath="filepath",
+                       dag_id="fake_dag",
+                       run_id="fake_run",
+                       processor_subdir="/test_dir",
+                       is_failure_callback=False,
+                   ),
+                   DagCallbackRequest,
+               ),
+               (
+                   SlaCallbackRequest(
+                       full_filepath="filepath",
+                       dag_id="fake_dag",
+                       processor_subdir="/test_dir",
+                   ),
+                   SlaCallbackRequest,
+               ),
+           ],
+       )
+       def test_from_json(self, input, request_class):
+           ...
 
 
 Instead - this will not break collection. The TaskInstance is not initialized when the module is parsed,
 it will only be initialized when the test gets executed because we moved initialization of it from
 top level / parametrize to inside the test:
 
-  .. code-block:: python
+.. code-block:: python
 
-    pytestmark = pytest.mark.db_test
+  pytestmark = pytest.mark.db_test
 
 
-    class TestCallbackRequest:
-        @pytest.mark.parametrize(
-            "input,request_class",
-            [
-                (CallbackRequest(full_filepath="filepath", msg="task_failure"), CallbackRequest),
-                (
-                    None,  # to be generated when test is run
-                    TaskCallbackRequest,
-                ),
-                (
-                    DagCallbackRequest(
-                        full_filepath="filepath",
-                        dag_id="fake_dag",
-                        run_id="fake_run",
-                        processor_subdir="/test_dir",
-                        is_failure_callback=False,
-                    ),
-                    DagCallbackRequest,
-                ),
-                (
-                    SlaCallbackRequest(
-                        full_filepath="filepath",
-                        dag_id="fake_dag",
-                        processor_subdir="/test_dir",
-                    ),
-                    SlaCallbackRequest,
-                ),
-            ],
-        )
-        def test_from_json(self, input, request_class):
-            if input is None:
-                ti = TaskInstance(
-                    task=BashOperator(
-                        task_id="test", bash_command="true", dag=DAG(dag_id="id"), start_date=datetime.now()
-                    ),
-                    run_id="fake_run",
-                    state=State.RUNNING,
-                )
-
-                input = TaskCallbackRequest(
-                    full_filepath="filepath",
-                    simple_task_instance=SimpleTaskInstance.from_ti(ti=ti),
-                    processor_subdir="/test_dir",
-                    is_failure_callback=True,
-                )
+  class TestCallbackRequest:
+      @pytest.mark.parametrize(
+          "input,request_class",
+          [
+              (CallbackRequest(full_filepath="filepath", msg="task_failure"), CallbackRequest),
+              (
+                  None,  # to be generated when test is run
+                  TaskCallbackRequest,
+              ),
+              (
+                  DagCallbackRequest(
+                      full_filepath="filepath",
+                      dag_id="fake_dag",
+                      run_id="fake_run",
+                      processor_subdir="/test_dir",
+                      is_failure_callback=False,
+                  ),
+                  DagCallbackRequest,
+              ),
+              (
+                  SlaCallbackRequest(
+                      full_filepath="filepath",
+                      dag_id="fake_dag",
+                      processor_subdir="/test_dir",
+                  ),
+                  SlaCallbackRequest,
+              ),
+          ],
+      )
+      def test_from_json(self, input, request_class):
+          if input is None:
+              ti = TaskInstance(
+                  task=BashOperator(
+                      task_id="test", bash_command="true", dag=DAG(dag_id="id"), start_date=datetime.now()
+                  ),
+                  run_id="fake_run",
+                  state=State.RUNNING,
+              )
+
+              input = TaskCallbackRequest(
+                  full_filepath="filepath",
+                  simple_task_instance=SimpleTaskInstance.from_ti(ti=ti),
+                  processor_subdir="/test_dir",
+                  is_failure_callback=True,
+              )
 
 
 Sometimes it is difficult to rewrite the tests, so you might add conditional handling and mock out some
@@ -624,119 +624,119 @@ will hit the Database while parsing the tests, because this is what Variable.set
 parametrize specification is being parsed - even if test is marked as DB test.
 
 
-  .. code-block:: python
-
-      from airflow.models.variable import Variable
-
-      pytestmark = pytest.mark.db_test
+.. code-block:: python
 
-      initial_db_init()
+    from airflow.models.variable import Variable
 
+    pytestmark = pytest.mark.db_test
 
-      @pytest.mark.parametrize(
-          "env, expected",
-          [
-              pytest.param(
-                  {"plain_key": "plain_value"},
-                  "{'plain_key': 'plain_value'}",
-                  id="env-plain-key-val",
-              ),
-              pytest.param(
-                  {"plain_key": Variable.setdefault("plain_var", "banana")},
-                  "{'plain_key': 'banana'}",
-                  id="env-plain-key-plain-var",
-              ),
-              pytest.param(
-                  {"plain_key": Variable.setdefault("secret_var", "monkey")},
-                  "{'plain_key': '***'}",
-                  id="env-plain-key-sensitive-var",
-              ),
-              pytest.param(
-                  {"plain_key": "{{ var.value.plain_var }}"},
-                  "{'plain_key': '{{ var.value.plain_var }}'}",
-                  id="env-plain-key-plain-tpld-var",
-              ),
-          ],
-      )
-      def test_rendered_task_detail_env_secret(patch_app, admin_client, request, env, expected):
-          ...
+    initial_db_init()
+
+
+    @pytest.mark.parametrize(
+        "env, expected",
+        [
+            pytest.param(
+                {"plain_key": "plain_value"},
+                "{'plain_key': 'plain_value'}",
+                id="env-plain-key-val",
+            ),
+            pytest.param(
+                {"plain_key": Variable.setdefault("plain_var", "banana")},
+                "{'plain_key': 'banana'}",
+                id="env-plain-key-plain-var",
+            ),
+            pytest.param(
+                {"plain_key": Variable.setdefault("secret_var", "monkey")},
+                "{'plain_key': '***'}",
+                id="env-plain-key-sensitive-var",
+            ),
+            pytest.param(
+                {"plain_key": "{{ var.value.plain_var }}"},
+                "{'plain_key': '{{ var.value.plain_var }}'}",
+                id="env-plain-key-plain-tpld-var",
+            ),
+        ],
+    )
+    def test_rendered_task_detail_env_secret(patch_app, admin_client, request, env, expected):
+        ...
 
 
 You can make the code conditional and mock out the Variable to avoid hitting the database.
 
 
-  .. code-block:: python
-
-      from airflow.models.variable import Variable
-
-      pytestmark = pytest.mark.db_test
+.. code-block:: python
 
+    from airflow.models.variable import Variable
 
-      if os.environ.get("_AIRFLOW_SKIP_DB_TESTS") == "true":
-          # Handle collection of the test by non-db case
-          Variable = mock.MagicMock()  # type: ignore[misc] # noqa: F811
-      else:
-          initial_db_init()
+    pytestmark = pytest.mark.db_test
 
 
-      @pytest.mark.parametrize(
-          "env, expected",
-          [
-              pytest.param(
-                  {"plain_key": "plain_value"},
-                  "{'plain_key': 'plain_value'}",
-                  id="env-plain-key-val",
-              ),
-              pytest.param(
-                  {"plain_key": Variable.setdefault("plain_var", "banana")},
-                  "{'plain_key': 'banana'}",
-                  id="env-plain-key-plain-var",
-              ),
-              pytest.param(
-                  {"plain_key": Variable.setdefault("secret_var", "monkey")},
-                  "{'plain_key': '***'}",
-                  id="env-plain-key-sensitive-var",
-              ),
-              pytest.param(
-                  {"plain_key": "{{ var.value.plain_var }}"},
-                  "{'plain_key': '{{ var.value.plain_var }}'}",
-                  id="env-plain-key-plain-tpld-var",
-              ),
-          ],
-      )
-      def test_rendered_task_detail_env_secret(patch_app, admin_client, request, env, expected):
-          ...
+    if os.environ.get("_AIRFLOW_SKIP_DB_TESTS") == "true":
+        # Handle collection of the test by non-db case
+        Variable = mock.MagicMock()  # type: ignore[misc] # noqa: F811
+    else:
+        initial_db_init()
+
+
+    @pytest.mark.parametrize(
+        "env, expected",
+        [
+            pytest.param(
+                {"plain_key": "plain_value"},
+                "{'plain_key': 'plain_value'}",
+                id="env-plain-key-val",
+            ),
+            pytest.param(
+                {"plain_key": Variable.setdefault("plain_var", "banana")},
+                "{'plain_key': 'banana'}",
+                id="env-plain-key-plain-var",
+            ),
+            pytest.param(
+                {"plain_key": Variable.setdefault("secret_var", "monkey")},
+                "{'plain_key': '***'}",
+                id="env-plain-key-sensitive-var",
+            ),
+            pytest.param(
+                {"plain_key": "{{ var.value.plain_var }}"},
+                "{'plain_key': '{{ var.value.plain_var }}'}",
+                id="env-plain-key-plain-tpld-var",
+            ),
+        ],
+    )
+    def test_rendered_task_detail_env_secret(patch_app, admin_client, request, env, expected):
+        ...
 
 You can also use fixture to create object that needs database just like this.
 
 
-  .. code-block:: python
+.. code-block:: python
 
-      from airflow.models import Connection
+    from airflow.models import Connection
 
-      pytestmark = pytest.mark.db_test
+    pytestmark = pytest.mark.db_test
 
 
-      @pytest.fixture()
-      def get_connection1():
-          return Connection()
+    @pytest.fixture()
+    def get_connection1():
+        return Connection()
 
 
-      @pytest.fixture()
-      def get_connection2():
-          return Connection(host="apache.org", extra={})
+    @pytest.fixture()
+    def get_connection2():
+        return Connection(host="apache.org", extra={})
 
 
-      @pytest.mark.parametrize(
-          "conn",
-          [
-              "get_connection1",
-              "get_connection2",
-          ],
-      )
-      def test_as_json_from_connection(self, conn: Connection):
-          conn = request.getfixturevalue(conn)
-          ...
+    @pytest.mark.parametrize(
+        "conn",
+        [
+            "get_connection1",
+            "get_connection2",
+        ],
+    )
+    def test_as_json_from_connection(self, conn: Connection):
+        conn = request.getfixturevalue(conn)
+        ...
 
 
 Running Unit tests
@@ -1061,7 +1061,7 @@ Those tests are marked with ``@pytest.mark.quarantined`` annotation.
 Those tests are skipped by default. You can enable them with ``--include-quarantined`` flag. You
 can also decide to only run tests with ``-m quarantined`` flag to run only those tests.
 
-Running Tests with provider packages
+Running provider compatibility tests
 ....................................
 
 Airflow 2.0 introduced the concept of splitting the monolithic Airflow package into separate
@@ -1108,6 +1108,129 @@ This prepares airflow .whl package in the dist folder.
 
      breeze --use-airflow-version wheel --use-packages-from-dist --mount-sources skip
 
+Provider compatibility unit tests against older Airflow releases
+................................................................
+
+Our CI runs provider tests against previous compatible Airflow releases. This allows checking
+whether the providers still work when installed with older Airflow versions.
+
+.. note::
+
+  For now it's done for 2.9.1 version only.
+
+Those tests can be used to test compatibility of the providers with past and future releases of Airflow.
+For example, it could be used to run the latest provider versions against a released or main
+Airflow 3 if they are developed independently.
+
+The tests use the current source version of the ``tests`` folder - so care should be taken that the tests
+implemented for providers in the sources can run against previous versions of Airflow and
+against Airflow installed from a package rather than from the sources.
+
+This can be reproduced locally by building providers from a tag/commit of the airflow repository.
+
+1. Make sure to build the latest Breeze CI image:
+
+.. code-block:: bash
+
+   breeze ci-image build --python 3.8
+
+2. Build providers from the latest sources:
+
+.. code-block:: bash
+
+   rm dist/*
+   breeze release-management prepare-provider-packages --include-not-ready-providers \
+      --version-suffix-for-pypi dev0 --package-format wheel
+
+3. Prepare provider constraints:
+
+.. code-block:: bash
+
+   breeze release-management generate-constraints --airflow-constraints-mode constraints-source-providers --answer yes
+
+4. Enter the breeze environment, installing the selected Airflow version and the provider packages prepared from main:
+
+.. code-block:: bash
+
+  breeze shell --use-packages-from-dist --package-format wheel \
+   --use-airflow-version 2.9.1 --airflow-constraints-reference constraints-2.9.1 \
+   --install-airflow-with-constraints \
+   --providers-skip-constraints \
+   --mount-sources tests
+
+5. You can then run tests as usual:
+
+.. code-block:: bash
+
+   pytest tests/providers/<provider>/test.py
+
+6. Iterate with the tests.
+
+The tests are run using:
+
+* airflow installed from PyPI
+* tests coming from the current airflow sources (they are mounted inside the breeze image)
+* provider packages built from the current airflow sources and placed in dist
+
+This means that you can modify the tests and re-run them, but if you want to modify provider code
+you need to exit breeze, rebuild the provider package and restart breeze using the command above.
+
+Rebuilding a single provider package can be done using this command:
+
+.. code-block:: bash
+
+  breeze release-management prepare-provider-packages \
+    --version-suffix-for-pypi dev0 --package-format wheel <provider>
+
+
+Note that some of the tests, if written without taking care about compatibility, might not work with older
+versions of Airflow - this is because of refactorings, renames, and tests relying on internals of Airflow that
+are not part of the public API. We deal with it in one of the following ways:
+
+1) If the whole provider is supposed to work only with later Airflow versions, we remove the whole provider
+   by excluding it from the compatibility test configuration (see below).
+
+2) Some compatibility shims are defined in ``tests/test_utils/compat.py`` - and they can be used to make the
+   tests compatible - for example importing ``ParseImportError`` (the exception has been renamed from
+   ``ImportError``) would fail in Airflow 2.9, but we have a fallback import in ``compat.py`` that
+   falls back to the old import automatically, so all tests testing / expecting ``ParseImportError`` should import
+   it from the ``tests.test_utils.compat`` module. There are a few other compatibility shims defined there and
+   you can add more if needed in a similar way.
+
+3) If only some tests are not compatible and use features that are only available in a newer Airflow version,
+   we can mark those tests with the appropriate ``AIRFLOW_V_2_X_PLUS`` boolean constant defined in ``compat.py``.
+   For example:
+
+.. code-block:: python
+
+  from tests.test_utils.compat import AIRFLOW_V_2_7_PLUS
+
+  @pytest.mark.skipif(not AIRFLOW_V_2_7_PLUS, reason="The tests should be skipped for Airflow < 2.7")
+  def some_test_that_only_works_for_airflow_2_7_plus():
+      pass
+
+4) Sometimes, the tests should only be run when airflow is installed from the sources. In this case you can
+   add a conditional ``skipif`` marker for ``RUNNING_TESTS_AGAINST_AIRFLOW_PACKAGES`` to the test. For example:
+
+.. code-block:: python
+
+  @pytest.mark.skipif(RUNNING_TESTS_AGAINST_AIRFLOW_PACKAGES,
+                      reason="Plugin initialization is done early in case of packages")
+  def test_plugin():
+      pass
+
+
+How are provider compatibility tests run in CI?
+-------------------------------------------------
+
+We run a set of back-compatibility tests based on the configuration specified in the
+``BASE_PROVIDERS_COMPATIBILITY_CHECKS`` constant in the ``./dev/breeze/src/airflow_breeze/global_constants.py``
+file - where we specify:
+
+* python version
+* airflow version
+* which providers should be removed (exclusions)
+* whether to run tests
+
 Other Settings
 --------------
 
@@ -1169,7 +1292,7 @@ to **ignore**, e.g. set ``PYTHONWARNINGS`` environment variable to ``ignore``.
 
 .. code-block:: bash
 
-     pytest tests/core/ --disable-capture-warnings
+    pytest tests/core/ --disable-capture-warnings
 
 Code Coverage
 -------------
@@ -1186,19 +1309,19 @@ a. Initiate a breeze shell.
 
 b. Execute one of the commands below based on the desired coverage area:
 
-   - **Core:** ``python scripts/cov/core_coverage.py``
-   - **REST API:** ``python scripts/cov/restapi_coverage.py``
-   - **CLI:** ``python scripts/cov/cli_coverage.py``
-   - **Webserver:** ``python scripts/cov/www_coverage.py``
+- **Core:** ``python scripts/cov/core_coverage.py``
+- **REST API:** ``python scripts/cov/restapi_coverage.py``
+- **CLI:** ``python scripts/cov/cli_coverage.py``
+- **Webserver:** ``python scripts/cov/www_coverage.py``
 
 c. After execution, the coverage report will be available at: http://localhost:28000/dev/coverage/index.html.
 
- .. note::
+.. note::
 
-     In order to see the coverage report, you must start webserver first in breeze environment via the
-     `airflow webserver`. Once you enter `breeze`, you can start `tmux`  (terminal multiplexer) and
-     split the terminal (by pressing `ctrl-B "` for example) to continue testing and run the webserver
-     in one terminal and run tests in the second one (you can switch between the terminals with `ctrl-B <arrow>`).
+   In order to see the coverage report, you must start webserver first in breeze environment via the
+   ``airflow webserver``. Once you enter ``breeze``, you can start ``tmux``  (terminal multiplexer) and
+   split the terminal (by pressing ``ctrl-B "`` for example) to continue testing and run the webserver
+   in one terminal and run tests in the second one (you can switch between the terminals with ``ctrl-B <arrow>``).
 
 Modules Not Fully Covered:
 ..........................
diff --git a/dev/breeze/src/airflow_breeze/global_constants.py b/dev/breeze/src/airflow_breeze/global_constants.py
index 66cde22e70..8442450565 100644
--- a/dev/breeze/src/airflow_breeze/global_constants.py
+++ b/dev/breeze/src/airflow_breeze/global_constants.py
@@ -476,11 +476,19 @@ BASE_PROVIDERS_COMPATIBILITY_CHECKS: list[dict[str, str]] = [
         "python-version": "3.8",
         "airflow-version": "2.7.1",
         "remove-providers": _exclusion(["common.io", "fab"]),
+        "run-tests": "false",
     },
     {
         "python-version": "3.8",
         "airflow-version": "2.8.0",
         "remove-providers": _exclusion(["fab"]),
+        "run-tests": "false",
+    },
+    {
+        "python-version": "3.8",
+        "airflow-version": "2.9.1",
+        "remove-providers": _exclusion([]),
+        "run-tests": "true",
     },
 ]
 
diff --git a/dev/breeze/src/airflow_breeze/utils/selective_checks.py b/dev/breeze/src/airflow_breeze/utils/selective_checks.py
index 6b42d91346..193271f34c 100644
--- a/dev/breeze/src/airflow_breeze/utils/selective_checks.py
+++ b/dev/breeze/src/airflow_breeze/utils/selective_checks.py
@@ -83,6 +83,8 @@ ALL_CI_SELECTIVE_TEST_TYPES = (
     "PythonVenv Serialization WWW"
 )
 
+ALL_PROVIDERS_SELECTIVE_TEST_TYPES = "Providers[-amazon,google] Providers[amazon] Providers[google]"
+
 
 class FileGroupForCi(Enum):
     ENVIRONMENT_FILES = "environment_files"
@@ -816,6 +818,15 @@ class SelectiveChecks:
         self._extract_long_provider_tests(current_test_types)
         return " ".join(sorted(current_test_types))
 
+    @cached_property
+    def providers_test_types_list_as_string(self) -> str | None:
+        all_test_types = self.parallel_test_types_list_as_string
+        if all_test_types is None:
+            return None
+        return " ".join(
+            test_type for test_type in all_test_types.split(" ") if test_type.startswith("Providers")
+        )
+
     @cached_property
     def include_success_outputs(
         self,
@@ -828,7 +839,7 @@ class SelectiveChecks:
 
     @staticmethod
     def _print_diff(old_lines: list[str], new_lines: list[str]):
-        diff = "\n".join([line for line in difflib.ndiff(old_lines, new_lines) if line and line[0] in "+-?"])
+        diff = "\n".join(line for line in difflib.ndiff(old_lines, new_lines) if line and line[0] in "+-?")
         get_console().print(diff)
 
     @cached_property
diff --git a/dev/breeze/tests/test_selective_checks.py b/dev/breeze/tests/test_selective_checks.py
index 964c8c3487..81789de714 100644
--- a/dev/breeze/tests/test_selective_checks.py
+++ b/dev/breeze/tests/test_selective_checks.py
@@ -30,7 +30,11 @@ from airflow_breeze.global_constants import (
     DEFAULT_PYTHON_MAJOR_MINOR_VERSION,
     GithubEvents,
 )
-from airflow_breeze.utils.selective_checks import ALL_CI_SELECTIVE_TEST_TYPES, SelectiveChecks
+from airflow_breeze.utils.selective_checks import (
+    ALL_CI_SELECTIVE_TEST_TYPES,
+    ALL_PROVIDERS_SELECTIVE_TEST_TYPES,
+    SelectiveChecks,
+)
 
 ANSI_COLORS_MATCHER = re.compile(r"(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]")
 
@@ -114,6 +118,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str):
                     "mypy-docs,mypy-providers,ts-compile-format-lint-www",
                     "upgrade-to-newer-dependencies": "false",
                     "parallel-test-types-list-as-string": None,
+                    "providers-test-types-list-as-string": None,
                     "needs-mypy": "false",
                     "mypy-folders": "[]",
                 },
@@ -139,6 +144,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str):
                     "mypy-docs,mypy-providers,ts-compile-format-lint-www",
                     "upgrade-to-newer-dependencies": "false",
                     "parallel-test-types-list-as-string": "API Always 
Providers[fab]",
+                    "providers-test-types-list-as-string": "Providers[fab]",
                     "needs-mypy": "true",
                     "mypy-folders": "['airflow']",
                 },
@@ -164,6 +170,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str):
                     "mypy-docs,mypy-providers,ts-compile-format-lint-www",
                     "upgrade-to-newer-dependencies": "false",
                     "parallel-test-types-list-as-string": "Always Operators",
+                    "providers-test-types-list-as-string": "",
                     "needs-mypy": "true",
                     "mypy-folders": "['airflow']",
                 },
@@ -190,6 +197,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str):
                     "upgrade-to-newer-dependencies": "false",
                     "parallel-test-types-list-as-string": "Always 
BranchExternalPython BranchPythonVenv "
                     "ExternalPython Operators PythonVenv",
+                    "providers-test-types-list-as-string": "",
                     "needs-mypy": "true",
                     "mypy-folders": "['airflow']",
                 },
@@ -215,6 +223,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str):
                     "mypy-docs,mypy-providers,ts-compile-format-lint-www",
                     "upgrade-to-newer-dependencies": "false",
                     "parallel-test-types-list-as-string": "Always 
Serialization",
+                    "providers-test-types-list-as-string": "",
                     "needs-mypy": "true",
                     "mypy-folders": "['airflow']",
                 },
@@ -245,6 +254,8 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str):
                     "upgrade-to-newer-dependencies": "false",
                     "parallel-test-types-list-as-string": "API Always 
Providers[amazon] "
                     "Providers[common.sql,fab,openlineage,pgvector,postgres] 
Providers[google]",
+                    "providers-test-types-list-as-string": "Providers[amazon] "
+                    "Providers[common.sql,fab,openlineage,pgvector,postgres] Providers[google]",
                     "needs-mypy": "true",
                     "mypy-folders": "['airflow', 'providers']",
                 },
@@ -271,6 +282,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str):
                     "run-kubernetes-tests": "false",
                     "upgrade-to-newer-dependencies": "false",
                     "parallel-test-types-list-as-string": "Always 
Providers[apache.beam] Providers[google]",
+                    "providers-test-types-list-as-string": 
"Providers[apache.beam] Providers[google]",
                     "needs-mypy": "true",
                     "mypy-folders": "['providers']",
                 },
@@ -297,6 +309,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str):
                     "run-kubernetes-tests": "false",
                     "upgrade-to-newer-dependencies": "false",
                     "parallel-test-types-list-as-string": None,
+                    "providers-test-types-list-as-string": None,
                     "needs-mypy": "false",
                     "mypy-folders": "[]",
                 },
@@ -327,6 +340,8 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str):
                     "upgrade-to-newer-dependencies": "false",
                     "parallel-test-types-list-as-string": "Always 
Providers[amazon] "
                     "Providers[common.sql,openlineage,pgvector,postgres] 
Providers[google]",
+                    "providers-test-types-list-as-string": "Providers[amazon] "
+                    "Providers[common.sql,openlineage,pgvector,postgres] Providers[google]",
                     "needs-mypy": "true",
                     "mypy-folders": "['providers']",
                 },
@@ -359,6 +374,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str):
                     "upgrade-to-newer-dependencies": "false",
                     "parallel-test-types-list-as-string": "Always "
                     
"Providers[airbyte,apache.livy,dbt.cloud,dingding,discord,http] 
Providers[amazon]",
+                    "providers-test-types-list-as-string": 
"Providers[airbyte,apache.livy,dbt.cloud,dingding,discord,http] 
Providers[amazon]",
                     "needs-mypy": "true",
                     "mypy-folders": "['providers']",
                 },
@@ -389,6 +405,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str):
                     "run-kubernetes-tests": "true",
                     "upgrade-to-newer-dependencies": "false",
                     "parallel-test-types-list-as-string": "Always 
Providers[airbyte,http]",
+                    "providers-test-types-list-as-string": 
"Providers[airbyte,http]",
                     "needs-mypy": "true",
                     "mypy-folders": "['providers']",
                 },
@@ -420,6 +437,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str):
                     "run-kubernetes-tests": "true",
                     "upgrade-to-newer-dependencies": "false",
                     "parallel-test-types-list-as-string": "Always",
+                    "providers-test-types-list-as-string": "",
                     "needs-mypy": "true",
                     "mypy-folders": "['airflow']",
                 },
@@ -446,6 +464,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str):
                     "skip-pre-commits": "identity,mypy-airflow,mypy-dev,mypy-docs,mypy-providers",
                     "upgrade-to-newer-dependencies": "true",
                     "parallel-test-types-list-as-string": 
ALL_CI_SELECTIVE_TEST_TYPES,
+                    "providers-test-types-list-as-string": 
ALL_PROVIDERS_SELECTIVE_TEST_TYPES,
                     "needs-mypy": "true",
                     "mypy-folders": "['airflow', 'providers', 'docs', 'dev']",
                 },
@@ -472,6 +491,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str):
                     "skip-pre-commits": "identity,mypy-airflow,mypy-dev,mypy-docs,mypy-providers",
                     "upgrade-to-newer-dependencies": "true",
                     "parallel-test-types-list-as-string": 
ALL_CI_SELECTIVE_TEST_TYPES,
+                    "providers-test-types-list-as-string": 
ALL_PROVIDERS_SELECTIVE_TEST_TYPES,
                     "needs-mypy": "true",
                     "mypy-folders": "['airflow', 'providers', 'docs', 'dev']",
                 },
@@ -778,6 +798,7 @@ def test_full_test_needed_when_scripts_changes(files: tuple[str, ...], expected_
                     "skip-pre-commits": "identity,mypy-airflow,mypy-dev,mypy-docs,mypy-providers",
                     "upgrade-to-newer-dependencies": "false",
                     "parallel-test-types-list-as-string": 
ALL_CI_SELECTIVE_TEST_TYPES,
+                    "providers-test-types-list-as-string": 
ALL_PROVIDERS_SELECTIVE_TEST_TYPES,
                     "needs-mypy": "true",
                     "mypy-folders": "['airflow', 'providers', 'docs', 'dev']",
                 },
@@ -811,6 +832,7 @@ def test_full_test_needed_when_scripts_changes(files: tuple[str, ...], expected_
                     "skip-pre-commits": "identity,mypy-airflow,mypy-dev,mypy-docs,mypy-providers",
                     "upgrade-to-newer-dependencies": "false",
                     "parallel-test-types-list-as-string": 
ALL_CI_SELECTIVE_TEST_TYPES,
+                    "providers-test-types-list-as-string": 
ALL_PROVIDERS_SELECTIVE_TEST_TYPES,
                     "needs-mypy": "true",
                     "mypy-folders": "['airflow', 'providers', 'docs', 'dev']",
                 },
@@ -844,6 +866,7 @@ def test_full_test_needed_when_scripts_changes(files: tuple[str, ...], expected_
                     "skip-pre-commits": "identity,mypy-airflow,mypy-dev,mypy-docs,mypy-providers",
                     "upgrade-to-newer-dependencies": "false",
                     "parallel-test-types-list-as-string": 
ALL_CI_SELECTIVE_TEST_TYPES,
+                    "providers-test-types-list-as-string": 
ALL_PROVIDERS_SELECTIVE_TEST_TYPES,
                     "needs-mypy": "true",
                     "mypy-folders": "['airflow', 'providers', 'docs', 'dev']",
                 },
@@ -878,6 +901,7 @@ def test_full_test_needed_when_scripts_changes(files: tuple[str, ...], expected_
                     "skip-pre-commits": "identity,mypy-airflow,mypy-dev,mypy-docs,mypy-providers",
                     "upgrade-to-newer-dependencies": "false",
                     "parallel-test-types-list-as-string": 
ALL_CI_SELECTIVE_TEST_TYPES,
+                    "providers-test-types-list-as-string": 
ALL_PROVIDERS_SELECTIVE_TEST_TYPES,
                     "needs-mypy": "true",
                     "mypy-folders": "['airflow', 'providers', 'docs', 'dev']",
                 },
@@ -912,6 +936,7 @@ def test_full_test_needed_when_scripts_changes(files: tuple[str, ...], expected_
                     "skip-pre-commits": "identity,mypy-airflow,mypy-dev,mypy-docs,mypy-providers",
                     "upgrade-to-newer-dependencies": "false",
                     "parallel-test-types-list-as-string": 
ALL_CI_SELECTIVE_TEST_TYPES,
+                    "providers-test-types-list-as-string": 
ALL_PROVIDERS_SELECTIVE_TEST_TYPES,
                     "needs-mypy": "true",
                     "mypy-folders": "['airflow', 'providers', 'docs', 'dev']",
                 },
@@ -943,6 +968,7 @@ def test_full_test_needed_when_scripts_changes(files: tuple[str, ...], expected_
                     "skip-pre-commits": "identity,mypy-airflow,mypy-dev,mypy-docs,mypy-providers",
                     "upgrade-to-newer-dependencies": "false",
                     "parallel-test-types-list-as-string": 
ALL_CI_SELECTIVE_TEST_TYPES,
+                    "providers-test-types-list-as-string": 
ALL_PROVIDERS_SELECTIVE_TEST_TYPES,
                     "needs-mypy": "true",
                     "mypy-folders": "['airflow', 'providers', 'docs', 'dev']",
                 },
diff --git a/pyproject.toml b/pyproject.toml
index fadfa13b46..e1bd7e846d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -410,6 +410,9 @@ combine-as-imports = true
 # https://github.com/apache/airflow/issues/39252
 "airflow/providers/amazon/aws/hooks/eks.py" = ["W605"]
 
+# Test compat imports banned imports to allow testing against older airflow versions
+"tests/test_utils/compat.py" = ["TID251", "F401"]
+
 [tool.ruff.lint.flake8-tidy-imports]
 # Disallow all relative imports.
 ban-relative-imports = "all"
diff --git a/scripts/docker/entrypoint_ci.sh b/scripts/docker/entrypoint_ci.sh
index a0738dc1d6..3c1699acbc 100755
--- a/scripts/docker/entrypoint_ci.sh
+++ b/scripts/docker/entrypoint_ci.sh
@@ -205,6 +205,8 @@ function determine_airflow_to_use() {
         mkdir -p "${AIRFLOW_SOURCES}"/tmp/
     else
         python "${IN_CONTAINER_DIR}/install_airflow_and_providers.py"
+        # Some packages might leave legacy typing module which causes test issues
+        pip uninstall -y typing || true
     fi
 
    if [[ "${USE_AIRFLOW_VERSION}" =~ ^2\.2\..*|^2\.1\..*|^2\.0\..* && "${AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=}" != "" ]]; then
diff --git a/scripts/in_container/run_ci_tests.sh b/scripts/in_container/run_ci_tests.sh
index 3d6b5a6a68..98ff9ddef9 100755
--- a/scripts/in_container/run_ci_tests.sh
+++ b/scripts/in_container/run_ci_tests.sh
@@ -60,6 +60,15 @@ if [[ ${TEST_TYPE:=} == "Quarantined" ]]; then
     fi
 fi
 
+if [[ ${CI:="false"} == "true" && ${RES} != "0" && ${USE_AIRFLOW_VERSION=} != "" ]]; then
+    echo
+    echo "${COLOR_YELLOW}The compatibility tests of providers failed for Airflow ${USE_AIRFLOW_VERSION} and you need to make sure they pass for it as well or deal with compatibility.${COLOR_RESET}"
+    echo
+    echo "${COLOR_BLUE}Read more on how to run the tests locally and how to deal with providers' compatibility with older Airflow versions at:${COLOR_RESET}"
+    echo "https://github.com/apache/airflow/blob/main/contributing-docs/testing/unit_tests.rst#running-provider-compatibility-tests";
+    echo
+fi
+
 if [[ ${CI:="false"} == "true" || ${CI} == "True" ]]; then
     if [[ ${RES} != "0" ]]; then
         echo
diff --git a/tests/api_connexion/endpoints/test_import_error_endpoint.py b/tests/api_connexion/endpoints/test_import_error_endpoint.py
index ce084165d2..4549b74ae9 100644
--- a/tests/api_connexion/endpoints/test_import_error_endpoint.py
+++ b/tests/api_connexion/endpoints/test_import_error_endpoint.py
@@ -22,11 +22,11 @@ import pytest
 
 from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP
 from airflow.models.dag import DagModel
-from airflow.models.errors import ParseImportError
 from airflow.security import permissions
 from airflow.utils import timezone
 from airflow.utils.session import provide_session
 from tests.test_utils.api_connexion_utils import assert_401, create_user, delete_user
+from tests.test_utils.compat import ParseImportError
 from tests.test_utils.config import conf_vars
 from tests.test_utils.db import clear_db_dags, clear_db_import_errors
 
diff --git a/tests/api_connexion/schemas/test_error_schema.py b/tests/api_connexion/schemas/test_error_schema.py
index 7056417e66..8604bee516 100644
--- a/tests/api_connexion/schemas/test_error_schema.py
+++ b/tests/api_connexion/schemas/test_error_schema.py
@@ -23,9 +23,9 @@ from airflow.api_connexion.schemas.error_schema import (
     import_error_collection_schema,
     import_error_schema,
 )
-from airflow.models.errors import ParseImportError
 from airflow.utils import timezone
 from airflow.utils.session import provide_session
+from tests.test_utils.compat import ParseImportError
 from tests.test_utils.db import clear_db_import_errors
 
 pytestmark = pytest.mark.db_test
diff --git a/tests/api_experimental/common/test_delete_dag.py b/tests/api_experimental/common/test_delete_dag.py
index 961d04dd37..4dc5f9b00f 100644
--- a/tests/api_experimental/common/test_delete_dag.py
+++ b/tests/api_experimental/common/test_delete_dag.py
@@ -23,7 +23,6 @@ from airflow.api.common.delete_dag import delete_dag
 from airflow.exceptions import AirflowException, DagNotFound
 from airflow.models.dag import DAG, DagModel
 from airflow.models.dagrun import DagRun as DR
-from airflow.models.errors import ParseImportError as IE
 from airflow.models.log import Log
 from airflow.models.taskfail import TaskFail
 from airflow.models.taskinstance import TaskInstance as TI
@@ -33,6 +32,7 @@ from airflow.utils import timezone
 from airflow.utils.session import create_session
 from airflow.utils.state import State
 from airflow.utils.types import DagRunType
+from tests.test_utils.compat import ParseImportError as IE
 from tests.test_utils.db import clear_db_dags, clear_db_runs
 
 pytestmark = pytest.mark.db_test
diff --git a/tests/conftest.py b/tests/conftest.py
index efff93f58b..63e5e40018 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -334,8 +334,12 @@ def initial_db_init():
     from airflow.utils import db
     from airflow.www.extensions.init_appbuilder import init_appbuilder
     from airflow.www.extensions.init_auth_manager import get_auth_manager
+    from tests.test_utils.compat import AIRFLOW_V_2_10_PLUS
 
-    db.resetdb(use_migration_files=True)
+    if AIRFLOW_V_2_10_PLUS:
+        db.resetdb(use_migration_files=True)
+    else:
+        db.resetdb()
     db.bootstrap_dagbag()
     # minimal app to add roles
     flask_app = Flask(__name__)
@@ -1292,6 +1296,20 @@ def _disable_redact(request: pytest.FixtureRequest, mocker):
     return
 
 
+@pytest.fixture
+def airflow_root_path() -> Path:
+    import airflow
+
+    return Path(airflow.__path__[0]).parent
+
+
+# This constant is set to True if tests are run with Airflow installed from Packages rather than running
+# the tests within Airflow sources. While most tests in CI are run using Airflow sources, there are
+# also compatibility tests that only use `tests` package and run against installed packages of Airflow
+# for supported Airflow versions.
+RUNNING_TESTS_AGAINST_AIRFLOW_PACKAGES = not (Path(__file__).parents[1] / "airflow" / "__init__.py").exists()
+
+
 if TYPE_CHECKING:
     # Static checkers do not know about pytest fixtures' types and return,
     # In case if them distributed through third party packages.
diff --git a/tests/dag_processing/test_job_runner.py b/tests/dag_processing/test_job_runner.py
index 0ebb6466aa..8e2bfbde62 100644
--- a/tests/dag_processing/test_job_runner.py
+++ b/tests/dag_processing/test_job_runner.py
@@ -54,13 +54,13 @@ from airflow.jobs.dag_processor_job_runner import DagProcessorJobRunner
 from airflow.jobs.job import Job
 from airflow.models import DagBag, DagModel, DbCallbackRequest
 from airflow.models.dagcode import DagCode
-from airflow.models.errors import ParseImportError
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.utils import timezone
 from airflow.utils.net import get_hostname
 from airflow.utils.session import create_session
 from tests.core.test_logging_config import SETTINGS_FILE_VALID, settings_context
 from tests.models import TEST_DAGS_FOLDER
+from tests.test_utils.compat import ParseImportError
 from tests.test_utils.config import conf_vars
 from tests.test_utils.db import clear_db_callbacks, clear_db_dags, clear_db_runs, clear_db_serialized_dags
 
diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py
index 98640aaf3d..b79095994a 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -32,7 +32,6 @@ from airflow.configuration import TEST_DAGS_FOLDER, conf
 from airflow.dag_processing.manager import DagFileProcessorAgent
 from airflow.dag_processing.processor import DagFileProcessor, DagFileProcessorProcess
 from airflow.models import DagBag, DagModel, SlaMiss, TaskInstance
-from airflow.models.errors import ParseImportError
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.models.taskinstance import SimpleTaskInstance
 from airflow.operators.empty import EmptyOperator
@@ -40,6 +39,7 @@ from airflow.utils import timezone
 from airflow.utils.session import create_session
 from airflow.utils.state import State
 from airflow.utils.types import DagRunType
+from tests.test_utils.compat import ParseImportError
 from tests.test_utils.config import conf_vars, env_vars
 from tests.test_utils.db import (
     clear_db_dags,
diff --git a/tests/listeners/class_listener.py 
b/tests/listeners/class_listener.py
index a719d372bd..ececa85321 100644
--- a/tests/listeners/class_listener.py
+++ b/tests/listeners/class_listener.py
@@ -19,38 +19,70 @@ from __future__ import annotations
 
 from airflow.listeners import hookimpl
 from airflow.utils.state import DagRunState, TaskInstanceState
+from tests.test_utils.compat import AIRFLOW_V_2_10_PLUS
 
+if AIRFLOW_V_2_10_PLUS:
 
-class ClassBasedListener:
-    def __init__(self):
-        self.started_component = None
-        self.stopped_component = None
-        self.state = []
-
-    @hookimpl
-    def on_starting(self, component):
-        self.started_component = component
-        self.state.append(DagRunState.RUNNING)
-
-    @hookimpl
-    def before_stopping(self, component):
-        global stopped_component
-        stopped_component = component
-        self.state.append(DagRunState.SUCCESS)
-
-    @hookimpl
-    def on_task_instance_running(self, previous_state, task_instance, session):
-        self.state.append(TaskInstanceState.RUNNING)
-
-    @hookimpl
-    def on_task_instance_success(self, previous_state, task_instance, session):
-        self.state.append(TaskInstanceState.SUCCESS)
-
-    @hookimpl
-    def on_task_instance_failed(
-        self, previous_state, task_instance, error: None | str | 
BaseException, session
-    ):
-        self.state.append(TaskInstanceState.FAILED)
+    class ClassBasedListener:
+        def __init__(self):
+            self.started_component = None
+            self.stopped_component = None
+            self.state = []
+
+        @hookimpl
+        def on_starting(self, component):
+            self.started_component = component
+            self.state.append(DagRunState.RUNNING)
+
+        @hookimpl
+        def before_stopping(self, component):
+            global stopped_component
+            stopped_component = component
+            self.state.append(DagRunState.SUCCESS)
+
+        @hookimpl
+        def on_task_instance_running(self, previous_state, task_instance, 
session):
+            self.state.append(TaskInstanceState.RUNNING)
+
+        @hookimpl
+        def on_task_instance_success(self, previous_state, task_instance, 
session):
+            self.state.append(TaskInstanceState.SUCCESS)
+
+        @hookimpl
+        def on_task_instance_failed(
+            self, previous_state, task_instance, error: None | str | 
BaseException, session
+        ):
+            self.state.append(TaskInstanceState.FAILED)
+else:
+
+    class ClassBasedListener:  # type: ignore[no-redef]
+        def __init__(self):
+            self.started_component = None
+            self.stopped_component = None
+            self.state = []
+
+        @hookimpl
+        def on_starting(self, component):
+            self.started_component = component
+            self.state.append(DagRunState.RUNNING)
+
+        @hookimpl
+        def before_stopping(self, component):
+            global stopped_component
+            stopped_component = component
+            self.state.append(DagRunState.SUCCESS)
+
+        @hookimpl
+        def on_task_instance_running(self, previous_state, task_instance, 
session):
+            self.state.append(TaskInstanceState.RUNNING)
+
+        @hookimpl
+        def on_task_instance_success(self, previous_state, task_instance, 
session):
+            self.state.append(TaskInstanceState.SUCCESS)
+
+        @hookimpl
+        def on_task_instance_failed(self, previous_state, task_instance, 
session):
+            self.state.append(TaskInstanceState.FAILED)
 
 
 def clear():
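
Side note, not part of the diff: whichever ClassBasedListener variant gets defined for the running Airflow version, tests register it through the same listener-manager API, roughly as sketched here:

    from airflow.listeners.listener import get_listener_manager

    from tests.listeners.class_listener import ClassBasedListener

    # Register the version-appropriate listener; Airflow then calls the hookimpl
    # methods above as the corresponding lifecycle events fire.
    listener = ClassBasedListener()
    get_listener_manager().add_listener(listener)
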
diff --git a/tests/providers/amazon/aws/auth_manager/avp/test_facade.py 
b/tests/providers/amazon/aws/auth_manager/avp/test_facade.py
index 0daae8811e..5c632ac1ba 100644
--- a/tests/providers/amazon/aws/auth_manager/avp/test_facade.py
+++ b/tests/providers/amazon/aws/auth_manager/avp/test_facade.py
@@ -17,7 +17,6 @@
 from __future__ import annotations
 
 import json
-from pathlib import Path
 from typing import TYPE_CHECKING
 from unittest.mock import Mock
 
@@ -312,13 +311,10 @@ class TestAwsAuthManagerAmazonVerifiedPermissionsFacade:
                 user=test_user,
             )
 
-    def test_is_policy_store_schema_up_to_date_when_schema_up_to_date(self, 
facade):
-        schema_path = (
-            Path(__file__)
-            .parents[6]
-            .joinpath("airflow", "providers", "amazon", "aws", "auth_manager", 
"avp", "schema.json")
-            .resolve()
-        )
+    def test_is_policy_store_schema_up_to_date_when_schema_up_to_date(self, 
facade, airflow_root_path):
+        schema_path = airflow_root_path.joinpath(
+            "airflow", "providers", "amazon", "aws", "auth_manager", "avp", 
"schema.json"
+        ).resolve()
         with open(schema_path) as schema_file:
             avp_response = {"schema": schema_file.read()}
             mock_get_schema = Mock(return_value=avp_response)
@@ -326,13 +322,10 @@ class TestAwsAuthManagerAmazonVerifiedPermissionsFacade:
 
             assert facade.is_policy_store_schema_up_to_date()
 
-    def test_is_policy_store_schema_up_to_date_when_schema_is_modified(self, 
facade):
-        schema_path = (
-            Path(__file__)
-            .parents[6]
-            .joinpath("airflow", "providers", "amazon", "aws", "auth_manager", 
"avp", "schema.json")
-            .resolve()
-        )
+    def test_is_policy_store_schema_up_to_date_when_schema_is_modified(self, 
facade, airflow_root_path):
+        schema_path = airflow_root_path.joinpath(
+            "airflow", "providers", "amazon", "aws", "auth_manager", "avp", 
"schema.json"
+        ).resolve()
         with open(schema_path) as schema_file:
             schema = json.loads(schema_file.read())
             schema["new_field"] = "new_value"
diff --git a/tests/providers/amazon/aws/executors/batch/test_batch_executor.py 
b/tests/providers/amazon/aws/executors/batch/test_batch_executor.py
index 8e0d20653b..cd6c2c87ac 100644
--- a/tests/providers/amazon/aws/executors/batch/test_batch_executor.py
+++ b/tests/providers/amazon/aws/executors/batch/test_batch_executor.py
@@ -42,6 +42,7 @@ from airflow.providers.amazon.aws.executors.batch.utils 
import (
 )
 from airflow.utils.helpers import convert_camel_to_snake
 from airflow.utils.state import State
+from tests.conftest import RUNNING_TESTS_AGAINST_AIRFLOW_PACKAGES
 from tests.test_utils.config import conf_vars
 
 ARN1 = "arn1"
@@ -655,6 +656,11 @@ class TestBatchExecutorConfig:
     def teardown_method(self) -> None:
         self._unset_conf()
 
+    @pytest.mark.skipif(
+        RUNNING_TESTS_AGAINST_AIRFLOW_PACKAGES,
+        reason="Config defaults are validated against provider.yaml so this 
test "
+        "should only run when tests are run from sources",
+    )
     def test_validate_config_defaults(self):
         """Assert that the defaults stated in the config.yml file match those 
in utils.CONFIG_DEFAULTS."""
         curr_dir = os.path.dirname(os.path.abspath(__file__))
diff --git a/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py 
b/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py
index 6c6bef5f7c..b547f39833 100644
--- a/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py
+++ b/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py
@@ -54,6 +54,8 @@ from airflow.providers.amazon.aws.hooks.ecs import EcsHook
 from airflow.utils.helpers import convert_camel_to_snake
 from airflow.utils.state import State, TaskInstanceState
 from airflow.utils.timezone import utcnow
+from tests.conftest import RUNNING_TESTS_AGAINST_AIRFLOW_PACKAGES
+from tests.test_utils.compat import AIRFLOW_V_2_10_PLUS
 from tests.test_utils.config import conf_vars
 
 pytestmark = pytest.mark.db_test
@@ -367,6 +369,7 @@ class TestEcsExecutorTask:
 class TestAwsEcsExecutor:
     """Tests the AWS ECS Executor."""
 
+    @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Test requires Airflow 
2.10+")
     
@mock.patch("airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor.change_state")
     def test_execute(self, change_state_mock, mock_airflow_key, mock_executor):
         """Test execution from end-to-end."""
@@ -1205,8 +1208,18 @@ class TestEcsExecutorConfig:
         nested_dict = {"a": "a", "b": "b", "c": {"d": "d"}}
         assert _recursive_flatten_dict(nested_dict) == {"a": "a", "b": "b", 
"d": "d"}
 
+    @pytest.mark.skipif(
+        RUNNING_TESTS_AGAINST_AIRFLOW_PACKAGES,
+        reason="Config defaults are validated against provider.yaml so this 
test "
+        "should only run when tests are run from sources",
+    )
     def test_validate_config_defaults(self):
-        """Assert that the defaults stated in the config.yml file match those 
in utils.CONFIG_DEFAULTS."""
+        """Assert that the defaults stated in the config.yml file match those 
in utils.CONFIG_DEFAULTS.
+
+        This test verifies the configuration defaults and should only be run from the Airflow
+        sources, not when Airflow is installed from packages, because Airflow installed from
+        packages does not include the provider.yaml file.
+        """
         curr_dir = os.path.dirname(os.path.abspath(__file__))
         executor_path = "aws/executors/ecs"
         config_filename = curr_dir.replace("tests", 
"airflow").replace(executor_path, "provider.yaml")
diff --git a/tests/providers/amazon/aws/hooks/test_dynamodb.py 
b/tests/providers/amazon/aws/hooks/test_dynamodb.py
index f3baba8b69..4e3e96c0dd 100644
--- a/tests/providers/amazon/aws/hooks/test_dynamodb.py
+++ b/tests/providers/amazon/aws/hooks/test_dynamodb.py
@@ -60,4 +60,4 @@ class TestDynamoDBHook:
     def test_waiter_path_generated_from_resource_type(self, _):
         hook = DynamoDBHook(aws_conn_id="aws_default")
         path = hook.waiter_path
-        assert 
path.as_uri().endswith("/airflow/airflow/providers/amazon/aws/waiters/dynamodb.json")
+        assert 
path.as_uri().endswith("/airflow/providers/amazon/aws/waiters/dynamodb.json")
diff --git a/tests/providers/amazon/aws/hooks/test_hooks_signature.py 
b/tests/providers/amazon/aws/hooks/test_hooks_signature.py
index a6530f45a6..f253722618 100644
--- a/tests/providers/amazon/aws/hooks/test_hooks_signature.py
+++ b/tests/providers/amazon/aws/hooks/test_hooks_signature.py
@@ -65,7 +65,9 @@ ALLOWED_THICK_HOOKS_PARAMETERS: dict[str, set[str]] = {
 
 def get_aws_hooks_modules():
     """Parse Amazon Provider metadata and find all hooks based on 
`AwsGenericHook` and return it."""
-    hooks_dir = Path(__file__).absolute().parents[5] / "airflow" / "providers" 
/ "amazon" / "aws" / "hooks"
+    import airflow.providers.amazon.aws.hooks as aws_hooks
+
+    hooks_dir = Path(aws_hooks.__path__[0])
     if not hooks_dir.exists():
         msg = f"Amazon Provider hooks directory not found: 
{hooks_dir.__fspath__()!r}"
         raise FileNotFoundError(msg)
diff --git a/tests/providers/amazon/aws/links/test_base_aws.py 
b/tests/providers/amazon/aws/links/test_base_aws.py
index 222be31458..446d584edf 100644
--- a/tests/providers/amazon/aws/links/test_base_aws.py
+++ b/tests/providers/amazon/aws/links/test_base_aws.py
@@ -203,9 +203,10 @@ class BaseAwsLinksTestCase:
         """Test: Operator links should exist for serialized DAG."""
         self.create_op_and_ti(self.link_class, dag_id="test_link_serialize", 
task_id=self.task_id)
         serialized_dag = self.dag_maker.get_serialized_data()
-        operator_extra_link = 
serialized_dag["dag"]["tasks"][0]["__var"]["_operator_extra_links"]
+        deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"])
+        operator_extra_link = deserialized_dag.tasks[0].operator_extra_links[0]
         error_message = "Operator links should exist for serialized DAG"
-        assert operator_extra_link == [{self.full_qualname: {}}], error_message
+        assert operator_extra_link.name == self.link_class.name, error_message
 
     def test_empty_xcom(self):
         """Test: Operator links should return empty string if no XCom value."""
diff --git a/tests/providers/amazon/aws/operators/test_emr_serverless.py 
b/tests/providers/amazon/aws/operators/test_emr_serverless.py
index be42cf63ba..b1344521b9 100644
--- a/tests/providers/amazon/aws/operators/test_emr_serverless.py
+++ b/tests/providers/amazon/aws/operators/test_emr_serverless.py
@@ -41,6 +41,7 @@ from airflow.serialization.serialized_objects import (
     BaseSerialization,
 )
 from airflow.utils.types import NOTSET
+from tests.test_utils.compat import deserialize_operator
 
 if TYPE_CHECKING:
     from unittest.mock import MagicMock
@@ -1119,7 +1120,7 @@ class TestEmrServerlessStartJobOperator:
         )
 
         ser_operator = BaseSerialization.serialize(operator)
-        deser_operator = BaseSerialization.deserialize(ser_operator)
+        deser_operator = deserialize_operator(ser_operator)
 
         assert deser_operator.operator_extra_links == [
             EmrServerlessS3LogsLink(),
@@ -1140,7 +1141,7 @@ class TestEmrServerlessStartJobOperator:
         )
 
         ser_operator = BaseSerialization.serialize(operator)
-        deser_operator = BaseSerialization.deserialize(ser_operator)
+        deser_operator = deserialize_operator(ser_operator)
 
         assert deser_operator.operator_extra_links == [
             EmrServerlessS3LogsLink(),
diff --git a/tests/providers/amazon/aws/utils/test_eks_get_token.py 
b/tests/providers/amazon/aws/utils/test_eks_get_token.py
index 8825c6c218..672ccfc9b3 100644
--- a/tests/providers/amazon/aws/utils/test_eks_get_token.py
+++ b/tests/providers/amazon/aws/utils/test_eks_get_token.py
@@ -25,8 +25,6 @@ from unittest import mock
 import pytest
 import time_machine
 
-from tests.test_utils import AIRFLOW_MAIN_FOLDER
-
 
 class TestGetEksToken:
     @mock.patch("airflow.providers.amazon.aws.hooks.eks.EksHook")
@@ -65,13 +63,13 @@ class TestGetEksToken:
             ],
         ],
     )
-    def test_run(self, mock_eks_hook, args, expected_aws_conn_id, 
expected_region_name):
+    def test_run(self, mock_eks_hook, args, expected_aws_conn_id, 
expected_region_name, airflow_root_path):
         (
             
mock_eks_hook.return_value.fetch_access_token_for_cluster.return_value
         ) = "k8s-aws-v1.aHR0cDovL2V4YW1wbGUuY29t"
 
         with mock.patch("sys.argv", args), 
contextlib.redirect_stdout(StringIO()) as temp_stdout:
-            os.chdir(AIRFLOW_MAIN_FOLDER)
+            os.chdir(airflow_root_path)
             # We are not using run_module because of 
https://github.com/pytest-dev/pytest/issues/9007
             
runpy.run_path("airflow/providers/amazon/aws/utils/eks_get_token.py", 
run_name="__main__")
         output = temp_stdout.getvalue()
diff --git a/tests/providers/apache/iceberg/hooks/test_iceberg.py 
b/tests/providers/apache/iceberg/hooks/test_iceberg.py
index af58bb55b0..f11e1d4ea6 100644
--- a/tests/providers/apache/iceberg/hooks/test_iceberg.py
+++ b/tests/providers/apache/iceberg/hooks/test_iceberg.py
@@ -17,6 +17,8 @@
 
 from __future__ import annotations
 
+from unittest.mock import Mock, patch
+
 import pytest
 import requests_mock
 
@@ -27,16 +29,22 @@ pytestmark = pytest.mark.db_test
 
 def test_iceberg_hook():
     access_token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJSU"
-    with requests_mock.Mocker() as m:
-        m.post(
-            "https://api.iceberg.io/ws/v1/oauth/tokens";,
-            json={
-                "access_token": access_token,
-                "token_type": "Bearer",
-                "expires_in": 86400,
-                "warehouse_id": "fadc4c31-e81f-48cd-9ce8-64cd5ce3fa5d",
-                "region": "us-west-2",
-                "catalog_url": 
"warehouses/fadc4c31-e81f-48cd-9ce8-64cd5ce3fa5d",
-            },
-        )
-        assert IcebergHook().get_conn() == access_token
+    with patch("airflow.models.Connection.get_connection_from_secrets") as 
mock_get_connection:
+        mock_conn = Mock()
+        mock_conn.conn_id = "iceberg_default"
+        mock_conn.host = "https://api.iceberg.io/ws/v1";
+        mock_conn.extra_dejson = {}
+        mock_get_connection.return_value = mock_conn
+        with requests_mock.Mocker() as m:
+            m.post(
+                "https://api.iceberg.io/ws/v1/oauth/tokens";,
+                json={
+                    "access_token": access_token,
+                    "token_type": "Bearer",
+                    "expires_in": 86400,
+                    "warehouse_id": "fadc4c31-e81f-48cd-9ce8-64cd5ce3fa5d",
+                    "region": "us-west-2",
+                    "catalog_url": 
"warehouses/fadc4c31-e81f-48cd-9ce8-64cd5ce3fa5d",
+                },
+            )
+            assert IcebergHook().get_conn() == access_token
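
The connection mocking above could also be factored into a fixture if more Iceberg hook tests need it; a minimal sketch, assuming the same connection fields as the test (the fixture name is made up):

    from unittest.mock import Mock, patch

    import pytest


    @pytest.fixture
    def mocked_iceberg_connection():
        # Supplies an in-memory connection so IcebergHook does not need a Connection
        # row in the metadata database.
        with patch("airflow.models.Connection.get_connection_from_secrets") as mock_get_connection:
            mock_conn = Mock()
            mock_conn.conn_id = "iceberg_default"
            mock_conn.host = "https://api.iceberg.io/ws/v1"
            mock_conn.extra_dejson = {}
            mock_get_connection.return_value = mock_conn
            yield mock_conn
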
diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py 
b/tests/providers/cncf/kubernetes/operators/test_pod.py
index 224824edbc..8b7c238e9c 100644
--- a/tests/providers/cncf/kubernetes/operators/test_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_pod.py
@@ -358,7 +358,8 @@ class TestKubernetesPodOperator:
             "dag_id": "dag",
             "kubernetes_pod_operator": "True",
             "task_id": "task",
-            "try_number": "0",
+            # Try number behaves differently on different versions of Airflow
+            "try_number": mock.ANY,
             "airflow_version": mock.ANY,
             "run_id": "test",
             "airflow_kpo_in_cluster": str(in_cluster),
@@ -374,7 +375,7 @@ class TestKubernetesPodOperator:
             "dag_id": "dag",
             "kubernetes_pod_operator": "True",
             "task_id": "task",
-            "try_number": "0",
+            "try_number": mock.ANY,
             "airflow_version": mock.ANY,
             "run_id": "test",
             "map_index": "10",
@@ -884,7 +885,7 @@ class TestKubernetesPodOperator:
             "dag_id": "dag",
             "kubernetes_pod_operator": "True",
             "task_id": "task",
-            "try_number": "0",
+            "try_number": mock.ANY,
             "airflow_version": mock.ANY,
             "airflow_kpo_in_cluster": str(k.hook.is_in_cluster),
             "run_id": "test",
@@ -920,7 +921,7 @@ class TestKubernetesPodOperator:
             "dag_id": "dag",
             "kubernetes_pod_operator": "True",
             "task_id": "task",
-            "try_number": "0",
+            "try_number": mock.ANY,
             "airflow_version": mock.ANY,
             "airflow_kpo_in_cluster": str(k.hook.is_in_cluster),
             "run_id": "test",
@@ -991,7 +992,7 @@ class TestKubernetesPodOperator:
             "dag_id": "dag",
             "kubernetes_pod_operator": "True",
             "task_id": "task",
-            "try_number": "0",
+            "try_number": mock.ANY,
             "airflow_version": mock.ANY,
             "airflow_kpo_in_cluster": str(k.hook.is_in_cluster),
             "run_id": "test",
@@ -1061,7 +1062,7 @@ class TestKubernetesPodOperator:
             "dag_id": "dag",
             "kubernetes_pod_operator": "True",
             "task_id": "task",
-            "try_number": "0",
+            "try_number": mock.ANY,
             "airflow_version": mock.ANY,
             "airflow_kpo_in_cluster": str(k.hook.is_in_cluster),
             "run_id": "test",
@@ -1112,7 +1113,7 @@ class TestKubernetesPodOperator:
             "dag_id": "dag",
             "kubernetes_pod_operator": "True",
             "task_id": "task",
-            "try_number": "0",
+            "try_number": mock.ANY,
             "airflow_version": mock.ANY,
             "airflow_kpo_in_cluster": str(k.hook.is_in_cluster),
             "run_id": "test",
diff --git a/tests/providers/cncf/kubernetes/test_template_rendering.py 
b/tests/providers/cncf/kubernetes/test_template_rendering.py
index f3e61101ea..98764a2f1f 100644
--- a/tests/providers/cncf/kubernetes/test_template_rendering.py
+++ b/tests/providers/cncf/kubernetes/test_template_rendering.py
@@ -48,7 +48,7 @@ def test_render_k8s_pod_yaml(pod_mutation_hook, 
create_task_instance):
                 "dag_id": "test_render_k8s_pod_yaml",
                 "run_id": "test_run_id",
                 "task_id": "op1",
-                "try_number": "0",
+                "try_number": mock.ANY,
             },
             "labels": {
                 "airflow-worker": "0",
@@ -57,7 +57,7 @@ def test_render_k8s_pod_yaml(pod_mutation_hook, 
create_task_instance):
                 "run_id": "test_run_id",
                 "kubernetes_executor": "True",
                 "task_id": "op1",
-                "try_number": "0",
+                "try_number": mock.ANY,
             },
             "name": mock.ANY,
             "namespace": "default",
diff --git a/tests/providers/google/cloud/operators/test_bigquery.py 
b/tests/providers/google/cloud/operators/test_bigquery.py
index d84218f8bd..4cfcb7fe87 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -731,7 +731,8 @@ class TestBigQueryOperator:
             sql="SELECT * FROM test_table",
         )
         serialized_dag = dag_maker.get_serialized_data()
-        assert "sql" in serialized_dag["dag"]["tasks"][0]["__var"]
+        deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"])
+        assert hasattr(deserialized_dag.tasks[0], "sql")
 
         dag = SerializedDAG.from_dict(serialized_dag)
         simple_task = dag.task_dict[TASK_ID]
@@ -740,11 +741,8 @@ class TestBigQueryOperator:
         #########################################################
         # Verify Operator Links work with Serialized Operator
         #########################################################
-
-        # Check Serialized version of operator link
-        assert 
serialized_dag["dag"]["tasks"][0]["__var"]["_operator_extra_links"] == [
-            
{"airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleLink": {}}
-        ]
+        deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"])
+        assert deserialized_dag.tasks[0].operator_extra_links[0].name == 
"BigQuery Console"
 
         # Check DeSerialized version of operator link
         assert isinstance(next(iter(simple_task.operator_extra_links)), 
BigQueryConsoleLink)
@@ -768,7 +766,8 @@ class TestBigQueryOperator:
             sql=["SELECT * FROM test_table", "SELECT * FROM test_table2"],
         )
         serialized_dag = dag_maker.get_serialized_data()
-        assert "sql" in serialized_dag["dag"]["tasks"][0]["__var"]
+        deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"])
+        assert hasattr(deserialized_dag.tasks[0], "sql")
 
         dag = SerializedDAG.from_dict(serialized_dag)
         simple_task = dag.task_dict[TASK_ID]
@@ -777,12 +776,10 @@ class TestBigQueryOperator:
         #########################################################
         # Verify Operator Links work with Serialized Operator
         #########################################################
-
-        # Check Serialized version of operator link
-        assert 
serialized_dag["dag"]["tasks"][0]["__var"]["_operator_extra_links"] == [
-            
{"airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink":
 {"index": 0}},
-            
{"airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink":
 {"index": 1}},
-        ]
+        deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"])
+        operator_extra_links = deserialized_dag.tasks[0].operator_extra_links
+        assert operator_extra_links[0].name == "BigQuery Console #1"
+        assert operator_extra_links[1].name == "BigQuery Console #2"
 
         # Check DeSerialized version of operator link
         assert isinstance(next(iter(simple_task.operator_extra_links)), 
BigQueryConsoleIndexableLink)
diff --git a/tests/providers/google/cloud/operators/test_dataproc.py 
b/tests/providers/google/cloud/operators/test_dataproc.py
index 0d97cfe36c..c3d945c808 100644
--- a/tests/providers/google/cloud/operators/test_dataproc.py
+++ b/tests/providers/google/cloud/operators/test_dataproc.py
@@ -79,12 +79,12 @@ from airflow.providers.google.cloud.triggers.dataproc 
import (
 from airflow.providers.google.common.consts import 
GOOGLE_DEFAULT_DEFERRABLE_METHOD_NAME
 from airflow.serialization.serialized_objects import SerializedDAG
 from airflow.utils.timezone import datetime
-from airflow.version import version as airflow_version
+from tests.test_utils.compat import AIRFLOW_VERSION
 from tests.test_utils.db import clear_db_runs, clear_db_xcom
 
-cluster_params = inspect.signature(ClusterGenerator.__init__).parameters
+AIRFLOW_VERSION_LABEL = "v" + str(AIRFLOW_VERSION).replace(".", 
"-").replace("+", "-")
 
-AIRFLOW_VERSION = "v" + airflow_version.replace(".", "-").replace("+", "-")
+cluster_params = inspect.signature(ClusterGenerator.__init__).parameters
 
 DATAPROC_PATH = "airflow.providers.google.cloud.operators.dataproc.{}"
 DATAPROC_TRIGGERS_PATH = "airflow.providers.google.cloud.triggers.dataproc.{}"
@@ -325,9 +325,9 @@ CONFIG_WITH_GPU_ACCELERATOR = {
     "endpoint_config": {},
 }
 
-LABELS = {"labels": "data", "airflow-version": AIRFLOW_VERSION}
+LABELS = {"labels": "data", "airflow-version": AIRFLOW_VERSION_LABEL}
 
-LABELS.update({"airflow-version": "v" + airflow_version.replace(".", 
"-").replace("+", "-")})
+LABELS.update({"airflow-version": AIRFLOW_VERSION_LABEL})
 
 CLUSTER = {"project_id": "project_id", "cluster_name": CLUSTER_NAME, "config": 
CONFIG, "labels": LABELS}
 
@@ -1064,11 +1064,10 @@ def test_create_cluster_operator_extra_links(dag_maker, 
create_task_instance_of_
     serialized_dag = dag_maker.get_serialized_data()
     deserialized_dag = SerializedDAG.from_dict(serialized_dag)
     deserialized_task = deserialized_dag.task_dict[TASK_ID]
-
     # Assert operator links for serialized DAG
-    assert serialized_dag["dag"]["tasks"][0]["__var"]["_operator_extra_links"] 
== [
-        {"airflow.providers.google.cloud.links.dataproc.DataprocClusterLink": 
{}}
-    ]
+    deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"])
+    operator_extra_link = deserialized_dag.tasks[0].operator_extra_links[0]
+    assert operator_extra_link.name == "Dataproc Cluster"
 
     # Assert operator link types are preserved during deserialization
     assert isinstance(deserialized_task.operator_extra_links[0], 
DataprocClusterLink)
@@ -1168,9 +1167,9 @@ def test_scale_cluster_operator_extra_links(dag_maker, 
create_task_instance_of_o
     deserialized_task = deserialized_dag.task_dict[TASK_ID]
 
     # Assert operator links for serialized DAG
-    assert serialized_dag["dag"]["tasks"][0]["__var"]["_operator_extra_links"] 
== [
-        {"airflow.providers.google.cloud.links.dataproc.DataprocLink": {}}
-    ]
+    deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"])
+    operator_extra_link = deserialized_dag.tasks[0].operator_extra_links[0]
+    assert operator_extra_link.name == "Dataproc resource"
 
     # Assert operator link types are preserved during deserialization
     assert isinstance(deserialized_task.operator_extra_links[0], DataprocLink)
@@ -1562,10 +1561,10 @@ def test_submit_job_operator_extra_links(mock_hook, 
dag_maker, create_task_insta
     deserialized_dag = SerializedDAG.from_dict(serialized_dag)
     deserialized_task = deserialized_dag.task_dict[TASK_ID]
 
-    # Assert operator links for serialized_dag
-    assert serialized_dag["dag"]["tasks"][0]["__var"]["_operator_extra_links"] 
== [
-        {"airflow.providers.google.cloud.links.dataproc.DataprocJobLink": {}}
-    ]
+    # Assert operator links for serialized DAG
+    deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"])
+    operator_extra_link = deserialized_dag.tasks[0].operator_extra_links[0]
+    assert operator_extra_link.name == "Dataproc Job"
 
     # Assert operator link types are preserved during deserialization
     assert isinstance(deserialized_task.operator_extra_links[0], 
DataprocJobLink)
@@ -1767,10 +1766,10 @@ def test_update_cluster_operator_extra_links(dag_maker, 
create_task_instance_of_
     deserialized_dag = SerializedDAG.from_dict(serialized_dag)
     deserialized_task = deserialized_dag.task_dict[TASK_ID]
 
-    # Assert operator links for serialized_dag
-    assert serialized_dag["dag"]["tasks"][0]["__var"]["_operator_extra_links"] 
== [
-        {"airflow.providers.google.cloud.links.dataproc.DataprocClusterLink": 
{}}
-    ]
+    # Assert operator links for serialized DAG
+    deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"])
+    operator_extra_link = deserialized_dag.tasks[0].operator_extra_links[0]
+    assert operator_extra_link.name == "Dataproc Cluster"
 
     # Assert operator link types are preserved during deserialization
     assert isinstance(deserialized_task.operator_extra_links[0], 
DataprocClusterLink)
@@ -1989,10 +1988,10 @@ def 
test_instantiate_workflow_operator_extra_links(mock_hook, dag_maker, create_
     deserialized_dag = SerializedDAG.from_dict(serialized_dag)
     deserialized_task = deserialized_dag.task_dict[TASK_ID]
 
-    # Assert operator links for serialized_dag
-    assert serialized_dag["dag"]["tasks"][0]["__var"]["_operator_extra_links"] 
== [
-        {"airflow.providers.google.cloud.links.dataproc.DataprocWorkflowLink": 
{}}
-    ]
+    # Assert operator links for serialized DAG
+    deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"])
+    operator_extra_link = deserialized_dag.tasks[0].operator_extra_links[0]
+    assert operator_extra_link.name == "Dataproc Workflow"
 
     # Assert operator link types are preserved during deserialization
     assert isinstance(deserialized_task.operator_extra_links[0], 
DataprocWorkflowLink)
@@ -2151,10 +2150,10 @@ def 
test_instantiate_inline_workflow_operator_extra_links(
     deserialized_dag = SerializedDAG.from_dict(serialized_dag)
     deserialized_task = deserialized_dag.task_dict[TASK_ID]
 
-    # Assert operator links for serialized_dag
-    assert serialized_dag["dag"]["tasks"][0]["__var"]["_operator_extra_links"] 
== [
-        {"airflow.providers.google.cloud.links.dataproc.DataprocWorkflowLink": 
{}}
-    ]
+    # Assert operator links for serialized DAG
+    deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"])
+    operator_extra_link = deserialized_dag.tasks[0].operator_extra_links[0]
+    assert operator_extra_link.name == "Dataproc Workflow"
 
     # Assert operator link types are preserved during deserialization
     assert isinstance(deserialized_task.operator_extra_links[0], 
DataprocWorkflowLink)
@@ -2182,7 +2181,7 @@ class TestDataProcHiveOperator:
     job = {
         "reference": {"project_id": GCP_PROJECT, "job_id": 
f"{job_name}_{job_id}"},
         "placement": {"cluster_name": "cluster-1"},
-        "labels": {"airflow-version": AIRFLOW_VERSION},
+        "labels": {"airflow-version": AIRFLOW_VERSION_LABEL},
         "hive_job": {"query_list": {"queries": [query]}, "script_variables": 
variables},
     }
 
@@ -2244,7 +2243,7 @@ class TestDataProcPigOperator:
     job = {
         "reference": {"project_id": GCP_PROJECT, "job_id": 
f"{job_name}_{job_id}"},
         "placement": {"cluster_name": "cluster-1"},
-        "labels": {"airflow-version": AIRFLOW_VERSION},
+        "labels": {"airflow-version": AIRFLOW_VERSION_LABEL},
         "pig_job": {"query_list": {"queries": [query]}, "script_variables": 
variables},
     }
 
@@ -2306,13 +2305,13 @@ class TestDataProcSparkSqlOperator:
     job = {
         "reference": {"project_id": GCP_PROJECT, "job_id": 
f"{job_name}_{job_id}"},
         "placement": {"cluster_name": "cluster-1"},
-        "labels": {"airflow-version": AIRFLOW_VERSION},
+        "labels": {"airflow-version": AIRFLOW_VERSION_LABEL},
         "spark_sql_job": {"query_list": {"queries": [query]}, 
"script_variables": variables},
     }
     other_project_job = {
         "reference": {"project_id": "other-project", "job_id": 
f"{job_name}_{job_id}"},
         "placement": {"cluster_name": "cluster-1"},
-        "labels": {"airflow-version": AIRFLOW_VERSION},
+        "labels": {"airflow-version": AIRFLOW_VERSION_LABEL},
         "spark_sql_job": {"query_list": {"queries": [query]}, 
"script_variables": variables},
     }
 
@@ -2410,7 +2409,7 @@ class TestDataProcSparkOperator(DataprocJobTestBase):
             "job_id": f"{job_name}_{TEST_JOB_ID}",
         },
         "placement": {"cluster_name": "cluster-1"},
-        "labels": {"airflow-version": AIRFLOW_VERSION},
+        "labels": {"airflow-version": AIRFLOW_VERSION_LABEL},
         "spark_job": {"jar_file_uris": jars, "main_class": main_class},
     }
 
@@ -2473,9 +2472,9 @@ def test_submit_spark_job_operator_extra_links(mock_hook, 
dag_maker, create_task
     deserialized_task = deserialized_dag.task_dict[TASK_ID]
 
     # Assert operator links for serialized DAG
-    assert serialized_dag["dag"]["tasks"][0]["__var"]["_operator_extra_links"] 
== [
-        {"airflow.providers.google.cloud.links.dataproc.DataprocLink": {}}
-    ]
+    deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"])
+    operator_extra_link = deserialized_dag.tasks[0].operator_extra_links[0]
+    assert operator_extra_link.name == "Dataproc resource"
 
     # Assert operator link types are preserved during deserialization
     assert isinstance(deserialized_task.operator_extra_links[0], DataprocLink)
@@ -2504,7 +2503,7 @@ class TestDataProcHadoopOperator:
     job = {
         "reference": {"project_id": GCP_PROJECT, "job_id": 
f"{job_name}_{job_id}"},
         "placement": {"cluster_name": "cluster-1"},
-        "labels": {"airflow-version": AIRFLOW_VERSION},
+        "labels": {"airflow-version": AIRFLOW_VERSION_LABEL},
         "hadoop_job": {"main_jar_file_uri": jar, "args": args},
     }
 
@@ -2542,7 +2541,7 @@ class TestDataProcPySparkOperator:
     job = {
         "reference": {"project_id": GCP_PROJECT, "job_id": 
f"{job_name}_{job_id}"},
         "placement": {"cluster_name": "cluster-1"},
-        "labels": {"airflow-version": AIRFLOW_VERSION},
+        "labels": {"airflow-version": AIRFLOW_VERSION_LABEL},
         "pyspark_job": {"main_python_file_uri": uri},
     }
 
diff --git a/tests/providers/openlineage/plugins/test_listener.py 
b/tests/providers/openlineage/plugins/test_listener.py
index d9fbb0dfd3..65ac9657c0 100644
--- a/tests/providers/openlineage/plugins/test_listener.py
+++ b/tests/providers/openlineage/plugins/test_listener.py
@@ -33,10 +33,20 @@ from airflow.providers.openlineage import conf
 from airflow.providers.openlineage.plugins.listener import OpenLineageListener
 from airflow.providers.openlineage.utils.selective_enable import 
disable_lineage, enable_lineage
 from airflow.utils.state import State
+from tests.test_utils.compat import AIRFLOW_V_2_10_PLUS
 from tests.test_utils.config import conf_vars
 
 pytestmark = pytest.mark.db_test
 
+EXPECTED_TRY_NUMBER_1 = 1 if AIRFLOW_V_2_10_PLUS else 0
+EXPECTED_TRY_NUMBER_2 = 2 if AIRFLOW_V_2_10_PLUS else 1
+
+TRY_NUMBER_BEFORE_EXECUTION = 0 if AIRFLOW_V_2_10_PLUS else 1
+TRY_NUMBER_RUNNING = 0 if AIRFLOW_V_2_10_PLUS else 1
+TRY_NUMBER_FAILED = 0 if AIRFLOW_V_2_10_PLUS else 1
+TRY_NUMBER_SUCCESS = 0 if AIRFLOW_V_2_10_PLUS else 2
+TRY_NUMBER_AFTER_EXECUTION = 0 if AIRFLOW_V_2_10_PLUS else 2
+
 
 class TemplateOperator(BaseOperator):
     template_fields = ["df"]
@@ -304,7 +314,7 @@ def 
test_adapter_complete_task_is_called_with_proper_arguments(
         job_name="job_name",
         parent_job_name="dag_id",
         parent_run_id="dag_id.dag_run_run_id",
-        run_id="dag_id.task_id.execution_date.1",
+        run_id=f"dag_id.task_id.execution_date.{EXPECTED_TRY_NUMBER_1}",
         task=listener.extractor_manager.extract_metadata(),
     )
 
@@ -319,7 +329,7 @@ def 
test_adapter_complete_task_is_called_with_proper_arguments(
         job_name="job_name",
         parent_job_name="dag_id",
         parent_run_id="dag_id.dag_run_run_id",
-        run_id="dag_id.task_id.execution_date.2",
+        run_id=f"dag_id.task_id.execution_date.{EXPECTED_TRY_NUMBER_2}",
         task=listener.extractor_manager.extract_metadata(),
     )
 
@@ -334,7 +344,9 @@ def 
test_run_id_is_constant_across_all_methods(mocked_adapter):
     """
 
     def mock_task_id(dag_id, task_id, execution_date, try_number):
-        return f"{dag_id}.{task_id}.{execution_date}.{try_number}"
+        returned_try_number = try_number if AIRFLOW_V_2_10_PLUS else 
max(try_number - 1, 1)
+
+        return f"{dag_id}.{task_id}.{execution_date}.{returned_try_number}"
 
     listener, task_instance = _create_listener_and_task_instance()
     mocked_adapter.build_task_instance_run_id.side_effect = mock_task_id
@@ -344,7 +356,11 @@ def 
test_run_id_is_constant_across_all_methods(mocked_adapter):
     assert listener.adapter.start_task.call_args.kwargs["run_id"] == 
expected_run_id_1
 
     listener.on_task_instance_failed(None, task_instance, None)
-    assert listener.adapter.fail_task.call_args.kwargs["run_id"] == 
expected_run_id_1
+    assert listener.adapter.fail_task.call_args.kwargs["run_id"] == (
+        expected_run_id_1
+        if AIRFLOW_V_2_10_PLUS
+        else expected_run_id_2
+    )
 
     # This run_id will not be different as we did NOT simulate increase of the 
try_number attribute,
     listener.on_task_instance_success(None, task_instance, None)
@@ -354,7 +370,11 @@ def 
test_run_id_is_constant_across_all_methods(mocked_adapter):
     # This is how airflow works, and that's why we expect the run_id to remain 
constant across all methods.
     task_instance.try_number += 1
     listener.on_task_instance_success(None, task_instance, None)
-    assert listener.adapter.complete_task.call_args.kwargs["run_id"] == 
expected_run_id_2
+    assert listener.adapter.complete_task.call_args.kwargs["run_id"] == (
+        expected_run_id_2
+        if AIRFLOW_V_2_10_PLUS
+        else expected_run_id_1
+    )
 
 
 def test_running_task_correctly_calls_openlineage_adapter_run_id_method():
@@ -406,7 +426,7 @@ def 
test_successful_task_correctly_calls_openlineage_adapter_run_id_method(mock_
         dag_id="dag_id",
         task_id="task_id",
         execution_date="execution_date",
-        try_number=1,
+        try_number=EXPECTED_TRY_NUMBER_1,
     )
 
 
@@ -431,16 +451,16 @@ def 
test_listener_on_task_instance_failed_is_called_before_try_number_increment(
 
     _, task_instance = _create_test_dag_and_task(fail_callable, "failure")
     # try_number before execution
-    assert task_instance.try_number == 0
+    assert task_instance.try_number == TRY_NUMBER_BEFORE_EXECUTION
     with suppress(CustomError):
         task_instance.run()
 
     # try_number at the moment of function being called
-    assert captured_try_numbers["running"] == 0
-    assert captured_try_numbers["failed"] == 0
+    assert captured_try_numbers["running"] == TRY_NUMBER_RUNNING
+    assert captured_try_numbers["failed"] == TRY_NUMBER_FAILED
 
     # try_number after task has been executed
-    assert task_instance.try_number == 0
+    assert task_instance.try_number == TRY_NUMBER_AFTER_EXECUTION
 
 
 @mock.patch("airflow.models.taskinstance.get_listener_manager")
@@ -460,15 +480,15 @@ def 
test_listener_on_task_instance_success_is_called_after_try_number_increment(
 
     _, task_instance = _create_test_dag_and_task(success_callable, "success")
     # try_number before execution
-    assert task_instance.try_number == 0
+    assert task_instance.try_number == TRY_NUMBER_BEFORE_EXECUTION
     task_instance.run()
 
     # try_number at the moment of function being called
-    assert captured_try_numbers["running"] == 0
-    assert captured_try_numbers["success"] == 0
+    assert captured_try_numbers["running"] == TRY_NUMBER_RUNNING
+    assert captured_try_numbers["success"] == TRY_NUMBER_SUCCESS
 
     # try_number after task has been executed
-    assert task_instance.try_number == 0
+    assert task_instance.try_number == TRY_NUMBER_AFTER_EXECUTION
 
 
 
@mock.patch("airflow.providers.openlineage.plugins.listener.is_operator_disabled")
diff --git a/tests/providers/openlineage/plugins/test_openlineage.py 
b/tests/providers/openlineage/plugins/test_openlineage.py
index 409c8461e8..8d9cc9602b 100644
--- a/tests/providers/openlineage/plugins/test_openlineage.py
+++ b/tests/providers/openlineage/plugins/test_openlineage.py
@@ -24,9 +24,13 @@ from unittest.mock import patch
 import pytest
 
 from airflow.providers.openlineage.conf import config_path, is_disabled, 
transport
+from tests.conftest import RUNNING_TESTS_AGAINST_AIRFLOW_PACKAGES
 from tests.test_utils.config import conf_vars
 
 
+@pytest.mark.skipif(
+    RUNNING_TESTS_AGAINST_AIRFLOW_PACKAGES, reason="Plugin initialization is 
done early in case of packages"
+)
 class TestOpenLineageProviderPlugin:
     def setup_method(self):
         is_disabled.cache_clear()
diff --git a/tests/providers/smtp/notifications/test_smtp.py 
b/tests/providers/smtp/notifications/test_smtp.py
index b19cc4baa8..98fd7387e7 100644
--- a/tests/providers/smtp/notifications/test_smtp.py
+++ b/tests/providers/smtp/notifications/test_smtp.py
@@ -31,6 +31,7 @@ from airflow.providers.smtp.notifications.smtp import (
     send_smtp_notification,
 )
 from airflow.utils import timezone
+from tests.test_utils.compat import AIRFLOW_V_2_10_PLUS
 from tests.test_utils.config import conf_vars
 
 pytestmark = pytest.mark.db_test
@@ -38,6 +39,9 @@ pytestmark = pytest.mark.db_test
 SMTP_API_DEFAULT_CONN_ID = SmtpHook.default_conn_name
 
 
+NUM_TRY = 0 if AIRFLOW_V_2_10_PLUS else 1
+
+
 class TestSmtpNotifier:
     @mock.patch("airflow.providers.smtp.notifications.smtp.SmtpHook")
     def test_notifier(self, mock_smtphook_hook, dag_maker):
@@ -129,7 +133,7 @@ class TestSmtpNotifier:
             from_email=conf.get("smtp", "smtp_mail_from"),
             to="[email protected]",
             subject="DAG dag - Task op - Run ID test in State None",
-            html_content="""<!DOCTYPE html>\n<html>\n    <head>\n        <meta 
http-equiv="Content-Type" content="text/html; charset=utf-8" />\n        <meta 
name="viewport" content="width=device-width">\n    </head>\n<body>\n    <table 
role="presentation">\n        \n        <tr>\n            <td>Run ID:</td>\n    
        <td>test</td>\n        </tr>\n        <tr>\n            <td>Try:</td>\n 
           <td>0 of 1</td>\n        </tr>\n        <tr>\n            <td>Task 
State:</td>\n     [...]
+            html_content=f"""<!DOCTYPE html>\n<html>\n    <head>\n        
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />\n        
<meta name="viewport" content="width=device-width">\n    </head>\n<body>\n    
<table role="presentation">\n        \n        <tr>\n            <td>Run 
ID:</td>\n            <td>test</td>\n        </tr>\n        <tr>\n            
<td>Try:</td>\n            <td>{NUM_TRY} of 1</td>\n        </tr>\n        
<tr>\n            <td>Task State:</ [...]
             smtp_conn_id="smtp_default",
             files=None,
             cc=None,
diff --git a/tests/test_utils/compat.py b/tests/test_utils/compat.py
new file mode 100644
index 0000000000..5724ec1d4e
--- /dev/null
+++ b/tests/test_utils/compat.py
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from importlib.metadata import version
+from typing import TYPE_CHECKING, Any, cast
+
+from packaging.version import Version
+
+from airflow.models import Operator
+
+try:
+    # ImportError was renamed to ParseImportError in Airflow 2.10.0. Since our provider tests should
+    # run on all supported versions of Airflow, this compatibility shim falls back to the old ImportError
+    # so that tests can import it from here and run against older versions of Airflow.
+    # This import can be removed (and all tests switched to import ParseImportError directly) as soon as
+    # all providers have a minimum required Airflow version of 2.10+.
+    from airflow.models.errors import ParseImportError
+except ImportError:
+    from airflow.models.errors import ImportError as ParseImportError  # type: 
ignore[no-redef]
+
+from airflow import __version__ as airflow_version
+
+AIRFLOW_VERSION = Version(airflow_version)
+AIRFLOW_V_2_7_PLUS = Version(AIRFLOW_VERSION.base_version) >= Version("2.7.0")
+AIRFLOW_V_2_8_PLUS = Version(AIRFLOW_VERSION.base_version) >= Version("2.8.0")
+AIRFLOW_V_2_9_PLUS = Version(AIRFLOW_VERSION.base_version) >= Version("2.9.0")
+AIRFLOW_V_2_10_PLUS = Version(AIRFLOW_VERSION.base_version) >= 
Version("2.10.0")
+
+
+def deserialize_operator(serialized_operator: dict[str, Any]) -> Operator:
+    if AIRFLOW_V_2_10_PLUS:
+        # In Airflow 2.10+ operators can be deserialized with the regular ``deserialize`` method, so
+        # this helper is not strictly needed there and tests could eventually call ``deserialize``
+        # directly. The method is a shim that makes operator deserialization work for tests run
+        # against older Airflow versions, and tests should use it instead of calling
+        # ``BaseSerialization.deserialize`` directly. We can remove this method and switch to the
+        # regular ``deserialize`` method once all providers require Airflow 2.10+.
+        from airflow.serialization.serialized_objects import BaseSerialization
+
+        return cast(Operator, 
BaseSerialization.deserialize(serialized_operator))
+    else:
+        from airflow.serialization.serialized_objects import 
SerializedBaseOperator
+
+        return SerializedBaseOperator.deserialize_operator(serialized_operator)
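
Putting the new compat helpers together, a provider test that must pass against both released Airflow packages and main would typically look roughly like this (an illustrative sketch, not code from this commit; the test names and the some_operator fixture are made up):

    import pytest

    from airflow.serialization.serialized_objects import BaseSerialization
    from tests.test_utils.compat import AIRFLOW_V_2_10_PLUS, deserialize_operator


    @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Behaviour only exists in Airflow 2.10+")
    def test_behaviour_added_in_2_10():
        # Placeholder body: the real test would exercise 2.10-only behaviour.
        ...


    def test_operator_round_trips_through_serialization(some_operator):
        # deserialize_operator hides the pre-2.10 vs 2.10+ deserialization differences.
        serialized = BaseSerialization.serialize(some_operator)
        deserialized = deserialize_operator(serialized)
        assert deserialized.task_id == some_operator.task_id
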
diff --git a/tests/test_utils/db.py b/tests/test_utils/db.py
index 783362b8b1..1c2b871b19 100644
--- a/tests/test_utils/db.py
+++ b/tests/test_utils/db.py
@@ -36,7 +36,6 @@ from airflow.models import (
     XCom,
 )
 from airflow.models.dag import DagOwnerAttributes
-from airflow.models.dagbag import DagPriorityParsingRequest
 from airflow.models.dagcode import DagCode
 from airflow.models.dagwarning import DagWarning
 from airflow.models.dataset import (
@@ -46,12 +45,12 @@ from airflow.models.dataset import (
     DatasetModel,
     TaskOutletDatasetReference,
 )
-from airflow.models.errors import ParseImportError
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.providers.fab.auth_manager.models import Permission, Resource, 
assoc_permission_role
 from airflow.security.permissions import RESOURCE_DAG_PREFIX
 from airflow.utils.db import add_default_pool_if_not_exists, 
create_default_connections, reflect_tables
 from airflow.utils.session import create_session
+from tests.test_utils.compat import ParseImportError
 
 
 def clear_db_runs():
@@ -172,6 +171,8 @@ def clear_db_task_reschedule():
 
 def clear_db_dag_parsing_requests():
     with create_session() as session:
+        from airflow.models.dagbag import DagPriorityParsingRequest
+
         session.query(DagPriorityParsingRequest).delete()
 
 