This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7df888005 [core] Identifier can now recognize branch and system tables (#3862)
7df888005 is described below

commit 7df888005e9c82fc337480956ddca6e54529fc55
Author: tsreaper <[email protected]>
AuthorDate: Thu Aug 1 12:34:00 2024 +0800

    [core] Identifier can now recognize branch and system tables (#3862)
---
 .../java/org/apache/paimon/utils/StringUtils.java  |   4 +-
 .../org/apache/paimon/catalog/AbstractCatalog.java | 125 ++++++++-------------
 .../org/apache/paimon/catalog/CachingCatalog.java  |  17 +--
 .../java/org/apache/paimon/catalog/Catalog.java    |   2 +-
 .../org/apache/paimon/catalog/CatalogUtils.java    |   8 +-
 .../apache/paimon/catalog/FileSystemCatalog.java   |  33 ++----
 .../java/org/apache/paimon/catalog/Identifier.java | 103 +++++++++++++++--
 .../java/org/apache/paimon/jdbc/JdbcCatalog.java   |  21 ++--
 .../org/apache/paimon/schema/SchemaManager.java    |  37 ++++--
 .../paimon/table/AbstractFileStoreTable.java       |  14 ++-
 .../org/apache/paimon/catalog/CatalogTestBase.java |  10 +-
 .../apache/paimon/table/SchemaEvolutionTest.java   |   2 +-
 .../org/apache/paimon/flink/BranchSqlITCase.java   |  11 +-
 .../apache/paimon/flink/CatalogTableITCase.java    |   5 +-
 .../java/org/apache/paimon/hive/HiveCatalog.java   |  47 ++++----
 15 files changed, 243 insertions(+), 196 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
index aeea97ffd..4d0d5b672 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
@@ -373,7 +373,7 @@ public class StringUtils {
      * @return an array of parsed Strings, {@code null} if null String input
      */
     public static String[] split(final String str, final String 
separatorChars) {
-        return splitWorker(str, separatorChars, -1, false);
+        return split(str, separatorChars, -1, false);
     }
 
     /**
@@ -388,7 +388,7 @@ public class StringUtils {
      *     separators; if {@code false}, adjacent separators are treated as 
one separator.
      * @return an array of parsed Strings, {@code null} if null String input
      */
-    private static String[] splitWorker(
+    public static String[] split(
             final String str,
             final String separatorChars,
             final int max,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 062e93532..ee2d1f2a5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -38,8 +38,7 @@ import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.system.SystemTableLoader;
-import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.StringUtils;
+import org.apache.paimon.utils.Preconditions;
 
 import javax.annotation.Nullable;
 
@@ -156,6 +155,7 @@ public abstract class AbstractCatalog implements Catalog {
     @Override
     public void dropPartition(Identifier identifier, Map<String, String> 
partitionSpec)
             throws TableNotExistException {
+        checkNotSystemTable(identifier, "dropPartition");
         Table table = getTable(identifier);
         FileStoreTable fileStoreTable = (FileStoreTable) table;
         try (FileStoreCommit commit =
@@ -181,7 +181,7 @@ public abstract class AbstractCatalog implements Catalog {
             throw new DatabaseNotExistException(name);
         }
 
-        if (!cascade && listTables(name).size() > 0) {
+        if (!cascade && !listTables(name).isEmpty()) {
             throw new DatabaseNotEmptyException(name);
         }
 
@@ -282,14 +282,6 @@ public abstract class AbstractCatalog implements Catalog {
         validateIdentifierNameCaseInsensitive(identifier);
         validateFieldNameCaseInsensitiveInSchemaChange(changes);
 
-        Optional<Pair<Identifier, String>> optionalBranchName =
-                getOriginalIdentifierAndBranch(identifier);
-        String branchName = DEFAULT_MAIN_BRANCH;
-        if (optionalBranchName.isPresent()) {
-            identifier = optionalBranchName.get().getLeft();
-            branchName = optionalBranchName.get().getRight();
-        }
-
         if (!tableExists(identifier)) {
             if (ignoreIfNotExists) {
                 return;
@@ -297,11 +289,10 @@ public abstract class AbstractCatalog implements Catalog {
             throw new TableNotExistException(identifier);
         }
 
-        alterTableImpl(identifier, branchName, changes);
+        alterTableImpl(identifier, changes);
     }
 
-    protected abstract void alterTableImpl(
-            Identifier identifier, String branchName, List<SchemaChange> 
changes)
+    protected abstract void alterTableImpl(Identifier identifier, 
List<SchemaChange> changes)
             throws TableNotExistException, ColumnAlreadyExistException, 
ColumnNotExistException;
 
     @Nullable
@@ -317,7 +308,7 @@ public abstract class AbstractCatalog implements Catalog {
     @Override
     public Table getTable(Identifier identifier) throws TableNotExistException 
{
         if (isSystemDatabase(identifier.getDatabaseName())) {
-            String tableName = identifier.getObjectName();
+            String tableName = identifier.getTableName();
             Table table =
                     SystemTableLoader.loadGlobal(
                             tableName,
@@ -330,12 +321,17 @@ public abstract class AbstractCatalog implements Catalog {
             }
             return table;
         } else if (isSpecifiedSystemTable(identifier)) {
-            String[] splits = tableAndSystemName(identifier);
-            String tableName = splits[0];
-            String type = splits[1];
             FileStoreTable originTable =
-                    getDataTable(new Identifier(identifier.getDatabaseName(), 
tableName));
-            Table table = SystemTableLoader.load(type, originTable);
+                    getDataTable(
+                            new Identifier(
+                                    identifier.getDatabaseName(),
+                                    identifier.getTableName(),
+                                    identifier.getBranchName(),
+                                    null));
+            Table table =
+                    SystemTableLoader.load(
+                            
Preconditions.checkNotNull(identifier.getSystemTableName()),
+                            originTable);
             if (table == null) {
                 throw new TableNotExistException(identifier);
             }
@@ -346,15 +342,8 @@ public abstract class AbstractCatalog implements Catalog {
     }
 
     private FileStoreTable getDataTable(Identifier identifier) throws 
TableNotExistException {
-        Optional<Pair<Identifier, String>> optionalBranchName =
-                getOriginalIdentifierAndBranch(identifier);
-        String branch = DEFAULT_MAIN_BRANCH;
-        if (optionalBranchName.isPresent()) {
-            identifier = optionalBranchName.get().getLeft();
-            branch = optionalBranchName.get().getRight();
-        }
-
-        TableSchema tableSchema = getDataTableSchema(identifier, branch);
+        Preconditions.checkArgument(identifier.getSystemTableName() == null);
+        TableSchema tableSchema = getDataTableSchema(identifier);
         return FileStoreTableFactory.create(
                 fileIO,
                 getDataTableLocation(identifier),
@@ -394,35 +383,16 @@ public abstract class AbstractCatalog implements Catalog {
         }
     }
 
-    protected abstract TableSchema getDataTableSchema(Identifier identifier, 
String branchName)
+    protected abstract TableSchema getDataTableSchema(Identifier identifier)
             throws TableNotExistException;
 
     @VisibleForTesting
     public Path getDataTableLocation(Identifier identifier) {
-        return new Path(newDatabasePath(identifier.getDatabaseName()), 
identifier.getObjectName());
-    }
-
-    private static Optional<Pair<Identifier, String>> 
getOriginalIdentifierAndBranch(
-            Identifier identifier) {
-        String tableName = identifier.getObjectName();
-        if (tableName.contains(BRANCH_PREFIX)) {
-            int idx = tableName.indexOf(BRANCH_PREFIX);
-            String branchName = tableName.substring(idx + 
BRANCH_PREFIX.length());
-            if (StringUtils.isNullOrWhitespaceOnly(branchName)) {
-                return Optional.empty();
-            } else {
-                return Optional.of(
-                        Pair.of(
-                                Identifier.create(
-                                        identifier.getDatabaseName(), 
tableName.substring(0, idx)),
-                                branchName));
-            }
-        }
-        return Optional.empty();
+        return new Path(newDatabasePath(identifier.getDatabaseName()), 
identifier.getTableName());
     }
 
-    protected void checkNotBranch(Identifier identifier, String method) {
-        if (getOriginalIdentifierAndBranch(identifier).isPresent()) {
+    protected static void checkNotBranch(Identifier identifier, String method) 
{
+        if (identifier.getBranchName() != null) {
             throw new IllegalArgumentException(
                     String.format(
                             "Cannot '%s' for branch table '%s', "
@@ -431,23 +401,23 @@ public abstract class AbstractCatalog implements Catalog {
         }
     }
 
-    protected void assertMainBranch(String branchName) {
-        if (!DEFAULT_MAIN_BRANCH.equals(branchName)) {
+    protected void assertMainBranch(Identifier identifier) {
+        if (identifier.getBranchName() != null
+                && !DEFAULT_MAIN_BRANCH.equals(identifier.getBranchName())) {
             throw new UnsupportedOperationException(
                     this.getClass().getName() + " currently does not support 
table branches");
         }
     }
 
     public static boolean isSpecifiedSystemTable(Identifier identifier) {
-        return identifier.getObjectName().contains(SYSTEM_TABLE_SPLITTER)
-                && !getOriginalIdentifierAndBranch(identifier).isPresent();
+        return identifier.getSystemTableName() != null;
     }
 
     protected static boolean isSystemTable(Identifier identifier) {
         return isSystemDatabase(identifier.getDatabaseName()) || 
isSpecifiedSystemTable(identifier);
     }
 
-    protected void checkNotSystemTable(Identifier identifier, String method) {
+    protected static void checkNotSystemTable(Identifier identifier, String 
method) {
         if (isSystemTable(identifier)) {
             throw new IllegalArgumentException(
                     String.format(
@@ -460,26 +430,12 @@ public abstract class AbstractCatalog implements Catalog {
         tableDefaultOptions.forEach(options::putIfAbsent);
     }
 
-    public static String[] tableAndSystemName(Identifier identifier) {
-        String[] splits = StringUtils.split(identifier.getObjectName(), 
SYSTEM_TABLE_SPLITTER);
-        if (splits.length != 2) {
-            throw new IllegalArgumentException(
-                    "System table can only contain one '$' separator, but this 
is: "
-                            + identifier.getObjectName());
-        }
-        return splits;
-    }
-
     public static Path newTableLocation(String warehouse, Identifier 
identifier) {
-        if (isSpecifiedSystemTable(identifier)) {
-            throw new IllegalArgumentException(
-                    String.format(
-                            "Table name[%s] cannot contain '%s' separator",
-                            identifier.getObjectName(), 
SYSTEM_TABLE_SPLITTER));
-        }
+        checkNotBranch(identifier, "newTableLocation");
+        checkNotSystemTable(identifier, "newTableLocation");
         return new Path(
                 newDatabasePath(warehouse, identifier.getDatabaseName()),
-                identifier.getObjectName());
+                identifier.getTableName());
     }
 
     public static Path newDatabasePath(String warehouse, String database) {
@@ -548,18 +504,29 @@ public abstract class AbstractCatalog implements Catalog {
     protected List<String> listTablesInFileSystem(Path databasePath) throws 
IOException {
         List<String> tables = new ArrayList<>();
         for (FileStatus status : fileIO.listDirectories(databasePath)) {
-            if (status.isDir() && tableExistsInFileSystem(status.getPath())) {
+            if (status.isDir() && tableExistsInFileSystem(status.getPath(), 
DEFAULT_MAIN_BRANCH)) {
                 tables.add(status.getPath().getName());
             }
         }
         return tables;
     }
 
-    protected boolean tableExistsInFileSystem(Path tablePath) {
-        return !new SchemaManager(fileIO, tablePath).listAllIds().isEmpty();
+    protected boolean tableExistsInFileSystem(Path tablePath, String 
branchName) {
+        return !new SchemaManager(fileIO, tablePath, 
branchName).listAllIds().isEmpty();
     }
 
-    public Optional<TableSchema> tableSchemaInFileSystem(Path tablePath) {
-        return new SchemaManager(fileIO, tablePath).latest();
+    public Optional<TableSchema> tableSchemaInFileSystem(Path tablePath, 
String branchName) {
+        return new SchemaManager(fileIO, tablePath, branchName)
+                .latest()
+                .map(
+                        s -> {
+                            if (!DEFAULT_MAIN_BRANCH.equals(branchName)) {
+                                Options branchOptions = new 
Options(s.options());
+                                branchOptions.set(CoreOptions.BRANCH, 
branchName);
+                                return s.copy(branchOptions.toMap());
+                            } else {
+                                return s;
+                            }
+                        });
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
index b11765a9a..b229963f5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
@@ -22,6 +22,7 @@ import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.system.SystemTableLoader;
+import org.apache.paimon.utils.Preconditions;
 
 import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
 import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
@@ -39,7 +40,6 @@ import java.util.List;
 import java.util.Map;
 
 import static org.apache.paimon.catalog.AbstractCatalog.isSpecifiedSystemTable;
-import static org.apache.paimon.catalog.AbstractCatalog.tableAndSystemName;
 import static 
org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS;
 import static org.apache.paimon.table.system.SystemTableLoader.SYSTEM_TABLES;
 
@@ -142,18 +142,21 @@ public class CachingCatalog extends DelegateCatalog {
         }
 
         if (isSpecifiedSystemTable(identifier)) {
-            String[] splits = tableAndSystemName(identifier);
-            String tableName = splits[0];
-            String type = splits[1];
-
             Identifier originIdentifier =
-                    Identifier.create(identifier.getDatabaseName(), tableName);
+                    new Identifier(
+                            identifier.getDatabaseName(),
+                            identifier.getTableName(),
+                            identifier.getBranchName(),
+                            null);
             Table originTable = tableCache.getIfPresent(originIdentifier);
             if (originTable == null) {
                 originTable = wrapped.getTable(originIdentifier);
                 tableCache.put(originIdentifier, originTable);
             }
-            table = SystemTableLoader.load(type, (FileStoreTable) originTable);
+            table =
+                    SystemTableLoader.load(
+                            
Preconditions.checkNotNull(identifier.getSystemTableName()),
+                            (FileStoreTable) originTable);
             if (table == null) {
                 throw new TableNotExistException(identifier);
             }
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index ac229464b..072af2a4e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -48,9 +48,9 @@ public interface Catalog extends AutoCloseable {
 
     String DEFAULT_DATABASE = "default";
 
-    String BRANCH_PREFIX = "$branch_";
     String SYSTEM_TABLE_SPLITTER = "$";
     String SYSTEM_DATABASE_NAME = "sys";
+    String SYSTEM_BRANCH_PREFIX = "branch_";
     String COMMENT_PROP = "comment";
     String TABLE_DEFAULT_OPTION_PREFIX = "table-default.";
     String DB_LOCATION_PROP = "location";
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
index ec263f90a..39f81833a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
@@ -37,18 +37,18 @@ public class CatalogUtils {
     }
 
     public static String database(Path path) {
-        return SchemaManager.fromPath(path.toString(), 
false).getDatabaseName();
+        return SchemaManager.identifierFromPath(path.toString(), 
false).getDatabaseName();
     }
 
     public static String database(String path) {
-        return SchemaManager.fromPath(path, false).getDatabaseName();
+        return SchemaManager.identifierFromPath(path, false).getDatabaseName();
     }
 
     public static String table(Path path) {
-        return SchemaManager.fromPath(path.toString(), false).getObjectName();
+        return SchemaManager.identifierFromPath(path.toString(), 
false).getObjectName();
     }
 
     public static String table(String path) {
-        return SchemaManager.fromPath(path, false).getObjectName();
+        return SchemaManager.identifierFromPath(path, false).getObjectName();
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index 64f38a106..d04b975dd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.catalog;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.operation.Lock;
@@ -37,7 +36,6 @@ import java.util.Map;
 import java.util.concurrent.Callable;
 
 import static 
org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE;
-import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
 
 /** A catalog implementation for {@link FileIO}. */
 public class FileSystemCatalog extends AbstractCatalog {
@@ -100,24 +98,14 @@ public class FileSystemCatalog extends AbstractCatalog {
             return super.tableExists(identifier);
         }
 
-        return tableExistsInFileSystem(getDataTableLocation(identifier));
+        return tableExistsInFileSystem(
+                getDataTableLocation(identifier), 
identifier.getBranchNameOrDefault());
     }
 
     @Override
-    public TableSchema getDataTableSchema(Identifier identifier, String 
branchName)
-            throws TableNotExistException {
-        return schemaManager(identifier, branchName)
-                .latest()
-                .map(
-                        s -> {
-                            if (!DEFAULT_MAIN_BRANCH.equals(branchName)) {
-                                Options branchOptions = new 
Options(s.options());
-                                branchOptions.set(CoreOptions.BRANCH, 
branchName);
-                                return s.copy(branchOptions.toMap());
-                            } else {
-                                return s;
-                            }
-                        })
+    public TableSchema getDataTableSchema(Identifier identifier) throws 
TableNotExistException {
+        return tableSchemaInFileSystem(
+                        getDataTableLocation(identifier), 
identifier.getBranchNameOrDefault())
                 .orElseThrow(() -> new TableNotExistException(identifier));
     }
 
@@ -129,14 +117,14 @@ public class FileSystemCatalog extends AbstractCatalog {
 
     @Override
     public void createTableImpl(Identifier identifier, Schema schema) {
-        uncheck(() -> schemaManager(identifier, 
DEFAULT_MAIN_BRANCH).createTable(schema));
+        uncheck(() -> schemaManager(identifier).createTable(schema));
     }
 
-    private SchemaManager schemaManager(Identifier identifier, String 
branchName) {
+    private SchemaManager schemaManager(Identifier identifier) {
         Path path = getDataTableLocation(identifier);
         CatalogLock catalogLock =
                 lockFactory().map(fac -> 
fac.createLock(assertGetLockContext())).orElse(null);
-        return new SchemaManager(fileIO, path, branchName)
+        return new SchemaManager(fileIO, path, 
identifier.getBranchNameOrDefault())
                 .withLock(catalogLock == null ? null : 
Lock.fromCatalog(catalogLock, identifier));
     }
 
@@ -153,10 +141,9 @@ public class FileSystemCatalog extends AbstractCatalog {
     }
 
     @Override
-    protected void alterTableImpl(
-            Identifier identifier, String branchName, List<SchemaChange> 
changes)
+    protected void alterTableImpl(Identifier identifier, List<SchemaChange> 
changes)
             throws TableNotExistException, ColumnAlreadyExistException, 
ColumnNotExistException {
-        schemaManager(identifier, branchName).commitChanges(changes);
+        schemaManager(identifier).commitChanges(changes);
     }
 
     protected static <T> T uncheck(Callable<T> callable) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java
index 15eb31b00..c5473a714 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java
@@ -21,8 +21,12 @@ package org.apache.paimon.catalog;
 import org.apache.paimon.annotation.Public;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.BranchManager;
+import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.StringUtils;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 import java.util.Objects;
 
@@ -41,11 +45,35 @@ public class Identifier implements Serializable {
     public static final String UNKNOWN_DATABASE = "unknown";
 
     private final String database;
-    private final String table;
+    private final String object;
+
+    private transient String table;
+    private transient String branch;
+    private transient String systemTable;
+
+    public Identifier(String database, String object) {
+        this.database = database;
+        this.object = object;
+    }
 
-    public Identifier(String database, String table) {
+    public Identifier(
+            String database, String table, @Nullable String branch, @Nullable 
String systemTable) {
         this.database = database;
+
+        StringBuilder builder = new StringBuilder(table);
+        if (branch != null) {
+            builder.append(Catalog.SYSTEM_TABLE_SPLITTER)
+                    .append(Catalog.SYSTEM_BRANCH_PREFIX)
+                    .append(branch);
+        }
+        if (systemTable != null) {
+            builder.append(Catalog.SYSTEM_TABLE_SPLITTER).append(systemTable);
+        }
+        this.object = builder.toString();
+
         this.table = table;
+        this.branch = branch;
+        this.systemTable = systemTable;
     }
 
     public String getDatabaseName() {
@@ -53,13 +81,64 @@ public class Identifier implements Serializable {
     }
 
     public String getObjectName() {
-        return table;
+        return object;
     }
 
     public String getFullName() {
         return UNKNOWN_DATABASE.equals(this.database)
-                ? table
-                : String.format("%s.%s", database, table);
+                ? object
+                : String.format("%s.%s", database, object);
+    }
+
+    public String getTableName() {
+        splitObjectName();
+        return table;
+    }
+
+    public @Nullable String getBranchName() {
+        splitObjectName();
+        return branch;
+    }
+
+    public String getBranchNameOrDefault() {
+        String branch = getBranchName();
+        return branch == null ? BranchManager.DEFAULT_MAIN_BRANCH : branch;
+    }
+
+    public @Nullable String getSystemTableName() {
+        splitObjectName();
+        return systemTable;
+    }
+
+    private void splitObjectName() {
+        if (table != null) {
+            return;
+        }
+
+        String[] splits = StringUtils.split(object, 
Catalog.SYSTEM_TABLE_SPLITTER, -1, true);
+        if (splits.length == 1) {
+            table = object;
+            branch = null;
+            systemTable = null;
+        } else if (splits.length == 2) {
+            table = splits[0];
+            if (splits[1].startsWith(Catalog.SYSTEM_BRANCH_PREFIX)) {
+                branch = 
splits[1].substring(Catalog.SYSTEM_BRANCH_PREFIX.length());
+                systemTable = null;
+            } else {
+                branch = null;
+                systemTable = splits[1];
+            }
+        } else if (splits.length == 3) {
+            Preconditions.checkArgument(
+                    splits[1].startsWith(Catalog.SYSTEM_BRANCH_PREFIX),
+                    "System table can only contain one '$' separator, but this 
is: " + object);
+            table = splits[0];
+            branch = 
splits[1].substring(Catalog.SYSTEM_BRANCH_PREFIX.length());
+            systemTable = splits[2];
+        } else {
+            throw new IllegalArgumentException("Invalid object name: " + 
object);
+        }
     }
 
     public String getEscapedFullName() {
@@ -68,11 +147,11 @@ public class Identifier implements Serializable {
 
     public String getEscapedFullName(char escapeChar) {
         return String.format(
-                "%c%s%c.%c%s%c", escapeChar, database, escapeChar, escapeChar, 
table, escapeChar);
+                "%c%s%c.%c%s%c", escapeChar, database, escapeChar, escapeChar, 
object, escapeChar);
     }
 
-    public static Identifier create(String db, String table) {
-        return new Identifier(db, table);
+    public static Identifier create(String db, String object) {
+        return new Identifier(db, object);
     }
 
     public static Identifier fromString(String fullName) {
@@ -84,7 +163,7 @@ public class Identifier implements Serializable {
         if (paths.length != 2) {
             throw new IllegalArgumentException(
                     String.format(
-                            "Cannot get splits from '%s' to get database and 
table", fullName));
+                            "Cannot get splits from '%s' to get database and 
object", fullName));
         }
 
         return new Identifier(paths[0], paths[1]);
@@ -99,17 +178,17 @@ public class Identifier implements Serializable {
             return false;
         }
         Identifier that = (Identifier) o;
-        return Objects.equals(database, that.database) && 
Objects.equals(table, that.table);
+        return Objects.equals(database, that.database) && 
Objects.equals(object, that.object);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(database, table);
+        return Objects.hash(database, object);
     }
 
     @Override
     public String toString() {
-        return "Identifier{" + "database='" + database + '\'' + ", table='" + 
table + '\'' + '}';
+        return String.format("Identifier{database='%s', object='%s'}", 
database, object);
     }
 
     public static RowType schema() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
index 556c071f2..da08309ad 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
@@ -214,7 +214,7 @@ public class JdbcCatalog extends AbstractCatalog {
                             JdbcUtils.DROP_TABLE_SQL,
                             catalogKey,
                             identifier.getDatabaseName(),
-                            identifier.getObjectName());
+                            identifier.getTableName());
 
             if (deletedRecords == 0) {
                 LOG.info("Skipping drop, table does not exist: {}", 
identifier);
@@ -248,7 +248,7 @@ public class JdbcCatalog extends AbstractCatalog {
                                                 
JdbcUtils.DO_COMMIT_CREATE_TABLE_SQL)) {
                                     sql.setString(1, catalogKey);
                                     sql.setString(2, 
identifier.getDatabaseName());
-                                    sql.setString(3, 
identifier.getObjectName());
+                                    sql.setString(3, 
identifier.getTableName());
                                     return sql.executeUpdate();
                                 }
                             });
@@ -277,7 +277,7 @@ public class JdbcCatalog extends AbstractCatalog {
             updateTable(connections, catalogKey, fromTable, toTable);
 
             Path fromPath = getDataTableLocation(fromTable);
-            if (new SchemaManager(fileIO, fromPath).listAllIds().size() > 0) {
+            if (!new SchemaManager(fileIO, fromPath).listAllIds().isEmpty()) {
                 // Rename the file system's table directory. Maintain 
consistency between tables in
                 // the file system and tables in the Hive Metastore.
                 Path toPath = getDataTableLocation(toTable);
@@ -297,23 +297,18 @@ public class JdbcCatalog extends AbstractCatalog {
     }
 
     @Override
-    protected void alterTableImpl(
-            Identifier identifier, String branchName, List<SchemaChange> 
changes)
+    protected void alterTableImpl(Identifier identifier, List<SchemaChange> 
changes)
             throws TableNotExistException, ColumnAlreadyExistException, 
ColumnNotExistException {
-        assertMainBranch(branchName);
+        assertMainBranch(identifier);
         SchemaManager schemaManager = getSchemaManager(identifier);
         schemaManager.commitChanges(changes);
     }
 
     @Override
-    protected TableSchema getDataTableSchema(Identifier identifier, String 
branchName)
-            throws TableNotExistException {
-        assertMainBranch(branchName);
+    protected TableSchema getDataTableSchema(Identifier identifier) throws 
TableNotExistException {
+        assertMainBranch(identifier);
         if (!JdbcUtils.tableExists(
-                connections,
-                catalogKey,
-                identifier.getDatabaseName(),
-                identifier.getObjectName())) {
+                connections, catalogKey, identifier.getDatabaseName(), 
identifier.getTableName())) {
             throw new TableNotExistException(identifier);
         }
         Path tableLocation = getDataTableLocation(identifier);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 3c9db09aa..56773c821 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -193,7 +193,8 @@ public class SchemaManager implements Serializable {
                     latest().orElseThrow(
                                     () ->
                                             new Catalog.TableNotExistException(
-                                                    fromPath(branchPath(), 
true)));
+                                                    identifierFromPath(
+                                                            
tableRoot.toString(), true, branch)));
             Map<String, String> newOptions = new 
HashMap<>(oldTableSchema.options());
             List<DataField> newFields = new 
ArrayList<>(oldTableSchema.fields());
             AtomicInteger highestFieldId = new 
AtomicInteger(oldTableSchema.highestFieldId());
@@ -219,13 +220,14 @@ public class SchemaManager implements Serializable {
                     SchemaChange.Move move = addColumn.move();
                     if (newFields.stream().anyMatch(f -> 
f.name().equals(addColumn.fieldName()))) {
                         throw new Catalog.ColumnAlreadyExistException(
-                                fromPath(branchPath(), true), 
addColumn.fieldName());
+                                identifierFromPath(tableRoot.toString(), true, 
branch),
+                                addColumn.fieldName());
                     }
                     Preconditions.checkArgument(
                             addColumn.dataType().isNullable(),
                             "Column %s cannot specify NOT NULL in the %s 
table.",
                             addColumn.fieldName(),
-                            fromPath(branchPath(), true).getFullName());
+                            identifierFromPath(tableRoot.toString(), true, 
branch).getFullName());
                     int id = highestFieldId.incrementAndGet();
                     DataType dataType =
                             ReassignFieldId.reassign(addColumn.dataType(), 
highestFieldId);
@@ -256,7 +258,8 @@ public class SchemaManager implements Serializable {
                     validateNotPrimaryAndPartitionKey(oldTableSchema, 
rename.fieldName());
                     if (newFields.stream().anyMatch(f -> 
f.name().equals(rename.newName()))) {
                         throw new Catalog.ColumnAlreadyExistException(
-                                fromPath(branchPath(), true), 
rename.fieldName());
+                                identifierFromPath(tableRoot.toString(), true, 
branch),
+                                rename.fieldName());
                     }
 
                     updateNestedColumn(
@@ -275,7 +278,8 @@ public class SchemaManager implements Serializable {
                     if (!newFields.removeIf(
                             f -> f.name().equals(((DropColumn) 
change).fieldName()))) {
                         throw new Catalog.ColumnNotExistException(
-                                fromPath(branchPath(), true), 
drop.fieldName());
+                                identifierFromPath(tableRoot.toString(), true, 
branch),
+                                drop.fieldName());
                     }
                     if (newFields.isEmpty()) {
                         throw new IllegalArgumentException("Cannot drop all 
fields in table");
@@ -511,7 +515,8 @@ public class SchemaManager implements Serializable {
         }
         if (!found) {
             throw new Catalog.ColumnNotExistException(
-                    fromPath(branchPath(), true), 
Arrays.toString(updateFieldNames));
+                    identifierFromPath(tableRoot.toString(), true, branch),
+                    Arrays.toString(updateFieldNames));
         }
     }
 
@@ -587,13 +592,22 @@ public class SchemaManager implements Serializable {
         }
     }
 
-    public static Identifier fromPath(String tablePath, boolean 
ignoreIfUnknownDatabase) {
+    public static Identifier identifierFromPath(String tablePath, boolean 
ignoreIfUnknownDatabase) {
+        return identifierFromPath(tablePath, ignoreIfUnknownDatabase, null);
+    }
+
+    public static Identifier identifierFromPath(
+            String tablePath, boolean ignoreIfUnknownDatabase, @Nullable 
String branchName) {
+        if (DEFAULT_MAIN_BRANCH.equals(branchName)) {
+            branchName = null;
+        }
+
         String[] paths = tablePath.split("/");
         if (paths.length < 2) {
             if (!ignoreIfUnknownDatabase) {
                 throw new IllegalArgumentException(
                         String.format(
-                                "Path '%s' is not a legacy path, please use 
catalog table path instead: 'warehouse_path/your_database.db/your_table'.",
+                                "Path '%s' is not a valid path, please use 
catalog table path instead: 'warehouse_path/your_database.db/your_table'.",
                                 tablePath));
             }
             return new Identifier(UNKNOWN_DATABASE, paths[0]);
@@ -605,12 +619,13 @@ public class SchemaManager implements Serializable {
             if (!ignoreIfUnknownDatabase) {
                 throw new IllegalArgumentException(
                         String.format(
-                                "Path '%s' is not a legacy path, please use 
catalog table path instead: 'warehouse_path/your_database.db/your_table'.",
+                                "Path '%s' is not a valid path, please use 
catalog table path instead: 'warehouse_path/your_database.db/your_table'.",
                                 tablePath));
             }
-            return new Identifier(UNKNOWN_DATABASE, paths[paths.length - 1]);
+            return new Identifier(UNKNOWN_DATABASE, paths[paths.length - 1], 
branchName, null);
         }
         database = database.substring(0, index);
-        return new Identifier(database, paths[paths.length - 1]);
+
+        return new Identifier(database, paths[paths.length - 1], branchName, 
null);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index d30fd7308..bad718a04 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -116,16 +116,22 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
 
     @Override
     public String name() {
-        Identifier identifier = catalogEnvironment.identifier();
-        return identifier == null ? location().getName() : 
identifier.getObjectName();
+        return identifier().getObjectName();
     }
 
     @Override
     public String fullName() {
+        return identifier().getFullName();
+    }
+
+    public Identifier identifier() {
         Identifier identifier = catalogEnvironment.identifier();
         return identifier == null
-                ? SchemaManager.fromPath(location().toUri().toString(), 
true).getFullName()
-                : identifier.getFullName();
+                ? SchemaManager.identifierFromPath(
+                        location().toUri().toString(),
+                        true,
+                        options().get(CoreOptions.BRANCH.key()))
+                : identifier;
     }
 
     @Override
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index 19c9e83c4..dd0d18050 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -239,7 +239,7 @@ public abstract class CatalogTestBase {
                                         DEFAULT_TABLE_SCHEMA,
                                         false))
                 .withMessage(
-                        "Cannot 'createTable' for system table 
'Identifier{database='test_db', table='$system_table'}', please use data 
table.");
+                        "Cannot 'createTable' for system table 
'Identifier{database='test_db', object='$system_table'}', please use data 
table.");
 
         // Create table throws DatabaseNotExistException when database does 
not exist
         assertThatExceptionOfType(Catalog.DatabaseNotExistException.class)
@@ -367,7 +367,7 @@ public abstract class CatalogTestBase {
                                 catalog.dropTable(
                                         Identifier.create("test_db", 
"$system_table"), false))
                 .withMessage(
-                        "Cannot 'dropTable' for system table 
'Identifier{database='test_db', table='$system_table'}', please use data 
table.");
+                        "Cannot 'dropTable' for system table 
'Identifier{database='test_db', object='$system_table'}', please use data 
table.");
 
         // Drop table throws TableNotExistException when table does not exist 
and ignoreIfNotExists
         // is false
@@ -402,7 +402,7 @@ public abstract class CatalogTestBase {
                                         toTable,
                                         false))
                 .withMessage(
-                        "Cannot 'renameTable' for system table 
'Identifier{database='test_db', table='$system_table'}', please use data 
table.");
+                        "Cannot 'renameTable' for system table 
'Identifier{database='test_db', object='$system_table'}', please use data 
table.");
 
         assertThatExceptionOfType(IllegalArgumentException.class)
                 .isThrownBy(
@@ -412,7 +412,7 @@ public abstract class CatalogTestBase {
                                         Identifier.create("test_db", 
"$system_table"),
                                         false))
                 .withMessage(
-                        "Cannot 'renameTable' for system table 
'Identifier{database='test_db', table='$system_table'}', please use data 
table.");
+                        "Cannot 'renameTable' for system table 
'Identifier{database='test_db', object='$system_table'}', please use data 
table.");
 
         // Rename table throws TableNotExistException when table does not exist
         assertThatExceptionOfType(Catalog.TableNotExistException.class)
@@ -466,7 +466,7 @@ public abstract class CatalogTestBase {
                                                 SchemaChange.addColumn("col2", 
DataTypes.DATE())),
                                         false))
                 .withMessage(
-                        "Cannot 'alterTable' for system table 
'Identifier{database='test_db', table='$system_table'}', please use data 
table.");
+                        "Cannot 'alterTable' for system table 
'Identifier{database='test_db', object='$system_table'}', please use data 
table.");
 
         // Alter table throws TableNotExistException when table does not exist
         assertThatExceptionOfType(Catalog.TableNotExistException.class)
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
index 4db178eab..8771377f2 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
@@ -76,7 +76,7 @@ public class SchemaEvolutionTest {
     @BeforeEach
     public void beforeEach() {
         tablePath = new Path(tempDir.toUri());
-        identifier = SchemaManager.fromPath(tablePath.toString(), true);
+        identifier = SchemaManager.identifierFromPath(tablePath.toString(), 
true);
         schemaManager = new SchemaManager(LocalFileIO.create(), tablePath);
         commitUser = UUID.randomUUID().toString();
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
index 80ca03d8c..d005dc4e5 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -32,7 +32,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.fail;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** IT cases for table with branches using SQL. */
 public class BranchSqlITCase extends CatalogITCaseBase {
@@ -378,13 +378,8 @@ public class BranchSqlITCase extends CatalogITCaseBase {
         sql("ALTER TABLE `t$branch_pk` ADD (v2 INT)");
         sql("ALTER TABLE t SET ( 'scan.fallback-branch' = 'pk' )");
 
-        try {
-            sql("INSERT INTO t VALUES (1, 10, 'apple')");
-            fail("Expecting exceptions");
-        } catch (Exception e) {
-            assertThat(e)
-                    .hasMessageContaining("Branch main and pk does not have 
the same row type");
-        }
+        assertThatThrownBy(() -> sql("INSERT INTO t VALUES (1, 10, 'apple')"))
+                .hasMessageContaining("Branch main and pk does not have the 
same row type");
     }
 
     private List<String> collectResult(String sql) throws Exception {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index cb39ae3c0..199f26ad1 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -172,11 +172,10 @@ public class CatalogTableITCase extends CatalogITCaseBase {
         assertThatThrownBy(() -> sql("CREATE TABLE T$snapshots (a INT, b 
INT)"))
                 .hasRootCauseMessage(
                         "Cannot 'createTable' for system table "
-                                + "'Identifier{database='default', 
table='T$snapshots'}', please use data table.");
+                                + "'Identifier{database='default', 
object='T$snapshots'}', please use data table.");
         assertThatThrownBy(() -> sql("CREATE TABLE T$aa$bb (a INT, b INT)"))
                 .hasRootCauseMessage(
-                        "Cannot 'createTable' for system table "
-                                + "'Identifier{database='default', 
table='T$aa$bb'}', please use data table.");
+                        "System table can only contain one '$' separator, but 
this is: T$aa$bb");
     }
 
     @Test
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index e9fc9f32c..667311ad4 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -185,7 +185,7 @@ public class HiveCatalog extends AbstractCatalog {
     public Path getDataTableLocation(Identifier identifier) {
         try {
             String databaseName = identifier.getDatabaseName();
-            String tableName = identifier.getObjectName();
+            String tableName = identifier.getTableName();
             Optional<Path> tablePath =
                     clients.run(
                             client -> {
@@ -363,7 +363,7 @@ public class HiveCatalog extends AbstractCatalog {
                             client ->
                                     client.getTable(
                                             identifier.getDatabaseName(),
-                                            identifier.getObjectName()));
+                                            identifier.getTableName()));
         } catch (NoSuchObjectException e) {
             return false;
         } catch (TException e) {
@@ -376,7 +376,11 @@ public class HiveCatalog extends AbstractCatalog {
                     "Interrupted in call to tableExists " + 
identifier.getFullName(), e);
         }
 
-        return isPaimonTable(table);
+        return isPaimonTable(table)
+                && tableSchemaInFileSystem(
+                                getDataTableLocation(identifier),
+                                identifier.getBranchNameOrDefault())
+                        .isPresent();
     }
 
     private static boolean isPaimonTable(Table table) {
@@ -387,19 +391,15 @@ public class HiveCatalog extends AbstractCatalog {
     }
 
     @Override
-    public TableSchema getDataTableSchema(Identifier identifier, String 
branchName)
-            throws TableNotExistException {
-        assertMainBranch(branchName);
-        return getDataTableSchema(identifier);
-    }
+    public TableSchema getDataTableSchema(Identifier identifier) throws 
TableNotExistException {
+        assertMainBranch(identifier);
 
-    private TableSchema getDataTableSchema(Identifier identifier) throws 
TableNotExistException {
         if (!tableExists(identifier)) {
             throw new TableNotExistException(identifier);
         }
 
-        Path tableLocation = getDataTableLocation(identifier);
-        return tableSchemaInFileSystem(tableLocation)
+        return tableSchemaInFileSystem(
+                        getDataTableLocation(identifier), 
identifier.getBranchNameOrDefault())
                 .orElseThrow(() -> new TableNotExistException(identifier));
     }
 
@@ -418,7 +418,7 @@ public class HiveCatalog extends AbstractCatalog {
                     client ->
                             client.dropTable(
                                     identifier.getDatabaseName(),
-                                    identifier.getObjectName(),
+                                    identifier.getTableName(),
                                     true,
                                     false,
                                     true));
@@ -490,10 +490,10 @@ public class HiveCatalog extends AbstractCatalog {
     protected void renameTableImpl(Identifier fromTable, Identifier toTable) {
         try {
             String fromDB = fromTable.getDatabaseName();
-            String fromTableName = fromTable.getObjectName();
+            String fromTableName = fromTable.getTableName();
             Table table = clients.run(client -> client.getTable(fromDB, 
fromTableName));
             table.setDbName(toTable.getDatabaseName());
-            table.setTableName(toTable.getObjectName());
+            table.setTableName(toTable.getTableName());
             clients.execute(client -> client.alter_table(fromDB, 
fromTableName, table));
 
             Path fromPath = getDataTableLocation(fromTable);
@@ -516,7 +516,7 @@ public class HiveCatalog extends AbstractCatalog {
                 clients.execute(
                         client ->
                                 client.alter_table(
-                                        toTable.getDatabaseName(), 
toTable.getObjectName(), table));
+                                        toTable.getDatabaseName(), 
toTable.getTableName(), table));
             }
         } catch (TException e) {
             throw new RuntimeException("Failed to rename table " + 
fromTable.getFullName(), e);
@@ -527,10 +527,9 @@ public class HiveCatalog extends AbstractCatalog {
     }
 
     @Override
-    protected void alterTableImpl(
-            Identifier identifier, String branchName, List<SchemaChange> 
changes)
+    protected void alterTableImpl(Identifier identifier, List<SchemaChange> 
changes)
             throws TableNotExistException, ColumnAlreadyExistException, 
ColumnNotExistException {
-        assertMainBranch(branchName);
+        assertMainBranch(identifier);
 
         final SchemaManager schemaManager = schemaManager(identifier);
         // first commit changes to underlying files
@@ -542,7 +541,7 @@ public class HiveCatalog extends AbstractCatalog {
                             client ->
                                     client.getTable(
                                             identifier.getDatabaseName(),
-                                            identifier.getObjectName()));
+                                            identifier.getTableName()));
             alterTableToHms(table, identifier, schema);
         } catch (Exception te) {
             schemaManager.deleteSchema(schema.id());
@@ -558,7 +557,7 @@ public class HiveCatalog extends AbstractCatalog {
                 client ->
                         client.alter_table(
                                 identifier.getDatabaseName(),
-                                identifier.getObjectName(),
+                                identifier.getTableName(),
                                 table,
                                 true));
     }
@@ -615,7 +614,9 @@ public class HiveCatalog extends AbstractCatalog {
         validateIdentifierNameCaseInsensitive(identifier);
 
         TableSchema tableSchema =
-                tableSchemaInFileSystem(getDataTableLocation(identifier))
+                tableSchemaInFileSystem(
+                                getDataTableLocation(identifier),
+                                identifier.getBranchNameOrDefault())
                         .orElseThrow(() -> new 
TableNotExistException(identifier));
         Table newTable = createHiveTable(identifier, tableSchema);
         try {
@@ -625,7 +626,7 @@ public class HiveCatalog extends AbstractCatalog {
                                 client ->
                                         client.getTable(
                                                 identifier.getDatabaseName(),
-                                                identifier.getObjectName()));
+                                                identifier.getTableName()));
                 checkArgument(
                         isPaimonTable(table),
                         "Table %s is not a paimon table in hive metastore.",
@@ -673,7 +674,7 @@ public class HiveCatalog extends AbstractCatalog {
                         TableType.class);
         Table table =
                 new Table(
-                        identifier.getObjectName(),
+                        identifier.getTableName(),
                         identifier.getDatabaseName(),
                         // current linux user
                         System.getProperty("user.name"),

Reply via email to