This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 09a531740c Spark, Flink: Backport add max files rewrite option for RewriteAction (#13082)
09a531740c is described below

commit 09a531740ccec41d69c166ec484be6da16a09947
Author: B Vadlamani <bhargava...@gmail.com>
AuthorDate: Tue May 20 05:38:17 2025 -0700

    Spark, Flink: Backport add max files rewrite option for RewriteAction (#13082)
    
    backports #12824
---
 docs/docs/spark-procedures.md                                |  3 ++-
 .../iceberg/flink/maintenance/api/RewriteDataFiles.java      | 12 ++++++++++++
 .../iceberg/flink/maintenance/api/RewriteDataFiles.java      |  6 ++++++
 .../iceberg/flink/maintenance/api/RewriteDataFiles.java      | 12 ++++++++++++
 .../iceberg/spark/actions/RewriteDataFilesSparkAction.java   |  3 ++-
 .../iceberg/spark/actions/RewriteDataFilesSparkAction.java   |  3 ++-
 6 files changed, 36 insertions(+), 3 deletions(-)

diff --git a/docs/docs/spark-procedures.md b/docs/docs/spark-procedures.md
index 026d256199..c1205d513a 100644
--- a/docs/docs/spark-procedures.md
+++ b/docs/docs/spark-procedures.md
@@ -533,6 +533,7 @@ Dangling deletes are always filtered out during rewriting.
 | `min-input-files` | 5 | Any file group exceeding this number of files will be rewritten regardless of other criteria |
 | `rewrite-all` | false | Force rewriting of all provided files overriding other options |
 | `max-file-group-size-bytes` | 107374182400 (100GB) | Largest amount of data that should be rewritten in a single file group. The entire rewrite operation is broken down into pieces based on partitioning and within partitions based on size into file-groups.  This helps with breaking down the rewriting of very large partitions which may not be rewritable otherwise due to the resource constraints of the cluster. |
+| `max-files-to-rewrite` | null | This option sets an upper limit on the number of eligible files that will be rewritten. If this option is not specified, all eligible files will be rewritten. |
 
 #### Output
 
@@ -1055,4 +1056,4 @@ metadata files and data files to the target location.
 Lastly, the [register_table](#register_table) procedure can be used to 
register the copied table in the target location with a catalog.
 
 !!! warning
-    Iceberg tables with partition statistics files are not currently supported for path rewrite.
\ No newline at end of file
+    Iceberg tables with partition statistics files are not currently supported for path rewrite.
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
index b96635e6ab..3668e8a6c6 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
@@ -170,6 +170,18 @@ public class RewriteDataFiles {
       return this;
     }
 
+    /**
+     * Configures max files to rewrite. See {@link BinPackRewriteFilePlanner#MAX_FILES_TO_REWRITE}
+     * for more details.
+     *
+     * @param maxFilesToRewrite maximum files to rewrite
+     */
+    public Builder maxFilesToRewrite(int maxFilesToRewrite) {
+      this.rewriteOptions.put(
+          BinPackRewriteFilePlanner.MAX_FILES_TO_REWRITE, String.valueOf(maxFilesToRewrite));
+      return this;
+    }
+
     /**
      * The input is a {@link DataStream} with {@link Trigger} events and every event should be
      * immediately followed by a {@link Watermark} with the same timestamp as the event.
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
index 5b1c2c8eee..3668e8a6c6 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
@@ -170,6 +170,12 @@ public class RewriteDataFiles {
       return this;
     }
 
+    /**
+     * Configures max files to rewrite. See {@link BinPackRewriteFilePlanner#MAX_FILES_TO_REWRITE}
+     * for more details.
+     *
+     * @param maxFilesToRewrite maximum files to rewrite
+     */
     public Builder maxFilesToRewrite(int maxFilesToRewrite) {
       this.rewriteOptions.put(
           BinPackRewriteFilePlanner.MAX_FILES_TO_REWRITE, String.valueOf(maxFilesToRewrite));
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
index b96635e6ab..3668e8a6c6 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
@@ -170,6 +170,18 @@ public class RewriteDataFiles {
       return this;
     }
 
+    /**
+     * Configures max files to rewrite. See {@link BinPackRewriteFilePlanner#MAX_FILES_TO_REWRITE}
+     * for more details.
+     *
+     * @param maxFilesToRewrite maximum files to rewrite
+     */
+    public Builder maxFilesToRewrite(int maxFilesToRewrite) {
+      this.rewriteOptions.put(
+          BinPackRewriteFilePlanner.MAX_FILES_TO_REWRITE, String.valueOf(maxFilesToRewrite));
+      return this;
+    }
+
     /**
      * The input is a {@link DataStream} with {@link Trigger} events and every event should be
      * immediately followed by a {@link Watermark} with the same timestamp as the event.
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
index df53c1ff1d..48a213f878 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
@@ -78,7 +78,8 @@ public class RewriteDataFilesSparkAction
           USE_STARTING_SEQUENCE_NUMBER,
           REWRITE_JOB_ORDER,
           OUTPUT_SPEC_ID,
-          REMOVE_DANGLING_DELETES);
+          REMOVE_DANGLING_DELETES,
+          BinPackRewriteFilePlanner.MAX_FILES_TO_REWRITE);
 
   private static final RewriteDataFilesSparkAction.Result EMPTY_RESULT =
       ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
index 775d260ece..ffee1fec13 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
@@ -78,7 +78,8 @@ public class RewriteDataFilesSparkAction
           USE_STARTING_SEQUENCE_NUMBER,
           REWRITE_JOB_ORDER,
           OUTPUT_SPEC_ID,
-          REMOVE_DANGLING_DELETES);
+          REMOVE_DANGLING_DELETES,
+          BinPackRewriteFilePlanner.MAX_FILES_TO_REWRITE);
 
   private static final RewriteDataFilesSparkAction.Result EMPTY_RESULT =
       ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();

Reply via email to