This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ae65ac  API: Refactor RewriteDataFiles and BinPackStrategy (#2600)
2ae65ac is described below

commit 2ae65ac305631cc5b0543207baa9f8b5af804cd2
Author: Russell Spitzer <[email protected]>
AuthorDate: Mon May 17 18:35:20 2021 -0500

    API: Refactor RewriteDataFiles and BinPackStrategy (#2600)
---
 .../apache/iceberg/actions/RewriteDataFiles.java   | 13 +++----
 .../apache/iceberg/actions/BinPackStrategy.java    | 40 ++++++----------------
 .../iceberg/actions/TestBinPackStrategy.java       | 35 +++++--------------
 3 files changed, 24 insertions(+), 64 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
index ffe4ca7..e959365 100644
--- a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
+++ b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
@@ -67,7 +67,7 @@ public interface RewriteDataFiles extends SnapshotUpdate<RewriteDataFiles, Rewri
    * contents of the group is determined by the rewrite strategy. Each file group will be rewritten
    * independently and asynchronously.
    **/
-  String MAX_CONCURRENT_FILE_GROUP_ACTIONS = "max-concurrent-file-group-rewrites";
+  String MAX_CONCURRENT_FILE_GROUP_REWRITES = "max-concurrent-file-group-rewrites";
   int MAX_CONCURRENT_FILE_GROUP_ACTIONS_DEFAULT = 1;
 
   /**
@@ -82,16 +82,13 @@ public interface RewriteDataFiles extends SnapshotUpdate<RewriteDataFiles, Rewri
    */
   String OUTPUT_PARTITION_SPEC_ID = "output-partition-spec-id";
 
-  enum Strategy {
-    BINPACK
-  }
-
   /**
-   * Sets the strategy to be used by this rewrite action
-   * @param strategy the strategy to use
+   * Choose BINPACK as a strategy for this rewrite operation
    * @return this for method chaining
    */
-  RewriteDataFiles strategy(Strategy strategy);
+  default RewriteDataFiles binPack() {
+    return this;
+  }
 
   /**
    * A user provided filter for determining which files will be considered by the rewrite strategy. This will be used
diff --git a/core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java b/core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java
index 965d433..b42001c 100644
--- a/core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java
+++ b/core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java
@@ -25,7 +25,6 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.actions.RewriteDataFiles.Strategy;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
@@ -40,25 +39,19 @@ import org.apache.iceberg.util.PropertyUtil;
  * <p>
  * Once selected files are grouped based on a {@link BinPacking} into groups defined
  * by {@link RewriteDataFiles#MAX_FILE_GROUP_SIZE_BYTES}. Groups will be considered for rewriting if they contain
- * more files than {@link MIN_INPUT_FILES} and would produce more files than {@link MIN_OUTPUT_FILES}.
+ * more files than {@link MIN_INPUT_FILES} or would produce at least one file of
+ * {@link RewriteDataFiles#TARGET_FILE_SIZE_BYTES}.
  */
 abstract class BinPackStrategy implements RewriteStrategy {
 
   /**
-   * Minimum number of files that need to be in a file group to be considered
-   * for rewriting. This is considered in conjunction with {@link MIN_OUTPUT_FILES}, both
-   * conditions must pass to consider a group of files to be rewritten.
+   * The minimum number of files that need to be in a file group for it to be considered for
+   * compaction if the total size of that group is less than the {@link RewriteDataFiles#TARGET_FILE_SIZE_BYTES}.
+   * This can also be thought of as the maximum number of non-target-size files that could remain in a file
+   * group (partition) after rewriting.
    */
   public static final String MIN_INPUT_FILES = "min-input-files";
-  public static final int MIN_INPUT_FILES_DEFAULT = 1;
-
-  /**
-   * Minimum number of files we want to be created by file group when being
-   * rewritten. This is considered in conjunction with {@link MIN_INPUT_FILES}, both
-   * conditions must pass to consider a group of files to be rewritten.
-   */
-  public static final String MIN_OUTPUT_FILES = "min-output-files";
-  public static final int MIN_OUTPUT_FILES_DEFAULT = 1;
+  public static final int MIN_INPUT_FILES_DEFAULT = 5;
 
   /**
    * Adjusts files which will be considered for rewriting. Files smaller than
@@ -81,7 +74,6 @@ abstract class BinPackStrategy implements RewriteStrategy {
   public static final double MAX_FILE_SIZE_DEFAULT_RATIO = 1.80d;
 
   private int minInputFiles;
-  private int minOutputFiles;
   private long minFileSize;
   private long maxFileSize;
   private long targetFileSize;
@@ -89,14 +81,13 @@ abstract class BinPackStrategy implements RewriteStrategy {
 
   @Override
   public String name() {
-    return Strategy.BINPACK.name();
+    return "BINPACK";
   }
 
   @Override
   public Set<String> validOptions() {
     return ImmutableSet.of(
         MIN_INPUT_FILES,
-        MIN_OUTPUT_FILES,
         MIN_FILE_SIZE_BYTES,
         MAX_FILE_SIZE_BYTES
     );
@@ -123,10 +114,6 @@ abstract class BinPackStrategy implements RewriteStrategy {
         RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES,
         RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES_DEFAULT);
 
-    minOutputFiles = PropertyUtil.propertyAsInt(options,
-        MIN_OUTPUT_FILES,
-        MIN_OUTPUT_FILES_DEFAULT);
-
     minInputFiles = PropertyUtil.propertyAsInt(options,
         MIN_INPUT_FILES,
         MIN_INPUT_FILES_DEFAULT);
@@ -146,13 +133,12 @@ abstract class BinPackStrategy implements RewriteStrategy {
     ListPacker<FileScanTask> packer = new BinPacking.ListPacker<>(maxGroupSize, 1, false);
     List<List<FileScanTask>> potentialGroups = packer.pack(dataFiles, FileScanTask::length);
     return potentialGroups.stream().filter(group ->
-        numOutputFiles(group) >= minOutputFiles && group.size() >= minInputFiles
+      group.size() >= minInputFiles || sizeOfInputFiles(group) > targetFileSize
     ).collect(Collectors.toList());
   }
 
-  private long numOutputFiles(List<FileScanTask> group) {
-    long groupSize = group.stream().mapToLong(FileScanTask::length).sum();
-    return (long) Math.ceil((double) groupSize / targetFileSize);
+  private long sizeOfInputFiles(List<FileScanTask> group) {
+    return group.stream().mapToLong(FileScanTask::length).sum();
   }
 
   private void validateOptions() {
@@ -175,9 +161,5 @@ abstract class BinPackStrategy implements RewriteStrategy {
     Preconditions.checkArgument(minInputFiles > 0,
         "Cannot set %s is less than 1. All values less than 1 have the same effect as 1. %d < 1",
         MIN_INPUT_FILES, minInputFiles);
-
-    Preconditions.checkArgument(minOutputFiles > 0,
-        "Cannot set %s to less than 1. All values less than 1 have the same effect as 1. %d < 1",
-        MIN_OUTPUT_FILES, minOutputFiles);
   }
 }
diff --git a/core/src/test/java/org/apache/iceberg/actions/TestBinPackStrategy.java b/core/src/test/java/org/apache/iceberg/actions/TestBinPackStrategy.java
index 10002a3..b42496f 100644
--- a/core/src/test/java/org/apache/iceberg/actions/TestBinPackStrategy.java
+++ b/core/src/test/java/org/apache/iceberg/actions/TestBinPackStrategy.java
@@ -121,44 +121,31 @@ public class TestBinPackStrategy extends TableTestBase {
   }
 
   @Test
-  public void testGroupingMinInputFilesValid() {
+  public void testGroupWithLargeFileMinInputFiles() {
     RewriteStrategy strategy = defaultBinPack().options(ImmutableMap.of(
         BinPackStrategy.MIN_INPUT_FILES, Integer.toString(5)
     ));
 
-    Iterable<FileScanTask> testFiles = filesOfSize(1, 1, 1, 1, 1);
+    Iterable<FileScanTask> testFiles = filesOfSize(2000);
 
     Iterable<List<FileScanTask>> grouped = strategy.planFileGroups(testFiles);
 
-    Assert.assertEquals("Should plan 1 groups since there are enough input files",
+    Assert.assertEquals("Should plan 1 groups, not enough input files but the input file exceeds our max" +
+            "and can be written into at least one new target-file-size files",
         ImmutableList.of(testFiles), grouped);
   }
 
   @Test
-  public void testGroupingMinOutputFilesInvalid() {
-    RewriteStrategy strategy = defaultBinPack().options(ImmutableMap.of(
-        BinPackStrategy.MIN_OUTPUT_FILES, Integer.toString(3)
-    ));
-
-    Iterable<FileScanTask> testFiles = filesOfSize(200, 200, 200, 200, 200);
-
-    Iterable<List<FileScanTask>> grouped = strategy.planFileGroups(testFiles);
-
-    Assert.assertEquals("Shouldn't plan any groups because we only would produce 2 files and require 3",
-        Collections.emptyList(), grouped);
-  }
-
-  @Test
-  public void testGroupingMinOutputFilesValid() {
+  public void testGroupingMinInputFilesValid() {
     RewriteStrategy strategy = defaultBinPack().options(ImmutableMap.of(
-        BinPackStrategy.MIN_OUTPUT_FILES, Integer.toString(2)
+        BinPackStrategy.MIN_INPUT_FILES, Integer.toString(5)
     ));
 
-    Iterable<FileScanTask> testFiles = filesOfSize(200, 200, 200, 200, 200);
+    Iterable<FileScanTask> testFiles = filesOfSize(1, 1, 1, 1, 1);
 
     Iterable<List<FileScanTask>> grouped = strategy.planFileGroups(testFiles);
 
-    Assert.assertEquals("Should plan 1 groups since there would be 2 output files",
+    Assert.assertEquals("Should plan 1 groups since there are enough input files",
         ImmutableList.of(testFiles), grouped);
   }
 
@@ -196,12 +183,6 @@ public class TestBinPackStrategy extends TableTestBase {
               BinPackStrategy.MIN_INPUT_FILES, Long.toString(-5)));
         });
 
-    AssertHelpers.assertThrows("Should not allow min output size smaller than target",
-        IllegalArgumentException.class, () -> {
-          defaultBinPack().options(ImmutableMap.of(
-              BinPackStrategy.MIN_OUTPUT_FILES, Long.toString(-5)));
-        });
-
     AssertHelpers.assertThrows("Should not allow negative target size",
         IllegalArgumentException.class, () -> {
           defaultBinPack().options(ImmutableMap.of(

Reply via email to