xinbinhuang commented on a change in pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#discussion_r588859705
##########
File path: airflow/sensors/external_task.py
##########
@@ -164,18 +179,23 @@ def poke(self, context, session=None):
if self.failed_states:
count_failed = self.get_count(dttm_filter, session, self.failed_states)
- if count_failed == len(dttm_filter):
+ if 0 < count_failed <= len(dttm_filter):
Review comment:
Here I am assuming that a single external task failure should be enough to
fail the sensor. This changes the original behavior, which failed the sensor
only when the failed count equaled the number of execution dates in
dttm_filter. In what situation would users want to tolerate partial failure?
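To make the behavior change concrete, here is a minimal sketch comparing the two conditions from the diff. The function names are hypothetical and exist only for illustration; the sensor itself evaluates these expressions inline in poke().

```python
def should_fail_all(count_failed: int, num_dates: int) -> bool:
    """Original check: fail only when the failed count equals the number of dates."""
    return count_failed == num_dates


def should_fail_any(count_failed: int, num_dates: int) -> bool:
    """Proposed check: fail as soon as at least one failure is counted."""
    return 0 < count_failed <= num_dates


# Hypothetical scenario: three execution dates are watched and one has failed.
num_dates = 3
count_failed = 1

print(should_fail_all(count_failed, num_dates))  # False
print(should_fail_any(count_failed, num_dates))  # True
```

The two checks agree when every date has failed or none has; they differ only in the partial-failure case this comment asks about.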
##########
File path: airflow/sensors/external_task.py
##########
@@ -206,29 +228,48 @@ def get_count(self, dttm_filter, session, states) -> int:
"""
TI = TaskInstance
DR = DagRun
+
if self.external_task_id:
count = (
- session.query(func.count()) # .count() is inefficient
- .filter(
- TI.dag_id == self.external_dag_id,
- TI.task_id == self.external_task_id,
- TI.state.in_(states), # pylint: disable=no-member
- TI.execution_date.in_(dttm_filter),
- )
+ self._count_query(TI, session, states, dttm_filter)
+ .filter(TI.task_id == self.external_task_id)
.scalar()
)
- else:
+ elif self.external_task_group_id:
+ external_task_group_task_ids = self.get_external_task_group_task_ids(session)
count = (
- session.query(func.count())
- .filter(
- DR.dag_id == self.external_dag_id,
- DR.state.in_(states), # pylint: disable=no-member
- DR.execution_date.in_(dttm_filter),
- )
+ self._count_query(TI, session, states, dttm_filter)
+ .filter(TI.task_id.in_(external_task_group_task_ids))
.scalar()
- )
+ ) / len(external_task_group_task_ids)
Review comment:
This division tries to converge back to the original behavior. We don't
need it if we fail the sensor whenever one of the tasks fails.
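A rough sketch of the arithmetic behind the division, assuming the query returns one row per matching task instance (that is, per task and execution date pair). The task ids, dates, and helper name below are hypothetical and not taken from the PR.

```python
def normalized_count(matching_task_instances: int, group_size: int) -> float:
    """Scale a count of task-instance rows back to the scale of execution dates."""
    return matching_task_instances / group_size


group_task_ids = ["group.a", "group.b", "group.c"]  # hypothetical task ids
dttm_filter = ["2021-03-01", "2021-03-02"]          # hypothetical execution dates

# Every task in the group matched on both dates: 3 tasks * 2 dates = 6 rows.
full = normalized_count(6, len(group_task_ids))

# Only one task instance matched on one date.
partial = normalized_count(1, len(group_task_ids))

# An "all dates" comparison needs the normalized value.
print(full == len(dttm_filter))

# An "at least one" comparison only needs to know the raw count is non-zero.
print(partial > 0, 1 > 0)
```

Under that assumption, the division matters only for a comparison against len(dttm_filter); a check for at least one failure gives the same answer with or without it.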