This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9900c36e1 [core] Support using table branches with $branch_ and
support different schemas for different branches (#3757)
9900c36e1 is described below
commit 9900c36e1cd50e5006a0170032cd80addad48bd2
Author: tsreaper <[email protected]>
AuthorDate: Tue Jul 16 14:12:51 2024 +0800
[core] Support using table branches with $branch_ and support different
schemas for different branches (#3757)
---
docs/content/maintenance/manage-branches.md | 4 +-
.../org/apache/paimon/catalog/AbstractCatalog.java | 64 ++++++++++++++--
.../java/org/apache/paimon/catalog/Catalog.java | 1 +
.../apache/paimon/catalog/FileSystemCatalog.java | 44 ++++++-----
.../java/org/apache/paimon/jdbc/JdbcCatalog.java | 16 +++-
.../org/apache/paimon/schema/SchemaManager.java | 22 +++---
.../paimon/table/AbstractFileStoreTable.java | 17 ++++-
.../paimon/table/AppendOnlyFileStoreTable.java | 5 --
.../paimon/table/PrimaryKeyFileStoreTable.java | 5 --
.../org/apache/paimon/flink/BranchSqlITCase.java | 87 ++++++++++++++++++++++
.../java/org/apache/paimon/hive/HiveCatalog.java | 22 +++++-
11 files changed, 239 insertions(+), 48 deletions(-)
diff --git a/docs/content/maintenance/manage-branches.md
b/docs/content/maintenance/manage-branches.md
index ccd2402a1..0cb52b6f2 100644
--- a/docs/content/maintenance/manage-branches.md
+++ b/docs/content/maintenance/manage-branches.md
@@ -117,10 +117,10 @@ You can read or write with branch as below.
```sql
-- read from branch 'branch1'
-SELECT * FROM t /*+ OPTIONS('branch' = 'branch1') */;
+SELECT * FROM `t$branch_branch1`;
-- write to branch 'branch1'
-INSERT INTO t /*+ OPTIONS('branch' = 'branch1') */ SELECT ...
+INSERT INTO `t$branch_branch1` SELECT ...
```
{{< /tab >}}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 0c6d19e28..45a900a5c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -38,6 +38,7 @@ import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.system.SystemTableLoader;
+import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.StringUtils;
import javax.annotation.Nullable;
@@ -57,6 +58,7 @@ import static
org.apache.paimon.options.CatalogOptions.LINEAGE_META;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
import static
org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
+import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Common implementation of {@link Catalog}. */
@@ -218,7 +220,9 @@ public abstract class AbstractCatalog implements Catalog {
@Override
public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
throws TableNotExistException {
+ checkNotBranch(identifier, "dropTable");
checkNotSystemTable(identifier, "dropTable");
+
if (!tableExists(identifier)) {
if (ignoreIfNotExists) {
return;
@@ -234,6 +238,7 @@ public abstract class AbstractCatalog implements Catalog {
@Override
public void createTable(Identifier identifier, Schema schema, boolean
ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException {
+ checkNotBranch(identifier, "createTable");
checkNotSystemTable(identifier, "createTable");
validateIdentifierNameCaseInsensitive(identifier);
validateFieldNameCaseInsensitive(schema.rowType().getFieldNames());
@@ -260,6 +265,8 @@ public abstract class AbstractCatalog implements Catalog {
@Override
public void renameTable(Identifier fromTable, Identifier toTable, boolean
ignoreIfNotExists)
throws TableNotExistException, TableAlreadyExistException {
+ checkNotBranch(fromTable, "renameTable");
+ checkNotBranch(toTable, "renameTable");
checkNotSystemTable(fromTable, "renameTable");
checkNotSystemTable(toTable, "renameTable");
validateIdentifierNameCaseInsensitive(toTable);
@@ -288,6 +295,14 @@ public abstract class AbstractCatalog implements Catalog {
validateIdentifierNameCaseInsensitive(identifier);
validateFieldNameCaseInsensitiveInSchemaChange(changes);
+ Optional<Pair<Identifier, String>> optionalBranchName =
+ getOriginalIdentifierAndBranch(identifier);
+ String branchName = DEFAULT_MAIN_BRANCH;
+ if (optionalBranchName.isPresent()) {
+ identifier = optionalBranchName.get().getLeft();
+ branchName = optionalBranchName.get().getRight();
+ }
+
if (!tableExists(identifier)) {
if (ignoreIfNotExists) {
return;
@@ -295,10 +310,11 @@ public abstract class AbstractCatalog implements Catalog {
throw new TableNotExistException(identifier);
}
- alterTableImpl(identifier, changes);
+ alterTableImpl(identifier, branchName, changes);
}
- protected abstract void alterTableImpl(Identifier identifier,
List<SchemaChange> changes)
+ protected abstract void alterTableImpl(
+ Identifier identifier, String branchName, List<SchemaChange>
changes)
throws TableNotExistException, ColumnAlreadyExistException,
ColumnNotExistException;
@Nullable
@@ -344,7 +360,15 @@ public abstract class AbstractCatalog implements Catalog {
}
private FileStoreTable getDataTable(Identifier identifier) throws
TableNotExistException {
- TableSchema tableSchema = getDataTableSchema(identifier);
+ Optional<Pair<Identifier, String>> optionalBranchName =
+ getOriginalIdentifierAndBranch(identifier);
+ String branch = DEFAULT_MAIN_BRANCH;
+ if (optionalBranchName.isPresent()) {
+ identifier = optionalBranchName.get().getLeft();
+ branch = optionalBranchName.get().getRight();
+ }
+
+ TableSchema tableSchema = getDataTableSchema(identifier, branch);
return FileStoreTableFactory.create(
fileIO,
getDataTableLocation(identifier),
@@ -384,7 +408,7 @@ public abstract class AbstractCatalog implements Catalog {
}
}
- protected abstract TableSchema getDataTableSchema(Identifier identifier)
+ protected abstract TableSchema getDataTableSchema(Identifier identifier,
String branchName)
throws TableNotExistException;
@VisibleForTesting
@@ -392,8 +416,38 @@ public abstract class AbstractCatalog implements Catalog {
return new Path(newDatabasePath(identifier.getDatabaseName()),
identifier.getObjectName());
}
+ private static Optional<Pair<Identifier, String>>
getOriginalIdentifierAndBranch(
+ Identifier identifier) {
+ String tableName = identifier.getObjectName();
+ if (tableName.contains(BRANCH_PREFIX)) {
+ int idx = tableName.indexOf(BRANCH_PREFIX);
+ String branchName = tableName.substring(idx +
BRANCH_PREFIX.length());
+ if (StringUtils.isNullOrWhitespaceOnly(branchName)) {
+ return Optional.empty();
+ } else {
+ return Optional.of(
+ Pair.of(
+ Identifier.create(
+ identifier.getDatabaseName(),
tableName.substring(0, idx)),
+ branchName));
+ }
+ }
+ return Optional.empty();
+ }
+
+ protected void checkNotBranch(Identifier identifier, String method) {
+ if (getOriginalIdentifierAndBranch(identifier).isPresent()) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot '%s' for branch table '%s', "
+ + "please modify the table with the
default branch.",
+ method, identifier));
+ }
+ }
+
private static boolean isSpecifiedSystemTable(Identifier identifier) {
- return identifier.getObjectName().contains(SYSTEM_TABLE_SPLITTER);
+ return identifier.getObjectName().contains(SYSTEM_TABLE_SPLITTER)
+ && !getOriginalIdentifierAndBranch(identifier).isPresent();
}
protected boolean isSystemTable(Identifier identifier) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 9057b1c3d..b6221dac7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -43,6 +43,7 @@ public interface Catalog extends AutoCloseable {
String DEFAULT_DATABASE = "default";
+ String BRANCH_PREFIX = "$branch_";
String SYSTEM_TABLE_SPLITTER = "$";
String SYSTEM_DATABASE_NAME = "sys";
String COMMENT_PROP = "comment";
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index f0e5572e3..577e06836 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -18,6 +18,7 @@
package org.apache.paimon.catalog;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.Lock;
@@ -36,6 +37,7 @@ import java.util.Map;
import java.util.concurrent.Callable;
import static
org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE;
+import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
/** A catalog implementation for {@link FileIO}. */
public class FileSystemCatalog extends AbstractCatalog {
@@ -103,9 +105,20 @@ public class FileSystemCatalog extends AbstractCatalog {
}
@Override
- public TableSchema getDataTableSchema(Identifier identifier) throws
TableNotExistException {
- return schemaManager(identifier)
+ public TableSchema getDataTableSchema(Identifier identifier, String
branchName)
+ throws TableNotExistException {
+ return schemaManager(identifier, branchName)
.latest()
+ .map(
+ s -> {
+ if (!DEFAULT_MAIN_BRANCH.equals(branchName)) {
+ Options branchOptions = new
Options(s.options());
+ branchOptions.set(CoreOptions.BRANCH,
branchName);
+ return s.copy(branchOptions.toMap());
+ } else {
+ return s;
+ }
+ })
.orElseThrow(() -> new TableNotExistException(identifier));
}
@@ -117,26 +130,22 @@ public class FileSystemCatalog extends AbstractCatalog {
@Override
public void createTableImpl(Identifier identifier, Schema schema) {
- uncheck(() -> schemaManager(identifier).createTable(schema));
+ uncheck(() -> schemaManager(identifier,
DEFAULT_MAIN_BRANCH).createTable(schema));
}
- private SchemaManager schemaManager(Identifier identifier) {
+ private SchemaManager schemaManager(Identifier identifier, String
branchName) {
Path path = getDataTableLocation(identifier);
CatalogLock catalogLock =
- lockFactory()
- .map(
- fac ->
- fac.createLock(
- lockContext()
- .orElseThrow(
- () ->
- new
RuntimeException(
-
"No lock context when lock is enabled."))))
- .orElse(null);
- return new SchemaManager(fileIO, path)
+ lockFactory().map(fac ->
fac.createLock(assertGetLockContext())).orElse(null);
+ return new SchemaManager(fileIO, path, branchName)
.withLock(catalogLock == null ? null :
Lock.fromCatalog(catalogLock, identifier));
}
+ private CatalogLockContext assertGetLockContext() {
+ return lockContext()
+ .orElseThrow(() -> new RuntimeException("No lock context when
lock is enabled."));
+ }
+
@Override
public void renameTableImpl(Identifier fromTable, Identifier toTable) {
Path fromPath = getDataTableLocation(fromTable);
@@ -145,9 +154,10 @@ public class FileSystemCatalog extends AbstractCatalog {
}
@Override
- protected void alterTableImpl(Identifier identifier, List<SchemaChange>
changes)
+ protected void alterTableImpl(
+ Identifier identifier, String branchName, List<SchemaChange>
changes)
throws TableNotExistException, ColumnAlreadyExistException,
ColumnNotExistException {
- schemaManager(identifier).commitChanges(changes);
+ schemaManager(identifier, branchName).commitChanges(changes);
}
protected static <T> T uncheck(Callable<T> callable) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
index 45600715b..dd479dfd8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
@@ -57,6 +57,7 @@ import static
org.apache.paimon.jdbc.JdbcCatalogLock.checkMaxSleep;
import static org.apache.paimon.jdbc.JdbcUtils.execute;
import static org.apache.paimon.jdbc.JdbcUtils.insertProperties;
import static org.apache.paimon.jdbc.JdbcUtils.updateTable;
+import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
/* This file is based on source code from the Iceberg Project
(http://iceberg.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the
NOTICE file distributed with this work for
@@ -308,8 +309,10 @@ public class JdbcCatalog extends AbstractCatalog {
}
@Override
- protected void alterTableImpl(Identifier identifier, List<SchemaChange>
changes)
+ protected void alterTableImpl(
+ Identifier identifier, String branchName, List<SchemaChange>
changes)
throws TableNotExistException, ColumnAlreadyExistException,
ColumnNotExistException {
+ assertMainBranch(branchName);
if (!tableExists(identifier)) {
throw new RuntimeException("Table is not exists " +
identifier.getFullName());
}
@@ -318,7 +321,9 @@ public class JdbcCatalog extends AbstractCatalog {
}
@Override
- protected TableSchema getDataTableSchema(Identifier identifier) throws
TableNotExistException {
+ protected TableSchema getDataTableSchema(Identifier identifier, String
branchName)
+ throws TableNotExistException {
+ assertMainBranch(branchName);
if (!tableExists(identifier)) {
throw new TableNotExistException(identifier);
}
@@ -329,6 +334,13 @@ public class JdbcCatalog extends AbstractCatalog {
() -> new RuntimeException("There is no paimon table
in " + tableLocation));
}
+ private void assertMainBranch(String branchName) {
+ if (!DEFAULT_MAIN_BRANCH.equals(branchName)) {
+ throw new UnsupportedOperationException(
+ "JdbcCatalog currently does not support table branches");
+ }
+ }
+
@Override
public boolean tableExists(Identifier identifier) {
if (isSystemTable(identifier)) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 366f285fc..4f70ac725 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -42,6 +42,7 @@ import org.apache.paimon.types.DataTypeCasts;
import org.apache.paimon.types.DataTypeVisitor;
import org.apache.paimon.types.ReassignFieldId;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
@@ -67,7 +68,6 @@ import java.util.stream.Collectors;
import static org.apache.paimon.catalog.AbstractCatalog.DB_SUFFIX;
import static org.apache.paimon.catalog.Identifier.UNKNOWN_DATABASE;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
-import static org.apache.paimon.utils.BranchManager.branchPath;
import static org.apache.paimon.utils.FileUtils.listVersionedFiles;
import static org.apache.paimon.utils.Preconditions.checkState;
@@ -189,7 +189,7 @@ public class SchemaManager implements Serializable {
latest().orElseThrow(
() ->
new Catalog.TableNotExistException(
-
fromPath(tableRoot.toString(), true)));
+ fromPath(branchPath(),
true)));
Map<String, String> newOptions = new HashMap<>(schema.options());
List<DataField> newFields = new ArrayList<>(schema.fields());
AtomicInteger highestFieldId = new
AtomicInteger(schema.highestFieldId());
@@ -211,13 +211,13 @@ public class SchemaManager implements Serializable {
SchemaChange.Move move = addColumn.move();
if (newFields.stream().anyMatch(f ->
f.name().equals(addColumn.fieldName()))) {
throw new Catalog.ColumnAlreadyExistException(
- fromPath(tableRoot.toString(), true),
addColumn.fieldName());
+ fromPath(branchPath(), true),
addColumn.fieldName());
}
Preconditions.checkArgument(
addColumn.dataType().isNullable(),
"Column %s cannot specify NOT NULL in the %s
table.",
addColumn.fieldName(),
- fromPath(tableRoot.toString(),
true).getFullName());
+ fromPath(branchPath(), true).getFullName());
int id = highestFieldId.incrementAndGet();
DataType dataType =
ReassignFieldId.reassign(addColumn.dataType(),
highestFieldId);
@@ -248,7 +248,7 @@ public class SchemaManager implements Serializable {
validateNotPrimaryAndPartitionKey(schema,
rename.fieldName());
if (newFields.stream().anyMatch(f ->
f.name().equals(rename.newName()))) {
throw new Catalog.ColumnAlreadyExistException(
- fromPath(tableRoot.toString(), true),
rename.fieldName());
+ fromPath(branchPath(), true),
rename.fieldName());
}
updateNestedColumn(
@@ -267,7 +267,7 @@ public class SchemaManager implements Serializable {
if (!newFields.removeIf(
f -> f.name().equals(((DropColumn)
change).fieldName()))) {
throw new Catalog.ColumnNotExistException(
- fromPath(tableRoot.toString(), true),
drop.fieldName());
+ fromPath(branchPath(), true),
drop.fieldName());
}
if (newFields.isEmpty()) {
throw new IllegalArgumentException("Cannot drop all
fields in table");
@@ -494,7 +494,7 @@ public class SchemaManager implements Serializable {
}
if (!found) {
throw new Catalog.ColumnNotExistException(
- fromPath(tableRoot.toString(), true),
Arrays.toString(updateFieldNames));
+ fromPath(branchPath(), true),
Arrays.toString(updateFieldNames));
}
}
@@ -535,13 +535,17 @@ public class SchemaManager implements Serializable {
}
}
+ private String branchPath() {
+ return BranchManager.branchPath(tableRoot, branch);
+ }
+
public Path schemaDirectory() {
- return new Path(branchPath(tableRoot, branch) + "/schema");
+ return new Path(branchPath() + "/schema");
}
@VisibleForTesting
public Path toSchemaPath(long schemaId) {
- return new Path(branchPath(tableRoot, branch) + "/schema/" +
SCHEMA_PREFIX + schemaId);
+ return new Path(branchPath() + "/schema/" + SCHEMA_PREFIX + schemaId);
}
/**
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index a73f6150a..86672dda8 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -256,7 +256,15 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
CoreOptions.setDefaultValues(newOptions);
// copy a new table schema to contain dynamic options
- TableSchema newTableSchema = tableSchema.copy(newOptions.toMap());
+ TableSchema newTableSchema = tableSchema;
+ if (newOptions.contains(CoreOptions.BRANCH)) {
+ newTableSchema =
+ schemaManager()
+ .copyWithBranch(new
CoreOptions(newOptions).branch())
+ .latest()
+ .get();
+ }
+ newTableSchema = newTableSchema.copy(newOptions.toMap());
if (tryTimeTravel) {
// see if merged options contain time travel option
@@ -285,6 +293,13 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
}
}
+ @Override
+ public FileStoreTable copy(TableSchema newTableSchema) {
+ return newTableSchema.primaryKeys().isEmpty()
+ ? new AppendOnlyFileStoreTable(fileIO, path, newTableSchema,
catalogEnvironment)
+ : new PrimaryKeyFileStoreTable(fileIO, path, newTableSchema,
catalogEnvironment);
+ }
+
protected SchemaManager schemaManager() {
return new SchemaManager(fileIO(), path,
CoreOptions.branch(options()));
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 0c389e408..40eeb4d28 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -64,11 +64,6 @@ class AppendOnlyFileStoreTable extends
AbstractFileStoreTable {
super(fileIO, path, tableSchema, catalogEnvironment);
}
- @Override
- public FileStoreTable copy(TableSchema newTableSchema) {
- return new AppendOnlyFileStoreTable(fileIO, path, newTableSchema,
catalogEnvironment);
- }
-
@Override
public AppendOnlyFileStore store() {
if (lazyStore == null) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index 0393b0d6e..6ac2763ac 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -66,11 +66,6 @@ class PrimaryKeyFileStoreTable extends
AbstractFileStoreTable {
super(fileIO, path, tableSchema, catalogEnvironment);
}
- @Override
- public FileStoreTable copy(TableSchema newTableSchema) {
- return new PrimaryKeyFileStoreTable(fileIO, path, newTableSchema,
catalogEnvironment);
- }
-
@Override
public KeyValueFileStore store() {
if (lazyStore == null) {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
new file mode 100644
index 000000000..be0a2e805
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.flink.util.AbstractTestBase;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT cases for table with branches using SQL. */
+public class BranchSqlITCase extends AbstractTestBase {
+
+ @TempDir java.nio.file.Path tempDir;
+
+ @Test
+ public void testAlterTable() throws Exception {
+ TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+ tEnv.executeSql(
+ "CREATE CATALOG mycat WITH ( 'type' = 'paimon', 'warehouse' =
'" + tempDir + "' )");
+ tEnv.executeSql("USE CATALOG mycat");
+ tEnv.executeSql(
+ "CREATE TABLE t ( pt INT, k INT, v STRING, PRIMARY KEY (pt, k)
NOT ENFORCED ) "
+ + "PARTITIONED BY (pt) WITH ( 'bucket' = '2' )");
+
+ tEnv.executeSql(
+ "INSERT INTO t VALUES (1, 10, 'apple'), (1, 20,
'banana'), (2, 10, 'cat'), (2, 20, 'dog')")
+ .await();
+ tEnv.executeSql("CALL sys.create_branch('default.t', 'test', 1)");
+ tEnv.executeSql("INSERT INTO t VALUES (1, 10, 'APPLE'), (2, 20,
'DOG'), (2, 30, 'horse')")
+ .await();
+
+ tEnv.executeSql("ALTER TABLE `t$branch_test` ADD (v2 INT)").await();
+ tEnv.executeSql(
+ "INSERT INTO `t$branch_test` VALUES "
+ + "(1, 10, 'cherry', 100), (2, 20, 'bird',
200), (2, 40, 'wolf', 400)")
+ .await();
+
+ assertThat(collectResult(tEnv, "SELECT * FROM t"))
+ .containsExactlyInAnyOrder(
+ "+I[1, 10, APPLE]",
+ "+I[1, 20, banana]",
+ "+I[2, 30, horse]",
+ "+I[2, 10, cat]",
+ "+I[2, 20, DOG]");
+ assertThat(collectResult(tEnv, "SELECT * FROM t$branch_test"))
+ .containsExactlyInAnyOrder(
+ "+I[1, 10, cherry, 100]",
+ "+I[1, 20, banana, null]",
+ "+I[2, 10, cat, null]",
+ "+I[2, 20, bird, 200]",
+ "+I[2, 40, wolf, 400]");
+ }
+
+ private List<String> collectResult(TableEnvironment tEnv, String sql)
throws Exception {
+ List<String> result = new ArrayList<>();
+ try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {
+ while (it.hasNext()) {
+ result.add(it.next().toString());
+ }
+ }
+ return result;
+ }
+}
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index c7c947ce5..32f572bb8 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -93,6 +93,7 @@ import static
org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER;
import static org.apache.paimon.hive.HiveCatalogOptions.LOCATION_IN_PROPERTIES;
import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
import static
org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
+import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly;
@@ -402,10 +403,24 @@ public class HiveCatalog extends AbstractCatalog {
}
@Override
- public TableSchema getDataTableSchema(Identifier identifier) throws
TableNotExistException {
+ public TableSchema getDataTableSchema(Identifier identifier, String
branchName)
+ throws TableNotExistException {
+ assertMainBranch(branchName);
+ return getDataTableSchema(identifier);
+ }
+
+ private void assertMainBranch(String branchName) {
+ if (!DEFAULT_MAIN_BRANCH.equals(branchName)) {
+ throw new UnsupportedOperationException(
+ "HiveCatalog currently does not support table branches");
+ }
+ }
+
+ private TableSchema getDataTableSchema(Identifier identifier) throws
TableNotExistException {
if (!tableExists(identifier)) {
throw new TableNotExistException(identifier);
}
+
Path tableLocation = getDataTableLocation(identifier);
return tableSchemaInFileSystem(tableLocation)
.orElseThrow(() -> new TableNotExistException(identifier));
@@ -535,8 +550,10 @@ public class HiveCatalog extends AbstractCatalog {
}
@Override
- protected void alterTableImpl(Identifier identifier, List<SchemaChange>
changes)
+ protected void alterTableImpl(
+ Identifier identifier, String branchName, List<SchemaChange>
changes)
throws TableNotExistException, ColumnAlreadyExistException,
ColumnNotExistException {
+ assertMainBranch(branchName);
final SchemaManager schemaManager = schemaManager(identifier);
// first commit changes to underlying files
@@ -616,6 +633,7 @@ public class HiveCatalog extends AbstractCatalog {
@Override
public void repairTable(Identifier identifier) throws
TableNotExistException {
+ checkNotBranch(identifier, "repairTable");
checkNotSystemTable(identifier, "repairTable");
validateIdentifierNameCaseInsensitive(identifier);