This is an automated email from the ASF dual-hosted git repository.

kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new b8b2f66b Use ArrowScan.to_table to replace project_table (#1180)
b8b2f66b is described below

commit b8b2f66be8c74b6567577b16a70c573fdd31ec56
Author: JeffreyChen <[email protected]>
AuthorDate: Sat Sep 21 01:36:08 2024 +0800

    Use ArrowScan.to_table to replace project_table (#1180)
    
    * Use ArrowScan.to_table to replace project_table
    
    * Use ArrowScan.to_table to replace project_table on these file:
    ** pyiceberg\table\__init__.py
    ** pyiceberg\io\pyarrow.py
    ** pyiceberg\test_pyarrow.py
    
    * Replace all remaining of project_table using ArrowScan.to_table
    
    Replace all remaining of project_table using ArrowScan.to_table
    
    * Fix format
    
    Fix format
    
    * Modify by ruff
    
    Modify by ruff
---
 pyiceberg/table/__init__.py |  9 ++++----
 tests/io/test_pyarrow.py    | 50 ++++++++++++++++++++++-----------------------
 2 files changed, 28 insertions(+), 31 deletions(-)

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 3eedff45..de621ead 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -523,9 +523,9 @@ class Transaction:
             snapshot_properties: Custom properties to be added to the snapshot 
summary
         """
         from pyiceberg.io.pyarrow import (
+            ArrowScan,
             _dataframe_to_data_files,
             _expression_to_complementary_pyarrow,
-            project_table,
         )
 
         if (
@@ -559,13 +559,12 @@ class Transaction:
             #   - Apply the latest partition-spec
             #   - And sort order when added
             for original_file in files:
-                df = project_table(
-                    tasks=[original_file],
+                df = ArrowScan(
                     table_metadata=self.table_metadata,
                     io=self._table.io,
-                    row_filter=AlwaysTrue(),
                     projected_schema=self.table_metadata.schema(),
-                )
+                    row_filter=AlwaysTrue(),
+                ).to_table(tasks=[original_file])
                 filtered_df = df.filter(preserve_row_filter)
 
                 # Only rewrite if there are records being deleted
diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py
index 82b35341..e4017e1d 100644
--- a/tests/io/test_pyarrow.py
+++ b/tests/io/test_pyarrow.py
@@ -58,6 +58,7 @@ from pyiceberg.expressions.literals import literal
 from pyiceberg.io import InputStream, OutputStream, load_file_io
 from pyiceberg.io.pyarrow import (
     ICEBERG_SCHEMA,
+    ArrowScan,
     PyArrowFile,
     PyArrowFileIO,
     StatsAggregator,
@@ -69,7 +70,6 @@ from pyiceberg.io.pyarrow import (
     _to_requested_schema,
     bin_pack_arrow_table,
     expression_to_pyarrow,
-    project_table,
     schema_to_pyarrow,
 )
 from pyiceberg.manifest import DataFile, DataFileContent, FileFormat
@@ -952,7 +952,19 @@ def file_map(schema_map: Schema, tmpdir: str) -> str:
 def project(
     schema: Schema, files: List[str], expr: Optional[BooleanExpression] = 
None, table_schema: Optional[Schema] = None
 ) -> pa.Table:
-    return project_table(
+    return ArrowScan(
+        table_metadata=TableMetadataV2(
+            location="file://a/b/",
+            last_column_id=1,
+            format_version=2,
+            schemas=[table_schema or schema],
+            partition_specs=[PartitionSpec()],
+        ),
+        io=PyArrowFileIO(),
+        projected_schema=schema,
+        row_filter=expr or AlwaysTrue(),
+        case_sensitive=True,
+    ).to_table(
         tasks=[
             FileScanTask(
                 DataFile(
@@ -965,18 +977,7 @@ def project(
                 )
             )
             for file in files
-        ],
-        table_metadata=TableMetadataV2(
-            location="file://a/b/",
-            last_column_id=1,
-            format_version=2,
-            schemas=[table_schema or schema],
-            partition_specs=[PartitionSpec()],
-        ),
-        io=PyArrowFileIO(),
-        row_filter=expr or AlwaysTrue(),
-        projected_schema=schema,
-        case_sensitive=True,
+        ]
     )
 
 
@@ -1411,9 +1412,7 @@ def test_delete(deletes_file: str, example_task: 
FileScanTask, table_schema_simp
         data_file=example_task.file,
         delete_files={DataFile(content=DataFileContent.POSITION_DELETES, 
file_path=deletes_file, file_format=FileFormat.PARQUET)},
     )
-
-    with_deletes = project_table(
-        tasks=[example_task_with_delete],
+    with_deletes = ArrowScan(
         table_metadata=TableMetadataV2(
             location=metadata_location,
             last_column_id=1,
@@ -1423,9 +1422,9 @@ def test_delete(deletes_file: str, example_task: 
FileScanTask, table_schema_simp
             partition_specs=[PartitionSpec()],
         ),
         io=load_file_io(),
-        row_filter=AlwaysTrue(),
         projected_schema=table_schema_simple,
-    )
+        row_filter=AlwaysTrue(),
+    ).to_table(tasks=[example_task_with_delete])
 
     assert (
         str(with_deletes)
@@ -1450,8 +1449,7 @@ def test_delete_duplicates(deletes_file: str, 
example_task: FileScanTask, table_
         },
     )
 
-    with_deletes = project_table(
-        tasks=[example_task_with_delete],
+    with_deletes = ArrowScan(
         table_metadata=TableMetadataV2(
             location=metadata_location,
             last_column_id=1,
@@ -1461,9 +1459,9 @@ def test_delete_duplicates(deletes_file: str, 
example_task: FileScanTask, table_
             partition_specs=[PartitionSpec()],
         ),
         io=load_file_io(),
-        row_filter=AlwaysTrue(),
         projected_schema=table_schema_simple,
-    )
+        row_filter=AlwaysTrue(),
+    ).to_table(tasks=[example_task_with_delete])
 
     assert (
         str(with_deletes)
@@ -1480,8 +1478,8 @@ baz: [[true,null]]"""
 
 def test_pyarrow_wrap_fsspec(example_task: FileScanTask, table_schema_simple: 
Schema) -> None:
     metadata_location = "file://a/b/c.json"
-    projection = project_table(
-        tasks=[example_task],
+
+    projection = ArrowScan(
         table_metadata=TableMetadataV2(
             location=metadata_location,
             last_column_id=1,
@@ -1494,7 +1492,7 @@ def test_pyarrow_wrap_fsspec(example_task: FileScanTask, 
table_schema_simple: Sc
         case_sensitive=True,
         projected_schema=table_schema_simple,
         row_filter=AlwaysTrue(),
-    )
+    ).to_table(tasks=[example_task])
 
     assert (
         str(projection)

Reply via email to