This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9fa5773308f Migrate openlineage provider to common.compat (#57124)
9fa5773308f is described below

commit 9fa5773308fd4483c44458da89c5309e3165f29c
Author: Bhavani Ravi <[email protected]>
AuthorDate: Sat Oct 25 01:07:23 2025 +0530

    Migrate openlineage provider to common.compat (#57124)
---
 .../providers/openlineage/extractors/manager.py    |  6 +++---
 .../providers/openlineage/operators/empty.py       |  4 ++--
 .../providers/openlineage/plugins/listener.py      |  2 +-
 .../providers/openlineage/plugins/macros.py        |  2 +-
 .../src/airflow/providers/openlineage/sqlparser.py |  2 +-
 .../openlineage/utils/selective_enable.py          |  9 ++------
 .../airflow/providers/openlineage/utils/spark.py   |  2 +-
 .../src/airflow/providers/openlineage/utils/sql.py |  2 +-
 .../airflow/providers/openlineage/utils/utils.py   | 25 ++--------------------
 .../providers/openlineage/version_compat.py        | 15 ++-----------
 .../example_openlineage_base_complex_dag.py        |  2 +-
 .../tests/system/openlineage/operator.py           | 13 ++---------
 .../openlineage/dags/test_openlineage_execution.py |  2 +-
 .../tests/unit/openlineage/extractors/test_base.py |  8 +------
 .../unit/openlineage/extractors/test_manager.py    | 24 +--------------------
 .../unit/openlineage/plugins/test_listener.py      |  5 +----
 .../tests/unit/openlineage/plugins/test_macros.py  |  2 +-
 .../tests/unit/openlineage/plugins/test_utils.py   |  9 +++-----
 .../unit/openlineage/utils/custom_facet_fixture.py |  2 +-
 .../openlineage/utils/test_selective_enable.py     |  8 +------
 .../tests/unit/openlineage/utils/test_utils.py     |  9 +-------
 21 files changed, 30 insertions(+), 123 deletions(-)

diff --git 
a/providers/openlineage/src/airflow/providers/openlineage/extractors/manager.py 
b/providers/openlineage/src/airflow/providers/openlineage/extractors/manager.py
index cc899b335e8..75a32d48bcf 100644
--- 
a/providers/openlineage/src/airflow/providers/openlineage/extractors/manager.py
+++ 
b/providers/openlineage/src/airflow/providers/openlineage/extractors/manager.py
@@ -41,8 +41,8 @@ from airflow.utils.state import TaskInstanceState
 if TYPE_CHECKING:
     from openlineage.client.event_v2 import Dataset
 
-    from airflow.models import Operator
     from airflow.providers.common.compat.lineage.entities import Table
+    from airflow.providers.common.compat.sdk import BaseOperator
 
 
 def _iter_extractor_types() -> Iterator[type[BaseExtractor]]:
@@ -161,7 +161,7 @@ class ExtractorManager(LoggingMixin):
 
         return OperatorLineage()
 
-    def get_extractor_class(self, task: Operator) -> type[BaseExtractor] | 
None:
+    def get_extractor_class(self, task: BaseOperator) -> type[BaseExtractor] | 
None:
         if task.task_type in self.extractors:
             return self.extractors[task.task_type]
 
@@ -172,7 +172,7 @@ class ExtractorManager(LoggingMixin):
             return self.default_extractor
         return None
 
-    def _get_extractor(self, task: Operator) -> BaseExtractor | None:
+    def _get_extractor(self, task: BaseOperator) -> BaseExtractor | None:
         # TODO: Re-enable in Extractor PR
         # self.instantiate_abstract_extractors(task)
         extractor = self.get_extractor_class(task)
diff --git 
a/providers/openlineage/src/airflow/providers/openlineage/operators/empty.py 
b/providers/openlineage/src/airflow/providers/openlineage/operators/empty.py
index 6ac8754797f..69e35542cb0 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/operators/empty.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/operators/empty.py
@@ -18,11 +18,11 @@ from __future__ import annotations
 
 from typing import TYPE_CHECKING
 
+from airflow.providers.common.compat.sdk import BaseOperator
 from airflow.providers.openlineage.extractors.base import OperatorLineage
-from airflow.providers.openlineage.version_compat import BaseOperator
 
 if TYPE_CHECKING:
-    from airflow.sdk.definitions.context import Context
+    from airflow.providers.common.compat.sdk import Context
 
 
 class EmptyOperator(BaseOperator):
diff --git 
a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py 
b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
index 734f4c5761a..78c3fffda89 100644
--- 
a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
+++ 
b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
@@ -29,6 +29,7 @@ from openlineage.client.serde import Serde
 from airflow import settings
 from airflow.listeners import hookimpl
 from airflow.models import DagRun, TaskInstance
+from airflow.providers.common.compat.sdk import timeout, timezone
 from airflow.providers.openlineage import conf
 from airflow.providers.openlineage.extractors import ExtractorManager, 
OperatorLineage
 from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter, 
RunState
@@ -48,7 +49,6 @@ from airflow.providers.openlineage.utils.utils import (
     is_selective_lineage_enabled,
     print_warning,
 )
-from airflow.providers.openlineage.version_compat import timeout, timezone
 from airflow.settings import configure_orm
 from airflow.stats import Stats
 from airflow.utils.state import TaskInstanceState
diff --git 
a/providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py 
b/providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py
index 699800fb66e..ac9c75b4a94 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py
@@ -24,7 +24,7 @@ from airflow.providers.openlineage.utils.utils import 
get_job_name
 from airflow.providers.openlineage.version_compat import AIRFLOW_V_3_0_PLUS
 
 if TYPE_CHECKING:
-    from airflow.models import TaskInstance
+    from airflow.providers.common.compat.sdk import TaskInstance
 
 
 def lineage_job_namespace():
diff --git 
a/providers/openlineage/src/airflow/providers/openlineage/sqlparser.py 
b/providers/openlineage/src/airflow/providers/openlineage/sqlparser.py
index d89852acbda..0ac80fc9d73 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/sqlparser.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/sqlparser.py
@@ -39,8 +39,8 @@ if TYPE_CHECKING:
     from openlineage.client.facet_v2 import JobFacet, RunFacet
     from sqlalchemy.engine import Engine
 
+    from airflow.providers.common.compat.sdk import BaseHook
     from airflow.providers.common.sql.hooks.sql import DbApiHook
-    from airflow.sdk import BaseHook
 
 log = logging.getLogger(__name__)
 
diff --git 
a/providers/openlineage/src/airflow/providers/openlineage/utils/selective_enable.py
 
b/providers/openlineage/src/airflow/providers/openlineage/utils/selective_enable.py
index edf6f9c858f..5f49231dc8f 100644
--- 
a/providers/openlineage/src/airflow/providers/openlineage/utils/selective_enable.py
+++ 
b/providers/openlineage/src/airflow/providers/openlineage/utils/selective_enable.py
@@ -22,19 +22,14 @@ from typing import TYPE_CHECKING, TypeVar
 
 from airflow.models import Param
 from airflow.models.xcom_arg import XComArg
+from airflow.providers.common.compat.sdk import DAG
 
 if TYPE_CHECKING:
+    from airflow.providers.common.compat.sdk import BaseOperator, 
MappedOperator
     from airflow.providers.openlineage.utils.utils import AnyOperator
-    from airflow.sdk import DAG, BaseOperator
-    from airflow.sdk.definitions.mappedoperator import MappedOperator
     from airflow.serialization.serialized_objects import SerializedDAG
 
     T = TypeVar("T", bound=DAG | BaseOperator | MappedOperator)
-else:
-    try:
-        from airflow.sdk import DAG
-    except ImportError:
-        from airflow.models import DAG
 
 ENABLE_OL_PARAM_NAME = "_selective_enable_ol"
 ENABLE_OL_PARAM = Param(True, const=True)
diff --git 
a/providers/openlineage/src/airflow/providers/openlineage/utils/spark.py 
b/providers/openlineage/src/airflow/providers/openlineage/utils/spark.py
index 4925cd00f39..a92ac25eab2 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/utils/spark.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/utils/spark.py
@@ -30,7 +30,7 @@ from airflow.providers.openlineage.plugins.macros import (
 )
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.providers.common.compat.sdk import Context
 
 log = logging.getLogger(__name__)
 
diff --git 
a/providers/openlineage/src/airflow/providers/openlineage/utils/sql.py 
b/providers/openlineage/src/airflow/providers/openlineage/utils/sql.py
index 904206170e9..7f07d390039 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/utils/sql.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/utils/sql.py
@@ -31,7 +31,7 @@ if TYPE_CHECKING:
     from sqlalchemy.engine import Engine
     from sqlalchemy.sql import ClauseElement
 
-    from airflow.sdk import BaseHook
+    from airflow.providers.common.compat.sdk import BaseHook
 
 
 log = logging.getLogger(__name__)
diff --git 
a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py 
b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
index b2e483315fc..8ef5c23ced8 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
@@ -35,6 +35,8 @@ from airflow import __version__ as AIRFLOW_VERSION
 # TODO: move this maybe to Airflow's logic?
 from airflow.models import DagRun, TaskReschedule
 from airflow.models.mappedoperator import MappedOperator as 
SerializedMappedOperator
+from airflow.providers.common.compat.assets import Asset
+from airflow.providers.common.compat.sdk import DAG, BaseOperator, 
BaseSensorOperator, MappedOperator
 from airflow.providers.openlineage import (
     __version__ as OPENLINEAGE_PROVIDER_VERSION,
     conf,
@@ -57,11 +59,6 @@ from airflow.providers.openlineage.version_compat import 
AIRFLOW_V_3_0_PLUS, get
 from airflow.serialization.serialized_objects import SerializedBaseOperator, 
SerializedDAG
 from airflow.utils.module_loading import import_string
 
-if AIRFLOW_V_3_0_PLUS:
-    from airflow.sdk import BaseSensorOperator
-else:
-    from airflow.sensors.base import BaseSensorOperator  # type: 
ignore[no-redef]
-
 if not AIRFLOW_V_3_0_PLUS:
     from airflow.utils.session import NEW_SESSION, provide_session
 
@@ -72,9 +69,6 @@ if TYPE_CHECKING:
     from openlineage.client.facet_v2 import RunFacet, processing_engine_run
 
     from airflow.models import TaskInstance
-    from airflow.providers.common.compat.assets import Asset
-    from airflow.sdk import DAG, BaseOperator
-    from airflow.sdk.definitions.mappedoperator import MappedOperator
     from airflow.sdk.execution_time.secrets_masker import (
         Redactable,
         Redacted,
@@ -85,21 +79,6 @@ if TYPE_CHECKING:
 
     AnyOperator: TypeAlias = BaseOperator | MappedOperator | 
SerializedBaseOperator | SerializedMappedOperator
 else:
-    try:
-        from airflow.sdk import DAG, BaseOperator
-        from airflow.sdk.definitions.mappedoperator import MappedOperator
-    except ImportError:
-        from airflow.models import DAG, BaseOperator, MappedOperator
-
-    try:
-        from airflow.providers.common.compat.assets import Asset
-    except ImportError:
-        if AIRFLOW_V_3_0_PLUS:
-            from airflow.sdk import Asset
-        else:
-            # dataset is renamed to asset since Airflow 3.0
-            from airflow.datasets import Dataset as Asset
-
     try:
         from airflow.sdk._shared.secrets_masker import (
             Redactable,
diff --git 
a/providers/openlineage/src/airflow/providers/openlineage/version_compat.py 
b/providers/openlineage/src/airflow/providers/openlineage/version_compat.py
index ddf39a1898a..4a2c6ca5c6c 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/version_compat.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/version_compat.py
@@ -34,16 +34,5 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
 
 AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
 
-if AIRFLOW_V_3_0_PLUS:
-    from airflow.sdk import BaseOperator
-else:
-    from airflow.models import BaseOperator
-
-try:
-    from airflow.sdk import timezone
-    from airflow.sdk.execution_time.timeout import timeout
-except ImportError:
-    from airflow.utils import timezone  # type: ignore[attr-defined,no-redef]
-    from airflow.utils.timeout import timeout  # type: 
ignore[assignment,attr-defined,no-redef]
-
-__all__ = ["AIRFLOW_V_3_0_PLUS", "BaseOperator", "timeout", "timezone"]
+
+__all__ = ["AIRFLOW_V_3_0_PLUS"]
diff --git 
a/providers/openlineage/tests/system/openlineage/example_openlineage_base_complex_dag.py
 
b/providers/openlineage/tests/system/openlineage/example_openlineage_base_complex_dag.py
index c3cb654a5e3..de1636e1b91 100644
--- 
a/providers/openlineage/tests/system/openlineage/example_openlineage_base_complex_dag.py
+++ 
b/providers/openlineage/tests/system/openlineage/example_openlineage_base_complex_dag.py
@@ -35,8 +35,8 @@ from typing import Any
 
 from airflow import DAG
 from airflow.models import Variable
-from airflow.models.baseoperator import BaseOperator
 from airflow.providers.common.compat.assets import Asset
+from airflow.providers.common.compat.sdk import BaseOperator
 from airflow.providers.standard.operators.bash import BashOperator
 from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.providers.standard.operators.python import PythonOperator
diff --git a/providers/openlineage/tests/system/openlineage/operator.py 
b/providers/openlineage/tests/system/openlineage/operator.py
index 6cbfd3c0c3c..000696a95b1 100644
--- a/providers/openlineage/tests/system/openlineage/operator.py
+++ b/providers/openlineage/tests/system/openlineage/operator.py
@@ -29,19 +29,10 @@ from urllib.parse import urlparse
 from dateutil.parser import parse
 from jinja2 import Environment
 
-from airflow.models.operator import BaseOperator
-
-try:
-    from airflow.sdk import Variable
-except ImportError:
-    from airflow.models.variable import Variable
+from airflow.providers.common.compat.sdk import BaseOperator, Variable
 
 if TYPE_CHECKING:
-    try:
-        from airflow.sdk.definitions.context import Context
-    except ImportError:
-        # TODO: Remove once provider drops support for Airflow 2
-        from airflow.utils.context import Context
+    from airflow.providers.common.compat.sdk import Context
 
 log = logging.getLogger(__name__)
 
diff --git 
a/providers/openlineage/tests/unit/openlineage/dags/test_openlineage_execution.py
 
b/providers/openlineage/tests/unit/openlineage/dags/test_openlineage_execution.py
index f8db91611e8..cc710d91e33 100644
--- 
a/providers/openlineage/tests/unit/openlineage/dags/test_openlineage_execution.py
+++ 
b/providers/openlineage/tests/unit/openlineage/dags/test_openlineage_execution.py
@@ -21,8 +21,8 @@ import datetime
 import time
 
 from airflow.models.dag import DAG
-from airflow.models.operator import BaseOperator
 from airflow.providers.common.compat.openlineage.facet import Dataset
+from airflow.providers.common.compat.sdk import BaseOperator
 from airflow.providers.openlineage.extractors import OperatorLineage
 
 
diff --git 
a/providers/openlineage/tests/unit/openlineage/extractors/test_base.py 
b/providers/openlineage/tests/unit/openlineage/extractors/test_base.py
index 129a86b00f7..d5647e3f3b0 100644
--- a/providers/openlineage/tests/unit/openlineage/extractors/test_base.py
+++ b/providers/openlineage/tests/unit/openlineage/extractors/test_base.py
@@ -24,14 +24,8 @@ from attrs import Factory, define, field
 from openlineage.client.event_v2 import Dataset
 from openlineage.client.facet_v2 import BaseFacet, JobFacet, parent_run, 
sql_job
 
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
-
-if AIRFLOW_V_3_0_PLUS:
-    from airflow.sdk import BaseOperator
-else:
-    from airflow.models.baseoperator import BaseOperator  # type: 
ignore[no-redef]
-
 from airflow.models.taskinstance import TaskInstanceState
+from airflow.providers.common.compat.sdk import BaseOperator
 from airflow.providers.openlineage.extractors.base import (
     BaseExtractor,
     DefaultExtractor,
diff --git 
a/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py 
b/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py
index f18f7e22a9e..a54830055fd 100644
--- a/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py
+++ b/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py
@@ -31,6 +31,7 @@ from openlineage.client.facet_v2 import (
 
 from airflow.models.taskinstance import TaskInstance
 from airflow.providers.common.compat.lineage.entities import Column, File, 
Table, User
+from airflow.providers.common.compat.sdk import BaseOperator, Context, 
ObjectStoragePath
 from airflow.providers.openlineage.extractors import OperatorLineage
 from airflow.providers.openlineage.extractors.manager import ExtractorManager
 from airflow.providers.openlineage.utils.utils import Asset
@@ -43,13 +44,7 @@ from tests_common.test_utils.version_compat import 
AIRFLOW_V_3_0_PLUS
 if TYPE_CHECKING:
     try:
         from airflow.sdk.api.datamodels._generated import 
AssetEventDagRunReference, TIRunContext
-        from airflow.sdk.definitions.context import Context
-
     except ImportError:
-        # TODO: Remove once provider drops support for Airflow 2
-        # TIRunContext is only used in Airflow 3 tests
-        from airflow.utils.context import Context
-
         AssetEventDagRunReference = TIRunContext = Any  # type: ignore[misc, 
assignment]
 
 
@@ -68,23 +63,6 @@ def hook_lineage_collector():
     hook._hook_lineage_collector = None
 
 
-if AIRFLOW_V_3_0_PLUS:
-    from airflow.sdk import BaseOperator, ObjectStoragePath
-    from airflow.sdk.api.datamodels._generated import TaskInstance as 
SDKTaskInstance
-    from airflow.sdk.execution_time import task_runner
-    from airflow.sdk.execution_time.comms import StartupDetails
-    from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance, 
parse
-else:
-    from airflow.io.path import ObjectStoragePath  # type: ignore[no-redef]
-    from airflow.models import BaseOperator
-
-    SDKTaskInstance = ...  # type: ignore
-    task_runner = ...  # type: ignore
-    StartupDetails = ...  # type: ignore
-    RuntimeTaskInstance = ...  # type: ignore
-    parse = ...  # type: ignore
-
-
 @pytest.mark.parametrize(
     ("uri", "dataset"),
     (
diff --git 
a/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py 
b/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py
index 941666b8be4..76b743f1a82 100644
--- a/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py
+++ b/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py
@@ -52,10 +52,7 @@ if AIRFLOW_V_3_1_PLUS:
 else:
     from airflow.utils import timezone  # type: ignore[attr-defined,no-redef]
 
-if AIRFLOW_V_3_0_PLUS:
-    from airflow.sdk import BaseOperator
-else:
-    from airflow.models.baseoperator import BaseOperator  # type: 
ignore[no-redef]
+from airflow.providers.common.compat.sdk import BaseOperator
 
 EXPECTED_TRY_NUMBER_1 = 1
 
diff --git 
a/providers/openlineage/tests/unit/openlineage/plugins/test_macros.py 
b/providers/openlineage/tests/unit/openlineage/plugins/test_macros.py
index a0b090ec538..520421be768 100644
--- a/providers/openlineage/tests/unit/openlineage/plugins/test_macros.py
+++ b/providers/openlineage/tests/unit/openlineage/plugins/test_macros.py
@@ -114,7 +114,7 @@ def test_lineage_parent_id(mock_run_id):
 @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow 
3.0+")
 def test_lineage_root_run_id_with_runtime_task_instance(create_runtime_ti):
     """Test lineage_root_run_id with real RuntimeTaskInstance object doesn't 
throw AttributeError."""
-    from airflow.sdk import BaseOperator
+    from airflow.providers.common.compat.sdk import BaseOperator
 
     task = BaseOperator(task_id="test_task")
 
diff --git a/providers/openlineage/tests/unit/openlineage/plugins/test_utils.py 
b/providers/openlineage/tests/unit/openlineage/plugins/test_utils.py
index 40c27f0b878..b1cff03bcad 100644
--- a/providers/openlineage/tests/unit/openlineage/plugins/test_utils.py
+++ b/providers/openlineage/tests/unit/openlineage/plugins/test_utils.py
@@ -29,6 +29,7 @@ from openlineage.client.utils import RedactMixin
 from pkg_resources import parse_version
 
 from airflow.providers.common.compat.assets import Asset
+from airflow.providers.common.compat.sdk import timezone
 from airflow.providers.openlineage.plugins.facets import AirflowDebugRunFacet
 from airflow.providers.openlineage.utils.utils import (
     DagInfo,
@@ -53,9 +54,6 @@ from tests_common.test_utils.version_compat import 
AIRFLOW_V_3_0_PLUS, AIRFLOW_V
 
 if AIRFLOW_V_3_1_PLUS:
     from airflow.models.dag import get_next_data_interval
-    from airflow.sdk import timezone
-else:
-    from airflow.utils import timezone  # type: ignore[attr-defined,no-redef]
 
 if AIRFLOW_V_3_1_PLUS:
     from airflow.sdk._shared.secrets_masker import DEFAULT_SENSITIVE_FIELDS, 
SecretsMasker
@@ -70,11 +68,10 @@ else:
         SecretsMasker,
     )
 
+from airflow.providers.common.compat.sdk import DAG
+
 if AIRFLOW_V_3_0_PLUS:
-    from airflow.sdk import DAG
     from airflow.utils.types import DagRunTriggeredByType
-else:
-    from airflow import DAG
 
 
 class SafeStrDict(dict):
diff --git 
a/providers/openlineage/tests/unit/openlineage/utils/custom_facet_fixture.py 
b/providers/openlineage/tests/unit/openlineage/utils/custom_facet_fixture.py
index 040c8c774c3..e34af2ebb68 100644
--- a/providers/openlineage/tests/unit/openlineage/utils/custom_facet_fixture.py
+++ b/providers/openlineage/tests/unit/openlineage/utils/custom_facet_fixture.py
@@ -23,7 +23,7 @@ import attrs
 from airflow.providers.common.compat.openlineage.facet import RunFacet
 
 if TYPE_CHECKING:
-    from airflow.models.taskinstance import TaskInstance, TaskInstanceState
+    from airflow.providers.common.compat.sdk import TaskInstance, 
TaskInstanceState
 
 
 @attrs.define
diff --git 
a/providers/openlineage/tests/unit/openlineage/utils/test_selective_enable.py 
b/providers/openlineage/tests/unit/openlineage/utils/test_selective_enable.py
index 4a1bcb39ba7..082fec07687 100644
--- 
a/providers/openlineage/tests/unit/openlineage/utils/test_selective_enable.py
+++ 
b/providers/openlineage/tests/unit/openlineage/utils/test_selective_enable.py
@@ -19,13 +19,7 @@ from __future__ import annotations
 
 from pendulum import now
 
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
-
-if AIRFLOW_V_3_0_PLUS:
-    from airflow.sdk import dag, task
-else:
-    from airflow.decorators import dag, task  # type: 
ignore[attr-defined,no-redef]
-from airflow.models import DAG
+from airflow.providers.common.compat.sdk import DAG, dag, task
 from airflow.providers.openlineage.utils.selective_enable import (
     DISABLE_OL_PARAM,
     ENABLE_OL_PARAM,
diff --git a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py 
b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
index 2d82b6b6ca5..3bbea4aafac 100644
--- a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
+++ b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
@@ -30,6 +30,7 @@ from airflow import DAG
 from airflow.models.dagrun import DagRun
 from airflow.models.taskinstance import TaskInstance, TaskInstanceState
 from airflow.providers.common.compat.assets import Asset
+from airflow.providers.common.compat.sdk import BaseOperator, TaskGroup, task, 
timezone
 from airflow.providers.openlineage.plugins.facets import AirflowDagRunFacet, 
AirflowJobFacet
 from airflow.providers.openlineage.utils.utils import (
     _MAX_DOC_BYTES,
@@ -57,7 +58,6 @@ from airflow.providers.standard.operators.empty import 
EmptyOperator
 from airflow.serialization.serialized_objects import SerializedBaseOperator
 from airflow.timetables.events import EventsTimetable
 from airflow.timetables.trigger import CronTriggerTimetable
-from airflow.utils import timezone
 from airflow.utils.session import create_session
 from airflow.utils.state import DagRunState
 from airflow.utils.types import DagRunType
@@ -66,13 +66,6 @@ from tests_common.test_utils.compat import BashOperator, 
PythonOperator
 from tests_common.test_utils.mock_operators import MockOperator
 from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_3_PLUS, 
AIRFLOW_V_3_0_PLUS
 
-if AIRFLOW_V_3_0_PLUS:
-    from airflow.sdk import BaseOperator, TaskGroup, task
-else:
-    from airflow.decorators import task  # type: ignore[attr-defined,no-redef]
-    from airflow.models.baseoperator import BaseOperator  # type: 
ignore[no-redef]
-    from airflow.utils.task_group import TaskGroup  # type: ignore[no-redef]
-
 BASH_OPERATOR_PATH = "airflow.providers.standard.operators.bash"
 PYTHON_OPERATOR_PATH = "airflow.providers.standard.operators.python"
 

Reply via email to