RosterIn commented on a change in pull request #14105:
URL: https://github.com/apache/airflow/pull/14105#discussion_r574572499
##########
File path: INSTALL
##########
@@ -97,8 +97,8 @@ apache.webhdfs, async, atlas, aws, azure, cassandra, celery,
cgroups, cloudant,
crypto, dask, databricks, datadog, devel, devel_all, devel_ci, devel_hadoop,
dingding, discord, doc,
docker, druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api,
github_enterprise, google,
google_auth, grpc, hashicorp, hdfs, hive, http, imap, jdbc, jenkins, jira,
kerberos, kubernetes,
-ldap, microsoft.azure, microsoft.mssql, microsoft.winrm, mongo, mssql, mysql,
neo4j, odbc, openfaas,
-opsgenie, oracle, pagerduty, papermill, password, pinot, plexus, postgres,
presto, qds, qubole,
+ldap, leveldb, microsoft.azure, microsoft.mssql, microsoft.winrm, mongo,
mssql, mysql, neo4j, odbc,
+openfaas,opsgenie, oracle, pagerduty, papermill, password, pinot, plexus,
postgres, presto, qds, qubole,
Review comment:
```suggestion
openfaas, opsgenie, oracle, pagerduty, papermill, password, pinot, plexus,
postgres, presto, qds, qubole,
```
##########
File path: airflow/providers/google/leveldb/hooks/leveldb.py
##########
@@ -0,0 +1,169 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Hook for Level DB"""
+import plyvel
+from plyvel import DB
+from typing import Callable, List, Optional
Review comment:
```suggestion
from typing import Callable, List, Optional
import plyvel
from plyvel import DB
```
##########
File path: airflow/providers/google/leveldb/hooks/leveldb.py
##########
@@ -0,0 +1,169 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Hook for Level DB"""
+import plyvel
+from plyvel import DB
+from typing import Callable, List, Optional
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base import BaseHook
+
+
+class LevelDBHookException(AirflowException):
+ """Exception specific for LevelDB"""
+
+
+class LevelDBHook(BaseHook):
+ """
+ Plyvel Wrapper to Interact With LevelDB Database
+ LevelDB Connection Documentation https://plyvel.readthedocs.io/en/latest/
+ """
+
+ conn_name_attr = 'leveldb_conn_id'
+ default_conn_name = 'leveldb_default'
+ conn_type = 'leveldb'
+ hook_name = 'LevelDB'
+
+ def __init__(self, leveldb_conn_id: str = default_conn_name):
+ super().__init__()
+ self.leveldb_conn_id = leveldb_conn_id
+ self.connection = self.get_connection(leveldb_conn_id)
+ self.db = None
+
+ def get_conn(
+ self,
+ name: str = '/tmp/testdb/',
+ create_if_missing: bool = False,
+ error_if_exists: bool = False,
+ paranoid_checks: bool = None,
+ write_buffer_size: bool = None,
+ max_open_files: int = None,
+ lru_cache_size: int = None,
+ block_size: int = None,
+ block_restart_interval: int = None,
+ max_file_size: bool = None,
+ compression: str = 'snappy',
+ bloom_filter_bits: int = 0,
+ comparator: Callable = None,
+ comparator_name: bytes = None,
+ ) -> DB:
+ """Creates Plyvel DB
https://plyvel.readthedocs.io/en/latest/api.html#DB
+ :param name (str) – path to create database (e.g. '/tmp/testdb/')
+ :param create_if_missing (bool) – whether a new database should be
created if needed
+ :param error_if_exists (bool) – whether to raise an exception if the
database already exists
+ :param paranoid_checks (bool) – whether to enable paranoid checks
+ :param write_buffer_size (int) – size of the write buffer (in bytes)
+ :param max_open_files (int) – maximum number of files to keep open
+ :param lru_cache_size (int) – size of the LRU cache (in bytes)
+ :param block_size (int) – block size (in bytes)
+ :param block_restart_interval (int) – block restart interval for delta
encoding of keys
+ :param max_file_size (int) – maximum file size (in bytes)
+ :param compression (str) – whether to use Snappy compression (enabled
by default)
+ :param bloom_filter_bits (int) – the number of bits to use for a bloom
filter; the default of 0 means that no
+ bloom filter will be used
+ :param comparator (callable) – a custom comparator callable that takes
two byte strings and returns an integer
+ :param comparator_name (bytes) – name for the custom comparator
+ """
+ if self.db is not None:
+ return self.db
+ self.db = plyvel.DB(
+ name=name,
+ create_if_missing=create_if_missing,
+ error_if_exists=error_if_exists,
+ paranoid_checks=paranoid_checks,
+ write_buffer_size=write_buffer_size,
+ max_open_files=max_open_files,
+ lru_cache_size=lru_cache_size,
+ block_size=block_size,
+ block_restart_interval=block_restart_interval,
+ max_file_size=max_file_size,
+ compression=compression,
+ bloom_filter_bits=bloom_filter_bits,
+ comparator=comparator,
+ comparator_name=comparator_name
Review comment:
```suggestion
comparator_name=comparator_name,
```
##########
File path: airflow/providers/google/leveldb/hooks/leveldb.py
##########
@@ -0,0 +1,169 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Hook for Level DB"""
+import plyvel
+from plyvel import DB
+from typing import Callable, List, Optional
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base import BaseHook
+
+
+class LevelDBHookException(AirflowException):
+ """Exception specific for LevelDB"""
+
+
+class LevelDBHook(BaseHook):
+ """
+ Plyvel Wrapper to Interact With LevelDB Database
+ LevelDB Connection Documentation https://plyvel.readthedocs.io/en/latest/
+ """
+
+ conn_name_attr = 'leveldb_conn_id'
+ default_conn_name = 'leveldb_default'
+ conn_type = 'leveldb'
+ hook_name = 'LevelDB'
+
+ def __init__(self, leveldb_conn_id: str = default_conn_name):
+ super().__init__()
+ self.leveldb_conn_id = leveldb_conn_id
+ self.connection = self.get_connection(leveldb_conn_id)
+ self.db = None
+
+ def get_conn(
+ self,
+ name: str = '/tmp/testdb/',
+ create_if_missing: bool = False,
+ error_if_exists: bool = False,
+ paranoid_checks: bool = None,
+ write_buffer_size: bool = None,
+ max_open_files: int = None,
+ lru_cache_size: int = None,
+ block_size: int = None,
+ block_restart_interval: int = None,
+ max_file_size: bool = None,
+ compression: str = 'snappy',
+ bloom_filter_bits: int = 0,
+ comparator: Callable = None,
+ comparator_name: bytes = None,
+ ) -> DB:
+ """Creates Plyvel DB
https://plyvel.readthedocs.io/en/latest/api.html#DB
+ :param name (str) – path to create database (e.g. '/tmp/testdb/')
+ :param create_if_missing (bool) – whether a new database should be
created if needed
+ :param error_if_exists (bool) – whether to raise an exception if the
database already exists
+ :param paranoid_checks (bool) – whether to enable paranoid checks
+ :param write_buffer_size (int) – size of the write buffer (in bytes)
+ :param max_open_files (int) – maximum number of files to keep open
+ :param lru_cache_size (int) – size of the LRU cache (in bytes)
+ :param block_size (int) – block size (in bytes)
+ :param block_restart_interval (int) – block restart interval for delta
encoding of keys
+ :param max_file_size (int) – maximum file size (in bytes)
+ :param compression (str) – whether to use Snappy compression (enabled
by default)
+ :param bloom_filter_bits (int) – the number of bits to use for a bloom
filter; the default of 0 means that no
+ bloom filter will be used
+ :param comparator (callable) – a custom comparator callable that takes
two byte strings and returns an integer
+ :param comparator_name (bytes) – name for the custom comparator
+ """
+ if self.db is not None:
+ return self.db
+ self.db = plyvel.DB(
+ name=name,
+ create_if_missing=create_if_missing,
+ error_if_exists=error_if_exists,
+ paranoid_checks=paranoid_checks,
+ write_buffer_size=write_buffer_size,
+ max_open_files=max_open_files,
+ lru_cache_size=lru_cache_size,
+ block_size=block_size,
+ block_restart_interval=block_restart_interval,
+ max_file_size=max_file_size,
+ compression=compression,
+ bloom_filter_bits=bloom_filter_bits,
+ comparator=comparator,
+ comparator_name=comparator_name
+ )
+ return self.db
+
+ def close_conn(self) -> None:
+ """Closes connection"""
+ db = self.db
+ if db is not None:
+ db.close()
+ self.db = None
+
+ def run(
+ self,
+ command: str,
+ key: bytes,
+ value: bytes = None,
+ keys: List[bytes] = None,
+ values: List[bytes] = None
Review comment:
```suggestion
values: List[bytes] = None,
```
##########
File path: airflow/providers/google/leveldb/operators/leveldb.py
##########
@@ -0,0 +1,99 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import List, Callable, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.google.leveldb.hooks.leveldb import LevelDBHook
+from airflow.utils.decorators import apply_defaults
+
+
+class LevelDBOperator(BaseOperator):
+ """
+ Execute command in LevelDB
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:LevelDBOperator`
+ """
+
+ @apply_defaults
+ def __init__(
+ self,
+ *,
+ command: str,
+ key: bytes,
+ value: bytes = None,
+ keys: List[bytes] = None,
+ values: List[bytes] = None,
+ leveldb_conn_id: str = 'leveldb_default',
+ name: str = '/tmp/testdb/',
+ create_if_missing: bool = False,
+ error_if_exists: bool = False,
+ paranoid_checks: bool = None,
+ write_buffer_size: bool = None,
+ max_open_files: int = None,
+ lru_cache_size: int = None,
+ block_size: int = None,
+ block_restart_interval: int = None,
+ max_file_size: bool = None,
+ compression: str = 'snappy',
+ bloom_filter_bits: int = 0,
+ comparator: Callable = None,
+ comparator_name: bytes = None,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.command = command
+ self.key = key
+ self.value = value
+ self.keys = keys
+ self.values = values
+ self.leveldb_conn_id = leveldb_conn_id
+ # below params for plyvel DB
+ self.name = name
+ self.create_if_missing = create_if_missing
+ self.error_if_exists = error_if_exists
+ self.paranoid_checks = paranoid_checks
+ self.write_buffer_size = write_buffer_size
+ self.max_open_files = max_open_files
+ self.lru_cache_size = lru_cache_size
+ self.block_size = block_size
+ self.block_restart_interval = block_restart_interval
+ self.max_file_size = max_file_size
+ self.compression = compression
+ self.bloom_filter_bits = bloom_filter_bits
+ self.comparator = comparator
+ self.comparator_name = comparator_name
+
+ def execute(self, context) -> Optional[bytes]:
+ """
+ Execute command in LevelDB
+ :returns value from get or None(Optional[bytes])
+ """
+ leveldb_hook = LevelDBHook(leveldb_conn_id=self.leveldb_conn_id)
+ leveldb_hook.get_conn(name=self.name,
create_if_missing=self.create_if_missing,
+ error_if_exists=self.error_if_exists,
paranoid_checks=self.paranoid_checks,
+ write_buffer_size=self.write_buffer_size,
max_open_files=self.max_open_files,
+ lru_cache_size=self.lru_cache_size,
block_size=self.block_size,
+
block_restart_interval=self.block_restart_interval,
max_file_size=self.max_file_size,
+ compression=self.compression,
bloom_filter_bits=self.bloom_filter_bits,
+ comparator=self.comparator,
comparator_name=self.comparator_name)
+ value = leveldb_hook.run(command=self.command, key=self.key,
value=self.value, keys=self.keys,
+ values=self.values)
Review comment:
```suggestion
leveldb_hook.get_conn(
name=self.name,
create_if_missing=self.create_if_missing,
error_if_exists=self.error_if_exists,
paranoid_checks=self.paranoid_checks,
write_buffer_size=self.write_buffer_size,
max_open_files=self.max_open_files,
lru_cache_size=self.lru_cache_size,
block_size=self.block_size,
block_restart_interval=self.block_restart_interval,
max_file_size=self.max_file_size,
compression=self.compression,
bloom_filter_bits=self.bloom_filter_bits,
comparator=self.comparator,
comparator_name=self.comparator_name
)
value = leveldb_hook.run(
command=self.command,
key=self.key,
value=self.value,
keys=self.keys,
values=self.values
)
```
##########
File path: airflow/providers/google/leveldb/operators/leveldb.py
##########
@@ -0,0 +1,99 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import List, Callable, Optional
Review comment:
```suggestion
from typing import Callable, List, Optional
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]