This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 61cf27fdef5 [IcebergIO] Apply delete/update filter when reading (#34607)
61cf27fdef5 is described below
commit 61cf27fdef56a8c43572e20a350aa1109d17d3aa
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Fri Apr 11 21:16:54 2025 +0000
[IcebergIO] Apply delete/update filter when reading (#34607)
* apply delete filter
* revert cdc changes
* spotless
* trigger integration tests
---
.github/trigger_files/IO_Iceberg_Integration_Tests.json | 2 +-
.../src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java | 5 ++++-
2 files changed, 5 insertions(+), 2 deletions(-)
diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index 34a6e02150e..37dd25bf902 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to
run.",
- "modification": 4
+ "modification": 3
}
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java
index 6ff0a8d1778..998405a73f5 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java
@@ -32,6 +32,7 @@ import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.GenericDeleteFilter;
import org.apache.iceberg.data.IdentityPartitionConverters;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.avro.DataReader;
@@ -175,7 +176,9 @@ class ScanTaskReader extends BoundedSource.BoundedReader<Row> {
default:
throw new UnsupportedOperationException("Cannot read format: " +
file.format());
}
- currentIterator = iterable.iterator();
+ GenericDeleteFilter deleteFilter =
+ new GenericDeleteFilter(checkStateNotNull(io), fileTask,
fileTask.schema(), project);
+ currentIterator = deleteFilter.filter(iterable).iterator();
} while (true);