GitHub user sona26G added a comment to the discussion: Why is my TaskInstance
Listener not Working?
I have deployed Apache Airflow 2.8.3 on Kubernetes with the Kubernetes executor. I
have also created a custom event listener to capture dag_run and task_instance
events, and I have uploaded the listener files to /opt/airflow/plugins in the
scheduler pod. When I run a custom DAG I receive the DAG-related events, but the
task instance events never arrive. Is there something I'm missing? Do I need to
put the event listener into the individual pods that come up for each DAG task?
I assumed the scheduler would be able to capture both DAG and task events.
Attaching the event listener code below:
```python
from __future__ import annotations

import asyncio
import logging
import pathlib
from configparser import ConfigParser
from typing import TYPE_CHECKING

from airflow.listeners import hookimpl

from event_logging_plugin.ad_nats_client import AdNatsClient

if TYPE_CHECKING:
    from sqlalchemy.orm.session import Session

    from airflow.models.dagrun import DagRun
    from airflow.models.taskinstance import TaskInstance
    from airflow.utils.state import TaskInstanceState
    from airflow.sdk.definitions.asset import Asset, AssetAlias

logging.basicConfig(
    filename="airflow_event_listener.log",
    filemode="a",
    format="%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    level=logging.INFO,
)

# Read NATS/cluster settings from a config.ini shipped next to this module.
config_path = pathlib.Path(__file__).parent.absolute() / "config.ini"
parser = ConfigParser()
parser.read(config_path)

nats_client = None


def get_nats_client():
    """Lazily create and cache a single shared NATS client."""
    global nats_client
    if nats_client is None:
        raw = parser.get("pulse", "ad.events.url")
        servers = [s.strip() for s in raw.split(",")]
        nats_client = AdNatsClient(servers)
    return nats_client


async def publish_to_nats(event_detail: dict):
    """Publish one event dict to the cluster-specific NATS subject."""
    try:
        cluster = parser.get("pulse", "ad.cluster")
        subject = f"airflow_events_{cluster}"
        client = get_nats_client()
        await client.publish_event(subject, event_detail)
    except Exception as e:
        logging.error("Failed pushing event to NATS: %s", e)

def post_event(event_detail: dict):
    """Schedule the publish on a running event loop, or drive one ourselves."""
    try:
        loop = asyncio.get_event_loop()
        if loop.is_running():
            logging.debug("Event loop already running → using create_task()")
            loop.create_task(publish_to_nats(event_detail))
        else:
            loop.run_until_complete(publish_to_nats(event_detail))
    except RuntimeError:
        logging.debug("No event loop → using asyncio.run()")
        asyncio.run(publish_to_nats(event_detail))

def convert_to_millis(dt):
    """Convert a datetime object to milliseconds since the epoch."""
    if dt is None:
        return None
    return int(dt.timestamp() * 1000)

# Guard on the same option we read below.
if parser.has_option("pulse", "ad.cluster"):
    cluster = parser.get("pulse", "ad.cluster")
else:
    cluster = "UnknownCluster"
    logging.error("Missing cluster in config.ini")

connectorContext = {
    "monitor": "airflow",
    "group": cluster,
    "userId": "",
    "uuid": "",
}


def get_connector_context():
    if not connectorContext.get("group"):
        logging.error("Missing group in connector context")
        connectorContext["group"] = "UnknownGroup"
    return connectorContext

def extract_task_instance_details(
    event: str, previous_state: TaskInstanceState, task_instance: TaskInstance
):
    eventType = "airflow_task_instance"
    try:
        details = {
            "eventType": eventType,
            "event": event,
            "task_id": task_instance.task_id,
            "dag_id": task_instance.dag_id,
            "run_id": task_instance.run_id,
            "previous_state": str(previous_state),
            "map_index": task_instance.map_index,
            "start_date": convert_to_millis(task_instance.start_date),
            "end_date": convert_to_millis(task_instance.end_date),
            "duration": task_instance.duration,
            "state": task_instance.state,
            "max_tries": task_instance.max_tries,
            "hostname": task_instance.hostname,
            "unixname": task_instance.unixname,
            "job_id": task_instance.job_id,
            "pool": task_instance.pool,
            "pool_slots": task_instance.pool_slots,
            "queue": task_instance.queue,
            "priority_weight": task_instance.priority_weight,
            "operator": task_instance.operator,
            "custom_operator_name": task_instance.custom_operator_name,
            "queued_dttm": convert_to_millis(task_instance.queued_dttm),
            "queued_by_job_id": task_instance.queued_by_job_id,
            "pid": task_instance.pid,
            "executor_config": task_instance.executor_config,
            "updated_at": convert_to_millis(task_instance.updated_at),
            "external_executor_id": task_instance.external_executor_id,
            "trigger_id": task_instance.trigger_id,
            "trigger_timeout": str(task_instance.trigger_timeout),
            "next_method": task_instance.next_method,
            "next_kwargs": task_instance.next_kwargs,
        }
        return {"event": details}
    except Exception as e:
        error_details = {
            "eventType": eventType,
            "event": event,
            "message": "Exception while parsing: " + str(e),
            "instance_details": str(task_instance),
        }
        return {"event": error_details}

@hookimpl
def on_task_instance_running(
    previous_state: TaskInstanceState,
    task_instance: TaskInstance,
    session: Session = None,
):
    """Called when a task instance's state changes to RUNNING."""
    event_detail = extract_task_instance_details(
        "on_task_instance_running", previous_state, task_instance
    )
    logging.info("Pushing to nats for task instance event")
    post_event(event_detail)


@hookimpl
def on_task_instance_success(
    previous_state: TaskInstanceState,
    task_instance: TaskInstance,
    session: Session = None,
):
    """Called when a task instance's state changes to SUCCESS."""
    logging.info("HOOK-FIRED: on_task_instance_success")
    try:
        event_detail = extract_task_instance_details(
            "on_task_instance_success", previous_state, task_instance
        )
        post_event(event_detail)
    except Exception:
        logging.exception("Exception in on_task_instance_success")


@hookimpl
def on_task_instance_failed(
    previous_state: TaskInstanceState,
    task_instance: TaskInstance,
    session: Session = None,
):
    """Called when a task instance's state changes to FAILED."""
    details = extract_task_instance_details(
        "on_task_instance_failed", previous_state, task_instance
    )
    post_event(details)

def extract_dag_run_instance_details(event: str, dag_run: DagRun):
    eventType = "airflow_dag_run_instance"
    try:
        details = {
            "eventType": eventType,
            "event": event,
            "dag_id": dag_run.dag_id,
            "queued_at": convert_to_millis(dag_run.queued_at),
            "execution_date": convert_to_millis(dag_run.execution_date),
            "start_date": convert_to_millis(dag_run.start_date),
            "end_date": convert_to_millis(dag_run.end_date),
            "state": str(dag_run.state),
            "run_id": dag_run.run_id,
            "creating_job_id": dag_run.creating_job_id,
            "external_trigger": dag_run.external_trigger,
            "run_type": dag_run.run_type,
            "conf": str(dag_run.conf),
            "data_interval_start": convert_to_millis(dag_run.data_interval_start),
            "data_interval_end": convert_to_millis(dag_run.data_interval_end),
            "last_scheduling_decision": convert_to_millis(dag_run.last_scheduling_decision),
            "dag_hash": dag_run.dag_hash,
            "log_template_id": dag_run.log_template_id,
            "updated_at": convert_to_millis(dag_run.updated_at),
            "clear_number": dag_run.clear_number,
        }
        return {"event": details}
    except Exception as e:
        error_details = {
            "eventType": eventType,
            "event": event,
            "message": "Exception parsing: " + str(e),
            "dag_run_details": str(dag_run),
        }
        return {"event": error_details}

@hookimpl
def on_dag_run_success(dag_run: DagRun, msg: str):
    """Called when a DAG run's state changes to SUCCESS."""
    details = extract_dag_run_instance_details("on_dag_run_success", dag_run)
    logging.info("Pushing to nats for dag run event---- %s -----", dag_run)
    post_event(details)


@hookimpl
def on_dag_run_failed(dag_run: DagRun, msg: str):
    """Called when a DAG run's state changes to FAILED."""
    details = extract_dag_run_instance_details("on_dag_run_failed", dag_run)
    post_event(details)


@hookimpl
def on_dag_run_running(dag_run: DagRun, msg: str):
    """Called when a DAG run's state changes to RUNNING."""
    details = extract_dag_run_instance_details("on_dag_run_running", dag_run)
    post_event(details)

def extract_asset_details(event: str, asset: Asset):
    eventType = "airflow_asset_instance"
    try:
        details = {
            "eventType": eventType,
            "event": event,
            "asset_data": str(asset),
        }
        return {"event": details}
    except Exception as e:
        error_details = {
            "eventType": eventType,
            "event": event,
            "message": "Exception parsing: " + str(e),
            "asset_details": str(asset),
        }
        return {"event": error_details}


@hookimpl
def on_asset_created(asset: Asset):
    """Execute when a new asset is created."""
    details = extract_asset_details("on_asset_created", asset)
    logging.info("Pushing to nats for asset created event")
    post_event(details)


@hookimpl
def on_asset_alias_created(asset_alias: AssetAlias):
    """Execute when a new asset alias is created."""


@hookimpl
def on_asset_changed(asset: Asset):
    """Execute when asset change is registered."""
    details = extract_asset_details("on_asset_changed", asset)
    logging.info("Pushing to nats for asset changed event")
    post_event(details)
```
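For completeness, this is roughly how the listener module above is wired up as a plugin under /opt/airflow/plugins; the plugin class and module names here are illustrative placeholders for my actual layout, not exact:

```python
# event_logging_plugin/__init__.py -- illustrative sketch; the plugin class
# and module names are placeholders for my actual layout.
from airflow.plugins_manager import AirflowPlugin

# The module containing the @hookimpl functions shown above.
from event_logging_plugin import event_listener


class EventLoggingPlugin(AirflowPlugin):
    name = "event_logging_plugin"
    # Listeners are registered as plain modules (or objects) that expose
    # the @hookimpl-decorated functions.
    listeners = [event_listener]
```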
GitHub link:
https://github.com/apache/airflow/discussions/33683#discussioncomment-15010973