This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new a163dc8 [FLINK-27705] Prevent num-sorted-run.compaction-trigger from
interfering num-levels
a163dc8 is described below
commit a163dc83c0c89fe62a7aecce0ead985710eec425
Author: Jane Chan <[email protected]>
AuthorDate: Mon May 23 15:54:46 2022 +0800
[FLINK-27705] Prevent num-sorted-run.compaction-trigger from interfering
num-levels
This closes #132
---
.../store/connector/ForceCompactionITCase.java | 44 ++++++++++++++++++++--
.../flink/table/store/file/mergetree/Levels.java | 10 ++++-
2 files changed, 49 insertions(+), 5 deletions(-)
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ForceCompactionITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ForceCompactionITCase.java
index 0940e36..d0fb504 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ForceCompactionITCase.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ForceCompactionITCase.java
@@ -20,7 +20,7 @@ package org.apache.flink.table.store.connector;
import org.junit.Test;
-import java.util.Collections;
+import java.util.Arrays;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
@@ -30,12 +30,17 @@ public class ForceCompactionITCase extends
FileStoreTableITCase {
@Override
protected List<String> ddl() {
- return Collections.singletonList(
+ return Arrays.asList(
"CREATE TABLE IF NOT EXISTS T (\n"
+ " f0 INT\n, "
+ " f1 STRING\n, "
+ " f2 STRING\n"
- + ") PARTITIONED BY (f1)");
+ + ") PARTITIONED BY (f1)",
+ "CREATE TABLE IF NOT EXISTS T1 (\n"
+ + " f0 INT\n, "
+ + " f1 STRING\n, "
+ + " f2 STRING\n"
+ + ")");
}
@Test
@@ -74,4 +79,37 @@ public class ForceCompactionITCase extends
FileStoreTableITCase {
assertThat(batchSql("SELECT * FROM T")).hasSize(21);
}
+
+ @Test
+ public void testNoDefaultNumOfLevels() throws Exception {
+ bEnv.executeSql("ALTER TABLE T1 SET ('commit.force-compact' =
'true')");
+ bEnv.executeSql(
+ "INSERT INTO T1 VALUES(1, 'Winter', 'Winter is
Coming'),"
+ + "(2, 'Winter', 'The First Snowflake'), "
+ + "(2, 'Spring', 'The First Rose in Spring'), "
+ + "(7, 'Summer', 'Summertime Sadness')")
+ .await();
+ bEnv.executeSql("INSERT INTO T1 VALUES(12, 'Winter', 'Last
Christmas')").await();
+ bEnv.executeSql("INSERT INTO T1 VALUES(11, 'Winter', 'Winter is
Coming')").await();
+ bEnv.executeSql("INSERT INTO T1 VALUES(10, 'Autumn',
'Refrain')").await();
+ bEnv.executeSql(
+ "INSERT INTO T1 VALUES(6, 'Summer', 'Watermelon
Sugar'), "
+ + "(4, 'Spring', 'Spring Water')")
+ .await();
+ bEnv.executeSql(
+ "INSERT INTO T1 VALUES(66, 'Summer', 'Summer Vibe'), "
+ + "(9, 'Autumn', 'Wake Me Up When September
Ends')")
+ .await();
+ bEnv.executeSql(
+ "INSERT INTO T1 VALUES(666, 'Summer', 'Summer Vibe'), "
+ + "(9, 'Autumn', 'Wake Me Up When September
Ends')")
+ .await();
+ bEnv.executeSql("ALTER TABLE T1 SET
('num-sorted-run.compaction-trigger' = '2')");
+ bEnv.executeSql(
+ "INSERT INTO T1 VALUES(666, 'Summer', 'Summer Vibe'), "
+ + "(9, 'Autumn', 'Wake Me Up When September
Ends')")
+ .await();
+
+ assertThat(batchSql("SELECT * FROM T1")).hasSize(15);
+ }
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Levels.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Levels.java
index 4169901..644a4ef 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Levels.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Levels.java
@@ -43,11 +43,17 @@ public class Levels {
public Levels(Comparator<RowData> keyComparator, List<DataFileMeta>
inputFiles, int numLevels) {
this.keyComparator = keyComparator;
- checkArgument(numLevels > 1, "levels must be at least 2.");
+
+ // in case the num of levels is not specified explicitly
+ int restoredMaxLevel =
+ Math.max(
+ numLevels,
+
inputFiles.stream().mapToInt(DataFileMeta::level).max().orElse(-1) + 1);
+ checkArgument(restoredMaxLevel > 1, "levels must be at least 2.");
this.level0 =
new
TreeSet<>(Comparator.comparing(DataFileMeta::maxSequenceNumber).reversed());
this.levels = new ArrayList<>();
- for (int i = 1; i < numLevels; i++) {
+ for (int i = 1; i < restoredMaxLevel; i++) {
levels.add(SortedRun.empty());
}