This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 2e2ac8dfd2 Core: Fix split size calculations in file rewriters (#9069)
2e2ac8dfd2 is described below

commit 2e2ac8dfd2ab2c214444cccb0e6024e1f80b0502
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Thu Nov 16 09:31:17 2023 -0800

    Core: Fix split size calculations in file rewriters (#9069)
---
 .../iceberg/actions/SizeBasedFileRewriter.java     |  16 +++-
 .../iceberg/actions/TestSizeBasedRewriter.java     | 100 +++++++++++++++++++++
 2 files changed, 112 insertions(+), 4 deletions(-)

diff --git 
a/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewriter.java 
b/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewriter.java
index 505fbaeda8..cf98c5266a 100644
--- a/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewriter.java
+++ b/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewriter.java
@@ -181,13 +181,21 @@ public abstract class SizeBasedFileRewriter<T extends 
ContentScanTask<F>, F exte
   }
 
   /**
-   * Returns the smallest of our max write file threshold and our estimated 
split size based on the
-   * number of output files we want to generate. Add an overhead onto the 
estimated split size to
-   * try to avoid small errors in size creating brand-new files.
+   * Calculates the split size to use in bin-packing rewrites.
+   *
+   * <p>This method determines the target split size as the input size divided 
by the desired number
+   * of output files. The final split size is adjusted to be at least as big 
as the target file size
+   * but at most the max write file size.
    */
   protected long splitSize(long inputSize) {
     long estimatedSplitSize = (inputSize / numOutputFiles(inputSize)) + 
SPLIT_OVERHEAD;
-    return Math.min(estimatedSplitSize, writeMaxFileSize());
+    if (estimatedSplitSize < targetFileSize) {
+      return targetFileSize;
+    } else if (estimatedSplitSize > writeMaxFileSize()) {
+      return writeMaxFileSize();
+    } else {
+      return estimatedSplitSize;
+    }
   }
 
   /**
diff --git 
a/core/src/test/java/org/apache/iceberg/actions/TestSizeBasedRewriter.java 
b/core/src/test/java/org/apache/iceberg/actions/TestSizeBasedRewriter.java
new file mode 100644
index 0000000000..c33bbc6f6d
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/actions/TestSizeBasedRewriter.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MockFileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestSizeBasedRewriter extends TableTestBase {
+
+  @Parameterized.Parameters(name = "formatVersion = {0}")
+  public static Object[] parameters() {
+    return new Object[] {1, 2};
+  }
+
+  public TestSizeBasedRewriter(int formatVersion) {
+    super(formatVersion);
+  }
+
+  @Test
+  public void testSplitSizeLowerBound() {
+    SizeBasedDataFileRewriterImpl rewriter = new 
SizeBasedDataFileRewriterImpl(table);
+
+    FileScanTask task1 = new MockFileScanTask(145L * 1024 * 1024);
+    FileScanTask task2 = new MockFileScanTask(145L * 1024 * 1024);
+    FileScanTask task3 = new MockFileScanTask(145L * 1024 * 1024);
+    FileScanTask task4 = new MockFileScanTask(145L * 1024 * 1024);
+    List<FileScanTask> tasks = ImmutableList.of(task1, task2, task3, task4);
+
+    long minFileSize = 256L * 1024 * 1024;
+    long targetFileSize = 512L * 1024 * 1024;
+    long maxFileSize = 768L * 1024 * 1024;
+
+    Map<String, String> options =
+        ImmutableMap.of(
+            SizeBasedDataRewriter.MIN_FILE_SIZE_BYTES, 
String.valueOf(minFileSize),
+            SizeBasedDataRewriter.TARGET_FILE_SIZE_BYTES, 
String.valueOf(targetFileSize),
+            SizeBasedDataRewriter.MAX_FILE_SIZE_BYTES, 
String.valueOf(maxFileSize));
+    rewriter.init(options);
+
+    // the total task size is 580 MB and the target file size is 512 MB
+    // the remainder must be written into a separate file as it exceeds 10%
+    long numOutputFiles = rewriter.computeNumOutputFiles(tasks);
+    assertThat(numOutputFiles).isEqualTo(2);
+
+    // the split size must be >= targetFileSize and < maxFileSize
+    long splitSize = rewriter.computeSplitSize(tasks);
+    assertThat(splitSize).isGreaterThanOrEqualTo(targetFileSize);
+    assertThat(splitSize).isLessThan(maxFileSize);
+  }
+
+  private static class SizeBasedDataFileRewriterImpl extends 
SizeBasedDataRewriter {
+
+    SizeBasedDataFileRewriterImpl(Table table) {
+      super(table);
+    }
+
+    @Override
+    public Set<DataFile> rewrite(List<FileScanTask> group) {
+      throw new UnsupportedOperationException("Not implemented");
+    }
+
+    public long computeSplitSize(List<FileScanTask> group) {
+      return splitSize(inputSize(group));
+    }
+
+    public long computeNumOutputFiles(List<FileScanTask> group) {
+      return numOutputFiles(inputSize(group));
+    }
+  }
+}

Reply via email to