This is an automated email from the ASF dual-hosted git repository.

honahx pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 84a2c043 Support CreateTableTransaction for HiveCatalog (#683)
84a2c043 is described below

commit 84a2c043870111937e2802132486d8eb5979570e
Author: Honah J <[email protected]>
AuthorDate: Fri May 31 00:33:03 2024 -0700

    Support CreateTableTransaction for HiveCatalog (#683)
---
 pyiceberg/catalog/__init__.py                |   2 +-
 pyiceberg/catalog/hive.py                    | 161 ++++++++++++++++-----------
 tests/integration/test_writes/test_writes.py |  16 +--
 3 files changed, 104 insertions(+), 75 deletions(-)

diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py
index ea2bc657..9a951b5c 100644
--- a/pyiceberg/catalog/__init__.py
+++ b/pyiceberg/catalog/__init__.py
@@ -761,7 +761,7 @@ class MetastoreCatalog(Catalog, ABC):
         metadata = new_table_metadata(
             location=location, schema=schema, partition_spec=partition_spec, 
sort_order=sort_order, properties=properties
         )
-        io = load_file_io(properties=self.properties, 
location=metadata_location)
+        io = self._load_file_io(properties=properties, 
location=metadata_location)
         return StagedTable(
             identifier=(self.name, database_name, table_name),
             metadata=metadata,
diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py
index 13b57b6e..83bbd507 100644
--- a/pyiceberg/catalog/hive.py
+++ b/pyiceberg/catalog/hive.py
@@ -70,11 +70,11 @@ from pyiceberg.exceptions import (
     NamespaceNotEmptyError,
     NoSuchIcebergTableError,
     NoSuchNamespaceError,
+    NoSuchPropertyException,
     NoSuchTableError,
     TableAlreadyExistsError,
     WaitingForLockException,
 )
-from pyiceberg.io import FileIO, load_file_io
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
 from pyiceberg.schema import Schema, SchemaVisitor, visit
 from pyiceberg.serializers import FromInputFile
@@ -82,11 +82,10 @@ from pyiceberg.table import (
     CommitTableRequest,
     CommitTableResponse,
     PropertyUtil,
+    StagedTable,
     Table,
     TableProperties,
-    update_table_metadata,
 )
-from pyiceberg.table.metadata import new_table_metadata
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
 from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
 from pyiceberg.types import (
@@ -272,10 +271,12 @@ class HiveCatalog(MetastoreCatalog):
             DEFAULT_LOCK_CHECK_RETRIES,
         )
 
-    def _convert_hive_into_iceberg(self, table: HiveTable, io: FileIO) -> 
Table:
+    def _convert_hive_into_iceberg(self, table: HiveTable) -> Table:
         properties: Dict[str, str] = table.parameters
         if TABLE_TYPE not in properties:
-            raise NoSuchTableError(f"Property table_type missing, could not 
determine type: {table.dbName}.{table.tableName}")
+            raise NoSuchPropertyException(
+                f"Property table_type missing, could not determine type: 
{table.dbName}.{table.tableName}"
+            )
 
         table_type = properties[TABLE_TYPE]
         if table_type.lower() != ICEBERG:
@@ -286,8 +287,9 @@ class HiveCatalog(MetastoreCatalog):
         if prop_metadata_location := properties.get(METADATA_LOCATION):
             metadata_location = prop_metadata_location
         else:
-            raise NoSuchTableError(f"Table property {METADATA_LOCATION} is 
missing")
+            raise NoSuchPropertyException(f"Table property {METADATA_LOCATION} 
is missing")
 
+        io = self._load_file_io(location=metadata_location)
         file = io.new_input(metadata_location)
         metadata = FromInputFile.table_metadata(file)
         return Table(
@@ -298,6 +300,38 @@ class HiveCatalog(MetastoreCatalog):
             catalog=self,
         )
 
+    def _convert_iceberg_into_hive(self, table: Table) -> HiveTable:
+        identifier_tuple = 
self.identifier_to_tuple_without_catalog(table.identifier)
+        database_name, table_name = 
self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
+        current_time_millis = int(time.time() * 1000)
+
+        return HiveTable(
+            dbName=database_name,
+            tableName=table_name,
+            owner=table.properties[OWNER] if table.properties and OWNER in 
table.properties else getpass.getuser(),
+            createTime=current_time_millis // 1000,
+            lastAccessTime=current_time_millis // 1000,
+            sd=_construct_hive_storage_descriptor(
+                table.schema(),
+                table.location(),
+                PropertyUtil.property_as_bool(self.properties, 
HIVE2_COMPATIBLE, HIVE2_COMPATIBLE_DEFAULT),
+            ),
+            tableType=EXTERNAL_TABLE,
+            parameters=_construct_parameters(table.metadata_location),
+        )
+
+    def _create_hive_table(self, open_client: Client, hive_table: HiveTable) 
-> None:
+        try:
+            open_client.create_table(hive_table)
+        except AlreadyExistsException as e:
+            raise TableAlreadyExistsError(f"Table 
{hive_table.dbName}.{hive_table.tableName} already exists") from e
+
+    def _get_hive_table(self, open_client: Client, database_name: str, 
table_name: str) -> HiveTable:
+        try:
+            return open_client.get_table(dbname=database_name, 
tbl_name=table_name)
+        except NoSuchObjectException as e:
+            raise NoSuchTableError(f"Table does not exists: {table_name}") 
from e
+
     def create_table(
         self,
         identifier: Union[str, Identifier],
@@ -324,45 +358,25 @@ class HiveCatalog(MetastoreCatalog):
             AlreadyExistsError: If a table with the name already exists.
             ValueError: If the identifier is invalid.
         """
-        schema: Schema = self._convert_schema_if_needed(schema)  # type: ignore
-
         properties = {**DEFAULT_PROPERTIES, **properties}
-        database_name, table_name = 
self.identifier_to_database_and_table(identifier)
-        current_time_millis = int(time.time() * 1000)
-
-        location = self._resolve_table_location(location, database_name, 
table_name)
-
-        metadata_location = self._get_metadata_location(location=location)
-        metadata = new_table_metadata(
-            location=location,
+        staged_table = self._create_staged_table(
+            identifier=identifier,
             schema=schema,
+            location=location,
             partition_spec=partition_spec,
             sort_order=sort_order,
             properties=properties,
         )
-        io = load_file_io({**self.properties, **properties}, location=location)
-        self._write_metadata(metadata, io, metadata_location)
+        database_name, table_name = 
self.identifier_to_database_and_table(identifier)
 
-        tbl = HiveTable(
-            dbName=database_name,
-            tableName=table_name,
-            owner=properties[OWNER] if properties and OWNER in properties else 
getpass.getuser(),
-            createTime=current_time_millis // 1000,
-            lastAccessTime=current_time_millis // 1000,
-            sd=_construct_hive_storage_descriptor(
-                schema, location, 
PropertyUtil.property_as_bool(self.properties, HIVE2_COMPATIBLE, 
HIVE2_COMPATIBLE_DEFAULT)
-            ),
-            tableType=EXTERNAL_TABLE,
-            parameters=_construct_parameters(metadata_location),
-        )
-        try:
-            with self._client as open_client:
-                open_client.create_table(tbl)
-                hive_table = open_client.get_table(dbname=database_name, 
tbl_name=table_name)
-        except AlreadyExistsException as e:
-            raise TableAlreadyExistsError(f"Table {database_name}.{table_name} 
already exists") from e
+        self._write_metadata(staged_table.metadata, staged_table.io, 
staged_table.metadata_location)
+        tbl = self._convert_iceberg_into_hive(staged_table)
+
+        with self._client as open_client:
+            self._create_hive_table(open_client, tbl)
+            hive_table = open_client.get_table(dbname=database_name, 
tbl_name=table_name)
 
-        return self._convert_hive_into_iceberg(hive_table, io)
+        return self._convert_hive_into_iceberg(hive_table)
 
     def register_table(self, identifier: Union[str, Identifier], 
metadata_location: str) -> Table:
         """Register a new table using existing metadata.
@@ -437,36 +451,52 @@ class HiveCatalog(MetastoreCatalog):
                     else:
                         raise CommitFailedException(f"Failed to acquire lock 
for {table_request.identifier}, state: {lock.state}")
 
-                hive_table = open_client.get_table(dbname=database_name, 
tbl_name=table_name)
-                io = load_file_io({**self.properties, 
**hive_table.parameters}, hive_table.sd.location)
-                current_table = self._convert_hive_into_iceberg(hive_table, io)
-
-                base_metadata = current_table.metadata
-                for requirement in table_request.requirements:
-                    requirement.validate(base_metadata)
-
-                updated_metadata = update_table_metadata(base_metadata, 
table_request.updates)
-                if updated_metadata == base_metadata:
+                hive_table: Optional[HiveTable]
+                current_table: Optional[Table]
+                try:
+                    hive_table = self._get_hive_table(open_client, 
database_name, table_name)
+                    current_table = self._convert_hive_into_iceberg(hive_table)
+                except NoSuchTableError:
+                    hive_table = None
+                    current_table = None
+
+                updated_staged_table = 
self._update_and_stage_table(current_table, table_request)
+                if current_table and updated_staged_table.metadata == 
current_table.metadata:
                     # no changes, do nothing
-                    return CommitTableResponse(metadata=base_metadata, 
metadata_location=current_table.metadata_location)
-
-                # write new metadata
-                new_metadata_version = 
self._parse_metadata_version(current_table.metadata_location) + 1
-                new_metadata_location = 
self._get_metadata_location(current_table.metadata.location, 
new_metadata_version)
-                self._write_metadata(updated_metadata, current_table.io, 
new_metadata_location)
-
-                hive_table.parameters = _construct_parameters(
-                    metadata_location=new_metadata_location, 
previous_metadata_location=current_table.metadata_location
+                    return 
CommitTableResponse(metadata=current_table.metadata, 
metadata_location=current_table.metadata_location)
+                self._write_metadata(
+                    metadata=updated_staged_table.metadata,
+                    io=updated_staged_table.io,
+                    metadata_path=updated_staged_table.metadata_location,
                 )
-                open_client.alter_table(dbname=database_name, 
tbl_name=table_name, new_tbl=hive_table)
-            except NoSuchObjectException as e:
-                raise NoSuchTableError(f"Table does not exist: {table_name}") 
from e
+
+                if hive_table and current_table:
+                    # Table exists, update it.
+                    hive_table.parameters = _construct_parameters(
+                        
metadata_location=updated_staged_table.metadata_location,
+                        
previous_metadata_location=current_table.metadata_location,
+                    )
+                    open_client.alter_table(dbname=database_name, 
tbl_name=table_name, new_tbl=hive_table)
+                else:
+                    # Table does not exist, create it.
+                    hive_table = self._convert_iceberg_into_hive(
+                        StagedTable(
+                            identifier=(self.name, database_name, table_name),
+                            metadata=updated_staged_table.metadata,
+                            
metadata_location=updated_staged_table.metadata_location,
+                            io=updated_staged_table.io,
+                            catalog=self,
+                        )
+                    )
+                    self._create_hive_table(open_client, hive_table)
             except WaitingForLockException as e:
                 raise CommitFailedException(f"Failed to acquire lock for 
{table_request.identifier}, state: {lock.state}") from e
             finally:
                 open_client.unlock(UnlockRequest(lockid=lock.lockid))
 
-        return CommitTableResponse(metadata=updated_metadata, 
metadata_location=new_metadata_location)
+        return CommitTableResponse(
+            metadata=updated_staged_table.metadata, 
metadata_location=updated_staged_table.metadata_location
+        )
 
     def load_table(self, identifier: Union[str, Identifier]) -> Table:
         """Load the table's metadata and return the table instance.
@@ -485,14 +515,11 @@ class HiveCatalog(MetastoreCatalog):
         """
         identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
         database_name, table_name = 
self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
-        try:
-            with self._client as open_client:
-                hive_table = open_client.get_table(dbname=database_name, 
tbl_name=table_name)
-        except NoSuchObjectException as e:
-            raise NoSuchTableError(f"Table does not exists: {table_name}") 
from e
 
-        io = load_file_io({**self.properties, **hive_table.parameters}, 
hive_table.sd.location)
-        return self._convert_hive_into_iceberg(hive_table, io)
+        with self._client as open_client:
+            hive_table = self._get_hive_table(open_client, database_name, 
table_name)
+
+        return self._convert_hive_into_iceberg(hive_table)
 
     def drop_table(self, identifier: Union[str, Identifier]) -> None:
         """Drop a table.
diff --git a/tests/integration/test_writes/test_writes.py 
b/tests/integration/test_writes/test_writes.py
index 0941b358..e329adcd 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -34,6 +34,7 @@ from pytest_mock.plugin import MockerFixture
 
 from pyiceberg.catalog import Catalog
 from pyiceberg.catalog.hive import HiveCatalog
+from pyiceberg.catalog.rest import RestCatalog
 from pyiceberg.catalog.sql import SqlCatalog
 from pyiceberg.exceptions import NoSuchTableError
 from pyiceberg.partitioning import PartitionField, PartitionSpec
@@ -637,17 +638,18 @@ def test_write_and_evolve(session_catalog: Catalog, 
format_version: int) -> None
 
 
 @pytest.mark.integration
-@pytest.mark.parametrize("format_version", [2])
-def test_create_table_transaction(session_catalog: Catalog, format_version: 
int) -> None:
-    if format_version == 1:
+@pytest.mark.parametrize("format_version", [1, 2])
+@pytest.mark.parametrize("catalog", 
[pytest.lazy_fixture("session_catalog_hive"), 
pytest.lazy_fixture("session_catalog")])
+def test_create_table_transaction(catalog: Catalog, format_version: int) -> 
None:
+    if format_version == 1 and isinstance(catalog, RestCatalog):
         pytest.skip(
             "There is a bug in the REST catalog (maybe server side) that 
prevents create and commit a staged version 1 table"
         )
 
-    identifier = f"default.arrow_create_table_transaction{format_version}"
+    identifier = 
f"default.arrow_create_table_transaction_{catalog.name}_{format_version}"
 
     try:
-        session_catalog.drop_table(identifier=identifier)
+        catalog.drop_table(identifier=identifier)
     except NoSuchTableError:
         pass
 
@@ -669,7 +671,7 @@ def test_create_table_transaction(session_catalog: Catalog, 
format_version: int)
         ]),
     )
 
-    with session_catalog.create_table_transaction(
+    with catalog.create_table_transaction(
         identifier=identifier, schema=pa_table.schema, 
properties={"format-version": str(format_version)}
     ) as txn:
         with txn.update_snapshot().fast_append() as snapshot_update:
@@ -685,7 +687,7 @@ def test_create_table_transaction(session_catalog: Catalog, 
format_version: int)
             ):
                 snapshot_update.append_data_file(data_file)
 
-    tbl = session_catalog.load_table(identifier=identifier)
+    tbl = catalog.load_table(identifier=identifier)
     assert tbl.format_version == format_version
     assert len(tbl.scan().to_arrow()) == 6
 

Reply via email to