turbaszek commented on a change in pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#discussion_r416414485
##########
File path: airflow/sensors/external_task_sensor.py
##########
@@ -68,23 +70,35 @@ def __init__(self,
external_dag_id,
external_task_id=None,
allowed_states=None,
+ failed_states=None,
execution_delta=None,
execution_date_fn=None,
check_existence=False,
*args,
**kwargs):
super().__init__(*args, **kwargs)
self.allowed_states = allowed_states or [State.SUCCESS]
+ self.failed_states = failed_states or []
+
+ total_states = []
+ total_states.extend(self.allowed_states)
+ total_states.extend(self.failed_states)
+
+ if len(list(set(total_states))) < (len(self.failed_states) +
len(self.allowed_states)):
Review comment:
```suggestion
if set(self.failed_states).intersection(set(self.allowed_states)):
```
WDYT? Personally, I would cast the states to a set even earlier.
##########
File path: airflow/sensors/external_task_sensor.py
##########
@@ -68,23 +70,35 @@ def __init__(self,
external_dag_id,
external_task_id=None,
allowed_states=None,
+ failed_states=None,
execution_delta=None,
execution_date_fn=None,
check_existence=False,
*args,
**kwargs):
super().__init__(*args, **kwargs)
self.allowed_states = allowed_states or [State.SUCCESS]
+ self.failed_states = failed_states or []
+
+ total_states = []
+ total_states.extend(self.allowed_states)
+ total_states.extend(self.failed_states)
+
+ if len(list(set(total_states))) < (len(self.failed_states) +
len(self.allowed_states)):
+ raise AirflowException("Duplicate values provided as allowed "
+ "`{}` and failed states `{}`"
+ .format(self.allowed_states,
self.failed_states))
Review comment:
Let's use an f-string here :)
##########
File path: airflow/sensors/external_task_sensor.py
##########
@@ -68,23 +70,35 @@ def __init__(self,
external_dag_id,
external_task_id=None,
allowed_states=None,
+ failed_states=None,
execution_delta=None,
execution_date_fn=None,
check_existence=False,
*args,
**kwargs):
super().__init__(*args, **kwargs)
self.allowed_states = allowed_states or [State.SUCCESS]
+ self.failed_states = failed_states or []
+
+ total_states = []
+ total_states.extend(self.allowed_states)
+ total_states.extend(self.failed_states)
+
+ if len(list(set(total_states))) < (len(self.failed_states) +
len(self.allowed_states)):
+ raise AirflowException("Duplicate values provided as allowed "
+ "`{}` and failed states `{}`"
+ .format(self.allowed_states,
self.failed_states))
+
if external_task_id:
- if not set(self.allowed_states) <= set(State.task_states):
+ if not set(total_states) <= set(State.task_states):
Review comment:
I think we should cast `total_states` to a set in the constructor.
##########
File path: airflow/sensors/external_task_sensor.py
##########
@@ -141,28 +152,58 @@ def poke(self, context, session=None):
refreshed_dag_info =
DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id)
if not refreshed_dag_info.has_task(self.external_task_id):
raise AirflowException('The external task'
- '{} in DAG {} does not
exist.'.format(self.external_task_id,
-
self.external_dag_id))
+ '{} in DAG {} does not exist.'
+ .format(self.external_task_id,
+ self.external_dag_id))
self.has_checked_existence = True
+ count_allowed = self.get_count(dttm_filter, session,
self.allowed_states)
+
+ count_failed = -1
+ if len(self.failed_states) > 0:
+ count_failed = self.get_count(dttm_filter, session,
self.failed_states)
+
+ session.commit()
+ if count_failed == len(dttm_filter):
+ if self.external_task_id:
+ raise AirflowException('The external task {} in DAG {} failed.'
+ .format(self.external_task_id,
self.external_dag_id))
+ else:
+ raise AirflowException('The external DAG {} failed.'
+ .format(self.external_dag_id))
Review comment:
Let's use f-strings here.
##########
File path: airflow/sensors/external_task_sensor.py
##########
@@ -68,23 +70,35 @@ def __init__(self,
external_dag_id,
external_task_id=None,
allowed_states=None,
+ failed_states=None,
execution_delta=None,
execution_date_fn=None,
check_existence=False,
*args,
**kwargs):
super().__init__(*args, **kwargs)
self.allowed_states = allowed_states or [State.SUCCESS]
+ self.failed_states = failed_states or []
+
+ total_states = []
+ total_states.extend(self.allowed_states)
+ total_states.extend(self.failed_states)
Review comment:
```suggestion
total_states = self.allowed_states + self.failed_states
```
WDYT?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org