jward-bw commented on a change in pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#discussion_r412825231



##########
File path: airflow/sensors/external_task_sensor.py
##########
@@ -141,28 +153,58 @@ def poke(self, context, session=None):
                 refreshed_dag_info = 
DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id)
                 if not refreshed_dag_info.has_task(self.external_task_id):
                     raise AirflowException('The external task'
-                                           '{} in DAG {} does not 
exist.'.format(self.external_task_id,
-                                                                               
  self.external_dag_id))
+                                           '{} in DAG {} does not exist.'
+                                           .format(self.external_task_id,
+                                                   self.external_dag_id))
             self.has_checked_existence = True
 
+        count_allowed = self.get_count(dttm_filter, session, 
self.allowed_states)
+
+        count_failed = -1
+        if len(self.failed_states) > 0:
+            count_failed = self.get_count(dttm_filter, session, 
self.failed_states)
+
+        session.commit()
+        if count_failed == len(dttm_filter):
+            if self.external_task_id:
+                raise AirflowException('The external task {} in DAG {} failed.'
+                                       .format(self.external_task_id, 
self.external_dag_id))
+            else:
+                raise AirflowException('The external DAG {} failed.'
+                                       .format(self.external_dag_id))
+
+        return count_allowed == len(dttm_filter)
+
+    def get_count(self, dttm_filter, session, states):
+        """
+        get the count of records against dttm filter and states
+        :param dttm_filter: date time filter for execution date
+        :type dttm_filter: list
+        :param session: airflow session object
+        :type session: SASession
+        :param states: task or dag states
+        :type states: list
+        :return: count of record against the filters
+        """
+        TI = TaskInstance
+        DR = DagRun
+
         if self.external_task_id:
             # .count() is inefficient
             count = session.query(func.count()).filter(
                 TI.dag_id == self.external_dag_id,
                 TI.task_id == self.external_task_id,
-                TI.state.in_(self.allowed_states),
+                TI.state.in_(states),  # pylint: disable=no-member

Review comment:
       Since the comment wasn't here before though, and it passed the tests I 
would guess you don't need it. Only on line 204.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to