This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9075c88f813 Move remaining `airflow.io` code to Task SDK (#53211)
9075c88f813 is described below

commit 9075c88f813c1408d6f7d949e1366df95c711cb3
Author: Kaxil Naik <[email protected]>
AuthorDate: Sat Jul 12 05:21:26 2025 +0530

    Move remaining `airflow.io` code to Task SDK (#53211)
---
 airflow-core/src/airflow/io/__init__.py            | 120 ++-------
 airflow-core/src/airflow/io/path.py                |  22 --
 airflow-core/src/airflow/io/storage.py             |  22 --
 .../src/airflow/utils/deprecation_tools.py         | 114 ++++++---
 .../tests/unit/utils/test_deprecation_tools.py     | 272 +++++++++++++++++----
 .../io/tests/unit/common/io/xcom/test_backend.py   |   2 +-
 task-sdk/src/airflow/sdk/io/__init__.py            |   4 +-
 .../src/airflow/sdk/io/fs.py                       |   6 +-
 task-sdk/src/airflow/sdk/io/store.py               |   6 +-
 .../src/airflow/sdk}/io/typedef.py                 |   0
 task-sdk/tests/task_sdk/io/test_path.py            |   2 +-
 11 files changed, 336 insertions(+), 234 deletions(-)

diff --git a/airflow-core/src/airflow/io/__init__.py b/airflow-core/src/airflow/io/__init__.py
index 6bbea93e59b..3b255aacdf8 100644
--- a/airflow-core/src/airflow/io/__init__.py
+++ b/airflow-core/src/airflow/io/__init__.py
@@ -16,102 +16,26 @@
 # under the License.
 from __future__ import annotations
 
-import inspect
-import logging
-from collections.abc import Callable, Mapping
-from functools import cache
-from typing import (
-    TYPE_CHECKING,
+from airflow.utils.deprecation_tools import add_deprecated_classes
+
+add_deprecated_classes(
+    {
+        __name__: {
+            "get_fs": "airflow.sdk.io.get_fs",
+            "has_fs": "airflow.sdk.io.has_fs",
+            "attach": "airflow.sdk.io.attach",
+            "Properties": "airflow.sdk.io.Properties",
+            "_BUILTIN_SCHEME_TO_FS": "airflow.sdk.io.fs._BUILTIN_SCHEME_TO_FS",
+        },
+        "path": {
+            "ObjectStoragePath": "airflow.sdk.ObjectStoragePath",
+        },
+        "storage": {
+            "attach": "airflow.sdk.io.attach",
+        },
+        "typedef": {
+            "Properties": "airflow.sdk.io.typedef.Properties",
+        },
+    },
+    package=__name__,
 )
-
-from fsspec.implementations.local import LocalFileSystem
-
-from airflow.providers_manager import ProvidersManager
-from airflow.stats import Stats
-from airflow.utils.module_loading import import_string
-
-if TYPE_CHECKING:
-    from fsspec import AbstractFileSystem
-
-    from airflow.io.typedef import Properties
-
-
-log = logging.getLogger(__name__)
-
-
-def _file(_: str | None, storage_options: Properties) -> LocalFileSystem:
-    return LocalFileSystem(**storage_options)
-
-
-# builtin supported filesystems
-_BUILTIN_SCHEME_TO_FS: dict[str, Callable[[str | None, Properties], AbstractFileSystem]] = {
-    "file": _file,
-    "local": _file,
-}
-
-
-@cache
-def _register_filesystems() -> Mapping[
-    str,
-    Callable[[str | None, Properties], AbstractFileSystem] | Callable[[str | None], AbstractFileSystem],
-]:
-    scheme_to_fs = _BUILTIN_SCHEME_TO_FS.copy()
-    with Stats.timer("airflow.io.load_filesystems") as timer:
-        manager = ProvidersManager()
-        for fs_module_name in manager.filesystem_module_names:
-            fs_module = import_string(fs_module_name)
-            for scheme in getattr(fs_module, "schemes", []):
-                if scheme in scheme_to_fs:
-                    log.warning("Overriding scheme %s for %s", scheme, fs_module_name)
-
-                method = getattr(fs_module, "get_fs", None)
-                if method is None:
-                    raise ImportError(f"Filesystem {fs_module_name} does not have a get_fs method")
-                scheme_to_fs[scheme] = method
-
-    log.debug("loading filesystems from providers took %.3f seconds", timer.duration)
-    return scheme_to_fs
-
-
-def get_fs(
-    scheme: str, conn_id: str | None = None, storage_options: Properties | None = None
-) -> AbstractFileSystem:
-    """
-    Get a filesystem by scheme.
-
-    :param scheme: the scheme to get the filesystem for
-    :return: the filesystem method
-    :param conn_id: the airflow connection id to use
-    :param storage_options: the storage options to pass to the filesystem
-    """
-    filesystems = _register_filesystems()
-    try:
-        fs = filesystems[scheme]
-    except KeyError:
-        raise ValueError(f"No filesystem registered for scheme {scheme}") from None
-
-    options = storage_options or {}
-
-    # MyPy does not recognize dynamic parameters inspection when we call the method, and we have to do
-    # it for compatibility reasons with already released providers, that's why we need to ignore
-    # mypy errors here
-    parameters = inspect.signature(fs).parameters
-    if len(parameters) == 1:
-        if options:
-            raise AttributeError(
-                f"Filesystem {scheme} does not support storage options, but options were passed."
-                f"This most likely means that you are using an old version of the provider that does not "
-                f"support storage options. Please upgrade the provider if possible."
-            )
-        return fs(conn_id)  # type: ignore[call-arg]
-    return fs(conn_id, options)  # type: ignore[call-arg]
-
-
-def has_fs(scheme: str) -> bool:
-    """
-    Check if a filesystem is available for a scheme.
-
-    :param scheme: the scheme to check
-    :return: True if a filesystem is available for the scheme
-    """
-    return scheme in _register_filesystems()
diff --git a/airflow-core/src/airflow/io/path.py b/airflow-core/src/airflow/io/path.py
deleted file mode 100644
index bc323d0030b..00000000000
--- a/airflow-core/src/airflow/io/path.py
+++ /dev/null
@@ -1,22 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from __future__ import annotations
-
-from airflow.sdk import ObjectStoragePath
-
-__all__ = ["ObjectStoragePath"]
diff --git a/airflow-core/src/airflow/io/storage.py b/airflow-core/src/airflow/io/storage.py
deleted file mode 100644
index 4723e8a15f6..00000000000
--- a/airflow-core/src/airflow/io/storage.py
+++ /dev/null
@@ -1,22 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from __future__ import annotations
-
-from airflow.sdk.io import attach
-
-__all__ = ["attach"]
diff --git a/airflow-core/src/airflow/utils/deprecation_tools.py b/airflow-core/src/airflow/utils/deprecation_tools.py
index 3f9977fc963..e501a758690 100644
--- a/airflow-core/src/airflow/utils/deprecation_tools.py
+++ b/airflow-core/src/airflow/utils/deprecation_tools.py
@@ -84,51 +84,105 @@ def add_deprecated_classes(
     Add deprecated attribute PEP-563 imports and warnings modules to the 
package.
 
     Works for classes, functions, variables, and other module attributes.
-
-    :param module_imports: imports to use
-    :param package: package name
-    :param override_deprecated_classes: override target attributes with 
deprecated ones. If module +
-       target attribute is found in the dictionary, it will be displayed in 
the warning message.
+    Supports both creating virtual modules and modifying existing modules.
+
+    :param module_imports: imports to use. Format: dict[str, dict[str, str]]
+        - Keys are module names (creates virtual modules)
+        - Special key __name__ modifies the current module for direct 
attribute imports
+        - Can mix both approaches in a single call
+    :param package: package name (typically __name__)
+    :param override_deprecated_classes: override target attributes with 
deprecated ones.
+        Format: dict[str, dict[str, str]] matching the structure of 
module_imports
     :param extra_message: extra message to display in the warning or import 
error message
 
-    Example:
+    Examples:
+        # Create virtual modules (e.g., for removed .py files)
         add_deprecated_classes(
             {"basenotifier": {"BaseNotifier": 
"airflow.sdk.bases.notifier.BaseNotifier"}},
             package=__name__,
         )
 
-    This makes 'from airflow.notifications.basenotifier import BaseNotifier' 
still work,
-    even if 'basenotifier.py' was removed, and shows a warning with the new 
path.
-
-    Wildcard Example:
+        # Wildcard support - redirect all attributes to new module
         add_deprecated_classes(
             {"timezone": {"*": "airflow.sdk.timezone"}},
             package=__name__,
         )
 
-    This makes 'from airflow.utils.timezone import utc' redirect to 
'airflow.sdk.timezone.utc',
+        # Current module direct imports
+        add_deprecated_classes(
+            {
+                __name__: {
+                    "get_fs": "airflow.sdk.io.fs.get_fs",
+                    "has_fs": "airflow.sdk.io.fs.has_fs",
+                }
+            },
+            package=__name__,
+        )
+
+        # Mixed behavior - both current module and submodule attributes
+        add_deprecated_classes(
+            {
+                __name__: {
+                    "get_fs": "airflow.sdk.io.fs.get_fs",
+                    "has_fs": "airflow.sdk.io.fs.has_fs",
+                    "Properties": "airflow.sdk.io.typedef.Properties",
+                },
+                "typedef": {
+                    "Properties": "airflow.sdk.io.typedef.Properties",
+                }
+            },
+            package=__name__,
+        )
+
+    The first example makes 'from airflow.notifications.basenotifier import 
BaseNotifier' work
+    even if 'basenotifier.py' was removed.
+
+    The second example makes 'from airflow.utils.timezone import utc' redirect 
to 'airflow.sdk.timezone.utc',
     allowing any attribute from the deprecated module to be accessed from the 
new location.
 
-    Note that "add_deprecated_classes method should be called in the 
`__init__.py` file in the package
-    where the deprecated classes are located - this way the module `.py` files 
should be removed and what
-    remains in the package is just the `__init__.py` file.
+    The third example makes 'from airflow.io import get_fs' work with direct 
imports from the current module.
 
-    See for example `airflow/decorators/__init__.py` file.
+    The fourth example handles both direct imports from the current module and 
submodule imports.
     """
+    # Handle both current module and virtual module deprecations
     for module_name, imports in module_imports.items():
-        full_module_name = f"{package}.{module_name}"
-        module_type = ModuleType(full_module_name)
-        if override_deprecated_classes and module_name in override_deprecated_classes:
-            override_deprecated_classes_for_module = override_deprecated_classes[module_name]
+        if module_name == package:
+            # Special case: modify the current module for direct attribute imports
+            if package not in sys.modules:
+                raise ValueError(f"Module {package} not found in sys.modules")
+
+            module = sys.modules[package]
+
+            # Create the __getattr__ function for current module
+            current_override = {}
+            if override_deprecated_classes and package in override_deprecated_classes:
+                current_override = override_deprecated_classes[package]
+
+            getattr_func = functools.partial(
+                getattr_with_deprecation,
+                imports,
+                package,
+                current_override,
+                extra_message or "",
+            )
+
+            # Set the __getattr__ function on the current module
+            setattr(module, "__getattr__", getattr_func)
         else:
-            override_deprecated_classes_for_module = {}
-
-        # Mypy is not able to derive the right function signature https://github.com/python/mypy/issues/2427
-        module_type.__getattr__ = functools.partial(  # type: ignore[assignment]
-            getattr_with_deprecation,
-            imports,
-            full_module_name,
-            override_deprecated_classes_for_module,
-            extra_message or "",
-        )
-        sys.modules.setdefault(full_module_name, module_type)
+            # Create virtual modules for submodule imports
+            full_module_name = f"{package}.{module_name}"
+            module_type = ModuleType(full_module_name)
+            if override_deprecated_classes and module_name in override_deprecated_classes:
+                override_deprecated_classes_for_module = override_deprecated_classes[module_name]
+            else:
+                override_deprecated_classes_for_module = {}
+
+            # Mypy is not able to derive the right function signature https://github.com/python/mypy/issues/2427
+            module_type.__getattr__ = functools.partial(  # type: ignore[assignment]
+                getattr_with_deprecation,
+                imports,
+                full_module_name,
+                override_deprecated_classes_for_module,
+                extra_message or "",
+            )
+            sys.modules.setdefault(full_module_name, module_type)
diff --git a/airflow-core/tests/unit/utils/test_deprecation_tools.py b/airflow-core/tests/unit/utils/test_deprecation_tools.py
index adaed45ff45..fafcde2364c 100644
--- a/airflow-core/tests/unit/utils/test_deprecation_tools.py
+++ b/airflow-core/tests/unit/utils/test_deprecation_tools.py
@@ -252,71 +252,239 @@ class TestGetAttrWithDeprecation:
 class TestAddDeprecatedClasses:
     """Tests for the add_deprecated_classes function."""
 
-    def test_add_deprecated_classes_basic(self):
-        """Test basic functionality of add_deprecated_classes."""
+    @pytest.mark.parametrize(
+        "test_case,module_imports,override_classes,expected_behavior",
+        [
+            (
+                "basic_class_mapping",
+                {"old_module": {"OldClass": "new.module.NewClass"}},
+                None,
+                "creates_virtual_module",
+            ),
+            (
+                "wildcard_pattern",
+                {"timezone": {"*": "airflow.sdk.timezone"}},
+                None,
+                "creates_virtual_module",
+            ),
+            (
+                "with_override",
+                {"old_module": {"OldClass": "new.module.NewClass"}},
+                {"old_module": {"OldClass": "override.module.OverrideClass"}},
+                "creates_virtual_module",
+            ),
+        ],
+        ids=["basic_class_mapping", "wildcard_pattern", "with_override"],
+    )
+    def test_virtual_module_creation(self, test_case, module_imports, override_classes, expected_behavior):
+        """Test add_deprecated_classes creates virtual modules correctly."""
         # Use unique package and module names to avoid conflicts
         package_name = get_unique_module_name("test_package")
-        module_name = f"{package_name}.old_module"
-
-        module_imports = {"old_module": {"OldClass": "new.module.NewClass"}}
+        module_name = f"{package_name}.{next(iter(module_imports.keys()))}"
 
         with temporary_module(module_name):
-            add_deprecated_classes(module_imports, package_name)
+            add_deprecated_classes(module_imports, package_name, override_classes)
 
             # Check that the module was added to sys.modules
             assert module_name in sys.modules
             assert isinstance(sys.modules[module_name], ModuleType)
             assert hasattr(sys.modules[module_name], "__getattr__")
 
-    def test_add_deprecated_classes_with_wildcard(self):
-        """Test add_deprecated_classes with wildcard pattern."""
-        # Use unique package and module names to avoid conflicts
-        package_name = get_unique_module_name("test_package")
-        module_name = f"{package_name}.timezone"
-
-        module_imports = {"timezone": {"*": "airflow.sdk.timezone"}}
-
-        with temporary_module(module_name):
-            add_deprecated_classes(module_imports, package_name)
-
-            # Check that the module was added to sys.modules
-            assert module_name in sys.modules
-            assert isinstance(sys.modules[module_name], ModuleType)
-            assert hasattr(sys.modules[module_name], "__getattr__")
-
-    def test_add_deprecated_classes_with_override(self):
-        """Test add_deprecated_classes with override_deprecated_classes."""
-        # Use unique package and module names to avoid conflicts
-        package_name = get_unique_module_name("test_package")
-        module_name = f"{package_name}.old_module"
-
-        module_imports = {"old_module": {"OldClass": "new.module.NewClass"}}
-
-        override_deprecated_classes = {"old_module": {"OldClass": 
"override.module.OverrideClass"}}
-
-        with temporary_module(module_name):
-            add_deprecated_classes(module_imports, package_name, 
override_deprecated_classes)
-
-            # Check that the module was added to sys.modules
-            assert module_name in sys.modules
-            assert isinstance(sys.modules[module_name], ModuleType)
-
     def test_add_deprecated_classes_doesnt_override_existing(self):
         """Test that add_deprecated_classes doesn't override existing 
modules."""
-        # Use unique package and module names to avoid conflicts
-        package_name = get_unique_module_name("test_package")
-        module_name = f"{package_name}.existing_module"
+        module_name = get_unique_module_name("existing_module")
+        full_module_name = f"airflow.test.{module_name}"
+
+        # Create an existing module
+        existing_module = ModuleType(full_module_name)
+        existing_module.existing_attr = "existing_value"
+        sys.modules[full_module_name] = existing_module
+
+        with temporary_module(full_module_name):
+            # This should not override the existing module
+            add_deprecated_classes(
+                {module_name: {"NewClass": "new.module.NewClass"}},
+                package="airflow.test",
+            )
 
-        module_imports = {"existing_module": {"SomeClass": 
"new.module.SomeClass"}}
+            # The existing module should still be there
+            assert sys.modules[full_module_name] == existing_module
+            assert sys.modules[full_module_name].existing_attr == 
"existing_value"
+
+    @pytest.mark.parametrize(
+        "test_case,module_imports,attr_name,target_attr,expected_target_msg,override_classes",
+        [
+            (
+                "direct_imports",
+                {
+                    "get_something": "target.module.get_something",
+                    "another_attr": "target.module.another_attr",
+                },
+                "get_something",
+                "get_something",
+                "target.module.get_something",
+                None,
+            ),
+            (
+                "with_wildcard",
+                {"specific_attr": "target.module.specific_attr", "*": 
"target.module"},
+                "any_attribute",
+                "any_attribute",
+                "target.module.any_attribute",
+                None,
+            ),
+            (
+                "with_override",
+                {"get_something": "target.module.get_something"},
+                "get_something",
+                "get_something",
+                "override.module.OverrideClass",
+                {"get_something": "override.module.OverrideClass"},
+            ),
+        ],
+        ids=["direct_imports", "with_wildcard", "with_override"],
+    )
+    def test_current_module_deprecation(
+        self, test_case, module_imports, attr_name, target_attr, 
expected_target_msg, override_classes
+    ):
+        """Test add_deprecated_classes with current module (__name__ key) 
functionality."""
+        module_name = get_unique_module_name(f"{test_case}_module")
+        full_module_name = f"airflow.test.{module_name}"
+
+        # Create a module to modify
+        test_module = ModuleType(full_module_name)
+        sys.modules[full_module_name] = test_module
+
+        with temporary_module(full_module_name):
+            # Mock the target module and attribute
+            mock_target_module = mock.MagicMock()
+            mock_attribute = mock.MagicMock()
+            setattr(mock_target_module, target_attr, mock_attribute)
+
+            with mock.patch(
+                "airflow.utils.deprecation_tools.importlib.import_module", return_value=mock_target_module
+            ):
+                # Prepare override parameter
+                override_param = {full_module_name: override_classes} if override_classes else None
+
+                add_deprecated_classes(
+                    {full_module_name: module_imports},
+                    package=full_module_name,
+                    override_deprecated_classes=override_param,
+                )
 
-        with temporary_module(module_name):
-            # Create a mock existing module
-            existing_module = ModuleType(module_name)
-            existing_module.existing_attribute = "existing_value"
-            sys.modules[module_name] = existing_module
+                # The module should now have a __getattr__ method
+                assert hasattr(test_module, "__getattr__")
+
+                # Test that accessing the deprecated attribute works
+                with warnings.catch_warnings(record=True) as w:
+                    warnings.simplefilter("always")
+                    result = getattr(test_module, attr_name)
+
+                    assert result == mock_attribute
+                    assert len(w) == 1
+                    assert issubclass(w[0].category, DeprecationWarning)
+                    assert f"{full_module_name}.{attr_name}" in 
str(w[0].message)
+                    assert expected_target_msg in str(w[0].message)
+
+    def test_add_deprecated_classes_mixed_current_and_virtual_modules(self):
+        """Test add_deprecated_classes with mixed current module and virtual 
module imports."""
+        base_module_name = get_unique_module_name("mixed_module")
+        full_module_name = f"airflow.test.{base_module_name}"
+        virtual_module_name = f"{base_module_name}_virtual"
+        full_virtual_module_name = f"{full_module_name}.{virtual_module_name}"
+
+        # Create a module to modify
+        test_module = ModuleType(full_module_name)
+        sys.modules[full_module_name] = test_module
+
+        with temporary_module(full_module_name), temporary_module(full_virtual_module_name):
+            # Mock the target modules and attributes
+            mock_current_module = mock.MagicMock()
+            mock_current_attr = mock.MagicMock()
+            mock_current_module.current_attr = mock_current_attr
+
+            mock_virtual_module = mock.MagicMock()
+            mock_virtual_attr = mock.MagicMock()
+            mock_virtual_module.VirtualClass = mock_virtual_attr
+
+            def mock_import_module(module_name):
+                if "current.module" in module_name:
+                    return mock_current_module
+                if "virtual.module" in module_name:
+                    return mock_virtual_module
+                raise ImportError(f"Module {module_name} not found")
+
+            with mock.patch(
+                "airflow.utils.deprecation_tools.importlib.import_module", side_effect=mock_import_module
+            ):
+                add_deprecated_classes(
+                    {
+                        full_module_name: {
+                            "current_attr": "current.module.current_attr",
+                        },
+                        virtual_module_name: {
+                            "VirtualClass": "virtual.module.VirtualClass",
+                        },
+                    },
+                    package=full_module_name,
+                )
+
+                # Test current module access
+                with warnings.catch_warnings(record=True) as w:
+                    warnings.simplefilter("always")
+                    result = test_module.current_attr
+
+                    assert result == mock_current_attr
+                    assert len(w) == 1
+                    assert issubclass(w[0].category, DeprecationWarning)
+                    assert f"{full_module_name}.current_attr" in 
str(w[0].message)
+
+                # Test virtual module access
+                virtual_module = sys.modules[full_virtual_module_name]
+                with warnings.catch_warnings(record=True) as w:
+                    warnings.simplefilter("always")
+                    result = virtual_module.VirtualClass
+
+                    assert result == mock_virtual_attr
+                    assert len(w) == 1
+                    assert issubclass(w[0].category, DeprecationWarning)
+                    assert f"{full_virtual_module_name}.VirtualClass" in 
str(w[0].message)
+
+    def test_add_deprecated_classes_current_module_not_in_sys_modules(self):
+        """Test add_deprecated_classes raises error when current module not in 
sys.modules."""
+        nonexistent_module = "nonexistent.module.name"
+
+        with pytest.raises(ValueError, match=f"Module {nonexistent_module} not found in sys.modules"):
+            add_deprecated_classes(
+                {nonexistent_module: {"attr": "target.module.attr"}},
+                package=nonexistent_module,
+            )
+
+    def test_add_deprecated_classes_preserves_existing_module_attributes(self):
+        """Test that add_deprecated_classes preserves existing module 
attributes."""
+        module_name = get_unique_module_name("preserve_module")
+        full_module_name = f"airflow.test.{module_name}"
+
+        # Create a module with existing attributes
+        test_module = ModuleType(full_module_name)
+        test_module.existing_attr = "existing_value"
+        test_module.existing_function = lambda: "existing_function_result"
+        sys.modules[full_module_name] = test_module
+
+        with temporary_module(full_module_name):
+            add_deprecated_classes(
+                {
+                    full_module_name: {
+                        "deprecated_attr": "target.module.deprecated_attr",
+                    }
+                },
+                package=full_module_name,
+            )
 
-            add_deprecated_classes(module_imports, package_name)
+            # Existing attributes should still be accessible
+            assert test_module.existing_attr == "existing_value"
+            assert test_module.existing_function() == 
"existing_function_result"
 
-            # Check that the existing module was not overridden
-            assert sys.modules[module_name] is existing_module
-            assert sys.modules[module_name].existing_attribute == 
"existing_value"
+            # The module should have __getattr__ for deprecated attributes
+            assert hasattr(test_module, "__getattr__")
diff --git a/providers/common/io/tests/unit/common/io/xcom/test_backend.py b/providers/common/io/tests/unit/common/io/xcom/test_backend.py
index e1b242daf42..72f868c62fc 100644
--- a/providers/common/io/tests/unit/common/io/xcom/test_backend.py
+++ b/providers/common/io/tests/unit/common/io/xcom/test_backend.py
@@ -38,7 +38,7 @@ if AIRFLOW_V_3_0_PLUS:
     from airflow.sdk.execution_time.comms import XComResult
     from airflow.sdk.execution_time.xcom import resolve_xcom_backend
 else:
-    from airflow.io.path import ObjectStoragePath
+    from airflow.io.path import ObjectStoragePath  # type: ignore[no-redef]
     from airflow.models.xcom import BaseXCom, resolve_xcom_backend  # type: ignore[no-redef]
 
 
diff --git a/task-sdk/src/airflow/sdk/io/__init__.py b/task-sdk/src/airflow/sdk/io/__init__.py
index 4247c5e4acf..0fec7a7bc03 100644
--- a/task-sdk/src/airflow/sdk/io/__init__.py
+++ b/task-sdk/src/airflow/sdk/io/__init__.py
@@ -17,7 +17,9 @@
 
 from __future__ import annotations
 
+from airflow.sdk.io.fs import get_fs, has_fs
 from airflow.sdk.io.path import ObjectStoragePath
 from airflow.sdk.io.store import attach
+from airflow.sdk.io.typedef import Properties
 
-__all__ = ["ObjectStoragePath", "attach"]
+__all__ = ["ObjectStoragePath", "attach", "get_fs", "has_fs", "Properties"]
diff --git a/airflow-core/src/airflow/io/__init__.py b/task-sdk/src/airflow/sdk/io/fs.py
similarity index 97%
copy from airflow-core/src/airflow/io/__init__.py
copy to task-sdk/src/airflow/sdk/io/fs.py
index 6bbea93e59b..45f07ec46eb 100644
--- a/airflow-core/src/airflow/io/__init__.py
+++ b/task-sdk/src/airflow/sdk/io/fs.py
@@ -20,9 +20,7 @@ import inspect
 import logging
 from collections.abc import Callable, Mapping
 from functools import cache
-from typing import (
-    TYPE_CHECKING,
-)
+from typing import TYPE_CHECKING
 
 from fsspec.implementations.local import LocalFileSystem
 
@@ -33,7 +31,7 @@ from airflow.utils.module_loading import import_string
 if TYPE_CHECKING:
     from fsspec import AbstractFileSystem
 
-    from airflow.io.typedef import Properties
+    from airflow.sdk.io.typedef import Properties
 
 
 log = logging.getLogger(__name__)
diff --git a/task-sdk/src/airflow/sdk/io/store.py b/task-sdk/src/airflow/sdk/io/store.py
index 68c7ad9fbf8..05f9b98b136 100644
--- a/task-sdk/src/airflow/sdk/io/store.py
+++ b/task-sdk/src/airflow/sdk/io/store.py
@@ -22,7 +22,7 @@ from typing import TYPE_CHECKING, ClassVar
 if TYPE_CHECKING:
     from fsspec import AbstractFileSystem
 
-    from airflow.io.typedef import Properties
+    from airflow.sdk.io.typedef import Properties
 
 
 class ObjectStore:
@@ -57,7 +57,7 @@ class ObjectStore:
 
     @cached_property
     def fs(self) -> AbstractFileSystem:
-        from airflow.io import get_fs
+        from airflow.sdk.io import get_fs
 
         # if the fs is provided in init, the next statement will be ignored
         return get_fs(self.protocol, self.conn_id)
@@ -89,7 +89,7 @@ class ObjectStore:
 
     @classmethod
     def deserialize(cls, data: dict[str, str], version: int):
-        from airflow.io import has_fs
+        from airflow.sdk.io import has_fs
 
         if version > cls.__version__:
             raise ValueError(f"Cannot deserialize version {version} for {cls.__name__}")
diff --git a/airflow-core/src/airflow/io/typedef.py b/task-sdk/src/airflow/sdk/io/typedef.py
similarity index 100%
rename from airflow-core/src/airflow/io/typedef.py
rename to task-sdk/src/airflow/sdk/io/typedef.py
diff --git a/task-sdk/tests/task_sdk/io/test_path.py b/task-sdk/tests/task_sdk/io/test_path.py
index 1e85b2ee7a2..696d1400431 100644
--- a/task-sdk/tests/task_sdk/io/test_path.py
+++ b/task-sdk/tests/task_sdk/io/test_path.py
@@ -325,7 +325,7 @@ class TestLocalPath:
 class TestBackwardsCompatibility:
     @pytest.fixture(autouse=True)
     def reset(self):
-        from airflow.io import _register_filesystems
+        from airflow.sdk.io.fs import _register_filesystems
 
         _register_filesystems.cache_clear()
         yield

Reply via email to