This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new aa465436 Bug: HiveCatalog's `_commit_table` refresh and update the
metadata within transaction (#607)
aa465436 is described below
commit aa4654368e54bf84933279179519b299b5910493
Author: Honah J <[email protected]>
AuthorDate: Mon Apr 15 13:39:00 2024 -0700
Bug: HiveCatalog's `_commit_table` refresh and update the metadata within
transaction (#607)
* refresh and update the table metadata within a single transaction
* fix integration tests
---
pyiceberg/catalog/hive.py | 38 ++++++++++++++++++++------------------
1 file changed, 20 insertions(+), 18 deletions(-)
diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py
index b504da9a..804b1105 100644
--- a/pyiceberg/catalog/hive.py
+++ b/pyiceberg/catalog/hive.py
@@ -372,22 +372,7 @@ class HiveCatalog(MetastoreCatalog):
identifier_tuple = self.identifier_to_tuple_without_catalog(
tuple(table_request.identifier.namespace.root +
[table_request.identifier.name])
)
- current_table = self.load_table(identifier_tuple)
database_name, table_name =
self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
- base_metadata = current_table.metadata
- for requirement in table_request.requirements:
- requirement.validate(base_metadata)
-
- updated_metadata = update_table_metadata(base_metadata,
table_request.updates)
- if updated_metadata == base_metadata:
- # no changes, do nothing
- return CommitTableResponse(metadata=base_metadata,
metadata_location=current_table.metadata_location)
-
- # write new metadata
- new_metadata_version =
self._parse_metadata_version(current_table.metadata_location) + 1
- new_metadata_location =
self._get_metadata_location(current_table.metadata.location,
new_metadata_version)
- self._write_metadata(updated_metadata, current_table.io,
new_metadata_location)
-
# commit to hive
#
https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift#L1232
with self._client as open_client:
@@ -397,11 +382,28 @@ class HiveCatalog(MetastoreCatalog):
if lock.state != LockState.ACQUIRED:
raise CommitFailedException(f"Failed to acquire lock for
{table_request.identifier}, state: {lock.state}")
- tbl = open_client.get_table(dbname=database_name,
tbl_name=table_name)
- tbl.parameters = _construct_parameters(
+ hive_table = open_client.get_table(dbname=database_name,
tbl_name=table_name)
+ io = load_file_io({**self.properties,
**hive_table.parameters}, hive_table.sd.location)
+ current_table = self._convert_hive_into_iceberg(hive_table, io)
+
+ base_metadata = current_table.metadata
+ for requirement in table_request.requirements:
+ requirement.validate(base_metadata)
+
+ updated_metadata = update_table_metadata(base_metadata,
table_request.updates)
+ if updated_metadata == base_metadata:
+ # no changes, do nothing
+ return CommitTableResponse(metadata=base_metadata,
metadata_location=current_table.metadata_location)
+
+ # write new metadata
+ new_metadata_version =
self._parse_metadata_version(current_table.metadata_location) + 1
+ new_metadata_location =
self._get_metadata_location(current_table.metadata.location,
new_metadata_version)
+ self._write_metadata(updated_metadata, current_table.io,
new_metadata_location)
+
+ hive_table.parameters = _construct_parameters(
metadata_location=new_metadata_location,
previous_metadata_location=current_table.metadata_location
)
- open_client.alter_table(dbname=database_name,
tbl_name=table_name, new_tbl=tbl)
+ open_client.alter_table(dbname=database_name,
tbl_name=table_name, new_tbl=hive_table)
except NoSuchObjectException as e:
raise NoSuchTableError(f"Table does not exist: {table_name}")
from e
finally: