This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new d6f22d5a51 Flink: Add support for filter in RewriteDataFiles (#13669)
d6f22d5a51 is described below

commit d6f22d5a51500848bfb1b6feb515ebe3a2e8cd34
Author: GuoYu <511955...@qq.com>
AuthorDate: Tue Jul 29 23:21:33 2025 +0800

    Flink: Add support for filter in RewriteDataFiles (#13669)
---
 .../flink/maintenance/api/RewriteDataFiles.java    | 18 +++++++++-
 .../operator/DataFileRewritePlanner.java           |  9 +++--
 .../maintenance/api/TestRewriteDataFiles.java      | 41 ++++++++++++++++++++++
 .../flink/maintenance/operator/RewriteUtil.java    |  4 ++-
 .../operator/TestDataFileRewritePlanner.java       |  7 ++--
 .../operator/TestDataFileRewriteRunner.java        |  4 ++-
 6 files changed, 76 insertions(+), 7 deletions(-)

diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
index 98fd0cdd40..bedf70725a 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
@@ -26,6 +26,8 @@ import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.iceberg.actions.BinPackRewriteFilePlanner;
 import org.apache.iceberg.actions.SizeBasedFileRewritePlanner;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.flink.maintenance.operator.DataFileRewriteCommitter;
 import org.apache.iceberg.flink.maintenance.operator.DataFileRewritePlanner;
 import org.apache.iceberg.flink.maintenance.operator.DataFileRewriteRunner;
@@ -56,6 +58,7 @@ public class RewriteDataFiles {
         
org.apache.iceberg.actions.RewriteDataFiles.PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT;
     private final Map<String, String> rewriteOptions = 
Maps.newHashMapWithExpectedSize(6);
     private long maxRewriteBytes = Long.MAX_VALUE;
+    private Expression filter = Expressions.alwaysTrue();
 
     @Override
     String maintenanceTaskName() {
@@ -190,6 +193,18 @@ public class RewriteDataFiles {
       return this;
     }
 
+    /**
+     * A user-provided filter for determining which files will be considered 
by the rewrite
+     * strategy.
+     *
+     * @param newFilter the filter expression to apply
+     * @return this for method chaining
+     */
+    public Builder filter(Expression newFilter) {
+      this.filter = newFilter;
+      return this;
+    }
+
     /**
      * Configures the properties for the rewriter.
      *
@@ -233,7 +248,8 @@ public class RewriteDataFiles {
                       tableLoader(),
                       partialProgressEnabled ? partialProgressMaxCommits : 1,
                       maxRewriteBytes,
-                      rewriteOptions))
+                      rewriteOptions,
+                      filter))
               .name(operatorName(PLANNER_TASK_NAME))
               .uid(PLANNER_TASK_NAME + uidSuffix())
               .slotSharingGroup(slotSharingGroup())
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java
index 867ad64ffd..81db62e8bf 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.actions.BinPackRewriteFilePlanner;
 import org.apache.iceberg.actions.FileRewritePlan;
 import org.apache.iceberg.actions.RewriteDataFiles;
 import org.apache.iceberg.actions.RewriteFileGroup;
+import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.maintenance.api.Trigger;
 import org.apache.iceberg.io.CloseableIterator;
@@ -62,6 +63,7 @@ public class DataFileRewritePlanner
   private final long maxRewriteBytes;
   private final Map<String, String> rewriterOptions;
   private transient Counter errorCounter;
+  private final Expression filter;
 
   public DataFileRewritePlanner(
       String tableName,
@@ -70,7 +72,9 @@ public class DataFileRewritePlanner
       TableLoader tableLoader,
       int newPartialProgressMaxCommits,
       long maxRewriteBytes,
-      Map<String, String> rewriterOptions) {
+      Map<String, String> rewriterOptions,
+      Expression filter) {
+
     Preconditions.checkNotNull(tableName, "Table name should no be null");
     Preconditions.checkNotNull(taskName, "Task name should no be null");
     Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
@@ -83,6 +87,7 @@ public class DataFileRewritePlanner
     this.partialProgressMaxCommits = newPartialProgressMaxCommits;
     this.maxRewriteBytes = maxRewriteBytes;
     this.rewriterOptions = rewriterOptions;
+    this.filter = filter;
   }
 
   @Override
@@ -119,7 +124,7 @@ public class DataFileRewritePlanner
         return;
       }
 
-      BinPackRewriteFilePlanner planner = new BinPackRewriteFilePlanner(table);
+      BinPackRewriteFilePlanner planner = new BinPackRewriteFilePlanner(table, 
filter);
       planner.init(rewriterOptions);
 
       FileRewritePlan<RewriteDataFiles.FileGroupInfo, FileScanTask, DataFile, 
RewriteFileGroup>
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
index 135b3bd090..795057e235 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
@@ -34,6 +34,7 @@ import java.util.stream.StreamSupport;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import 
org.apache.iceberg.flink.maintenance.operator.MetricsReporterFactoryForTests;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -373,6 +374,46 @@ class TestRewriteDataFiles extends MaintenanceTaskTestBase 
{
             .build());
   }
 
+  @Test
+  void testRewriteWithFilter() throws Exception {
+    Table table = createTable();
+    insert(table, 1, "a");
+    insert(table, 2, "b");
+    insert(table, 3, "c");
+    insert(table, 4, "d");
+
+    assertFileNum(table, 4, 0);
+
+    appendRewriteDataFiles(
+        RewriteDataFiles.builder()
+            .parallelism(2)
+            .deleteFileThreshold(10)
+            .targetFileSizeBytes(1_000_000L)
+            .maxFileGroupSizeBytes(10_000_000L)
+            .maxFileSizeBytes(2_000_000L)
+            .minFileSizeBytes(500_000L)
+            .minInputFiles(2)
+            // Only rewrite data files where id is 1 or 2 for testing rewrite
+            .filter(Expressions.in("id", 1, 2))
+            .partialProgressEnabled(true)
+            .partialProgressMaxCommits(1)
+            .maxRewriteBytes(100_000L)
+            .rewriteAll(false));
+
+    runAndWaitForSuccess(infra.env(), infra.source(), infra.sink());
+
+    // There are four files; only those with id 1 or 2 will be rewritten, so 
expect 3 files.
+    assertFileNum(table, 3, 0);
+
+    SimpleDataUtil.assertTableRecords(
+        table,
+        ImmutableList.of(
+            createRecord(1, "a"),
+            createRecord(2, "b"),
+            createRecord(3, "c"),
+            createRecord(4, "d")));
+  }
+
   private void appendRewriteDataFiles() {
     appendRewriteDataFiles(RewriteDataFiles.builder().rewriteAll(true));
   }
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
index 6e43009e08..68aaf29ac0 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.maintenance.api.Trigger;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -47,7 +48,8 @@ class RewriteUtil {
                     tableLoader,
                     11,
                     10_000_000L,
-                    ImmutableMap.of(MIN_INPUT_FILES, "2")))) {
+                    ImmutableMap.of(MIN_INPUT_FILES, "2"),
+                    Expressions.alwaysTrue()))) {
       testHarness.open();
 
       OperatorTestBase.trigger(testHarness);
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
index 8798ff23a2..2d83f553e5 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.flink.maintenance.api.Trigger;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -116,7 +117,8 @@ class TestDataFileRewritePlanner extends OperatorTestBase {
                     tableLoader(),
                     11,
                     1L,
-                    ImmutableMap.of(MIN_INPUT_FILES, "2")))) {
+                    ImmutableMap.of(MIN_INPUT_FILES, "2"),
+                    Expressions.alwaysTrue()))) {
       testHarness.open();
 
       // Cause an exception
@@ -181,7 +183,8 @@ class TestDataFileRewritePlanner extends OperatorTestBase {
                     tableLoader(),
                     11,
                     maxRewriteBytes,
-                    ImmutableMap.of(MIN_INPUT_FILES, "2")))) {
+                    ImmutableMap.of(MIN_INPUT_FILES, "2"),
+                    Expressions.alwaysTrue()))) {
       testHarness.open();
 
       OperatorTestBase.trigger(testHarness);
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java
index 6dc8fb3c02..3c5a103287 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java
@@ -45,6 +45,7 @@ import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.flink.maintenance.api.Trigger;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.parquet.Parquet;
@@ -286,7 +287,8 @@ class TestDataFileRewriteRunner extends OperatorTestBase {
                         MIN_INPUT_FILES,
                         "2",
                         TARGET_FILE_SIZE_BYTES,
-                        String.valueOf(targetFileSize))))) {
+                        String.valueOf(targetFileSize)),
+                    Expressions.alwaysTrue()))) {
       testHarness.open();
 
       OperatorTestBase.trigger(testHarness);

Reply via email to