This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 77e55bc7496 [feature][insert]Adapt the create table statement to the
nereids sql (#32458)
77e55bc7496 is described below
commit 77e55bc74966acf7b4ba8e62c042124ca5a2cbb4
Author: slothever <[email protected]>
AuthorDate: Wed Mar 20 20:47:18 2024 +0800
[feature][insert]Adapt the create table statement to the nereids sql
(#32458)
issue: #31442
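1. Adapt the Doris CREATE TABLE statement so it can create Hive tables (partition columns, hash bucketing, file format).
2. Fix INSERT OVERWRITE for non-OLAP table sinks (e.g. the Hive table sink).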
> The Doris statement that creates a Hive table:
```
mysql> CREATE TABLE buck2(
-> id int COMMENT 'col1',
-> name string COMMENT 'col2',
-> dt string COMMENT 'part1',
-> dtm string COMMENT 'part2'
-> ) ENGINE=hive
-> COMMENT "create tbl"
-> PARTITION BY LIST (dt, dtm) ()
-> DISTRIBUTED BY HASH (id) BUCKETS 16
-> PROPERTIES(
-> "file_format" = "orc"
-> );
```
> The Hive CREATE TABLE statement generated for it:
```
CREATE TABLE `buck2`(
`id` int COMMENT 'col1',
`name` string COMMENT 'col2')
PARTITIONED BY (
`dt` string,
`dtm` string)
CLUSTERED BY (
id)
INTO 16 BUCKETS
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
'hdfs://HDFS8000871/usr/hive/warehouse/jz3.db/buck2'
TBLPROPERTIES (
'transient_lastDdlTime'='1710840747',
'doris.file_format'='orc')
```
---
.../main/java/org/apache/doris/common/Config.java | 5 +
.../datasource/hive/HiveMetaStoreClientHelper.java | 16 ++
.../doris/datasource/hive/HiveMetadataOps.java | 88 ++++++++---
.../doris/datasource/hive/HiveTableMetadata.java | 59 ++++++--
.../datasource/hive/ThriftHMSCachedClient.java | 40 +++--
.../doris/insertoverwrite/InsertOverwriteUtil.java | 45 +++---
.../commands/insert/HiveInsertCommandContext.java | 2 +-
.../insert/InsertOverwriteTableCommand.java | 92 +++++++-----
.../trees/plans/physical/PhysicalTableSink.java | 3 +
.../doris/datasource/hive/HiveMetadataOpsTest.java | 162 +++++++++++++++++++++
.../doris/datasource/hive/HmsCommitTest.java | 5 +-
11 files changed, 409 insertions(+), 108 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index daa9c8b1d35..9783c30ad91 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2216,6 +2216,11 @@ public class Config extends ConfigBase {
"Enable external table DDL"})
public static boolean enable_external_ddl = false;
+ @ConfField(mutable = true, masterOnly = true, description = {
+ "启用Hive分桶表",
+ "Enable external hive bucket table"})
+ public static boolean enable_create_hive_bucket_table = false;
+
@ConfField(mutable = true, masterOnly = true, description = {
"Hive创建外部表默认指定的文件格式",
"Default hive file format for creating table."})
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
index 23c83a11146..bbb6129e2c1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
@@ -606,6 +606,22 @@ public class HiveMetaStoreClientHelper {
return "double";
} else if (dorisType.equals(Type.STRING)) {
return "string";
+ } else if (dorisType.equals(Type.DEFAULT_DECIMALV3)) {
+ StringBuilder decimalType = new StringBuilder();
+ decimalType.append("decimal");
+ ScalarType scalarType = (ScalarType) dorisType;
+ int precision = scalarType.getScalarPrecision();
+ if (precision == 0) {
+ precision = ScalarType.DEFAULT_PRECISION;
+ }
+ // decimal(precision, scale)
+ int scale = scalarType.getScalarScale();
+ decimalType.append("(");
+ decimalType.append(precision);
+ decimalType.append(",");
+ decimalType.append(scale);
+ decimalType.append(")");
+ return decimalType.toString();
}
throw new HMSClientException("Unsupported type conversion of " +
dorisType.toSql());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
index 9279c48fbaa..a182aa9cc00 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
@@ -19,8 +19,10 @@ package org.apache.doris.datasource.hive;
import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateTableStmt;
+import org.apache.doris.analysis.DistributionDesc;
import org.apache.doris.analysis.DropDbStmt;
import org.apache.doris.analysis.DropTableStmt;
+import org.apache.doris.analysis.HashDistributionDesc;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.JdbcResource;
import org.apache.doris.common.Config;
@@ -34,7 +36,9 @@ import org.apache.doris.fs.remote.RemoteFileSystem;
import org.apache.doris.fs.remote.dfs.DFSFileSystem;
import org.apache.doris.thrift.THivePartitionUpdate;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
@@ -42,25 +46,32 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.function.Function;
public class HiveMetadataOps implements ExternalMetadataOps {
+ public static final String LOCATION_URI_KEY = "location_uri";
+ public static final String FILE_FORMAT_KEY = "file_format";
+ public static final Set<String> DORIS_HIVE_KEYS =
ImmutableSet.of(FILE_FORMAT_KEY, LOCATION_URI_KEY);
private static final Logger LOG =
LogManager.getLogger(HiveMetadataOps.class);
private static final int MIN_CLIENT_POOL_SIZE = 8;
- private JdbcClientConfig jdbcClientConfig;
- private HiveConf hiveConf;
- private HMSExternalCatalog catalog;
- private HMSCachedClient client;
+ private final HMSCachedClient client;
private final RemoteFileSystem fs;
+ private final HMSExternalCatalog catalog;
public HiveMetadataOps(HiveConf hiveConf, JdbcClientConfig
jdbcClientConfig, HMSExternalCatalog catalog) {
+ this(catalog, createCachedClient(hiveConf,
+ Math.max(MIN_CLIENT_POOL_SIZE,
Config.max_external_cache_loader_thread_pool_size),
+ jdbcClientConfig));
+ }
+
+ @VisibleForTesting
+ public HiveMetadataOps(HMSExternalCatalog catalog, HMSCachedClient client)
{
this.catalog = catalog;
- this.hiveConf = hiveConf;
- this.jdbcClientConfig = jdbcClientConfig;
- this.client = createCachedClient(hiveConf,
- Math.max(MIN_CLIENT_POOL_SIZE,
Config.max_external_cache_loader_thread_pool_size), jdbcClientConfig);
+ this.client = client;
this.fs = new DFSFileSystem(catalog.getProperties());
}
@@ -91,10 +102,11 @@ public class HiveMetadataOps implements
ExternalMetadataOps {
try {
HiveDatabaseMetadata catalogDatabase = new HiveDatabaseMetadata();
catalogDatabase.setDbName(fullDbName);
- catalogDatabase.setProperties(properties);
- if (properties.containsKey("location_uri")) {
- catalogDatabase.setLocationUri(properties.get("location_uri"));
+ if (properties.containsKey(LOCATION_URI_KEY)) {
+
catalogDatabase.setLocationUri(properties.get(LOCATION_URI_KEY));
}
+ properties.remove(LOCATION_URI_KEY);
+ catalogDatabase.setProperties(properties);
catalogDatabase.setComment(properties.getOrDefault("comment", ""));
client.createDatabase(catalogDatabase);
catalog.onRefresh(true);
@@ -124,16 +136,50 @@ public class HiveMetadataOps implements
ExternalMetadataOps {
throw new UserException("Failed to get database: '" + dbName + "'
in catalog: " + catalog.getName());
}
try {
- Map<String, String> props = stmt.getExtProperties();
- String fileFormat = props.getOrDefault("file_format",
Config.hive_default_file_format);
- HiveTableMetadata catalogTable = HiveTableMetadata.of(dbName,
- tblName,
- stmt.getColumns(),
- parsePartitionKeys(props),
- props,
- fileFormat);
-
- client.createTable(catalogTable, stmt.isSetIfNotExists());
+ Map<String, String> props = stmt.getProperties();
+ String fileFormat = props.getOrDefault(FILE_FORMAT_KEY,
Config.hive_default_file_format);
+ Map<String, String> ddlProps = new HashMap<>();
+ for (Map.Entry<String, String> entry : props.entrySet()) {
+ String key = entry.getKey().toLowerCase();
+ if (DORIS_HIVE_KEYS.contains(entry.getKey().toLowerCase())) {
+ ddlProps.put("doris." + key, entry.getValue());
+ } else {
+ ddlProps.put(key, entry.getValue());
+ }
+ }
+ List<String> partitionColNames = new ArrayList<>();
+ if (stmt.getPartitionDesc() != null) {
+
partitionColNames.addAll(stmt.getPartitionDesc().getPartitionColNames());
+ }
+ HiveTableMetadata hiveTableMeta;
+ DistributionDesc bucketInfo = stmt.getDistributionDesc();
+ if (bucketInfo != null) {
+ if (Config.enable_create_hive_bucket_table) {
+ if (bucketInfo instanceof HashDistributionDesc) {
+ hiveTableMeta = HiveTableMetadata.of(dbName,
+ tblName,
+ stmt.getColumns(),
+ partitionColNames,
+ ((HashDistributionDesc)
bucketInfo).getDistributionColumnNames(),
+ bucketInfo.getBuckets(),
+ ddlProps,
+ fileFormat);
+ } else {
+ throw new UserException("External hive table only
supports hash bucketing");
+ }
+ } else {
+ throw new UserException("Create hive bucket table need"
+ + " set enable_create_hive_bucket_table to true");
+ }
+ } else {
+ hiveTableMeta = HiveTableMetadata.of(dbName,
+ tblName,
+ stmt.getColumns(),
+ partitionColNames,
+ ddlProps,
+ fileFormat);
+ }
+ client.createTable(hiveTableMeta, stmt.isSetIfNotExists());
db.setUnInitialized(true);
} catch (Exception e) {
throw new UserException(e.getMessage(), e);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTableMetadata.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTableMetadata.java
index fde0a2d4d04..d8de9d60734 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTableMetadata.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTableMetadata.java
@@ -20,32 +20,45 @@ package org.apache.doris.datasource.hive;
import org.apache.doris.catalog.Column;
import org.apache.doris.datasource.TableMetadata;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class HiveTableMetadata implements TableMetadata {
- private String dbName;
- private String tableName;
- private List<Column> columns;
- private List<FieldSchema> partitionKeys;
- private String fileFormat;
- private Map<String, String> properties;
+ private final String dbName;
+ private final String tableName;
+ private final List<Column> columns;
+ private final List<String> partitionKeys;
+ private final String fileFormat;
+ private final Map<String, String> properties;
+ private List<String> bucketCols;
+ private int numBuckets;
// private String viewSql;
public HiveTableMetadata(String dbName,
String tblName,
List<Column> columns,
- List<FieldSchema> partitionKeys,
+ List<String> partitionKeys,
+ Map<String, String> props,
+ String fileFormat) {
+ this(dbName, tblName, columns, partitionKeys, new ArrayList<>(), 0,
props, fileFormat);
+ }
+
+ public HiveTableMetadata(String dbName, String tableName,
+ List<Column> columns,
+ List<String> partitionKeys,
+ List<String> bucketCols,
+ int numBuckets,
Map<String, String> props,
String fileFormat) {
this.dbName = dbName;
- this.tableName = tblName;
+ this.tableName = tableName;
this.columns = columns;
this.partitionKeys = partitionKeys;
- this.fileFormat = fileFormat;
+ this.bucketCols = bucketCols;
+ this.numBuckets = numBuckets;
this.properties = props;
+ this.fileFormat = fileFormat;
}
@Override
@@ -67,7 +80,7 @@ public class HiveTableMetadata implements TableMetadata {
return columns;
}
- public List<FieldSchema> getPartitionKeys() {
+ public List<String> getPartitionKeys() {
return partitionKeys;
}
@@ -75,12 +88,32 @@ public class HiveTableMetadata implements TableMetadata {
return fileFormat;
}
+ public List<String> getBucketCols() {
+ return bucketCols;
+ }
+
+ public int getNumBuckets() {
+ return numBuckets;
+ }
+
public static HiveTableMetadata of(String dbName,
String tblName,
List<Column> columns,
- List<FieldSchema> partitionKeys,
+ List<String> partitionKeys,
Map<String, String> props,
String fileFormat) {
return new HiveTableMetadata(dbName, tblName, columns, partitionKeys,
props, fileFormat);
}
+
+ public static HiveTableMetadata of(String dbName,
+ String tblName,
+ List<Column> columns,
+ List<String> partitionKeys,
+ List<String> bucketCols,
+ int numBuckets,
+ Map<String, String> props,
+ String fileFormat) {
+ return new HiveTableMetadata(dbName, tblName, columns, partitionKeys,
+ bucketCols, numBuckets, props, fileFormat);
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
index d4f63c5a8fb..b5b1147447e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
@@ -20,6 +20,7 @@ package org.apache.doris.datasource.hive;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.Column;
import org.apache.doris.common.Config;
+import org.apache.doris.common.Pair;
import org.apache.doris.datasource.DatabaseMetadata;
import org.apache.doris.datasource.TableMetadata;
import
org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
@@ -71,12 +72,14 @@ import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
+import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -199,11 +202,14 @@ public class ThriftHMSCachedClient implements
HMSCachedClient {
table.setCreateTime(createTime);
table.setLastAccessTime(createTime);
// table.setRetention(0);
- String location = hiveTable.getProperties().get("external_location");
- table.setSd(toHiveStorageDesc(hiveTable.getColumns(),
- hiveTable.getFileFormat(),
- location));
- table.setPartitionKeys(hiveTable.getPartitionKeys());
+ String location =
hiveTable.getProperties().get(HiveMetadataOps.LOCATION_URI_KEY);
+ Set<String> partitionSet = new HashSet<>(hiveTable.getPartitionKeys());
+ Pair<List<FieldSchema>, List<FieldSchema>> hiveSchema =
toHiveSchema(hiveTable.getColumns(), partitionSet);
+
+ table.setSd(toHiveStorageDesc(hiveSchema.first,
hiveTable.getBucketCols(), hiveTable.getNumBuckets(),
+ hiveTable.getFileFormat(), location));
+ table.setPartitionKeys(hiveSchema.second);
+
// table.setViewOriginalText(hiveTable.getViewSql());
// table.setViewExpandedText(hiveTable.getViewSql());
table.setTableType("MANAGED_TABLE");
@@ -211,13 +217,19 @@ public class ThriftHMSCachedClient implements
HMSCachedClient {
return table;
}
- private static StorageDescriptor toHiveStorageDesc(List<Column> columns,
String fileFormat, String location) {
+ private static StorageDescriptor toHiveStorageDesc(List<FieldSchema>
columns,
+ List<String> bucketCols,
+ int numBuckets,
+ String fileFormat,
+ String location) {
StorageDescriptor sd = new StorageDescriptor();
- sd.setCols(toHiveColumns(columns));
+ sd.setCols(columns);
setFileFormat(fileFormat, sd);
if (StringUtils.isNotEmpty(location)) {
sd.setLocation(location);
}
+ sd.setBucketCols(bucketCols);
+ sd.setNumBuckets(numBuckets);
Map<String, String> parameters = new HashMap<>();
parameters.put("tag", "doris external hive talbe");
sd.setParameters(parameters);
@@ -246,17 +258,23 @@ public class ThriftHMSCachedClient implements
HMSCachedClient {
sd.setOutputFormat(outputFormat);
}
- private static List<FieldSchema> toHiveColumns(List<Column> columns) {
- List<FieldSchema> result = new ArrayList<>();
+ private static Pair<List<FieldSchema>, List<FieldSchema>>
toHiveSchema(List<Column> columns,
+ Set<String> partitionSet) {
+ List<FieldSchema> hiveCols = new ArrayList<>();
+ List<FieldSchema> hiveParts = new ArrayList<>();
for (Column column : columns) {
FieldSchema hiveFieldSchema = new FieldSchema();
// TODO: add doc, just support doris type
hiveFieldSchema.setType(HiveMetaStoreClientHelper.dorisTypeToHiveType(column.getType()));
hiveFieldSchema.setName(column.getName());
hiveFieldSchema.setComment(column.getComment());
- result.add(hiveFieldSchema);
+ if (partitionSet.contains(column.getName())) {
+ hiveParts.add(hiveFieldSchema);
+ } else {
+ hiveCols.add(hiveFieldSchema);
+ }
}
- return result;
+ return Pair.of(hiveCols, hiveParts);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java
index 54f9895ab2c..c4d3068e09f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java
@@ -24,6 +24,7 @@ import org.apache.doris.analysis.ReplacePartitionClause;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.util.PropertyAnalyzer;
@@ -42,16 +43,18 @@ public class InsertOverwriteUtil {
/**
* add temp partitions
*
- * @param olapTable
+ * @param tableIf
* @param partitionNames
* @param tempPartitionNames
* @throws DdlException
*/
- public static void addTempPartitions(OlapTable olapTable, List<String>
partitionNames,
- List<String> tempPartitionNames) throws DdlException {
- for (int i = 0; i < partitionNames.size(); i++) {
- Env.getCurrentEnv().addPartitionLike((Database)
olapTable.getDatabase(), olapTable.getName(),
- new AddPartitionLikeClause(tempPartitionNames.get(i),
partitionNames.get(i), true));
+ public static void addTempPartitions(TableIf tableIf, List<String>
partitionNames,
+ List<String> tempPartitionNames)
throws DdlException {
+ if (tableIf instanceof OlapTable) {
+ for (int i = 0; i < partitionNames.size(); i++) {
+ Env.getCurrentEnv().addPartitionLike((Database)
tableIf.getDatabase(), tableIf.getName(),
+ new AddPartitionLikeClause(tempPartitionNames.get(i),
partitionNames.get(i), true));
+ }
}
}
@@ -63,23 +66,25 @@ public class InsertOverwriteUtil {
* @param tempPartitionNames
* @throws DdlException
*/
- public static void replacePartition(OlapTable olapTable, List<String>
partitionNames,
+ public static void replacePartition(TableIf olapTable, List<String>
partitionNames,
List<String> tempPartitionNames) throws DdlException {
- try {
- if (!olapTable.writeLockIfExist()) {
- return;
+ if (olapTable instanceof OlapTable) {
+ try {
+ if (!olapTable.writeLockIfExist()) {
+ return;
+ }
+ Map<String, String> properties = Maps.newHashMap();
+
properties.put(PropertyAnalyzer.PROPERTIES_USE_TEMP_PARTITION_NAME, "false");
+ ReplacePartitionClause replacePartitionClause = new
ReplacePartitionClause(
+ new PartitionNames(false, partitionNames),
+ new PartitionNames(true, tempPartitionNames),
properties);
+ Env.getCurrentEnv()
+ .replaceTempPartition((Database)
olapTable.getDatabase(),
+ (OlapTable) olapTable, replacePartitionClause);
+ } finally {
+ olapTable.writeUnlock();
}
- Map<String, String> properties = Maps.newHashMap();
-
properties.put(PropertyAnalyzer.PROPERTIES_USE_TEMP_PARTITION_NAME, "false");
- ReplacePartitionClause replacePartitionClause = new
ReplacePartitionClause(
- new PartitionNames(false, partitionNames),
- new PartitionNames(true, tempPartitionNames), properties);
- Env.getCurrentEnv()
- .replaceTempPartition((Database) olapTable.getDatabase(),
olapTable, replacePartitionClause);
- } finally {
- olapTable.writeUnlock();
}
-
}
/**
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertCommandContext.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertCommandContext.java
index 9e4c2bc92a3..31e56fd6ccc 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertCommandContext.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertCommandContext.java
@@ -21,7 +21,7 @@ package org.apache.doris.nereids.trees.plans.commands.insert;
* For Hive Table
*/
public class HiveInsertCommandContext extends InsertCommandContext {
- private boolean overwrite = true;
+ private boolean overwrite = false;
public boolean isOverwrite() {
return overwrite;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
index 0741982c968..44c17545be5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
@@ -42,6 +42,7 @@ import
org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalTableSink;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
@@ -54,6 +55,7 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@@ -88,7 +90,8 @@ public class InsertOverwriteTableCommand extends Command
implements ForwardWithS
}
public boolean isAutoDetectOverwrite() {
- return ((UnboundTableSink<?>)
this.logicalQuery).isAutoDetectPartition();
+ return (logicalQuery instanceof UnboundTableSink)
+ && ((UnboundTableSink<?>)
this.logicalQuery).isAutoDetectPartition();
}
@Override
@@ -118,27 +121,31 @@ public class InsertOverwriteTableCommand extends Command
implements ForwardWithS
}
Optional<TreeNode<?>> plan = (planner.getPhysicalPlan()
- .<Set<TreeNode<?>>>collect(node -> node instanceof
PhysicalOlapTableSink)).stream().findAny();
+ .<Set<TreeNode<?>>>collect(node -> node instanceof
PhysicalTableSink)).stream().findAny();
Preconditions.checkArgument(plan.isPresent(), "insert into command
must contain OlapTableSinkNode");
- PhysicalOlapTableSink<?> physicalOlapTableSink =
((PhysicalOlapTableSink<?>) plan.get());
- OlapTable targetTable = physicalOlapTableSink.getTargetTable();
- InternalDatabaseUtil
- .checkDatabase(targetTable.getQualifiedDbName(),
ConnectContext.get());
- // check auth
- if (!Env.getCurrentEnv().getAccessManager()
- .checkTblPriv(ConnectContext.get(),
targetTable.getQualifiedDbName(), targetTable.getName(),
- PrivPredicate.LOAD)) {
-
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR,
"LOAD",
- ConnectContext.get().getQualifiedUser(),
ConnectContext.get().getRemoteIP(),
- targetTable.getQualifiedDbName() + ": " +
targetTable.getName());
- }
-
- ConnectContext.get().setSkipAuth(true);
- List<String> partitionNames = ((UnboundTableSink<?>)
logicalQuery).getPartitions();
- if (CollectionUtils.isEmpty(partitionNames)) {
- partitionNames =
Lists.newArrayList(targetTable.getPartitionNames());
+ PhysicalTableSink<?> physicalTableSink = ((PhysicalTableSink<?>)
plan.get());
+ TableIf targetTable = physicalTableSink.getTargetTable();
+ List<String> partitionNames;
+ if (physicalTableSink instanceof PhysicalOlapTableSink) {
+ InternalDatabaseUtil
+ .checkDatabase(((OlapTable)
targetTable).getQualifiedDbName(), ConnectContext.get());
+ // check auth
+ if (!Env.getCurrentEnv().getAccessManager()
+ .checkTblPriv(ConnectContext.get(), ((OlapTable)
targetTable).getQualifiedDbName(),
+ targetTable.getName(), PrivPredicate.LOAD)) {
+
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR,
"LOAD",
+ ConnectContext.get().getQualifiedUser(),
ConnectContext.get().getRemoteIP(),
+ ((OlapTable) targetTable).getQualifiedDbName() + ": "
+ targetTable.getName());
+ }
+ ConnectContext.get().setSkipAuth(true);
+ partitionNames = ((UnboundTableSink<?>)
logicalQuery).getPartitions();
+ if (CollectionUtils.isEmpty(partitionNames)) {
+ partitionNames =
Lists.newArrayList(targetTable.getPartitionNames());
+ }
+ } else {
+ // Do not create temp partition on FE
+ partitionNames = new ArrayList<>();
}
-
long taskId = 0;
try {
if (isAutoDetectOverwrite()) {
@@ -170,6 +177,18 @@ public class InsertOverwriteTableCommand extends Command
implements ForwardWithS
}
}
+ private void runInsertCommand(LogicalPlan logicalQuery,
InsertCommandContext insertCtx,
+ ConnectContext ctx, StmtExecutor executor)
throws Exception {
+ InsertIntoTableCommand insertCommand = new
InsertIntoTableCommand(logicalQuery, labelName,
+ Optional.of(insertCtx));
+ insertCommand.run(ctx, executor);
+ if (ctx.getState().getStateType() == MysqlStateType.ERR) {
+ String errMsg =
Strings.emptyToNull(ctx.getState().getErrorMessage());
+ LOG.warn("InsertInto state error:{}", errMsg);
+ throw new UserException(errMsg);
+ }
+ }
+
/**
* insert into select. for sepecified temp partitions
*
@@ -208,18 +227,11 @@ public class InsertOverwriteTableCommand extends Command
implements ForwardWithS
sink.getDMLCommandType(),
(LogicalPlan) (sink.child(0)));
insertCtx = new HiveInsertCommandContext();
- ((HiveInsertCommandContext) insertCtx).setOverwrite(false);
+ ((HiveInsertCommandContext) insertCtx).setOverwrite(true);
} else {
- throw new RuntimeException("Current catalog does not support
insert overwrite yet.");
- }
- InsertIntoTableCommand insertCommand =
- new InsertIntoTableCommand(copySink, labelName,
Optional.of(insertCtx));
- insertCommand.run(ctx, executor);
- if (ctx.getState().getStateType() == MysqlStateType.ERR) {
- String errMsg =
Strings.emptyToNull(ctx.getState().getErrorMessage());
- LOG.warn("InsertInto state error:{}", errMsg);
- throw new UserException(errMsg);
+ throw new UserException("Current catalog does not support insert
overwrite yet.");
}
+ runInsertCommand(copySink, insertCtx, ctx, executor);
}
/**
@@ -229,17 +241,19 @@ public class InsertOverwriteTableCommand extends Command
implements ForwardWithS
* @param executor executor
*/
private void insertInto(ConnectContext ctx, StmtExecutor executor, long
groupId) throws Exception {
- UnboundTableSink<?> sink = (UnboundTableSink<?>) logicalQuery;
// 1. for overwrite situation, we disable auto create partition.
- // 2. we save and pass overwrite auto detect by insertCtx
- OlapInsertCommandContext insertCtx = new
OlapInsertCommandContext(false, sink.isAutoDetectPartition(), groupId);
- InsertIntoTableCommand insertCommand = new
InsertIntoTableCommand(sink, labelName, Optional.of(insertCtx));
- insertCommand.run(ctx, executor);
- if (ctx.getState().getStateType() == MysqlStateType.ERR) {
- String errMsg =
Strings.emptyToNull(ctx.getState().getErrorMessage());
- LOG.warn("InsertInto state error:{}", errMsg);
- throw new UserException(errMsg);
+ // 2. we save and pass overwrite auto-detected by insertCtx
+ InsertCommandContext insertCtx;
+ if (logicalQuery instanceof UnboundTableSink) {
+ insertCtx = new OlapInsertCommandContext(false,
+ ((UnboundTableSink<?>)
logicalQuery).isAutoDetectPartition(), groupId);
+ } else if (logicalQuery instanceof UnboundHiveTableSink) {
+ insertCtx = new HiveInsertCommandContext();
+ ((HiveInsertCommandContext) insertCtx).setOverwrite(true);
+ } else {
+ throw new UserException("Current catalog does not support insert
overwrite yet.");
}
+ runInsertCommand(logicalQuery, insertCtx, ctx, executor);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTableSink.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTableSink.java
index 7feb53e24b0..1461c0626d5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTableSink.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTableSink.java
@@ -17,6 +17,7 @@
package org.apache.doris.nereids.trees.plans.physical;
+import org.apache.doris.catalog.TableIf;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
@@ -44,4 +45,6 @@ public abstract class PhysicalTableSink<CHILD_TYPE extends
Plan> extends Physica
}
public abstract PhysicalProperties getRequirePhysicalProperties();
+
+ public abstract TableIf getTargetTable();
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveMetadataOpsTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveMetadataOpsTest.java
new file mode 100644
index 00000000000..af57aae703b
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveMetadataOpsTest.java
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive;
+
+import org.apache.doris.analysis.CreateDbStmt;
+import org.apache.doris.analysis.CreateTableStmt;
+import org.apache.doris.analysis.DistributionDesc;
+import org.apache.doris.analysis.DropDbStmt;
+import org.apache.doris.analysis.DropTableStmt;
+import org.apache.doris.analysis.HashDistributionDesc;
+import org.apache.doris.analysis.KeysDesc;
+import org.apache.doris.analysis.PartitionDesc;
+import org.apache.doris.analysis.TableName;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.KeysType;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.DatabaseMetadata;
+import org.apache.doris.datasource.ExternalDatabase;
+import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.TableMetadata;
+
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class HiveMetadataOpsTest { // Exercises HiveMetadataOps db/table create+drop against a JMockit-mocked HMS client and catalog.
+
+ private HiveMetadataOps metadataOps; // object under test, rebuilt in init() before each test
+
+ @Mocked
+ private HMSCachedClient mockedClient; // metastore client; its DDL methods are stubbed to no-ops in init()
+ @Mocked
+ private HMSExternalCatalog mockedCatalog; // owning catalog; getDbNullable/onRefresh are stubbed in init()
+
+ @BeforeEach
+ public void init() { // wire the ops object to the mocks and stub every call that would reach a real metastore
+ metadataOps = new HiveMetadataOps(mockedCatalog, mockedClient);
+ new MockUp<HMSExternalCatalog>(HMSExternalCatalog.class) {
+ @Mock
+ public ExternalDatabase<? extends ExternalTable>
getDbNullable(String dbName) { // ignores dbName: always yields a new HMSExternalDatabase "mockedDb" with id 0
+ return new HMSExternalDatabase(mockedCatalog, 0L, "mockedDb");
+ }
+
+ @Mock
+ public void onRefresh(boolean invalidCache) { // no-op; NOTE(review): presumably triggered by HiveMetadataOps after DDL — confirm
+ // mocked
+ }
+ };
+ new MockUp<HMSCachedClient>(HMSCachedClient.class) { // metastore DDL calls below do nothing, so no HMS is contacted
+ @Mock
+ public void createDatabase(DatabaseMetadata catalogDatabase) {
+ // mocked
+ }
+
+ @Mock
+ public void dropDatabase(String dbName) {
+ // mocked
+ }
+
+ @Mock
+ public void dropTable(String dbName, String tableName) {
+ // mocked
+ }
+
+ @Mock
+ public void createTable(TableMetadata catalogTable, boolean
ignoreIfExists) { // accepts any table metadata without validation
+ // mocked
+ }
+ };
+ }
+
+ private void createDb(String dbName, Map<String, String> props) throws
DdlException { // builds a CreateDbStmt and runs it through HiveMetadataOps.createDb
+ CreateDbStmt createDbStmt = new CreateDbStmt(true, dbName, props); // NOTE(review): leading `true` presumably means IF NOT EXISTS — confirm
+ metadataOps.createDb(createDbStmt);
+ }
+
+ private void dropDb(String dbName, boolean forceDrop) throws DdlException { // builds a DropDbStmt and runs it through HiveMetadataOps.dropDb
+ DropDbStmt dropDbStmt = new DropDbStmt(true, dbName, forceDrop); // NOTE(review): leading `true` presumably means IF EXISTS — confirm
+ metadataOps.dropDb(dropDbStmt);
+ }
+
+ private void createTable(TableName tableName,
+ List<Column> cols,
+ List<String> parts,
+ List<String> buckets,
+ Map<String, String> props)
+ throws UserException { // builds a hive-engine CreateTableStmt; buckets must be non-null (isEmpty() is called on it)
+ PartitionDesc partitionDesc = new PartitionDesc(parts, null); // partition columns by name only; NOTE(review): null presumably = no explicit partition items
+ DistributionDesc distributionDesc = null;
+ if (!buckets.isEmpty()) { // no bucket columns -> distributionDesc stays null (no DISTRIBUTED BY)
+ distributionDesc = new HashDistributionDesc(10, buckets); // NOTE(review): 10 is presumably the bucket count, hard-coded here
+ }
+ List<String> colsName =
cols.stream().map(Column::getName).collect(Collectors.toList()); // every column name, used as the key columns below
+ CreateTableStmt stmt = new CreateTableStmt(true, false, // NOTE(review): positional flags presumably ifNotExists=true, isExternal=false — confirm
+ tableName,
+ cols, null,
+ "hive",
+ new KeysDesc(KeysType.AGG_KEYS, colsName), // NOTE(review): OLAP-style AGG_KEYS on all columns; presumably ignored for engine "hive"
+ partitionDesc,
+ distributionDesc,
+ props,
+ props, // NOTE(review): same map passed twice (presumably properties and ext properties) — confirm intent
+ "comment",
+ null, null);
+ metadataOps.createTable(stmt);
+ }
+
+ private void dropTable(TableName tableName, boolean forceDrop) throws
DdlException { // builds a DropTableStmt and runs it through HiveMetadataOps.dropTable
+ DropTableStmt dropTblStmt = new DropTableStmt(true, tableName,
forceDrop); // NOTE(review): leading `true` presumably means IF EXISTS — confirm
+ metadataOps.dropTable(dropTblStmt);
+ }
+
+ @Test
+ public void testCreateAndDropAll() throws UserException { // smoke test: passes as long as nothing throws; it makes no assertions
+ Map<String, String> dbProps = new HashMap<>();
+ dbProps.put(HiveMetadataOps.LOCATION_URI_KEY, "file://loc/db");
+ createDb("mockedDb", dbProps); // same name the mocked getDbNullable returns
+ Map<String, String> tblProps = new HashMap<>();
+ tblProps.put(HiveMetadataOps.FILE_FORMAT_KEY, "orc");
+ tblProps.put(HiveMetadataOps.LOCATION_URI_KEY, "file://loc/tbl");
+ tblProps.put("fs.defaultFS", "hdfs://ha");
+ TableName tableName = new TableName("mockedCtl", "mockedDb",
"mockedTbl"); // mockedCtl.mockedDb.mockedTbl
+ List<Column> cols = new ArrayList<>();
+ cols.add(new Column("id", Type.BIGINT));
+ cols.add(new Column("pt", Type.STRING));
+ cols.add(new Column("rate", Type.DOUBLE));
+ cols.add(new Column("time", Type.DATETIME));
+ List<String> parts = new ArrayList<>();
+ parts.add("pt"); // partition on pt, which is also declared in cols above
+ List<String> bucks = new ArrayList<>();
+ // bucks.add("id"); NOTE(review): disabled, so the bucketed (HashDistributionDesc) path is never exercised
+ createTable(tableName, cols, parts, bucks, tblProps);
+ dropTable(tableName, true);
+ dropDb("mockedDb", true);
+ // TODO: use TestWithFeService to double check plan
+ }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
index 2316e65bf60..e5392fb11a8 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
@@ -24,7 +24,6 @@ import org.apache.doris.thrift.THivePartitionUpdate;
import org.apache.doris.thrift.TUpdateMode;
import com.google.common.collect.Lists;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.junit.After;
@@ -97,8 +96,8 @@ public class HmsCommitTest {
List<Column> columns = new ArrayList<>();
columns.add(new Column("c1", PrimitiveType.INT, true));
columns.add(new Column("c2", PrimitiveType.STRING, true));
- List<FieldSchema> partitionKeys = new ArrayList<>();
- partitionKeys.add(new FieldSchema("c3", "string", "comment"));
+ List<String> partitionKeys = new ArrayList<>();
+ partitionKeys.add("c3");
HiveTableMetadata tableMetadata = new HiveTableMetadata(
dbName, tbWithPartition, columns, partitionKeys,
new HashMap<>(), fileFormat);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]