This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7df888005 [core] Identifier can now recognize branch and system tables
(#3862)
7df888005 is described below
commit 7df888005e9c82fc337480956ddca6e54529fc55
Author: tsreaper <[email protected]>
AuthorDate: Thu Aug 1 12:34:00 2024 +0800
[core] Identifier can now recognize branch and system tables (#3862)
---
.../java/org/apache/paimon/utils/StringUtils.java | 4 +-
.../org/apache/paimon/catalog/AbstractCatalog.java | 125 ++++++++-------------
.../org/apache/paimon/catalog/CachingCatalog.java | 17 +--
.../java/org/apache/paimon/catalog/Catalog.java | 2 +-
.../org/apache/paimon/catalog/CatalogUtils.java | 8 +-
.../apache/paimon/catalog/FileSystemCatalog.java | 33 ++----
.../java/org/apache/paimon/catalog/Identifier.java | 103 +++++++++++++++--
.../java/org/apache/paimon/jdbc/JdbcCatalog.java | 21 ++--
.../org/apache/paimon/schema/SchemaManager.java | 37 ++++--
.../paimon/table/AbstractFileStoreTable.java | 14 ++-
.../org/apache/paimon/catalog/CatalogTestBase.java | 10 +-
.../apache/paimon/table/SchemaEvolutionTest.java | 2 +-
.../org/apache/paimon/flink/BranchSqlITCase.java | 11 +-
.../apache/paimon/flink/CatalogTableITCase.java | 5 +-
.../java/org/apache/paimon/hive/HiveCatalog.java | 47 ++++----
15 files changed, 243 insertions(+), 196 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
index aeea97ffd..4d0d5b672 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
@@ -373,7 +373,7 @@ public class StringUtils {
* @return an array of parsed Strings, {@code null} if null String input
*/
public static String[] split(final String str, final String
separatorChars) {
- return splitWorker(str, separatorChars, -1, false);
+ return split(str, separatorChars, -1, false);
}
/**
@@ -388,7 +388,7 @@ public class StringUtils {
* separators; if {@code false}, adjacent separators are treated as
one separator.
* @return an array of parsed Strings, {@code null} if null String input
*/
- private static String[] splitWorker(
+ public static String[] split(
final String str,
final String separatorChars,
final int max,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 062e93532..ee2d1f2a5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -38,8 +38,7 @@ import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.system.SystemTableLoader;
-import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.StringUtils;
+import org.apache.paimon.utils.Preconditions;
import javax.annotation.Nullable;
@@ -156,6 +155,7 @@ public abstract class AbstractCatalog implements Catalog {
@Override
public void dropPartition(Identifier identifier, Map<String, String>
partitionSpec)
throws TableNotExistException {
+ checkNotSystemTable(identifier, "dropPartition");
Table table = getTable(identifier);
FileStoreTable fileStoreTable = (FileStoreTable) table;
try (FileStoreCommit commit =
@@ -181,7 +181,7 @@ public abstract class AbstractCatalog implements Catalog {
throw new DatabaseNotExistException(name);
}
- if (!cascade && listTables(name).size() > 0) {
+ if (!cascade && !listTables(name).isEmpty()) {
throw new DatabaseNotEmptyException(name);
}
@@ -282,14 +282,6 @@ public abstract class AbstractCatalog implements Catalog {
validateIdentifierNameCaseInsensitive(identifier);
validateFieldNameCaseInsensitiveInSchemaChange(changes);
- Optional<Pair<Identifier, String>> optionalBranchName =
- getOriginalIdentifierAndBranch(identifier);
- String branchName = DEFAULT_MAIN_BRANCH;
- if (optionalBranchName.isPresent()) {
- identifier = optionalBranchName.get().getLeft();
- branchName = optionalBranchName.get().getRight();
- }
-
if (!tableExists(identifier)) {
if (ignoreIfNotExists) {
return;
@@ -297,11 +289,10 @@ public abstract class AbstractCatalog implements Catalog {
throw new TableNotExistException(identifier);
}
- alterTableImpl(identifier, branchName, changes);
+ alterTableImpl(identifier, changes);
}
- protected abstract void alterTableImpl(
- Identifier identifier, String branchName, List<SchemaChange>
changes)
+ protected abstract void alterTableImpl(Identifier identifier,
List<SchemaChange> changes)
throws TableNotExistException, ColumnAlreadyExistException,
ColumnNotExistException;
@Nullable
@@ -317,7 +308,7 @@ public abstract class AbstractCatalog implements Catalog {
@Override
public Table getTable(Identifier identifier) throws TableNotExistException
{
if (isSystemDatabase(identifier.getDatabaseName())) {
- String tableName = identifier.getObjectName();
+ String tableName = identifier.getTableName();
Table table =
SystemTableLoader.loadGlobal(
tableName,
@@ -330,12 +321,17 @@ public abstract class AbstractCatalog implements Catalog {
}
return table;
} else if (isSpecifiedSystemTable(identifier)) {
- String[] splits = tableAndSystemName(identifier);
- String tableName = splits[0];
- String type = splits[1];
FileStoreTable originTable =
- getDataTable(new Identifier(identifier.getDatabaseName(),
tableName));
- Table table = SystemTableLoader.load(type, originTable);
+ getDataTable(
+ new Identifier(
+ identifier.getDatabaseName(),
+ identifier.getTableName(),
+ identifier.getBranchName(),
+ null));
+ Table table =
+ SystemTableLoader.load(
+
Preconditions.checkNotNull(identifier.getSystemTableName()),
+ originTable);
if (table == null) {
throw new TableNotExistException(identifier);
}
@@ -346,15 +342,8 @@ public abstract class AbstractCatalog implements Catalog {
}
private FileStoreTable getDataTable(Identifier identifier) throws
TableNotExistException {
- Optional<Pair<Identifier, String>> optionalBranchName =
- getOriginalIdentifierAndBranch(identifier);
- String branch = DEFAULT_MAIN_BRANCH;
- if (optionalBranchName.isPresent()) {
- identifier = optionalBranchName.get().getLeft();
- branch = optionalBranchName.get().getRight();
- }
-
- TableSchema tableSchema = getDataTableSchema(identifier, branch);
+ Preconditions.checkArgument(identifier.getSystemTableName() == null);
+ TableSchema tableSchema = getDataTableSchema(identifier);
return FileStoreTableFactory.create(
fileIO,
getDataTableLocation(identifier),
@@ -394,35 +383,16 @@ public abstract class AbstractCatalog implements Catalog {
}
}
- protected abstract TableSchema getDataTableSchema(Identifier identifier,
String branchName)
+ protected abstract TableSchema getDataTableSchema(Identifier identifier)
throws TableNotExistException;
@VisibleForTesting
public Path getDataTableLocation(Identifier identifier) {
- return new Path(newDatabasePath(identifier.getDatabaseName()),
identifier.getObjectName());
- }
-
- private static Optional<Pair<Identifier, String>>
getOriginalIdentifierAndBranch(
- Identifier identifier) {
- String tableName = identifier.getObjectName();
- if (tableName.contains(BRANCH_PREFIX)) {
- int idx = tableName.indexOf(BRANCH_PREFIX);
- String branchName = tableName.substring(idx +
BRANCH_PREFIX.length());
- if (StringUtils.isNullOrWhitespaceOnly(branchName)) {
- return Optional.empty();
- } else {
- return Optional.of(
- Pair.of(
- Identifier.create(
- identifier.getDatabaseName(),
tableName.substring(0, idx)),
- branchName));
- }
- }
- return Optional.empty();
+ return new Path(newDatabasePath(identifier.getDatabaseName()),
identifier.getTableName());
}
- protected void checkNotBranch(Identifier identifier, String method) {
- if (getOriginalIdentifierAndBranch(identifier).isPresent()) {
+ protected static void checkNotBranch(Identifier identifier, String method)
{
+ if (identifier.getBranchName() != null) {
throw new IllegalArgumentException(
String.format(
"Cannot '%s' for branch table '%s', "
@@ -431,23 +401,23 @@ public abstract class AbstractCatalog implements Catalog {
}
}
- protected void assertMainBranch(String branchName) {
- if (!DEFAULT_MAIN_BRANCH.equals(branchName)) {
+ protected void assertMainBranch(Identifier identifier) {
+ if (identifier.getBranchName() != null
+ && !DEFAULT_MAIN_BRANCH.equals(identifier.getBranchName())) {
throw new UnsupportedOperationException(
this.getClass().getName() + " currently does not support
table branches");
}
}
public static boolean isSpecifiedSystemTable(Identifier identifier) {
- return identifier.getObjectName().contains(SYSTEM_TABLE_SPLITTER)
- && !getOriginalIdentifierAndBranch(identifier).isPresent();
+ return identifier.getSystemTableName() != null;
}
protected static boolean isSystemTable(Identifier identifier) {
return isSystemDatabase(identifier.getDatabaseName()) ||
isSpecifiedSystemTable(identifier);
}
- protected void checkNotSystemTable(Identifier identifier, String method) {
+ protected static void checkNotSystemTable(Identifier identifier, String
method) {
if (isSystemTable(identifier)) {
throw new IllegalArgumentException(
String.format(
@@ -460,26 +430,12 @@ public abstract class AbstractCatalog implements Catalog {
tableDefaultOptions.forEach(options::putIfAbsent);
}
- public static String[] tableAndSystemName(Identifier identifier) {
- String[] splits = StringUtils.split(identifier.getObjectName(),
SYSTEM_TABLE_SPLITTER);
- if (splits.length != 2) {
- throw new IllegalArgumentException(
- "System table can only contain one '$' separator, but this
is: "
- + identifier.getObjectName());
- }
- return splits;
- }
-
public static Path newTableLocation(String warehouse, Identifier
identifier) {
- if (isSpecifiedSystemTable(identifier)) {
- throw new IllegalArgumentException(
- String.format(
- "Table name[%s] cannot contain '%s' separator",
- identifier.getObjectName(),
SYSTEM_TABLE_SPLITTER));
- }
+ checkNotBranch(identifier, "newTableLocation");
+ checkNotSystemTable(identifier, "newTableLocation");
return new Path(
newDatabasePath(warehouse, identifier.getDatabaseName()),
- identifier.getObjectName());
+ identifier.getTableName());
}
public static Path newDatabasePath(String warehouse, String database) {
@@ -548,18 +504,29 @@ public abstract class AbstractCatalog implements Catalog {
protected List<String> listTablesInFileSystem(Path databasePath) throws
IOException {
List<String> tables = new ArrayList<>();
for (FileStatus status : fileIO.listDirectories(databasePath)) {
- if (status.isDir() && tableExistsInFileSystem(status.getPath())) {
+ if (status.isDir() && tableExistsInFileSystem(status.getPath(),
DEFAULT_MAIN_BRANCH)) {
tables.add(status.getPath().getName());
}
}
return tables;
}
- protected boolean tableExistsInFileSystem(Path tablePath) {
- return !new SchemaManager(fileIO, tablePath).listAllIds().isEmpty();
+ protected boolean tableExistsInFileSystem(Path tablePath, String
branchName) {
+ return !new SchemaManager(fileIO, tablePath,
branchName).listAllIds().isEmpty();
}
- public Optional<TableSchema> tableSchemaInFileSystem(Path tablePath) {
- return new SchemaManager(fileIO, tablePath).latest();
+ public Optional<TableSchema> tableSchemaInFileSystem(Path tablePath,
String branchName) {
+ return new SchemaManager(fileIO, tablePath, branchName)
+ .latest()
+ .map(
+ s -> {
+ if (!DEFAULT_MAIN_BRANCH.equals(branchName)) {
+ Options branchOptions = new
Options(s.options());
+ branchOptions.set(CoreOptions.BRANCH,
branchName);
+ return s.copy(branchOptions.toMap());
+ } else {
+ return s;
+ }
+ });
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
index b11765a9a..b229963f5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
@@ -22,6 +22,7 @@ import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.system.SystemTableLoader;
+import org.apache.paimon.utils.Preconditions;
import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
@@ -39,7 +40,6 @@ import java.util.List;
import java.util.Map;
import static org.apache.paimon.catalog.AbstractCatalog.isSpecifiedSystemTable;
-import static org.apache.paimon.catalog.AbstractCatalog.tableAndSystemName;
import static
org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS;
import static org.apache.paimon.table.system.SystemTableLoader.SYSTEM_TABLES;
@@ -142,18 +142,21 @@ public class CachingCatalog extends DelegateCatalog {
}
if (isSpecifiedSystemTable(identifier)) {
- String[] splits = tableAndSystemName(identifier);
- String tableName = splits[0];
- String type = splits[1];
-
Identifier originIdentifier =
- Identifier.create(identifier.getDatabaseName(), tableName);
+ new Identifier(
+ identifier.getDatabaseName(),
+ identifier.getTableName(),
+ identifier.getBranchName(),
+ null);
Table originTable = tableCache.getIfPresent(originIdentifier);
if (originTable == null) {
originTable = wrapped.getTable(originIdentifier);
tableCache.put(originIdentifier, originTable);
}
- table = SystemTableLoader.load(type, (FileStoreTable) originTable);
+ table =
+ SystemTableLoader.load(
+
Preconditions.checkNotNull(identifier.getSystemTableName()),
+ (FileStoreTable) originTable);
if (table == null) {
throw new TableNotExistException(identifier);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index ac229464b..072af2a4e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -48,9 +48,9 @@ public interface Catalog extends AutoCloseable {
String DEFAULT_DATABASE = "default";
- String BRANCH_PREFIX = "$branch_";
String SYSTEM_TABLE_SPLITTER = "$";
String SYSTEM_DATABASE_NAME = "sys";
+ String SYSTEM_BRANCH_PREFIX = "branch_";
String COMMENT_PROP = "comment";
String TABLE_DEFAULT_OPTION_PREFIX = "table-default.";
String DB_LOCATION_PROP = "location";
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
index ec263f90a..39f81833a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
@@ -37,18 +37,18 @@ public class CatalogUtils {
}
public static String database(Path path) {
- return SchemaManager.fromPath(path.toString(),
false).getDatabaseName();
+ return SchemaManager.identifierFromPath(path.toString(),
false).getDatabaseName();
}
public static String database(String path) {
- return SchemaManager.fromPath(path, false).getDatabaseName();
+ return SchemaManager.identifierFromPath(path, false).getDatabaseName();
}
public static String table(Path path) {
- return SchemaManager.fromPath(path.toString(), false).getObjectName();
+ return SchemaManager.identifierFromPath(path.toString(),
false).getObjectName();
}
public static String table(String path) {
- return SchemaManager.fromPath(path, false).getObjectName();
+ return SchemaManager.identifierFromPath(path, false).getObjectName();
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index 64f38a106..d04b975dd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -18,7 +18,6 @@
package org.apache.paimon.catalog;
-import org.apache.paimon.CoreOptions;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.Lock;
@@ -37,7 +36,6 @@ import java.util.Map;
import java.util.concurrent.Callable;
import static
org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE;
-import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
/** A catalog implementation for {@link FileIO}. */
public class FileSystemCatalog extends AbstractCatalog {
@@ -100,24 +98,14 @@ public class FileSystemCatalog extends AbstractCatalog {
return super.tableExists(identifier);
}
- return tableExistsInFileSystem(getDataTableLocation(identifier));
+ return tableExistsInFileSystem(
+ getDataTableLocation(identifier),
identifier.getBranchNameOrDefault());
}
@Override
- public TableSchema getDataTableSchema(Identifier identifier, String
branchName)
- throws TableNotExistException {
- return schemaManager(identifier, branchName)
- .latest()
- .map(
- s -> {
- if (!DEFAULT_MAIN_BRANCH.equals(branchName)) {
- Options branchOptions = new
Options(s.options());
- branchOptions.set(CoreOptions.BRANCH,
branchName);
- return s.copy(branchOptions.toMap());
- } else {
- return s;
- }
- })
+ public TableSchema getDataTableSchema(Identifier identifier) throws
TableNotExistException {
+ return tableSchemaInFileSystem(
+ getDataTableLocation(identifier),
identifier.getBranchNameOrDefault())
.orElseThrow(() -> new TableNotExistException(identifier));
}
@@ -129,14 +117,14 @@ public class FileSystemCatalog extends AbstractCatalog {
@Override
public void createTableImpl(Identifier identifier, Schema schema) {
- uncheck(() -> schemaManager(identifier,
DEFAULT_MAIN_BRANCH).createTable(schema));
+ uncheck(() -> schemaManager(identifier).createTable(schema));
}
- private SchemaManager schemaManager(Identifier identifier, String
branchName) {
+ private SchemaManager schemaManager(Identifier identifier) {
Path path = getDataTableLocation(identifier);
CatalogLock catalogLock =
lockFactory().map(fac ->
fac.createLock(assertGetLockContext())).orElse(null);
- return new SchemaManager(fileIO, path, branchName)
+ return new SchemaManager(fileIO, path,
identifier.getBranchNameOrDefault())
.withLock(catalogLock == null ? null :
Lock.fromCatalog(catalogLock, identifier));
}
@@ -153,10 +141,9 @@ public class FileSystemCatalog extends AbstractCatalog {
}
@Override
- protected void alterTableImpl(
- Identifier identifier, String branchName, List<SchemaChange>
changes)
+ protected void alterTableImpl(Identifier identifier, List<SchemaChange>
changes)
throws TableNotExistException, ColumnAlreadyExistException,
ColumnNotExistException {
- schemaManager(identifier, branchName).commitChanges(changes);
+ schemaManager(identifier).commitChanges(changes);
}
protected static <T> T uncheck(Callable<T> callable) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java
index 15eb31b00..c5473a714 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java
@@ -21,8 +21,12 @@ package org.apache.paimon.catalog;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.BranchManager;
+import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
+import javax.annotation.Nullable;
+
import java.io.Serializable;
import java.util.Objects;
@@ -41,11 +45,35 @@ public class Identifier implements Serializable {
public static final String UNKNOWN_DATABASE = "unknown";
private final String database;
- private final String table;
+ private final String object;
+
+ private transient String table;
+ private transient String branch;
+ private transient String systemTable;
+
+ public Identifier(String database, String object) {
+ this.database = database;
+ this.object = object;
+ }
- public Identifier(String database, String table) {
+ public Identifier(
+ String database, String table, @Nullable String branch, @Nullable
String systemTable) {
this.database = database;
+
+ StringBuilder builder = new StringBuilder(table);
+ if (branch != null) {
+ builder.append(Catalog.SYSTEM_TABLE_SPLITTER)
+ .append(Catalog.SYSTEM_BRANCH_PREFIX)
+ .append(branch);
+ }
+ if (systemTable != null) {
+ builder.append(Catalog.SYSTEM_TABLE_SPLITTER).append(systemTable);
+ }
+ this.object = builder.toString();
+
this.table = table;
+ this.branch = branch;
+ this.systemTable = systemTable;
}
public String getDatabaseName() {
@@ -53,13 +81,64 @@ public class Identifier implements Serializable {
}
public String getObjectName() {
- return table;
+ return object;
}
public String getFullName() {
return UNKNOWN_DATABASE.equals(this.database)
- ? table
- : String.format("%s.%s", database, table);
+ ? object
+ : String.format("%s.%s", database, object);
+ }
+
+ public String getTableName() {
+ splitObjectName();
+ return table;
+ }
+
+ public @Nullable String getBranchName() {
+ splitObjectName();
+ return branch;
+ }
+
+ public String getBranchNameOrDefault() {
+ String branch = getBranchName();
+ return branch == null ? BranchManager.DEFAULT_MAIN_BRANCH : branch;
+ }
+
+ public @Nullable String getSystemTableName() {
+ splitObjectName();
+ return systemTable;
+ }
+
+ private void splitObjectName() {
+ if (table != null) {
+ return;
+ }
+
+ String[] splits = StringUtils.split(object,
Catalog.SYSTEM_TABLE_SPLITTER, -1, true);
+ if (splits.length == 1) {
+ table = object;
+ branch = null;
+ systemTable = null;
+ } else if (splits.length == 2) {
+ table = splits[0];
+ if (splits[1].startsWith(Catalog.SYSTEM_BRANCH_PREFIX)) {
+ branch =
splits[1].substring(Catalog.SYSTEM_BRANCH_PREFIX.length());
+ systemTable = null;
+ } else {
+ branch = null;
+ systemTable = splits[1];
+ }
+ } else if (splits.length == 3) {
+ Preconditions.checkArgument(
+ splits[1].startsWith(Catalog.SYSTEM_BRANCH_PREFIX),
+ "System table can only contain one '$' separator, but this
is: " + object);
+ table = splits[0];
+ branch =
splits[1].substring(Catalog.SYSTEM_BRANCH_PREFIX.length());
+ systemTable = splits[2];
+ } else {
+ throw new IllegalArgumentException("Invalid object name: " +
object);
+ }
}
public String getEscapedFullName() {
@@ -68,11 +147,11 @@ public class Identifier implements Serializable {
public String getEscapedFullName(char escapeChar) {
return String.format(
- "%c%s%c.%c%s%c", escapeChar, database, escapeChar, escapeChar,
table, escapeChar);
+ "%c%s%c.%c%s%c", escapeChar, database, escapeChar, escapeChar,
object, escapeChar);
}
- public static Identifier create(String db, String table) {
- return new Identifier(db, table);
+ public static Identifier create(String db, String object) {
+ return new Identifier(db, object);
}
public static Identifier fromString(String fullName) {
@@ -84,7 +163,7 @@ public class Identifier implements Serializable {
if (paths.length != 2) {
throw new IllegalArgumentException(
String.format(
- "Cannot get splits from '%s' to get database and
table", fullName));
+ "Cannot get splits from '%s' to get database and
object", fullName));
}
return new Identifier(paths[0], paths[1]);
@@ -99,17 +178,17 @@ public class Identifier implements Serializable {
return false;
}
Identifier that = (Identifier) o;
- return Objects.equals(database, that.database) &&
Objects.equals(table, that.table);
+ return Objects.equals(database, that.database) &&
Objects.equals(object, that.object);
}
@Override
public int hashCode() {
- return Objects.hash(database, table);
+ return Objects.hash(database, object);
}
@Override
public String toString() {
- return "Identifier{" + "database='" + database + '\'' + ", table='" +
table + '\'' + '}';
+ return String.format("Identifier{database='%s', object='%s'}",
database, object);
}
public static RowType schema() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
index 556c071f2..da08309ad 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
@@ -214,7 +214,7 @@ public class JdbcCatalog extends AbstractCatalog {
JdbcUtils.DROP_TABLE_SQL,
catalogKey,
identifier.getDatabaseName(),
- identifier.getObjectName());
+ identifier.getTableName());
if (deletedRecords == 0) {
LOG.info("Skipping drop, table does not exist: {}",
identifier);
@@ -248,7 +248,7 @@ public class JdbcCatalog extends AbstractCatalog {
JdbcUtils.DO_COMMIT_CREATE_TABLE_SQL)) {
sql.setString(1, catalogKey);
sql.setString(2,
identifier.getDatabaseName());
- sql.setString(3,
identifier.getObjectName());
+ sql.setString(3,
identifier.getTableName());
return sql.executeUpdate();
}
});
@@ -277,7 +277,7 @@ public class JdbcCatalog extends AbstractCatalog {
updateTable(connections, catalogKey, fromTable, toTable);
Path fromPath = getDataTableLocation(fromTable);
- if (new SchemaManager(fileIO, fromPath).listAllIds().size() > 0) {
+ if (!new SchemaManager(fileIO, fromPath).listAllIds().isEmpty()) {
// Rename the file system's table directory. Maintain
consistency between tables in
// the file system and tables in the Hive Metastore.
Path toPath = getDataTableLocation(toTable);
@@ -297,23 +297,18 @@ public class JdbcCatalog extends AbstractCatalog {
}
@Override
- protected void alterTableImpl(
- Identifier identifier, String branchName, List<SchemaChange>
changes)
+ protected void alterTableImpl(Identifier identifier, List<SchemaChange>
changes)
throws TableNotExistException, ColumnAlreadyExistException,
ColumnNotExistException {
- assertMainBranch(branchName);
+ assertMainBranch(identifier);
SchemaManager schemaManager = getSchemaManager(identifier);
schemaManager.commitChanges(changes);
}
@Override
- protected TableSchema getDataTableSchema(Identifier identifier, String
branchName)
- throws TableNotExistException {
- assertMainBranch(branchName);
+ protected TableSchema getDataTableSchema(Identifier identifier) throws
TableNotExistException {
+ assertMainBranch(identifier);
if (!JdbcUtils.tableExists(
- connections,
- catalogKey,
- identifier.getDatabaseName(),
- identifier.getObjectName())) {
+ connections, catalogKey, identifier.getDatabaseName(),
identifier.getTableName())) {
throw new TableNotExistException(identifier);
}
Path tableLocation = getDataTableLocation(identifier);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 3c9db09aa..56773c821 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -193,7 +193,8 @@ public class SchemaManager implements Serializable {
latest().orElseThrow(
() ->
new Catalog.TableNotExistException(
- fromPath(branchPath(),
true)));
+ identifierFromPath(
+
tableRoot.toString(), true, branch)));
Map<String, String> newOptions = new
HashMap<>(oldTableSchema.options());
List<DataField> newFields = new
ArrayList<>(oldTableSchema.fields());
AtomicInteger highestFieldId = new
AtomicInteger(oldTableSchema.highestFieldId());
@@ -219,13 +220,14 @@ public class SchemaManager implements Serializable {
SchemaChange.Move move = addColumn.move();
if (newFields.stream().anyMatch(f ->
f.name().equals(addColumn.fieldName()))) {
throw new Catalog.ColumnAlreadyExistException(
- fromPath(branchPath(), true),
addColumn.fieldName());
+ identifierFromPath(tableRoot.toString(), true,
branch),
+ addColumn.fieldName());
}
Preconditions.checkArgument(
addColumn.dataType().isNullable(),
"Column %s cannot specify NOT NULL in the %s
table.",
addColumn.fieldName(),
- fromPath(branchPath(), true).getFullName());
+ identifierFromPath(tableRoot.toString(), true,
branch).getFullName());
int id = highestFieldId.incrementAndGet();
DataType dataType =
ReassignFieldId.reassign(addColumn.dataType(),
highestFieldId);
@@ -256,7 +258,8 @@ public class SchemaManager implements Serializable {
validateNotPrimaryAndPartitionKey(oldTableSchema,
rename.fieldName());
if (newFields.stream().anyMatch(f ->
f.name().equals(rename.newName()))) {
throw new Catalog.ColumnAlreadyExistException(
- fromPath(branchPath(), true),
rename.fieldName());
+ identifierFromPath(tableRoot.toString(), true,
branch),
+ rename.fieldName());
}
updateNestedColumn(
@@ -275,7 +278,8 @@ public class SchemaManager implements Serializable {
if (!newFields.removeIf(
f -> f.name().equals(((DropColumn)
change).fieldName()))) {
throw new Catalog.ColumnNotExistException(
- fromPath(branchPath(), true),
drop.fieldName());
+ identifierFromPath(tableRoot.toString(), true,
branch),
+ drop.fieldName());
}
if (newFields.isEmpty()) {
throw new IllegalArgumentException("Cannot drop all
fields in table");
@@ -511,7 +515,8 @@ public class SchemaManager implements Serializable {
}
if (!found) {
throw new Catalog.ColumnNotExistException(
- fromPath(branchPath(), true),
Arrays.toString(updateFieldNames));
+ identifierFromPath(tableRoot.toString(), true, branch),
+ Arrays.toString(updateFieldNames));
}
}
@@ -587,13 +592,22 @@ public class SchemaManager implements Serializable {
}
}
- public static Identifier fromPath(String tablePath, boolean
ignoreIfUnknownDatabase) {
+ public static Identifier identifierFromPath(String tablePath, boolean
ignoreIfUnknownDatabase) {
+ return identifierFromPath(tablePath, ignoreIfUnknownDatabase, null);
+ }
+
+ public static Identifier identifierFromPath(
+ String tablePath, boolean ignoreIfUnknownDatabase, @Nullable
String branchName) {
+ if (DEFAULT_MAIN_BRANCH.equals(branchName)) {
+ branchName = null;
+ }
+
String[] paths = tablePath.split("/");
if (paths.length < 2) {
if (!ignoreIfUnknownDatabase) {
throw new IllegalArgumentException(
String.format(
- "Path '%s' is not a legacy path, please use
catalog table path instead: 'warehouse_path/your_database.db/your_table'.",
+ "Path '%s' is not a valid path, please use
catalog table path instead: 'warehouse_path/your_database.db/your_table'.",
tablePath));
}
return new Identifier(UNKNOWN_DATABASE, paths[0]);
@@ -605,12 +619,13 @@ public class SchemaManager implements Serializable {
if (!ignoreIfUnknownDatabase) {
throw new IllegalArgumentException(
String.format(
- "Path '%s' is not a legacy path, please use
catalog table path instead: 'warehouse_path/your_database.db/your_table'.",
+ "Path '%s' is not a valid path, please use
catalog table path instead: 'warehouse_path/your_database.db/your_table'.",
tablePath));
}
- return new Identifier(UNKNOWN_DATABASE, paths[paths.length - 1]);
+ return new Identifier(UNKNOWN_DATABASE, paths[paths.length - 1],
branchName, null);
}
database = database.substring(0, index);
- return new Identifier(database, paths[paths.length - 1]);
+
+ return new Identifier(database, paths[paths.length - 1], branchName,
null);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index d30fd7308..bad718a04 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -116,16 +116,22 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
@Override
public String name() {
- Identifier identifier = catalogEnvironment.identifier();
- return identifier == null ? location().getName() :
identifier.getObjectName();
+ return identifier().getObjectName();
}
@Override
public String fullName() {
+ return identifier().getFullName();
+ }
+
+ public Identifier identifier() {
Identifier identifier = catalogEnvironment.identifier();
return identifier == null
- ? SchemaManager.fromPath(location().toUri().toString(),
true).getFullName()
- : identifier.getFullName();
+ ? SchemaManager.identifierFromPath(
+ location().toUri().toString(),
+ true,
+ options().get(CoreOptions.BRANCH.key()))
+ : identifier;
}
@Override
diff --git
a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index 19c9e83c4..dd0d18050 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -239,7 +239,7 @@ public abstract class CatalogTestBase {
DEFAULT_TABLE_SCHEMA,
false))
.withMessage(
- "Cannot 'createTable' for system table
'Identifier{database='test_db', table='$system_table'}', please use data
table.");
+ "Cannot 'createTable' for system table
'Identifier{database='test_db', object='$system_table'}', please use data
table.");
// Create table throws DatabaseNotExistException when database does
not exist
assertThatExceptionOfType(Catalog.DatabaseNotExistException.class)
@@ -367,7 +367,7 @@ public abstract class CatalogTestBase {
catalog.dropTable(
Identifier.create("test_db",
"$system_table"), false))
.withMessage(
- "Cannot 'dropTable' for system table
'Identifier{database='test_db', table='$system_table'}', please use data
table.");
+ "Cannot 'dropTable' for system table
'Identifier{database='test_db', object='$system_table'}', please use data
table.");
// Drop table throws TableNotExistException when table does not exist
and ignoreIfNotExists
// is false
@@ -402,7 +402,7 @@ public abstract class CatalogTestBase {
toTable,
false))
.withMessage(
- "Cannot 'renameTable' for system table
'Identifier{database='test_db', table='$system_table'}', please use data
table.");
+ "Cannot 'renameTable' for system table
'Identifier{database='test_db', object='$system_table'}', please use data
table.");
assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(
@@ -412,7 +412,7 @@ public abstract class CatalogTestBase {
Identifier.create("test_db",
"$system_table"),
false))
.withMessage(
- "Cannot 'renameTable' for system table
'Identifier{database='test_db', table='$system_table'}', please use data
table.");
+ "Cannot 'renameTable' for system table
'Identifier{database='test_db', object='$system_table'}', please use data
table.");
// Rename table throws TableNotExistException when table does not exist
assertThatExceptionOfType(Catalog.TableNotExistException.class)
@@ -466,7 +466,7 @@ public abstract class CatalogTestBase {
SchemaChange.addColumn("col2",
DataTypes.DATE())),
false))
.withMessage(
- "Cannot 'alterTable' for system table
'Identifier{database='test_db', table='$system_table'}', please use data
table.");
+ "Cannot 'alterTable' for system table
'Identifier{database='test_db', object='$system_table'}', please use data
table.");
// Alter table throws TableNotExistException when table does not exist
assertThatExceptionOfType(Catalog.TableNotExistException.class)
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
index 4db178eab..8771377f2 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
@@ -76,7 +76,7 @@ public class SchemaEvolutionTest {
@BeforeEach
public void beforeEach() {
tablePath = new Path(tempDir.toUri());
- identifier = SchemaManager.fromPath(tablePath.toString(), true);
+ identifier = SchemaManager.identifierFromPath(tablePath.toString(),
true);
schemaManager = new SchemaManager(LocalFileIO.create(), tablePath);
commitUser = UUID.randomUUID().toString();
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
index 80ca03d8c..d005dc4e5 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -32,7 +32,7 @@ import java.util.ArrayList;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.fail;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** IT cases for table with branches using SQL. */
public class BranchSqlITCase extends CatalogITCaseBase {
@@ -378,13 +378,8 @@ public class BranchSqlITCase extends CatalogITCaseBase {
sql("ALTER TABLE `t$branch_pk` ADD (v2 INT)");
sql("ALTER TABLE t SET ( 'scan.fallback-branch' = 'pk' )");
- try {
- sql("INSERT INTO t VALUES (1, 10, 'apple')");
- fail("Expecting exceptions");
- } catch (Exception e) {
- assertThat(e)
- .hasMessageContaining("Branch main and pk does not have
the same row type");
- }
+ assertThatThrownBy(() -> sql("INSERT INTO t VALUES (1, 10, 'apple')"))
+ .hasMessageContaining("Branch main and pk does not have the
same row type");
}
private List<String> collectResult(String sql) throws Exception {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index cb39ae3c0..199f26ad1 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -172,11 +172,10 @@ public class CatalogTableITCase extends CatalogITCaseBase
{
assertThatThrownBy(() -> sql("CREATE TABLE T$snapshots (a INT, b
INT)"))
.hasRootCauseMessage(
"Cannot 'createTable' for system table "
- + "'Identifier{database='default',
table='T$snapshots'}', please use data table.");
+ + "'Identifier{database='default',
object='T$snapshots'}', please use data table.");
assertThatThrownBy(() -> sql("CREATE TABLE T$aa$bb (a INT, b INT)"))
.hasRootCauseMessage(
- "Cannot 'createTable' for system table "
- + "'Identifier{database='default',
table='T$aa$bb'}', please use data table.");
+ "System table can only contain one '$' separator, but
this is: T$aa$bb");
}
@Test
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index e9fc9f32c..667311ad4 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -185,7 +185,7 @@ public class HiveCatalog extends AbstractCatalog {
public Path getDataTableLocation(Identifier identifier) {
try {
String databaseName = identifier.getDatabaseName();
- String tableName = identifier.getObjectName();
+ String tableName = identifier.getTableName();
Optional<Path> tablePath =
clients.run(
client -> {
@@ -363,7 +363,7 @@ public class HiveCatalog extends AbstractCatalog {
client ->
client.getTable(
identifier.getDatabaseName(),
- identifier.getObjectName()));
+ identifier.getTableName()));
} catch (NoSuchObjectException e) {
return false;
} catch (TException e) {
@@ -376,7 +376,11 @@ public class HiveCatalog extends AbstractCatalog {
"Interrupted in call to tableExists " +
identifier.getFullName(), e);
}
- return isPaimonTable(table);
+ return isPaimonTable(table)
+ && tableSchemaInFileSystem(
+ getDataTableLocation(identifier),
+ identifier.getBranchNameOrDefault())
+ .isPresent();
}
private static boolean isPaimonTable(Table table) {
@@ -387,19 +391,15 @@ public class HiveCatalog extends AbstractCatalog {
}
@Override
- public TableSchema getDataTableSchema(Identifier identifier, String
branchName)
- throws TableNotExistException {
- assertMainBranch(branchName);
- return getDataTableSchema(identifier);
- }
+ public TableSchema getDataTableSchema(Identifier identifier) throws
TableNotExistException {
+ assertMainBranch(identifier);
- private TableSchema getDataTableSchema(Identifier identifier) throws
TableNotExistException {
if (!tableExists(identifier)) {
throw new TableNotExistException(identifier);
}
- Path tableLocation = getDataTableLocation(identifier);
- return tableSchemaInFileSystem(tableLocation)
+ return tableSchemaInFileSystem(
+ getDataTableLocation(identifier),
identifier.getBranchNameOrDefault())
.orElseThrow(() -> new TableNotExistException(identifier));
}
@@ -418,7 +418,7 @@ public class HiveCatalog extends AbstractCatalog {
client ->
client.dropTable(
identifier.getDatabaseName(),
- identifier.getObjectName(),
+ identifier.getTableName(),
true,
false,
true));
@@ -490,10 +490,10 @@ public class HiveCatalog extends AbstractCatalog {
protected void renameTableImpl(Identifier fromTable, Identifier toTable) {
try {
String fromDB = fromTable.getDatabaseName();
- String fromTableName = fromTable.getObjectName();
+ String fromTableName = fromTable.getTableName();
Table table = clients.run(client -> client.getTable(fromDB,
fromTableName));
table.setDbName(toTable.getDatabaseName());
- table.setTableName(toTable.getObjectName());
+ table.setTableName(toTable.getTableName());
clients.execute(client -> client.alter_table(fromDB,
fromTableName, table));
Path fromPath = getDataTableLocation(fromTable);
@@ -516,7 +516,7 @@ public class HiveCatalog extends AbstractCatalog {
clients.execute(
client ->
client.alter_table(
- toTable.getDatabaseName(),
toTable.getObjectName(), table));
+ toTable.getDatabaseName(),
toTable.getTableName(), table));
}
} catch (TException e) {
throw new RuntimeException("Failed to rename table " +
fromTable.getFullName(), e);
@@ -527,10 +527,9 @@ public class HiveCatalog extends AbstractCatalog {
}
@Override
- protected void alterTableImpl(
- Identifier identifier, String branchName, List<SchemaChange>
changes)
+ protected void alterTableImpl(Identifier identifier, List<SchemaChange>
changes)
throws TableNotExistException, ColumnAlreadyExistException,
ColumnNotExistException {
- assertMainBranch(branchName);
+ assertMainBranch(identifier);
final SchemaManager schemaManager = schemaManager(identifier);
// first commit changes to underlying files
@@ -542,7 +541,7 @@ public class HiveCatalog extends AbstractCatalog {
client ->
client.getTable(
identifier.getDatabaseName(),
- identifier.getObjectName()));
+ identifier.getTableName()));
alterTableToHms(table, identifier, schema);
} catch (Exception te) {
schemaManager.deleteSchema(schema.id());
@@ -558,7 +557,7 @@ public class HiveCatalog extends AbstractCatalog {
client ->
client.alter_table(
identifier.getDatabaseName(),
- identifier.getObjectName(),
+ identifier.getTableName(),
table,
true));
}
@@ -615,7 +614,9 @@ public class HiveCatalog extends AbstractCatalog {
validateIdentifierNameCaseInsensitive(identifier);
TableSchema tableSchema =
- tableSchemaInFileSystem(getDataTableLocation(identifier))
+ tableSchemaInFileSystem(
+ getDataTableLocation(identifier),
+ identifier.getBranchNameOrDefault())
.orElseThrow(() -> new
TableNotExistException(identifier));
Table newTable = createHiveTable(identifier, tableSchema);
try {
@@ -625,7 +626,7 @@ public class HiveCatalog extends AbstractCatalog {
client ->
client.getTable(
identifier.getDatabaseName(),
- identifier.getObjectName()));
+ identifier.getTableName()));
checkArgument(
isPaimonTable(table),
"Table %s is not a paimon table in hive metastore.",
@@ -673,7 +674,7 @@ public class HiveCatalog extends AbstractCatalog {
TableType.class);
Table table =
new Table(
- identifier.getObjectName(),
+ identifier.getTableName(),
identifier.getDatabaseName(),
// current linux user
System.getProperty("user.name"),