This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new f1a7884bcd4 [Bug](agg-state) fix stream load failed on agg-state
column (#28050) (#28465)
f1a7884bcd4 is described below
commit f1a7884bcd4bf210e48c8188ff958ede626ffcf7
Author: Pxl <[email protected]>
AuthorDate: Sat Dec 16 09:46:35 2023 +0800
[Bug](agg-state) fix stream load failed on agg-state column (#28050)
(#28465)
---
.../src/main/java/org/apache/doris/load/Load.java | 32 +++++++++++++++++++---
regression-test/data/mv_p0/agg_state/test | 2 ++
.../data/mv_p0/agg_state/test_agg_state_max_by.out | 10 +++++++
.../mv_p0/agg_state/test_agg_state_max_by.groovy | 8 ++++++
4 files changed, 48 insertions(+), 4 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
index 8e8506ec5ac..6b03a388b4d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
@@ -517,8 +517,9 @@ public class Load {
* ->
* (A, B, C) SET (__doris_shadow_B = B)
*/
- ImportColumnDesc importColumnDesc = new
ImportColumnDesc(column.getName(),
- new SlotRef(null, originCol));
+ SlotRef slot = new SlotRef(null, originCol);
+ slot.setType(column.getType());
+ ImportColumnDesc importColumnDesc = new
ImportColumnDesc(column.getName(), slot);
shadowColumnDescs.add(importColumnDesc);
}
} else {
@@ -772,6 +773,30 @@ public class Load {
LOG.debug("after init column, exprMap: {}", exprsByName);
}
+ private static Expr getExprFromDesc(Analyzer analyzer, SlotDescriptor
slotDesc, SlotRef slot)
+ throws AnalysisException {
+ SlotRef newSlot = new SlotRef(slotDesc);
+ newSlot.setType(slotDesc.getType());
+ Expr rhs = newSlot;
+ rhs = rhs.castTo(slot.getType());
+
+ if (slot.getDesc() == null) {
+ // shadow column
+ return rhs;
+ }
+
+ if (newSlot.isNullable() && !slot.isNullable()) {
+ rhs = new FunctionCallExpr("non_nullable",
Lists.newArrayList(rhs));
+ rhs.setType(slotDesc.getType());
+ rhs.analyze(analyzer);
+ } else if (!newSlot.isNullable() && slot.isNullable()) {
+ rhs = new FunctionCallExpr("nullable", Lists.newArrayList(rhs));
+ rhs.setType(slotDesc.getType());
+ rhs.analyze(analyzer);
+ }
+ return rhs;
+ }
+
private static void analyzeAllExprs(Table tbl, Analyzer analyzer,
Map<String, Expr> exprsByName,
Map<String, Expr> mvDefineExpr, Map<String, SlotDescriptor>
slotDescByName) throws UserException {
// analyze all exprs
@@ -829,8 +854,7 @@ public class Load {
for (SlotRef slot : slots) {
if (slotDescByName.get(slot.getColumnName()) != null) {
smap.getLhs().add(slot);
- smap.getRhs().add(new
CastExpr(tbl.getColumn(slot.getColumnName()).getType(),
- new
SlotRef(slotDescByName.get(slot.getColumnName()))));
+ smap.getRhs().add(getExprFromDesc(analyzer,
slotDescByName.get(slot.getColumnName()), slot));
} else if (exprsByName.get(slot.getColumnName()) != null) {
smap.getLhs().add(slot);
smap.getRhs().add(new
CastExpr(tbl.getColumn(slot.getColumnName()).getType(),
diff --git a/regression-test/data/mv_p0/agg_state/test
b/regression-test/data/mv_p0/agg_state/test
new file mode 100644
index 00000000000..668ff5ffcf4
--- /dev/null
+++ b/regression-test/data/mv_p0/agg_state/test
@@ -0,0 +1,2 @@
+100,200,300,lalala
+111,-444,-4444,ddd
\ No newline at end of file
diff --git a/regression-test/data/mv_p0/agg_state/test_agg_state_max_by.out
b/regression-test/data/mv_p0/agg_state/test_agg_state_max_by.out
index 406e9fa334d..fef8545d8c0 100644
--- a/regression-test/data/mv_p0/agg_state/test_agg_state_max_by.out
+++ b/regression-test/data/mv_p0/agg_state/test_agg_state_max_by.out
@@ -5,20 +5,30 @@
1 -3 \N c
1 1 1 a
1 2 2 b
+100 200 300 lalala
+111 -444 -4444 ddd
-- !select_mv --
\N \N
1 2
+100 200
+111 -444
-- !select_mv --
\N \N
1 4
+100 500
+111 -4888
-- !select_mv --
\N \N
1 4
+100 500
+111 -4888
-- !select_mv --
\N \N
1 -4
+100 200
+111 -444
diff --git
a/regression-test/suites/mv_p0/agg_state/test_agg_state_max_by.groovy
b/regression-test/suites/mv_p0/agg_state/test_agg_state_max_by.groovy
index 08c8e00e91a..90ad5cfb98d 100644
--- a/regression-test/suites/mv_p0/agg_state/test_agg_state_max_by.groovy
+++ b/regression-test/suites/mv_p0/agg_state/test_agg_state_max_by.groovy
@@ -44,6 +44,14 @@ suite ("test_agg_state_max_by") {
sql "insert into d_table select 1,-4,-4,'d';"
+
+ streamLoad {
+ table "d_table"
+ set 'column_separator', ','
+ file './test'
+ time 10000 // limit inflight 10s
+ }
+
qt_select_star "select * from d_table order by 1,2;"
explain {
sql("select k1,max_by(k2,k3) from d_table group by k1 order by 1,2;")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org