This is an automated email from the ASF dual-hosted git repository.
russellspitzer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 2e2ac8dfd2 Core: Fix split size calculations in file rewriters (#9069)
2e2ac8dfd2 is described below
commit 2e2ac8dfd2ab2c214444cccb0e6024e1f80b0502
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Thu Nov 16 09:31:17 2023 -0800
Core: Fix split size calculations in file rewriters (#9069)
---
.../iceberg/actions/SizeBasedFileRewriter.java | 16 +++-
.../iceberg/actions/TestSizeBasedRewriter.java | 100 +++++++++++++++++++++
2 files changed, 112 insertions(+), 4 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewriter.java b/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewriter.java
index 505fbaeda8..cf98c5266a 100644
--- a/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewriter.java
+++ b/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewriter.java
@@ -181,13 +181,21 @@ public abstract class SizeBasedFileRewriter<T extends ContentScanTask<F>, F exte
}
/**
- * Returns the smallest of our max write file threshold and our estimated split size based on the
- * number of output files we want to generate. Add an overhead onto the estimated split size to
- * try to avoid small errors in size creating brand-new files.
+ * Calculates the split size to use in bin-packing rewrites.
+ *
+ * <p>This method determines the target split size as the input size divided by the desired number
+ * of output files. The final split size is adjusted to be at least as big as the target file size
+ * but less than the max write file size.
*/
protected long splitSize(long inputSize) {
long estimatedSplitSize = (inputSize / numOutputFiles(inputSize)) + SPLIT_OVERHEAD;
- return Math.min(estimatedSplitSize, writeMaxFileSize());
+ if (estimatedSplitSize < targetFileSize) {
+ return targetFileSize;
+ } else if (estimatedSplitSize > writeMaxFileSize()) {
+ return writeMaxFileSize();
+ } else {
+ return estimatedSplitSize;
+ }
}
/**
diff --git a/core/src/test/java/org/apache/iceberg/actions/TestSizeBasedRewriter.java b/core/src/test/java/org/apache/iceberg/actions/TestSizeBasedRewriter.java
new file mode 100644
index 0000000000..c33bbc6f6d
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/actions/TestSizeBasedRewriter.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MockFileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestSizeBasedRewriter extends TableTestBase {
+
+ @Parameterized.Parameters(name = "formatVersion = {0}")
+ public static Object[] parameters() {
+ return new Object[] {1, 2};
+ }
+
+ public TestSizeBasedRewriter(int formatVersion) {
+ super(formatVersion);
+ }
+
+ @Test
+ public void testSplitSizeLowerBound() {
+ SizeBasedDataFileRewriterImpl rewriter = new SizeBasedDataFileRewriterImpl(table);
+
+ FileScanTask task1 = new MockFileScanTask(145L * 1024 * 1024);
+ FileScanTask task2 = new MockFileScanTask(145L * 1024 * 1024);
+ FileScanTask task3 = new MockFileScanTask(145L * 1024 * 1024);
+ FileScanTask task4 = new MockFileScanTask(145L * 1024 * 1024);
+ List<FileScanTask> tasks = ImmutableList.of(task1, task2, task3, task4);
+
+ long minFileSize = 256L * 1024 * 1024;
+ long targetFileSize = 512L * 1024 * 1024;
+ long maxFileSize = 768L * 1024 * 1024;
+
+ Map<String, String> options =
+ ImmutableMap.of(
+ SizeBasedDataRewriter.MIN_FILE_SIZE_BYTES, String.valueOf(minFileSize),
+ SizeBasedDataRewriter.TARGET_FILE_SIZE_BYTES, String.valueOf(targetFileSize),
+ SizeBasedDataRewriter.MAX_FILE_SIZE_BYTES, String.valueOf(maxFileSize));
+ rewriter.init(options);
+
+ // the total task size is 580 MB and the target file size is 512 MB
+ // the remainder must be written into a separate file as it exceeds 10%
+ long numOutputFiles = rewriter.computeNumOutputFiles(tasks);
+ assertThat(numOutputFiles).isEqualTo(2);
+
+ // the split size must be >= targetFileSize and < maxFileSize
+ long splitSize = rewriter.computeSplitSize(tasks);
+ assertThat(splitSize).isGreaterThanOrEqualTo(targetFileSize);
+ assertThat(splitSize).isLessThan(maxFileSize);
+ }
+
+ private static class SizeBasedDataFileRewriterImpl extends SizeBasedDataRewriter {
+
+ SizeBasedDataFileRewriterImpl(Table table) {
+ super(table);
+ }
+
+ @Override
+ public Set<DataFile> rewrite(List<FileScanTask> group) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ public long computeSplitSize(List<FileScanTask> group) {
+ return splitSize(inputSize(group));
+ }
+
+ public long computeNumOutputFiles(List<FileScanTask> group) {
+ return numOutputFiles(inputSize(group));
+ }
+ }
+}