This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 11376a7159f AIP-82: Add KafkaMessageQueueProvider (#49938)
11376a7159f is described below

commit 11376a7159fb420b3c50fce7693bae34f1c79e60
Author: LIU ZHE YOU <[email protected]>
AuthorDate: Mon May 12 20:22:29 2025 +0800

    AIP-82: Add KafkaMessageQueueProvider (#49938)
    
    * AIP-82: Add KafkaMessageQueueProvider
    
    - Add Apache kafka to index.rst
    
    - Fix mypy static check
    
    * Add TestKafkaMessageQueueProvider unittest
    
    * Fix kafka_message_queue_trigger system test
    
    * Refine docstring of KafkaMessageQueueProvider
    
    * Fix common-messaging docs
    
    * Move Kafka message queue to Apache Kafka provider
    
    * Fix common messaging dev dependency-group
    
    * Fix kafka queue test
    
    * Fix compact test
    
    * Refine doc for KafkaMessageQueueProvider
    
    * Move system test to provider-specific directory
---
 providers/apache/kafka/docs/index.rst              |   1 +
 .../apache/kafka/docs/message-queues/index.rst     |  70 ++++++++++++++
 providers/apache/kafka/provider.yaml               |   4 +
 providers/apache/kafka/pyproject.toml              |  10 +-
 .../src/airflow/providers/apache/kafka/__init__.py |   2 +-
 .../providers/apache/kafka/get_provider_info.py    |   1 +
 .../apache/kafka/{ => queues}/__init__.py          |  23 -----
 .../airflow/providers/apache/kafka/queues/kafka.py | 100 +++++++++++++++++++
 .../apache/kafka/kafka_message_queue_trigger.py    |  50 ++++++++++
 .../unit/apache/kafka/queues}/__init__.py          |  23 -----
 .../tests/unit/apache/kafka/queues/test_kafka.py   | 106 +++++++++++++++++++++
 .../apache/kafka/triggers/test_await_message.py    |  15 +++
 providers/common/messaging/pyproject.toml          |   4 +
 13 files changed, 359 insertions(+), 50 deletions(-)

diff --git a/providers/apache/kafka/docs/index.rst 
b/providers/apache/kafka/docs/index.rst
index a3f89db5cf1..5ac4bc45403 100644
--- a/providers/apache/kafka/docs/index.rst
+++ b/providers/apache/kafka/docs/index.rst
@@ -37,6 +37,7 @@
     Connection <connections/kafka>
     Hooks <hooks>
     Operators <operators/index>
+    Message Queues <message-queues/index>
     Sensors <sensors>
     Triggers <triggers>
 
diff --git a/providers/apache/kafka/docs/message-queues/index.rst 
b/providers/apache/kafka/docs/message-queues/index.rst
new file mode 100644
index 00000000000..5078a82127d
--- /dev/null
+++ b/providers/apache/kafka/docs/message-queues/index.rst
@@ -0,0 +1,70 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. NOTE TO CONTRIBUTORS:
+   Please, only add notes to the Changelog just below the "Changelog" header 
when there are some breaking changes
+   and you want to add an explanation to the users on how they are supposed to 
deal with them.
+   The changelog is updated and maintained semi-automatically by release 
manager.
+
+
+Apache Kafka Message Queue
+==========================
+
+
+Apache Kafka Queue Provider
+---------------------------
+
+Implemented by 
:class:`~airflow.providers.apache.kafka.queues.kafka.KafkaMessageQueueProvider`
+
+
+The Apache Kafka Queue Provider is a message queue provider that uses
+Apache Kafka as the underlying message queue system.
+It allows you to send and receive messages using Kafka topics in your Airflow 
workflows.
+The provider supports Kafka topics and provides features for consuming and 
processing
+messages from Kafka brokers.
+
+The queue must be matching this regex:
+
+.. exampleinclude:: /../src/airflow/providers/apache/kafka/queues/kafka.py
+    :language: python
+    :dedent: 0
+    :start-after: [START queue_regexp]
+    :end-before: [END queue_regexp]
+
+Queue URI Format:
+
+.. code-block:: text
+
+    kafka://<broker>/<topic_list>
+
+Where:
+
+- ``broker``: Kafka brokers (hostname:port)
+- ``topic_list``: Comma-separated list of Kafka topics to consume messages from
+
+The ``queue`` parameter is used to configure the underlying
+:class:`~airflow.providers.apache.kafka.triggers.await_message.AwaitMessageTrigger`
 class and
+passes all kwargs directly to the trigger constructor, if provided.
+The ``apply_function`` kwarg is **required**.
+
+Topics can also be specified via the Queue URI instead of the ``topics`` 
kwarg. The provider will extract topics from the URI as follows:
+
+.. exampleinclude:: /../src/airflow/providers/apache/kafka/queues/kafka.py
+    :language: python
+    :dedent: 0
+    :start-after: [START extract_topics]
+    :end-before: [END extract_topics]
diff --git a/providers/apache/kafka/provider.yaml 
b/providers/apache/kafka/provider.yaml
index 3006661b927..b1afcca73da 100644
--- a/providers/apache/kafka/provider.yaml
+++ b/providers/apache/kafka/provider.yaml
@@ -28,6 +28,7 @@ description: |
 # In such case adding >= NEW_VERSION and bumping to NEW_VERSION in a provider 
have
 # to be done in the same PR
 versions:
+  - 1.9.0
   - 1.8.1
   - 1.8.0
   - 1.7.0
@@ -77,3 +78,6 @@ triggers:
 connection-types:
   - hook-class-name: airflow.providers.apache.kafka.hooks.base.KafkaBaseHook
     connection-type: kafka
+
+queues:
+  - airflow.providers.apache.kafka.queues.kafka.KafkaMessageQueueProvider
diff --git a/providers/apache/kafka/pyproject.toml 
b/providers/apache/kafka/pyproject.toml
index bb5ad25859b..cb3cbbbb485 100644
--- a/providers/apache/kafka/pyproject.toml
+++ b/providers/apache/kafka/pyproject.toml
@@ -25,7 +25,7 @@ build-backend = "flit_core.buildapi"
 
 [project]
 name = "apache-airflow-providers-apache-kafka"
-version = "1.8.1"
+version = "1.9.0"
 description = "Provider package apache-airflow-providers-apache-kafka for 
Apache Airflow"
 readme = "README.rst"
 authors = [
@@ -68,12 +68,16 @@ dependencies = [
 "google" = [
     "apache-airflow-providers-google"
 ]
+"common.messaging" = [
+    "apache-airflow-providers-common-messaging>=1.0.1"
+]
 
 [dependency-groups]
 dev = [
     "apache-airflow",
     "apache-airflow-task-sdk",
     "apache-airflow-devel-common",
+    "apache-airflow-providers-common-messaging",
     "apache-airflow-providers-google",
     # Additional devel dependencies (do not remove this line and add extra 
development dependencies)
 ]
@@ -104,8 +108,8 @@ apache-airflow-providers-common-sql = {workspace = true}
 apache-airflow-providers-standard = {workspace = true}
 
 [project.urls]
-"Documentation" = 
"https://airflow.apache.org/docs/apache-airflow-providers-apache-kafka/1.8.1";
-"Changelog" = 
"https://airflow.apache.org/docs/apache-airflow-providers-apache-kafka/1.8.1/changelog.html";
+"Documentation" = 
"https://airflow.apache.org/docs/apache-airflow-providers-apache-kafka/1.9.0";
+"Changelog" = 
"https://airflow.apache.org/docs/apache-airflow-providers-apache-kafka/1.9.0/changelog.html";
 "Bug Tracker" = "https://github.com/apache/airflow/issues";
 "Source Code" = "https://github.com/apache/airflow";
 "Slack Chat" = "https://s.apache.org/airflow-slack";
diff --git 
a/providers/apache/kafka/src/airflow/providers/apache/kafka/__init__.py 
b/providers/apache/kafka/src/airflow/providers/apache/kafka/__init__.py
index c2ea7906c79..8d24f62f26e 100644
--- a/providers/apache/kafka/src/airflow/providers/apache/kafka/__init__.py
+++ b/providers/apache/kafka/src/airflow/providers/apache/kafka/__init__.py
@@ -29,7 +29,7 @@ from airflow import __version__ as airflow_version
 
 __all__ = ["__version__"]
 
-__version__ = "1.8.1"
+__version__ = "1.9.0"
 
 if 
packaging.version.parse(packaging.version.parse(airflow_version).base_version) 
< packaging.version.parse(
     "2.10.0"
diff --git 
a/providers/apache/kafka/src/airflow/providers/apache/kafka/get_provider_info.py
 
b/providers/apache/kafka/src/airflow/providers/apache/kafka/get_provider_info.py
index d06f0a9fb4d..b26dfb1ab38 100644
--- 
a/providers/apache/kafka/src/airflow/providers/apache/kafka/get_provider_info.py
+++ 
b/providers/apache/kafka/src/airflow/providers/apache/kafka/get_provider_info.py
@@ -72,4 +72,5 @@ def get_provider_info():
                 "connection-type": "kafka",
             }
         ],
+        "queues": 
["airflow.providers.apache.kafka.queues.kafka.KafkaMessageQueueProvider"],
     }
