This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 8aeab495 Pyarrow IO property for configuring large v small types on read (#986)
8aeab495 is described below

commit 8aeab4951080fa196c0d29c72cba1cbba824ffc4
Author: Sung Yun <[email protected]>
AuthorDate: Wed Aug 7 05:10:01 2024 -0400

    Pyarrow IO property for configuring large v small types on read (#986)
    
    * pyarrow IO property for configuring large v small types on read
    
    * tests
    
    * adopt feedback
    
    * use property_as_bool
    
    * fix
    
    * docs
    
    * nits
    
    * respect flag on promotion
    
    * lint
    
    ---------
    
    Co-authored-by: Sung Yun <[email protected]>
---
 mkdocs/docs/configuration.md     | 10 +++++
 pyiceberg/io/__init__.py         |  1 +
 pyiceberg/io/pyarrow.py          | 81 ++++++++++++++++++++++++++++++++-----
 tests/integration/test_reads.py  | 87 +++++++++++++++++++++++++++++++++++++++-
 tests/io/test_pyarrow_visitor.py |  6 +++
 5 files changed, 174 insertions(+), 11 deletions(-)
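
For readers who want to try the new flag, here is a minimal sketch of how it is exercised, mirroring the integration tests further down in this diff (the catalog name, table identifier, and column name are placeholders, not part of the commit):

```python
import pyarrow as pa

from pyiceberg.catalog import load_catalog
from pyiceberg.io import PYARROW_USE_LARGE_TYPES_ON_READ

# Placeholders: any configured catalog and existing table will do.
catalog = load_catalog("default")
tbl = catalog.load_table("default.my_table")

# Default behaviour (property unset or "True"): scans return large Arrow types.
large = tbl.scan().to_arrow()
assert large.schema.field("string").type == pa.large_string()  # assumes a string column named "string"

# Opt out by setting the new FileIO property before scanning.
tbl.io.properties[PYARROW_USE_LARGE_TYPES_ON_READ] = "False"
small = tbl.scan().to_arrow()
assert small.schema.field("string").type == pa.string()
```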

diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md
index ff374165..d4a8de36 100644
--- a/mkdocs/docs/configuration.md
+++ b/mkdocs/docs/configuration.md
@@ -137,6 +137,16 @@ For the FileIO there are several configuration options available:
 
 <!-- markdown-link-check-enable-->
 
+### PyArrow
+
+<!-- markdown-link-check-disable -->
+
+| Key                             | Example | Description                                                                                                                                                                                                                                                                                              |
+| ------------------------------- | ------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
+| pyarrow.use-large-types-on-read | True    | Use large PyArrow types i.e. [large_string](https://arrow.apache.org/docs/python/generated/pyarrow.large_string.html), [large_binary](https://arrow.apache.org/docs/python/generated/pyarrow.large_binary.html) and [large_list](https://arrow.apache.org/docs/python/generated/pyarrow.large_list.html) field types on table scans. The default value is True. |
+
+<!-- markdown-link-check-enable-->
+
 ## Catalogs
 
 PyIceberg currently has native catalog type support for REST, SQL, Hive, Glue and DynamoDB.
diff --git a/pyiceberg/io/__init__.py b/pyiceberg/io/__init__.py
index d2008747..0567af2d 100644
--- a/pyiceberg/io/__init__.py
+++ b/pyiceberg/io/__init__.py
@@ -80,6 +80,7 @@ GCS_SESSION_KWARGS = "gcs.session-kwargs"
 GCS_ENDPOINT = "gcs.endpoint"
 GCS_DEFAULT_LOCATION = "gcs.default-bucket-location"
 GCS_VERSION_AWARE = "gcs.version-aware"
+PYARROW_USE_LARGE_TYPES_ON_READ = "pyarrow.use-large-types-on-read"
 
 
 @runtime_checkable
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index aefe86ac..5bbf6575 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -95,6 +95,7 @@ from pyiceberg.io import (
     HDFS_KERB_TICKET,
     HDFS_PORT,
     HDFS_USER,
+    PYARROW_USE_LARGE_TYPES_ON_READ,
     S3_ACCESS_KEY_ID,
     S3_CONNECT_TIMEOUT,
     S3_ENDPOINT,
@@ -158,7 +159,7 @@ from pyiceberg.utils.concurrent import ExecutorFactory
 from pyiceberg.utils.config import Config
 from pyiceberg.utils.datetime import millis_to_datetime
 from pyiceberg.utils.deprecated import deprecated
-from pyiceberg.utils.properties import get_first_property_value, property_as_int
+from pyiceberg.utils.properties import get_first_property_value, property_as_bool, property_as_int
 from pyiceberg.utils.singleton import Singleton
 from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string
 
@@ -835,6 +836,10 @@ def _pyarrow_schema_ensure_large_types(schema: pa.Schema) -> pa.Schema:
     return visit_pyarrow(schema, _ConvertToLargeTypes())
 
 
+def _pyarrow_schema_ensure_small_types(schema: pa.Schema) -> pa.Schema:
+    return visit_pyarrow(schema, _ConvertToSmallTypes())
+
+
 @singledispatch
 def visit_pyarrow(obj: Union[pa.DataType, pa.Schema], visitor: PyArrowSchemaVisitor[T]) -> T:
     """Apply a pyarrow schema visitor to any point within a schema.
@@ -876,7 +881,6 @@ def _(obj: Union[pa.ListType, pa.LargeListType, pa.FixedSizeListType], visitor:
     visitor.before_list_element(obj.value_field)
     result = visit_pyarrow(obj.value_type, visitor)
     visitor.after_list_element(obj.value_field)
-
     return visitor.list(obj, result)
 
 
@@ -1145,6 +1149,30 @@ class _ConvertToLargeTypes(PyArrowSchemaVisitor[Union[pa.DataType, pa.Schema]]):
         return primitive
 
 
+class _ConvertToSmallTypes(PyArrowSchemaVisitor[Union[pa.DataType, pa.Schema]]):
+    def schema(self, schema: pa.Schema, struct_result: pa.StructType) -> pa.Schema:
+        return pa.schema(struct_result)
+
+    def struct(self, struct: pa.StructType, field_results: List[pa.Field]) -> pa.StructType:
+        return pa.struct(field_results)
+
+    def field(self, field: pa.Field, field_result: pa.DataType) -> pa.Field:
+        return field.with_type(field_result)
+
+    def list(self, list_type: pa.ListType, element_result: pa.DataType) -> pa.DataType:
+        return pa.list_(element_result)
+
+    def map(self, map_type: pa.MapType, key_result: pa.DataType, value_result: pa.DataType) -> pa.DataType:
+        return pa.map_(key_result, value_result)
+
+    def primitive(self, primitive: pa.DataType) -> pa.DataType:
+        if primitive == pa.large_string():
+            return pa.string()
+        elif primitive == pa.large_binary():
+            return pa.binary()
+        return primitive
+
+
 class _ConvertToIcebergWithoutIDs(_ConvertToIceberg):
     """
     Converts PyArrowSchema to Iceberg Schema with all -1 ids.
@@ -1169,6 +1197,7 @@ def _task_to_record_batches(
     positional_deletes: Optional[List[ChunkedArray]],
     case_sensitive: bool,
     name_mapping: Optional[NameMapping] = None,
+    use_large_types: bool = True,
 ) -> Iterator[pa.RecordBatch]:
     _, _, path = PyArrowFileIO.parse_location(task.file.file_path)
     arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
@@ -1197,7 +1226,9 @@ def _task_to_record_batches(
             # https://github.com/apache/arrow/issues/41884
             # https://github.com/apache/arrow/issues/43183
             # Would be good to remove this later on
-            schema=_pyarrow_schema_ensure_large_types(physical_schema),
+            schema=_pyarrow_schema_ensure_large_types(physical_schema)
+            if use_large_types
+            else (_pyarrow_schema_ensure_small_types(physical_schema)),
             # This will push down the query to Arrow.
             # But in case there are positional deletes, we have to apply them first
             filter=pyarrow_filter if not positional_deletes else None,
@@ -1219,7 +1250,9 @@ def _task_to_record_batches(
                     arrow_table = pa.Table.from_batches([batch])
                     arrow_table = arrow_table.filter(pyarrow_filter)
                     batch = arrow_table.to_batches()[0]
-            yield _to_requested_schema(projected_schema, file_project_schema, batch, downcast_ns_timestamp_to_us=True)
+            yield _to_requested_schema(
+                projected_schema, file_project_schema, batch, downcast_ns_timestamp_to_us=True, use_large_types=use_large_types
+            )
             current_index += len(batch)
 
 
@@ -1232,10 +1265,19 @@ def _task_to_table(
     positional_deletes: Optional[List[ChunkedArray]],
     case_sensitive: bool,
     name_mapping: Optional[NameMapping] = None,
+    use_large_types: bool = True,
 ) -> Optional[pa.Table]:
     batches = list(
         _task_to_record_batches(
-            fs, task, bound_row_filter, projected_schema, projected_field_ids, positional_deletes, case_sensitive, name_mapping
+            fs,
+            task,
+            bound_row_filter,
+            projected_schema,
+            projected_field_ids,
+            positional_deletes,
+            case_sensitive,
+            name_mapping,
+            use_large_types,
         )
     )
 
@@ -1303,6 +1345,8 @@ def project_table(
             # When FsSpec is not installed
             raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}") from e
 
+    use_large_types = property_as_bool(io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)
+
     bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)
 
     projected_field_ids = {
@@ -1322,6 +1366,7 @@ def project_table(
             deletes_per_file.get(task.file.file_path),
             case_sensitive,
             table_metadata.name_mapping(),
+            use_large_types,
         )
         for task in tasks
     ]
@@ -1394,6 +1439,8 @@ def project_batches(
             # When FsSpec is not installed
             raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}") from e
 
+    use_large_types = property_as_bool(io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)
+
     bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)
 
     projected_field_ids = {
@@ -1414,6 +1461,7 @@ def project_batches(
             deletes_per_file.get(task.file.file_path),
             case_sensitive,
             table_metadata.name_mapping(),
+            use_large_types,
         )
         for batch in batches:
             if limit is not None:
@@ -1447,12 +1495,13 @@ def _to_requested_schema(
     batch: pa.RecordBatch,
     downcast_ns_timestamp_to_us: bool = False,
     include_field_ids: bool = False,
+    use_large_types: bool = True,
 ) -> pa.RecordBatch:
     # We could re-use some of these visitors
     struct_array = visit_with_partner(
         requested_schema,
         batch,
-        ArrowProjectionVisitor(file_schema, downcast_ns_timestamp_to_us, include_field_ids),
+        ArrowProjectionVisitor(file_schema, downcast_ns_timestamp_to_us, include_field_ids, use_large_types),
         ArrowAccessor(file_schema),
     )
     return pa.RecordBatch.from_struct_array(struct_array)
@@ -1462,20 +1511,31 @@ class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Arra
     _file_schema: Schema
     _include_field_ids: bool
     _downcast_ns_timestamp_to_us: bool
+    _use_large_types: bool
 
-    def __init__(self, file_schema: Schema, downcast_ns_timestamp_to_us: bool = False, include_field_ids: bool = False) -> None:
+    def __init__(
+        self,
+        file_schema: Schema,
+        downcast_ns_timestamp_to_us: bool = False,
+        include_field_ids: bool = False,
+        use_large_types: bool = True,
+    ) -> None:
         self._file_schema = file_schema
         self._include_field_ids = include_field_ids
         self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
+        self._use_large_types = use_large_types
 
     def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
         file_field = self._file_schema.find_field(field.field_id)
 
         if field.field_type.is_primitive:
             if field.field_type != file_field.field_type:
-                return values.cast(
-                    schema_to_pyarrow(promote(file_field.field_type, field.field_type), include_field_ids=self._include_field_ids)
+                target_schema = schema_to_pyarrow(
+                    promote(file_field.field_type, field.field_type), include_field_ids=self._include_field_ids
                 )
+                if not self._use_large_types:
+                    target_schema = _pyarrow_schema_ensure_small_types(target_schema)
+                return values.cast(target_schema)
             elif (target_type := schema_to_pyarrow(field.field_type, include_field_ids=self._include_field_ids)) != values.type:
                 if field.field_type == TimestampType():
                     # Downcasting of nanoseconds to microseconds
@@ -1547,12 +1607,13 @@ class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Arra
 
     def list(self, list_type: ListType, list_array: Optional[pa.Array], value_array: Optional[pa.Array]) -> Optional[pa.Array]:
         if isinstance(list_array, (pa.ListArray, pa.LargeListArray, pa.FixedSizeListArray)) and value_array is not None:
+            list_initializer = pa.large_list if isinstance(list_array, pa.LargeListArray) else pa.list_
             if isinstance(value_array, pa.StructArray):
                 # This can be removed once this has been fixed:
                 # https://github.com/apache/arrow/issues/38809
                 list_array = pa.LargeListArray.from_arrays(list_array.offsets, value_array)
             value_array = self._cast_if_needed(list_type.element_field, value_array)
-            arrow_field = pa.large_list(self._construct_field(list_type.element_field, value_array.type))
+            arrow_field = list_initializer(self._construct_field(list_type.element_field, value_array.type))
             return list_array.cast(arrow_field)
         else:
             return None
diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py
index 078ec163..a2d34661 100644
--- a/tests/integration/test_reads.py
+++ b/tests/integration/test_reads.py
@@ -40,10 +40,14 @@ from pyiceberg.expressions import (
     NotEqualTo,
     NotNaN,
 )
-from pyiceberg.io.pyarrow import pyarrow_to_schema
+from pyiceberg.io import PYARROW_USE_LARGE_TYPES_ON_READ
+from pyiceberg.io.pyarrow import (
+    pyarrow_to_schema,
+)
 from pyiceberg.schema import Schema
 from pyiceberg.table import Table
 from pyiceberg.types import (
+    BinaryType,
     BooleanType,
     IntegerType,
     NestedField,
@@ -665,6 +669,87 @@ def test_hive_locking_with_retry(session_catalog_hive: HiveCatalog) -> None:
         assert table.properties.get("lock") == "xxx"
 
 
+@pytest.mark.integration
+@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
+def test_table_scan_default_to_large_types(catalog: Catalog) -> None:
+    identifier = "default.test_table_scan_default_to_large_types"
+    arrow_table = pa.Table.from_arrays(
+        [
+            pa.array(["a", "b", "c"]),
+            pa.array(["a", "b", "c"]),
+            pa.array([b"a", b"b", b"c"]),
+            pa.array([["a", "b"], ["c", "d"], ["e", "f"]]),
+        ],
+        names=["string", "string-to-binary", "binary", "list"],
+    )
+
+    try:
+        catalog.drop_table(identifier)
+    except NoSuchTableError:
+        pass
+
+    tbl = catalog.create_table(
+        identifier,
+        schema=arrow_table.schema,
+    )
+
+    tbl.append(arrow_table)
+
+    with tbl.update_schema() as update_schema:
+        update_schema.update_column("string-to-binary", BinaryType())
+
+    result_table = tbl.scan().to_arrow()
+
+    expected_schema = pa.schema([
+        pa.field("string", pa.large_string()),
+        pa.field("string-to-binary", pa.large_binary()),
+        pa.field("binary", pa.large_binary()),
+        pa.field("list", pa.large_list(pa.large_string())),
+    ])
+    assert result_table.schema.equals(expected_schema)
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
+def test_table_scan_override_with_small_types(catalog: Catalog) -> None:
+    identifier = "default.test_table_scan_override_with_small_types"
+    arrow_table = pa.Table.from_arrays(
+        [
+            pa.array(["a", "b", "c"]),
+            pa.array(["a", "b", "c"]),
+            pa.array([b"a", b"b", b"c"]),
+            pa.array([["a", "b"], ["c", "d"], ["e", "f"]]),
+        ],
+        names=["string", "string-to-binary", "binary", "list"],
+    )
+
+    try:
+        catalog.drop_table(identifier)
+    except NoSuchTableError:
+        pass
+
+    tbl = catalog.create_table(
+        identifier,
+        schema=arrow_table.schema,
+    )
+
+    tbl.append(arrow_table)
+
+    with tbl.update_schema() as update_schema:
+        update_schema.update_column("string-to-binary", BinaryType())
+
+    tbl.io.properties[PYARROW_USE_LARGE_TYPES_ON_READ] = "False"
+    result_table = tbl.scan().to_arrow()
+
+    expected_schema = pa.schema([
+        pa.field("string", pa.string()),
+        pa.field("string-to-binary", pa.binary()),
+        pa.field("binary", pa.binary()),
+        pa.field("list", pa.list_(pa.string())),
+    ])
+    assert result_table.schema.equals(expected_schema)
+
+
 @pytest.mark.integration
 @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
 def test_empty_scan_ordered_str(catalog: Catalog) -> None:
diff --git a/tests/io/test_pyarrow_visitor.py b/tests/io/test_pyarrow_visitor.py
index f0a2a458..9e6df720 100644
--- a/tests/io/test_pyarrow_visitor.py
+++ b/tests/io/test_pyarrow_visitor.py
@@ -40,6 +40,7 @@ from pyiceberg.io.pyarrow import (
     _HasIds,
     _NullNaNUnmentionedTermsCollector,
     _pyarrow_schema_ensure_large_types,
+    _pyarrow_schema_ensure_small_types,
     pyarrow_to_schema,
     schema_to_pyarrow,
     visit_pyarrow,
@@ -596,6 +597,11 @@ def test_pyarrow_schema_ensure_large_types(pyarrow_schema_nested_without_ids: pa
     assert _pyarrow_schema_ensure_large_types(pyarrow_schema_nested_without_ids) == expected_schema
 
 
+def test_pyarrow_schema_round_trip_ensure_large_types_and_then_small_types(pyarrow_schema_nested_without_ids: pa.Schema) -> None:
+    schema_with_large_types = _pyarrow_schema_ensure_large_types(pyarrow_schema_nested_without_ids)
+    assert _pyarrow_schema_ensure_small_types(schema_with_large_types) == pyarrow_schema_nested_without_ids
+
+
 @pytest.fixture
 def bound_reference_str() -> BoundReference[Any]:
     return BoundReference(
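
For context on what "large v small" means at the Arrow level, the sketch below runs the two schema visitors touched by this commit over a toy schema. Both helpers are underscore-prefixed internals of pyiceberg.io.pyarrow, so treat this as an illustration of the type mapping rather than a supported API:

```python
import pyarrow as pa

from pyiceberg.io.pyarrow import (
    _pyarrow_schema_ensure_large_types,
    _pyarrow_schema_ensure_small_types,
)

# A toy schema using the "small" Arrow variants.
schema = pa.schema([
    pa.field("name", pa.string()),
    pa.field("payload", pa.binary()),
    pa.field("tags", pa.list_(pa.string())),
])

# The existing visitor upgrades string/binary/list to their large counterparts.
large = _pyarrow_schema_ensure_large_types(schema)
assert large.field("name").type == pa.large_string()
assert large.field("payload").type == pa.large_binary()
assert large.field("tags").type == pa.large_list(pa.large_string())

# The visitor added in this commit reverses the mapping, as the round-trip test above checks.
assert _pyarrow_schema_ensure_small_types(large) == schema
```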
