This is an automated email from the ASF dual-hosted git repository.
kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new c68b9b1e Support Location Providers (#1452)
c68b9b1e is described below
commit c68b9b1eb0530c5df2a8b114f6df54b63a8374d8
Author: smaheshwar-pltr <[email protected]>
AuthorDate: Fri Jan 10 22:33:48 2025 +0000
Support Location Providers (#1452)
* Skeletal implementation
* First attempt at hashing locations
* Relocate to table submodule; code and comment improvements
* Add unit tests
* Remove entropy check
* Nit: Prefer `self.table_properties`
* Remove special character testing
* Add integration tests for writes
* Move all `LocationProviders`-related code into locations.py
* Nit: tiny for loop refactor
* Fix typo
* Object storage as default location provider
* Update tests/integration/test_writes/test_partitioned_writes.py
Co-authored-by: Kevin Liu <[email protected]>
* Test entropy in test_object_storage_injects_entropy
* Refactor integration tests to use properties and omit when default once
* Use a different table property for custom location provision
* write.location-provider.py-impl -> write.py-location-provider.impl
* Make lint
* Move location provider loading into `write_file` for back-compat
* Make object storage no longer the default
* Add test case for partitioned paths disabled but with no partition
special case
* Moved constants within ObjectStoreLocationProvider
---------
Co-authored-by: Sreesh Maheshwar <[email protected]>
Co-authored-by: Kevin Liu <[email protected]>
---
pyiceberg/io/pyarrow.py | 7 +-
pyiceberg/table/__init__.py | 15 ++-
pyiceberg/table/locations.py | 145 +++++++++++++++++++++
.../test_writes/test_partitioned_writes.py | 39 ++++++
tests/integration/test_writes/test_writes.py | 27 ++++
tests/table/test_locations.py | 130 ++++++++++++++++++
6 files changed, 355 insertions(+), 8 deletions(-)
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index ad7e4f4f..1ce08428 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -136,6 +136,7 @@ from pyiceberg.schema import (
visit,
visit_with_partner,
)
+from pyiceberg.table.locations import load_location_provider
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.name_mapping import NameMapping, apply_name_mapping
from pyiceberg.transforms import TruncateTransform
@@ -2305,6 +2306,7 @@ def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteT
         property_name=TableProperties.PARQUET_ROW_GROUP_LIMIT,
         default=TableProperties.PARQUET_ROW_GROUP_LIMIT_DEFAULT,
     )
+    location_provider = load_location_provider(table_location=table_metadata.location, table_properties=table_metadata.properties)
def write_parquet(task: WriteTask) -> DataFile:
table_schema = table_metadata.schema()
@@ -2327,7 +2329,10 @@ def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteT
             for batch in task.record_batches
         ]
         arrow_table = pa.Table.from_batches(batches)
-        file_path = f"{table_metadata.location}/data/{task.generate_data_file_path('parquet')}"
+        file_path = location_provider.new_data_location(
+            data_file_name=task.generate_data_file_filename("parquet"),
+            partition_key=task.partition_key,
+        )
fo = io.new_output(file_path)
with fo.create(overwrite=True) as fos:
with pq.ParquetWriter(fos, schema=arrow_table.schema,
**parquet_writer_kwargs) as writer:
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 7bc3fe83..0c8c848c 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -187,6 +187,14 @@ class TableProperties:
WRITE_PARTITION_SUMMARY_LIMIT = "write.summary.partition-limit"
WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT = 0
+ WRITE_PY_LOCATION_PROVIDER_IMPL = "write.py-location-provider.impl"
+
+ OBJECT_STORE_ENABLED = "write.object-storage.enabled"
+ OBJECT_STORE_ENABLED_DEFAULT = False
+
+    WRITE_OBJECT_STORE_PARTITIONED_PATHS = "write.object-storage.partitioned-paths"
+ WRITE_OBJECT_STORE_PARTITIONED_PATHS_DEFAULT = True
+
DELETE_MODE = "write.delete.mode"
DELETE_MODE_COPY_ON_WRITE = "copy-on-write"
DELETE_MODE_MERGE_ON_READ = "merge-on-read"
@@ -1613,13 +1621,6 @@ class WriteTask:
         # https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101
         return f"00000-{self.task_id}-{self.write_uuid}.{extension}"
