This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new e185b49e505 branch-4.1: [feature](paimon) implement create/drop db,
create/drop table for paimon (#58894) (#61338)
e185b49e505 is described below
commit e185b49e505082154977a5dffd2cb6735b93ef67
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Sun Mar 15 20:39:44 2026 -0700
branch-4.1: [feature](paimon) implement create/drop db, create/drop table
for paimon (#58894) (#61338)
Cherry-pick from #58894
---------
Co-authored-by: yaoxiao <[email protected]>
Co-authored-by: yaoxiao <[email protected]>
---
.../java/org/apache/doris/catalog/TableIf.java | 2 +
.../operations/ExternalMetadataOperations.java | 6 +
.../paimon/DorisToPaimonTypeVisitor.java | 109 ++++++
.../datasource/paimon/PaimonExternalCatalog.java | 40 +-
.../doris/datasource/paimon/PaimonMetadataOps.java | 405 +++++++++++++++++++++
.../plans/commands/info/ColumnDefinition.java | 12 +
.../trees/plans/commands/info/CreateTableInfo.java | 21 +-
.../datasource/paimon/PaimonMetadataOpsTest.java | 256 +++++++++++++
.../paimon/test_paimon_table.groovy | 122 +++++++
9 files changed, 935 insertions(+), 38 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
index 2a17198a288..8b6f62ae76a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
@@ -485,6 +485,8 @@ public interface TableIf {
case ICEBERG:
case ICEBERG_EXTERNAL_TABLE:
return "iceberg";
+ case PAIMON_EXTERNAL_TABLE:
+ return "paimon";
case DICTIONARY:
return "dictionary";
case DORIS_EXTERNAL_TABLE:
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java
index 7d63b18cd13..513b3379177 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java
@@ -21,6 +21,7 @@ import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HiveMetadataOps;
import org.apache.doris.datasource.iceberg.IcebergMetadataOps;
+import org.apache.doris.datasource.paimon.PaimonMetadataOps;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.iceberg.catalog.Catalog;
@@ -35,4 +36,9 @@ public class ExternalMetadataOperations {
public static IcebergMetadataOps newIcebergMetadataOps(ExternalCatalog
dorisCatalog, Catalog catalog) {
return new IcebergMetadataOps(dorisCatalog, catalog);
}
+
+ public static PaimonMetadataOps newPaimonMetaOps(ExternalCatalog
dorisCatalog,
+
org.apache.paimon.catalog.Catalog catalog) {
+ return new PaimonMetadataOps(dorisCatalog, catalog);
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/DorisToPaimonTypeVisitor.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/DorisToPaimonTypeVisitor.java
new file mode 100644
index 00000000000..aad8106563b
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/DorisToPaimonTypeVisitor.java
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+import org.apache.doris.catalog.ArrayType;
+import org.apache.doris.catalog.MapType;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.StructField;
+import org.apache.doris.catalog.StructType;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.datasource.DorisTypeVisitor;
+
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.types.VariantType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class DorisToPaimonTypeVisitor extends DorisTypeVisitor<DataType> {
+
+ @Override
+ public DataType struct(StructType struct, List<DataType> fieldResults) {
+ List<StructField> fields = struct.getFields();
+ List<DataField> newFields = new ArrayList<>(fields.size());
+ AtomicInteger atomicInteger = new AtomicInteger(-1);
+ for (int i = 0; i < fields.size(); i++) {
+ StructField field = fields.get(i);
+ DataType fieldType =
fieldResults.get(i).copy(field.getContainsNull());
+ String comment = field.getComment();
+ DataField dataField = new
DataField(atomicInteger.incrementAndGet(), field.getName(), fieldType, comment);
+ newFields.add(dataField);
+ }
+ return new RowType(newFields);
+ }
+
+ @Override
+ public DataType field(StructField field, DataType typeResult) {
+ return typeResult;
+ }
+
+ @Override
+ public DataType array(ArrayType array, DataType elementResult) {
+ return new
org.apache.paimon.types.ArrayType(elementResult.copy(array.getContainsNull()));
+ }
+
+ @Override
+ public DataType map(MapType map, DataType keyResult, DataType valueResult)
{
+ return new org.apache.paimon.types.MapType(keyResult.copy(false),
+ valueResult.copy(map.getIsValueContainsNull()));
+ }
+
+ @Override
+ public DataType atomic(Type atomic) {
+ PrimitiveType primitiveType = atomic.getPrimitiveType();
+ if (primitiveType.equals(PrimitiveType.BOOLEAN)) {
+ return new BooleanType();
+ } else if (primitiveType.equals(PrimitiveType.INT)) {
+ return new IntType();
+ } else if (primitiveType.equals(PrimitiveType.BIGINT)) {
+ return new BigIntType();
+ } else if (primitiveType.equals(PrimitiveType.FLOAT)) {
+ return new FloatType();
+ } else if (primitiveType.equals(PrimitiveType.DOUBLE)) {
+ return new DoubleType();
+ } else if (primitiveType.isCharFamily()) {
+ return new VarCharType(VarCharType.MAX_LENGTH);
+ } else if (primitiveType.equals(PrimitiveType.DATE) ||
primitiveType.equals(PrimitiveType.DATEV2)) {
+ return new DateType();
+ } else if (primitiveType.equals(PrimitiveType.DECIMALV2) ||
primitiveType.isDecimalV3Type()) {
+ return new DecimalType(((ScalarType) atomic).getScalarPrecision(),
((ScalarType) atomic).getScalarScale());
+ } else if (primitiveType.equals(PrimitiveType.DATETIME) ||
primitiveType.equals(PrimitiveType.DATETIMEV2)) {
+ return new TimestampType();
+ } else if (primitiveType.isVarbinaryType()) {
+ return new VarBinaryType(VarBinaryType.MAX_LENGTH);
+ } else if (primitiveType.isVariantType()) {
+ return new VariantType();
+ }
+ throw new UnsupportedOperationException("Not a supported type: " +
primitiveType);
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
index b6a06fd4670..f15309ea0e9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
@@ -24,13 +24,13 @@ import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.NameMapping;
import org.apache.doris.datasource.SessionContext;
import org.apache.doris.datasource.metacache.CacheSpec;
+import org.apache.doris.datasource.operations.ExternalMetadataOperations;
import org.apache.doris.datasource.property.metastore.AbstractPaimonProperties;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.Catalog.TableNotExistException;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.partition.Partition;
@@ -67,6 +67,7 @@ public class PaimonExternalCatalog extends ExternalCatalog {
catalogType = paimonProperties.getPaimonCatalogType();
catalog = createCatalog();
initPreExecutionAuthenticator();
+ metadataOps = ExternalMetadataOperations.newPaimonMetaOps(this,
catalog);
}
@Override
@@ -81,49 +82,16 @@ public class PaimonExternalCatalog extends ExternalCatalog {
return catalogType;
}
- protected List<String> listDatabaseNames() {
- try {
- return executionAuthenticator.execute(() -> new
ArrayList<>(catalog.listDatabases()));
- } catch (Exception e) {
- throw new RuntimeException("Failed to list databases names,
catalog name: " + getName(), e);
- }
- }
-
@Override
public boolean tableExist(SessionContext ctx, String dbName, String
tblName) {
makeSureInitialized();
- try {
- return executionAuthenticator.execute(() -> {
- try {
- catalog.getTable(Identifier.create(dbName, tblName));
- return true;
- } catch (TableNotExistException e) {
- return false;
- }
- });
-
- } catch (Exception e) {
- throw new RuntimeException("Failed to check table existence,
catalog name: " + getName()
- + "error message is:" +
ExceptionUtils.getRootCauseMessage(e), e);
- }
+ return metadataOps.tableExist(dbName, tblName);
}
@Override
public List<String> listTableNames(SessionContext ctx, String dbName) {
makeSureInitialized();
- try {
- return executionAuthenticator.execute(() -> {
- List<String> tableNames = null;
- try {
- tableNames = catalog.listTables(dbName);
- } catch (Catalog.DatabaseNotExistException e) {
- LOG.warn("DatabaseNotExistException", e);
- }
- return tableNames;
- });
- } catch (Exception e) {
- throw new RuntimeException("Failed to list table names, catalog
name: " + getName(), e);
- }
+ return metadataOps.listTableNames(dbName);
}
public List<Partition> getPaimonPartitions(NameMapping nameMapping) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataOps.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataOps.java
new file mode 100644
index 00000000000..e6c8177edca
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataOps.java
@@ -0,0 +1,405 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+import org.apache.doris.analysis.PartitionDesc;
+import org.apache.doris.catalog.StructField;
+import org.apache.doris.catalog.StructType;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.security.authentication.ExecutionAuthenticator;
+import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.DorisTypeVisitor;
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.ExternalDatabase;
+import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.operations.ExternalMetadataOps;
+import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition;
+import
org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceBranchInfo;
+import
org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceTagInfo;
+import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo;
+import org.apache.doris.nereids.trees.plans.commands.info.DropBranchInfo;
+import org.apache.doris.nereids.trees.plans.commands.info.DropTagInfo;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Catalog.DatabaseNotEmptyException;
+import org.apache.paimon.catalog.Catalog.DatabaseNotExistException;
+import org.apache.paimon.catalog.Catalog.TableAlreadyExistException;
+import org.apache.paimon.catalog.Catalog.TableNotExistException;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.DataType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+public class PaimonMetadataOps implements ExternalMetadataOps {
+
+ private static final Logger LOG =
LogManager.getLogger(PaimonMetadataOps.class);
+ protected Catalog catalog;
+ protected ExternalCatalog dorisCatalog;
+ private ExecutionAuthenticator executionAuthenticator;
+ private static final String PRIMARY_KEY_IDENTIFIER = "primary-key";
+ private static final String PROP_COMMENT = "comment";
+ private static final String PROP_LOCATION = "location";
+
+ public PaimonMetadataOps(ExternalCatalog dorisCatalog, Catalog catalog) {
+ this.dorisCatalog = dorisCatalog;
+ this.catalog = catalog;
+ this.executionAuthenticator = dorisCatalog.getExecutionAuthenticator();
+ }
+
+
+ @Override
+ public boolean createDbImpl(String dbName, boolean ifNotExists,
Map<String, String> properties)
+ throws DdlException {
+ try {
+ return executionAuthenticator.execute(() ->
performCreateDb(dbName, ifNotExists, properties));
+ } catch (Exception e) {
+ throw new DdlException("Failed to create database: "
+ + dbName + ": " + Util.getRootCauseMessage(e), e);
+ }
+ }
+
+ private boolean performCreateDb(String dbName, boolean ifNotExists,
Map<String, String> properties)
+ throws DdlException, Catalog.DatabaseAlreadyExistException {
+ if (databaseExist(dbName)) {
+ if (ifNotExists) {
+ LOG.info("create database[{}] which already exists", dbName);
+ return true;
+ } else {
+ ErrorReport.reportDdlException(ErrorCode.ERR_DB_CREATE_EXISTS,
dbName);
+ }
+ }
+
+ if (!properties.isEmpty() && dorisCatalog instanceof
PaimonExternalCatalog) {
+ String catalogType = ((PaimonExternalCatalog)
dorisCatalog).getCatalogType();
+ if (!PaimonExternalCatalog.PAIMON_HMS.equals(catalogType)) {
+ throw new DdlException(
+ "Not supported: create database with properties for paimon
catalog type: " + catalogType);
+ }
+ }
+
+ catalog.createDatabase(dbName, ifNotExists, properties);
+ return false;
+ }
+
+ @Override
+ public void afterCreateDb() {
+ dorisCatalog.resetMetaCacheNames();
+ }
+
+ @Override
+ public void dropDbImpl(String dbName, boolean ifExists, boolean force)
throws DdlException {
+ try {
+ executionAuthenticator.execute(() -> {
+ performDropDb(dbName, ifExists, force);
+ return null;
+ });
+ } catch (Exception e) {
+ throw new DdlException(
+ "Failed to drop database: " + dbName + ", error message is:" +
e.getMessage(), e);
+ }
+ }
+
+ private void performDropDb(String dbName, boolean ifExists, boolean force)
throws DdlException {
+ ExternalDatabase dorisDb = dorisCatalog.getDbNullable(dbName);
+ if (dorisDb == null) {
+ if (ifExists) {
+ LOG.info("drop database[{}] which does not exist", dbName);
+ // Database does not exist and IF EXISTS is specified; treat
as no-op.
+ return;
+ } else {
+ ErrorReport.reportDdlException(ErrorCode.ERR_DB_DROP_EXISTS,
dbName);
+ // ErrorReport.reportDdlException is expected to throw
DdlException.
+ return;
+ }
+ }
+
+ if (force) {
+ List<String> tableNames = listTableNames(dbName);
+ if (!tableNames.isEmpty()) {
+ LOG.info("drop database[{}] with force, drop all tables, num:
{}", dbName, tableNames.size());
+ }
+ for (String tableName : tableNames) {
+ performDropTable(dbName, tableName, true);
+ }
+ }
+
+ try {
+ catalog.dropDatabase(dbName, ifExists, force);
+ } catch (DatabaseNotExistException e) {
+ throw new RuntimeException("database " + dbName + " does not
exist!");
+ } catch (DatabaseNotEmptyException e) {
+ throw new RuntimeException("database " + dbName + " is not empty!
please check!");
+ }
+ }
+
+ @Override
+ public void afterDropDb(String dbName) {
+ dorisCatalog.unregisterDatabase(dbName);
+ }
+
+ @Override
+ public boolean createTableImpl(CreateTableInfo createTableInfo) throws
UserException {
+ try {
+ return executionAuthenticator.execute(() ->
performCreateTable(createTableInfo));
+ } catch (Exception e) {
+ throw new DdlException(
+ "Failed to create table: " + createTableInfo.getTableName() +
", error message is:" + e.getMessage(),
+ e);
+ }
+ }
+
+ public boolean performCreateTable(CreateTableInfo createTableInfo) throws
UserException {
+ String dbName = createTableInfo.getDbName();
+ ExternalDatabase<?> db = dorisCatalog.getDbNullable(dbName);
+ if (db == null) {
+ throw new UserException("Failed to get database: '" + dbName + "'
in catalog: " + dorisCatalog.getName());
+ }
+ String tableName = createTableInfo.getTableName();
+ // 1. first, check if table exist in remote
+ if (tableExist(db.getRemoteName(), tableName)) {
+ if (createTableInfo.isIfNotExists()) {
+ LOG.info("create table[{}] which already exists", tableName);
+ return true;
+ } else {
+
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
+ }
+ }
+
+ // 2. second, check if table exist in local.
+ // This is because case sensibility issue, eg:
+ // 1. lower_case_table_name = 1
+ // 2. create table tbl1;
+ // 3. create table TBL1; TBL1 does not exist in remote because the
remote system is case-sensitive.
+ // but because lower_case_table_name = 1, the table can not be
created in Doris because it is conflict with
+ // tbl1
+ ExternalTable dorisTable = db.getTableNullable(tableName);
+ if (dorisTable != null) {
+ if (createTableInfo.isIfNotExists()) {
+ LOG.info("create table[{}] which already exists", tableName);
+ return true;
+ } else {
+
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
+ }
+ }
+ List<ColumnDefinition> columns =
createTableInfo.getColumnDefinitions();
+ List<StructField> collect = columns.stream()
+ .map(col -> new StructField(col.getName(),
col.getType().toCatalogDataType(),
+ col.getComment(), col.isNullable()))
+ .collect(Collectors.toList());
+ StructType structType = new StructType(new ArrayList<>(collect));
+ Schema schema = toPaimonSchema(structType,
createTableInfo.getPartitionDesc(), createTableInfo.getProperties());
+ try {
+ catalog.createTable(new Identifier(createTableInfo.getDbName(),
createTableInfo.getTableName()),
+ schema, createTableInfo.isIfNotExists());
+ } catch (TableAlreadyExistException | DatabaseNotExistException e) {
+ throw new RuntimeException(e);
+ }
+ return false;
+ }
+
+ private Schema toPaimonSchema(StructType structType, PartitionDesc
partitionDesc, Map<String, String> properties) {
+ Map<String, String> normalizedProperties = new HashMap<>(properties);
+ normalizedProperties.remove(PRIMARY_KEY_IDENTIFIER);
+ normalizedProperties.remove(PROP_COMMENT);
+ if (normalizedProperties.containsKey(PROP_LOCATION)) {
+ String path = normalizedProperties.remove(PROP_LOCATION);
+ normalizedProperties.put(CoreOptions.PATH.key(), path);
+ }
+
+ String pkAsString = properties.get(PRIMARY_KEY_IDENTIFIER);
+ List<String> primaryKeys = pkAsString == null ?
Collections.emptyList() : Arrays.stream(pkAsString.split(","))
+ .map(String::trim)
+ .collect(Collectors.toList());
+ List<String> partitionKeys = partitionDesc == null ? new ArrayList<>()
: partitionDesc.getPartitionColNames();
+ Schema.Builder schemaBuilder = Schema.newBuilder()
+ .options(normalizedProperties)
+ .primaryKey(primaryKeys)
+ .partitionKeys(partitionKeys)
+ .comment(properties.getOrDefault(PROP_COMMENT, null));
+ for (StructField field : structType.getFields()) {
+ schemaBuilder.column(field.getName(),
+
toPaimontype(field.getType()).copy(field.getContainsNull()),
+ field.getComment());
+ }
+ return schemaBuilder.build();
+ }
+
+ private DataType toPaimontype(Type type) {
+ return DorisTypeVisitor.visit(type, new DorisToPaimonTypeVisitor());
+ }
+
+ @Override
+ public void afterCreateTable(String dbName, String tblName) {
+ Optional<ExternalDatabase<?>> db = dorisCatalog.getDbForReplay(dbName);
+ if (db.isPresent()) {
+ db.get().resetMetaCacheNames();
+ }
+ LOG.info("after create table {}.{}.{}, is db exists: {}",
+ dorisCatalog.getName(), dbName, tblName, db.isPresent());
+ }
+
+ @Override
+ public void dropTableImpl(ExternalTable dorisTable, boolean ifExists)
throws DdlException {
+ try {
+ executionAuthenticator.execute(() -> {
+ performDropTable(dorisTable.getRemoteDbName(),
dorisTable.getRemoteName(), ifExists);
+ return null;
+ });
+ } catch (Exception e) {
+ throw new DdlException(
+ "Failed to drop table: " + dorisTable.getName() + ", error
message is:" + e.getMessage(), e);
+ }
+ }
+
+ private void performDropTable(String dBName, String tableName, boolean
ifExists) throws DdlException {
+ if (!tableExist(dBName, tableName)) {
+ if (ifExists) {
+ LOG.info("drop table[{}] which does not exist", tableName);
+ return;
+ } else {
+ ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_TABLE,
tableName, dBName);
+ }
+ }
+ try {
+ catalog.dropTable(Identifier.create(dBName, tableName), ifExists);
+ } catch (TableNotExistException e) {
+ throw new RuntimeException("table " + tableName + " does not
exist");
+ }
+ }
+
+ @Override
+ public void afterDropTable(String dbName, String tblName) {
+ Optional<ExternalDatabase<?>> db = dorisCatalog.getDbForReplay(dbName);
+ db.ifPresent(externalDatabase ->
externalDatabase.unregisterTable(tblName));
+ LOG.info("after drop table {}.{}.{}. is db exists: {}",
+ dorisCatalog.getName(), dbName, tblName, db.isPresent());
+ }
+
+ @Override
+ public void truncateTableImpl(ExternalTable dorisTable, List<String>
partitions) throws DdlException {
+ throw new UnsupportedOperationException("truncate table is not a
supported operation!");
+ }
+
+ @Override
+ public void createOrReplaceBranchImpl(ExternalTable dorisTable,
CreateOrReplaceBranchInfo branchInfo)
+ throws UserException {
+ throw new UnsupportedOperationException("create or replace branch is
not a supported operation!");
+ }
+
+ @Override
+ public void createOrReplaceTagImpl(ExternalTable dorisTable,
CreateOrReplaceTagInfo tagInfo) throws UserException {
+ throw new UnsupportedOperationException("create or replace tag is not
a supported operation!");
+ }
+
+ @Override
+ public void dropTagImpl(ExternalTable dorisTable, DropTagInfo tagInfo)
throws UserException {
+ throw new UnsupportedOperationException("drop tag is not a supported
operation!");
+ }
+
+ @Override
+ public void dropBranchImpl(ExternalTable dorisTable, DropBranchInfo
branchInfo) throws UserException {
+ throw new UnsupportedOperationException("drop branch is not a
supported operation!");
+ }
+
+ @Override
+ public List<String> listDatabaseNames() {
+ try {
+ return executionAuthenticator.execute(() -> new
ArrayList<>(catalog.listDatabases()));
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to list databases names,
catalog name: " + dorisCatalog.getName(), e);
+ }
+ }
+
+ @Override
+ public List<String> listTableNames(String db) {
+ try {
+ return executionAuthenticator.execute(() -> {
+ List<String> tableNames = new ArrayList<>();
+ try {
+ tableNames.addAll(catalog.listTables(db));
+ } catch (DatabaseNotExistException e) {
+ LOG.warn("DatabaseNotExistException", e);
+ }
+ return tableNames;
+ });
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to list table names, catalog
name: " + dorisCatalog.getName(), e);
+ }
+ }
+
+ @Override
+ public boolean tableExist(String dbName, String tblName) {
+ try {
+ return executionAuthenticator.execute(() -> {
+ try {
+ catalog.getTable(Identifier.create(dbName, tblName));
+ return true;
+ } catch (TableNotExistException e) {
+ return false;
+ }
+ });
+
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to check table existence,
catalog name: " + dorisCatalog.getName()
+ + "error message is:" + ExceptionUtils.getRootCauseMessage(e),
e);
+ }
+ }
+
+ @Override
+ public boolean databaseExist(String dbName) {
+ try {
+ return executionAuthenticator.execute(() -> {
+ try {
+ catalog.getDatabase(dbName);
+ return true;
+ } catch (DatabaseNotExistException e) {
+ return false;
+ }
+ });
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to check database exist, error
message is:" + e.getMessage(), e);
+ }
+ }
+
+ public Catalog getCatalog() {
+ return catalog;
+ }
+
+ @Override
+ public void close() {
+ if (catalog != null) {
+ catalog = null;
+ }
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ColumnDefinition.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ColumnDefinition.java
index a4fdd7329e2..ee2976f29e9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ColumnDefinition.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ColumnDefinition.java
@@ -190,6 +190,18 @@ public class ColumnDefinition {
this.generatedColumnsThatReferToThis = generatedColumnsThatReferToThis;
}
+ public String getComment() {
+ return getComment(false);
+ }
+
+ public String getComment(boolean escapeQuota) {
+ String comment = this.comment == null ? "" : this.comment;
+ if (!escapeQuota) {
+ return comment;
+ }
+ return SqlUtils.escapeQuota(comment);
+ }
+
/**
* toSql
*/
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
index c897a216268..7216262abcb 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
@@ -49,6 +49,7 @@ import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.es.EsUtil;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
+import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.analyzer.Scope;
@@ -116,6 +117,7 @@ public class CreateTableInfo {
public static final String ENGINE_BROKER = "broker";
public static final String ENGINE_HIVE = "hive";
public static final String ENGINE_ICEBERG = "iceberg";
+ public static final String ENGINE_PAIMON = "paimon";
private static final ImmutableSet<AggregateType>
GENERATED_COLUMN_ALLOW_AGG_TYPE =
ImmutableSet.of(AggregateType.REPLACE,
AggregateType.REPLACE_IF_NOT_NULL);
@@ -375,6 +377,8 @@ public class CreateTableInfo {
throw new AnalysisException("Hms type catalog can only use `hive`
engine.");
} else if (catalog instanceof IcebergExternalCatalog &&
!engineName.equals(ENGINE_ICEBERG)) {
throw new AnalysisException("Iceberg type catalog can only use
`iceberg` engine.");
+ } else if (catalog instanceof PaimonExternalCatalog &&
!engineName.equals(ENGINE_PAIMON)) {
+ throw new AnalysisException("Paimon type catalog can only use
`paimon` engine.");
}
}
@@ -769,7 +773,12 @@ public class CreateTableInfo {
throw new AnalysisException(
"Iceberg doesn't support 'DISTRIBUTE BY', "
+ "and you can use 'bucket(num, column)' in
'PARTITIONED BY'.");
+ } else if (engineName.equalsIgnoreCase(ENGINE_PAIMON) &&
distribution != null) {
+ throw new AnalysisException(
+ "Paimon doesn't support 'DISTRIBUTE BY', "
+ + "and you can use 'bucket(num, column)' in
'PARTITIONED BY'.");
}
+
for (ColumnDefinition columnDef : columns) {
if (!columnDef.isNullable()
&& engineName.equalsIgnoreCase(ENGINE_HIVE)) {
@@ -865,6 +874,8 @@ public class CreateTableInfo {
engineName = ENGINE_HIVE;
} else if (catalog instanceof IcebergExternalCatalog) {
engineName = ENGINE_ICEBERG;
+ } else if (catalog instanceof PaimonExternalCatalog) {
+ engineName = ENGINE_PAIMON;
} else {
throw new AnalysisException("Current catalog does not support
create table: " + ctlName);
}
@@ -894,7 +905,8 @@ public class CreateTableInfo {
private void checkEngineName() {
if (engineName.equals(ENGINE_MYSQL) || engineName.equals(ENGINE_ODBC)
|| engineName.equals(ENGINE_BROKER)
|| engineName.equals(ENGINE_ELASTICSEARCH) ||
engineName.equals(ENGINE_HIVE)
- || engineName.equals(ENGINE_ICEBERG) ||
engineName.equals(ENGINE_JDBC)) {
+ || engineName.equals(ENGINE_ICEBERG) ||
engineName.equals(ENGINE_JDBC)
+ || engineName.equals(ENGINE_PAIMON)) {
if (!isExternal) {
// this is for compatibility
isExternal = true;
@@ -1071,7 +1083,8 @@ public class CreateTableInfo {
throw new AnalysisException("Create " + engineName
+ " table should not contain distribution desc");
}
- if (!engineName.equals(ENGINE_HIVE) &&
!engineName.equals(ENGINE_ICEBERG) && partitionDesc != null) {
+ if (!engineName.equals(ENGINE_HIVE) &&
!engineName.equals(ENGINE_ICEBERG)
+ && !engineName.equals(ENGINE_PAIMON) && partitionDesc !=
null) {
throw new AnalysisException("Create " + engineName
+ " table should not contain partition desc");
}
@@ -1301,6 +1314,10 @@ public class CreateTableInfo {
return partitionDesc;
}
+ public List<ColumnDefinition> getColumnDefinitions() {
+ return columns;
+ }
+
public List<Column> getColumns() {
return columns.stream()
.map(ColumnDefinition::translateToCatalogStyle).collect(Collectors.toList());
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonMetadataOpsTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonMetadataOpsTest.java
new file mode 100644
index 00000000000..e4146faa690
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonMetadataOpsTest.java
@@ -0,0 +1,256 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.CatalogFactory;
+import org.apache.doris.nereids.parser.NereidsParser;
+import org.apache.doris.nereids.trees.plans.commands.CreateCatalogCommand;
+import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand;
+import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.collect.Maps;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.FileSystemCatalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.VarCharType;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.UUID;
+
+public class PaimonMetadataOpsTest {
+ public static String warehouse;
+ public static PaimonExternalCatalog paimonCatalog;
+ public static PaimonMetadataOps ops;
+ public static String dbName = "test_db";
+ public static ConnectContext connectContext;
+
+ @BeforeClass
+ public static void beforeClass() throws Throwable {
+ Path warehousePath = Files.createTempDirectory("test_warehouse_");
+ warehouse = "file://" + warehousePath.toAbsolutePath() + "/";
+ HashMap<String, String> param = new HashMap<>();
+ param.put("type", "paimon");
+ param.put("paimon.catalog.type", "filesystem");
+ param.put("warehouse", warehouse);
+ // create catalog
+ CreateCatalogCommand createCatalogCommand = new
CreateCatalogCommand("paimon", true, "", "comment", param);
+ paimonCatalog = (PaimonExternalCatalog)
CatalogFactory.createFromCommand(1, createCatalogCommand);
+ paimonCatalog.makeSureInitialized();
+ // create db
+ ops = new PaimonMetadataOps(paimonCatalog, paimonCatalog.catalog);
+ ops.createDb(dbName, true, Maps.newHashMap());
+ paimonCatalog.makeSureInitialized();
+
+ // context
+ connectContext = new ConnectContext();
+ connectContext.setThreadLocalInfo();
+ }
+
+ @Test
+ public void testSimpleTable() throws Exception {
+ String tableName = getTableName();
+ Identifier identifier = new Identifier(dbName, tableName);
+ String sql = "create table " + dbName + "." + tableName + " (id int)
engine = paimon";
+ createTable(sql);
+ Catalog catalog = ops.getCatalog();
+ Table table = catalog.getTable(identifier);
+ List<String> columnNames = new ArrayList<>();
+ if (catalog instanceof HiveCatalog) {
+ columnNames.addAll(((HiveCatalog)
catalog).loadTableSchema(identifier).fieldNames());
+ } else if (catalog instanceof FileSystemCatalog) {
+ columnNames.addAll(((FileSystemCatalog)
catalog).loadTableSchema(identifier).fieldNames());
+ }
+
+ if (!columnNames.isEmpty()) {
+ Assert.assertEquals(1, columnNames.size());
+ }
+ Assert.assertEquals(0, table.partitionKeys().size());
+ }
+
+ @Test
+ public void testProperties() throws Exception {
+ String tableName = getTableName();
+ Identifier identifier = new Identifier(dbName, tableName);
+ String sql = "create table " + dbName + "." + tableName + " (id int)
engine = paimon properties(\"primary-key\"=id)";
+ createTable(sql);
+ Catalog catalog = ops.getCatalog();
+ Table table = catalog.getTable(identifier);
+
+ List<String> columnNames = new ArrayList<>();
+ if (catalog instanceof HiveCatalog) {
+ columnNames.addAll(((HiveCatalog)
catalog).loadTableSchema(identifier).fieldNames());
+ } else if (catalog instanceof FileSystemCatalog) {
+ columnNames.addAll(((FileSystemCatalog)
catalog).loadTableSchema(identifier).fieldNames());
+ }
+
+ if (!columnNames.isEmpty()) {
+ Assert.assertEquals(1, columnNames.size());
+ }
+ Assert.assertEquals(0, table.partitionKeys().size());
+ Assert.assertTrue(table.primaryKeys().contains("id"));
+ Assert.assertEquals(1, table.primaryKeys().size());
+ }
+
+ @Test
+ public void testType() throws Exception {
+ String tableName = getTableName();
+ Identifier identifier = new Identifier(dbName, tableName);
+ String sql = "create table " + dbName + "." + tableName + " ("
+ + "c0 int, "
+ + "c1 bigint, "
+ + "c2 float, "
+ + "c3 double, "
+ + "c4 string, "
+ + "c5 date, "
+ + "c6 decimal(20, 10), "
+ + "c7 datetime"
+ + ") engine = paimon "
+ + "properties(\"primary-key\"=c0)";
+ createTable(sql);
+ Catalog catalog = ops.getCatalog();
+ Table table = catalog.getTable(identifier);
+
+ List<DataField> columns = new ArrayList<>();
+ if (catalog instanceof HiveCatalog) {
+ columns.addAll(((HiveCatalog)
catalog).loadTableSchema(identifier).fields());
+ } else if (catalog instanceof FileSystemCatalog) {
+ columns.addAll(((FileSystemCatalog)
catalog).loadTableSchema(identifier).fields());
+ }
+
+ if (!columns.isEmpty()) {
+ Assert.assertEquals(8, columns.size());
+ Assert.assertEquals(new IntType().asSQLString(),
columns.get(0).type().toString());
+ Assert.assertEquals(new BigIntType().asSQLString(),
columns.get(1).type().toString());
+ Assert.assertEquals(new FloatType().asSQLString(),
columns.get(2).type().toString());
+ Assert.assertEquals(new DoubleType().asSQLString(),
columns.get(3).type().toString());
+ Assert.assertEquals(new
VarCharType(VarCharType.MAX_LENGTH).asSQLString(),
columns.get(4).type().toString());
+ Assert.assertEquals(new DateType().asSQLString(),
columns.get(5).type().toString());
+ Assert.assertEquals(new DecimalType(20, 10).asSQLString(),
columns.get(6).type().toString());
+ Assert.assertEquals(new TimestampType().asSQLString(),
columns.get(7).type().toString());
+ }
+
+ Assert.assertEquals(0, table.partitionKeys().size());
+ Assert.assertTrue(table.primaryKeys().contains("c0"));
+ Assert.assertEquals(1, table.primaryKeys().size());
+ }
+
+ @Test
+ public void testPartition() throws Exception {
+ String tableName = "test04";
+ Identifier identifier = new Identifier(dbName, tableName);
+ String sql = "create table " + dbName + "." + tableName + " ("
+ + "c0 int, "
+ + "c1 bigint, "
+ + "c2 float, "
+ + "c3 double, "
+ + "c4 string, "
+ + "c5 date, "
+ + "c6 decimal(20, 10), "
+ + "c7 datetime"
+ + ") engine = paimon "
+ + "partition by ("
+ + "c1 ) ()"
+ + "properties(\"primary-key\"=c0)";
+ createTable(sql);
+ Catalog catalog = ops.getCatalog();
+ Table table = catalog.getTable(identifier);
+ Assert.assertEquals(1, table.partitionKeys().size());
+ Assert.assertTrue(table.primaryKeys().contains("c0"));
+ Assert.assertEquals(1, table.primaryKeys().size());
+ }
+
+ @Test
+ public void testBucket() throws Exception {
+ String tableName = getTableName();
+ Identifier identifier = new Identifier(dbName, tableName);
+ String sql = "create table " + dbName + "." + tableName + " ("
+ + "c0 int, "
+ + "c1 bigint, "
+ + "c2 float, "
+ + "c3 double, "
+ + "c4 string, "
+ + "c5 date, "
+ + "c6 decimal(20, 10), "
+ + "c7 datetime"
+ + ") engine = paimon "
+ + "properties(\"primary-key\"=c0,"
+ + "\"bucket\" = 4,"
+ + "\"bucket-key\" = c0)";
+ createTable(sql);
+ Catalog catalog = ops.getCatalog();
+ Table table = catalog.getTable(identifier);
+ Assert.assertEquals("4", table.options().get("bucket"));
+ Assert.assertEquals("c0", table.options().get("bucket-key"));
+ }
+
+ public void createTable(String sql) throws UserException {
+ LogicalPlan plan = new NereidsParser().parseSingle(sql);
+ Assertions.assertTrue(plan instanceof CreateTableCommand);
+ CreateTableInfo createTableInfo = ((CreateTableCommand)
plan).getCreateTableInfo();
+ createTableInfo.setIsExternal(true);
+ createTableInfo.analyzeEngine();
+ ops.createTable(createTableInfo);
+ }
+
+ public String getTableName() {
+ String s = "test_tb_" + UUID.randomUUID();
+ return s.replaceAll("-", "");
+ }
+
+ @Test
+ public void testDropDB() {
+ try {
+ // create db success
+ ops.createDb("t_paimon", false, Maps.newHashMap());
+ // drop db success
+ ops.dropDb("t_paimon", false, false);
+ } catch (Throwable t) {
+ Assert.fail();
+ }
+
+ try {
+ ops.dropDb("t_paimon", false, false);
+ Assert.fail();
+ } catch (Throwable t) {
+ Assert.assertTrue(t instanceof DdlException);
+ Assert.assertTrue(t.getMessage().contains("database doesn't
exist"));
+ }
+ }
+}
diff --git
a/regression-test/suites/external_table_p0/paimon/test_paimon_table.groovy
b/regression-test/suites/external_table_p0/paimon/test_paimon_table.groovy
new file mode 100644
index 00000000000..032176bd103
--- /dev/null
+++ b/regression-test/suites/external_table_p0/paimon/test_paimon_table.groovy
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_create_paimon_table",
"p0,external,doris,external_docker,external_docker_doris") {
+ String catalog_name = "paimon_hms_catalog_test01"
+
+ String enabled = context.config.otherConfigs.get("enablePaimonTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ for (String hivePrefix : ["hive2"]) {
+ String externalEnvIp =
context.config.otherConfigs.get("externalEnvIp")
+ String hmsPort = context.config.otherConfigs.get(hivePrefix +
"HmsPort")
+ String hdfs_port = context.config.otherConfigs.get(hivePrefix +
"HdfsPort")
+ String default_fs = "hdfs://${externalEnvIp}:${hdfs_port}"
+ String warehouse = "${default_fs}/warehouse"
+
+ // 1. test create catalog
+ sql """drop catalog if exists ${catalog_name};"""
+ sql """
+ create catalog ${catalog_name} properties (
+ 'type'='paimon',
+ 'paimon.catalog.type'='hms',
+ 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hmsPort}',
+ 'warehouse' = '${warehouse}'
+ );
+ """
+
+ // 2. test create database
+ sql """switch ${catalog_name}"""
+ String db_name = "test_db"
+ sql """create database if not exists ${db_name}"""
+
+ // 3. test create table
+ sql """use ${db_name}"""
+ sql """drop table if exists ${db_name}.test01"""
+ sql """
+ CREATE TABLE ${db_name}.test01 (
+ id int
+ ) engine=paimon;
+ """
+
+ sql """drop table if exists ${db_name}.test02"""
+ sql """
+ CREATE TABLE ${db_name}.test02 (
+ id int
+ ) engine=paimon
+ properties("primary-key"=id);
+ """
+
+ sql """drop table if exists ${db_name}.test03"""
+ sql """
+ CREATE TABLE ${db_name}.test03 (
+ c0 int,
+ c1 bigint,
+ c2 float,
+ c3 double,
+ c4 string,
+ c5 date,
+ c6 decimal(10,5),
+ c7 datetime
+ ) engine=paimon
+ properties("primary-key"=c0);
+ """
+
+ sql """drop table if exists ${db_name}.test04"""
+ sql """
+ CREATE TABLE ${db_name}.test04 (
+ c0 int,
+ c1 bigint,
+ c2 float,
+ c3 double,
+ c4 string,
+ c5 date,
+ c6 decimal(10,5),
+ c7 datetime
+ ) engine=paimon
+ partition by (c1) ()
+ properties("primary-key"=c0);
+ """
+
+ sql """drop table if exists ${db_name}.test05"""
+ sql """
+ CREATE TABLE ${db_name}.test05 (
+ c0 int,
+ c1 bigint,
+ c2 float,
+ c3 double,
+ c4 string,
+ c5 date,
+ c6 decimal(10,5),
+ c7 datetime
+ ) engine=paimon
+ properties(
+ 'primary-key' = 'c0,c1',
+ 'bucket' = '4',
+ 'bucket-key' = 'c0,c1');
+ """
+
+ sql """ drop table if exists ${db_name}.test01"""
+ sql """ drop table if exists ${db_name}.test02"""
+ sql """ drop table if exists ${db_name}.test03"""
+ sql """ drop table if exists ${db_name}.test04"""
+ sql """ drop table if exists ${db_name}.test05"""
+ sql """ drop database if exists ${db_name}"""
+ sql """DROP CATALOG IF EXISTS ${catalog_name}"""
+ }
+ }
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]