jx2lee commented on code in PR #3099:
URL: https://github.com/apache/iceberg-python/pull/3099#discussion_r2878423685
##########
pyiceberg/catalog/bigquery_metastore.py:
##########
@@ -229,7 +236,88 @@ def drop_table(self, identifier: str | Identifier) -> None:
def commit_table(
self, table: Table, requirements: tuple[TableRequirement, ...],
updates: tuple[TableUpdate, ...]
) -> CommitTableResponse:
- raise NotImplementedError
+ table_identifier = table.name()
+ dataset_name, table_name =
self.identifier_to_database_and_table(table_identifier, NoSuchTableError)
+ table_ref = TableReference(
+ dataset_ref=DatasetReference(project=self.project_id,
dataset_id=dataset_name),
+ table_id=table_name,
+ )
+
+ current_bq_table: BQTable | None
+ current_table: Table | None
+ try:
+ current_bq_table = self.client.get_table(table_ref)
+ except NotFound:
+ current_bq_table = None
+ current_table = None
+ else:
+ current_table =
self._convert_bigquery_table_to_iceberg_table(table_identifier,
current_bq_table)
+
+ updated_staged_table = self._update_and_stage_table(current_table,
table_identifier, requirements, updates)
+ if current_table and updated_staged_table.metadata ==
current_table.metadata:
+ return CommitTableResponse(metadata=current_table.metadata,
metadata_location=current_table.metadata_location)
+
+ self._write_metadata(
+ metadata=updated_staged_table.metadata,
+ io=updated_staged_table.io,
+ metadata_path=updated_staged_table.metadata_location,
+ )
+
+ commit_error: Exception | None = None
+ try:
+ if current_bq_table and current_table:
+ current_bq_table.external_catalog_table_options =
self._create_external_catalog_table_options(
+ updated_staged_table.metadata.location,
+ self._create_table_parameters(
+
metadata_file_location=updated_staged_table.metadata_location,
+ table_metadata=updated_staged_table.metadata,
+
previous_metadata_location=current_table.metadata_location,
+ ),
+ )
+ self.client.update_table(current_bq_table,
["external_catalog_table_options"])
+ else:
+ self.client.create_table(
+ self._make_new_table(
+ updated_staged_table.metadata,
+ updated_staged_table.metadata_location,
+ table_ref,
+ )
+ )
+ except NotFound as e:
+ commit_error = (
+ CommitFailedException(f"Table does not exist:
{dataset_name}.{table_name}")
+ if current_table
+ else NoSuchNamespaceError(f"Namespace does not exist:
{dataset_name}")
+ )
+ commit_error.__cause__ = e
+ except Conflict as e:
+ commit_error = (
+ CommitFailedException(f"Table has been updated by another
process: {dataset_name}.{table_name}")
+ if current_table
+ else TableAlreadyExistsError(f"Table {table_name} already
exists")
+ )
+ commit_error.__cause__ = e
+ except Exception as e:
+ commit_error = e
+ finally:
+ if commit_error:
+ commit_status = self._check_bigquery_commit_status(table_ref,
updated_staged_table.metadata_location)
+ if commit_status == "SUCCESS":
+ commit_error = None
+ elif commit_status == "UNKNOWN":
+ raise CommitStateUnknownException(
+ f"Commit state unknown for table
{dataset_name}.{table_name}"
+ ) from commit_error
+
+ if commit_error:
+ raise commit_error
+
+ if current_table:
+ self._delete_old_metadata(updated_staged_table.io,
current_table.metadata, updated_staged_table.metadata)
Review Comment:
Removed check current_table. Thanks!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]