This is an automated email from the ASF dual-hosted git repository. pvary pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push: new d6f22d5a51 Flink: Add support for filter in RewriteDataFiles (#13669) d6f22d5a51 is described below commit d6f22d5a51500848bfb1b6feb515ebe3a2e8cd34 Author: GuoYu <511955...@qq.com> AuthorDate: Tue Jul 29 23:21:33 2025 +0800 Flink: Add support for filter in RewriteDataFiles (#13669) --- .../flink/maintenance/api/RewriteDataFiles.java | 18 +++++++++- .../operator/DataFileRewritePlanner.java | 9 +++-- .../maintenance/api/TestRewriteDataFiles.java | 41 ++++++++++++++++++++++ .../flink/maintenance/operator/RewriteUtil.java | 4 ++- .../operator/TestDataFileRewritePlanner.java | 7 ++-- .../operator/TestDataFileRewriteRunner.java | 4 ++- 6 files changed, 76 insertions(+), 7 deletions(-) diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java index 98fd0cdd40..bedf70725a 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java @@ -26,6 +26,8 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.iceberg.actions.BinPackRewriteFilePlanner; import org.apache.iceberg.actions.SizeBasedFileRewritePlanner; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.flink.maintenance.operator.DataFileRewriteCommitter; import org.apache.iceberg.flink.maintenance.operator.DataFileRewritePlanner; import org.apache.iceberg.flink.maintenance.operator.DataFileRewriteRunner; @@ -56,6 +58,7 @@ public class RewriteDataFiles { org.apache.iceberg.actions.RewriteDataFiles.PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT; private final Map<String, String> rewriteOptions = 
Maps.newHashMapWithExpectedSize(6); private long maxRewriteBytes = Long.MAX_VALUE; + private Expression filter = Expressions.alwaysTrue(); @Override String maintenanceTaskName() { @@ -190,6 +193,18 @@ public class RewriteDataFiles { return this; } + /** + * A user provided filter for determining which files will be considered by the rewrite + * strategy. + * + * @param newFilter the filter expression to apply + * @return this for method chaining + */ + public Builder filter(Expression newFilter) { + this.filter = newFilter; + return this; + } + /** * Configures the properties for the rewriter. * @@ -233,7 +248,8 @@ public class RewriteDataFiles { tableLoader(), partialProgressEnabled ? partialProgressMaxCommits : 1, maxRewriteBytes, - rewriteOptions)) + rewriteOptions, + filter)) .name(operatorName(PLANNER_TASK_NAME)) .uid(PLANNER_TASK_NAME + uidSuffix()) .slotSharingGroup(slotSharingGroup()) diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java index 867ad64ffd..81db62e8bf 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java @@ -35,6 +35,7 @@ import org.apache.iceberg.actions.BinPackRewriteFilePlanner; import org.apache.iceberg.actions.FileRewritePlan; import org.apache.iceberg.actions.RewriteDataFiles; import org.apache.iceberg.actions.RewriteFileGroup; +import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.flink.maintenance.api.Trigger; import org.apache.iceberg.io.CloseableIterator; @@ -62,6 +63,7 @@ public class DataFileRewritePlanner private final long maxRewriteBytes; private final Map<String, String> rewriterOptions; private transient Counter 
errorCounter; + private final Expression filter; public DataFileRewritePlanner( String tableName, @@ -70,7 +72,9 @@ public class DataFileRewritePlanner TableLoader tableLoader, int newPartialProgressMaxCommits, long maxRewriteBytes, - Map<String, String> rewriterOptions) { + Map<String, String> rewriterOptions, + Expression filter) { + Preconditions.checkNotNull(tableName, "Table name should no be null"); Preconditions.checkNotNull(taskName, "Task name should no be null"); Preconditions.checkNotNull(tableLoader, "Table loader should no be null"); @@ -83,6 +87,7 @@ public class DataFileRewritePlanner this.partialProgressMaxCommits = newPartialProgressMaxCommits; this.maxRewriteBytes = maxRewriteBytes; this.rewriterOptions = rewriterOptions; + this.filter = filter; } @Override @@ -119,7 +124,7 @@ public class DataFileRewritePlanner return; } - BinPackRewriteFilePlanner planner = new BinPackRewriteFilePlanner(table); + BinPackRewriteFilePlanner planner = new BinPackRewriteFilePlanner(table, filter); planner.init(rewriterOptions); FileRewritePlan<RewriteDataFiles.FileGroupInfo, FileScanTask, DataFile, RewriteFileGroup> diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java index 135b3bd090..795057e235 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java @@ -34,6 +34,7 @@ import java.util.stream.StreamSupport; import org.apache.flink.streaming.api.graph.StreamGraphGenerator; import org.apache.iceberg.ManifestFiles; import org.apache.iceberg.Table; +import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.flink.SimpleDataUtil; import org.apache.iceberg.flink.maintenance.operator.MetricsReporterFactoryForTests; import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -373,6 +374,46 @@ class TestRewriteDataFiles extends MaintenanceTaskTestBase { .build()); } + @Test + void testRewriteWithFilter() throws Exception { + Table table = createTable(); + insert(table, 1, "a"); + insert(table, 2, "b"); + insert(table, 3, "c"); + insert(table, 4, "d"); + + assertFileNum(table, 4, 0); + + appendRewriteDataFiles( + RewriteDataFiles.builder() + .parallelism(2) + .deleteFileThreshold(10) + .targetFileSizeBytes(1_000_000L) + .maxFileGroupSizeBytes(10_000_000L) + .maxFileSizeBytes(2_000_000L) + .minFileSizeBytes(500_000L) + .minInputFiles(2) + // Only rewrite data files where id is 1 or 2 for testing rewrite + .filter(Expressions.in("id", 1, 2)) + .partialProgressEnabled(true) + .partialProgressMaxCommits(1) + .maxRewriteBytes(100_000L) + .rewriteAll(false)); + + runAndWaitForSuccess(infra.env(), infra.source(), infra.sink()); + + // There are four files; only the files with id 1 and 2 will be rewritten, so expect 3 files. 
+ assertFileNum(table, 3, 0); + + SimpleDataUtil.assertTableRecords( + table, + ImmutableList.of( + createRecord(1, "a"), + createRecord(2, "b"), + createRecord(3, "c"), + createRecord(4, "d"))); + } + private void appendRewriteDataFiles() { appendRewriteDataFiles(RewriteDataFiles.builder().rewriteAll(true)); } diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java index 6e43009e08..68aaf29ac0 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java @@ -27,6 +27,7 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses; import org.apache.iceberg.DataFile; import org.apache.iceberg.Table; +import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.flink.maintenance.api.Trigger; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -47,7 +48,8 @@ class RewriteUtil { tableLoader, 11, 10_000_000L, - ImmutableMap.of(MIN_INPUT_FILES, "2")))) { + ImmutableMap.of(MIN_INPUT_FILES, "2"), + Expressions.alwaysTrue()))) { testHarness.open(); OperatorTestBase.trigger(testHarness); diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java index 8798ff23a2..2d83f553e5 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java @@ -34,6 +34,7 @@ import 
org.apache.iceberg.DataFile; import org.apache.iceberg.FileContent; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.Table; +import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.flink.maintenance.api.Trigger; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Sets; @@ -116,7 +117,8 @@ class TestDataFileRewritePlanner extends OperatorTestBase { tableLoader(), 11, 1L, - ImmutableMap.of(MIN_INPUT_FILES, "2")))) { + ImmutableMap.of(MIN_INPUT_FILES, "2"), + Expressions.alwaysTrue()))) { testHarness.open(); // Cause an exception @@ -181,7 +183,8 @@ class TestDataFileRewritePlanner extends OperatorTestBase { tableLoader(), 11, maxRewriteBytes, - ImmutableMap.of(MIN_INPUT_FILES, "2")))) { + ImmutableMap.of(MIN_INPUT_FILES, "2"), + Expressions.alwaysTrue()))) { testHarness.open(); OperatorTestBase.trigger(testHarness); diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java index 6dc8fb3c02..3c5a103287 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java @@ -45,6 +45,7 @@ import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.RandomGenericData; import org.apache.iceberg.data.Record; import org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.flink.maintenance.api.Trigger; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.parquet.Parquet; @@ -286,7 +287,8 @@ class TestDataFileRewriteRunner extends OperatorTestBase { MIN_INPUT_FILES, "2", TARGET_FILE_SIZE_BYTES, - 
String.valueOf(targetFileSize))))) { + String.valueOf(targetFileSize)), + Expressions.alwaysTrue()))) { testHarness.open(); OperatorTestBase.trigger(testHarness);