GitHub user nabil0024 edited a discussion: Build Custom Asset/ trigger

Hi guys, I am trying to build a custom trigger that tracks messages in a
RabbitMQ queue, similar to the one that already exists for AWS SQS.
Here is my code:

`

import asyncio
from typing import Any, AsyncIterator

from airflow.sdk import DAG, Asset, AssetWatcher, dag, task, chain
from airflow.triggers.base import BaseEventTrigger, TriggerEvent
from kombu import Connection

from config import settings


class RabbitMQTrigger(BaseEventTrigger):
    """Event trigger that emits one ``TriggerEvent`` per message read from a queue.

    The queue is opened through ``kombu.Connection(url).SimpleQueue(queue)``.
    Each message is acknowledged before its payload is emitted.

    :param url: Broker URL handed to ``kombu.Connection``.
    :param queue: Name of the queue to consume from.
    :param kwargs: Forwarded unchanged to ``BaseEventTrigger.__init__``.
    """

    # Seconds to wait before polling again when the queue is empty.
    POLL_INTERVAL: float = 1.0

    def __init__(self, url: str, queue: str, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        self.queue = queue
        self.url = url

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """Return the classpath and constructor kwargs used to rebuild this trigger."""
        # NOTE(review): the classpath is hard-coded; whatever process rebuilds
        # the trigger from this tuple presumably has to be able to import
        # ``main.RabbitMQTrigger`` -- confirm ``main`` is on its import path.
        return (
            "main.RabbitMQTrigger",# current file name is main
            {
                "queue": self.queue,
                "url": self.url,
            },
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """Poll the queue forever, yielding a ``TriggerEvent`` for every message.

        The kombu calls are synchronous, so they are executed in a worker
        thread; an empty queue is handled with ``asyncio.sleep`` instead of a
        blocking ``get()``. That keeps the event loop free for other
        coroutines while this trigger is waiting for a message.
        """
        with Connection(self.url) as connection:
            with connection.SimpleQueue(self.queue) as simple_queue:
                while True:
                    try:
                        # Non-blocking fetch: raises ``Empty`` right away when
                        # nothing is waiting, so the worker thread is only
                        # held for a single broker round trip.
                        message = await asyncio.to_thread(
                            simple_queue.get, block=False
                        )
                    except simple_queue.Empty:
                        await asyncio.sleep(self.POLL_INTERVAL)
                        continue

                    # Acknowledge before emitting, as the original code did.
                    await asyncio.to_thread(message.ack)

                    yield TriggerEvent(
                        {"status": "success", "message_batch": message.payload}
                    )


# Broker URL assembled from the project settings: user, password, host, port.
_amqp_url = "amqp://{}:{}@{}:{}/".format(
    settings.RABBITMQ_USER,
    settings.RABBITMQ_PASSWORD,
    settings.RABBITMQ_HOST,
    settings.RABBITMQ_PORT,
)

# Trigger watching the ``consumer_queue2`` queue on that broker.
trigger = RabbitMQTrigger(url=_amqp_url, queue='consumer_queue2')

# Asset "ex" with a single watcher that wraps the trigger above.
asset = Asset(
    "ex",
    watchers=[AssetWatcher(name="test_asset_watcher", trigger=trigger)],
)

# DAG scheduled on the watched asset; it contains a single task.
with DAG(dag_id="example_asset_with_watchers", schedule=[asset], catchup=False):

    # Minimal task that only prints a greeting.
    @task
    def test_task():
        print("Hello world")

    hello = test_task()
    chain(hello)
`

The problem is that it is never triggered, and I don't know exactly where the
problem is. Any ideas?

GitHub link: https://github.com/apache/airflow/discussions/51882

----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]

Reply via email to