This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3b104e334a [Bug](load) fix missing nullable info in stream load (#12302)
3b104e334a is described below
commit 3b104e334a9cb3e8f940343bab3fc53232993816
Author: Gabriel <[email protected]>
AuthorDate: Mon Sep 5 13:41:28 2022 +0800
[Bug](load) fix missing nullable info in stream load (#12302)
---
.../apache/doris/planner/StreamLoadPlanner.java | 26 ++++++++++++++++++++--
1 file changed, 24 insertions(+), 2 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 0c0a41be24..8b407fdc6d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -20,6 +20,7 @@ package org.apache.doris.planner;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.ImportColumnDesc;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.TupleDescriptor;
@@ -84,6 +85,7 @@ public class StreamLoadPlanner {
private StreamLoadScanNode scanNode;
private TupleDescriptor tupleDesc;
+ private TupleDescriptor scanTupleDesc;
public StreamLoadPlanner(Database db, OlapTable destTable, LoadTaskInfo taskInfo) {
this.db = db;
@@ -123,7 +125,10 @@ public class StreamLoadPlanner {
throw new UserException("There is no sequence column in the table " + destTable.getName());
}
resetAnalyzer();
- // construct tuple descriptor, used for scanNode and dataSink
+ // note: we use two tuples separately for Scan and Sink here to avoid wrong nullable info.
+ // construct tuple descriptor, used for scanNode
+ scanTupleDesc = descTable.createTupleDescriptor("ScanTuple");
+ // construct tuple descriptor, used for dataSink
tupleDesc = descTable.createTupleDescriptor("DstTableTuple");
boolean negative = taskInfo.getNegative();
// here we should be full schema to fill the descriptor table
@@ -132,13 +137,30 @@ public class StreamLoadPlanner {
slotDesc.setIsMaterialized(true);
slotDesc.setColumn(col);
slotDesc.setIsNullable(col.isAllowNull());
+
+ SlotDescriptor scanSlotDesc = descTable.addSlotDescriptor(scanTupleDesc);
+ scanSlotDesc.setIsMaterialized(true);
+ scanSlotDesc.setColumn(col);
+ scanSlotDesc.setIsNullable(col.isAllowNull());
+ for (ImportColumnDesc importColumnDesc : taskInfo.getColumnExprDescs().descs) {
+ try {
+ if (!importColumnDesc.isColumn() && importColumnDesc.getColumnName() != null
+ && importColumnDesc.getColumnName().equals(col.getName())) {
+ scanSlotDesc.setIsNullable(importColumnDesc.getExpr().isNullable());
+ break;
+ }
+ } catch (Exception e) {
+ // An exception may be thrown here because the `importColumnDesc.getExpr()` is not analyzed now.
+ // We just skip this case here.
+ }
+ }
if (negative && !col.isKey() && col.getAggregationType() != AggregateType.SUM) {
throw new DdlException("Column is not SUM AggregateType. column:" + col.getName());
}
}
// create scan node
- scanNode = new StreamLoadScanNode(loadId, new PlanNodeId(0), tupleDesc, destTable, taskInfo);
+ scanNode = new StreamLoadScanNode(loadId, new PlanNodeId(0), scanTupleDesc, destTable, taskInfo);
scanNode.init(analyzer);
descTable.computeStatAndMemLayout();
scanNode.finalize(analyzer);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]