This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 1645f2e0a77 [feature](insert)add hive table sink definition (#31662) 
(#32347)
1645f2e0a77 is described below

commit 1645f2e0a77f1a2804a29f71103c6a196e3f03df
Author: Mingyu Chen <[email protected]>
AuthorDate: Sun Mar 17 20:52:44 2024 +0800

    [feature](insert)add hive table sink definition (#31662) (#32347)
    
    bp #31662
    Co-authored-by: slothever <[email protected]>
---
 .../glue/translator/PhysicalPlanTranslator.java    |   8 ++
 .../pre/TurnOffPageCacheForInsertIntoSelect.java   |   7 +
 .../org/apache/doris/nereids/rules/RuleSet.java    |   2 +
 .../org/apache/doris/nereids/rules/RuleType.java   |   2 +
 .../doris/nereids/rules/analysis/BindSink.java     |  10 +-
 ...ogicalHiveTableSinkToPhysicalHiveTableSink.java |  49 +++++++
 .../apache/doris/nereids/trees/plans/PlanType.java |   7 +-
 .../plans/commands/insert/HiveInsertExecutor.java  | 100 +++++++++++++
 .../commands/insert/InsertIntoTableCommand.java    |   5 +
 .../trees/plans/commands/insert/InsertUtils.java   |   9 +-
 .../trees/plans/logical/LogicalHiveTableSink.java  | 160 +++++++++++++++++++++
 .../plans/physical/PhysicalHiveTableSink.java      | 119 +++++++++++++++
 .../nereids/trees/plans/visitor/SinkVisitor.java   |  10 ++
 .../java/org/apache/doris/planner/DataSink.java    |   7 +-
 .../org/apache/doris/planner/HiveTableSink.java    |  67 +++++++++
 gensrc/thrift/DataSinks.thrift                     |  63 +++++++-
 16 files changed, 617 insertions(+), 8 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 9d24e229345..37d0e25f5ac 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -119,6 +119,7 @@ import 
org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalGenerate;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalIntersect;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcScan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
@@ -427,6 +428,13 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         return rootFragment;
     }
 
+    @Override
+    public PlanFragment visitPhysicalHiveTableSink(PhysicalHiveTableSink<? 
extends Plan> hiveTableSink,
+                                                   PlanTranslatorContext 
context) {
+        PlanFragment rootFragment = hiveTableSink.child().accept(this, 
context);
+        return rootFragment;
+    }
+
     @Override
     public PlanFragment visitPhysicalFileSink(PhysicalFileSink<? extends Plan> 
fileSink,
             PlanTranslatorContext context) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPageCacheForInsertIntoSelect.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPageCacheForInsertIntoSelect.java
index 77955a94114..67f0c1c3ba9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPageCacheForInsertIntoSelect.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPageCacheForInsertIntoSelect.java
@@ -24,6 +24,7 @@ import org.apache.doris.nereids.analyzer.UnboundTableSink;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
+import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.qe.VariableMgr;
@@ -52,6 +53,12 @@ public class TurnOffPageCacheForInsertIntoSelect extends 
PlanPreprocessor {
         return tableSink;
     }
 
+    @Override
+    public Plan visitLogicalHiveTableSink(LogicalHiveTableSink<? extends Plan> 
tableSink, StatementContext context) {
+        turnOffPageCache(context);
+        return tableSink;
+    }
+
     private void turnOffPageCache(StatementContext context) {
         SessionVariable sessionVariable = 
context.getConnectContext().getSessionVariable();
         // set temporary session value, and then revert value in the 'finally 
block' of StmtExecutor#execute
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
index f2b5091fd37..408b0d7355e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
@@ -64,6 +64,7 @@ import 
org.apache.doris.nereids.rules.implementation.LogicalFileScanToPhysicalFi
 import 
org.apache.doris.nereids.rules.implementation.LogicalFileSinkToPhysicalFileSink;
 import 
org.apache.doris.nereids.rules.implementation.LogicalFilterToPhysicalFilter;
 import 
org.apache.doris.nereids.rules.implementation.LogicalGenerateToPhysicalGenerate;
+import 
org.apache.doris.nereids.rules.implementation.LogicalHiveTableSinkToPhysicalHiveTableSink;
 import 
org.apache.doris.nereids.rules.implementation.LogicalIntersectToPhysicalIntersect;
 import 
org.apache.doris.nereids.rules.implementation.LogicalJdbcScanToPhysicalJdbcScan;
 import org.apache.doris.nereids.rules.implementation.LogicalJoinToHashJoin;
@@ -187,6 +188,7 @@ public class RuleSet {
             .add(new LogicalIntersectToPhysicalIntersect())
             .add(new LogicalGenerateToPhysicalGenerate())
             .add(new LogicalOlapTableSinkToPhysicalOlapTableSink())
+            .add(new LogicalHiveTableSinkToPhysicalHiveTableSink())
             .add(new LogicalFileSinkToPhysicalFileSink())
             .add(new LogicalResultSinkToPhysicalResultSink())
             .add(new 
LogicalDeferMaterializeResultSinkToPhysicalDeferMaterializeResultSink())
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
index 4515aaf55fe..9650a9147b8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
@@ -30,6 +30,7 @@ public enum RuleType {
 
     // **** make sure BINDING_UNBOUND_LOGICAL_PLAN is the lowest priority in 
the rewrite rules. ****
     BINDING_RESULT_SINK(RuleTypeClass.REWRITE),
+    BINDING_INSERT_TARGET_EXTERNAL_TABLE(RuleTypeClass.REWRITE),
     BINDING_INSERT_TARGET_TABLE(RuleTypeClass.REWRITE),
     BINDING_INSERT_FILE(RuleTypeClass.REWRITE),
     BINDING_ONE_ROW_RELATION_SLOT(RuleTypeClass.REWRITE),
@@ -386,6 +387,7 @@ public enum RuleType {
     LOGICAL_ODBC_SCAN_TO_PHYSICAL_ODBC_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
     LOGICAL_ES_SCAN_TO_PHYSICAL_ES_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
     
LOGICAL_OLAP_TABLE_SINK_TO_PHYSICAL_OLAP_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
+    
LOGICAL_HIVE_TABLE_SINK_TO_PHYSICAL_HIVE_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
     
LOGICAL_RESULT_SINK_TO_PHYSICAL_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
     
LOGICAL_DEFER_MATERIALIZE_RESULT_SINK_TO_PHYSICAL_DEFER_MATERIALIZE_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
     LOGICAL_FILE_SINK_TO_PHYSICAL_FILE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
index 5c8a74f00e6..06ad6921def 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
@@ -68,7 +68,7 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 
 /**
- * bind an unbound logicalOlapTableSink represent the target table of an 
insert command
+ * bind an unbound logicalTableSink represent the target table of an insert 
command
  */
 public class BindSink implements AnalysisRuleFactory {
 
@@ -340,6 +340,14 @@ public class BindSink implements AnalysisRuleFactory {
                                         fileSink.child().getOutput().stream()
                                                 
.map(NamedExpression.class::cast)
                                                 
.collect(ImmutableList.toImmutableList())))
+                ),
+                // TODO: bind hive target table
+                RuleType.BINDING_INSERT_TARGET_EXTERNAL_TABLE.build(
+                        logicalHiveTableSink().when(s -> 
s.getOutputExprs().isEmpty())
+                                .then(hiveTableSink -> 
hiveTableSink.withOutputExprs(
+                                        
hiveTableSink.child().getOutput().stream()
+                                                
.map(NamedExpression.class::cast)
+                                                
.collect(ImmutableList.toImmutableList())))
                 )
         );
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalHiveTableSinkToPhysicalHiveTableSink.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalHiveTableSinkToPhysicalHiveTableSink.java
new file mode 100644
index 00000000000..13329a5d55e
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalHiveTableSinkToPhysicalHiveTableSink.java
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.implementation;
+
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
+
+import java.util.Optional;
+
+/**
+ * Implementation rule that converts logical HiveTableSink to physical 
HiveTableSink.
+ */
+public class LogicalHiveTableSinkToPhysicalHiveTableSink extends 
OneImplementationRuleFactory {
+    @Override
+    public Rule build() {
+        return logicalHiveTableSink().thenApply(ctx -> {
+            LogicalHiveTableSink<? extends Plan> sink = ctx.root;
+            return new PhysicalHiveTableSink<>(
+                    sink.getDatabase(),
+                    sink.getTargetTable(),
+                    sink.getCols(),
+                    sink.getPartitionIds(),
+                    sink.getOutputExprs(),
+                    Optional.empty(),
+                    sink.getLogicalProperties(),
+                    null,
+                    null,
+                    sink.child());
+        
}).toRule(RuleType.LOGICAL_HIVE_TABLE_SINK_TO_PHYSICAL_HIVE_TABLE_SINK_RULE);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
index af27885fce3..11a6a7b5683 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
@@ -80,7 +80,7 @@ public enum PlanType {
     LOGICAL_WINDOW,
 
     // physical plans
-    // logical relations
+    // physical relations
     PHYSICAL_CTE_CONSUMER,
     PHYSICAL_EMPTY_RELATION,
     PHYSICAL_ES_SCAN,
@@ -92,12 +92,13 @@ public enum PlanType {
     PHYSICAL_SCHEMA_SCAN,
     PHYSICAL_TVF_RELATION,
 
-    // logical sinks
+    // physical sinks
     PHYSICAL_FILE_SINK,
     PHYSICAL_OLAP_TABLE_SINK,
+    PHYSICAL_HIVE_TABLE_SINK,
     PHYSICAL_RESULT_SINK,
 
-    // logical others
+    // physical others
     PHYSICAL_HASH_AGGREGATE,
     PHYSICAL_ASSERT_NUM_ROWS,
     PHYSICAL_CTE_PRODUCER,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
new file mode 100644
index 00000000000..f8bc8f2db47
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.insert;
+
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.nereids.NereidsPlanner;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
+import org.apache.doris.planner.DataSink;
+import org.apache.doris.planner.HiveTableSink;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.transaction.TransactionState;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Optional;
+
+/**
+ * Insert executor for hive table
+ */
+public class HiveInsertExecutor extends AbstractInsertExecutor {
+    private static final Logger LOG = 
LogManager.getLogger(HiveInsertExecutor.class);
+    private static final long INVALID_TXN_ID = -1L;
+    private long txnId = INVALID_TXN_ID;
+
+    /**
+     * constructor
+     */
+    public HiveInsertExecutor(ConnectContext ctx, HMSExternalTable table,
+                              String labelName, NereidsPlanner planner,
+                              Optional<InsertCommandContext> insertCtx) {
+        super(ctx, table, labelName, planner, insertCtx);
+    }
+
+    public long getTxnId() {
+        return txnId;
+    }
+
+    @Override
+    public void beginTransaction() {
+
+    }
+
+    @Override
+    protected void finalizeSink(PlanFragment fragment, DataSink sink, 
PhysicalSink physicalSink) {
+        HiveTableSink hiveTableSink = (HiveTableSink) sink;
+        // PhysicalHiveTableSink physicalHiveTableSink = 
(PhysicalHiveTableSink) physicalSink;
+        try {
+            hiveTableSink.init();
+            hiveTableSink.complete(new Analyzer(Env.getCurrentEnv(), ctx));
+            TransactionState state = 
Env.getCurrentGlobalTransactionMgr().getTransactionState(database.getId(), 
txnId);
+            if (state == null) {
+                throw new AnalysisException("txn does not exist: " + txnId);
+            }
+        } catch (Exception e) {
+            throw new AnalysisException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    protected void beforeExec() {
+
+    }
+
+    @Override
+    protected void onComplete() throws UserException {
+
+    }
+
+    @Override
+    protected void onFail(Throwable t) {
+
+    }
+
+    @Override
+    protected void afterExec(StmtExecutor executor) {
+
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index 06cd4275682..29d96ae4ad9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -23,6 +23,7 @@ import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.util.ProfileManager.ProfileType;
+import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.load.loadv2.LoadStatistic;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.nereids.NereidsPlanner;
@@ -34,6 +35,7 @@ import org.apache.doris.nereids.trees.plans.PlanType;
 import org.apache.doris.nereids.trees.plans.commands.Command;
 import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
@@ -160,6 +162,9 @@ public class InsertIntoTableCommand extends Command 
implements ForwardWithSync,
                                 : false;
                 insertExecutor.getCoordinator().getQueryOptions()
                         
.setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
+            } else if (physicalSink instanceof PhysicalHiveTableSink) {
+                HMSExternalTable hiveExternalTable = (HMSExternalTable) 
targetTableIf;
+                insertExecutor = new HiveInsertExecutor(ctx, 
hiveExternalTable, label, planner, insertCtx);
             } else {
                 // TODO: support other table types
                 throw new AnalysisException("insert into command only support 
olap table");
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
index ab65b065e35..007fd48ccf7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
@@ -25,6 +25,7 @@ import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.nereids.analyzer.UnboundAlias;
 import org.apache.doris.nereids.analyzer.UnboundOneRowRelation;
 import org.apache.doris.nereids.analyzer.UnboundTableSink;
@@ -252,7 +253,13 @@ public class InsertUtils {
                 }
             }
         }
-
+        if (table instanceof HMSExternalTable) {
+            // TODO: check HMSExternalTable
+            HMSExternalTable hiveTable = (HMSExternalTable) table;
+            if (hiveTable.isView()) {
+                throw new AnalysisException("View is not support in hive 
external table.");
+            }
+        }
         Plan query = unboundTableSink.child();
         if (!(query instanceof LogicalInlineTable)) {
             return plan;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHiveTableSink.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHiveTableSink.java
new file mode 100644
index 00000000000..9d31a39b3e3
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHiveTableSink.java
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.logical;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.datasource.hive.HMSExternalDatabase;
+import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.PropagateFuncDeps;
+import org.apache.doris.nereids.trees.plans.algebra.Sink;
+import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.Utils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * logical hive table sink for insert command
+ */
+public class LogicalHiveTableSink<CHILD_TYPE extends Plan> extends 
LogicalSink<CHILD_TYPE>
+        implements Sink, PropagateFuncDeps {
+    // bound data sink
+    private final HMSExternalDatabase database;
+    private final HMSExternalTable targetTable;
+    private final List<Column> cols;
+    private final List<Long> partitionIds;
+    private final DMLCommandType dmlCommandType;
+
+    /**
+     * constructor
+     */
+    public LogicalHiveTableSink(HMSExternalDatabase database, HMSExternalTable 
targetTable, List<Column> cols,
+                                List<Long> partitionIds, List<NamedExpression> 
outputExprs,
+                                DMLCommandType dmlCommandType, 
Optional<GroupExpression> groupExpression,
+                                Optional<LogicalProperties> logicalProperties, 
CHILD_TYPE child) {
+        super(PlanType.LOGICAL_OLAP_TABLE_SINK, outputExprs, groupExpression, 
logicalProperties, child);
+        this.database = Objects.requireNonNull(database, "database != null in 
LogicalHiveTableSink");
+        this.targetTable = Objects.requireNonNull(targetTable, "targetTable != 
null in LogicalHiveTableSink");
+        this.cols = Utils.copyRequiredList(cols);
+        this.dmlCommandType = dmlCommandType;
+        this.partitionIds = Utils.copyRequiredList(partitionIds);
+    }
+
+    public Plan withChildAndUpdateOutput(Plan child) {
+        List<NamedExpression> output = child.getOutput().stream()
+                .map(NamedExpression.class::cast)
+                .collect(ImmutableList.toImmutableList());
+        return new LogicalHiveTableSink<>(database, targetTable, cols, 
partitionIds, output,
+                dmlCommandType, Optional.empty(), Optional.empty(), child);
+    }
+
+    @Override
+    public Plan withChildren(List<Plan> children) {
+        Preconditions.checkArgument(children.size() == 1, 
"LogicalHiveTableSink only accepts one child");
+        return new LogicalHiveTableSink<>(database, targetTable, cols, 
partitionIds, outputExprs,
+                dmlCommandType, Optional.empty(), Optional.empty(), 
children.get(0));
+    }
+
+    public LogicalHiveTableSink<CHILD_TYPE> 
withOutputExprs(List<NamedExpression> outputExprs) {
+        return new LogicalHiveTableSink<>(database, targetTable, cols, 
partitionIds, outputExprs,
+                dmlCommandType, Optional.empty(), Optional.empty(), child());
+    }
+
+    public HMSExternalDatabase getDatabase() {
+        return database;
+    }
+
+    public HMSExternalTable getTargetTable() {
+        return targetTable;
+    }
+
+    public List<Column> getCols() {
+        return cols;
+    }
+
+    public List<Long> getPartitionIds() {
+        return partitionIds;
+    }
+
+    public DMLCommandType getDmlCommandType() {
+        return dmlCommandType;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        LogicalHiveTableSink<?> that = (LogicalHiveTableSink<?>) o;
+        return dmlCommandType == that.dmlCommandType
+                && Objects.equals(database, that.database)
+                && Objects.equals(targetTable, that.targetTable) && 
Objects.equals(cols, that.cols)
+                && Objects.equals(partitionIds, that.partitionIds);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), database, targetTable, cols, 
partitionIds, dmlCommandType);
+    }
+
+    @Override
+    public String toString() {
+        return Utils.toSqlString("LogicalHiveTableSink[" + id.asInt() + "]",
+                "outputExprs", outputExprs,
+                "database", database.getFullName(),
+                "targetTable", targetTable.getName(),
+                "cols", cols,
+                "partitionIds", partitionIds,
+                "dmlCommandType", dmlCommandType
+        );
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitLogicalHiveTableSink(this, context);
+    }
+
+    @Override
+    public Plan withGroupExpression(Optional<GroupExpression> groupExpression) 
{
+        return new LogicalHiveTableSink<>(database, targetTable, cols, 
partitionIds, outputExprs,
+                dmlCommandType, groupExpression, 
Optional.of(getLogicalProperties()), child());
+    }
+
+    @Override
+    public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> 
groupExpression,
+            Optional<LogicalProperties> logicalProperties, List<Plan> 
children) {
+        return new LogicalHiveTableSink<>(database, targetTable, cols, 
partitionIds, outputExprs,
+                dmlCommandType, groupExpression, logicalProperties, 
children.get(0));
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHiveTableSink.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHiveTableSink.java
new file mode 100644
index 00000000000..eee55e3c281
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHiveTableSink.java
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.physical;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.datasource.hive.HMSExternalDatabase;
+import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.properties.PhysicalProperties;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.algebra.Sink;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.statistics.Statistics;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/** physical hive table sink */
+public class PhysicalHiveTableSink<CHILD_TYPE extends Plan> extends 
PhysicalSink<CHILD_TYPE> implements Sink {
+
+    private final HMSExternalDatabase database;
+    private final HMSExternalTable targetTable;
+    private final List<Column> cols;
+    private final List<Long> partitionIds;
+
+    /**
+     * constructor
+     */
+    public PhysicalHiveTableSink(HMSExternalDatabase database,
+                                 HMSExternalTable targetTable,
+                                 List<Column> cols,
+                                 List<Long> partitionIds,
+                                 List<NamedExpression> outputExprs,
+                                 Optional<GroupExpression> groupExpression,
+                                 LogicalProperties logicalProperties,
+                                 CHILD_TYPE child) {
+        this(database, targetTable, cols, partitionIds, outputExprs, 
groupExpression, logicalProperties,
+                PhysicalProperties.GATHER, null, child);
+    }
+
+    /**
+     * constructor
+     */
+    public PhysicalHiveTableSink(HMSExternalDatabase database,
+                                 HMSExternalTable targetTable,
+                                 List<Column> cols,
+                                 List<Long> partitionIds,
+                                 List<NamedExpression> outputExprs,
+                                 Optional<GroupExpression> groupExpression,
+                                 LogicalProperties logicalProperties,
+                                 PhysicalProperties physicalProperties,
+                                 Statistics statistics,
+                                 CHILD_TYPE child) {
+        super(PlanType.PHYSICAL_HIVE_TABLE_SINK, outputExprs, groupExpression,
+                logicalProperties, physicalProperties, statistics, child);
+        this.database = Objects.requireNonNull(database, "database != null in 
PhysicalHiveTableSink");
+        this.targetTable = Objects.requireNonNull(targetTable, "targetTable != 
null in PhysicalHiveTableSink");
+        this.cols = Utils.copyRequiredList(cols);
+        this.partitionIds = Utils.copyRequiredList(partitionIds);
+    }
+
+    @Override
+    public Plan withChildren(List<Plan> children) {
+        return new PhysicalHiveTableSink<>(database, targetTable, cols, 
partitionIds, outputExprs, groupExpression,
+                getLogicalProperties(), physicalProperties, statistics, 
children.get(0));
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitPhysicalHiveTableSink(this, context);
+    }
+
+    @Override
+    public List<? extends Expression> getExpressions() {
+        return ImmutableList.of();
+    }
+
+    @Override
+    public Plan withGroupExpression(Optional<GroupExpression> groupExpression) 
{
+        return new PhysicalHiveTableSink<>(database, targetTable, cols, 
partitionIds, outputExprs,
+                groupExpression, getLogicalProperties(), child());
+    }
+
+    @Override
+    public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> 
groupExpression,
+                                                 Optional<LogicalProperties> 
logicalProperties, List<Plan> children) {
+        return new PhysicalHiveTableSink<>(database, targetTable, cols, 
partitionIds, outputExprs,
+                groupExpression, logicalProperties.get(), children.get(0));
+    }
+
+    @Override
+    public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties 
physicalProperties, Statistics statistics) {
+        return new PhysicalHiveTableSink<>(database, targetTable, cols, 
partitionIds, outputExprs,
+                groupExpression, getLogicalProperties(), physicalProperties, 
statistics, child());
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java
index fcb0b474e47..b88cd910a36 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java
@@ -22,12 +22,14 @@ import org.apache.doris.nereids.analyzer.UnboundTableSink;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeResultSink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
+import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalTableSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeResultSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalFileSink;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
@@ -74,6 +76,10 @@ public interface SinkVisitor<R, C> {
         return visitLogicalSink(olapTableSink, context);
     }
 
+    default R visitLogicalHiveTableSink(LogicalHiveTableSink<? extends Plan> hiveTableSink, C context) {
+        return visitLogicalSink(hiveTableSink, context);
+    }
+
     default R visitLogicalResultSink(LogicalResultSink<? extends Plan> logicalResultSink, C context) {
         return visitLogicalSink(logicalResultSink, context);
     }
@@ -99,6 +105,10 @@ public interface SinkVisitor<R, C> {
         return visitPhysicalSink(olapTableSink, context);
     }
 
+    default R visitPhysicalHiveTableSink(PhysicalHiveTableSink<? extends Plan> hiveTableSink, C context) {
+        return visitPhysicalSink(hiveTableSink, context);
+    }
+
     default R visitPhysicalResultSink(PhysicalResultSink<? extends Plan> physicalResultSink, C context) {
         return visitPhysicalSink(physicalResultSink, context);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java
index b813093f782..c769bbea782 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java
@@ -22,8 +22,9 @@ package org.apache.doris.planner;
 
 import org.apache.doris.catalog.MysqlTable;
 import org.apache.doris.catalog.OdbcTable;
-import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.datasource.odbc.sink.OdbcTableSink;
 import org.apache.doris.thrift.TDataSink;
 import org.apache.doris.thrift.TExplainLevel;
@@ -62,11 +63,13 @@ public abstract class DataSink {
 
     public abstract DataPartition getOutputPartition();
 
-    public static DataSink createDataSink(Table table) throws AnalysisException {
+    public static DataSink createDataSink(TableIf table) throws AnalysisException {
         if (table instanceof MysqlTable) {
             return new MysqlTableSink((MysqlTable) table);
         } else if (table instanceof OdbcTable) {
             return new OdbcTableSink((OdbcTable) table);
+        } else if (table instanceof HMSExternalTable) {
+            return new HiveTableSink((HMSExternalTable) table);
         } else {
             throw new AnalysisException("Unknown table type " + table.getType());
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
new file mode 100644
index 00000000000..99d0c6b1b03
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/DataSink.java
+// and modified by Doris
+
+package org.apache.doris.planner;
+
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.thrift.TDataSink;
+import org.apache.doris.thrift.TExplainLevel;
+
+public class HiveTableSink extends DataSink {
+
+    protected TDataSink tDataSink;
+
+    public HiveTableSink(HMSExternalTable table) {
+        super();
+    }
+
+    @Override
+    public String getExplainString(String prefix, TExplainLevel explainLevel) {
+        StringBuilder strBuilder = new StringBuilder();
+        strBuilder.append(prefix + "HIVE TABLE SINK\n");
+        if (explainLevel == TExplainLevel.BRIEF) {
+            return strBuilder.toString();
+        }
+        // TODO: explain partitions
+        return strBuilder.toString();
+    }
+
+    @Override
+    protected TDataSink toThrift() {
+        return tDataSink;
+    }
+
+    @Override
+    public PlanNodeId getExchNodeId() {
+        return null;
+    }
+
+    @Override
+    public DataPartition getOutputPartition() {
+        return DataPartition.RANDOM;
+    }
+
+    public void init() {
+    }
+
+    public void complete(Analyzer analyzer) {
+    }
+}
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index 602943b4207..7c9d5e8f8c2 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -277,6 +277,67 @@ struct TOlapTableSink {
     23: optional double max_filter_ratio
 }
 
+struct THiveLocationParams {
+  1: optional string write_path
+  2: optional string target_path
+}
+
+struct TSortedColumn {
+    1: optional string sort_column_name
+    2: optional i32 order // asc(1) or desc(0)
+}
+
+struct TBucketingMode {
+    1: optional i32 bucket_version
+}
+
+struct THiveBucket {
+    1: optional list<string> bucketed_by
+    2: optional TBucketingMode bucket_mode
+    3: optional i32 bucket_count
+    4: optional list<TSortedColumn> sorted_by
+}
+
+enum THiveCompressionType {
+    SNAPPY = 3,
+    LZ4 = 4,
+    ZLIB = 6,
+    ZSTD = 7,
+}
+
+struct THivePartition {
+  1: optional list<string> values
+  2: optional THiveLocationParams location
+  3: optional PlanNodes.TFileFormatType file_format
+}
+
+struct THiveTableSink {
+    1: optional string db_name
+    2: optional string table_name
+    3: optional list<string> data_column_names
+    4: optional list<string> partition_column_names
+    5: optional list<THivePartition> partitions
+    6: optional list<THiveBucket> buckets
+    7: optional PlanNodes.TFileFormatType file_format
+    8: optional THiveCompressionType compression_type
+    9: optional THiveLocationParams location
+}
+
+enum TUpdateMode {
+    NEW = 0, // add partition
+    APPEND = 1, // alter partition
+    OVERWRITE = 2 // insert overwrite
+}
+
+struct THivePartitionUpdate {
+    1: optional string name
+    2: optional TUpdateMode update_mode
+    3: optional THiveLocationParams location
+    4: optional list<string> file_names
+    5: optional i64 row_count
+    6: optional i64 file_size
+}
+
 struct TDataSink {
   1: required TDataSinkType type
   2: optional TDataStreamSink stream_sink
@@ -289,5 +350,5 @@ struct TDataSink {
   10: optional TResultFileSink result_file_sink
   11: optional TJdbcTableSink jdbc_table_sink
   12: optional TMultiCastDataStreamSink multi_cast_stream_sink
+  13: optional THiveTableSink hive_table_sink
 }
-


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to