tanujdargan commented on issue #53162:
URL: https://github.com/apache/airflow/issues/53162#issuecomment-3072270388

   Hello! Thanks for the detailed report.
   
   Based on the code you provided, the most likely reason your listener isn't firing is that the `requests` library was never imported in your plugin file. This causes a `NameError` the moment the listener runs, preventing the webhook call and any subsequent code from executing.
   
   -----
   
   ### The Problem: Potential Causes
   
   1.  **Missing `requests` Import:** The listener code calls `requests.get()` but doesn't include `import requests` at the top of the file. Python only looks the name up when the hook runs, so the file loads fine but the listener fails with a `NameError` the first time it fires.
   2.  **Log Visibility:** In a standard Airflow Docker Compose setup, listeners run inside the Scheduler process, so their output goes to the Scheduler container's logs. You can view them with `docker compose logs airflow-scheduler` (the service name in the official `docker-compose.yaml`) or `docker logs <your-scheduler-container-name>`. `print()` output can be buffered or lost; Python's `logging` module gives more reliable output.
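   The first cause can be reproduced in isolation. Python resolves the name `requests` only when the function is actually called, so the file imports cleanly and the failure surfaces later (this standalone snippet just illustrates the failure mode, it is not part of the plugin):

   ```python
   # 'requests' is deliberately never imported in this snippet.
   def send_webhook():
       # Name lookup happens at call time, so defining this function succeeds.
       return requests.get("https://example.com")

   try:
       send_webhook()
   except NameError as e:
       print(e)  # → name 'requests' is not defined
   ```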
   
   -----
   
   ### The Solution: Corrected Code and Configuration
   
   Here is a complete, working example that includes the necessary import, 
proper logging, and error handling.
   
   #### 1. Directory Structure
   
   For the standard `docker-compose.yaml` from the Airflow documentation, you 
should place your files in these directories relative to your 
`docker-compose.yaml` file:
   
     * `./dags/`
     * `./plugins/`
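   With the example files from the next two sections in place, the tree relative to `docker-compose.yaml` looks like this (file names are just the ones used in this example):

   ```text
   .
   ├── docker-compose.yaml
   ├── dags/
   │   └── listener_test_dag.py
   └── plugins/
       └── my_listener_plugin.py
   ```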
   
   #### 2. The Listener Plugin
   
   This code fixes the import error and uses best practices like `logging` and 
modern listener hooks.
   
   Place this code in `./plugins/my_listener_plugin.py`:
   
   ```python
   import logging
   import sys

   import requests

   from airflow.listeners import hookimpl
   from airflow.plugins_manager import AirflowPlugin

   # It's best practice to get a logger instance instead of using print()
   log = logging.getLogger(__name__)


   @hookimpl
   def on_task_instance_success(previous_state, task_instance, session):
       """
       Called when a task instance succeeds.
       Sends a notification to a test webhook.
       """
       log.info(
           "✅ Listener triggered: Task '%s' in DAG '%s' succeeded. Sending webhook.",
           task_instance.task_id,
           task_instance.dag_id,
       )

       # This is your webhook URL for testing
       url = "https://webhook.site/837dc4b3-d42d-4a7a-a61f-5a633e2ec78a"

       try:
           # It's good practice to include a timeout and check the response
           response = requests.get(url, timeout=10)
           response.raise_for_status()  # Raise an error for 4xx/5xx responses
           log.info(
               "Successfully sent webhook for task '%s'. Response: %s",
               task_instance.task_id,
               response.status_code,
           )
       except requests.exceptions.RequestException as e:
           log.error(
               "Failed to send webhook for task '%s': %s",
               task_instance.task_id,
               e,
           )


   class MyListenerPlugin(AirflowPlugin):
       """A simple plugin that registers this module's listener hooks."""

       name = "my_listener_plugin"
       # `listeners` expects modules (or objects) that contain @hookimpl
       # functions, not the functions themselves, so register this module.
       listeners = [sys.modules[__name__]]
   ```
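   If you want to sanity-check the error-handling flow without restarting the Airflow stack each time, you can exercise the same try/except shape locally with a stand-in for `requests` (everything below is a hypothetical local test; `StubResponse` and `notify` are illustrative names, not Airflow or requests API):

   ```python
   import logging

   logging.basicConfig(level=logging.INFO)
   log = logging.getLogger("listener_smoke_test")


   class StubResponse:
       """Pretends to be a successful HTTP response."""

       status_code = 200

       def raise_for_status(self):
           pass  # a 200 has nothing to raise


   def notify(task_id, url, get=None):
       """Same shape as the listener body; the HTTP call is injectable."""
       get = get or (lambda u, timeout: StubResponse())
       try:
           response = get(url, timeout=10)
           response.raise_for_status()
           log.info("Sent webhook for task '%s': %s", task_id, response.status_code)
           return response.status_code
       except Exception as e:  # stands in for requests.exceptions.RequestException
           log.error("Failed to send webhook for task '%s': %s", task_id, e)
           return None


   print(notify("task_that_succeeds", "https://example.invalid/hook"))  # → 200
   ```

   Swapping `get` for a function that raises lets you confirm the error branch logs the failure and returns `None` instead of propagating into the scheduler.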
   
   #### 3. The Test DAG
   
   This simple DAG helps you trigger the `on_task_instance_success` event 
easily.
   
   Place this code in `./dags/listener_test_dag.py`:
   
   ```python
   from __future__ import annotations
   import pendulum
   from airflow.models.dag import DAG
   from airflow.operators.bash import BashOperator
   
   with DAG(
       dag_id="listener_test_dag",
       start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
       catchup=False,
       tags=["example", "listener"],
   ) as dag:
       BashOperator(
           task_id="task_that_succeeds",
           bash_command="echo 'This task will succeed and should trigger the listener.'",
       )
   ```
   
   -----
   
   ### How to Test and Verify
   
   1.  **Place the files** in the `./plugins/` and `./dags/` directories.
   2.  **Restart Airflow:** Run `docker compose down && docker compose up -d` 
to ensure the new plugin is loaded correctly.
   3.  **Go to your webhook.site page** 
(`https://webhook.site/837dc4b3-d42d-4a7a-a61f-5a633e2ec78a`). Keep this page 
open in your browser.
   4.  **In the Airflow UI**, unpause and trigger the `listener_test_dag`.
   5.  **Check the webhook.site page.** Once the DAG run succeeds, you should 
see a new GET request appear.
   
   I tested this exact configuration, and a request was successfully received 
by the webhook service (my test request had the unique ID 
`809a3c37-258d-415e-a2fa-c17b0763e147`). This confirms that the listener fires 
correctly with the updated code.
   
   I hope this helps you resolve the issue!

