This is an automated email from the ASF dual-hosted git repository.
honahx pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 7f712fda Add Data Files from Parquet Files to UnPartitioned Table
(#506)
7f712fda is described below
commit 7f712fdad025a2110816ec217616de54631f1e3e
Author: Sung Yun <[email protected]>
AuthorDate: Fri Mar 15 22:19:27 2024 -0600
Add Data Files from Parquet Files to UnPartitioned Table (#506)
---
mkdocs/docs/api.md | 33 +++++
pyiceberg/io/pyarrow.py | 35 +++++-
pyiceberg/table/__init__.py | 52 ++++++++
tests/integration/test_add_files.py | 240 ++++++++++++++++++++++++++++++++++++
4 files changed, 359 insertions(+), 1 deletion(-)
diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index 35b271ae..5897881f 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -302,6 +302,39 @@ The nested lists indicate the different Arrow buffers,
where the first write res
<!-- prettier-ignore-end -->
+### Add Files
+
+Expert Iceberg users may choose to commit existing parquet files to the
Iceberg table as data files, without rewriting them.
+
+```python
+# Given that these parquet files have schema consistent with the Iceberg table
+
+file_paths = [
+ "s3a://warehouse/default/existing-1.parquet",
+ "s3a://warehouse/default/existing-2.parquet",
+]
+
+# They can be added to the table without rewriting them
+
+tbl.add_files(file_paths=file_paths)
+
+# A new snapshot is committed to the table with manifests pointing to the
existing parquet files
+```
+
+<!-- prettier-ignore-start -->
+
+!!! note "Name Mapping"
+    Because `add_files` uses existing files without writing new parquet files that are aware of the Iceberg table's schema, it requires the Iceberg table to have a [Name Mapping](https://iceberg.apache.org/spec/?h=name+mapping#name-mapping-serialization) (the Name Mapping maps the field names within the parquet files to the Iceberg field IDs). Hence, `add_files` requires that there are no field IDs in the parquet file's metadata, and creates a new Name Mapping based on the table's current schema if the table doesn't already have one.
+
+<!-- prettier-ignore-end -->
+
+<!-- prettier-ignore-start -->
+
+!!! warning "Maintenance Operations"
+ Because `add_files` commits the existing parquet files to the Iceberg
Table as any other data file, destructive maintenance operations like expiring
snapshots will remove them.
+
+<!-- prettier-ignore-end -->
+
## Schema evolution
PyIceberg supports full schema evolution through the Python API. It takes care
of setting the field-IDs and makes sure that only non-breaking changes are done
(can be overridden).
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index f1e4d302..31d846f6 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -124,7 +124,7 @@ from pyiceberg.schema import (
visit,
visit_with_partner,
)
-from pyiceberg.table import PropertyUtil, TableProperties, WriteTask
+from pyiceberg.table import AddFileTask, PropertyUtil, TableProperties,
WriteTask
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.name_mapping import NameMapping
from pyiceberg.transforms import TruncateTransform
@@ -1772,6 +1772,39 @@ def write_file(io: FileIO, table_metadata:
TableMetadata, tasks: Iterator[WriteT
return iter([data_file])
+def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata,
tasks: Iterator[AddFileTask]) -> Iterator[DataFile]:
+ for task in tasks:
+ input_file = io.new_input(task.file_path)
+ with input_file.open() as input_stream:
+ parquet_metadata = pq.read_metadata(input_stream)
+
+ if visit_pyarrow(parquet_metadata.schema.to_arrow_schema(), _HasIds()):
+ raise NotImplementedError(
+ f"Cannot add file {task.file_path} because it has field IDs.
`add_files` only supports addition of files without field_ids"
+ )
+
+ schema = table_metadata.schema()
+ data_file = DataFile(
+ content=DataFileContent.DATA,
+ file_path=task.file_path,
+ file_format=FileFormat.PARQUET,
+ partition=task.partition_field_value,
+ record_count=parquet_metadata.num_rows,
+ file_size_in_bytes=len(input_file),
+ sort_order_id=None,
+ spec_id=table_metadata.default_spec_id,
+ equality_ids=None,
+ key_metadata=None,
+ )
+ fill_parquet_file_metadata(
+ data_file=data_file,
+ parquet_metadata=parquet_metadata,
+ stats_columns=compute_statistics_plan(schema,
table_metadata.properties),
+ parquet_column_mapping=parquet_path_to_id_mapping(schema),
+ )
+ yield data_file
+
+
ICEBERG_UNCOMPRESSED_CODEC = "uncompressed"
PYARROW_UNCOMPRESSED_CODEC = "none"
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index c5dd2e8e..4fb14e7d 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -33,6 +33,7 @@ from typing import (
Dict,
Generic,
Iterable,
+ Iterator,
List,
Literal,
Optional,
@@ -115,6 +116,7 @@ from pyiceberg.typedef import (
Identifier,
KeyDefaultDict,
Properties,
+ Record,
)
from pyiceberg.types import (
IcebergType,
@@ -1147,6 +1149,27 @@ class Table:
for data_file in data_files:
update_snapshot.append_data_file(data_file)
+ def add_files(self, file_paths: List[str]) -> None:
+ """
+ Shorthand API for adding files as data files to the table.
+
+ Args:
+ file_paths: The list of full file paths to be added as data files
to the table
+
+ Raises:
+ FileNotFoundError: If the file does not exist.
+ """
+ if len(self.spec().fields) > 0:
+ raise ValueError("Cannot add files to partitioned tables")
+
+ with self.transaction() as tx:
+ if self.name_mapping() is None:
+ tx.set_properties(**{TableProperties.DEFAULT_NAME_MAPPING:
self.schema().name_mapping.model_dump_json()})
+ with tx.update_snapshot().fast_append() as update_snapshot:
+ data_files =
_parquet_files_to_data_files(table_metadata=self.metadata,
file_paths=file_paths, io=self.io)
+ for data_file in data_files:
+ update_snapshot.append_data_file(data_file)
+
def update_spec(self, case_sensitive: bool = True) -> UpdateSpec:
return UpdateSpec(Transaction(self, autocommit=True),
case_sensitive=case_sensitive)
@@ -2444,6 +2467,12 @@ class WriteTask:
return f"00000-{self.task_id}-{self.write_uuid}.{extension}"
+@dataclass(frozen=True)
+class AddFileTask:
+ # Immutable description of one existing parquet file to be added as a data file.
+ # Full path of the parquet file; it is opened through the table's FileIO.
+ file_path: str
+ # Partition value that is copied onto the resulting DataFile.
+ partition_field_value: Record
+
+
def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str:
return f'{location}/metadata/{commit_uuid}-m{num}.avro'
@@ -2475,6 +2504,29 @@ def _dataframe_to_data_files(
yield from write_file(io=io, table_metadata=table_metadata,
tasks=iter([WriteTask(write_uuid, next(counter), df)]))
+def add_file_tasks_from_file_paths(file_paths: List[str], table_metadata:
TableMetadata) -> Iterator[AddFileTask]:
+ if len([spec for spec in table_metadata.partition_specs if spec.spec_id !=
0]) > 0:
+ raise ValueError("Cannot add files to partitioned tables")
+
+ for file_path in file_paths:
+ yield AddFileTask(
+ file_path=file_path,
+ partition_field_value=Record(),
+ )
+
+
+def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths:
List[str], io: FileIO) -> Iterable[DataFile]:
+ """Convert a list files into DataFiles.
+
+ Returns:
+ An iterable that supplies DataFiles that describe the parquet files.
+ """
+ from pyiceberg.io.pyarrow import parquet_files_to_data_files
+
+ tasks = add_file_tasks_from_file_paths(file_paths, table_metadata)
+ yield from parquet_files_to_data_files(io=io,
table_metadata=table_metadata, tasks=tasks)
+
+
class
_MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]):
commit_uuid: uuid.UUID
_operation: Operation
diff --git a/tests/integration/test_add_files.py
b/tests/integration/test_add_files.py
new file mode 100644
index 00000000..2066e178
--- /dev/null
+++ b/tests/integration/test_add_files.py
@@ -0,0 +1,240 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint:disable=redefined-outer-name
+
+from datetime import date
+from typing import Optional
+
+import pyarrow as pa
+import pyarrow.parquet as pq
+import pytest
+from pyspark.sql import SparkSession
+
+from pyiceberg.catalog import Catalog
+from pyiceberg.exceptions import NoSuchTableError
+from pyiceberg.partitioning import PartitionSpec
+from pyiceberg.schema import Schema
+from pyiceberg.table import Table
+from pyiceberg.types import (
+ BooleanType,
+ DateType,
+ IntegerType,
+ NestedField,
+ StringType,
+)
+
+# Iceberg schema used for every table created in this module; all four columns are optional.
+TABLE_SCHEMA = Schema(
+ NestedField(field_id=1, name="foo", field_type=BooleanType(),
required=False),
+ NestedField(field_id=2, name="bar", field_type=StringType(),
required=False),
+ NestedField(field_id=4, name="baz", field_type=IntegerType(),
required=False),
+ NestedField(field_id=10, name="qux", field_type=DateType(),
required=False),
+)
+
+# Arrow schema with the same column names as TABLE_SCHEMA and no field-id metadata.
+ARROW_SCHEMA = pa.schema([
+ ("foo", pa.bool_()),
+ ("bar", pa.string()),
+ ("baz", pa.int32()),
+ ("qux", pa.date32()),
+])
+
+# Single-row table matching ARROW_SCHEMA.
+ARROW_TABLE = pa.Table.from_pylist(
+ [
+ {
+ "foo": True,
+ "bar": "bar_string",
+ "baz": 123,
+ "qux": date(2024, 3, 7),
+ }
+ ],
+ schema=ARROW_SCHEMA,
+)
+
+# Same column names, but every field is non-nullable and carries a PARQUET:field_id metadata entry.
+ARROW_SCHEMA_WITH_IDS = pa.schema([
+ pa.field('foo', pa.bool_(), nullable=False, metadata={"PARQUET:field_id":
"1"}),
+ pa.field('bar', pa.string(), nullable=False, metadata={"PARQUET:field_id":
"2"}),
+ pa.field('baz', pa.int32(), nullable=False, metadata={"PARQUET:field_id":
"3"}),
+ pa.field('qux', pa.date32(), nullable=False, metadata={"PARQUET:field_id":
"4"}),
+])
+
+
+# Single-row table matching ARROW_SCHEMA_WITH_IDS.
+ARROW_TABLE_WITH_IDS = pa.Table.from_pylist(
+ [
+ {
+ "foo": True,
+ "bar": "bar_string",
+ "baz": 123,
+ "qux": date(2024, 3, 7),
+ }
+ ],
+ schema=ARROW_SCHEMA_WITH_IDS,
+)
+
+# Arrow schema without the `bar` column and with an extra int32 column `quux`.
+ARROW_SCHEMA_UPDATED = pa.schema([
+ ("foo", pa.bool_()),
+ ("baz", pa.int32()),
+ ("qux", pa.date32()),
+ ("quux", pa.int32()),
+])
+
+# Single-row table matching ARROW_SCHEMA_UPDATED.
+ARROW_TABLE_UPDATED = pa.Table.from_pylist(
+ [
+ {
+ "foo": True,
+ "baz": 123,
+ "qux": date(2024, 3, 7),
+ "quux": 234,
+ }
+ ],
+ schema=ARROW_SCHEMA_UPDATED,
+)
+
+
+def _create_table(session_catalog: Catalog, identifier: str, partition_spec:
Optional[PartitionSpec] = None) -> Table:
+ try:
+ session_catalog.drop_table(identifier=identifier)
+ except NoSuchTableError:
+ pass
+
+ tbl = session_catalog.create_table(
+ identifier=identifier, schema=TABLE_SCHEMA,
partition_spec=partition_spec if partition_spec else PartitionSpec()
+ )
+
+ return tbl
+
+
[email protected]
+def test_add_files_to_unpartitioned_table(spark: SparkSession,
session_catalog: Catalog) -> None:
+ identifier = "default.unpartitioned_table"
+ tbl = _create_table(session_catalog, identifier)
+
+ file_paths = [f"s3://warehouse/default/unpartitioned/test-{i}.parquet" for
i in range(5)]
+ # write parquet files
+ for file_path in file_paths:
+ fo = tbl.io.new_output(file_path)
+ with fo.create(overwrite=True) as fos:
+ with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer:
+ writer.write_table(ARROW_TABLE)
+
+ # add the parquet files as data files
+ tbl.add_files(file_paths=file_paths)
+
+ # NameMapping must have been set to enable reads
+ assert tbl.name_mapping() is not None
+
+ rows = spark.sql(
+ f"""
+ SELECT added_data_files_count, existing_data_files_count,
deleted_data_files_count
+ FROM {identifier}.all_manifests
+ """
+ ).collect()
+
+ assert [row.added_data_files_count for row in rows] == [5]
+ assert [row.existing_data_files_count for row in rows] == [0]
+ assert [row.deleted_data_files_count for row in rows] == [0]
+
+ df = spark.table(identifier)
+ assert df.count() == 5, "Expected 5 rows"
+ for col in df.columns:
+ assert df.filter(df[col].isNotNull()).count() == 5, "Expected all 5
rows to be non-null"
+
+
[email protected]
+def test_add_files_to_unpartitioned_table_raises_file_not_found(spark:
SparkSession, session_catalog: Catalog) -> None:
+ identifier = "default.unpartitioned_raises_not_found"
+ tbl = _create_table(session_catalog, identifier)
+
+ file_paths =
[f"s3://warehouse/default/unpartitioned_raises_not_found/test-{i}.parquet" for
i in range(5)]
+ # write parquet files
+ for file_path in file_paths:
+ fo = tbl.io.new_output(file_path)
+ with fo.create(overwrite=True) as fos:
+ with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer:
+ writer.write_table(ARROW_TABLE)
+
+ # add the parquet files as data files
+ with pytest.raises(FileNotFoundError):
+ tbl.add_files(file_paths=file_paths +
["s3://warehouse/default/unpartitioned_raises_not_found/unknown.parquet"])
+
+
[email protected]
+def test_add_files_to_unpartitioned_table_raises_has_field_ids(spark:
SparkSession, session_catalog: Catalog) -> None:
+ identifier = "default.unpartitioned_raises_field_ids"
+ tbl = _create_table(session_catalog, identifier)
+
+ file_paths =
[f"s3://warehouse/default/unpartitioned_raises_field_ids/test-{i}.parquet" for
i in range(5)]
+ # write parquet files
+ for file_path in file_paths:
+ fo = tbl.io.new_output(file_path)
+ with fo.create(overwrite=True) as fos:
+ with pq.ParquetWriter(fos, schema=ARROW_SCHEMA_WITH_IDS) as writer:
+ writer.write_table(ARROW_TABLE_WITH_IDS)
+
+ # add the parquet files as data files
+ with pytest.raises(NotImplementedError):
+ tbl.add_files(file_paths=file_paths)
+
+
[email protected]
+def test_add_files_to_unpartitioned_table_with_schema_updates(spark:
SparkSession, session_catalog: Catalog) -> None:
+ identifier = "default.unpartitioned_table_2"
+ tbl = _create_table(session_catalog, identifier)
+
+ file_paths = [f"s3://warehouse/default/unpartitioned_2/test-{i}.parquet"
for i in range(5)]
+ # write parquet files
+ for file_path in file_paths:
+ fo = tbl.io.new_output(file_path)
+ with fo.create(overwrite=True) as fos:
+ with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer:
+ writer.write_table(ARROW_TABLE)
+
+ # add the parquet files as data files
+ tbl.add_files(file_paths=file_paths)
+
+ # NameMapping must have been set to enable reads
+ assert tbl.name_mapping() is not None
+
+ with tbl.update_schema() as update:
+ update.add_column("quux", IntegerType())
+ update.delete_column("bar")
+
+ file_path = "s3://warehouse/default/unpartitioned_2/test-6.parquet"
+ # write parquet files
+ fo = tbl.io.new_output(file_path)
+ with fo.create(overwrite=True) as fos:
+ with pq.ParquetWriter(fos, schema=ARROW_SCHEMA_UPDATED) as writer:
+ writer.write_table(ARROW_TABLE_UPDATED)
+
+ # add the parquet files as data files
+ tbl.add_files(file_paths=[file_path])
+ rows = spark.sql(
+ f"""
+ SELECT added_data_files_count, existing_data_files_count,
deleted_data_files_count
+ FROM {identifier}.all_manifests
+ """
+ ).collect()
+
+ assert [row.added_data_files_count for row in rows] == [5, 1, 5]
+ assert [row.existing_data_files_count for row in rows] == [0, 0, 0]
+ assert [row.deleted_data_files_count for row in rows] == [0, 0, 0]
+
+ df = spark.table(identifier)
+ assert df.count() == 6, "Expected 6 rows"
+ assert len(df.columns) == 4, "Expected 4 columns"
+
+ for col in df.columns:
+ value_count = 1 if col == "quux" else 6
+ assert df.filter(df[col].isNotNull()).count() == value_count,
f"Expected {value_count} rows to be non-null"