This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f7b3d7912f3 Remove unused `models.TI.defer_task` method (#55233)
f7b3d7912f3 is described below

commit f7b3d7912f34fa4fc2422ba59da5d49788e3c07c
Author: Kaxil Naik <[email protected]>
AuthorDate: Thu Sep 4 05:50:14 2025 +0100

    Remove unused `models.TI.defer_task` method (#55233)
    
    This isn't used anymore; deferral is now done via the API server and the Task Runner.
---
 airflow-core/src/airflow/models/taskinstance.py | 78 -------------------------
 1 file changed, 78 deletions(-)

diff --git a/airflow-core/src/airflow/models/taskinstance.py 
b/airflow-core/src/airflow/models/taskinstance.py
index 05cd9ce357e..86b4eaf992d 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -73,8 +73,6 @@ from airflow.assets.manager import asset_manager
 from airflow.configuration import conf
 from airflow.exceptions import (
     AirflowInactiveAssetInInletOrOutletException,
-    TaskDeferralError,
-    TaskDeferred,
 )
 from airflow.listeners.listener import get_listener_manager
 from airflow.models.asset import AssetEvent, AssetModel
@@ -1433,82 +1431,6 @@ class TaskInstance(Base, LoggingMixin):
                 .values(last_heartbeat_at=timezone.utcnow())
             )
 
-    @provide_session
-    def defer_task(self, exception: TaskDeferred | None, session: Session = 
NEW_SESSION) -> None:
-        """
-        Mark the task as deferred and sets up the trigger to resume it.
-
-        :meta: private
-        """
-        from airflow.models.trigger import Trigger
-
-        # TODO: TaskSDK add start_trigger_args to SDK definitions
-        if TYPE_CHECKING:
-            assert self.task is not None
-
-        timeout: timedelta | None
-        if exception is not None:
-            trigger_row = Trigger.from_object(exception.trigger)
-            next_method = exception.method_name
-            next_kwargs = exception.kwargs
-            timeout = exception.timeout
-        elif self.task is not None and self.task.start_trigger_args is not 
None:
-            context = self.get_template_context()
-            start_trigger_args = 
self.task.expand_start_trigger_args(context=context)
-            if start_trigger_args is None:
-                raise TaskDeferralError(
-                    "A none 'None' start_trigger_args has been change to 
'None' during expandion"
-                )
-
-            trigger_kwargs = start_trigger_args.trigger_kwargs or {}
-            next_kwargs = start_trigger_args.next_kwargs
-            next_method = start_trigger_args.next_method
-            timeout = start_trigger_args.timeout
-            trigger_row = Trigger(
-                classpath=self.task.start_trigger_args.trigger_cls,
-                kwargs=trigger_kwargs,
-            )
-        else:
-            raise TaskDeferralError("exception and ti.task.start_trigger_args 
cannot both be None")
-
-        # First, make the trigger entry
-        session.add(trigger_row)
-        session.flush()
-
-        if TYPE_CHECKING:
-            assert self.task
-
-        # Then, update ourselves so it matches the deferral request
-        # Keep an eye on the logic in 
`check_and_change_state_before_execution()`
-        # depending on self.next_method semantics
-        self.state = TaskInstanceState.DEFERRED
-        self.trigger_id = trigger_row.id
-        self.next_method = next_method
-        self.next_kwargs = next_kwargs or {}
-
-        # Calculate timeout too if it was passed
-        if timeout is not None:
-            self.trigger_timeout = timezone.utcnow() + timeout
-        else:
-            self.trigger_timeout = None
-
-        # If an execution_timeout is set, set the timeout to the minimum of
-        # it and the trigger timeout
-        execution_timeout = self.task.execution_timeout
-        if execution_timeout:
-            if TYPE_CHECKING:
-                assert self.start_date
-            if self.trigger_timeout:
-                self.trigger_timeout = min(self.start_date + 
execution_timeout, self.trigger_timeout)
-            else:
-                self.trigger_timeout = self.start_date + execution_timeout
-        if self.test_mode:
-            _add_log(event=self.state, task_instance=self, session=session)
-
-        if exception is not None:
-            session.merge(self)
-            session.commit()
-
     @provide_session
     def run(
         self,

Reply via email to