This is an automated email from the ASF dual-hosted git repository. russellspitzer pushed a commit to branch BinPackStrategyCosmeticChanges in repository https://gitbox.apache.org/repos/asf/iceberg.git
commit 042225c38b3ec886cfd566eda241a859ce3ac588 Author: Russell_Spitzer <[email protected]> AuthorDate: Tue Jun 29 16:26:54 2021 -0500 Core: Refactors to BinPack and Rewrite APIs Changes to support the implementation of the Spark RewriteDataFiles action. Fixes JavaDoc links and adds BinPackStrategy comments to better bin-pack real-world use cases. --- .../apache/iceberg/actions/RewriteDataFiles.java | 22 +++--- .../apache/iceberg/actions/RewriteStrategy.java | 5 +- .../apache/iceberg/actions/BinPackStrategy.java | 88 ++++++++++++++++++++-- .../iceberg/actions/TestBinPackStrategy.java | 22 +++++- 4 files changed, 116 insertions(+), 21 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java index e959365..f2fe66d 100644 --- a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java +++ b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java @@ -49,10 +49,10 @@ public interface RewriteDataFiles extends SnapshotUpdate<RewriteDataFiles, Rewri /** * The entire rewrite operation is broken down into pieces based on partitioning and within partitions based * on size into groups. These sub-units of the rewrite are referred to as file groups. The largest amount of data that - * should be compacted in a single group is controlled by MAX_FILE_GROUP_SIZE_BYTES. This helps with breaking down the - * rewriting of very large partitions which may not be rewritable otherwise due to the resource constraints of the - * cluster. For example a sort based rewrite may not scale to terabyte sized partitions, those partitions need to be - * worked on in small subsections to avoid exhaustion of resources. + * should be compacted in a single group is controlled by {@link #MAX_FILE_GROUP_SIZE_BYTES}. This helps with + * breaking down the rewriting of very large partitions which may not be rewritable otherwise due to the resource + * constraints of the cluster.
For example a sort based rewrite may not scale to terabyte sized partitions, those + * partitions need to be worked on in small subsections to avoid exhaustion of resources. * <p> * When grouping files, the underlying rewrite strategy will use this value as to limit the files which * will be included in a single file group. A group will be processed by a single framework "action". For example, @@ -77,12 +77,6 @@ public interface RewriteDataFiles extends SnapshotUpdate<RewriteDataFiles, Rewri String TARGET_FILE_SIZE_BYTES = "target-file-size-bytes"; /** - * The partition spec to use when writing the output data from this operation. By default uses the - * current table partition spec. - */ - String OUTPUT_PARTITION_SPEC_ID = "output-partition-spec-id"; - - /** * Choose BINPACK as a strategy for this rewrite operation * @return this for method chaining */ @@ -107,6 +101,14 @@ public interface RewriteDataFiles extends SnapshotUpdate<RewriteDataFiles, Rewri */ interface Result { Map<FileGroupInfo, FileGroupRewriteResult> resultMap(); + + default int addedDataFilesCount() { + return resultMap().values().stream().mapToInt(FileGroupRewriteResult::addedDataFilesCount).sum(); + } + + default int rewrittenDataFilesCount() { + return resultMap().values().stream().mapToInt(FileGroupRewriteResult::rewrittenDataFilesCount).sum(); + } } /** diff --git a/api/src/main/java/org/apache/iceberg/actions/RewriteStrategy.java b/api/src/main/java/org/apache/iceberg/actions/RewriteStrategy.java index 13f1386..3334066 100644 --- a/api/src/main/java/org/apache/iceberg/actions/RewriteStrategy.java +++ b/api/src/main/java/org/apache/iceberg/actions/RewriteStrategy.java @@ -71,8 +71,9 @@ interface RewriteStrategy extends Serializable { * Method which will rewrite files based on this particular RewriteStrategy's algorithm. * This will most likely be Action framework specific (Spark/Presto/Flink ....). 
* + * @param groupID an identifier for this set of files * @param filesToRewrite a group of files to be rewritten together - * @return a list of newly written files + * @return a set of newly written files */ - List<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite); + Set<DataFile> rewriteFiles(String groupID, List<FileScanTask> filesToRewrite); } diff --git a/core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java b/core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java index b42001c..bd15068 100644 --- a/core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java +++ b/core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java @@ -19,6 +19,7 @@ package org.apache.iceberg.actions; +import java.math.RoundingMode; import java.util.List; import java.util.Map; import java.util.Set; @@ -28,18 +29,19 @@ import org.apache.iceberg.TableProperties; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.math.LongMath; import org.apache.iceberg.util.BinPacking; import org.apache.iceberg.util.BinPacking.ListPacker; import org.apache.iceberg.util.PropertyUtil; /** * A rewrite strategy for data files which determines which files to rewrite - * based on their size. If files are either smaller than the {@link MIN_FILE_SIZE_BYTES} threshold or - * larger than the {@link MAX_FILE_SIZE_BYTES} threshold, they are considered targets for being rewritten. + * based on their size. If files are either smaller than the {@link #MIN_FILE_SIZE_BYTES} threshold or + * larger than the {@link #MAX_FILE_SIZE_BYTES} threshold, they are considered targets for being rewritten. * <p> * Once selected files are grouped based on a {@link BinPacking} into groups defined * by {@link RewriteDataFiles#MAX_FILE_GROUP_SIZE_BYTES}. 
Groups will be considered for rewriting if they contain - * more files than {@link MIN_INPUT_FILES} or would produce at least one file of + * more files than {@link #MIN_INPUT_FILES} or would produce at least one file of * {@link RewriteDataFiles#TARGET_FILE_SIZE_BYTES}. */ abstract class BinPackStrategy implements RewriteStrategy { @@ -55,8 +57,8 @@ abstract class BinPackStrategy implements RewriteStrategy { /** * Adjusts files which will be considered for rewriting. Files smaller than - * {@link MIN_FILE_SIZE_BYTES} will be considered for rewriting. This functions independently - * of {@link MAX_FILE_SIZE_BYTES}. + * {@link #MIN_FILE_SIZE_BYTES} will be considered for rewriting. This functions independently + * of {@link #MAX_FILE_SIZE_BYTES}. * <p> * Defaults to 75% of the target file size */ @@ -65,8 +67,8 @@ abstract class BinPackStrategy implements RewriteStrategy { /** * Adjusts files which will be considered for rewriting. Files larger than - * {@link MAX_FILE_SIZE_BYTES} will be considered for rewriting. This functions independently - * of {@link MIN_FILE_SIZE_BYTES}. + * {@link #MAX_FILE_SIZE_BYTES} will be considered for rewriting. This functions independently + * of {@link #MIN_FILE_SIZE_BYTES}. * <p> * Defaults to 180% of the target file size */ @@ -137,6 +139,78 @@ abstract class BinPackStrategy implements RewriteStrategy { ).collect(Collectors.toList()); } + protected long targetFileSize() { + return this.targetFileSize; + } + + /** + * Determine how many output files to create when rewriting. We use this to determine the split-size + * we want to use when actually writing files to avoid the following situation. + * <p> + * If we are writing 10.1 G of data with a target file size of 1G we would end up with + * 11 files, one of which would only have 0.1g. This would most likely be less preferable to + * 10 files each of which was 1.01g. 
So here we decide whether to round up or round down + * based on what the estimated average file size will be if we ignore the remainder (0.1g). A remainder + * larger than the min file size is always kept; otherwise, if that average is less than 10% greater than + * the target file size, we round down when determining the number of output files. + * @param totalSizeInBytes total data size for a file group + * @return the number of files this strategy should create + */ + protected long numOutputFiles(long totalSizeInBytes) { + if (totalSizeInBytes < targetFileSize) { + return 1; + } + + long fileCountWithRemainder = LongMath.divide(totalSizeInBytes, targetFileSize, RoundingMode.CEILING); + if (LongMath.mod(totalSizeInBytes, targetFileSize) > minFileSize) { + // Our Remainder file is of valid size for this compaction so keep it + return fileCountWithRemainder; + } + + long fileCountWithoutRemainder = LongMath.divide(totalSizeInBytes, targetFileSize, RoundingMode.FLOOR); + long avgFileSizeWithoutRemainder = totalSizeInBytes / fileCountWithoutRemainder; + if (avgFileSizeWithoutRemainder < 1.1 * targetFileSize) { + // Round down and distribute remainder amongst other files + return fileCountWithoutRemainder; + } else { + // Keep the remainder file + return fileCountWithRemainder; + } + } + + /** + * Returns the smaller of our max write file threshold and our estimated split size based on + * the number of output files we want to generate.
+ */ + protected long splitSize(long totalSizeInBytes) { + long estimatedSplitSize = totalSizeInBytes / numOutputFiles(totalSizeInBytes); + return Math.min(estimatedSplitSize, writeMaxFileSize()); + } + + protected static long inputFileSize(List<FileScanTask> fileToRewrite) { + return fileToRewrite.stream().mapToLong(FileScanTask::length).sum(); + } + + + /** + * Estimates a larger max target file size than the target size used in task creation. This avoids + * tasks which are predicted to have a certain size, but exceed that target size once written, creating + * tiny remainder files. + * <p> + * While we create tasks that should all be smaller than our target size, there is a chance that the actual + * data will end up being larger than our target size due to compression, serialization and + * other factors outside our control. If this occurs, instead of making a single file that is close in size to + * our target we would end up producing one file of the target size, and then a small extra file with the remaining + * data. For example, if our target is 512 MB we may generate a rewrite task that should be 500 MB. When we write + * the data we may find we actually have to write out 530 MB. If we use the target size while writing, we would + * produce a 512 MB file and an 18 MB file. If instead we use a larger size estimated by this method, + * then we end up writing a single file.
+ * @return the target size plus one half of the distance between max and target + */ + protected long writeMaxFileSize() { + return (long) (targetFileSize + ((maxFileSize - targetFileSize) * 0.5)); + } + private long sizeOfInputFiles(List<FileScanTask> group) { return group.stream().mapToLong(FileScanTask::length).sum(); } diff --git a/core/src/test/java/org/apache/iceberg/actions/TestBinPackStrategy.java b/core/src/test/java/org/apache/iceberg/actions/TestBinPackStrategy.java index b42496f..a84a10f 100644 --- a/core/src/test/java/org/apache/iceberg/actions/TestBinPackStrategy.java +++ b/core/src/test/java/org/apache/iceberg/actions/TestBinPackStrategy.java @@ -22,6 +22,7 @@ package org.apache.iceberg.actions; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.DataFile; @@ -59,7 +60,7 @@ public class TestBinPackStrategy extends TableTestBase { } @Override - public List<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) { + public Set<DataFile> rewriteFiles(String groupId, List<FileScanTask> filesToRewrite) { throw new UnsupportedOperationException(); } } @@ -164,6 +165,24 @@ public class TestBinPackStrategy extends TableTestBase { } @Test + public void testNumOuputFiles() { + BinPackStrategy strategy = (BinPackStrategy) defaultBinPack(); + long targetFileSize = strategy.targetFileSize(); + Assert.assertEquals("Should keep remainder if the remainder is a valid size", + 2, strategy.numOutputFiles(targetFileSize + 450 * MB)); + Assert.assertEquals("Should discard remainder file if the remainder is very small", + 1, strategy.numOutputFiles(targetFileSize + 40 * MB)); + Assert.assertEquals("Should keep remainder file if it would change average file size greatly", + 2, strategy.numOutputFiles((long) (targetFileSize + 0.40 * targetFileSize))); + Assert.assertEquals("Should discard remainder if file is small and 
wouldn't change average that much", + 200, strategy.numOutputFiles(200 * targetFileSize + 13 * MB)); + Assert.assertEquals("Should keep remainder if it's a valid size", + 201, strategy.numOutputFiles(200 * targetFileSize + 499 * MB)); + Assert.assertEquals("Should not return 0 even for very small files", + 1, strategy.numOutputFiles(1)); + } + + @Test public void testInvalidOptions() { AssertHelpers.assertThrows("Should not allow max size smaller than target", IllegalArgumentException.class, () -> { @@ -188,6 +207,5 @@ public class TestBinPackStrategy extends TableTestBase { defaultBinPack().options(ImmutableMap.of( RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Long.toString(-5))); }); - } }
