This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new cd5817b [FLINK-26971] UniversalCompaction should pick by size ratio
after picking by file num
cd5817b is described below
commit cd5817bab1e41da2c7e32d9064f5e4575e0ceb0b
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Apr 1 18:27:23 2022 +0800
[FLINK-26971] UniversalCompaction should pick by size ratio after picking
by file num
This closes #70
---
.../file/mergetree/compact/UniversalCompaction.java | 20 ++++++++++++++++----
.../mergetree/compact/UniversalCompactionTest.java | 3 ++-
2 files changed, 18 insertions(+), 5 deletions(-)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
index a590085..d37c899 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
@@ -63,7 +63,8 @@ public class UniversalCompaction implements CompactStrategy {
// 3 checking for file num
if (runs.size() > maxRunNum) {
// compacting for file num
- return Optional.of(createUnit(runs, maxLevel, runs.size() - maxRunNum + 1));
+ int candidateCount = runs.size() - maxRunNum + 1;
+ return Optional.ofNullable(pickForSizeRatio(maxLevel, runs, candidateCount));
}
return Optional.empty();
@@ -97,10 +98,13 @@ public class UniversalCompaction implements CompactStrategy {
return null;
}
- int candidateCount = 1;
- long candidateSize = runs.get(0).run().totalSize();
+ return pickForSizeRatio(maxLevel, runs, 1);
+ }
- for (int i = 1; i < runs.size(); i++) {
+ private CompactUnit pickForSizeRatio(
+ int maxLevel, List<LevelSortedRun> runs, int candidateCount) {
+ long candidateSize = candidateSize(runs, candidateCount);
+ for (int i = candidateCount; i < runs.size(); i++) {
LevelSortedRun next = runs.get(i);
if (candidateSize * (100.0 + sizeRatio) / 100.0 < next.run().totalSize()) {
break;
@@ -117,6 +121,14 @@ public class UniversalCompaction implements CompactStrategy {
return null;
}
+ private long candidateSize(List<LevelSortedRun> runs, int candidateCount) {
+ long size = 0;
+ for (int i = 0; i < candidateCount; i++) {
+ size += runs.get(i).run().totalSize();
+ }
+ return size;
+ }
+
@VisibleForTesting
static CompactUnit createUnit(List<LevelSortedRun> runs, int maxLevel, int runCount) {
int outputLevel;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
index c7678d7..14af8ad 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
@@ -64,7 +64,8 @@ public class UniversalCompactionTest {
pick = compaction.pick(3, level0(1, 2, 3, 50));
assertThat(pick.isPresent()).isTrue();
results =
pick.get().files().stream().mapToLong(SstFileMeta::fileSize).toArray();
- assertThat(results).isEqualTo(new long[] {1, 2});
+ // 3 should be in the candidate, by size ratio after picking by file num
+ assertThat(results).isEqualTo(new long[] {1, 2, 3});
}
@Test