-    def generate_data_file_path(self, extension: str) -> str:
-        if self.partition_key:
-            file_path = f"{self.partition_key.to_path()}/{self.generate_data_file_filename(extension)}"
-            return file_path
-        else:
-            return self.generate_data_file_filename(extension)
-
@dataclass(frozen=True)
class AddFileTask:
diff --git a/pyiceberg/table/locations.py b/pyiceberg/table/locations.py
new file mode 100644
index 00000000..046ee325
--- /dev/null
+++ b/pyiceberg/table/locations.py
@@ -0,0 +1,145 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import importlib
+import logging
+from abc import ABC, abstractmethod
+from typing import Optional
+
+import mmh3
+
+from pyiceberg.partitioning import PartitionKey
+from pyiceberg.table import TableProperties
+from pyiceberg.typedef import Properties
+from pyiceberg.utils.properties import property_as_bool
+
+logger = logging.getLogger(__name__)
+
+
+class LocationProvider(ABC):
+    """A base class for location providers, that provide data file locations for write tasks."""
+
+    table_location: str
+    table_properties: Properties
+
+    def __init__(self, table_location: str, table_properties: Properties):
+        self.table_location = table_location
+        self.table_properties = table_properties
+
+    @abstractmethod
+    def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:
+        """Return a fully-qualified data file location for the given filename.
+
+        Args:
+            data_file_name (str): The name of the data file.
+            partition_key (Optional[PartitionKey]): The data file's partition key. If None, the data is not partitioned.
+
+        Returns:
+            str: A fully-qualified location URI for the data file.
+        """
+
+
+class SimpleLocationProvider(LocationProvider):
+    def __init__(self, table_location: str, table_properties: Properties):
+        super().__init__(table_location, table_properties)
+
+    def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:
+        prefix = f"{self.table_location}/data"
+        return f"{prefix}/{partition_key.to_path()}/{data_file_name}" if partition_key else f"{prefix}/{data_file_name}"
+
+
+class ObjectStoreLocationProvider(LocationProvider):
+ HASH_BINARY_STRING_BITS = 20
+ ENTROPY_DIR_LENGTH = 4
+ ENTROPY_DIR_DEPTH = 3
+
+ _include_partition_paths: bool
+
+ def __init__(self, table_location: str, table_properties: Properties):
+ super().__init__(table_location, table_properties)
+ self._include_partition_paths = property_as_bool(
+ self.table_properties,
+ TableProperties.WRITE_OBJECT_STORE_PARTITIONED_PATHS,
+ TableProperties.WRITE_OBJECT_STORE_PARTITIONED_PATHS_DEFAULT,
+ )
+
+    def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:
+        if self._include_partition_paths and partition_key:
+            return self.new_data_location(f"{partition_key.to_path()}/{data_file_name}")
+
+        prefix = f"{self.table_location}/data"
+        hashed_path = self._compute_hash(data_file_name)
+
+        return (
+            f"{prefix}/{hashed_path}/{data_file_name}"
+            if self._include_partition_paths
+            else f"{prefix}/{hashed_path}-{data_file_name}"
+        )
+
+    @staticmethod
+    def _compute_hash(data_file_name: str) -> str:
+        # Bitwise AND to combat sign-extension; bitwise OR to preserve leading zeroes that `bin` would otherwise strip.
+        top_mask = 1 << ObjectStoreLocationProvider.HASH_BINARY_STRING_BITS
+        hash_code = mmh3.hash(data_file_name) & (top_mask - 1) | top_mask
+        return ObjectStoreLocationProvider._dirs_from_hash(bin(hash_code)[-ObjectStoreLocationProvider.HASH_BINARY_STRING_BITS :])
+
+    @staticmethod
+    def _dirs_from_hash(file_hash: str) -> str:
+        """Divides hash into directories for optimized orphan removal operation using ENTROPY_DIR_DEPTH and ENTROPY_DIR_LENGTH."""
+        total_entropy_length = ObjectStoreLocationProvider.ENTROPY_DIR_DEPTH * ObjectStoreLocationProvider.ENTROPY_DIR_LENGTH
+
+        hash_with_dirs = []
+        for i in range(0, total_entropy_length, ObjectStoreLocationProvider.ENTROPY_DIR_LENGTH):
+            hash_with_dirs.append(file_hash[i : i + ObjectStoreLocationProvider.ENTROPY_DIR_LENGTH])
+
+        if len(file_hash) > total_entropy_length:
+            hash_with_dirs.append(file_hash[total_entropy_length:])
+
+        return "/".join(hash_with_dirs)
+
+
+def _import_location_provider(
+    location_provider_impl: str, table_location: str, table_properties: Properties
+) -> Optional[LocationProvider]:
+    try:
+        path_parts = location_provider_impl.split(".")
+        if len(path_parts) < 2:
+            raise ValueError(
+                f"{TableProperties.WRITE_PY_LOCATION_PROVIDER_IMPL} should be full path (module.CustomLocationProvider), got: {location_provider_impl}"
+            )
+        module_name, class_name = ".".join(path_parts[:-1]), path_parts[-1]
+        module = importlib.import_module(module_name)
+        class_ = getattr(module, class_name)
+        return class_(table_location, table_properties)
+    except ModuleNotFoundError:
+        logger.warning("Could not initialize LocationProvider: %s", location_provider_impl)
+        return None
+
+
+def load_location_provider(table_location: str, table_properties: Properties) -> LocationProvider:
+    table_location = table_location.rstrip("/")
+
+    if location_provider_impl := table_properties.get(TableProperties.WRITE_PY_LOCATION_PROVIDER_IMPL):
+        if location_provider := _import_location_provider(location_provider_impl, table_location, table_properties):
+            logger.info("Loaded LocationProvider: %s", location_provider_impl)
+            return location_provider
+        else:
+            raise ValueError(f"Could not initialize LocationProvider: {location_provider_impl}")
+
+    if property_as_bool(table_properties, TableProperties.OBJECT_STORE_ENABLED, TableProperties.OBJECT_STORE_ENABLED_DEFAULT):
+        return ObjectStoreLocationProvider(table_location, table_properties)
+    else:
+        return SimpleLocationProvider(table_location, table_properties)
diff --git a/tests/integration/test_writes/test_partitioned_writes.py b/tests/integration/test_writes/test_partitioned_writes.py
index 8a3a5c9a..50a1bc8c 100644
--- a/tests/integration/test_writes/test_partitioned_writes.py
+++ b/tests/integration/test_writes/test_partitioned_writes.py
@@ -28,6 +28,7 @@ from pyiceberg.catalog import Catalog
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
+from pyiceberg.table import TableProperties
from pyiceberg.transforms import (
BucketTransform,
DayTransform,
@@ -280,6 +281,44 @@ def test_query_filter_v1_v2_append_null(
     assert df.where(f"{col} is null").count() == 2, f"Expected 2 null rows for {col}"
[email protected]
[email protected](
+    "part_col", ["int", "bool", "string", "string_long", "long", "float", "double", "date", "timestamp", "timestamptz", "binary"]
+)
[email protected]("format_version", [1, 2])
+def test_object_storage_location_provider_excludes_partition_path(
+    session_catalog: Catalog, spark: SparkSession, arrow_table_with_null: pa.Table, part_col: str, format_version: int
+) -> None:
+    nested_field = TABLE_SCHEMA.find_field(part_col)
+    partition_spec = PartitionSpec(
+        PartitionField(source_id=nested_field.field_id, field_id=1001, transform=IdentityTransform(), name=part_col)
+    )
+
+    tbl = _create_table(
+        session_catalog=session_catalog,
+        identifier=f"default.arrow_table_v{format_version}_with_null_partitioned_on_col_{part_col}",
+        # write.object-storage.partitioned-paths defaults to True
+        properties={"format-version": str(format_version), TableProperties.OBJECT_STORE_ENABLED: True},
+        data=[arrow_table_with_null],
+        partition_spec=partition_spec,
+    )
+
+    original_paths = tbl.inspect.data_files().to_pydict()["file_path"]
+    assert len(original_paths) == 3
+
+    # Update props to exclude partitioned paths and append data
+    with tbl.transaction() as tx:
+        tx.set_properties({TableProperties.WRITE_OBJECT_STORE_PARTITIONED_PATHS: False})
+    tbl.append(arrow_table_with_null)
+
+    added_paths = set(tbl.inspect.data_files().to_pydict()["file_path"]) - set(original_paths)
+    assert len(added_paths) == 3
+
+    # All paths before the props update should contain the partition, while all paths after should not
+    assert all(f"{part_col}=" in path for path in original_paths)
+    assert all(f"{part_col}=" not in path for path in added_paths)
+
+
@pytest.mark.integration
@pytest.mark.parametrize(
"spec",
diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py
index c23e8365..fff48b93 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -285,6 +285,33 @@ def test_data_files(spark: SparkSession, session_catalog: Catalog, arrow_table_w
     assert [row.deleted_data_files_count for row in rows] == [0, 1, 0, 0, 0]
[email protected]
[email protected]("format_version", [1, 2])
+def test_object_storage_data_files(
+    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
+) -> None:
+    tbl = _create_table(
+        session_catalog=session_catalog,
+        identifier="default.object_stored",
+        properties={"format-version": format_version, TableProperties.OBJECT_STORE_ENABLED: True},
+        data=[arrow_table_with_null],
+    )
+    tbl.append(arrow_table_with_null)
+
+    paths = tbl.inspect.data_files().to_pydict()["file_path"]
+    assert len(paths) == 2
+
+    for location in paths:
+        assert location.startswith("s3://warehouse/default/object_stored/data/")
+        parts = location.split("/")
+        assert len(parts) == 11
+
+        # Entropy binary directories should have been injected
+        for dir_name in parts[6:10]:
+            assert dir_name
+            assert all(c in "01" for c in dir_name)
+
+
@pytest.mark.integration
def test_python_writes_with_spark_snapshot_reads(
spark: SparkSession, session_catalog: Catalog, arrow_table_with_null:
pa.Table
diff --git a/tests/table/test_locations.py b/tests/table/test_locations.py
new file mode 100644
index 00000000..bda2442a
--- /dev/null
+++ b/tests/table/test_locations.py
@@ -0,0 +1,130 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Optional
+
+import pytest
+
+from pyiceberg.partitioning import PartitionField, PartitionFieldValue, PartitionKey, PartitionSpec
+from pyiceberg.schema import Schema
+from pyiceberg.table.locations import LocationProvider, load_location_provider
+from pyiceberg.transforms import IdentityTransform
+from pyiceberg.typedef import EMPTY_DICT
+from pyiceberg.types import NestedField, StringType
+
+PARTITION_FIELD = PartitionField(source_id=1, field_id=1002, transform=IdentityTransform(), name="string_field")
+PARTITION_KEY = PartitionKey(
+    raw_partition_field_values=[PartitionFieldValue(PARTITION_FIELD, "example_string")],
+    partition_spec=PartitionSpec(PARTITION_FIELD),
+    schema=Schema(NestedField(field_id=1, name="string_field", field_type=StringType(), required=False)),
+)
+
+
+class CustomLocationProvider(LocationProvider):
+    def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:
+        return f"custom_location_provider/{data_file_name}"
+
+
+def test_default_location_provider() -> None:
+    provider = load_location_provider(table_location="table_location", table_properties=EMPTY_DICT)
+
+    assert provider.new_data_location("my_file") == "table_location/data/my_file"
+
+
+def test_custom_location_provider() -> None:
+    qualified_name = CustomLocationProvider.__module__ + "." + CustomLocationProvider.__name__
+    provider = load_location_provider(
+        table_location="table_location", table_properties={"write.py-location-provider.impl": qualified_name}
+    )
+
+    assert provider.new_data_location("my_file") == "custom_location_provider/my_file"
+
+
+def test_custom_location_provider_single_path() -> None:
+    with pytest.raises(ValueError, match=r"write\.py-location-provider\.impl should be full path"):
+        load_location_provider(table_location="table_location", table_properties={"write.py-location-provider.impl": "not_found"})
+
+
+def test_custom_location_provider_not_found() -> None:
+    with pytest.raises(ValueError, match=r"Could not initialize LocationProvider"):
+        load_location_provider(
+            table_location="table_location", table_properties={"write.py-location-provider.impl": "module.not_found"}
+        )
+
+
+def test_object_storage_injects_entropy() -> None:
+    provider = load_location_provider(table_location="table_location", table_properties={"write.object-storage.enabled": "true"})
+
+    location = provider.new_data_location("test.parquet")
+    parts = location.split("/")
+
+    assert len(parts) == 7
+    assert parts[0] == "table_location"
+    assert parts[1] == "data"
+    assert parts[-1] == "test.parquet"
+
+    # Entropy directories in the middle
+    for dir_name in parts[2:-1]:
+        assert dir_name
+        assert all(c in "01" for c in dir_name)
+
+
[email protected]("object_storage", [True, False])
+def test_partition_value_in_path(object_storage: bool) -> None:
+    provider = load_location_provider(
+        table_location="table_location",
+        table_properties={
+            "write.object-storage.enabled": str(object_storage),
+        },
+    )
+
+    location = provider.new_data_location("test.parquet", PARTITION_KEY)
+    partition_segment = location.split("/")[-2]
+
+    assert partition_segment == "string_field=example_string"
+
+
+# NB: We test here with None partition key too because disabling partitioned paths still replaces final / with - even in
+# paths of un-partitioned files. This matches the behaviour of the Java implementation.
[email protected]("partition_key", [PARTITION_KEY, None])
+def test_object_storage_partitioned_paths_disabled(partition_key: Optional[PartitionKey]) -> None:
+    provider = load_location_provider(
+        table_location="table_location",
+        table_properties={
+            "write.object-storage.enabled": "true",
+            "write.object-storage.partitioned-paths": "false",
+        },
+    )
+
+    location = provider.new_data_location("test.parquet", partition_key)
+
+    # No partition values included in the path and last part of entropy is separated with "-"
+    assert location == "table_location/data/0110/1010/0011/11101000-test.parquet"
+
+
[email protected](
+    ["data_file_name", "expected_hash"],
+    [
+        ("a", "0101/0110/1001/10110010"),
+        ("b", "1110/0111/1110/00000011"),
+        ("c", "0010/1101/0110/01011111"),
+        ("d", "1001/0001/0100/01110011"),
+    ],
+)
+def test_hash_injection(data_file_name: str, expected_hash: str) -> None:
+    provider = load_location_provider(table_location="table_location", table_properties={"write.object-storage.enabled": "true"})
+
+    assert provider.new_data_location(data_file_name) == f"table_location/data/{expected_hash}/{data_file_name}"