This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8aa5899484 [fix](load) add scan tuple for stream load scan node only
when vectorization is enabled (#12578)
8aa5899484 is described below
commit 8aa58994845c92ecd4d9fe0a8b82fbb48cc05666
Author: Mingyu Chen <[email protected]>
AuthorDate: Thu Sep 15 08:44:39 2022 +0800
[fix](load) add scan tuple for stream load scan node only when
vectorization is enabled (#12578)
---
.../doris/load/loadv2/LoadingTaskPlanner.java | 6 ++--
.../apache/doris/planner/StreamLoadPlanner.java | 37 ++++++++++++----------
2 files changed, 24 insertions(+), 19 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
index 535ec50eed..06491c7991 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
@@ -146,12 +146,12 @@ public class LoadingTaskPlanner {
// 1. Broker scan node
ScanNode scanNode;
if (Config.enable_new_load_scan_node) {
- scanNode = new ExternalFileScanNode(new PlanNodeId(nextNodeId++),
destTupleDesc, "FileScanNode");
+ scanNode = new ExternalFileScanNode(new PlanNodeId(nextNodeId++),
scanTupleDesc, "FileScanNode");
((ExternalFileScanNode) scanNode).setLoadInfo(loadJobId, txnId,
table, brokerDesc, fileGroups,
fileStatusesList, filesAdded, strictMode, loadParallelism,
userInfo);
} else {
- scanNode = new BrokerScanNode(new PlanNodeId(nextNodeId++),
destTupleDesc, "BrokerScanNode",
- fileStatusesList, filesAdded);
+ scanNode = new BrokerScanNode(new PlanNodeId(nextNodeId++),
scanTupleDesc, "BrokerScanNode",
+ fileStatusesList, filesAdded);
((BrokerScanNode) scanNode).setLoadInfo(loadJobId, txnId, table,
brokerDesc, fileGroups, strictMode,
loadParallelism, userInfo);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 8b407fdc6d..aa1e43ac5c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -125,11 +125,14 @@ public class StreamLoadPlanner {
throw new UserException("There is no sequence column in the table
" + destTable.getName());
}
resetAnalyzer();
- // note: we use two tuples separately for Scan and Sink here to avoid
wrong nullable info.
- // construct tuple descriptor, used for scanNode
- scanTupleDesc = descTable.createTupleDescriptor("ScanTuple");
// construct tuple descriptor, used for dataSink
tupleDesc = descTable.createTupleDescriptor("DstTableTuple");
+ TupleDescriptor scanTupleDesc = tupleDesc;
+ if (Config.enable_vectorized_load) {
+ // note: we use two tuples separately for Scan and Sink here to
avoid wrong nullable info.
+ // construct tuple descriptor, used for scanNode
+ scanTupleDesc = descTable.createTupleDescriptor("ScanTuple");
+ }
boolean negative = taskInfo.getNegative();
// here we should be full schema to fill the descriptor table
for (Column col : destTable.getFullSchema()) {
@@ -138,20 +141,22 @@ public class StreamLoadPlanner {
slotDesc.setColumn(col);
slotDesc.setIsNullable(col.isAllowNull());
- SlotDescriptor scanSlotDesc =
descTable.addSlotDescriptor(scanTupleDesc);
- scanSlotDesc.setIsMaterialized(true);
- scanSlotDesc.setColumn(col);
- scanSlotDesc.setIsNullable(col.isAllowNull());
- for (ImportColumnDesc importColumnDesc :
taskInfo.getColumnExprDescs().descs) {
- try {
- if (!importColumnDesc.isColumn() &&
importColumnDesc.getColumnName() != null
- &&
importColumnDesc.getColumnName().equals(col.getName())) {
-
scanSlotDesc.setIsNullable(importColumnDesc.getExpr().isNullable());
- break;
+ if (Config.enable_vectorized_load) {
+ SlotDescriptor scanSlotDesc =
descTable.addSlotDescriptor(scanTupleDesc);
+ scanSlotDesc.setIsMaterialized(true);
+ scanSlotDesc.setColumn(col);
+ scanSlotDesc.setIsNullable(col.isAllowNull());
+ for (ImportColumnDesc importColumnDesc :
taskInfo.getColumnExprDescs().descs) {
+ try {
+ if (!importColumnDesc.isColumn() &&
importColumnDesc.getColumnName() != null
+ &&
importColumnDesc.getColumnName().equals(col.getName())) {
+
scanSlotDesc.setIsNullable(importColumnDesc.getExpr().isNullable());
+ break;
+ }
+ } catch (Exception e) {
+ // An exception may be thrown here because the
`importColumnDesc.getExpr()` is not analyzed now.
+ // We just skip this case here.
}
- } catch (Exception e) {
- // An exception may be thrown here because the
`importColumnDesc.getExpr()` is not analyzed now.
- // We just skip this case here.
}
}
if (negative && !col.isKey() && col.getAggregationType() !=
AggregateType.SUM) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org