xinbinhuang commented on a change in pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#discussion_r588859705



##########
File path: airflow/sensors/external_task.py
##########
@@ -164,18 +179,23 @@ def poke(self, context, session=None):
         if self.failed_states:
             count_failed = self.get_count(dttm_filter, session, self.failed_states)
 
-        if count_failed == len(dttm_filter):
+        if 0 < count_failed <= len(dttm_filter):

Review comment:
       I am assuming here that the sensor should fail as soon as at least one
external task has failed. This changes the original behavior, which failed the
sensor only when the external task had failed for every execution date in
`dttm_filter`. In what situation would users want to tolerate a partial failure?
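
A minimal sketch comparing the old and new failure conditions from the hunk above. The helper names `fails_original` and `fails_proposed` are hypothetical and not part of Airflow; `num_dates` stands in for `len(dttm_filter)`.

```python
def fails_original(count_failed: float, num_dates: int) -> bool:
    """Old rule: fail only when the task failed for every execution date."""
    return count_failed == num_dates


def fails_proposed(count_failed: float, num_dates: int) -> bool:
    """New rule: fail as soon as at least one failure is counted."""
    return 0 < count_failed <= num_dates


# Sensor watching three execution dates, one of which has a failed task:
print(fails_original(1, 3))  # False: the sensor keeps poking
print(fails_proposed(1, 3))  # True: the sensor fails right away
```

Both rules agree when everything failed or nothing failed; they differ only in the partial-failure case the question above is about.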

##########
File path: airflow/sensors/external_task.py
##########
@@ -206,29 +228,48 @@ def get_count(self, dttm_filter, session, states) -> int:
         """
         TI = TaskInstance
         DR = DagRun
+
         if self.external_task_id:
             count = (
-                session.query(func.count())  # .count() is inefficient
-                .filter(
-                    TI.dag_id == self.external_dag_id,
-                    TI.task_id == self.external_task_id,
-                    TI.state.in_(states),  # pylint: disable=no-member
-                    TI.execution_date.in_(dttm_filter),
-                )
+                self._count_query(TI, session, states, dttm_filter)
+                .filter(TI.task_id == self.external_task_id)
                 .scalar()
             )
-        else:
+        elif self.external_task_group_id:
+            external_task_group_task_ids = self.get_external_task_group_task_ids(session)
             count = (
-                session.query(func.count())
-                .filter(
-                    DR.dag_id == self.external_dag_id,
-                    DR.state.in_(states),  # pylint: disable=no-member
-                    DR.execution_date.in_(dttm_filter),
-                )
+                self._count_query(TI, session, states, dttm_filter)
+                .filter(TI.task_id.in_(external_task_group_task_ids))
                 .scalar()
-            )
+            ) / len(external_task_group_task_ids)

Review comment:
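       Dividing by the number of tasks in the group is an attempt to converge back
to the original behavior: the result equals `len(dttm_filter)` only when every
task in the group matches for every execution date. We don't need it if we fail
the sensor whenever any one of the tasks fails.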
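
A small illustration of the division, assuming a group of 3 tasks watched over 2 execution dates. `group_count` is a hypothetical helper that mirrors the `count / len(external_task_group_task_ids)` expression in the diff; it is not an Airflow function.

```python
def group_count(matching_ti_count: int, group_size: int) -> float:
    """Normalise a task-group TI count to execution dates, as in the diff."""
    return matching_ti_count / group_size


num_dates = 2   # stands in for len(dttm_filter)
group_size = 3  # stands in for len(external_task_group_task_ids)

# Equality with num_dates holds only if all 2 * 3 task instances match.
print(group_count(6, group_size) == num_dates)  # True
print(group_count(5, group_size) == num_dates)  # False (5 / 3 is about 1.67)

# For an "any failure" check, a single failed task instance is already
# greater than zero, with or without the division.
print(group_count(1, group_size) > 0)  # True
```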




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

