[
https://issues.apache.org/jira/browse/AIRFLOW-4568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089578#comment-17089578
]
ASF GitHub Bot commented on AIRFLOW-4568:
-----------------------------------------
lokeshlal commented on a change in pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#discussion_r412900398
##########
File path: airflow/sensors/external_task_sensor.py
##########
@@ -141,28 +153,58 @@ def poke(self, context, session=None):
refreshed_dag_info = DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id)
if not refreshed_dag_info.has_task(self.external_task_id):
raise AirflowException('The external task'
- '{} in DAG {} does not exist.'.format(self.external_task_id,
- self.external_dag_id))
+ '{} in DAG {} does not exist.'
+ .format(self.external_task_id,
+ self.external_dag_id))
self.has_checked_existence = True
+ count_allowed = self.get_count(dttm_filter, session, self.allowed_states)
+
+ count_failed = -1
+ if len(self.failed_states) > 0:
+ count_failed = self.get_count(dttm_filter, session, self.failed_states)
+
+ session.commit()
+ if count_failed == len(dttm_filter):
+ if self.external_task_id:
+ raise AirflowException('The external task {} in DAG {} failed.'
+ .format(self.external_task_id, self.external_dag_id))
+ else:
+ raise AirflowException('The external DAG {} failed.'
+ .format(self.external_dag_id))
+
+ return count_allowed == len(dttm_filter)
+
+ def get_count(self, dttm_filter, session, states):
+ """
+ get the count of records against dttm filter and states
+ :param dttm_filter: date time filter for execution date
+ :type dttm_filter: list
+ :param session: airflow session object
+ :type session: SASession
+ :param states: task or dag states
+ :type states: list
+ :return: count of record against the filters
+ """
+ TI = TaskInstance
+ DR = DagRun
+
if self.external_task_id:
# .count() is inefficient
count = session.query(func.count()).filter(
TI.dag_id == self.external_dag_id,
TI.task_id == self.external_task_id,
- TI.state.in_(self.allowed_states),
+ TI.state.in_(states), # pylint: disable=no-member
Review comment:
Thanks, jward-bw.
> The ExternalTaskSensor should be configurable to raise an Airflow Exception
> in case the poked external task reaches a disallowed state, such as f.i.
> failed
> -----------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: AIRFLOW-4568
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4568
> Project: Apache Airflow
> Issue Type: Improvement
> Components: operators
> Affects Versions: 1.10.3
> Reporter: ddluke
> Priority: Minor
>
> _As an engineer, I would like the behavior of the ExternalTaskSensor
> changed_
> _so that it fails when the poked external_task_id fails._
> *Therefore*
> * I suggest extending the sensor so that it can optionally also query
> the TaskInstance for disallowed states and raise an AirflowException if
> any are found. Currently, if the poked external task reaches a failed
> state, the sensor keeps poking and never terminates.
> *Acceptance Criteria (from my point of view)*
> * The class interface for ExternalTaskSensor is extended with an additional
> parameter, disallowed_states, which is an Optional List of
> airflow.utils.state.State
> * If disallowed_states is not None, the poke method also counts the rows
> in TaskInstance that match the filter criteria dag_id, task_id,
> disallowed_states and dttm_filter
> * If disallowed_states is not None and the above query returns a count
> greater than 0, an AirflowException is raised
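
The acceptance criteria above can be sketched in plain Python, without Airflow or a database. This is only an illustration under stated assumptions: the in-memory `rows` list stands in for the TaskInstance table, and `ExternalTaskFailedError`, `count_rows` and this standalone `poke` function are hypothetical names, not the sensor's real interface. The success check (allowed count equals `len(dttm_filter)`) follows the diff quoted in the review comment.

```python
from typing import List, Optional, Tuple

# Hypothetical row layout standing in for TaskInstance:
# (dag_id, task_id, execution_date, state)
Row = Tuple[str, str, str, str]


class ExternalTaskFailedError(Exception):
    """Hypothetical stand-in for AirflowException."""


def count_rows(rows: List[Row], dag_id: str, task_id: str,
               dttm_filter: List[str], states: List[str]) -> int:
    """Count rows matching dag_id, task_id, dttm_filter and states."""
    return sum(
        1
        for row_dag, row_task, row_dttm, row_state in rows
        if row_dag == dag_id
        and row_task == task_id
        and row_dttm in dttm_filter
        and row_state in states
    )


def poke(rows: List[Row], dag_id: str, task_id: str,
         dttm_filter: List[str], allowed_states: List[str],
         disallowed_states: Optional[List[str]] = None) -> bool:
    """Return True once every execution date is in an allowed state.

    Raises ExternalTaskFailedError as soon as any matching row is in a
    disallowed state, instead of poking until the sensor times out.
    """
    if disallowed_states is not None:
        bad = count_rows(rows, dag_id, task_id, dttm_filter, disallowed_states)
        if bad > 0:
            raise ExternalTaskFailedError(
                "The external task {} in DAG {} reached a disallowed state."
                .format(task_id, dag_id)
            )
    ok = count_rows(rows, dag_id, task_id, dttm_filter, allowed_states)
    return ok == len(dttm_filter)
```

Note that this sketch raises when at least one matching row is in a disallowed state, as the issue proposes, whereas the diff in the review comment raises only when the failed count equals `len(dttm_filter)`.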