diff --git 
a/providers/apache/kafka/src/airflow/providers/apache/kafka/__init__.py 
b/providers/apache/kafka/src/airflow/providers/apache/kafka/queues/__init__.py
similarity index 52%
copy from providers/apache/kafka/src/airflow/providers/apache/kafka/__init__.py
copy to 
providers/apache/kafka/src/airflow/providers/apache/kafka/queues/__init__.py
index c2ea7906c79..13a83393a91 100644
--- a/providers/apache/kafka/src/airflow/providers/apache/kafka/__init__.py
+++ 
b/providers/apache/kafka/src/airflow/providers/apache/kafka/queues/__init__.py
@@ -14,26 +14,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-#
-# NOTE! THIS FILE IS AUTOMATICALLY GENERATED AND WILL BE
-# OVERWRITTEN WHEN PREPARING DOCUMENTATION FOR THE PACKAGES.
-#
-# IF YOU WANT TO MODIFY THIS FILE, YOU SHOULD MODIFY THE TEMPLATE
-# `PROVIDER__INIT__PY_TEMPLATE.py.jinja2` IN the 
`dev/breeze/src/airflow_breeze/templates` DIRECTORY
-#
-from __future__ import annotations
-
-import packaging.version
-
-from airflow import __version__ as airflow_version
-
-__all__ = ["__version__"]
-
-__version__ = "1.8.1"
-
-if 
packaging.version.parse(packaging.version.parse(airflow_version).base_version) 
< packaging.version.parse(
-    "2.10.0"
-):
-    raise RuntimeError(
-        f"The package `apache-airflow-providers-apache-kafka:{__version__}` 
needs Apache Airflow 2.10.0+"
-    )
diff --git 
a/providers/apache/kafka/src/airflow/providers/apache/kafka/queues/kafka.py 
b/providers/apache/kafka/src/airflow/providers/apache/kafka/queues/kafka.py
new file mode 100644
index 00000000000..45617544d69
--- /dev/null
+++ b/providers/apache/kafka/src/airflow/providers/apache/kafka/queues/kafka.py
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import re
+from typing import TYPE_CHECKING
+from urllib.parse import urlparse
+
+from airflow.providers.apache.kafka.triggers.await_message import 
AwaitMessageTrigger
+from airflow.providers.common.messaging.providers.base_provider import 
BaseMessageQueueProvider
+
+if TYPE_CHECKING:
+    from airflow.triggers.base import BaseEventTrigger
+
+# [START queue_regexp]
+QUEUE_REGEXP = r"^kafka://"
+# [END queue_regexp]
+
+
+class KafkaMessageQueueProvider(BaseMessageQueueProvider):
+    """
+    Configuration for Apache Kafka integration with common-messaging.
+
+    It uses the ``kafka://`` URI scheme for identifying Kafka queues.
+
+    **URI Format**:
+
+    .. code-block:: text
+
+        kafka://<broker>/<topic_list>
+
+    Where:
+
+    * ``broker``: Kafka brokers (hostname:port)
+    * ``topic_list``: Comma-separated list of Kafka topics to consume messages 
from
+
+    **Examples**:
+
+    .. code-block:: text
+
+        kafka://localhost:9092/my_topic
+
+    **Required kwargs**:
+
+    * ``apply_function``: Function to process each Kafka message
+
+    You can also provide ``topics`` directly in kwargs instead of in the URI.
+
+    .. code-block:: python
+
+        from airflow.providers.common.messaging.triggers.msg_queue import 
MessageQueueTrigger
+
+        trigger = MessageQueueTrigger(
+            queue="kafka://localhost:9092/test",
+            apply_function="module.apply_function",
+        )
+
+    For a complete example, see:
+    :mod:`tests.system.common.messaging.kafka_message_queue_trigger`
+    """
+
+    def queue_matches(self, queue: str) -> bool:
+        return bool(re.match(QUEUE_REGEXP, queue))
+
+    def trigger_class(self) -> type[BaseEventTrigger]:
+        return AwaitMessageTrigger  # type: ignore[return-value]
+
+    def trigger_kwargs(self, queue: str, **kwargs) -> dict:
+        if "apply_function" not in kwargs:
+            raise ValueError("apply_function is required in 
KafkaMessageQueueProvider kwargs")
+
+        # [START extract_topics]
+        # Parse the queue URI
+        parsed = urlparse(queue)
+        # Extract topics (after host list)
+        # parsed.path starts with a '/', so strip it
+        raw_topics = parsed.path.lstrip("/")
+        topics = raw_topics.split(",") if raw_topics else []
+        # [END extract_topics]
+
+        if not topics and "topics" not in kwargs:
+            raise ValueError(
+                "topics is required in KafkaMessageQueueProvider kwargs or 
provide them in the queue URI"
+            )
+
+        return {} if "topics" in kwargs else {"topics": topics}
diff --git 
a/providers/apache/kafka/tests/system/apache/kafka/kafka_message_queue_trigger.py
 
b/providers/apache/kafka/tests/system/apache/kafka/kafka_message_queue_trigger.py
new file mode 100644
index 00000000000..b5b9d365b09
--- /dev/null
+++ 
b/providers/apache/kafka/tests/system/apache/kafka/kafka_message_queue_trigger.py
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+
+# [START howto_trigger_message_queue]
+from airflow.providers.common.messaging.triggers.msg_queue import 
MessageQueueTrigger
+from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.sdk import DAG, Asset, AssetWatcher
+
+
+def apply_function(message):
+    val = json.loads(message.value())
+    print(f"Value in message is {val}")
+    return True
+
+
+# Define a trigger that listens to an external message queue (Apache Kafka in 
this case)
+trigger = MessageQueueTrigger(
+    queue="kafka://localhost:9092/test",
+    apply_function="kafka_message_queue_trigger.apply_function",
+)
+
+# Define an asset that watches for messages on the queue
+asset = Asset("kafka_queue_asset", 
watchers=[AssetWatcher(name="kafka_watcher", trigger=trigger)])
+
+with DAG(dag_id="example_kafka_watcher", schedule=[asset]) as dag:
+    EmptyOperator(task_id="task")
+# [END howto_trigger_message_queue]
+
+
+from tests_common.test_utils.system_tests import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: 
tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git 
a/providers/apache/kafka/src/airflow/providers/apache/kafka/__init__.py 
b/providers/apache/kafka/tests/unit/apache/kafka/queues/__init__.py
similarity index 52%
copy from providers/apache/kafka/src/airflow/providers/apache/kafka/__init__.py
copy to providers/apache/kafka/tests/unit/apache/kafka/queues/__init__.py
index c2ea7906c79..13a83393a91 100644
--- a/providers/apache/kafka/src/airflow/providers/apache/kafka/__init__.py
+++ b/providers/apache/kafka/tests/unit/apache/kafka/queues/__init__.py
@@ -14,26 +14,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-#
-# NOTE! THIS FILE IS AUTOMATICALLY GENERATED AND WILL BE
-# OVERWRITTEN WHEN PREPARING DOCUMENTATION FOR THE PACKAGES.
-#
-# IF YOU WANT TO MODIFY THIS FILE, YOU SHOULD MODIFY THE TEMPLATE
-# `PROVIDER__INIT__PY_TEMPLATE.py.jinja2` IN the 
`dev/breeze/src/airflow_breeze/templates` DIRECTORY
-#
-from __future__ import annotations
-
-import packaging.version
-
-from airflow import __version__ as airflow_version
-
-__all__ = ["__version__"]
-
-__version__ = "1.8.1"
-
-if 
packaging.version.parse(packaging.version.parse(airflow_version).base_version) 
< packaging.version.parse(
-    "2.10.0"
-):
-    raise RuntimeError(
-        f"The package `apache-airflow-providers-apache-kafka:{__version__}` 
needs Apache Airflow 2.10.0+"
-    )
diff --git 
a/providers/apache/kafka/tests/unit/apache/kafka/queues/test_kafka.py 
b/providers/apache/kafka/tests/unit/apache/kafka/queues/test_kafka.py
new file mode 100644
index 00000000000..567c2f17102
--- /dev/null
+++ b/providers/apache/kafka/tests/unit/apache/kafka/queues/test_kafka.py
@@ -0,0 +1,106 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+
+from airflow.providers.apache.kafka.triggers.await_message import 
AwaitMessageTrigger
+
+pytest.importorskip("airflow.providers.common.messaging.providers.base_provider.BaseMessageQueueProvider")
+
+MOCK_KAFKA_TRIGGER_APPLY_FUNCTION = "mock_kafka_trigger_apply_function"
+
+
+class TestKafkaMessageQueueProvider:
+    """Tests for KafkaMessageQueueProvider."""
+
+    def setup_method(self):
+        """Set up the test environment."""
+        from airflow.providers.apache.kafka.queues.kafka import 
KafkaMessageQueueProvider
+
+        self.provider = KafkaMessageQueueProvider()
+
+    def test_queue_create(self):
+        """Test the creation of the KafkaMessageQueueProvider."""
+        from airflow.providers.common.messaging.providers.base_provider import 
BaseMessageQueueProvider
+
+        assert isinstance(self.provider, BaseMessageQueueProvider)
+
+    @pytest.mark.parametrize(
+        "queue_uri, expected_result",
+        [
+            pytest.param("kafka://localhost:9092/topic1", True, 
id="single_broker_single_topic"),
+            pytest.param(
+                "kafka://broker1:9092,broker2:9092/topic1,topic2", True, 
id="multiple_brokers_multiple_topics"
+            ),
+            pytest.param("http://example.com";, False, id="http_url"),
+            pytest.param("not-a-url", False, id="invalid_url"),
+        ],
+    )
+    def test_queue_matches(self, queue_uri, expected_result):
+        """Test the queue_matches method with various URLs."""
+        assert self.provider.queue_matches(queue_uri) == expected_result
+
+    def test_trigger_class(self):
+        """Test the trigger_class method."""
+        assert self.provider.trigger_class() == AwaitMessageTrigger
+
+    @pytest.mark.parametrize(
+        "queue_uri, extra_kwargs, expected_result",
+        [
+            pytest.param(
+                "kafka://broker:9092/topic1,topic2",
+                {"apply_function": MOCK_KAFKA_TRIGGER_APPLY_FUNCTION},
+                {"topics": ["topic1", "topic2"]},
+                id="topics_from_uri",
+            ),
+            pytest.param(
+                "kafka://broker:9092/",
+                {"topics": ["topic1", "topic2"], "apply_function": 
MOCK_KAFKA_TRIGGER_APPLY_FUNCTION},
+                {},
+                id="topics_from_kwargs",
+            ),
+        ],
+    )
+    def test_trigger_kwargs_valid_cases(self, queue_uri, extra_kwargs, 
expected_result):
+        """Test the trigger_kwargs method with valid parameters."""
+        kwargs = self.provider.trigger_kwargs(queue_uri, **extra_kwargs)
+        assert kwargs == expected_result
+
+    @pytest.mark.parametrize(
+        "queue_uri, extra_kwargs, expected_error, error_match",
+        [
+            pytest.param(
+                "kafka://broker:9092/topic1",
+                {},
+                ValueError,
+                "apply_function is required in KafkaMessageQueueProvider 
kwargs",
+                id="missing_apply_function",
+            ),
+            pytest.param(
+                "kafka://broker:9092/",
+                {"apply_function": MOCK_KAFKA_TRIGGER_APPLY_FUNCTION},
+                ValueError,
+                "topics is required in KafkaMessageQueueProvider kwargs or 
provide them in the queue URI",
+                id="missing_topics",
+            ),
+        ],
+    )
+    def test_trigger_kwargs_error_cases(self, queue_uri, extra_kwargs, 
expected_error, error_match):
+        """Test that trigger_kwargs raises appropriate errors with invalid 
parameters."""
+        with pytest.raises(expected_error, match=error_match):
+            self.provider.trigger_kwargs(queue_uri, **extra_kwargs)
diff --git 
a/providers/apache/kafka/tests/unit/apache/kafka/triggers/test_await_message.py 
b/providers/apache/kafka/tests/unit/apache/kafka/triggers/test_await_message.py
index 6f3632b2c6d..bdd8ea38ded 100644
--- 
a/providers/apache/kafka/tests/unit/apache/kafka/triggers/test_await_message.py
+++ 
b/providers/apache/kafka/tests/unit/apache/kafka/triggers/test_await_message.py
@@ -26,6 +26,8 @@ from airflow.providers.apache.kafka.hooks.consume import 
KafkaConsumerHook
 from airflow.providers.apache.kafka.triggers.await_message import 
