This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-0.1 in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
commit 686a17dd7a12606289000159b5a894b952e3247b
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Apr 1 18:27:23 2022 +0800

    [FLINK-26971] UniversalCompaction should pick by size ratio after picking by file num

    This closes #70
---
 .../file/mergetree/compact/UniversalCompaction.java  | 20 ++++++++++++++++----
 .../mergetree/compact/UniversalCompactionTest.java   |  3 ++-
 2 files changed, 18 insertions(+), 5 deletions(-)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
index a590085..d37c899 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
@@ -63,7 +63,8 @@ public class UniversalCompaction implements CompactStrategy {
         // 3 checking for file num
         if (runs.size() > maxRunNum) {
             // compacting for file num
-            return Optional.of(createUnit(runs, maxLevel, runs.size() - maxRunNum + 1));
+            int candidateCount = runs.size() - maxRunNum + 1;
+            return Optional.ofNullable(pickForSizeRatio(maxLevel, runs, candidateCount));
         }
 
         return Optional.empty();
@@ -97,10 +98,13 @@ public class UniversalCompaction implements CompactStrategy {
             return null;
         }
 
-        int candidateCount = 1;
-        long candidateSize = runs.get(0).run().totalSize();
+        return pickForSizeRatio(maxLevel, runs, 1);
+    }
 
-        for (int i = 1; i < runs.size(); i++) {
+    private CompactUnit pickForSizeRatio(
+            int maxLevel, List<LevelSortedRun> runs, int candidateCount) {
+        long candidateSize = candidateSize(runs, candidateCount);
+        for (int i = candidateCount; i < runs.size(); i++) {
             LevelSortedRun next = runs.get(i);
             if (candidateSize * (100.0 + sizeRatio) / 100.0 < next.run().totalSize()) {
                 break;
@@ -117,6 +121,14 @@ public class UniversalCompaction implements CompactStrategy {
         return null;
     }
 
+    private long candidateSize(List<LevelSortedRun> runs, int candidateCount) {
+        long size = 0;
+        for (int i = 0; i < candidateCount; i++) {
+            size += runs.get(i).run().totalSize();
+        }
+        return size;
+    }
+
     @VisibleForTesting
     static CompactUnit createUnit(List<LevelSortedRun> runs, int maxLevel, int runCount) {
         int outputLevel;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
index c7678d7..14af8ad 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
@@ -64,7 +64,8 @@ public class UniversalCompactionTest {
         pick = compaction.pick(3, level0(1, 2, 3, 50));
         assertThat(pick.isPresent()).isTrue();
         results = pick.get().files().stream().mapToLong(SstFileMeta::fileSize).toArray();
-        assertThat(results).isEqualTo(new long[] {1, 2});
+        // 3 should be in the candidate, by size ratio after picking by file num
+        assertThat(results).isEqualTo(new long[] {1, 2, 3});
     }
 
     @Test
