This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 61cf27fdef5 [IcebergIO] Apply delete/update filter when reading  
(#34607)
61cf27fdef5 is described below

commit 61cf27fdef56a8c43572e20a350aa1109d17d3aa
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Fri Apr 11 21:16:54 2025 +0000

    [IcebergIO] Apply delete/update filter when reading  (#34607)
    
    * apply delete filter
    
    * revert cdc changes
    
    * spotless
    
    * trigger integration tests
---
 .github/trigger_files/IO_Iceberg_Integration_Tests.json              | 2 +-
 .../src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java | 5 ++++-
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json 
b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index 34a6e02150e..37dd25bf902 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
 {
     "comment": "Modify this file in a trivial way to cause this test suite to 
run.",
-    "modification": 4
+    "modification": 3
 }
diff --git 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java
 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java
index 6ff0a8d1778..998405a73f5 100644
--- 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java
+++ 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java
@@ -32,6 +32,7 @@ import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.GenericDeleteFilter;
 import org.apache.iceberg.data.IdentityPartitionConverters;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.data.avro.DataReader;
@@ -175,7 +176,9 @@ class ScanTaskReader extends 
BoundedSource.BoundedReader<Row> {
         default:
           throw new UnsupportedOperationException("Cannot read format: " + 
file.format());
       }
-      currentIterator = iterable.iterator();
+      GenericDeleteFilter deleteFilter =
+          new GenericDeleteFilter(checkStateNotNull(io), fileTask, 
fileTask.schema(), project);
+      currentIterator = deleteFilter.filter(iterable).iterator();
 
     } while (true);
 

Reply via email to