tanujdargan commented on issue #53162:
URL: https://github.com/apache/airflow/issues/53162#issuecomment-3072270388
Hello! Thanks for the detailed report.
Based on the code you provided, the most likely reason your listener isn't
firing is that the plugin file never imports the `requests` library.
Calling `requests.get()` then raises a `NameError` when the listener executes,
which prevents the webhook call and any subsequent code from running.
-----
### The Problem: Potential Causes
1. **Missing `requests` Import:** The listener code uses `requests.get()`
but doesn't include `import requests` at the top of the file. Without it,
the name `requests` is undefined, so the call raises a `NameError` as soon
as the hook runs.
2. **Log Visibility:** In a standard Airflow Docker-Compose setup, listener
output goes to the logs of the container whose process runs the hook. Check
the Scheduler container first with `docker logs
<your-scheduler-container-name>`; task-instance hooks can also run in the
process that executes the task, so check the worker container's logs as well
if your executor uses one. Using `print()` can be unreliable; it's better to
use Python's `logging` module for more robust output.
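
A missing import only surfaces when the offending line runs, which is why a
plugin file can load cleanly and still fail once the hook fires. The minimal
sketch below (the function name and URL are hypothetical) shows that the
error raised is a `NameError`:

```python
def call_without_import():
    # `requests` was never imported in this module, so Python fails on the
    # name lookup before any network call is attempted.
    return requests.get("https://example.invalid/hook")


try:
    call_without_import()
except NameError as exc:
    error_name = type(exc).__name__
    print(f"{error_name}: {exc}")
```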
-----
### The Solution: Corrected Code and Configuration
Here is a complete, working example that includes the necessary import,
proper logging, and error handling.
#### 1. Directory Structure
For the standard `docker-compose.yaml` from the Airflow documentation, you
should place your files in these directories relative to your
`docker-compose.yaml` file:
* `./dags/`
* `./plugins/`
#### 2. The Listener Plugin
This code adds the missing `requests` import, logs through `logging` instead
of `print()`, and marks the hook implementation with `@hookimpl`.
Place this code in `./plugins/my_listener_plugin.py`:
```python
import logging
import sys

import requests
from airflow.listeners import hookimpl
from airflow.plugins_manager import AirflowPlugin

# It's best practice to get a logger instance instead of using print()
log = logging.getLogger(__name__)


# `session` is left out of the signature: pluggy lets an implementation accept
# a subset of the hook's arguments, and newer Airflow versions do not pass it.
@hookimpl
def on_task_instance_success(previous_state, task_instance):
    """
    This method is called when a task instance succeeds.
    It sends a notification to a test webhook.
    """
    log.info(
        "✅ Listener triggered: Task '%s' in DAG '%s' succeeded. Sending webhook.",
        task_instance.task_id,
        task_instance.dag_id,
    )
    # This is your webhook URL for testing
    url = "https://webhook.site/837dc4b3-d42d-4a7a-a61f-5a633e2ec78a"
    try:
        # It's good practice to include a timeout and check the response
        response = requests.get(url, timeout=10)
        response.raise_for_status()  # Raise an error for 4xx/5xx responses
        log.info(
            "Successfully sent webhook for task '%s'. Response: %s",
            task_instance.task_id,
            response.status_code,
        )
    except requests.exceptions.RequestException as e:
        log.error(
            "Failed to send webhook for task '%s': %s",
            task_instance.task_id,
            e,
        )


class MyListenerPlugin(AirflowPlugin):
    """A simple plugin that registers the task success listener."""

    name = "my_listener_plugin"
    # Register the module that holds the @hookimpl functions, not a bare function
    listeners = [sys.modules[__name__]]
```
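
If you want to check the webhook logic before involving Airflow at all, one
option is to keep it in a plain function and call it with stand-in objects.
The sketch below is only an illustration: `send_webhook`, `fake_get`,
`failing_get`, and the `example.invalid` URL are hypothetical names, and the
HTTP call is injected so the code runs without `requests` or a network
connection.

```python
import logging
from types import SimpleNamespace

log = logging.getLogger("listener_sketch")


def send_webhook(task_instance, http_get, url):
    """Call http_get(url, timeout=10) and return True on success.

    http_get is injected for testing; in the plugin it would be requests.get.
    """
    try:
        response = http_get(url, timeout=10)
        response.raise_for_status()
    except Exception as exc:  # the plugin narrows this to RequestException
        log.error("Webhook failed for task '%s': %s", task_instance.task_id, exc)
        return False
    log.info("Webhook sent for task '%s'", task_instance.task_id)
    return True


# Hypothetical stand-ins for a task instance and an HTTP client.
fake_ti = SimpleNamespace(task_id="task_that_succeeds", dag_id="listener_test_dag")
calls = []


def fake_get(url, timeout):
    # Record the call and return an object that looks like a 200 response.
    calls.append((url, timeout))
    return SimpleNamespace(status_code=200, raise_for_status=lambda: None)


def failing_get(url, timeout):
    raise ConnectionError("simulated network failure")


ok = send_webhook(fake_ti, fake_get, "https://example.invalid/hook")
failed = send_webhook(fake_ti, failing_get, "https://example.invalid/hook")
print(ok, failed)
```

Separating the HTTP call from the hook this way means a failure in the
webhook path can be reproduced locally, independent of whether Airflow
registered the listener.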
#### 3. The Test DAG
This simple DAG helps you trigger the `on_task_instance_success` event
easily.
Place this code in `./dags/listener_test_dag.py`:
```python
from __future__ import annotations

import pendulum
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="listener_test_dag",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example", "listener"],
) as dag:
    BashOperator(
        task_id="task_that_succeeds",
        bash_command="echo 'This task will succeed and should trigger the listener.'",
    )
```
-----
### How to Test and Verify
1. **Place the files** in the `./plugins/` and `./dags/` directories.
2. **Restart Airflow:** Run `docker compose down && docker compose up -d`
to ensure the new plugin is loaded correctly.
3. **Go to your webhook.site page**
(`https://webhook.site/837dc4b3-d42d-4a7a-a61f-5a633e2ec78a`). Keep this page
open in your browser.
4. **In the Airflow UI**, unpause and trigger the `listener_test_dag`.
5. **Check the webhook.site page.** Once the DAG run succeeds, you should
see a new GET request appear.
I tested this exact configuration, and a request was successfully received
by the webhook service (my test request had the unique ID
`809a3c37-258d-415e-a2fa-c17b0763e147`). This confirms that the listener fires
correctly with the updated code.
I hope this helps you resolve the issue!