This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6dbeff863 [test] Complement the ITCase for flink branchSql. (#3811)
6dbeff863 is described below
commit 6dbeff86394275c36ac82659e824f96ff33d6f7e
Author: HunterXHunter <[email protected]>
AuthorDate: Sun Jul 28 11:19:03 2024 +0800
[test] Complement the ITCase for flink branchSql. (#3811)
---
.../org/apache/paimon/flink/BranchSqlITCase.java | 306 ++++++++++++++++++---
1 file changed, 275 insertions(+), 31 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
index be0a2e805..cddfaf936 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -18,55 +18,76 @@
package org.apache.paimon.flink;
-import org.apache.paimon.flink.util.AbstractTestBase;
+import org.apache.paimon.branch.TableBranch;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.SnapshotManager;
-import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
/** IT cases for table with branches using SQL. */
-public class BranchSqlITCase extends AbstractTestBase {
-
- @TempDir java.nio.file.Path tempDir;
+public class BranchSqlITCase extends CatalogITCaseBase {
@Test
- public void testAlterTable() throws Exception {
- TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
- tEnv.executeSql(
- "CREATE CATALOG mycat WITH ( 'type' = 'paimon', 'warehouse' =
'" + tempDir + "' )");
- tEnv.executeSql("USE CATALOG mycat");
- tEnv.executeSql(
- "CREATE TABLE t ( pt INT, k INT, v STRING, PRIMARY KEY (pt, k)
NOT ENFORCED ) "
- + "PARTITIONED BY (pt) WITH ( 'bucket' = '2' )");
-
- tEnv.executeSql(
- "INSERT INTO t VALUES (1, 10, 'apple'), (1, 20,
'banana'), (2, 10, 'cat'), (2, 20, 'dog')")
- .await();
- tEnv.executeSql("CALL sys.create_branch('default.t', 'test', 1)");
- tEnv.executeSql("INSERT INTO t VALUES (1, 10, 'APPLE'), (2, 20,
'DOG'), (2, 30, 'horse')")
- .await();
-
- tEnv.executeSql("ALTER TABLE `t$branch_test` ADD (v2 INT)").await();
- tEnv.executeSql(
- "INSERT INTO `t$branch_test` VALUES "
- + "(1, 10, 'cherry', 100), (2, 20, 'bird',
200), (2, 40, 'wolf', 400)")
- .await();
-
- assertThat(collectResult(tEnv, "SELECT * FROM t"))
+ public void testAlterBranchTable() throws Exception {
+
+ sql(
+ "CREATE TABLE T ("
+ + " pt INT"
+ + ", k INT"
+ + ", v STRING"
+ + ", PRIMARY KEY (pt, k) NOT ENFORCED"
+ + " ) PARTITIONED BY (pt) WITH ("
+ + " 'bucket' = '2'"
+ + " )");
+
+ sql(
+ "INSERT INTO T VALUES"
+ + " (1, 10, 'apple'),"
+ + " (1, 20, 'banana'),"
+ + " (2, 10, 'cat'),"
+ + " (2, 20, 'dog')");
+
+ sql("CALL sys.create_branch('default.T', 'test', 1)");
+
+ FileStoreTable branchTable = paimonTable("T$branch_test");
+ assertThat(branchTable.schema().fields().size()).isEqualTo(3);
+
+ sql(
+ "INSERT INTO T VALUES"
+ + " (1, 10, 'APPLE'),"
+ + " (2, 20, 'DOG'),"
+ + " (2, 30, 'horse')");
+
+ // Add v2 column for branch table.
+ sql("ALTER TABLE `T$branch_test` ADD (v2 INT)");
+
+ branchTable = paimonTable("T$branch_test");
+ assertThat(branchTable.schema().fields().size()).isEqualTo(4);
+
+ sql(
+ "INSERT INTO `T$branch_test` VALUES "
+ + "(1, 10, 'cherry', 100)"
+ + ", (2, 20, 'bird', 200)"
+ + ", (2, 40, 'wolf', 400)");
+
+ assertThat(collectResult("SELECT * FROM T"))
.containsExactlyInAnyOrder(
"+I[1, 10, APPLE]",
"+I[1, 20, banana]",
"+I[2, 30, horse]",
"+I[2, 10, cat]",
"+I[2, 20, DOG]");
- assertThat(collectResult(tEnv, "SELECT * FROM t$branch_test"))
+
+ assertThat(collectResult("SELECT * FROM T$branch_test"))
.containsExactlyInAnyOrder(
"+I[1, 10, cherry, 100]",
"+I[1, 20, banana, null]",
@@ -75,7 +96,224 @@ public class BranchSqlITCase extends AbstractTestBase {
"+I[2, 40, wolf, 400]");
}
- private List<String> collectResult(TableEnvironment tEnv, String sql)
throws Exception {
+ @Test
+ public void testCreateBranchFromTag() throws Exception {
+ sql(
+ "CREATE TABLE T ("
+ + " pt INT"
+ + ", k INT"
+ + ", v STRING"
+ + ", PRIMARY KEY (pt, k) NOT ENFORCED"
+ + " ) PARTITIONED BY (pt) WITH ("
+ + " 'bucket' = '2'"
+ + " )");
+
+ // snapshot 1.
+ sql("INSERT INTO T VALUES" + " (1, 10, 'apple')," + " (1, 20,
'banana')");
+ // snapshot 2.
+ sql("INSERT INTO T VALUES" + " (2, 10, 'cat')," + " (2, 20, 'dog')");
+
+ sql("CALL sys.create_tag('default.T', 'tag1', 1)");
+ sql("CALL sys.create_tag('default.T', 'tag2', 2)");
+
+ sql("CALL sys.create_branch('default.T', 'test', 'tag1')");
+ sql("CALL sys.create_branch('default.T', 'test2', 'tag2')");
+
+ FileStoreTable branchTable = paimonTable("T$branch_test");
+ assertThat(branchTable.tagManager().tagExists("tag1")).isEqualTo(true);
+
+ assertThat(collectResult("SELECT * FROM T$branch_test"))
+ .containsExactlyInAnyOrder("+I[1, 10, apple]", "+I[1, 20,
banana]");
+
+ FileStoreTable branchTable2 = paimonTable("T$branch_test2");
+
assertThat(branchTable2.tagManager().tagExists("tag2")).isEqualTo(true);
+
+ assertThat(collectResult("SELECT * FROM T$branch_test2"))
+ .containsExactlyInAnyOrder(
+ "+I[1, 10, apple]",
+ "+I[1, 20, banana]",
+ "+I[2, 10, cat]",
+ "+I[2, 20, dog]");
+ }
+
+ @Test
+ public void testCreateBranchFromSnapshot() throws
Catalog.TableNotExistException {
+
+ sql(
+ "CREATE TABLE T ("
+ + " pt INT"
+ + ", k INT"
+ + ", v STRING"
+ + ", PRIMARY KEY (pt, k) NOT ENFORCED"
+ + " ) PARTITIONED BY (pt) WITH ("
+ + " 'bucket' = '2'"
+ + " )");
+
+ // snapshot 1.
+ sql("INSERT INTO T VALUES(1, 10, 'apple')");
+
+ // snapshot 2.
+ sql("INSERT INTO T VALUES(1, 20, 'dog')");
+
+ sql("CALL sys.create_branch('default.T', 'test', 1)");
+ sql("CALL sys.create_branch('default.T', 'test2', 2)");
+
+ FileStoreTable table = paimonTable("T");
+
+ assertThat(
+ table.branchManager().branches().stream()
+ .map(TableBranch::getCreatedFromSnapshot))
+ .containsExactlyInAnyOrder(1L, 2L);
+
+
assertThat(paimonTable("T$branch_test").snapshotManager().snapshotExists(1))
+ .isEqualTo(true);
+
+
assertThat(paimonTable("T$branch_test2").snapshotManager().snapshotExists(2))
+ .isEqualTo(true);
+ }
+
+ @Test
+ public void testCreateEmptyBranch() throws Exception {
+ sql(
+ "CREATE TABLE T ("
+ + " pt INT"
+ + ", k INT"
+ + ", v STRING"
+ + ", PRIMARY KEY (pt, k) NOT ENFORCED"
+ + " ) PARTITIONED BY (pt) WITH ("
+ + " 'bucket' = '2'"
+ + " )");
+
+ // snapshot 1.
+ sql("INSERT INTO T VALUES(1, 10, 'apple')");
+
+ // snapshot 2.
+ sql("INSERT INTO T VALUES(1, 20, 'dog')");
+
+ assertThat(collectResult("SELECT * FROM T"))
+ .containsExactlyInAnyOrder("+I[1, 10, apple]", "+I[1, 20,
dog]");
+
+ // create en empty branch.
+ sql("CALL sys.create_branch('default.T', 'empty_branch')");
+
+ sql("INSERT INTO `T$branch_empty_branch` VALUES (3, 30, 'banana')");
+
+ assertThat(collectResult("SELECT * FROM T$branch_empty_branch"))
+ .containsExactlyInAnyOrder("+I[3, 30, banana]");
+ }
+
+ @Test
+ public void testDeleteBranchTable() throws Exception {
+ sql(
+ "CREATE TABLE T ("
+ + " pt INT"
+ + ", k INT"
+ + ", v STRING"
+ + ", PRIMARY KEY (pt, k) NOT ENFORCED"
+ + " ) PARTITIONED BY (pt) WITH ("
+ + " 'bucket' = '2'"
+ + " )");
+
+ // snapshot 1.
+ sql("INSERT INTO T VALUES(1, 10, 'apple')");
+
+ // snapshot 2.
+ sql("INSERT INTO T VALUES(1, 20, 'dog')");
+
+ sql("CALL sys.create_branch('default.T', 'test', 1)");
+ sql("CALL sys.create_branch('default.T', 'test2', 2)");
+
+ FileStoreTable table = paimonTable("T");
+
+ assertThat(
+ table.branchManager().branches().stream()
+ .map(TableBranch::getCreatedFromSnapshot))
+ .containsExactlyInAnyOrder(1L, 2L);
+
+
assertThat(table.branchManager().branches().stream().map(TableBranch::getBranchName))
+ .containsExactlyInAnyOrder("test", "test2");
+
+ sql("CALL sys.delete_branch('default.T', 'test')");
+
+
assertThat(table.branchManager().branches().stream().map(TableBranch::getBranchName))
+ .containsExactlyInAnyOrder("test2");
+ }
+
+ @Test
+ public void testBranchManagerGetBranchSnapshotsList()
+ throws Catalog.TableNotExistException, IOException {
+ sql(
+ "CREATE TABLE T ("
+ + " pt INT"
+ + ", k INT"
+ + ", v STRING"
+ + ", PRIMARY KEY (pt, k) NOT ENFORCED"
+ + " ) PARTITIONED BY (pt) WITH ("
+ + " 'bucket' = '2'"
+ + " )");
+
+ sql("INSERT INTO T VALUES (1, 10, 'hxh')");
+ sql("INSERT INTO T VALUES (1, 20, 'hxh')");
+ sql("INSERT INTO T VALUES (1, 30, 'hxh')");
+
+ FileStoreTable table = paimonTable("T");
+ checkSnapshots(table.snapshotManager(), 1, 3);
+
+ sql("CALL sys.create_branch('default.T', 'test1', 1)");
+ sql("CALL sys.create_branch('default.T', 'test2', 2)");
+ sql("CALL sys.create_branch('default.T', 'test3', 3)");
+
+ assertThat(
+ table.branchManager().branches().stream()
+ .map(TableBranch::getCreatedFromSnapshot))
+ .containsExactlyInAnyOrder(1L, 2L, 3L);
+ }
+
+ @Test
+ public void testBranchFastForward() throws Exception {
+ sql(
+ "CREATE TABLE T ("
+ + " pt INT"
+ + ", k INT"
+ + ", v STRING"
+ + ", PRIMARY KEY (pt, k) NOT ENFORCED"
+ + " ) PARTITIONED BY (pt) WITH ("
+ + " 'bucket' = '2'"
+ + " )");
+
+ FileStoreTable table = paimonTable("T");
+ SnapshotManager snapshotManager = table.snapshotManager();
+
+ sql("INSERT INTO T VALUES (1, 10, 'hunter')");
+ sql("INSERT INTO T VALUES (1, 20, 'hunter')");
+ sql("INSERT INTO T VALUES (1, 30, 'hunter')");
+
+ checkSnapshots(snapshotManager, 1, 3);
+
+ assertThat(collectResult("SELECT * FROM T"))
+ .containsExactlyInAnyOrder(
+ "+I[1, 10, hunter]", "+I[1, 20, hunter]", "+I[1, 30,
hunter]");
+
+ sql("CALL sys.create_branch('default.T', 'test', 1)");
+
+ sql("INSERT INTO `T$branch_test` VALUES (2, 10, 'hunterX')");
+
+ checkSnapshots(paimonTable("T$branch_test").snapshotManager(), 1, 2);
+
+ // query branch data.
+ assertThat(collectResult("SELECT * FROM T$branch_test"))
+ .containsExactlyInAnyOrder("+I[1, 10, hunter]", "+I[2, 10,
hunterX]");
+
+ sql("CALL sys.fast_forward('default.T', 'test')");
+
+ // Branch `test` replaces the main branch.
+ assertThat(collectResult("SELECT * FROM T"))
+ .containsExactlyInAnyOrder("+I[1, 10, hunter]", "+I[2, 10,
hunterX]");
+
+ checkSnapshots(snapshotManager, 1, 2);
+ }
+
+ private List<String> collectResult(String sql) throws Exception {
List<String> result = new ArrayList<>();
try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {
while (it.hasNext()) {
@@ -84,4 +322,10 @@ public class BranchSqlITCase extends AbstractTestBase {
}
return result;
}
+
+ private void checkSnapshots(SnapshotManager sm, int earliest, int latest)
throws IOException {
+ assertThat(sm.snapshotCount()).isEqualTo(latest - earliest + 1);
+ assertThat(sm.earliestSnapshotId()).isEqualTo(earliest);
+ assertThat(sm.latestSnapshotId()).isEqualTo(latest);
+ }
}