This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch BinPackStrategyCosmeticChanges
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 042225c38b3ec886cfd566eda241a859ce3ac588
Author: Russell_Spitzer <[email protected]>
AuthorDate: Tue Jun 29 16:26:54 2021 -0500

    Core: Refactors to BinPack and Rewrite APIs
    
    Changes to support the implementation of the Spark RewriteDataFilesAction.
Fixes JavaDoc links,
    adds in BinPackStrategy comments to better bin-pack real-world use cases.
---
 .../apache/iceberg/actions/RewriteDataFiles.java   | 22 +++---
 .../apache/iceberg/actions/RewriteStrategy.java    |  5 +-
 .../apache/iceberg/actions/BinPackStrategy.java    | 88 ++++++++++++++++++++--
 .../iceberg/actions/TestBinPackStrategy.java       | 22 +++++-
 4 files changed, 116 insertions(+), 21 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java 
b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
index e959365..f2fe66d 100644
--- a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
+++ b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
@@ -49,10 +49,10 @@ public interface RewriteDataFiles extends 
SnapshotUpdate<RewriteDataFiles, Rewri
   /**
    * The entire rewrite operation is broken down into pieces based on 
partitioning and within partitions based
    * on size into groups. These sub-units of the rewrite are referred to as 
file groups. The largest amount of data that
-   * should be compacted in a single group is controlled by 
MAX_FILE_GROUP_SIZE_BYTES. This helps with breaking down the
-   * rewriting of very large partitions which may not be rewritable otherwise 
due to the resource constraints of the
-   * cluster. For example a sort based rewrite may not scale to terabyte sized 
partitions, those partitions need to be
-   * worked on in small subsections to avoid exhaustion of resources.
+   * should be compacted in a single group is controlled by {@link 
#MAX_FILE_GROUP_SIZE_BYTES}. This helps with
+   * breaking down the rewriting of very large partitions which may not be 
rewritable otherwise due to the resource
+   * constraints of the cluster. For example a sort based rewrite may not 
scale to terabyte sized partitions, those
+   * partitions need to be worked on in small subsections to avoid exhaustion 
of resources.
    * <p>
    * When grouping files, the underlying rewrite strategy will use this value 
as to limit the files which
    * will be included in a single file group. A group will be processed by a 
single framework "action". For example,
@@ -77,12 +77,6 @@ public interface RewriteDataFiles extends 
SnapshotUpdate<RewriteDataFiles, Rewri
   String TARGET_FILE_SIZE_BYTES = "target-file-size-bytes";
 
   /**
-   * The partition spec to use when writing the output data from this 
operation. By default uses the
-   * current table partition spec.
-   */
-  String OUTPUT_PARTITION_SPEC_ID = "output-partition-spec-id";
-
-  /**
    * Choose BINPACK as a strategy for this rewrite operation
    * @return this for method chaining
    */
@@ -107,6 +101,14 @@ public interface RewriteDataFiles extends 
SnapshotUpdate<RewriteDataFiles, Rewri
    */
   interface Result {
     Map<FileGroupInfo, FileGroupRewriteResult> resultMap();
+
+    default int addedDataFilesCount() {
+      return 
resultMap().values().stream().mapToInt(FileGroupRewriteResult::addedDataFilesCount).sum();
+    }
+
+    default int rewrittenDataFilesCount() {
+      return 
resultMap().values().stream().mapToInt(FileGroupRewriteResult::rewrittenDataFilesCount).sum();
+    }
   }
 
   /**
diff --git a/api/src/main/java/org/apache/iceberg/actions/RewriteStrategy.java 
b/api/src/main/java/org/apache/iceberg/actions/RewriteStrategy.java
index 13f1386..3334066 100644
--- a/api/src/main/java/org/apache/iceberg/actions/RewriteStrategy.java
+++ b/api/src/main/java/org/apache/iceberg/actions/RewriteStrategy.java
@@ -71,8 +71,9 @@ interface RewriteStrategy extends Serializable {
    * Method which will rewrite files based on this particular 
RewriteStrategy's algorithm.
    * This will most likely be Action framework specific (Spark/Presto/Flink 
....).
    *
+   * @param groupID an identifier for this set of files
    * @param filesToRewrite a group of files to be rewritten together
-   * @return a list of newly written files
+   * @return a set of newly written files
    */
-  List<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite);
+  Set<DataFile> rewriteFiles(String groupID, List<FileScanTask> 
filesToRewrite);
 }
diff --git a/core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java 
b/core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java
index b42001c..bd15068 100644
--- a/core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java
+++ b/core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg.actions;
 
+import java.math.RoundingMode;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -28,18 +29,19 @@ import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.math.LongMath;
 import org.apache.iceberg.util.BinPacking;
 import org.apache.iceberg.util.BinPacking.ListPacker;
 import org.apache.iceberg.util.PropertyUtil;
 
 /**
  * A rewrite strategy for data files which determines which files to rewrite
- * based on their size. If files are either smaller than the {@link 
MIN_FILE_SIZE_BYTES} threshold or
- * larger than the {@link MAX_FILE_SIZE_BYTES} threshold, they are considered 
targets for being rewritten.
+ * based on their size. If files are either smaller than the {@link 
#MIN_FILE_SIZE_BYTES} threshold or
+ * larger than the {@link #MAX_FILE_SIZE_BYTES} threshold, they are considered 
targets for being rewritten.
  * <p>
  * Once selected files are grouped based on a {@link BinPacking} into groups 
defined
  * by {@link RewriteDataFiles#MAX_FILE_GROUP_SIZE_BYTES}. Groups will be 
considered for rewriting if they contain
- * more files than {@link MIN_INPUT_FILES} or would produce at least one file 
of
+ * more files than {@link #MIN_INPUT_FILES} or would produce at least one file 
of
  * {@link RewriteDataFiles#TARGET_FILE_SIZE_BYTES}.
  */
 abstract class BinPackStrategy implements RewriteStrategy {
@@ -55,8 +57,8 @@ abstract class BinPackStrategy implements RewriteStrategy {
 
   /**
    * Adjusts files which will be considered for rewriting. Files smaller than
-   * {@link MIN_FILE_SIZE_BYTES} will be considered for rewriting. This 
functions independently
-   * of {@link MAX_FILE_SIZE_BYTES}.
+   * {@link #MIN_FILE_SIZE_BYTES} will be considered for rewriting. This 
functions independently
+   * of {@link #MAX_FILE_SIZE_BYTES}.
    * <p>
    * Defaults to 75% of the target file size
    */
@@ -65,8 +67,8 @@ abstract class BinPackStrategy implements RewriteStrategy {
 
   /**
    * Adjusts files which will be considered for rewriting. Files larger than
-   * {@link MAX_FILE_SIZE_BYTES} will be considered for rewriting. This 
functions independently
-   * of {@link MIN_FILE_SIZE_BYTES}.
+   * {@link #MAX_FILE_SIZE_BYTES} will be considered for rewriting. This 
functions independently
+   * of {@link #MIN_FILE_SIZE_BYTES}.
    * <p>
    * Defaults to 180% of the target file size
    */
@@ -137,6 +139,78 @@ abstract class BinPackStrategy implements RewriteStrategy {
     ).collect(Collectors.toList());
   }
 
+  protected long targetFileSize() {
+    return this.targetFileSize;
+  }
+
+  /**
+   * Determine how many output files to create when rewriting. We use this to 
determine the split-size
+   * we want to use when actually writing files to avoid the following 
situation.
+   * <p>
+   * If we are writing 10.1 G of data with a target file size of 1G we would 
end up with
+   * 11 files, one of which would only have 0.1g. This would most likely be 
less preferable to
+   * 10 files each of which was 1.01g. So here we decide whether to round up 
or round down
+   * based on what the estimated average file size will be if we ignore the 
remainder (0.1g). If
+   * the new file size is less than 10% greater than the target file size then 
we will round down
+   * when determining the number of output files.
+   * @param totalSizeInBytes total data size for a file group
+   * @return the number of files this strategy should create
+   */
+  protected long numOutputFiles(long totalSizeInBytes) {
+    if (totalSizeInBytes < targetFileSize) {
+      return 1;
+    }
+
+    long fileCountWithRemainder = LongMath.divide(totalSizeInBytes, 
targetFileSize, RoundingMode.CEILING);
+    if (LongMath.mod(totalSizeInBytes, targetFileSize) > minFileSize) {
+      // Our Remainder file is of valid size for this compaction so keep it
+      return fileCountWithRemainder;
+    }
+
+    long fileCountWithoutRemainder = LongMath.divide(totalSizeInBytes, 
targetFileSize, RoundingMode.FLOOR);
+    long avgFileSizeWithoutRemainder = totalSizeInBytes / 
fileCountWithoutRemainder;
+    if (avgFileSizeWithoutRemainder < 1.1 * targetFileSize) {
+      // Round down and distribute remainder amongst other files
+      return fileCountWithoutRemainder;
+    } else {
+      // Keep the remainder file
+      return fileCountWithRemainder;
+    }
+  }
+
+  /**
+   * Returns the smaller of our max write file threshold, and our estimated 
split size based on
+   * the number of output files we want to generate.
+   */
+  protected long splitSize(long totalSizeInBytes) {
+    long estimatedSplitSize = totalSizeInBytes / 
numOutputFiles(totalSizeInBytes);
+    return Math.min(estimatedSplitSize, writeMaxFileSize());
+  }
+
+  protected static long inputFileSize(List<FileScanTask> fileToRewrite) {
+    return fileToRewrite.stream().mapToLong(FileScanTask::length).sum();
+  }
+
+
+  /**
+   * Estimates a larger max target file size than our target size used in task 
creation to avoid
+   * tasks which are predicted to have a certain size, but exceed that target 
size when serde is complete creating
+   * tiny remainder files.
+   * <p>
+   * While we create tasks that should all be smaller than our target size 
there is a chance that the actual
+   * data will end up being larger than our target size due to various factors 
of compression, serialization and
+   * other factors outside our control. If this occurs, instead of making a 
single file that is close in size to
+   * our target we would end up producing one file of the target size, and 
then a small extra file with the remaining
+   * data. For example, if our target is 512 MB we may generate a rewrite task 
that should be 500 MB. When we write
+   * the data we may find we actually have to write out 530 MB. If we use the 
target size while writing we would
+   * produced a 512 MB file and a 18 MB file. If instead we use a larger size 
estimated by this method,
+   * then we end up writing a single file.
+   * @return the target size plus one half of the distance between max and 
target
+   */
+  protected long writeMaxFileSize() {
+    return (long) (targetFileSize + ((maxFileSize - targetFileSize) * 0.5));
+  }
+
   private long sizeOfInputFiles(List<FileScanTask> group) {
     return group.stream().mapToLong(FileScanTask::length).sum();
   }
diff --git 
a/core/src/test/java/org/apache/iceberg/actions/TestBinPackStrategy.java 
b/core/src/test/java/org/apache/iceberg/actions/TestBinPackStrategy.java
index b42496f..a84a10f 100644
--- a/core/src/test/java/org/apache/iceberg/actions/TestBinPackStrategy.java
+++ b/core/src/test/java/org/apache/iceberg/actions/TestBinPackStrategy.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.actions;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
@@ -59,7 +60,7 @@ public class TestBinPackStrategy extends TableTestBase {
     }
 
     @Override
-    public List<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {
+    public Set<DataFile> rewriteFiles(String groupId, List<FileScanTask> 
filesToRewrite) {
       throw new UnsupportedOperationException();
     }
   }
@@ -164,6 +165,24 @@ public class TestBinPackStrategy extends TableTestBase {
   }
 
   @Test
+  public void testNumOuputFiles() {
+    BinPackStrategy strategy = (BinPackStrategy) defaultBinPack();
+    long targetFileSize = strategy.targetFileSize();
+    Assert.assertEquals("Should keep remainder if the remainder is a valid 
size",
+        2, strategy.numOutputFiles(targetFileSize + 450 * MB));
+    Assert.assertEquals("Should discard remainder file if the remainder is 
very small",
+        1, strategy.numOutputFiles(targetFileSize + 40 * MB));
+    Assert.assertEquals("Should keep remainder file if it would change average 
file size greatly",
+        2, strategy.numOutputFiles((long) (targetFileSize + 0.40 * 
targetFileSize)));
+    Assert.assertEquals("Should discard remainder if file is small and 
wouldn't change average that much",
+        200, strategy.numOutputFiles(200 * targetFileSize + 13 * MB));
+    Assert.assertEquals("Should keep remainder if it's a valid size",
+        201, strategy.numOutputFiles(200 * targetFileSize + 499 * MB));
+    Assert.assertEquals("Should not return 0 even for very small files",
+        1, strategy.numOutputFiles(1));
+  }
+
+  @Test
   public void testInvalidOptions() {
     AssertHelpers.assertThrows("Should not allow max size smaller than target",
         IllegalArgumentException.class, () -> {
@@ -188,6 +207,5 @@ public class TestBinPackStrategy extends TableTestBase {
           defaultBinPack().options(ImmutableMap.of(
               RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Long.toString(-5)));
         });
-
   }
 }

Reply via email to