This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a1b5bdb25a respect soft_fail argument when ExternalTaskSensor runs in deferrable mode (#33196)
a1b5bdb25a is described below
commit a1b5bdb25a6f9565ac5934a9a458e9b079ccf3ae
Author: Wei Lee <[email protected]>
AuthorDate: Mon Aug 14 18:05:28 2023 +0800
respect soft_fail argument when ExternalTaskSensor runs in deferrable mode (#33196)
---
airflow/sensors/base.py | 6 ++++++
airflow/sensors/external_task.py | 46 +++++++++++++++++++++++-----------------
2 files changed, 32 insertions(+), 20 deletions(-)
diff --git a/airflow/sensors/base.py b/airflow/sensors/base.py
index 53d9b661ed..792d907d1f 100644
--- a/airflow/sensors/base.py
+++ b/airflow/sensors/base.py
@@ -330,6 +330,12 @@ class BaseSensorOperator(BaseOperator, SkipMixin):
def get_serialized_fields(cls):
return super().get_serialized_fields() | {"reschedule"}
+ def raise_failed_or_skiping_exception(self, *, failed_message: str, skipping_message: str = "") -> None:
+ """Raise AirflowSkipException if self.soft_fail is set to True. Otherwise raise AirflowException."""
+ if self.soft_fail:
+ raise AirflowSkipException(skipping_message or failed_message)
+ raise AirflowException(failed_message)
+
def poke_mode_only(cls):
"""
diff --git a/airflow/sensors/external_task.py b/airflow/sensors/external_task.py
index 2406f274aa..53270ac411 100644
--- a/airflow/sensors/external_task.py
+++ b/airflow/sensors/external_task.py
@@ -222,6 +222,8 @@ class ExternalTaskSensor(BaseSensorOperator):
self.deferrable = deferrable
self.poll_interval = poll_interval
+ self._skipping_message_postfix = " Skipping due to soft_fail."
+
def _get_dttm_filter(self, context):
if self.execution_delta:
dttm = context["logical_date"] - self.execution_delta
@@ -274,32 +276,28 @@ class ExternalTaskSensor(BaseSensorOperator):
# Fail if anything in the list has failed.
if count_failed > 0:
if self.external_task_ids:
- if self.soft_fail:
- raise AirflowSkipException(
- f"Some of the external tasks {self.external_task_ids} "
- f"in DAG {self.external_dag_id} failed. Skipping due to soft_fail."
- )
- raise AirflowException(
+ failed_message = (
f"Some of the external tasks {self.external_task_ids} "
f"in DAG {self.external_dag_id} failed."
)
+
+ self.raise_failed_or_skiping_exception(
+ failed_message=failed_message,
+ skipping_message=f"{failed_message}{self._skipping_message_postfix}",
+ )
elif self.external_task_group_id:
- if self.soft_fail:
- raise AirflowSkipException(
+ self.raise_failed_or_skiping_exception(
+ failed_message=(
f"The external task_group '{self.external_task_group_id}' "
- f"in DAG '{self.external_dag_id}' failed. Skipping due to soft_fail."
+ f"in DAG '{self.external_dag_id}' failed."
)
- raise AirflowException(
- f"The external task_group '{self.external_task_group_id}' "
- f"in DAG '{self.external_dag_id}' failed."
)
-
else:
- if self.soft_fail:
- raise AirflowSkipException(
- f"The external DAG {self.external_dag_id} failed. Skipping due to soft_fail."
- )
- raise AirflowException(f"The external DAG {self.external_dag_id} failed.")
+ failed_message = f"The external DAG {self.external_dag_id} failed."
+ self.raise_failed_or_skiping_exception(
+ failed_message=failed_message,
+ skipping_message=f"{failed_message}{self._skipping_message_postfix}",
+ )
count_skipped = -1
if self.skipped_states:
@@ -354,12 +352,20 @@ class ExternalTaskSensor(BaseSensorOperator):
self.log.info("External task %s has executed successfully.", self.external_task_id)
return None
elif event["status"] == "timeout":
- raise AirflowException("Dag was not started within 1 minute, assuming fail.")
+ failed_message = "Dag was not started within 1 minute, assuming fail."
+ self.raise_failed_or_skiping_exception(
+ failed_message=failed_message,
+ skipping_message=f"{failed_message}{self._skipping_message_postfix}",
+ )
else:
- raise AirflowException(
+ failed_message = (
"Error occurred while trying to retrieve task status. Please, check the "
"name of executed task and Dag."
)
+ self.raise_failed_or_skiping_exception(
+ failed_message=failed_message,
+ skipping_message=f"{failed_message}{self._skipping_message_postfix}",
+ )
def _check_for_existence(self, session) -> None:
dag_to_wait = DagModel.get_current(self.external_dag_id, session)