This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7b9617a98fd Remove unused code (#47635)
7b9617a98fd is described below

commit 7b9617a98fd87163f794bd6e14623504caa6750d
Author: Kaxil Naik <[email protected]>
AuthorDate: Tue Mar 11 23:08:25 2025 +0530

    Remove unused code (#47635)
    
    Found this code wasn't used anywhere so 💣 nuking it
---
 airflow/models/abstractoperator.py | 143 -------------------------------------
 airflow/models/baseoperator.py     |   2 +-
 airflow/models/taskinstance.py     |   5 --
 3 files changed, 1 insertion(+), 149 deletions(-)

diff --git a/airflow/models/abstractoperator.py b/airflow/models/abstractoperator.py
index e1d909faa87..007e43aa525 100644
--- a/airflow/models/abstractoperator.py
+++ b/airflow/models/abstractoperator.py
@@ -18,29 +18,21 @@
 from __future__ import annotations
 
 import datetime
-from collections.abc import Iterable, Sequence
 from typing import TYPE_CHECKING, Any, Callable
 
-from sqlalchemy import select
-
 from airflow.configuration import conf
 from airflow.sdk.definitions._internal.abstractoperator import (
     AbstractOperator as TaskSDKAbstractOperator,
     NotMapped as NotMapped,  # Re-export this for compat
 )
 from airflow.sdk.definitions.context import Context
-from airflow.utils.db import exists_query
 from airflow.utils.log.logging_mixin import LoggingMixin
-from airflow.utils.sqlalchemy import with_row_locks
-from airflow.utils.state import State, TaskInstanceState
 from airflow.utils.trigger_rule import TriggerRule
 from airflow.utils.weight_rule import db_safe_priority
 
 if TYPE_CHECKING:
     from sqlalchemy.orm import Session
 
-    from airflow.models.dag import DAG as SchedulerDAG
-    from airflow.models.taskinstance import TaskInstance
     from airflow.sdk.definitions.baseoperator import BaseOperator
     from airflow.task.priority_strategy import PriorityWeightStrategy
     from airflow.triggers.base import StartTriggerArgs
@@ -152,138 +144,3 @@ class AbstractOperator(LoggingMixin, TaskSDKAbstractOperator):
                 for task_id in self.get_flat_relative_ids(upstream=upstream)
             )
         )
