This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a0f3353c471 Move Asset user facing components to task_sdk (#43773)
a0f3353c471 is described below

commit a0f3353c471e4d9a2cd4b23f0c358d0ae908580a
Author: Wei Lee <[email protected]>
AuthorDate: Wed Nov 20 17:09:41 2024 +0800

    Move Asset user facing components to task_sdk (#43773)
    
    * feat(task_sdk): Move asset from Airflow core to task_sdk
    
    * feat(task_sdk): Move assets.metadata to task_sdk.definitions.asset
    
    * fix(providers/amazon): fix common.compat provider ImportError handling
    
    * fix(providers/google): fix common.compat provider ImportError handling
    
    * fix(providers/openlineage): fix common.compat provider ImportError handling
    
    * fix(provider/common/compat): fix common.compat provider ImportError handling
    
    * feat(task_sdk): expose Model
    
    * docs(newsfragments): update how asset module should be imported
    
    * fix(task_sdk): fix 2_10 compatibility
    
    * feat(common.compat): use version to decide how to import assets instead of exception
    
    * feat(providers/common.compat): use airflow version instead of exception to return compat method
    
    * refactor(providers/common/compat): extract airflow version to __init__
    
    * fix(providers): use version compare to decide whether to import asset
    
    * feat(decorators/asset): move @asset to task_sdk
    
    * refactor(asset): rename _AssetAliasCondition as AssetAliasCondition
    
    * feat(task_sdk): make airflow.sdk.definitions.decorators a package
    
    * Revert "feat(task_sdk): make airflow.sdk.definitions.decorators a package"
    
    This reverts commit 324efc079ec1c5c2618bb19d48d188a1363f3931.
    
    * feat(task_sdk): move asset related logic in airflow.sdk.definitions.decorators to airflow.sdk.definitions.asset.*
    
    * refactor(task_sdk): move @asset to airflow.sdk.definitions.asset.decorators
    
    * test(providers/amazon): remove unnecessary compat handling
    
    * test(providers/google): remove unnecessary compat handling
    
    * test(openlineage): remove unnecessary compat handling
    
    * fix(provider/openlineage): fix how asset compat is handled
    
    * feat(task_sdk/asset): expose extract_event_key
    
    * test(providers/google): change Asset import back to common.compat
    
    * docs(newsfragments): fix naming error
    
    * docs(newsfragments): fix typo
    
    * docs(newsfragment): add missing metadata
    
    * fixup! feat(task_sdk): Move asset from Airflow core to task_sdk
    
    * fixup! feat(task_sdk): Move asset from Airflow core to task_sdk
---
 airflow/__init__.py                                |   4 +-
 airflow/api_connexion/endpoints/asset_endpoint.py  |   2 +-
 .../api_fastapi/core_api/routes/public/assets.py   |   2 +-
 airflow/api_internal/endpoints/rpc_api_endpoint.py |   2 +-
 airflow/assets/__init__.py                         | 508 ---------------------
 airflow/assets/manager.py                          |   4 +-
 airflow/dag_processing/collection.py               |   2 +-
 airflow/datasets/__init__.py                       |   4 +-
 airflow/decorators/base.py                         |   2 +-
 airflow/example_dags/example_asset_alias.py        |   2 +-
 .../example_asset_alias_with_no_taskflow.py        |   2 +-
 airflow/example_dags/example_asset_decorator.py    |   4 +-
 airflow/example_dags/example_assets.py             |   2 +-
 airflow/example_dags/example_inlet_event_extra.py  |   2 +-
 airflow/example_dags/example_outlet_event_extra.py |   4 +-
 airflow/lineage/hook.py                            |   2 +-
 airflow/listeners/spec/asset.py                    |   2 +-
 airflow/models/asset.py                            |   2 +-
 airflow/models/dag.py                              |   2 +-
 airflow/models/taskinstance.py                     |   2 +-
 airflow/providers_manager.py                       |   4 +-
 airflow/serialization/serialized_objects.py        |  20 +-
 airflow/timetables/assets.py                       |   4 +-
 airflow/timetables/base.py                         |   4 +-
 airflow/timetables/simple.py                       |   6 +-
 airflow/utils/context.py                           |   8 +-
 airflow/utils/context.pyi                          |   2 +-
 airflow/utils/operator_helpers.py                  |   2 +-
 airflow/www/views.py                               |   2 +-
 .../tests/test_pytest_args_for_test_types.py       |   1 +
 .../authoring-and-scheduling/datasets.rst          |  16 +-
 newsfragments/41348.significant.rst                |   8 +-
 .../src/airflow/providers/amazon/aws/assets/s3.py  |  18 +-
 .../airflow/providers/common/compat/__init__.py    |  14 +-
 .../providers/common/compat/assets/__init__.py     |  31 +-
 .../providers/common/compat/lineage/hook.py        |  35 +-
 .../src/airflow/providers/common/io/assets/file.py |  12 +-
 .../src/airflow/providers/google/assets/gcs.py     |  18 +-
 .../airflow/providers/openlineage/utils/utils.py   |  30 +-
 .../aws/auth_manager/test_aws_auth_manager.py      |   1 +
 providers/tests/google/cloud/hooks/test_gcs.py     |  10 +-
 .../system/microsoft/azure/example_msfabric.py     |   2 +-
 .../ci/pre_commit/check_tests_in_right_folders.py  |   1 +
 .../src/airflow/sdk/definitions/asset}/__init__.py | 183 ++++----
 .../airflow/sdk/definitions/asset/decorators.py    |   9 +-
 .../src/airflow/sdk/definitions/asset}/metadata.py |  33 +-
 task_sdk/src/airflow/sdk/definitions/dag.py        |   4 +-
 .../tests/defintions}/test_asset.py                |  87 ++--
 .../tests/defintions/test_asset_decorators.py      |  13 +-
 .../endpoints/test_dag_run_endpoint.py             |   2 +-
 tests/api_connexion/schemas/test_asset_schema.py   |   2 +-
 tests/api_connexion/schemas/test_dag_schema.py     |   2 +-
 .../core_api/routes/public/test_dag_run.py         |   2 +-
 .../api_fastapi/core_api/routes/ui/test_assets.py  |   2 +-
 tests/assets/test_manager.py                       |   2 +-
 tests/dags/test_assets.py                          |   2 +-
 tests/dags/test_only_empty_tasks.py                |   2 +-
 .../{models/test_asset.py => datasets/__init__.py} |  13 -
 tests/datasets/test_dataset.py                     |  53 +++
 tests/decorators/test_python.py                    |   2 +-
 tests/io/test_path.py                              |   2 +-
 tests/io/test_wrapper.py                           |   2 +-
 tests/jobs/test_scheduler_job.py                   |   2 +-
 tests/lineage/test_hook.py                         |   2 +-
 tests/listeners/asset_listener.py                  |   2 +-
 tests/listeners/test_asset_listener.py             |   2 +-
 tests/models/test_asset.py                         |   2 +-
 tests/models/test_dag.py                           |   2 +-
 tests/models/test_serialized_dag.py                |   2 +-
 tests/models/test_taskinstance.py                  |  32 +-
 tests/serialization/test_dag_serialization.py      |   2 +-
 tests/serialization/test_serde.py                  |   4 +-
 tests/serialization/test_serialized_objects.py     |   2 +-
 tests/timetables/test_assets_timetable.py          |   2 +-
 tests/utils/test_context.py                        |   2 +-
 tests/utils/test_json.py                           |   2 +-
 tests/www/views/test_views_asset.py                |   2 +-
 tests/www/views/test_views_grid.py                 |   2 +-
 78 files changed, 451 insertions(+), 835 deletions(-)
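
For orientation, the user-facing import paths this commit establishes, as a minimal sketch (all of these paths appear verbatim in the hunks below):

    from airflow.sdk.definitions.asset import Asset, AssetAlias
    from airflow.sdk.definitions.asset.metadata import Metadata

    example_asset = Asset("s3://asset-bucket/example.csv")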

diff --git a/airflow/__init__.py b/airflow/__init__.py
index 411aac70fc6..fed233f0146 100644
--- a/airflow/__init__.py
+++ b/airflow/__init__.py
@@ -85,15 +85,15 @@ __lazy_imports: dict[str, tuple[str, str, bool]] = {
     "version": (".version", "", False),
     # Deprecated lazy imports
     "AirflowException": (".exceptions", "AirflowException", True),
-    "Dataset": (".assets", "Dataset", True),
+    "Dataset": (".sdk.definitions.asset", "Dataset", True),
 }
 if TYPE_CHECKING:
     # These objects are imported by PEP-562, however, static analyzers and IDE's
     # have no idea about typing of these objects.
     # Add it under TYPE_CHECKING block should help with it.
-    from airflow.assets import Asset, Dataset
     from airflow.models.dag import DAG
     from airflow.models.xcom_arg import XComArg
+    from airflow.sdk.definitions.asset import Asset, Dataset
 
 
 def __getattr__(name: str):
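
A minimal illustration of the lazy alias above, assuming PEP 562 dispatch through __getattr__ as registered (the True flag marks the import as deprecated):

    import airflow
    from airflow.sdk.definitions.asset import Dataset

    # The deprecated top-level name now resolves to the task SDK class.
    assert airflow.Dataset is Dataset
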
diff --git a/airflow/api_connexion/endpoints/asset_endpoint.py b/airflow/api_connexion/endpoints/asset_endpoint.py
index 6919fb3c4e8..1bda4fdb2a2 100644
--- a/airflow/api_connexion/endpoints/asset_endpoint.py
+++ b/airflow/api_connexion/endpoints/asset_endpoint.py
@@ -43,9 +43,9 @@ from airflow.api_connexion.schemas.asset_schema import (
     queued_event_collection_schema,
     queued_event_schema,
 )
-from airflow.assets import Asset
 from airflow.assets.manager import asset_manager
 from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel
+from airflow.sdk.definitions.asset import Asset
 from airflow.utils import timezone
 from airflow.utils.api_migration import mark_fastapi_migration_done
 from airflow.utils.db import get_query_count
diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow/api_fastapi/core_api/routes/public/assets.py
index 163d0763ca8..b3ee47c6b14 100644
--- a/airflow/api_fastapi/core_api/routes/public/assets.py
+++ b/airflow/api_fastapi/core_api/routes/public/assets.py
@@ -49,9 +49,9 @@ from airflow.api_fastapi.core_api.datamodels.assets import (
     QueuedEventResponse,
 )
 from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
-from airflow.assets import Asset
 from airflow.assets.manager import asset_manager
 from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel
+from airflow.sdk.definitions.asset import Asset
 from airflow.utils import timezone
 
 assets_router = AirflowRouter(tags=["Asset"])
diff --git a/airflow/api_internal/endpoints/rpc_api_endpoint.py b/airflow/api_internal/endpoints/rpc_api_endpoint.py
index abe685a009d..2db39c345f9 100644
--- a/airflow/api_internal/endpoints/rpc_api_endpoint.py
+++ b/airflow/api_internal/endpoints/rpc_api_endpoint.py
@@ -53,7 +53,6 @@ log = logging.getLogger(__name__)
 @functools.lru_cache
 def initialize_method_map() -> dict[str, Callable]:
     from airflow.api.common.trigger_dag import trigger_dag
-    from airflow.assets import expand_alias_to_assets
     from airflow.assets.manager import AssetManager
     from airflow.cli.commands.task_command import _get_ti_db_access
     from airflow.dag_processing.manager import DagFileProcessorManager
@@ -76,6 +75,7 @@ def initialize_method_map() -> dict[str, Callable]:
         _update_ti_heartbeat,
         _xcom_pull,
     )
+    from airflow.sdk.definitions.asset import expand_alias_to_assets
     from airflow.secrets.metastore import MetastoreBackend
     from airflow.utils.cli_action_loggers import _default_action_log_internal
     from airflow.utils.log.file_task_handler import FileTaskHandler
diff --git a/airflow/assets/__init__.py b/airflow/assets/__init__.py
index f1d36ac12b7..13a83393a91 100644
--- a/airflow/assets/__init__.py
+++ b/airflow/assets/__init__.py
@@ -14,511 +14,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
-from __future__ import annotations
-
-import logging
-import os
-import urllib.parse
-import warnings
-from typing import TYPE_CHECKING, Any, Callable, ClassVar, Iterable, Iterator, cast, overload
-
-import attrs
-from sqlalchemy import select
-
-from airflow.api_internal.internal_api_call import internal_api_call
-from airflow.serialization.dag_dependency import DagDependency
-from airflow.typing_compat import TypedDict
-from airflow.utils.session import NEW_SESSION, provide_session
-
-if TYPE_CHECKING:
-    from urllib.parse import SplitResult
-
-    from sqlalchemy.orm.session import Session
-
-__all__ = ["Asset", "AssetAll", "AssetAny", "Dataset"]
-
-
-log = logging.getLogger(__name__)
-
-
-def normalize_noop(parts: SplitResult) -> SplitResult:
-    """
-    Place-hold a :class:`~urllib.parse.SplitResult`` normalizer.
-
-    :meta private:
-    """
-    return parts
-
-
-def _get_uri_normalizer(scheme: str) -> Callable[[SplitResult], SplitResult] | None:
-    if scheme == "file":
-        return normalize_noop
-    from airflow.providers_manager import ProvidersManager
-
-    return ProvidersManager().asset_uri_handlers.get(scheme)
-
-
-def _get_normalized_scheme(uri: str) -> str:
-    parsed = urllib.parse.urlsplit(uri)
-    return parsed.scheme.lower()
-
-
-def _sanitize_uri(uri: str) -> str:
-    """
-    Sanitize an asset URI.
-
-    This checks for URI validity, and normalizes the URI if needed. A fully
-    normalized URI is returned.
-    """
-    parsed = urllib.parse.urlsplit(uri)
-    if not parsed.scheme and not parsed.netloc:  # Does not look like a URI.
-        return uri
-    if not (normalized_scheme := _get_normalized_scheme(uri)):
-        return uri
-    if normalized_scheme.startswith("x-"):
-        return uri
-    if normalized_scheme == "airflow":
-        raise ValueError("Asset scheme 'airflow' is reserved")
-    _, auth_exists, normalized_netloc = parsed.netloc.rpartition("@")
-    if auth_exists:
-        # TODO: Collect this into a DagWarning.
-        warnings.warn(
-            "An Asset URI should not contain auth info (e.g. username or "
-            "password). It has been automatically dropped.",
-            UserWarning,
-            stacklevel=3,
-        )
-    if parsed.query:
-        normalized_query = urllib.parse.urlencode(sorted(urllib.parse.parse_qsl(parsed.query)))
-    else:
-        normalized_query = ""
-    parsed = parsed._replace(
-        scheme=normalized_scheme,
-        netloc=normalized_netloc,
-        path=parsed.path.rstrip("/") or "/",  # Remove all trailing slashes.
-        query=normalized_query,
-        fragment="",  # Ignore any fragments.
-    )
-    if (normalizer := _get_uri_normalizer(normalized_scheme)) is not None:
-        parsed = normalizer(parsed)
-    return urllib.parse.urlunsplit(parsed)
-
-
-def _validate_identifier(instance, attribute, value):
-    if not isinstance(value, str):
-        raise ValueError(f"{type(instance).__name__} {attribute.name} must be a string")
-    if len(value) > 1500:
-        raise ValueError(f"{type(instance).__name__} {attribute.name} cannot exceed 1500 characters")
-    if value.isspace():
-        raise ValueError(f"{type(instance).__name__} {attribute.name} cannot be just whitespace")
-    if not value.isascii():
-        raise ValueError(f"{type(instance).__name__} {attribute.name} must only consist of ASCII characters")
-    return value
-
-
-def _validate_non_empty_identifier(instance, attribute, value):
-    if not _validate_identifier(instance, attribute, value):
-        raise ValueError(f"{type(instance).__name__} {attribute.name} cannot be empty")
-    return value
-
-
-def _validate_asset_name(instance, attribute, value):
-    _validate_non_empty_identifier(instance, attribute, value)
-    if value == "self" or value == "context":
-        raise ValueError(f"prohibited name for asset: {value}")
-    return value
-
-
-def extract_event_key(value: str | Asset | AssetAlias) -> str:
-    """
-    Extract the key of an inlet or an outlet event.
-
-    If the input value is a string, it is treated as a URI and sanitized. If the
-    input is a :class:`Asset`, the URI it contains is considered sanitized and
-    returned directly. If the input is a :class:`AssetAlias`, the name it contains
-    will be returned directly.
-
-    :meta private:
-    """
-    if isinstance(value, AssetAlias):
-        return value.name
-
-    if isinstance(value, Asset):
-        return value.uri
-    return _sanitize_uri(str(value))
-
-
-@internal_api_call
-@provide_session
-def expand_alias_to_assets(alias: str | AssetAlias, *, session: Session = NEW_SESSION) -> list[BaseAsset]:
-    """Expand asset alias to resolved assets."""
-    from airflow.models.asset import AssetAliasModel
-
-    alias_name = alias.name if isinstance(alias, AssetAlias) else alias
-
-    asset_alias_obj = session.scalar(
-        select(AssetAliasModel).where(AssetAliasModel.name == alias_name).limit(1)
-    )
-    if asset_alias_obj:
-        return [asset.to_public() for asset in asset_alias_obj.assets]
-    return []
-
-
-@attrs.define(kw_only=True)
-class AssetRef:
-    """Reference to an asset."""
-
-    name: str
-
-
-class BaseAsset:
-    """
-    Protocol for all asset triggers to use in ``DAG(schedule=...)``.
-
-    :meta private:
-    """
-
-    def __bool__(self) -> bool:
-        return True
-
-    def __or__(self, other: BaseAsset) -> BaseAsset:
-        if not isinstance(other, BaseAsset):
-            return NotImplemented
-        return AssetAny(self, other)
-
-    def __and__(self, other: BaseAsset) -> BaseAsset:
-        if not isinstance(other, BaseAsset):
-            return NotImplemented
-        return AssetAll(self, other)
-
-    def as_expression(self) -> Any:
-        """
-        Serialize the asset into its scheduling expression.
-
-        The return value is stored in DagModel for display purposes. It must be
-        JSON-compatible.
-
-        :meta private:
-        """
-        raise NotImplementedError
-
-    def evaluate(self, statuses: dict[str, bool]) -> bool:
-        raise NotImplementedError
-
-    def iter_assets(self) -> Iterator[tuple[str, Asset]]:
-        raise NotImplementedError
-
-    def iter_asset_aliases(self) -> Iterator[tuple[str, AssetAlias]]:
-        raise NotImplementedError
-
-    def iter_dag_dependencies(self, *, source: str, target: str) -> Iterator[DagDependency]:
-        """
-        Iterate a base asset as dag dependency.
-
-        :meta private:
-        """
-        raise NotImplementedError
-
-
-@attrs.define(unsafe_hash=False)
-class AssetAlias(BaseAsset):
-    """A represeation of asset alias which is used to create asset during the runtime."""
-
-    name: str = attrs.field(validator=_validate_non_empty_identifier)
-    group: str = attrs.field(kw_only=True, default="", validator=_validate_identifier)
-
-    def iter_assets(self) -> Iterator[tuple[str, Asset]]:
-        return iter(())
-
-    def iter_asset_aliases(self) -> Iterator[tuple[str, AssetAlias]]:
-        yield self.name, self
-
-    def iter_dag_dependencies(self, *, source: str, target: str) -> Iterator[DagDependency]:
-        """
-        Iterate an asset alias as dag dependency.
-
-        :meta private:
-        """
-        yield DagDependency(
-            source=source or "asset-alias",
-            target=target or "asset-alias",
-            dependency_type="asset-alias",
-            dependency_id=self.name,
-        )
-
-
-class AssetAliasEvent(TypedDict):
-    """A represeation of asset event to be triggered by an asset alias."""
-
-    source_alias_name: str
-    dest_asset_uri: str
-    extra: dict[str, Any]
-
-
-def _set_extra_default(extra: dict | None) -> dict:
-    """
-    Automatically convert None to an empty dict.
-
-    This allows the caller site to continue doing ``Asset(uri, extra=None)``,
-    but still allow the ``extra`` attribute to always be a dict.
-    """
-    if extra is None:
-        return {}
-    return extra
-
-
-@attrs.define(init=False, unsafe_hash=False)
-class Asset(os.PathLike, BaseAsset):
-    """A representation of data asset dependencies between workflows."""
-
-    name: str
-    uri: str
-    group: str
-    extra: dict[str, Any]
-
-    asset_type: ClassVar[str] = "asset"
-    __version__: ClassVar[int] = 1
-
-    @overload
-    def __init__(self, name: str, uri: str, *, group: str = "", extra: dict | None = None) -> None:
-        """Canonical; both name and uri are provided."""
-
-    @overload
-    def __init__(self, name: str, *, group: str = "", extra: dict | None = None) -> None:
-        """It's possible to only provide the name, either by keyword or as the only positional argument."""
-
-    @overload
-    def __init__(self, *, uri: str, group: str = "", extra: dict | None = None) -> None:
-        """It's possible to only provide the URI as a keyword argument."""
-
-    def __init__(
-        self,
-        name: str | None = None,
-        uri: str | None = None,
-        *,
-        group: str = "",
-        extra: dict | None = None,
-    ) -> None:
-        if name is None and uri is None:
-            raise TypeError("Asset() requires either 'name' or 'uri'")
-        elif name is None:
-            name = uri
-        elif uri is None:
-            uri = name
-        fields = attrs.fields_dict(Asset)
-        self.name = _validate_asset_name(self, fields["name"], name)
-        self.uri = _sanitize_uri(_validate_non_empty_identifier(self, fields["uri"], uri))
-        self.group = _validate_identifier(self, fields["group"], group) if group else self.asset_type
-        self.extra = _set_extra_default(extra)
-
-    def __fspath__(self) -> str:
-        return self.uri
-
-    @property
-    def normalized_uri(self) -> str | None:
-        """
-        Returns the normalized and AIP-60 compliant URI whenever possible.
-
-        If we can't retrieve the scheme from URI or no normalizer is provided or if parsing fails,
-        it returns None.
-
-        If a normalizer for the scheme exists and parsing is successful we return the normalizer result.
-        """
-        if not (normalized_scheme := _get_normalized_scheme(self.uri)):
-            return None
-
-        if (normalizer := _get_uri_normalizer(normalized_scheme)) is None:
-            return None
-        parsed = urllib.parse.urlsplit(self.uri)
-        try:
-            normalized_uri = normalizer(parsed)
-            return urllib.parse.urlunsplit(normalized_uri)
-        except ValueError:
-            return None
-
-    def as_expression(self) -> Any:
-        """
-        Serialize the asset into its scheduling expression.
-
-        :meta private:
-        """
-        return self.uri
-
-    def iter_assets(self) -> Iterator[tuple[str, Asset]]:
-        yield self.uri, self
-
-    def iter_asset_aliases(self) -> Iterator[tuple[str, AssetAlias]]:
-        return iter(())
-
-    def evaluate(self, statuses: dict[str, bool]) -> bool:
-        return statuses.get(self.uri, False)
-
-    def iter_dag_dependencies(self, *, source: str, target: str) -> Iterator[DagDependency]:
-        """
-        Iterate an asset as dag dependency.
-
-        :meta private:
-        """
-        yield DagDependency(
-            source=source or "asset",
-            target=target or "asset",
-            dependency_type="asset",
-            dependency_id=self.uri,
-        )
-
-
-class Dataset(Asset):
-    """A representation of dataset dependencies between workflows."""
-
-    asset_type: ClassVar[str] = "dataset"
-
-
-class Model(Asset):
-    """A representation of model dependencies between workflows."""
-
-    asset_type: ClassVar[str] = "model"
-
-
-class _AssetBooleanCondition(BaseAsset):
-    """Base class for asset boolean logic."""
-
-    agg_func: Callable[[Iterable], bool]
-
-    def __init__(self, *objects: BaseAsset) -> None:
-        if not all(isinstance(o, BaseAsset) for o in objects):
-            raise TypeError("expect asset expressions in condition")
-
-        self.objects = [
-            _AssetAliasCondition(obj.name) if isinstance(obj, AssetAlias) else obj for obj in objects
-        ]
-
-    def evaluate(self, statuses: dict[str, bool]) -> bool:
-        return self.agg_func(x.evaluate(statuses=statuses) for x in self.objects)
-
-    def iter_assets(self) -> Iterator[tuple[str, Asset]]:
-        seen = set()  # We want to keep the first instance.
-        for o in self.objects:
-            for k, v in o.iter_assets():
-                if k in seen:
-                    continue
-                yield k, v
-                seen.add(k)
-
-    def iter_asset_aliases(self) -> Iterator[tuple[str, AssetAlias]]:
-        """Filter asset aliases in the condition."""
-        for o in self.objects:
-            yield from o.iter_asset_aliases()
-
-    def iter_dag_dependencies(self, *, source: str, target: str) -> Iterator[DagDependency]:
-        """
-        Iterate asset, asset aliases and their resolved assets  as dag dependency.
-
-        :meta private:
-        """
-        for obj in self.objects:
-            yield from obj.iter_dag_dependencies(source=source, target=target)
-
-
-class AssetAny(_AssetBooleanCondition):
-    """Use to combine assets schedule references in an "and" relationship."""
-
-    agg_func = any
-
-    def __or__(self, other: BaseAsset) -> BaseAsset:
-        if not isinstance(other, BaseAsset):
-            return NotImplemented
-        # Optimization: X | (Y | Z) is equivalent to X | Y | Z.
-        return AssetAny(*self.objects, other)
-
-    def __repr__(self) -> str:
-        return f"AssetAny({', '.join(map(str, self.objects))})"
-
-    def as_expression(self) -> dict[str, Any]:
-        """
-        Serialize the asset into its scheduling expression.
-
-        :meta private:
-        """
-        return {"any": [o.as_expression() for o in self.objects]}
-
-
-class _AssetAliasCondition(AssetAny):
-    """
-    Use to expand AssetAlias as AssetAny of its resolved Assets.
-
-    :meta private:
-    """
-
-    def __init__(self, name: str) -> None:
-        self.name = name
-        self.objects = expand_alias_to_assets(name)
-
-    def __repr__(self) -> str:
-        return f"_AssetAliasCondition({', '.join(map(str, self.objects))})"
-
-    def as_expression(self) -> Any:
-        """
-        Serialize the asset alias into its scheduling expression.
-
-        :meta private:
-        """
-        return {"alias": self.name}
-
-    def iter_asset_aliases(self) -> Iterator[tuple[str, AssetAlias]]:
-        yield self.name, AssetAlias(self.name)
-
-    def iter_dag_dependencies(self, *, source: str = "", target: str = "") -> Iterator[DagDependency]:
-        """
-        Iterate an asset alias and its resolved assets as dag dependency.
-
-        :meta private:
-        """
-        if self.objects:
-            for obj in self.objects:
-                asset = cast(Asset, obj)
-                uri = asset.uri
-                # asset
-                yield DagDependency(
-                    source=f"asset-alias:{self.name}" if source else "asset",
-                    target="asset" if source else f"asset-alias:{self.name}",
-                    dependency_type="asset",
-                    dependency_id=uri,
-                )
-                # asset alias
-                yield DagDependency(
-                    source=source or f"asset:{uri}",
-                    target=target or f"asset:{uri}",
-                    dependency_type="asset-alias",
-                    dependency_id=self.name,
-                )
-        else:
-            yield DagDependency(
-                source=source or "asset-alias",
-                target=target or "asset-alias",
-                dependency_type="asset-alias",
-                dependency_id=self.name,
-            )
-
-
-class AssetAll(_AssetBooleanCondition):
-    """Use to combine assets schedule references in an "or" relationship."""
-
-    agg_func = all
-
-    def __and__(self, other: BaseAsset) -> BaseAsset:
-        if not isinstance(other, BaseAsset):
-            return NotImplemented
-        # Optimization: X & (Y & Z) is equivalent to X & Y & Z.
-        return AssetAll(*self.objects, other)
-
-    def __repr__(self) -> str:
-        return f"AssetAll({', '.join(map(str, self.objects))})"
-
-    def as_expression(self) -> Any:
-        """
-        Serialize the assets into its scheduling expression.
-
-        :meta private:
-        """
-        return {"all": [o.as_expression() for o in self.objects]}
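
The composition operators of the removed module carry over unchanged to the new location: | and & still build AssetAny / AssetAll trees usable as DAG(schedule=...). A minimal sketch:

    from airflow.sdk.definitions.asset import Asset

    a = Asset("s3://bucket/a.csv")
    b = Asset("s3://bucket/b.csv")
    c = Asset("s3://bucket/c.csv")

    # a | (b & c) evaluates to AssetAny(a, AssetAll(b, c)).
    schedule = a | (b & c)
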
diff --git a/airflow/assets/manager.py b/airflow/assets/manager.py
index a06c7c31786..0616d601511 100644
--- a/airflow/assets/manager.py
+++ b/airflow/assets/manager.py
@@ -24,7 +24,6 @@ from sqlalchemy import exc, select
 from sqlalchemy.orm import joinedload
 
 from airflow.api_internal.internal_api_call import internal_api_call
-from airflow.assets import Asset
 from airflow.configuration import conf
 from airflow.listeners.listener import get_listener_manager
 from airflow.models.asset import (
@@ -36,15 +35,16 @@ from airflow.models.asset import (
     DagScheduleAssetReference,
 )
 from airflow.models.dagbag import DagPriorityParsingRequest
+from airflow.sdk.definitions.asset import Asset
 from airflow.stats import Stats
 from airflow.utils.log.logging_mixin import LoggingMixin
 
 if TYPE_CHECKING:
     from sqlalchemy.orm.session import Session
 
-    from airflow.assets import Asset, AssetAlias
     from airflow.models.dag import DagModel
     from airflow.models.taskinstance import TaskInstance
+    from airflow.sdk.definitions.asset import Asset, AssetAlias
 
 
 class AssetManager(LoggingMixin):
diff --git a/airflow/dag_processing/collection.py b/airflow/dag_processing/collection.py
index 0ca121c5618..a1bfafdab4b 100644
--- a/airflow/dag_processing/collection.py
+++ b/airflow/dag_processing/collection.py
@@ -34,7 +34,6 @@ from typing import TYPE_CHECKING, NamedTuple
 from sqlalchemy import func, select, tuple_
 from sqlalchemy.orm import joinedload, load_only
 
-from airflow.assets import Asset, AssetAlias
 from airflow.assets.manager import asset_manager
 from airflow.models.asset import (
     AssetAliasModel,
@@ -45,6 +44,7 @@ from airflow.models.asset import (
 )
 from airflow.models.dag import DAG, DagModel, DagOwnerAttributes, DagTag
 from airflow.models.dagrun import DagRun
+from airflow.sdk.definitions.asset import Asset, AssetAlias
 from airflow.utils.sqlalchemy import with_row_locks
 from airflow.utils.timezone import utcnow
 from airflow.utils.types import DagRunType
diff --git a/airflow/datasets/__init__.py b/airflow/datasets/__init__.py
index 34729e43780..3524466e58c 100644
--- a/airflow/datasets/__init__.py
+++ b/airflow/datasets/__init__.py
@@ -27,13 +27,13 @@ from __future__ import annotations
 
 import warnings
 
-from airflow.assets import AssetAlias as DatasetAlias, Dataset
+from airflow.sdk.definitions.asset import AssetAlias as DatasetAlias, Dataset
 
 # TODO: Remove this module in Airflow 3.2
 
 warnings.warn(
     "Import from the airflow.dataset module is deprecated and "
-    "will be removed in the Airflow 3.2. Please import it from 'airflow.assets'.",
+    "will be removed in the Airflow 3.2. Please import it from 'airflow.sdk.definitions.asset'.",
     DeprecationWarning,
     stacklevel=2,
 )
diff --git a/airflow/decorators/base.py b/airflow/decorators/base.py
index 3de2f27d04c..4f40c6ad3b4 100644
--- a/airflow/decorators/base.py
+++ b/airflow/decorators/base.py
@@ -40,7 +40,6 @@ import attr
 import re2
 import typing_extensions
 
-from airflow.assets import Asset
 from airflow.models.baseoperator import (
     BaseOperator,
     coerce_resources,
@@ -56,6 +55,7 @@ from airflow.models.expandinput import (
 )
 from airflow.models.mappedoperator import MappedOperator, ensure_xcomarg_return_value
 from airflow.models.xcom_arg import XComArg
+from airflow.sdk.definitions.asset import Asset
+from airflow.sdk.definitions.baseoperator import BaseOperator as TaskSDKBaseOperator
 from airflow.sdk.definitions.contextmanager import DagContext, TaskGroupContext
 from airflow.typing_compat import ParamSpec, Protocol
diff --git a/airflow/example_dags/example_asset_alias.py b/airflow/example_dags/example_asset_alias.py
index 4970b1eda26..a7f2aac5845 100644
--- a/airflow/example_dags/example_asset_alias.py
+++ b/airflow/example_dags/example_asset_alias.py
@@ -38,8 +38,8 @@ from __future__ import annotations
 import pendulum
 
 from airflow import DAG
-from airflow.assets import Asset, AssetAlias
 from airflow.decorators import task
+from airflow.sdk.definitions.asset import Asset, AssetAlias
 
 with DAG(
     dag_id="asset_s3_bucket_producer",
diff --git a/airflow/example_dags/example_asset_alias_with_no_taskflow.py b/airflow/example_dags/example_asset_alias_with_no_taskflow.py
index c9b04d66d2f..19f31465ea4 100644
--- a/airflow/example_dags/example_asset_alias_with_no_taskflow.py
+++ b/airflow/example_dags/example_asset_alias_with_no_taskflow.py
@@ -36,8 +36,8 @@ from __future__ import annotations
 import pendulum
 
 from airflow import DAG
-from airflow.assets import Asset, AssetAlias
 from airflow.providers.standard.operators.python import PythonOperator
+from airflow.sdk.definitions.asset import Asset, AssetAlias
 
 with DAG(
     dag_id="asset_s3_bucket_producer_with_no_taskflow",
diff --git a/airflow/example_dags/example_asset_decorator.py b/airflow/example_dags/example_asset_decorator.py
index b4de09c2314..b7560f21342 100644
--- a/airflow/example_dags/example_asset_decorator.py
+++ b/airflow/example_dags/example_asset_decorator.py
@@ -18,9 +18,9 @@ from __future__ import annotations
 
 import pendulum
 
-from airflow.assets import Asset
 from airflow.decorators import dag, task
-from airflow.decorators.assets import asset
+from airflow.sdk.definitions.asset import Asset
+from airflow.sdk.definitions.asset.decorators import asset
 
 
 @asset(uri="s3://bucket/asset1_producer", schedule=None)
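
With the decorator relocated, defining an asset-producing task reads as follows (a sketch mirroring the example above; the function body is illustrative):

    from airflow.sdk.definitions.asset.decorators import asset


    @asset(uri="s3://bucket/asset1_producer", schedule=None)
    def asset1_producer():
        pass
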
diff --git a/airflow/example_dags/example_assets.py b/airflow/example_dags/example_assets.py
index 451f17a3a3a..b81cdad9453 100644
--- a/airflow/example_dags/example_assets.py
+++ b/airflow/example_dags/example_assets.py
@@ -54,9 +54,9 @@ from __future__ import annotations
 
 import pendulum
 
-from airflow.assets import Asset
 from airflow.models.dag import DAG
 from airflow.providers.standard.operators.bash import BashOperator
+from airflow.sdk.definitions.asset import Asset
 from airflow.timetables.assets import AssetOrTimeSchedule
 from airflow.timetables.trigger import CronTriggerTimetable
 
diff --git a/airflow/example_dags/example_inlet_event_extra.py b/airflow/example_dags/example_inlet_event_extra.py
index 9773df7a3f9..c503e832a83 100644
--- a/airflow/example_dags/example_inlet_event_extra.py
+++ b/airflow/example_dags/example_inlet_event_extra.py
@@ -25,10 +25,10 @@ from __future__ import annotations
 
 import datetime
 
-from airflow.assets import Asset
 from airflow.decorators import task
 from airflow.models.dag import DAG
 from airflow.providers.standard.operators.bash import BashOperator
+from airflow.sdk.definitions.asset import Asset
 
 asset = Asset("s3://output/1.txt")
 
diff --git a/airflow/example_dags/example_outlet_event_extra.py b/airflow/example_dags/example_outlet_event_extra.py
index 0d097eab0ac..dd3041e18fc 100644
--- a/airflow/example_dags/example_outlet_event_extra.py
+++ b/airflow/example_dags/example_outlet_event_extra.py
@@ -25,11 +25,11 @@ from __future__ import annotations
 
 import datetime
 
-from airflow.assets import Asset
-from airflow.assets.metadata import Metadata
 from airflow.decorators import task
 from airflow.models.dag import DAG
 from airflow.providers.standard.operators.bash import BashOperator
+from airflow.sdk.definitions.asset import Asset
+from airflow.sdk.definitions.asset.metadata import Metadata
 
 ds = Asset("s3://output/1.txt")
 
diff --git a/airflow/lineage/hook.py b/airflow/lineage/hook.py
index fd321bcab49..9e5f8f66482 100644
--- a/airflow/lineage/hook.py
+++ b/airflow/lineage/hook.py
@@ -24,8 +24,8 @@ from typing import TYPE_CHECKING, Union
 
 import attr
 
-from airflow.assets import Asset
 from airflow.providers_manager import ProvidersManager
+from airflow.sdk.definitions.asset import Asset
 from airflow.utils.log.logging_mixin import LoggingMixin
 
 if TYPE_CHECKING:
diff --git a/airflow/listeners/spec/asset.py b/airflow/listeners/spec/asset.py
index dba9ac700e4..f99b11eb684 100644
--- a/airflow/listeners/spec/asset.py
+++ b/airflow/listeners/spec/asset.py
@@ -22,7 +22,7 @@ from typing import TYPE_CHECKING
 from pluggy import HookspecMarker
 
 if TYPE_CHECKING:
-    from airflow.assets import Asset, AssetAlias
+    from airflow.sdk.definitions.asset import Asset, AssetAlias
 
 hookspec = HookspecMarker("airflow")
 
diff --git a/airflow/models/asset.py b/airflow/models/asset.py
index 50914d51650..126bc5dc2d3 100644
--- a/airflow/models/asset.py
+++ b/airflow/models/asset.py
@@ -35,8 +35,8 @@ from sqlalchemy import (
 )
 from sqlalchemy.orm import relationship
 
-from airflow.assets import Asset, AssetAlias
 from airflow.models.base import Base, StringID
+from airflow.sdk.definitions.asset import Asset, AssetAlias
 from airflow.settings import json
 from airflow.utils import timezone
 from airflow.utils.sqlalchemy import UtcDateTime
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 135a69dfd0f..d898e1a52f4 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -71,7 +71,6 @@ from sqlalchemy.sql import Select, expression
 
 from airflow import settings, utils
 from airflow.api_internal.internal_api_call import internal_api_call
-from airflow.assets import Asset, AssetAlias, BaseAsset
 from airflow.configuration import conf as airflow_conf, secrets_backend_list
 from airflow.exceptions import (
     AirflowException,
@@ -94,6 +93,7 @@ from airflow.models.taskinstance import (
     clear_task_instances,
 )
 from airflow.models.tasklog import LogTemplate
+from airflow.sdk.definitions.asset import Asset, AssetAlias, BaseAsset
 from airflow.sdk.definitions.dag import DAG as TaskSDKDag, dag as task_sdk_dag_decorator
 from airflow.secrets.local_filesystem import LocalFilesystemBackend
 from airflow.security import permissions
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index dd0bf3916a4..a176e0282b6 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -73,7 +73,6 @@ from sqlalchemy_utils import UUIDType
 
 from airflow import settings
 from airflow.api_internal.internal_api_call import InternalApiConfig, internal_api_call
-from airflow.assets import Asset, AssetAlias
 from airflow.assets.manager import asset_manager
 from airflow.configuration import conf
 from airflow.exceptions import (
@@ -102,6 +101,7 @@ from airflow.models.taskmap import TaskMap
 from airflow.models.taskreschedule import TaskReschedule
 from airflow.models.xcom import LazyXComSelectSequence, XCom
 from airflow.plugins_manager import integrate_macros_plugins
+from airflow.sdk.definitions.asset import Asset, AssetAlias
 from airflow.sentry import Sentry
 from airflow.settings import task_instance_mutation_hook
 from airflow.stats import Stats
diff --git a/airflow/providers_manager.py b/airflow/providers_manager.py
index e5c02d0113e..5d38454b189 100644
--- a/airflow/providers_manager.py
+++ b/airflow/providers_manager.py
@@ -87,9 +87,9 @@ def _ensure_prefix_for_placeholders(field_behaviors: dict[str, Any], conn_type:
 if TYPE_CHECKING:
     from urllib.parse import SplitResult
 
-    from airflow.assets import Asset
     from airflow.decorators.base import TaskDecorator
     from airflow.hooks.base import BaseHook
+    from airflow.sdk.definitions.asset import Asset
     from airflow.typing_compat import Literal
 
 
@@ -905,7 +905,7 @@ class ProvidersManager(LoggingMixin, metaclass=Singleton):
 
     def _discover_asset_uri_resources(self) -> None:
         """Discovers and registers asset URI handlers, factories, and 
converters for all providers."""
-        from airflow.assets import normalize_noop
+        from airflow.sdk.definitions.asset import normalize_noop
 
         def _safe_register_resource(
             provider_package_name: str,
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index b98c9c2a6bd..61d851aaed1 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -35,15 +35,6 @@ from dateutil import relativedelta
 from pendulum.tz.timezone import FixedTimezone, Timezone
 
 from airflow import macros
-from airflow.assets import (
-    Asset,
-    AssetAlias,
-    AssetAll,
-    AssetAny,
-    AssetRef,
-    BaseAsset,
-    _AssetAliasCondition,
-)
 from airflow.callbacks.callback_requests import DagCallbackRequest, TaskCallbackRequest
 from airflow.exceptions import AirflowException, SerializationError, TaskDeferred
 from airflow.jobs.job import Job
@@ -60,6 +51,15 @@ from airflow.models.taskinstancekey import TaskInstanceKey
 from airflow.models.tasklog import LogTemplate
 from airflow.models.xcom_arg import XComArg, deserialize_xcom_arg, serialize_xcom_arg
 from airflow.providers_manager import ProvidersManager
+from airflow.sdk.definitions.asset import (
+    Asset,
+    AssetAlias,
+    AssetAliasCondition,
+    AssetAll,
+    AssetAny,
+    AssetRef,
+    BaseAsset,
+)
 from airflow.sdk.definitions.baseoperator import BaseOperator as TaskSDKBaseOperator
 from airflow.serialization.dag_dependency import DagDependency
 from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding
@@ -1053,7 +1053,7 @@ class DependencyDetector:
                     )
                 )
             elif isinstance(obj, AssetAlias):
-                cond = _AssetAliasCondition(obj.name)
+                cond = AssetAliasCondition(obj.name)
 
                 deps.extend(cond.iter_dag_dependencies(source=task.dag_id, target=""))
         return deps
diff --git a/airflow/timetables/assets.py b/airflow/timetables/assets.py
index d69a8e4d80c..6d233132438 100644
--- a/airflow/timetables/assets.py
+++ b/airflow/timetables/assets.py
@@ -19,8 +19,8 @@ from __future__ import annotations
 
 import typing
 
-from airflow.assets import AssetAll, BaseAsset
 from airflow.exceptions import AirflowTimetableInvalid
+from airflow.sdk.definitions.asset import AssetAll, BaseAsset
 from airflow.timetables.simple import AssetTriggeredTimetable
 from airflow.utils.types import DagRunType
 
@@ -29,7 +29,7 @@ if typing.TYPE_CHECKING:
 
     import pendulum
 
-    from airflow.assets import Asset
+    from airflow.sdk.definitions.asset import Asset
     from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
 
 
diff --git a/airflow/timetables/base.py b/airflow/timetables/base.py
index f8aa4279ebb..1a076747ec5 100644
--- a/airflow/timetables/base.py
+++ b/airflow/timetables/base.py
@@ -18,13 +18,13 @@ from __future__ import annotations
 
 from typing import TYPE_CHECKING, Any, Iterator, NamedTuple, Sequence
 
-from airflow.assets import BaseAsset
+from airflow.sdk.definitions.asset import BaseAsset
 from airflow.typing_compat import Protocol, runtime_checkable
 
 if TYPE_CHECKING:
     from pendulum import DateTime
 
-    from airflow.assets import Asset, AssetAlias
+    from airflow.sdk.definitions.asset import Asset, AssetAlias
     from airflow.serialization.dag_dependency import DagDependency
     from airflow.utils.types import DagRunType
 
diff --git a/airflow/timetables/simple.py b/airflow/timetables/simple.py
index 3457c52a08a..8ce498c9e04 100644
--- a/airflow/timetables/simple.py
+++ b/airflow/timetables/simple.py
@@ -18,7 +18,7 @@ from __future__ import annotations
 
 from typing import TYPE_CHECKING, Any, Collection, Sequence
 
-from airflow.assets import AssetAlias, _AssetAliasCondition
+from airflow.sdk.definitions.asset import AssetAlias, AssetAliasCondition
 from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
 from airflow.utils import timezone
 
@@ -26,8 +26,8 @@ if TYPE_CHECKING:
     from pendulum import DateTime
     from sqlalchemy import Session
 
-    from airflow.assets import BaseAsset
     from airflow.models.asset import AssetEvent
+    from airflow.sdk.definitions.asset import BaseAsset
     from airflow.timetables.base import TimeRestriction
     from airflow.utils.types import DagRunType
 
@@ -169,7 +169,7 @@ class AssetTriggeredTimetable(_TrivialTimetable):
         super().__init__()
         self.asset_condition = assets
         if isinstance(self.asset_condition, AssetAlias):
-            self.asset_condition = _AssetAliasCondition(self.asset_condition.name)
+            self.asset_condition = AssetAliasCondition(self.asset_condition.name)
 
         if not next(self.asset_condition.iter_assets(), False):
             self._summary = AssetTriggeredTimetable.UNRESOLVED_ALIAS_SUMMARY
diff --git a/airflow/utils/context.py b/airflow/utils/context.py
index 3e217e748d0..b954a5e1f2f 100644
--- a/airflow/utils/context.py
+++ b/airflow/utils/context.py
@@ -40,15 +40,15 @@ import attrs
 import lazy_object_proxy
 from sqlalchemy import select
 
-from airflow.assets import (
+from airflow.exceptions import RemovedInAirflow3Warning
+from airflow.models.asset import AssetAliasModel, AssetEvent, AssetModel, _fetch_active_assets_by_name
+from airflow.sdk.definitions.asset import (
     Asset,
     AssetAlias,
     AssetAliasEvent,
     AssetRef,
-    extract_event_key,
 )
-from airflow.exceptions import RemovedInAirflow3Warning
-from airflow.models.asset import AssetAliasModel, AssetEvent, AssetModel, _fetch_active_assets_by_name
+from airflow.sdk.definitions.asset.metadata import extract_event_key
 from airflow.utils.db import LazySelectSequence
 from airflow.utils.types import NOTSET
 
diff --git a/airflow/utils/context.pyi b/airflow/utils/context.pyi
index f4ed77537ff..069dba2f8f1 100644
--- a/airflow/utils/context.pyi
+++ b/airflow/utils/context.pyi
@@ -31,7 +31,6 @@ from typing import Any, Collection, Container, Iterable, Iterator, Mapping, Sequ
 from pendulum import DateTime
 from sqlalchemy.orm import Session
 
-from airflow.assets import Asset, AssetAlias, AssetAliasEvent
 from airflow.configuration import AirflowConfigParser
 from airflow.models.asset import AssetEvent
 from airflow.models.baseoperator import BaseOperator
@@ -39,6 +38,7 @@ from airflow.models.dag import DAG
 from airflow.models.dagrun import DagRun
 from airflow.models.param import ParamsDict
 from airflow.models.taskinstance import TaskInstance
+from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetAliasEvent
 from airflow.serialization.pydantic.asset import AssetEventPydantic
 from airflow.serialization.pydantic.dag_run import DagRunPydantic
 from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic
diff --git a/airflow/utils/operator_helpers.py b/airflow/utils/operator_helpers.py
index a06cdf42b50..f841d968ad6 100644
--- a/airflow/utils/operator_helpers.py
+++ b/airflow/utils/operator_helpers.py
@@ -23,7 +23,7 @@ from datetime import datetime
 from typing import TYPE_CHECKING, Any, Callable, Collection, Mapping, Protocol, TypeVar
 
 from airflow import settings
-from airflow.assets.metadata import Metadata
+from airflow.sdk.definitions.asset.metadata import Metadata
 from airflow.typing_compat import ParamSpec
 from airflow.utils.context import Context, lazy_mapping_from_context
 from airflow.utils.types import NOTSET
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 30ad6a79da7..805a746fba5 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -87,7 +87,6 @@ from airflow.api.common.mark_tasks import (
     set_dag_run_state_to_success,
     set_state,
 )
-from airflow.assets import Asset, AssetAlias
 from airflow.auth.managers.models.resource_details import AccessView, DagAccessEntity, DagDetails
 from airflow.configuration import AIRFLOW_CONFIG, conf
 from airflow.exceptions import (
@@ -112,6 +111,7 @@ from airflow.models.serialized_dag import SerializedDagModel
 from airflow.models.taskinstance import TaskInstance, TaskInstanceNote
 from airflow.plugins_manager import PLUGINS_ATTRIBUTES_TO_DUMP
 from airflow.providers_manager import ProvidersManager
+from airflow.sdk.definitions.asset import Asset, AssetAlias
 from airflow.security import permissions
 from airflow.ti_deps.dep_context import DepContext
 from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS
diff --git a/dev/breeze/tests/test_pytest_args_for_test_types.py b/dev/breeze/tests/test_pytest_args_for_test_types.py
index 20dc91947c1..740f8c5b3e5 100644
--- a/dev/breeze/tests/test_pytest_args_for_test_types.py
+++ b/dev/breeze/tests/test_pytest_args_for_test_types.py
@@ -114,6 +114,7 @@ from airflow_breeze.utils.run_tests import convert_parallel_types_to_folders, co
                 "tests/cluster_policies",
                 "tests/config_templates",
                 "tests/dag_processing",
+                "tests/datasets",
                 "tests/decorators",
                 "tests/hooks",
                 "tests/io",
diff --git a/docs/apache-airflow/authoring-and-scheduling/datasets.rst b/docs/apache-airflow/authoring-and-scheduling/datasets.rst
index 7940a905167..9e777d92995 100644
--- a/docs/apache-airflow/authoring-and-scheduling/datasets.rst
+++ b/docs/apache-airflow/authoring-and-scheduling/datasets.rst
@@ -27,7 +27,7 @@ In addition to scheduling DAGs based on time, you can also schedule DAGs to run
 
 .. code-block:: python
 
-    from airflow.assets import Asset
+    from airflow.sdk.definitions.asset import Asset
 
     with DAG(...):
         MyOperator(
@@ -57,7 +57,7 @@ An Airflow asset is a logical grouping of data. Upstream producer tasks can upda
 
 .. code-block:: python
 
-    from airflow.assets import Asset
+    from airflow.sdk.definitions.asset import Asset
 
     example_asset = Asset("s3://asset-bucket/example.csv")
 
@@ -67,7 +67,7 @@ You must create assets with a valid URI. Airflow core and providers define vario
 
 .. code-block:: python
 
-    from airflow.assets import Asset
+    from airflow.sdk.definitions.asset import Asset
 
     example_asset = Asset(uri="s3://asset-bucket/example.csv", name="bucket-1")
 
@@ -248,8 +248,8 @@ The easiest way to attach extra information to the asset event is by ``yield``-i
 
 .. code-block:: python
 
-    from airflow.assets import Asset
-    from airflow.assets.metadata import Metadata
+    from airflow.sdk.definitions.asset import Asset
+    from airflow.sdk.definitions.asset.metadata import Metadata
 
     example_s3_asset = Asset("s3://asset/example.csv")
 
@@ -440,7 +440,7 @@ The following example creates an asset event against the S3 URI ``f"s3://bucket/
 
 .. code-block:: python
 
-    from airflow.assets import AssetAlias
+    from airflow.sdk.definitions.asset import AssetAlias
 
 
     @task(outlets=[AssetAlias("my-task-outputs")])
@@ -452,7 +452,7 @@ The following example creates an asset event against the S3 URI ``f"s3://bucket/
 
 .. code-block:: python
 
-    from airflow.assets.metadata import Metadata
+    from airflow.sdk.definitions.asset.metadata import Metadata
 
 
     @task(outlets=[AssetAlias("my-task-outputs")])
@@ -464,7 +464,7 @@ Only one asset event is emitted for an added asset, even if it is added to the a
 
 .. code-block:: python
 
-    from airflow.assets import AssetAlias
+    from airflow.sdk.definitions.asset import AssetAlias
 
 
     @task(
diff --git a/newsfragments/41348.significant.rst b/newsfragments/41348.significant.rst
index eeda04d3985..eca66b78708 100644
--- a/newsfragments/41348.significant.rst
+++ b/newsfragments/41348.significant.rst
@@ -17,7 +17,7 @@
  * Rename class ``DatasetEventCollectionSchema`` as ``AssetEventCollectionSchema``
   * Rename class ``CreateDatasetEventSchema`` as ``CreateAssetEventSchema``
 
-* Rename module ``airflow.datasets`` as ``airflow.assets``
+* Move module ``airflow.datasets`` to ``airflow.sdk.definitions.asset``
 
   * Rename class ``DatasetAlias`` as ``AssetAlias``
   * Rename class ``DatasetAll`` as ``AssetAll``
@@ -25,7 +25,7 @@
   * Rename function ``expand_alias_to_datasets`` as ``expand_alias_to_assets``
   * Rename class ``DatasetAliasEvent`` as ``AssetAliasEvent``
 
-    * Rename method ``dest_dataset_uri`` as ``dest_asset_uri``
+    * Rename attribute ``dest_dataset_uri`` as ``dest_asset_uri``
 
   * Rename class ``BaseDataset`` as ``BaseAsset``
 
@@ -52,7 +52,7 @@
       * Rename method ``create_datasets`` as ``create_assets``
       * Rename method ``register_dataset_change`` as ``notify_asset_created``
       * Rename method ``notify_dataset_changed`` as ``notify_asset_changed``
-      * Renme method ``notify_dataset_alias_created`` as ``notify_asset_alias_created``
+      * Rename method ``notify_dataset_alias_created`` as ``notify_asset_alias_created``
 
 * Rename module ``airflow.models.dataset`` as ``airflow.models.asset``
 
@@ -84,7 +84,7 @@
     * Rename class ``DatasetPydantic`` as ``AssetPydantic``
     * Rename class ``DatasetEventPydantic`` as ``AssetEventPydantic``
 
-* Rename module ``airflow.datasets.metadata`` as ``airflow.assets.metadata``
+* Rename module ``airflow.datasets.metadata`` as ``airflow.sdk.definitions.asset.metadata``
 
 * In module ``airflow.jobs.scheduler_job_runner``
 
diff --git a/providers/src/airflow/providers/amazon/aws/assets/s3.py b/providers/src/airflow/providers/amazon/aws/assets/s3.py
index 4d02b156afb..c291078155a 100644
--- a/providers/src/airflow/providers/amazon/aws/assets/s3.py
+++ b/providers/src/airflow/providers/amazon/aws/assets/s3.py
@@ -19,14 +19,30 @@ from __future__ import annotations
 from typing import TYPE_CHECKING
 
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
-from airflow.providers.common.compat.assets import Asset
 
 if TYPE_CHECKING:
     from urllib.parse import SplitResult
 
+    from airflow.providers.common.compat.assets import Asset
     from airflow.providers.common.compat.openlineage.facet import (
         Dataset as OpenLineageDataset,
     )
+else:
+    # TODO: Remove this try-exception block after bumping common provider to 1.3.0
+    # This is due to common provider AssetDetails import error handling
+    try:
+        from airflow.providers.common.compat.assets import Asset
+    except ImportError:
+        from packaging.version import Version
+
+        from airflow import __version__ as AIRFLOW_VERSION
+
+        AIRFLOW_V_3_0_PLUS = Version(Version(AIRFLOW_VERSION).base_version) >= Version("3.0.0")
+        if AIRFLOW_V_3_0_PLUS:
+            from airflow.sdk.definitions.asset import Asset
+        else:
+            # dataset is renamed to asset since Airflow 3.0
+            from airflow.datasets import Dataset as Asset
 
 
 def create_asset(*, bucket: str, key: str, extra=None) -> Asset:
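
The helper's signature is unchanged; only its Asset import is now version-aware. Illustrative usage (bucket and key are placeholders):

    from airflow.providers.amazon.aws.assets.s3 import create_asset

    asset = create_asset(bucket="my-bucket", key="data/file.csv")
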
diff --git a/providers/src/airflow/providers/common/compat/__init__.py b/providers/src/airflow/providers/common/compat/__init__.py
index 38c5f8c6cde..1f9eab88c17 100644
--- a/providers/src/airflow/providers/common/compat/__init__.py
+++ b/providers/src/airflow/providers/common/compat/__init__.py
@@ -23,17 +23,21 @@
 #
 from __future__ import annotations
 
-import packaging.version
+from packaging.version import Version
 
-from airflow import __version__ as airflow_version
+from airflow import __version__ as AIRFLOW_VERSION
 
 __all__ = ["__version__"]
 
 __version__ = "1.2.2"
 
-if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
-    "2.8.0"
-):
+
+AIRFLOW_V_3_0_PLUS = Version(Version(AIRFLOW_VERSION).base_version) >= Version("3.0.0")
+AIRFLOW_V_2_10_PLUS = Version(Version(AIRFLOW_VERSION).base_version) >= Version("2.10.0")
+AIRFLOW_V_2_9_PLUS = Version(Version(AIRFLOW_VERSION).base_version) >= Version("2.9.0")
+AIRFLOW_V_2_8_PLUS = Version(Version(AIRFLOW_VERSION).base_version) >= Version("2.8.0")
+
+if not AIRFLOW_V_2_8_PLUS:
     raise RuntimeError(
         f"The package `apache-airflow-providers-common-compat:{__version__}` 
needs Apache Airflow 2.8.0+"
     )
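
Downstream provider code can branch on these flags instead of catching ImportError; a minimal sketch of the pattern the surrounding hunks apply:

    from airflow.providers.common.compat import AIRFLOW_V_3_0_PLUS

    if AIRFLOW_V_3_0_PLUS:
        from airflow.sdk.definitions.asset import Asset
    else:
        # Dataset was renamed to Asset in Airflow 3.0.
        from airflow.datasets import Dataset as Asset
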
diff --git a/providers/src/airflow/providers/common/compat/assets/__init__.py b/providers/src/airflow/providers/common/compat/assets/__init__.py
index e302395f701..66178cf0c68 100644
--- a/providers/src/airflow/providers/common/compat/assets/__init__.py
+++ b/providers/src/airflow/providers/common/compat/assets/__init__.py
@@ -19,10 +19,16 @@ from __future__ import annotations
 
 from typing import TYPE_CHECKING
 
-from airflow import __version__ as AIRFLOW_VERSION
+from airflow.providers.common.compat import (
+    AIRFLOW_V_2_8_PLUS,
+    AIRFLOW_V_2_9_PLUS,
+    AIRFLOW_V_2_10_PLUS,
+    AIRFLOW_V_3_0_PLUS,
+)
 
 if TYPE_CHECKING:
-    from airflow.assets import (
+    from airflow.auth.managers.models.resource_details import AssetDetails
+    from airflow.sdk.definitions.asset import (
         Asset,
         AssetAlias,
         AssetAliasEvent,
@@ -30,10 +36,10 @@ if TYPE_CHECKING:
         AssetAny,
         expand_alias_to_assets,
     )
-    from airflow.auth.managers.models.resource_details import AssetDetails
 else:
-    try:
-        from airflow.assets import (
+    if AIRFLOW_V_3_0_PLUS:
+        from airflow.auth.managers.models.resource_details import AssetDetails
+        from airflow.sdk.definitions.asset import (
             Asset,
             AssetAlias,
             AssetAliasEvent,
@@ -41,27 +47,20 @@ else:
             AssetAny,
             expand_alias_to_assets,
         )
-        from airflow.auth.managers.models.resource_details import AssetDetails
-    except ModuleNotFoundError:
-        from packaging.version import Version
-
-        _IS_AIRFLOW_2_10_OR_HIGHER = Version(Version(AIRFLOW_VERSION).base_version) >= Version("2.10.0")
-        _IS_AIRFLOW_2_9_OR_HIGHER = Version(Version(AIRFLOW_VERSION).base_version) >= Version("2.9.0")
-        _IS_AIRFLOW_2_8_OR_HIGHER = Version(Version(AIRFLOW_VERSION).base_version) >= Version("2.8.0")
-
+    else:
         # dataset is renamed to asset since Airflow 3.0
         from airflow.datasets import Dataset as Asset
 
-        if _IS_AIRFLOW_2_8_OR_HIGHER:
+        if AIRFLOW_V_2_8_PLUS:
             from airflow.auth.managers.models.resource_details import DatasetDetails as AssetDetails
 
-        if _IS_AIRFLOW_2_9_OR_HIGHER:
+        if AIRFLOW_V_2_9_PLUS:
             from airflow.datasets import (
                 DatasetAll as AssetAll,
                 DatasetAny as AssetAny,
             )
 
-        if _IS_AIRFLOW_2_10_OR_HIGHER:
+        if AIRFLOW_V_2_10_PLUS:
             from airflow.datasets import (
                 DatasetAlias as AssetAlias,
                 DatasetAliasEvent as AssetAliasEvent,
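
With the version gates above, downstream provider code imports asset names from a single place and lets the shim resolve the right implementation. An illustrative consumer (a sketch, not part of the diff):

    from airflow.providers.common.compat import AIRFLOW_V_2_10_PLUS
    from airflow.providers.common.compat.assets import Asset

    # Asset resolves on every supported line (Dataset is aliased pre-3.0);
    # alias support only exists from Airflow 2.10 onwards, per the gates above.
    if AIRFLOW_V_2_10_PLUS:
        from airflow.providers.common.compat.assets import AssetAlias
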
diff --git a/providers/src/airflow/providers/common/compat/lineage/hook.py b/providers/src/airflow/providers/common/compat/lineage/hook.py
index 50fbc3d0996..63214a9051c 100644
--- a/providers/src/airflow/providers/common/compat/lineage/hook.py
+++ b/providers/src/airflow/providers/common/compat/lineage/hook.py
@@ -16,7 +16,7 @@
 # under the License.
 from __future__ import annotations
 
-from importlib.util import find_spec
+from airflow.providers.common.compat import AIRFLOW_V_2_10_PLUS, AIRFLOW_V_3_0_PLUS
 
 
 def _get_asset_compat_hook_lineage_collector():
@@ -79,28 +79,27 @@ def _get_asset_compat_hook_lineage_collector():
 
 
 def get_hook_lineage_collector():
-    # HookLineageCollector added in 2.10
-    try:
-        if find_spec("airflow.assets"):
-            # Dataset has been renamed as Asset in 3.0
-            from airflow.lineage.hook import get_hook_lineage_collector
+    # Dataset has been renamed as Asset in 3.0
+    if AIRFLOW_V_3_0_PLUS:
+        from airflow.lineage.hook import get_hook_lineage_collector
 
-            return get_hook_lineage_collector()
+        return get_hook_lineage_collector()
 
+    # HookLineageCollector added in 2.10
+    if AIRFLOW_V_2_10_PLUS:
         return _get_asset_compat_hook_lineage_collector()
-    except ImportError:
 
-        class NoOpCollector:
-            """
-            NoOpCollector is a hook lineage collector that does nothing.
+    class NoOpCollector:
+        """
+        NoOpCollector is a hook lineage collector that does nothing.
 
-            It is used when you want to disable lineage collection.
-            """
+        It is used when you want to disable lineage collection.
+        """
 
-            def add_input_asset(self, *_, **__):
-                pass
+        def add_input_asset(self, *_, **__):
+            pass
 
-            def add_output_asset(self, *_, **__):
-                pass
+        def add_output_asset(self, *_, **__):
+            pass
 
-        return NoOpCollector()
+    return NoOpCollector()
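
From a caller's perspective, exactly one of the three branches above runs for a given install. A hedged sketch (the call shapes are placeholders; only the no-op collector is guaranteed to accept arbitrary arguments):

    from airflow.providers.common.compat.lineage.hook import get_hook_lineage_collector

    collector = get_hook_lineage_collector()
    # Airflow >= 3.0: the core collector; 2.10.x: the compat wrapper;
    # older: NoOpCollector, so these calls are safe no-ops.
    collector.add_input_asset(None, uri="s3://bucket/in")
    collector.add_output_asset(None, uri="s3://bucket/out")
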
diff --git a/providers/src/airflow/providers/common/io/assets/file.py b/providers/src/airflow/providers/common/io/assets/file.py
index fadc4cbe1bd..6277e48c0a8 100644
--- a/providers/src/airflow/providers/common/io/assets/file.py
+++ b/providers/src/airflow/providers/common/io/assets/file.py
@@ -19,9 +19,15 @@ from __future__ import annotations
 import urllib.parse
 from typing import TYPE_CHECKING
 
-try:
-    from airflow.assets import Asset
-except ModuleNotFoundError:
+from packaging.version import Version
+
+from airflow import __version__ as AIRFLOW_VERSION
+
+# TODO: Remove version check block after bumping common provider to 1.3.0
+AIRFLOW_V_3_0_PLUS = Version(Version(AIRFLOW_VERSION).base_version) >= Version("3.0.0")
+if AIRFLOW_V_3_0_PLUS:
+    from airflow.sdk.definitions.asset import Asset
+else:
     from airflow.datasets import Dataset as Asset  # type: ignore[no-redef]
 
 if TYPE_CHECKING:
diff --git a/providers/src/airflow/providers/google/assets/gcs.py b/providers/src/airflow/providers/google/assets/gcs.py
index 4df6995787e..22206e3f753 100644
--- a/providers/src/airflow/providers/google/assets/gcs.py
+++ b/providers/src/airflow/providers/google/assets/gcs.py
@@ -18,13 +18,29 @@ from __future__ import annotations
 
 from typing import TYPE_CHECKING
 
-from airflow.providers.common.compat.assets import Asset
 from airflow.providers.google.cloud.hooks.gcs import _parse_gcs_url
 
 if TYPE_CHECKING:
     from urllib.parse import SplitResult
 
+    from airflow.providers.common.compat.assets import Asset
     from airflow.providers.common.compat.openlineage.facet import Dataset as OpenLineageDataset
+else:
+    # TODO: Remove this try-exception block after bumping common provider to 1.3.0
+    # This is due to common provider AssetDetails import error handling
+    try:
+        from airflow.providers.common.compat.assets import Asset
+    except ImportError:
+        from packaging.version import Version
+
+        from airflow import __version__ as AIRFLOW_VERSION
+
+        AIRFLOW_V_3_0_PLUS = Version(Version(AIRFLOW_VERSION).base_version) >= Version("3.0.0")
+        if AIRFLOW_V_3_0_PLUS:
+            from airflow.sdk.definitions.asset import Asset
+        else:
+            # dataset is renamed to asset since Airflow 3.0
+            from airflow.datasets import Dataset as Asset
 
 
 def create_asset(*, bucket: str, key: str, extra: dict | None = None) -> Asset:
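
One subtlety shared by this hunk and the amazon/openlineage ones: `Asset` is imported twice, once under TYPE_CHECKING for annotations and once at runtime for a concrete class. A minimal sketch of why both are needed (hypothetical module, not part of the diff):

    from __future__ import annotations

    from typing import TYPE_CHECKING

    if TYPE_CHECKING:
        # Seen only by static type checkers; never executed at runtime.
        from airflow.providers.common.compat.assets import Asset
    else:
        # Executed at import time; may bind Asset to the renamed legacy class.
        from airflow.datasets import Dataset as Asset

    def create_asset(*, bucket: str, key: str) -> Asset:
        # With postponed evaluation, the annotation stays a string at runtime,
        # so either binding of Asset satisfies it.
        ...
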
diff --git a/providers/src/airflow/providers/openlineage/utils/utils.py b/providers/src/airflow/providers/openlineage/utils/utils.py
index 99faa3c4d5c..a37b94b85c1 100644
--- a/providers/src/airflow/providers/openlineage/utils/utils.py
+++ b/providers/src/airflow/providers/openlineage/utils/utils.py
@@ -37,7 +37,6 @@ from airflow.exceptions import (
 
 # TODO: move this maybe to Airflow's logic?
 from airflow.models import DAG, BaseOperator, DagRun, MappedOperator
-from airflow.providers.common.compat.assets import Asset
 from airflow.providers.openlineage import __version__ as OPENLINEAGE_PROVIDER_VERSION, conf
 from airflow.providers.openlineage.plugins.facets import (
     AirflowDagRunFacet,
@@ -68,7 +67,24 @@ if TYPE_CHECKING:
     from openlineage.client.facet_v2 import RunFacet, processing_engine_run
 
     from airflow.models import TaskInstance
+    from airflow.providers.common.compat.assets import Asset
     from airflow.utils.state import DagRunState, TaskInstanceState
+else:
+    # TODO: Remove this try-exception block after bumping common provider to 1.3.0
+    # This is due to common provider AssetDetails import error handling
+    try:
+        from airflow.providers.common.compat.assets import Asset
+    except ImportError:
+        from packaging.version import Version
+
+        from airflow import __version__ as AIRFLOW_VERSION
+
+        AIRFLOW_V_3_0_PLUS = Version(Version(AIRFLOW_VERSION).base_version) >= Version("3.0.0")
+        if AIRFLOW_V_3_0_PLUS:
+            from airflow.sdk.definitions.asset import Asset
+        else:
+            # dataset is renamed to asset since Airflow 3.0
+            from airflow.datasets import Dataset as Asset
 
 log = logging.getLogger(__name__)
 _NOMINAL_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
@@ -708,9 +724,15 @@ def translate_airflow_asset(asset: Asset, lineage_context) -> OpenLineageDataset
     This function returns None if no URI normalizer is defined, no asset converter is found or
     some core Airflow changes are missing and ImportError is raised.
     """
-    try:
-        from airflow.assets import _get_normalized_scheme
-    except ModuleNotFoundError:
+    # TODO: Remove version check block after bumping common provider to 1.3.0
+    from packaging.version import Version
+
+    from airflow import __version__ as AIRFLOW_VERSION
+
+    AIRFLOW_V_3_0_PLUS = Version(Version(AIRFLOW_VERSION).base_version) >= Version("3.0.0")
+    if AIRFLOW_V_3_0_PLUS:
+        from airflow.sdk.definitions.asset import _get_normalized_scheme
+    else:
         try:
             from airflow.datasets import _get_normalized_scheme  # type: ignore[no-redef, attr-defined]
         except ImportError:
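
Restated as one standalone function, the lookup chain introduced above reads as follows (a sketch only; the real code is inlined in `translate_airflow_asset`):

    from packaging.version import Version

    from airflow import __version__ as AIRFLOW_VERSION

    def load_normalized_scheme_lookup():
        if Version(Version(AIRFLOW_VERSION).base_version) >= Version("3.0.0"):
            from airflow.sdk.definitions.asset import _get_normalized_scheme  # 3.0+ home
            return _get_normalized_scheme
        try:
            from airflow.datasets import _get_normalized_scheme  # pre-3.0 home
        except ImportError:
            return None  # no normalizer available; the caller then returns None
        return _get_normalized_scheme
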
diff --git a/providers/tests/amazon/aws/auth_manager/test_aws_auth_manager.py b/providers/tests/amazon/aws/auth_manager/test_aws_auth_manager.py
index acca9122148..e973c8433b2 100644
--- a/providers/tests/amazon/aws/auth_manager/test_aws_auth_manager.py
+++ b/providers/tests/amazon/aws/auth_manager/test_aws_auth_manager.py
@@ -60,6 +60,7 @@ else:
     from airflow.providers.common.compat.assets import AssetDetails
     from airflow.providers.common.compat.security.permissions import RESOURCE_ASSET
 
+
 pytestmark = [
     pytest.mark.skipif(not AIRFLOW_V_2_9_PLUS, reason="Test requires Airflow 2.9+"),
     pytest.mark.skip_if_database_isolation_mode,
diff --git a/providers/tests/google/cloud/hooks/test_gcs.py b/providers/tests/google/cloud/hooks/test_gcs.py
index 48f8c485811..8dc5966e3d7 100644
--- a/providers/tests/google/cloud/hooks/test_gcs.py
+++ b/providers/tests/google/cloud/hooks/test_gcs.py
@@ -424,8 +424,8 @@ class TestGCSHook:
         mock_copy.return_value = storage.Blob(
             name=destination_object_name, bucket=storage.Bucket(mock_service, destination_bucket_name)
         )
-        mock_service.return_value.bucket.side_effect = (
-            lambda name: source_bucket
+        mock_service.return_value.bucket.side_effect = lambda name: (
+            source_bucket
             if name == source_bucket_name
             else storage.Bucket(mock_service, destination_bucket_name)
         )
@@ -519,10 +519,8 @@ class TestGCSHook:
         blob = MagicMock(spec=storage.Blob)
         blob.rewrite = MagicMock(return_value=(None, None, None))
         dest_bucket.blob = MagicMock(return_value=blob)
-        mock_service.return_value.bucket.side_effect = (
-            lambda name: storage.Bucket(mock_service, source_bucket_name)
-            if name == source_bucket_name
-            else dest_bucket
+        mock_service.return_value.bucket.side_effect = lambda name: (
+            storage.Bucket(mock_service, source_bucket_name) if name == source_bucket_name else dest_bucket
         )
 
         self.gcs_hook.rewrite(
diff --git a/providers/tests/system/microsoft/azure/example_msfabric.py b/providers/tests/system/microsoft/azure/example_msfabric.py
index 0f65df2f72f..9da67b3a0fa 100644
--- a/providers/tests/system/microsoft/azure/example_msfabric.py
+++ b/providers/tests/system/microsoft/azure/example_msfabric.py
@@ -19,8 +19,8 @@ from __future__ import annotations
 from datetime import datetime
 
 from airflow import models
-from airflow.assets import Asset
 from airflow.providers.microsoft.azure.operators.msgraph import MSGraphAsyncOperator
+from airflow.sdk.definitions.asset import Asset
 
 DAG_ID = "example_msfabric"
 
diff --git a/scripts/ci/pre_commit/check_tests_in_right_folders.py b/scripts/ci/pre_commit/check_tests_in_right_folders.py
index 11d44efd407..a04400e1c0c 100755
--- a/scripts/ci/pre_commit/check_tests_in_right_folders.py
+++ b/scripts/ci/pre_commit/check_tests_in_right_folders.py
@@ -46,6 +46,7 @@ POSSIBLE_TEST_FOLDERS = [
     "dags",
     "dags_corrupted",
     "dags_with_system_exit",
+    "datasets",
     "decorators",
     "executors",
     "hooks",
diff --git a/airflow/assets/__init__.py b/task_sdk/src/airflow/sdk/definitions/asset/__init__.py
similarity index 94%
copy from airflow/assets/__init__.py
copy to task_sdk/src/airflow/sdk/definitions/asset/__init__.py
index f1d36ac12b7..cb574c3df96 100644
--- a/airflow/assets/__init__.py
+++ b/task_sdk/src/airflow/sdk/definitions/asset/__init__.py
@@ -21,7 +21,15 @@ import logging
 import os
 import urllib.parse
 import warnings
-from typing import TYPE_CHECKING, Any, Callable, ClassVar, Iterable, Iterator, cast, overload
+from collections.abc import Iterable, Iterator
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Callable,
+    ClassVar,
+    cast,
+    overload,
+)
 
 import attrs
 from sqlalchemy import select
@@ -36,7 +44,17 @@ if TYPE_CHECKING:
 
     from sqlalchemy.orm.session import Session
 
-__all__ = ["Asset", "AssetAll", "AssetAny", "Dataset"]
+
+__all__ = [
+    "Asset",
+    "Dataset",
+    "Model",
+    "AssetRef",
+    "AssetAlias",
+    "AssetAliasCondition",
+    "AssetAll",
+    "AssetAny",
+]
 
 
 log = logging.getLogger(__name__)
@@ -54,8 +72,14 @@ def normalize_noop(parts: SplitResult) -> SplitResult:
 def _get_uri_normalizer(scheme: str) -> Callable[[SplitResult], SplitResult] | None:
     if scheme == "file":
         return normalize_noop
+    from packaging.version import Version
+
+    from airflow import __version__ as AIRFLOW_VERSION
     from airflow.providers_manager import ProvidersManager
 
+    AIRFLOW_V_2 = Version(AIRFLOW_VERSION).base_version < Version("3.0.0").base_version
+    if AIRFLOW_V_2:
+        return ProvidersManager().dataset_uri_handlers.get(scheme)  # type: ignore[attr-defined]
     return ProvidersManager().asset_uri_handlers.get(scheme)
 
 
@@ -130,46 +154,16 @@ def _validate_asset_name(instance, attribute, value):
     return value
 
 
-def extract_event_key(value: str | Asset | AssetAlias) -> str:
+def _set_extra_default(extra: dict | None) -> dict:
     """
-    Extract the key of an inlet or an outlet event.
-
-    If the input value is a string, it is treated as a URI and sanitized. If the
-    input is a :class:`Asset`, the URI it contains is considered sanitized and
-    returned directly. If the input is a :class:`AssetAlias`, the name it contains
-    will be returned directly.
+    Automatically convert None to an empty dict.
 
-    :meta private:
+    This allows the caller site to continue doing ``Asset(uri, extra=None)``,
+    but still allow the ``extra`` attribute to always be a dict.
     """
-    if isinstance(value, AssetAlias):
-        return value.name
-
-    if isinstance(value, Asset):
-        return value.uri
-    return _sanitize_uri(str(value))
-
-
-@internal_api_call
-@provide_session
-def expand_alias_to_assets(alias: str | AssetAlias, *, session: Session = NEW_SESSION) -> list[BaseAsset]:
-    """Expand asset alias to resolved assets."""
-    from airflow.models.asset import AssetAliasModel
-
-    alias_name = alias.name if isinstance(alias, AssetAlias) else alias
-
-    asset_alias_obj = session.scalar(
-        select(AssetAliasModel).where(AssetAliasModel.name == alias_name).limit(1)
-    )
-    if asset_alias_obj:
-        return [asset.to_public() for asset in asset_alias_obj.assets]
-    return []
-
-
[email protected](kw_only=True)
-class AssetRef:
-    """Reference to an asset."""
-
-    name: str
+    if extra is None:
+        return {}
+    return extra
 
 
 class BaseAsset:
@@ -221,53 +215,6 @@ class BaseAsset:
         raise NotImplementedError
 
 
[email protected](unsafe_hash=False)
-class AssetAlias(BaseAsset):
-    """A represeation of asset alias which is used to create asset during the 
runtime."""
-
-    name: str = attrs.field(validator=_validate_non_empty_identifier)
-    group: str = attrs.field(kw_only=True, default="", validator=_validate_identifier)
-
-    def iter_assets(self) -> Iterator[tuple[str, Asset]]:
-        return iter(())
-
-    def iter_asset_aliases(self) -> Iterator[tuple[str, AssetAlias]]:
-        yield self.name, self
-
-    def iter_dag_dependencies(self, *, source: str, target: str) -> Iterator[DagDependency]:
-        """
-        Iterate an asset alias as dag dependency.
-
-        :meta private:
-        """
-        yield DagDependency(
-            source=source or "asset-alias",
-            target=target or "asset-alias",
-            dependency_type="asset-alias",
-            dependency_id=self.name,
-        )
-
-
-class AssetAliasEvent(TypedDict):
-    """A represeation of asset event to be triggered by an asset alias."""
-
-    source_alias_name: str
-    dest_asset_uri: str
-    extra: dict[str, Any]
-
-
-def _set_extra_default(extra: dict | None) -> dict:
-    """
-    Automatically convert None to an empty dict.
-
-    This allows the caller site to continue doing ``Asset(uri, extra=None)``,
-    but still allow the ``extra`` attribute to always be a dict.
-    """
-    if extra is None:
-        return {}
-    return extra
-
-
 @attrs.define(init=False, unsafe_hash=False)
 class Asset(os.PathLike, BaseAsset):
     """A representation of data asset dependencies between workflows."""
@@ -368,6 +315,13 @@ class Asset(os.PathLike, BaseAsset):
         )
 
 
[email protected](kw_only=True)
+class AssetRef:
+    """Reference to an asset."""
+
+    name: str
+
+
 class Dataset(Asset):
     """A representation of dataset dependencies between workflows."""
 
@@ -380,6 +334,41 @@ class Model(Asset):
     asset_type: ClassVar[str] = "model"
 
 
[email protected](unsafe_hash=False)
+class AssetAlias(BaseAsset):
+    """A represeation of asset alias which is used to create asset during the 
runtime."""
+
+    name: str = attrs.field(validator=_validate_non_empty_identifier)
+    group: str = attrs.field(kw_only=True, default="", validator=_validate_identifier)
+
+    def iter_assets(self) -> Iterator[tuple[str, Asset]]:
+        return iter(())
+
+    def iter_asset_aliases(self) -> Iterator[tuple[str, AssetAlias]]:
+        yield self.name, self
+
+    def iter_dag_dependencies(self, *, source: str, target: str) -> Iterator[DagDependency]:
+        """
+        Iterate an asset alias as dag dependency.
+
+        :meta private:
+        """
+        yield DagDependency(
+            source=source or "asset-alias",
+            target=target or "asset-alias",
+            dependency_type="asset-alias",
+            dependency_id=self.name,
+        )
+
+
+class AssetAliasEvent(TypedDict):
+    """A represeation of asset event to be triggered by an asset alias."""
+
+    source_alias_name: str
+    dest_asset_uri: str
+    extra: dict[str, Any]
+
+
 class _AssetBooleanCondition(BaseAsset):
     """Base class for asset boolean logic."""
 
@@ -390,7 +379,7 @@ class _AssetBooleanCondition(BaseAsset):
             raise TypeError("expect asset expressions in condition")
 
         self.objects = [
-            _AssetAliasCondition(obj.name) if isinstance(obj, AssetAlias) else obj for obj in objects
+            AssetAliasCondition(obj.name) if isinstance(obj, AssetAlias) else obj for obj in objects
         ]
 
     def evaluate(self, statuses: dict[str, bool]) -> bool:
@@ -443,7 +432,23 @@ class AssetAny(_AssetBooleanCondition):
         return {"any": [o.as_expression() for o in self.objects]}
 
 
-class _AssetAliasCondition(AssetAny):
+@internal_api_call
+@provide_session
+def expand_alias_to_assets(alias: str | AssetAlias, *, session: Session = NEW_SESSION) -> list[BaseAsset]:
+    """Expand asset alias to resolved assets."""
+    from airflow.models.asset import AssetAliasModel
+
+    alias_name = alias.name if isinstance(alias, AssetAlias) else alias
+
+    asset_alias_obj = session.scalar(
+        select(AssetAliasModel).where(AssetAliasModel.name == alias_name).limit(1)
+    )
+    if asset_alias_obj:
+        return [asset.to_public() for asset in asset_alias_obj.assets]
+    return []
+
+
+class AssetAliasCondition(AssetAny):
     """
     Use to expand AssetAlias as AssetAny of its resolved Assets.
 
@@ -455,7 +460,7 @@ class _AssetAliasCondition(AssetAny):
         self.objects = expand_alias_to_assets(name)
 
     def __repr__(self) -> str:
-        return f"_AssetAliasCondition({', '.join(map(str, self.objects))})"
+        return f"AssetAliasCondition({', '.join(map(str, self.objects))})"
 
     def as_expression(self) -> Any:
         """
diff --git a/airflow/decorators/assets.py b/task_sdk/src/airflow/sdk/definitions/asset/decorators.py
similarity index 96%
rename from airflow/decorators/assets.py
rename to task_sdk/src/airflow/sdk/definitions/asset/decorators.py
index 2f5052c2d5c..55467c8d63a 100644
--- a/airflow/decorators/assets.py
+++ b/task_sdk/src/airflow/sdk/definitions/asset/decorators.py
@@ -18,14 +18,19 @@
 from __future__ import annotations
 
 import inspect
-from typing import TYPE_CHECKING, Any, Callable, Iterator, Mapping
+from collections.abc import Iterator, Mapping
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Callable,
+)
 
 import attrs
 
-from airflow.assets import Asset, AssetRef
 from airflow.models.asset import _fetch_active_assets_by_name
 from airflow.models.dag import DAG, ScheduleArg
 from airflow.providers.standard.operators.python import PythonOperator
+from airflow.sdk.definitions.asset import Asset, AssetRef
 from airflow.utils.session import create_session
 
 if TYPE_CHECKING:
diff --git a/airflow/assets/metadata.py b/task_sdk/src/airflow/sdk/definitions/asset/metadata.py
similarity index 59%
rename from airflow/assets/metadata.py
rename to task_sdk/src/airflow/sdk/definitions/asset/metadata.py
index b7522226230..08819197039 100644
--- a/airflow/assets/metadata.py
+++ b/task_sdk/src/airflow/sdk/definitions/asset/metadata.py
@@ -17,14 +17,34 @@
 
 from __future__ import annotations
 
-from typing import TYPE_CHECKING, Any
+from typing import (
+    Any,
+)
 
 import attrs
 
-from airflow.assets import AssetAlias, extract_event_key
+from airflow.sdk.definitions.asset import Asset, AssetAlias, _sanitize_uri
 
-if TYPE_CHECKING:
-    from airflow.assets import Asset
+__all__ = ["Metadata", "extract_event_key"]
+
+
+def extract_event_key(value: str | Asset | AssetAlias) -> str:
+    """
+    Extract the key of an inlet or an outlet event.
+
+    If the input value is a string, it is treated as a URI and sanitized. If the
+    input is a :class:`Asset`, the URI it contains is considered sanitized and
+    returned directly. If the input is a :class:`AssetAlias`, the name it contains
+    will be returned directly.
+
+    :meta private:
+    """
+    if isinstance(value, AssetAlias):
+        return value.name
+
+    if isinstance(value, Asset):
+        return value.uri
+    return _sanitize_uri(str(value))
 
 
 @attrs.define(init=False)
@@ -36,7 +56,10 @@ class Metadata:
     alias_name: str | None = None
 
     def __init__(
-        self, target: str | Asset, extra: dict[str, Any], alias: AssetAlias | str | None = None
+        self,
+        target: str | Asset,
+        extra: dict[str, Any],
+        alias: AssetAlias | str | None = None,
     ) -> None:
         self.uri = extract_event_key(target)
         self.extra = extra
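
Behavior of the relocated `extract_event_key`, per its docstring above (a sketch; the URIs shown assume no registered normalizer rewrites them):

    from airflow.sdk.definitions.asset import Asset, AssetAlias
    from airflow.sdk.definitions.asset.metadata import Metadata, extract_event_key

    assert extract_event_key(AssetAlias(name="my_alias")) == "my_alias"  # alias -> name
    assert extract_event_key(Asset(uri="test://asset/")) == "test://asset/"  # asset -> uri

    # Metadata routes `target` through extract_event_key, so a plain string
    # URI and an Asset carrying the same URI yield the same event key.
    meta = Metadata("test://asset/", extra={"rows": 10})
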
diff --git a/task_sdk/src/airflow/sdk/definitions/dag.py b/task_sdk/src/airflow/sdk/definitions/dag.py
index 62caf682b77..083b28646d1 100644
--- a/task_sdk/src/airflow/sdk/definitions/dag.py
+++ b/task_sdk/src/airflow/sdk/definitions/dag.py
@@ -45,7 +45,6 @@ import re2
 from dateutil.relativedelta import relativedelta
 
 from airflow import settings
-from airflow.assets import Asset, AssetAlias, BaseAsset
 from airflow.exceptions import (
     DuplicateTaskIdFound,
     FailStopDagInvalidTriggerRule,
@@ -54,6 +53,7 @@ from airflow.exceptions import (
 )
 from airflow.models.param import DagParam, ParamsDict
 from airflow.sdk.definitions.abstractoperator import AbstractOperator
+from airflow.sdk.definitions.asset import Asset, AssetAlias, BaseAsset
 from airflow.sdk.definitions.baseoperator import BaseOperator
 from airflow.sdk.types import NOTSET
 from airflow.timetables.base import Timetable
@@ -492,7 +492,7 @@ class DAG:
 
     @timetable.default
     def _default_timetable(instance: DAG):
-        from airflow.assets import AssetAll
+        from airflow.sdk.definitions.asset import AssetAll
 
         schedule = instance.schedule
         # TODO: Once
diff --git a/tests/assets/test_asset.py b/task_sdk/tests/defintions/test_asset.py
similarity index 91%
rename from tests/assets/test_asset.py
rename to task_sdk/tests/defintions/test_asset.py
index a454fd2826b..9c6b147ff3c 100644
--- a/tests/assets/test_asset.py
+++ b/task_sdk/tests/defintions/test_asset.py
@@ -25,23 +25,25 @@ from unittest.mock import patch
 import pytest
 from sqlalchemy.sql import select
 
-from airflow.assets import (
+from airflow.models.asset import AssetAliasModel, AssetDagRunQueue, AssetModel
+from airflow.models.serialized_dag import SerializedDagModel
+from airflow.operators.empty import EmptyOperator
+from airflow.sdk.definitions.asset import (
     Asset,
     AssetAlias,
+    AssetAliasCondition,
     AssetAll,
     AssetAny,
     BaseAsset,
     Dataset,
     Model,
-    _AssetAliasCondition,
     _get_normalized_scheme,
     _sanitize_uri,
 )
-from airflow.models.asset import AssetAliasModel, AssetDagRunQueue, AssetModel
-from airflow.models.serialized_dag import SerializedDagModel
-from airflow.operators.empty import EmptyOperator
 from airflow.serialization.serialized_objects import BaseSerialization, SerializedDAG
 
+ASSET_MODULE_PATH = "airflow.sdk.definitions.asset"
+
 
 @pytest.fixture
 def clear_assets():
@@ -269,10 +271,19 @@ def test_asset_logical_conditions_evaluation_and_serialization(inputs, scenario,
 @pytest.mark.parametrize(
     "status_values, expected_evaluation",
     [
-        ((False, True, True), False),  # AssetAll requires all conditions to be True, but d1 is False
+        (
+            (False, True, True),
+            False,
+        ),  # AssetAll requires all conditions to be True, but d1 is False
         ((True, True, True), True),  # All conditions are True
-        ((True, False, True), True),  # d1 is True, and AssetAny condition (d2 or d3 being True) is met
-        ((True, False, False), False),  # d1 is True, but neither d2 nor d3 meet the AssetAny condition
+        (
+            (True, False, True),
+            True,
+        ),  # d1 is True, and AssetAny condition (d2 or d3 being True) is met
+        (
+            (True, False, False),
+            False,
+        ),  # d1 is True, but neither d2 nor d3 meet the AssetAny condition
     ],
 )
 def test_nested_asset_conditions_with_serialization(status_values, expected_evaluation):
@@ -531,7 +542,10 @@ def _mock_get_uri_normalizer_noop(normalized_scheme):
     return normalizer
 
 
-@patch("airflow.assets._get_uri_normalizer", 
_mock_get_uri_normalizer_raising_error)
+@patch(
+    "airflow.sdk.definitions.asset._get_uri_normalizer",
+    _mock_get_uri_normalizer_raising_error,
+)
 def test_sanitize_uri_raises_exception():
     with pytest.raises(ValueError) as e_info:
         _sanitize_uri("postgres://localhost:5432/database.schema.table")
@@ -539,20 +553,23 @@ def test_sanitize_uri_raises_exception():
     assert str(e_info.value) == "Incorrect URI format"
 
 
-@patch("airflow.assets._get_uri_normalizer", lambda x: None)
+@patch("airflow.sdk.definitions.asset._get_uri_normalizer", lambda x: None)
 def test_normalize_uri_no_normalizer_found():
     asset = Asset(uri="any_uri_without_normalizer_defined")
     assert asset.normalized_uri is None
 
 
-@patch("airflow.assets._get_uri_normalizer", 
_mock_get_uri_normalizer_raising_error)
+@patch(
+    "airflow.sdk.definitions.asset._get_uri_normalizer",
+    _mock_get_uri_normalizer_raising_error,
+)
 def test_normalize_uri_invalid_uri():
     asset = Asset(uri="any_uri_not_aip60_compliant")
     assert asset.normalized_uri is None
 
 
-@patch("airflow.assets._get_uri_normalizer", _mock_get_uri_normalizer_noop)
-@patch("airflow.assets._get_normalized_scheme", lambda x: "valid_scheme")
+@patch("airflow.sdk.definitions.asset._get_uri_normalizer", 
_mock_get_uri_normalizer_noop)
+@patch("airflow.sdk.definitions.asset._get_normalized_scheme", lambda x: 
"valid_scheme")
 def test_normalize_uri_valid_uri():
     asset = Asset(uri="valid_aip60_uri")
     assert asset.normalized_uri == "valid_aip60_uri"
@@ -561,7 +578,7 @@ def test_normalize_uri_valid_uri():
 @pytest.mark.skip_if_database_isolation_mode
 @pytest.mark.db_test
 @pytest.mark.usefixtures("clear_assets")
-class Test_AssetAliasCondition:
+class TestAssetAliasCondition:
     @pytest.fixture
     def asset_1(self, session):
         """Example asset links to asset alias resolved_asset_alias_2."""
@@ -597,22 +614,22 @@ class Test_AssetAliasCondition:
         return asset_alias_2
 
     def test_init(self, asset_alias_1, asset_1, resolved_asset_alias_2):
-        cond = _AssetAliasCondition(name=asset_alias_1.name)
+        cond = AssetAliasCondition(name=asset_alias_1.name)
         assert cond.objects == []
 
-        cond = _AssetAliasCondition(name=resolved_asset_alias_2.name)
+        cond = AssetAliasCondition(name=resolved_asset_alias_2.name)
         assert cond.objects == [Asset(uri=asset_1.uri)]
 
     def test_as_expression(self, asset_alias_1, resolved_asset_alias_2):
         for assset_alias in (asset_alias_1, resolved_asset_alias_2):
-            cond = _AssetAliasCondition(assset_alias.name)
+            cond = AssetAliasCondition(assset_alias.name)
             assert cond.as_expression() == {"alias": assset_alias.name}
 
     def test_evalute(self, asset_alias_1, resolved_asset_alias_2, asset_1):
-        cond = _AssetAliasCondition(asset_alias_1.name)
+        cond = AssetAliasCondition(asset_alias_1.name)
         assert cond.evaluate({asset_1.uri: True}) is False
 
-        cond = _AssetAliasCondition(resolved_asset_alias_2.name)
+        cond = AssetAliasCondition(resolved_asset_alias_2.name)
         assert cond.evaluate({asset_1.uri: True}) is True
 
 
@@ -645,35 +662,3 @@ class TestAssetSubclasses:
         assert obj.name == arg
         assert obj.uri == arg
         assert obj.group == group
-
-
[email protected](
-    "module_path, attr_name, warning_message",
-    (
-        (
-            "airflow",
-            "Dataset",
-            (
-                "Import 'Dataset' directly from the airflow module is 
deprecated and will be removed in the future. "
-                "Please import it from 'airflow.assets.Dataset'."
-            ),
-        ),
-        (
-            "airflow.datasets",
-            "Dataset",
-            (
-                "Import from the airflow.dataset module is deprecated and "
-                "will be removed in the Airflow 3.2. Please import it from 
'airflow.assets'."
-            ),
-        ),
-    ),
-)
-def test_backward_compat_import_before_airflow_3_2(module_path, attr_name, warning_message):
-    with pytest.warns() as record:
-        import importlib
-
-        mod = importlib.import_module(module_path, __name__)
-        getattr(mod, attr_name)
-
-    assert record[0].category is DeprecationWarning
-    assert str(record[0].message) == warning_message
diff --git a/tests/decorators/test_assets.py b/task_sdk/tests/defintions/test_asset_decorators.py
similarity index 93%
rename from tests/decorators/test_assets.py
rename to task_sdk/tests/defintions/test_asset_decorators.py
index a3821140e54..04650bc6644 100644
--- a/tests/decorators/test_assets.py
+++ b/task_sdk/tests/defintions/test_asset_decorators.py
@@ -21,9 +21,9 @@ from unittest.mock import ANY
 
 import pytest
 
-from airflow.assets import Asset
-from airflow.decorators.assets import AssetRef, _AssetMainOperator, asset
 from airflow.models.asset import AssetActive, AssetModel
+from airflow.sdk.definitions.asset import Asset, AssetRef
+from airflow.sdk.definitions.asset.decorators import _AssetMainOperator, asset
 
 pytestmark = pytest.mark.db_test
 
@@ -119,8 +119,8 @@ class TestAssetDefinition:
             "uri": "s3://bucket/object",
         }
 
-    @mock.patch("airflow.decorators.assets._AssetMainOperator")
-    @mock.patch("airflow.decorators.assets.DAG")
+    @mock.patch("airflow.sdk.definitions.decorators._AssetMainOperator")
+    @mock.patch("airflow.sdk.definitions.decorators.DAG")
     def test__attrs_post_init__(
         self, DAG, _AssetMainOperator, example_asset_func_with_valid_arg_as_inlet_asset
     ):
@@ -169,7 +169,10 @@ class Test_AssetMainOperator:
         )
         assert op.determine_kwargs(context={"k": "v"}) == {
             "self": Asset(
-                name="example_asset_func", uri="s3://bucket/object", 
group="MLModel", extra={"k": "v"}
+                name="example_asset_func",
+                uri="s3://bucket/object",
+                group="MLModel",
+                extra={"k": "v"},
             ),
             "context": {"k": "v"},
             "inlet_asset_1": Asset(name="inlet_asset_1", 
uri="s3://bucket/object1"),
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index b295c063adc..1eb92438f49 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -24,12 +24,12 @@ import pytest
 import time_machine
 
 from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP
-from airflow.assets import Asset
 from airflow.models.asset import AssetEvent, AssetModel
 from airflow.models.dag import DAG, DagModel
 from airflow.models.dagrun import DagRun
 from airflow.models.param import Param
 from airflow.operators.empty import EmptyOperator
+from airflow.sdk.definitions.asset import Asset
 from airflow.utils import timezone
 from airflow.utils.session import create_session, provide_session
 from airflow.utils.state import DagRunState, State
diff --git a/tests/api_connexion/schemas/test_asset_schema.py b/tests/api_connexion/schemas/test_asset_schema.py
index e403e1c6a28..103af283632 100644
--- a/tests/api_connexion/schemas/test_asset_schema.py
+++ b/tests/api_connexion/schemas/test_asset_schema.py
@@ -27,9 +27,9 @@ from airflow.api_connexion.schemas.asset_schema import (
     asset_event_schema,
     asset_schema,
 )
-from airflow.assets import Asset
 from airflow.models.asset import AssetAliasModel, AssetEvent, AssetModel
 from airflow.operators.empty import EmptyOperator
+from airflow.sdk.definitions.asset import Asset
 
 from tests_common.test_utils.db import clear_db_assets, clear_db_dags
 
diff --git a/tests/api_connexion/schemas/test_dag_schema.py b/tests/api_connexion/schemas/test_dag_schema.py
index a14365f07c1..e43c1f2ae76 100644
--- a/tests/api_connexion/schemas/test_dag_schema.py
+++ b/tests/api_connexion/schemas/test_dag_schema.py
@@ -27,9 +27,9 @@ from airflow.api_connexion.schemas.dag_schema import (
     DAGDetailSchema,
     DAGSchema,
 )
-from airflow.assets import Asset
 from airflow.models import DagModel, DagTag
 from airflow.models.dag import DAG
+from airflow.sdk.definitions.asset import Asset
 
 UTC_JSON_REPR = "UTC" if pendulum.__version__.startswith("3") else "Timezone('UTC')"
 
diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py
index 7d28a9237e3..89705ba85ab 100644
--- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -22,10 +22,10 @@ from datetime import datetime, timezone
 import pytest
 from sqlalchemy import select
 
-from airflow import Asset
 from airflow.models import DagRun
 from airflow.models.asset import AssetEvent, AssetModel
 from airflow.operators.empty import EmptyOperator
+from airflow.sdk.definitions.asset import Asset
 from airflow.utils.session import provide_session
 from airflow.utils.state import DagRunState, State
 from airflow.utils.types import DagRunTriggeredByType, DagRunType
diff --git a/tests/api_fastapi/core_api/routes/ui/test_assets.py b/tests/api_fastapi/core_api/routes/ui/test_assets.py
index b5c85b98ba6..8eafb0f8bdd 100644
--- a/tests/api_fastapi/core_api/routes/ui/test_assets.py
+++ b/tests/api_fastapi/core_api/routes/ui/test_assets.py
@@ -18,8 +18,8 @@ from __future__ import annotations
 
 import pytest
 
-from airflow.assets import Asset
 from airflow.operators.empty import EmptyOperator
+from airflow.sdk.definitions.asset import Asset
 
 from tests_common.test_utils.db import initial_db_init
 
diff --git a/tests/assets/test_manager.py b/tests/assets/test_manager.py
index 1b3e8216a9a..b37ac6c912f 100644
--- a/tests/assets/test_manager.py
+++ b/tests/assets/test_manager.py
@@ -24,7 +24,6 @@ from unittest import mock
 import pytest
 from sqlalchemy import delete
 
-from airflow.assets import Asset, AssetAlias
 from airflow.assets.manager import AssetManager
 from airflow.listeners.listener import get_listener_manager
 from airflow.models.asset import (
@@ -37,6 +36,7 @@ from airflow.models.asset import (
 )
 from airflow.models.dag import DagModel
 from airflow.models.dagbag import DagPriorityParsingRequest
+from airflow.sdk.definitions.asset import Asset, AssetAlias
 from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic
 
 from tests.listeners import asset_listener
diff --git a/tests/dags/test_assets.py b/tests/dags/test_assets.py
index 30a6e3f147a..1fbc67a18d3 100644
--- a/tests/dags/test_assets.py
+++ b/tests/dags/test_assets.py
@@ -19,11 +19,11 @@ from __future__ import annotations
 
 from datetime import datetime
 
-from airflow.assets import Asset
 from airflow.exceptions import AirflowFailException, AirflowSkipException
 from airflow.models.dag import DAG
 from airflow.providers.standard.operators.bash import BashOperator
 from airflow.providers.standard.operators.python import PythonOperator
+from airflow.sdk.definitions.asset import Asset
 
 skip_task_dag_asset = Asset("s3://dag_with_skip_task/output_1.txt", extra={"hi": "bye"})
 fail_task_dag_asset = Asset("s3://dag_with_fail_task/output_1.txt", extra={"hi": "bye"})
diff --git a/tests/dags/test_only_empty_tasks.py b/tests/dags/test_only_empty_tasks.py
index 2cea9c3c6b1..99c0224e56e 100644
--- a/tests/dags/test_only_empty_tasks.py
+++ b/tests/dags/test_only_empty_tasks.py
@@ -20,9 +20,9 @@ from __future__ import annotations
 from datetime import datetime
 from typing import Sequence
 
-from airflow.assets import Asset
 from airflow.models.dag import DAG
 from airflow.operators.empty import EmptyOperator
+from airflow.sdk.definitions.asset import Asset
 
 DEFAULT_DATE = datetime(2016, 1, 1)
 
diff --git a/tests/models/test_asset.py b/tests/datasets/__init__.py
similarity index 68%
copy from tests/models/test_asset.py
copy to tests/datasets/__init__.py
index 5b35a0c8952..13a83393a91 100644
--- a/tests/models/test_asset.py
+++ b/tests/datasets/__init__.py
@@ -14,16 +14,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
-from __future__ import annotations
-
-from airflow.assets import AssetAlias
-from airflow.models.asset import AssetAliasModel
-
-
-class TestAssetAliasModel:
-    def test_from_public(self):
-        asset_alias = AssetAlias(name="test_alias")
-        asset_alias_model = AssetAliasModel.from_public(asset_alias)
-
-        assert asset_alias_model.name == "test_alias"
diff --git a/tests/datasets/test_dataset.py b/tests/datasets/test_dataset.py
new file mode 100644
index 00000000000..de1a9a5cc3a
--- /dev/null
+++ b/tests/datasets/test_dataset.py
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+from __future__ import annotations
+
+import pytest
+
+
[email protected](
+    "module_path, attr_name, warning_message",
+    (
+        (
+            "airflow",
+            "Dataset",
+            (
+                "Import 'Dataset' directly from the airflow module is 
deprecated and will be removed in the future. "
+                "Please import it from 
'airflow.sdk.definitions.asset.Dataset'."
+            ),
+        ),
+        (
+            "airflow.datasets",
+            "Dataset",
+            (
+                "Import from the airflow.dataset module is deprecated and "
+                "will be removed in the Airflow 3.2. Please import it from 
'airflow.sdk.definitions.asset'."
+            ),
+        ),
+    ),
+)
+def test_backward_compat_import_before_airflow_3_2(module_path, attr_name, warning_message):
+    with pytest.warns() as record:
+        import importlib
+
+        mod = importlib.import_module(module_path, __name__)
+        getattr(mod, attr_name)
+
+    assert record[0].category is DeprecationWarning
+    assert str(record[0].message) == warning_message
diff --git a/tests/decorators/test_python.py b/tests/decorators/test_python.py
index 668d0be99b6..83c6f8ab4dc 100644
--- a/tests/decorators/test_python.py
+++ b/tests/decorators/test_python.py
@@ -985,7 +985,7 @@ def test_no_warnings(reset_logging_config, caplog):
 
 @pytest.mark.skip_if_database_isolation_mode  # Test is broken in db isolation mode
 def test_task_decorator_asset(dag_maker, session):
-    from airflow.assets import Asset
+    from airflow.sdk.definitions.asset import Asset
 
     result = None
     uri = "s3://bucket/name"
diff --git a/tests/io/test_path.py b/tests/io/test_path.py
index 264e3a6d8c1..fd9844bc4bc 100644
--- a/tests/io/test_path.py
+++ b/tests/io/test_path.py
@@ -28,10 +28,10 @@ from fsspec.implementations.local import LocalFileSystem
 from fsspec.implementations.memory import MemoryFileSystem
 from fsspec.registry import _registry as _fsspec_registry, register_implementation
 
-from airflow.assets import Asset
 from airflow.io import _register_filesystems, get_fs
 from airflow.io.path import ObjectStoragePath
 from airflow.io.store import _STORE_CACHE, ObjectStore, attach
+from airflow.sdk.definitions.asset import Asset
 from airflow.utils.module_loading import qualname
 
 FAKE = "file:///fake"
diff --git a/tests/io/test_wrapper.py b/tests/io/test_wrapper.py
index 641eda84d1a..35469326794 100644
--- a/tests/io/test_wrapper.py
+++ b/tests/io/test_wrapper.py
@@ -19,8 +19,8 @@ from __future__ import annotations
 import uuid
 from unittest.mock import patch
 
-from airflow.assets import Asset
 from airflow.io.path import ObjectStoragePath
+from airflow.sdk.definitions.asset import Asset
 
 
 @patch("airflow.providers_manager.ProvidersManager")
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index fbf1d4228b5..4578cc535f5 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -39,7 +39,6 @@ from sqlalchemy import func, select, update
 
 import airflow.example_dags
 from airflow import settings
-from airflow.assets import Asset
 from airflow.assets.manager import AssetManager
 from airflow.callbacks.callback_requests import DagCallbackRequest, TaskCallbackRequest
 from airflow.callbacks.database_callback_sink import DatabaseCallbackSink
@@ -66,6 +65,7 @@ from airflow.models.serialized_dag import SerializedDagModel
 from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance
 from airflow.operators.empty import EmptyOperator
 from airflow.providers.standard.operators.bash import BashOperator
+from airflow.sdk.definitions.asset import Asset
 from airflow.serialization.serialized_objects import SerializedDAG
 from airflow.timetables.base import DataInterval
 from airflow.utils import timezone
diff --git a/tests/lineage/test_hook.py b/tests/lineage/test_hook.py
index cfb446b8ade..ec6390c77a5 100644
--- a/tests/lineage/test_hook.py
+++ b/tests/lineage/test_hook.py
@@ -22,7 +22,6 @@ from unittest.mock import MagicMock, patch
 import pytest
 
 from airflow import plugins_manager
-from airflow.assets import Asset
 from airflow.hooks.base import BaseHook
 from airflow.lineage import hook
 from airflow.lineage.hook import (
@@ -33,6 +32,7 @@ from airflow.lineage.hook import (
     NoOpCollector,
     get_hook_lineage_collector,
 )
+from airflow.sdk.definitions.asset import Asset
 
 from tests_common.test_utils.mock_plugins import mock_plugin_manager
 
diff --git a/tests/listeners/asset_listener.py b/tests/listeners/asset_listener.py
index e7adf580363..3ceba2d676d 100644
--- a/tests/listeners/asset_listener.py
+++ b/tests/listeners/asset_listener.py
@@ -23,7 +23,7 @@ import typing
 from airflow.listeners import hookimpl
 
 if typing.TYPE_CHECKING:
-    from airflow.assets import Asset
+    from airflow.sdk.definitions.asset import Asset
 
 
 changed: list[Asset] = []
diff --git a/tests/listeners/test_asset_listener.py b/tests/listeners/test_asset_listener.py
index a075b87a7f3..52cdc39604d 100644
--- a/tests/listeners/test_asset_listener.py
+++ b/tests/listeners/test_asset_listener.py
@@ -18,10 +18,10 @@ from __future__ import annotations
 
 import pytest
 
-from airflow.assets import Asset
 from airflow.listeners.listener import get_listener_manager
 from airflow.models.asset import AssetModel
 from airflow.operators.empty import EmptyOperator
+from airflow.sdk.definitions.asset import Asset
 from airflow.utils.session import provide_session
 
 from tests.listeners import asset_listener
diff --git a/tests/models/test_asset.py b/tests/models/test_asset.py
index 5b35a0c8952..9763f220ade 100644
--- a/tests/models/test_asset.py
+++ b/tests/models/test_asset.py
@@ -17,8 +17,8 @@
 
 from __future__ import annotations
 
-from airflow.assets import AssetAlias
 from airflow.models.asset import AssetAliasModel
+from airflow.sdk.definitions.asset import AssetAlias
 
 
 class TestAssetAliasModel:
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 28f745b0614..f2128b205b4 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -36,7 +36,6 @@ import time_machine
 from sqlalchemy import inspect, select
 
 from airflow import settings
-from airflow.assets import Asset, AssetAlias, AssetAll, AssetAny
 from airflow.configuration import conf
 from airflow.decorators import setup, task as task_decorator, teardown
 from airflow.exceptions import (
@@ -71,6 +70,7 @@ from airflow.operators.empty import EmptyOperator
 from airflow.providers.standard.operators.bash import BashOperator
 from airflow.providers.standard.operators.python import PythonOperator
 from airflow.sdk import TaskGroup
+from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetAll, AssetAny
 from airflow.sdk.definitions.contextmanager import TaskGroupContext
 from airflow.security import permissions
 from airflow.templates import NativeEnvironment, SandboxedEnvironment
diff --git a/tests/models/test_serialized_dag.py b/tests/models/test_serialized_dag.py
index d54de5c5a38..3838e3bdab9 100644
--- a/tests/models/test_serialized_dag.py
+++ b/tests/models/test_serialized_dag.py
@@ -26,7 +26,6 @@ import pytest
 from sqlalchemy import func, select
 
 import airflow.example_dags as example_dags_module
-from airflow.assets import Asset
 from airflow.decorators import task as task_decorator
 from airflow.models.dag import DAG
 from airflow.models.dag_version import DagVersion
@@ -36,6 +35,7 @@ from airflow.models.serialized_dag import SerializedDagModel as SDM
 from airflow.operators.empty import EmptyOperator
 from airflow.providers.standard.operators.bash import BashOperator
 from airflow.providers.standard.operators.python import PythonOperator
+from airflow.sdk.definitions.asset import Asset
 from airflow.serialization.serialized_objects import SerializedDAG
 from airflow.settings import json
 from airflow.utils.hashlib_wrapper import md5
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index fbe92f00674..81f8ed7e60a 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -39,7 +39,6 @@ import uuid6
 from sqlalchemy import select
 
 from airflow import settings
-from airflow.assets import AssetAlias
 from airflow.decorators import task, task_group
 from airflow.exceptions import (
     AirflowException,
@@ -78,6 +77,7 @@ from airflow.operators.empty import EmptyOperator
 from airflow.providers.standard.operators.bash import BashOperator
 from airflow.providers.standard.operators.python import PythonOperator
 from airflow.providers.standard.sensors.python import PythonSensor
+from airflow.sdk.definitions.asset import AssetAlias
 from airflow.sensors.base import BaseSensorOperator
 from airflow.serialization.serialized_objects import SerializedBaseOperator, SerializedDAG
 from airflow.settings import TracebackSessionForTests
@@ -2441,7 +2441,7 @@ class TestTaskInstance:
 
     @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_outlet_asset_extra(self, dag_maker, session):
-        from airflow.assets import Asset
+        from airflow.sdk.definitions.asset import Asset
 
         with dag_maker(schedule=None, session=session) as dag:
 
@@ -2483,7 +2483,7 @@ class TestTaskInstance:
 
     @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_outlet_asset_extra_ignore_different(self, dag_maker, session):
-        from airflow.assets import Asset
+        from airflow.sdk.definitions.asset import Asset
 
         with dag_maker(schedule=None, session=session):
 
@@ -2505,8 +2505,8 @@ class TestTaskInstance:
 
     @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_outlet_asset_extra_yield(self, dag_maker, session):
-        from airflow.assets import Asset
-        from airflow.assets.metadata import Metadata
+        from airflow.sdk.definitions.asset import Asset
+        from airflow.sdk.definitions.asset.metadata import Metadata
 
         with dag_maker(schedule=None, session=session) as dag:
 
@@ -2555,7 +2555,7 @@ class TestTaskInstance:
 
     @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_outlet_asset_alias(self, dag_maker, session):
-        from airflow.assets import Asset, AssetAlias
+        from airflow.sdk.definitions.asset import Asset, AssetAlias
 
         asset_uri = "test_outlet_asset_alias_test_case_ds"
         alias_name_1 = "test_outlet_asset_alias_test_case_asset_alias_1"
@@ -2604,7 +2604,7 @@ class TestTaskInstance:
 
     @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_outlet_multiple_asset_alias(self, dag_maker, session):
-        from airflow.assets import Asset, AssetAlias
+        from airflow.sdk.definitions.asset import Asset, AssetAlias
 
         asset_uri = "test_outlet_maa_ds"
         asset_alias_name_1 = "test_outlet_maa_asset_alias_1"
@@ -2678,8 +2678,8 @@ class TestTaskInstance:
 
     @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_outlet_asset_alias_through_metadata(self, dag_maker, session):
-        from airflow.assets import AssetAlias
-        from airflow.assets.metadata import Metadata
+        from airflow.sdk.definitions.asset import AssetAlias
+        from airflow.sdk.definitions.asset.metadata import Metadata
 
         asset_uri = "test_outlet_asset_alias_through_metadata_ds"
         asset_alias_name = "test_outlet_asset_alias_through_metadata_asset_alias"
@@ -2723,7 +2723,7 @@ class TestTaskInstance:
 
     @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_outlet_asset_alias_asset_not_exists(self, dag_maker, session):
-        from airflow.assets import Asset, AssetAlias
+        from airflow.sdk.definitions.asset import Asset, AssetAlias
 
         asset_alias_name = "test_outlet_asset_alias_asset_not_exists_asset_alias"
         asset_uri = "did_not_exists"
@@ -2763,7 +2763,7 @@ class TestTaskInstance:
 
     @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     def test_inlet_asset_extra(self, dag_maker, session):
-        from airflow.assets import Asset
+        from airflow.sdk.definitions.asset import Asset
 
         read_task_evaluated = False
 
@@ -2826,7 +2826,7 @@ class TestTaskInstance:
         session.add_all([asset_model, asset_alias_model])
         session.commit()
 
-        from airflow.assets import Asset, AssetAlias
+        from airflow.sdk.definitions.asset import Asset, AssetAlias
 
         read_task_evaluated = False
 
@@ -2885,7 +2885,7 @@ class TestTaskInstance:
         session.add(asset_alias_model)
         session.commit()
 
-        from airflow.assets import AssetAlias
+        from airflow.sdk.definitions.asset import AssetAlias
 
         with dag_maker(schedule=None, session=session):
 
@@ -2916,7 +2916,7 @@ class TestTaskInstance:
         ],
     )
     def test_inlet_asset_extra_slice(self, dag_maker, session, slicer, expected):
-        from airflow.assets import Asset
+        from airflow.sdk.definitions.asset import Asset
 
         asset_uri = "test_inlet_asset_extra_slice"
 
@@ -2979,7 +2979,7 @@ class TestTaskInstance:
         session.add_all([asset_model, asset_alias_model])
         session.commit()
 
-        from airflow.assets import Asset
+        from airflow.sdk.definitions.asset import Asset
 
         with dag_maker(dag_id="write", schedule="@daily", params={"i": -1}, 
session=session):
 
@@ -3024,7 +3024,7 @@ class TestTaskInstance:
         Test that when a task that produces asset has ran, that changing the consumer
         dag asset will not cause primary key blank-out
         """
-        from airflow.assets import Asset
+        from airflow.sdk.definitions.asset import Asset
 
         with dag_maker(schedule=None, serialized=True) as dag1:
 
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index fba82e00b06..ced00fd65ab 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -43,7 +43,6 @@ from dateutil.relativedelta import FR, relativedelta
 from kubernetes.client import models as k8s
 
 import airflow
-from airflow.assets import Asset
 from airflow.decorators import teardown
 from airflow.decorators.base import DecoratedOperator
 from airflow.exceptions import (
@@ -65,6 +64,7 @@ from airflow.operators.empty import EmptyOperator
 from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
 from airflow.providers.standard.operators.bash import BashOperator
 from airflow.providers.standard.sensors.bash import BashSensor
+from airflow.sdk.definitions.asset import Asset
 from airflow.security import permissions
 from airflow.serialization.enums import Encoding
 from airflow.serialization.json_schema import load_dag_schema_dict
diff --git a/tests/serialization/test_serde.py b/tests/serialization/test_serde.py
index 11010af86ab..a3a946124ff 100644
--- a/tests/serialization/test_serde.py
+++ b/tests/serialization/test_serde.py
@@ -28,7 +28,7 @@ import attr
 import pytest
 from pydantic import BaseModel
 
-from airflow.assets import Asset
+from airflow.sdk.definitions.asset import Asset
 from airflow.serialization.serde import (
     CLASSNAME,
     DATA,
@@ -337,7 +337,7 @@ class TestSerDe:
         """
         uri = "s3://does/not/exist"
         data = {
-            "__type": "airflow.assets.Asset",
+            "__type": "airflow.sdk.definitions.asset.Asset",
             "__source": None,
             "__var": {
                 "__var": {
diff --git a/tests/serialization/test_serialized_objects.py b/tests/serialization/test_serialized_objects.py
index 96f7414b776..a7d775f82c3 100644
--- a/tests/serialization/test_serialized_objects.py
+++ b/tests/serialization/test_serialized_objects.py
@@ -31,7 +31,6 @@ from kubernetes.client import models as k8s
 from pendulum.tz.timezone import Timezone
 from pydantic import BaseModel
 
-from airflow.assets import Asset, AssetAlias, AssetAliasEvent
 from airflow.exceptions import (
     AirflowException,
     AirflowFailException,
@@ -50,6 +49,7 @@ from airflow.models.tasklog import LogTemplate
 from airflow.models.xcom_arg import XComArg
 from airflow.operators.empty import EmptyOperator
 from airflow.providers.standard.operators.python import PythonOperator
+from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetAliasEvent
 from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding
 from airflow.serialization.pydantic.asset import AssetEventPydantic, AssetPydantic
 from airflow.serialization.pydantic.dag import DagModelPydantic, DagTagPydantic
diff --git a/tests/timetables/test_assets_timetable.py b/tests/timetables/test_assets_timetable.py
index bb942a4a01d..9d572295773 100644
--- a/tests/timetables/test_assets_timetable.py
+++ b/tests/timetables/test_assets_timetable.py
@@ -23,8 +23,8 @@ from typing import TYPE_CHECKING, Any
 import pytest
 from pendulum import DateTime
 
-from airflow.assets import Asset, AssetAlias
 from airflow.models.asset import AssetAliasModel, AssetEvent, AssetModel
+from airflow.sdk.definitions.asset import Asset, AssetAlias
 from airflow.timetables.assets import AssetOrTimeSchedule
 from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
 from airflow.timetables.simple import AssetTriggeredTimetable
diff --git a/tests/utils/test_context.py b/tests/utils/test_context.py
index 5d2f7543b62..0e7309075b3 100644
--- a/tests/utils/test_context.py
+++ b/tests/utils/test_context.py
@@ -20,8 +20,8 @@ from __future__ import annotations
 
 import pytest
 
-from airflow.assets import Asset, AssetAlias, AssetAliasEvent
 from airflow.models.asset import AssetAliasModel, AssetModel
+from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetAliasEvent
 from airflow.utils.context import OutletEventAccessor, OutletEventAccessors
 
 
diff --git a/tests/utils/test_json.py b/tests/utils/test_json.py
index 5a58b5d7903..b99681c2231 100644
--- a/tests/utils/test_json.py
+++ b/tests/utils/test_json.py
@@ -26,7 +26,7 @@ import numpy as np
 import pendulum
 import pytest
 
-from airflow.assets import Asset
+from airflow.sdk.definitions.asset import Asset
 from airflow.utils import json as utils_json
 
 
diff --git a/tests/www/views/test_views_asset.py b/tests/www/views/test_views_asset.py
index f2e860958ca..e4fda0aeac6 100644
--- a/tests/www/views/test_views_asset.py
+++ b/tests/www/views/test_views_asset.py
@@ -22,9 +22,9 @@ import pendulum
 import pytest
 from dateutil.tz import UTC
 
-from airflow.assets import Asset
 from airflow.models.asset import AssetActive, AssetEvent, AssetModel
 from airflow.operators.empty import EmptyOperator
+from airflow.sdk.definitions.asset import Asset
 
 from tests_common.test_utils.asserts import assert_queries_count
 from tests_common.test_utils.db import clear_db_assets
diff --git a/tests/www/views/test_views_grid.py b/tests/www/views/test_views_grid.py
index c94539f0558..c7a453ffaee 100644
--- a/tests/www/views/test_views_grid.py
+++ b/tests/www/views/test_views_grid.py
@@ -24,12 +24,12 @@ import pendulum
 import pytest
 from dateutil.tz import UTC
 
-from airflow.assets import Asset
 from airflow.decorators import task_group
 from airflow.lineage.entities import File
 from airflow.models import DagBag
 from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel
 from airflow.operators.empty import EmptyOperator
+from airflow.sdk.definitions.asset import Asset
 from airflow.utils import timezone
 from airflow.utils.state import DagRunState, TaskInstanceState
 from airflow.utils.task_group import TaskGroup
