This is an automated email from the ASF dual-hosted git repository.
jasonliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1c53e0844d5 Fix the serialization issue of AwaitMessageTrigger with
Redis. (#58746)
1c53e0844d5 is described below
commit 1c53e0844d515f551b9616cd2115d8b7ee658856
Author: Nick <[email protected]>
AuthorDate: Tue Dec 2 17:47:23 2025 +0800
Fix the serialization issue of AwaitMessageTrigger with Redis. (#58746)
---
.../redis/triggers/redis_await_message.py | 4 ++++
.../redis/triggers/test_redis_await_message.py | 23 ++++++++++++++++++++++
2 files changed, 27 insertions(+)
diff --git
a/providers/redis/src/airflow/providers/redis/triggers/redis_await_message.py
b/providers/redis/src/airflow/providers/redis/triggers/redis_await_message.py
index 8292eb1b848..2bb3f6ccc44 100644
---
a/providers/redis/src/airflow/providers/redis/triggers/redis_await_message.py
+++
b/providers/redis/src/airflow/providers/redis/triggers/redis_await_message.py
@@ -67,6 +67,10 @@ class AwaitMessageTrigger(BaseTrigger):
message = await async_get_message()
if message and message["type"] == "message":
+ if "channel" in message and isinstance(message["channel"],
bytes):
+ message["channel"] = message["channel"].decode("utf-8")
+ if "data" in message and isinstance(message["data"], bytes):
+ message["data"] = message["data"].decode("utf-8")
yield TriggerEvent(message)
break
await asyncio.sleep(self.poll_interval)
diff --git
a/providers/redis/tests/unit/redis/triggers/test_redis_await_message.py
b/providers/redis/tests/unit/redis/triggers/test_redis_await_message.py
index 193fdb107fe..c4256508b32 100644
--- a/providers/redis/tests/unit/redis/triggers/test_redis_await_message.py
+++ b/providers/redis/tests/unit/redis/triggers/test_redis_await_message.py
@@ -67,6 +67,29 @@ class TestAwaitMessageTrigger:
assert event.payload["channel"] == "test"
asyncio.get_event_loop().stop()
+ @patch("airflow.providers.redis.hooks.redis.RedisHook.get_conn")
+ @pytest.mark.asyncio
+ async def test_trigger_run_succeed_with_bytes(self, mock_redis_conn):
+ trigger = AwaitMessageTrigger(
+ channels="test",
+ redis_conn_id="redis_default",
+ poll_interval=0.0001,
+ )
+
+ mock_redis_conn().pubsub().get_message.return_value = {
+ "type": "message",
+ "channel": b"test",
+ "data": b"d1",
+ }
+
+ trigger_gen = trigger.run()
+ task = asyncio.create_task(trigger_gen.__anext__())
+ event = await task
+ assert task.done() is True
+ assert event.payload["data"] == "d1"
+ assert event.payload["channel"] == "test"
+ asyncio.get_event_loop().stop()
+
@patch("airflow.providers.redis.hooks.redis.RedisHook.get_conn")
@pytest.mark.asyncio
async def test_trigger_run_fail(self, mock_redis_conn):