This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c4b3694  Fix MyPy errors in leveldb (#20222)
c4b3694 is described below

commit c4b369410155dfc461d2b95ee66cb1927f8e4230
Author: Jarek Potiuk <[email protected]>
AuthorDate: Wed Dec 15 14:04:46 2021 +0100

    Fix MyPy errors in leveldb (#20222)
    
    Part of #19891
---
 airflow/providers/google/leveldb/hooks/leveldb.py  | 30 +++++++++++++++++-----
 .../providers/google/leveldb/operators/leveldb.py  | 16 ++++++------
 2 files changed, 31 insertions(+), 15 deletions(-)

diff --git a/airflow/providers/google/leveldb/hooks/leveldb.py 
b/airflow/providers/google/leveldb/hooks/leveldb.py
index 4994f62..fdb6b28 100644
--- a/airflow/providers/google/leveldb/hooks/leveldb.py
+++ b/airflow/providers/google/leveldb/hooks/leveldb.py
@@ -23,6 +23,8 @@ from plyvel import DB
 from airflow.exceptions import AirflowException
 from airflow.hooks.base import BaseHook
 
+DB_NOT_INITIALIZED_BEFORE = "The `get_conn` method should be called before!"
+
 
 class LevelDBHookException(AirflowException):
     """Exception specific for LevelDB"""
@@ -43,7 +45,7 @@ class LevelDBHook(BaseHook):
         super().__init__()
         self.leveldb_conn_id = leveldb_conn_id
         self.connection = self.get_connection(leveldb_conn_id)
-        self.db = None
+        self.db: Optional[plyvel.DB] = None
 
     def get_conn(self, name: str = '/tmp/testdb/', create_if_missing: bool = 
False, **kwargs) -> DB:
         """
@@ -74,9 +76,9 @@ class LevelDBHook(BaseHook):
         self,
         command: str,
         key: bytes,
-        value: bytes = None,
-        keys: List[bytes] = None,
-        values: List[bytes] = None,
+        value: Optional[bytes] = None,
+        keys: Optional[List[bytes]] = None,
+        values: Optional[List[bytes]] = None,
     ) -> Optional[bytes]:
         """
         Execute operation with leveldb
@@ -87,21 +89,27 @@ class LevelDBHook(BaseHook):
         :param key: key for command(put,get,delete) execution(, e.g. 
``b'key'``, ``b'another-key'``)
         :type key: bytes
         :param value: value for command(put) execution(bytes, e.g. 
``b'value'``, ``b'another-value'``)
-        :type value: bytes
+        :type value: Optional[bytes]
         :param keys: keys for command(write_batch) execution(List[bytes], e.g. 
``[b'key', b'another-key'])``
-        :type keys: List[bytes]
+        :type keys: Optional[List[bytes]]
         :param values: values for command(write_batch) execution e.g. 
``[b'value'``, ``b'another-value']``
-        :type values: List[bytes]
+        :type values: Optional[List[bytes]]
         :returns: value from get or None
         :rtype: Optional[bytes]
         """
         if command == 'put':
+            if not value:
+                raise Exception("Please provide `value`!")
             return self.put(key, value)
         elif command == 'get':
             return self.get(key)
         elif command == 'delete':
             return self.delete(key)
         elif command == 'write_batch':
+            if not keys:
+                raise Exception("Please provide `keys`!")
+            if not values:
+                raise Exception("Please provide `values`!")
             return self.write_batch(keys, values)
         else:
             raise LevelDBHookException("Unknown command for LevelDB hook")
@@ -115,6 +123,8 @@ class LevelDBHook(BaseHook):
         :param value: value for put execution e.g. ``b'value'``, 
``b'another-value'``
         :type value: bytes
         """
+        if not self.db:
+            raise Exception(DB_NOT_INITIALIZED_BEFORE)
         self.db.put(key, value)
 
     def get(self, key: bytes) -> bytes:
@@ -126,6 +136,8 @@ class LevelDBHook(BaseHook):
         :returns: value of key from db.get
         :rtype: bytes
         """
+        if not self.db:
+            raise Exception(DB_NOT_INITIALIZED_BEFORE)
         return self.db.get(key)
 
     def delete(self, key: bytes):
@@ -135,6 +147,8 @@ class LevelDBHook(BaseHook):
         :param key: key for delete execution, e.g. ``b'key'``, 
``b'another-key'``
         :type key: bytes
         """
+        if not self.db:
+            raise Exception(DB_NOT_INITIALIZED_BEFORE)
         self.db.delete(key)
 
     def write_batch(self, keys: List[bytes], values: List[bytes]):
@@ -146,6 +160,8 @@ class LevelDBHook(BaseHook):
         :param values: values for write_batch execution e.g. ``[b'value', 
b'another-value']``
         :type values: List[bytes]
         """
+        if not self.db:
+            raise Exception(DB_NOT_INITIALIZED_BEFORE)
         with self.db.write_batch() as batch:
             for i, key in enumerate(keys):
                 batch.put(key, values[i])
diff --git a/airflow/providers/google/leveldb/operators/leveldb.py 
b/airflow/providers/google/leveldb/operators/leveldb.py
index 6ed7660..19cddd9 100644
--- a/airflow/providers/google/leveldb/operators/leveldb.py
+++ b/airflow/providers/google/leveldb/operators/leveldb.py
@@ -34,11 +34,11 @@ class LevelDBOperator(BaseOperator):
         :param key: key for command(put,get,delete) execution(, e.g. 
``b'key'``, ``b'another-key'``)
         :type key: bytes
         :param value: value for command(put) execution(bytes, e.g. 
``b'value'``, ``b'another-value'``)
-        :type value: bytes
+        :type value: Optional[bytes]
         :param keys: keys for command(write_batch) execution(List[bytes], e.g. 
``[b'key', b'another-key'])``
-        :type keys: List[bytes]
+        :type keys: Optional[List[bytes]]
         :param values: values for command(write_batch) execution e.g. 
``[b'value'``, ``b'another-value']``
-        :type values: List[bytes]
+        :type values: Optional[List[bytes]]
         :param leveldb_conn_id:
         :type leveldb_conn_id: str
         :param create_if_missing: whether a new database should be created if 
needed
@@ -53,9 +53,9 @@ class LevelDBOperator(BaseOperator):
         *,
         command: str,
         key: bytes,
-        value: bytes = None,
-        keys: List[bytes] = None,
-        values: List[bytes] = None,
+        value: Optional[bytes] = None,
+        keys: Optional[List[bytes]] = None,
+        values: Optional[List[bytes]] = None,
         leveldb_conn_id: str = 'leveldb_default',
         name: str = '/tmp/testdb/',
         create_if_missing: bool = True,
@@ -94,5 +94,5 @@ class LevelDBOperator(BaseOperator):
         )
         self.log.info("Done. Returned value was: %s", str(value))
         leveldb_hook.close_conn()
-        value = value if value is None else value.decode()
-        return value
+        str_value = value if value is None else value.decode()
+        return str_value

Reply via email to