This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b751349f [core] Support branch function for system tables (#3815)
3b751349f is described below

commit 3b751349f1ca8930d9cb1b5eed9a70115054049c
Author: HeavenZH <[email protected]>
AuthorDate: Fri Aug 16 14:53:48 2024 +0800

    [core] Support branch function for system tables (#3815)
    
    This closes #3815.
    
    ---------
    
    Co-authored-by: tsreaper <[email protected]>
---
 .../table/system/AggregationFieldsTable.java       |  16 ++-
 .../apache/paimon/table/system/ConsumersTable.java |  16 ++-
 .../apache/paimon/table/system/OptionsTable.java   |  22 +--
 .../apache/paimon/table/system/SchemasTable.java   |  16 ++-
 .../apache/paimon/table/system/SnapshotsTable.java |  19 ++-
 .../org/apache/paimon/table/system/TagsTable.java  |  16 ++-
 .../table/system/AggregationFieldsTableTest.java   |  52 ++++++-
 .../paimon/table/system/AuditLogTableTest.java     |   4 +-
 .../table/system/CatalogOptionsTableTest.java      |   4 +-
 .../paimon/table/system/ConsumersTableTest.java    |   4 +-
 .../apache/paimon/table/system/FilesTableTest.java |   6 +-
 .../paimon/table/system/ManifestsTableTest.java    |   6 +-
 .../paimon/table/system/OptionsTableTest.java      |  50 ++++++-
 .../paimon/table/system/SchemasTableTest.java      |   4 +-
 .../paimon/table/system/SnapshotsTableTest.java    |   4 +-
 .../apache/paimon/table/system/TagsTableTest.java  |   4 +-
 .../org/apache/paimon/flink/BranchSqlITCase.java   | 157 +++++++++++++++++++++
 17 files changed, 344 insertions(+), 56 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java
index 48d02c421..f5f4d401f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.table.system;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
@@ -77,14 +78,19 @@ public class AggregationFieldsTable implements 
ReadonlyTable {
 
     private final FileIO fileIO;
     private final Path location;
+    private final String branch;
 
     public AggregationFieldsTable(FileStoreTable dataTable) {
-        this(dataTable.fileIO(), dataTable.location());
+        this(
+                dataTable.fileIO(),
+                dataTable.location(),
+                CoreOptions.branch(dataTable.schema().options()));
     }
 
-    public AggregationFieldsTable(FileIO fileIO, Path location) {
+    public AggregationFieldsTable(FileIO fileIO, Path location, String 
branchName) {
         this.fileIO = fileIO;
         this.location = location;
+        this.branch = branchName;
     }
 
     @Override
@@ -114,7 +120,7 @@ public class AggregationFieldsTable implements 
ReadonlyTable {
 
     @Override
     public Table copy(Map<String, String> dynamicOptions) {
-        return new AggregationFieldsTable(fileIO, location);
+        return new AggregationFieldsTable(fileIO, location, branch);
     }
 
     private class SchemasScan extends ReadOnceTableScan {
@@ -160,7 +166,7 @@ public class AggregationFieldsTable implements 
ReadonlyTable {
     }
 
     /** {@link TableRead} implementation for {@link AggregationFieldsTable}. */
-    private static class SchemasRead implements InnerTableRead {
+    private class SchemasRead implements InnerTableRead {
 
         private final FileIO fileIO;
         private int[][] projection;
@@ -191,7 +197,7 @@ public class AggregationFieldsTable implements 
ReadonlyTable {
                 throw new IllegalArgumentException("Unsupported split: " + 
split.getClass());
             }
             Path location = ((AggregationSplit) split).location;
-            TableSchema schemas = new SchemaManager(fileIO, 
location).latest().get();
+            TableSchema schemas = new SchemaManager(fileIO, location, 
branch).latest().get();
             Iterator<InternalRow> rows = createInternalRowIterator(schemas);
             if (projection != null) {
                 rows =
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java
index e40f087b4..11db77303 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.table.system;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
@@ -71,14 +72,19 @@ public class ConsumersTable implements ReadonlyTable {
 
     private final FileIO fileIO;
     private final Path location;
+    private final String branch;
 
     public ConsumersTable(FileStoreTable dataTable) {
-        this(dataTable.fileIO(), dataTable.location());
+        this(
+                dataTable.fileIO(),
+                dataTable.location(),
+                CoreOptions.branch(dataTable.schema().options()));
     }
 
-    public ConsumersTable(FileIO fileIO, Path location) {
+    public ConsumersTable(FileIO fileIO, Path location, String branchName) {
         this.fileIO = fileIO;
         this.location = location;
+        this.branch = branchName;
     }
 
     @Override
@@ -108,7 +114,7 @@ public class ConsumersTable implements ReadonlyTable {
 
     @Override
     public Table copy(Map<String, String> dynamicOptions) {
-        return new ConsumersTable(fileIO, location);
+        return new ConsumersTable(fileIO, location, branch);
     }
 
     private class ConsumersScan extends ReadOnceTableScan {
@@ -154,7 +160,7 @@ public class ConsumersTable implements ReadonlyTable {
     }
 
     /** {@link TableRead} implementation for {@link ConsumersTable}. */
-    private static class ConsumersRead implements InnerTableRead {
+    private class ConsumersRead implements InnerTableRead {
 
         private final FileIO fileIO;
         private int[][] projection;
@@ -185,7 +191,7 @@ public class ConsumersTable implements ReadonlyTable {
                 throw new IllegalArgumentException("Unsupported split: " + 
split.getClass());
             }
             Path location = ((ConsumersTable.ConsumersSplit) split).location;
-            Map<String, Long> consumers = new ConsumerManager(fileIO, 
location).consumers();
+            Map<String, Long> consumers = new ConsumerManager(fileIO, 
location, branch).consumers();
             Iterator<InternalRow> rows =
                     Iterators.transform(consumers.entrySet().iterator(), 
this::toRow);
             if (projection != null) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java
index d6eda6959..0874a8104 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.table.system;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
@@ -57,7 +58,7 @@ import static 
org.apache.paimon.utils.SerializationUtils.newStringType;
 /** A {@link Table} for showing options of table. */
 public class OptionsTable implements ReadonlyTable {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
 
     public static final String OPTIONS = "options";
 
@@ -69,14 +70,19 @@ public class OptionsTable implements ReadonlyTable {
 
     private final FileIO fileIO;
     private final Path location;
+    private final String branch;
 
     public OptionsTable(FileStoreTable dataTable) {
-        this(dataTable.fileIO(), dataTable.location());
+        this(
+                dataTable.fileIO(),
+                dataTable.location(),
+                CoreOptions.branch(dataTable.schema().options()));
     }
 
-    public OptionsTable(FileIO fileIO, Path location) {
+    public OptionsTable(FileIO fileIO, Path location, String branchName) {
         this.fileIO = fileIO;
         this.location = location;
+        this.branch = branchName;
     }
 
     @Override
@@ -106,7 +112,7 @@ public class OptionsTable implements ReadonlyTable {
 
     @Override
     public Table copy(Map<String, String> dynamicOptions) {
-        return new OptionsTable(fileIO, location);
+        return new OptionsTable(fileIO, location, branch);
     }
 
     private class OptionsScan extends ReadOnceTableScan {
@@ -150,7 +156,7 @@ public class OptionsTable implements ReadonlyTable {
         }
     }
 
-    private static class OptionsRead implements InnerTableRead {
+    private class OptionsRead implements InnerTableRead {
 
         private final FileIO fileIO;
         private int[][] projection;
@@ -183,7 +189,7 @@ public class OptionsTable implements ReadonlyTable {
             Path location = ((OptionsSplit) split).location;
             Iterator<InternalRow> rows =
                     Iterators.transform(
-                            options(fileIO, location).entrySet().iterator(), 
this::toRow);
+                            options(fileIO, location, 
branch).entrySet().iterator(), this::toRow);
             if (projection != null) {
                 rows =
                         Iterators.transform(
@@ -199,8 +205,8 @@ public class OptionsTable implements ReadonlyTable {
         }
     }
 
-    private static Map<String, String> options(FileIO fileIO, Path location) {
-        return new SchemaManager(fileIO, location)
+    private static Map<String, String> options(FileIO fileIO, Path location, 
String branchName) {
+        return new SchemaManager(fileIO, location, branchName)
                 .latest()
                 .orElseThrow(() -> new RuntimeException("Table not exists."))
                 .options();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
index b5f2bf4d5..10313a1c7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.table.system;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
@@ -83,14 +84,19 @@ public class SchemasTable implements ReadonlyTable {
 
     private final FileIO fileIO;
     private final Path location;
+    private final String branch;
 
     public SchemasTable(FileStoreTable dataTable) {
-        this(dataTable.fileIO(), dataTable.location());
+        this(
+                dataTable.fileIO(),
+                dataTable.location(),
+                CoreOptions.branch(dataTable.schema().options()));
     }
 
-    public SchemasTable(FileIO fileIO, Path location) {
+    public SchemasTable(FileIO fileIO, Path location, String branchName) {
         this.fileIO = fileIO;
         this.location = location;
+        this.branch = branchName;
     }
 
     @Override
@@ -120,7 +126,7 @@ public class SchemasTable implements ReadonlyTable {
 
     @Override
     public Table copy(Map<String, String> dynamicOptions) {
-        return new SchemasTable(fileIO, location);
+        return new SchemasTable(fileIO, location, branch);
     }
 
     private class SchemasScan extends ReadOnceTableScan {
@@ -165,7 +171,7 @@ public class SchemasTable implements ReadonlyTable {
     }
 
     /** {@link TableRead} implementation for {@link SchemasTable}. */
-    private static class SchemasRead implements InnerTableRead {
+    private class SchemasRead implements InnerTableRead {
 
         private final FileIO fileIO;
         private int[][] projection;
@@ -197,7 +203,7 @@ public class SchemasTable implements ReadonlyTable {
             }
             Path location = ((SchemasSplit) split).location;
             Iterator<TableSchema> schemas =
-                    new SchemaManager(fileIO, location).listAll().iterator();
+                    new SchemaManager(fileIO, location, 
branch).listAll().iterator();
             Iterator<InternalRow> rows = Iterators.transform(schemas, 
this::toRow);
             if (projection != null) {
                 rows =
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
index 90268bd77..50def71f1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.table.system;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
@@ -75,7 +76,6 @@ public class SnapshotsTable implements ReadonlyTable {
     private static final long serialVersionUID = 1L;
 
     public static final String SNAPSHOTS = "snapshots";
-
     public static final RowType TABLE_TYPE =
             new RowType(
                     Arrays.asList(
@@ -106,17 +106,24 @@ public class SnapshotsTable implements ReadonlyTable {
 
     private final FileIO fileIO;
     private final Path location;
+    private final String branch;
 
     private final FileStoreTable dataTable;
 
     public SnapshotsTable(FileStoreTable dataTable) {
-        this(dataTable.fileIO(), dataTable.location(), dataTable);
+        this(
+                dataTable.fileIO(),
+                dataTable.location(),
+                dataTable,
+                CoreOptions.branch(dataTable.schema().options()));
     }
 
-    public SnapshotsTable(FileIO fileIO, Path location, FileStoreTable 
dataTable) {
+    public SnapshotsTable(
+            FileIO fileIO, Path location, FileStoreTable dataTable, String 
branchName) {
         this.fileIO = fileIO;
         this.location = location;
         this.dataTable = dataTable;
+        this.branch = branchName;
     }
 
     @Override
@@ -146,7 +153,7 @@ public class SnapshotsTable implements ReadonlyTable {
 
     @Override
     public Table copy(Map<String, String> dynamicOptions) {
-        return new SnapshotsTable(fileIO, location, 
dataTable.copy(dynamicOptions));
+        return new SnapshotsTable(fileIO, location, 
dataTable.copy(dynamicOptions), branch);
     }
 
     private class SnapshotsScan extends ReadOnceTableScan {
@@ -191,7 +198,7 @@ public class SnapshotsTable implements ReadonlyTable {
         }
     }
 
-    private static class SnapshotsRead implements InnerTableRead {
+    private class SnapshotsRead implements InnerTableRead {
 
         private final FileIO fileIO;
         private int[][] projection;
@@ -259,7 +266,7 @@ public class SnapshotsTable implements ReadonlyTable {
                 throw new IllegalArgumentException("Unsupported split: " + 
split.getClass());
             }
             SnapshotManager snapshotManager =
-                    new SnapshotManager(fileIO, ((SnapshotsSplit) 
split).location);
+                    new SnapshotManager(fileIO, ((SnapshotsSplit) 
split).location, branch);
             Iterator<Snapshot> snapshots =
                     snapshotManager.snapshotsWithinRange(
                             optionalFilterSnapshotIdMax, 
optionalFilterSnapshotIdMin);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
index 9fc13c65d..d92876e4c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.table.system;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
@@ -82,14 +83,19 @@ public class TagsTable implements ReadonlyTable {
 
     private final FileIO fileIO;
     private final Path location;
+    private final String branch;
 
     public TagsTable(FileStoreTable dataTable) {
-        this(dataTable.fileIO(), dataTable.location());
+        this(
+                dataTable.fileIO(),
+                dataTable.location(),
+                CoreOptions.branch(dataTable.schema().options()));
     }
 
-    public TagsTable(FileIO fileIO, Path location) {
+    public TagsTable(FileIO fileIO, Path location, String branchName) {
         this.fileIO = fileIO;
         this.location = location;
+        this.branch = branchName;
     }
 
     @Override
@@ -119,7 +125,7 @@ public class TagsTable implements ReadonlyTable {
 
     @Override
     public Table copy(Map<String, String> dynamicOptions) {
-        return new TagsTable(fileIO, location);
+        return new TagsTable(fileIO, location, branch);
     }
 
     private class TagsScan extends ReadOnceTableScan {
@@ -164,7 +170,7 @@ public class TagsTable implements ReadonlyTable {
         }
     }
 
-    private static class TagsRead implements InnerTableRead {
+    private class TagsRead implements InnerTableRead {
 
         private final FileIO fileIO;
         private int[][] projection;
@@ -196,7 +202,7 @@ public class TagsTable implements ReadonlyTable {
                 throw new IllegalArgumentException("Unsupported split: " + 
split.getClass());
             }
             Path location = ((TagsSplit) split).location;
-            List<Pair<Tag, String>> tags = new TagManager(fileIO, 
location).tagObjects();
+            List<Pair<Tag, String>> tags = new TagManager(fileIO, location, 
branch).tagObjects();
             Map<String, Tag> nameToSnapshot = new LinkedHashMap<>();
             for (Pair<Tag, String> tag : tags) {
                 nameToSnapshot.put(tag.getValue(), tag.getKey());
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/system/AggregationFieldsTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/system/AggregationFieldsTableTest.java
index e256d5ce2..0cb6ef093 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/system/AggregationFieldsTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/system/AggregationFieldsTableTest.java
@@ -27,9 +27,12 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.TableTestBase;
 import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.BranchManager;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.Multimap;
 
@@ -40,6 +43,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.paimon.catalog.Catalog.SYSTEM_BRANCH_PREFIX;
+import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
 import static 
org.apache.paimon.table.system.AggregationFieldsTable.extractFieldMultimap;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -76,12 +81,55 @@ public class AggregationFieldsTableTest extends 
TableTestBase {
 
     @Test
     public void testAggregationFieldsRecord() throws Exception {
-        List<InternalRow> expectRow = getExceptedResult();
+        List<InternalRow> expectRow = getExpectedResult();
         List<InternalRow> result = read(aggregationFieldsTable);
         assertThat(result).containsExactlyElementsOf(expectRow);
     }
 
-    private List<InternalRow> getExceptedResult() {
+    @Test
+    public void testBranchAggregationFieldsRecord() throws Exception {
+        FileStoreTable table = (FileStoreTable) 
catalog.getTable(identifier(tableName));
+        table.createBranch("b1");
+        // verify that the branch exists
+        BranchManager branchManager = table.branchManager();
+        assertThat(branchManager.branchExists("b1")).isTrue();
+
+        SchemaManager schemaManagerBranch = schemaManager.copyWithBranch("b1");
+        SchemaUtils.forceCommit(
+                schemaManagerBranch,
+                Schema.newBuilder()
+                        .column("product_id", DataTypes.INT())
+                        .column("price", DataTypes.INT())
+                        .column("sales", DataTypes.INT())
+                        .primaryKey("product_id")
+                        .option("merge-engine", "aggregation")
+                        .option("fields.price.aggregate-function", "sum")
+                        .option("fields.sales.aggregate-function", "sum")
+                        .option("fields.sales.ignore-retract", "false")
+                        .build());
+        AggregationFieldsTable branchAggregationFieldsTable =
+                (AggregationFieldsTable)
+                        catalog.getTable(
+                                identifier(
+                                        tableName
+                                                + SYSTEM_TABLE_SPLITTER
+                                                + SYSTEM_BRANCH_PREFIX
+                                                + "b1"
+                                                + "$aggregation_fields"));
+        List<InternalRow> expectRow = getExpectedResult();
+        List<InternalRow> result = read(aggregationFieldsTable);
+        assertThat(result).containsExactlyElementsOf(expectRow);
+
+        expectRow = getExpectedResult(schemaManagerBranch);
+        result = read(branchAggregationFieldsTable);
+        assertThat(result).containsExactlyElementsOf(expectRow);
+    }
+
+    private List<InternalRow> getExpectedResult() {
+        return getExpectedResult(schemaManager);
+    }
+
+    private List<InternalRow> getExpectedResult(SchemaManager schemaManager) {
         TableSchema schema = schemaManager.latest().get();
         Multimap<String, String> function =
                 extractFieldMultimap(schema.options(), Map.Entry::getValue);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/system/AuditLogTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/system/AuditLogTableTest.java
index 77d2cafe7..ebefa2af6 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/system/AuditLogTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/system/AuditLogTableTest.java
@@ -85,12 +85,12 @@ public class AuditLogTableTest extends TableTestBase {
 
     @Test
     public void testReadAuditLogFromLatest() throws Exception {
-        List<InternalRow> expectRow = getExceptedResult();
+        List<InternalRow> expectRow = getExpectedResult();
         List<InternalRow> result = read(auditLogTable);
         assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
     }
 
-    private List<InternalRow> getExceptedResult() {
+    private List<InternalRow> getExpectedResult() {
         List<InternalRow> expectedRow = new ArrayList<>();
         expectedRow.add(
                 
GenericRow.of(BinaryString.fromString(RowKind.DELETE.shortString()), 1, 1, 1));
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/system/CatalogOptionsTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/system/CatalogOptionsTableTest.java
index 9ee1a63cb..b40277db3 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/system/CatalogOptionsTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/system/CatalogOptionsTableTest.java
@@ -65,12 +65,12 @@ public class CatalogOptionsTableTest extends TableTestBase {
 
     @Test
     public void testCatalogOptionsTable() throws Exception {
-        List<InternalRow> expectRow = getExceptedResult();
+        List<InternalRow> expectRow = getExpectedResult();
         List<InternalRow> result = read(catalogOptionsTable);
         assertThat(result).containsExactlyElementsOf(expectRow);
     }
 
-    private List<InternalRow> getExceptedResult() {
+    private List<InternalRow> getExpectedResult() {
         List<InternalRow> expectedRow = new ArrayList<>();
         for (Map.Entry<String, String> option : 
catalogOptions.toMap().entrySet()) {
             expectedRow.add(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/system/ConsumersTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/system/ConsumersTableTest.java
index 0183eb800..6412db9df 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/system/ConsumersTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/system/ConsumersTableTest.java
@@ -70,12 +70,12 @@ public class ConsumersTableTest extends TableTestBase {
 
     @Test
     public void testPartitionRecordCount() throws Exception {
-        List<InternalRow> expectRow = getExceptedResult();
+        List<InternalRow> expectRow = getExpectedResult();
         List<InternalRow> result = read(consumersTable);
         assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
     }
 
-    private List<InternalRow> getExceptedResult() throws IOException {
+    private List<InternalRow> getExpectedResult() throws IOException {
         Map<String, Long> consumers = manager.consumers();
         return consumers.entrySet().stream()
                 .map(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
index e26e3ba2a..0d7a8b497 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
@@ -136,7 +136,7 @@ public class FilesTableTest extends TableTestBase {
 
     @Test
     public void testReadFilesFromLatest() throws Exception {
-        List<InternalRow> expectedRow = getExceptedResult(2L);
+        List<InternalRow> expectedRow = getExpectedResult(2L);
         List<InternalRow> result = read(filesTable);
         assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow);
     }
@@ -149,7 +149,7 @@ public class FilesTableTest extends TableTestBase {
 
     @Test
     public void testReadFilesFromSpecifiedSnapshot() throws Exception {
-        List<InternalRow> expectedRow = getExceptedResult(1L);
+        List<InternalRow> expectedRow = getExpectedResult(1L);
         filesTable =
                 (FilesTable)
                         filesTable.copy(
@@ -169,7 +169,7 @@ public class FilesTableTest extends TableTestBase {
                 .satisfies(anyCauseMatches(IllegalArgumentException.class));
     }
 
-    private List<InternalRow> getExceptedResult(long snapshotId) {
+    private List<InternalRow> getExpectedResult(long snapshotId) {
         if (!snapshotManager.snapshotExists(snapshotId)) {
             return Collections.emptyList();
         }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
index 970732e83..ffd4716e7 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
@@ -96,14 +96,14 @@ public class ManifestsTableTest extends TableTestBase {
 
     @Test
     public void testReadManifestsFromLatest() throws Exception {
-        List<InternalRow> expectedRow = getExceptedResult(2L);
+        List<InternalRow> expectedRow = getExpectedResult(2L);
         List<InternalRow> result = read(manifestsTable);
         assertThat(result).containsExactlyElementsOf(expectedRow);
     }
 
     @Test
     public void testReadManifestsFromSpecifiedSnapshot() throws Exception {
-        List<InternalRow> expectedRow = getExceptedResult(1L);
+        List<InternalRow> expectedRow = getExpectedResult(1L);
         manifestsTable =
                 (ManifestsTable)
                         manifestsTable.copy(
@@ -122,7 +122,7 @@ public class ManifestsTableTest extends TableTestBase {
         assertThat(result).isEmpty();
     }
 
-    private List<InternalRow> getExceptedResult(long snapshotId) {
+    private List<InternalRow> getExpectedResult(long snapshotId) {
         if (!snapshotManager.snapshotExists(snapshotId)) {
             return Collections.emptyList();
         }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/system/OptionsTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/system/OptionsTableTest.java
index 31b54e83e..ce7f3a9b7 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/system/OptionsTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/system/OptionsTableTest.java
@@ -28,16 +28,22 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.TableTestBase;
 import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.BranchManager;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.paimon.catalog.Catalog.SYSTEM_BRANCH_PREFIX;
+import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Unit tests for {@link OptionsTableTest}. */
@@ -70,12 +76,52 @@ public class OptionsTableTest extends TableTestBase {
 
     @Test
     public void testOptionsTable() throws Exception {
-        List<InternalRow> expectRow = getExceptedResult();
+        List<InternalRow> expectRow = getExpectedResult();
         List<InternalRow> result = read(optionsTable);
         assertThat(result).containsExactlyElementsOf(expectRow);
     }
 
-    private List<InternalRow> getExceptedResult() {
+    @Test
+    public void testBranchOptionsTable() throws Exception {
+        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier(tableName));
+        table.createBranch("b1");
+        // verify that the branch file exists
+        BranchManager branchManager = table.branchManager();
+        assertThat(branchManager.branchExists("b1")).isTrue();
+        SchemaManager schemaManagerBranch = schemaManager.copyWithBranch("b1");
+        Map<String, String> newOptions = new HashMap<>();
+        newOptions.put(CoreOptions.IGNORE_DELETE.key(), "true");
+        SchemaUtils.forceCommit(
+                schemaManagerBranch,
+                Schema.newBuilder()
+                        .column("product_id", DataTypes.INT())
+                        .column("price", DataTypes.INT())
+                        .column("sales", DataTypes.INT())
+                        .primaryKey("product_id")
+                        .option(CoreOptions.MERGE_ENGINE.key(), "deduplicate")
+                        .option(CoreOptions.TAG_AUTOMATIC_CREATION.key(), "watermark")
+                        .option(CoreOptions.TAG_CREATION_PERIOD.key(), "daily")
+                        .option(CoreOptions.CONSUMER_ID.key(), "id0")
+                        .build());
+        OptionsTable branchOptionsTable =
+                (OptionsTable)
+                        catalog.getTable(
+                                identifier(
+                                        tableName
+                                                + SYSTEM_TABLE_SPLITTER
+                                                + SYSTEM_BRANCH_PREFIX
+                                                + "b1"
+                                                + "$options"));
+        List<InternalRow> expectRow = getExpectedResult(schemaManagerBranch);
+        List<InternalRow> result = read(branchOptionsTable);
+        assertThat(result).containsExactlyElementsOf(expectRow);
+    }
+
+    private List<InternalRow> getExpectedResult() {
+        return getExpectedResult(schemaManager);
+    }
+
+    private List<InternalRow> getExpectedResult(SchemaManager schemaManager) {
         Map<String, String> options =
                 schemaManager
                         .latest()
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/SchemasTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/SchemasTableTest.java
index a10b2e1d7..c7c0ca5cf 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/system/SchemasTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/system/SchemasTableTest.java
@@ -73,12 +73,12 @@ public class SchemasTableTest extends TableTestBase {
 
     @Test
     public void testSchemasTable() throws Exception {
-        List<InternalRow> expectRow = getExceptedResult();
+        List<InternalRow> expectRow = getExpectedResult();
         List<InternalRow> result = read(schemasTable);
         assertThat(result).containsExactlyElementsOf(expectRow);
     }
 
-    private List<InternalRow> getExceptedResult() {
+    private List<InternalRow> getExpectedResult() {
         List<TableSchema> tableSchemas = schemaManager.listAll();
 
         List<InternalRow> expectedRow = new ArrayList<>();
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/SnapshotsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/SnapshotsTableTest.java
index fa89d3fca..44381c124 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/system/SnapshotsTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/system/SnapshotsTableTest.java
@@ -90,12 +90,12 @@ public class SnapshotsTableTest extends TableTestBase {
 
     @Test
     public void testReadSnapshotsFromLatest() throws Exception {
-        List<InternalRow> expectedRow = getExceptedResult(new long[] {1, 2});
+        List<InternalRow> expectedRow = getExpectedResult(new long[] {1, 2});
         List<InternalRow> result = read(snapshotsTable);
         assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow);
     }
 
-    private List<InternalRow> getExceptedResult(long[] snapshotIds) {
+    private List<InternalRow> getExpectedResult(long[] snapshotIds) {
         List<InternalRow> expectedRow = new ArrayList<>();
         for (long snapshotId : snapshotIds) {
             Snapshot snapshot = snapshotManager.snapshot(snapshotId);
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
index 99c38df01..8f8029cde 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
@@ -83,12 +83,12 @@ class TagsTableTest extends TableTestBase {
 
     @Test
     void testTagsTable() throws Exception {
-        List<InternalRow> expectRow = getExceptedResult();
+        List<InternalRow> expectRow = getExpectedResult();
         List<InternalRow> result = read(tagsTable);
         assertThat(result).containsExactlyElementsOf(expectRow);
     }
 
-    private List<InternalRow> getExceptedResult() {
+    private List<InternalRow> getExpectedResult() {
         List<InternalRow> internalRows = new ArrayList<>();
         for (Pair<Tag, String> snapshot : tagManager.tagObjects()) {
             Tag tag = snapshot.getKey();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
index ba197abfa..d50f36ccd 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -19,8 +19,10 @@
 package org.apache.paimon.flink;
 
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.BlockingIterator;
 import org.apache.paimon.utils.SnapshotManager;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 import org.junit.jupiter.api.Test;
@@ -346,6 +348,161 @@ public class BranchSqlITCase extends CatalogITCaseBase {
                 .hasMessageContaining("Branch main and pk does not have the same row type");
     }
 
+    @Test
+    public void testBranchOptionsTable() throws Exception {
+        sql(
+                "CREATE TABLE t ( pt INT, k INT, v STRING, PRIMARY KEY (pt, k) NOT ENFORCED ) "
+                        + "PARTITIONED BY (pt) WITH ( 'bucket' = '2' )");
+
+        sql("CALL sys.create_branch('default.t', 'test')");
+
+        sql("ALTER TABLE t SET ('snapshot.time-retained' = '5 h')");
+        sql("ALTER TABLE t$branch_test SET ('snapshot.time-retained' = '1 h')");
+
+        assertThat(collectResult("SELECT * FROM t$options"))
+                .containsExactlyInAnyOrder(
+                        "+I[bucket, 2]",
+                        "+I[snapshot.time-retained, 5 h]",
+                        "+I[scan.infer-parallelism, false]");
+        assertThat(collectResult("SELECT * FROM t$branch_test$options"))
+                .containsExactlyInAnyOrder(
+                        "+I[bucket, 2]",
+                        "+I[snapshot.time-retained, 1 h]",
+                        "+I[scan.infer-parallelism, false]");
+    }
+
+    @Test
+    public void testBranchSchemasTable() throws Exception {
+        sql("CREATE TABLE t (a INT, b INT)");
+        sql("INSERT INTO t VALUES (1, 2)");
+        sql("CALL sys.create_branch('default.t', 'b1')");
+        assertThat(collectResult("SELECT schema_id FROM t$branch_b1$schemas order by schema_id"))
+                .containsExactlyInAnyOrder("+I[0]");
+
+        sql("ALTER TABLE t$branch_b1 SET ('snapshot.time-retained' = '5 h')");
+        assertThat(collectResult("SELECT schema_id FROM t$branch_b1$schemas order by schema_id"))
+                .containsExactlyInAnyOrder("+I[0]", "+I[1]");
+    }
+
+    @Test
+    public void testBranchAuditLogTable() throws Exception {
+        sql("CREATE TABLE t (a INT, b INT)");
+        sql("INSERT INTO t VALUES (1, 2)");
+        assertThat(collectResult("SELECT * FROM t$audit_log"))
+                .containsExactlyInAnyOrder("+I[+I, 1, 2]");
+
+        sql("CALL sys.create_branch('default.t', 'b1')");
+        sql("INSERT INTO t$branch_b1 VALUES (3, 4)");
+        assertThat(collectResult("SELECT * FROM t$branch_b1$audit_log"))
+                .containsExactlyInAnyOrder("+I[+I, 3, 4]");
+    }
+
+    @Test
+    public void testBranchReadOptimizedTable() throws Exception {
+        sql("CREATE TABLE t (a INT, b INT)");
+        sql("INSERT INTO t VALUES (1, 2)");
+        assertThat(collectResult("SELECT * FROM t$ro")).containsExactlyInAnyOrder("+I[1, 2]");
+
+        sql("CALL sys.create_branch('default.t', 'b1')");
+        sql("INSERT INTO t$branch_b1 VALUES (3, 4)");
+        assertThat(collectResult("SELECT * FROM t$branch_b1$ro"))
+                .containsExactlyInAnyOrder("+I[3, 4]");
+    }
+
+    @Test
+    public void testBranchFilesTable() throws Exception {
+        sql("CREATE TABLE t (a INT, b INT)");
+        sql("INSERT INTO t VALUES (1, 2)");
+
+        sql("CALL sys.create_branch('default.t', 'b1')");
+        sql("INSERT INTO t$branch_b1 VALUES (3, 4)");
+        sql("INSERT INTO t$branch_b1 VALUES (5, 6)");
+
+        assertThat(collectResult("SELECT min_value_stats FROM t$files"))
+                .containsExactlyInAnyOrder("+I[{a=1, b=2}]");
+        assertThat(collectResult("SELECT min_value_stats FROM t$branch_b1$files"))
+                .containsExactlyInAnyOrder("+I[{a=3, b=4}]", "+I[{a=5, b=6}]");
+    }
+
+    @Test
+    public void testBranchTagsTable() throws Exception {
+        sql("CREATE TABLE t (a INT, b INT)");
+        sql("INSERT INTO t VALUES (1, 2)");
+        paimonTable("t").createTag("tag1", 1);
+
+        sql("CALL sys.create_branch('default.t', 'b1','tag1')");
+        sql("INSERT INTO t$branch_b1 VALUES (3, 4)");
+        paimonTable("t$branch_b1").createTag("tag2", 2);
+
+        assertThat(collectResult("SELECT tag_name,snapshot_id,record_count FROM t$tags"))
+                .containsExactlyInAnyOrder("+I[tag1, 1, 1]");
+        assertThat(collectResult("SELECT tag_name,snapshot_id,record_count FROM t$branch_b1$tags"))
+                .containsExactlyInAnyOrder("+I[tag1, 1, 1]", "+I[tag2, 2, 2]");
+    }
+
+    @Test
+    public void testBranchConsumersTable() throws Exception {
+        sql("CREATE TABLE t (a INT, b INT)");
+        sql("INSERT INTO t VALUES (1, 2), (3,4)");
+
+        sql("CALL sys.create_branch('default.t', 'b1')");
+        BlockingIterator<Row, Row> iterator =
+                BlockingIterator.of(
+                        streamSqlIter(
+                                "SELECT * FROM t$branch_b1 /*+ OPTIONS('consumer-id'='id1','consumer.expiration-time'='3h') */"));
+        sql("INSERT INTO t$branch_b1 VALUES (5, 6), (7, 8)");
+        assertThat(iterator.collect(2)).containsExactlyInAnyOrder(Row.of(5, 6), Row.of(7, 8));
+        iterator.close();
+
+        assertThat(collectResult("SELECT * FROM t$consumers")).isEmpty();
+        assertThat(collectResult("SELECT * FROM t$branch_b1$consumers"))
+                .containsExactlyInAnyOrder("+I[id1, 2]");
+    }
+
+    @Test
+    public void testBranchManifestsTable() {
+        sql("CREATE TABLE t (a INT, b INT)");
+        sql("INSERT INTO t VALUES (1, 2)");
+
+        sql("CALL sys.create_branch('default.t', 'b1')");
+        sql("INSERT INTO t$branch_b1 VALUES (3, 4)");
+        sql("INSERT INTO t$branch_b1 VALUES (5, 6)");
+
+        List<Row> res = sql("SELECT schema_id, file_name, file_size FROM t$manifests");
+        assertThat(res).hasSize(1);
+
+        res = sql("SELECT schema_id, file_name, file_size FROM t$branch_b1$manifests");
+        assertThat(res).hasSize(2);
+        res.forEach(
+                row -> {
+                    assertThat((long) row.getField(0)).isEqualTo(0L);
+                    assertThat(StringUtils.startsWith((String) row.getField(1), "manifest"))
+                            .isTrue();
+                    assertThat((long) row.getField(2)).isGreaterThan(0L);
+                });
+    }
+
+    @Test
+    public void testBranchPartitionsTable() throws Exception {
+        sql("CREATE TABLE t (a INT, b INT,c STRING) PARTITIONED BY (a)");
+        assertThat(sql("SELECT * FROM t$partitions")).isEmpty();
+
+        sql("INSERT INTO t VALUES (1, 2, 'x')");
+        sql("INSERT INTO t VALUES (1, 4, 'S2'), (2, 2, 'S1'), (2, 2, 'S1')");
+        sql("INSERT INTO t VALUES (1, 4, 'S3'), (2, 2, 'S4')");
+
+        sql("CALL sys.create_branch('default.t', 'b1')");
+        sql("INSERT INTO t$branch_b1 VALUES (1, 4, 'S2'), (2, 2, 'S1'), (2, 2, 'S5')");
+        sql("INSERT INTO t$branch_b1 VALUES (1, 4, 'S3'), (2, 2, 'S4')");
+
+        assertThat(collectResult("SELECT `partition`, record_count, file_count FROM t$partitions"))
+                .containsExactlyInAnyOrder("+I[[1], 3, 3]", "+I[[2], 3, 2]");
+        assertThat(
+                        collectResult(
+                                "SELECT `partition`, record_count, file_count FROM t$branch_b1$partitions"))
+                .containsExactlyInAnyOrder("+I[[1], 2, 2]", "+I[[2], 3, 2]");
+    }
+
     private List<String> collectResult(String sql) throws Exception {
         List<String> result = new ArrayList<>();
         try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {


Reply via email to