This is an automated email from the ASF dual-hosted git repository.

sungwy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 53a0b739 prevent adding duplicate files  (#1036)
53a0b739 is described below

commit 53a0b7399c1490d61a542c1ffb99d73c2edff4ab
Author: amit <[email protected]>
AuthorDate: Mon Aug 26 17:39:47 2024 +0300

    prevent adding duplicate files  (#1036)
    
    * prevent add_files from adding a file that's already referenced by the 
iceberg table
    
    * fix method that searches files that are already referenced + docs
    
    * move function to locate duplicate files into add_files
    
    * add check_duplicate_files flag to add_files api to make the behaviour 
according to java api
    
    * add check_duplicate_files flag to table level api and add tests
    
    * add check_duplicate_files flag to table level api and add tests
    
    * fix tests to check new new added flag check_duplicate_files and fix checks
    
    * fix linting
---
 pyiceberg/table/__init__.py         | 29 +++++++++--
 tests/integration/test_add_files.py | 95 +++++++++++++++++++++++++++++++++++++
 2 files changed, 120 insertions(+), 4 deletions(-)

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 8ec44c6b..0316b404 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -621,7 +621,9 @@ class Transaction:
         if not delete_snapshot.files_affected and not 
delete_snapshot.rewrites_needed:
             warnings.warn("Delete operation did not match any records")
 
-    def add_files(self, file_paths: List[str], snapshot_properties: Dict[str, 
str] = EMPTY_DICT) -> None:
+    def add_files(
+        self, file_paths: List[str], snapshot_properties: Dict[str, str] = 
EMPTY_DICT, check_duplicate_files: bool = True
+    ) -> None:
         """
         Shorthand API for adding files as data files to the table transaction.
 
@@ -630,7 +632,21 @@ class Transaction:
 
         Raises:
             FileNotFoundError: If the file does not exist.
+            ValueError: Raises a ValueError given file_paths contains 
duplicate files
+            ValueError: Raises a ValueError given file_paths already 
referenced by table
         """
+        if len(file_paths) != len(set(file_paths)):
+            raise ValueError("File paths must be unique")
+
+        if check_duplicate_files:
+            import pyarrow.compute as pc
+
+            expr = pc.field("file_path").isin(file_paths)
+            referenced_files = [file["file_path"] for file in 
self._table.inspect.files().filter(expr).to_pylist()]
+
+            if referenced_files:
+                raise ValueError(f"Cannot add files that are already 
referenced by table, files: {', '.join(referenced_files)}")
+
         if self.table_metadata.name_mapping() is None:
             self.set_properties(**{
                 TableProperties.DEFAULT_NAME_MAPPING: 
self.table_metadata.schema().name_mapping.model_dump_json()
@@ -1632,7 +1648,9 @@ class Table:
         with self.transaction() as tx:
             tx.delete(delete_filter=delete_filter, 
snapshot_properties=snapshot_properties)
 
-    def add_files(self, file_paths: List[str], snapshot_properties: Dict[str, 
str] = EMPTY_DICT) -> None:
+    def add_files(
+        self, file_paths: List[str], snapshot_properties: Dict[str, str] = 
EMPTY_DICT, check_duplicate_files: bool = True
+    ) -> None:
         """
         Shorthand API for adding files as data files to the table.
 
@@ -1643,7 +1661,9 @@ class Table:
             FileNotFoundError: If the file does not exist.
         """
         with self.transaction() as tx:
-            tx.add_files(file_paths=file_paths, 
snapshot_properties=snapshot_properties)
+            tx.add_files(
+                file_paths=file_paths, 
snapshot_properties=snapshot_properties, 
check_duplicate_files=check_duplicate_files
+            )
 
     def update_spec(self, case_sensitive: bool = True) -> UpdateSpec:
         return UpdateSpec(Transaction(self, autocommit=True), 
case_sensitive=case_sensitive)
@@ -2260,7 +2280,8 @@ class UpdateSchema(UpdateTableMetadata["UpdateSchema"]):
         visit_with_partner(
             Catalog._convert_schema_if_needed(new_schema),
             -1,
-            UnionByNameVisitor(update_schema=self, 
existing_schema=self._schema, case_sensitive=self._case_sensitive),  # type: 
ignore
+            UnionByNameVisitor(update_schema=self, 
existing_schema=self._schema, case_sensitive=self._case_sensitive),
+            # type: ignore
             PartnerIdByNameAccessor(partner_schema=self._schema, 
case_sensitive=self._case_sensitive),
         )
         return self
diff --git a/tests/integration/test_add_files.py 
b/tests/integration/test_add_files.py
index 3703a9e0..85e626ed 100644
--- a/tests/integration/test_add_files.py
+++ b/tests/integration/test_add_files.py
@@ -732,3 +732,98 @@ def test_add_files_subset_of_schema(spark: SparkSession, 
session_catalog: Catalo
     for column in written_arrow_table.column_names:
         for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
             assert left == right
+
+
[email protected]
[email protected]("format_version", [1, 2])
+def test_add_files_with_duplicate_files_in_file_paths(spark: SparkSession, 
session_catalog: Catalog, format_version: int) -> None:
+    identifier = f"default.test_table_duplicate_add_files_v{format_version}"
+    tbl = _create_table(session_catalog, identifier, format_version)
+    file_path = 
"s3://warehouse/default/unpartitioned/v{format_version}/test-1.parquet"
+    file_paths = [file_path, file_path]
+
+    # add the parquet files as data files
+    with pytest.raises(ValueError) as exc_info:
+        tbl.add_files(file_paths=file_paths)
+    assert "File paths must be unique" in str(exc_info.value)
+
+
[email protected]
[email protected]("format_version", [1, 2])
+def test_add_files_that_referenced_by_current_snapshot(
+    spark: SparkSession, session_catalog: Catalog, format_version: int
+) -> None:
+    identifier = f"default.test_table_add_referenced_file_v{format_version}"
+    tbl = _create_table(session_catalog, identifier, format_version)
+
+    file_paths = 
[f"s3://warehouse/default/unpartitioned/v{format_version}/test-{i}.parquet" for 
i in range(5)]
+
+    # write parquet files
+    for file_path in file_paths:
+        fo = tbl.io.new_output(file_path)
+        with fo.create(overwrite=True) as fos:
+            with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer:
+                writer.write_table(ARROW_TABLE)
+
+    # add the parquet files as data files
+    tbl.add_files(file_paths=file_paths)
+    existing_files_in_table = 
tbl.inspect.files().to_pylist().pop()["file_path"]
+
+    with pytest.raises(ValueError) as exc_info:
+        tbl.add_files(file_paths=[existing_files_in_table])
+    assert f"Cannot add files that are already referenced by table, files: 
{existing_files_in_table}" in str(exc_info.value)
+
+
[email protected]
[email protected]("format_version", [1, 2])
+def 
test_add_files_that_referenced_by_current_snapshot_with_check_duplicate_files_false(
+    spark: SparkSession, session_catalog: Catalog, format_version: int
+) -> None:
+    identifier = f"default.test_table_add_referenced_file_v{format_version}"
+    tbl = _create_table(session_catalog, identifier, format_version)
+
+    file_paths = 
[f"s3://warehouse/default/unpartitioned/v{format_version}/test-{i}.parquet" for 
i in range(5)]
+    # write parquet files
+    for file_path in file_paths:
+        fo = tbl.io.new_output(file_path)
+        with fo.create(overwrite=True) as fos:
+            with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer:
+                writer.write_table(ARROW_TABLE)
+
+    # add the parquet files as data files
+    tbl.add_files(file_paths=file_paths)
+    existing_files_in_table = 
tbl.inspect.files().to_pylist().pop()["file_path"]
+    tbl.add_files(file_paths=[existing_files_in_table], 
check_duplicate_files=False)
+    rows = spark.sql(
+        f"""
+        SELECT added_data_files_count, existing_data_files_count, 
deleted_data_files_count
+        FROM {identifier}.all_manifests
+    """
+    ).collect()
+    assert [row.added_data_files_count for row in rows] == [5, 1, 5]
+    assert [row.existing_data_files_count for row in rows] == [0, 0, 0]
+    assert [row.deleted_data_files_count for row in rows] == [0, 0, 0]
+
+
[email protected]
[email protected]("format_version", [1, 2])
+def 
test_add_files_that_referenced_by_current_snapshot_with_check_duplicate_files_true(
+    spark: SparkSession, session_catalog: Catalog, format_version: int
+) -> None:
+    identifier = f"default.test_table_add_referenced_file_v{format_version}"
+    tbl = _create_table(session_catalog, identifier, format_version)
+
+    file_paths = 
[f"s3://warehouse/default/unpartitioned/v{format_version}/test-{i}.parquet" for 
i in range(5)]
+    # write parquet files
+    for file_path in file_paths:
+        fo = tbl.io.new_output(file_path)
+        with fo.create(overwrite=True) as fos:
+            with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer:
+                writer.write_table(ARROW_TABLE)
+
+    # add the parquet files as data files
+    tbl.add_files(file_paths=file_paths)
+    existing_files_in_table = 
tbl.inspect.files().to_pylist().pop()["file_path"]
+    with pytest.raises(ValueError) as exc_info:
+        tbl.add_files(file_paths=[existing_files_in_table], 
check_duplicate_files=True)
+    assert f"Cannot add files that are already referenced by table, files: 
{existing_files_in_table}" in str(exc_info.value)

Reply via email to