This is an automated email from the ASF dual-hosted git repository.

sungwy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new c30e43ad Bug Fix: Position Deletes + row_filter yields less data when 
the DataFile is large (#1141)
c30e43ad is described below

commit c30e43adf94a82ec1a225d3a1bf69fface592cfd
Author: Sung Yun <[email protected]>
AuthorDate: Thu Sep 26 22:05:25 2024 -0400

    Bug Fix: Position Deletes + row_filter yields less data when the DataFile 
is large (#1141)
    
    * add more tests for position deletes
    
    * multiple append and deletes
    
    * test
    
    * fix
    
    * adopt nit
---
 pyiceberg/io/pyarrow.py           | 16 +++++++++---
 tests/integration/test_deletes.py | 53 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 65 insertions(+), 4 deletions(-)

diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 999813d0..aa277960 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -1238,10 +1238,13 @@ def _task_to_record_batches(
         for batch in batches:
             next_index = next_index + len(batch)
             current_index = next_index - len(batch)
+            output_batches = iter([batch])
             if positional_deletes:
                 # Create the mask of indices that we're interested in
                 indices = _combine_positional_deletes(positional_deletes, 
current_index, current_index + len(batch))
                 batch = batch.take(indices)
+                output_batches = iter([batch])
+
                 # Apply the user filter
                 if pyarrow_filter is not None:
                     # we need to switch back and forth between RecordBatch and 
Table
@@ -1251,10 +1254,15 @@ def _task_to_record_batches(
                     arrow_table = arrow_table.filter(pyarrow_filter)
                     if len(arrow_table) == 0:
                         continue
-                    batch = arrow_table.to_batches()[0]
-            yield _to_requested_schema(
-                projected_schema, file_project_schema, batch, 
downcast_ns_timestamp_to_us=True, use_large_types=use_large_types
-            )
+                    output_batches = arrow_table.to_batches()
+            for output_batch in output_batches:
+                yield _to_requested_schema(
+                    projected_schema,
+                    file_project_schema,
+                    output_batch,
+                    downcast_ns_timestamp_to_us=True,
+                    use_large_types=use_large_types,
+                )
 
 
 def _task_to_table(
diff --git a/tests/integration/test_deletes.py 
b/tests/integration/test_deletes.py
index d95b299a..2cdf9916 100644
--- a/tests/integration/test_deletes.py
+++ b/tests/integration/test_deletes.py
@@ -292,6 +292,59 @@ def 
test_delete_partitioned_table_positional_deletes_empty_batch(spark: SparkSes
     assert len(reader.read_all()) == 0
 
 
[email protected]
[email protected]("ignore:Merge on read is not yet supported, 
falling back to copy-on-write")
+def test_read_multiple_batches_in_task_with_position_deletes(spark: 
SparkSession, session_catalog: RestCatalog) -> None:
+    identifier = 
"default.test_read_multiple_batches_in_task_with_position_deletes"
+
+    run_spark_commands(
+        spark,
+        [
+            f"DROP TABLE IF EXISTS {identifier}",
+            f"""
+            CREATE TABLE {identifier} (
+                number              int
+            )
+            USING iceberg
+            TBLPROPERTIES(
+                'format-version' = 2,
+                'write.delete.mode'='merge-on-read',
+                'write.update.mode'='merge-on-read',
+                'write.merge.mode'='merge-on-read'
+            )
+        """,
+        ],
+    )
+
+    tbl = session_catalog.load_table(identifier)
+
+    arrow_table = pa.Table.from_arrays(
+        [
+            pa.array(list(range(1, 1001)) * 100),
+        ],
+        schema=pa.schema([pa.field("number", pa.int32())]),
+    )
+
+    tbl.append(arrow_table)
+
+    run_spark_commands(
+        spark,
+        [
+            f"""
+            DELETE FROM {identifier} WHERE number in (1, 2, 3, 4)
+        """,
+        ],
+    )
+
+    tbl.refresh()
+
+    reader = tbl.scan(row_filter="number <= 50").to_arrow_batch_reader()
+    assert isinstance(reader, pa.RecordBatchReader)
+    pyiceberg_count = len(reader.read_all())
+    expected_count = 46 * 100
+    assert pyiceberg_count == expected_count, f"Failing check. 
{pyiceberg_count} != {expected_count}"
+
+
 @pytest.mark.integration
 @pytest.mark.filterwarnings("ignore:Merge on read is not yet supported, 
falling back to copy-on-write")
 def test_overwrite_partitioned_table(spark: SparkSession, session_catalog: 
RestCatalog) -> None:

Reply via email to