This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push: new 09a531740c Spark, Flink: Backport add max files rewrite option for RewriteAction (#13082) 09a531740c is described below commit 09a531740ccec41d69c166ec484be6da16a09947 Author: B Vadlamani <bhargava...@gmail.com> AuthorDate: Tue May 20 05:38:17 2025 -0700 Spark, Flink: Backport add max files rewrite option for RewriteAction (#13082) backports #12824 --- docs/docs/spark-procedures.md | 3 ++- .../iceberg/flink/maintenance/api/RewriteDataFiles.java | 12 ++++++++++++ .../iceberg/flink/maintenance/api/RewriteDataFiles.java | 6 ++++++ .../iceberg/flink/maintenance/api/RewriteDataFiles.java | 12 ++++++++++++ .../iceberg/spark/actions/RewriteDataFilesSparkAction.java | 3 ++- .../iceberg/spark/actions/RewriteDataFilesSparkAction.java | 3 ++- 6 files changed, 36 insertions(+), 3 deletions(-) diff --git a/docs/docs/spark-procedures.md b/docs/docs/spark-procedures.md index 026d256199..c1205d513a 100644 --- a/docs/docs/spark-procedures.md +++ b/docs/docs/spark-procedures.md @@ -533,6 +533,7 @@ Dangling deletes are always filtered out during rewriting. | `min-input-files` | 5 | Any file group exceeding this number of files will be rewritten regardless of other criteria | | `rewrite-all` | false | Force rewriting of all provided files overriding other options | | `max-file-group-size-bytes` | 107374182400 (100GB) | Largest amount of data that should be rewritten in a single file group. The entire rewrite operation is broken down into pieces based on partitioning and within partitions based on size into file-groups. This helps with breaking down the rewriting of very large partitions which may not be rewritable otherwise due to the resource constraints of the cluster. | +| `max-files-to-rewrite` | null | This option sets an upper limit on the number of eligible files that will be rewritten. If this option is not specified, all eligible files will be rewritten. 
| #### Output @@ -1055,4 +1056,4 @@ metadata files and data files to the target location. Lastly, the [register_table](#register_table) procedure can be used to register the copied table in the target location with a catalog. !!! warning - Iceberg tables with partition statistics files are not currently supported for path rewrite. \ No newline at end of file + Iceberg tables with partition statistics files are not currently supported for path rewrite. diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java index b96635e6ab..3668e8a6c6 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java @@ -170,6 +170,18 @@ public class RewriteDataFiles { return this; } + /** + * Configures max files to rewrite. See {@link BinPackRewriteFilePlanner#MAX_FILES_TO_REWRITE} + * for more details. + * + * @param maxFilesToRewrite maximum files to rewrite + */ + public Builder maxFilesToRewrite(int maxFilesToRewrite) { + this.rewriteOptions.put( + BinPackRewriteFilePlanner.MAX_FILES_TO_REWRITE, String.valueOf(maxFilesToRewrite)); + return this; + } + /** * The input is a {@link DataStream} with {@link Trigger} events and every event should be * immediately followed by a {@link Watermark} with the same timestamp as the event. 
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java index 5b1c2c8eee..3668e8a6c6 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java @@ -170,6 +170,12 @@ public class RewriteDataFiles { return this; } + /** + * Configures max files to rewrite. See {@link BinPackRewriteFilePlanner#MAX_FILES_TO_REWRITE} + * for more details. + * + * @param maxFilesToRewrite maximum files to rewrite + */ public Builder maxFilesToRewrite(int maxFilesToRewrite) { this.rewriteOptions.put( BinPackRewriteFilePlanner.MAX_FILES_TO_REWRITE, String.valueOf(maxFilesToRewrite)); diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java index b96635e6ab..3668e8a6c6 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java @@ -170,6 +170,18 @@ public class RewriteDataFiles { return this; } + /** + * Configures max files to rewrite. See {@link BinPackRewriteFilePlanner#MAX_FILES_TO_REWRITE} + * for more details. + * + * @param maxFilesToRewrite maximum files to rewrite + */ + public Builder maxFilesToRewrite(int maxFilesToRewrite) { + this.rewriteOptions.put( + BinPackRewriteFilePlanner.MAX_FILES_TO_REWRITE, String.valueOf(maxFilesToRewrite)); + return this; + } + /** * The input is a {@link DataStream} with {@link Trigger} events and every event should be * immediately followed by a {@link Watermark} with the same timestamp as the event. 
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java index df53c1ff1d..48a213f878 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java @@ -78,7 +78,8 @@ public class RewriteDataFilesSparkAction USE_STARTING_SEQUENCE_NUMBER, REWRITE_JOB_ORDER, OUTPUT_SPEC_ID, - REMOVE_DANGLING_DELETES); + REMOVE_DANGLING_DELETES, + BinPackRewriteFilePlanner.MAX_FILES_TO_REWRITE); private static final RewriteDataFilesSparkAction.Result EMPTY_RESULT = ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build(); diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java index 775d260ece..ffee1fec13 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java @@ -78,7 +78,8 @@ public class RewriteDataFilesSparkAction USE_STARTING_SEQUENCE_NUMBER, REWRITE_JOB_ORDER, OUTPUT_SPEC_ID, - REMOVE_DANGLING_DELETES); + REMOVE_DANGLING_DELETES, + BinPackRewriteFilePlanner.MAX_FILES_TO_REWRITE); private static final RewriteDataFilesSparkAction.Result EMPTY_RESULT = ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();