This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 339daf6bc07 fix(messaging): improve MessageQueueTrigger logging and
add comprehensive tests (#54492)
339daf6bc07 is described below
commit 339daf6bc070976e9bc5a844b4ef1b1e30b1357f
Author: Deji Ibrahim <[email protected]>
AuthorDate: Fri Aug 15 20:17:13 2025 +0100
fix(messaging): improve MessageQueueTrigger logging and add comprehensive
tests (#54492)
---
.../common/messaging/triggers/msg_queue.py | 4 +-
.../common/messaging/triggers/test_msg_queue.py | 170 +++++++++++++++++++++
2 files changed, 172 insertions(+), 2 deletions(-)
diff --git
a/providers/common/messaging/src/airflow/providers/common/messaging/triggers/msg_queue.py
b/providers/common/messaging/src/airflow/providers/common/messaging/triggers/msg_queue.py
index 7c5ff6b75eb..7dcfcc19bcd 100644
---
a/providers/common/messaging/src/airflow/providers/common/messaging/triggers/msg_queue.py
+++
b/providers/common/messaging/src/airflow/providers/common/messaging/triggers/msg_queue.py
@@ -72,7 +72,7 @@ class MessageQueueTrigger(BaseEventTrigger):
"The queue '%s' is not recognized by any of the registered
providers. "
"The available providers are: '%s'.",
self.queue,
- ", ".join([provider for provider in MESSAGE_QUEUE_PROVIDERS]),
+ ", ".join([type(provider).__name__ for provider in
MESSAGE_QUEUE_PROVIDERS]),
)
raise ValueError("The queue is not recognized by any of the
registered providers.")
if len(providers) > 1:
@@ -81,7 +81,7 @@ class MessageQueueTrigger(BaseEventTrigger):
"At least two providers in ``MESSAGE_QUEUE_PROVIDERS`` are
colliding with each "
"other: '%s'",
self.queue,
- ", ".join([provider for provider in providers]),
+ ", ".join([type(provider).__name__ for provider in providers]),
)
raise ValueError(f"The queue '{self.queue}' is recognized by more
than one provider.")
return providers[0].trigger_class()(
diff --git
a/providers/common/messaging/tests/unit/common/messaging/triggers/test_msg_queue.py
b/providers/common/messaging/tests/unit/common/messaging/triggers/test_msg_queue.py
index b2b02c01063..7fd4394c942 100644
---
a/providers/common/messaging/tests/unit/common/messaging/triggers/test_msg_queue.py
+++
b/providers/common/messaging/tests/unit/common/messaging/triggers/test_msg_queue.py
@@ -17,8 +17,178 @@
from __future__ import annotations
from unittest import mock
+from unittest.mock import MagicMock
+
+import pytest
from airflow.providers.common.messaging.triggers.msg_queue import
MessageQueueTrigger
+from airflow.triggers.base import BaseEventTrigger
+
+
+class MockProvider:
+ """Mock provider for testing."""
+
+ def __init__(self, name: str, pattern: str):
+ self.name = name
+ self.pattern = pattern
+ self.mock_trigger = MagicMock(spec=BaseEventTrigger)
+
+ def queue_matches(self, queue: str) -> bool:
+ return queue.startswith(self.pattern)
+
+ def trigger_class(self):
+ return type(self.mock_trigger)
+
+ def trigger_kwargs(self, queue: str, **kwargs):
+ return {"queue": queue}
+
+
+PROVIDER_1_NAME = "SQSMessageQueueProvider"
+PROVIDER_1_PATTERN = "sqs://"
+PROVIDER_1_QUEUE = "sqs://test-queue"
+PROVIDER_2_NAME = "KafkaMessageQueueProvider"
+PROVIDER_2_PATTERN = "kafka://"
+PROVIDER_2_QUEUE = "kafka://test-queue"
+
+UNKNOWN_QUEUE = "unknown://queue"
+UNSUPPORTED_QUEUE = "unsupported://queue"
+TEST_QUEUE = "test://queue"
+
+NO_PROVIDERS_ERROR = "No message queue providers are available"
+INSTALL_PROVIDERS_MESSAGE = "Please ensure that you have the necessary
providers installed"
+NOT_RECOGNIZED_ERROR = "The queue is not recognized by any of the registered
providers"
+MULTIPLE_PROVIDERS_ERROR = "The queue '{queue}' is recognized by more than one
provider"
+
+
+MESSAGE_QUEUE_PROVIDERS_PATH =
"airflow.providers.common.messaging.triggers.msg_queue.MESSAGE_QUEUE_PROVIDERS"
+
+
+class TestMessageQueueTrigger:
+ """Test cases for MessageQueueTrigger error handling and provider
matching."""
+
+ def test_no_providers_available(self):
+ """Test error when no message queue providers are available."""
+ trigger = MessageQueueTrigger(queue=TEST_QUEUE)
+ with mock.patch(MESSAGE_QUEUE_PROVIDERS_PATH, []):
+ with pytest.raises(ValueError, match=NO_PROVIDERS_ERROR):
+ _ = trigger.trigger
+
+ def test_queue_not_recognized_by_any_provider(self):
+ """Test error when queue is not recognized by any provider."""
+ # Create mock providers that don't match the queue
+ provider1 = MockProvider(PROVIDER_1_NAME, PROVIDER_1_PATTERN)
+ provider2 = MockProvider(PROVIDER_2_NAME, PROVIDER_2_PATTERN)
+
+ with mock.patch(MESSAGE_QUEUE_PROVIDERS_PATH, [provider1, provider2]):
+ trigger = MessageQueueTrigger(queue=UNKNOWN_QUEUE)
+
+ with pytest.raises(ValueError, match=NOT_RECOGNIZED_ERROR):
+ _ = trigger.trigger
+
+ def test_queue_recognized_by_multiple_providers(self):
+ """Test error when queue is recognized by multiple providers
(collision)."""
+ # Create mock providers that both match the same queue pattern
+ provider1 = MockProvider(PROVIDER_1_NAME, PROVIDER_1_PATTERN)
+ provider2 = MockProvider(PROVIDER_2_NAME, PROVIDER_1_PATTERN)
+
+ with mock.patch(MESSAGE_QUEUE_PROVIDERS_PATH, [provider1, provider2]):
+ trigger = MessageQueueTrigger(queue=PROVIDER_1_QUEUE)
+
+ with pytest.raises(ValueError,
match=MULTIPLE_PROVIDERS_ERROR.format(queue=PROVIDER_1_QUEUE)):
+ _ = trigger.trigger
+
+ def test_successful_provider_matching(self):
+ """Test successful provider matching and trigger creation."""
+ provider1 = MockProvider(PROVIDER_1_NAME, PROVIDER_1_PATTERN)
+ provider2 = MockProvider(PROVIDER_2_NAME, PROVIDER_2_PATTERN)
+
+ with mock.patch(MESSAGE_QUEUE_PROVIDERS_PATH, [provider1, provider2]):
+ trigger = MessageQueueTrigger(queue=PROVIDER_1_QUEUE,
extra_param="value")
+
+ result_trigger = trigger.trigger
+
+ assert result_trigger is not None
+
+ def test_provider_class_names_in_logging(self):
+ """Test that provider class names (not objects) are logged in error
messages."""
+ provider1 = MockProvider(PROVIDER_1_NAME, PROVIDER_1_PATTERN)
+ provider2 = MockProvider(PROVIDER_2_NAME, PROVIDER_2_PATTERN)
+
+ with mock.patch(MESSAGE_QUEUE_PROVIDERS_PATH, [provider1, provider2]):
+ trigger = MessageQueueTrigger(queue=UNSUPPORTED_QUEUE)
+
+ with pytest.raises(ValueError):
+ _ = trigger.trigger
+
+ def test_trigger_kwargs_passed_correctly(self):
+ """Test that kwargs are passed correctly to the selected provider."""
+ provider = MockProvider(PROVIDER_1_NAME, PROVIDER_1_PATTERN)
+
+ mock_trigger_class = MagicMock()
+ mock_trigger_instance = MagicMock(spec=BaseEventTrigger)
+ mock_trigger_class.return_value = mock_trigger_instance
+
+ provider.trigger_class = MagicMock(return_value=mock_trigger_class)
+ provider.trigger_kwargs = MagicMock(return_value={"processed_queue":
"test://processed"})
+
+ with mock.patch(MESSAGE_QUEUE_PROVIDERS_PATH, [provider]):
+ trigger = MessageQueueTrigger(queue=PROVIDER_1_QUEUE,
param1="value1", param2="value2")
+
+ result = trigger.trigger
+
+ provider.trigger_kwargs.assert_called_once_with(
+ PROVIDER_1_QUEUE, param1="value1", param2="value2"
+ )
+
+ # Verify trigger class was instantiated with combined kwargs
+ mock_trigger_class.assert_called_once_with(
+ processed_queue="test://processed", param1="value1",
param2="value2"
+ )
+
+ assert result == mock_trigger_instance
+
+ def test_serialize_delegates_to_underlying_trigger(self):
+ """Test that serialize method delegates to the underlying trigger."""
+ provider = MockProvider(PROVIDER_1_NAME, PROVIDER_1_PATTERN)
+
+ mock_trigger_instance = MagicMock(spec=BaseEventTrigger)
+ mock_trigger_instance.serialize.return_value =
("test.module.TestTrigger", {"param": "value"})
+
+ mock_trigger_class = MagicMock(return_value=mock_trigger_instance)
+ provider.trigger_class = MagicMock(return_value=mock_trigger_class)
+ provider.trigger_kwargs = MagicMock(return_value={})
+
+ with mock.patch(MESSAGE_QUEUE_PROVIDERS_PATH, [provider]):
+ trigger = MessageQueueTrigger(queue=PROVIDER_1_QUEUE)
+
+ result = trigger.serialize()
+
+ mock_trigger_instance.serialize.assert_called_once()
+ assert result == ("test.module.TestTrigger", {"param": "value"})
+
+ @pytest.mark.asyncio
+ async def test_run_delegates_to_underlying_trigger(self):
+ """Test that run method delegates to the underlying trigger."""
+ provider = MockProvider(PROVIDER_1_NAME, PROVIDER_1_PATTERN)
+
+ mock_trigger_instance = MagicMock(spec=BaseEventTrigger)
+
+ async def mock_run():
+ yield MagicMock()
+
+ mock_trigger_instance.run.return_value = mock_run()
+
+ mock_trigger_class = MagicMock(return_value=mock_trigger_instance)
+ provider.trigger_class = MagicMock(return_value=mock_trigger_class)
+ provider.trigger_kwargs = MagicMock(return_value={})
+
+ with mock.patch(MESSAGE_QUEUE_PROVIDERS_PATH, [provider]):
+ trigger = MessageQueueTrigger(queue=PROVIDER_1_QUEUE)
+
+ async_gen = trigger.run()
+ event = await async_gen.__anext__()
+ mock_trigger_instance.run.assert_called_once()
+ assert event is not None
@mock.patch(