This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7b9617a98fd Remove unused code (#47635)
7b9617a98fd is described below
commit 7b9617a98fd87163f794bd6e14623504caa6750d
Author: Kaxil Naik <[email protected]>
AuthorDate: Tue Mar 11 23:08:25 2025 +0530
Remove unused code (#47635)
Found this code wasn't used anywhere so 💣 nuking it
---
airflow/models/abstractoperator.py | 143 -------------------------------------
airflow/models/baseoperator.py | 2 +-
airflow/models/taskinstance.py | 5 --
3 files changed, 1 insertion(+), 149 deletions(-)
diff --git a/airflow/models/abstractoperator.py b/airflow/models/abstractoperator.py
index e1d909faa87..007e43aa525 100644
--- a/airflow/models/abstractoperator.py
+++ b/airflow/models/abstractoperator.py
@@ -18,29 +18,21 @@
from __future__ import annotations
import datetime
-from collections.abc import Iterable, Sequence
from typing import TYPE_CHECKING, Any, Callable
-from sqlalchemy import select
-
from airflow.configuration import conf
from airflow.sdk.definitions._internal.abstractoperator import (
AbstractOperator as TaskSDKAbstractOperator,
NotMapped as NotMapped, # Re-export this for compat
)
from airflow.sdk.definitions.context import Context
-from airflow.utils.db import exists_query
from airflow.utils.log.logging_mixin import LoggingMixin
-from airflow.utils.sqlalchemy import with_row_locks
-from airflow.utils.state import State, TaskInstanceState
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.weight_rule import db_safe_priority
if TYPE_CHECKING:
from sqlalchemy.orm import Session
- from airflow.models.dag import DAG as SchedulerDAG
- from airflow.models.taskinstance import TaskInstance
from airflow.sdk.definitions.baseoperator import BaseOperator
from airflow.task.priority_strategy import PriorityWeightStrategy
from airflow.triggers.base import StartTriggerArgs
@@ -152,138 +144,3 @@ class AbstractOperator(LoggingMixin, TaskSDKAbstractOperator):
for task_id in self.get_flat_relative_ids(upstream=upstream)
)
)
-
- def expand_mapped_task(self, run_id: str, *, session: Session) -> tuple[Sequence[TaskInstance], int]:
- """
- Create the mapped task instances for mapped task.
-
- :raise NotMapped: If this task does not need expansion.
- :return: The newly created mapped task instances (if any) in ascending
- order by map index, and the maximum map index value.
- """
- from sqlalchemy import func, or_
-
- from airflow.models.taskinstance import TaskInstance
- from airflow.sdk.definitions.baseoperator import BaseOperator
- from airflow.sdk.definitions.mappedoperator import MappedOperator
- from airflow.settings import task_instance_mutation_hook
-
- if not isinstance(self, (BaseOperator, MappedOperator)):
- raise RuntimeError(
f"cannot expand unrecognized operator type {type(self).__module__}.{type(self).__name__}"
- )
-
- from airflow.models.baseoperator import BaseOperator as DBBaseOperator
- from airflow.models.expandinput import NotFullyPopulated
-
- try:
- total_length: int | None = DBBaseOperator.get_mapped_ti_count(self, run_id, session=session)
- except NotFullyPopulated as e:
- # It's possible that the upstream tasks are not yet done, but we
- # don't have upstream of upstreams in partial DAGs (possible in the
- # mini-scheduler), so we ignore this exception.
- if not self.dag or not self.dag.partial:
- self.log.error(
- "Cannot expand %r for run %s; missing upstream values: %s",
- self,
- run_id,
- sorted(e.missing),
- )
- total_length = None
-
- state: TaskInstanceState | None = None
- unmapped_ti: TaskInstance | None = session.scalars(
- select(TaskInstance).where(
- TaskInstance.dag_id == self.dag_id,
- TaskInstance.task_id == self.task_id,
- TaskInstance.run_id == run_id,
- TaskInstance.map_index == -1,
- or_(TaskInstance.state.in_(State.unfinished), TaskInstance.state.is_(None)),
- )
- ).one_or_none()
-
- all_expanded_tis: list[TaskInstance] = []
-
- if unmapped_ti:
- if TYPE_CHECKING:
- assert self.dag is None or isinstance(self.dag, SchedulerDAG)
-
- # The unmapped task instance still exists and is unfinished, i.e. we
- # haven't tried to run it before.
- if total_length is None:
- # If the DAG is partial, it's likely that the upstream tasks
- # are not done yet, so the task can't fail yet.
- if not self.dag or not self.dag.partial:
- unmapped_ti.state = TaskInstanceState.UPSTREAM_FAILED
- elif total_length < 1:
- # If the upstream maps this to a zero-length value, simply mark
- # the unmapped task instance as SKIPPED (if needed).
- self.log.info(
- "Marking %s as SKIPPED since the map has %d values to expand",
- unmapped_ti,
- total_length,
- )
- unmapped_ti.state = TaskInstanceState.SKIPPED
- else:
- zero_index_ti_exists = exists_query(
- TaskInstance.dag_id == self.dag_id,
- TaskInstance.task_id == self.task_id,
- TaskInstance.run_id == run_id,
- TaskInstance.map_index == 0,
- session=session,
- )
- if not zero_index_ti_exists:
- # Otherwise convert this into the first mapped index, and create
- # TaskInstance for other indexes.
- unmapped_ti.map_index = 0
- self.log.debug("Updated in place to become %s", unmapped_ti)
- all_expanded_tis.append(unmapped_ti)
- # execute hook for task instance map index 0
- task_instance_mutation_hook(unmapped_ti)
- session.flush()
- else:
- self.log.debug("Deleting the original task instance: %s", unmapped_ti)
- session.delete(unmapped_ti)
- state = unmapped_ti.state
-
- if total_length is None or total_length < 1:
- # Nothing to fixup.
- indexes_to_map: Iterable[int] = ()
- else:
- # Only create "missing" ones.
- current_max_mapping = session.scalar(
- select(func.max(TaskInstance.map_index)).where(
- TaskInstance.dag_id == self.dag_id,
- TaskInstance.task_id == self.task_id,
- TaskInstance.run_id == run_id,
- )
- )
- indexes_to_map = range(current_max_mapping + 1, total_length)
-
- for index in indexes_to_map:
- # TODO: Make more efficient with bulk_insert_mappings/bulk_save_mappings.
- ti = TaskInstance(self, run_id=run_id, map_index=index, state=state)
- self.log.debug("Expanding TIs upserted %s", ti)
- task_instance_mutation_hook(ti)
- ti = session.merge(ti)
- ti.refresh_from_task(self)  # session.merge() loses task information.
- all_expanded_tis.append(ti)
-
- # Coerce the None case to 0 -- these two are almost treated identically,
- # except the unmapped ti (if exists) is marked to different states.
- total_expanded_ti_count = total_length or 0
-
- # Any (old) task instances with inapplicable indexes (>= the total
- # number we need) are set to "REMOVED".
- query = select(TaskInstance).where(
- TaskInstance.dag_id == self.dag_id,
- TaskInstance.task_id == self.task_id,
- TaskInstance.run_id == run_id,
- TaskInstance.map_index >= total_expanded_ti_count,
- )
- query = with_row_locks(query, of=TaskInstance, session=session, skip_locked=True)
- to_update = session.scalars(query)
- for ti in to_update:
- ti.state = TaskInstanceState.REMOVED
- session.flush()
- return all_expanded_tis, total_expanded_ti_count - 1
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index fdc1cc532a3..3a663492c1f 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -87,7 +87,7 @@ if TYPE_CHECKING:
from airflow.models.dag import DAG as SchedulerDAG
from airflow.models.operator import Operator
from airflow.sdk import BaseOperatorLink
- from airflow.sdk.definitions.node import DAGNode
+ from airflow.sdk.definitions._internal.node import DAGNode
from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
from airflow.triggers.base import StartTriggerArgs
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 5982a465603..fe7e3954f55 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -3402,11 +3402,6 @@ class TaskInstance(Base, LoggingMixin):
)
return num_running_task_instances_query.scalar()
- def init_run_context(self, raw: bool = False) -> None:
- """Set the log context."""
- self.raw = raw
- self._set_context(self)
-
@staticmethod
def filter_for_tis(tis: Iterable[TaskInstance | TaskInstanceKey]) -> BooleanClauseList | None:
"""Return SQLAlchemy filter to query selected task instances."""