This is an automated email from the ASF dual-hosted git repository.
sungwy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 0213dabe Fix issues related to having catalog_name in identifier (#964)
0213dabe is described below
commit 0213dabe16338d7865e57468bab11316c266a835
Author: Honah J. <[email protected]>
AuthorDate: Fri Jul 26 11:02:15 2024 -0700
Fix issues related to having catalog_name in identifier (#964)
* first attempt
* add license
* refactor new tests
---
pyiceberg/catalog/rest.py | 18 ++++++++++++++++--
tests/catalog/test_rest.py | 24 +++++++++++++++++++++++-
tests/integration/test_writes/test_writes.py | 13 ++++++++++++-
3 files changed, 51 insertions(+), 4 deletions(-)
diff --git a/pyiceberg/catalog/rest.py b/pyiceberg/catalog/rest.py
index 20d28cb2..d64051ca 100644
--- a/pyiceberg/catalog/rest.py
+++ b/pyiceberg/catalog/rest.py
@@ -354,7 +354,10 @@ class RestCatalog(Catalog):
def _split_identifier_for_path(self, identifier: Union[str, Identifier,
TableIdentifier]) -> Properties:
if isinstance(identifier, TableIdentifier):
- return {"namespace":
NAMESPACE_SEPARATOR.join(identifier.namespace.root[1:]), "table":
identifier.name}
+ if identifier.namespace.root[0] == self.name:
+ return {"namespace":
NAMESPACE_SEPARATOR.join(identifier.namespace.root[1:]), "table":
identifier.name}
+ else:
+ return {"namespace":
NAMESPACE_SEPARATOR.join(identifier.namespace.root), "table": identifier.name}
identifier_tuple = self._identifier_to_validated_tuple(identifier)
return {"namespace": NAMESPACE_SEPARATOR.join(identifier_tuple[:-1]),
"table": identifier_tuple[-1]}
@@ -675,6 +678,17 @@ class RestCatalog(Catalog):
return self.load_table(to_identifier)
+ def _remove_catalog_name_from_table_request_identifier(self,
table_request: CommitTableRequest) -> CommitTableRequest:
+ if table_request.identifier.namespace.root[0] == self.name:
+ return table_request.model_copy(
+ update={
+ "identifier": TableIdentifier(
+ namespace=table_request.identifier.namespace.root[1:],
name=table_request.identifier.name
+ ).model_dump()
+ }
+ )
+ return table_request
+
@retry(**_RETRY_ARGS)
def _commit_table(self, table_request: CommitTableRequest) ->
CommitTableResponse:
"""Update the table.
@@ -692,7 +706,7 @@ class RestCatalog(Catalog):
"""
response = self._session.post(
self.url(Endpoints.update_table, prefixed=True,
**self._split_identifier_for_path(table_request.identifier)),
- data=table_request.model_dump_json().encode(UTF8),
+
data=self._remove_catalog_name_from_table_request_identifier(table_request).model_dump_json().encode(UTF8),
)
try:
response.raise_for_status()
diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py
index 2ddc774b..86b6baef 100644
--- a/tests/catalog/test_rest.py
+++ b/tests/catalog/test_rest.py
@@ -37,7 +37,7 @@ from pyiceberg.exceptions import (
from pyiceberg.io import load_file_io
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
-from pyiceberg.table import Table
+from pyiceberg.table import CommitTableRequest, Table, TableIdentifier
from pyiceberg.table.metadata import TableMetadataV1
from pyiceberg.table.sorting import SortField, SortOrder
from pyiceberg.transforms import IdentityTransform, TruncateTransform
@@ -1226,3 +1226,25 @@ def test_catalog_from_parameters_empty_env(rest_mock:
Mocker) -> None:
catalog = cast(RestCatalog, load_catalog("production",
uri="https://other-service.io/api"))
assert catalog.uri == "https://other-service.io/api"
+
+
+def test_table_identifier_in_commit_table_request(rest_mock: Mocker,
example_table_metadata_v2: Dict[str, Any]) -> None:
+ test_table_request = CommitTableRequest(
+ identifier=TableIdentifier(namespace=("catalog_name", "namespace"),
name="table_name"),
+ updates=[],
+ requirements=[],
+ )
+ rest_mock.post(
+ url=f"{TEST_URI}v1/namespaces/namespace/tables/table_name",
+ json={
+ "metadata": example_table_metadata_v2,
+ "metadata-location": "test",
+ },
+ status_code=200,
+ request_headers=TEST_HEADERS,
+ )
+ RestCatalog("catalog_name", uri=TEST_URI,
token=TEST_TOKEN)._commit_table(test_table_request)
+ assert (
+ rest_mock.last_request.text
+ ==
"""{"identifier":{"namespace":["namespace"],"name":"table_name"},"requirements":[],"updates":[]}"""
+ )
diff --git a/tests/integration/test_writes/test_writes.py
b/tests/integration/test_writes/test_writes.py
index 09fe654d..93999d96 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -33,7 +33,7 @@ from pydantic_core import ValidationError
from pyspark.sql import SparkSession
from pytest_mock.plugin import MockerFixture
-from pyiceberg.catalog import Catalog
+from pyiceberg.catalog import Catalog, load_catalog
from pyiceberg.catalog.hive import HiveCatalog
from pyiceberg.catalog.rest import RestCatalog
from pyiceberg.catalog.sql import SqlCatalog
@@ -1282,3 +1282,14 @@ def test_merge_manifests_file_content(session_catalog:
Catalog, arrow_table_with
(11, 3),
(12, 3),
]
+
+
+@pytest.mark.integration
+def test_rest_catalog_with_empty_catalog_name_append_data(session_catalog:
Catalog, arrow_table_with_null: pa.Table) -> None:
+ identifier = "default.test_rest_append"
+ test_catalog = load_catalog(
+ "", # intentionally empty
+ **session_catalog.properties,
+ )
+ tbl = _create_table(test_catalog, identifier, data=[])
+ tbl.append(arrow_table_with_null)