tanujdargan commented on issue #53162:
URL: https://github.com/apache/airflow/issues/53162#issuecomment-3096005490

   Hey @plovegro, I was able to fix this issue. The DAG code stays the same, 
but the plugin code and Airflow's config file each need a small change.
   
   I was able to get the webhook to trigger successfully by changing the plugin 
code to the following:
   
   ```python
   import logging
   import requests
   from airflow.plugins_manager import AirflowPlugin
   from airflow.listeners import hookimpl
   
   # It's best practice to get a logger instance instead of using print()
   log = logging.getLogger(__name__)
   
   class TaskSuccessListener:
       """A listener class to handle task success events."""
       
       @hookimpl
       def on_task_instance_success(self, previous_state, task_instance):
           """
           This method is called when a task instance succeeds.
           It sends a notification to a test webhook.
           """
           log.info(
               "✅ Listener triggered: Task '%s' in DAG '%s' succeeded. Sending webhook.",
               task_instance.task_id,
               task_instance.dag_id,
           )
           
           # This is your webhook URL for testing
           url = "https://webhook.site/b931861d-ff05-4ced-a150-d5aba9e3c975"
           
           try:
               # It's good practice to include a timeout and check the response
               response = requests.get(url, timeout=10)
               response.raise_for_status()  # Raise an error for 4xx/5xx responses
               log.info(
                   "Successfully sent webhook for task '%s'. Response: %s",
                   task_instance.task_id,
                   response.status_code,
               )
           except requests.exceptions.RequestException as e:
               log.error(
                   "Failed to send webhook for task '%s': %s",
                   task_instance.task_id,
                   e,
               )
   
   # Create an instance of the listener
   task_success_listener = TaskSuccessListener()
   
   class MyListenerPlugin(AirflowPlugin):
       """A simple plugin that registers the task success listener."""
       name = "my_listener_plugin"
       listeners = [task_success_listener]
   ```
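
The webhook logic can also be exercised without a running scheduler by calling it with a stand-in task instance. The sketch below is not part of the plugin: `notify_success`, `FakeResponse`, and the injected `http_get` parameter are hypothetical names, and the URL is a placeholder. It mirrors the body of `on_task_instance_success` using only the standard library, so the HTTP call can be swapped for a fake.

```python
import logging
from types import SimpleNamespace

log = logging.getLogger(__name__)


class FakeResponse:
    """Hypothetical stand-in for the object returned by requests.get()."""

    def __init__(self, status_code):
        self.status_code = status_code

    def raise_for_status(self):
        # Mimic requests: raise for 4xx/5xx status codes.
        if self.status_code >= 400:
            raise RuntimeError(f"HTTP {self.status_code}")


def notify_success(task_instance, url, http_get):
    """Mirror of the listener body; http_get is injected so it can be faked."""
    try:
        response = http_get(url, timeout=10)
        response.raise_for_status()
    except Exception as exc:  # the plugin catches requests.exceptions.RequestException
        log.error("Failed to send webhook for task '%s': %s", task_instance.task_id, exc)
        return False
    log.info("Sent webhook for task '%s': %s", task_instance.task_id, response.status_code)
    return True


# Stand-in for the TaskInstance that Airflow passes to the listener.
fake_ti = SimpleNamespace(task_id="example_task", dag_id="example_dag")

calls = []  # records every (url, timeout) pair the fake receives


def fake_get_ok(url, timeout):
    calls.append((url, timeout))
    return FakeResponse(200)


def fake_get_error(url, timeout):
    return FakeResponse(500)


ok = notify_success(fake_ti, "https://example.invalid/hook", fake_get_ok)
failed = notify_success(fake_ti, "https://example.invalid/hook", fake_get_error)
```

In the real plugin, `requests.get` would take the place of `http_get`; the point of the injection is only that the success and failure paths can be checked without network access.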
   
   and adding the following line to my airflow.cfg file: 
`enable_task_context_logger = True`
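
To confirm that the setting is actually present in the file, the config can be read back with Python's `configparser`. This is a minimal sketch under one assumption that is not stated in the comment: that the option belongs to the `[logging]` section, which should be checked against the configuration reference for your Airflow version. The sample text stands in for the real `airflow.cfg`.

```python
import configparser

# Example contents of airflow.cfg; placing the option under [logging]
# is an assumption, not something stated in the original comment.
SAMPLE_CFG = """
[logging]
enable_task_context_logger = True
"""

# interpolation=None avoids errors on unrelated values that contain '%'.
parser = configparser.ConfigParser(interpolation=None)
parser.read_string(SAMPLE_CFG)

# getboolean() accepts values such as True/true/yes/on/1 and returns a bool.
enabled = parser.getboolean("logging", "enable_task_context_logger", fallback=False)
print(enabled)
```

Against a real installation, `parser.read(path_to_airflow_cfg)` would replace `read_string`.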
   
   The issues that were fixed:
   
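   1. **Incorrect import path** - `from airflow.listeners.hookimpl import 
hookimpl` was wrong; the working import is `from airflow.listeners import 
hookimpl`
   2. **Invalid parameter** - The `session` parameter doesn't exist in the 
hookspec, so it was removed from the method signature
   3. **Wrong architecture** - Function-based listeners don't work properly; 
Airflow expects class instances
   4. **Task logger not enabled** - `enable_task_context_logger = True` had to 
be set in the Airflow config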
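
The second point can be caught before deployment, because pluggy (the hook library behind `hookimpl`) rejects an implementation that declares arguments the hookspec does not have. The sketch below is only an illustration with the standard library: `ALLOWED_ARGS`, `extra_args`, `BrokenListener`, and `FixedListener` are hypothetical names, and the allowed argument set is copied from the working listener above rather than read from Airflow itself.

```python
import inspect

# Argument names taken from the working listener; treat this set as an
# assumption and confirm it against the hookspec of your Airflow version.
ALLOWED_ARGS = {"previous_state", "task_instance"}


def extra_args(method):
    """Return parameter names that are not in ALLOWED_ARGS (ignoring self)."""
    params = inspect.signature(method).parameters
    return sorted(name for name in params if name != "self" and name not in ALLOWED_ARGS)


class BrokenListener:
    # Declares `session`, which is not among the allowed arguments.
    def on_task_instance_success(self, previous_state, task_instance, session):
        pass


class FixedListener:
    def on_task_instance_success(self, previous_state, task_instance):
        pass


broken = extra_args(BrokenListener.on_task_instance_success)
fixed = extra_args(FixedListener.on_task_instance_success)
print(broken, fixed)
```

A non-empty result points at the arguments to remove from the listener method.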
   
   I tested on Windows and Ubuntu 24.04 using Docker, and it appears to work 
on Airflow 3.0.2.
   
   The three test runs I did are visible here: [Webhook 
Site](https://webhook.site/#!/view/b931861d-ff05-4ced-a150-d5aba9e3c975/45bef14d-487f-4ca7-b0a1-d8ce6de8228f/1)
   
   Let me know if this helps!

