lokeshlal commented on a change in pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#discussion_r412900398
##########
File path: airflow/sensors/external_task_sensor.py
##########
@@ -141,28 +153,58 @@ def poke(self, context, session=None):
refreshed_dag_info =
DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id)
if not refreshed_dag_info.has_task(self.external_task_id):
raise AirflowException('The external task'
- '{} in DAG {} does not
exist.'.format(self.external_task_id,
-
self.external_dag_id))
+ '{} in DAG {} does not exist.'
+ .format(self.external_task_id,
+ self.external_dag_id))
self.has_checked_existence = True
+ count_allowed = self.get_count(dttm_filter, session,
self.allowed_states)
+
+ count_failed = -1
+ if len(self.failed_states) > 0:
+ count_failed = self.get_count(dttm_filter, session,
self.failed_states)
+
+ session.commit()
+ if count_failed == len(dttm_filter):
+ if self.external_task_id:
+ raise AirflowException('The external task {} in DAG {} failed.'
+ .format(self.external_task_id,
self.external_dag_id))
+ else:
+ raise AirflowException('The external DAG {} failed.'
+ .format(self.external_dag_id))
+
+ return count_allowed == len(dttm_filter)
+
+ def get_count(self, dttm_filter, session, states):
+ """
+ get the count of records against dttm filter and states
+ :param dttm_filter: date time filter for execution date
+ :type dttm_filter: list
+ :param session: airflow session object
+ :type session: SASession
+ :param states: task or dag states
+ :type states: list
+ :return: count of record against the filters
+ """
+ TI = TaskInstance
+ DR = DagRun
+
if self.external_task_id:
# .count() is inefficient
count = session.query(func.count()).filter(
TI.dag_id == self.external_dag_id,
TI.task_id == self.external_task_id,
- TI.state.in_(self.allowed_states),
+ TI.state.in_(states), # pylint: disable=no-member
Review comment:
Thanks jward-bw
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]