AwaitMessageTrigger
 from airflow.utils import db
 
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, 
get_base_airflow_version_tuple
+
 pytestmark = pytest.mark.db_test
 
 
@@ -127,3 +129,16 @@ class TestTrigger:
         await asyncio.sleep(1.0)
         assert task.done() is False
         asyncio.get_event_loop().stop()
+
+
[email protected](not AIRFLOW_V_3_0_PLUS, reason="Requires Airflow 3.0.+")
+class TestMessageQueueTrigger:
+    def test_provider_integrations(self, cleanup_providers_manager):
+        if get_base_airflow_version_tuple() < (3, 0, 1):
+            pytest.skip("This test is only for Airflow 3.0.1+")
+
+        queue = "kafka://localhost:9092/topic1"
+        from airflow.providers.common.messaging.triggers.msg_queue import 
MessageQueueTrigger
+
+        trigger = MessageQueueTrigger(queue=queue, 
apply_function="mock_kafka_trigger_apply_function")
+        assert isinstance(trigger.trigger, AwaitMessageTrigger)
diff --git a/providers/common/messaging/pyproject.toml 
b/providers/common/messaging/pyproject.toml
index cac7e940db5..1af5e503a7a 100644
--- a/providers/common/messaging/pyproject.toml
+++ b/providers/common/messaging/pyproject.toml
@@ -69,6 +69,9 @@ dependencies = [
 "amazon" = [
     "apache-airflow-providers-amazon>=9.7.0"
 ]
+"apache.kafka" = [
+    "apache-airflow-providers-apache-kafka>=1.9.0"
+]
 
 [dependency-groups]
 dev = [
@@ -77,6 +80,7 @@ dev = [
     "apache-airflow-devel-common",
     # Additional devel dependencies (do not remove this line and add extra 
development dependencies)
     "apache-airflow-providers-amazon",
+    "apache-airflow-providers-apache-kafka",
 ]
 
 # To build docs:

Reply via email to