This is an automated email from the ASF dual-hosted git repository.

jasonliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1c53e0844d5 Fix the serialization issue of AwaitMessageTrigger with 
Redis. (#58746)
1c53e0844d5 is described below

commit 1c53e0844d515f551b9616cd2115d8b7ee658856
Author: Nick <[email protected]>
AuthorDate: Tue Dec 2 17:47:23 2025 +0800

    Fix the serialization issue of AwaitMessageTrigger with Redis. (#58746)
---
 .../redis/triggers/redis_await_message.py          |  4 ++++
 .../redis/triggers/test_redis_await_message.py     | 23 ++++++++++++++++++++++
 2 files changed, 27 insertions(+)

diff --git 
a/providers/redis/src/airflow/providers/redis/triggers/redis_await_message.py 
b/providers/redis/src/airflow/providers/redis/triggers/redis_await_message.py
index 8292eb1b848..2bb3f6ccc44 100644
--- 
a/providers/redis/src/airflow/providers/redis/triggers/redis_await_message.py
+++ 
b/providers/redis/src/airflow/providers/redis/triggers/redis_await_message.py
@@ -67,6 +67,10 @@ class AwaitMessageTrigger(BaseTrigger):
             message = await async_get_message()
 
             if message and message["type"] == "message":
+                if "channel" in message and isinstance(message["channel"], 
bytes):
+                    message["channel"] = message["channel"].decode("utf-8")
+                if "data" in message and isinstance(message["data"], bytes):
+                    message["data"] = message["data"].decode("utf-8")
                 yield TriggerEvent(message)
                 break
             await asyncio.sleep(self.poll_interval)
diff --git 
a/providers/redis/tests/unit/redis/triggers/test_redis_await_message.py 
b/providers/redis/tests/unit/redis/triggers/test_redis_await_message.py
index 193fdb107fe..c4256508b32 100644
--- a/providers/redis/tests/unit/redis/triggers/test_redis_await_message.py
+++ b/providers/redis/tests/unit/redis/triggers/test_redis_await_message.py
@@ -67,6 +67,29 @@ class TestAwaitMessageTrigger:
         assert event.payload["channel"] == "test"
         asyncio.get_event_loop().stop()
 
+    @patch("airflow.providers.redis.hooks.redis.RedisHook.get_conn")
+    @pytest.mark.asyncio
+    async def test_trigger_run_succeed_with_bytes(self, mock_redis_conn):
+        trigger = AwaitMessageTrigger(
+            channels="test",
+            redis_conn_id="redis_default",
+            poll_interval=0.0001,
+        )
+
+        mock_redis_conn().pubsub().get_message.return_value = {
+            "type": "message",
+            "channel": b"test",
+            "data": b"d1",
+        }
+
+        trigger_gen = trigger.run()
+        task = asyncio.create_task(trigger_gen.__anext__())
+        event = await task
+        assert task.done() is True
+        assert event.payload["data"] == "d1"
+        assert event.payload["channel"] == "test"
+        asyncio.get_event_loop().stop()
+
     @patch("airflow.providers.redis.hooks.redis.RedisHook.get_conn")
     @pytest.mark.asyncio
     async def test_trigger_run_fail(self, mock_redis_conn):

Reply via email to