This is an automated email from the ASF dual-hosted git repository.
honahx pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 6d52325b Hive catalog: Add retry logic for hive locking (#701)
6d52325b is described below
commit 6d52325b45ed5026c9aaad7556705a608891d8c0
Author: frankliee <[email protected]>
AuthorDate: Wed May 15 15:13:32 2024 +0800
Hive catalog: Add retry logic for hive locking (#701)
---
pyiceberg/catalog/hive.py | 61 +++++++++++++++++++++++++++++++++++++++--
pyiceberg/exceptions.py | 4 +++
pyiceberg/table/__init__.py | 10 +++++++
tests/catalog/test_hive.py | 39 +++++++++++++++++++++++++-
tests/integration/test_reads.py | 31 +++++++++++++++++++++
5 files changed, 142 insertions(+), 3 deletions(-)
diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py
index 804b1105..708ae8c9 100644
--- a/pyiceberg/catalog/hive.py
+++ b/pyiceberg/catalog/hive.py
@@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
import getpass
+import logging
import socket
import time
from types import TracebackType
@@ -33,6 +34,7 @@ from urllib.parse import urlparse
from hive_metastore.ThriftHiveMetastore import Client
from hive_metastore.ttypes import (
AlreadyExistsException,
+ CheckLockRequest,
FieldSchema,
InvalidOperationException,
LockComponent,
@@ -49,6 +51,7 @@ from hive_metastore.ttypes import (
)
from hive_metastore.ttypes import Database as HiveDatabase
from hive_metastore.ttypes import Table as HiveTable
+from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential
from thrift.protocol import TBinaryProtocol
from thrift.transport import TSocket, TTransport
@@ -69,12 +72,20 @@ from pyiceberg.exceptions import (
NoSuchNamespaceError,
NoSuchTableError,
TableAlreadyExistsError,
+ WaitingForLockException,
)
from pyiceberg.io import FileIO, load_file_io
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema, SchemaVisitor, visit
from pyiceberg.serializers import FromInputFile
-from pyiceberg.table import CommitTableRequest, CommitTableResponse, PropertyUtil, Table, TableProperties, update_table_metadata
+from pyiceberg.table import (
+ CommitTableRequest,
+ CommitTableResponse,
+ PropertyUtil,
+ Table,
+ TableProperties,
+ update_table_metadata,
+)
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
@@ -111,6 +122,15 @@ OWNER = "owner"
HIVE2_COMPATIBLE = "hive.hive2-compatible"
HIVE2_COMPATIBLE_DEFAULT = False
+LOCK_CHECK_MIN_WAIT_TIME = "lock-check-min-wait-time"
+LOCK_CHECK_MAX_WAIT_TIME = "lock-check-max-wait-time"
+LOCK_CHECK_RETRIES = "lock-check-retries"
+DEFAULT_LOCK_CHECK_MIN_WAIT_TIME = 0.1 # 100 milliseconds
+DEFAULT_LOCK_CHECK_MAX_WAIT_TIME = 60 # 1 min
+DEFAULT_LOCK_CHECK_RETRIES = 4
+
+logger = logging.getLogger(__name__)
+
class _HiveClient:
"""Helper class to nicely open and close the transport."""
@@ -240,6 +260,18 @@ class HiveCatalog(MetastoreCatalog):
super().__init__(name, **properties)
self._client = _HiveClient(properties["uri"], properties.get("ugi"))
+ self._lock_check_min_wait_time = PropertyUtil.property_as_float(
+ properties, LOCK_CHECK_MIN_WAIT_TIME, DEFAULT_LOCK_CHECK_MIN_WAIT_TIME
+ )
+ self._lock_check_max_wait_time = PropertyUtil.property_as_float(
+ properties, LOCK_CHECK_MAX_WAIT_TIME, DEFAULT_LOCK_CHECK_MAX_WAIT_TIME
+ )
+ self._lock_check_retries = PropertyUtil.property_as_float(
+ properties,
+ LOCK_CHECK_RETRIES,
+ DEFAULT_LOCK_CHECK_RETRIES,
+ )
+
def _convert_hive_into_iceberg(self, table: HiveTable, io: FileIO) -> Table:
properties: Dict[str, str] = table.parameters
if TABLE_TYPE not in properties:
@@ -356,6 +388,26 @@ class HiveCatalog(MetastoreCatalog):
return lock_request
+ def _wait_for_lock(self, database_name: str, table_name: str, lockid: int, open_client: Client) -> LockResponse:
+ @retry(
+ retry=retry_if_exception_type(WaitingForLockException),
+ wait=wait_exponential(multiplier=2, min=self._lock_check_min_wait_time, max=self._lock_check_max_wait_time),
+ stop=stop_after_attempt(self._lock_check_retries),
+ reraise=True,
+ )
+ def _do_wait_for_lock() -> LockResponse:
+ response: LockResponse = open_client.check_lock(CheckLockRequest(lockid=lockid))
+ if response.state == LockState.ACQUIRED:
+ return response
+ elif response.state == LockState.WAITING:
+ msg = f"Wait on lock for {database_name}.{table_name}"
+ logger.warning(msg)
+ raise WaitingForLockException(msg)
+ else:
+ raise CommitFailedException(f"Failed to check lock for {database_name}.{table_name}, state: {response.state}")
+
+ return _do_wait_for_lock()
+
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Update the table.
@@ -380,7 +432,10 @@ class HiveCatalog(MetastoreCatalog):
try:
if lock.state != LockState.ACQUIRED:
- raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}")
+ if lock.state == LockState.WAITING:
+ self._wait_for_lock(database_name, table_name, lock.lockid, open_client)
+ else:
+ raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}")
hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name)
io = load_file_io({**self.properties, **hive_table.parameters}, hive_table.sd.location)
@@ -406,6 +461,8 @@ class HiveCatalog(MetastoreCatalog):
open_client.alter_table(dbname=database_name, tbl_name=table_name, new_tbl=hive_table)
except NoSuchObjectException as e:
raise NoSuchTableError(f"Table does not exist: {table_name}") from e
+ except WaitingForLockException as e:
+ raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}") from e
finally:
open_client.unlock(UnlockRequest(lockid=lock.lockid))
diff --git a/pyiceberg/exceptions.py b/pyiceberg/exceptions.py
index 64356b11..c7e37ba7 100644
--- a/pyiceberg/exceptions.py
+++ b/pyiceberg/exceptions.py
@@ -110,3 +110,7 @@ class CommitFailedException(Exception):
class CommitStateUnknownException(RESTError):
"""Commit failed due to unknown reason."""
+
+
+class WaitingForLockException(Exception):
+ """Need to wait for a lock, try again."""
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 5b7d04b5..c57f0d12 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -251,6 +251,16 @@ class PropertyUtil:
else:
return default
+ @staticmethod
+ def property_as_float(properties: Dict[str, str], property_name: str, default: Optional[float] = None) -> Optional[float]:
+ if value := properties.get(property_name):
+ try:
+ return float(value)
+ except ValueError as e:
+ raise ValueError(f"Could not parse table property {property_name} to a float: {value}") from e
+ else:
+ return default
+
@staticmethod
def property_as_bool(properties: Dict[str, str], property_name: str, default: bool) -> bool:
if value := properties.get(property_name):
diff --git a/tests/catalog/test_hive.py b/tests/catalog/test_hive.py
index af3a3801..ef662b3a 100644
--- a/tests/catalog/test_hive.py
+++ b/tests/catalog/test_hive.py
@@ -24,6 +24,8 @@ from hive_metastore.ttypes import (
AlreadyExistsException,
FieldSchema,
InvalidOperationException,
+ LockResponse,
+ LockState,
MetaException,
NoSuchObjectException,
SerDeInfo,
@@ -34,12 +36,19 @@ from hive_metastore.ttypes import Database as HiveDatabase
from hive_metastore.ttypes import Table as HiveTable
from pyiceberg.catalog import PropertiesUpdateSummary
-from pyiceberg.catalog.hive import HiveCatalog, _construct_hive_storage_descriptor
+from pyiceberg.catalog.hive import (
+ LOCK_CHECK_MAX_WAIT_TIME,
+ LOCK_CHECK_MIN_WAIT_TIME,
+ LOCK_CHECK_RETRIES,
+ HiveCatalog,
+ _construct_hive_storage_descriptor,
+)
from pyiceberg.exceptions import (
NamespaceAlreadyExistsError,
NamespaceNotEmptyError,
NoSuchNamespaceError,
NoSuchTableError,
+ WaitingForLockException,
)
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
@@ -1158,3 +1167,31 @@ def test_resolve_table_location_warehouse(hive_database: HiveDatabase) -> None:
location = catalog._resolve_table_location(None, "database", "table")
assert location == "/tmp/warehouse/database.db/table"
+
+
+def test_hive_wait_for_lock() -> None:
+ lockid = 12345
+ acquired = LockResponse(lockid=lockid, state=LockState.ACQUIRED)
+ waiting = LockResponse(lockid=lockid, state=LockState.WAITING)
+ prop = {
+ "uri": HIVE_METASTORE_FAKE_URL,
+ LOCK_CHECK_MIN_WAIT_TIME: 0.1,
+ LOCK_CHECK_MAX_WAIT_TIME: 0.5,
+ LOCK_CHECK_RETRIES: 5,
+ }
+ catalog = HiveCatalog(HIVE_CATALOG_NAME, **prop) # type: ignore
+ catalog._client = MagicMock()
+ catalog._client.lock.return_value = LockResponse(lockid=lockid, state=LockState.WAITING)
+
+ # lock will be acquired after 3 retries
+ catalog._client.check_lock.side_effect = [waiting if i < 2 else acquired for i in range(10)]
+ response: LockResponse = catalog._wait_for_lock("db", "tbl", lockid, catalog._client)
+ assert response.state == LockState.ACQUIRED
+ assert catalog._client.check_lock.call_count == 3
+
+ # lock wait should exit with WaitingForLockException finally after enough retries
+ catalog._client.check_lock.side_effect = [waiting for _ in range(10)]
+ catalog._client.check_lock.call_count = 0
+ with pytest.raises(WaitingForLockException):
+ catalog._wait_for_lock("db", "tbl", lockid, catalog._client)
+ assert catalog._client.check_lock.call_count == 5
diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py
index ee9b17e4..2a10e37b 100644
--- a/tests/integration/test_reads.py
+++ b/tests/integration/test_reads.py
@@ -17,6 +17,7 @@
# pylint:disable=redefined-outer-name
import math
+import time
import uuid
from urllib.parse import urlparse
@@ -48,6 +49,7 @@ from pyiceberg.types import (
StringType,
TimestampType,
)
+from pyiceberg.utils.concurrent import ExecutorFactory
DEFAULT_PROPERTIES = {'write.parquet.compression-codec': 'zstd'}
@@ -506,3 +508,32 @@ def test_hive_locking(session_catalog_hive: HiveCatalog) -> None:
table.transaction().set_properties(lock="fail").commit_transaction()
finally:
open_client.unlock(UnlockRequest(lock.lockid))
+
+
+@pytest.mark.integration
+def test_hive_locking_with_retry(session_catalog_hive: HiveCatalog) -> None:
+ table = create_table(session_catalog_hive)
+ database_name: str
+ table_name: str
+ _, database_name, table_name = table.identifier
+ session_catalog_hive._lock_check_min_wait_time = 0.1
+ session_catalog_hive._lock_check_max_wait_time = 0.5
+ session_catalog_hive._lock_check_retries = 5
+
+ hive_client: _HiveClient = _HiveClient(session_catalog_hive.properties["uri"])
+
+ executor = ExecutorFactory.get_or_create()
+
+ with hive_client as open_client:
+
+ def another_task() -> None:
+ lock: LockResponse = open_client.lock(session_catalog_hive._create_lock_request(database_name, table_name))
+ time.sleep(1)
+ open_client.unlock(UnlockRequest(lock.lockid))
+
+ # test transaction commit with concurrent locking
+ executor.submit(another_task)
+ time.sleep(0.5)
+
+ table.transaction().set_properties(lock="xxx").commit_transaction()
+ assert table.properties.get("lock") == "xxx"