This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new f2449550c7 Flink, Core: RewriteDataFiles add max file group count 
(#14837)
f2449550c7 is described below

commit f2449550c72e8d744b43438ef332716121af2c6b
Author: GuoYu <[email protected]>
AuthorDate: Tue Dec 16 17:01:26 2025 +0800

    Flink, Core: RewriteDataFiles add max file group count (#14837)
---
 .../actions/SizeBasedFileRewritePlanner.java       | 23 +++++++++-
 .../java/org/apache/iceberg/util/BinPacking.java   | 46 +++++++++++++++++---
 .../org/apache/iceberg/util/TestBinPacking.java    | 50 +++++++++++++++++++++-
 docs/docs/flink-maintenance.md                     |  1 +
 .../flink/maintenance/api/RewriteDataFiles.java    | 13 ++++++
 .../flink/maintenance/operator/RewriteUtil.java    |  8 +++-
 .../operator/TestDataFileRewritePlanner.java       | 20 +++++++++
 7 files changed, 150 insertions(+), 11 deletions(-)

diff --git 
a/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewritePlanner.java
 
b/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewritePlanner.java
index fad139078b..bcd0054130 100644
--- 
a/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewritePlanner.java
+++ 
b/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewritePlanner.java
@@ -107,6 +107,15 @@ public abstract class SizeBasedFileRewritePlanner<
 
   public static final long MAX_FILE_GROUP_SIZE_BYTES_DEFAULT = 100L * 1024 * 
1024 * 1024; // 100 GB
 
+  /**
+   * This option controls the maximum number of input files that should be 
rewritten in a single
+   * file group. It helps with breaking down the rewriting of very large 
partitions which may not
+   * be rewritable otherwise due to the resource constraints of the cluster.
+   */
+  public static final String MAX_FILE_GROUP_INPUT_FILES = 
"max-file-group-input-files";
+
+  public static final long MAX_FILE_GROUP_INPUT_FILES_DEFAULT = Long.MAX_VALUE;
+
   private static final long SPLIT_OVERHEAD = 5L * 1024;
 
   private final Table table;
@@ -116,6 +125,7 @@ public abstract class SizeBasedFileRewritePlanner<
   private int minInputFiles;
   private boolean rewriteAll;
   private long maxGroupSize;
+  private long maxGroupCount;
   private int outputSpecId;
 
   protected SizeBasedFileRewritePlanner(Table table) {
@@ -151,6 +161,7 @@ public abstract class SizeBasedFileRewritePlanner<
     this.minInputFiles = minInputFiles(options);
     this.rewriteAll = rewriteAll(options);
     this.maxGroupSize = maxGroupSize(options);
+    this.maxGroupCount = maxGroupCount(options);
     this.outputSpecId = outputSpecId(options);
 
     if (rewriteAll) {
@@ -168,7 +179,8 @@ public abstract class SizeBasedFileRewritePlanner<
 
   protected Iterable<List<T>> planFileGroups(Iterable<T> tasks) {
     Iterable<T> filteredTasks = rewriteAll ? tasks : filterFiles(tasks);
-    BinPacking.ListPacker<T> packer = new 
BinPacking.ListPacker<>(maxGroupSize, 1, false);
+    BinPacking.ListPacker<T> packer =
+        new BinPacking.ListPacker<>(maxGroupSize, 1, false, maxGroupCount);
     List<List<T>> groups = packer.pack(filteredTasks, ContentScanTask::length);
     return rewriteAll ? groups : filterFileGroups(groups);
   }
@@ -337,6 +349,15 @@ public abstract class SizeBasedFileRewritePlanner<
     return value;
   }
 
+  private long maxGroupCount(Map<String, String> options) {
+    long value =
+        PropertyUtil.propertyAsLong(
+            options, MAX_FILE_GROUP_INPUT_FILES, 
MAX_FILE_GROUP_INPUT_FILES_DEFAULT);
+    Preconditions.checkArgument(
+        value > 0, "'%s' is set to %s but must be > 0", 
MAX_FILE_GROUP_INPUT_FILES, value);
+    return value;
+  }
+
   private boolean rewriteAll(Map<String, String> options) {
     return PropertyUtil.propertyAsBoolean(options, REWRITE_ALL, 
REWRITE_ALL_DEFAULT);
   }
diff --git a/core/src/main/java/org/apache/iceberg/util/BinPacking.java 
b/core/src/main/java/org/apache/iceberg/util/BinPacking.java
index f3160389ca..db31b63974 100644
--- a/core/src/main/java/org/apache/iceberg/util/BinPacking.java
+++ b/core/src/main/java/org/apache/iceberg/util/BinPacking.java
@@ -36,11 +36,18 @@ public class BinPacking {
     private final long targetWeight;
     private final int lookback;
     private final boolean largestBinFirst;
+    private final long maxItemsPerBin;
 
     public ListPacker(long targetWeight, int lookback, boolean 
largestBinFirst) {
+      this(targetWeight, lookback, largestBinFirst, Long.MAX_VALUE);
+    }
+
+    public ListPacker(
+        long targetWeight, int lookback, boolean largestBinFirst, long 
maxItemsPerBin) {
       this.targetWeight = targetWeight;
       this.lookback = lookback;
       this.largestBinFirst = largestBinFirst;
+      this.maxItemsPerBin = maxItemsPerBin;
     }
 
     public List<List<T>> packEnd(List<T> items, Function<T, Long> weightFunc) {
@@ -48,13 +55,19 @@ public class BinPacking {
           ImmutableList.copyOf(
               Iterables.transform(
                   new PackingIterable<>(
-                      Lists.reverse(items), targetWeight, lookback, 
weightFunc, largestBinFirst),
+                      Lists.reverse(items),
+                      targetWeight,
+                      lookback,
+                      weightFunc,
+                      largestBinFirst,
+                      maxItemsPerBin),
                   Lists::reverse)));
     }
 
     public List<List<T>> pack(Iterable<T> items, Function<T, Long> weightFunc) 
{
       return ImmutableList.copyOf(
-          new PackingIterable<>(items, targetWeight, lookback, weightFunc, 
largestBinFirst));
+          new PackingIterable<>(
+              items, targetWeight, lookback, weightFunc, largestBinFirst, 
maxItemsPerBin));
     }
   }
 
@@ -62,12 +75,13 @@ public class BinPacking {
     private final Iterable<T> iterable;
     private final long targetWeight;
     private final int lookback;
+    private final long maxSize;
     private final Function<T, Long> weightFunc;
     private final boolean largestBinFirst;
 
     public PackingIterable(
         Iterable<T> iterable, long targetWeight, int lookback, Function<T, 
Long> weightFunc) {
-      this(iterable, targetWeight, lookback, weightFunc, false);
+      this(iterable, targetWeight, lookback, weightFunc, false, 
Long.MAX_VALUE);
     }
 
     public PackingIterable(
@@ -76,11 +90,22 @@ public class BinPacking {
         int lookback,
         Function<T, Long> weightFunc,
         boolean largestBinFirst) {
+      this(iterable, targetWeight, lookback, weightFunc, largestBinFirst, 
Long.MAX_VALUE);
+    }
+
+    public PackingIterable(
+        Iterable<T> iterable,
+        long targetWeight,
+        int lookback,
+        Function<T, Long> weightFunc,
+        boolean largestBinFirst,
+        long maxSize) {
       Preconditions.checkArgument(
           lookback > 0, "Bin look-back size must be greater than 0: %s", 
lookback);
       this.iterable = iterable;
       this.targetWeight = targetWeight;
       this.lookback = lookback;
+      this.maxSize = maxSize;
       this.weightFunc = weightFunc;
       this.largestBinFirst = largestBinFirst;
     }
@@ -88,7 +113,7 @@ public class BinPacking {
     @Override
     public Iterator<List<T>> iterator() {
       return new PackingIterator<>(
-          iterable.iterator(), targetWeight, lookback, weightFunc, 
largestBinFirst);
+          iterable.iterator(), targetWeight, lookback, maxSize, weightFunc, 
largestBinFirst);
     }
   }
 
@@ -97,6 +122,7 @@ public class BinPacking {
     private final Iterator<T> items;
     private final long targetWeight;
     private final int lookback;
+    private final long maxSize;
     private final Function<T, Long> weightFunc;
     private final boolean largestBinFirst;
 
@@ -104,11 +130,13 @@ public class BinPacking {
         Iterator<T> items,
         long targetWeight,
         int lookback,
+        long maxSize,
         Function<T, Long> weightFunc,
         boolean largestBinFirst) {
       this.items = items;
       this.targetWeight = targetWeight;
       this.lookback = lookback;
+      this.maxSize = maxSize;
       this.weightFunc = weightFunc;
       this.largestBinFirst = largestBinFirst;
     }
@@ -163,7 +191,7 @@ public class BinPacking {
     }
 
     private Bin<T> newBin() {
-      return new Bin<>(targetWeight);
+      return new Bin<>(targetWeight, maxSize);
     }
 
     private static <T> Bin<T> removeLargestBin(Collection<Bin<T>> bins) {
@@ -181,11 +209,14 @@ public class BinPacking {
 
   private static class Bin<T> {
     private final long targetWeight;
+    private final long maxSize;
     private final List<T> items = Lists.newArrayList();
     private long binWeight = 0L;
+    private int binSize = 0;
 
-    Bin(long targetWeight) {
+    Bin(long targetWeight, long maxSize) {
       this.targetWeight = targetWeight;
+      this.maxSize = maxSize;
     }
 
     List<T> items() {
@@ -193,11 +224,12 @@ public class BinPacking {
     }
 
     boolean canAdd(long weight) {
-      return binWeight + weight <= targetWeight;
+      return binWeight + weight <= targetWeight && binSize < maxSize;
     }
 
     void add(T item, long weight) {
       this.binWeight += weight;
+      this.binSize++;
       items.add(item);
     }
 
diff --git a/core/src/test/java/org/apache/iceberg/util/TestBinPacking.java 
b/core/src/test/java/org/apache/iceberg/util/TestBinPacking.java
index f0cc6db1ad..213f099c8a 100644
--- a/core/src/test/java/org/apache/iceberg/util/TestBinPacking.java
+++ b/core/src/test/java/org/apache/iceberg/util/TestBinPacking.java
@@ -61,6 +61,37 @@ public class TestBinPacking {
         .isEqualTo(list(list(1, 2, 3, 4, 5)));
   }
 
+  @Test
+  public void testBasicBinPackingTargetSize() {
+    assertThat(pack(list(1, 2, 3, 4, 5), 3, Integer.MAX_VALUE, 2))
+        .as("Should pack the first 2 values")
+        .isEqualTo(list(list(1, 2), list(3), list(4), list(5)));
+
+    assertThat(pack(list(1, 2, 3, 4, 5), 5, Integer.MAX_VALUE, 2))
+        .as("Should pack the first 2 values")
+        .isEqualTo(list(list(1, 2), list(3), list(4), list(5)));
+
+    assertThat(pack(list(1, 2, 3, 4, 5), 6, Integer.MAX_VALUE, 2))
+        .as("Should pack the first 3 values")
+        .isEqualTo(list(list(1, 2), list(3), list(4), list(5)));
+
+    assertThat(pack(list(1, 2, 3, 4, 5), 8, Integer.MAX_VALUE, 3))
+        .as("Should pack the first 3 values")
+        .isEqualTo(list(list(1, 2, 3), list(4), list(5)));
+
+    assertThat(pack(list(1, 2, 3, 4, 5), 9, Integer.MAX_VALUE, 3))
+        .as("Should pack the first 3 values, last 2 values")
+        .isEqualTo(list(list(1, 2, 3), list(4, 5)));
+
+    assertThat(pack(list(1, 2, 3, 4, 5), 10, Integer.MAX_VALUE, 3))
+        .as("Should pack the first 3 values, last 2 values")
+        .isEqualTo(list(list(1, 2, 3), list(4, 5)));
+
+    assertThat(pack(list(1, 2, 3, 4, 5), 14, Integer.MAX_VALUE, 3))
+        .as("Should pack the first 3 values, last 2 values")
+        .isEqualTo(list(list(1, 2, 3), list(4, 5)));
+  }
+
   @Test
   public void testReverseBinPackingSingleLookback() {
     assertThat(packEnd(list(1, 2, 3, 4, 5), 3, 1))
@@ -212,12 +243,27 @@ public class TestBinPacking {
   }
 
   private List<List<Integer>> pack(List<Integer> items, long targetWeight, int 
lookback) {
-    return pack(items, targetWeight, lookback, false);
+    return pack(items, targetWeight, lookback, Long.MAX_VALUE);
   }
 
   private List<List<Integer>> pack(
       List<Integer> items, long targetWeight, int lookback, boolean 
largestBinFirst) {
-    ListPacker<Integer> packer = new ListPacker<>(targetWeight, lookback, 
largestBinFirst);
+    return pack(items, targetWeight, lookback, largestBinFirst, 
Long.MAX_VALUE);
+  }
+
+  private List<List<Integer>> pack(
+      List<Integer> items, long targetWeight, int lookback, long targetSize) {
+    return pack(items, targetWeight, lookback, false, targetSize);
+  }
+
+  private List<List<Integer>> pack(
+      List<Integer> items,
+      long targetWeight,
+      int lookback,
+      boolean largestBinFirst,
+      long targetSize) {
+    ListPacker<Integer> packer =
+        new ListPacker<>(targetWeight, lookback, largestBinFirst, targetSize);
     return packer.pack(items, Integer::longValue);
   }
 
diff --git a/docs/docs/flink-maintenance.md b/docs/docs/flink-maintenance.md
index a3d3ff1e4b..37fb9a7a84 100644
--- a/docs/docs/flink-maintenance.md
+++ b/docs/docs/flink-maintenance.md
@@ -218,6 +218,7 @@ env.execute("Table Maintenance Job");
 | `partialProgressMaxCommits(int)` | Maximum commits allowed for partial 
progress when partialProgressEnabled is true | 10 | int |
 | `maxRewriteBytes(long)` | Maximum bytes to rewrite per execution | 
Long.MAX_VALUE | long |
 | `filter(Expression)` | Filter expression for selecting files to rewrite | 
Expressions.alwaysTrue() | Expression |
+| `maxFileGroupInputFiles(long)`         | Maximum allowed number of input 
files within a file group                                              | 
Long.MAX_VALUE | long |
 
 #### DeleteOrphanFiles Configuration
 
diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
index bedf70725a..3b64a79eee 100644
--- 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
@@ -181,6 +181,19 @@ public class RewriteDataFiles {
       return this;
     }
 
+    /**
+     * Configures the max number of input files per file group. See {@link
+     * SizeBasedFileRewritePlanner#MAX_FILE_GROUP_INPUT_FILES} for more 
details.
+     *
+     * @param maxFileGroupInputFiles max number of input files per file group
+     */
+    public Builder maxFileGroupInputFiles(long maxFileGroupInputFiles) {
+      this.rewriteOptions.put(
+          SizeBasedFileRewritePlanner.MAX_FILE_GROUP_INPUT_FILES,
+          String.valueOf(maxFileGroupInputFiles));
+      return this;
+    }
+
     /**
      * Configures max files to rewrite. See {@link 
BinPackRewriteFilePlanner#MAX_FILES_TO_REWRITE}
      * for more details.
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
index 68aaf29ac0..95992ccd97 100644
--- 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/RewriteUtil.java
@@ -22,6 +22,7 @@ import static 
org.apache.iceberg.actions.SizeBasedFileRewritePlanner.MIN_INPUT_F
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
@@ -38,6 +39,11 @@ class RewriteUtil {
 
   static List<DataFileRewritePlanner.PlannedGroup> 
planDataFileRewrite(TableLoader tableLoader)
       throws Exception {
+    return planDataFileRewrite(tableLoader, ImmutableMap.of(MIN_INPUT_FILES, 
"2"));
+  }
+
+  static List<DataFileRewritePlanner.PlannedGroup> planDataFileRewrite(
+      TableLoader tableLoader, Map<String, String> rewriterOptions) throws 
Exception {
     try (OneInputStreamOperatorTestHarness<Trigger, 
DataFileRewritePlanner.PlannedGroup>
         testHarness =
             ProcessFunctionTestHarnesses.forProcessFunction(
@@ -48,7 +54,7 @@ class RewriteUtil {
                     tableLoader,
                     11,
                     10_000_000L,
-                    ImmutableMap.of(MIN_INPUT_FILES, "2"),
+                    rewriterOptions,
                     Expressions.alwaysTrue()))) {
       testHarness.open();
 
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
index 9f4f96e106..cb1a41bb43 100644
--- 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.flink.maintenance.operator;
 
+import static 
org.apache.iceberg.actions.SizeBasedFileRewritePlanner.MAX_FILE_GROUP_INPUT_FILES;
 import static 
org.apache.iceberg.actions.SizeBasedFileRewritePlanner.MIN_INPUT_FILES;
 import static 
org.apache.iceberg.flink.maintenance.operator.RewriteUtil.newDataFiles;
 import static 
org.apache.iceberg.flink.maintenance.operator.RewriteUtil.planDataFileRewrite;
@@ -182,6 +183,25 @@ class TestDataFileRewritePlanner extends OperatorTestBase {
     }
   }
 
+  @Test
+  void testMaxFileGroupCount() throws Exception {
+    Table table = createPartitionedTable();
+    insertPartitioned(table, 1, "p1");
+    insertPartitioned(table, 2, "p1");
+    insertPartitioned(table, 3, "p2");
+    insertPartitioned(table, 4, "p2");
+    insertPartitioned(table, 5, "p2");
+    insertPartitioned(table, 6, "p2");
+
+    List<DataFileRewritePlanner.PlannedGroup> planWithNoLimit = 
planDataFileRewrite(tableLoader());
+    assertThat(planWithNoLimit).hasSize(2);
+
+    List<DataFileRewritePlanner.PlannedGroup> planWithMaxFileGroupCount =
+        planDataFileRewrite(
+            tableLoader(), ImmutableMap.of(MIN_INPUT_FILES, "2", 
MAX_FILE_GROUP_INPUT_FILES, "2"));
+    assertThat(planWithMaxFileGroupCount).hasSize(3);
+  }
+
   void assertRewriteFileGroup(
       DataFileRewritePlanner.PlannedGroup plannedGroup, Table table, 
Set<DataFile> files) {
     assertThat(plannedGroup.table().currentSnapshot().snapshotId())

Reply via email to