This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
     new 9e625175 [hotfix] ALTER TABLE COMPACT should also use restored max level
9e625175 is described below
commit 9e625175b8041f30da58fd56a6f5a6f78b44cc62
Author: Jane Chan <[email protected]>
AuthorDate: Mon Jun 13 17:42:40 2022 +0800
[hotfix] ALTER TABLE COMPACT should also use restored max level
This closes #154
---
.../store/connector/AlterTableCompactITCase.java | 41 +++++++++++++----
.../store/connector/ForceCompactionITCase.java | 53 ++++++++++------------
.../store/file/operation/FileStoreWriteImpl.java | 3 +-
3 files changed, 59 insertions(+), 38 deletions(-)
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
index 41e1dbeb..0e4d8358 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
@@ -32,9 +32,9 @@ import org.apache.flink.types.RowKind;
import org.junit.Test;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -75,28 +75,52 @@ public class AlterTableCompactITCase extends FileStoreTableITCase {
}
@Test
- public void testNonPartitioned() throws IOException {
+ public void testNonPartitioned() {
generator = new TestKeyValueGenerator(NON_PARTITIONED);
Random random = new Random();
innerTest("T0", random.nextInt(10) + 1, NON_PARTITIONED);
}
@Test
- public void testSinglePartitioned() throws IOException {
+ public void testSinglePartitioned() {
generator = new TestKeyValueGenerator(SINGLE_PARTITIONED);
Random random = new Random();
innerTest("T1", random.nextInt(10) + 1, SINGLE_PARTITIONED);
}
@Test
- public void testMultiPartitioned() throws IOException {
+ public void testMultiPartitioned() {
generator = new TestKeyValueGenerator(MULTI_PARTITIONED);
Random random = new Random();
innerTest("T2", random.nextInt(10) + 1, MULTI_PARTITIONED);
}
-    private void innerTest(String tableName, int batchNum, TestKeyValueGenerator.GeneratorMode mode)
-            throws IOException {
+ @Test
+ public void testChangeNumOfSortedRunTrigger() {
+ // force auto-compaction and increase trigger
+ batchSql("ALTER TABLE T0 SET ('commit.force-compact' = 'true')");
+        batchSql("ALTER TABLE T0 SET ('num-sorted-run.compaction-trigger' = '5')");
+
+ // write duplicates
+        batchSql("INSERT INTO T0 VALUES(1, 1, 1)," + "(2, 2, 2), " + "(3, 3, 3), " + "(4, 4, 4)");
+        batchSql("INSERT INTO T0 VALUES(1, 1, 1)," + "(2, 2, 2), " + "(3, 3, 3), " + "(4, 4, 4)");
+        batchSql("INSERT INTO T0 VALUES(1, 1, 1)," + "(2, 2, 2), " + "(3, 3, 3), " + "(4, 4, 4)");
+        batchSql("INSERT INTO T0 VALUES(1, 1, 1)," + "(2, 2, 2), " + "(3, 3, 3), " + "(4, 4, 4)");
+        batchSql("INSERT INTO T0 VALUES(1, 1, 1)," + "(2, 2, 2), " + "(3, 3, 3), " + "(4, 4, 4)");
+ Snapshot snapshot = findLatestSnapshot("T0");
+ assertThat(snapshot.id()).isEqualTo(6);
+        assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
+
+ // decrease trigger
+        batchSql("ALTER TABLE T0 SET ('num-sorted-run.compaction-trigger' = '1')");
+ batchSql("ALTER TABLE T0 COMPACT");
+ assertThat(findLatestSnapshot("T0"))
+ .usingComparator(Comparator.comparing(Snapshot::id))
+ .isEqualTo(snapshot);
+ }
+
+ private void innerTest(
+            String tableName, int batchNum, TestKeyValueGenerator.GeneratorMode mode) {
// increase trigger to avoid auto-compaction
batchSql(
String.format(
@@ -171,8 +195,7 @@ public class AlterTableCompactITCase extends FileStoreTableITCase {
String compactQuery,
String selectQuery,
long latestSnapshot,
- List<Row> expectedData)
- throws IOException {
+ List<Row> expectedData) {
batchSql(compactQuery);
Snapshot snapshot = findLatestSnapshot(tableName);
assertThat(snapshot.id()).isEqualTo(latestSnapshot + 1);
@@ -284,7 +307,7 @@ public class AlterTableCompactITCase extends FileStoreTableITCase {
tableName)));
}
- private Snapshot findLatestSnapshot(String tableName) throws IOException {
+ private Snapshot findLatestSnapshot(String tableName) {
         SnapshotManager snapshotManager = new SnapshotManager(getTableDirectory(tableName));
return snapshotManager.snapshot(snapshotManager.latestSnapshotId());
}
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ForceCompactionITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ForceCompactionITCase.java
index d0fb504a..c2a0cd39 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ForceCompactionITCase.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ForceCompactionITCase.java
@@ -81,34 +81,31 @@ public class ForceCompactionITCase extends FileStoreTableITCase {
}
@Test
-    public void testNoDefaultNumOfLevels() throws Exception {
-        bEnv.executeSql("ALTER TABLE T1 SET ('commit.force-compact' = 'true')");
-        bEnv.executeSql(
-                        "INSERT INTO T1 VALUES(1, 'Winter', 'Winter is Coming'),"
-                                + "(2, 'Winter', 'The First Snowflake'), "
-                                + "(2, 'Spring', 'The First Rose in Spring'), "
-                                + "(7, 'Summer', 'Summertime Sadness')")
-                .await();
-        bEnv.executeSql("INSERT INTO T1 VALUES(12, 'Winter', 'Last Christmas')").await();
-        bEnv.executeSql("INSERT INTO T1 VALUES(11, 'Winter', 'Winter is Coming')").await();
-        bEnv.executeSql("INSERT INTO T1 VALUES(10, 'Autumn', 'Refrain')").await();
-        bEnv.executeSql(
-                        "INSERT INTO T1 VALUES(6, 'Summer', 'Watermelon Sugar'), "
-                                + "(4, 'Spring', 'Spring Water')")
-                .await();
-        bEnv.executeSql(
-                        "INSERT INTO T1 VALUES(66, 'Summer', 'Summer Vibe'), "
-                                + "(9, 'Autumn', 'Wake Me Up When September Ends')")
-                .await();
-        bEnv.executeSql(
-                        "INSERT INTO T1 VALUES(666, 'Summer', 'Summer Vibe'), "
-                                + "(9, 'Autumn', 'Wake Me Up When September Ends')")
-                .await();
-        bEnv.executeSql("ALTER TABLE T1 SET ('num-sorted-run.compaction-trigger' = '2')");
-        bEnv.executeSql(
-                        "INSERT INTO T1 VALUES(666, 'Summer', 'Summer Vibe'), "
-                                + "(9, 'Autumn', 'Wake Me Up When September Ends')")
-                .await();
+ public void testNoDefaultNumOfLevels() {
+ batchSql("ALTER TABLE T1 SET ('commit.force-compact' = 'true')");
+ batchSql(
+ "INSERT INTO T1 VALUES(1, 'Winter', 'Winter is Coming'),"
+ + "(2, 'Winter', 'The First Snowflake'), "
+ + "(2, 'Spring', 'The First Rose in Spring'), "
+ + "(7, 'Summer', 'Summertime Sadness')");
+ batchSql("INSERT INTO T1 VALUES(12, 'Winter', 'Last Christmas')");
+ batchSql("INSERT INTO T1 VALUES(11, 'Winter', 'Winter is Coming')");
+ batchSql("INSERT INTO T1 VALUES(10, 'Autumn', 'Refrain')");
+ batchSql(
+ "INSERT INTO T1 VALUES(6, 'Summer', 'Watermelon Sugar'), "
+ + "(4, 'Spring', 'Spring Water')");
+ batchSql(
+ "INSERT INTO T1 VALUES(66, 'Summer', 'Summer Vibe'), "
+ + "(9, 'Autumn', 'Wake Me Up When September Ends')");
+
+ batchSql(
+ "INSERT INTO T1 VALUES(666, 'Summer', 'Summer Vibe'), "
+ + "(9, 'Autumn', 'Wake Me Up When September Ends')");
+
+        batchSql("ALTER TABLE T1 SET ('num-sorted-run.compaction-trigger' = '2')");
+ batchSql(
+ "INSERT INTO T1 VALUES(666, 'Summer', 'Summer Vibe'), "
+ + "(9, 'Autumn', 'Wake Me Up When September Ends')");
assertThat(batchSql("SELECT * FROM T1")).hasSize(15);
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
index 447f06b6..fba8c330 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
@@ -148,8 +148,9 @@ public class FileStoreWriteImpl implements FileStoreWrite {
int bucket,
ExecutorService compactExecutor,
List<DataFileMeta> restoredFiles) {
+        Levels levels = new Levels(keyComparatorSupplier.get(), restoredFiles, options.numLevels);
return new CompactWriter(
- CompactUnit.fromFiles(options.numLevels - 1, restoredFiles),
+                CompactUnit.fromLevelRuns(levels.numberOfLevels() - 1, levels.levelSortedRuns()),
createCompactManager(
partition,
bucket,