This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch task-sdk-first-code in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 0bdef03387b8e9ad16cd2fa3108263eca589d026 Author: Kaxil Naik <[email protected]> AuthorDate: Wed Oct 23 13:04:54 2024 +0100 Replace DagContext from core to Task SDK [skip ci] --- airflow/decorators/base.py | 2 +- airflow/models/dag.py | 21 --------------------- .../src/airflow/sdk/definitions/contextmanager.py | 6 +++++- 3 files changed, 6 insertions(+), 23 deletions(-) diff --git a/airflow/decorators/base.py b/airflow/decorators/base.py index 3290f5342ce..fe85c07ccd0 100644 --- a/airflow/decorators/base.py +++ b/airflow/decorators/base.py @@ -49,7 +49,6 @@ from airflow.models.baseoperator import ( get_merged_defaults, parse_retries, ) -from airflow.models.dag import DagContext from airflow.models.expandinput import ( EXPAND_INPUT_EMPTY, DictOfListsExpandInput, @@ -60,6 +59,7 @@ from airflow.models.mappedoperator import MappedOperator, ensure_xcomarg_return_ from airflow.models.pool import Pool from airflow.models.xcom_arg import XComArg from airflow.sdk.definitions.baseoperator import BaseOperator as TaskSDKBaseOperator +from airflow.sdk.definitions.contextmanager import DagContext from airflow.typing_compat import ParamSpec, Protocol from airflow.utils import timezone from airflow.utils.context import KNOWN_CONTEXT_KEYS diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 230087602ca..fc0928ddcc1 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -36,7 +36,6 @@ from typing import ( Collection, Container, Iterable, - Optional, Pattern, Sequence, Union, @@ -71,8 +70,6 @@ from sqlalchemy.ext.hybrid import hybrid_property from sqlalchemy.orm import backref, relationship from sqlalchemy.sql import Select, expression -import airflow.sdk.definitions.contextmanager -import airflow.templates from airflow import settings, utils from airflow.api_internal.internal_api_call import internal_api_call from airflow.assets import Asset, AssetAlias, BaseAsset @@ -2424,24 +2421,6 @@ if STATICA_HACK: # pragma: no cover """:sphinx-autoapi-skip:""" 
-class DagContext(airflow.sdk.definitions.contextmanager.DagContext, share_parent_context=True): - """:meta private:""" # noqa: D400 - - # TODO: Method is not used anywhere. Remove them if not needed. - @classmethod - def push_context_managed_dag(cls, dag: DAG): - cls.push(dag) - - # TODO: Method is not used anywhere. Remove them if not needed. - @classmethod - def pop_context_managed_dag(cls) -> DAG | None: - return cast(DAG, cls.pop()) - - @classmethod - def get_current_dag(cls) -> DAG | None: - return cast(Optional[DAG], cls.get_current()) - - def _run_inline_trigger(trigger): async def _run_inline_trigger_main(): async for event in trigger.run(): diff --git a/task_sdk/src/airflow/sdk/definitions/contextmanager.py b/task_sdk/src/airflow/sdk/definitions/contextmanager.py index a22662a0619..8b5458c65b9 100644 --- a/task_sdk/src/airflow/sdk/definitions/contextmanager.py +++ b/task_sdk/src/airflow/sdk/definitions/contextmanager.py @@ -20,7 +20,7 @@ from __future__ import annotations import sys from collections import deque from types import ModuleType -from typing import Any, Generic, TypeVar +from typing import Any, Generic, Optional, TypeVar, cast from airflow.sdk.definitions.dag import DAG from airflow.sdk.definitions.taskgroup import TaskGroup @@ -107,6 +107,10 @@ class DagContext(ContextStack[DAG]): cls.autoregistered_dags.add((dag, mod)) return dag + @classmethod + def get_current_dag(cls) -> DAG | None: + return cast(Optional[DAG], cls.get_current()) + class TaskGroupContext(ContextStack[TaskGroup]): """TaskGroup context is used to keep the current TaskGroup when TaskGroup is used as ContextManager."""
