This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 67b71d37645 Remove top-level SDK reference in Core (#59817)
67b71d37645 is described below

commit 67b71d376454cc95cf2f5bb17e0f4edb0e05f480
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Sun Dec 28 14:02:32 2025 +0800

    Remove top-level SDK reference in Core (#59817)
---
 .../core_api/services/ui/connections.py            |  4 +-
 .../src/airflow/cli/commands/dag_command.py        |  3 +-
 airflow-core/src/airflow/models/connection.py      |  3 +-
 airflow-core/src/airflow/models/dag.py             | 26 +++++-----
 airflow-core/src/airflow/models/taskinstance.py    |  2 +-
 airflow-core/src/airflow/models/variable.py        |  7 ++-
 .../serialization/definitions/mappedoperator.py    |  5 --
 airflow-core/src/airflow/utils/cli.py              |  2 +-
 airflow-core/src/airflow/utils/context.py          | 44 ++++++----------
 airflow-core/src/airflow/utils/dag_edges.py        | 28 ++++++-----
 airflow-core/src/airflow/utils/dot_renderer.py     | 58 +++++++++++++---------
 devel-common/src/tests_common/test_utils/compat.py |  5 ++
 .../src/tests_common/test_utils/mock_context.py    |  2 +-
 .../providers/alibaba/cloud/links/maxcompute.py    |  2 +-
 .../alibaba/cloud/operators/analyticdb_spark.py    |  2 +-
 .../alibaba/cloud/operators/maxcompute.py          |  2 +-
 .../providers/alibaba/cloud/operators/oss.py       |  2 +-
 .../alibaba/cloud/sensors/analyticdb_spark.py      |  2 +-
 .../providers/alibaba/cloud/sensors/oss_key.py     |  2 +-
 .../airflow/providers/amazon/aws/links/base_aws.py |  2 +-
 .../providers/amazon/aws/notifications/chime.py    |  2 +-
 .../providers/amazon/aws/operators/appflow.py      |  2 +-
 .../providers/amazon/aws/operators/athena.py       |  2 +-
 .../providers/amazon/aws/operators/batch.py        |  2 +-
 .../providers/amazon/aws/operators/bedrock.py      |  7 ++-
 .../amazon/aws/operators/cloud_formation.py        |  2 +-
 .../providers/amazon/aws/operators/comprehend.py   |  7 ++-
 .../providers/amazon/aws/operators/datasync.py     |  2 +-
 .../airflow/providers/amazon/aws/operators/dms.py  |  8 +--
 .../airflow/providers/amazon/aws/operators/ec2.py  |  2 +-
 .../airflow/providers/amazon/aws/operators/ecs.py  |  2 +-
 .../airflow/providers/amazon/aws/operators/eks.py  |  2 +-
 .../airflow/providers/amazon/aws/operators/emr.py  |  2 +-
 .../providers/amazon/aws/operators/eventbridge.py  |  2 +-
 .../providers/amazon/aws/operators/glacier.py      |  2 +-
 .../airflow/providers/amazon/aws/operators/glue.py |  2 +-
 .../providers/amazon/aws/operators/glue_crawler.py |  5 +-
 .../amazon/aws/operators/glue_databrew.py          |  2 +-
 .../amazon/aws/operators/kinesis_analytics.py      |  2 +-
 .../amazon/aws/operators/lambda_function.py        |  2 +-
 .../airflow/providers/amazon/aws/operators/mwaa.py |  2 +-
 .../providers/amazon/aws/operators/neptune.py      |  2 +-
 .../providers/amazon/aws/operators/quicksight.py   |  2 +-
 .../airflow/providers/amazon/aws/operators/rds.py  |  2 +-
 .../amazon/aws/operators/redshift_cluster.py       |  2 +-
 .../amazon/aws/operators/redshift_data.py          |  2 +-
 .../airflow/providers/amazon/aws/operators/s3.py   |  2 +-
 .../providers/amazon/aws/operators/sagemaker.py    |  2 +-
 .../aws/operators/sagemaker_unified_studio.py      |  2 +-
 .../airflow/providers/amazon/aws/operators/sns.py  |  2 +-
 .../airflow/providers/amazon/aws/operators/sqs.py  |  2 +-
 .../airflow/providers/amazon/aws/operators/ssm.py  |  2 +-
 .../amazon/aws/operators/step_function.py          |  2 +-
 .../airflow/providers/amazon/aws/sensors/athena.py |  7 ++-
 .../airflow/providers/amazon/aws/sensors/batch.py  |  2 +-
 .../providers/amazon/aws/sensors/bedrock.py        |  2 +-
 .../amazon/aws/sensors/cloud_formation.py          |  5 +-
 .../providers/amazon/aws/sensors/comprehend.py     |  2 +-
 .../airflow/providers/amazon/aws/sensors/dms.py    |  2 +-
 .../providers/amazon/aws/sensors/dynamodb.py       |  2 +-
 .../airflow/providers/amazon/aws/sensors/ec2.py    |  2 +-
 .../airflow/providers/amazon/aws/sensors/ecs.py    |  2 +-
 .../airflow/providers/amazon/aws/sensors/eks.py    |  2 +-
 .../airflow/providers/amazon/aws/sensors/emr.py    |  2 +-
 .../providers/amazon/aws/sensors/glacier.py        |  2 +-
 .../airflow/providers/amazon/aws/sensors/glue.py   |  2 +-
 .../amazon/aws/sensors/glue_catalog_partition.py   |  2 +-
 .../providers/amazon/aws/sensors/glue_crawler.py   |  2 +-
 .../amazon/aws/sensors/kinesis_analytics.py        |  2 +-
 .../amazon/aws/sensors/lambda_function.py          |  2 +-
 .../airflow/providers/amazon/aws/sensors/mwaa.py   |  2 +-
 .../amazon/aws/sensors/opensearch_serverless.py    |  2 +-
 .../providers/amazon/aws/sensors/quicksight.py     |  2 +-
 .../airflow/providers/amazon/aws/sensors/rds.py    |  2 +-
 .../amazon/aws/sensors/redshift_cluster.py         |  2 +-
 .../src/airflow/providers/amazon/aws/sensors/s3.py |  9 ++--
 .../providers/amazon/aws/sensors/sagemaker.py      |  2 +-
 .../amazon/aws/sensors/sagemaker_unified_studio.py |  2 +-
 .../airflow/providers/amazon/aws/sensors/sqs.py    |  2 +-
 .../airflow/providers/amazon/aws/sensors/ssm.py    |  2 +-
 .../providers/amazon/aws/sensors/step_function.py  |  2 +-
 .../amazon/aws/transfers/azure_blob_to_s3.py       |  2 +-
 .../providers/amazon/aws/transfers/exasol_to_s3.py |  2 +-
 .../providers/amazon/aws/transfers/ftp_to_s3.py    |  2 +-
 .../providers/amazon/aws/transfers/gcs_to_s3.py    |  2 +-
 .../amazon/aws/transfers/glacier_to_gcs.py         |  2 +-
 .../amazon/aws/transfers/google_api_to_s3.py       |  2 +-
 .../amazon/aws/transfers/hive_to_dynamodb.py       |  2 +-
 .../providers/amazon/aws/transfers/http_to_s3.py   |  2 +-
 .../amazon/aws/transfers/imap_attachment_to_s3.py  |  2 +-
 .../providers/amazon/aws/transfers/local_to_s3.py  |  2 +-
 .../providers/amazon/aws/transfers/mongo_to_s3.py  |  2 +-
 .../amazon/aws/transfers/redshift_to_s3.py         |  2 +-
 .../amazon/aws/transfers/s3_to_dynamodb.py         |  2 +-
 .../providers/amazon/aws/transfers/s3_to_ftp.py    |  2 +-
 .../amazon/aws/transfers/s3_to_redshift.py         |  2 +-
 .../providers/amazon/aws/transfers/s3_to_sftp.py   |  2 +-
 .../providers/amazon/aws/transfers/s3_to_sql.py    |  2 +-
 .../amazon/aws/transfers/salesforce_to_s3.py       |  2 +-
 .../providers/amazon/aws/transfers/sftp_to_s3.py   |  2 +-
 .../providers/amazon/aws/transfers/sql_to_s3.py    |  2 +-
 .../tests/unit/amazon/aws/sensors/test_emr_base.py |  6 +--
 .../aws/sensors/test_sagemaker_unified_studio.py   |  7 +--
 .../providers/celery/sensors/celery_queue.py       |  6 +--
 .../airflow/providers/cncf/kubernetes/callbacks.py |  2 +-
 .../cncf/kubernetes/decorators/kubernetes.py       |  2 +-
 .../cncf/kubernetes/decorators/kubernetes_cmd.py   |  2 +-
 .../providers/cncf/kubernetes/operators/job.py     |  5 +-
 .../providers/cncf/kubernetes/operators/pod.py     |  7 +--
 .../cncf/kubernetes/operators/spark_kubernetes.py  |  6 +--
 .../cncf/kubernetes/sensors/spark_kubernetes.py    |  2 +-
 .../unit/cncf/kubernetes/operators/test_pod.py     |  2 +-
 .../common/sql/operators/generic_transfer.pyi      |  2 +-
 .../airflow/providers/common/sql/sensors/sql.pyi   |  2 +-
 .../airflow/providers/databricks/utils/mixins.py   |  8 +--
 .../airflow/providers/dbt/cloud/operators/dbt.py   |  2 +-
 .../src/airflow/providers/dbt/cloud/sensors/dbt.py |  2 +-
 .../providers/edge3/example_dags/win_notepad.py    |  2 +-
 .../src/airflow/providers/grpc/operators/grpc.py   |  6 +--
 .../src/airflow/providers/http/operators/http.py   |  7 +--
 .../src/airflow/providers/http/sensors/http.py     | 13 +----
 .../providers/imap/sensors/imap_attachment.py      |  6 +--
 .../providers/influxdb/operators/influxdb.py       |  6 +--
 .../airflow/providers/microsoft/azure/hooks/asb.py |  2 +-
 .../providers/microsoft/azure/operators/adls.py    |  2 +-
 .../providers/microsoft/azure/operators/adx.py     |  2 +-
 .../providers/microsoft/azure/operators/asb.py     |  2 +-
 .../providers/microsoft/azure/operators/batch.py   |  2 +-
 .../azure/operators/container_instances.py         |  2 +-
 .../providers/microsoft/azure/operators/cosmos.py  |  2 +-
 .../microsoft/azure/operators/data_factory.py      |  2 +-
 .../providers/microsoft/azure/operators/msgraph.py |  2 +-
 .../providers/microsoft/azure/operators/powerbi.py |  2 +-
 .../providers/microsoft/azure/operators/synapse.py |  2 +-
 .../microsoft/azure/operators/wasb_delete_blob.py  |  2 +-
 .../providers/microsoft/azure/sensors/cosmos.py    |  2 +-
 .../microsoft/azure/sensors/data_factory.py        |  2 +-
 .../providers/microsoft/azure/sensors/msgraph.py   |  2 +-
 .../providers/microsoft/azure/sensors/wasb.py      |  2 +-
 .../microsoft/azure/transfers/local_to_adls.py     |  2 +-
 .../microsoft/azure/transfers/local_to_wasb.py     |  2 +-
 .../azure/transfers/oracle_to_azure_data_lake.py   |  2 +-
 .../microsoft/azure/transfers/s3_to_wasb.py        |  2 +-
 .../microsoft/azure/transfers/sftp_to_wasb.py      |  6 +--
 .../tests/unit/microsoft/azure/hooks/test_asb.py   |  6 +--
 .../unit/microsoft/azure/operators/test_asb.py     |  6 +--
 .../azure/operators/test_container_instances.py    |  6 +--
 .../providers/microsoft/psrp/operators/psrp.py     |  2 +-
 .../providers/microsoft/winrm/operators/winrm.py   |  2 +-
 .../src/airflow/providers/mongo/sensors/mongo.py   |  6 +--
 .../mongo/tests/unit/mongo/sensors/test_mongo.py   |  6 +--
 .../src/airflow/providers/neo4j/operators/neo4j.py |  6 +--
 .../src/airflow/providers/neo4j/sensors/neo4j.py   |  5 +-
 .../providers/opensearch/operators/opensearch.py   |  6 +--
 .../providers/pinecone/operators/pinecone.py       |  5 +-
 .../providers/presto/transfers/gcs_to_presto.py    |  6 +--
 .../airflow/providers/qdrant/operators/qdrant.py   |  6 +--
 .../tests/unit/standard/operators/test_python.py   |  2 +-
 .../airflow/providers/teradata/operators/tpt.py    | 13 ++---
 shared/dagnode/src/airflow_shared/dagnode/node.py  |  1 +
 .../sdk/definitions/_internal/abstractoperator.py  |  1 -
 161 files changed, 266 insertions(+), 367 deletions(-)

diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/connections.py 
b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/connections.py
index 787f43c9a73..50ab6a7d69c 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/connections.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/connections.py
@@ -27,7 +27,7 @@ from airflow.api_fastapi.core_api.datamodels.connections 
import (
     ConnectionHookMetaData,
     StandardHookFields,
 )
-from airflow.sdk import Param
+from airflow.serialization.definitions.param import SerializedParam
 
 if TYPE_CHECKING:
     from airflow.providers_manager import ConnectionFormWidgetInfo, HookInfo
@@ -79,7 +79,7 @@ class HookMetaService:
                 for v in validators:
                     if isinstance(v, HookMetaService.MockEnum):
                         enum = {"enum": v.allowed_values}
-            self.param = Param(
+            self.param = SerializedParam(
                 default=default,
                 title=label,
                 description=description or None,
diff --git a/airflow-core/src/airflow/cli/commands/dag_command.py 
b/airflow-core/src/airflow/cli/commands/dag_command.py
index 32fce4553d5..6acbfa072cc 100644
--- a/airflow-core/src/airflow/cli/commands/dag_command.py
+++ b/airflow-core/src/airflow/cli/commands/dag_command.py
@@ -60,6 +60,7 @@ if TYPE_CHECKING:
     from sqlalchemy.orm import Session
 
     from airflow import DAG
+    from airflow.serialization.definitions.dag import SerializedDAG
     from airflow.timetables.base import DataInterval
 
 DAG_DETAIL_FIELDS = {*DAGResponse.model_fields, 
*DAGResponse.model_computed_fields}
@@ -656,7 +657,7 @@ def dag_test(args, dag: DAG | None = None, session: Session 
= NEW_SESSION) -> No
             )
         ).all()
 
-        dot_graph = render_dag(dag, tis=list(tis))
+        dot_graph = render_dag(cast("SerializedDAG", dag), tis=list(tis))
         print()
         if filename:
             _save_dot_to_file(dot_graph, filename)
diff --git a/airflow-core/src/airflow/models/connection.py 
b/airflow-core/src/airflow/models/connection.py
index b7edf3ba700..93128823648 100644
--- a/airflow-core/src/airflow/models/connection.py
+++ b/airflow-core/src/airflow/models/connection.py
@@ -36,7 +36,6 @@ from airflow.configuration import ensure_secrets_loaded
 from airflow.exceptions import AirflowException, AirflowNotFoundException
 from airflow.models.base import ID_LEN, Base
 from airflow.models.crypto import get_fernet
-from airflow.sdk import SecretCache
 from airflow.utils.helpers import prune_dict
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.session import NEW_SESSION, provide_session
@@ -531,6 +530,8 @@ class Connection(Base, LoggingMixin):
 
         # check cache first
         # enabled only if SecretCache.init() has been called first
+        from airflow.sdk import SecretCache
+
         try:
             uri = SecretCache.get_connection_uri(conn_id)
             return Connection(conn_id=conn_id, uri=uri)
diff --git a/airflow-core/src/airflow/models/dag.py 
b/airflow-core/src/airflow/models/dag.py
index 64e62b8517c..6ea07c3c230 100644
--- a/airflow-core/src/airflow/models/dag.py
+++ b/airflow-core/src/airflow/models/dag.py
@@ -21,11 +21,10 @@ import logging
 from collections import defaultdict
 from collections.abc import Callable, Collection
 from datetime import datetime, timedelta
-from typing import TYPE_CHECKING, Any, Union, cast
+from typing import TYPE_CHECKING, Any, cast
 
 import pendulum
 import sqlalchemy_jsonfield
-from dateutil.relativedelta import relativedelta
 from sqlalchemy import (
     Boolean,
     Float,
@@ -61,7 +60,6 @@ from airflow.settings import json
 from airflow.timetables.base import DataInterval, Timetable
 from airflow.timetables.interval import CronDataIntervalTimetable, 
DeltaDataIntervalTimetable
 from airflow.timetables.simple import AssetTriggeredTimetable, NullTimetable, 
OnceTimetable
-from airflow.utils.context import Context
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.sqlalchemy import UtcDateTime, mapped_column, with_row_locks
 from airflow.utils.state import DagRunState
@@ -70,6 +68,9 @@ from airflow.utils.types import DagRunType
 if TYPE_CHECKING:
     from typing import TypeAlias
 
+    from dateutil.relativedelta import relativedelta
+
+    from airflow.sdk import Context
     from airflow.serialization.definitions.assets import (
         SerializedAsset,
         SerializedAssetAlias,
@@ -78,21 +79,20 @@ if TYPE_CHECKING:
     from airflow.serialization.definitions.dag import SerializedDAG
 
     UKey: TypeAlias = SerializedAssetUniqueKey
+    DagStateChangeCallback = Callable[[Context], None]
+    ScheduleInterval = None | str | timedelta | relativedelta
+
+    ScheduleArg = (
+        ScheduleInterval
+        | Timetable
+        | "SerializedAssetBase"
+        | Collection["SerializedAsset" | "SerializedAssetAlias"]
+    )
 
 log = logging.getLogger(__name__)
 
 TAG_MAX_LEN = 100
 
-DagStateChangeCallback = Callable[[Context], None]
-ScheduleInterval = None | str | timedelta | relativedelta
-
-ScheduleArg = Union[
-    ScheduleInterval,
-    Timetable,
-    "SerializedAssetBase",
-    Collection[Union["SerializedAsset", "SerializedAssetAlias"]],
-]
-
 
 def infer_automated_data_interval(timetable: Timetable, logical_date: 
datetime) -> DataInterval:
     """
diff --git a/airflow-core/src/airflow/models/taskinstance.py 
b/airflow-core/src/airflow/models/taskinstance.py
index 583fa9f3ee6..d917bec159b 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -114,10 +114,10 @@ if TYPE_CHECKING:
     from airflow.api_fastapi.execution_api.datamodels.asset import AssetProfile
     from airflow.models.dag import DagModel
     from airflow.models.dagrun import DagRun
+    from airflow.sdk import Context
     from airflow.serialization.definitions.dag import SerializedDAG
     from airflow.serialization.definitions.mappedoperator import Operator
     from airflow.serialization.definitions.taskgroup import SerializedTaskGroup
-    from airflow.utils.context import Context
 
 
 PAST_DEPENDS_MET = "past_depends_met"
diff --git a/airflow-core/src/airflow/models/variable.py 
b/airflow-core/src/airflow/models/variable.py
index 9ab3dd854d6..41202559b84 100644
--- a/airflow-core/src/airflow/models/variable.py
+++ b/airflow-core/src/airflow/models/variable.py
@@ -32,7 +32,6 @@ from airflow._shared.secrets_masker import mask_secret
 from airflow.configuration import conf, ensure_secrets_loaded
 from airflow.models.base import ID_LEN, Base
 from airflow.models.crypto import get_fernet
-from airflow.sdk import SecretCache
 from airflow.secrets.metastore import MetastoreBackend
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.session import NEW_SESSION, create_session, provide_session
@@ -238,6 +237,8 @@ class Variable(Base, LoggingMixin):
             )
 
         # check if the secret exists in the custom secrets' backend.
+        from airflow.sdk import SecretCache
+
         Variable.check_for_write_conflict(key=key)
         if serialize_json:
             stored_value = json.dumps(value, indent=2)
@@ -428,6 +429,8 @@ class Variable(Base, LoggingMixin):
                 "Multi-team mode is not configured in the Airflow environment 
but the task trying to delete the variable belongs to a team"
             )
 
+        from airflow.sdk import SecretCache
+
         ctx: contextlib.AbstractContextManager
         if session is not None:
             ctx = contextlib.nullcontext(session)
@@ -494,6 +497,8 @@ class Variable(Base, LoggingMixin):
         :param team_name: Team name associated to the task trying to access 
the variable (if any)
         :return: Variable Value
         """
+        from airflow.sdk import SecretCache
+
         # Disable cache if the variable belongs to a team. We might enable it 
later
         if not team_name:
             # check cache first
diff --git 
a/airflow-core/src/airflow/serialization/definitions/mappedoperator.py 
b/airflow-core/src/airflow/serialization/definitions/mappedoperator.py
index f15afdf34a1..561a23f0b9a 100644
--- a/airflow-core/src/airflow/serialization/definitions/mappedoperator.py
+++ b/airflow-core/src/airflow/serialization/definitions/mappedoperator.py
@@ -30,7 +30,6 @@ from sqlalchemy.orm import Session
 
 from airflow.exceptions import AirflowException, NotMapped
 from airflow.sdk import BaseOperator as TaskSDKBaseOperator
-from airflow.sdk.definitions._internal.abstractoperator import 
DEFAULT_RETRY_DELAY_MULTIPLIER
 from airflow.sdk.definitions.mappedoperator import MappedOperator as 
TaskSDKMappedOperator
 from airflow.serialization.definitions.baseoperator import 
DEFAULT_OPERATOR_DEPS, SerializedBaseOperator
 from airflow.serialization.definitions.node import DAGNode
@@ -288,10 +287,6 @@ class SerializedMappedOperator(DAGNode):
     def max_retry_delay(self) -> datetime.timedelta | float | None:
         return self._get_partial_kwargs_or_operator_default("max_retry_delay")
 
-    @property
-    def retry_delay_multiplier(self) -> float:
-        return float(self.partial_kwargs.get("retry_delay_multiplier", 
DEFAULT_RETRY_DELAY_MULTIPLIER))
-
     @property
     def weight_rule(self) -> PriorityWeightStrategy:
         from airflow.serialization.definitions.baseoperator import 
SerializedBaseOperator
diff --git a/airflow-core/src/airflow/utils/cli.py 
b/airflow-core/src/airflow/utils/cli.py
index 1e9e98514a5..d4e012f6cc3 100644
--- a/airflow-core/src/airflow/utils/cli.py
+++ b/airflow-core/src/airflow/utils/cli.py
@@ -37,7 +37,6 @@ from airflow import settings
 from airflow._shared.timezones import timezone
 from airflow.dag_processing.bundles.manager import DagBundlesManager
 from airflow.exceptions import AirflowException
-from airflow.sdk.definitions._internal.dag_parsing_context import 
_airflow_parsing_context_manager
 from airflow.utils import cli_action_loggers
 from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
 from airflow.utils.platform import getuser, is_terminal_support_colors
@@ -274,6 +273,7 @@ def get_bagged_dag(bundle_names: list | None, dag_id: str, 
dagfile_path: str | N
     dags folder.
     """
     from airflow.dag_processing.dagbag import DagBag, sync_bag_to_db
+    from airflow.sdk.definitions._internal.dag_parsing_context import 
_airflow_parsing_context_manager
 
     manager = DagBundlesManager()
     for bundle_name in bundle_names or ():
diff --git a/airflow-core/src/airflow/utils/context.py 
b/airflow-core/src/airflow/utils/context.py
index 439b142ea27..02e324b083b 100644
--- a/airflow-core/src/airflow/utils/context.py
+++ b/airflow-core/src/airflow/utils/context.py
@@ -19,23 +19,22 @@
 
 from __future__ import annotations
 
-from typing import TYPE_CHECKING, Any, cast
+import warnings
+from typing import Any
 
 from sqlalchemy import select
 
 from airflow.models.asset import AssetModel
-from airflow.sdk import Asset, Context
+from airflow.sdk import Asset
 from airflow.sdk.execution_time.context import (
     ConnectionAccessor as ConnectionAccessorSDK,
     OutletEventAccessors as OutletEventAccessorsSDK,
     VariableAccessor as VariableAccessorSDK,
 )
 from airflow.serialization.definitions.notset import NOTSET, is_arg_set
+from airflow.utils.deprecation_tools import DeprecatedImportWarning
 from airflow.utils.session import create_session
 
-if TYPE_CHECKING:
-    from collections.abc import Container
-
 # NOTE: Please keep this in sync with the following:
 # * Context in task-sdk/src/airflow/sdk/definitions/context.py
 # * Table in docs/apache-airflow/templates-ref.rst
@@ -141,30 +140,17 @@ class OutletEventAccessors(OutletEventAccessorsSDK):
         return Asset(name=asset.name, uri=asset.uri, group=asset.group, 
extra=asset.extra)
 
 
-def context_merge(context: Context, *args: Any, **kwargs: Any) -> None:
-    """
-    Merge parameters into an existing context.
-
-    Like ``dict.update()`` , this take the same parameters, and updates
-    ``context`` in-place.
-
-    This is implemented as a free function because the ``Context`` type is
-    "faked" as a ``TypedDict`` in ``context.pyi``, which cannot have custom
-    functions.
+def __getattr__(name: str):
+    if name in ("Context", "context_copy_partial", "context_merge"):
+        warnings.warn(
+            "Importing Context from airflow.utils.context is deprecated and 
will "
+            "be removed in the future. Please import it from airflow.sdk 
instead.",
+            DeprecatedImportWarning,
+            stacklevel=2,
+        )
 
-    :meta private:
-    """
-    if not context:
-        context = Context()
+        import airflow.sdk.definitions.context as sdk
 
-    context.update(*args, **kwargs)
+        return getattr(sdk, name)
 
-
-def context_copy_partial(source: Context, keys: Container[str]) -> Context:
-    """
-    Create a context by copying items under selected keys in ``source``.
-
-    :meta private:
-    """
-    new = {k: v for k, v in source.items() if k in keys}
-    return cast("Context", new)
+    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
diff --git a/airflow-core/src/airflow/utils/dag_edges.py 
b/airflow-core/src/airflow/utils/dag_edges.py
index a59d083bbab..331b12b66b2 100644
--- a/airflow-core/src/airflow/utils/dag_edges.py
+++ b/airflow-core/src/airflow/utils/dag_edges.py
@@ -16,20 +16,23 @@
 # under the License.
 from __future__ import annotations
 
-from typing import TYPE_CHECKING, cast
+from typing import TYPE_CHECKING, Any
 
-from airflow.sdk.definitions._internal.abstractoperator import AbstractOperator
-from airflow.serialization.definitions.baseoperator import 
SerializedBaseOperator
-from airflow.serialization.definitions.dag import SerializedDAG
-from airflow.serialization.definitions.mappedoperator import 
SerializedMappedOperator
+from airflow.serialization.definitions.taskgroup import SerializedTaskGroup
+
+# Also support SDK types if possible.
+try:
+    from airflow.sdk import TaskGroup
+except ImportError:
+    TaskGroup = SerializedTaskGroup  # type: ignore[misc]
 
 if TYPE_CHECKING:
-    from airflow.sdk import DAG
     from airflow.serialization.definitions.dag import SerializedDAG
     from airflow.serialization.definitions.mappedoperator import Operator
+    from airflow.serialization.definitions.node import DAGNode
 
 
-def dag_edges(dag: DAG | SerializedDAG):
+def dag_edges(dag: SerializedDAG):
     """
     Create the list of edges needed to construct the Graph view.
 
@@ -62,9 +65,10 @@ def dag_edges(dag: DAG | SerializedDAG):
 
     task_group_map = dag.task_group.get_task_group_dict()
 
-    def collect_edges(task_group):
+    def collect_edges(task_group: DAGNode) -> None:
         """Update edges_to_add and edges_to_skip according to TaskGroups."""
-        if isinstance(task_group, (AbstractOperator, SerializedBaseOperator, 
SerializedMappedOperator)):
+        child: DAGNode
+        if not isinstance(task_group, (TaskGroup, SerializedTaskGroup)):
             return
 
         for target_id in task_group.downstream_group_ids:
@@ -111,9 +115,7 @@ def dag_edges(dag: DAG | SerializedDAG):
     edges = set()
     setup_teardown_edges = set()
 
-    # TODO (GH-52141): 'roots' in scheduler needs to return scheduler types
-    # instead, but currently it inherits SDK's DAG.
-    tasks_to_trace = cast("list[Operator]", dag.roots)
+    tasks_to_trace = dag.roots
     while tasks_to_trace:
         tasks_to_trace_next: list[Operator] = []
         for task in tasks_to_trace:
@@ -130,7 +132,7 @@ def dag_edges(dag: DAG | SerializedDAG):
     # Build result dicts with the two ends of the edge, plus any extra metadata
     # if we have it.
     for source_id, target_id in sorted(edges.union(edges_to_add) - 
edges_to_skip):
-        record = {"source_id": source_id, "target_id": target_id}
+        record: dict[str, Any] = {"source_id": source_id, "target_id": 
target_id}
         label = dag.get_edge_info(source_id, target_id).get("label")
         if (source_id, target_id) in setup_teardown_edges:
             record["is_setup_teardown"] = True
diff --git a/airflow-core/src/airflow/utils/dot_renderer.py 
b/airflow-core/src/airflow/utils/dot_renderer.py
index b0b7dbb44c2..ccd5c32b36d 100644
--- a/airflow-core/src/airflow/utils/dot_renderer.py
+++ b/airflow-core/src/airflow/utils/dot_renderer.py
@@ -24,20 +24,34 @@ import warnings
 from typing import TYPE_CHECKING, Any
 
 from airflow.exceptions import AirflowException
-from airflow.sdk import DAG, BaseOperator, TaskGroup
-from airflow.sdk.definitions.mappedoperator import MappedOperator
 from airflow.serialization.definitions.baseoperator import 
SerializedBaseOperator
 from airflow.serialization.definitions.mappedoperator import 
SerializedMappedOperator
 from airflow.serialization.definitions.taskgroup import SerializedTaskGroup
 from airflow.utils.dag_edges import dag_edges
 from airflow.utils.state import State
 
+# Also support SDK types if possible.
+try:
+    from airflow.sdk import BaseOperator
+except ImportError:
+    BaseOperator = SerializedBaseOperator  # type: ignore[assignment,misc]
+try:
+    from airflow.sdk.definitions.mappedoperator import MappedOperator
+except ImportError:
+    MappedOperator = SerializedMappedOperator  # type: ignore[assignment,misc]
+try:
+    from airflow.sdk import TaskGroup
+except ImportError:
+    TaskGroup = SerializedTaskGroup  # type: ignore[assignment,misc]
+
 if TYPE_CHECKING:
     import graphviz
 
     from airflow.models import TaskInstance
     from airflow.serialization.dag_dependency import DagDependency
     from airflow.serialization.definitions.dag import SerializedDAG
+    from airflow.serialization.definitions.mappedoperator import Operator
+    from airflow.serialization.definitions.node import DAGNode
 else:
     try:
         import graphviz
@@ -70,7 +84,7 @@ def _refine_color(color: str):
 
 
 def _draw_task(
-    task: BaseOperator | MappedOperator | SerializedBaseOperator | 
SerializedMappedOperator,
+    task: Operator,
     parent_graph: graphviz.Digraph,
     states_by_task_id: dict[Any, Any] | None,
 ) -> None:
@@ -96,7 +110,7 @@ def _draw_task(
 
 
 def _draw_task_group(
-    task_group: TaskGroup | SerializedTaskGroup,
+    task_group: SerializedTaskGroup,
     parent_graph: graphviz.Digraph,
     states_by_task_id: dict[str, str | None] | None,
 ) -> None:
@@ -136,29 +150,27 @@ def _draw_task_group(
 
 
 def _draw_nodes(
-    node: object, parent_graph: graphviz.Digraph, states_by_task_id: dict[str, 
str | None] | None
+    node: DAGNode, parent_graph: graphviz.Digraph, states_by_task_id: 
dict[str, str | None] | None
 ) -> None:
     """Draw the node and its children on the given parent_graph recursively."""
     if isinstance(node, (BaseOperator, MappedOperator, SerializedBaseOperator, 
SerializedMappedOperator)):
         _draw_task(node, parent_graph, states_by_task_id)
+    elif not isinstance(node, (TaskGroup, SerializedTaskGroup)):
+        raise AirflowException(f"The node {node} should be TaskGroup and is 
not")
+    elif node.is_root:
+        # No need to draw background for root TaskGroup.
+        _draw_task_group(node, parent_graph, states_by_task_id)
     else:
-        if not isinstance(node, (SerializedTaskGroup, TaskGroup)):
-            raise AirflowException(f"The node {node} should be TaskGroup and 
is not")
-        # Draw TaskGroup
-        if node.is_root:
-            # No need to draw background for root TaskGroup.
-            _draw_task_group(node, parent_graph, states_by_task_id)
-        else:
-            with parent_graph.subgraph(name=f"cluster_{node.group_id}") as sub:
-                sub.attr(
-                    shape="rectangle",
-                    style="filled",
-                    color=_refine_color(node.ui_fgcolor),
-                    # Partially transparent CornflowerBlue
-                    fillcolor="#6495ed7f",
-                    label=node.label,
-                )
-                _draw_task_group(node, sub, states_by_task_id)
+        with parent_graph.subgraph(name=f"cluster_{node.group_id}") as sub:
+            sub.attr(
+                shape="rectangle",
+                style="filled",
+                color=_refine_color(node.ui_fgcolor),
+                # Partially transparent CornflowerBlue
+                fillcolor="#6495ed7f",
+                label=node.label,
+            )
+            _draw_task_group(node, sub, states_by_task_id)
 
 
 def render_dag_dependencies(deps: dict[str, list[DagDependency]]) -> 
graphviz.Digraph:
@@ -194,7 +206,7 @@ def render_dag_dependencies(deps: dict[str, 
list[DagDependency]]) -> graphviz.Di
     return dot
 
 
-def render_dag(dag: DAG | SerializedDAG, tis: list[TaskInstance] | None = 
None) -> graphviz.Digraph:
+def render_dag(dag: SerializedDAG, tis: list[TaskInstance] | None = None) -> 
graphviz.Digraph:
     """
     Render the DAG object to the DOT object.
 
diff --git a/devel-common/src/tests_common/test_utils/compat.py 
b/devel-common/src/tests_common/test_utils/compat.py
index 1e2286db9e8..0f8ae0042d0 100644
--- a/devel-common/src/tests_common/test_utils/compat.py
+++ b/devel-common/src/tests_common/test_utils/compat.py
@@ -81,6 +81,11 @@ try:
 except ImportError:
     from airflow.utils import timezone  # type: ignore[no-redef,attr-defined]
 
+try:
+    from airflow.sdk import Context
+except ImportError:
+    from airflow.utils.context import Context  # type: 
ignore[no-redef,attr-defined]
+
 try:
     from airflow.sdk import TriggerRule
 except ImportError:
diff --git a/devel-common/src/tests_common/test_utils/mock_context.py 
b/devel-common/src/tests_common/test_utils/mock_context.py
index 2a22cd223a1..5defc527e05 100644
--- a/devel-common/src/tests_common/test_utils/mock_context.py
+++ b/devel-common/src/tests_common/test_utils/mock_context.py
@@ -21,7 +21,7 @@ from collections.abc import Iterable
 from typing import TYPE_CHECKING, Any
 from unittest import mock
 
-from airflow.utils.context import Context
+from tests_common.test_utils.compat import Context
 
 if TYPE_CHECKING:
     from sqlalchemy.orm import Session
diff --git 
a/providers/alibaba/src/airflow/providers/alibaba/cloud/links/maxcompute.py 
b/providers/alibaba/src/airflow/providers/alibaba/cloud/links/maxcompute.py
index f521ac7a71c..90003571066 100644
--- a/providers/alibaba/src/airflow/providers/alibaba/cloud/links/maxcompute.py
+++ b/providers/alibaba/src/airflow/providers/alibaba/cloud/links/maxcompute.py
@@ -23,7 +23,7 @@ from airflow.providers.common.compat.sdk import 
BaseOperatorLink, XCom
 if TYPE_CHECKING:
     from airflow.models.taskinstancekey import TaskInstanceKey
     from airflow.providers.common.compat.sdk import BaseOperator
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class MaxComputeLogViewLink(BaseOperatorLink):
diff --git 
a/providers/alibaba/src/airflow/providers/alibaba/cloud/operators/analyticdb_spark.py
 
b/providers/alibaba/src/airflow/providers/alibaba/cloud/operators/analyticdb_spark.py
index c11106f91c6..77364da787b 100644
--- 
a/providers/alibaba/src/airflow/providers/alibaba/cloud/operators/analyticdb_spark.py
+++ 
b/providers/alibaba/src/airflow/providers/alibaba/cloud/operators/analyticdb_spark.py
@@ -26,7 +26,7 @@ from airflow.providers.alibaba.cloud.hooks.analyticdb_spark 
import AnalyticDBSpa
 from airflow.providers.common.compat.sdk import AirflowException, BaseOperator
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class AnalyticDBSparkBaseOperator(BaseOperator):
diff --git 
a/providers/alibaba/src/airflow/providers/alibaba/cloud/operators/maxcompute.py 
b/providers/alibaba/src/airflow/providers/alibaba/cloud/operators/maxcompute.py
index f35f0519acf..f8c7b4f761f 100644
--- 
a/providers/alibaba/src/airflow/providers/alibaba/cloud/operators/maxcompute.py
+++ 
b/providers/alibaba/src/airflow/providers/alibaba/cloud/operators/maxcompute.py
@@ -29,7 +29,7 @@ from airflow.providers.common.compat.sdk import BaseOperator
 if TYPE_CHECKING:
     from odps.models import Instance
 
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class MaxComputeSQLOperator(BaseOperator):
diff --git 
a/providers/alibaba/src/airflow/providers/alibaba/cloud/operators/oss.py 
b/providers/alibaba/src/airflow/providers/alibaba/cloud/operators/oss.py
index f43315b7530..18da5985dcf 100644
--- a/providers/alibaba/src/airflow/providers/alibaba/cloud/operators/oss.py
+++ b/providers/alibaba/src/airflow/providers/alibaba/cloud/operators/oss.py
@@ -25,7 +25,7 @@ from airflow.providers.alibaba.cloud.hooks.oss import OSSHook
 from airflow.providers.common.compat.sdk import BaseOperator
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class OSSCreateBucketOperator(BaseOperator):
diff --git 
a/providers/alibaba/src/airflow/providers/alibaba/cloud/sensors/analyticdb_spark.py
 
b/providers/alibaba/src/airflow/providers/alibaba/cloud/sensors/analyticdb_spark.py
index 325fe35cbf9..46e8549fc8a 100644
--- 
a/providers/alibaba/src/airflow/providers/alibaba/cloud/sensors/analyticdb_spark.py
+++ 
b/providers/alibaba/src/airflow/providers/alibaba/cloud/sensors/analyticdb_spark.py
@@ -25,7 +25,7 @@ from airflow.providers.alibaba.cloud.hooks.analyticdb_spark 
import AnalyticDBSpa
 from airflow.providers.common.compat.sdk import BaseSensorOperator
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class AnalyticDBSparkSensor(BaseSensorOperator):
diff --git 
a/providers/alibaba/src/airflow/providers/alibaba/cloud/sensors/oss_key.py 
b/providers/alibaba/src/airflow/providers/alibaba/cloud/sensors/oss_key.py
index b71abe01f50..27bffe8a11b 100644
--- a/providers/alibaba/src/airflow/providers/alibaba/cloud/sensors/oss_key.py
+++ b/providers/alibaba/src/airflow/providers/alibaba/cloud/sensors/oss_key.py
@@ -29,7 +29,7 @@ from airflow.providers.alibaba.cloud.hooks.oss import OSSHook
 from airflow.providers.common.compat.sdk import AirflowException, 
BaseSensorOperator
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class OSSKeySensor(BaseSensorOperator):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/links/base_aws.py 
b/providers/amazon/src/airflow/providers/amazon/aws/links/base_aws.py
index 5be80e58d9f..8b08d395b77 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/links/base_aws.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/links/base_aws.py
@@ -25,7 +25,7 @@ from airflow.providers.common.compat.sdk import 
BaseOperatorLink, XCom
 if TYPE_CHECKING:
     from airflow.models import BaseOperator
     from airflow.models.taskinstancekey import TaskInstanceKey
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 BASE_AWS_CONSOLE_LINK = "https://console.{aws_domain}";
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/notifications/chime.py 
b/providers/amazon/src/airflow/providers/amazon/aws/notifications/chime.py
index 1c2ac5b3ff7..5bf8aba2de4 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/notifications/chime.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/notifications/chime.py
@@ -24,7 +24,7 @@ from airflow.providers.amazon.aws.hooks.chime import 
ChimeWebhookHook
 from airflow.providers.common.compat.notifier import BaseNotifier
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class ChimeNotifier(BaseNotifier):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/operators/appflow.py 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/appflow.py
index a8dc08619ef..b47c188e85c 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/appflow.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/appflow.py
@@ -34,7 +34,7 @@ if TYPE_CHECKING:
         TaskTypeDef,
     )
 
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 SUPPORTED_SOURCES = {"salesforce", "zendesk"}
 MANDATORY_FILTER_DATE_MSG = "The filter_date argument is mandatory for 
{entity}!"
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/operators/athena.py 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/athena.py
index f5006b34ad0..ed10c37418e 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/athena.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/athena.py
@@ -33,7 +33,7 @@ from airflow.providers.common.compat.sdk import 
AirflowException
 if TYPE_CHECKING:
     from airflow.providers.common.compat.openlineage.facet import BaseFacet, 
Dataset, DatasetFacet
     from airflow.providers.openlineage.extractors.base import OperatorLineage
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class AthenaOperator(AwsBaseOperator[AthenaHook]):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/operators/batch.py 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/batch.py
index 80ac1d11a29..aaae0e65fee 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/batch.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/batch.py
@@ -49,7 +49,7 @@ from airflow.providers.amazon.aws.utils.task_log_fetcher 
import AwsTaskLogFetche
 from airflow.providers.common.compat.sdk import AirflowException
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class BatchOperator(AwsBaseOperator[BatchClientHook]):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/operators/bedrock.py 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/bedrock.py
index 71142a1701f..d1ee17ec257 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/bedrock.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/bedrock.py
@@ -40,12 +40,11 @@ from airflow.providers.amazon.aws.triggers.bedrock import (
 )
 from airflow.providers.amazon.aws.utils import validate_execute_complete_event
 from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
-from airflow.providers.common.compat.sdk import AirflowException
+from airflow.providers.common.compat.sdk import AirflowException, timezone
 from airflow.utils.helpers import prune_dict
-from airflow.utils.timezone import utcnow
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class BedrockInvokeModelOperator(AwsBaseOperator[BedrockRuntimeHook]):
@@ -237,7 +236,7 @@ class 
BedrockCustomizeModelOperator(AwsBaseOperator[BedrockHook]):
                 if not self.ensure_unique_job_name:
                     raise error
                 retry = True
-                self.job_name = f"{self.job_name}-{int(utcnow().timestamp())}"
+                self.job_name = 
f"{self.job_name}-{int(timezone.utcnow().timestamp())}"
                 self.log.info("Changed job name to '%s' to avoid collision.", 
self.job_name)
 
         if response["ResponseMetadata"]["HTTPStatusCode"] != 201:
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/operators/cloud_formation.py
 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/cloud_formation.py
index d4bfcc95322..1af9d6446c2 100644
--- 
a/providers/amazon/src/airflow/providers/amazon/aws/operators/cloud_formation.py
+++ 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/cloud_formation.py
@@ -27,7 +27,7 @@ from airflow.providers.amazon.aws.operators.base_aws import 
AwsBaseOperator
 from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class CloudFormationCreateStackOperator(AwsBaseOperator[CloudFormationHook]):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/operators/comprehend.py 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/comprehend.py
index 9932a803f24..454636edfff 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/comprehend.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/comprehend.py
@@ -33,13 +33,12 @@ from airflow.providers.amazon.aws.triggers.comprehend 
import (
 )
 from airflow.providers.amazon.aws.utils import validate_execute_complete_event
 from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
-from airflow.providers.common.compat.sdk import AirflowException
-from airflow.utils.timezone import utcnow
+from airflow.providers.common.compat.sdk import AirflowException, timezone
 
 if TYPE_CHECKING:
     import boto3
 
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class ComprehendBaseOperator(AwsBaseOperator[ComprehendHook]):
@@ -157,7 +156,7 @@ class 
ComprehendStartPiiEntitiesDetectionJobOperator(ComprehendBaseOperator):
     def execute(self, context: Context) -> str:
         if self.start_pii_entities_kwargs.get("JobName", None) is None:
             self.start_pii_entities_kwargs["JobName"] = (
-                f"start_pii_entities_detection_job-{int(utcnow().timestamp())}"
+                
f"start_pii_entities_detection_job-{int(timezone.utcnow().timestamp())}"
             )
 
         self.log.info(
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/operators/datasync.py 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/datasync.py
index bee99070ed1..9345278ad53 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/datasync.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/datasync.py
@@ -30,7 +30,7 @@ from airflow.providers.amazon.aws.utils.mixins import 
aws_template_fields
 from airflow.providers.common.compat.sdk import AirflowException, 
AirflowTaskTimeout
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class DataSyncOperator(AwsBaseOperator[DataSyncHook]):
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/dms.py 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/dms.py
index 4df6582c2b5..9e518fc99fc 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/dms.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/dms.py
@@ -19,7 +19,7 @@ from __future__ import annotations
 
 from collections.abc import Sequence
 from datetime import datetime
-from typing import TYPE_CHECKING, Any, ClassVar
+from typing import Any, ClassVar
 
 from airflow.configuration import conf
 from airflow.providers.amazon.aws.hooks.dms import DmsHook
@@ -32,11 +32,7 @@ from airflow.providers.amazon.aws.triggers.dms import (
     DmsReplicationTerminalStatusTrigger,
 )
 from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
-from airflow.providers.common.compat.sdk import AirflowException
-from airflow.utils.context import Context
-
-if TYPE_CHECKING:
-    from airflow.utils.context import Context
+from airflow.providers.common.compat.sdk import AirflowException, Context
 
 
 class DmsCreateTaskOperator(AwsBaseOperator[DmsHook]):
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/ec2.py 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/ec2.py
index 55ae590deb8..78202e7bd8f 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/ec2.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/ec2.py
@@ -30,7 +30,7 @@ from airflow.providers.amazon.aws.utils.mixins import 
aws_template_fields
 from airflow.providers.common.compat.sdk import AirflowException
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class EC2StartInstanceOperator(AwsBaseOperator[EC2Hook]):
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/ecs.py 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/ecs.py
index 0038546cde6..948d039a812 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/ecs.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/ecs.py
@@ -45,7 +45,7 @@ if TYPE_CHECKING:
     import boto3
 
     from airflow.models import TaskInstance
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class EcsBaseOperator(AwsBaseOperator[EcsHook]):
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/eks.py 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/eks.py
index af196e4e72c..f352b1cd4c9 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/eks.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/eks.py
@@ -54,7 +54,7 @@ except ImportError:
     )
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 CHECK_INTERVAL_SECONDS = 15
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py
index ba88505f5f3..1af7a0011a0 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py
@@ -62,7 +62,7 @@ from airflow.providers.common.compat.sdk import 
AirflowException
 from airflow.utils.helpers import exactly_one, prune_dict
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class EmrAddStepsOperator(AwsBaseOperator[EmrHook]):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/operators/eventbridge.py 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/eventbridge.py
index ba1a9cb4e2e..bf810e129f3 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/eventbridge.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/eventbridge.py
@@ -26,7 +26,7 @@ from airflow.providers.common.compat.sdk import 
AirflowException
 from airflow.utils.helpers import prune_dict
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class EventBridgePutEventsOperator(AwsBaseOperator[EventBridgeHook]):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/operators/glacier.py 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/glacier.py
index 70f06b07ef9..179c3ad1505 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/glacier.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/glacier.py
@@ -25,7 +25,7 @@ from airflow.providers.amazon.aws.operators.base_aws import 
AwsBaseOperator
 from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class GlacierCreateJobOperator(AwsBaseOperator[GlacierHook]):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py
index 536121de679..64a6e24ac2b 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py
@@ -39,7 +39,7 @@ from airflow.providers.amazon.aws.utils.mixins import 
aws_template_fields
 from airflow.providers.common.compat.sdk import AirflowException
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class GlueJobOperator(AwsBaseOperator[GlueJobHook]):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/operators/glue_crawler.py 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/glue_crawler.py
index 504fa4df81d..ba25efebc93 100644
--- 
a/providers/amazon/src/airflow/providers/amazon/aws/operators/glue_crawler.py
+++ 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/glue_crawler.py
@@ -21,6 +21,7 @@ from collections.abc import Sequence
 from typing import TYPE_CHECKING, Any
 
 from airflow.configuration import conf
+from airflow.providers.amazon.aws.hooks.glue_crawler import GlueCrawlerHook
 from airflow.providers.amazon.aws.operators.base_aws import AwsBaseOperator
 from airflow.providers.amazon.aws.triggers.glue_crawler import 
GlueCrawlerCompleteTrigger
 from airflow.providers.amazon.aws.utils import validate_execute_complete_event
@@ -28,9 +29,7 @@ from airflow.providers.amazon.aws.utils.mixins import 
aws_template_fields
 from airflow.providers.common.compat.sdk import AirflowException
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
-
-from airflow.providers.amazon.aws.hooks.glue_crawler import GlueCrawlerHook
+    from airflow.sdk import Context
 
 
 class GlueCrawlerOperator(AwsBaseOperator[GlueCrawlerHook]):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/operators/glue_databrew.py 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/glue_databrew.py
index 3a4b4ac3dff..12708d944a8 100644
--- 
a/providers/amazon/src/airflow/providers/amazon/aws/operators/glue_databrew.py
+++ 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/glue_databrew.py
@@ -29,7 +29,7 @@ from airflow.providers.amazon.aws.utils.mixins import 
aws_template_fields
 from airflow.providers.common.compat.sdk import AirflowException
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class GlueDataBrewStartJobOperator(AwsBaseOperator[GlueDataBrewHook]):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/operators/kinesis_analytics.py
 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/kinesis_analytics.py
index fbf28b955fe..4c266e032ab 100644
--- 
a/providers/amazon/src/airflow/providers/amazon/aws/operators/kinesis_analytics.py
+++ 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/kinesis_analytics.py
@@ -32,7 +32,7 @@ from airflow.providers.amazon.aws.utils.mixins import 
aws_template_fields
 from airflow.providers.common.compat.sdk import AirflowException
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class 
KinesisAnalyticsV2CreateApplicationOperator(AwsBaseOperator[KinesisAnalyticsV2Hook]):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/operators/lambda_function.py
 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/lambda_function.py
index 215e4513ea2..9e6c51f609e 100644
--- 
a/providers/amazon/src/airflow/providers/amazon/aws/operators/lambda_function.py
+++ 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/lambda_function.py
@@ -31,7 +31,7 @@ from airflow.providers.amazon.aws.utils.mixins import 
aws_template_fields
 from airflow.providers.common.compat.sdk import AirflowException
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class LambdaCreateFunctionOperator(AwsBaseOperator[LambdaHook]):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/operators/mwaa.py 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/mwaa.py
index 64f56611878..d553bc08c9f 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/mwaa.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/mwaa.py
@@ -30,7 +30,7 @@ from airflow.providers.amazon.aws.utils.mixins import 
aws_template_fields
 from airflow.providers.common.compat.sdk import AirflowException
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class MwaaTriggerDagRunOperator(AwsBaseOperator[MwaaHook]):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/operators/neptune.py 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/neptune.py
index afb633c4b1e..2447a935990 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/neptune.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/neptune.py
@@ -34,7 +34,7 @@ from airflow.providers.amazon.aws.utils.mixins import 
aws_template_fields
 from airflow.providers.common.compat.sdk import AirflowException
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 def handle_waitable_exception(
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/operators/quicksight.py 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/quicksight.py
index 5b333ec49db..51b0b3b372e 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/quicksight.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/quicksight.py
@@ -24,7 +24,7 @@ from airflow.providers.amazon.aws.operators.base_aws import 
AwsBaseOperator
 from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class QuickSightCreateIngestionOperator(AwsBaseOperator[QuickSightHook]):
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/rds.py 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/rds.py
index 1871b1e066a..84a3ef2229a 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/rds.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/rds.py
@@ -41,7 +41,7 @@ from airflow.utils.helpers import prune_dict
 if TYPE_CHECKING:
     from mypy_boto3_rds.type_defs import TagTypeDef
 
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class RdsBaseOperator(AwsBaseOperator[RdsHook]):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py
 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py
index 9172ac31370..a910383ed87 100644
--- 
a/providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py
+++ 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py
@@ -37,7 +37,7 @@ from airflow.providers.common.compat.sdk import 
AirflowException
 from airflow.utils.helpers import prune_dict
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class RedshiftCreateClusterOperator(AwsBaseOperator[RedshiftHook]):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_data.py 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_data.py
index 0898c7a3671..7f3685ad4f3 100644
--- 
a/providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_data.py
+++ 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_data.py
@@ -33,7 +33,7 @@ if TYPE_CHECKING:
         GetStatementResultResponseTypeDef,
     )
 
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class RedshiftDataOperator(AwsBaseOperator[RedshiftDataHook]):
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/s3.py 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/s3.py
index 4753a0a7c0f..b75dfd40b0f 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/s3.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/s3.py
@@ -37,7 +37,7 @@ from airflow.utils.helpers import exactly_one
 if TYPE_CHECKING:
     from datetime import datetime
 
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 BUCKET_DOES_NOT_EXIST_MSG = "Bucket with name: %s doesn't exist"
 
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/operators/sagemaker.py 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/sagemaker.py
index 4e5f9d5f44d..a4a43ef622b 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/sagemaker.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/sagemaker.py
@@ -48,7 +48,7 @@ from airflow.utils.helpers import prune_dict
 if TYPE_CHECKING:
     from airflow.providers.common.compat.openlineage.facet import Dataset
     from airflow.providers.openlineage.extractors.base import OperatorLineage
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 # DEFAULT_CONN_ID: str = "aws_default"
 CHECK_INTERVAL_SECOND: int = 30
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/operators/sagemaker_unified_studio.py
 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/sagemaker_unified_studio.py
index 5eb11e00033..757bb453bb3 100644
--- 
a/providers/amazon/src/airflow/providers/amazon/aws/operators/sagemaker_unified_studio.py
+++ 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/sagemaker_unified_studio.py
@@ -35,7 +35,7 @@ from 
airflow.providers.amazon.aws.triggers.sagemaker_unified_studio import (
 from airflow.providers.common.compat.sdk import AirflowException, BaseOperator
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class SageMakerNotebookOperator(BaseOperator):
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/sns.py 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/sns.py
index 5b13e58863a..df2bc908041 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/sns.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/sns.py
@@ -27,7 +27,7 @@ from airflow.providers.amazon.aws.operators.base_aws import 
AwsBaseOperator
 from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class SnsPublishOperator(AwsBaseOperator[SnsHook]):
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/sqs.py 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/sqs.py
index 817e6113270..08d6123eed1 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/sqs.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/sqs.py
@@ -26,7 +26,7 @@ from airflow.providers.amazon.aws.operators.base_aws import 
AwsBaseOperator
 from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class SqsPublishOperator(AwsBaseOperator[SqsHook]):
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/ssm.py 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/ssm.py
index 7c2af24919a..57ef9654a3c 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/ssm.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/ssm.py
@@ -27,7 +27,7 @@ from airflow.providers.amazon.aws.utils import 
validate_execute_complete_event
 from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class SsmRunCommandOperator(AwsBaseOperator[SsmHook]):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/operators/step_function.py 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/step_function.py
index be2cff71d4b..9d93444e6d9 100644
--- 
a/providers/amazon/src/airflow/providers/amazon/aws/operators/step_function.py
+++ 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/step_function.py
@@ -34,7 +34,7 @@ from airflow.providers.amazon.aws.utils.mixins import 
aws_template_fields
 from airflow.providers.common.compat.sdk import AirflowException
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class StepFunctionStartExecutionOperator(AwsBaseOperator[StepFunctionHook]):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/sensors/athena.py 
b/providers/amazon/src/airflow/providers/amazon/aws/sensors/athena.py
index 191e13d7b18..c8710606ed0 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/sensors/athena.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/sensors/athena.py
@@ -20,14 +20,13 @@ from __future__ import annotations
 from collections.abc import Sequence
 from typing import TYPE_CHECKING, Any
 
+from airflow.providers.amazon.aws.hooks.athena import AthenaHook
 from airflow.providers.amazon.aws.sensors.base_aws import AwsBaseSensor
 from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
+from airflow.providers.common.compat.sdk import AirflowException
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
-
-from airflow.providers.amazon.aws.hooks.athena import AthenaHook
-from airflow.providers.common.compat.sdk import AirflowException
+    from airflow.sdk import Context
 
 
 class AthenaSensor(AwsBaseSensor[AthenaHook]):
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/sensors/batch.py 
b/providers/amazon/src/airflow/providers/amazon/aws/sensors/batch.py
index 48a3a728bb8..33d8389cd81 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/sensors/batch.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/sensors/batch.py
@@ -28,7 +28,7 @@ from airflow.providers.amazon.aws.utils.mixins import 
aws_template_fields
 from airflow.providers.common.compat.sdk import AirflowException
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class BatchSensor(AwsBaseSensor[BatchClientHook]):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/sensors/bedrock.py 
b/providers/amazon/src/airflow/providers/amazon/aws/sensors/bedrock.py
index 7c606c0d359..bbe7763bbe8 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/sensors/bedrock.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/sensors/bedrock.py
@@ -37,7 +37,7 @@ from airflow.providers.common.compat.sdk import 
AirflowException
 
 if TYPE_CHECKING:
     from airflow.providers.amazon.aws.triggers.bedrock import 
BedrockBaseBatchInferenceTrigger
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 _GenericBedrockHook = TypeVar("_GenericBedrockHook", BedrockAgentHook, 
BedrockHook)
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/sensors/cloud_formation.py 
b/providers/amazon/src/airflow/providers/amazon/aws/sensors/cloud_formation.py
index 6b5c7fa08a1..d204ec5ca3e 100644
--- 
a/providers/amazon/src/airflow/providers/amazon/aws/sensors/cloud_formation.py
+++ 
b/providers/amazon/src/airflow/providers/amazon/aws/sensors/cloud_formation.py
@@ -22,13 +22,12 @@ from __future__ import annotations
 from collections.abc import Sequence
 from typing import TYPE_CHECKING
 
+from airflow.providers.amazon.aws.hooks.cloud_formation import 
CloudFormationHook
 from airflow.providers.amazon.aws.sensors.base_aws import AwsBaseSensor
 from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
-
-from airflow.providers.amazon.aws.hooks.cloud_formation import 
CloudFormationHook
+    from airflow.sdk import Context
 
 
 class CloudFormationCreateStackSensor(AwsBaseSensor[CloudFormationHook]):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/sensors/comprehend.py 
b/providers/amazon/src/airflow/providers/amazon/aws/sensors/comprehend.py
index a15b910a1de..2372d1f37d5 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/sensors/comprehend.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/sensors/comprehend.py
@@ -31,7 +31,7 @@ from airflow.providers.amazon.aws.utils.mixins import 
aws_template_fields
 from airflow.providers.common.compat.sdk import AirflowException
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class ComprehendBaseSensor(AwsBaseSensor[ComprehendHook]):
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/sensors/dms.py 
b/providers/amazon/src/airflow/providers/amazon/aws/sensors/dms.py
index 1dbd1f92f4f..47eb56f10e9 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/sensors/dms.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/sensors/dms.py
@@ -26,7 +26,7 @@ from airflow.providers.amazon.aws.utils.mixins import 
aws_template_fields
 from airflow.providers.common.compat.sdk import AirflowException
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class DmsTaskBaseSensor(AwsBaseSensor[DmsHook]):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/sensors/dynamodb.py 
b/providers/amazon/src/airflow/providers/amazon/aws/sensors/dynamodb.py
index bc0f7bc979d..c4236054110 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/sensors/dynamodb.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/sensors/dynamodb.py
@@ -26,7 +26,7 @@ from airflow.providers.amazon.aws.sensors.base_aws import 
AwsBaseSensor
 from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class DynamoDBValueSensor(AwsBaseSensor[DynamoDBHook]):
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/sensors/ec2.py 
b/providers/amazon/src/airflow/providers/amazon/aws/sensors/ec2.py
index 018f5ef4d7f..c7b525bda2e 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/sensors/ec2.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/sensors/ec2.py
@@ -29,7 +29,7 @@ from airflow.providers.amazon.aws.utils.mixins import 
aws_template_fields
 from airflow.providers.common.compat.sdk import AirflowException
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class EC2InstanceStateSensor(AwsBaseSensor[EC2Hook]):
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/sensors/ecs.py 
b/providers/amazon/src/airflow/providers/amazon/aws/sensors/ecs.py
index 552534bf3c7..f2741af458b 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/sensors/ecs.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/sensors/ecs.py
@@ -33,7 +33,7 @@ from airflow.providers.common.compat.sdk import 
AirflowException
 if TYPE_CHECKING:
     import boto3
 
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 def _check_failed(current_state, target_state, failure_states) -> None:
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/sensors/eks.py 
b/providers/amazon/src/airflow/providers/amazon/aws/sensors/eks.py
index ebbf42e1ff7..637a8ef444a 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/sensors/eks.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/sensors/eks.py
@@ -35,7 +35,7 @@ from airflow.providers.amazon.aws.utils.mixins import 
aws_template_fields
 from airflow.providers.common.compat.sdk import AirflowException
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 DEFAULT_CONN_ID = "aws_default"
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/sensors/emr.py 
b/providers/amazon/src/airflow/providers/amazon/aws/sensors/emr.py
index 1eef31c57e4..ef936708aca 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/sensors/emr.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/sensors/emr.py
@@ -35,7 +35,7 @@ from airflow.providers.amazon.aws.utils.mixins import 
aws_template_fields
 from airflow.providers.common.compat.sdk import AirflowException
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class EmrBaseSensor(AwsBaseSensor[EmrHook]):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/sensors/glacier.py 
b/providers/amazon/src/airflow/providers/amazon/aws/sensors/glacier.py
index 29cccd2c363..e1826538487 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/sensors/glacier.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/sensors/glacier.py
@@ -27,7 +27,7 @@ from airflow.providers.amazon.aws.utils.mixins import 
aws_template_fields
 from airflow.providers.common.compat.sdk import AirflowException
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class JobStatus(Enum):
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/sensors/glue.py 
b/providers/amazon/src/airflow/providers/amazon/aws/sensors/glue.py
index 9480ff26461..995552bbe72 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/sensors/glue.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/sensors/glue.py
@@ -33,7 +33,7 @@ from airflow.providers.amazon.aws.utils.mixins import 
aws_template_fields
 from airflow.providers.common.compat.sdk import AirflowException
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class GlueJobSensor(AwsBaseSensor[GlueJobHook]):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/sensors/glue_catalog_partition.py
 
b/providers/amazon/src/airflow/providers/amazon/aws/sensors/glue_catalog_partition.py
index 0cfd733b2f3..b5a1b867d7e 100644
--- 
a/providers/amazon/src/airflow/providers/amazon/aws/sensors/glue_catalog_partition.py
+++ 
b/providers/amazon/src/airflow/providers/amazon/aws/sensors/glue_catalog_partition.py
@@ -30,7 +30,7 @@ from airflow.providers.amazon.aws.utils.mixins import 
aws_template_fields
 from airflow.providers.common.compat.sdk import AirflowException
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class GlueCatalogPartitionSensor(AwsBaseSensor[GlueCatalogHook]):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/sensors/glue_crawler.py 
b/providers/amazon/src/airflow/providers/amazon/aws/sensors/glue_crawler.py
index 5103237ff54..1d84930adc1 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/sensors/glue_crawler.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/sensors/glue_crawler.py
@@ -26,7 +26,7 @@ from airflow.providers.amazon.aws.utils.mixins import 
aws_template_fields
 from airflow.providers.common.compat.sdk import AirflowException
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class GlueCrawlerSensor(AwsBaseSensor[GlueCrawlerHook]):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/sensors/kinesis_analytics.py
 
b/providers/amazon/src/airflow/providers/amazon/aws/sensors/kinesis_analytics.py
index 30836baa7da..f7e5821a56e 100644
--- 
a/providers/amazon/src/airflow/providers/amazon/aws/sensors/kinesis_analytics.py
+++ 
b/providers/amazon/src/airflow/providers/amazon/aws/sensors/kinesis_analytics.py
@@ -29,7 +29,7 @@ from airflow.providers.amazon.aws.utils.mixins import 
aws_template_fields
 from airflow.providers.common.compat.sdk import AirflowException
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class KinesisAnalyticsV2BaseSensor(AwsBaseSensor[KinesisAnalyticsV2Hook]):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/sensors/lambda_function.py 
b/providers/amazon/src/airflow/providers/amazon/aws/sensors/lambda_function.py
index e3d78561d5d..cbe9f0c8d2b 100644
--- 
a/providers/amazon/src/airflow/providers/amazon/aws/sensors/lambda_function.py
+++ 
b/providers/amazon/src/airflow/providers/amazon/aws/sensors/lambda_function.py
@@ -27,7 +27,7 @@ from airflow.providers.amazon.aws.utils.mixins import 
aws_template_fields
 from airflow.providers.common.compat.sdk import AirflowException
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class LambdaFunctionStateSensor(AwsBaseSensor[LambdaHook]):
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/sensors/mwaa.py 
b/providers/amazon/src/airflow/providers/amazon/aws/sensors/mwaa.py
index 07c9063fa90..46229f570e0 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/sensors/mwaa.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/sensors/mwaa.py
@@ -29,7 +29,7 @@ from airflow.providers.common.compat.sdk import 
AirflowException
 from airflow.utils.state import DagRunState, TaskInstanceState
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class MwaaDagRunSensor(AwsBaseSensor[MwaaHook]):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/sensors/opensearch_serverless.py
 
b/providers/amazon/src/airflow/providers/amazon/aws/sensors/opensearch_serverless.py
index 6fcef192965..1675d46e50b 100644
--- 
a/providers/amazon/src/airflow/providers/amazon/aws/sensors/opensearch_serverless.py
+++ 
b/providers/amazon/src/airflow/providers/amazon/aws/sensors/opensearch_serverless.py
@@ -30,7 +30,7 @@ from airflow.providers.common.compat.sdk import 
AirflowException
 from airflow.utils.helpers import exactly_one
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class 
OpenSearchServerlessCollectionActiveSensor(AwsBaseSensor[OpenSearchServerlessHook]):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/sensors/quicksight.py 
b/providers/amazon/src/airflow/providers/amazon/aws/sensors/quicksight.py
index b33af4c4916..4c1f9085033 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/sensors/quicksight.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/sensors/quicksight.py
@@ -25,7 +25,7 @@ from airflow.providers.amazon.aws.sensors.base_aws import 
AwsBaseSensor
 from airflow.providers.common.compat.sdk import AirflowException
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class QuickSightSensor(AwsBaseSensor[QuickSightHook]):
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/sensors/rds.py 
b/providers/amazon/src/airflow/providers/amazon/aws/sensors/rds.py
index d7ef8981244..1143a3b2cc8 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/sensors/rds.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/sensors/rds.py
@@ -26,7 +26,7 @@ from airflow.providers.amazon.aws.utils.rds import RdsDbType
 from airflow.providers.common.compat.sdk import AirflowException, 
AirflowNotFoundException
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class RdsBaseSensor(AwsBaseSensor[RdsHook]):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/sensors/redshift_cluster.py 
b/providers/amazon/src/airflow/providers/amazon/aws/sensors/redshift_cluster.py
index b8555056c4a..b6f30a5c532 100644
--- 
a/providers/amazon/src/airflow/providers/amazon/aws/sensors/redshift_cluster.py
+++ 
b/providers/amazon/src/airflow/providers/amazon/aws/sensors/redshift_cluster.py
@@ -29,7 +29,7 @@ from airflow.providers.amazon.aws.utils.mixins import 
aws_template_fields
 from airflow.providers.common.compat.sdk import AirflowException
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class RedshiftClusterSensor(AwsBaseSensor[RedshiftHook]):
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/sensors/s3.py 
b/providers/amazon/src/airflow/providers/amazon/aws/sensors/s3.py
index a686ab2d279..d67b3341a3e 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/sensors/s3.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/sensors/s3.py
@@ -26,17 +26,16 @@ from datetime import datetime, timedelta
 from typing import TYPE_CHECKING, Any, cast
 
 from airflow.configuration import conf
-from airflow.providers.amazon.aws.utils import validate_execute_complete_event
-
-if TYPE_CHECKING:
-    from airflow.utils.context import Context
-
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.providers.amazon.aws.sensors.base_aws import AwsBaseSensor
 from airflow.providers.amazon.aws.triggers.s3 import S3KeysUnchangedTrigger, 
S3KeyTrigger
+from airflow.providers.amazon.aws.utils import validate_execute_complete_event
 from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
 from airflow.providers.common.compat.sdk import AirflowException, 
poke_mode_only
 
+if TYPE_CHECKING:
+    from airflow.sdk import Context
+
 
 class S3KeySensor(AwsBaseSensor[S3Hook]):
     """
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/sensors/sagemaker.py 
b/providers/amazon/src/airflow/providers/amazon/aws/sensors/sagemaker.py
index 2b8d42c87fc..faafea5f9ce 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/sensors/sagemaker.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/sensors/sagemaker.py
@@ -26,7 +26,7 @@ from airflow.providers.amazon.aws.utils.mixins import 
aws_template_fields
 from airflow.providers.common.compat.sdk import AirflowException
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class SageMakerBaseSensor(AwsBaseSensor[SageMakerHook]):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/sensors/sagemaker_unified_studio.py
 
b/providers/amazon/src/airflow/providers/amazon/aws/sensors/sagemaker_unified_studio.py
index 0a346c99345..ee5caf033d3 100644
--- 
a/providers/amazon/src/airflow/providers/amazon/aws/sensors/sagemaker_unified_studio.py
+++ 
b/providers/amazon/src/airflow/providers/amazon/aws/sensors/sagemaker_unified_studio.py
@@ -27,7 +27,7 @@ from 
airflow.providers.amazon.aws.hooks.sagemaker_unified_studio import (
 from airflow.providers.common.compat.sdk import AirflowException, 
BaseSensorOperator
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class SageMakerNotebookSensor(BaseSensorOperator):
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/sensors/sqs.py 
b/providers/amazon/src/airflow/providers/amazon/aws/sensors/sqs.py
index 9e2b4be32d2..ee6124c30ec 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/sensors/sqs.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/sensors/sqs.py
@@ -35,7 +35,7 @@ from airflow.providers.common.compat.sdk import 
AirflowException
 if TYPE_CHECKING:
     from airflow.providers.amazon.aws.hooks.base_aws import BaseAwsConnection
     from airflow.providers.amazon.aws.utils.sqs import MessageFilteringType
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class SqsSensor(AwsBaseSensor[SqsHook]):
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/sensors/ssm.py 
b/providers/amazon/src/airflow/providers/amazon/aws/sensors/ssm.py
index 514c382067d..fc09b0e1c8d 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/sensors/ssm.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/sensors/ssm.py
@@ -28,7 +28,7 @@ from airflow.providers.amazon.aws.utils import 
validate_execute_complete_event
 from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class SsmRunCommandCompletedSensor(AwsBaseSensor[SsmHook]):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/sensors/step_function.py 
b/providers/amazon/src/airflow/providers/amazon/aws/sensors/step_function.py
index da36064aa5e..2df9b4eed3c 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/sensors/step_function.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/sensors/step_function.py
@@ -26,7 +26,7 @@ from airflow.providers.amazon.aws.utils.mixins import 
aws_template_fields
 from airflow.providers.common.compat.sdk import AirflowException
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class StepFunctionExecutionSensor(AwsBaseSensor[StepFunctionHook]):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/transfers/azure_blob_to_s3.py
 
b/providers/amazon/src/airflow/providers/amazon/aws/transfers/azure_blob_to_s3.py
index 52aa4570396..c34d3469f2b 100644
--- 
a/providers/amazon/src/airflow/providers/amazon/aws/transfers/azure_blob_to_s3.py
+++ 
b/providers/amazon/src/airflow/providers/amazon/aws/transfers/azure_blob_to_s3.py
@@ -33,7 +33,7 @@ except ModuleNotFoundError as e:
     raise AirflowOptionalProviderFeatureException(e)
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class AzureBlobStorageToS3Operator(BaseOperator):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/transfers/exasol_to_s3.py 
b/providers/amazon/src/airflow/providers/amazon/aws/transfers/exasol_to_s3.py
index 2c75ef7ad35..5cc47764ce3 100644
--- 
a/providers/amazon/src/airflow/providers/amazon/aws/transfers/exasol_to_s3.py
+++ 
b/providers/amazon/src/airflow/providers/amazon/aws/transfers/exasol_to_s3.py
@@ -28,7 +28,7 @@ from airflow.providers.common.compat.sdk import BaseOperator
 from airflow.providers.exasol.hooks.exasol import ExasolHook
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class ExasolToS3Operator(BaseOperator):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/transfers/ftp_to_s3.py 
b/providers/amazon/src/airflow/providers/amazon/aws/transfers/ftp_to_s3.py
index a0460a2f8ff..251c16a5e26 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/transfers/ftp_to_s3.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/transfers/ftp_to_s3.py
@@ -26,7 +26,7 @@ from airflow.providers.common.compat.sdk import BaseOperator
 from airflow.providers.ftp.hooks.ftp import FTPHook
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class FTPToS3Operator(BaseOperator):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py 
b/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py
index 8ccd006f9e8..76de1a3542a 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/transfers/gcs_to_s3.py
@@ -31,7 +31,7 @@ from airflow.providers.google.cloud.hooks.gcs import GCSHook
 
 if TYPE_CHECKING:
     from airflow.providers.openlineage.extractors import OperatorLineage
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class GCSToS3Operator(BaseOperator):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/transfers/glacier_to_gcs.py 
b/providers/amazon/src/airflow/providers/amazon/aws/transfers/glacier_to_gcs.py
index 2574f836248..0b96a27d9b4 100644
--- 
a/providers/amazon/src/airflow/providers/amazon/aws/transfers/glacier_to_gcs.py
+++ 
b/providers/amazon/src/airflow/providers/amazon/aws/transfers/glacier_to_gcs.py
@@ -26,7 +26,7 @@ from airflow.providers.common.compat.sdk import BaseOperator
 from airflow.providers.google.cloud.hooks.gcs import GCSHook
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class GlacierToGCSOperator(BaseOperator):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/transfers/google_api_to_s3.py
 
b/providers/amazon/src/airflow/providers/amazon/aws/transfers/google_api_to_s3.py
index e73b4c2bc90..1ae0a642990 100644
--- 
a/providers/amazon/src/airflow/providers/amazon/aws/transfers/google_api_to_s3.py
+++ 
b/providers/amazon/src/airflow/providers/amazon/aws/transfers/google_api_to_s3.py
@@ -31,7 +31,7 @@ from airflow.providers.google.common.hooks.discovery_api 
import GoogleDiscoveryA
 
 if TYPE_CHECKING:
     from airflow.providers.common.compat.sdk import RuntimeTaskInstanceProtocol
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 # MAX XCOM Size is 48KB
 # https://github.com/apache/airflow/pull/1618#discussion_r68249677
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/transfers/hive_to_dynamodb.py
 
b/providers/amazon/src/airflow/providers/amazon/aws/transfers/hive_to_dynamodb.py
index 7d1d1d13484..1e99bf4aa4a 100644
--- 
a/providers/amazon/src/airflow/providers/amazon/aws/transfers/hive_to_dynamodb.py
+++ 
b/providers/amazon/src/airflow/providers/amazon/aws/transfers/hive_to_dynamodb.py
@@ -28,7 +28,7 @@ from airflow.providers.apache.hive.hooks.hive import 
HiveServer2Hook
 from airflow.providers.common.compat.sdk import BaseOperator
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class HiveToDynamoDBOperator(BaseOperator):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/transfers/http_to_s3.py 
b/providers/amazon/src/airflow/providers/amazon/aws/transfers/http_to_s3.py
index ca7582bbc81..363212239a0 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/transfers/http_to_s3.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/transfers/http_to_s3.py
@@ -31,7 +31,7 @@ if TYPE_CHECKING:
 
     from requests.auth import AuthBase
 
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class HttpToS3Operator(BaseOperator):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/transfers/imap_attachment_to_s3.py
 
b/providers/amazon/src/airflow/providers/amazon/aws/transfers/imap_attachment_to_s3.py
index 6aa046ea588..eff983ffc01 100644
--- 
a/providers/amazon/src/airflow/providers/amazon/aws/transfers/imap_attachment_to_s3.py
+++ 
b/providers/amazon/src/airflow/providers/amazon/aws/transfers/imap_attachment_to_s3.py
@@ -27,7 +27,7 @@ from airflow.providers.common.compat.sdk import BaseOperator
 from airflow.providers.imap.hooks.imap import ImapHook
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class ImapAttachmentToS3Operator(BaseOperator):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/transfers/local_to_s3.py 
b/providers/amazon/src/airflow/providers/amazon/aws/transfers/local_to_s3.py
index a47acd77214..6b0b05490fd 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/transfers/local_to_s3.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/transfers/local_to_s3.py
@@ -24,7 +24,7 @@ from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.providers.common.compat.sdk import BaseOperator
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class LocalFilesystemToS3Operator(BaseOperator):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/transfers/mongo_to_s3.py 
b/providers/amazon/src/airflow/providers/amazon/aws/transfers/mongo_to_s3.py
index 5547e955bbc..e53668ad3ac 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/transfers/mongo_to_s3.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/transfers/mongo_to_s3.py
@@ -31,7 +31,7 @@ if TYPE_CHECKING:
     from pymongo.command_cursor import CommandCursor
     from pymongo.cursor import Cursor
 
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class MongoToS3Operator(BaseOperator):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/transfers/redshift_to_s3.py 
b/providers/amazon/src/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
index 661d08ec7e4..11ada2454d4 100644
--- 
a/providers/amazon/src/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
+++ 
b/providers/amazon/src/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
@@ -31,7 +31,7 @@ from airflow.providers.amazon.version_compat import NOTSET, 
ArgNotSet, is_arg_se
 from airflow.providers.common.compat.sdk import AirflowException, BaseOperator
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class RedshiftToS3Operator(BaseOperator):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/transfers/s3_to_dynamodb.py 
b/providers/amazon/src/airflow/providers/amazon/aws/transfers/s3_to_dynamodb.py
index a0460642079..68bcdfaba53 100644
--- 
a/providers/amazon/src/airflow/providers/amazon/aws/transfers/s3_to_dynamodb.py
+++ 
b/providers/amazon/src/airflow/providers/amazon/aws/transfers/s3_to_dynamodb.py
@@ -26,7 +26,7 @@ from airflow.providers.amazon.aws.hooks.dynamodb import 
DynamoDBHook
 from airflow.providers.common.compat.sdk import AirflowException, BaseOperator
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class AttributeDefinition(TypedDict):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/transfers/s3_to_ftp.py 
b/providers/amazon/src/airflow/providers/amazon/aws/transfers/s3_to_ftp.py
index c259e8c9f98..2a0a4fb91e8 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/transfers/s3_to_ftp.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/transfers/s3_to_ftp.py
@@ -26,7 +26,7 @@ from airflow.providers.common.compat.sdk import BaseOperator
 from airflow.providers.ftp.hooks.ftp import FTPHook
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class S3ToFTPOperator(BaseOperator):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/transfers/s3_to_redshift.py 
b/providers/amazon/src/airflow/providers/amazon/aws/transfers/s3_to_redshift.py
index 29eb5f05b41..a4c371a4df4 100644
--- 
a/providers/amazon/src/airflow/providers/amazon/aws/transfers/s3_to_redshift.py
+++ 
b/providers/amazon/src/airflow/providers/amazon/aws/transfers/s3_to_redshift.py
@@ -27,7 +27,7 @@ from airflow.providers.amazon.version_compat import NOTSET, 
ArgNotSet, is_arg_se
 from airflow.providers.common.compat.sdk import AirflowException, BaseOperator
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 AVAILABLE_METHODS = ["APPEND", "REPLACE", "UPSERT"]
 
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/transfers/s3_to_sftp.py 
b/providers/amazon/src/airflow/providers/amazon/aws/transfers/s3_to_sftp.py
index eee92bf23eb..0b3b5e1cb84 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/transfers/s3_to_sftp.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/transfers/s3_to_sftp.py
@@ -27,7 +27,7 @@ from airflow.providers.common.compat.sdk import BaseOperator
 from airflow.providers.ssh.hooks.ssh import SSHHook
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class S3ToSFTPOperator(BaseOperator):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/transfers/s3_to_sql.py 
b/providers/amazon/src/airflow/providers/amazon/aws/transfers/s3_to_sql.py
index 5d835208281..d726ca2b306 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/transfers/s3_to_sql.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/transfers/s3_to_sql.py
@@ -25,7 +25,7 @@ from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.providers.common.compat.sdk import AirflowException, BaseHook, 
BaseOperator
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class S3ToSqlOperator(BaseOperator):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/transfers/salesforce_to_s3.py
 
b/providers/amazon/src/airflow/providers/amazon/aws/transfers/salesforce_to_s3.py
index e873e9490db..d3161227fe6 100644
--- 
a/providers/amazon/src/airflow/providers/amazon/aws/transfers/salesforce_to_s3.py
+++ 
b/providers/amazon/src/airflow/providers/amazon/aws/transfers/salesforce_to_s3.py
@@ -26,7 +26,7 @@ from airflow.providers.common.compat.sdk import BaseOperator
 from airflow.providers.salesforce.hooks.salesforce import SalesforceHook
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class SalesforceToS3Operator(BaseOperator):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/transfers/sftp_to_s3.py 
b/providers/amazon/src/airflow/providers/amazon/aws/transfers/sftp_to_s3.py
index 0399b2168f9..0b3aada1950 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/transfers/sftp_to_s3.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/transfers/sftp_to_s3.py
@@ -27,7 +27,7 @@ from airflow.providers.common.compat.sdk import BaseOperator
 from airflow.providers.ssh.hooks.ssh import SSHHook
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class SFTPToS3Operator(BaseOperator):
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/transfers/sql_to_s3.py 
b/providers/amazon/src/airflow/providers/amazon/aws/transfers/sql_to_s3.py
index b79179abf03..2c3133229cf 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/transfers/sql_to_s3.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/transfers/sql_to_s3.py
@@ -34,7 +34,7 @@ if TYPE_CHECKING:
     import polars as pl
 
     from airflow.providers.common.sql.hooks.sql import DbApiHook
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class FILE_FORMAT(enum.Enum):
diff --git a/providers/amazon/tests/unit/amazon/aws/sensors/test_emr_base.py 
b/providers/amazon/tests/unit/amazon/aws/sensors/test_emr_base.py
index fd499311625..b7ca00e2b29 100644
--- a/providers/amazon/tests/unit/amazon/aws/sensors/test_emr_base.py
+++ b/providers/amazon/tests/unit/amazon/aws/sensors/test_emr_base.py
@@ -25,11 +25,7 @@ from airflow.providers.amazon.aws.sensors.emr import 
EmrBaseSensor
 from airflow.providers.common.compat.sdk import AirflowException
 
 if TYPE_CHECKING:
-    try:
-        from airflow.sdk.definitions.context import Context
-    except ImportError:
-        # TODO: Remove once provider drops support for Airflow 2
-        from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 TARGET_STATE = "TARGET_STATE"
 FAILED_STATE = "FAILED_STATE"
diff --git 
a/providers/amazon/tests/unit/amazon/aws/sensors/test_sagemaker_unified_studio.py
 
b/providers/amazon/tests/unit/amazon/aws/sensors/test_sagemaker_unified_studio.py
index 38ffd75e1b5..c30aa3f0855 100644
--- 
a/providers/amazon/tests/unit/amazon/aws/sensors/test_sagemaker_unified_studio.py
+++ 
b/providers/amazon/tests/unit/amazon/aws/sensors/test_sagemaker_unified_studio.py
@@ -20,11 +20,8 @@ from unittest.mock import MagicMock, patch
 
 import pytest
 
-from airflow.providers.amazon.aws.sensors.sagemaker_unified_studio import (
-    SageMakerNotebookSensor,
-)
-from airflow.providers.common.compat.sdk import AirflowException
-from airflow.utils.context import Context
+from airflow.providers.amazon.aws.sensors.sagemaker_unified_studio import 
SageMakerNotebookSensor
+from airflow.providers.common.compat.sdk import AirflowException, Context
 
 
 class TestSageMakerNotebookSensor:
diff --git 
a/providers/celery/src/airflow/providers/celery/sensors/celery_queue.py 
b/providers/celery/src/airflow/providers/celery/sensors/celery_queue.py
index 990f8ca9008..50a12e17caf 100644
--- a/providers/celery/src/airflow/providers/celery/sensors/celery_queue.py
+++ b/providers/celery/src/airflow/providers/celery/sensors/celery_queue.py
@@ -29,11 +29,7 @@ else:
     from airflow.sensors.base import BaseSensorOperator  # type: 
ignore[no-redef]
 
 if TYPE_CHECKING:
-    try:
-        from airflow.sdk.definitions.context import Context
-    except ImportError:
-        # TODO: Remove once provider drops support for Airflow 2
-        from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class CeleryQueueSensor(BaseSensorOperator):
diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/callbacks.py 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/callbacks.py
index 20ea0d02a88..e3ff825b9d1 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/callbacks.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/callbacks.py
@@ -24,7 +24,7 @@ import kubernetes_asyncio.client as async_k8s
 
 if TYPE_CHECKING:
     from airflow.providers.cncf.kubernetes.operators.pod import 
KubernetesPodOperator
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 client_type: TypeAlias = k8s.CoreV1Api | async_k8s.CoreV1Api
 
diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes.py
 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes.py
index f1a2fa4395e..3ee2fd4f50c 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes.py
@@ -38,7 +38,7 @@ from airflow.providers.common.compat.sdk import (
 )
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 _PYTHON_SCRIPT_ENV = "__PYTHON_SCRIPT"
 _PYTHON_INPUT_ENV = "__PYTHON_INPUT"
diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py
 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py
index 08cd8530653..b4acf727cfc 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py
@@ -30,7 +30,7 @@ from airflow.providers.common.compat.sdk import (
 from airflow.utils.operator_helpers import determine_kwargs
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class _KubernetesCmdDecoratedOperator(DecoratedOperator, 
KubernetesPodOperator):
diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
index 397eb7eadea..46cdb3a29fd 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
@@ -45,16 +45,15 @@ from airflow.providers.cncf.kubernetes.triggers.job import 
KubernetesJobTrigger
 from airflow.providers.cncf.kubernetes.utils.pod_manager import 
EMPTY_XCOM_RESULT, PodNotFoundException
 from airflow.providers.cncf.kubernetes.version_compat import AIRFLOW_V_3_1_PLUS
 from airflow.providers.common.compat.sdk import AirflowException
+from airflow.utils import yaml
 
 if AIRFLOW_V_3_1_PLUS:
     from airflow.sdk import BaseOperator
 else:
     from airflow.models import BaseOperator
-from airflow.utils import yaml
-from airflow.utils.context import Context
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 log = logging.getLogger(__name__)
 
diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
index 51b0ae284ea..54331fa6cef 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -98,12 +98,7 @@ if TYPE_CHECKING:
 
     from airflow.providers.cncf.kubernetes.hooks.kubernetes import 
PodOperatorHookProtocol
     from airflow.providers.cncf.kubernetes.secret import Secret
-
-    try:
-        from airflow.sdk.definitions.context import Context
-    except ImportError:
-        # TODO: Remove once provider drops support for Airflow 2
-        from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 alphanum_lower = string.ascii_lowercase + string.digits
 
diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
index a0aa4c4bdeb..5e1b38305b0 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
@@ -36,11 +36,7 @@ from airflow.utils.helpers import prune_dict
 if TYPE_CHECKING:
     import jinja2
 
-    try:
-        from airflow.sdk.definitions.context import Context
-    except ImportError:
-        # TODO: Remove once provider drops support for Airflow 2
-        from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class SparkKubernetesOperator(KubernetesPodOperator):
diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py
 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py
index 4c44a03c998..bc69d57f069 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py
@@ -27,7 +27,7 @@ from airflow.providers.cncf.kubernetes.hooks.kubernetes 
import KubernetesHook
 from airflow.providers.common.compat.sdk import AirflowException, 
BaseSensorOperator
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class SparkKubernetesSensor(BaseSensorOperator):
diff --git 
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
index 54cda8ac9f0..19fefd9b61d 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
@@ -59,7 +59,7 @@ else:
     from airflow.models.xcom import XCom  # type: ignore[no-redef]
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 pytestmark = pytest.mark.db_test
 
diff --git 
a/providers/common/sql/src/airflow/providers/common/sql/operators/generic_transfer.pyi
 
b/providers/common/sql/src/airflow/providers/common/sql/operators/generic_transfer.pyi
index 06daa77ee74..acd1bca2698 100644
--- 
a/providers/common/sql/src/airflow/providers/common/sql/operators/generic_transfer.pyi
+++ 
b/providers/common/sql/src/airflow/providers/common/sql/operators/generic_transfer.pyi
@@ -38,7 +38,7 @@ from _typeshed import Incomplete as Incomplete
 
 from airflow.models import BaseOperator
 from airflow.providers.common.sql.hooks.sql import DbApiHook as DbApiHook
-from airflow.utils.context import Context as Context
+from airflow.sdk import Context
 
 class GenericTransfer(BaseOperator):
     template_fields: Sequence[str]
diff --git 
a/providers/common/sql/src/airflow/providers/common/sql/sensors/sql.pyi 
b/providers/common/sql/src/airflow/providers/common/sql/sensors/sql.pyi
index f72b8ac554b..dc5f916ffb3 100644
--- a/providers/common/sql/src/airflow/providers/common/sql/sensors/sql.pyi
+++ b/providers/common/sql/src/airflow/providers/common/sql/sensors/sql.pyi
@@ -38,7 +38,7 @@ from typing import Any
 from _typeshed import Incomplete as Incomplete
 
 from airflow.providers.common.compat.sdk import BaseSensorOperator
-from airflow.utils.context import Context as Context
+from airflow.sdk import Context
 
 class SqlSensor(BaseSensorOperator):
     template_fields: Sequence[str]
diff --git 
a/providers/databricks/src/airflow/providers/databricks/utils/mixins.py 
b/providers/databricks/src/airflow/providers/databricks/utils/mixins.py
index cfa73e221cb..33519e44191 100644
--- a/providers/databricks/src/airflow/providers/databricks/utils/mixins.py
+++ b/providers/databricks/src/airflow/providers/databricks/utils/mixins.py
@@ -20,18 +20,14 @@ from __future__ import annotations
 
 import time
 from logging import Logger
-from typing import (
-    TYPE_CHECKING,
-    Any,
-    Protocol,
-)
+from typing import TYPE_CHECKING, Any, Protocol
 
 from airflow.providers.common.compat.sdk import AirflowException
 from airflow.providers.databricks.hooks.databricks import DatabricksHook, 
SQLStatementState
 from airflow.providers.databricks.triggers.databricks import 
DatabricksSQLStatementExecutionTrigger
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class GetHookHasFields(Protocol):
diff --git 
a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py 
b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
index e03ba21a301..a3cf66ae552 100644
--- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
+++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
@@ -36,7 +36,7 @@ from airflow.providers.dbt.cloud.utils.openlineage import 
generate_openlineage_e
 
 if TYPE_CHECKING:
     from airflow.providers.openlineage.extractors import OperatorLineage
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class DbtCloudRunJobOperatorLink(BaseOperatorLink):
diff --git a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/sensors/dbt.py 
b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/sensors/dbt.py
index e5d8c703586..1032bdbab4c 100644
--- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/sensors/dbt.py
+++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/sensors/dbt.py
@@ -28,7 +28,7 @@ from airflow.providers.dbt.cloud.utils.openlineage import 
generate_openlineage_e
 
 if TYPE_CHECKING:
     from airflow.providers.openlineage.extractors import OperatorLineage
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class DbtCloudJobRunSensor(BaseSensorOperator):
diff --git 
a/providers/edge3/src/airflow/providers/edge3/example_dags/win_notepad.py 
b/providers/edge3/src/airflow/providers/edge3/example_dags/win_notepad.py
index 6b875dd2aca..ee85dded3c4 100644
--- a/providers/edge3/src/airflow/providers/edge3/example_dags/win_notepad.py
+++ b/providers/edge3/src/airflow/providers/edge3/example_dags/win_notepad.py
@@ -37,7 +37,7 @@ from airflow.models.dag import DAG
 from airflow.sdk import Param
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class NotepadOperator(BaseOperator):
diff --git a/providers/grpc/src/airflow/providers/grpc/operators/grpc.py 
b/providers/grpc/src/airflow/providers/grpc/operators/grpc.py
index efa13f972bd..4bae0c3bfc7 100644
--- a/providers/grpc/src/airflow/providers/grpc/operators/grpc.py
+++ b/providers/grpc/src/airflow/providers/grpc/operators/grpc.py
@@ -24,11 +24,7 @@ from airflow.providers.common.compat.sdk import BaseOperator
 from airflow.providers.grpc.hooks.grpc import GrpcHook
 
 if TYPE_CHECKING:
-    try:
-        from airflow.sdk.definitions.context import Context
-    except ImportError:
-        # TODO: Remove once provider drops support for Airflow 2
-        from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class GrpcOperator(BaseOperator):
diff --git a/providers/http/src/airflow/providers/http/operators/http.py 
b/providers/http/src/airflow/providers/http/operators/http.py
index 99d3e85c2be..5eeb8f14d3c 100644
--- a/providers/http/src/airflow/providers/http/operators/http.py
+++ b/providers/http/src/airflow/providers/http/operators/http.py
@@ -34,12 +34,7 @@ if TYPE_CHECKING:
     from requests.auth import AuthBase
 
     from airflow.providers.http.hooks.http import HttpHook
-
-    try:
-        from airflow.sdk.definitions.context import Context
-    except ImportError:
-        # TODO: Remove once provider drops support for Airflow 2
-        from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class HttpOperator(BaseOperator):
diff --git a/providers/http/src/airflow/providers/http/sensors/http.py 
b/providers/http/src/airflow/providers/http/sensors/http.py
index 399608e321d..2c30f600d3e 100644
--- a/providers/http/src/airflow/providers/http/sensors/http.py
+++ b/providers/http/src/airflow/providers/http/sensors/http.py
@@ -27,18 +27,7 @@ from airflow.providers.http.hooks.http import HttpHook
 from airflow.providers.http.triggers.http import HttpSensorTrigger
 
 if TYPE_CHECKING:
-    from airflow.providers.common.compat.version_compat import 
AIRFLOW_V_3_0_PLUS
-
-    try:
-        from airflow.sdk.definitions.context import Context
-
-        if AIRFLOW_V_3_0_PLUS:
-            from airflow.sdk import PokeReturnValue
-        else:
-            from airflow.sensors.base import PokeReturnValue  # type: 
ignore[no-redef]
-    except ImportError:
-        # TODO: Remove once provider drops support for Airflow 2
-        from airflow.utils.context import Context
+    from airflow.sdk import Context, PokeReturnValue
 
 
 class HttpSensor(BaseSensorOperator):
diff --git 
a/providers/imap/src/airflow/providers/imap/sensors/imap_attachment.py 
b/providers/imap/src/airflow/providers/imap/sensors/imap_attachment.py
index 4f9ddb2d7a3..f96ad9da325 100644
--- a/providers/imap/src/airflow/providers/imap/sensors/imap_attachment.py
+++ b/providers/imap/src/airflow/providers/imap/sensors/imap_attachment.py
@@ -26,11 +26,7 @@ from airflow.providers.common.compat.sdk import 
BaseSensorOperator
 from airflow.providers.imap.hooks.imap import ImapHook
 
 if TYPE_CHECKING:
-    try:
-        from airflow.sdk.definitions.context import Context
-    except ImportError:
-        # TODO: Remove once provider drops support for Airflow 2
-        from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class ImapAttachmentSensor(BaseSensorOperator):
diff --git 
a/providers/influxdb/src/airflow/providers/influxdb/operators/influxdb.py 
b/providers/influxdb/src/airflow/providers/influxdb/operators/influxdb.py
index 9ca440e7c16..f786d8e0b2f 100644
--- a/providers/influxdb/src/airflow/providers/influxdb/operators/influxdb.py
+++ b/providers/influxdb/src/airflow/providers/influxdb/operators/influxdb.py
@@ -24,11 +24,7 @@ from airflow.providers.common.compat.sdk import BaseOperator
 from airflow.providers.influxdb.hooks.influxdb import InfluxDBHook
 
 if TYPE_CHECKING:
-    try:
-        from airflow.sdk.definitions.context import Context
-    except ImportError:
-        # TODO: Remove once provider drops support for Airflow 2
-        from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class InfluxDBOperator(BaseOperator):
diff --git 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/asb.py 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/asb.py
index 8dd2d93578e..7e88bac14a5 100644
--- 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/asb.py
+++ 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/asb.py
@@ -49,7 +49,7 @@ if TYPE_CHECKING:
 
     from azure.identity import DefaultAzureCredential
 
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
     MessageCallback = Callable[[ServiceBusMessage, Context], None]
 
diff --git 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/adls.py
 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/adls.py
index a0f12abdfae..5e6e5e68b32 100644
--- 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/adls.py
+++ 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/adls.py
@@ -23,7 +23,7 @@ from airflow.providers.common.compat.sdk import BaseOperator
 from airflow.providers.microsoft.azure.hooks.data_lake import 
AzureDataLakeHook, AzureDataLakeStorageV2Hook
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 DEFAULT_AZURE_DATA_LAKE_CONN_ID = "azure_data_lake_default"
 
diff --git 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/adx.py
 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/adx.py
index 55548267c76..e12544e9db6 100644
--- 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/adx.py
+++ 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/adx.py
@@ -30,7 +30,7 @@ from airflow.providers.microsoft.azure.hooks.adx import 
AzureDataExplorerHook
 if TYPE_CHECKING:
     from azure.kusto.data._models import KustoResultTable
 
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class AzureDataExplorerQueryOperator(BaseOperator):
diff --git 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/asb.py
 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/asb.py
index 116233095e1..cdcf9d5e87c 100644
--- 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/asb.py
+++ 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/asb.py
@@ -29,7 +29,7 @@ if TYPE_CHECKING:
     from azure.servicebus import ServiceBusMessage
     from azure.servicebus.management import AuthorizationRule, 
CorrelationRuleFilter, SqlRuleFilter
 
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
     MessageCallback = Callable[[ServiceBusMessage, Context], None]
 
diff --git 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/batch.py
 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/batch.py
index a35a3f79ed8..e5b36841c66 100644
--- 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/batch.py
+++ 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/batch.py
@@ -27,7 +27,7 @@ from airflow.providers.common.compat.sdk import 
AirflowException, BaseOperator
 from airflow.providers.microsoft.azure.hooks.batch import AzureBatchHook
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class AzureBatchOperator(BaseOperator):
diff --git 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/container_instances.py
 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/container_instances.py
index f26e04f5c36..3bcacfd4c00 100644
--- 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/container_instances.py
+++ 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/container_instances.py
@@ -48,7 +48,7 @@ from 
airflow.providers.microsoft.azure.hooks.container_registry import AzureCont
 from airflow.providers.microsoft.azure.hooks.container_volume import 
AzureContainerVolumeHook
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 Volume = namedtuple(
     "Volume",
diff --git 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/cosmos.py
 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/cosmos.py
index 021a38d733c..5c6f6d72ae3 100644
--- 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/cosmos.py
+++ 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/cosmos.py
@@ -24,7 +24,7 @@ from airflow.providers.common.compat.sdk import BaseOperator
 from airflow.providers.microsoft.azure.hooks.cosmos import AzureCosmosDBHook
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class AzureCosmosInsertDocumentOperator(BaseOperator):
diff --git 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/data_factory.py
 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/data_factory.py
index c0e4a537a3a..1c58d198dc7 100644
--- 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/data_factory.py
+++ 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/data_factory.py
@@ -41,7 +41,7 @@ from airflow.utils.log.logging_mixin import LoggingMixin
 
 if TYPE_CHECKING:
     from airflow.models.taskinstancekey import TaskInstanceKey
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class AzureDataFactoryPipelineRunLink(LoggingMixin, BaseOperatorLink):
diff --git 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/msgraph.py
 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/msgraph.py
index 2449ac1a1fd..7399afbe4b2 100644
--- 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/msgraph.py
+++ 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/msgraph.py
@@ -38,7 +38,7 @@ if TYPE_CHECKING:
 
     from msgraph_core import APIVersion
 
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 def default_event_handler(event: dict[Any, Any] | None = None, **context) -> 
Any:
diff --git 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py
 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py
index 92fb7873186..8a2a7292d7b 100644
--- 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py
+++ 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py
@@ -32,7 +32,7 @@ if TYPE_CHECKING:
     from msgraph_core import APIVersion
 
     from airflow.models.taskinstancekey import TaskInstanceKey
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class PowerBILink(BaseOperatorLink):
diff --git 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/synapse.py
 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/synapse.py
index 82eefb397cf..9cb51556cee 100644
--- 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/synapse.py
+++ 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/synapse.py
@@ -40,7 +40,7 @@ if TYPE_CHECKING:
     from azure.synapse.spark.models import SparkBatchJobOptions
 
     from airflow.models.taskinstancekey import TaskInstanceKey
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class AzureSynapseRunSparkBatchOperator(BaseOperator):
diff --git 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/wasb_delete_blob.py
 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/wasb_delete_blob.py
index 631700ff938..91e165f6bf8 100644
--- 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/wasb_delete_blob.py
+++ 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/wasb_delete_blob.py
@@ -24,7 +24,7 @@ from airflow.providers.common.compat.sdk import BaseOperator
 from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class WasbDeleteBlobOperator(BaseOperator):
diff --git 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/sensors/cosmos.py
 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/sensors/cosmos.py
index 5ef8d49afaa..c77e4b8a27e 100644
--- 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/sensors/cosmos.py
+++ 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/sensors/cosmos.py
@@ -24,7 +24,7 @@ from airflow.providers.common.compat.sdk import 
BaseSensorOperator
 from airflow.providers.microsoft.azure.hooks.cosmos import AzureCosmosDBHook
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class AzureCosmosDocumentSensor(BaseSensorOperator):
diff --git 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/sensors/data_factory.py
 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/sensors/data_factory.py
index c84d98e9ff3..b5a8f62a50d 100644
--- 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/sensors/data_factory.py
+++ 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/sensors/data_factory.py
@@ -31,7 +31,7 @@ from airflow.providers.microsoft.azure.hooks.data_factory 
import (
 from airflow.providers.microsoft.azure.triggers.data_factory import 
ADFPipelineRunStatusSensorTrigger
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class AzureDataFactoryPipelineRunStatusSensor(BaseSensorOperator):
diff --git 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/sensors/msgraph.py
 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/sensors/msgraph.py
index d3aec5a5aa5..040ccac0bb5 100644
--- 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/sensors/msgraph.py
+++ 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/sensors/msgraph.py
@@ -32,7 +32,7 @@ if TYPE_CHECKING:
 
     from msgraph_core import APIVersion
 
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class MSGraphSensor(BaseSensorOperator):
diff --git 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/sensors/wasb.py
 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/sensors/wasb.py
index bb4224521b4..2d62342c5c2 100644
--- 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/sensors/wasb.py
+++ 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/sensors/wasb.py
@@ -27,7 +27,7 @@ from airflow.providers.microsoft.azure.hooks.wasb import 
WasbHook
 from airflow.providers.microsoft.azure.triggers.wasb import 
WasbBlobSensorTrigger, WasbPrefixSensorTrigger
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class WasbBlobSensor(BaseSensorOperator):
diff --git 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/local_to_adls.py
 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/local_to_adls.py
index ee8cf6c3b04..255bcc1a4eb 100644
--- 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/local_to_adls.py
+++ 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/local_to_adls.py
@@ -23,7 +23,7 @@ from airflow.providers.common.compat.sdk import 
AirflowException, BaseOperator
 from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class LocalFilesystemToADLSOperator(BaseOperator):
diff --git 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/local_to_wasb.py
 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/local_to_wasb.py
index 06d5b8c3ac7..2678f9dbd0b 100644
--- 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/local_to_wasb.py
+++ 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/local_to_wasb.py
@@ -24,7 +24,7 @@ from airflow.providers.common.compat.sdk import BaseOperator
 from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class LocalFilesystemToWasbOperator(BaseOperator):
diff --git 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/oracle_to_azure_data_lake.py
 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/oracle_to_azure_data_lake.py
index 4b463012428..6a54be331e9 100644
--- 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/oracle_to_azure_data_lake.py
+++ 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/oracle_to_azure_data_lake.py
@@ -28,7 +28,7 @@ from airflow.providers.microsoft.azure.hooks.data_lake import 
AzureDataLakeHook
 from airflow.providers.oracle.hooks.oracle import OracleHook
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class OracleToAzureDataLakeOperator(BaseOperator):
diff --git 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/s3_to_wasb.py
 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/s3_to_wasb.py
index c66e9724f50..c45c6bd866d 100644
--- 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/s3_to_wasb.py
+++ 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/s3_to_wasb.py
@@ -27,7 +27,7 @@ from airflow.providers.common.compat.sdk import BaseOperator
 from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 # Create three custom exception that are
diff --git 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py
 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py
index c51e381b9e7..65b1af1b8ad 100644
--- 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py
+++ 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py
@@ -26,14 +26,14 @@ from functools import cached_property
 from tempfile import NamedTemporaryFile
 from typing import TYPE_CHECKING
 
-if TYPE_CHECKING:
-    from airflow.utils.context import Context
-
 from airflow.providers.common.compat.sdk import AirflowException, BaseOperator
 from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
 from airflow.providers.microsoft.azure.version_compat import AIRFLOW_V_3_0_PLUS
 from airflow.providers.sftp.hooks.sftp import SFTPHook
 
+if TYPE_CHECKING:
+    from airflow.sdk import Context
+
 WILDCARD = "*"
 SftpFile = namedtuple("SftpFile", "sftp_file_path, blob_name")
 
diff --git 
a/providers/microsoft/azure/tests/unit/microsoft/azure/hooks/test_asb.py 
b/providers/microsoft/azure/tests/unit/microsoft/azure/hooks/test_asb.py
index be3046c7455..9c9e61a7cc2 100644
--- a/providers/microsoft/azure/tests/unit/microsoft/azure/hooks/test_asb.py
+++ b/providers/microsoft/azure/tests/unit/microsoft/azure/hooks/test_asb.py
@@ -34,11 +34,7 @@ except ImportError:
 from airflow.models import Connection
 from airflow.providers.microsoft.azure.hooks.asb import AdminClientHook, 
MessageHook
 
-try:
-    from airflow.sdk.definitions.context import Context
-except ImportError:
-    # TODO: Remove once provider drops support for Airflow 2
-    from airflow.utils.context import Context
+from tests_common.test_utils.compat import Context
 
 MESSAGE = "Test Message"
 MESSAGE_LIST = [f"{MESSAGE} {n}" for n in range(10)]
diff --git 
a/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_asb.py 
b/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_asb.py
index aff7fe8f8b6..32b557e3eb4 100644
--- a/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_asb.py
+++ b/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_asb.py
@@ -39,11 +39,7 @@ from airflow.providers.microsoft.azure.operators.asb import (
     AzureServiceBusUpdateSubscriptionOperator,
 )
 
-try:
-    from airflow.sdk.definitions.context import Context
-except ImportError:
-    # TODO: Remove once provider drops support for Airflow 2
-    from airflow.utils.context import Context
+from tests_common.test_utils.compat import Context
 
 QUEUE_NAME = "test_queue"
 MESSAGE = "Test Message"
diff --git 
a/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_container_instances.py
 
b/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_container_instances.py
index 6298c4bcef4..9325b318bce 100644
--- 
a/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_container_instances.py
+++ 
b/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_container_instances.py
@@ -35,11 +35,7 @@ from azure.mgmt.containerinstance.models import (
 from airflow.providers.common.compat.sdk import AirflowException
 from airflow.providers.microsoft.azure.operators.container_instances import 
AzureContainerInstancesOperator
 
-try:
-    from airflow.sdk.definitions.context import Context
-except ImportError:
-    # TODO: Remove once provider drops support for Airflow 2
-    from airflow.utils.context import Context
+from tests_common.test_utils.compat import Context
 
 
 def make_mock_cg(container_state, events=None):
diff --git 
a/providers/microsoft/psrp/src/airflow/providers/microsoft/psrp/operators/psrp.py
 
b/providers/microsoft/psrp/src/airflow/providers/microsoft/psrp/operators/psrp.py
index 56d4c4e8b3f..4b64dc6dac5 100644
--- 
a/providers/microsoft/psrp/src/airflow/providers/microsoft/psrp/operators/psrp.py
+++ 
b/providers/microsoft/psrp/src/airflow/providers/microsoft/psrp/operators/psrp.py
@@ -32,7 +32,7 @@ from airflow.utils.helpers import exactly_one
 if TYPE_CHECKING:
     from pypsrp.powershell import Command
 
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class PsrpOperator(BaseOperator):
diff --git 
a/providers/microsoft/winrm/src/airflow/providers/microsoft/winrm/operators/winrm.py
 
b/providers/microsoft/winrm/src/airflow/providers/microsoft/winrm/operators/winrm.py
index 13ace2ddbfe..9eb40d62846 100644
--- 
a/providers/microsoft/winrm/src/airflow/providers/microsoft/winrm/operators/winrm.py
+++ 
b/providers/microsoft/winrm/src/airflow/providers/microsoft/winrm/operators/winrm.py
@@ -27,7 +27,7 @@ from airflow.providers.common.compat.sdk import 
AirflowException, BaseOperator
 from airflow.providers.microsoft.winrm.hooks.winrm import WinRMHook
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 # Hide the following error message in urllib3 when making WinRM connections:
 # requests.packages.urllib3.exceptions.HeaderParsingError: 
[StartBoundaryNotFoundDefect(),
diff --git a/providers/mongo/src/airflow/providers/mongo/sensors/mongo.py 
b/providers/mongo/src/airflow/providers/mongo/sensors/mongo.py
index b8b152b49b4..e4abe7fe56f 100644
--- a/providers/mongo/src/airflow/providers/mongo/sensors/mongo.py
+++ b/providers/mongo/src/airflow/providers/mongo/sensors/mongo.py
@@ -24,11 +24,7 @@ from airflow.providers.common.compat.sdk import 
BaseSensorOperator
 from airflow.providers.mongo.hooks.mongo import MongoHook
 
 if TYPE_CHECKING:
-    try:
-        from airflow.sdk.definitions.context import Context
-    except ImportError:
-        # TODO: Remove once provider drops support for Airflow 2
-        from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class MongoSensor(BaseSensorOperator):
diff --git a/providers/mongo/tests/unit/mongo/sensors/test_mongo.py 
b/providers/mongo/tests/unit/mongo/sensors/test_mongo.py
index e02c54ee269..874beb9bf1e 100644
--- a/providers/mongo/tests/unit/mongo/sensors/test_mongo.py
+++ b/providers/mongo/tests/unit/mongo/sensors/test_mongo.py
@@ -24,11 +24,7 @@ from airflow.providers.mongo.hooks.mongo import MongoHook
 from airflow.providers.mongo.sensors.mongo import MongoSensor
 from airflow.utils import timezone
 
-try:
-    from airflow.sdk.definitions.context import Context
-except ImportError:
-    # TODO: Remove once provider drops support for Airflow 2
-    from airflow.utils.context import Context
+from tests_common.test_utils.compat import Context
 
 DEFAULT_DATE = timezone.datetime(2017, 1, 1)
 
diff --git a/providers/neo4j/src/airflow/providers/neo4j/operators/neo4j.py 
b/providers/neo4j/src/airflow/providers/neo4j/operators/neo4j.py
index 136b3175253..6333f800d0a 100644
--- a/providers/neo4j/src/airflow/providers/neo4j/operators/neo4j.py
+++ b/providers/neo4j/src/airflow/providers/neo4j/operators/neo4j.py
@@ -24,11 +24,7 @@ from airflow.providers.common.compat.sdk import BaseOperator
 from airflow.providers.neo4j.hooks.neo4j import Neo4jHook
 
 if TYPE_CHECKING:
-    try:
-        from airflow.sdk.definitions.context import Context
-    except ImportError:
-        # TODO: Remove once provider drops support for Airflow 2
-        from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class Neo4jOperator(BaseOperator):
diff --git a/providers/neo4j/src/airflow/providers/neo4j/sensors/neo4j.py 
b/providers/neo4j/src/airflow/providers/neo4j/sensors/neo4j.py
index bf4419f402e..8ecc65f56c3 100644
--- a/providers/neo4j/src/airflow/providers/neo4j/sensors/neo4j.py
+++ b/providers/neo4j/src/airflow/providers/neo4j/sensors/neo4j.py
@@ -24,10 +24,7 @@ from airflow.providers.common.compat.sdk import 
AirflowException, BaseSensorOper
 from airflow.providers.neo4j.hooks.neo4j import Neo4jHook
 
 if TYPE_CHECKING:
-    try:
-        from airflow.sdk.definitions.context import Context
-    except ImportError:
-        from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class Neo4jSensor(BaseSensorOperator):
diff --git 
a/providers/opensearch/src/airflow/providers/opensearch/operators/opensearch.py 
b/providers/opensearch/src/airflow/providers/opensearch/operators/opensearch.py
index b3e0512b54f..265108b8ee4 100644
--- 
a/providers/opensearch/src/airflow/providers/opensearch/operators/opensearch.py
+++ 
b/providers/opensearch/src/airflow/providers/opensearch/operators/opensearch.py
@@ -30,11 +30,7 @@ from airflow.providers.opensearch.hooks.opensearch import 
OpenSearchHook
 if TYPE_CHECKING:
     from opensearchpy import Connection as OpenSearchConnectionClass
 
-    try:
-        from airflow.sdk.definitions.context import Context
-    except ImportError:
-        # TODO: Remove once provider drops support for Airflow 2
-        from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class OpenSearchQueryOperator(BaseOperator):
diff --git 
a/providers/pinecone/src/airflow/providers/pinecone/operators/pinecone.py 
b/providers/pinecone/src/airflow/providers/pinecone/operators/pinecone.py
index bdabe442523..7fbb1e57469 100644
--- a/providers/pinecone/src/airflow/providers/pinecone/operators/pinecone.py
+++ b/providers/pinecone/src/airflow/providers/pinecone/operators/pinecone.py
@@ -27,10 +27,7 @@ from airflow.providers.pinecone.hooks.pinecone import 
PineconeHook
 if TYPE_CHECKING:
     from pinecone import Vector
 
-    try:
-        from airflow.sdk.definitions.context import Context
-    except ImportError:
-        from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class PineconeIngestOperator(BaseOperator):
diff --git 
a/providers/presto/src/airflow/providers/presto/transfers/gcs_to_presto.py 
b/providers/presto/src/airflow/providers/presto/transfers/gcs_to_presto.py
index 16d255ac0b5..7622150d03a 100644
--- a/providers/presto/src/airflow/providers/presto/transfers/gcs_to_presto.py
+++ b/providers/presto/src/airflow/providers/presto/transfers/gcs_to_presto.py
@@ -30,11 +30,7 @@ from airflow.providers.google.cloud.hooks.gcs import GCSHook
 from airflow.providers.presto.hooks.presto import PrestoHook
 
 if TYPE_CHECKING:
-    try:
-        from airflow.sdk.definitions.context import Context
-    except ImportError:
-        # TODO: Remove once provider drops support for Airflow 2
-        from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class GCSToPrestoOperator(BaseOperator):
diff --git a/providers/qdrant/src/airflow/providers/qdrant/operators/qdrant.py 
b/providers/qdrant/src/airflow/providers/qdrant/operators/qdrant.py
index e6c54d8711e..4eccc8455f2 100644
--- a/providers/qdrant/src/airflow/providers/qdrant/operators/qdrant.py
+++ b/providers/qdrant/src/airflow/providers/qdrant/operators/qdrant.py
@@ -27,11 +27,7 @@ from airflow.providers.qdrant.hooks.qdrant import QdrantHook
 if TYPE_CHECKING:
     from qdrant_client.models import VectorStruct
 
-    try:
-        from airflow.sdk.definitions.context import Context
-    except ImportError:
-        # TODO: Remove once provider drops support for Airflow 2
-        from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 
 class QdrantIngestOperator(BaseOperator):
diff --git a/providers/standard/tests/unit/standard/operators/test_python.py 
b/providers/standard/tests/unit/standard/operators/test_python.py
index 4061274a566..02f87854e08 100644
--- a/providers/standard/tests/unit/standard/operators/test_python.py
+++ b/providers/standard/tests/unit/standard/operators/test_python.py
@@ -91,7 +91,7 @@ except ImportError:
 if TYPE_CHECKING:
     from airflow.models.dag import DAG
     from airflow.models.dagrun import DagRun
-    from airflow.utils.context import Context
+    from airflow.sdk import Context
 
 pytestmark = [pytest.mark.db_test, pytest.mark.need_serialized_dag]
 
diff --git a/providers/teradata/src/airflow/providers/teradata/operators/tpt.py 
b/providers/teradata/src/airflow/providers/teradata/operators/tpt.py
index 51f5f3d047b..51106d59925 100644
--- a/providers/teradata/src/airflow/providers/teradata/operators/tpt.py
+++ b/providers/teradata/src/airflow/providers/teradata/operators/tpt.py
@@ -19,6 +19,10 @@ from __future__ import annotations
 import logging
 from typing import TYPE_CHECKING
 
+from airflow.models import BaseOperator
+from airflow.providers.ssh.hooks.ssh import SSHHook
+from airflow.providers.teradata.hooks.teradata import TeradataHook
+from airflow.providers.teradata.hooks.tpt import TptHook
 from airflow.providers.teradata.utils.tpt_util import (
     get_remote_temp_directory,
     is_valid_file,
@@ -29,16 +33,9 @@ from airflow.providers.teradata.utils.tpt_util import (
 )
 
 if TYPE_CHECKING:
-    try:
-        from airflow.sdk.definitions.context import Context
-    except ImportError:
-        from airflow.utils.context import Context
     from paramiko import SSHClient
 
-from airflow.models import BaseOperator
-from airflow.providers.ssh.hooks.ssh import SSHHook
-from airflow.providers.teradata.hooks.teradata import TeradataHook
-from airflow.providers.teradata.hooks.tpt import TptHook
+    from airflow.sdk import Context
 
 
 class DdlOperator(BaseOperator):
diff --git a/shared/dagnode/src/airflow_shared/dagnode/node.py 
b/shared/dagnode/src/airflow_shared/dagnode/node.py
index 2f4504818e2..42b2f6bee3f 100644
--- a/shared/dagnode/src/airflow_shared/dagnode/node.py
+++ b/shared/dagnode/src/airflow_shared/dagnode/node.py
@@ -40,6 +40,7 @@ class GenericDAGNode(Generic[Dag, Task, TaskGroup]):
 
     dag: Dag | None
     task_group: TaskGroup | None
+    downstream_group_ids: set[str | None]
     upstream_task_ids: set[str]
     downstream_task_ids: set[str]
 
diff --git a/task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py 
b/task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py
index f8ee0dc68d9..b4503b4177b 100644
--- a/task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py
+++ b/task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py
@@ -66,7 +66,6 @@ DEFAULT_RETRIES: int = conf.getint("core", 
"default_task_retries", fallback=0)
 DEFAULT_RETRY_DELAY: datetime.timedelta = datetime.timedelta(
     seconds=conf.getint("core", "default_task_retry_delay", fallback=300)
 )
-DEFAULT_RETRY_DELAY_MULTIPLIER: float = 2.0
 MAX_RETRY_DELAY: int = conf.getint("core", "max_task_retry_delay", fallback=24 
* 60 * 60)
 
 # TODO: Task-SDK -- these defaults should be overridable from the Airflow 
config


Reply via email to