This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 11376a7159f AIP-82: Add KafkaMessageQueueProvider (#49938)
11376a7159f is described below
commit 11376a7159fb420b3c50fce7693bae34f1c79e60
Author: LIU ZHE YOU <[email protected]>
AuthorDate: Mon May 12 20:22:29 2025 +0800
AIP-82: Add KafkaMessageQueueProvider (#49938)
* AIP-82: Add KafkaMessageQueueProvider
- Add Apache kafka to index.rst
- Fix mypy static check
* Add TestKafkaMessageQueueProvider unittest
* Fix kafka_message_queue_trigger system test
* Refine docstring of KafkaMessageQueueProvider
* Fix common-messaging docs
* Move Kafka message queue to Apache Kafka provider
* Fix common messaging dev dependency-group
* Fix kafka queue test
* Fix compact test
* Refine doc for KafkaMessageQueueProvider
* Move system test to provider-specific directory
---
providers/apache/kafka/docs/index.rst | 1 +
.../apache/kafka/docs/message-queues/index.rst | 70 ++++++++++++++
providers/apache/kafka/provider.yaml | 4 +
providers/apache/kafka/pyproject.toml | 10 +-
.../src/airflow/providers/apache/kafka/__init__.py | 2 +-
.../providers/apache/kafka/get_provider_info.py | 1 +
.../apache/kafka/{ => queues}/__init__.py | 23 -----
.../airflow/providers/apache/kafka/queues/kafka.py | 100 +++++++++++++++++++
.../apache/kafka/kafka_message_queue_trigger.py | 50 ++++++++++
.../unit/apache/kafka/queues}/__init__.py | 23 -----
.../tests/unit/apache/kafka/queues/test_kafka.py | 106 +++++++++++++++++++++
.../apache/kafka/triggers/test_await_message.py | 15 +++
providers/common/messaging/pyproject.toml | 4 +
13 files changed, 359 insertions(+), 50 deletions(-)
diff --git a/providers/apache/kafka/docs/index.rst b/providers/apache/kafka/docs/index.rst
index a3f89db5cf1..5ac4bc45403 100644
--- a/providers/apache/kafka/docs/index.rst
+++ b/providers/apache/kafka/docs/index.rst
@@ -37,6 +37,7 @@
Connection <connections/kafka>
Hooks <hooks>
Operators <operators/index>
+ Message Queues <message-queues/index>
Sensors <sensors>
Triggers <triggers>
diff --git a/providers/apache/kafka/docs/message-queues/index.rst b/providers/apache/kafka/docs/message-queues/index.rst
new file mode 100644
index 00000000000..5078a82127d
--- /dev/null
+++ b/providers/apache/kafka/docs/message-queues/index.rst
@@ -0,0 +1,70 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+.. NOTE TO CONTRIBUTORS:
+ Please, only add notes to the Changelog just below the "Changelog" header when there are some breaking changes
+ and you want to add an explanation to the users on how they are supposed to deal with them.
+ The changelog is updated and maintained semi-automatically by release manager.
+
+
+Apache Kafka Message Queue
+==========================
+
+
+Apache Kafka Queue Provider
+---------------------------
+
+Implemented by :class:`~airflow.providers.apache.kafka.queues.kafka.KafkaMessageQueueProvider`
+
+
+The Apache Kafka Queue Provider is a message queue provider that uses
+Apache Kafka as the underlying message queue system.
+It allows you to send and receive messages using Kafka topics in your Airflow workflows.
+The provider supports Kafka topics and provides features for consuming and processing
+messages from Kafka brokers.
+
+The queue must match this regex:
+
+.. exampleinclude:: /../src/airflow/providers/apache/kafka/queues/kafka.py
+ :language: python
+ :dedent: 0
+ :start-after: [START queue_regexp]
+ :end-before: [END queue_regexp]
+
+Queue URI Format:
+
+.. code-block:: text
+
+ kafka://<broker>/<topic_list>
+
+Where:
+
+- ``broker``: Kafka brokers (hostname:port)
+- ``topic_list``: Comma-separated list of Kafka topics to consume messages from
+
+The ``queue`` parameter is used to configure the underlying
+:class:`~airflow.providers.apache.kafka.triggers.await_message.AwaitMessageTrigger` class and
+passes all kwargs directly to the trigger constructor, if provided.
+The ``apply_function`` kwarg is **required**.
+
+Topics can also be specified via the Queue URI instead of the ``topics`` kwarg. The provider will extract topics from the URI as follows:
+
+.. exampleinclude:: /../src/airflow/providers/apache/kafka/queues/kafka.py
+ :language: python
+ :dedent: 0
+ :start-after: [START extract_topics]
+ :end-before: [END extract_topics]
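For readers skimming the patch, the documentation above boils down to the following minimal usage sketch; the broker address, topic name, and the dotted apply_function path are illustrative placeholders, not values taken from this commit:

    from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger

    # Broker and topics come from the queue URI (kafka://<broker>/<topic_list>);
    # apply_function is the required kwarg and is forwarded to AwaitMessageTrigger.
    trigger = MessageQueueTrigger(
        queue="kafka://localhost:9092/my_topic",
        apply_function="my_package.callbacks.apply_function",  # hypothetical dotted path
    )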
diff --git a/providers/apache/kafka/provider.yaml b/providers/apache/kafka/provider.yaml
index 3006661b927..b1afcca73da 100644
--- a/providers/apache/kafka/provider.yaml
+++ b/providers/apache/kafka/provider.yaml
@@ -28,6 +28,7 @@ description: |
# In such case adding >= NEW_VERSION and bumping to NEW_VERSION in a provider have
# to be done in the same PR
versions:
+ - 1.9.0
- 1.8.1
- 1.8.0
- 1.7.0
@@ -77,3 +78,6 @@ triggers:
connection-types:
- hook-class-name: airflow.providers.apache.kafka.hooks.base.KafkaBaseHook
connection-type: kafka
+
+queues:
+ - airflow.providers.apache.kafka.queues.kafka.KafkaMessageQueueProvider
diff --git a/providers/apache/kafka/pyproject.toml b/providers/apache/kafka/pyproject.toml
index bb5ad25859b..cb3cbbbb485 100644
--- a/providers/apache/kafka/pyproject.toml
+++ b/providers/apache/kafka/pyproject.toml
@@ -25,7 +25,7 @@ build-backend = "flit_core.buildapi"
[project]
name = "apache-airflow-providers-apache-kafka"
-version = "1.8.1"
+version = "1.9.0"
description = "Provider package apache-airflow-providers-apache-kafka for Apache Airflow"
readme = "README.rst"
authors = [
@@ -68,12 +68,16 @@ dependencies = [
"google" = [
"apache-airflow-providers-google"
]
+"common.messaging" = [
+ "apache-airflow-providers-common-messaging>=1.0.1"
+]
[dependency-groups]
dev = [
"apache-airflow",
"apache-airflow-task-sdk",
"apache-airflow-devel-common",
+ "apache-airflow-providers-common-messaging",
"apache-airflow-providers-google",
# Additional devel dependencies (do not remove this line and add extra development dependencies)
]
@@ -104,8 +108,8 @@ apache-airflow-providers-common-sql = {workspace = true}
apache-airflow-providers-standard = {workspace = true}
[project.urls]
-"Documentation" =
"https://airflow.apache.org/docs/apache-airflow-providers-apache-kafka/1.8.1"
-"Changelog" =
"https://airflow.apache.org/docs/apache-airflow-providers-apache-kafka/1.8.1/changelog.html"
+"Documentation" =
"https://airflow.apache.org/docs/apache-airflow-providers-apache-kafka/1.9.0"
+"Changelog" =
"https://airflow.apache.org/docs/apache-airflow-providers-apache-kafka/1.9.0/changelog.html"
"Bug Tracker" = "https://github.com/apache/airflow/issues"
"Source Code" = "https://github.com/apache/airflow"
"Slack Chat" = "https://s.apache.org/airflow-slack"
diff --git a/providers/apache/kafka/src/airflow/providers/apache/kafka/__init__.py b/providers/apache/kafka/src/airflow/providers/apache/kafka/__init__.py
index c2ea7906c79..8d24f62f26e 100644
--- a/providers/apache/kafka/src/airflow/providers/apache/kafka/__init__.py
+++ b/providers/apache/kafka/src/airflow/providers/apache/kafka/__init__.py
@@ -29,7 +29,7 @@ from airflow import __version__ as airflow_version
__all__ = ["__version__"]
-__version__ = "1.8.1"
+__version__ = "1.9.0"
if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
"2.10.0"
diff --git a/providers/apache/kafka/src/airflow/providers/apache/kafka/get_provider_info.py b/providers/apache/kafka/src/airflow/providers/apache/kafka/get_provider_info.py
index d06f0a9fb4d..b26dfb1ab38 100644
--- a/providers/apache/kafka/src/airflow/providers/apache/kafka/get_provider_info.py
+++ b/providers/apache/kafka/src/airflow/providers/apache/kafka/get_provider_info.py
@@ -72,4 +72,5 @@ def get_provider_info():
"connection-type": "kafka",
}
],
+ "queues": ["airflow.providers.apache.kafka.queues.kafka.KafkaMessageQueueProvider"],
}
diff --git a/providers/apache/kafka/src/airflow/providers/apache/kafka/__init__.py b/providers/apache/kafka/src/airflow/providers/apache/kafka/queues/__init__.py
similarity index 52%
copy from providers/apache/kafka/src/airflow/providers/apache/kafka/__init__.py
copy to providers/apache/kafka/src/airflow/providers/apache/kafka/queues/__init__.py
index c2ea7906c79..13a83393a91 100644
--- a/providers/apache/kafka/src/airflow/providers/apache/kafka/__init__.py
+++ b/providers/apache/kafka/src/airflow/providers/apache/kafka/queues/__init__.py
@@ -14,26 +14,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-#
-# NOTE! THIS FILE IS AUTOMATICALLY GENERATED AND WILL BE
-# OVERWRITTEN WHEN PREPARING DOCUMENTATION FOR THE PACKAGES.
-#
-# IF YOU WANT TO MODIFY THIS FILE, YOU SHOULD MODIFY THE TEMPLATE
-# `PROVIDER__INIT__PY_TEMPLATE.py.jinja2` IN the `dev/breeze/src/airflow_breeze/templates` DIRECTORY
-#
-from __future__ import annotations
-
-import packaging.version
-
-from airflow import __version__ as airflow_version
-
-__all__ = ["__version__"]
-
-__version__ = "1.8.1"
-
-if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
- "2.10.0"
-):
- raise RuntimeError(
- f"The package `apache-airflow-providers-apache-kafka:{__version__}` needs Apache Airflow 2.10.0+"
- )
diff --git a/providers/apache/kafka/src/airflow/providers/apache/kafka/queues/kafka.py b/providers/apache/kafka/src/airflow/providers/apache/kafka/queues/kafka.py
new file mode 100644
index 00000000000..45617544d69
--- /dev/null
+++ b/providers/apache/kafka/src/airflow/providers/apache/kafka/queues/kafka.py
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import re
+from typing import TYPE_CHECKING
+from urllib.parse import urlparse
+
+from airflow.providers.apache.kafka.triggers.await_message import AwaitMessageTrigger
+from airflow.providers.common.messaging.providers.base_provider import BaseMessageQueueProvider
+
+if TYPE_CHECKING:
+ from airflow.triggers.base import BaseEventTrigger
+
+# [START queue_regexp]
+QUEUE_REGEXP = r"^kafka://"
+# [END queue_regexp]
+
+
+class KafkaMessageQueueProvider(BaseMessageQueueProvider):
+ """
+ Configuration for Apache Kafka integration with common-messaging.
+
+ It uses the ``kafka://`` URI scheme for identifying Kafka queues.
+
+ **URI Format**:
+
+ .. code-block:: text
+
+ kafka://<broker>/<topic_list>
+
+ Where:
+
+ * ``broker``: Kafka brokers (hostname:port)
+ * ``topic_list``: Comma-separated list of Kafka topics to consume messages from
+
+ **Examples**:
+
+ .. code-block:: text
+
+ kafka://localhost:9092/my_topic
+
+ **Required kwargs**:
+
+ * ``apply_function``: Function to process each Kafka message
+
+ You can also provide ``topics`` directly in kwargs instead of in the URI.
+
+ .. code-block:: python
+
+ from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
+
+ trigger = MessageQueueTrigger(
+ queue="kafka://localhost:9092/test",
+ apply_function="module.apply_function",
+ )
+
+ For a complete example, see:
+ :mod:`tests.system.common.messaging.kafka_message_queue_trigger`
+ """
+
+ def queue_matches(self, queue: str) -> bool:
+ return bool(re.match(QUEUE_REGEXP, queue))
+
+ def trigger_class(self) -> type[BaseEventTrigger]:
+ return AwaitMessageTrigger # type: ignore[return-value]
+
+ def trigger_kwargs(self, queue: str, **kwargs) -> dict:
+ if "apply_function" not in kwargs:
+ raise ValueError("apply_function is required in KafkaMessageQueueProvider kwargs")
+
+ # [START extract_topics]
+ # Parse the queue URI
+ parsed = urlparse(queue)
+ # Extract topics (after host list)
+ # parsed.path starts with a '/', so strip it
+ raw_topics = parsed.path.lstrip("/")
+ topics = raw_topics.split(",") if raw_topics else []
+ # [END extract_topics]
+
+ if not topics and "topics" not in kwargs:
+ raise ValueError(
+ "topics is required in KafkaMessageQueueProvider kwargs or provide them in the queue URI"
+ )
+
+ return {} if "topics" in kwargs else {"topics": topics}
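To make the new provider's behavior concrete, here is a small sketch exercising the methods added in kafka.py above; it mirrors the unit tests further down in this patch, and the dotted apply_function path is a hypothetical placeholder:

    from airflow.providers.apache.kafka.queues.kafka import KafkaMessageQueueProvider

    provider = KafkaMessageQueueProvider()

    # Any URI using the kafka:// scheme is claimed by this provider.
    assert provider.queue_matches("kafka://broker1:9092,broker2:9092/topic1,topic2")
    assert not provider.queue_matches("http://example.com")

    # Topics are parsed from the URI path when not passed explicitly as a kwarg.
    kwargs = provider.trigger_kwargs(
        "kafka://broker:9092/topic1,topic2",
        apply_function="my_module.apply_function",  # hypothetical dotted path
    )
    assert kwargs == {"topics": ["topic1", "topic2"]}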
diff --git a/providers/apache/kafka/tests/system/apache/kafka/kafka_message_queue_trigger.py b/providers/apache/kafka/tests/system/apache/kafka/kafka_message_queue_trigger.py
new file mode 100644
index 00000000000..b5b9d365b09
--- /dev/null
+++ b/providers/apache/kafka/tests/system/apache/kafka/kafka_message_queue_trigger.py
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+
+# [START howto_trigger_message_queue]
+from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
+from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.sdk import DAG, Asset, AssetWatcher
+
+
+def apply_function(message):
+ val = json.loads(message.value())
+ print(f"Value in message is {val}")
+ return True
+
+
+# Define a trigger that listens to an external message queue (Apache Kafka in this case)
+trigger = MessageQueueTrigger(
+ queue="kafka://localhost:9092/test",
+ apply_function="kafka_message_queue_trigger.apply_function",
+)
+
+# Define an asset that watches for messages on the queue
+asset = Asset("kafka_queue_asset", watchers=[AssetWatcher(name="kafka_watcher", trigger=trigger)])
+
+with DAG(dag_id="example_kafka_watcher", schedule=[asset]) as dag:
+ EmptyOperator(task_id="task")
+# [END howto_trigger_message_queue]
+
+
+from tests_common.test_utils.system_tests import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/providers/apache/kafka/src/airflow/providers/apache/kafka/__init__.py b/providers/apache/kafka/tests/unit/apache/kafka/queues/__init__.py
similarity index 52%
copy from providers/apache/kafka/src/airflow/providers/apache/kafka/__init__.py
copy to providers/apache/kafka/tests/unit/apache/kafka/queues/__init__.py
index c2ea7906c79..13a83393a91 100644
--- a/providers/apache/kafka/src/airflow/providers/apache/kafka/__init__.py
+++ b/providers/apache/kafka/tests/unit/apache/kafka/queues/__init__.py
@@ -14,26 +14,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-#
-# NOTE! THIS FILE IS AUTOMATICALLY GENERATED AND WILL BE
-# OVERWRITTEN WHEN PREPARING DOCUMENTATION FOR THE PACKAGES.
-#
-# IF YOU WANT TO MODIFY THIS FILE, YOU SHOULD MODIFY THE TEMPLATE
-# `PROVIDER__INIT__PY_TEMPLATE.py.jinja2` IN the `dev/breeze/src/airflow_breeze/templates` DIRECTORY
-#
-from __future__ import annotations
-
-import packaging.version
-
-from airflow import __version__ as airflow_version
-
-__all__ = ["__version__"]
-
-__version__ = "1.8.1"
-
-if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
- "2.10.0"
-):
- raise RuntimeError(
- f"The package `apache-airflow-providers-apache-kafka:{__version__}` needs Apache Airflow 2.10.0+"
- )
diff --git a/providers/apache/kafka/tests/unit/apache/kafka/queues/test_kafka.py b/providers/apache/kafka/tests/unit/apache/kafka/queues/test_kafka.py
new file mode 100644
index 00000000000..567c2f17102
--- /dev/null
+++ b/providers/apache/kafka/tests/unit/apache/kafka/queues/test_kafka.py
@@ -0,0 +1,106 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+
+from airflow.providers.apache.kafka.triggers.await_message import AwaitMessageTrigger
+
+pytest.importorskip("airflow.providers.common.messaging.providers.base_provider.BaseMessageQueueProvider")
+
+MOCK_KAFKA_TRIGGER_APPLY_FUNCTION = "mock_kafka_trigger_apply_function"
+
+
+class TestKafkaMessageQueueProvider:
+ """Tests for KafkaMessageQueueProvider."""
+
+ def setup_method(self):
+ """Set up the test environment."""
+ from airflow.providers.apache.kafka.queues.kafka import KafkaMessageQueueProvider
+
+ self.provider = KafkaMessageQueueProvider()
+
+ def test_queue_create(self):
+ """Test the creation of the KafkaMessageQueueProvider."""
+ from airflow.providers.common.messaging.providers.base_provider import BaseMessageQueueProvider
+
+ assert isinstance(self.provider, BaseMessageQueueProvider)
+
+ @pytest.mark.parametrize(
+ "queue_uri, expected_result",
+ [
+ pytest.param("kafka://localhost:9092/topic1", True, id="single_broker_single_topic"),
+ pytest.param(
+ "kafka://broker1:9092,broker2:9092/topic1,topic2", True, id="multiple_brokers_multiple_topics"
+ ),
+ pytest.param("http://example.com", False, id="http_url"),
+ pytest.param("not-a-url", False, id="invalid_url"),
+ ],
+ )
+ def test_queue_matches(self, queue_uri, expected_result):
+ """Test the queue_matches method with various URLs."""
+ assert self.provider.queue_matches(queue_uri) == expected_result
+
+ def test_trigger_class(self):
+ """Test the trigger_class method."""
+ assert self.provider.trigger_class() == AwaitMessageTrigger
+
+ @pytest.mark.parametrize(
+ "queue_uri, extra_kwargs, expected_result",
+ [
+ pytest.param(
+ "kafka://broker:9092/topic1,topic2",
+ {"apply_function": MOCK_KAFKA_TRIGGER_APPLY_FUNCTION},
+ {"topics": ["topic1", "topic2"]},
+ id="topics_from_uri",
+ ),
+ pytest.param(
+ "kafka://broker:9092/",
+ {"topics": ["topic1", "topic2"], "apply_function": MOCK_KAFKA_TRIGGER_APPLY_FUNCTION},
+ {},
+ id="topics_from_kwargs",
+ ),
+ ],
+ )
+ def test_trigger_kwargs_valid_cases(self, queue_uri, extra_kwargs, expected_result):
+ """Test the trigger_kwargs method with valid parameters."""
+ kwargs = self.provider.trigger_kwargs(queue_uri, **extra_kwargs)
+ assert kwargs == expected_result
+
+ @pytest.mark.parametrize(
+ "queue_uri, extra_kwargs, expected_error, error_match",
+ [
+ pytest.param(
+ "kafka://broker:9092/topic1",
+ {},
+ ValueError,
+ "apply_function is required in KafkaMessageQueueProvider kwargs",
+ id="missing_apply_function",
+ ),
+ pytest.param(
+ "kafka://broker:9092/",
+ {"apply_function": MOCK_KAFKA_TRIGGER_APPLY_FUNCTION},
+ ValueError,
+ "topics is required in KafkaMessageQueueProvider kwargs or provide them in the queue URI",
+ id="missing_topics",
+ ),
+ ],
+ )
+ def test_trigger_kwargs_error_cases(self, queue_uri, extra_kwargs, expected_error, error_match):
+ """Test that trigger_kwargs raises appropriate errors with invalid parameters."""
+ with pytest.raises(expected_error, match=error_match):
+ self.provider.trigger_kwargs(queue_uri, **extra_kwargs)
diff --git a/providers/apache/kafka/tests/unit/apache/kafka/triggers/test_await_message.py b/providers/apache/kafka/tests/unit/apache/kafka/triggers/test_await_message.py
index 6f3632b2c6d..bdd8ea38ded 100644
--- a/providers/apache/kafka/tests/unit/apache/kafka/triggers/test_await_message.py
+++ b/providers/apache/kafka/tests/unit/apache/kafka/triggers/test_await_message.py
@@ -26,6 +26,8 @@ from airflow.providers.apache.kafka.hooks.consume import KafkaConsumerHook
from airflow.providers.apache.kafka.triggers.await_message import AwaitMessageTrigger
from airflow.utils import db
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, get_base_airflow_version_tuple
+
pytestmark = pytest.mark.db_test
@@ -127,3 +129,16 @@ class TestTrigger:
await asyncio.sleep(1.0)
assert task.done() is False
asyncio.get_event_loop().stop()
+
+
+@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Requires Airflow 3.0.+")
+class TestMessageQueueTrigger:
+ def test_provider_integrations(self, cleanup_providers_manager):
+ if get_base_airflow_version_tuple() < (3, 0, 1):
+ pytest.skip("This test is only for Airflow 3.0.1+")
+
+ queue = "kafka://localhost:9092/topic1"
+ from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
+
+ trigger = MessageQueueTrigger(queue=queue, apply_function="mock_kafka_trigger_apply_function")
+ assert isinstance(trigger.trigger, AwaitMessageTrigger)
diff --git a/providers/common/messaging/pyproject.toml b/providers/common/messaging/pyproject.toml
index cac7e940db5..1af5e503a7a 100644
--- a/providers/common/messaging/pyproject.toml
+++ b/providers/common/messaging/pyproject.toml
@@ -69,6 +69,9 @@ dependencies = [
"amazon" = [
"apache-airflow-providers-amazon>=9.7.0"
]
+"apache.kafka" = [
+ "apache-airflow-providers-apache-kafka>=1.9.0"
+]
[dependency-groups]
dev = [
@@ -77,6 +80,7 @@ dev = [
"apache-airflow-devel-common",
# Additional devel dependencies (do not remove this line and add extra development dependencies)
"apache-airflow-providers-amazon",
+ "apache-airflow-providers-apache-kafka",
]
# To build docs: