This is an automated email from the ASF dual-hosted git repository.
kamilbregula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 0a2acf0 Add type annotations for redis provider (#9815)
0a2acf0 is described below
commit 0a2acf0b6542b717f87dee6bbff43397bbb0e83b
Author: Alexander Sutcliffe <[email protected]>
AuthorDate: Tue Jul 14 15:52:38 2020 +0200
Add type annotations for redis provider (#9815)
---
airflow/providers/redis/hooks/redis.py | 2 +-
airflow/providers/redis/operators/redis_publish.py | 12 +++++++-----
airflow/providers/redis/sensors/redis_key.py | 6 ++++--
airflow/providers/redis/sensors/redis_pub_sub.py | 6 ++++--
4 files changed, 16 insertions(+), 10 deletions(-)
diff --git a/airflow/providers/redis/hooks/redis.py b/airflow/providers/redis/hooks/redis.py
index b83959a..52431f5 100644
--- a/airflow/providers/redis/hooks/redis.py
+++ b/airflow/providers/redis/hooks/redis.py
@@ -33,7 +33,7 @@ class RedisHook(BaseHook):
Also you can set ssl parameters as:
``{"ssl": true, "ssl_cert_reqs": "require", "ssl_cert_file": "/path/to/cert.pem", etc}``.
"""
- def __init__(self, redis_conn_id='redis_default'):
+ def __init__(self, redis_conn_id: str = 'redis_default') -> None:
"""
Prepares hook to connect to a Redis database.
diff --git a/airflow/providers/redis/operators/redis_publish.py b/airflow/providers/redis/operators/redis_publish.py
index 6734b42..8c357af 100644
--- a/airflow/providers/redis/operators/redis_publish.py
+++ b/airflow/providers/redis/operators/redis_publish.py
@@ -16,6 +16,8 @@
# specific language governing permissions and limitations
# under the License.
+from typing import Dict
+
from airflow.models import BaseOperator
from airflow.providers.redis.hooks.redis import RedisHook
from airflow.utils.decorators import apply_defaults
@@ -38,17 +40,17 @@ class RedisPublishOperator(BaseOperator):
@apply_defaults
def __init__(
self,
- channel,
- message,
- redis_conn_id='redis_default',
- *args, **kwargs):
+ channel: str,
+ message: str,
+ redis_conn_id: str = 'redis_default',
+ *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.redis_conn_id = redis_conn_id
self.channel = channel
self.message = message
- def execute(self, context):
+ def execute(self, context: Dict) -> None:
"""
Publish the message to Redis channel
diff --git a/airflow/providers/redis/sensors/redis_key.py b/airflow/providers/redis/sensors/redis_key.py
index e53be94..59167e9 100644
--- a/airflow/providers/redis/sensors/redis_key.py
+++ b/airflow/providers/redis/sensors/redis_key.py
@@ -15,6 +15,8 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+from typing import Dict
+
from airflow.providers.redis.hooks.redis import RedisHook
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
@@ -28,11 +30,11 @@ class RedisKeySensor(BaseSensorOperator):
ui_color = '#f0eee4'
@apply_defaults
- def __init__(self, key, redis_conn_id, *args, **kwargs):
+ def __init__(self, key: str, redis_conn_id: str, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.redis_conn_id = redis_conn_id
self.key = key
- def poke(self, context):
+ def poke(self, context: Dict) -> bool:
self.log.info('Sensor checks for existence of key: %s', self.key)
return RedisHook(self.redis_conn_id).get_conn().exists(self.key)
diff --git a/airflow/providers/redis/sensors/redis_pub_sub.py b/airflow/providers/redis/sensors/redis_pub_sub.py
index 1e3fecb..9be2b56 100644
--- a/airflow/providers/redis/sensors/redis_pub_sub.py
+++ b/airflow/providers/redis/sensors/redis_pub_sub.py
@@ -16,6 +16,8 @@
# specific language governing permissions and limitations
# under the License.
+from typing import Dict, List, Union
+
from airflow.providers.redis.hooks.redis import RedisHook
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
@@ -34,14 +36,14 @@ class RedisPubSubSensor(BaseSensorOperator):
ui_color = '#f0eee4'
@apply_defaults
- def __init__(self, channels, redis_conn_id, *args, **kwargs):
+ def __init__(self, channels: Union[List[str], str], redis_conn_id: str, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.channels = channels
self.redis_conn_id = redis_conn_id
self.pubsub = RedisHook(redis_conn_id=self.redis_conn_id).get_conn().pubsub()
self.pubsub.subscribe(self.channels)
- def poke(self, context):
+ def poke(self, context: Dict) -> bool:
"""
Check for message on subscribed channels and write to xcom the message
with key ``message``