This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e24cfb0fae9 Common.Compat: Extract reusable compat utilities and 
rename to sdk (#56884)
e24cfb0fae9 is described below

commit e24cfb0fae9e3980c1df258ced560fc313ffa5b5
Author: Kaxil Naik <[email protected]>
AuthorDate: Tue Oct 21 02:11:10 2025 +0100

    Common.Compat: Extract reusable compat utilities and rename to sdk (#56884)
    
    Extract compatibility import logic into reusable _compat_utils module and 
rename lazy_compat to sdk for better clarity. The new architecture makes it 
easier to maintain consistency across all compatibility modules without 
introducing cycles due to which mypy was failing in 
https://github.com/apache/airflow/pull/56867
---
 .pre-commit-config.yaml                            |   6 +-
 .../providers/common/compat/_compat_utils.py       | 105 +++++++
 .../airflow/providers/common/compat/lazy_compat.py | 310 ---------------------
 .../providers/common/compat/lazy_compat.pyi        | 232 ---------------
 .../src/airflow/providers/common/compat/sdk.py     | 149 ++++++++++
 .../src/airflow/providers/common/compat/sdk.pyi    | 120 ++++++++
 .../providers/common/compat/standard/operators.py  |  23 +-
 .../providers/common/compat/standard/triggers.py   |  11 +-
 .../providers/common/compat/standard/utils.py      |  18 +-
 .../tests/unit/common/compat/test__compat_utils.py | 215 ++++++++++++++
 .../compat/{test_lazy_compat.py => test_sdk.py}    |  10 +-
 .../providers/edge3/example_dags/win_test.py       |   2 +-
 .../src/airflow/providers/google/ads/hooks/ads.py  |   2 +-
 .../src/airflow/providers/google/assets/gcs.py     |   2 +-
 .../providers/google/cloud/hooks/cloud_sql.py      |   2 +-
 .../providers/google/cloud/hooks/dataflow.py       |   2 +-
 .../providers/google/cloud/hooks/dataprep.py       |   2 +-
 .../airflow/providers/google/cloud/hooks/looker.py |   2 +-
 .../airflow/providers/google/cloud/links/base.py   |   2 +-
 .../providers/google/cloud/links/dataproc.py       |   2 +-
 .../providers/google/cloud/operators/cloud_sql.py  |   2 +-
 .../providers/google/cloud/sensors/bigquery.py     |   2 +-
 .../providers/google/cloud/sensors/bigquery_dts.py |   2 +-
 .../providers/google/cloud/sensors/bigtable.py     |   2 +-
 .../google/cloud/sensors/cloud_composer.py         |   2 +-
 .../sensors/cloud_storage_transfer_service.py      |   2 +-
 .../providers/google/cloud/sensors/dataflow.py     |   2 +-
 .../providers/google/cloud/sensors/dataform.py     |   2 +-
 .../providers/google/cloud/sensors/datafusion.py   |   2 +-
 .../providers/google/cloud/sensors/dataplex.py     |   2 +-
 .../providers/google/cloud/sensors/dataprep.py     |   2 +-
 .../providers/google/cloud/sensors/dataproc.py     |   2 +-
 .../google/cloud/sensors/dataproc_metastore.py     |   2 +-
 .../airflow/providers/google/cloud/sensors/gcs.py  |   2 +-
 .../providers/google/cloud/sensors/looker.py       |   2 +-
 .../providers/google/cloud/sensors/pubsub.py       |   2 +-
 .../providers/google/cloud/sensors/tasks.py        |   2 +-
 .../cloud/sensors/vertex_ai/feature_store.py       |   2 +-
 .../providers/google/cloud/sensors/workflows.py    |   2 +-
 .../providers/google/common/hooks/base_google.py   |   2 +-
 .../providers/google/leveldb/hooks/leveldb.py      |   2 +-
 .../marketing_platform/links/analytics_admin.py    |   2 +-
 .../marketing_platform/sensors/campaign_manager.py |   2 +-
 .../marketing_platform/sensors/display_video.py    |   2 +-
 .../providers/google/suite/sensors/drive.py        |   2 +-
 .../neo4j/tests/unit/neo4j/operators/test_neo4j.py |   2 +-
 .../ci/prek/check_common_compat_lazy_imports.py    |  42 ++-
 47 files changed, 687 insertions(+), 624 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 1b051f69fb9..ba96c586bac 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -465,10 +465,10 @@ repos:
         entry: ./scripts/ci/prek/check_airflow_imports.py
           --pattern '^openlineage\.client\.(facet|run)'
           --message "You should import from 
`airflow.providers.common.compat.openlineage.facet` instead."
-      - id: check-common-compat-lazy-imports-in-sync
-        name: Check common.compat lazy_compat.pyi is in sync
+      - id: check-common-compat-sdk-imports-in-sync
+        name: Check common.compat sdk.pyi is in sync
         language: python
-        files: 
^providers/common/compat/src/airflow/providers/common/compat/lazy_compat\.(py|pyi)$
+        files: 
^providers/common/compat/src/airflow/providers/common/compat/sdk\.(py|pyi)$
         pass_filenames: false
         entry: ./scripts/ci/prek/check_common_compat_lazy_imports.py
       - id: check-airflow-providers-bug-report-template
diff --git 
a/providers/common/compat/src/airflow/providers/common/compat/_compat_utils.py 
b/providers/common/compat/src/airflow/providers/common/compat/_compat_utils.py
new file mode 100644
index 00000000000..e3176c63c88
--- /dev/null
+++ 
b/providers/common/compat/src/airflow/providers/common/compat/_compat_utils.py
@@ -0,0 +1,105 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Reusable utilities for creating compatibility layers with fallback imports.
+
+This module provides the core machinery used by sdk.py and standard/* modules
+to handle import fallbacks between Airflow 3.x and 2.x.
+"""
+
+from __future__ import annotations
+
+import importlib
+
+
+def create_module_getattr(
+    import_map: dict[str, str | tuple[str, ...]],
+    module_map: dict[str, str | tuple[str, ...]] | None = None,
+    rename_map: dict[str, tuple[str, str, str]] | None = None,
+):
+    """
+    Create a __getattr__ function for lazy imports with fallback support.
+
+    :param import_map: Dictionary mapping attribute names to module paths 
(single or tuple for fallback)
+    :param module_map: Dictionary mapping module names to module paths (single 
or tuple for fallback)
+    :param rename_map: Dictionary mapping new names to (new_path, old_path, 
old_name) tuples
+    :return: A __getattr__ function that can be assigned at module level
+    """
+    module_map = module_map or {}
+    rename_map = rename_map or {}
+
+    def __getattr__(name: str):
+        # Check renamed imports first
+        if name in rename_map:
+            new_path, old_path, old_name = rename_map[name]
+
+            rename_error: ImportError | ModuleNotFoundError | AttributeError | 
None = None
+            # Try new path with new name first (Airflow 3.x)
+            try:
+                module = __import__(new_path, fromlist=[name])
+                return getattr(module, name)
+            except (ImportError, ModuleNotFoundError, AttributeError) as e:
+                rename_error = e
+
+            # Fall back to old path with old name (Airflow 2.x)
+            try:
+                module = __import__(old_path, fromlist=[old_name])
+                return getattr(module, old_name)
+            except (ImportError, ModuleNotFoundError, AttributeError):
+                if rename_error:
+                    raise ImportError(
+                        f"Could not import {name!r} from {new_path!r} or 
{old_name!r} from {old_path!r}"
+                    ) from rename_error
+                raise
+
+        # Check module imports
+        if name in module_map:
+            value = module_map[name]
+            paths = value if isinstance(value, tuple) else (value,)
+
+            module_error: ImportError | ModuleNotFoundError | None = None
+            for module_path in paths:
+                try:
+                    return importlib.import_module(module_path)
+                except (ImportError, ModuleNotFoundError) as e:
+                    module_error = e
+                    continue
+
+            if module_error:
+                raise ImportError(f"Could not import module {name!r} from any 
of: {paths}") from module_error
+
+        # Check regular imports
+        if name in import_map:
+            value = import_map[name]
+            paths = value if isinstance(value, tuple) else (value,)
+
+            attr_error: ImportError | ModuleNotFoundError | AttributeError | 
None = None
+            for module_path in paths:
+                try:
+                    module = __import__(module_path, fromlist=[name])
+                    return getattr(module, name)
+                except (ImportError, ModuleNotFoundError, AttributeError) as e:
+                    attr_error = e
+                    continue
+
+            if attr_error:
+                raise ImportError(f"Could not import {name!r} from any of: 
{paths}") from attr_error
+
+        raise AttributeError(f"module has no attribute {name!r}")
+
+    return __getattr__
diff --git 
a/providers/common/compat/src/airflow/providers/common/compat/lazy_compat.py 
b/providers/common/compat/src/airflow/providers/common/compat/lazy_compat.py
deleted file mode 100644
index d549e95b457..00000000000
--- a/providers/common/compat/src/airflow/providers/common/compat/lazy_compat.py
+++ /dev/null
@@ -1,310 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""
-Airflow compatibility imports for seamless migration from Airflow 2 to Airflow 
3.
-
-This module provides lazy imports that automatically try Airflow 3 paths first,
-then fall back to Airflow 2 paths, enabling code to work across both versions.
-"""
-
-from __future__ import annotations
-
-from typing import Any
-
-# Rename map for classes that changed names between Airflow 2.x and 3.x
-# Format: new_name -> (new_path, old_path, old_name)
-_RENAME_MAP: dict[str, tuple[str, str, str]] = {
-    # Assets: Dataset -> Asset rename in Airflow 3.0
-    "Asset": ("airflow.sdk", "airflow.datasets", "Dataset"),
-    "AssetAlias": ("airflow.sdk", "airflow.datasets", "DatasetAlias"),
-    "AssetAll": ("airflow.sdk", "airflow.datasets", "DatasetAll"),
-    "AssetAny": ("airflow.sdk", "airflow.datasets", "DatasetAny"),
-}
-
-# Import map for classes/functions/constants
-# Format: class_name -> module_path(s)
-# - str: single module path (no fallback)
-# - tuple[str, ...]: multiple module paths (try in order, newest first)
-_IMPORT_MAP: dict[str, str | tuple[str, ...]] = {
-    # 
============================================================================
-    # Hooks
-    # 
============================================================================
-    "BaseHook": ("airflow.sdk", "airflow.hooks.base"),
-    "FSHook": ("airflow.providers.standard.hooks.filesystem", 
"airflow.hooks.filesystem"),
-    "SubprocessHook": ("airflow.providers.standard.hooks.subprocess", 
"airflow.hooks.subprocess"),
-    "PackageIndexHook": (
-        "airflow.providers.standard.hooks.package_index",
-        "airflow.hooks.package_index",
-    ),
-    # 
============================================================================
-    # Sensors
-    # 
============================================================================
-    "BaseSensorOperator": ("airflow.sdk", "airflow.sensors.base"),
-    "PokeReturnValue": ("airflow.sdk", "airflow.sensors.base"),
-    "poke_mode_only": ("airflow.sdk.bases.sensor", "airflow.sensors.base"),
-    "PythonSensor": ("airflow.providers.standard.sensors.python", 
"airflow.sensors.python"),
-    "BashSensor": ("airflow.providers.standard.sensors.bash", 
"airflow.sensors.bash"),
-    "DateTimeSensor": ("airflow.providers.standard.sensors.date_time", 
"airflow.sensors.date_time"),
-    "DateTimeSensorAsync": ("airflow.providers.standard.sensors.date_time", 
"airflow.sensors.date_time"),
-    "TimeSensor": ("airflow.providers.standard.sensors.time", 
"airflow.sensors.time_sensor"),
-    "TimeSensorAsync": ("airflow.providers.standard.sensors.time", 
"airflow.sensors.time_sensor"),
-    "TimeDeltaSensor": ("airflow.providers.standard.sensors.time_delta", 
"airflow.sensors.time_delta"),
-    "TimeDeltaSensorAsync": (
-        "airflow.providers.standard.sensors.time_delta",
-        "airflow.sensors.time_delta",
-    ),
-    "FileSensor": ("airflow.providers.standard.sensors.filesystem", 
"airflow.sensors.filesystem"),
-    "ExternalTaskSensor": (
-        "airflow.providers.standard.sensors.external_task",
-        "airflow.sensors.external_task",
-    ),
-    "ExternalTaskMarker": (
-        "airflow.providers.standard.sensors.external_task",
-        "airflow.sensors.external_task",
-    ),
-    "ExternalDagLink": ("airflow.providers.standard.sensors.external_task", 
"airflow.sensors.external_task"),
-    "DayOfWeekSensor": ("airflow.providers.standard.sensors.weekday", 
"airflow.sensors.weekday"),
-    # 
============================================================================
-    # Operators
-    # 
============================================================================
-    "BaseOperator": ("airflow.sdk", "airflow.models.baseoperator"),
-    "PythonOperator": ("airflow.providers.standard.operators.python", 
"airflow.operators.python"),
-    "BranchPythonOperator": ("airflow.providers.standard.operators.python", 
"airflow.operators.python"),
-    "ShortCircuitOperator": ("airflow.providers.standard.operators.python", 
"airflow.operators.python"),
-    "_SERIALIZERS": ("airflow.providers.standard.operators.python", 
"airflow.operators.python"),
-    "PythonVirtualenvOperator": 
("airflow.providers.standard.operators.python", "airflow.operators.python"),
-    "ExternalPythonOperator": ("airflow.providers.standard.operators.python", 
"airflow.operators.python"),
-    "BranchExternalPythonOperator": (
-        "airflow.providers.standard.operators.python",
-        "airflow.operators.python",
-    ),
-    "BranchPythonVirtualenvOperator": (
-        "airflow.providers.standard.operators.python",
-        "airflow.operators.python",
-    ),
-    "BashOperator": ("airflow.providers.standard.operators.bash", 
"airflow.operators.bash"),
-    "EmptyOperator": ("airflow.providers.standard.operators.empty", 
"airflow.operators.empty"),
-    "LatestOnlyOperator": (
-        "airflow.providers.standard.operators.latest_only",
-        "airflow.operators.latest_only",
-    ),
-    "TriggerDagRunOperator": (
-        "airflow.providers.standard.operators.trigger_dagrun",
-        "airflow.operators.trigger_dagrun",
-    ),
-    "BranchDateTimeOperator": 
("airflow.providers.standard.operators.datetime", "airflow.operators.datetime"),
-    "BranchDayOfWeekOperator": 
("airflow.providers.standard.operators.weekday", "airflow.operators.weekday"),
-    "BranchMixIn": ("airflow.providers.standard.operators.branch", 
"airflow.operators.branch"),
-    "BaseBranchOperator": ("airflow.providers.standard.operators.branch", 
"airflow.operators.branch"),
-    "SmoothOperator": ("airflow.providers.standard.operators.smooth", 
"airflow.operators.smooth"),
-    # 
============================================================================
-    # Decorators
-    # 
============================================================================
-    "task": ("airflow.sdk", "airflow.decorators"),
-    "dag": ("airflow.sdk", "airflow.decorators"),
-    "task_group": ("airflow.sdk", "airflow.decorators"),
-    "setup": ("airflow.sdk", "airflow.decorators"),
-    "teardown": ("airflow.sdk", "airflow.decorators"),
-    "TaskDecorator": ("airflow.sdk.bases.decorator", "airflow.decorators"),
-    # 
============================================================================
-    # Triggers
-    # 
============================================================================
-    "TimeDeltaTrigger": ("airflow.providers.standard.triggers.temporal", 
"airflow.triggers.temporal"),
-    # 
============================================================================
-    # Models
-    # 
============================================================================
-    "Connection": ("airflow.sdk", "airflow.models.connection"),
-    "Variable": ("airflow.sdk", "airflow.models.variable"),
-    "XCom": ("airflow.sdk.execution_time.xcom", "airflow.models.xcom"),
-    "DAG": ("airflow.sdk", "airflow.models.dag"),
-    "DagRun": "airflow.models.dagrun",
-    "TaskInstance": "airflow.models.taskinstance",
-    "Param": ("airflow.sdk", "airflow.models.param"),
-    "XComArg": ("airflow.sdk", "airflow.models.xcom_arg"),
-    "MappedOperator": "airflow.models.mappedoperator",
-    "DecoratedOperator": ("airflow.sdk.bases.decorator", 
"airflow.decorators.base"),
-    "DecoratedMappedOperator": ("airflow.sdk.bases.decorator", 
"airflow.decorators.base"),
-    # 
============================================================================
-    # Exceptions
-    # 
============================================================================
-    "AirflowException": "airflow.exceptions",
-    "AirflowSkipException": "airflow.exceptions",
-    "AirflowFailException": "airflow.exceptions",
-    "AirflowSensorTimeout": "airflow.exceptions",
-    "AirflowTaskTimeout": "airflow.exceptions",
-    "AirflowTaskTerminated": "airflow.exceptions",
-    "AirflowNotFoundException": "airflow.exceptions",
-    "AirflowConfigException": "airflow.exceptions",
-    "AirflowBadRequest": "airflow.exceptions",
-    # 
============================================================================
-    # Assets (Dataset → Asset rename in Airflow 3.0)
-    # 
============================================================================
-    # Note: Asset, AssetAlias, AssetAll, AssetAny are handled by _RENAME_MAP
-    # Metadata moved from airflow.datasets.metadata (2.x) to airflow.sdk (3.x)
-    "Metadata": ("airflow.sdk", "airflow.datasets.metadata"),
-    # 
============================================================================
-    # Notifiers
-    # 
============================================================================
-    "BaseNotifier": ("airflow.sdk", "airflow.notifications.basenotifier"),
-    # 
============================================================================
-    # Operator Links & Task Groups
-    # 
============================================================================
-    "BaseOperatorLink": ("airflow.sdk", "airflow.models.baseoperatorlink"),
-    "TaskGroup": ("airflow.sdk", "airflow.utils.task_group"),
-    # 
============================================================================
-    # Operator Utilities (chain, cross_downstream, etc.)
-    # 
============================================================================
-    "chain": ("airflow.sdk", "airflow.models.baseoperator"),
-    "chain_linear": ("airflow.sdk", "airflow.models.baseoperator"),
-    "cross_downstream": ("airflow.sdk", "airflow.models.baseoperator"),
-    # 
============================================================================
-    # Edge Modifiers & Labels
-    # 
============================================================================
-    "EdgeModifier": ("airflow.sdk", "airflow.utils.edgemodifier"),
-    "Label": ("airflow.sdk", "airflow.utils.edgemodifier"),
-    # 
============================================================================
-    # State Enums
-    # 
============================================================================
-    "DagRunState": ("airflow.sdk", "airflow.utils.state"),
-    "TaskInstanceState": ("airflow.sdk", "airflow.utils.state"),
-    "TriggerRule": ("airflow.sdk", "airflow.utils.trigger_rule"),
-    "WeightRule": ("airflow.sdk", "airflow.utils.weight_rule"),
-    # 
============================================================================
-    # IO & Storage
-    # 
============================================================================
-    "ObjectStoragePath": ("airflow.sdk", "airflow.io.path"),
-    # 
============================================================================
-    # Template Utilities
-    # 
============================================================================
-    "literal": ("airflow.sdk.definitions.template", "airflow.utils.template"),
-    # 
============================================================================
-    # Context & Utilities
-    # 
============================================================================
-    "Context": ("airflow.sdk", "airflow.utils.context"),
-    "get_current_context": ("airflow.sdk", "airflow.operators.python"),
-    "get_parsing_context": ("airflow.sdk", 
"airflow.utils.dag_parsing_context"),
-    # 
============================================================================
-    # Python Virtualenv Utilities
-    # 
============================================================================
-    "prepare_virtualenv": (
-        "airflow.providers.standard.utils.python_virtualenv",
-        "airflow.utils.python_virtualenv",
-    ),
-    "write_python_script": (
-        "airflow.providers.standard.utils.python_virtualenv",
-        "airflow.utils.python_virtualenv",
-    ),
-    # 
============================================================================
-    # Timeout Utilities
-    # 
============================================================================
-    "timeout": ("airflow.sdk.execution_time.timeout", "airflow.utils.timeout"),
-    # 
============================================================================
-    # XCom & Task Communication
-    # 
============================================================================
-    "XCOM_RETURN_KEY": "airflow.models.xcom",
-}
-
-# Module map: module_name -> module_path(s)
-# For entire modules that have been moved (e.g., timezone)
-# Usage: from airflow.providers.common.compat.lazy_compat import timezone
-_MODULE_MAP: dict[str, str | tuple[str, ...]] = {
-    "timezone": ("airflow.sdk.timezone", "airflow.utils.timezone"),
-    "io": ("airflow.sdk.io", "airflow.io"),
-}
-
-
-def __getattr__(name: str) -> Any:
-    """
-    Lazy import compatibility layer.
-
-    Tries to import from Airflow 3 paths first, falls back to Airflow 2 paths.
-    This enables code to work across both Airflow 2.x and 3.x versions.
-
-    Supports:
-    - Renamed classes from _RENAME_MAP: classes that changed names (e.g., 
Dataset -> Asset)
-    - Attributes from _IMPORT_MAP: classes, functions, constants
-    - Modules from _MODULE_MAP: entire modules that have moved
-
-    :param name: Name of the class/function/module to import
-    :return: The imported class/function/module
-    :raises AttributeError: If the name is not in any map
-    :raises ImportError: If all import paths fail
-    """
-    # Check if this is a renamed class
-    if name in _RENAME_MAP:
-        new_path, old_path, old_name = _RENAME_MAP[name]
-
-        rename_error: ImportError | ModuleNotFoundError | AttributeError | 
None = None
-        # Try new path with new name first (Airflow 3.x)
-        try:
-            module = __import__(new_path, fromlist=[name])
-            return getattr(module, name)
-        except (ImportError, ModuleNotFoundError, AttributeError) as e:
-            rename_error = e
-
-        # Fall back to old path with old name (Airflow 2.x)
-        try:
-            module = __import__(old_path, fromlist=[old_name])
-            return getattr(module, old_name)
-        except (ImportError, ModuleNotFoundError, AttributeError):
-            if rename_error:
-                raise ImportError(
-                    f"Could not import {name!r} from {new_path!r} or 
{old_name!r} from {old_path!r}"
-                ) from rename_error
-            raise
-
-    # Check if this is a module import
-    if name in _MODULE_MAP:
-        import importlib
-
-        paths = _MODULE_MAP[name]
-        if isinstance(paths, str):
-            paths = (paths,)
-
-        module_error: ImportError | ModuleNotFoundError | None = None
-        for module_path in paths:
-            try:
-                return importlib.import_module(module_path)
-            except (ImportError, ModuleNotFoundError) as e:
-                module_error = e
-                continue
-
-        if module_error:
-            raise ImportError(f"Could not import module {name!r} from any of: 
{paths}") from module_error
-
-    # Check if this is an attribute import
-    if name in _IMPORT_MAP:
-        paths = _IMPORT_MAP[name]
-        if isinstance(paths, str):
-            paths = (paths,)
-
-        attr_error: ImportError | ModuleNotFoundError | AttributeError | None 
= None
-        for module_path in paths:
-            try:
-                module = __import__(module_path, fromlist=[name])
-                return getattr(module, name)
-            except (ImportError, ModuleNotFoundError, AttributeError) as e:
-                attr_error = e
-                continue
-
-        if attr_error:
-            raise ImportError(f"Could not import {name!r} from any of: 
{paths}") from attr_error
-
-    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
-
-
-__all__ = list(_RENAME_MAP.keys()) + list(_IMPORT_MAP.keys()) + 
list(_MODULE_MAP.keys())
diff --git 
a/providers/common/compat/src/airflow/providers/common/compat/lazy_compat.pyi 
b/providers/common/compat/src/airflow/providers/common/compat/lazy_compat.pyi
deleted file mode 100644
index e9812165b65..00000000000
--- 
a/providers/common/compat/src/airflow/providers/common/compat/lazy_compat.pyi
+++ /dev/null
@@ -1,232 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""
-Type stubs for IDE autocomplete - always uses Airflow 3 paths.
-
-This file is auto-generated from lazy_compat.py.
-    - run scripts/ci/prek/check_common_compat_lazy_imports.py --generate 
instead.
-"""
-
-import airflow.sdk.io as io
-import airflow.sdk.timezone as timezone
-from airflow.exceptions import (
-    AirflowBadRequest as AirflowBadRequest,
-    AirflowConfigException as AirflowConfigException,
-    AirflowException as AirflowException,
-    AirflowFailException as AirflowFailException,
-    AirflowNotFoundException as AirflowNotFoundException,
-    AirflowSensorTimeout as AirflowSensorTimeout,
-    AirflowSkipException as AirflowSkipException,
-    AirflowTaskTerminated as AirflowTaskTerminated,
-    AirflowTaskTimeout as AirflowTaskTimeout,
-)
-from airflow.models.dagrun import DagRun as DagRun
-from airflow.models.mappedoperator import MappedOperator as MappedOperator
-from airflow.models.taskinstance import TaskInstance as TaskInstance
-from airflow.models.xcom import XCOM_RETURN_KEY as XCOM_RETURN_KEY
-from airflow.providers.standard.hooks.filesystem import FSHook as FSHook
-from airflow.providers.standard.hooks.package_index import PackageIndexHook as 
PackageIndexHook
-from airflow.providers.standard.hooks.subprocess import SubprocessHook as 
SubprocessHook
-from airflow.providers.standard.operators.bash import BashOperator as 
BashOperator
-from airflow.providers.standard.operators.branch import (
-    BaseBranchOperator as BaseBranchOperator,
-    BranchMixIn as BranchMixIn,
-)
-from airflow.providers.standard.operators.datetime import 
BranchDateTimeOperator as BranchDateTimeOperator
-from airflow.providers.standard.operators.empty import EmptyOperator as 
EmptyOperator
-from airflow.providers.standard.operators.latest_only import 
LatestOnlyOperator as LatestOnlyOperator
-from airflow.providers.standard.operators.python import (
-    _SERIALIZERS as _SERIALIZERS,
-    BranchExternalPythonOperator as BranchExternalPythonOperator,
-    BranchPythonOperator as BranchPythonOperator,
-    BranchPythonVirtualenvOperator as BranchPythonVirtualenvOperator,
-    ExternalPythonOperator as ExternalPythonOperator,
-    PythonOperator as PythonOperator,
-    PythonVirtualenvOperator as PythonVirtualenvOperator,
-    ShortCircuitOperator as ShortCircuitOperator,
-)
-from airflow.providers.standard.operators.smooth import SmoothOperator as 
SmoothOperator
-from airflow.providers.standard.operators.trigger_dagrun import 
TriggerDagRunOperator as TriggerDagRunOperator
-from airflow.providers.standard.operators.weekday import 
BranchDayOfWeekOperator as BranchDayOfWeekOperator
-from airflow.providers.standard.sensors.bash import BashSensor as BashSensor
-from airflow.providers.standard.sensors.date_time import (
-    DateTimeSensor as DateTimeSensor,
-    DateTimeSensorAsync as DateTimeSensorAsync,
-)
-from airflow.providers.standard.sensors.external_task import (
-    ExternalDagLink as ExternalDagLink,
-    ExternalTaskMarker as ExternalTaskMarker,
-    ExternalTaskSensor as ExternalTaskSensor,
-)
-from airflow.providers.standard.sensors.filesystem import FileSensor as 
FileSensor
-from airflow.providers.standard.sensors.python import PythonSensor as 
PythonSensor
-from airflow.providers.standard.sensors.time import (
-    TimeSensor as TimeSensor,
-    TimeSensorAsync as TimeSensorAsync,
-)
-from airflow.providers.standard.sensors.time_delta import (
-    TimeDeltaSensor as TimeDeltaSensor,
-    TimeDeltaSensorAsync as TimeDeltaSensorAsync,
-)
-from airflow.providers.standard.sensors.weekday import DayOfWeekSensor as 
DayOfWeekSensor
-from airflow.providers.standard.triggers.temporal import TimeDeltaTrigger as 
TimeDeltaTrigger
-from airflow.providers.standard.utils.python_virtualenv import (
-    prepare_virtualenv as prepare_virtualenv,
-    write_python_script as write_python_script,
-)
-from airflow.sdk import (
-    DAG as DAG,
-    Asset as Asset,
-    AssetAlias as AssetAlias,
-    AssetAll as AssetAll,
-    AssetAny as AssetAny,
-    BaseHook as BaseHook,
-    BaseNotifier as BaseNotifier,
-    BaseOperator as BaseOperator,
-    BaseOperatorLink as BaseOperatorLink,
-    BaseSensorOperator as BaseSensorOperator,
-    Connection as Connection,
-    Context as Context,
-    DagRunState as DagRunState,
-    EdgeModifier as EdgeModifier,
-    Label as Label,
-    Metadata as Metadata,
-    ObjectStoragePath as ObjectStoragePath,
-    Param as Param,
-    PokeReturnValue as PokeReturnValue,
-    TaskGroup as TaskGroup,
-    TaskInstanceState as TaskInstanceState,
-    TriggerRule as TriggerRule,
-    Variable as Variable,
-    WeightRule as WeightRule,
-    XComArg as XComArg,
-    chain as chain,
-    chain_linear as chain_linear,
-    cross_downstream as cross_downstream,
-    dag as dag,
-    get_current_context as get_current_context,
-    get_parsing_context as get_parsing_context,
-    setup as setup,
-    task as task,
-    task_group as task_group,
-    teardown as teardown,
-)
-from airflow.sdk.bases.decorator import (
-    DecoratedMappedOperator as DecoratedMappedOperator,
-    DecoratedOperator as DecoratedOperator,
-    TaskDecorator as TaskDecorator,
-)
-from airflow.sdk.bases.sensor import poke_mode_only as poke_mode_only
-from airflow.sdk.definitions.template import literal as literal
-from airflow.sdk.execution_time.timeout import timeout as timeout
-from airflow.sdk.execution_time.xcom import XCom as XCom
-
-__all__: list[str] = [
-    "AirflowBadRequest",
-    "AirflowConfigException",
-    "AirflowException",
-    "AirflowFailException",
-    "AirflowNotFoundException",
-    "AirflowSensorTimeout",
-    "AirflowSkipException",
-    "AirflowTaskTerminated",
-    "AirflowTaskTimeout",
-    "Asset",
-    "AssetAlias",
-    "AssetAll",
-    "AssetAny",
-    "BaseBranchOperator",
-    "BaseHook",
-    "BaseNotifier",
-    "BaseOperator",
-    "BaseOperatorLink",
-    "BaseSensorOperator",
-    "BashOperator",
-    "BashSensor",
-    "BranchDateTimeOperator",
-    "BranchDayOfWeekOperator",
-    "BranchExternalPythonOperator",
-    "BranchMixIn",
-    "BranchPythonOperator",
-    "BranchPythonVirtualenvOperator",
-    "Connection",
-    "Context",
-    "DAG",
-    "DagRun",
-    "DagRunState",
-    "DateTimeSensor",
-    "DateTimeSensorAsync",
-    "DayOfWeekSensor",
-    "DecoratedMappedOperator",
-    "DecoratedOperator",
-    "EdgeModifier",
-    "EmptyOperator",
-    "ExternalDagLink",
-    "ExternalPythonOperator",
-    "ExternalTaskMarker",
-    "ExternalTaskSensor",
-    "FSHook",
-    "FileSensor",
-    "Label",
-    "LatestOnlyOperator",
-    "MappedOperator",
-    "Metadata",
-    "ObjectStoragePath",
-    "PackageIndexHook",
-    "Param",
-    "PokeReturnValue",
-    "PythonOperator",
-    "PythonSensor",
-    "PythonVirtualenvOperator",
-    "ShortCircuitOperator",
-    "SmoothOperator",
-    "SubprocessHook",
-    "TaskDecorator",
-    "TaskGroup",
-    "TaskInstance",
-    "TaskInstanceState",
-    "TimeDeltaSensor",
-    "TimeDeltaSensorAsync",
-    "TimeDeltaTrigger",
-    "TimeSensor",
-    "TimeSensorAsync",
-    "TriggerDagRunOperator",
-    "TriggerRule",
-    "Variable",
-    "WeightRule",
-    "XCOM_RETURN_KEY",
-    "XCom",
-    "XComArg",
-    "_SERIALIZERS",
-    "chain",
-    "chain_linear",
-    "cross_downstream",
-    "dag",
-    "get_current_context",
-    "get_parsing_context",
-    "io",
-    "literal",
-    "poke_mode_only",
-    "prepare_virtualenv",
-    "setup",
-    "task",
-    "task_group",
-    "teardown",
-    "timeout",
-    "timezone",
-    "write_python_script",
-]
diff --git a/providers/common/compat/src/airflow/providers/common/compat/sdk.py 
b/providers/common/compat/src/airflow/providers/common/compat/sdk.py
new file mode 100644
index 00000000000..5bde8567bd1
--- /dev/null
+++ b/providers/common/compat/src/airflow/providers/common/compat/sdk.py
@@ -0,0 +1,149 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Airflow compatibility imports for seamless migration from Airflow 2 to Airflow 
3.
+
+This module provides lazy imports that automatically try Airflow 3 paths first,
+then fall back to Airflow 2 paths, enabling code to work across both versions.
+"""
+
+from __future__ import annotations
+
+from airflow.providers.common.compat._compat_utils import create_module_getattr
+
+# Rename map for classes that changed names between Airflow 2.x and 3.x
+# Format: new_name -> (new_path, old_path, old_name)
+_RENAME_MAP: dict[str, tuple[str, str, str]] = {
+    # Assets: Dataset -> Asset rename in Airflow 3.0
+    "Asset": ("airflow.sdk", "airflow.datasets", "Dataset"),
+    "AssetAlias": ("airflow.sdk", "airflow.datasets", "DatasetAlias"),
+    "AssetAll": ("airflow.sdk", "airflow.datasets", "DatasetAll"),
+    "AssetAny": ("airflow.sdk", "airflow.datasets", "DatasetAny"),
+}
+
+# Import map for classes/functions/constants
+# Format: class_name -> module_path(s)
+# - str: single module path (no fallback)
+# - tuple[str, ...]: multiple module paths (try in order, newest first)
+_IMPORT_MAP: dict[str, str | tuple[str, ...]] = {
+    # 
============================================================================
+    # Hooks
+    # 
============================================================================
+    "BaseHook": ("airflow.sdk", "airflow.hooks.base"),
+    # 
============================================================================
+    # Sensors
+    # 
============================================================================
+    "BaseSensorOperator": ("airflow.sdk", "airflow.sensors.base"),
+    "PokeReturnValue": ("airflow.sdk", "airflow.sensors.base"),
+    "poke_mode_only": ("airflow.sdk.bases.sensor", "airflow.sensors.base"),
+    # 
============================================================================
+    # Operators
+    # 
============================================================================
+    "BaseOperator": ("airflow.sdk", "airflow.models.baseoperator"),
+    # 
============================================================================
+    # Decorators
+    # 
============================================================================
+    "task": ("airflow.sdk", "airflow.decorators"),
+    "dag": ("airflow.sdk", "airflow.decorators"),
+    "task_group": ("airflow.sdk", "airflow.decorators"),
+    "setup": ("airflow.sdk", "airflow.decorators"),
+    "teardown": ("airflow.sdk", "airflow.decorators"),
+    "TaskDecorator": ("airflow.sdk.bases.decorator", "airflow.decorators"),
+    # 
============================================================================
+    # Models
+    # 
============================================================================
+    "Connection": ("airflow.sdk", "airflow.models.connection"),
+    "Variable": ("airflow.sdk", "airflow.models.variable"),
+    "XCom": ("airflow.sdk.execution_time.xcom", "airflow.models.xcom"),
+    "DAG": ("airflow.sdk", "airflow.models.dag"),
+    "Param": ("airflow.sdk", "airflow.models.param"),
+    "XComArg": ("airflow.sdk", "airflow.models.xcom_arg"),
+    "DecoratedOperator": ("airflow.sdk.bases.decorator", 
"airflow.decorators.base"),
+    "DecoratedMappedOperator": ("airflow.sdk.bases.decorator", 
"airflow.decorators.base"),
+    # 
============================================================================
+    # Assets (Dataset → Asset rename in Airflow 3.0)
+    # 
============================================================================
+    # Note: Asset, AssetAlias, AssetAll, AssetAny are handled by _RENAME_MAP
+    # Metadata moved from airflow.datasets.metadata (2.x) to airflow.sdk (3.x)
+    "Metadata": ("airflow.sdk", "airflow.datasets.metadata"),
+    # 
============================================================================
+    # Notifiers
+    # 
============================================================================
+    "BaseNotifier": ("airflow.sdk", "airflow.notifications.basenotifier"),
+    # 
============================================================================
+    # Operator Links & Task Groups
+    # 
============================================================================
+    "BaseOperatorLink": ("airflow.sdk", "airflow.models.baseoperatorlink"),
+    "TaskGroup": ("airflow.sdk", "airflow.utils.task_group"),
+    # 
============================================================================
+    # Operator Utilities (chain, cross_downstream, etc.)
+    # 
============================================================================
+    "chain": ("airflow.sdk", "airflow.models.baseoperator"),
+    "chain_linear": ("airflow.sdk", "airflow.models.baseoperator"),
+    "cross_downstream": ("airflow.sdk", "airflow.models.baseoperator"),
+    # 
============================================================================
+    # Edge Modifiers & Labels
+    # 
============================================================================
+    "EdgeModifier": ("airflow.sdk", "airflow.utils.edgemodifier"),
+    "Label": ("airflow.sdk", "airflow.utils.edgemodifier"),
+    # 
============================================================================
+    # State Enums
+    # 
============================================================================
+    "DagRunState": ("airflow.sdk", "airflow.utils.state"),
+    "TaskInstanceState": ("airflow.sdk", "airflow.utils.state"),
+    "TriggerRule": ("airflow.sdk", "airflow.utils.trigger_rule"),
+    "WeightRule": ("airflow.sdk", "airflow.utils.weight_rule"),
+    # 
============================================================================
+    # IO & Storage
+    # 
============================================================================
+    "ObjectStoragePath": ("airflow.sdk", "airflow.io.path"),
+    # 
============================================================================
+    # Template Utilities
+    # 
============================================================================
+    "literal": ("airflow.sdk.definitions.template", "airflow.utils.template"),
+    # 
============================================================================
+    # Context & Utilities
+    # 
============================================================================
+    "Context": ("airflow.sdk", "airflow.utils.context"),
+    "get_current_context": ("airflow.sdk", "airflow.operators.python"),
+    "get_parsing_context": ("airflow.sdk", 
"airflow.utils.dag_parsing_context"),
+    # 
============================================================================
+    # Timeout Utilities
+    # 
============================================================================
+    "timeout": ("airflow.sdk.execution_time.timeout", "airflow.utils.timeout"),
+    # 
============================================================================
+    # XCom & Task Communication
+    # 
============================================================================
+    "XCOM_RETURN_KEY": "airflow.models.xcom",
+}
+
+# Module map: module_name -> module_path(s)
+# For entire modules that have been moved (e.g., timezone)
+# Usage: from airflow.providers.common.compat.sdk import timezone
+_MODULE_MAP: dict[str, str | tuple[str, ...]] = {
+    "timezone": ("airflow.sdk.timezone", "airflow.utils.timezone"),
+    "io": ("airflow.sdk.io", "airflow.io"),
+}
+
+# Use the shared utility to create __getattr__
+__getattr__ = create_module_getattr(
+    import_map=_IMPORT_MAP,
+    module_map=_MODULE_MAP,
+    rename_map=_RENAME_MAP,
+)
+
+__all__ = list(_RENAME_MAP.keys()) + list(_IMPORT_MAP.keys()) + 
list(_MODULE_MAP.keys())
diff --git 
a/providers/common/compat/src/airflow/providers/common/compat/sdk.pyi 
b/providers/common/compat/src/airflow/providers/common/compat/sdk.pyi
new file mode 100644
index 00000000000..99593f5d20c
--- /dev/null
+++ b/providers/common/compat/src/airflow/providers/common/compat/sdk.pyi
@@ -0,0 +1,120 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Type stubs for IDE autocomplete - always uses Airflow 3 paths.
+
+This file is auto-generated from sdk.py.
+    - do not edit manually; run scripts/ci/prek/check_common_compat_lazy_imports.py --generate 
+instead.
+"""
+
+import airflow.sdk.io as io
+import airflow.sdk.timezone as timezone
+from airflow.models.xcom import XCOM_RETURN_KEY as XCOM_RETURN_KEY
+from airflow.sdk import (
+    DAG as DAG,
+    Asset as Asset,
+    AssetAlias as AssetAlias,
+    AssetAll as AssetAll,
+    AssetAny as AssetAny,
+    BaseHook as BaseHook,
+    BaseNotifier as BaseNotifier,
+    BaseOperator as BaseOperator,
+    BaseOperatorLink as BaseOperatorLink,
+    BaseSensorOperator as BaseSensorOperator,
+    Connection as Connection,
+    Context as Context,
+    DagRunState as DagRunState,
+    EdgeModifier as EdgeModifier,
+    Label as Label,
+    Metadata as Metadata,
+    ObjectStoragePath as ObjectStoragePath,
+    Param as Param,
+    PokeReturnValue as PokeReturnValue,
+    TaskGroup as TaskGroup,
+    TaskInstanceState as TaskInstanceState,
+    TriggerRule as TriggerRule,
+    Variable as Variable,
+    WeightRule as WeightRule,
+    XComArg as XComArg,
+    chain as chain,
+    chain_linear as chain_linear,
+    cross_downstream as cross_downstream,
+    dag as dag,
+    get_current_context as get_current_context,
+    get_parsing_context as get_parsing_context,
+    setup as setup,
+    task as task,
+    task_group as task_group,
+    teardown as teardown,
+)
+from airflow.sdk.bases.decorator import (
+    DecoratedMappedOperator as DecoratedMappedOperator,
+    DecoratedOperator as DecoratedOperator,
+    TaskDecorator as TaskDecorator,
+)
+from airflow.sdk.bases.sensor import poke_mode_only as poke_mode_only
+from airflow.sdk.definitions.template import literal as literal
+from airflow.sdk.execution_time.timeout import timeout as timeout
+from airflow.sdk.execution_time.xcom import XCom as XCom
+
+__all__: list[str] = [
+    "Asset",
+    "AssetAlias",
+    "AssetAll",
+    "AssetAny",
+    "BaseHook",
+    "BaseNotifier",
+    "BaseOperator",
+    "BaseOperatorLink",
+    "BaseSensorOperator",
+    "Connection",
+    "Context",
+    "DAG",
+    "DagRunState",
+    "DecoratedMappedOperator",
+    "DecoratedOperator",
+    "EdgeModifier",
+    "Label",
+    "Metadata",
+    "ObjectStoragePath",
+    "Param",
+    "PokeReturnValue",
+    "TaskDecorator",
+    "TaskGroup",
+    "TaskInstanceState",
+    "TriggerRule",
+    "Variable",
+    "WeightRule",
+    "XCOM_RETURN_KEY",
+    "XCom",
+    "XComArg",
+    "chain",
+    "chain_linear",
+    "cross_downstream",
+    "dag",
+    "get_current_context",
+    "get_parsing_context",
+    "io",
+    "literal",
+    "poke_mode_only",
+    "setup",
+    "task",
+    "task_group",
+    "teardown",
+    "timeout",
+    "timezone",
+]
diff --git 
a/providers/common/compat/src/airflow/providers/common/compat/standard/operators.py
 
b/providers/common/compat/src/airflow/providers/common/compat/standard/operators.py
index 4190a1a0ace..6b77db3e4a9 100644
--- 
a/providers/common/compat/src/airflow/providers/common/compat/standard/operators.py
+++ 
b/providers/common/compat/src/airflow/providers/common/compat/standard/operators.py
@@ -17,13 +17,18 @@
 
 from __future__ import annotations
 
-# Re-export from lazy_compat for backward compatibility
-from airflow.providers.common.compat.lazy_compat import (
-    _SERIALIZERS,
-    BaseOperator,
-    PythonOperator,
-    ShortCircuitOperator,
-    get_current_context,
-)
+from airflow.providers.common.compat._compat_utils import create_module_getattr
 
-__all__ = ["BaseOperator", "PythonOperator", "_SERIALIZERS", 
"ShortCircuitOperator", "get_current_context"]
+_IMPORT_MAP: dict[str, str | tuple[str, ...]] = {
+    # Re-export from sdk (which handles Airflow 2.x/3.x fallbacks)
+    "BaseOperator": "airflow.providers.common.compat.sdk",
+    "get_current_context": "airflow.providers.common.compat.sdk",
+    # Standard provider items with direct fallbacks
+    "PythonOperator": ("airflow.providers.standard.operators.python", 
"airflow.operators.python"),
+    "ShortCircuitOperator": ("airflow.providers.standard.operators.python", 
"airflow.operators.python"),
+    "_SERIALIZERS": ("airflow.providers.standard.operators.python", 
"airflow.operators.python"),
+}
+
+__getattr__ = create_module_getattr(import_map=_IMPORT_MAP)
+
+__all__ = sorted(_IMPORT_MAP.keys())
diff --git 
a/providers/common/compat/src/airflow/providers/common/compat/standard/triggers.py
 
b/providers/common/compat/src/airflow/providers/common/compat/standard/triggers.py
index 8c697ff84cf..70066467e21 100644
--- 
a/providers/common/compat/src/airflow/providers/common/compat/standard/triggers.py
+++ 
b/providers/common/compat/src/airflow/providers/common/compat/standard/triggers.py
@@ -17,7 +17,12 @@
 
 from __future__ import annotations
 
-# Re-export from lazy_compat for backward compatibility
-from airflow.providers.common.compat.lazy_compat import TimeDeltaTrigger
+from airflow.providers.common.compat._compat_utils import create_module_getattr
 
-__all__ = ["TimeDeltaTrigger"]
+_IMPORT_MAP: dict[str, str | tuple[str, ...]] = {
+    "TimeDeltaTrigger": ("airflow.providers.standard.triggers.temporal", 
"airflow.triggers.temporal"),
+}
+
+__getattr__ = create_module_getattr(import_map=_IMPORT_MAP)
+
+__all__ = sorted(_IMPORT_MAP.keys())
diff --git 
a/providers/common/compat/src/airflow/providers/common/compat/standard/utils.py 
b/providers/common/compat/src/airflow/providers/common/compat/standard/utils.py
index badad0e8ae0..3f7f4b2962f 100644
--- 
a/providers/common/compat/src/airflow/providers/common/compat/standard/utils.py
+++ 
b/providers/common/compat/src/airflow/providers/common/compat/standard/utils.py
@@ -17,7 +17,19 @@
 
 from __future__ import annotations
 
-# Re-export from lazy_compat for backward compatibility
-from airflow.providers.common.compat.lazy_compat import prepare_virtualenv, 
write_python_script
+from airflow.providers.common.compat._compat_utils import create_module_getattr
 
-__all__ = ["write_python_script", "prepare_virtualenv"]
+_IMPORT_MAP: dict[str, str | tuple[str, ...]] = {
+    "write_python_script": (
+        "airflow.providers.standard.utils.python_virtualenv",
+        "airflow.utils.python_virtualenv",
+    ),
+    "prepare_virtualenv": (
+        "airflow.providers.standard.utils.python_virtualenv",
+        "airflow.utils.python_virtualenv",
+    ),
+}
+
+__getattr__ = create_module_getattr(import_map=_IMPORT_MAP)
+
+__all__ = sorted(_IMPORT_MAP.keys())
diff --git 
a/providers/common/compat/tests/unit/common/compat/test__compat_utils.py 
b/providers/common/compat/tests/unit/common/compat/test__compat_utils.py
new file mode 100644
index 00000000000..96bbf04701c
--- /dev/null
+++ b/providers/common/compat/tests/unit/common/compat/test__compat_utils.py
@@ -0,0 +1,215 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import pytest
+
+from airflow.providers.common.compat._compat_utils import create_module_getattr
+
+
+class TestCreateModuleGetattr:
+    """Unit tests for the create_module_getattr utility function."""
+
+    @pytest.mark.parametrize(
+        ["name", "import_map", "is_module"],
+        [
+            ("BaseHook", {"BaseHook": "airflow.hooks.base"}, False),
+            ("timezone", {}, True),  # Will be tested with module_map
+            ("utcnow", {"utcnow": "airflow.utils.timezone"}, False),
+        ],
+    )
+    def test_single_path_import(self, name, import_map, is_module):
+        """Test basic single-path imports work correctly."""
+        if name == "timezone":
+            getattr_fn = create_module_getattr(import_map={}, 
module_map={name: "airflow.utils.timezone"})
+        else:
+            getattr_fn = create_module_getattr(import_map=import_map)
+
+        result = getattr_fn(name)
+        if is_module:
+            # Check if it's a module
+            import types
+
+            assert isinstance(result, types.ModuleType)
+        else:
+            # Check if it's a class or callable
+            assert isinstance(result, type) or callable(result)
+
+    @pytest.mark.parametrize(
+        ["name", "paths", "should_succeed"],
+        [
+            ("BaseHook", ("airflow.sdk", "airflow.hooks.base"), True),
+            ("NonExistent", ("fake.module1", "fake.module2"), False),
+            ("timezone", ("airflow.sdk.timezone", "airflow.utils.timezone"), 
True),
+        ],
+    )
+    def test_fallback_import_mechanism(self, name, paths, should_succeed):
+        """Test that fallback paths are tried in order."""
+        if name == "timezone":
+            getattr_fn = create_module_getattr(import_map={}, 
module_map={name: paths})
+        else:
+            getattr_fn = create_module_getattr(import_map={name: paths})
+
+        if should_succeed:
+            result = getattr_fn(name)
+            assert result is not None
+        else:
+            with pytest.raises(ImportError, match=f"Could not import 
{name!r}"):
+                getattr_fn(name)
+
+    def test_rename_map_tries_new_then_old(self):
+        """Test that renamed classes try new name first, then fall back to 
old."""
+        rename_map = {
+            "Asset": ("airflow.sdk", "airflow.datasets", "Dataset"),
+        }
+        getattr_fn = create_module_getattr(import_map={}, 
rename_map=rename_map)
+
+        # Should successfully import (either Asset from airflow.sdk or Dataset 
from airflow.datasets)
+        result = getattr_fn("Asset")
+        assert result is not None
+        # In Airflow 3, it's Asset; in Airflow 2, it would be Dataset
+        assert result.__name__ in ("Asset", "Dataset")
+
+    def test_module_map_imports_whole_module(self):
+        """Test that module_map imports entire modules, not just attributes."""
+        module_map = {"timezone": "airflow.utils.timezone"}
+        getattr_fn = create_module_getattr(import_map={}, 
module_map=module_map)
+
+        result = getattr_fn("timezone")
+        assert hasattr(result, "utc")  # Module should have attributes
+        assert hasattr(result, "utcnow")
+
+    def test_exception_chaining_preserves_context(self):
+        """Test that exception chaining with 'from' preserves original error 
context."""
+        import_map = {"NonExistent": ("fake.module1", "fake.module2")}
+        getattr_fn = create_module_getattr(import_map=import_map)
+
+        with pytest.raises(ImportError) as exc_info:
+            getattr_fn("NonExistent")
+
+        # Verify exception has __cause__ (exception chaining)
+        assert exc_info.value.__cause__ is not None
+
+    @pytest.mark.parametrize(
+        "error_scenario,map_config,expected_match",
+        [
+            (
+                "import_error",
+                {"import_map": {"Fake": ("nonexistent.mod1", 
"nonexistent.mod2")}},
+                "Could not import 'Fake' from any of:",
+            ),
+            (
+                "module_error",
+                {"module_map": {"fake_mod": ("nonexistent.module1", 
"nonexistent.module2")}},
+                "Could not import module 'fake_mod' from any of:",
+            ),
+            (
+                "rename_error",
+                {"rename_map": {"NewName": ("fake.new", "fake.old", 
"OldName")}},
+                "Could not import 'NewName' from 'fake.new' or 'OldName' from 
'fake.old'",
+            ),
+        ],
+    )
+    def test_error_messages_include_all_paths(self, error_scenario, 
map_config, expected_match):
+        """Test that error messages include all attempted paths for 
debugging."""
+        getattr_fn = create_module_getattr(
+            import_map=map_config.get("import_map", {}),
+            module_map=map_config.get("module_map"),
+            rename_map=map_config.get("rename_map"),
+        )
+
+        keys = (
+            map_config.get("import_map", {}).keys()
+            or map_config.get("module_map", {}).keys()
+            or map_config.get("rename_map", {}).keys()
+        )
+        name = next(iter(keys))
+
+        with pytest.raises(ImportError, match=expected_match):
+            getattr_fn(name)
+
+    def test_attribute_error_for_unknown_name(self):
+        """Test that accessing unknown attributes raises AttributeError with 
correct message."""
+        getattr_fn = create_module_getattr(import_map={"BaseHook": 
"airflow.hooks.base"})
+
+        with pytest.raises(AttributeError, match="module has no attribute 
'UnknownClass'"):
+            getattr_fn("UnknownClass")
+
+    def test_optional_params_default_to_empty(self):
+        """Test that module_map and rename_map default to empty dicts when not 
provided."""
+        getattr_fn = create_module_getattr(import_map={"BaseHook": 
"airflow.hooks.base"})
+
+        # Should work fine without module_map and rename_map
+        result = getattr_fn("BaseHook")
+        assert result is not None
+
+        # Should raise AttributeError for names not in any map
+        with pytest.raises(AttributeError):
+            getattr_fn("NonExistent")
+
+    def test_priority_order_rename_then_module_then_import(self):
+        """Test that rename_map has priority over module_map, which has 
priority over import_map."""
+        # If a name exists in multiple maps, rename_map should be checked first
+        import_map = {"test": "airflow.hooks.base"}
+        module_map = {"test": "airflow.utils.timezone"}
+        rename_map = {"test": ("airflow.sdk", "airflow.datasets", "Dataset")}
+
+        getattr_fn = create_module_getattr(
+            import_map=import_map,
+            module_map=module_map,
+            rename_map=rename_map,
+        )
+
+        # Should use rename_map (which tries to import Asset/Dataset)
+        result = getattr_fn("test")
+        # Verify it came from rename_map (Asset or Dataset class, depending on 
Airflow version)
+        assert hasattr(result, "__name__")
+        assert result.__name__ in ("Asset", "Dataset")
+
+    def test_module_not_found_error_is_caught(self):
+        """Test that ModuleNotFoundError (Python 3.6+) is properly caught."""
+        import_map = {"Fake": 
"completely.nonexistent.module.that.does.not.exist"}
+        getattr_fn = create_module_getattr(import_map=import_map)
+
+        # Should catch ModuleNotFoundError and raise ImportError
+        with pytest.raises(ImportError, match="Could not import 'Fake'"):
+            getattr_fn("Fake")
+
+    @pytest.mark.parametrize(
+        "map_type,config",
+        [
+            ("import_map", {"BaseHook": "airflow.hooks.base"}),
+            ("module_map", {"timezone": "airflow.utils.timezone"}),
+            ("rename_map", {"Asset": ("airflow.sdk", "airflow.datasets", 
"Dataset")}),
+        ],
+    )
+    def test_each_map_type_works_independently(self, map_type, config):
+        """Test that each map type (import, module, rename) works correctly on 
its own."""
+        kwargs = {"import_map": {}}
+        if map_type == "import_map":
+            kwargs["import_map"] = config
+        elif map_type == "module_map":
+            kwargs["module_map"] = config
+        elif map_type == "rename_map":
+            kwargs["rename_map"] = config
+
+        getattr_fn = create_module_getattr(**kwargs)
+        name = next(iter(config.keys()))
+
+        result = getattr_fn(name)
+        assert result is not None
diff --git 
a/providers/common/compat/tests/unit/common/compat/test_lazy_compat.py 
b/providers/common/compat/tests/unit/common/compat/test_sdk.py
similarity index 87%
rename from providers/common/compat/tests/unit/common/compat/test_lazy_compat.py
rename to providers/common/compat/tests/unit/common/compat/test_sdk.py
index c48a9360c34..08b5107e493 100644
--- a/providers/common/compat/tests/unit/common/compat/test_lazy_compat.py
+++ b/providers/common/compat/tests/unit/common/compat/test_sdk.py
@@ -28,13 +28,13 @@ def test_all_compat_imports_work():
     For each item, validates that at least one of the specified import paths 
works,
     ensuring the fallback mechanism is functional.
     """
-    from airflow.providers.common.compat import lazy_compat
+    from airflow.providers.common.compat import sdk
 
     failed_imports = []
 
-    for name in lazy_compat.__all__:
+    for name in sdk.__all__:
         try:
-            obj = getattr(lazy_compat, name)
+            obj = getattr(sdk, name)
             assert obj is not None, f"{name} imported as None"
         except (ImportError, AttributeError) as e:
             failed_imports.append((name, str(e)))
@@ -48,7 +48,7 @@ def test_all_compat_imports_work():
 
 def test_invalid_import_raises_attribute_error():
     """Test that importing non-existent attribute raises AttributeError."""
-    from airflow.providers.common.compat import lazy_compat
+    from airflow.providers.common.compat import sdk
 
     with pytest.raises(AttributeError, match="has no attribute 
'NonExistentClass'"):
-        _ = lazy_compat.NonExistentClass
+        _ = sdk.NonExistentClass
diff --git 
a/providers/edge3/src/airflow/providers/edge3/example_dags/win_test.py 
b/providers/edge3/src/airflow/providers/edge3/example_dags/win_test.py
index 5e0b52370af..363fa5e9bac 100644
--- a/providers/edge3/src/airflow/providers/edge3/example_dags/win_test.py
+++ b/providers/edge3/src/airflow/providers/edge3/example_dags/win_test.py
@@ -67,7 +67,7 @@ if TYPE_CHECKING:
 try:
     from airflow.operators.python import PythonOperator
 except ImportError:
-    from airflow.providers.common.compat.standard.operators import 
PythonOperator
+    from airflow.providers.common.compat.standard.operators import 
PythonOperator  # type: ignore[no-redef]
 
 
 class CmdOperator(BaseOperator):
diff --git a/providers/google/src/airflow/providers/google/ads/hooks/ads.py 
b/providers/google/src/airflow/providers/google/ads/hooks/ads.py
index 469174f779e..9dfd6e0f56c 100644
--- a/providers/google/src/airflow/providers/google/ads/hooks/ads.py
+++ b/providers/google/src/airflow/providers/google/ads/hooks/ads.py
@@ -28,7 +28,7 @@ from google.ads.googleads.errors import GoogleAdsException
 from google.auth.exceptions import GoogleAuthError
 
 from airflow.exceptions import AirflowException
-from airflow.providers.common.compat.lazy_compat import BaseHook
+from airflow.providers.common.compat.sdk import BaseHook
 from airflow.providers.google.common.hooks.base_google import get_field
 
 if TYPE_CHECKING:
diff --git a/providers/google/src/airflow/providers/google/assets/gcs.py 
b/providers/google/src/airflow/providers/google/assets/gcs.py
index 770c75ac3e0..116f256bdc3 100644
--- a/providers/google/src/airflow/providers/google/assets/gcs.py
+++ b/providers/google/src/airflow/providers/google/assets/gcs.py
@@ -18,7 +18,7 @@ from __future__ import annotations
 
 from typing import TYPE_CHECKING
 
-from airflow.providers.common.compat.lazy_compat import Asset
+from airflow.providers.common.compat.sdk import Asset
 from airflow.providers.google.cloud.hooks.gcs import _parse_gcs_url
 
 if TYPE_CHECKING:
diff --git 
a/providers/google/src/airflow/providers/google/cloud/hooks/cloud_sql.py 
b/providers/google/src/airflow/providers/google/cloud/hooks/cloud_sql.py
index daa8bba9165..47be19f5f68 100644
--- a/providers/google/src/airflow/providers/google/cloud/hooks/cloud_sql.py
+++ b/providers/google/src/airflow/providers/google/cloud/hooks/cloud_sql.py
@@ -57,7 +57,7 @@ if AIRFLOW_V_3_1_PLUS:
 else:
     from airflow.models import Connection  # type: 
ignore[assignment,attr-defined,no-redef]
 
-from airflow.providers.common.compat.lazy_compat import BaseHook
+from airflow.providers.common.compat.sdk import BaseHook
 from airflow.providers.google.cloud.hooks.secret_manager import (
     GoogleCloudSecretManagerHook,
 )
diff --git 
a/providers/google/src/airflow/providers/google/cloud/hooks/dataflow.py 
b/providers/google/src/airflow/providers/google/cloud/hooks/dataflow.py
index 05981be2d62..ba994fbff37 100644
--- a/providers/google/src/airflow/providers/google/cloud/hooks/dataflow.py
+++ b/providers/google/src/airflow/providers/google/cloud/hooks/dataflow.py
@@ -51,7 +51,7 @@ from googleapiclient.discovery import Resource, build
 
 from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning
 from airflow.providers.apache.beam.hooks.beam import BeamHook, BeamRunnerType, 
beam_options_to_args
-from airflow.providers.common.compat.lazy_compat import timeout
+from airflow.providers.common.compat.sdk import timeout
 from airflow.providers.google.common.hooks.base_google import (
     PROVIDE_PROJECT_ID,
     GoogleBaseAsyncHook,
diff --git 
a/providers/google/src/airflow/providers/google/cloud/hooks/dataprep.py 
b/providers/google/src/airflow/providers/google/cloud/hooks/dataprep.py
index 3f879a15b1b..34a33a146d8 100644
--- a/providers/google/src/airflow/providers/google/cloud/hooks/dataprep.py
+++ b/providers/google/src/airflow/providers/google/cloud/hooks/dataprep.py
@@ -28,7 +28,7 @@ import requests
 from requests import HTTPError
 from tenacity import retry, stop_after_attempt, wait_exponential
 
-from airflow.providers.common.compat.lazy_compat import BaseHook
+from airflow.providers.common.compat.sdk import BaseHook
 
 
 def _get_field(extras: dict, field_name: str) -> str | None:
diff --git 
a/providers/google/src/airflow/providers/google/cloud/hooks/looker.py 
b/providers/google/src/airflow/providers/google/cloud/hooks/looker.py
index 906007a1e5e..81cd4e02d30 100644
--- a/providers/google/src/airflow/providers/google/cloud/hooks/looker.py
+++ b/providers/google/src/airflow/providers/google/cloud/hooks/looker.py
@@ -29,7 +29,7 @@ from looker_sdk.sdk.api40 import methods as methods40
 from packaging.version import parse as parse_version
 
 from airflow.exceptions import AirflowException
-from airflow.providers.common.compat.lazy_compat import BaseHook
+from airflow.providers.common.compat.sdk import BaseHook
 from airflow.version import version
 
 if TYPE_CHECKING:
diff --git a/providers/google/src/airflow/providers/google/cloud/links/base.py 
b/providers/google/src/airflow/providers/google/cloud/links/base.py
index 64ea02a30f9..abd1e5e4e0b 100644
--- a/providers/google/src/airflow/providers/google/cloud/links/base.py
+++ b/providers/google/src/airflow/providers/google/cloud/links/base.py
@@ -20,7 +20,7 @@ from __future__ import annotations
 from typing import TYPE_CHECKING, ClassVar
 from urllib.parse import urlparse
 
-from airflow.providers.common.compat.lazy_compat import BaseOperatorLink, 
BaseSensorOperator, XCom
+from airflow.providers.common.compat.sdk import BaseOperatorLink, 
BaseSensorOperator, XCom
 from airflow.providers.google.version_compat import AIRFLOW_V_3_0_PLUS, 
BaseOperator
 
 if TYPE_CHECKING:
diff --git 
a/providers/google/src/airflow/providers/google/cloud/links/dataproc.py 
b/providers/google/src/airflow/providers/google/cloud/links/dataproc.py
index b18d3df304e..a78157c21d2 100644
--- a/providers/google/src/airflow/providers/google/cloud/links/dataproc.py
+++ b/providers/google/src/airflow/providers/google/cloud/links/dataproc.py
@@ -25,7 +25,7 @@ from typing import TYPE_CHECKING, Any
 import attr
 
 from airflow.exceptions import AirflowProviderDeprecationWarning
-from airflow.providers.common.compat.lazy_compat import BaseOperatorLink, XCom
+from airflow.providers.common.compat.sdk import BaseOperatorLink, XCom
 from airflow.providers.google.cloud.links.base import BASE_LINK, BaseGoogleLink
 
 if TYPE_CHECKING:
diff --git 
a/providers/google/src/airflow/providers/google/cloud/operators/cloud_sql.py 
b/providers/google/src/airflow/providers/google/cloud/operators/cloud_sql.py
index ad60fbf2b00..c7392e45dd8 100644
--- a/providers/google/src/airflow/providers/google/cloud/operators/cloud_sql.py
+++ b/providers/google/src/airflow/providers/google/cloud/operators/cloud_sql.py
@@ -28,7 +28,7 @@ from googleapiclient.errors import HttpError
 
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException
-from airflow.providers.common.compat.lazy_compat import BaseHook
+from airflow.providers.common.compat.sdk import BaseHook
 from airflow.providers.google.cloud.hooks.cloud_sql import 
CloudSQLDatabaseHook, CloudSQLHook
 from airflow.providers.google.cloud.links.cloud_sql import 
CloudSQLInstanceDatabaseLink, CloudSQLInstanceLink
 from airflow.providers.google.cloud.operators.cloud_base import 
GoogleCloudBaseOperator
diff --git 
a/providers/google/src/airflow/providers/google/cloud/sensors/bigquery.py 
b/providers/google/src/airflow/providers/google/cloud/sensors/bigquery.py
index 97c5c868b28..b7a08e75d40 100644
--- a/providers/google/src/airflow/providers/google/cloud/sensors/bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/sensors/bigquery.py
@@ -26,7 +26,7 @@ from typing import TYPE_CHECKING, Any
 
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning
-from airflow.providers.common.compat.lazy_compat import BaseSensorOperator
+from airflow.providers.common.compat.sdk import BaseSensorOperator
 from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
 from airflow.providers.google.cloud.triggers.bigquery import (
     BigQueryTableExistenceTrigger,
diff --git 
a/providers/google/src/airflow/providers/google/cloud/sensors/bigquery_dts.py 
b/providers/google/src/airflow/providers/google/cloud/sensors/bigquery_dts.py
index 5020079c07c..f1aa0af8713 100644
--- 
a/providers/google/src/airflow/providers/google/cloud/sensors/bigquery_dts.py
+++ 
b/providers/google/src/airflow/providers/google/cloud/sensors/bigquery_dts.py
@@ -26,7 +26,7 @@ from google.api_core.gapic_v1.method import DEFAULT, 
_MethodDefault
 from google.cloud.bigquery_datatransfer_v1 import TransferState
 
 from airflow.exceptions import AirflowException
-from airflow.providers.common.compat.lazy_compat import BaseSensorOperator
+from airflow.providers.common.compat.sdk import BaseSensorOperator
 from airflow.providers.google.cloud.hooks.bigquery_dts import 
BiqQueryDataTransferServiceHook
 from airflow.providers.google.common.hooks.base_google import 
PROVIDE_PROJECT_ID
 
diff --git 
a/providers/google/src/airflow/providers/google/cloud/sensors/bigtable.py 
b/providers/google/src/airflow/providers/google/cloud/sensors/bigtable.py
index 10ef01a7630..45a4e16bc12 100644
--- a/providers/google/src/airflow/providers/google/cloud/sensors/bigtable.py
+++ b/providers/google/src/airflow/providers/google/cloud/sensors/bigtable.py
@@ -26,7 +26,7 @@ import google.api_core.exceptions
 from google.cloud.bigtable import enums
 from google.cloud.bigtable.table import ClusterState
 
-from airflow.providers.common.compat.lazy_compat import BaseSensorOperator
+from airflow.providers.common.compat.sdk import BaseSensorOperator
 from airflow.providers.google.cloud.hooks.bigtable import BigtableHook
 from airflow.providers.google.cloud.links.bigtable import BigtableTablesLink
 from airflow.providers.google.cloud.operators.bigtable import 
BigtableValidationMixin
diff --git 
a/providers/google/src/airflow/providers/google/cloud/sensors/cloud_composer.py 
b/providers/google/src/airflow/providers/google/cloud/sensors/cloud_composer.py
index f58d484ad44..b306457de0d 100644
--- 
a/providers/google/src/airflow/providers/google/cloud/sensors/cloud_composer.py
+++ 
b/providers/google/src/airflow/providers/google/cloud/sensors/cloud_composer.py
@@ -31,7 +31,7 @@ from google.cloud.orchestration.airflow.service_v1.types 
import Environment, Exe
 
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException
-from airflow.providers.common.compat.lazy_compat import BaseSensorOperator
+from airflow.providers.common.compat.sdk import BaseSensorOperator
 from airflow.providers.google.cloud.hooks.cloud_composer import 
CloudComposerHook
 from airflow.providers.google.cloud.triggers.cloud_composer import 
CloudComposerDAGRunTrigger
 from airflow.providers.google.common.consts import 
GOOGLE_DEFAULT_DEFERRABLE_METHOD_NAME
diff --git 
a/providers/google/src/airflow/providers/google/cloud/sensors/cloud_storage_transfer_service.py
 
b/providers/google/src/airflow/providers/google/cloud/sensors/cloud_storage_transfer_service.py
index cdcf604ff7b..c4fe4669990 100644
--- 
a/providers/google/src/airflow/providers/google/cloud/sensors/cloud_storage_transfer_service.py
+++ 
b/providers/google/src/airflow/providers/google/cloud/sensors/cloud_storage_transfer_service.py
@@ -24,7 +24,7 @@ from typing import TYPE_CHECKING, Any
 
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException
-from airflow.providers.common.compat.lazy_compat import BaseSensorOperator
+from airflow.providers.common.compat.sdk import BaseSensorOperator
 from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service 
import (
     COUNTERS,
     METADATA,
diff --git 
a/providers/google/src/airflow/providers/google/cloud/sensors/dataflow.py 
b/providers/google/src/airflow/providers/google/cloud/sensors/dataflow.py
index e33fd9f2ba4..84f92f60270 100644
--- a/providers/google/src/airflow/providers/google/cloud/sensors/dataflow.py
+++ b/providers/google/src/airflow/providers/google/cloud/sensors/dataflow.py
@@ -25,7 +25,7 @@ from typing import TYPE_CHECKING, Any
 
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException
-from airflow.providers.common.compat.lazy_compat import BaseSensorOperator, 
PokeReturnValue
+from airflow.providers.common.compat.sdk import BaseSensorOperator, 
PokeReturnValue
 from airflow.providers.google.cloud.hooks.dataflow import (
     DEFAULT_DATAFLOW_LOCATION,
     DataflowHook,
diff --git 
a/providers/google/src/airflow/providers/google/cloud/sensors/dataform.py 
b/providers/google/src/airflow/providers/google/cloud/sensors/dataform.py
index 0714efa3622..65a44c1c993 100644
--- a/providers/google/src/airflow/providers/google/cloud/sensors/dataform.py
+++ b/providers/google/src/airflow/providers/google/cloud/sensors/dataform.py
@@ -23,7 +23,7 @@ from collections.abc import Iterable, Sequence
 from typing import TYPE_CHECKING
 
 from airflow.exceptions import AirflowException
-from airflow.providers.common.compat.lazy_compat import BaseSensorOperator
+from airflow.providers.common.compat.sdk import BaseSensorOperator
 from airflow.providers.google.cloud.hooks.dataform import DataformHook
 
 if TYPE_CHECKING:
diff --git 
a/providers/google/src/airflow/providers/google/cloud/sensors/datafusion.py 
b/providers/google/src/airflow/providers/google/cloud/sensors/datafusion.py
index 213102c19d7..d04806fb403 100644
--- a/providers/google/src/airflow/providers/google/cloud/sensors/datafusion.py
+++ b/providers/google/src/airflow/providers/google/cloud/sensors/datafusion.py
@@ -23,7 +23,7 @@ from collections.abc import Iterable, Sequence
 from typing import TYPE_CHECKING
 
 from airflow.exceptions import AirflowException, AirflowNotFoundException
-from airflow.providers.common.compat.lazy_compat import BaseSensorOperator
+from airflow.providers.common.compat.sdk import BaseSensorOperator
 from airflow.providers.google.cloud.hooks.datafusion import DataFusionHook
 from airflow.providers.google.common.hooks.base_google import 
PROVIDE_PROJECT_ID
 
diff --git 
a/providers/google/src/airflow/providers/google/cloud/sensors/dataplex.py 
b/providers/google/src/airflow/providers/google/cloud/sensors/dataplex.py
index cc9caaa1ed5..918883a9d5a 100644
--- a/providers/google/src/airflow/providers/google/cloud/sensors/dataplex.py
+++ b/providers/google/src/airflow/providers/google/cloud/sensors/dataplex.py
@@ -32,7 +32,7 @@ from google.api_core.gapic_v1.method import DEFAULT, 
_MethodDefault
 from google.cloud.dataplex_v1.types import DataScanJob
 
 from airflow.exceptions import AirflowException
-from airflow.providers.common.compat.lazy_compat import BaseSensorOperator
+from airflow.providers.common.compat.sdk import BaseSensorOperator
 from airflow.providers.google.cloud.hooks.dataplex import (
     AirflowDataQualityScanException,
     AirflowDataQualityScanResultTimeoutException,
diff --git 
a/providers/google/src/airflow/providers/google/cloud/sensors/dataprep.py 
b/providers/google/src/airflow/providers/google/cloud/sensors/dataprep.py
index 5a41e937219..6b6cebe2763 100644
--- a/providers/google/src/airflow/providers/google/cloud/sensors/dataprep.py
+++ b/providers/google/src/airflow/providers/google/cloud/sensors/dataprep.py
@@ -22,7 +22,7 @@ from __future__ import annotations
 from collections.abc import Sequence
 from typing import TYPE_CHECKING
 
-from airflow.providers.common.compat.lazy_compat import BaseSensorOperator
+from airflow.providers.common.compat.sdk import BaseSensorOperator
 from airflow.providers.google.cloud.hooks.dataprep import GoogleDataprepHook, 
JobGroupStatuses
 
 if TYPE_CHECKING:
diff --git 
a/providers/google/src/airflow/providers/google/cloud/sensors/dataproc.py 
b/providers/google/src/airflow/providers/google/cloud/sensors/dataproc.py
index 27ddd289390..c56a4d01263 100644
--- a/providers/google/src/airflow/providers/google/cloud/sensors/dataproc.py
+++ b/providers/google/src/airflow/providers/google/cloud/sensors/dataproc.py
@@ -27,7 +27,7 @@ from google.api_core.exceptions import ServerError
 from google.cloud.dataproc_v1.types import Batch, JobStatus
 
 from airflow.exceptions import AirflowException
-from airflow.providers.common.compat.lazy_compat import BaseSensorOperator
+from airflow.providers.common.compat.sdk import BaseSensorOperator
 from airflow.providers.google.cloud.hooks.dataproc import DataprocHook
 from airflow.providers.google.common.hooks.base_google import 
PROVIDE_PROJECT_ID
 
diff --git 
a/providers/google/src/airflow/providers/google/cloud/sensors/dataproc_metastore.py
 
b/providers/google/src/airflow/providers/google/cloud/sensors/dataproc_metastore.py
index de9fe700bb7..3cddb5d4dfd 100644
--- 
a/providers/google/src/airflow/providers/google/cloud/sensors/dataproc_metastore.py
+++ 
b/providers/google/src/airflow/providers/google/cloud/sensors/dataproc_metastore.py
@@ -21,7 +21,7 @@ from collections.abc import Sequence
 from typing import TYPE_CHECKING
 
 from airflow.exceptions import AirflowException
-from airflow.providers.common.compat.lazy_compat import BaseSensorOperator
+from airflow.providers.common.compat.sdk import BaseSensorOperator
 from airflow.providers.google.cloud.hooks.dataproc_metastore import 
DataprocMetastoreHook
 from airflow.providers.google.cloud.hooks.gcs import parse_json_from_gcs
 
diff --git a/providers/google/src/airflow/providers/google/cloud/sensors/gcs.py 
b/providers/google/src/airflow/providers/google/cloud/sensors/gcs.py
index 133d4d1f5e2..d8ab4449c0d 100644
--- a/providers/google/src/airflow/providers/google/cloud/sensors/gcs.py
+++ b/providers/google/src/airflow/providers/google/cloud/sensors/gcs.py
@@ -29,7 +29,7 @@ from google.cloud.storage.retry import DEFAULT_RETRY
 
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException
-from airflow.providers.common.compat.lazy_compat import BaseSensorOperator, 
poke_mode_only
+from airflow.providers.common.compat.sdk import BaseSensorOperator, 
poke_mode_only
 from airflow.providers.google.cloud.hooks.gcs import GCSHook
 from airflow.providers.google.cloud.triggers.gcs import (
     GCSBlobTrigger,
diff --git 
a/providers/google/src/airflow/providers/google/cloud/sensors/looker.py 
b/providers/google/src/airflow/providers/google/cloud/sensors/looker.py
index f55a891a3e7..cdc0cd5ed75 100644
--- a/providers/google/src/airflow/providers/google/cloud/sensors/looker.py
+++ b/providers/google/src/airflow/providers/google/cloud/sensors/looker.py
@@ -22,7 +22,7 @@ from __future__ import annotations
 from typing import TYPE_CHECKING
 
 from airflow.exceptions import AirflowException
-from airflow.providers.common.compat.lazy_compat import BaseSensorOperator
+from airflow.providers.common.compat.sdk import BaseSensorOperator
 from airflow.providers.google.cloud.hooks.looker import JobStatus, LookerHook
 
 if TYPE_CHECKING:
diff --git 
a/providers/google/src/airflow/providers/google/cloud/sensors/pubsub.py 
b/providers/google/src/airflow/providers/google/cloud/sensors/pubsub.py
index cdc148d2e5a..a0b286ab318 100644
--- a/providers/google/src/airflow/providers/google/cloud/sensors/pubsub.py
+++ b/providers/google/src/airflow/providers/google/cloud/sensors/pubsub.py
@@ -28,7 +28,7 @@ from google.cloud.pubsub_v1.types import ReceivedMessage
 
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException
-from airflow.providers.common.compat.lazy_compat import BaseSensorOperator
+from airflow.providers.common.compat.sdk import BaseSensorOperator
 from airflow.providers.google.cloud.hooks.pubsub import PubSubHook
 from airflow.providers.google.cloud.triggers.pubsub import PubsubPullTrigger
 
diff --git 
a/providers/google/src/airflow/providers/google/cloud/sensors/tasks.py 
b/providers/google/src/airflow/providers/google/cloud/sensors/tasks.py
index 3a1e40b691c..1c2275270b2 100644
--- a/providers/google/src/airflow/providers/google/cloud/sensors/tasks.py
+++ b/providers/google/src/airflow/providers/google/cloud/sensors/tasks.py
@@ -22,7 +22,7 @@ from __future__ import annotations
 from collections.abc import Sequence
 from typing import TYPE_CHECKING
 
-from airflow.providers.common.compat.lazy_compat import BaseSensorOperator
+from airflow.providers.common.compat.sdk import BaseSensorOperator
 from airflow.providers.google.cloud.hooks.tasks import CloudTasksHook
 from airflow.providers.google.common.hooks.base_google import 
PROVIDE_PROJECT_ID
 
diff --git 
a/providers/google/src/airflow/providers/google/cloud/sensors/vertex_ai/feature_store.py
 
b/providers/google/src/airflow/providers/google/cloud/sensors/vertex_ai/feature_store.py
index c403e57bc67..8f8e6804b5b 100644
--- 
a/providers/google/src/airflow/providers/google/cloud/sensors/vertex_ai/feature_store.py
+++ 
b/providers/google/src/airflow/providers/google/cloud/sensors/vertex_ai/feature_store.py
@@ -24,7 +24,7 @@ from collections.abc import Sequence
 from typing import TYPE_CHECKING
 
 from airflow.exceptions import AirflowException
-from airflow.providers.common.compat.lazy_compat import BaseSensorOperator
+from airflow.providers.common.compat.sdk import BaseSensorOperator
 from airflow.providers.google.cloud.hooks.vertex_ai.feature_store import 
FeatureStoreHook
 
 if TYPE_CHECKING:
diff --git 
a/providers/google/src/airflow/providers/google/cloud/sensors/workflows.py 
b/providers/google/src/airflow/providers/google/cloud/sensors/workflows.py
index 2d429d32398..89f4309078c 100644
--- a/providers/google/src/airflow/providers/google/cloud/sensors/workflows.py
+++ b/providers/google/src/airflow/providers/google/cloud/sensors/workflows.py
@@ -23,7 +23,7 @@ from google.api_core.gapic_v1.method import DEFAULT, 
_MethodDefault
 from google.cloud.workflows.executions_v1beta import Execution
 
 from airflow.exceptions import AirflowException
-from airflow.providers.common.compat.lazy_compat import BaseSensorOperator
+from airflow.providers.common.compat.sdk import BaseSensorOperator
 from airflow.providers.google.cloud.hooks.workflows import WorkflowsHook
 from airflow.providers.google.common.hooks.base_google import 
PROVIDE_PROJECT_ID
 
diff --git 
a/providers/google/src/airflow/providers/google/common/hooks/base_google.py 
b/providers/google/src/airflow/providers/google/common/hooks/base_google.py
index 5736ab116e8..f2fd56f524c 100644
--- a/providers/google/src/airflow/providers/google/common/hooks/base_google.py
+++ b/providers/google/src/airflow/providers/google/common/hooks/base_google.py
@@ -50,7 +50,7 @@ from requests import Session
 
 from airflow import version
 from airflow.exceptions import AirflowException
-from airflow.providers.common.compat.lazy_compat import BaseHook
+from airflow.providers.common.compat.sdk import BaseHook
 from airflow.providers.google.cloud.utils.credentials_provider import (
     _get_scopes,
     _get_target_principal_and_delegates,
diff --git 
a/providers/google/src/airflow/providers/google/leveldb/hooks/leveldb.py 
b/providers/google/src/airflow/providers/google/leveldb/hooks/leveldb.py
index f1e03d13704..947e6177c51 100644
--- a/providers/google/src/airflow/providers/google/leveldb/hooks/leveldb.py
+++ b/providers/google/src/airflow/providers/google/leveldb/hooks/leveldb.py
@@ -21,7 +21,7 @@ from __future__ import annotations
 from typing import Any
 
 from airflow.exceptions import AirflowException, 
AirflowOptionalProviderFeatureException
-from airflow.providers.common.compat.lazy_compat import BaseHook
+from airflow.providers.common.compat.sdk import BaseHook
 
 try:
     import plyvel
diff --git 
a/providers/google/src/airflow/providers/google/marketing_platform/links/analytics_admin.py
 
b/providers/google/src/airflow/providers/google/marketing_platform/links/analytics_admin.py
index 1d186d93867..6214b8d267e 100644
--- 
a/providers/google/src/airflow/providers/google/marketing_platform/links/analytics_admin.py
+++ 
b/providers/google/src/airflow/providers/google/marketing_platform/links/analytics_admin.py
@@ -18,7 +18,7 @@ from __future__ import annotations
 
 from typing import TYPE_CHECKING, ClassVar
 
-from airflow.providers.common.compat.lazy_compat import BaseOperatorLink, XCom
+from airflow.providers.common.compat.sdk import BaseOperatorLink, XCom
 
 if TYPE_CHECKING:
     from airflow.models.taskinstancekey import TaskInstanceKey
diff --git 
a/providers/google/src/airflow/providers/google/marketing_platform/sensors/campaign_manager.py
 
b/providers/google/src/airflow/providers/google/marketing_platform/sensors/campaign_manager.py
index bf1182976ac..cb63c341a5a 100644
--- 
a/providers/google/src/airflow/providers/google/marketing_platform/sensors/campaign_manager.py
+++ 
b/providers/google/src/airflow/providers/google/marketing_platform/sensors/campaign_manager.py
@@ -22,7 +22,7 @@ from __future__ import annotations
 from collections.abc import Sequence
 from typing import TYPE_CHECKING
 
-from airflow.providers.common.compat.lazy_compat import BaseSensorOperator
+from airflow.providers.common.compat.sdk import BaseSensorOperator
 from airflow.providers.google.marketing_platform.hooks.campaign_manager import 
GoogleCampaignManagerHook
 
 if TYPE_CHECKING:
diff --git 
a/providers/google/src/airflow/providers/google/marketing_platform/sensors/display_video.py
 
b/providers/google/src/airflow/providers/google/marketing_platform/sensors/display_video.py
index 1500397fd84..8cfebd20545 100644
--- 
a/providers/google/src/airflow/providers/google/marketing_platform/sensors/display_video.py
+++ 
b/providers/google/src/airflow/providers/google/marketing_platform/sensors/display_video.py
@@ -22,7 +22,7 @@ from collections.abc import Sequence
 from typing import TYPE_CHECKING
 
 from airflow.exceptions import AirflowException
-from airflow.providers.common.compat.lazy_compat import BaseSensorOperator
+from airflow.providers.common.compat.sdk import BaseSensorOperator
 from airflow.providers.google.marketing_platform.hooks.display_video import 
GoogleDisplayVideo360Hook
 
 if TYPE_CHECKING:
diff --git 
a/providers/google/src/airflow/providers/google/suite/sensors/drive.py 
b/providers/google/src/airflow/providers/google/suite/sensors/drive.py
index 80bd30e813b..0ec7fe721e7 100644
--- a/providers/google/src/airflow/providers/google/suite/sensors/drive.py
+++ b/providers/google/src/airflow/providers/google/suite/sensors/drive.py
@@ -22,7 +22,7 @@ from __future__ import annotations
 from collections.abc import Sequence
 from typing import TYPE_CHECKING
 
-from airflow.providers.common.compat.lazy_compat import BaseSensorOperator
+from airflow.providers.common.compat.sdk import BaseSensorOperator
 from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
 
 if TYPE_CHECKING:
diff --git a/providers/neo4j/tests/unit/neo4j/operators/test_neo4j.py 
b/providers/neo4j/tests/unit/neo4j/operators/test_neo4j.py
index 3f9253fda92..a1fdc6b5653 100644
--- a/providers/neo4j/tests/unit/neo4j/operators/test_neo4j.py
+++ b/providers/neo4j/tests/unit/neo4j/operators/test_neo4j.py
@@ -18,7 +18,7 @@ from __future__ import annotations
 
 from unittest import mock
 
-from airflow.providers.common.compat.lazy_compat import timezone
+from airflow.providers.common.compat.sdk import timezone
 from airflow.providers.neo4j.operators.neo4j import Neo4jOperator
 
 DEFAULT_DATE = timezone.datetime(2015, 1, 1)
diff --git a/scripts/ci/prek/check_common_compat_lazy_imports.py 
b/scripts/ci/prek/check_common_compat_lazy_imports.py
index 2319c6d7de6..8e2e8579892 100755
--- a/scripts/ci/prek/check_common_compat_lazy_imports.py
+++ b/scripts/ci/prek/check_common_compat_lazy_imports.py
@@ -17,7 +17,7 @@
 # under the License.
 
 """
-Check and generate lazy_compat.pyi from lazy_compat.py.
+Check and generate sdk.pyi from sdk.py.
 
 This script can be used as:
 1. Pre-commit hook - checks if .pyi is in sync with _IMPORT_MAP
@@ -38,9 +38,9 @@ from pathlib import Path
 
 def extract_import_map(py_file: Path) -> dict[str, str | tuple[str, ...]]:
     """
-    Extract _IMPORT_MAP from lazy_compat.py.
+    Extract _IMPORT_MAP from sdk.py.
 
-    :param py_file: Path to lazy_compat.py
+    :param py_file: Path to sdk.py
     :return: Dictionary mapping class names to module paths
     """
     content = py_file.read_text()
@@ -55,14 +55,14 @@ def extract_import_map(py_file: Path) -> dict[str, str | 
tuple[str, ...]]:
                 if isinstance(target, ast.Name) and target.id == "_IMPORT_MAP":
                     return ast.literal_eval(node.value)
 
-    raise ValueError("Could not find _IMPORT_MAP in lazy_compat.py")
+    raise ValueError("Could not find _IMPORT_MAP in sdk.py")
 
 
 def extract_rename_map(py_file: Path) -> dict[str, tuple[str, str, str]]:
     """
-    Extract _RENAME_MAP from lazy_compat.py.
+    Extract _RENAME_MAP from sdk.py.
 
-    :param py_file: Path to lazy_compat.py
+    :param py_file: Path to sdk.py
     :return: Dictionary mapping new class names to (new_path, old_path, 
old_name)
     """
     content = py_file.read_text()
@@ -77,14 +77,14 @@ def extract_rename_map(py_file: Path) -> dict[str, 
tuple[str, str, str]]:
                 if isinstance(target, ast.Name) and target.id == "_RENAME_MAP":
                     return ast.literal_eval(node.value)
 
-    raise ValueError("Could not find _RENAME_MAP in lazy_compat.py")
+    raise ValueError("Could not find _RENAME_MAP in sdk.py")
 
 
 def extract_module_map(py_file: Path) -> dict[str, str | tuple[str, ...]]:
     """
-    Extract _MODULE_MAP from lazy_compat.py.
+    Extract _MODULE_MAP from sdk.py.
 
-    :param py_file: Path to lazy_compat.py
+    :param py_file: Path to sdk.py
     :return: Dictionary mapping module names to module paths
     """
     content = py_file.read_text()
@@ -99,7 +99,7 @@ def extract_module_map(py_file: Path) -> dict[str, str | 
tuple[str, ...]]:
                 if isinstance(target, ast.Name) and target.id == "_MODULE_MAP":
                     return ast.literal_eval(node.value)
 
-    raise ValueError("Could not find _MODULE_MAP in lazy_compat.py")
+    raise ValueError("Could not find _MODULE_MAP in sdk.py")
 
 
 def generate_pyi_content(
@@ -134,7 +134,7 @@ def generate_pyi_content(
 """
 Type stubs for IDE autocomplete - always uses Airflow 3 paths.
 
-This file is auto-generated from lazy_compat.py.
+This file is auto-generated from sdk.py.
     - run scripts/ci/prek/check_common_compat_lazy_imports.py --generate 
instead.
 """
 
@@ -292,7 +292,7 @@ def validate_imports(
 
 
 def main() -> int:
-    """Generate and check lazy_compat.pyi."""
+    """Generate and check sdk.pyi."""
     repo_root = Path(__file__).parent.parent.parent.parent
     lazy_compat_py = (
         repo_root
@@ -304,7 +304,7 @@ def main() -> int:
         / "providers"
         / "common"
         / "compat"
-        / "lazy_compat.py"
+        / "sdk.py"
     )
     lazy_compat_pyi = lazy_compat_py.with_suffix(".pyi")
 
@@ -390,37 +390,31 @@ def main() -> int:
         extra_modules = pyi_module_imports - map_modules
 
        if not (missing_attrs or extra_attrs or missing_modules or extra_modules):
-            print(f"✓ lazy_compat.pyi is in sync with lazy_compat.py ({total_imports} imports)")
+            print(f"✓ sdk.pyi is in sync with sdk.py ({total_imports} imports)")
             return 0
 
         # Out of sync
         if missing_attrs:
-            print(
-                f"ERROR: lazy_compat.pyi is missing {len(missing_attrs)} attributes from "
-                "_RENAME_MAP/_IMPORT_MAP:"
-            )
+            print(f"ERROR: sdk.pyi is missing {len(missing_attrs)} attributes from _RENAME_MAP/_IMPORT_MAP:")
             for name in sorted(missing_attrs)[:10]:
                 print(f"  - {name}")
             if len(missing_attrs) > 10:
                 print(f"  ... and {len(missing_attrs) - 10} more")
 
         if extra_attrs:
-            print(
-                f"ERROR: lazy_compat.pyi has {len(extra_attrs)} extra attributes not in "
-                "_RENAME_MAP/_IMPORT_MAP:"
-            )
+            print(f"ERROR: sdk.pyi has {len(extra_attrs)} extra attributes not in _RENAME_MAP/_IMPORT_MAP:")
             for name in sorted(extra_attrs)[:10]:
                 print(f"  + {name}")
             if len(extra_attrs) > 10:
                 print(f"  ... and {len(extra_attrs) - 10} more")
 
         if missing_modules:
-            print(f"ERROR: lazy_compat.pyi is missing {len(missing_modules)} modules from _MODULE_MAP:")
+            print(f"ERROR: sdk.pyi is missing {len(missing_modules)} modules from _MODULE_MAP:")
             for name in sorted(missing_modules):
                 print(f"  - {name} (module)")
 
         if extra_modules:
-            print(f"ERROR: lazy_compat.pyi has {len(extra_modules)} extra modules not in _MODULE_MAP:")
+            print(f"ERROR: sdk.pyi has {len(extra_modules)} extra modules not in _MODULE_MAP:")
             for name in sorted(extra_modules):
                 print(f"  + {name} (module)")
 


Reply via email to