This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 17a3dd4135 Move redis subscribe to poke() method in Redis Sensor
(#32984)
17a3dd4135 is described below
commit 17a3dd41355c29ed55fcf449b3b7002d4085e9bb
Author: Jarek Potiuk <[email protected]>
AuthorDate: Tue Aug 1 02:06:03 2023 +0200
Move redis subscribe to poke() method in Redis Sensor (#32984)
In RedisPubSubSensor subscription has been done in constructor,
which was pretty wrong - for example it means that when scheduler
parses the sensor, it involves subscribing to the messages and
commmunication with redis DB.
This PR moves subscription to "poke()" method, which is executed
on worker instead.
---
airflow/providers/redis/sensors/redis_pub_sub.py | 9 ++++++---
tests/integration/providers/redis/sensors/test_redis_pub_sub.py | 2 +-
2 files changed, 7 insertions(+), 4 deletions(-)
diff --git a/airflow/providers/redis/sensors/redis_pub_sub.py
b/airflow/providers/redis/sensors/redis_pub_sub.py
index 4fd6c826ca..4501758a61 100644
--- a/airflow/providers/redis/sensors/redis_pub_sub.py
+++ b/airflow/providers/redis/sensors/redis_pub_sub.py
@@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations
+from functools import cached_property
from typing import TYPE_CHECKING, Sequence
from airflow.providers.redis.hooks.redis import RedisHook
@@ -41,8 +42,10 @@ class RedisPubSubSensor(BaseSensorOperator):
super().__init__(**kwargs)
self.channels = channels
self.redis_conn_id = redis_conn_id
- self.pubsub =
RedisHook(redis_conn_id=self.redis_conn_id).get_conn().pubsub()
- self.pubsub.subscribe(self.channels)
+
+ @cached_property
+ def pubsub(self):
+ return RedisHook(redis_conn_id=self.redis_conn_id).get_conn().pubsub()
def poke(self, context: Context) -> bool:
"""
@@ -54,7 +57,7 @@ class RedisPubSubSensor(BaseSensorOperator):
:return: ``True`` if message (with type 'message') is available or
``False`` if not
"""
self.log.info("RedisPubSubSensor checking for message on channels:
%s", self.channels)
-
+ self.pubsub.subscribe(self.channels)
message = self.pubsub.get_message()
self.log.info("Message %s from channel %s", message, self.channels)
diff --git a/tests/integration/providers/redis/sensors/test_redis_pub_sub.py
b/tests/integration/providers/redis/sensors/test_redis_pub_sub.py
index e50967328f..f452871bdb 100644
--- a/tests/integration/providers/redis/sensors/test_redis_pub_sub.py
+++ b/tests/integration/providers/redis/sensors/test_redis_pub_sub.py
@@ -46,10 +46,10 @@ class TestRedisPubSubSensor:
hook = RedisHook(redis_conn_id="redis_default")
redis = hook.get_conn()
- redis.publish("test", "message")
result = sensor.poke(self.mock_context)
assert not result
+ redis.publish("test", "message")
for _ in range(1, 10):
result = sensor.poke(self.mock_context)