This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 6aeb1262 Fan out writing to multiple Parquet files (#444)
6aeb1262 is described below
commit 6aeb12620eaf1caaf9ed5287d1118227bb419886
Author: Kevin Liu <[email protected]>
AuthorDate: Thu Mar 28 12:39:30 2024 -0700
Fan out writing to multiple Parquet files (#444)
* bin pack write
* add write target file size config
* test
* add test for multiple data files
* parquet writer write once
* parallelize write tasks
* refactor
* chunk correctly using to_batches
* change variable names
* get rid of assert
* configure PackingIterator
* add more tests
* rewrite set_properties
* set int property
---
pyiceberg/io/pyarrow.py | 91 +++++++++++++++++++++++-----------------
pyiceberg/table/__init__.py | 19 +++++++--
tests/conftest.py | 59 +++++++++++++++++++++++++-
tests/integration/test_writes.py | 43 ++++++++++++++++++-
tests/io/test_pyarrow.py | 25 ++++++++++-
5 files changed, 192 insertions(+), 45 deletions(-)
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index baefb356..738cd77b 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -1761,54 +1761,67 @@ def data_file_statistics_from_parquet_metadata(
def write_file(io: FileIO, table_metadata: TableMetadata, tasks:
Iterator[WriteTask]) -> Iterator[DataFile]:
- task = next(tasks)
-
- try:
- _ = next(tasks)
- # If there are more tasks, raise an exception
- raise NotImplementedError("Only unpartitioned writes are supported:
https://github.com/apache/iceberg-python/issues/208")
- except StopIteration:
- pass
-
- parquet_writer_kwargs =
_get_parquet_writer_kwargs(table_metadata.properties)
-
- file_path =
f'{table_metadata.location}/data/{task.generate_data_file_filename("parquet")}'
schema = table_metadata.schema()
arrow_file_schema = schema.as_arrow()
+ parquet_writer_kwargs =
_get_parquet_writer_kwargs(table_metadata.properties)
- fo = io.new_output(file_path)
row_group_size = PropertyUtil.property_as_int(
properties=table_metadata.properties,
property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT,
)
- with fo.create(overwrite=True) as fos:
- with pq.ParquetWriter(fos, schema=arrow_file_schema,
**parquet_writer_kwargs) as writer:
- writer.write_table(task.df, row_group_size=row_group_size)
-
- statistics = data_file_statistics_from_parquet_metadata(
- parquet_metadata=writer.writer.metadata,
- stats_columns=compute_statistics_plan(schema,
table_metadata.properties),
- parquet_column_mapping=parquet_path_to_id_mapping(schema),
- )
- data_file = DataFile(
- content=DataFileContent.DATA,
- file_path=file_path,
- file_format=FileFormat.PARQUET,
- partition=Record(),
- file_size_in_bytes=len(fo),
- # After this has been fixed:
- # https://github.com/apache/iceberg-python/issues/271
- # sort_order_id=task.sort_order_id,
- sort_order_id=None,
- # Just copy these from the table for now
- spec_id=table_metadata.default_spec_id,
- equality_ids=None,
- key_metadata=None,
- **statistics.to_serialized_dict(),
- )
- return iter([data_file])
+ def write_parquet(task: WriteTask) -> DataFile:
+ file_path =
f'{table_metadata.location}/data/{task.generate_data_file_filename("parquet")}'
+ fo = io.new_output(file_path)
+ with fo.create(overwrite=True) as fos:
+ with pq.ParquetWriter(fos, schema=arrow_file_schema,
**parquet_writer_kwargs) as writer:
+ writer.write(pa.Table.from_batches(task.record_batches),
row_group_size=row_group_size)
+
+ statistics = data_file_statistics_from_parquet_metadata(
+ parquet_metadata=writer.writer.metadata,
+ stats_columns=compute_statistics_plan(schema,
table_metadata.properties),
+ parquet_column_mapping=parquet_path_to_id_mapping(schema),
+ )
+ data_file = DataFile(
+ content=DataFileContent.DATA,
+ file_path=file_path,
+ file_format=FileFormat.PARQUET,
+ partition=Record(),
+ file_size_in_bytes=len(fo),
+ # After this has been fixed:
+ # https://github.com/apache/iceberg-python/issues/271
+ # sort_order_id=task.sort_order_id,
+ sort_order_id=None,
+ # Just copy these from the table for now
+ spec_id=table_metadata.default_spec_id,
+ equality_ids=None,
+ key_metadata=None,
+ **statistics.to_serialized_dict(),
+ )
+
+ return data_file
+
+ executor = ExecutorFactory.get_or_create()
+ data_files = executor.map(write_parquet, tasks)
+
+ return iter(data_files)
+
+
+def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) ->
Iterator[List[pa.RecordBatch]]:
+ from pyiceberg.utils.bin_packing import PackingIterator
+
+ avg_row_size_bytes = tbl.nbytes / tbl.num_rows
+ target_rows_per_file = target_file_size // avg_row_size_bytes
+ batches = tbl.to_batches(max_chunksize=target_rows_per_file)
+ bin_packed_record_batches = PackingIterator(
+ items=batches,
+ target_weight=target_file_size,
+ lookback=len(batches), # ignore lookback
+ weight_func=lambda x: x.nbytes,
+ largest_bin_first=False,
+ )
+ return bin_packed_record_batches
def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata,
file_paths: Iterator[str]) -> Iterator[DataFile]:
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 2ad1f7fe..18fac993 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -215,6 +215,9 @@ class TableProperties:
PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX =
"write.parquet.bloom-filter-enabled.column"
+ WRITE_TARGET_FILE_SIZE_BYTES = "write.target-file-size-bytes"
+ WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT = 512 * 1024 * 1024 # 512 MB
+
DEFAULT_WRITE_METRICS_MODE = "write.metadata.metrics.default"
DEFAULT_WRITE_METRICS_MODE_DEFAULT = "truncate(16)"
@@ -2486,7 +2489,7 @@ def _add_and_move_fields(
class WriteTask:
write_uuid: uuid.UUID
task_id: int
- df: pa.Table
+ record_batches: List[pa.RecordBatch]
sort_order_id: Optional[int] = None
# Later to be extended with partition information
@@ -2521,7 +2524,7 @@ def _dataframe_to_data_files(
Returns:
An iterable that supplies datafiles that represent the table.
"""
- from pyiceberg.io.pyarrow import write_file
+ from pyiceberg.io.pyarrow import bin_pack_arrow_table, write_file
if len([spec for spec in table_metadata.partition_specs if spec.spec_id !=
0]) > 0:
raise ValueError("Cannot write to partitioned tables")
@@ -2529,9 +2532,19 @@ def _dataframe_to_data_files(
counter = itertools.count(0)
write_uuid = write_uuid or uuid.uuid4()
+ target_file_size = PropertyUtil.property_as_int(
+ properties=table_metadata.properties,
+ property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+ default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
+ )
+
# This is an iter, so we don't have to materialize everything every time
# This will be more relevant when we start doing partitioned writes
- yield from write_file(io=io, table_metadata=table_metadata,
tasks=iter([WriteTask(write_uuid, next(counter), df)]))
+ yield from write_file(
+ io=io,
+ table_metadata=table_metadata,
+ tasks=iter([WriteTask(write_uuid, next(counter), batches) for batches
in bin_pack_arrow_table(df, target_file_size)]), # type: ignore
+ )
def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths:
List[str], io: FileIO) -> Iterable[DataFile]:
diff --git a/tests/conftest.py b/tests/conftest.py
index 48187ee6..d0f0d592 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -30,7 +30,7 @@ import re
import socket
import string
import uuid
-from datetime import datetime
+from datetime import date, datetime
from pathlib import Path
from random import choice
from tempfile import TemporaryDirectory
@@ -1987,3 +1987,60 @@ def spark() -> SparkSession:
)
return spark
+
+
+TEST_DATA_WITH_NULL = {
+ 'bool': [False, None, True],
+ 'string': ['a', None, 'z'],
+ # Go over the 16 bytes to kick in truncation
+ 'string_long': ['a' * 22, None, 'z' * 22],
+ 'int': [1, None, 9],
+ 'long': [1, None, 9],
+ 'float': [0.0, None, 0.9],
+ 'double': [0.0, None, 0.9],
+ 'timestamp': [datetime(2023, 1, 1, 19, 25, 00), None, datetime(2023, 3, 1,
19, 25, 00)],
+ 'timestamptz': [datetime(2023, 1, 1, 19, 25, 00), None, datetime(2023, 3,
1, 19, 25, 00)],
+ 'date': [date(2023, 1, 1), None, date(2023, 3, 1)],
+ # Not supported by Spark
+ # 'time': [time(1, 22, 0), None, time(19, 25, 0)],
+ # Not natively supported by Arrow
+ # 'uuid': [uuid.UUID('00000000-0000-0000-0000-000000000000').bytes, None,
uuid.UUID('11111111-1111-1111-1111-111111111111').bytes],
+ 'binary': [b'\01', None, b'\22'],
+ 'fixed': [
+ uuid.UUID('00000000-0000-0000-0000-000000000000').bytes,
+ None,
+ uuid.UUID('11111111-1111-1111-1111-111111111111').bytes,
+ ],
+}
+
+
+@pytest.fixture(scope="session")
+def pa_schema() -> "pa.Schema":
+ import pyarrow as pa
+
+ return pa.schema([
+ ("bool", pa.bool_()),
+ ("string", pa.string()),
+ ("string_long", pa.string()),
+ ("int", pa.int32()),
+ ("long", pa.int64()),
+ ("float", pa.float32()),
+ ("double", pa.float64()),
+ ("timestamp", pa.timestamp(unit="us")),
+ ("timestamptz", pa.timestamp(unit="us", tz="UTC")),
+ ("date", pa.date32()),
+ # Not supported by Spark
+ # ("time", pa.time64("us")),
+ # Not natively supported by Arrow
+ # ("uuid", pa.fixed(16)),
+ ("binary", pa.large_binary()),
+ ("fixed", pa.binary(16)),
+ ])
+
+
+@pytest.fixture(scope="session")
+def arrow_table_with_null(pa_schema: "pa.Schema") -> "pa.Table":
+ import pyarrow as pa
+
+ """PyArrow table with all kinds of columns"""
+ return pa.Table.from_pydict(TEST_DATA_WITH_NULL, schema=pa_schema)
diff --git a/tests/integration/test_writes.py b/tests/integration/test_writes.py
index 87c33d65..5d6be0a7 100644
--- a/tests/integration/test_writes.py
+++ b/tests/integration/test_writes.py
@@ -37,7 +37,7 @@ from pyiceberg.catalog import Catalog
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.schema import Schema
-from pyiceberg.table import Table, _dataframe_to_data_files
+from pyiceberg.table import Table, TableProperties, _dataframe_to_data_files
from pyiceberg.typedef import Properties
from pyiceberg.types import (
BinaryType,
@@ -383,6 +383,47 @@ def test_python_writes_with_spark_snapshot_reads(
assert tbl.current_snapshot().snapshot_id ==
get_current_snapshot_id(identifier) # type: ignore
+@pytest.mark.integration
+def test_write_bin_pack_data_files(spark: SparkSession, session_catalog:
Catalog, arrow_table_with_null: pa.Table) -> None:
+ identifier = "default.write_bin_pack_data_files"
+ tbl = _create_table(session_catalog, identifier, {"format-version": "1"},
[])
+
+ def get_data_files_count(identifier: str) -> int:
+ return spark.sql(
+ f"""
+ SELECT *
+ FROM {identifier}.files
+ """
+ ).count()
+
+ # writes 1 data file since the table is smaller than default target file
size
+ assert arrow_table_with_null.nbytes <
TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT
+ tbl.overwrite(arrow_table_with_null)
+ assert get_data_files_count(identifier) == 1
+
+ # writes 1 data file as long as table is smaller than default target file
size
+ bigger_arrow_tbl = pa.concat_tables([arrow_table_with_null] * 10)
+ assert bigger_arrow_tbl.nbytes <
TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT
+ tbl.overwrite(bigger_arrow_tbl)
+ assert get_data_files_count(identifier) == 1
+
+ # writes multiple data files once target file size is overridden
+ target_file_size = arrow_table_with_null.nbytes
+ tbl =
tbl.transaction().set_properties({TableProperties.WRITE_TARGET_FILE_SIZE_BYTES:
target_file_size}).commit_transaction()
+ assert str(target_file_size) ==
tbl.properties.get(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES)
+ assert target_file_size < bigger_arrow_tbl.nbytes
+ tbl.overwrite(bigger_arrow_tbl)
+ assert get_data_files_count(identifier) == 10
+
+ # writes half the number of data files when target file size doubles
+ target_file_size = arrow_table_with_null.nbytes * 2
+ tbl =
tbl.transaction().set_properties({TableProperties.WRITE_TARGET_FILE_SIZE_BYTES:
target_file_size}).commit_transaction()
+ assert str(target_file_size) ==
tbl.properties.get(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES)
+ assert target_file_size < bigger_arrow_tbl.nbytes
+ tbl.overwrite(bigger_arrow_tbl)
+ assert get_data_files_count(identifier) == 5
+
+
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
@pytest.mark.parametrize(
diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py
index c2d8f7bd..b99febd6 100644
--- a/tests/io/test_pyarrow.py
+++ b/tests/io/test_pyarrow.py
@@ -64,6 +64,7 @@ from pyiceberg.io.pyarrow import (
_ConvertToArrowSchema,
_primitive_to_physical,
_read_deletes,
+ bin_pack_arrow_table,
expression_to_pyarrow,
project_table,
schema_to_pyarrow,
@@ -71,7 +72,7 @@ from pyiceberg.io.pyarrow import (
from pyiceberg.manifest import DataFile, DataFileContent, FileFormat
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.schema import Schema, make_compatible_name, visit
-from pyiceberg.table import FileScanTask, Table
+from pyiceberg.table import FileScanTask, Table, TableProperties
from pyiceberg.table.metadata import TableMetadataV2
from pyiceberg.typedef import UTF8
from pyiceberg.types import (
@@ -1710,3 +1711,25 @@ def test_stats_aggregator_update_max(vals: List[Any],
primitive_type: PrimitiveT
stats.update_max(val)
assert stats.current_max == expected_result
+
+
+def test_bin_pack_arrow_table(arrow_table_with_null: pa.Table) -> None:
+ # default packs to 1 bin since the table is small
+ bin_packed = bin_pack_arrow_table(
+ arrow_table_with_null,
target_file_size=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT
+ )
+ assert len(list(bin_packed)) == 1
+
+ # as long as table is smaller than default target size, it should pack to
1 bin
+ bigger_arrow_tbl = pa.concat_tables([arrow_table_with_null] * 10)
+ assert bigger_arrow_tbl.nbytes <
TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT
+ bin_packed = bin_pack_arrow_table(bigger_arrow_tbl,
target_file_size=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT)
+ assert len(list(bin_packed)) == 1
+
+ # unless we override the target size to be smaller
+ bin_packed = bin_pack_arrow_table(bigger_arrow_tbl,
target_file_size=arrow_table_with_null.nbytes)
+ assert len(list(bin_packed)) == 10
+
+ # and will produce half the number of files if we double the target size
+ bin_packed = bin_pack_arrow_table(bigger_arrow_tbl,
target_file_size=arrow_table_with_null.nbytes * 2)
+ assert len(list(bin_packed)) == 5