This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8aa5899484 [fix](load) add scan tuple for stream load scan node only when vectorization is enable (#12578)
8aa5899484 is described below

commit 8aa58994845c92ecd4d9fe0a8b82fbb48cc05666
Author: Mingyu Chen <[email protected]>
AuthorDate: Thu Sep 15 08:44:39 2022 +0800

    [fix](load) add scan tuple for stream load scan node only when vectorization is enable (#12578)
---
 .../doris/load/loadv2/LoadingTaskPlanner.java      |  6 ++--
 .../apache/doris/planner/StreamLoadPlanner.java    | 37 ++++++++++++----------
 2 files changed, 24 insertions(+), 19 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
index 535ec50eed..06491c7991 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
@@ -146,12 +146,12 @@ public class LoadingTaskPlanner {
         // 1. Broker scan node
         ScanNode scanNode;
         if (Config.enable_new_load_scan_node) {
-            scanNode = new ExternalFileScanNode(new PlanNodeId(nextNodeId++), 
destTupleDesc, "FileScanNode");
+            scanNode = new ExternalFileScanNode(new PlanNodeId(nextNodeId++), 
scanTupleDesc, "FileScanNode");
             ((ExternalFileScanNode) scanNode).setLoadInfo(loadJobId, txnId, 
table, brokerDesc, fileGroups,
                     fileStatusesList, filesAdded, strictMode, loadParallelism, 
userInfo);
         } else {
-            scanNode = new BrokerScanNode(new PlanNodeId(nextNodeId++), 
destTupleDesc, "BrokerScanNode",
-                fileStatusesList, filesAdded);
+            scanNode = new BrokerScanNode(new PlanNodeId(nextNodeId++), 
scanTupleDesc, "BrokerScanNode",
+                    fileStatusesList, filesAdded);
             ((BrokerScanNode) scanNode).setLoadInfo(loadJobId, txnId, table, 
brokerDesc, fileGroups, strictMode,
                     loadParallelism, userInfo);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 8b407fdc6d..aa1e43ac5c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -125,11 +125,14 @@ public class StreamLoadPlanner {
             throw new UserException("There is no sequence column in the table 
" + destTable.getName());
         }
         resetAnalyzer();
-        // note: we use two tuples separately for Scan and Sink here to avoid 
wrong nullable info.
-        // construct tuple descriptor, used for scanNode
-        scanTupleDesc = descTable.createTupleDescriptor("ScanTuple");
         // construct tuple descriptor, used for dataSink
         tupleDesc = descTable.createTupleDescriptor("DstTableTuple");
+        TupleDescriptor scanTupleDesc = tupleDesc;
+        if (Config.enable_vectorized_load) {
+            // note: we use two tuples separately for Scan and Sink here to 
avoid wrong nullable info.
+            // construct tuple descriptor, used for scanNode
+            scanTupleDesc = descTable.createTupleDescriptor("ScanTuple");
+        }
         boolean negative = taskInfo.getNegative();
         // here we should be full schema to fill the descriptor table
         for (Column col : destTable.getFullSchema()) {
@@ -138,20 +141,22 @@ public class StreamLoadPlanner {
             slotDesc.setColumn(col);
             slotDesc.setIsNullable(col.isAllowNull());
 
-            SlotDescriptor scanSlotDesc = 
descTable.addSlotDescriptor(scanTupleDesc);
-            scanSlotDesc.setIsMaterialized(true);
-            scanSlotDesc.setColumn(col);
-            scanSlotDesc.setIsNullable(col.isAllowNull());
-            for (ImportColumnDesc importColumnDesc : 
taskInfo.getColumnExprDescs().descs) {
-                try {
-                    if (!importColumnDesc.isColumn() && 
importColumnDesc.getColumnName() != null
-                            && 
importColumnDesc.getColumnName().equals(col.getName())) {
-                        
scanSlotDesc.setIsNullable(importColumnDesc.getExpr().isNullable());
-                        break;
+            if (Config.enable_vectorized_load) {
+                SlotDescriptor scanSlotDesc = 
descTable.addSlotDescriptor(scanTupleDesc);
+                scanSlotDesc.setIsMaterialized(true);
+                scanSlotDesc.setColumn(col);
+                scanSlotDesc.setIsNullable(col.isAllowNull());
+                for (ImportColumnDesc importColumnDesc : 
taskInfo.getColumnExprDescs().descs) {
+                    try {
+                        if (!importColumnDesc.isColumn() && 
importColumnDesc.getColumnName() != null
+                                && 
importColumnDesc.getColumnName().equals(col.getName())) {
+                            
scanSlotDesc.setIsNullable(importColumnDesc.getExpr().isNullable());
+                            break;
+                        }
+                    } catch (Exception e) {
+                        // An exception may be thrown here because the 
`importColumnDesc.getExpr()` is not analyzed now.
+                        // We just skip this case here.
                     }
-                } catch (Exception e) {
-                    // An exception may be thrown here because the 
`importColumnDesc.getExpr()` is not analyzed now.
-                    // We just skip this case here.
                 }
             }
             if (negative && !col.isKey() && col.getAggregationType() != 
AggregateType.SUM) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to