This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.1
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit 686a17dd7a12606289000159b5a894b952e3247b
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Apr 1 18:27:23 2022 +0800

    [FLINK-26971] UniversalCompaction should pick by size ratio after picking by file num
    
    This closes #70
---
 .../file/mergetree/compact/UniversalCompaction.java  | 20 ++++++++++++++++----
 .../mergetree/compact/UniversalCompactionTest.java   |  3 ++-
 2 files changed, 18 insertions(+), 5 deletions(-)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
index a590085..d37c899 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
@@ -63,7 +63,8 @@ public class UniversalCompaction implements CompactStrategy {
         // 3 checking for file num
         if (runs.size() > maxRunNum) {
             // compacting for file num
-            return Optional.of(createUnit(runs, maxLevel, runs.size() - maxRunNum + 1));
+            int candidateCount = runs.size() - maxRunNum + 1;
+            return Optional.ofNullable(pickForSizeRatio(maxLevel, runs, candidateCount));
         }
 
         return Optional.empty();
@@ -97,10 +98,13 @@ public class UniversalCompaction implements CompactStrategy {
             return null;
         }
 
-        int candidateCount = 1;
-        long candidateSize = runs.get(0).run().totalSize();
+        return pickForSizeRatio(maxLevel, runs, 1);
+    }
 
-        for (int i = 1; i < runs.size(); i++) {
+    private CompactUnit pickForSizeRatio(
+            int maxLevel, List<LevelSortedRun> runs, int candidateCount) {
+        long candidateSize = candidateSize(runs, candidateCount);
+        for (int i = candidateCount; i < runs.size(); i++) {
             LevelSortedRun next = runs.get(i);
             if (candidateSize * (100.0 + sizeRatio) / 100.0 < next.run().totalSize()) {
                 break;
@@ -117,6 +121,14 @@ public class UniversalCompaction implements CompactStrategy {
         return null;
     }
 
+    private long candidateSize(List<LevelSortedRun> runs, int candidateCount) {
+        long size = 0;
+        for (int i = 0; i < candidateCount; i++) {
+            size += runs.get(i).run().totalSize();
+        }
+        return size;
+    }
+
     @VisibleForTesting
     static CompactUnit createUnit(List<LevelSortedRun> runs, int maxLevel, int runCount) {
         int outputLevel;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
index c7678d7..14af8ad 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
@@ -64,7 +64,8 @@ public class UniversalCompactionTest {
         pick = compaction.pick(3, level0(1, 2, 3, 50));
         assertThat(pick.isPresent()).isTrue();
         results = pick.get().files().stream().mapToLong(SstFileMeta::fileSize).toArray();
-        assertThat(results).isEqualTo(new long[] {1, 2});
+        // 3 should be in the candidate, by size ratio after picking by file num
+        assertThat(results).isEqualTo(new long[] {1, 2, 3});
     }
 
     @Test

Reply via email to