This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e625175 [hotfix] ALTER TABLE COMPACT should also use restored max 
level
9e625175 is described below

commit 9e625175b8041f30da58fd56a6f5a6f78b44cc62
Author: Jane Chan <[email protected]>
AuthorDate: Mon Jun 13 17:42:40 2022 +0800

    [hotfix] ALTER TABLE COMPACT should also use restored max level
    
    This closes #154
---
 .../store/connector/AlterTableCompactITCase.java   | 41 +++++++++++++----
 .../store/connector/ForceCompactionITCase.java     | 53 ++++++++++------------
 .../store/file/operation/FileStoreWriteImpl.java   |  3 +-
 3 files changed, 59 insertions(+), 38 deletions(-)

diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
index 41e1dbeb..0e4d8358 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
@@ -32,9 +32,9 @@ import org.apache.flink.types.RowKind;
 
 import org.junit.Test;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -75,28 +75,52 @@ public class AlterTableCompactITCase extends 
FileStoreTableITCase {
     }
 
     @Test
-    public void testNonPartitioned() throws IOException {
+    public void testNonPartitioned() {
         generator = new TestKeyValueGenerator(NON_PARTITIONED);
         Random random = new Random();
         innerTest("T0", random.nextInt(10) + 1, NON_PARTITIONED);
     }
 
     @Test
-    public void testSinglePartitioned() throws IOException {
+    public void testSinglePartitioned() {
         generator = new TestKeyValueGenerator(SINGLE_PARTITIONED);
         Random random = new Random();
         innerTest("T1", random.nextInt(10) + 1, SINGLE_PARTITIONED);
     }
 
     @Test
-    public void testMultiPartitioned() throws IOException {
+    public void testMultiPartitioned() {
         generator = new TestKeyValueGenerator(MULTI_PARTITIONED);
         Random random = new Random();
         innerTest("T2", random.nextInt(10) + 1, MULTI_PARTITIONED);
     }
 
-    private void innerTest(String tableName, int batchNum, 
TestKeyValueGenerator.GeneratorMode mode)
-            throws IOException {
+    @Test
+    public void testChangeNumOfSortedRunTrigger() {
+        // force auto-compaction and increase trigger
+        batchSql("ALTER TABLE T0 SET ('commit.force-compact' = 'true')");
+        batchSql("ALTER TABLE T0 SET ('num-sorted-run.compaction-trigger' = 
'5')");
+
+        // write duplicates
+        batchSql("INSERT INTO T0 VALUES(1, 1, 1)," + "(2, 2, 2), " + "(3, 3, 
3), " + "(4, 4, 4)");
+        batchSql("INSERT INTO T0 VALUES(1, 1, 1)," + "(2, 2, 2), " + "(3, 3, 
3), " + "(4, 4, 4)");
+        batchSql("INSERT INTO T0 VALUES(1, 1, 1)," + "(2, 2, 2), " + "(3, 3, 
3), " + "(4, 4, 4)");
+        batchSql("INSERT INTO T0 VALUES(1, 1, 1)," + "(2, 2, 2), " + "(3, 3, 
3), " + "(4, 4, 4)");
+        batchSql("INSERT INTO T0 VALUES(1, 1, 1)," + "(2, 2, 2), " + "(3, 3, 
3), " + "(4, 4, 4)");
+        Snapshot snapshot = findLatestSnapshot("T0");
+        assertThat(snapshot.id()).isEqualTo(6);
+        
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
+
+        // decrease trigger
+        batchSql("ALTER TABLE T0 SET ('num-sorted-run.compaction-trigger' = 
'1')");
+        batchSql("ALTER TABLE T0 COMPACT");
+        assertThat(findLatestSnapshot("T0"))
+                .usingComparator(Comparator.comparing(Snapshot::id))
+                .isEqualTo(snapshot);
+    }
+
+    private void innerTest(
+            String tableName, int batchNum, 
TestKeyValueGenerator.GeneratorMode mode) {
         // increase trigger to avoid auto-compaction
         batchSql(
                 String.format(
@@ -171,8 +195,7 @@ public class AlterTableCompactITCase extends 
FileStoreTableITCase {
             String compactQuery,
             String selectQuery,
             long latestSnapshot,
-            List<Row> expectedData)
-            throws IOException {
+            List<Row> expectedData) {
         batchSql(compactQuery);
         Snapshot snapshot = findLatestSnapshot(tableName);
         assertThat(snapshot.id()).isEqualTo(latestSnapshot + 1);
@@ -284,7 +307,7 @@ public class AlterTableCompactITCase extends 
FileStoreTableITCase {
                                         tableName)));
     }
 
-    private Snapshot findLatestSnapshot(String tableName) throws IOException {
+    private Snapshot findLatestSnapshot(String tableName) {
         SnapshotManager snapshotManager = new 
SnapshotManager(getTableDirectory(tableName));
         return snapshotManager.snapshot(snapshotManager.latestSnapshotId());
     }
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ForceCompactionITCase.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ForceCompactionITCase.java
index d0fb504a..c2a0cd39 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ForceCompactionITCase.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ForceCompactionITCase.java
@@ -81,34 +81,31 @@ public class ForceCompactionITCase extends 
FileStoreTableITCase {
     }
 
     @Test
-    public void testNoDefaultNumOfLevels() throws Exception {
-        bEnv.executeSql("ALTER TABLE T1 SET ('commit.force-compact' = 
'true')");
-        bEnv.executeSql(
-                        "INSERT INTO T1 VALUES(1, 'Winter', 'Winter is 
Coming'),"
-                                + "(2, 'Winter', 'The First Snowflake'), "
-                                + "(2, 'Spring', 'The First Rose in Spring'), "
-                                + "(7, 'Summer', 'Summertime Sadness')")
-                .await();
-        bEnv.executeSql("INSERT INTO T1 VALUES(12, 'Winter', 'Last 
Christmas')").await();
-        bEnv.executeSql("INSERT INTO T1 VALUES(11, 'Winter', 'Winter is 
Coming')").await();
-        bEnv.executeSql("INSERT INTO T1 VALUES(10, 'Autumn', 
'Refrain')").await();
-        bEnv.executeSql(
-                        "INSERT INTO T1 VALUES(6, 'Summer', 'Watermelon 
Sugar'), "
-                                + "(4, 'Spring', 'Spring Water')")
-                .await();
-        bEnv.executeSql(
-                        "INSERT INTO T1 VALUES(66, 'Summer', 'Summer Vibe'), "
-                                + "(9, 'Autumn', 'Wake Me Up When September 
Ends')")
-                .await();
-        bEnv.executeSql(
-                        "INSERT INTO T1 VALUES(666, 'Summer', 'Summer Vibe'), "
-                                + "(9, 'Autumn', 'Wake Me Up When September 
Ends')")
-                .await();
-        bEnv.executeSql("ALTER TABLE T1 SET 
('num-sorted-run.compaction-trigger' = '2')");
-        bEnv.executeSql(
-                        "INSERT INTO T1 VALUES(666, 'Summer', 'Summer Vibe'), "
-                                + "(9, 'Autumn', 'Wake Me Up When September 
Ends')")
-                .await();
+    public void testNoDefaultNumOfLevels() {
+        batchSql("ALTER TABLE T1 SET ('commit.force-compact' = 'true')");
+        batchSql(
+                "INSERT INTO T1 VALUES(1, 'Winter', 'Winter is Coming'),"
+                        + "(2, 'Winter', 'The First Snowflake'), "
+                        + "(2, 'Spring', 'The First Rose in Spring'), "
+                        + "(7, 'Summer', 'Summertime Sadness')");
+        batchSql("INSERT INTO T1 VALUES(12, 'Winter', 'Last Christmas')");
+        batchSql("INSERT INTO T1 VALUES(11, 'Winter', 'Winter is Coming')");
+        batchSql("INSERT INTO T1 VALUES(10, 'Autumn', 'Refrain')");
+        batchSql(
+                "INSERT INTO T1 VALUES(6, 'Summer', 'Watermelon Sugar'), "
+                        + "(4, 'Spring', 'Spring Water')");
+        batchSql(
+                "INSERT INTO T1 VALUES(66, 'Summer', 'Summer Vibe'), "
+                        + "(9, 'Autumn', 'Wake Me Up When September Ends')");
+
+        batchSql(
+                "INSERT INTO T1 VALUES(666, 'Summer', 'Summer Vibe'), "
+                        + "(9, 'Autumn', 'Wake Me Up When September Ends')");
+
+        batchSql("ALTER TABLE T1 SET ('num-sorted-run.compaction-trigger' = 
'2')");
+        batchSql(
+                "INSERT INTO T1 VALUES(666, 'Summer', 'Summer Vibe'), "
+                        + "(9, 'Autumn', 'Wake Me Up When September Ends')");
 
         assertThat(batchSql("SELECT * FROM T1")).hasSize(15);
     }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
index 447f06b6..fba8c330 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
@@ -148,8 +148,9 @@ public class FileStoreWriteImpl implements FileStoreWrite {
             int bucket,
             ExecutorService compactExecutor,
             List<DataFileMeta> restoredFiles) {
+        Levels levels = new Levels(keyComparatorSupplier.get(), restoredFiles, 
options.numLevels);
         return new CompactWriter(
-                CompactUnit.fromFiles(options.numLevels - 1, restoredFiles),
+                CompactUnit.fromLevelRuns(levels.numberOfLevels() - 1, 
levels.levelSortedRuns()),
                 createCompactManager(
                         partition,
                         bucket,

Reply via email to