This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 1645f2e0a77 [feature](insert)add hive table sink definition (#31662) (#32347)
1645f2e0a77 is described below
commit 1645f2e0a77f1a2804a29f71103c6a196e3f03df
Author: Mingyu Chen <[email protected]>
AuthorDate: Sun Mar 17 20:52:44 2024 +0800
[feature](insert)add hive table sink definition (#31662) (#32347)
bp #31662
Co-authored-by: slothever <[email protected]>
---
.../glue/translator/PhysicalPlanTranslator.java | 8 ++
.../pre/TurnOffPageCacheForInsertIntoSelect.java | 7 +
.../org/apache/doris/nereids/rules/RuleSet.java | 2 +
.../org/apache/doris/nereids/rules/RuleType.java | 2 +
.../doris/nereids/rules/analysis/BindSink.java | 10 +-
...ogicalHiveTableSinkToPhysicalHiveTableSink.java | 49 +++++++
.../apache/doris/nereids/trees/plans/PlanType.java | 7 +-
.../plans/commands/insert/HiveInsertExecutor.java | 100 +++++++++++++
.../commands/insert/InsertIntoTableCommand.java | 5 +
.../trees/plans/commands/insert/InsertUtils.java | 9 +-
.../trees/plans/logical/LogicalHiveTableSink.java | 160 +++++++++++++++++++++
.../plans/physical/PhysicalHiveTableSink.java | 119 +++++++++++++++
.../nereids/trees/plans/visitor/SinkVisitor.java | 10 ++
.../java/org/apache/doris/planner/DataSink.java | 7 +-
.../org/apache/doris/planner/HiveTableSink.java | 67 +++++++++
gensrc/thrift/DataSinks.thrift | 63 +++++++-
16 files changed, 617 insertions(+), 8 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 9d24e229345..37d0e25f5ac 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -119,6 +119,7 @@ import
org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalGenerate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIntersect;
import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
@@ -427,6 +428,13 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
return rootFragment;
}
+    /**
+     * Translates a physical hive table sink.
+     *
+     * <p>NOTE(review): only the child plan is translated and its fragment is returned as-is; no
+     * HiveTableSink is attached to the fragment here. TODO confirm this is intended while hive
+     * insert support is incomplete.
+     */
+    @Override
+    public PlanFragment visitPhysicalHiveTableSink(PhysicalHiveTableSink<? extends Plan> hiveTableSink,
+            PlanTranslatorContext context) {
+        Plan input = hiveTableSink.child();
+        return input.accept(this, context);
+    }
+
@Override
public PlanFragment visitPhysicalFileSink(PhysicalFileSink<? extends Plan>
fileSink,
PlanTranslatorContext context) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPageCacheForInsertIntoSelect.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPageCacheForInsertIntoSelect.java
index 77955a94114..67f0c1c3ba9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPageCacheForInsertIntoSelect.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPageCacheForInsertIntoSelect.java
@@ -24,6 +24,7 @@ import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
+import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.VariableMgr;
@@ -52,6 +53,12 @@ public class TurnOffPageCacheForInsertIntoSelect extends
PlanPreprocessor {
return tableSink;
}
+    /**
+     * Turns off the page cache for the current statement when the insert target is a hive table,
+     * then returns the sink unchanged.
+     */
+    @Override
+    public Plan visitLogicalHiveTableSink(LogicalHiveTableSink<? extends Plan> tableSink, StatementContext context) {
+        this.turnOffPageCache(context);
+        return tableSink;
+    }
+
private void turnOffPageCache(StatementContext context) {
SessionVariable sessionVariable =
context.getConnectContext().getSessionVariable();
// set temporary session value, and then revert value in the 'finally
block' of StmtExecutor#execute
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
index f2b5091fd37..408b0d7355e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
@@ -64,6 +64,7 @@ import
org.apache.doris.nereids.rules.implementation.LogicalFileScanToPhysicalFi
import
org.apache.doris.nereids.rules.implementation.LogicalFileSinkToPhysicalFileSink;
import
org.apache.doris.nereids.rules.implementation.LogicalFilterToPhysicalFilter;
import
org.apache.doris.nereids.rules.implementation.LogicalGenerateToPhysicalGenerate;
+import
org.apache.doris.nereids.rules.implementation.LogicalHiveTableSinkToPhysicalHiveTableSink;
import
org.apache.doris.nereids.rules.implementation.LogicalIntersectToPhysicalIntersect;
import
org.apache.doris.nereids.rules.implementation.LogicalJdbcScanToPhysicalJdbcScan;
import org.apache.doris.nereids.rules.implementation.LogicalJoinToHashJoin;
@@ -187,6 +188,7 @@ public class RuleSet {
.add(new LogicalIntersectToPhysicalIntersect())
.add(new LogicalGenerateToPhysicalGenerate())
.add(new LogicalOlapTableSinkToPhysicalOlapTableSink())
+ .add(new LogicalHiveTableSinkToPhysicalHiveTableSink())
.add(new LogicalFileSinkToPhysicalFileSink())
.add(new LogicalResultSinkToPhysicalResultSink())
.add(new
LogicalDeferMaterializeResultSinkToPhysicalDeferMaterializeResultSink())
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
index 4515aaf55fe..9650a9147b8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
@@ -30,6 +30,7 @@ public enum RuleType {
// **** make sure BINDING_UNBOUND_LOGICAL_PLAN is the lowest priority in
the rewrite rules. ****
BINDING_RESULT_SINK(RuleTypeClass.REWRITE),
+ BINDING_INSERT_TARGET_EXTERNAL_TABLE(RuleTypeClass.REWRITE),
BINDING_INSERT_TARGET_TABLE(RuleTypeClass.REWRITE),
BINDING_INSERT_FILE(RuleTypeClass.REWRITE),
BINDING_ONE_ROW_RELATION_SLOT(RuleTypeClass.REWRITE),
@@ -386,6 +387,7 @@ public enum RuleType {
LOGICAL_ODBC_SCAN_TO_PHYSICAL_ODBC_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_ES_SCAN_TO_PHYSICAL_ES_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_OLAP_TABLE_SINK_TO_PHYSICAL_OLAP_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
+
LOGICAL_HIVE_TABLE_SINK_TO_PHYSICAL_HIVE_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_RESULT_SINK_TO_PHYSICAL_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_DEFER_MATERIALIZE_RESULT_SINK_TO_PHYSICAL_DEFER_MATERIALIZE_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_FILE_SINK_TO_PHYSICAL_FILE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
index 5c8a74f00e6..06ad6921def 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
@@ -68,7 +68,7 @@ import java.util.Optional;
import java.util.stream.Collectors;
/**
- * bind an unbound logicalOlapTableSink represent the target table of an
insert command
+ * bind an unbound logicalTableSink represent the target table of an insert
command
*/
public class BindSink implements AnalysisRuleFactory {
@@ -340,6 +340,14 @@ public class BindSink implements AnalysisRuleFactory {
fileSink.child().getOutput().stream()
.map(NamedExpression.class::cast)
.collect(ImmutableList.toImmutableList())))
+ ),
+ // TODO: bind hive taget table
+ RuleType.BINDING_INSERT_TARGET_EXTERNAL_TABLE.build(
+ logicalHiveTableSink().when(s ->
s.getOutputExprs().isEmpty())
+ .then(hiveTableSink ->
hiveTableSink.withOutputExprs(
+
hiveTableSink.child().getOutput().stream()
+
.map(NamedExpression.class::cast)
+
.collect(ImmutableList.toImmutableList())))
)
);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalHiveTableSinkToPhysicalHiveTableSink.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalHiveTableSinkToPhysicalHiveTableSink.java
new file mode 100644
index 00000000000..13329a5d55e
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalHiveTableSinkToPhysicalHiveTableSink.java
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.implementation;
+
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
+
+import java.util.Optional;
+
+/**
+ * Implementation rule that converts a {@link LogicalHiveTableSink} into a {@link PhysicalHiveTableSink}.
+ *
+ * <p>The target database, table, columns, partition ids, output expressions and logical properties
+ * are copied over unchanged; no group expression is attached, and physical properties and
+ * statistics are passed as null.
+ */
+public class LogicalHiveTableSinkToPhysicalHiveTableSink extends OneImplementationRuleFactory {
+    @Override
+    public Rule build() {
+        return logicalHiveTableSink()
+                .thenApply(ctx -> {
+                    LogicalHiveTableSink<? extends Plan> logicalSink = ctx.root;
+                    return new PhysicalHiveTableSink<>(
+                            logicalSink.getDatabase(),
+                            logicalSink.getTargetTable(),
+                            logicalSink.getCols(),
+                            logicalSink.getPartitionIds(),
+                            logicalSink.getOutputExprs(),
+                            Optional.empty(),
+                            logicalSink.getLogicalProperties(),
+                            /* physicalProperties */ null,
+                            /* statistics */ null,
+                            logicalSink.child());
+                })
+                .toRule(RuleType.LOGICAL_HIVE_TABLE_SINK_TO_PHYSICAL_HIVE_TABLE_SINK_RULE);
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
index af27885fce3..11a6a7b5683 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
@@ -80,7 +80,7 @@ public enum PlanType {
LOGICAL_WINDOW,
// physical plans
- // logical relations
+ // physical relations
PHYSICAL_CTE_CONSUMER,
PHYSICAL_EMPTY_RELATION,
PHYSICAL_ES_SCAN,
@@ -92,12 +92,13 @@ public enum PlanType {
PHYSICAL_SCHEMA_SCAN,
PHYSICAL_TVF_RELATION,
- // logical sinks
+ // physical sinks
PHYSICAL_FILE_SINK,
PHYSICAL_OLAP_TABLE_SINK,
+ PHYSICAL_HIVE_TABLE_SINK,
PHYSICAL_RESULT_SINK,
- // logical others
+ // physical others
PHYSICAL_HASH_AGGREGATE,
PHYSICAL_ASSERT_NUM_ROWS,
PHYSICAL_CTE_PRODUCER,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
new file mode 100644
index 00000000000..f8bc8f2db47
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.insert;
+
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.nereids.NereidsPlanner;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
+import org.apache.doris.planner.DataSink;
+import org.apache.doris.planner.HiveTableSink;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.transaction.TransactionState;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Optional;
+
+/**
+ * Insert executor for hive (HMS external) tables.
+ *
+ * <p>NOTE(review): hive inserts are not wired up end to end in this version. No transaction is
+ * started ({@link #beginTransaction()} is a no-op), and the execution hooks do nothing.
+ */
+public class HiveInsertExecutor extends AbstractInsertExecutor {
+    private static final Logger LOG = LogManager.getLogger(HiveInsertExecutor.class);
+    private static final long INVALID_TXN_ID = -1L;
+    // Only ever holds INVALID_TXN_ID for now, since beginTransaction() does not start a transaction.
+    private long txnId = INVALID_TXN_ID;
+
+    /**
+     * Creates an executor that inserts into the given hive table.
+     */
+    public HiveInsertExecutor(ConnectContext ctx, HMSExternalTable table,
+            String labelName, NereidsPlanner planner,
+            Optional<InsertCommandContext> insertCtx) {
+        super(ctx, table, labelName, planner, insertCtx);
+    }
+
+    /**
+     * Returns the transaction id of this insert, or {@code -1} when no transaction has been started.
+     */
+    public long getTxnId() {
+        return txnId;
+    }
+
+    @Override
+    public void beginTransaction() {
+        // TODO: start a hive transaction and record its id in txnId.
+    }
+
+    /**
+     * Initializes and completes the {@link HiveTableSink} of the fragment.
+     *
+     * @throws AnalysisException if the sink is not a HiveTableSink, if initializing or completing it
+     *         fails, or if a started transaction cannot be found
+     */
+    @Override
+    protected void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink) {
+        if (!(sink instanceof HiveTableSink)) {
+            throw new AnalysisException("expect HiveTableSink for hive insert, but got: "
+                    + (sink == null ? "null" : sink.getClass().getSimpleName()));
+        }
+        HiveTableSink hiveTableSink = (HiveTableSink) sink;
+        try {
+            hiveTableSink.init();
+            hiveTableSink.complete(new Analyzer(Env.getCurrentEnv(), ctx));
+            // txnId is never assigned yet, so looking up INVALID_TXN_ID unconditionally would make
+            // every hive insert fail here. Only validate a transaction that was actually started.
+            if (txnId != INVALID_TXN_ID) {
+                TransactionState state = Env.getCurrentGlobalTransactionMgr()
+                        .getTransactionState(database.getId(), txnId);
+                if (state == null) {
+                    throw new AnalysisException("txn does not exist: " + txnId);
+                }
+            }
+        } catch (AnalysisException e) {
+            // already a nereids AnalysisException with a meaningful message; don't wrap it again
+            throw e;
+        } catch (Exception e) {
+            throw new AnalysisException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    protected void beforeExec() {
+        // no-op in this version
+    }
+
+    @Override
+    protected void onComplete() throws UserException {
+        // no-op in this version
+    }
+
+    @Override
+    protected void onFail(Throwable t) {
+        // no-op in this version
+    }
+
+    @Override
+    protected void afterExec(StmtExecutor executor) {
+        // no-op in this version
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index 06cd4275682..29d96ae4ad9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -23,6 +23,7 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.util.ProfileManager.ProfileType;
+import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.load.loadv2.LoadStatistic;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.NereidsPlanner;
@@ -34,6 +35,7 @@ import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
@@ -160,6 +162,9 @@ public class InsertIntoTableCommand extends Command
implements ForwardWithSync,
: false;
insertExecutor.getCoordinator().getQueryOptions()
.setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
+ } else if (physicalSink instanceof PhysicalHiveTableSink) {
+ HMSExternalTable hiveExternalTable = (HMSExternalTable)
targetTableIf;
+ insertExecutor = new HiveInsertExecutor(ctx,
hiveExternalTable, label, planner, insertCtx);
} else {
// TODO: support other table types
throw new AnalysisException("insert into command only support
olap table");
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
index ab65b065e35..007fd48ccf7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
@@ -25,6 +25,7 @@ import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.nereids.analyzer.UnboundAlias;
import org.apache.doris.nereids.analyzer.UnboundOneRowRelation;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
@@ -252,7 +253,13 @@ public class InsertUtils {
}
}
}
-
+ if (table instanceof HMSExternalTable) {
+ // TODO: check HMSExternalTable
+ HMSExternalTable hiveTable = (HMSExternalTable) table;
+ if (hiveTable.isView()) {
+ throw new AnalysisException("View is not support in hive
external table.");
+ }
+ }
Plan query = unboundTableSink.child();
if (!(query instanceof LogicalInlineTable)) {
return plan;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHiveTableSink.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHiveTableSink.java
new file mode 100644
index 00000000000..9d31a39b3e3
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHiveTableSink.java
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.logical;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.datasource.hive.HMSExternalDatabase;
+import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.PropagateFuncDeps;
+import org.apache.doris.nereids.trees.plans.algebra.Sink;
+import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.Utils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Logical plan node for the hive table sink of an INSERT command.
+ *
+ * <p>Holds the bound target (HMS database and table), the target columns, the partition ids and the
+ * DML command type, plus the output expressions managed by {@link LogicalSink}.
+ */
+public class LogicalHiveTableSink<CHILD_TYPE extends Plan> extends LogicalSink<CHILD_TYPE>
+        implements Sink, PropagateFuncDeps {
+    // bound data sink
+    private final HMSExternalDatabase database;
+    private final HMSExternalTable targetTable;
+    private final List<Column> cols;
+    private final List<Long> partitionIds;
+    private final DMLCommandType dmlCommandType;
+
+    /**
+     * Creates a logical hive table sink.
+     *
+     * @param database target HMS database, must not be null
+     * @param targetTable target HMS table, must not be null
+     * @param cols target columns, copied via {@code Utils.copyRequiredList}
+     * @param partitionIds target partition ids, copied via {@code Utils.copyRequiredList}
+     * @param outputExprs output expressions of the sink
+     * @param dmlCommandType the DML command type recorded for this sink
+     * @param groupExpression owning memo group expression, if any
+     * @param logicalProperties precomputed logical properties, if any
+     * @param child the plan whose rows feed the sink
+     */
+    public LogicalHiveTableSink(HMSExternalDatabase database, HMSExternalTable targetTable, List<Column> cols,
+            List<Long> partitionIds, List<NamedExpression> outputExprs,
+            DMLCommandType dmlCommandType, Optional<GroupExpression> groupExpression,
+            Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) {
+        // NOTE(review): this reuses PlanType.LOGICAL_OLAP_TABLE_SINK, presumably because PlanType has no
+        // LOGICAL_HIVE_TABLE_SINK entry (this change only adds PHYSICAL_HIVE_TABLE_SINK). Confirm and
+        // switch to a dedicated plan type so hive sinks are not reported as olap sinks.
+        super(PlanType.LOGICAL_OLAP_TABLE_SINK, outputExprs, groupExpression, logicalProperties, child);
+        this.database = Objects.requireNonNull(database, "database != null in LogicalHiveTableSink");
+        this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in LogicalHiveTableSink");
+        this.cols = Utils.copyRequiredList(cols);
+        this.dmlCommandType = dmlCommandType;
+        this.partitionIds = Utils.copyRequiredList(partitionIds);
+    }
+
+    /**
+     * Returns a copy of this sink on top of {@code child}, with the output expressions rebuilt from
+     * the child's output slots.
+     */
+    public Plan withChildAndUpdateOutput(Plan child) {
+        List<NamedExpression> output = child.getOutput().stream()
+                .map(NamedExpression.class::cast)
+                .collect(ImmutableList.toImmutableList());
+        return new LogicalHiveTableSink<>(database, targetTable, cols, partitionIds, output,
+                dmlCommandType, Optional.empty(), Optional.empty(), child);
+    }
+
+    @Override
+    public Plan withChildren(List<Plan> children) {
+        Preconditions.checkArgument(children.size() == 1, "LogicalHiveTableSink only accepts one child");
+        return new LogicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs,
+                dmlCommandType, Optional.empty(), Optional.empty(), children.get(0));
+    }
+
+    /**
+     * Returns a copy of this sink with the given output expressions; group expression and logical
+     * properties are reset.
+     */
+    public LogicalHiveTableSink<CHILD_TYPE> withOutputExprs(List<NamedExpression> outputExprs) {
+        return new LogicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs,
+                dmlCommandType, Optional.empty(), Optional.empty(), child());
+    }
+
+    public HMSExternalDatabase getDatabase() {
+        return database;
+    }
+
+    public HMSExternalTable getTargetTable() {
+        return targetTable;
+    }
+
+    public List<Column> getCols() {
+        return cols;
+    }
+
+    public List<Long> getPartitionIds() {
+        return partitionIds;
+    }
+
+    public DMLCommandType getDmlCommandType() {
+        return dmlCommandType;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        LogicalHiveTableSink<?> that = (LogicalHiveTableSink<?>) o;
+        return dmlCommandType == that.dmlCommandType
+                && Objects.equals(database, that.database)
+                && Objects.equals(targetTable, that.targetTable)
+                && Objects.equals(cols, that.cols)
+                && Objects.equals(partitionIds, that.partitionIds);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), database, targetTable, cols, partitionIds, dmlCommandType);
+    }
+
+    @Override
+    public String toString() {
+        return Utils.toSqlString("LogicalHiveTableSink[" + id.asInt() + "]",
+                "outputExprs", outputExprs,
+                "database", database.getFullName(),
+                "targetTable", targetTable.getName(),
+                "cols", cols,
+                "partitionIds", partitionIds,
+                "dmlCommandType", dmlCommandType
+        );
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitLogicalHiveTableSink(this, context);
+    }
+
+    @Override
+    public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
+        return new LogicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs,
+                dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child());
+    }
+
+    @Override
+    public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
+            Optional<LogicalProperties> logicalProperties, List<Plan> children) {
+        // same single-child invariant as withChildren
+        Preconditions.checkArgument(children.size() == 1, "LogicalHiveTableSink only accepts one child");
+        return new LogicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs,
+                dmlCommandType, groupExpression, logicalProperties, children.get(0));
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHiveTableSink.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHiveTableSink.java
new file mode 100644
index 00000000000..eee55e3c281
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHiveTableSink.java
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.physical;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.datasource.hive.HMSExternalDatabase;
+import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.properties.PhysicalProperties;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.algebra.Sink;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.statistics.Statistics;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Physical plan node for the hive table sink of an INSERT command. */
+public class PhysicalHiveTableSink<CHILD_TYPE extends Plan> extends PhysicalSink<CHILD_TYPE> implements Sink {
+
+    private final HMSExternalDatabase database;
+    private final HMSExternalTable targetTable;
+    private final List<Column> cols;
+    private final List<Long> partitionIds;
+
+    /**
+     * Creates a physical hive table sink with physical properties {@code PhysicalProperties.GATHER}
+     * and null statistics.
+     */
+    public PhysicalHiveTableSink(HMSExternalDatabase database,
+            HMSExternalTable targetTable,
+            List<Column> cols,
+            List<Long> partitionIds,
+            List<NamedExpression> outputExprs,
+            Optional<GroupExpression> groupExpression,
+            LogicalProperties logicalProperties,
+            CHILD_TYPE child) {
+        this(database, targetTable, cols, partitionIds, outputExprs, groupExpression, logicalProperties,
+                PhysicalProperties.GATHER, null, child);
+    }
+
+    /**
+     * Creates a physical hive table sink.
+     *
+     * @param database target HMS database, must not be null
+     * @param targetTable target HMS table, must not be null
+     * @param cols target columns, copied via {@code Utils.copyRequiredList}
+     * @param partitionIds target partition ids, copied via {@code Utils.copyRequiredList}
+     */
+    public PhysicalHiveTableSink(HMSExternalDatabase database,
+            HMSExternalTable targetTable,
+            List<Column> cols,
+            List<Long> partitionIds,
+            List<NamedExpression> outputExprs,
+            Optional<GroupExpression> groupExpression,
+            LogicalProperties logicalProperties,
+            PhysicalProperties physicalProperties,
+            Statistics statistics,
+            CHILD_TYPE child) {
+        super(PlanType.PHYSICAL_HIVE_TABLE_SINK, outputExprs, groupExpression,
+                logicalProperties, physicalProperties, statistics, child);
+        this.database = Objects.requireNonNull(database, "database != null in PhysicalHiveTableSink");
+        this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in PhysicalHiveTableSink");
+        this.cols = Utils.copyRequiredList(cols);
+        this.partitionIds = Utils.copyRequiredList(partitionIds);
+    }
+
+    public HMSExternalDatabase getDatabase() {
+        return database;
+    }
+
+    public HMSExternalTable getTargetTable() {
+        return targetTable;
+    }
+
+    public List<Column> getCols() {
+        return cols;
+    }
+
+    public List<Long> getPartitionIds() {
+        return partitionIds;
+    }
+
+    @Override
+    public Plan withChildren(List<Plan> children) {
+        checkSingleChild(children);
+        return new PhysicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs, groupExpression,
+                getLogicalProperties(), physicalProperties, statistics, children.get(0));
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitPhysicalHiveTableSink(this, context);
+    }
+
+    /** This node reports no expressions of its own. */
+    @Override
+    public List<? extends Expression> getExpressions() {
+        return ImmutableList.of();
+    }
+
+    /** Note: goes through the short constructor, so physical properties reset to GATHER and statistics to null. */
+    @Override
+    public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
+        return new PhysicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs,
+                groupExpression, getLogicalProperties(), child());
+    }
+
+    /** Note: goes through the short constructor, so physical properties reset to GATHER and statistics to null. */
+    @Override
+    public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
+            Optional<LogicalProperties> logicalProperties, List<Plan> children) {
+        checkSingleChild(children);
+        return new PhysicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs,
+                groupExpression, logicalProperties.get(), children.get(0));
+    }
+
+    @Override
+    public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) {
+        return new PhysicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs,
+                groupExpression, getLogicalProperties(), physicalProperties, statistics, child());
+    }
+
+    /** Rejects child lists that do not contain exactly one plan (this node is unary). */
+    private static void checkSingleChild(List<Plan> children) {
+        if (children.size() != 1) {
+            throw new IllegalArgumentException("PhysicalHiveTableSink only accepts one child");
+        }
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java
index fcb0b474e47..b88cd910a36 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java
@@ -22,12 +22,14 @@ import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.trees.plans.Plan;
import
org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeResultSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
+import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalTableSink;
import
org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeResultSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileSink;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
@@ -74,6 +76,10 @@ public interface SinkVisitor<R, C> {
return visitLogicalSink(olapTableSink, context);
}
+ default R visitLogicalHiveTableSink(LogicalHiveTableSink<? extends Plan>
hiveTableSink, C context) { // visitor hook for LogicalHiveTableSink
+ return visitLogicalSink(hiveTableSink, context); // default: delegate to the generic logical-sink visit
+ }
+
default R visitLogicalResultSink(LogicalResultSink<? extends Plan>
logicalResultSink, C context) {
return visitLogicalSink(logicalResultSink, context);
}
@@ -99,6 +105,10 @@ public interface SinkVisitor<R, C> {
return visitPhysicalSink(olapTableSink, context);
}
+ default R visitPhysicalHiveTableSink(PhysicalHiveTableSink<? extends Plan>
hiveTableSink, C context) { // visitor hook for PhysicalHiveTableSink
+ return visitPhysicalSink(hiveTableSink, context); // default: delegate to the generic physical-sink visit
+ }
+
default R visitPhysicalResultSink(PhysicalResultSink<? extends Plan>
physicalResultSink, C context) {
return visitPhysicalSink(physicalResultSink, context);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java
index b813093f782..c769bbea782 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java
@@ -22,8 +22,9 @@ package org.apache.doris.planner;
import org.apache.doris.catalog.MysqlTable;
import org.apache.doris.catalog.OdbcTable;
-import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.odbc.sink.OdbcTableSink;
import org.apache.doris.thrift.TDataSink;
import org.apache.doris.thrift.TExplainLevel;
@@ -62,11 +63,13 @@ public abstract class DataSink {
public abstract DataPartition getOutputPartition();
- public static DataSink createDataSink(Table table) throws
AnalysisException {
+ public static DataSink createDataSink(TableIf table) throws
AnalysisException {
if (table instanceof MysqlTable) {
return new MysqlTableSink((MysqlTable) table);
} else if (table instanceof OdbcTable) {
return new OdbcTableSink((OdbcTable) table);
+ } else if (table instanceof HMSExternalTable) {
+ return new HiveTableSink((HMSExternalTable) table);
} else {
throw new AnalysisException("Unknown table type " +
table.getType());
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
new file mode 100644
index 00000000000..99d0c6b1b03
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+//
https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/DataSink.java
+// and modified by Doris
+
+package org.apache.doris.planner;
+
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.thrift.TDataSink;
+import org.apache.doris.thrift.TExplainLevel;
+
+public class HiveTableSink extends DataSink { // DataSink for Hive (HMS) tables; several members below are still placeholders
+
+ protected TDataSink tDataSink; // NOTE(review): never assigned in this class, so toThrift() returns null unless it is set elsewhere
+
+ public HiveTableSink(HMSExternalTable table) { // NOTE(review): 'table' is currently unused
+ super();
+ }
+
+ @Override
+ public String getExplainString(String prefix, TExplainLevel explainLevel) { // emits a single "HIVE TABLE SINK" header line
+ StringBuilder strBuilder = new StringBuilder();
+ strBuilder.append(prefix + "HIVE TABLE SINK\n");
+ if (explainLevel == TExplainLevel.BRIEF) {
+ return strBuilder.toString();
+ }
+ // TODO: explain partitions
+ return strBuilder.toString(); // currently identical to the BRIEF output
+ }
+
+ @Override
+ protected TDataSink toThrift() {
+ return tDataSink; // NOTE(review): returns the field as-is; may be null (see field comment)
+ }
+
+ @Override
+ public PlanNodeId getExchNodeId() {
+ return null; // this sink reports no exchange node id
+ }
+
+ @Override
+ public DataPartition getOutputPartition() {
+ return DataPartition.RANDOM; // always reports RANDOM output partitioning
+ }
+
+ public void init() { // currently a no-op
+ }
+
+ public void complete(Analyzer analyzer) { // currently a no-op; 'analyzer' is unused
+ }
+}
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index 602943b4207..7c9d5e8f8c2 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -277,6 +277,67 @@ struct TOlapTableSink {
23: optional double max_filter_ratio
}
+struct THiveLocationParams { // path pair used by THivePartition, THiveTableSink and THivePartitionUpdate
+ 1: optional string write_path // NOTE(review): presumably where files are physically written — confirm
+ 2: optional string target_path // NOTE(review): presumably the final table/partition location — confirm
+}
+
+struct TSortedColumn { // one entry of a bucket sort spec (THiveBucket.sorted_by)
+ 1: optional string sort_column_name
+ 2: optional i32 order // asc(1) or desc(0)
+}
+
+struct TBucketingMode { // carries the bucketing version (THiveBucket.bucket_mode)
+ 1: optional i32 bucket_version
+}
+
+struct THiveBucket { // bucketing spec entry (THiveTableSink.buckets)
+ 1: optional list<string> bucketed_by // presumably the bucketing column names
+ 2: optional TBucketingMode bucket_mode
+ 3: optional i32 bucket_count
+ 4: optional list<TSortedColumn> sorted_by
+}
+
+enum THiveCompressionType { // codec choice for THiveTableSink.compression_type; NOTE(review): values are non-contiguous, presumably mirroring another codec enum — confirm before adding members
+ SNAPPY = 3,
+ LZ4 = 4,
+ ZLIB = 6,
+ ZSTD = 7,
+}
+
+struct THivePartition { // one partition entry (THiveTableSink.partitions)
+ 1: optional list<string> values // presumably ordered like THiveTableSink.partition_column_names — confirm
+ 2: optional THiveLocationParams location // partition-level location
+ 3: optional PlanNodes.TFileFormatType file_format // partition-level file format
+}
+
+struct THiveTableSink { // payload carried in TDataSink.hive_table_sink
+ 1: optional string db_name
+ 2: optional string table_name
+ 3: optional list<string> data_column_names
+ 4: optional list<string> partition_column_names
+ 5: optional list<THivePartition> partitions
+ 6: optional list<THiveBucket> buckets
+ 7: optional PlanNodes.TFileFormatType file_format // table-level file format
+ 8: optional THiveCompressionType compression_type
+ 9: optional THiveLocationParams location // table-level location
+}
+
+enum TUpdateMode { // kind of partition change (THivePartitionUpdate.update_mode)
+ NEW = 0, // add partition
+ APPEND = 1, // alter partition
+ OVERWRITE = 2 // insert overwrite
+}
+
+struct THivePartitionUpdate { // per-partition write summary; NOTE(review): presumably reported back after the sink writes — confirm
+ 1: optional string name // presumably the partition name
+ 2: optional TUpdateMode update_mode
+ 3: optional THiveLocationParams location
+ 4: optional list<string> file_names
+ 5: optional i64 row_count
+ 6: optional i64 file_size // NOTE(review): unclear whether this is the total bytes across file_names — confirm
+}
+
struct TDataSink {
1: required TDataSinkType type
2: optional TDataStreamSink stream_sink
@@ -289,5 +350,5 @@ struct TDataSink {
10: optional TResultFileSink result_file_sink
11: optional TJdbcTableSink jdbc_table_sink
12: optional TMultiCastDataStreamSink multi_cast_stream_sink
+ 13: optional THiveTableSink hive_table_sink
}
-
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]