This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c4b3694 Fix MyPy errors in leveldb (#20222)
c4b3694 is described below
commit c4b369410155dfc461d2b95ee66cb1927f8e4230
Author: Jarek Potiuk <[email protected]>
AuthorDate: Wed Dec 15 14:04:46 2021 +0100
Fix MyPy errors in leveldb (#20222)
Part of #19891
---
airflow/providers/google/leveldb/hooks/leveldb.py | 30 +++++++++++++++++-----
.../providers/google/leveldb/operators/leveldb.py | 16 ++++++------
2 files changed, 31 insertions(+), 15 deletions(-)
diff --git a/airflow/providers/google/leveldb/hooks/leveldb.py b/airflow/providers/google/leveldb/hooks/leveldb.py
index 4994f62..fdb6b28 100644
--- a/airflow/providers/google/leveldb/hooks/leveldb.py
+++ b/airflow/providers/google/leveldb/hooks/leveldb.py
@@ -23,6 +23,8 @@ from plyvel import DB
from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
+DB_NOT_INITIALIZED_BEFORE = "The `get_conn` method should be called before!"
+
class LevelDBHookException(AirflowException):
"""Exception specific for LevelDB"""
@@ -43,7 +45,7 @@ class LevelDBHook(BaseHook):
super().__init__()
self.leveldb_conn_id = leveldb_conn_id
self.connection = self.get_connection(leveldb_conn_id)
- self.db = None
+ self.db: Optional[plyvel.DB] = None
def get_conn(self, name: str = '/tmp/testdb/', create_if_missing: bool = False, **kwargs) -> DB:
"""
@@ -74,9 +76,9 @@ class LevelDBHook(BaseHook):
self,
command: str,
key: bytes,
- value: bytes = None,
- keys: List[bytes] = None,
- values: List[bytes] = None,
+ value: Optional[bytes] = None,
+ keys: Optional[List[bytes]] = None,
+ values: Optional[List[bytes]] = None,
) -> Optional[bytes]:
"""
Execute operation with leveldb
@@ -87,21 +89,27 @@ class LevelDBHook(BaseHook):
:param key: key for command(put,get,delete) execution(, e.g.
``b'key'``, ``b'another-key'``)
:type key: bytes
:param value: value for command(put) execution(bytes, e.g.
``b'value'``, ``b'another-value'``)
- :type value: bytes
+ :type value: Optional[bytes]
:param keys: keys for command(write_batch) execution(List[bytes], e.g.
``[b'key', b'another-key'])``
- :type keys: List[bytes]
+ :type keys: Optional[List[bytes]]
:param values: values for command(write_batch) execution e.g.
``[b'value'``, ``b'another-value']``
- :type values: List[bytes]
+ :type values: Optional[List[bytes]]
:returns: value from get or None
:rtype: Optional[bytes]
"""
if command == 'put':
+ if not value:
+ raise Exception("Please provide `value`!")
return self.put(key, value)
elif command == 'get':
return self.get(key)
elif command == 'delete':
return self.delete(key)
elif command == 'write_batch':
+ if not keys:
+ raise Exception("Please provide `keys`!")
+ if not values:
+ raise Exception("Please provide `values`!")
return self.write_batch(keys, values)
else:
raise LevelDBHookException("Unknown command for LevelDB hook")
@@ -115,6 +123,8 @@ class LevelDBHook(BaseHook):
:param value: value for put execution e.g. ``b'value'``,
``b'another-value'``
:type value: bytes
"""
+ if not self.db:
+ raise Exception(DB_NOT_INITIALIZED_BEFORE)
self.db.put(key, value)
def get(self, key: bytes) -> bytes:
@@ -126,6 +136,8 @@ class LevelDBHook(BaseHook):
:returns: value of key from db.get
:rtype: bytes
"""
+ if not self.db:
+ raise Exception(DB_NOT_INITIALIZED_BEFORE)
return self.db.get(key)
def delete(self, key: bytes):
@@ -135,6 +147,8 @@ class LevelDBHook(BaseHook):
:param key: key for delete execution, e.g. ``b'key'``,
``b'another-key'``
:type key: bytes
"""
+ if not self.db:
+ raise Exception(DB_NOT_INITIALIZED_BEFORE)
self.db.delete(key)
def write_batch(self, keys: List[bytes], values: List[bytes]):
@@ -146,6 +160,8 @@ class LevelDBHook(BaseHook):
:param values: values for write_batch execution e.g. ``[b'value',
b'another-value']``
:type values: List[bytes]
"""
+ if not self.db:
+ raise Exception(DB_NOT_INITIALIZED_BEFORE)
with self.db.write_batch() as batch:
for i, key in enumerate(keys):
batch.put(key, values[i])
diff --git a/airflow/providers/google/leveldb/operators/leveldb.py b/airflow/providers/google/leveldb/operators/leveldb.py
index 6ed7660..19cddd9 100644
--- a/airflow/providers/google/leveldb/operators/leveldb.py
+++ b/airflow/providers/google/leveldb/operators/leveldb.py
@@ -34,11 +34,11 @@ class LevelDBOperator(BaseOperator):
:param key: key for command(put,get,delete) execution(, e.g.
``b'key'``, ``b'another-key'``)
:type key: bytes
:param value: value for command(put) execution(bytes, e.g.
``b'value'``, ``b'another-value'``)
- :type value: bytes
+ :type value: Optional[bytes]
:param keys: keys for command(write_batch) execution(List[bytes], e.g.
``[b'key', b'another-key'])``
- :type keys: List[bytes]
+ :type keys: Optional[List[bytes]]
:param values: values for command(write_batch) execution e.g.
``[b'value'``, ``b'another-value']``
- :type values: List[bytes]
+ :type values: Optional[List[bytes]]
:param leveldb_conn_id:
:type leveldb_conn_id: str
:param create_if_missing: whether a new database should be created if
needed
@@ -53,9 +53,9 @@ class LevelDBOperator(BaseOperator):
*,
command: str,
key: bytes,
- value: bytes = None,
- keys: List[bytes] = None,
- values: List[bytes] = None,
+ value: Optional[bytes] = None,
+ keys: Optional[List[bytes]] = None,
+ values: Optional[List[bytes]] = None,
leveldb_conn_id: str = 'leveldb_default',
name: str = '/tmp/testdb/',
create_if_missing: bool = True,
@@ -94,5 +94,5 @@ class LevelDBOperator(BaseOperator):
)
self.log.info("Done. Returned value was: %s", str(value))
leveldb_hook.close_conn()
- value = value if value is None else value.decode()
- return value
+ str_value = value if value is None else value.decode()
+ return str_value