This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9900c36e1 [core] Support using table branches with $branch_ and 
support different schemas for different branches (#3757)
9900c36e1 is described below

commit 9900c36e1cd50e5006a0170032cd80addad48bd2
Author: tsreaper <[email protected]>
AuthorDate: Tue Jul 16 14:12:51 2024 +0800

    [core] Support using table branches with $branch_ and support different 
schemas for different branches (#3757)
---
 docs/content/maintenance/manage-branches.md        |  4 +-
 .../org/apache/paimon/catalog/AbstractCatalog.java | 64 ++++++++++++++--
 .../java/org/apache/paimon/catalog/Catalog.java    |  1 +
 .../apache/paimon/catalog/FileSystemCatalog.java   | 44 ++++++-----
 .../java/org/apache/paimon/jdbc/JdbcCatalog.java   | 16 +++-
 .../org/apache/paimon/schema/SchemaManager.java    | 22 +++---
 .../paimon/table/AbstractFileStoreTable.java       | 17 ++++-
 .../paimon/table/AppendOnlyFileStoreTable.java     |  5 --
 .../paimon/table/PrimaryKeyFileStoreTable.java     |  5 --
 .../org/apache/paimon/flink/BranchSqlITCase.java   | 87 ++++++++++++++++++++++
 .../java/org/apache/paimon/hive/HiveCatalog.java   | 22 +++++-
 11 files changed, 239 insertions(+), 48 deletions(-)

diff --git a/docs/content/maintenance/manage-branches.md 
b/docs/content/maintenance/manage-branches.md
index ccd2402a1..0cb52b6f2 100644
--- a/docs/content/maintenance/manage-branches.md
+++ b/docs/content/maintenance/manage-branches.md
@@ -117,10 +117,10 @@ You can read or write with branch as below.
 
 ```sql
 -- read from branch 'branch1'
-SELECT * FROM t /*+ OPTIONS('branch' = 'branch1') */;
+SELECT * FROM `t$branch_branch1`;
 
 -- write to branch 'branch1'
-INSERT INTO t /*+ OPTIONS('branch' = 'branch1') */ SELECT ...
+INSERT INTO `t$branch_branch1` SELECT ...
 ```
 
 {{< /tab >}}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 0c6d19e28..45a900a5c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -38,6 +38,7 @@ import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.system.SystemTableLoader;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.StringUtils;
 
 import javax.annotation.Nullable;
@@ -57,6 +58,7 @@ import static 
org.apache.paimon.options.CatalogOptions.LINEAGE_META;
 import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
 import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
 import static 
org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
+import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Common implementation of {@link Catalog}. */
@@ -218,7 +220,9 @@ public abstract class AbstractCatalog implements Catalog {
     @Override
     public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
             throws TableNotExistException {
+        checkNotBranch(identifier, "dropTable");
         checkNotSystemTable(identifier, "dropTable");
+
         if (!tableExists(identifier)) {
             if (ignoreIfNotExists) {
                 return;
@@ -234,6 +238,7 @@ public abstract class AbstractCatalog implements Catalog {
     @Override
     public void createTable(Identifier identifier, Schema schema, boolean 
ignoreIfExists)
             throws TableAlreadyExistException, DatabaseNotExistException {
+        checkNotBranch(identifier, "createTable");
         checkNotSystemTable(identifier, "createTable");
         validateIdentifierNameCaseInsensitive(identifier);
         validateFieldNameCaseInsensitive(schema.rowType().getFieldNames());
@@ -260,6 +265,8 @@ public abstract class AbstractCatalog implements Catalog {
     @Override
     public void renameTable(Identifier fromTable, Identifier toTable, boolean 
ignoreIfNotExists)
             throws TableNotExistException, TableAlreadyExistException {
+        checkNotBranch(fromTable, "renameTable");
+        checkNotBranch(toTable, "renameTable");
         checkNotSystemTable(fromTable, "renameTable");
         checkNotSystemTable(toTable, "renameTable");
         validateIdentifierNameCaseInsensitive(toTable);
@@ -288,6 +295,14 @@ public abstract class AbstractCatalog implements Catalog {
         validateIdentifierNameCaseInsensitive(identifier);
         validateFieldNameCaseInsensitiveInSchemaChange(changes);
 
+        Optional<Pair<Identifier, String>> optionalBranchName =
+                getOriginalIdentifierAndBranch(identifier);
+        String branchName = DEFAULT_MAIN_BRANCH;
+        if (optionalBranchName.isPresent()) {
+            identifier = optionalBranchName.get().getLeft();
+            branchName = optionalBranchName.get().getRight();
+        }
+
         if (!tableExists(identifier)) {
             if (ignoreIfNotExists) {
                 return;
@@ -295,10 +310,11 @@ public abstract class AbstractCatalog implements Catalog {
             throw new TableNotExistException(identifier);
         }
 
-        alterTableImpl(identifier, changes);
+        alterTableImpl(identifier, branchName, changes);
     }
 
-    protected abstract void alterTableImpl(Identifier identifier, 
List<SchemaChange> changes)
+    protected abstract void alterTableImpl(
+            Identifier identifier, String branchName, List<SchemaChange> 
changes)
             throws TableNotExistException, ColumnAlreadyExistException, 
ColumnNotExistException;
 
     @Nullable
@@ -344,7 +360,15 @@ public abstract class AbstractCatalog implements Catalog {
     }
 
     private FileStoreTable getDataTable(Identifier identifier) throws 
TableNotExistException {
-        TableSchema tableSchema = getDataTableSchema(identifier);
+        Optional<Pair<Identifier, String>> optionalBranchName =
+                getOriginalIdentifierAndBranch(identifier);
+        String branch = DEFAULT_MAIN_BRANCH;
+        if (optionalBranchName.isPresent()) {
+            identifier = optionalBranchName.get().getLeft();
+            branch = optionalBranchName.get().getRight();
+        }
+
+        TableSchema tableSchema = getDataTableSchema(identifier, branch);
         return FileStoreTableFactory.create(
                 fileIO,
                 getDataTableLocation(identifier),
@@ -384,7 +408,7 @@ public abstract class AbstractCatalog implements Catalog {
         }
     }
 
-    protected abstract TableSchema getDataTableSchema(Identifier identifier)
+    protected abstract TableSchema getDataTableSchema(Identifier identifier, 
String branchName)
             throws TableNotExistException;
 
     @VisibleForTesting
@@ -392,8 +416,38 @@ public abstract class AbstractCatalog implements Catalog {
         return new Path(newDatabasePath(identifier.getDatabaseName()), 
identifier.getObjectName());
     }
 
+    private static Optional<Pair<Identifier, String>> 
getOriginalIdentifierAndBranch(
+            Identifier identifier) {
+        String tableName = identifier.getObjectName();
+        if (tableName.contains(BRANCH_PREFIX)) {
+            int idx = tableName.indexOf(BRANCH_PREFIX);
+            String branchName = tableName.substring(idx + 
BRANCH_PREFIX.length());
+            if (StringUtils.isNullOrWhitespaceOnly(branchName)) {
+                return Optional.empty();
+            } else {
+                return Optional.of(
+                        Pair.of(
+                                Identifier.create(
+                                        identifier.getDatabaseName(), 
tableName.substring(0, idx)),
+                                branchName));
+            }
+        }
+        return Optional.empty();
+    }
+
+    protected void checkNotBranch(Identifier identifier, String method) {
+        if (getOriginalIdentifierAndBranch(identifier).isPresent()) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Cannot '%s' for branch table '%s', "
+                                    + "please modify the table with the 
default branch.",
+                            method, identifier));
+        }
+    }
+
     private static boolean isSpecifiedSystemTable(Identifier identifier) {
-        return identifier.getObjectName().contains(SYSTEM_TABLE_SPLITTER);
+        return identifier.getObjectName().contains(SYSTEM_TABLE_SPLITTER)
+                && !getOriginalIdentifierAndBranch(identifier).isPresent();
     }
 
     protected boolean isSystemTable(Identifier identifier) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 9057b1c3d..b6221dac7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -43,6 +43,7 @@ public interface Catalog extends AutoCloseable {
 
     String DEFAULT_DATABASE = "default";
 
+    String BRANCH_PREFIX = "$branch_";
     String SYSTEM_TABLE_SPLITTER = "$";
     String SYSTEM_DATABASE_NAME = "sys";
     String COMMENT_PROP = "comment";
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index f0e5572e3..577e06836 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.catalog;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.operation.Lock;
@@ -36,6 +37,7 @@ import java.util.Map;
 import java.util.concurrent.Callable;
 
 import static 
org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE;
+import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
 
 /** A catalog implementation for {@link FileIO}. */
 public class FileSystemCatalog extends AbstractCatalog {
@@ -103,9 +105,20 @@ public class FileSystemCatalog extends AbstractCatalog {
     }
 
     @Override
-    public TableSchema getDataTableSchema(Identifier identifier) throws 
TableNotExistException {
-        return schemaManager(identifier)
+    public TableSchema getDataTableSchema(Identifier identifier, String 
branchName)
+            throws TableNotExistException {
+        return schemaManager(identifier, branchName)
                 .latest()
+                .map(
+                        s -> {
+                            if (!DEFAULT_MAIN_BRANCH.equals(branchName)) {
+                                Options branchOptions = new 
Options(s.options());
+                                branchOptions.set(CoreOptions.BRANCH, 
branchName);
+                                return s.copy(branchOptions.toMap());
+                            } else {
+                                return s;
+                            }
+                        })
                 .orElseThrow(() -> new TableNotExistException(identifier));
     }
 
@@ -117,26 +130,22 @@ public class FileSystemCatalog extends AbstractCatalog {
 
     @Override
     public void createTableImpl(Identifier identifier, Schema schema) {
-        uncheck(() -> schemaManager(identifier).createTable(schema));
+        uncheck(() -> schemaManager(identifier, 
DEFAULT_MAIN_BRANCH).createTable(schema));
     }
 
-    private SchemaManager schemaManager(Identifier identifier) {
+    private SchemaManager schemaManager(Identifier identifier, String 
branchName) {
         Path path = getDataTableLocation(identifier);
         CatalogLock catalogLock =
-                lockFactory()
-                        .map(
-                                fac ->
-                                        fac.createLock(
-                                                lockContext()
-                                                        .orElseThrow(
-                                                                () ->
-                                                                        new 
RuntimeException(
-                                                                               
 "No lock context when lock is enabled."))))
-                        .orElse(null);
-        return new SchemaManager(fileIO, path)
+                lockFactory().map(fac -> 
fac.createLock(assertGetLockContext())).orElse(null);
+        return new SchemaManager(fileIO, path, branchName)
                 .withLock(catalogLock == null ? null : 
Lock.fromCatalog(catalogLock, identifier));
     }
 
+    private CatalogLockContext assertGetLockContext() {
+        return lockContext()
+                .orElseThrow(() -> new RuntimeException("No lock context when 
lock is enabled."));
+    }
+
     @Override
     public void renameTableImpl(Identifier fromTable, Identifier toTable) {
         Path fromPath = getDataTableLocation(fromTable);
@@ -145,9 +154,10 @@ public class FileSystemCatalog extends AbstractCatalog {
     }
 
     @Override
-    protected void alterTableImpl(Identifier identifier, List<SchemaChange> 
changes)
+    protected void alterTableImpl(
+            Identifier identifier, String branchName, List<SchemaChange> 
changes)
             throws TableNotExistException, ColumnAlreadyExistException, 
ColumnNotExistException {
-        schemaManager(identifier).commitChanges(changes);
+        schemaManager(identifier, branchName).commitChanges(changes);
     }
 
     protected static <T> T uncheck(Callable<T> callable) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
index 45600715b..dd479dfd8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
@@ -57,6 +57,7 @@ import static 
org.apache.paimon.jdbc.JdbcCatalogLock.checkMaxSleep;
 import static org.apache.paimon.jdbc.JdbcUtils.execute;
 import static org.apache.paimon.jdbc.JdbcUtils.insertProperties;
 import static org.apache.paimon.jdbc.JdbcUtils.updateTable;
+import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
 
 /* This file is based on source code from the Iceberg Project 
(http://iceberg.apache.org/), licensed by the Apache
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
@@ -308,8 +309,10 @@ public class JdbcCatalog extends AbstractCatalog {
     }
 
     @Override
-    protected void alterTableImpl(Identifier identifier, List<SchemaChange> 
changes)
+    protected void alterTableImpl(
+            Identifier identifier, String branchName, List<SchemaChange> 
changes)
             throws TableNotExistException, ColumnAlreadyExistException, 
ColumnNotExistException {
+        assertMainBranch(branchName);
         if (!tableExists(identifier)) {
             throw new RuntimeException("Table is not exists " + 
identifier.getFullName());
         }
@@ -318,7 +321,9 @@ public class JdbcCatalog extends AbstractCatalog {
     }
 
     @Override
-    protected TableSchema getDataTableSchema(Identifier identifier) throws 
TableNotExistException {
+    protected TableSchema getDataTableSchema(Identifier identifier, String 
branchName)
+            throws TableNotExistException {
+        assertMainBranch(branchName);
         if (!tableExists(identifier)) {
             throw new TableNotExistException(identifier);
         }
@@ -329,6 +334,13 @@ public class JdbcCatalog extends AbstractCatalog {
                         () -> new RuntimeException("There is no paimon table 
in " + tableLocation));
     }
 
+    private void assertMainBranch(String branchName) {
+        if (!DEFAULT_MAIN_BRANCH.equals(branchName)) {
+            throw new UnsupportedOperationException(
+                    "JdbcCatalog currently does not support table branches");
+        }
+    }
+
     @Override
     public boolean tableExists(Identifier identifier) {
         if (isSystemTable(identifier)) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 366f285fc..4f70ac725 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -42,6 +42,7 @@ import org.apache.paimon.types.DataTypeCasts;
 import org.apache.paimon.types.DataTypeVisitor;
 import org.apache.paimon.types.ReassignFieldId;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.BranchManager;
 import org.apache.paimon.utils.JsonSerdeUtil;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.StringUtils;
@@ -67,7 +68,6 @@ import java.util.stream.Collectors;
 import static org.apache.paimon.catalog.AbstractCatalog.DB_SUFFIX;
 import static org.apache.paimon.catalog.Identifier.UNKNOWN_DATABASE;
 import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
-import static org.apache.paimon.utils.BranchManager.branchPath;
 import static org.apache.paimon.utils.FileUtils.listVersionedFiles;
 import static org.apache.paimon.utils.Preconditions.checkState;
 
@@ -189,7 +189,7 @@ public class SchemaManager implements Serializable {
                     latest().orElseThrow(
                                     () ->
                                             new Catalog.TableNotExistException(
-                                                    
fromPath(tableRoot.toString(), true)));
+                                                    fromPath(branchPath(), 
true)));
             Map<String, String> newOptions = new HashMap<>(schema.options());
             List<DataField> newFields = new ArrayList<>(schema.fields());
             AtomicInteger highestFieldId = new 
AtomicInteger(schema.highestFieldId());
@@ -211,13 +211,13 @@ public class SchemaManager implements Serializable {
                     SchemaChange.Move move = addColumn.move();
                     if (newFields.stream().anyMatch(f -> 
f.name().equals(addColumn.fieldName()))) {
                         throw new Catalog.ColumnAlreadyExistException(
-                                fromPath(tableRoot.toString(), true), 
addColumn.fieldName());
+                                fromPath(branchPath(), true), 
addColumn.fieldName());
                     }
                     Preconditions.checkArgument(
                             addColumn.dataType().isNullable(),
                             "Column %s cannot specify NOT NULL in the %s 
table.",
                             addColumn.fieldName(),
-                            fromPath(tableRoot.toString(), 
true).getFullName());
+                            fromPath(branchPath(), true).getFullName());
                     int id = highestFieldId.incrementAndGet();
                     DataType dataType =
                             ReassignFieldId.reassign(addColumn.dataType(), 
highestFieldId);
@@ -248,7 +248,7 @@ public class SchemaManager implements Serializable {
                     validateNotPrimaryAndPartitionKey(schema, 
rename.fieldName());
                     if (newFields.stream().anyMatch(f -> 
f.name().equals(rename.newName()))) {
                         throw new Catalog.ColumnAlreadyExistException(
-                                fromPath(tableRoot.toString(), true), 
rename.fieldName());
+                                fromPath(branchPath(), true), 
rename.fieldName());
                     }
 
                     updateNestedColumn(
@@ -267,7 +267,7 @@ public class SchemaManager implements Serializable {
                     if (!newFields.removeIf(
                             f -> f.name().equals(((DropColumn) 
change).fieldName()))) {
                         throw new Catalog.ColumnNotExistException(
-                                fromPath(tableRoot.toString(), true), 
drop.fieldName());
+                                fromPath(branchPath(), true), 
drop.fieldName());
                     }
                     if (newFields.isEmpty()) {
                         throw new IllegalArgumentException("Cannot drop all 
fields in table");
@@ -494,7 +494,7 @@ public class SchemaManager implements Serializable {
         }
         if (!found) {
             throw new Catalog.ColumnNotExistException(
-                    fromPath(tableRoot.toString(), true), 
Arrays.toString(updateFieldNames));
+                    fromPath(branchPath(), true), 
Arrays.toString(updateFieldNames));
         }
     }
 
@@ -535,13 +535,17 @@ public class SchemaManager implements Serializable {
         }
     }
 
+    private String branchPath() {
+        return BranchManager.branchPath(tableRoot, branch);
+    }
+
     public Path schemaDirectory() {
-        return new Path(branchPath(tableRoot, branch) + "/schema");
+        return new Path(branchPath() + "/schema");
     }
 
     @VisibleForTesting
     public Path toSchemaPath(long schemaId) {
-        return new Path(branchPath(tableRoot, branch) + "/schema/" + 
SCHEMA_PREFIX + schemaId);
+        return new Path(branchPath() + "/schema/" + SCHEMA_PREFIX + schemaId);
     }
 
     /**
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index a73f6150a..86672dda8 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -256,7 +256,15 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
         CoreOptions.setDefaultValues(newOptions);
 
         // copy a new table schema to contain dynamic options
-        TableSchema newTableSchema = tableSchema.copy(newOptions.toMap());
+        TableSchema newTableSchema = tableSchema;
+        if (newOptions.contains(CoreOptions.BRANCH)) {
+            newTableSchema =
+                    schemaManager()
+                            .copyWithBranch(new 
CoreOptions(newOptions).branch())
+                            .latest()
+                            .get();
+        }
+        newTableSchema = newTableSchema.copy(newOptions.toMap());
 
         if (tryTimeTravel) {
             // see if merged options contain time travel option
@@ -285,6 +293,13 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
         }
     }
 
+    @Override
+    public FileStoreTable copy(TableSchema newTableSchema) {
+        return newTableSchema.primaryKeys().isEmpty()
+                ? new AppendOnlyFileStoreTable(fileIO, path, newTableSchema, 
catalogEnvironment)
+                : new PrimaryKeyFileStoreTable(fileIO, path, newTableSchema, 
catalogEnvironment);
+    }
+
     protected SchemaManager schemaManager() {
         return new SchemaManager(fileIO(), path, 
CoreOptions.branch(options()));
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 0c389e408..40eeb4d28 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -64,11 +64,6 @@ class AppendOnlyFileStoreTable extends 
AbstractFileStoreTable {
         super(fileIO, path, tableSchema, catalogEnvironment);
     }
 
-    @Override
-    public FileStoreTable copy(TableSchema newTableSchema) {
-        return new AppendOnlyFileStoreTable(fileIO, path, newTableSchema, 
catalogEnvironment);
-    }
-
     @Override
     public AppendOnlyFileStore store() {
         if (lazyStore == null) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index 0393b0d6e..6ac2763ac 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -66,11 +66,6 @@ class PrimaryKeyFileStoreTable extends 
AbstractFileStoreTable {
         super(fileIO, path, tableSchema, catalogEnvironment);
     }
 
-    @Override
-    public FileStoreTable copy(TableSchema newTableSchema) {
-        return new PrimaryKeyFileStoreTable(fileIO, path, newTableSchema, 
catalogEnvironment);
-    }
-
     @Override
     public KeyValueFileStore store() {
         if (lazyStore == null) {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
new file mode 100644
index 000000000..be0a2e805
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.flink.util.AbstractTestBase;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT cases for table with branches using SQL. */
+public class BranchSqlITCase extends AbstractTestBase {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    @Test
+    public void testAlterTable() throws Exception {
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+        tEnv.executeSql(
+                "CREATE CATALOG mycat WITH ( 'type' = 'paimon', 'warehouse' = 
'" + tempDir + "' )");
+        tEnv.executeSql("USE CATALOG mycat");
+        tEnv.executeSql(
+                "CREATE TABLE t ( pt INT, k INT, v STRING, PRIMARY KEY (pt, k) 
NOT ENFORCED ) "
+                        + "PARTITIONED BY (pt) WITH ( 'bucket' = '2' )");
+
+        tEnv.executeSql(
+                        "INSERT INTO t VALUES (1, 10, 'apple'), (1, 20, 
'banana'), (2, 10, 'cat'), (2, 20, 'dog')")
+                .await();
+        tEnv.executeSql("CALL sys.create_branch('default.t', 'test', 1)");
+        tEnv.executeSql("INSERT INTO t VALUES (1, 10, 'APPLE'), (2, 20, 
'DOG'), (2, 30, 'horse')")
+                .await();
+
+        tEnv.executeSql("ALTER TABLE `t$branch_test` ADD (v2 INT)").await();
+        tEnv.executeSql(
+                        "INSERT INTO `t$branch_test` VALUES "
+                                + "(1, 10, 'cherry', 100), (2, 20, 'bird', 
200), (2, 40, 'wolf', 400)")
+                .await();
+
+        assertThat(collectResult(tEnv, "SELECT * FROM t"))
+                .containsExactlyInAnyOrder(
+                        "+I[1, 10, APPLE]",
+                        "+I[1, 20, banana]",
+                        "+I[2, 30, horse]",
+                        "+I[2, 10, cat]",
+                        "+I[2, 20, DOG]");
+        assertThat(collectResult(tEnv, "SELECT * FROM t$branch_test"))
+                .containsExactlyInAnyOrder(
+                        "+I[1, 10, cherry, 100]",
+                        "+I[1, 20, banana, null]",
+                        "+I[2, 10, cat, null]",
+                        "+I[2, 20, bird, 200]",
+                        "+I[2, 40, wolf, 400]");
+    }
+
+    private List<String> collectResult(TableEnvironment tEnv, String sql) 
throws Exception {
+        List<String> result = new ArrayList<>();
+        try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {
+            while (it.hasNext()) {
+                result.add(it.next().toString());
+            }
+        }
+        return result;
+    }
+}
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index c7c947ce5..32f572bb8 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -93,6 +93,7 @@ import static 
org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER;
 import static org.apache.paimon.hive.HiveCatalogOptions.LOCATION_IN_PROPERTIES;
 import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
 import static 
org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
+import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly;
 
@@ -402,10 +403,24 @@ public class HiveCatalog extends AbstractCatalog {
     }
 
     @Override
-    public TableSchema getDataTableSchema(Identifier identifier) throws 
TableNotExistException {
+    public TableSchema getDataTableSchema(Identifier identifier, String 
branchName)
+            throws TableNotExistException {
+        assertMainBranch(branchName);
+        return getDataTableSchema(identifier);
+    }
+
+    private void assertMainBranch(String branchName) {
+        if (!DEFAULT_MAIN_BRANCH.equals(branchName)) {
+            throw new UnsupportedOperationException(
+                    "HiveCatalog currently does not support table branches");
+        }
+    }
+
+    private TableSchema getDataTableSchema(Identifier identifier) throws 
TableNotExistException {
         if (!tableExists(identifier)) {
             throw new TableNotExistException(identifier);
         }
+
         Path tableLocation = getDataTableLocation(identifier);
         return tableSchemaInFileSystem(tableLocation)
                 .orElseThrow(() -> new TableNotExistException(identifier));
@@ -535,8 +550,10 @@ public class HiveCatalog extends AbstractCatalog {
     }
 
     @Override
-    protected void alterTableImpl(Identifier identifier, List<SchemaChange> 
changes)
+    protected void alterTableImpl(
+            Identifier identifier, String branchName, List<SchemaChange> 
changes)
             throws TableNotExistException, ColumnAlreadyExistException, 
ColumnNotExistException {
+        assertMainBranch(branchName);
 
         final SchemaManager schemaManager = schemaManager(identifier);
         // first commit changes to underlying files
@@ -616,6 +633,7 @@ public class HiveCatalog extends AbstractCatalog {
 
     @Override
     public void repairTable(Identifier identifier) throws 
TableNotExistException {
+        checkNotBranch(identifier, "repairTable");
         checkNotSystemTable(identifier, "repairTable");
         validateIdentifierNameCaseInsensitive(identifier);
 

Reply via email to