mymeiyi commented on code in PR #32523:
URL: https://github.com/apache/doris/pull/32523#discussion_r1547062527
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/GroupCommitInsertExecutor.java:
##########
@@ -95,26 +107,39 @@ private static boolean
isGroupCommitAvailablePlan(PhysicalOlapTableSink<? extend
return child instanceof OneRowRelation || (child instanceof
PhysicalUnion && child.arity() == 0);
}
- private void handleGroupCommit(ConnectContext ctx, DataSink sink,
- PhysicalOlapTableSink<?> physicalOlapTableSink)
- throws UserException, RpcException, TException,
ExecutionException, InterruptedException {
+ private static void handleGroupCommit(ConnectContext ctx, DataSink sink,
+ PhysicalOlapTableSink<?> physicalOlapTableSink, NereidsPlanner
planner)
+ throws UserException, TException, RpcException,
ExecutionException, InterruptedException {
// TODO we should refactor this to remove rely on UnionNode
List<InternalService.PDataRow> rows = new ArrayList<>();
- List<List<Expr>> materializedConstExprLists = ((UnionNode)
sink.getFragment()
- .getPlanRoot()).getMaterializedConstExprLists();
- int filterSize = 0;
- for (Slot slot : physicalOlapTableSink.getOutput()) {
- if (slot.getName().contains(Column.DELETE_SIGN)
- || slot.getName().contains(Column.VERSION_COL)) {
- filterSize += 1;
- }
+
+ Optional<PhysicalUnion> union = planner.getPhysicalPlan()
+
.<Set<PhysicalUnion>>collect(PhysicalUnion.class::isInstance).stream().findAny();
+ List<List<NamedExpression>> constantExprsList = null;
+ if (union.isPresent()) {
+ constantExprsList = union.get().getConstantExprsList();
+ }
+ Optional<PhysicalOneRowRelation> oneRowRelation =
planner.getPhysicalPlan()
+
.<Set<PhysicalOneRowRelation>>collect(PhysicalOneRowRelation.class::isInstance).stream().findAny();
+ if (oneRowRelation.isPresent()) {
+ constantExprsList =
ImmutableList.of(oneRowRelation.get().getProjects());
}
- for (List<Expr> list : materializedConstExprLists) {
- rows.add(GroupCommitPlanner.getRowStringValue(list, filterSize));
+ List<String> columnNames =
physicalOlapTableSink.getTargetTable().getFullSchema().stream()
+ .map(Column::getName)
+ .map(n -> n.replace("`", "``"))
+ .map(n -> "`" + n + "`")
+ .collect(Collectors.toList());
+ try {
+ for (List<NamedExpression> row : constantExprsList) {
+ rows.add(InsertUtils.getRowStringValue(row));
+ }
+ } catch (AnalysisException e) {
+ LOG.error("warning when group commit insert. {}", e);
Review Comment:
why not throw the exception?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]