This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 82153db46a5 Fix mypy type errors in trigger.py (#57457)
82153db46a5 is described below

commit 82153db46a5d0ddb2547cbc34c736cedec1e65e7
Author: LI,JHE-CHEN <[email protected]>
AuthorDate: Thu Oct 30 10:46:46 2025 -0400

    Fix mypy type errors in trigger.py (#57457)
---
 airflow-core/src/airflow/models/trigger.py | 21 ++++++++++++++-------
 1 file changed, 14 insertions(+), 7 deletions(-)

diff --git a/airflow-core/src/airflow/models/trigger.py 
b/airflow-core/src/airflow/models/trigger.py
index a0d93210870..cd337c69139 100644
--- a/airflow-core/src/airflow/models/trigger.py
+++ b/airflow-core/src/airflow/models/trigger.py
@@ -41,6 +41,7 @@ from airflow.utils.sqlalchemy import UtcDateTime, 
get_dialect_name, mapped_colum
 from airflow.utils.state import TaskInstanceState
 
 if TYPE_CHECKING:
+    from sqlalchemy import Row
     from sqlalchemy.sql import Select
 
     from airflow.triggers.base import BaseTrigger, TriggerEvent
@@ -233,10 +234,14 @@ class Trigger(Base):
         )
         if get_dialect_name(session) == "mysql":
             # MySQL doesn't support DELETE with JOIN, so we need to do it in 
two steps
-            ids = session.scalars(ids).all()
-        session.execute(
-            
delete(Trigger).where(Trigger.id.in_(ids)).execution_options(synchronize_session=False)
-        )
+            ids_list = list(session.scalars(ids).all())
+            session.execute(
+                
delete(Trigger).where(Trigger.id.in_(ids_list)).execution_options(synchronize_session=False)
+            )
+        else:
+            session.execute(
+                
delete(Trigger).where(Trigger.id.in_(ids)).execution_options(synchronize_session=False)
+            )
 
     @classmethod
     @provide_session
@@ -308,7 +313,7 @@ class Trigger(Base):
     @provide_session
     def ids_for_triggerer(cls, triggerer_id, session: Session = NEW_SESSION) 
-> list[int]:
         """Retrieve a list of trigger ids."""
-        return session.scalars(select(cls.id).where(cls.triggerer_id == 
triggerer_id)).all()
+        return list(session.scalars(select(cls.id).where(cls.triggerer_id == 
triggerer_id)).all())
 
     @classmethod
     @provide_session
@@ -349,7 +354,7 @@ class Trigger(Base):
         if trigger_ids_query:
             session.execute(
                 update(cls)
-                .where(cls.id.in_([i.id for i in trigger_ids_query]))
+                .where(cls.id.in_([i[0] for i in trigger_ids_query]))
                 .values(triggerer_id=triggerer_id)
                 .execution_options(synchronize_session=False)
             )
@@ -365,7 +370,7 @@ class Trigger(Base):
         :param alive_triggerer_ids: The alive triggerer ids as a list or a 
select query.
         :param session: The database session.
         """
-        result: list[int] = []
+        result: list[Row[Any]] = []
 
         # Add triggers associated to deadlines first, then tasks, then assets
         # It prioritizes deadline triggers, then DAGs over event driven 
scheduling which is fair
@@ -447,6 +452,8 @@ def _(event: BaseTaskEndEvent, *, task_instance: 
TaskInstance, session: Session)
     def _submit_callback_if_necessary() -> None:
         """Submit a callback request if the task state is SUCCESS or FAILED."""
         if event.task_instance_state in (TaskInstanceState.SUCCESS, 
TaskInstanceState.FAILED):
+            if task_instance.dag_model.relative_fileloc is None:
+                raise RuntimeError("relative_fileloc should not be None for a 
finished task")
             request = TaskCallbackRequest(
                 filepath=task_instance.dag_model.relative_fileloc,
                 ti=task_instance,

Reply via email to