This is an automated email from the ASF dual-hosted git repository.

turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 7df41f5  Simplify if clauses in ExternalTaskSensor (#9968)
7df41f5 is described below

commit 7df41f559ff9b2bcc2bc825b9386300b95eeedf4
Author: Tomek Urbaszek <turbas...@gmail.com>
AuthorDate: Tue Jul 28 10:18:32 2020 +0200

    Simplify if clauses in ExternalTaskSensor (#9968)
    
    
    Co-authored-by: Jarek Potiuk <ja...@potiuk.com>
---
 airflow/sensors/external_task_sensor.py | 50 +++++++++++++++------------------
 1 file changed, 23 insertions(+), 27 deletions(-)

diff --git a/airflow/sensors/external_task_sensor.py b/airflow/sensors/external_task_sensor.py
index cd4143b..d9b47b5 100644
--- a/airflow/sensors/external_task_sensor.py
+++ b/airflow/sensors/external_task_sensor.py
@@ -91,15 +91,14 @@ class ExternalTaskSensor(BaseSensorOperator):
         if external_task_id:
             if not total_states <= set(State.task_states):
                 raise ValueError(
-                    'Valid values for `allowed_states` and `failed_states` '
-                    'when `external_task_id` is not `None`: {}'.format(State.task_states)
-                )
-        else:
-            if not total_states <= set(State.dag_states):
-                raise ValueError(
-                    'Valid values for `allowed_states` and `failed_states` '
-                    'when `external_task_id` is `None`: {}'.format(State.dag_states)
+                    f'Valid values for `allowed_states` and `failed_states` '
+                    f'when `external_task_id` is not `None`: {State.task_states}'
                 )
+        elif not total_states <= set(State.dag_states):
+            raise ValueError(
+                f'Valid values for `allowed_states` and `failed_states` '
+                f'when `external_task_id` is `None`: {State.dag_states}'
+            )
 
         if execution_delta is not None and execution_date_fn is not None:
             raise ValueError(
@@ -140,20 +139,17 @@ class ExternalTaskSensor(BaseSensorOperator):
             ).first()
 
             if not dag_to_wait:
-                raise AirflowException('The external DAG '
-                                       '{} does not exist.'.format(self.external_dag_id))
-            else:
-                if not os.path.exists(dag_to_wait.fileloc):
-                    raise AirflowException('The external DAG '
-                                           '{} was deleted.'.format(self.external_dag_id))
+                raise AirflowException(f'The external DAG {self.external_dag_id} does not exist.')
+            elif not os.path.exists(dag_to_wait.fileloc):
+                raise AirflowException(f'The external DAG {self.external_dag_id} was deleted.')
 
             if self.external_task_id:
                 refreshed_dag_info = DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id)
                 if not refreshed_dag_info.has_task(self.external_task_id):
-                    raise AirflowException('The external task'
-                                           '{} in DAG {} does not exist.'
-                                           .format(self.external_task_id,
-                                                   self.external_dag_id))
+                    raise AirflowException(
+                        f'The external task {self.external_task_id} in '
+                        f'DAG {self.external_dag_id} does not exist.'
+                    )
             self.has_checked_existence = True
 
         count_allowed = self.get_count(dttm_filter, session, self.allowed_states)
@@ -165,11 +161,11 @@ class ExternalTaskSensor(BaseSensorOperator):
         session.commit()
         if count_failed == len(dttm_filter):
             if self.external_task_id:
-                raise AirflowException('The external task {} in DAG {} failed.'
-                                       .format(self.external_task_id, self.external_dag_id))
+                raise AirflowException(
+                    f'The external task {self.external_task_id} in DAG {self.external_dag_id} failed.'
+                )
             else:
-                raise AirflowException('The external DAG {} failed.'
-                                       .format(self.external_dag_id))
+                raise AirflowException(f'The external DAG {self.external_dag_id} failed.')
 
         return count_allowed == len(dttm_filter)
 
@@ -217,12 +213,12 @@ class ExternalTaskSensor(BaseSensorOperator):
         num_fxn_params = self.execution_date_fn.__code__.co_argcount
         if num_fxn_params == 1:
             return self.execution_date_fn(context['execution_date'])
-        elif num_fxn_params == 2:
+        if num_fxn_params == 2:
             return self.execution_date_fn(context['execution_date'], context)
-        else:
-            raise AirflowException(
-                'execution_date_fn passed {} args but only allowed up to 2'.format(num_fxn_params)
-            )
+
+        raise AirflowException(
+            f'execution_date_fn passed {num_fxn_params} args but only allowed up to 2'
+        )
 
 
 class ExternalTaskMarker(DummyOperator):

Reply via email to