This is an automated email from the ASF dual-hosted git repository.
turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 7df41f5 Simplify if clauses in ExternalTaskSensor (#9968)
7df41f5 is described below
commit 7df41f559ff9b2bcc2bc825b9386300b95eeedf4
Author: Tomek Urbaszek <[email protected]>
AuthorDate: Tue Jul 28 10:18:32 2020 +0200
Simplify if clauses in ExternalTaskSensor (#9968)
Co-authored-by: Jarek Potiuk <[email protected]>
---
airflow/sensors/external_task_sensor.py | 50 +++++++++++++++------------------
1 file changed, 23 insertions(+), 27 deletions(-)
diff --git a/airflow/sensors/external_task_sensor.py b/airflow/sensors/external_task_sensor.py
index cd4143b..d9b47b5 100644
--- a/airflow/sensors/external_task_sensor.py
+++ b/airflow/sensors/external_task_sensor.py
@@ -91,15 +91,14 @@ class ExternalTaskSensor(BaseSensorOperator):
if external_task_id:
if not total_states <= set(State.task_states):
raise ValueError(
- 'Valid values for `allowed_states` and `failed_states` '
- 'when `external_task_id` is not `None`: {}'.format(State.task_states)
- )
- else:
- if not total_states <= set(State.dag_states):
- raise ValueError(
- 'Valid values for `allowed_states` and `failed_states` '
- 'when `external_task_id` is `None`: {}'.format(State.dag_states)
+ f'Valid values for `allowed_states` and `failed_states` '
+ f'when `external_task_id` is not `None`: {State.task_states}'
)
+ elif not total_states <= set(State.dag_states):
+ raise ValueError(
+ f'Valid values for `allowed_states` and `failed_states` '
+ f'when `external_task_id` is `None`: {State.dag_states}'
+ )
if execution_delta is not None and execution_date_fn is not None:
raise ValueError(
@@ -140,20 +139,17 @@ class ExternalTaskSensor(BaseSensorOperator):
).first()
if not dag_to_wait:
- raise AirflowException('The external DAG '
- '{} does not exist.'.format(self.external_dag_id))
- else:
- if not os.path.exists(dag_to_wait.fileloc):
- raise AirflowException('The external DAG '
- '{} was deleted.'.format(self.external_dag_id))
+ raise AirflowException(f'The external DAG {self.external_dag_id} does not exist.')
+ elif not os.path.exists(dag_to_wait.fileloc):
+ raise AirflowException(f'The external DAG {self.external_dag_id} was deleted.')
if self.external_task_id:
refreshed_dag_info = DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id)
if not refreshed_dag_info.has_task(self.external_task_id):
- raise AirflowException('The external task'
- '{} in DAG {} does not exist.'
- .format(self.external_task_id,
- self.external_dag_id))
+ raise AirflowException(
+ f'The external task {self.external_task_id} in '
+ f'DAG {self.external_dag_id} does not exist.'
+ )
self.has_checked_existence = True
count_allowed = self.get_count(dttm_filter, session, self.allowed_states)
@@ -165,11 +161,11 @@ class ExternalTaskSensor(BaseSensorOperator):
session.commit()
if count_failed == len(dttm_filter):
if self.external_task_id:
- raise AirflowException('The external task {} in DAG {} failed.'
- .format(self.external_task_id, self.external_dag_id))
+ raise AirflowException(
+ f'The external task {self.external_task_id} in DAG {self.external_dag_id} failed.'
+ )
else:
- raise AirflowException('The external DAG {} failed.'
- .format(self.external_dag_id))
+ raise AirflowException(f'The external DAG {self.external_dag_id} failed.')
return count_allowed == len(dttm_filter)
@@ -217,12 +213,12 @@ class ExternalTaskSensor(BaseSensorOperator):
num_fxn_params = self.execution_date_fn.__code__.co_argcount
if num_fxn_params == 1:
return self.execution_date_fn(context['execution_date'])
- elif num_fxn_params == 2:
+ if num_fxn_params == 2:
return self.execution_date_fn(context['execution_date'], context)
- else:
- raise AirflowException(
- 'execution_date_fn passed {} args but only allowed up to 2'.format(num_fxn_params)
- )
+
+ raise AirflowException(
+ f'execution_date_fn passed {num_fxn_params} args but only allowed up to 2'
+ )
class ExternalTaskMarker(DummyOperator):