ramdharam opened a new issue #9542:
URL: https://github.com/apache/airflow/issues/9542


   
   **Description**
   Add `on_failure_callback` support to external task sensors. The current 
out-of-the-box implementation provides no way to run custom logic when the 
sensor times out or errors. This limits the usefulness of the sensor as it 
stands, because its purpose is not completely fulfilled.
   
   **Use case / motivation**
   The primary purpose of an external task sensor is to watch an upstream 
dependency (an external task) and, once that task succeeds, let the current 
DAG carry on with its other operations. The sensor does this very well when 
the upstream task succeeds. If the upstream task fails, however, the sensor 
fails silently, downstream tasks never run, and the DAG is marked as failed. 
Without a custom sensor implementation, we cannot call any of the Python 
callables that are available to an operator.
   
   DAGs are primarily composed of operators and sensors. Operators can invoke 
a custom callable through `on_failure_callback`, and so can DAGs. This means 
you can write one generic, DAG-level `on_failure_callback` Python function 
that sends email/Slack alerts in case of task failures. But this function 
will not work for the sensor, because the sensor has no way to invoke a 
callable. When an external task sensor in a DAG fails, the DAG dies silently, 
or retries and may eventually fail, without sending the error/alert that the 
`on_failure_callback` implementation already available at the DAG level 
would send.
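
A generic failure callback of the kind described above might look roughly like the sketch below. It is illustrative only: `build_failure_message` and `make_failure_callback` are hypothetical names, the `send` argument stands in for whatever email/Slack client is in use, and the context keys read here (`task_instance`, `execution_date`) are assumed to be present in the context dict passed to the callback.

```python
from types import SimpleNamespace
from typing import Any, Callable, Dict, List


def build_failure_message(context: Dict[str, Any]) -> str:
    """Format a one-line alert from a callback context dict (hypothetical helper)."""
    ti = context.get("task_instance")
    # Fall back to placeholders so the callback itself never raises.
    dag_id = getattr(ti, "dag_id", "unknown_dag")
    task_id = getattr(ti, "task_id", "unknown_task")
    execution_date = context.get("execution_date", "unknown")
    return f"Task failed: dag={dag_id} task={task_id} execution_date={execution_date}"


def make_failure_callback(send: Callable[[str], None]) -> Callable[[Dict[str, Any]], None]:
    """Wrap a sender (Slack, email, ...) into a callable that accepts a context dict."""
    def _callback(context: Dict[str, Any]) -> None:
        send(build_failure_message(context))
    return _callback


# Stand-in usage: collect messages in a list instead of calling a real Slack client.
sent: List[str] = []
alert_on_failure = make_failure_callback(sent.append)

# A fake context imitating what a failed sensor task would hand to the callback.
fake_context = {
    "task_instance": SimpleNamespace(dag_id="downstream_dag", task_id="wait_for_upstream"),
    "execution_date": "2020-06-26",
}
alert_on_failure(fake_context)
```

The point of the request is that the same `alert_on_failure` callable could then be reused for operators and external task sensors alike, rather than maintaining a customized sensor just to send the alert.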
   
   I would like to suggest that we provide an `on_failure_callback` parameter 
for the external task sensor, as we do for operators. I know this is a simple 
change that users can make themselves, but I still think that built-in 
support for a callable would save a lot of small customizations in 
individual installations of Airflow.
   I suggest providing it only for the `external task sensor` for now because 
the `external task sensor` is the most generic and most widely used sensor.
   
   On a personal level, we have a ton of DAGs that use external task sensors. 
The DAGs send a Slack alert through their `on_failure_callback` 
implementation if any task in the DAG fails. But if the failure happens on a 
sensor, the alert is not sent; the DAG just fails silently. Being able to 
invoke a callable on failure would help with code reuse. As sensors inherit 
some properties of operators, the changes required are minimal.
   
   ```
   class ExternalTaskSensor(BaseSensorOperator):
       @apply_defaults
       def __init__(self,
                    external_dag_id,
                    external_task_id=None,
                    allowed_states=None,
                    failed_states=None,
                    execution_delta=None,
                    execution_date_fn=None,
                    check_existence=False,
                    on_failure_callback=None,  # add this parameter for callable
                    *args,
                    **kwargs):
           super().__init__(*args, **kwargs)
           self.on_failure_callback = on_failure_callback  # add this attribute for callable
           self.allowed_states = allowed_states or [State.SUCCESS]
           self.failed_states = failed_states or []
   ```
   

