This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f7b3d7912f3 Remove unused `models.TI.defer_task` method (#55233)
f7b3d7912f3 is described below
commit f7b3d7912f34fa4fc2422ba59da5d49788e3c07c
Author: Kaxil Naik <[email protected]>
AuthorDate: Thu Sep 4 05:50:14 2025 +0100
Remove unused `models.TI.defer_task` method (#55233)
This isn't used anymore; we do the deferral via the API-server & Task
Runner.
---
airflow-core/src/airflow/models/taskinstance.py | 78 -------------------------
1 file changed, 78 deletions(-)
diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py
index 05cd9ce357e..86b4eaf992d 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -73,8 +73,6 @@ from airflow.assets.manager import asset_manager
from airflow.configuration import conf
from airflow.exceptions import (
AirflowInactiveAssetInInletOrOutletException,
- TaskDeferralError,
- TaskDeferred,
)
from airflow.listeners.listener import get_listener_manager
from airflow.models.asset import AssetEvent, AssetModel
@@ -1433,82 +1431,6 @@ class TaskInstance(Base, LoggingMixin):
.values(last_heartbeat_at=timezone.utcnow())
)
- @provide_session
- def defer_task(self, exception: TaskDeferred | None, session: Session =
NEW_SESSION) -> None:
- """
- Mark the task as deferred and sets up the trigger to resume it.
-
- :meta: private
- """
- from airflow.models.trigger import Trigger
-
- # TODO: TaskSDK add start_trigger_args to SDK definitions
- if TYPE_CHECKING:
- assert self.task is not None
-
- timeout: timedelta | None
- if exception is not None:
- trigger_row = Trigger.from_object(exception.trigger)
- next_method = exception.method_name
- next_kwargs = exception.kwargs
- timeout = exception.timeout
- elif self.task is not None and self.task.start_trigger_args is not
None:
- context = self.get_template_context()
- start_trigger_args =
self.task.expand_start_trigger_args(context=context)
- if start_trigger_args is None:
- raise TaskDeferralError(
- "A none 'None' start_trigger_args has been change to
'None' during expandion"
- )
-
- trigger_kwargs = start_trigger_args.trigger_kwargs or {}
- next_kwargs = start_trigger_args.next_kwargs
- next_method = start_trigger_args.next_method
- timeout = start_trigger_args.timeout
- trigger_row = Trigger(
- classpath=self.task.start_trigger_args.trigger_cls,
- kwargs=trigger_kwargs,
- )
- else:
- raise TaskDeferralError("exception and ti.task.start_trigger_args
cannot both be None")
-
- # First, make the trigger entry
- session.add(trigger_row)
- session.flush()
-
- if TYPE_CHECKING:
- assert self.task
-
- # Then, update ourselves so it matches the deferral request
- # Keep an eye on the logic in
`check_and_change_state_before_execution()`
- # depending on self.next_method semantics
- self.state = TaskInstanceState.DEFERRED
- self.trigger_id = trigger_row.id
- self.next_method = next_method
- self.next_kwargs = next_kwargs or {}
-
- # Calculate timeout too if it was passed
- if timeout is not None:
- self.trigger_timeout = timezone.utcnow() + timeout
- else:
- self.trigger_timeout = None
-
- # If an execution_timeout is set, set the timeout to the minimum of
- # it and the trigger timeout
- execution_timeout = self.task.execution_timeout
- if execution_timeout:
- if TYPE_CHECKING:
- assert self.start_date
- if self.trigger_timeout:
- self.trigger_timeout = min(self.start_date +
execution_timeout, self.trigger_timeout)
- else:
- self.trigger_timeout = self.start_date + execution_timeout
- if self.test_mode:
- _add_log(event=self.state, task_instance=self, session=session)
-
- if exception is not None:
- session.merge(self)
- session.commit()
-
@provide_session
def run(
self,