This is an automated email from the ASF dual-hosted git repository.

honahx pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 6d52325b Hive catalog: Add retry logic for hive locking (#701)
6d52325b is described below

commit 6d52325b45ed5026c9aaad7556705a608891d8c0
Author: frankliee <[email protected]>
AuthorDate: Wed May 15 15:13:32 2024 +0800

    Hive catalog: Add retry logic for hive locking (#701)
---
 pyiceberg/catalog/hive.py       | 61 +++++++++++++++++++++++++++++++++++++++--
 pyiceberg/exceptions.py         |  4 +++
 pyiceberg/table/__init__.py     | 10 +++++++
 tests/catalog/test_hive.py      | 39 +++++++++++++++++++++++++-
 tests/integration/test_reads.py | 31 +++++++++++++++++++++
 5 files changed, 142 insertions(+), 3 deletions(-)

diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py
index 804b1105..708ae8c9 100644
--- a/pyiceberg/catalog/hive.py
+++ b/pyiceberg/catalog/hive.py
@@ -15,6 +15,7 @@
 #  specific language governing permissions and limitations
 #  under the License.
 import getpass
+import logging
 import socket
 import time
 from types import TracebackType
@@ -33,6 +34,7 @@ from urllib.parse import urlparse
 from hive_metastore.ThriftHiveMetastore import Client
 from hive_metastore.ttypes import (
     AlreadyExistsException,
+    CheckLockRequest,
     FieldSchema,
     InvalidOperationException,
     LockComponent,
@@ -49,6 +51,7 @@ from hive_metastore.ttypes import (
 )
 from hive_metastore.ttypes import Database as HiveDatabase
 from hive_metastore.ttypes import Table as HiveTable
+from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential
 from thrift.protocol import TBinaryProtocol
 from thrift.transport import TSocket, TTransport
 
@@ -69,12 +72,20 @@ from pyiceberg.exceptions import (
     NoSuchNamespaceError,
     NoSuchTableError,
     TableAlreadyExistsError,
+    WaitingForLockException,
 )
 from pyiceberg.io import FileIO, load_file_io
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
 from pyiceberg.schema import Schema, SchemaVisitor, visit
 from pyiceberg.serializers import FromInputFile
-from pyiceberg.table import CommitTableRequest, CommitTableResponse, PropertyUtil, Table, TableProperties, update_table_metadata
+from pyiceberg.table import (
+    CommitTableRequest,
+    CommitTableResponse,
+    PropertyUtil,
+    Table,
+    TableProperties,
+    update_table_metadata,
+)
 from pyiceberg.table.metadata import new_table_metadata
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
 from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
@@ -111,6 +122,15 @@ OWNER = "owner"
 HIVE2_COMPATIBLE = "hive.hive2-compatible"
 HIVE2_COMPATIBLE_DEFAULT = False
 
+LOCK_CHECK_MIN_WAIT_TIME = "lock-check-min-wait-time"
+LOCK_CHECK_MAX_WAIT_TIME = "lock-check-max-wait-time"
+LOCK_CHECK_RETRIES = "lock-check-retries"
+DEFAULT_LOCK_CHECK_MIN_WAIT_TIME = 0.1  # 100 milliseconds
+DEFAULT_LOCK_CHECK_MAX_WAIT_TIME = 60  # 1 min
+DEFAULT_LOCK_CHECK_RETRIES = 4
+
+logger = logging.getLogger(__name__)
+
 
 class _HiveClient:
     """Helper class to nicely open and close the transport."""
@@ -240,6 +260,18 @@ class HiveCatalog(MetastoreCatalog):
         super().__init__(name, **properties)
         self._client = _HiveClient(properties["uri"], properties.get("ugi"))
 
+        self._lock_check_min_wait_time = PropertyUtil.property_as_float(
+            properties, LOCK_CHECK_MIN_WAIT_TIME, DEFAULT_LOCK_CHECK_MIN_WAIT_TIME
+        )
+        self._lock_check_max_wait_time = PropertyUtil.property_as_float(
+            properties, LOCK_CHECK_MAX_WAIT_TIME, DEFAULT_LOCK_CHECK_MAX_WAIT_TIME
+        )
+        self._lock_check_retries = PropertyUtil.property_as_float(
+            properties,
+            LOCK_CHECK_RETRIES,
+            DEFAULT_LOCK_CHECK_RETRIES,
+        )
+
     def _convert_hive_into_iceberg(self, table: HiveTable, io: FileIO) -> Table:
         properties: Dict[str, str] = table.parameters
         if TABLE_TYPE not in properties:
@@ -356,6 +388,26 @@ class HiveCatalog(MetastoreCatalog):
 
         return lock_request
 
+    def _wait_for_lock(self, database_name: str, table_name: str, lockid: int, open_client: Client) -> LockResponse:
+        @retry(
+            retry=retry_if_exception_type(WaitingForLockException),
+            wait=wait_exponential(multiplier=2, min=self._lock_check_min_wait_time, max=self._lock_check_max_wait_time),
+            stop=stop_after_attempt(self._lock_check_retries),
+            reraise=True,
+        )
+        def _do_wait_for_lock() -> LockResponse:
+            response: LockResponse = open_client.check_lock(CheckLockRequest(lockid=lockid))
+            if response.state == LockState.ACQUIRED:
+                return response
+            elif response.state == LockState.WAITING:
+                msg = f"Wait on lock for {database_name}.{table_name}"
+                logger.warning(msg)
+                raise WaitingForLockException(msg)
+            else:
+                raise CommitFailedException(f"Failed to check lock for {database_name}.{table_name}, state: {response.state}")
+
+        return _do_wait_for_lock()
+
     def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
         """Update the table.
 
@@ -380,7 +432,10 @@ class HiveCatalog(MetastoreCatalog):
 
             try:
                 if lock.state != LockState.ACQUIRED:
-                    raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}")
+                    if lock.state == LockState.WAITING:
+                        self._wait_for_lock(database_name, table_name, lock.lockid, open_client)
+                    else:
+                        raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}")
 
                 hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name)
                 io = load_file_io({**self.properties, **hive_table.parameters}, hive_table.sd.location)
@@ -406,6 +461,8 @@ class HiveCatalog(MetastoreCatalog):
                 open_client.alter_table(dbname=database_name, tbl_name=table_name, new_tbl=hive_table)
             except NoSuchObjectException as e:
                 raise NoSuchTableError(f"Table does not exist: {table_name}") from e
+            except WaitingForLockException as e:
+                raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}") from e
             finally:
                 open_client.unlock(UnlockRequest(lockid=lock.lockid))
 
diff --git a/pyiceberg/exceptions.py b/pyiceberg/exceptions.py
index 64356b11..c7e37ba7 100644
--- a/pyiceberg/exceptions.py
+++ b/pyiceberg/exceptions.py
@@ -110,3 +110,7 @@ class CommitFailedException(Exception):
 
 class CommitStateUnknownException(RESTError):
     """Commit failed due to unknown reason."""
+
+
+class WaitingForLockException(Exception):
+    """Need to wait for a lock, try again."""
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 5b7d04b5..c57f0d12 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -251,6 +251,16 @@ class PropertyUtil:
         else:
             return default
 
+    @staticmethod
+    def property_as_float(properties: Dict[str, str], property_name: str, default: Optional[float] = None) -> Optional[float]:
+        if value := properties.get(property_name):
+            try:
+                return float(value)
+            except ValueError as e:
+                raise ValueError(f"Could not parse table property {property_name} to a float: {value}") from e
+        else:
+            return default
+
     @staticmethod
     def property_as_bool(properties: Dict[str, str], property_name: str, default: bool) -> bool:
         if value := properties.get(property_name):
diff --git a/tests/catalog/test_hive.py b/tests/catalog/test_hive.py
index af3a3801..ef662b3a 100644
--- a/tests/catalog/test_hive.py
+++ b/tests/catalog/test_hive.py
@@ -24,6 +24,8 @@ from hive_metastore.ttypes import (
     AlreadyExistsException,
     FieldSchema,
     InvalidOperationException,
+    LockResponse,
+    LockState,
     MetaException,
     NoSuchObjectException,
     SerDeInfo,
@@ -34,12 +36,19 @@ from hive_metastore.ttypes import Database as HiveDatabase
 from hive_metastore.ttypes import Table as HiveTable
 
 from pyiceberg.catalog import PropertiesUpdateSummary
-from pyiceberg.catalog.hive import HiveCatalog, _construct_hive_storage_descriptor
+from pyiceberg.catalog.hive import (
+    LOCK_CHECK_MAX_WAIT_TIME,
+    LOCK_CHECK_MIN_WAIT_TIME,
+    LOCK_CHECK_RETRIES,
+    HiveCatalog,
+    _construct_hive_storage_descriptor,
+)
 from pyiceberg.exceptions import (
     NamespaceAlreadyExistsError,
     NamespaceNotEmptyError,
     NoSuchNamespaceError,
     NoSuchTableError,
+    WaitingForLockException,
 )
 from pyiceberg.partitioning import PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
@@ -1158,3 +1167,31 @@ def test_resolve_table_location_warehouse(hive_database: HiveDatabase) -> None:
 
     location = catalog._resolve_table_location(None, "database", "table")
     assert location == "/tmp/warehouse/database.db/table"
+
+
+def test_hive_wait_for_lock() -> None:
+    lockid = 12345
+    acquired = LockResponse(lockid=lockid, state=LockState.ACQUIRED)
+    waiting = LockResponse(lockid=lockid, state=LockState.WAITING)
+    prop = {
+        "uri": HIVE_METASTORE_FAKE_URL,
+        LOCK_CHECK_MIN_WAIT_TIME: 0.1,
+        LOCK_CHECK_MAX_WAIT_TIME: 0.5,
+        LOCK_CHECK_RETRIES: 5,
+    }
+    catalog = HiveCatalog(HIVE_CATALOG_NAME, **prop)  # type: ignore
+    catalog._client = MagicMock()
+    catalog._client.lock.return_value = LockResponse(lockid=lockid, state=LockState.WAITING)
+
+    # lock will be acquired after 3 retries
+    catalog._client.check_lock.side_effect = [waiting if i < 2 else acquired for i in range(10)]
+    response: LockResponse = catalog._wait_for_lock("db", "tbl", lockid, catalog._client)
+    assert response.state == LockState.ACQUIRED
+    assert catalog._client.check_lock.call_count == 3
+
+    # lock wait should exit with WaitingForLockException finally after enough retries
+    catalog._client.check_lock.side_effect = [waiting for _ in range(10)]
+    catalog._client.check_lock.call_count = 0
+    with pytest.raises(WaitingForLockException):
+        catalog._wait_for_lock("db", "tbl", lockid, catalog._client)
+    assert catalog._client.check_lock.call_count == 5
diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py
index ee9b17e4..2a10e37b 100644
--- a/tests/integration/test_reads.py
+++ b/tests/integration/test_reads.py
@@ -17,6 +17,7 @@
 # pylint:disable=redefined-outer-name
 
 import math
+import time
 import uuid
 from urllib.parse import urlparse
 
@@ -48,6 +49,7 @@ from pyiceberg.types import (
     StringType,
     TimestampType,
 )
+from pyiceberg.utils.concurrent import ExecutorFactory
 
 DEFAULT_PROPERTIES = {'write.parquet.compression-codec': 'zstd'}
 
@@ -506,3 +508,32 @@ def test_hive_locking(session_catalog_hive: HiveCatalog) -> None:
                 
table.transaction().set_properties(lock="fail").commit_transaction()
         finally:
             open_client.unlock(UnlockRequest(lock.lockid))
+
+
+@pytest.mark.integration
+def test_hive_locking_with_retry(session_catalog_hive: HiveCatalog) -> None:
+    table = create_table(session_catalog_hive)
+    database_name: str
+    table_name: str
+    _, database_name, table_name = table.identifier
+    session_catalog_hive._lock_check_min_wait_time = 0.1
+    session_catalog_hive._lock_check_max_wait_time = 0.5
+    session_catalog_hive._lock_check_retries = 5
+
+    hive_client: _HiveClient = _HiveClient(session_catalog_hive.properties["uri"])
+
+    executor = ExecutorFactory.get_or_create()
+
+    with hive_client as open_client:
+
+        def another_task() -> None:
+            lock: LockResponse = open_client.lock(session_catalog_hive._create_lock_request(database_name, table_name))
+            time.sleep(1)
+            open_client.unlock(UnlockRequest(lock.lockid))
+
+        # test transaction commit with concurrent locking
+        executor.submit(another_task)
+        time.sleep(0.5)
+
+        table.transaction().set_properties(lock="xxx").commit_transaction()
+        assert table.properties.get("lock") == "xxx"

Reply via email to