This is an automated email from the ASF dual-hosted git repository.
kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new b8b2f66b Use ArrowScan.to_table to replace project_table (#1180)
b8b2f66b is described below
commit b8b2f66be8c74b6567577b16a70c573fdd31ec56
Author: JeffreyChen <[email protected]>
AuthorDate: Sat Sep 21 01:36:08 2024 +0800
Use ArrowScan.to_table to replace project_table (#1180)
* Use ArrowScan.to_table to replace project_table
* Use ArrowScan.to_table to replace project_table in these files:
** pyiceberg/table/__init__.py
** pyiceberg/io/pyarrow.py
** tests/io/test_pyarrow.py
* Replace all remaining uses of project_table with ArrowScan.to_table
Replace all remaining uses of project_table with ArrowScan.to_table
* Fix format
Fix format
* Apply ruff formatting
Apply ruff formatting
---
pyiceberg/table/__init__.py | 9 ++++----
tests/io/test_pyarrow.py | 50 ++++++++++++++++++++++-----------------------
2 files changed, 28 insertions(+), 31 deletions(-)
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 3eedff45..de621ead 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -523,9 +523,9 @@ class Transaction:
snapshot_properties: Custom properties to be added to the snapshot
summary
"""
from pyiceberg.io.pyarrow import (
+ ArrowScan,
_dataframe_to_data_files,
_expression_to_complementary_pyarrow,
- project_table,
)
if (
@@ -559,13 +559,12 @@ class Transaction:
# - Apply the latest partition-spec
# - And sort order when added
for original_file in files:
- df = project_table(
- tasks=[original_file],
+ df = ArrowScan(
table_metadata=self.table_metadata,
io=self._table.io,
- row_filter=AlwaysTrue(),
projected_schema=self.table_metadata.schema(),
- )
+ row_filter=AlwaysTrue(),
+ ).to_table(tasks=[original_file])
filtered_df = df.filter(preserve_row_filter)
# Only rewrite if there are records being deleted
diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py
index 82b35341..e4017e1d 100644
--- a/tests/io/test_pyarrow.py
+++ b/tests/io/test_pyarrow.py
@@ -58,6 +58,7 @@ from pyiceberg.expressions.literals import literal
from pyiceberg.io import InputStream, OutputStream, load_file_io
from pyiceberg.io.pyarrow import (
ICEBERG_SCHEMA,
+ ArrowScan,
PyArrowFile,
PyArrowFileIO,
StatsAggregator,
@@ -69,7 +70,6 @@ from pyiceberg.io.pyarrow import (
_to_requested_schema,
bin_pack_arrow_table,
expression_to_pyarrow,
- project_table,
schema_to_pyarrow,
)
from pyiceberg.manifest import DataFile, DataFileContent, FileFormat
@@ -952,7 +952,19 @@ def file_map(schema_map: Schema, tmpdir: str) -> str:
def project(
schema: Schema, files: List[str], expr: Optional[BooleanExpression] =
None, table_schema: Optional[Schema] = None
) -> pa.Table:
- return project_table(
+ return ArrowScan(
+ table_metadata=TableMetadataV2(
+ location="file://a/b/",
+ last_column_id=1,
+ format_version=2,
+ schemas=[table_schema or schema],
+ partition_specs=[PartitionSpec()],
+ ),
+ io=PyArrowFileIO(),
+ projected_schema=schema,
+ row_filter=expr or AlwaysTrue(),
+ case_sensitive=True,
+ ).to_table(
tasks=[
FileScanTask(
DataFile(
@@ -965,18 +977,7 @@ def project(
)
)
for file in files
- ],
- table_metadata=TableMetadataV2(
- location="file://a/b/",
- last_column_id=1,
- format_version=2,
- schemas=[table_schema or schema],
- partition_specs=[PartitionSpec()],
- ),
- io=PyArrowFileIO(),
- row_filter=expr or AlwaysTrue(),
- projected_schema=schema,
- case_sensitive=True,
+ ]
)
@@ -1411,9 +1412,7 @@ def test_delete(deletes_file: str, example_task:
FileScanTask, table_schema_simp
data_file=example_task.file,
delete_files={DataFile(content=DataFileContent.POSITION_DELETES,
file_path=deletes_file, file_format=FileFormat.PARQUET)},
)
-
- with_deletes = project_table(
- tasks=[example_task_with_delete],
+ with_deletes = ArrowScan(
table_metadata=TableMetadataV2(
location=metadata_location,
last_column_id=1,
@@ -1423,9 +1422,9 @@ def test_delete(deletes_file: str, example_task:
FileScanTask, table_schema_simp
partition_specs=[PartitionSpec()],
),
io=load_file_io(),
- row_filter=AlwaysTrue(),
projected_schema=table_schema_simple,
- )
+ row_filter=AlwaysTrue(),
+ ).to_table(tasks=[example_task_with_delete])
assert (
str(with_deletes)
@@ -1450,8 +1449,7 @@ def test_delete_duplicates(deletes_file: str,
example_task: FileScanTask, table_
},
)
- with_deletes = project_table(
- tasks=[example_task_with_delete],
+ with_deletes = ArrowScan(
table_metadata=TableMetadataV2(
location=metadata_location,
last_column_id=1,
@@ -1461,9 +1459,9 @@ def test_delete_duplicates(deletes_file: str,
example_task: FileScanTask, table_
partition_specs=[PartitionSpec()],
),
io=load_file_io(),
- row_filter=AlwaysTrue(),
projected_schema=table_schema_simple,
- )
+ row_filter=AlwaysTrue(),
+ ).to_table(tasks=[example_task_with_delete])
assert (
str(with_deletes)
@@ -1480,8 +1478,8 @@ baz: [[true,null]]"""
def test_pyarrow_wrap_fsspec(example_task: FileScanTask, table_schema_simple:
Schema) -> None:
metadata_location = "file://a/b/c.json"
- projection = project_table(
- tasks=[example_task],
+
+ projection = ArrowScan(
table_metadata=TableMetadataV2(
location=metadata_location,
last_column_id=1,
@@ -1494,7 +1492,7 @@ def test_pyarrow_wrap_fsspec(example_task: FileScanTask,
table_schema_simple: Sc
case_sensitive=True,
projected_schema=table_schema_simple,
row_filter=AlwaysTrue(),
- )
+ ).to_table(tasks=[example_task])
assert (
str(projection)