uranusjr commented on code in PR #31153:
URL: https://github.com/apache/airflow/pull/31153#discussion_r1189073083
##########
airflow/models/skipmixin.py:
##########
@@ -60,17 +62,24 @@ class SkipMixin(LoggingMixin):
def _set_state_to_skipped(
self,
dag_run: DagRun | DagRunPydantic,
- tasks: Iterable[Operator],
+ tasks: Iterable[str] | Iterable[tuple[str, int]],
session: Session,
+ include_map_index: bool = False,
) -> None:
"""Used internally to set state of task instances to skipped from the
same dag run."""
now = timezone.utcnow()
+ TI = TaskInstance
+ query = session.query(TI).filter(
+ TI.dag_id == dag_run.dag_id,
+ TI.run_id == dag_run.run_id,
+ )
- session.query(TaskInstance).filter(
- TaskInstance.dag_id == dag_run.dag_id,
- TaskInstance.run_id == dag_run.run_id,
- TaskInstance.task_id.in_(d.task_id for d in tasks),
- ).update(
+ if include_map_index:
+ query = query.filter(tuple_(TI.task_id, TI.map_index).in_(tasks))
Review Comment:
If I recall correctly, there are issues using `tuple_` with Microsoft SQL Server.
You likely need to use the helper function `tuple_in_condition` instead.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]