This is an automated email from the ASF dual-hosted git repository.

honahx pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 7f712fda Add Data Files from Parquet Files to UnPartitioned Table 
(#506)
7f712fda is described below

commit 7f712fdad025a2110816ec217616de54631f1e3e
Author: Sung Yun <[email protected]>
AuthorDate: Fri Mar 15 22:19:27 2024 -0600

    Add Data Files from Parquet Files to UnPartitioned Table (#506)
---
 mkdocs/docs/api.md                  |  33 +++++
 pyiceberg/io/pyarrow.py             |  35 +++++-
 pyiceberg/table/__init__.py         |  52 ++++++++
 tests/integration/test_add_files.py | 240 ++++++++++++++++++++++++++++++++++++
 4 files changed, 359 insertions(+), 1 deletion(-)

diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index 35b271ae..5897881f 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -302,6 +302,39 @@ The nested lists indicate the different Arrow buffers, 
where the first write res
 
 <!-- prettier-ignore-end -->
 
+### Add Files
+
+Expert Iceberg users may choose to commit existing parquet files to the 
Iceberg table as data files, without rewriting them.
+
+```
+# Given that these parquet files have schema consistent with the Iceberg table
+
+file_paths = [
+    "s3a://warehouse/default/existing-1.parquet",
+    "s3a://warehouse/default/existing-2.parquet",
+]
+
+# They can be added to the table without rewriting them
+
+tbl.add_files(file_paths=file_paths)
+
+# A new snapshot is committed to the table with manifests pointing to the 
existing parquet files
+```
+
+<!-- prettier-ignore-start -->
+
+!!! note "Name Mapping"
+    Because `add_files` uses existing files without writing new parquet files 
that are aware of the Iceberg's schema, it requires the Iceberg's table to have 
a [Name 
Mapping](https://iceberg.apache.org/spec/?h=name+mapping#name-mapping-serialization)
 (The Name mapping maps the field names within the parquet files to the Iceberg 
field IDs). Hence, `add_files` requires that there are no field IDs in the 
parquet file's metadata, and creates a new Name Mapping based on the table's 
current sc [...]
+
+<!-- prettier-ignore-end -->
+
+<!-- prettier-ignore-start -->
+
+!!! warning "Maintenance Operations"
+    Because `add_files` commits the existing parquet files to the Iceberg 
Table as any other data file, destructive maintenance operations like expiring 
snapshots will remove them.
+
+<!-- prettier-ignore-end -->
+
 ## Schema evolution
 
 PyIceberg supports full schema evolution through the Python API. It takes care 
of setting the field-IDs and makes sure that only non-breaking changes are done 
(can be overriden).
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index f1e4d302..31d846f6 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -124,7 +124,7 @@ from pyiceberg.schema import (
     visit,
     visit_with_partner,
 )
-from pyiceberg.table import PropertyUtil, TableProperties, WriteTask
+from pyiceberg.table import AddFileTask, PropertyUtil, TableProperties, 
WriteTask
 from pyiceberg.table.metadata import TableMetadata
 from pyiceberg.table.name_mapping import NameMapping
 from pyiceberg.transforms import TruncateTransform
@@ -1772,6 +1772,39 @@ def write_file(io: FileIO, table_metadata: 
TableMetadata, tasks: Iterator[WriteT
     return iter([data_file])
 
 
+def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, 
tasks: Iterator[AddFileTask]) -> Iterator[DataFile]:
+    for task in tasks:
+        input_file = io.new_input(task.file_path)
+        with input_file.open() as input_stream:
+            parquet_metadata = pq.read_metadata(input_stream)
+
+        if visit_pyarrow(parquet_metadata.schema.to_arrow_schema(), _HasIds()):
+            raise NotImplementedError(
+                f"Cannot add file {task.file_path} because it has field IDs. 
`add_files` only supports addition of files without field_ids"
+            )
+
+        schema = table_metadata.schema()
+        data_file = DataFile(
+            content=DataFileContent.DATA,
+            file_path=task.file_path,
+            file_format=FileFormat.PARQUET,
+            partition=task.partition_field_value,
+            record_count=parquet_metadata.num_rows,
+            file_size_in_bytes=len(input_file),
+            sort_order_id=None,
+            spec_id=table_metadata.default_spec_id,
+            equality_ids=None,
+            key_metadata=None,
+        )
+        fill_parquet_file_metadata(
+            data_file=data_file,
+            parquet_metadata=parquet_metadata,
+            stats_columns=compute_statistics_plan(schema, 
table_metadata.properties),
+            parquet_column_mapping=parquet_path_to_id_mapping(schema),
+        )
+        yield data_file
+
+
 ICEBERG_UNCOMPRESSED_CODEC = "uncompressed"
 PYARROW_UNCOMPRESSED_CODEC = "none"
 
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index c5dd2e8e..4fb14e7d 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -33,6 +33,7 @@ from typing import (
     Dict,
     Generic,
     Iterable,
+    Iterator,
     List,
     Literal,
     Optional,
@@ -115,6 +116,7 @@ from pyiceberg.typedef import (
     Identifier,
     KeyDefaultDict,
     Properties,
+    Record,
 )
 from pyiceberg.types import (
     IcebergType,
@@ -1147,6 +1149,27 @@ class Table:
                     for data_file in data_files:
                         update_snapshot.append_data_file(data_file)
 
+    def add_files(self, file_paths: List[str]) -> None:
+        """
+        Shorthand API for adding files as data files to the table.
+
+        Args:
+            file_paths: The list of full file paths to be added as data files 
to the table
+
+        Raises:
+            FileNotFoundError: If the file does not exist.
+        """
+        if len(self.spec().fields) > 0:
+            raise ValueError("Cannot add files to partitioned tables")
+
+        with self.transaction() as tx:
+            if self.name_mapping() is None:
+                tx.set_properties(**{TableProperties.DEFAULT_NAME_MAPPING: 
self.schema().name_mapping.model_dump_json()})
+            with tx.update_snapshot().fast_append() as update_snapshot:
+                data_files = 
_parquet_files_to_data_files(table_metadata=self.metadata, 
file_paths=file_paths, io=self.io)
+                for data_file in data_files:
+                    update_snapshot.append_data_file(data_file)
+
     def update_spec(self, case_sensitive: bool = True) -> UpdateSpec:
         return UpdateSpec(Transaction(self, autocommit=True), 
case_sensitive=case_sensitive)
 
@@ -2444,6 +2467,12 @@ class WriteTask:
         return f"00000-{self.task_id}-{self.write_uuid}.{extension}"
 
 
+@dataclass(frozen=True)
+class AddFileTask:
+    file_path: str
+    partition_field_value: Record
+
+
 def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str:
     return f'{location}/metadata/{commit_uuid}-m{num}.avro'
 
@@ -2475,6 +2504,29 @@ def _dataframe_to_data_files(
     yield from write_file(io=io, table_metadata=table_metadata, 
tasks=iter([WriteTask(write_uuid, next(counter), df)]))
 
 
+def add_file_tasks_from_file_paths(file_paths: List[str], table_metadata: 
TableMetadata) -> Iterator[AddFileTask]:
+    if len([spec for spec in table_metadata.partition_specs if spec.spec_id != 
0]) > 0:
+        raise ValueError("Cannot add files to partitioned tables")
+
+    for file_path in file_paths:
+        yield AddFileTask(
+            file_path=file_path,
+            partition_field_value=Record(),
+        )
+
+
+def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: 
List[str], io: FileIO) -> Iterable[DataFile]:
+    """Convert a list files into DataFiles.
+
+    Returns:
+        An iterable that supplies DataFiles that describe the parquet files.
+    """
+    from pyiceberg.io.pyarrow import parquet_files_to_data_files
+
+    tasks = add_file_tasks_from_file_paths(file_paths, table_metadata)
+    yield from parquet_files_to_data_files(io=io, 
table_metadata=table_metadata, tasks=tasks)
+
+
 class 
_MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]):
     commit_uuid: uuid.UUID
     _operation: Operation
diff --git a/tests/integration/test_add_files.py 
b/tests/integration/test_add_files.py
new file mode 100644
index 00000000..2066e178
--- /dev/null
+++ b/tests/integration/test_add_files.py
@@ -0,0 +1,240 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint:disable=redefined-outer-name
+
+from datetime import date
+from typing import Optional
+
+import pyarrow as pa
+import pyarrow.parquet as pq
+import pytest
+from pyspark.sql import SparkSession
+
+from pyiceberg.catalog import Catalog
+from pyiceberg.exceptions import NoSuchTableError
+from pyiceberg.partitioning import PartitionSpec
+from pyiceberg.schema import Schema
+from pyiceberg.table import Table
+from pyiceberg.types import (
+    BooleanType,
+    DateType,
+    IntegerType,
+    NestedField,
+    StringType,
+)
+
+TABLE_SCHEMA = Schema(
+    NestedField(field_id=1, name="foo", field_type=BooleanType(), 
required=False),
+    NestedField(field_id=2, name="bar", field_type=StringType(), 
required=False),
+    NestedField(field_id=4, name="baz", field_type=IntegerType(), 
required=False),
+    NestedField(field_id=10, name="qux", field_type=DateType(), 
required=False),
+)
+
+ARROW_SCHEMA = pa.schema([
+    ("foo", pa.bool_()),
+    ("bar", pa.string()),
+    ("baz", pa.int32()),
+    ("qux", pa.date32()),
+])
+
+ARROW_TABLE = pa.Table.from_pylist(
+    [
+        {
+            "foo": True,
+            "bar": "bar_string",
+            "baz": 123,
+            "qux": date(2024, 3, 7),
+        }
+    ],
+    schema=ARROW_SCHEMA,
+)
+
+ARROW_SCHEMA_WITH_IDS = pa.schema([
+    pa.field('foo', pa.bool_(), nullable=False, metadata={"PARQUET:field_id": 
"1"}),
+    pa.field('bar', pa.string(), nullable=False, metadata={"PARQUET:field_id": 
"2"}),
+    pa.field('baz', pa.int32(), nullable=False, metadata={"PARQUET:field_id": 
"3"}),
+    pa.field('qux', pa.date32(), nullable=False, metadata={"PARQUET:field_id": 
"4"}),
+])
+
+
+ARROW_TABLE_WITH_IDS = pa.Table.from_pylist(
+    [
+        {
+            "foo": True,
+            "bar": "bar_string",
+            "baz": 123,
+            "qux": date(2024, 3, 7),
+        }
+    ],
+    schema=ARROW_SCHEMA_WITH_IDS,
+)
+
+ARROW_SCHEMA_UPDATED = pa.schema([
+    ("foo", pa.bool_()),
+    ("baz", pa.int32()),
+    ("qux", pa.date32()),
+    ("quux", pa.int32()),
+])
+
+ARROW_TABLE_UPDATED = pa.Table.from_pylist(
+    [
+        {
+            "foo": True,
+            "baz": 123,
+            "qux": date(2024, 3, 7),
+            "quux": 234,
+        }
+    ],
+    schema=ARROW_SCHEMA_UPDATED,
+)
+
+
+def _create_table(session_catalog: Catalog, identifier: str, partition_spec: 
Optional[PartitionSpec] = None) -> Table:
+    try:
+        session_catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    tbl = session_catalog.create_table(
+        identifier=identifier, schema=TABLE_SCHEMA, 
partition_spec=partition_spec if partition_spec else PartitionSpec()
+    )
+
+    return tbl
+
+
[email protected]
+def test_add_files_to_unpartitioned_table(spark: SparkSession, 
session_catalog: Catalog) -> None:
+    identifier = "default.unpartitioned_table"
+    tbl = _create_table(session_catalog, identifier)
+
+    file_paths = [f"s3://warehouse/default/unpartitioned/test-{i}.parquet" for 
i in range(5)]
+    # write parquet files
+    for file_path in file_paths:
+        fo = tbl.io.new_output(file_path)
+        with fo.create(overwrite=True) as fos:
+            with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer:
+                writer.write_table(ARROW_TABLE)
+
+    # add the parquet files as data files
+    tbl.add_files(file_paths=file_paths)
+
+    # NameMapping must have been set to enable reads
+    assert tbl.name_mapping() is not None
+
+    rows = spark.sql(
+        f"""
+        SELECT added_data_files_count, existing_data_files_count, 
deleted_data_files_count
+        FROM {identifier}.all_manifests
+    """
+    ).collect()
+
+    assert [row.added_data_files_count for row in rows] == [5]
+    assert [row.existing_data_files_count for row in rows] == [0]
+    assert [row.deleted_data_files_count for row in rows] == [0]
+
+    df = spark.table(identifier)
+    assert df.count() == 5, "Expected 5 rows"
+    for col in df.columns:
+        assert df.filter(df[col].isNotNull()).count() == 5, "Expected all 5 
rows to be non-null"
+
+
[email protected]
+def test_add_files_to_unpartitioned_table_raises_file_not_found(spark: 
SparkSession, session_catalog: Catalog) -> None:
+    identifier = "default.unpartitioned_raises_not_found"
+    tbl = _create_table(session_catalog, identifier)
+
+    file_paths = 
[f"s3://warehouse/default/unpartitioned_raises_not_found/test-{i}.parquet" for 
i in range(5)]
+    # write parquet files
+    for file_path in file_paths:
+        fo = tbl.io.new_output(file_path)
+        with fo.create(overwrite=True) as fos:
+            with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer:
+                writer.write_table(ARROW_TABLE)
+
+    # add the parquet files as data files
+    with pytest.raises(FileNotFoundError):
+        tbl.add_files(file_paths=file_paths + 
["s3://warehouse/default/unpartitioned_raises_not_found/unknown.parquet"])
+
+
[email protected]
+def test_add_files_to_unpartitioned_table_raises_has_field_ids(spark: 
SparkSession, session_catalog: Catalog) -> None:
+    identifier = "default.unpartitioned_raises_field_ids"
+    tbl = _create_table(session_catalog, identifier)
+
+    file_paths = 
[f"s3://warehouse/default/unpartitioned_raises_field_ids/test-{i}.parquet" for 
i in range(5)]
+    # write parquet files
+    for file_path in file_paths:
+        fo = tbl.io.new_output(file_path)
+        with fo.create(overwrite=True) as fos:
+            with pq.ParquetWriter(fos, schema=ARROW_SCHEMA_WITH_IDS) as writer:
+                writer.write_table(ARROW_TABLE_WITH_IDS)
+
+    # add the parquet files as data files
+    with pytest.raises(NotImplementedError):
+        tbl.add_files(file_paths=file_paths)
+
+
[email protected]
+def test_add_files_to_unpartitioned_table_with_schema_updates(spark: 
SparkSession, session_catalog: Catalog) -> None:
+    identifier = "default.unpartitioned_table_2"
+    tbl = _create_table(session_catalog, identifier)
+
+    file_paths = [f"s3://warehouse/default/unpartitioned_2/test-{i}.parquet" 
for i in range(5)]
+    # write parquet files
+    for file_path in file_paths:
+        fo = tbl.io.new_output(file_path)
+        with fo.create(overwrite=True) as fos:
+            with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer:
+                writer.write_table(ARROW_TABLE)
+
+    # add the parquet files as data files
+    tbl.add_files(file_paths=file_paths)
+
+    # NameMapping must have been set to enable reads
+    assert tbl.name_mapping() is not None
+
+    with tbl.update_schema() as update:
+        update.add_column("quux", IntegerType())
+        update.delete_column("bar")
+
+    file_path = "s3://warehouse/default/unpartitioned_2/test-6.parquet"
+    # write parquet files
+    fo = tbl.io.new_output(file_path)
+    with fo.create(overwrite=True) as fos:
+        with pq.ParquetWriter(fos, schema=ARROW_SCHEMA_UPDATED) as writer:
+            writer.write_table(ARROW_TABLE_UPDATED)
+
+    # add the parquet files as data files
+    tbl.add_files(file_paths=[file_path])
+    rows = spark.sql(
+        f"""
+        SELECT added_data_files_count, existing_data_files_count, 
deleted_data_files_count
+        FROM {identifier}.all_manifests
+    """
+    ).collect()
+
+    assert [row.added_data_files_count for row in rows] == [5, 1, 5]
+    assert [row.existing_data_files_count for row in rows] == [0, 0, 0]
+    assert [row.deleted_data_files_count for row in rows] == [0, 0, 0]
+
+    df = spark.table(identifier)
+    assert df.count() == 6, "Expected 6 rows"
+    assert len(df.columns) == 4, "Expected 4 columns"
+
+    for col in df.columns:
+        value_count = 1 if col == "quux" else 6
+        assert df.filter(df[col].isNotNull()).count() == value_count, 
f"Expected {value_count} rows to be non-null"

Reply via email to