ramdharam opened a new issue #9542:
URL: https://github.com/apache/airflow/issues/9542
**Description**
Add an `on_failure_callback` feature for external task sensors. The current
out-of-the-box implementation provides no way to run custom logic when the
sensor times out or errors. This limits the usefulness of the sensor in its
existing implementation, because the purpose of the sensor is not completely
fulfilled.
**Use case / motivation**
The primary purpose of an external task sensor is to watch an upstream
dependency/task and, if the upstream task is successful, let the current DAG
perform its other operations. The sensor does its part very well if the
upstream (external) task succeeds. If the upstream task fails, however, the
sensor fails silently, downstream tasks never run, and the DAG is marked as
failed. Without a custom implementation of the sensor, we cannot call any of
the Python callables that are available for an operator.
DAGs are primarily composed of operators and sensors. Operators can call a
custom function through `on_failure_callback`, and so can DAGs. This means you
can write a generic DAG-level `on_failure_callback` Python function that sends
email or Slack alerts in case of task failures. But this function will not work
on the sensor, because the sensor has no way to call a callable. When an
external task sensor in a DAG fails, the DAG silently dies or retries, and may
eventually fail silently without sending an error or alert as it does for the
`on_failure_callback` implementation that is already available at the DAG
level.
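For illustration, a generic failure callback of the kind described above might look like the following minimal sketch. It assumes the callback receives Airflow's task context as a dict with `task_instance`, `execution_date`, and `exception` entries. The names `build_failure_alert` and `notify_on_failure` and the `send` parameter are hypothetical, and a real setup would replace `send` with an email or Slack client.

```python
from types import SimpleNamespace


def build_failure_alert(context):
    """Format a short alert message from a task-failure context dict."""
    ti = context["task_instance"]
    return "Task {}.{} failed for {}: {}".format(
        ti.dag_id,
        ti.task_id,
        context.get("execution_date"),
        context.get("exception"),
    )


def notify_on_failure(context, send=print):
    """Hypothetical callback: build the alert and pass it to a sender."""
    message = build_failure_alert(context)
    send(message)  # assumption: swap in a Slack or email sender here
    return message


# Stand-in for the context a failing sensor task would supply (assumed shape).
fake_context = {
    "task_instance": SimpleNamespace(dag_id="reporting", task_id="wait_for_upstream"),
    "execution_date": "2020-06-26",
    "exception": TimeoutError("sensor timed out"),
}

sent = []
notify_on_failure(fake_context, send=sent.append)
```

The same function could be reused for every task in a DAG, which is the code reuse this request is after.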
I would like to suggest that we provide an `on_failure_callback` parameter for
the external task sensor, as we do for an operator. I know this is a simple
change that users can make themselves, but I still think that providing a hook
for a callable function would save a lot of small customizations in their
respective implementations/installations of Airflow.
The reason I suggest providing it only for the `external task sensor` for now
is that the `external task sensor` is the most generic and most widely used
sensor.
On a personal level, we have a ton of DAGs that use external task sensors.
The DAGs send a Slack alert through an `on_failure_callback` implementation if
any task in the DAG fails. But if the failure happens on a sensor, the alert is
not sent and the DAG just fails silently. The ability to call a callable on
failure would help with code reuse. Because sensors inherit some properties of
operators, the changes required are minimal.
```
class ExternalTaskSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self,
                 external_dag_id,
                 external_task_id=None,
                 allowed_states=None,
                 failed_states=None,
                 execution_delta=None,
                 execution_date_fn=None,
                 check_existence=False,
                 on_failure_callback=None,  # add this parameter for callable
                 *args,
                 **kwargs):
        super().__init__(*args, **kwargs)
        self.on_failure_callback = on_failure_callback  # add this parameter for callable
        self.allowed_states = allowed_states or [State.SUCCESS]
        self.failed_states = failed_states or []
```