This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new feb39ed3788 Remove AIP-44 from airflow/cli/commands/task_command.py
(#44521)
feb39ed3788 is described below
commit feb39ed3788e22fd83a1cb294a824e60f4c13187
Author: Jens Scheffler <[email protected]>
AuthorDate: Sat Nov 30 22:55:27 2024 +0100
Remove AIP-44 from airflow/cli/commands/task_command.py (#44521)
---
airflow/cli/commands/task_command.py | 35 ++++++-----------------------------
1 file changed, 6 insertions(+), 29 deletions(-)
diff --git a/airflow/cli/commands/task_command.py
b/airflow/cli/commands/task_command.py
index 4748aea2bbf..9aa909f06ce 100644
--- a/airflow/cli/commands/task_command.py
+++ b/airflow/cli/commands/task_command.py
@@ -35,7 +35,6 @@ from pendulum.parsing.exceptions import ParserError
from sqlalchemy import select
from airflow import settings
-from airflow.api_internal.internal_api_call import internal_api_call
from airflow.cli.simple_table import AirflowConsole
from airflow.configuration import conf
from airflow.exceptions import AirflowException, DagRunNotFound, TaskDeferred,
TaskInstanceNotFound
@@ -193,10 +192,8 @@ def _get_dag_run(
raise ValueError(f"unknown create_if_necessary value:
{create_if_necessary!r}")
-@internal_api_call
@provide_session
-def _get_ti_db_access(
- dag: DAG,
+def _get_ti(
task: Operator,
map_index: int,
*,
@@ -204,8 +201,11 @@ def _get_ti_db_access(
pool: str | None = None,
create_if_necessary: CreateIfNecessary = False,
session: Session = NEW_SESSION,
-) -> tuple[TaskInstance | TaskInstancePydantic, bool]:
- """Get the task instance through DagRun.run_id, if that fails, get the TI
the old way."""
+):
+ dag = task.dag
+ if dag is None:
+ raise ValueError("Cannot get task instance for a task not assigned to
a DAG")
+
# this check is imperfect because diff dags could have tasks with same name
# but in a task, dag_id is a property that accesses its dag, and we don't
# currently include the dag when serializing an operator
@@ -242,29 +242,6 @@ def _get_ti_db_access(
else:
ti = ti_or_none
ti.refresh_from_task(task, pool_override=pool)
- return ti, dr_created
-
-
-def _get_ti(
- task: Operator,
- map_index: int,
- *,
- logical_date_or_run_id: str | None = None,
- pool: str | None = None,
- create_if_necessary: CreateIfNecessary = False,
-):
- dag = task.dag
- if dag is None:
- raise ValueError("Cannot get task instance for a task not assigned to
a DAG")
-
- ti, dr_created = _get_ti_db_access(
- dag=dag,
- task=task,
- map_index=map_index,
- logical_date_or_run_id=logical_date_or_run_id,
- pool=pool,
- create_if_necessary=create_if_necessary,
- )
# we do refresh_from_task so that if TI has come back via RPC, we ensure
that ti.task
# is the original task object and not the result of the round trip