DinGo4DEV commented on issue #51522: URL: https://github.com/apache/airflow/issues/51522#issuecomment-2956365418
Thank you for your response. I would like to configure Kafka in the context of message queue trigger. However, it seems that there is no explicit type hint available for this unless we inspect the code for both the Kafka provider and the message queue trigger. https://github.com/jason810496/airflow/blob/9da23244ba0b3ce110033b804928e98304d505ae/providers/apache/kafka/src/airflow/providers/apache/kafka/queues/kafka.py#L79-L100 https://github.com/jason810496/airflow/blob/fa3def0010c7e0589d6d7a8391a33c49b52bc55a/providers/common/messaging/src/airflow/providers/common/messaging/triggers/msg_queue.py#L74-L76 I think it is better to provide a class for Kafka provider for convenience: ``` python class KafkaMessageQueueTrigger(MessageQueueTrigger): def __init__( self, topics: Sequence[str], apply_function: str, kafka_config_id: str = "kafka_default", apply_function_args: Sequence[Any] | None = None, apply_function_kwargs: dict[Any, Any] | None = None, poll_timeout: float = 1, poll_interval: float = 5, ) -> None: # Any validation queue_topics= ... kafka_boostrap= ... super().__init__( queue=f"kafka://{kafka_bootstrap}/{topics}", apply_function=apply_function, apply_function_args=apply_function_args, poll_timeout=poll_timeout, poll_interval: float = 5 ) trigger = MessageQueueTrigger( topics=["test"], kafka_config_id="kafka_airflow", apply_function="kafka_message_queue_trigger.apply_function", ) asset = Asset("kafka_queue_asset", watchers=[AssetWatcher(name="kafka_watcher", trigger=trigger)]) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
