This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 2ae65ac API: Refactor RewriteDataFiles and BinPackStrategy (#2600)
2ae65ac is described below
commit 2ae65ac305631cc5b0543207baa9f8b5af804cd2
Author: Russell Spitzer <[email protected]>
AuthorDate: Mon May 17 18:35:20 2021 -0500
API: Refactor RewriteDataFiles and BinPackStrategy (#2600)
---
.../apache/iceberg/actions/RewriteDataFiles.java | 13 +++----
.../apache/iceberg/actions/BinPackStrategy.java | 40 ++++++----------------
.../iceberg/actions/TestBinPackStrategy.java | 35 +++++--------------
3 files changed, 24 insertions(+), 64 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
index ffe4ca7..e959365 100644
--- a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
+++ b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
@@ -67,7 +67,7 @@ public interface RewriteDataFiles extends SnapshotUpdate<RewriteDataFiles, Rewri
* contents of the group is determined by the rewrite strategy. Each file
group will be rewritten
* independently and asynchronously.
**/
- String MAX_CONCURRENT_FILE_GROUP_ACTIONS =
"max-concurrent-file-group-rewrites";
+ String MAX_CONCURRENT_FILE_GROUP_REWRITES =
"max-concurrent-file-group-rewrites";
int MAX_CONCURRENT_FILE_GROUP_ACTIONS_DEFAULT = 1;
/**
@@ -82,16 +82,13 @@ public interface RewriteDataFiles extends SnapshotUpdate<RewriteDataFiles, Rewri
*/
String OUTPUT_PARTITION_SPEC_ID = "output-partition-spec-id";
- enum Strategy {
- BINPACK
- }
-
/**
- * Sets the strategy to be used by this rewrite action
- * @param strategy the strategy to use
+ * Choose BINPACK as a strategy for this rewrite operation
* @return this for method chaining
*/
- RewriteDataFiles strategy(Strategy strategy);
+ default RewriteDataFiles binPack() {
+ return this;
+ }
/**
* A user provided filter for determining which files will be considered by
the rewrite strategy. This will be used
diff --git a/core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java b/core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java
index 965d433..b42001c 100644
--- a/core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java
+++ b/core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java
@@ -25,7 +25,6 @@ import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.actions.RewriteDataFiles.Strategy;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
@@ -40,25 +39,19 @@ import org.apache.iceberg.util.PropertyUtil;
* <p>
* Once selected files are grouped based on a {@link BinPacking} into groups defined
* by {@link RewriteDataFiles#MAX_FILE_GROUP_SIZE_BYTES}. Groups will be considered for rewriting if they contain
- * more files than {@link MIN_INPUT_FILES} and would produce more files than {@link MIN_OUTPUT_FILES}.
+ * more files than {@link MIN_INPUT_FILES} or would produce at least one file of
+ * {@link RewriteDataFiles#TARGET_FILE_SIZE_BYTES}.
*/
abstract class BinPackStrategy implements RewriteStrategy {
/**
- * Minimum number of files that need to be in a file group to be considered
- * for rewriting. This is considered in conjunction with {@link MIN_OUTPUT_FILES}, both
- * conditions must pass to consider a group of files to be rewritten.
+ * The minimum number of files that need to be in a file group for it to be considered for
+ * compaction if the total size of that group is less than the {@link RewriteDataFiles#TARGET_FILE_SIZE_BYTES}.
+ * This can also be thought of as the maximum number of non-target-size files that could remain in a file
+ * group (partition) after rewriting.
*/
public static final String MIN_INPUT_FILES = "min-input-files";
- public static final int MIN_INPUT_FILES_DEFAULT = 1;
-
- /**
- * Minimum number of files we want to be created by file group when being
- * rewritten. This is considered in conjunction with {@link MIN_INPUT_FILES}, both
- * conditions must pass to consider a group of files to be rewritten.
- */
- public static final String MIN_OUTPUT_FILES = "min-output-files";
- public static final int MIN_OUTPUT_FILES_DEFAULT = 1;
+ public static final int MIN_INPUT_FILES_DEFAULT = 5;
/**
* Adjusts files which will be considered for rewriting. Files smaller than
@@ -81,7 +74,6 @@ abstract class BinPackStrategy implements RewriteStrategy {
public static final double MAX_FILE_SIZE_DEFAULT_RATIO = 1.80d;
private int minInputFiles;
- private int minOutputFiles;
private long minFileSize;
private long maxFileSize;
private long targetFileSize;
@@ -89,14 +81,13 @@ abstract class BinPackStrategy implements RewriteStrategy {
@Override
public String name() {
- return Strategy.BINPACK.name();
+ return "BINPACK";
}
@Override
public Set<String> validOptions() {
return ImmutableSet.of(
MIN_INPUT_FILES,
- MIN_OUTPUT_FILES,
MIN_FILE_SIZE_BYTES,
MAX_FILE_SIZE_BYTES
);
@@ -123,10 +114,6 @@ abstract class BinPackStrategy implements RewriteStrategy {
RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES,
RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES_DEFAULT);
- minOutputFiles = PropertyUtil.propertyAsInt(options,
- MIN_OUTPUT_FILES,
- MIN_OUTPUT_FILES_DEFAULT);
-
minInputFiles = PropertyUtil.propertyAsInt(options,
MIN_INPUT_FILES,
MIN_INPUT_FILES_DEFAULT);
@@ -146,13 +133,12 @@ abstract class BinPackStrategy implements RewriteStrategy {
ListPacker<FileScanTask> packer = new
BinPacking.ListPacker<>(maxGroupSize, 1, false);
List<List<FileScanTask>> potentialGroups = packer.pack(dataFiles,
FileScanTask::length);
return potentialGroups.stream().filter(group ->
- numOutputFiles(group) >= minOutputFiles && group.size() >=
minInputFiles
+ group.size() >= minInputFiles || sizeOfInputFiles(group) > targetFileSize
).collect(Collectors.toList());
}
- private long numOutputFiles(List<FileScanTask> group) {
- long groupSize = group.stream().mapToLong(FileScanTask::length).sum();
- return (long) Math.ceil((double) groupSize / targetFileSize);
+ private long sizeOfInputFiles(List<FileScanTask> group) {
+ return group.stream().mapToLong(FileScanTask::length).sum();
}
private void validateOptions() {
@@ -175,9 +161,5 @@ abstract class BinPackStrategy implements RewriteStrategy {
Preconditions.checkArgument(minInputFiles > 0,
"Cannot set %s is less than 1. All values less than 1 have the same
effect as 1. %d < 1",
MIN_INPUT_FILES, minInputFiles);
-
- Preconditions.checkArgument(minOutputFiles > 0,
- "Cannot set %s to less than 1. All values less than 1 have the same
effect as 1. %d < 1",
- MIN_OUTPUT_FILES, minOutputFiles);
}
}
diff --git a/core/src/test/java/org/apache/iceberg/actions/TestBinPackStrategy.java b/core/src/test/java/org/apache/iceberg/actions/TestBinPackStrategy.java
index 10002a3..b42496f 100644
--- a/core/src/test/java/org/apache/iceberg/actions/TestBinPackStrategy.java
+++ b/core/src/test/java/org/apache/iceberg/actions/TestBinPackStrategy.java
@@ -121,44 +121,31 @@ public class TestBinPackStrategy extends TableTestBase {
}
@Test
- public void testGroupingMinInputFilesValid() {
+ public void testGroupWithLargeFileMinInputFiles() {
RewriteStrategy strategy = defaultBinPack().options(ImmutableMap.of(
BinPackStrategy.MIN_INPUT_FILES, Integer.toString(5)
));
- Iterable<FileScanTask> testFiles = filesOfSize(1, 1, 1, 1, 1);
+ Iterable<FileScanTask> testFiles = filesOfSize(2000);
Iterable<List<FileScanTask>> grouped = strategy.planFileGroups(testFiles);
- Assert.assertEquals("Should plan 1 groups since there are enough input
files",
+ Assert.assertEquals("Should plan 1 groups, not enough input files but the
input file exceeds our max" +
+ "and can be written into at least one new target-file-size files",
ImmutableList.of(testFiles), grouped);
}
@Test
- public void testGroupingMinOutputFilesInvalid() {
- RewriteStrategy strategy = defaultBinPack().options(ImmutableMap.of(
- BinPackStrategy.MIN_OUTPUT_FILES, Integer.toString(3)
- ));
-
- Iterable<FileScanTask> testFiles = filesOfSize(200, 200, 200, 200, 200);
-
- Iterable<List<FileScanTask>> grouped = strategy.planFileGroups(testFiles);
-
- Assert.assertEquals("Shouldn't plan any groups because we only would
produce 2 files and require 3",
- Collections.emptyList(), grouped);
- }
-
- @Test
- public void testGroupingMinOutputFilesValid() {
+ public void testGroupingMinInputFilesValid() {
RewriteStrategy strategy = defaultBinPack().options(ImmutableMap.of(
- BinPackStrategy.MIN_OUTPUT_FILES, Integer.toString(2)
+ BinPackStrategy.MIN_INPUT_FILES, Integer.toString(5)
));
- Iterable<FileScanTask> testFiles = filesOfSize(200, 200, 200, 200, 200);
+ Iterable<FileScanTask> testFiles = filesOfSize(1, 1, 1, 1, 1);
Iterable<List<FileScanTask>> grouped = strategy.planFileGroups(testFiles);
- Assert.assertEquals("Should plan 1 groups since there would be 2 output
files",
+ Assert.assertEquals("Should plan 1 groups since there are enough input
files",
ImmutableList.of(testFiles), grouped);
}
@@ -196,12 +183,6 @@ public class TestBinPackStrategy extends TableTestBase {
BinPackStrategy.MIN_INPUT_FILES, Long.toString(-5)));
});
- AssertHelpers.assertThrows("Should not allow min output size smaller than
target",
- IllegalArgumentException.class, () -> {
- defaultBinPack().options(ImmutableMap.of(
- BinPackStrategy.MIN_OUTPUT_FILES, Long.toString(-5)));
- });
-
AssertHelpers.assertThrows("Should not allow negative target size",
IllegalArgumentException.class, () -> {
defaultBinPack().options(ImmutableMap.of(