This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 82153db46a5 Fix mypy type errors in trigger.py (#57457)
82153db46a5 is described below
commit 82153db46a5d0ddb2547cbc34c736cedec1e65e7
Author: LI,JHE-CHEN <[email protected]>
AuthorDate: Thu Oct 30 10:46:46 2025 -0400
Fix mypy type errors in trigger.py (#57457)
---
airflow-core/src/airflow/models/trigger.py | 21 ++++++++++++++-------
1 file changed, 14 insertions(+), 7 deletions(-)
diff --git a/airflow-core/src/airflow/models/trigger.py b/airflow-core/src/airflow/models/trigger.py
index a0d93210870..cd337c69139 100644
--- a/airflow-core/src/airflow/models/trigger.py
+++ b/airflow-core/src/airflow/models/trigger.py
@@ -41,6 +41,7 @@ from airflow.utils.sqlalchemy import UtcDateTime, get_dialect_name, mapped_column
from airflow.utils.state import TaskInstanceState
if TYPE_CHECKING:
+ from sqlalchemy import Row
from sqlalchemy.sql import Select
from airflow.triggers.base import BaseTrigger, TriggerEvent
@@ -233,10 +234,14 @@ class Trigger(Base):
)
if get_dialect_name(session) == "mysql":
# MySQL doesn't support DELETE with JOIN, so we need to do it in two steps
- ids = session.scalars(ids).all()
- session.execute(
-                delete(Trigger).where(Trigger.id.in_(ids)).execution_options(synchronize_session=False)
- )
+ ids_list = list(session.scalars(ids).all())
+ session.execute(
+                delete(Trigger).where(Trigger.id.in_(ids_list)).execution_options(synchronize_session=False)
+ )
+ else:
+ session.execute(
+                delete(Trigger).where(Trigger.id.in_(ids)).execution_options(synchronize_session=False)
+ )
@classmethod
@provide_session
@@ -308,7 +313,7 @@ class Trigger(Base):
@provide_session
def ids_for_triggerer(cls, triggerer_id, session: Session = NEW_SESSION) -> list[int]:
"""Retrieve a list of trigger ids."""
-        return session.scalars(select(cls.id).where(cls.triggerer_id == triggerer_id)).all()
+        return list(session.scalars(select(cls.id).where(cls.triggerer_id == triggerer_id)).all())
@classmethod
@provide_session
@@ -349,7 +354,7 @@ class Trigger(Base):
if trigger_ids_query:
session.execute(
update(cls)
- .where(cls.id.in_([i.id for i in trigger_ids_query]))
+ .where(cls.id.in_([i[0] for i in trigger_ids_query]))
.values(triggerer_id=triggerer_id)
.execution_options(synchronize_session=False)
)
@@ -365,7 +370,7 @@ class Trigger(Base):
:param alive_triggerer_ids: The alive triggerer ids as a list or a select query.
:param session: The database session.
"""
- result: list[int] = []
+ result: list[Row[Any]] = []
# Add triggers associated to deadlines first, then tasks, then assets
# It prioritizes deadline triggers, then DAGs over event driven scheduling which is fair
@@ -447,6 +452,8 @@ def _(event: BaseTaskEndEvent, *, task_instance: TaskInstance, session: Session)
def _submit_callback_if_necessary() -> None:
"""Submit a callback request if the task state is SUCCESS or FAILED."""
if event.task_instance_state in (TaskInstanceState.SUCCESS, TaskInstanceState.FAILED):
+ if task_instance.dag_model.relative_fileloc is None:
+                raise RuntimeError("relative_fileloc should not be None for a finished task")
request = TaskCallbackRequest(
filepath=task_instance.dag_model.relative_fileloc,
ti=task_instance,