josh-fell commented on code in PR #23647:
URL: https://github.com/apache/airflow/pull/23647#discussion_r893444927


##########
newsfragments/23647.bugfix.rst:
##########
@@ -0,0 +1 @@
+``ExternalTaskSensor`` now supports the ``soft_fail`` flag to skip if external task or dag enters a failed state

Review Comment:
   ```suggestion
   ``ExternalTaskSensor`` now supports the ``soft_fail`` flag to skip if external task or DAG enters a failed state.
   ```
   Trivial nit.



##########
airflow/sensors/external_task.py:
##########
@@ -180,11 +198,20 @@ def poke(self, context, session=None):
 
         if count_failed == len(dttm_filter):
             if self.external_task_ids:
+                if self.soft_fail:
+                    raise AirflowSkipException(
+                        f'Some of the external tasks {self.external_task_ids} '
+                        f'in DAG {self.external_dag_id} failed. Skipping due to soft_fail.'
+                    )
                 raise AirflowException(
                     f'Some of the external tasks {self.external_task_ids} '
                     f'in DAG {self.external_dag_id} failed.'
                 )
             else:
+                if self.soft_fail:
+                    raise AirflowSkipException(
+                        f'The external DAG {self.external_dag_id} failed. ' 'Skipping due to soft_fail'

Review Comment:
   ```suggestion
                           f'The external DAG {self.external_dag_id} failed. Skipping due to soft_fail.'
   ```
   Cleaning up a little implicit string concatenation.
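
For context, Python joins adjacent string literals at compile time, so the two-literal form and the single-literal form build the same message. A minimal sketch, assuming a made-up `external_dag_id` value in place of the sensor attribute:

```python
# Hypothetical value standing in for self.external_dag_id.
external_dag_id = "other_dag"

# Adjacent literals (an f-string followed by a plain string) are joined implicitly.
implicit = f'The external DAG {external_dag_id} failed. ' 'Skipping due to soft_fail.'

# The same message written as a single literal.
single = f'The external DAG {external_dag_id} failed. Skipping due to soft_fail.'

print(implicit == single)
```

Both forms behave identically at runtime; the single literal is just easier to read and harder to break when the line is reformatted.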



##########
tests/sensors/test_external_task_sensor.py:
##########
@@ -214,6 +230,31 @@ def test_external_dag_sensor(self):
         )
         op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
+    def test_external_dag_sensor_soft_fail_as_skipped(self):
+        other_dag = DAG('other_dag', default_args=self.args, end_date=DEFAULT_DATE, schedule_interval='@once')
+        other_dag.create_dagrun(
+            run_id='test', start_date=DEFAULT_DATE, execution_date=DEFAULT_DATE, state=State.SUCCESS
+        )
+        op = ExternalTaskSensor(
+            task_id='test_external_dag_sensor_check',
+            external_dag_id='other_dag',
+            external_task_id=None,
+            allowed_states=["failed"],
+            failed_states=["success"],

Review Comment:
   ```suggestion
               allowed_states=[State.FAILED],
               failed_states=[State.SUCCESS],
   ```
   WDYT about using the class attrs rather than the hardcoded values?
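
As a rough illustration of why constants help here: a misspelled attribute fails immediately, while a misspelled string literal is silently accepted. The `State` class below is a stand-in defined only for this sketch, not the real `airflow.utils.state.State`, and `FAILD` is a deliberately invented typo.

```python
class State:
    """Hypothetical stand-in for Airflow's State, reduced to two constants."""

    SUCCESS = "success"
    FAILED = "failed"


# The constants carry the same values as the hardcoded strings.
allowed_states = [State.FAILED]
failed_states = [State.SUCCESS]

# A typo in a string literal goes unnoticed until the sensor misbehaves.
typo_states = ["faild"]

# A typo in an attribute name raises as soon as the line runs.
try:
    State.FAILD
except AttributeError as err:
    print(f"caught: {err}")
```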



##########
tests/sensors/test_external_task_sensor.py:
##########
@@ -134,17 +134,36 @@ def test_external_task_sensor_failed_states_as_success(self):
                 "unit_test_dag failed."
             )
 
+    def test_external_task_sensor_soft_fail_failed_states_as_skipped(self, session=None):
+        self.test_time_sensor()
+        op = ExternalTaskSensor(
+            task_id='test_external_task_sensor_check',
+            external_dag_id=TEST_DAG_ID,
+            external_task_id=TEST_TASK_ID,
+            allowed_states=[State.FAILED],
+            failed_states=[State.SUCCESS],
+            soft_fail=True,
+            dag=self.dag,
+        )
+
+        # when
+        op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+        # then
+        session = settings.Session()
+        TI = TaskInstance
+        task_instances: list[TI] = session.query(TI).filter(TI.task_id == op.task_id).all()
+        assert len(task_instances) == 1, "Unexpected number of task instances"
+        assert task_instances[0].state == State.SKIPPED, "Unexpected external task state"
+
     def test_external_task_sensor_external_task_id_param(self):
         """Test external_task_ids is set properly when external_task_id is passed as a template"""
         self.test_time_sensor()
         op = ExternalTaskSensor(
             task_id='test_external_task_sensor_check',
             external_dag_id='{{ params.dag_id }}',
             external_task_id='{{ params.task_id }}',
-            params={
-                'dag_id': TEST_DAG_ID,
-                'task_id': TEST_TASK_ID,
-            },
+            params={'dag_id': TEST_DAG_ID, 'task_id': TEST_TASK_ID,},

Review Comment:
   ```suggestion
               params={'dag_id': TEST_DAG_ID, 'task_id': TEST_TASK_ID},
   ```



##########
tests/sensors/test_external_task_sensor.py:
##########
@@ -162,10 +181,7 @@ def test_external_task_sensor_external_task_ids_param(self):
             task_id='test_external_task_sensor_check',
             external_dag_id='{{ params.dag_id }}',
             external_task_ids=['{{ params.task_id }}'],
-            params={
-                'dag_id': TEST_DAG_ID,
-                'task_id': TEST_TASK_ID,
-            },
+            params={'dag_id': TEST_DAG_ID, 'task_id': TEST_TASK_ID,},

Review Comment:
   ```suggestion
               params={'dag_id': TEST_DAG_ID, 'task_id': TEST_TASK_ID},
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
