This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new a163dc8  [FLINK-27705] Prevent num-sorted-run.compaction-trigger from interfering with num-levels
a163dc8 is described below

commit a163dc83c0c89fe62a7aecce0ead985710eec425
Author: Jane Chan <[email protected]>
AuthorDate: Mon May 23 15:54:46 2022 +0800

    [FLINK-27705] Prevent num-sorted-run.compaction-trigger from interfering with num-levels
    
    This closes #132
---
 .../store/connector/ForceCompactionITCase.java     | 44 ++++++++++++++++++++--
 .../flink/table/store/file/mergetree/Levels.java   | 10 ++++-
 2 files changed, 49 insertions(+), 5 deletions(-)

diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ForceCompactionITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ForceCompactionITCase.java
index 0940e36..d0fb504 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ForceCompactionITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ForceCompactionITCase.java
@@ -20,7 +20,7 @@ package org.apache.flink.table.store.connector;
 
 import org.junit.Test;
 
-import java.util.Collections;
+import java.util.Arrays;
 import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -30,12 +30,17 @@ public class ForceCompactionITCase extends FileStoreTableITCase {
 
     @Override
     protected List<String> ddl() {
-        return Collections.singletonList(
+        return Arrays.asList(
                 "CREATE TABLE IF NOT EXISTS T (\n"
                         + "  f0 INT\n, "
                         + "  f1 STRING\n, "
                         + "  f2 STRING\n"
-                        + ") PARTITIONED BY (f1)");
+                        + ") PARTITIONED BY (f1)",
+                "CREATE TABLE IF NOT EXISTS T1 (\n"
+                        + "  f0 INT\n, "
+                        + "  f1 STRING\n, "
+                        + "  f2 STRING\n"
+                        + ")");
     }
 
     @Test
@@ -74,4 +79,37 @@ public class ForceCompactionITCase extends FileStoreTableITCase {
 
         assertThat(batchSql("SELECT * FROM T")).hasSize(21);
     }
+
+    @Test
+    public void testNoDefaultNumOfLevels() throws Exception {
+        bEnv.executeSql("ALTER TABLE T1 SET ('commit.force-compact' = 
'true')");
+        bEnv.executeSql(
+                        "INSERT INTO T1 VALUES(1, 'Winter', 'Winter is 
Coming'),"
+                                + "(2, 'Winter', 'The First Snowflake'), "
+                                + "(2, 'Spring', 'The First Rose in Spring'), "
+                                + "(7, 'Summer', 'Summertime Sadness')")
+                .await();
+        bEnv.executeSql("INSERT INTO T1 VALUES(12, 'Winter', 'Last 
Christmas')").await();
+        bEnv.executeSql("INSERT INTO T1 VALUES(11, 'Winter', 'Winter is 
Coming')").await();
+        bEnv.executeSql("INSERT INTO T1 VALUES(10, 'Autumn', 
'Refrain')").await();
+        bEnv.executeSql(
+                        "INSERT INTO T1 VALUES(6, 'Summer', 'Watermelon 
Sugar'), "
+                                + "(4, 'Spring', 'Spring Water')")
+                .await();
+        bEnv.executeSql(
+                        "INSERT INTO T1 VALUES(66, 'Summer', 'Summer Vibe'), "
+                                + "(9, 'Autumn', 'Wake Me Up When September 
Ends')")
+                .await();
+        bEnv.executeSql(
+                        "INSERT INTO T1 VALUES(666, 'Summer', 'Summer Vibe'), "
+                                + "(9, 'Autumn', 'Wake Me Up When September 
Ends')")
+                .await();
+        bEnv.executeSql("ALTER TABLE T1 SET 
('num-sorted-run.compaction-trigger' = '2')");
+        bEnv.executeSql(
+                        "INSERT INTO T1 VALUES(666, 'Summer', 'Summer Vibe'), "
+                                + "(9, 'Autumn', 'Wake Me Up When September 
Ends')")
+                .await();
+
+        assertThat(batchSql("SELECT * FROM T1")).hasSize(15);
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Levels.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Levels.java
index 4169901..644a4ef 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Levels.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Levels.java
@@ -43,11 +43,17 @@ public class Levels {
 
     public Levels(Comparator<RowData> keyComparator, List<DataFileMeta> 
inputFiles, int numLevels) {
         this.keyComparator = keyComparator;
-        checkArgument(numLevels > 1, "levels must be at least 2.");
+
+        // in case the num of levels is not specified explicitly
+        int restoredMaxLevel =
+                Math.max(
+                        numLevels,
+                        
inputFiles.stream().mapToInt(DataFileMeta::level).max().orElse(-1) + 1);
+        checkArgument(restoredMaxLevel > 1, "levels must be at least 2.");
         this.level0 =
                 new 
TreeSet<>(Comparator.comparing(DataFileMeta::maxSequenceNumber).reversed());
         this.levels = new ArrayList<>();
-        for (int i = 1; i < numLevels; i++) {
+        for (int i = 1; i < restoredMaxLevel; i++) {
             levels.add(SortedRun.empty());
         }
 

Reply via email to