-
-    def expand_mapped_task(self, run_id: str, *, session: Session) -> tuple[Sequence[TaskInstance], int]:
-        """
-        Create the mapped task instances for mapped task.
-
-        :raise NotMapped: If this task does not need expansion.
-        :return: The newly created mapped task instances (if any) in ascending
-            order by map index, and the maximum map index value.
-        """
-        from sqlalchemy import func, or_
-
-        from airflow.models.taskinstance import TaskInstance
-        from airflow.sdk.definitions.baseoperator import BaseOperator
-        from airflow.sdk.definitions.mappedoperator import MappedOperator
-        from airflow.settings import task_instance_mutation_hook
-
-        if not isinstance(self, (BaseOperator, MappedOperator)):
-            raise RuntimeError(
-                f"cannot expand unrecognized operator type {type(self).__module__}.{type(self).__name__}"
-            )
-
-        from airflow.models.baseoperator import BaseOperator as DBBaseOperator
-        from airflow.models.expandinput import NotFullyPopulated
-
-        try:
-            total_length: int | None = DBBaseOperator.get_mapped_ti_count(self, run_id, session=session)
-        except NotFullyPopulated as e:
-            # It's possible that the upstream tasks are not yet done, but we
-            # don't have upstream of upstreams in partial DAGs (possible in the
-            # mini-scheduler), so we ignore this exception.
-            if not self.dag or not self.dag.partial:
-                self.log.error(
-                    "Cannot expand %r for run %s; missing upstream values: %s",
-                    self,
-                    run_id,
-                    sorted(e.missing),
-                )
-            total_length = None
-
-        state: TaskInstanceState | None = None
-        unmapped_ti: TaskInstance | None = session.scalars(
-            select(TaskInstance).where(
-                TaskInstance.dag_id == self.dag_id,
-                TaskInstance.task_id == self.task_id,
-                TaskInstance.run_id == run_id,
-                TaskInstance.map_index == -1,
-                or_(TaskInstance.state.in_(State.unfinished), TaskInstance.state.is_(None)),
-            )
-        ).one_or_none()
-
-        all_expanded_tis: list[TaskInstance] = []
-
-        if unmapped_ti:
-            if TYPE_CHECKING:
-                assert self.dag is None or isinstance(self.dag, SchedulerDAG)
-
-            # The unmapped task instance still exists and is unfinished, i.e. we
-            # haven't tried to run it before.
-            if total_length is None:
-                # If the DAG is partial, it's likely that the upstream tasks
-                # are not done yet, so the task can't fail yet.
-                if not self.dag or not self.dag.partial:
-                    unmapped_ti.state = TaskInstanceState.UPSTREAM_FAILED
-            elif total_length < 1:
-                # If the upstream maps this to a zero-length value, simply mark
-                # the unmapped task instance as SKIPPED (if needed).
-                self.log.info(
-                    "Marking %s as SKIPPED since the map has %d values to expand",
-                    unmapped_ti,
-                    total_length,
-                )
-                unmapped_ti.state = TaskInstanceState.SKIPPED
-            else:
-                zero_index_ti_exists = exists_query(
-                    TaskInstance.dag_id == self.dag_id,
-                    TaskInstance.task_id == self.task_id,
-                    TaskInstance.run_id == run_id,
-                    TaskInstance.map_index == 0,
-                    session=session,
-                )
-                if not zero_index_ti_exists:
-                    # Otherwise convert this into the first mapped index, and create
-                    # TaskInstance for other indexes.
-                    unmapped_ti.map_index = 0
-                    self.log.debug("Updated in place to become %s", unmapped_ti)
-                    all_expanded_tis.append(unmapped_ti)
-                    # execute hook for task instance map index 0
-                    task_instance_mutation_hook(unmapped_ti)
-                    session.flush()
-                else:
-                    self.log.debug("Deleting the original task instance: %s", unmapped_ti)
-                    session.delete(unmapped_ti)
-                state = unmapped_ti.state
-
-        if total_length is None or total_length < 1:
-            # Nothing to fixup.
-            indexes_to_map: Iterable[int] = ()
-        else:
-            # Only create "missing" ones.
-            current_max_mapping = session.scalar(
-                select(func.max(TaskInstance.map_index)).where(
-                    TaskInstance.dag_id == self.dag_id,
-                    TaskInstance.task_id == self.task_id,
-                    TaskInstance.run_id == run_id,
-                )
-            )
-            indexes_to_map = range(current_max_mapping + 1, total_length)
-
-        for index in indexes_to_map:
-            # TODO: Make more efficient with bulk_insert_mappings/bulk_save_mappings.
-            ti = TaskInstance(self, run_id=run_id, map_index=index, state=state)
-            self.log.debug("Expanding TIs upserted %s", ti)
-            task_instance_mutation_hook(ti)
-            ti = session.merge(ti)
-            ti.refresh_from_task(self)  # session.merge() loses task information.
-            all_expanded_tis.append(ti)
-
-        # Coerce the None case to 0 -- these two are almost treated identically,
-        # except the unmapped ti (if exists) is marked to different states.
-        total_expanded_ti_count = total_length or 0
-
-        # Any (old) task instances with inapplicable indexes (>= the total
-        # number we need) are set to "REMOVED".
-        query = select(TaskInstance).where(
-            TaskInstance.dag_id == self.dag_id,
-            TaskInstance.task_id == self.task_id,
-            TaskInstance.run_id == run_id,
-            TaskInstance.map_index >= total_expanded_ti_count,
-        )
-        query = with_row_locks(query, of=TaskInstance, session=session, skip_locked=True)
-        to_update = session.scalars(query)
-        for ti in to_update:
-            ti.state = TaskInstanceState.REMOVED
-        session.flush()
-        return all_expanded_tis, total_expanded_ti_count - 1
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index fdc1cc532a3..3a663492c1f 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -87,7 +87,7 @@ if TYPE_CHECKING:
     from airflow.models.dag import DAG as SchedulerDAG
     from airflow.models.operator import Operator
     from airflow.sdk import BaseOperatorLink
-    from airflow.sdk.definitions.node import DAGNode
+    from airflow.sdk.definitions._internal.node import DAGNode
     from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
     from airflow.triggers.base import StartTriggerArgs
 
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 5982a465603..fe7e3954f55 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -3402,11 +3402,6 @@ class TaskInstance(Base, LoggingMixin):
             )
         return num_running_task_instances_query.scalar()
 
-    def init_run_context(self, raw: bool = False) -> None:
-        """Set the log context."""
-        self.raw = raw
-        self._set_context(self)
-
     @staticmethod
     def filter_for_tis(tis: Iterable[TaskInstance | TaskInstanceKey]) -> BooleanClauseList | None:
         """Return SQLAlchemy filter to query selected task instances."""

Reply via email to