This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6dbeff863 [test] Complement the ITCase for flink branchSql. (#3811)
6dbeff863 is described below

commit 6dbeff86394275c36ac82659e824f96ff33d6f7e
Author: HunterXHunter <[email protected]>
AuthorDate: Sun Jul 28 11:19:03 2024 +0800

    [test] Complement the ITCase for flink branchSql. (#3811)
---
 .../org/apache/paimon/flink/BranchSqlITCase.java   | 306 ++++++++++++++++++---
 1 file changed, 275 insertions(+), 31 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
index be0a2e805..cddfaf936 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -18,55 +18,76 @@
 
 package org.apache.paimon.flink;
 
-import org.apache.paimon.flink.util.AbstractTestBase;
+import org.apache.paimon.branch.TableBranch;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.SnapshotManager;
 
-import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** IT cases for table with branches using SQL. */
-public class BranchSqlITCase extends AbstractTestBase {
-
-    @TempDir java.nio.file.Path tempDir;
+public class BranchSqlITCase extends CatalogITCaseBase {
 
     @Test
-    public void testAlterTable() throws Exception {
-        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
-        tEnv.executeSql(
-                "CREATE CATALOG mycat WITH ( 'type' = 'paimon', 'warehouse' = 
'" + tempDir + "' )");
-        tEnv.executeSql("USE CATALOG mycat");
-        tEnv.executeSql(
-                "CREATE TABLE t ( pt INT, k INT, v STRING, PRIMARY KEY (pt, k) 
NOT ENFORCED ) "
-                        + "PARTITIONED BY (pt) WITH ( 'bucket' = '2' )");
-
-        tEnv.executeSql(
-                        "INSERT INTO t VALUES (1, 10, 'apple'), (1, 20, 
'banana'), (2, 10, 'cat'), (2, 20, 'dog')")
-                .await();
-        tEnv.executeSql("CALL sys.create_branch('default.t', 'test', 1)");
-        tEnv.executeSql("INSERT INTO t VALUES (1, 10, 'APPLE'), (2, 20, 
'DOG'), (2, 30, 'horse')")
-                .await();
-
-        tEnv.executeSql("ALTER TABLE `t$branch_test` ADD (v2 INT)").await();
-        tEnv.executeSql(
-                        "INSERT INTO `t$branch_test` VALUES "
-                                + "(1, 10, 'cherry', 100), (2, 20, 'bird', 
200), (2, 40, 'wolf', 400)")
-                .await();
-
-        assertThat(collectResult(tEnv, "SELECT * FROM t"))
+    public void testAlterBranchTable() throws Exception {
+
+        sql(
+                "CREATE TABLE T ("
+                        + " pt INT"
+                        + ", k INT"
+                        + ", v STRING"
+                        + ", PRIMARY KEY (pt, k) NOT ENFORCED"
+                        + " ) PARTITIONED BY (pt) WITH ("
+                        + " 'bucket' = '2'"
+                        + " )");
+
+        sql(
+                "INSERT INTO T VALUES"
+                        + " (1, 10, 'apple'),"
+                        + " (1, 20, 'banana'),"
+                        + " (2, 10, 'cat'),"
+                        + " (2, 20, 'dog')");
+
+        sql("CALL sys.create_branch('default.T', 'test', 1)");
+
+        FileStoreTable branchTable = paimonTable("T$branch_test");
+        assertThat(branchTable.schema().fields().size()).isEqualTo(3);
+
+        sql(
+                "INSERT INTO T VALUES"
+                        + " (1, 10, 'APPLE'),"
+                        + " (2, 20, 'DOG'),"
+                        + " (2, 30, 'horse')");
+
+        // Add v2 column for branch table.
+        sql("ALTER TABLE `T$branch_test` ADD (v2 INT)");
+
+        branchTable = paimonTable("T$branch_test");
+        assertThat(branchTable.schema().fields().size()).isEqualTo(4);
+
+        sql(
+                "INSERT INTO `T$branch_test` VALUES "
+                        + "(1, 10, 'cherry', 100)"
+                        + ", (2, 20, 'bird', 200)"
+                        + ", (2, 40, 'wolf', 400)");
+
+        assertThat(collectResult("SELECT * FROM T"))
                 .containsExactlyInAnyOrder(
                         "+I[1, 10, APPLE]",
                         "+I[1, 20, banana]",
                         "+I[2, 30, horse]",
                         "+I[2, 10, cat]",
                         "+I[2, 20, DOG]");
-        assertThat(collectResult(tEnv, "SELECT * FROM t$branch_test"))
+
+        assertThat(collectResult("SELECT * FROM T$branch_test"))
                 .containsExactlyInAnyOrder(
                         "+I[1, 10, cherry, 100]",
                         "+I[1, 20, banana, null]",
@@ -75,7 +96,224 @@ public class BranchSqlITCase extends AbstractTestBase {
                         "+I[2, 40, wolf, 400]");
     }
 
-    private List<String> collectResult(TableEnvironment tEnv, String sql) 
throws Exception {
+    @Test
+    public void testCreateBranchFromTag() throws Exception {
+        sql(
+                "CREATE TABLE T ("
+                        + " pt INT"
+                        + ", k INT"
+                        + ", v STRING"
+                        + ", PRIMARY KEY (pt, k) NOT ENFORCED"
+                        + " ) PARTITIONED BY (pt) WITH ("
+                        + " 'bucket' = '2'"
+                        + " )");
+
+        // snapshot 1.
+        sql("INSERT INTO T VALUES" + " (1, 10, 'apple')," + " (1, 20, 
'banana')");
+        // snapshot 2.
+        sql("INSERT INTO T VALUES" + " (2, 10, 'cat')," + " (2, 20, 'dog')");
+
+        sql("CALL sys.create_tag('default.T', 'tag1', 1)");
+        sql("CALL sys.create_tag('default.T', 'tag2', 2)");
+
+        sql("CALL sys.create_branch('default.T', 'test', 'tag1')");
+        sql("CALL sys.create_branch('default.T', 'test2', 'tag2')");
+
+        FileStoreTable branchTable = paimonTable("T$branch_test");
+        assertThat(branchTable.tagManager().tagExists("tag1")).isEqualTo(true);
+
+        assertThat(collectResult("SELECT * FROM T$branch_test"))
+                .containsExactlyInAnyOrder("+I[1, 10, apple]", "+I[1, 20, 
banana]");
+
+        FileStoreTable branchTable2 = paimonTable("T$branch_test2");
+        
assertThat(branchTable2.tagManager().tagExists("tag2")).isEqualTo(true);
+
+        assertThat(collectResult("SELECT * FROM T$branch_test2"))
+                .containsExactlyInAnyOrder(
+                        "+I[1, 10, apple]",
+                        "+I[1, 20, banana]",
+                        "+I[2, 10, cat]",
+                        "+I[2, 20, dog]");
+    }
+
+    @Test
+    public void testCreateBranchFromSnapshot() throws 
Catalog.TableNotExistException {
+
+        sql(
+                "CREATE TABLE T ("
+                        + " pt INT"
+                        + ", k INT"
+                        + ", v STRING"
+                        + ", PRIMARY KEY (pt, k) NOT ENFORCED"
+                        + " ) PARTITIONED BY (pt) WITH ("
+                        + " 'bucket' = '2'"
+                        + " )");
+
+        // snapshot 1.
+        sql("INSERT INTO T VALUES(1, 10, 'apple')");
+
+        // snapshot 2.
+        sql("INSERT INTO T VALUES(1, 20, 'dog')");
+
+        sql("CALL sys.create_branch('default.T', 'test', 1)");
+        sql("CALL sys.create_branch('default.T', 'test2', 2)");
+
+        FileStoreTable table = paimonTable("T");
+
+        assertThat(
+                        table.branchManager().branches().stream()
+                                .map(TableBranch::getCreatedFromSnapshot))
+                .containsExactlyInAnyOrder(1L, 2L);
+
+        
assertThat(paimonTable("T$branch_test").snapshotManager().snapshotExists(1))
+                .isEqualTo(true);
+
+        
assertThat(paimonTable("T$branch_test2").snapshotManager().snapshotExists(2))
+                .isEqualTo(true);
+    }
+
+    @Test
+    public void testCreateEmptyBranch() throws Exception {
+        sql(
+                "CREATE TABLE T ("
+                        + " pt INT"
+                        + ", k INT"
+                        + ", v STRING"
+                        + ", PRIMARY KEY (pt, k) NOT ENFORCED"
+                        + " ) PARTITIONED BY (pt) WITH ("
+                        + " 'bucket' = '2'"
+                        + " )");
+
+        // snapshot 1.
+        sql("INSERT INTO T VALUES(1, 10, 'apple')");
+
+        // snapshot 2.
+        sql("INSERT INTO T VALUES(1, 20, 'dog')");
+
+        assertThat(collectResult("SELECT * FROM T"))
+                .containsExactlyInAnyOrder("+I[1, 10, apple]", "+I[1, 20, 
dog]");
+
+        // create en empty branch.
+        sql("CALL sys.create_branch('default.T', 'empty_branch')");
+
+        sql("INSERT INTO `T$branch_empty_branch` VALUES (3, 30, 'banana')");
+
+        assertThat(collectResult("SELECT * FROM T$branch_empty_branch"))
+                .containsExactlyInAnyOrder("+I[3, 30, banana]");
+    }
+
+    @Test
+    public void testDeleteBranchTable() throws Exception {
+        sql(
+                "CREATE TABLE T ("
+                        + " pt INT"
+                        + ", k INT"
+                        + ", v STRING"
+                        + ", PRIMARY KEY (pt, k) NOT ENFORCED"
+                        + " ) PARTITIONED BY (pt) WITH ("
+                        + " 'bucket' = '2'"
+                        + " )");
+
+        // snapshot 1.
+        sql("INSERT INTO T VALUES(1, 10, 'apple')");
+
+        // snapshot 2.
+        sql("INSERT INTO T VALUES(1, 20, 'dog')");
+
+        sql("CALL sys.create_branch('default.T', 'test', 1)");
+        sql("CALL sys.create_branch('default.T', 'test2', 2)");
+
+        FileStoreTable table = paimonTable("T");
+
+        assertThat(
+                        table.branchManager().branches().stream()
+                                .map(TableBranch::getCreatedFromSnapshot))
+                .containsExactlyInAnyOrder(1L, 2L);
+
+        
assertThat(table.branchManager().branches().stream().map(TableBranch::getBranchName))
+                .containsExactlyInAnyOrder("test", "test2");
+
+        sql("CALL sys.delete_branch('default.T', 'test')");
+
+        
assertThat(table.branchManager().branches().stream().map(TableBranch::getBranchName))
+                .containsExactlyInAnyOrder("test2");
+    }
+
+    @Test
+    public void testBranchManagerGetBranchSnapshotsList()
+            throws Catalog.TableNotExistException, IOException {
+        sql(
+                "CREATE TABLE T ("
+                        + " pt INT"
+                        + ", k INT"
+                        + ", v STRING"
+                        + ", PRIMARY KEY (pt, k) NOT ENFORCED"
+                        + " ) PARTITIONED BY (pt) WITH ("
+                        + " 'bucket' = '2'"
+                        + " )");
+
+        sql("INSERT INTO T VALUES (1, 10, 'hxh')");
+        sql("INSERT INTO T VALUES (1, 20, 'hxh')");
+        sql("INSERT INTO T VALUES (1, 30, 'hxh')");
+
+        FileStoreTable table = paimonTable("T");
+        checkSnapshots(table.snapshotManager(), 1, 3);
+
+        sql("CALL sys.create_branch('default.T', 'test1', 1)");
+        sql("CALL sys.create_branch('default.T', 'test2', 2)");
+        sql("CALL sys.create_branch('default.T', 'test3', 3)");
+
+        assertThat(
+                        table.branchManager().branches().stream()
+                                .map(TableBranch::getCreatedFromSnapshot))
+                .containsExactlyInAnyOrder(1L, 2L, 3L);
+    }
+
+    @Test
+    public void testBranchFastForward() throws Exception {
+        sql(
+                "CREATE TABLE T ("
+                        + " pt INT"
+                        + ", k INT"
+                        + ", v STRING"
+                        + ", PRIMARY KEY (pt, k) NOT ENFORCED"
+                        + " ) PARTITIONED BY (pt) WITH ("
+                        + " 'bucket' = '2'"
+                        + " )");
+
+        FileStoreTable table = paimonTable("T");
+        SnapshotManager snapshotManager = table.snapshotManager();
+
+        sql("INSERT INTO T VALUES (1, 10, 'hunter')");
+        sql("INSERT INTO T VALUES (1, 20, 'hunter')");
+        sql("INSERT INTO T VALUES (1, 30, 'hunter')");
+
+        checkSnapshots(snapshotManager, 1, 3);
+
+        assertThat(collectResult("SELECT * FROM T"))
+                .containsExactlyInAnyOrder(
+                        "+I[1, 10, hunter]", "+I[1, 20, hunter]", "+I[1, 30, 
hunter]");
+
+        sql("CALL sys.create_branch('default.T', 'test', 1)");
+
+        sql("INSERT INTO `T$branch_test` VALUES (2, 10, 'hunterX')");
+
+        checkSnapshots(paimonTable("T$branch_test").snapshotManager(), 1, 2);
+
+        // query branch data.
+        assertThat(collectResult("SELECT * FROM T$branch_test"))
+                .containsExactlyInAnyOrder("+I[1, 10, hunter]", "+I[2, 10, 
hunterX]");
+
+        sql("CALL sys.fast_forward('default.T', 'test')");
+
+        // Branch `test` replaces the main branch.
+        assertThat(collectResult("SELECT * FROM T"))
+                .containsExactlyInAnyOrder("+I[1, 10, hunter]", "+I[2, 10, 
hunterX]");
+
+        checkSnapshots(snapshotManager, 1, 2);
+    }
+
+    private List<String> collectResult(String sql) throws Exception {
         List<String> result = new ArrayList<>();
         try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {
             while (it.hasNext()) {
@@ -84,4 +322,10 @@ public class BranchSqlITCase extends AbstractTestBase {
         }
         return result;
     }
+
+    private void checkSnapshots(SnapshotManager sm, int earliest, int latest) 
throws IOException {
+        assertThat(sm.snapshotCount()).isEqualTo(latest - earliest + 1);
+        assertThat(sm.earliestSnapshotId()).isEqualTo(earliest);
+        assertThat(sm.latestSnapshotId()).isEqualTo(latest);
+    }
 }

Reply via email to