This is an automated email from the ASF dual-hosted git repository.
sungwy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new c30e43ad Bug Fix: Position Deletes + row_filter yields less data when
the DataFile is large (#1141)
c30e43ad is described below
commit c30e43adf94a82ec1a225d3a1bf69fface592cfd
Author: Sung Yun <[email protected]>
AuthorDate: Thu Sep 26 22:05:25 2024 -0400
Bug Fix: Position Deletes + row_filter yields less data when the DataFile
is large (#1141)
* add more tests for position deletes
* multiple append and deletes
* test
* fix
* adopt nit
---
pyiceberg/io/pyarrow.py | 16 +++++++++---
tests/integration/test_deletes.py | 53 +++++++++++++++++++++++++++++++++++++++
2 files changed, 65 insertions(+), 4 deletions(-)
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 999813d0..aa277960 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -1238,10 +1238,13 @@ def _task_to_record_batches(
for batch in batches:
next_index = next_index + len(batch)
current_index = next_index - len(batch)
+ output_batches = iter([batch])
if positional_deletes:
# Create the mask of indices that we're interested in
indices = _combine_positional_deletes(positional_deletes,
current_index, current_index + len(batch))
batch = batch.take(indices)
+ output_batches = iter([batch])
+
# Apply the user filter
if pyarrow_filter is not None:
# we need to switch back and forth between RecordBatch and
Table
@@ -1251,10 +1254,15 @@ def _task_to_record_batches(
arrow_table = arrow_table.filter(pyarrow_filter)
if len(arrow_table) == 0:
continue
- batch = arrow_table.to_batches()[0]
- yield _to_requested_schema(
- projected_schema, file_project_schema, batch,
downcast_ns_timestamp_to_us=True, use_large_types=use_large_types
- )
+ output_batches = arrow_table.to_batches()
+ for output_batch in output_batches:
+ yield _to_requested_schema(
+ projected_schema,
+ file_project_schema,
+ output_batch,
+ downcast_ns_timestamp_to_us=True,
+ use_large_types=use_large_types,
+ )
def _task_to_table(
diff --git a/tests/integration/test_deletes.py
b/tests/integration/test_deletes.py
index d95b299a..2cdf9916 100644
--- a/tests/integration/test_deletes.py
+++ b/tests/integration/test_deletes.py
@@ -292,6 +292,59 @@ def
test_delete_partitioned_table_positional_deletes_empty_batch(spark: SparkSes
assert len(reader.read_all()) == 0
+@pytest.mark.integration
+@pytest.mark.filterwarnings("ignore:Merge on read is not yet supported,
falling back to copy-on-write")
+def test_read_multiple_batches_in_task_with_position_deletes(spark:
SparkSession, session_catalog: RestCatalog) -> None:
+ identifier =
"default.test_read_multiple_batches_in_task_with_position_deletes"
+
+ run_spark_commands(
+ spark,
+ [
+ f"DROP TABLE IF EXISTS {identifier}",
+ f"""
+ CREATE TABLE {identifier} (
+ number int
+ )
+ USING iceberg
+ TBLPROPERTIES(
+ 'format-version' = 2,
+ 'write.delete.mode'='merge-on-read',
+ 'write.update.mode'='merge-on-read',
+ 'write.merge.mode'='merge-on-read'
+ )
+ """,
+ ],
+ )
+
+ tbl = session_catalog.load_table(identifier)
+
+ arrow_table = pa.Table.from_arrays(
+ [
+ pa.array(list(range(1, 1001)) * 100),
+ ],
+ schema=pa.schema([pa.field("number", pa.int32())]),
+ )
+
+ tbl.append(arrow_table)
+
+ run_spark_commands(
+ spark,
+ [
+ f"""
+ DELETE FROM {identifier} WHERE number in (1, 2, 3, 4)
+ """,
+ ],
+ )
+
+ tbl.refresh()
+
+ reader = tbl.scan(row_filter="number <= 50").to_arrow_batch_reader()
+ assert isinstance(reader, pa.RecordBatchReader)
+ pyiceberg_count = len(reader.read_all())
+ expected_count = 46 * 100
+ assert pyiceberg_count == expected_count, f"Failing check.
{pyiceberg_count} != {expected_count}"
+
+
@pytest.mark.integration
@pytest.mark.filterwarnings("ignore:Merge on read is not yet supported,
falling back to copy-on-write")
def test_overwrite_partitioned_table(spark: SparkSession, session_catalog:
RestCatalog) -> None: