This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new f3a42faf406 [branch-2.1][improvement](jdbc catalog) support jdbc
external catalog insert stmt in nereids (#40902)
f3a42faf406 is described below
commit f3a42faf4064695f6df781a62d62afff9e1ca0fb
Author: zy-kkk <[email protected]>
AuthorDate: Wed Sep 18 14:02:20 2024 +0800
[branch-2.1][improvement](jdbc catalog) support jdbc external catalog
insert stmt in nereids (#40902)
pick (#39813)
---
.../nereids/analyzer/UnboundJdbcTableSink.java | 84 ++++++++++++
.../nereids/analyzer/UnboundTableSinkCreator.java | 18 +--
.../glue/translator/PhysicalPlanTranslator.java | 20 +++
.../pre/TurnOffPageCacheForInsertIntoSelect.java | 8 ++
.../nereids/properties/RequestPropertyDeriver.java | 9 ++
.../org/apache/doris/nereids/rules/RuleSet.java | 2 +
.../org/apache/doris/nereids/rules/RuleType.java | 2 +
.../doris/nereids/rules/analysis/BindSink.java | 77 ++++++++++-
...ogicalJdbcTableSinkToPhysicalJdbcTableSink.java | 48 +++++++
.../apache/doris/nereids/trees/plans/PlanType.java | 3 +
.../insert/BaseExternalTableInsertExecutor.java | 2 +-
.../commands/insert/InsertIntoTableCommand.java | 27 +++-
.../trees/plans/commands/insert/InsertUtils.java | 9 ++
.../commands/insert/JdbcInsertCommandContext.java} | 10 +-
.../plans/commands/insert/JdbcInsertExecutor.java | 113 +++++++++++++++
.../trees/plans/logical/LogicalJdbcTableSink.java | 151 +++++++++++++++++++++
.../physical/PhysicalBaseExternalTableSink.java | 4 +
.../plans/physical/PhysicalJdbcTableSink.java | 109 +++++++++++++++
.../nereids/trees/plans/visitor/SinkVisitor.java | 15 ++
...actionType.java => JdbcTransactionManager.java} | 26 +++-
.../apache/doris/transaction/TransactionType.java | 3 +-
21 files changed, 718 insertions(+), 22 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundJdbcTableSink.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundJdbcTableSink.java
new file mode 100644
index 00000000000..53367cf9c21
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundJdbcTableSink.java
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.analyzer;
+
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Represent an jdbc table sink plan node that has not been bound.
+ */
+public class UnboundJdbcTableSink<CHILD_TYPE extends Plan> extends
UnboundBaseExternalTableSink<CHILD_TYPE> {
+
+ public UnboundJdbcTableSink(List<String> nameParts, List<String> colNames,
List<String> hints,
+ List<String> partitions, CHILD_TYPE child) {
+ this(nameParts, colNames, hints, partitions, DMLCommandType.NONE,
+ Optional.empty(), Optional.empty(), child);
+ }
+
+ /**
+ * constructor
+ */
+ public UnboundJdbcTableSink(List<String> nameParts,
+ List<String> colNames,
+ List<String> hints,
+ List<String> partitions,
+ DMLCommandType dmlCommandType,
+ Optional<GroupExpression> groupExpression,
+ Optional<LogicalProperties> logicalProperties,
+ CHILD_TYPE child) {
+ super(nameParts, PlanType.LOGICAL_UNBOUND_JDBC_TABLE_SINK,
ImmutableList.of(), groupExpression,
+ logicalProperties, colNames, dmlCommandType, child, hints,
partitions);
+ }
+
+ @Override
+ public Plan withChildren(List<Plan> children) {
+ Preconditions.checkArgument(children.size() == 1,
+ "UnboundJdbcTableSink should have exactly one child");
+ return new UnboundJdbcTableSink<>(nameParts, colNames, hints,
partitions,
+ dmlCommandType, Optional.empty(), Optional.empty(),
children.get(0));
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitUnboundJdbcTableSink(this, context);
+ }
+
+ @Override
+ public Plan withGroupExpression(Optional<GroupExpression> groupExpression)
{
+ return new UnboundJdbcTableSink<>(nameParts, colNames, hints,
partitions,
+ dmlCommandType, groupExpression,
Optional.of(getLogicalProperties()), child());
+ }
+
+ @Override
+ public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression>
groupExpression,
+ Optional<LogicalProperties> logicalProperties, List<Plan>
children) {
+ return new UnboundJdbcTableSink<>(nameParts, colNames, hints,
partitions,
+ dmlCommandType, groupExpression, logicalProperties,
children.get(0));
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java
index e1c771b6a4c..8ca58f97757 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java
@@ -23,6 +23,7 @@ import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
+import org.apache.doris.datasource.jdbc.JdbcExternalCatalog;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.exceptions.ParseException;
import org.apache.doris.nereids.trees.plans.Plan;
@@ -78,6 +79,9 @@ public class UnboundTableSinkCreator {
} else if (curCatalog instanceof IcebergExternalCatalog) {
return new UnboundIcebergTableSink<>(nameParts, colNames, hints,
partitions,
dmlCommandType, Optional.empty(), Optional.empty(), plan);
+ } else if (curCatalog instanceof JdbcExternalCatalog) {
+ return new UnboundJdbcTableSink<>(nameParts, colNames, hints,
partitions,
+ dmlCommandType, Optional.empty(), Optional.empty(), plan);
}
throw new RuntimeException("Load data to " +
curCatalog.getClass().getSimpleName() + " is not supported.");
}
@@ -109,16 +113,12 @@ public class UnboundTableSinkCreator {
dmlCommandType, Optional.empty(), Optional.empty(), plan);
} else if (curCatalog instanceof IcebergExternalCatalog &&
!isAutoDetectPartition) {
return new UnboundIcebergTableSink<>(nameParts, colNames, hints,
partitions,
- dmlCommandType, Optional.empty(), Optional.empty(), plan);
- }
- // TODO: we need to support insert into other catalog
- try {
- if (ConnectContext.get() != null) {
-
ConnectContext.get().getSessionVariable().enableFallbackToOriginalPlannerOnce();
- }
- } catch (Exception e) {
- // ignore this.
+ dmlCommandType, Optional.empty(), Optional.empty(), plan);
+ } else if (curCatalog instanceof JdbcExternalCatalog) {
+ return new UnboundJdbcTableSink<>(nameParts, colNames, hints,
partitions,
+ dmlCommandType, Optional.empty(), Optional.empty(), plan);
}
+
throw new AnalysisException(
(isOverwrite ? "insert overwrite" : "insert") + " data to " +
curCatalog.getClass().getSimpleName()
+ " is not supported."
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 78a6a083090..c2de6c0818d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -57,6 +57,7 @@ import org.apache.doris.datasource.hudi.source.HudiScanNode;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.source.IcebergScanNode;
import org.apache.doris.datasource.jdbc.JdbcExternalTable;
+import org.apache.doris.datasource.jdbc.sink.JdbcTableSink;
import org.apache.doris.datasource.jdbc.source.JdbcScanNode;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable;
import org.apache.doris.datasource.maxcompute.source.MaxComputeScanNode;
@@ -125,6 +126,7 @@ import
org.apache.doris.nereids.trees.plans.physical.PhysicalHudiScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIntersect;
import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOdbcScan;
@@ -489,6 +491,24 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
return rootFragment;
}
+ @Override
+ public PlanFragment visitPhysicalJdbcTableSink(PhysicalJdbcTableSink<?
extends Plan> jdbcTableSink,
+ PlanTranslatorContext context) {
+ PlanFragment rootFragment = jdbcTableSink.child().accept(this,
context);
+ rootFragment.setOutputPartition(DataPartition.UNPARTITIONED);
+ List<Column> targetTableColumns = jdbcTableSink.getCols();
+ List<String> insertCols = targetTableColumns.stream()
+ .map(Column::getName)
+ .collect(Collectors.toList());
+
+ JdbcTableSink sink = new JdbcTableSink(
+ ((JdbcExternalTable)
jdbcTableSink.getTargetTable()).getJdbcTable(),
+ insertCols
+ );
+ rootFragment.setSink(sink);
+ return rootFragment;
+ }
+
@Override
public PlanFragment visitPhysicalFileSink(PhysicalFileSink<? extends Plan>
fileSink,
PlanTranslatorContext context) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPageCacheForInsertIntoSelect.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPageCacheForInsertIntoSelect.java
index ab817c2f1d7..2479af68fbe 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPageCacheForInsertIntoSelect.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPageCacheForInsertIntoSelect.java
@@ -26,6 +26,7 @@ import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalIcebergTableSink;
+import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.VariableMgr;
@@ -67,6 +68,13 @@ public class TurnOffPageCacheForInsertIntoSelect extends
PlanPreprocessor {
return tableSink;
}
+ @Override
+ public Plan visitLogicalJdbcTableSink(
+ LogicalJdbcTableSink<? extends Plan> tableSink, StatementContext
context) {
+ turnOffPageCache(context);
+ return tableSink;
+ }
+
private void turnOffPageCache(StatementContext context) {
SessionVariable sessionVariable =
context.getConnectContext().getSessionVariable();
// set temporary session value, and then revert value in the 'finally
block' of StmtExecutor#execute
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
index 750707c52c4..ee3d8ee3124 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
@@ -40,6 +40,7 @@ import
org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
@@ -152,6 +153,14 @@ public class RequestPropertyDeriver extends
PlanVisitor<Void, PlanContext> {
return null;
}
+ @Override
+ public Void visitPhysicalJdbcTableSink(
+ PhysicalJdbcTableSink<? extends Plan> jdbcTableSink, PlanContext
context) {
+ // Always use gather properties for jdbcTableSink
+ addRequestPropertyToChildren(PhysicalProperties.GATHER);
+ return null;
+ }
+
@Override
public Void visitPhysicalResultSink(PhysicalResultSink<? extends Plan>
physicalResultSink, PlanContext context) {
addRequestPropertyToChildren(PhysicalProperties.GATHER);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
index 26920764d89..26dfa1bfcb7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
@@ -76,6 +76,7 @@ import
org.apache.doris.nereids.rules.implementation.LogicalHudiScanToPhysicalHu
import
org.apache.doris.nereids.rules.implementation.LogicalIcebergTableSinkToPhysicalIcebergTableSink;
import
org.apache.doris.nereids.rules.implementation.LogicalIntersectToPhysicalIntersect;
import
org.apache.doris.nereids.rules.implementation.LogicalJdbcScanToPhysicalJdbcScan;
+import
org.apache.doris.nereids.rules.implementation.LogicalJdbcTableSinkToPhysicalJdbcTableSink;
import org.apache.doris.nereids.rules.implementation.LogicalJoinToHashJoin;
import
org.apache.doris.nereids.rules.implementation.LogicalJoinToNestedLoopJoin;
import
org.apache.doris.nereids.rules.implementation.LogicalLimitToPhysicalLimit;
@@ -202,6 +203,7 @@ public class RuleSet {
.add(new LogicalOlapTableSinkToPhysicalOlapTableSink())
.add(new LogicalHiveTableSinkToPhysicalHiveTableSink())
.add(new LogicalIcebergTableSinkToPhysicalIcebergTableSink())
+ .add(new LogicalJdbcTableSinkToPhysicalJdbcTableSink())
.add(new LogicalFileSinkToPhysicalFileSink())
.add(new LogicalResultSinkToPhysicalResultSink())
.add(new
LogicalDeferMaterializeResultSinkToPhysicalDeferMaterializeResultSink())
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
index 082ee72fbed..b2d84679f23 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
@@ -32,6 +32,7 @@ public enum RuleType {
BINDING_RESULT_SINK(RuleTypeClass.REWRITE),
BINDING_INSERT_HIVE_TABLE(RuleTypeClass.REWRITE),
BINDING_INSERT_ICEBERG_TABLE(RuleTypeClass.REWRITE),
+ BINDING_INSERT_JDBC_TABLE(RuleTypeClass.REWRITE),
BINDING_INSERT_TARGET_TABLE(RuleTypeClass.REWRITE),
INIT_MATERIALIZATION_HOOK_FOR_FILE_SINK(RuleTypeClass.REWRITE),
INIT_MATERIALIZATION_HOOK_FOR_TABLE_SINK(RuleTypeClass.REWRITE),
@@ -428,6 +429,7 @@ public enum RuleType {
LOGICAL_OLAP_TABLE_SINK_TO_PHYSICAL_OLAP_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_HIVE_TABLE_SINK_TO_PHYSICAL_HIVE_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_ICEBERG_TABLE_SINK_TO_PHYSICAL_ICEBERG_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
+
LOGICAL_JDBC_TABLE_SINK_TO_PHYSICAL_JDBC_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_RESULT_SINK_TO_PHYSICAL_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_DEFER_MATERIALIZE_RESULT_SINK_TO_PHYSICAL_DEFER_MATERIALIZE_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_FILE_SINK_TO_PHYSICAL_FILE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
index 793ed5cc8f8..4a9660a5144 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
@@ -31,10 +31,13 @@ import org.apache.doris.datasource.hive.HMSExternalDatabase;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalDatabase;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
+import org.apache.doris.datasource.jdbc.JdbcExternalDatabase;
+import org.apache.doris.datasource.jdbc.JdbcExternalTable;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.analyzer.Scope;
import org.apache.doris.nereids.analyzer.UnboundHiveTableSink;
import org.apache.doris.nereids.analyzer.UnboundIcebergTableSink;
+import org.apache.doris.nereids.analyzer.UnboundJdbcTableSink;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
@@ -57,6 +60,7 @@ import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalIcebergTableSink;
+import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
@@ -108,7 +112,8 @@ public class BindSink implements AnalysisRuleFactory {
// TODO: bind hive taget table
RuleType.BINDING_INSERT_HIVE_TABLE.build(unboundHiveTableSink().thenApply(this::bindHiveTableSink)),
RuleType.BINDING_INSERT_ICEBERG_TABLE.build(
-
unboundIcebergTableSink().thenApply(this::bindIcebergTableSink))
+
unboundIcebergTableSink().thenApply(this::bindIcebergTableSink)),
+
RuleType.BINDING_INSERT_JDBC_TABLE.build(unboundJdbcTableSink().thenApply(this::bindJdbcTableSink))
);
}
@@ -502,6 +507,64 @@ public class BindSink implements AnalysisRuleFactory {
return boundSink.withChildAndUpdateOutput(fullOutputProject);
}
+ private Plan bindJdbcTableSink(MatchingContext<UnboundJdbcTableSink<Plan>>
ctx) {
+ UnboundJdbcTableSink<?> sink = ctx.root;
+ Pair<JdbcExternalDatabase, JdbcExternalTable> pair =
bind(ctx.cascadesContext, sink);
+ JdbcExternalDatabase database = pair.first;
+ JdbcExternalTable table = pair.second;
+ LogicalPlan child = ((LogicalPlan) sink.child());
+
+ List<Column> bindColumns;
+ if (sink.getColNames().isEmpty()) {
+ bindColumns =
table.getBaseSchema(true).stream().collect(ImmutableList.toImmutableList());
+ } else {
+ bindColumns = sink.getColNames().stream().map(cn -> {
+ Column column = table.getColumn(cn);
+ if (column == null) {
+ throw new AnalysisException(String.format("column %s is
not found in table %s",
+ cn, table.getName()));
+ }
+ return column;
+ }).collect(ImmutableList.toImmutableList());
+ }
+ LogicalJdbcTableSink<?> boundSink = new LogicalJdbcTableSink<>(
+ database,
+ table,
+ bindColumns,
+ child.getOutput().stream()
+ .map(NamedExpression.class::cast)
+ .collect(ImmutableList.toImmutableList()),
+ sink.getDMLCommandType(),
+ Optional.empty(),
+ Optional.empty(),
+ child);
+ // we need to insert all the columns of the target table
+ if (boundSink.getCols().size() != child.getOutput().size()) {
+ throw new AnalysisException("insert into cols should be
corresponding to the query output");
+ }
+ Map<String, NamedExpression> columnToOutput =
getJdbcColumnToOutput(bindColumns, child);
+ // We don't need to insert unmentioned columns, only user specified
columns
+ LogicalProject<?> outputProject =
getOutputProjectByCoercion(bindColumns, child, columnToOutput);
+ return boundSink.withChildAndUpdateOutput(outputProject);
+ }
+
+ private static Map<String, NamedExpression> getJdbcColumnToOutput(
+ List<Column> bindColumns, LogicalPlan child) {
+ Map<String, NamedExpression> columnToOutput =
Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+
+ for (int i = 0; i < bindColumns.size(); i++) {
+ Column column = bindColumns.get(i);
+ NamedExpression outputExpr = child.getOutput().get(i);
+ Alias output = new Alias(
+ TypeCoercionUtils.castIfNotSameType(outputExpr,
DataType.fromCatalogType(column.getType())),
+ column.getName()
+ );
+ columnToOutput.put(column.getName(), output);
+ }
+
+ return columnToOutput;
+ }
+
private Pair<Database, OlapTable> bind(CascadesContext cascadesContext,
UnboundTableSink<? extends Plan> sink) {
List<String> tableQualifier =
RelationUtil.getQualifierName(cascadesContext.getConnectContext(),
sink.getNameParts());
@@ -545,6 +608,18 @@ public class BindSink implements AnalysisRuleFactory {
throw new AnalysisException("the target table of insert into is not an
iceberg table");
}
+ private Pair<JdbcExternalDatabase, JdbcExternalTable> bind(CascadesContext
cascadesContext,
+ UnboundJdbcTableSink<? extends Plan> sink) {
+ List<String> tableQualifier =
RelationUtil.getQualifierName(cascadesContext.getConnectContext(),
+ sink.getNameParts());
+ Pair<DatabaseIf<?>, TableIf> pair =
RelationUtil.getDbAndTable(tableQualifier,
+ cascadesContext.getConnectContext().getEnv());
+ if (pair.second instanceof JdbcExternalTable) {
+ return Pair.of(((JdbcExternalDatabase) pair.first),
(JdbcExternalTable) pair.second);
+ }
+ throw new AnalysisException("the target table of insert into is not an
jdbc table");
+ }
+
private List<Long> bindPartitionIds(OlapTable table, List<String>
partitions, boolean temp) {
return partitions.isEmpty()
? ImmutableList.of()
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalJdbcTableSinkToPhysicalJdbcTableSink.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalJdbcTableSinkToPhysicalJdbcTableSink.java
new file mode 100644
index 00000000000..960350c6117
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalJdbcTableSinkToPhysicalJdbcTableSink.java
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.implementation;
+
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcTableSink;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink;
+
+import java.util.Optional;
+
+/**
+ * Implementation rule that convert logical JdbcTableSink to physical
JdbcTableSink.
+ */
+public class LogicalJdbcTableSinkToPhysicalJdbcTableSink extends
OneImplementationRuleFactory {
+ @Override
+ public Rule build() {
+ return logicalJdbcTableSink().thenApply(ctx -> {
+ LogicalJdbcTableSink<? extends Plan> sink = ctx.root;
+ return new PhysicalJdbcTableSink<>(
+ sink.getDatabase(),
+ sink.getTargetTable(),
+ sink.getCols(),
+ sink.getOutputExprs(),
+ Optional.empty(),
+ sink.getLogicalProperties(),
+ null,
+ null,
+ sink.child());
+
}).toRule(RuleType.LOGICAL_JDBC_TABLE_SINK_TO_PHYSICAL_JDBC_TABLE_SINK_RULE);
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
index c665e4751d3..3ff217f39ef 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
@@ -49,9 +49,11 @@ public enum PlanType {
LOGICAL_OLAP_TABLE_SINK,
LOGICAL_HIVE_TABLE_SINK,
LOGICAL_ICEBERG_TABLE_SINK,
+ LOGICAL_JDBC_TABLE_SINK,
LOGICAL_RESULT_SINK,
LOGICAL_UNBOUND_OLAP_TABLE_SINK,
LOGICAL_UNBOUND_HIVE_TABLE_SINK,
+ LOGICAL_UNBOUND_JDBC_TABLE_SINK,
LOGICAL_UNBOUND_RESULT_SINK,
// logical others
@@ -103,6 +105,7 @@ public enum PlanType {
PHYSICAL_OLAP_TABLE_SINK,
PHYSICAL_HIVE_TABLE_SINK,
PHYSICAL_ICEBERG_TABLE_SINK,
+ PHYSICAL_JDBC_TABLE_SINK,
PHYSICAL_RESULT_SINK,
// physical others
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
index 1c22b9bf56a..ef14eb914a7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
@@ -46,8 +46,8 @@ import java.util.Optional;
* Insert executor for base external table
*/
public abstract class BaseExternalTableInsertExecutor extends
AbstractInsertExecutor {
+ protected static final long INVALID_TXN_ID = -1L;
private static final Logger LOG =
LogManager.getLogger(BaseExternalTableInsertExecutor.class);
- private static final long INVALID_TXN_ID = -1L;
protected long txnId = INVALID_TXN_ID;
protected TransactionStatus txnStatus = TransactionStatus.ABORTED;
protected final TransactionManager transactionManager;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index 4e89a004bd2..e5a4eac5e39 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -17,6 +17,7 @@
package org.apache.doris.nereids.trees.plans.commands.insert;
+import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
@@ -25,12 +26,14 @@ import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.util.ProfileManager.ProfileType;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
+import org.apache.doris.datasource.jdbc.JdbcExternalTable;
import org.apache.doris.load.loadv2.LoadStatistic;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
+import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.Explainable;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
@@ -40,8 +43,11 @@ import
org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.planner.DataSink;
import org.apache.doris.qe.ConnectContext;
@@ -52,6 +58,7 @@ import com.google.common.base.Preconditions;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.List;
import java.util.Objects;
import java.util.Optional;
@@ -192,9 +199,27 @@ public class InsertIntoTableCommand extends Command
implements ForwardWithSync,
IcebergExternalTable icebergExternalTable =
(IcebergExternalTable) targetTableIf;
insertExecutor = new IcebergInsertExecutor(ctx,
icebergExternalTable, label, planner,
Optional.of(insertCtx.orElse((new
BaseExternalTableInsertCommandContext()))), emptyInsert);
+ } else if (physicalSink instanceof PhysicalJdbcTableSink) {
+ boolean emptyInsert = childIsEmptyRelation(physicalSink);
+ List<Column> cols = ((PhysicalJdbcTableSink<?>)
physicalSink).getCols();
+ List<Slot> slots = ((PhysicalJdbcTableSink<?>)
physicalSink).getOutput();
+ if (physicalSink.children().size() == 1) {
+ if (physicalSink.child(0) instanceof PhysicalOneRowRelation
+ || physicalSink.child(0) instanceof PhysicalUnion)
{
+ for (int i = 0; i < cols.size(); i++) {
+ if (!(cols.get(i).isAllowNull()) &&
slots.get(i).nullable()) {
+ throw new AnalysisException("Column `" +
cols.get(i).getName()
+ + "` is not nullable, but the inserted
value is nullable.");
+ }
+ }
+ }
+ }
+ JdbcExternalTable jdbcExternalTable = (JdbcExternalTable)
targetTableIf;
+ insertExecutor = new JdbcInsertExecutor(ctx,
jdbcExternalTable, label, planner,
+ Optional.of(insertCtx.orElse((new
JdbcInsertCommandContext()))), emptyInsert);
} else {
// TODO: support other table types
- throw new AnalysisException("insert into command only support
[olap, hive, iceberg] table");
+ throw new AnalysisException("insert into command only support
[olap, hive, iceberg, jdbc] table");
}
if (!insertExecutor.isEmptyInsert()) {
insertExecutor.beginTransaction();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
index 48ea98ff9de..dc1fefdbff4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
@@ -27,9 +27,11 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Config;
import org.apache.doris.common.FormatOptions;
import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.datasource.jdbc.JdbcExternalTable;
import org.apache.doris.nereids.analyzer.UnboundAlias;
import org.apache.doris.nereids.analyzer.UnboundHiveTableSink;
import org.apache.doris.nereids.analyzer.UnboundIcebergTableSink;
+import org.apache.doris.nereids.analyzer.UnboundJdbcTableSink;
import org.apache.doris.nereids.analyzer.UnboundOneRowRelation;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
@@ -260,6 +262,11 @@ public class InsertUtils {
throw new AnalysisException("View is not support in hive
external table.");
}
}
+ if (table instanceof JdbcExternalTable) {
+ // todo:
+ // For JDBC External Table, we always allow certain columns to be
missing during insertion
+ // Specific check for non-nullable columns only if insertion is
direct VALUES or SELECT constants
+ }
if (table instanceof OlapTable && ((OlapTable) table).getKeysType() ==
KeysType.UNIQUE_KEYS) {
if (unboundLogicalSink instanceof UnboundTableSink
&& ((UnboundTableSink<? extends Plan>)
unboundLogicalSink).isPartialUpdate()) {
@@ -383,6 +390,8 @@ public class InsertUtils {
unboundTableSink = (UnboundHiveTableSink<? extends Plan>) plan;
} else if (plan instanceof UnboundIcebergTableSink) {
unboundTableSink = (UnboundIcebergTableSink<? extends Plan>) plan;
+ } else if (plan instanceof UnboundJdbcTableSink) {
+ unboundTableSink = (UnboundJdbcTableSink<? extends Plan>) plan;
} else {
throw new AnalysisException("the root of plan should be"
+ " [UnboundTableSink, UnboundHiveTableSink,
UnboundIcebergTableSink],"
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertCommandContext.java
similarity index 81%
copy from
fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java
copy to
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertCommandContext.java
index 2372c199738..71df7e417e6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertCommandContext.java
@@ -15,10 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.transaction;
+package org.apache.doris.nereids.trees.plans.commands.insert;
-public enum TransactionType {
- UNKNOWN,
- HMS,
- ICEBERG
+/**
+ * For iceberg External Table
+ */
+public class JdbcInsertCommandContext extends
BaseExternalTableInsertCommandContext {
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertExecutor.java
new file mode 100644
index 00000000000..928b17edf38
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertExecutor.java
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.insert;
+
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.profile.SummaryProfile;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.datasource.jdbc.JdbcExternalTable;
+import org.apache.doris.nereids.NereidsPlanner;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
+import org.apache.doris.planner.DataSink;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryState;
+import org.apache.doris.transaction.TransactionStatus;
+import org.apache.doris.transaction.TransactionType;
+
+import com.google.common.base.Strings;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Optional;
+
+/**
+ * Insert executor for jdbc table
+ */
+public class JdbcInsertExecutor extends BaseExternalTableInsertExecutor {
+ private static final Logger LOG =
LogManager.getLogger(JdbcInsertExecutor.class);
+
+ /**
+ * constructor
+ */
+ public JdbcInsertExecutor(ConnectContext ctx, JdbcExternalTable table,
+ String labelName, NereidsPlanner planner,
+ Optional<InsertCommandContext> insertCtx,
+ boolean emptyInsert) {
+ super(ctx, table, labelName, planner, insertCtx, emptyInsert);
+ }
+
+ @Override
+ public void beginTransaction() {
+ // do nothing
+ }
+
+ @Override
+ protected void onComplete() throws UserException {
+ if (ctx.getState().getStateType() == QueryState.MysqlStateType.ERR) {
+ LOG.warn("errors when abort txn. {}", ctx.getQueryIdentifier());
+ } else {
+ summaryProfile.ifPresent(profile ->
profile.setTransactionBeginTime(transactionType()));
+ summaryProfile.ifPresent(SummaryProfile::setTransactionEndTime);
+ txnStatus = TransactionStatus.COMMITTED;
+ }
+ }
+
+ @Override
+ protected void onFail(Throwable t) {
+ errMsg = t.getMessage() == null ? "unknown reason" : t.getMessage();
+ String queryId = DebugUtil.printId(ctx.queryId());
+ // if any throwable being thrown during insert operation, first we
should abort this txn
+ LOG.warn("insert [{}] with query id {} failed", labelName, queryId, t);
+ StringBuilder sb = new StringBuilder(t.getMessage());
+ if (txnId != INVALID_TXN_ID) {
+ LOG.warn("insert [{}] with query id {} abort txn {} failed",
labelName, queryId, txnId);
+ if (!Strings.isNullOrEmpty(coordinator.getTrackingUrl())) {
+ sb.append(". url: ").append(coordinator.getTrackingUrl());
+ }
+ }
+ ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, t.getMessage());
+ }
+
+ @Override
+ protected void finalizeSink(PlanFragment fragment, DataSink sink,
PhysicalSink physicalSink) {
+ // do nothing
+ }
+
+ @Override
+ protected void setCollectCommitInfoFunc() {
+ // do nothing
+ }
+
+ @Override
+ protected void doBeforeCommit() throws UserException {
+ // do nothing
+ }
+
+ @Override
+ protected TransactionType transactionType() {
+ return TransactionType.JDBC;
+ }
+
+ @Override
+ protected void beforeExec() {
+ String queryId = DebugUtil.printId(ctx.queryId());
+ LOG.info("start insert [{}] with query id {} and txn id {}",
labelName, queryId, txnId);
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalJdbcTableSink.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalJdbcTableSink.java
new file mode 100644
index 00000000000..b4027383916
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalJdbcTableSink.java
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.logical;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.datasource.jdbc.JdbcExternalDatabase;
+import org.apache.doris.datasource.jdbc.JdbcExternalTable;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.PropagateFuncDeps;
+import org.apache.doris.nereids.trees.plans.algebra.Sink;
+import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.Utils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * logical jdbc table sink for insert command
+ */
+public class LogicalJdbcTableSink<CHILD_TYPE extends Plan> extends
LogicalTableSink<CHILD_TYPE>
+ implements Sink, PropagateFuncDeps {
+ // bound data sink
+ private final JdbcExternalDatabase database;
+ private final JdbcExternalTable targetTable;
+ private final DMLCommandType dmlCommandType;
+
+ /**
+ * constructor
+ */
+ public LogicalJdbcTableSink(JdbcExternalDatabase database,
+ JdbcExternalTable targetTable,
+ List<Column> cols,
+ List<NamedExpression> outputExprs,
+ DMLCommandType dmlCommandType,
+ Optional<GroupExpression> groupExpression,
+ Optional<LogicalProperties> logicalProperties,
+ CHILD_TYPE child) {
+ super(PlanType.LOGICAL_JDBC_TABLE_SINK, outputExprs, groupExpression,
logicalProperties, cols, child);
+ this.database = Objects.requireNonNull(database, "database != null in
LogicalJdbcTableSink");
+ this.targetTable = Objects.requireNonNull(targetTable, "targetTable !=
null in LogicalJdbcTableSink");
+ this.dmlCommandType = dmlCommandType;
+ }
+
+ public Plan withChildAndUpdateOutput(Plan child) {
+ List<NamedExpression> output = child.getOutput().stream()
+ .map(NamedExpression.class::cast)
+ .collect(ImmutableList.toImmutableList());
+ return new LogicalJdbcTableSink<>(database, targetTable, cols, output,
+ dmlCommandType, Optional.empty(), Optional.empty(), child);
+ }
+
+ @Override
+ public Plan withChildren(List<Plan> children) {
+ Preconditions.checkArgument(children.size() == 1,
"LogicalJdbcTableSink only accepts one child");
+ return new LogicalJdbcTableSink<>(database, targetTable, cols,
outputExprs,
+ dmlCommandType, Optional.empty(), Optional.empty(),
children.get(0));
+ }
+
+ @Override
+ public LogicalSink<CHILD_TYPE> withOutputExprs(List<NamedExpression>
outputExprs) {
+ return new LogicalJdbcTableSink<>(database, targetTable, cols,
outputExprs,
+ dmlCommandType, Optional.empty(), Optional.empty(), child());
+ }
+
+ public JdbcExternalDatabase getDatabase() {
+ return database;
+ }
+
+ public JdbcExternalTable getTargetTable() {
+ return targetTable;
+ }
+
+ public DMLCommandType getDmlCommandType() {
+ return dmlCommandType;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof LogicalJdbcTableSink)) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ LogicalJdbcTableSink<?> that = (LogicalJdbcTableSink<?>) o;
+ return dmlCommandType == that.dmlCommandType
+ && Objects.equals(database, that.database)
+ && Objects.equals(targetTable, that.targetTable) &&
Objects.equals(cols, that.cols);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), database, targetTable,
dmlCommandType);
+ }
+
+ @Override
+ public String toString() {
+ return Utils.toSqlString("LogicalJdbcTableSink[" + id.asInt() + "]",
+ "outputExprs", outputExprs,
+ "database", database.getFullName(),
+ "targetTable", targetTable.getName(),
+ "cols", cols,
+ "dmlCommandType", dmlCommandType
+ );
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitLogicalJdbcTableSink(this, context);
+ }
+
+ @Override
+ public Plan withGroupExpression(Optional<GroupExpression> groupExpression)
{
+ return new LogicalJdbcTableSink<>(database, targetTable, cols,
outputExprs,
+ dmlCommandType, groupExpression,
Optional.of(getLogicalProperties()), child());
+ }
+
+ @Override
+ public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression>
groupExpression,
+ Optional<LogicalProperties> logicalProperties, List<Plan>
children) {
+ return new LogicalJdbcTableSink<>(database, targetTable, cols,
outputExprs,
+ dmlCommandType, groupExpression, logicalProperties,
children.get(0));
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBaseExternalTableSink.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBaseExternalTableSink.java
index 82483c63a40..7c99886f06d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBaseExternalTableSink.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBaseExternalTableSink.java
@@ -71,6 +71,10 @@ public abstract class
PhysicalBaseExternalTableSink<CHILD_TYPE extends Plan> ext
return targetTable;
}
+ public List<Column> getCols() {
+ return cols;
+ }
+
@Override
public List<? extends Expression> getExpressions() {
return ImmutableList.of();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalJdbcTableSink.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalJdbcTableSink.java
new file mode 100644
index 00000000000..2b0f12c1dea
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalJdbcTableSink.java
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.physical;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.datasource.jdbc.JdbcExternalDatabase;
+import org.apache.doris.datasource.jdbc.JdbcExternalTable;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.properties.PhysicalProperties;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.statistics.Statistics;
+
+import java.util.List;
+import java.util.Optional;
+
+/** physical jdbc sink */
+public class PhysicalJdbcTableSink<CHILD_TYPE extends Plan> extends
PhysicalBaseExternalTableSink<CHILD_TYPE> {
+
+ /**
+ * constructor
+ */
+ public PhysicalJdbcTableSink(JdbcExternalDatabase database,
+ JdbcExternalTable targetTable,
+ List<Column> cols,
+ List<NamedExpression> outputExprs,
+ Optional<GroupExpression> groupExpression,
+ LogicalProperties logicalProperties,
+ CHILD_TYPE child) {
+ this(database, targetTable, cols, outputExprs, groupExpression,
logicalProperties,
+ PhysicalProperties.GATHER, null, child);
+ }
+
+ /**
+ * constructor
+ */
+ public PhysicalJdbcTableSink(JdbcExternalDatabase database,
+ JdbcExternalTable targetTable,
+ List<Column> cols,
+ List<NamedExpression> outputExprs,
+ Optional<GroupExpression> groupExpression,
+ LogicalProperties logicalProperties,
+ PhysicalProperties physicalProperties,
+ Statistics statistics,
+ CHILD_TYPE child) {
+ super(PlanType.PHYSICAL_JDBC_TABLE_SINK, database, targetTable, cols,
outputExprs, groupExpression,
+ logicalProperties, physicalProperties, statistics, child);
+ }
+
+ @Override
+ public Plan withChildren(List<Plan> children) {
+ return new PhysicalJdbcTableSink<>(
+ (JdbcExternalDatabase) database, (JdbcExternalTable)
targetTable,
+ cols, outputExprs, groupExpression,
+ getLogicalProperties(), physicalProperties, statistics,
children.get(0));
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitPhysicalJdbcTableSink(this, context);
+ }
+
+ @Override
+ public Plan withGroupExpression(Optional<GroupExpression> groupExpression)
{
+ return new PhysicalJdbcTableSink<>(
+ (JdbcExternalDatabase) database, (JdbcExternalTable)
targetTable, cols, outputExprs,
+ groupExpression, getLogicalProperties(), child());
+ }
+
+ @Override
+ public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression>
groupExpression,
+ Optional<LogicalProperties> logicalProperties, List<Plan>
children) {
+ return new PhysicalJdbcTableSink<>(
+ (JdbcExternalDatabase) database, (JdbcExternalTable)
targetTable, cols, outputExprs,
+ groupExpression, logicalProperties.get(), children.get(0));
+ }
+
+ @Override
+ public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties
physicalProperties, Statistics statistics) {
+ return new PhysicalJdbcTableSink<>(
+ (JdbcExternalDatabase) database, (JdbcExternalTable)
targetTable, cols, outputExprs,
+ groupExpression, getLogicalProperties(), physicalProperties,
statistics, child());
+ }
+
+ @Override
+ public PhysicalProperties getRequirePhysicalProperties() {
+ // Since JDBC tables do not have partitioning, return a default
physical property.
+ // GATHER implies that all data is gathered to a single location,
which is a common requirement for JDBC sinks.
+ return PhysicalProperties.GATHER;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java
index e0b8a1dddc1..289687476b2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java
@@ -19,6 +19,7 @@ package org.apache.doris.nereids.trees.plans.visitor;
import org.apache.doris.nereids.analyzer.UnboundHiveTableSink;
import org.apache.doris.nereids.analyzer.UnboundIcebergTableSink;
+import org.apache.doris.nereids.analyzer.UnboundJdbcTableSink;
import org.apache.doris.nereids.analyzer.UnboundResultSink;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.trees.plans.Plan;
@@ -26,6 +27,7 @@ import
org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeResul
import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalIcebergTableSink;
+import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
@@ -34,6 +36,7 @@ import
org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeRes
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
@@ -68,6 +71,10 @@ public interface SinkVisitor<R, C> {
return visitLogicalSink(unboundTableSink, context);
}
+ default R visitUnboundJdbcTableSink(UnboundJdbcTableSink<? extends Plan>
unboundTableSink, C context) {
+ return visitLogicalSink(unboundTableSink, context);
+ }
+
default R visitUnboundResultSink(UnboundResultSink<? extends Plan>
unboundResultSink, C context) {
return visitLogicalSink(unboundResultSink, context);
}
@@ -96,6 +103,10 @@ public interface SinkVisitor<R, C> {
return visitLogicalTableSink(icebergTableSink, context);
}
+ default R visitLogicalJdbcTableSink(LogicalJdbcTableSink<? extends Plan>
jdbcTableSink, C context) {
+ return visitLogicalTableSink(jdbcTableSink, context);
+ }
+
default R visitLogicalResultSink(LogicalResultSink<? extends Plan>
logicalResultSink, C context) {
return visitLogicalSink(logicalResultSink, context);
}
@@ -129,6 +140,10 @@ public interface SinkVisitor<R, C> {
return visitPhysicalTableSink(icebergTableSink, context);
}
+ default R visitPhysicalJdbcTableSink(PhysicalJdbcTableSink<? extends Plan>
jdbcTableSink, C context) {
+ return visitPhysicalTableSink(jdbcTableSink, context);
+ }
+
default R visitPhysicalResultSink(PhysicalResultSink<? extends Plan>
physicalResultSink, C context) {
return visitPhysicalSink(physicalResultSink, context);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/JdbcTransactionManager.java
similarity index 67%
copy from
fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java
copy to
fe/fe-core/src/main/java/org/apache/doris/transaction/JdbcTransactionManager.java
index 2372c199738..a0a1cc28803 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/JdbcTransactionManager.java
@@ -17,8 +17,26 @@
package org.apache.doris.transaction;
-public enum TransactionType {
- UNKNOWN,
- HMS,
- ICEBERG
+import org.apache.doris.common.UserException;
+
+public class JdbcTransactionManager implements TransactionManager {
+ @Override
+ public long begin() {
+ return 0;
+ }
+
+ @Override
+ public void commit(long id) throws UserException {
+
+ }
+
+ @Override
+ public void rollback(long id) {
+
+ }
+
+ @Override
+ public Transaction getTransaction(long id) {
+ return null;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java
index 2372c199738..c83f6188890 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java
@@ -20,5 +20,6 @@ package org.apache.doris.transaction;
public enum TransactionType {
UNKNOWN,
HMS,
- ICEBERG
+ ICEBERG,
+ JDBC
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]