[ 
https://issues.apache.org/jira/browse/AIRFLOW-4568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094275#comment-17094275
 ] 

ASF GitHub Bot commented on AIRFLOW-4568:
-----------------------------------------

turbaszek commented on a change in pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#discussion_r416414485



##########
File path: airflow/sensors/external_task_sensor.py
##########
@@ -68,23 +70,35 @@ def __init__(self,
                  external_dag_id,
                  external_task_id=None,
                  allowed_states=None,
+                 failed_states=None,
                  execution_delta=None,
                  execution_date_fn=None,
                  check_existence=False,
                  *args,
                  **kwargs):
         super().__init__(*args, **kwargs)
         self.allowed_states = allowed_states or [State.SUCCESS]
+        self.failed_states = failed_states or []
+
+        total_states = []
+        total_states.extend(self.allowed_states)
+        total_states.extend(self.failed_states)
+
+        if len(list(set(total_states))) < (len(self.failed_states) + len(self.allowed_states)):

Review comment:
       ```suggestion
           if set(self.failed_states).intersection(set(self.allowed_states)):
   ```
   WDYT? Personally, I would cast the states to sets even earlier.
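
A minimal sketch of how the two checks differ, using plain strings in place of the `State` constants. The helper names `overlapping_states` and `length_check_fires` are hypothetical and not part of the PR. The length-based check also fires when a single list repeats a value, whereas the intersection fires only when a state appears in both lists:

```python
def overlapping_states(allowed_states, failed_states):
    """Return the states that appear in both lists (empty set if none)."""
    return set(allowed_states) & set(failed_states)


def length_check_fires(allowed_states, failed_states):
    """The length-based check from the diff, kept here for comparison."""
    total_states = list(allowed_states) + list(failed_states)
    return len(set(total_states)) < len(allowed_states) + len(failed_states)


# A state listed as both allowed and failed: both checks fire.
assert overlapping_states(["success", "failed"], ["failed"]) == {"failed"}
assert length_check_fires(["success", "failed"], ["failed"])

# A value repeated inside one list: only the length-based check fires.
assert overlapping_states(["success", "success"], ["failed"]) == set()
assert length_check_fires(["success", "success"], ["failed"])
```

Whether a repeated value inside one list should count as an error is a design decision for the PR author; the sketch only shows that the two checks disagree on that input.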

##########
File path: airflow/sensors/external_task_sensor.py
##########
@@ -68,23 +70,35 @@ def __init__(self,
                  external_dag_id,
                  external_task_id=None,
                  allowed_states=None,
+                 failed_states=None,
                  execution_delta=None,
                  execution_date_fn=None,
                  check_existence=False,
                  *args,
                  **kwargs):
         super().__init__(*args, **kwargs)
         self.allowed_states = allowed_states or [State.SUCCESS]
+        self.failed_states = failed_states or []
+
+        total_states = []
+        total_states.extend(self.allowed_states)
+        total_states.extend(self.failed_states)
+
+        if len(list(set(total_states))) < (len(self.failed_states) + len(self.allowed_states)):
+            raise AirflowException("Duplicate values provided as allowed "
+                                   "`{}` and failed states `{}`"
+                                   .format(self.allowed_states, self.failed_states))

Review comment:
       Let's use an f-string :)
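
For illustration, a sketch of what the f-string version of this message could look like. The function name `duplicate_states_message` is hypothetical, and plain strings stand in for the `State` constants; in the PR the text would be passed straight to `AirflowException`:

```python
def duplicate_states_message(allowed_states, failed_states):
    """Build the error text with f-strings instead of str.format."""
    return (
        f"Duplicate values provided as allowed `{allowed_states}` "
        f"and failed states `{failed_states}`"
    )


# The str.format version from the diff, for comparison.
old_style = (
    "Duplicate values provided as allowed "
    "`{}` and failed states `{}`"
    .format(["success"], ["success"])
)

# Both spellings produce the same text.
assert duplicate_states_message(["success"], ["success"]) == old_style
```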

##########
File path: airflow/sensors/external_task_sensor.py
##########
@@ -68,23 +70,35 @@ def __init__(self,
                  external_dag_id,
                  external_task_id=None,
                  allowed_states=None,
+                 failed_states=None,
                  execution_delta=None,
                  execution_date_fn=None,
                  check_existence=False,
                  *args,
                  **kwargs):
         super().__init__(*args, **kwargs)
         self.allowed_states = allowed_states or [State.SUCCESS]
+        self.failed_states = failed_states or []
+
+        total_states = []
+        total_states.extend(self.allowed_states)
+        total_states.extend(self.failed_states)
+
+        if len(list(set(total_states))) < (len(self.failed_states) + len(self.allowed_states)):
+            raise AirflowException("Duplicate values provided as allowed "
+                                   "`{}` and failed states `{}`"
+                                   .format(self.allowed_states, self.failed_states))
+
         if external_task_id:
-            if not set(self.allowed_states) <= set(State.task_states):
+            if not set(total_states) <= set(State.task_states):

Review comment:
       I think we should cast `total_states` to a set in the constructor.
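
A rough sketch of that idea, reduced to the state handling only. Everything here is a stand-in so the snippet runs without Airflow: `TASK_STATES` replaces `State.task_states`, `ValueError` replaces `AirflowException`, and the class name plus the choice to keep `total_states` as an attribute are assumptions rather than what the PR does:

```python
# Hypothetical stand-in for State.task_states.
TASK_STATES = {"success", "running", "failed", "upstream_failed", "skipped"}


class SensorStatesSketch:
    """Only the state handling from the constructor, nothing else."""

    def __init__(self, allowed_states=None, failed_states=None):
        # Convert to sets once, up front.
        self.allowed_states = set(allowed_states or ["success"])
        self.failed_states = set(failed_states or [])

        overlap = self.allowed_states & self.failed_states
        if overlap:
            raise ValueError(
                f"States listed as both allowed and failed: {sorted(overlap)}"
            )

        # Already a set, so the subset check below needs no conversion.
        self.total_states = self.allowed_states | self.failed_states
        if not self.total_states <= TASK_STATES:
            unknown = sorted(self.total_states - TASK_STATES)
            raise ValueError(f"Unknown states: {unknown}")


sensor = SensorStatesSketch(allowed_states=["success"], failed_states=["failed"])
assert sensor.total_states == {"success", "failed"}
```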

##########
File path: airflow/sensors/external_task_sensor.py
##########
@@ -141,28 +152,58 @@ def poke(self, context, session=None):
                 refreshed_dag_info = DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id)
                 if not refreshed_dag_info.has_task(self.external_task_id):
                     raise AirflowException('The external task'
-                                           '{} in DAG {} does not exist.'.format(self.external_task_id,
-                                                                                 self.external_dag_id))
+                                           '{} in DAG {} does not exist.'
+                                           .format(self.external_task_id,
+                                                   self.external_dag_id))
             self.has_checked_existence = True
 
+        count_allowed = self.get_count(dttm_filter, session, self.allowed_states)
+
+        count_failed = -1
+        if len(self.failed_states) > 0:
+            count_failed = self.get_count(dttm_filter, session, self.failed_states)
+
+        session.commit()
+        if count_failed == len(dttm_filter):
+            if self.external_task_id:
+                raise AirflowException('The external task {} in DAG {} failed.'
+                                       .format(self.external_task_id, self.external_dag_id))
+            else:
+                raise AirflowException('The external DAG {} failed.'
+                                       .format(self.external_dag_id))

Review comment:
       Let's use an f-string
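
A sketch of the two failure messages written as f-strings. The helper `external_failure_message` is hypothetical and only builds the text; in the PR each string would go directly into the `raise AirflowException(...)` call:

```python
def external_failure_message(external_dag_id, external_task_id=None):
    """Return the failure text for a task if one is given, else for the DAG."""
    if external_task_id:
        return f"The external task {external_task_id} in DAG {external_dag_id} failed."
    return f"The external DAG {external_dag_id} failed."


assert (
    external_failure_message("etl", "load")
    == "The external task load in DAG etl failed."
)
assert external_failure_message("etl") == "The external DAG etl failed."
```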

##########
File path: airflow/sensors/external_task_sensor.py
##########
@@ -68,23 +70,35 @@ def __init__(self,
                  external_dag_id,
                  external_task_id=None,
                  allowed_states=None,
+                 failed_states=None,
                  execution_delta=None,
                  execution_date_fn=None,
                  check_existence=False,
                  *args,
                  **kwargs):
         super().__init__(*args, **kwargs)
         self.allowed_states = allowed_states or [State.SUCCESS]
+        self.failed_states = failed_states or []
+
+        total_states = []
+        total_states.extend(self.allowed_states)
+        total_states.extend(self.failed_states)

Review comment:
       ```suggestion
           total_states = self.allowed_states + self.failed_states
   ```
   WDYT?






> The ExternalTaskSensor should be configurable to raise an Airflow Exception 
> in case the poked external task reaches a disallowed state, such as f.i. 
> failed
> -----------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-4568
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-4568
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: operators
>    Affects Versions: 1.10.3
>            Reporter: ddluke
>            Priority: Minor
>
> _As an engineer, I would like to have the behavior of the ExternalTaskSensor 
> changed_
> _So that it fails in case the poked external_task_id fails_
> *Therefore*
>  * I suggest extending the behavior of the sensor to optionally also query 
> the TaskInstance for disallowed states and raise an AirflowException if 
> found. Currently, if the poked external task reaches a failed state, the 
> sensor continues to poke and does not terminate
> *Acceptance Criteria (from my pov)*
>  * The class interface for ExternalTaskSensor is extended with an additional 
> parameter, disallowed_states, which is an Optional List of 
> airflow.utils.state.State
>  * The poke method is expanded to count the number of rows from TaskInstance 
> that meet the filter criteria dag_id, task_id, disallowed_states and 
> dttm_filter, if disallowed_states is not None
>  * If disallowed_states is not None and the above query returns a count > 
> 0, an AirflowException is raised



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
