This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a1b5bdb25a respect soft_fail argument when ExternalTaskSensor runs in deferrable mode (#33196)
a1b5bdb25a is described below

commit a1b5bdb25a6f9565ac5934a9a458e9b079ccf3ae
Author: Wei Lee <[email protected]>
AuthorDate: Mon Aug 14 18:05:28 2023 +0800

    respect soft_fail argument when ExternalTaskSensor runs in deferrable mode (#33196)
---
 airflow/sensors/base.py          |  6 ++++++
 airflow/sensors/external_task.py | 46 +++++++++++++++++++++++-----------------
 2 files changed, 32 insertions(+), 20 deletions(-)

diff --git a/airflow/sensors/base.py b/airflow/sensors/base.py
index 53d9b661ed..792d907d1f 100644
--- a/airflow/sensors/base.py
+++ b/airflow/sensors/base.py
@@ -330,6 +330,12 @@ class BaseSensorOperator(BaseOperator, SkipMixin):
     def get_serialized_fields(cls):
         return super().get_serialized_fields() | {"reschedule"}
 
+    def raise_failed_or_skiping_exception(self, *, failed_message: str, skipping_message: str = "") -> None:
+        """Raise AirflowSkipException if self.soft_fail is set to True. Otherwise raise AirflowException."""
+        if self.soft_fail:
+            raise AirflowSkipException(skipping_message or failed_message)
+        raise AirflowException(failed_message)
+
 
 def poke_mode_only(cls):
     """
diff --git a/airflow/sensors/external_task.py b/airflow/sensors/external_task.py
index 2406f274aa..53270ac411 100644
--- a/airflow/sensors/external_task.py
+++ b/airflow/sensors/external_task.py
@@ -222,6 +222,8 @@ class ExternalTaskSensor(BaseSensorOperator):
         self.deferrable = deferrable
         self.poll_interval = poll_interval
 
+        self._skipping_message_postfix = " Skipping due to soft_fail."
+
     def _get_dttm_filter(self, context):
         if self.execution_delta:
             dttm = context["logical_date"] - self.execution_delta
@@ -274,32 +276,28 @@ class ExternalTaskSensor(BaseSensorOperator):
         # Fail if anything in the list has failed.
         if count_failed > 0:
             if self.external_task_ids:
-                if self.soft_fail:
-                    raise AirflowSkipException(
-                        f"Some of the external tasks {self.external_task_ids} "
-                        f"in DAG {self.external_dag_id} failed. Skipping due to soft_fail."
-                    )
-                raise AirflowException(
+                failed_message = (
                     f"Some of the external tasks {self.external_task_ids} "
                     f"in DAG {self.external_dag_id} failed."
                 )
+
+                self.raise_failed_or_skiping_exception(
+                    failed_message=failed_message,
+                    skipping_message=f"{failed_message}{self._skipping_message_postfix}",
+                )
             elif self.external_task_group_id:
-                if self.soft_fail:
-                    raise AirflowSkipException(
+                self.raise_failed_or_skiping_exception(
+                    failed_message=(
                         f"The external task_group '{self.external_task_group_id}' "
-                        f"in DAG '{self.external_dag_id}' failed. Skipping due to soft_fail."
+                        f"in DAG '{self.external_dag_id}' failed."
                     )
-                raise AirflowException(
-                    f"The external task_group '{self.external_task_group_id}' "
-                    f"in DAG '{self.external_dag_id}' failed."
                 )
-
             else:
-                if self.soft_fail:
-                    raise AirflowSkipException(
-                        f"The external DAG {self.external_dag_id} failed. Skipping due to soft_fail."
-                    )
-                raise AirflowException(f"The external DAG {self.external_dag_id} failed.")
+                failed_message = f"The external DAG {self.external_dag_id} failed."
+                self.raise_failed_or_skiping_exception(
+                    failed_message=failed_message,
+                    skipping_message=f"{failed_message}{self._skipping_message_postfix}",
+                )
 
         count_skipped = -1
         if self.skipped_states:
@@ -354,12 +352,20 @@ class ExternalTaskSensor(BaseSensorOperator):
             self.log.info("External task %s has executed successfully.", self.external_task_id)
             return None
         elif event["status"] == "timeout":
-            raise AirflowException("Dag was not started within 1 minute, assuming fail.")
+            failed_message = "Dag was not started within 1 minute, assuming fail."
+            self.raise_failed_or_skiping_exception(
+                failed_message=failed_message,
+                skipping_message=f"{failed_message}{self._skipping_message_postfix}",
+            )
         else:
-            raise AirflowException(
+            failed_message = (
                 "Error occurred while trying to retrieve task status. Please, check the "
                 "name of executed task and Dag."
             )
+            self.raise_failed_or_skiping_exception(
+                failed_message=failed_message,
+                skipping_message=f"{failed_message}{self._skipping_message_postfix}",
+            )
 
     def _check_for_existence(self, session) -> None:
         dag_to_wait = DagModel.get_current(self.external_dag_id, session)

Reply via email to