GitHub user nabil0024 edited a discussion: Build Custom Asset/ trigger
Hi guys, i am trying to build a Custom trigger to track message in a queue
RabbitMQ as the one already exist AWS SQS
my code
`
from typing import Any, AsyncIterator
from airflow.sdk import DAG, Asset, AssetWatcher, dag, task, chain
from airflow.triggers.base import BaseEventTrigger, TriggerEvent
from kombu import Connection
from config import settings
class RabbitMQTrigger(BaseEventTrigger):
def __init__(self, url, queue: str, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.queue = queue
self.url = url
def serialize(self) -> tuple[str, dict[str, Any]]:
return (
"main.RabbitMQTrigger",# current file name is main
{
"queue": self.queue,
"url": self.url,
},
)
async def run(self) -> AsyncIterator[TriggerEvent]:
with Connection(self.url) as connection:
with connection.SimpleQueue(self.queue) as queue:
while True:
message = queue.get()
message.ack()
yield TriggerEvent({"status": "success",
"message_batch":message.payload})
trigger = RabbitMQTrigger(
url="amqp://{}:{}@{}:{}/".format(
settings.RABBITMQ_USER,
settings.RABBITMQ_PASSWORD,
settings.RABBITMQ_HOST,
settings.RABBITMQ_PORT
),
queue='consumer_queue2',
)
asset = Asset("ex", watchers=[AssetWatcher(name="test_asset_watcher",
trigger=trigger)])
with DAG(
dag_id="example_asset_with_watchers",
schedule=[asset],
catchup=False,
):
@task
def test_task():
print("Hello world")
chain(test_task())
`
The problem is that it is not triggered, and I don't know exactly where the
problem is. any idea?
GitHub link: https://github.com/apache/airflow/discussions/51882
----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]