This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch cp_1215_2 in repository https://gitbox.apache.org/repos/asf/doris.git
commit fa5cc0eb12dbce850ec095ba045df0c7d3aa1770 Author: Pxl <[email protected]> AuthorDate: Wed Dec 6 20:41:29 2023 +0800 [Bug](agg-state) fix stream load failed on agg-state column (#28050) --- .../src/main/java/org/apache/doris/load/Load.java | 32 +++++++++++++++++++--- regression-test/data/mv_p0/agg_state/test | 2 ++ .../data/mv_p0/agg_state/test_agg_state_max_by.out | 10 +++++++ .../mv_p0/agg_state/test_agg_state_max_by.groovy | 8 ++++++ 4 files changed, 48 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java index 8e8506ec5ac..6b03a388b4d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java @@ -517,8 +517,9 @@ public class Load { * -> * (A, B, C) SET (__doris_shadow_B = B) */ - ImportColumnDesc importColumnDesc = new ImportColumnDesc(column.getName(), - new SlotRef(null, originCol)); + SlotRef slot = new SlotRef(null, originCol); + slot.setType(column.getType()); + ImportColumnDesc importColumnDesc = new ImportColumnDesc(column.getName(), slot); shadowColumnDescs.add(importColumnDesc); } } else { @@ -772,6 +773,30 @@ public class Load { LOG.debug("after init column, exprMap: {}", exprsByName); } + private static Expr getExprFromDesc(Analyzer analyzer, SlotDescriptor slotDesc, SlotRef slot) + throws AnalysisException { + SlotRef newSlot = new SlotRef(slotDesc); + newSlot.setType(slotDesc.getType()); + Expr rhs = newSlot; + rhs = rhs.castTo(slot.getType()); + + if (slot.getDesc() == null) { + // shadow column + return rhs; + } + + if (newSlot.isNullable() && !slot.isNullable()) { + rhs = new FunctionCallExpr("non_nullable", Lists.newArrayList(rhs)); + rhs.setType(slotDesc.getType()); + rhs.analyze(analyzer); + } else if (!newSlot.isNullable() && slot.isNullable()) { + rhs = new FunctionCallExpr("nullable", Lists.newArrayList(rhs)); + rhs.setType(slotDesc.getType()); + 
rhs.analyze(analyzer); + } + return rhs; + } + private static void analyzeAllExprs(Table tbl, Analyzer analyzer, Map<String, Expr> exprsByName, Map<String, Expr> mvDefineExpr, Map<String, SlotDescriptor> slotDescByName) throws UserException { // analyze all exprs @@ -829,8 +854,7 @@ public class Load { for (SlotRef slot : slots) { if (slotDescByName.get(slot.getColumnName()) != null) { smap.getLhs().add(slot); - smap.getRhs().add(new CastExpr(tbl.getColumn(slot.getColumnName()).getType(), - new SlotRef(slotDescByName.get(slot.getColumnName())))); + smap.getRhs().add(getExprFromDesc(analyzer, slotDescByName.get(slot.getColumnName()), slot)); } else if (exprsByName.get(slot.getColumnName()) != null) { smap.getLhs().add(slot); smap.getRhs().add(new CastExpr(tbl.getColumn(slot.getColumnName()).getType(), diff --git a/regression-test/data/mv_p0/agg_state/test b/regression-test/data/mv_p0/agg_state/test new file mode 100644 index 00000000000..668ff5ffcf4 --- /dev/null +++ b/regression-test/data/mv_p0/agg_state/test @@ -0,0 +1,2 @@ +100,200,300,lalala +111,-444,-4444,ddd \ No newline at end of file diff --git a/regression-test/data/mv_p0/agg_state/test_agg_state_max_by.out b/regression-test/data/mv_p0/agg_state/test_agg_state_max_by.out index 406e9fa334d..fef8545d8c0 100644 --- a/regression-test/data/mv_p0/agg_state/test_agg_state_max_by.out +++ b/regression-test/data/mv_p0/agg_state/test_agg_state_max_by.out @@ -5,20 +5,30 @@ 1 -3 \N c 1 1 1 a 1 2 2 b +100 200 300 lalala +111 -444 -4444 ddd -- !select_mv -- \N \N 1 2 +100 200 +111 -444 -- !select_mv -- \N \N 1 4 +100 500 +111 -4888 -- !select_mv -- \N \N 1 4 +100 500 +111 -4888 -- !select_mv -- \N \N 1 -4 +100 200 +111 -444 diff --git a/regression-test/suites/mv_p0/agg_state/test_agg_state_max_by.groovy b/regression-test/suites/mv_p0/agg_state/test_agg_state_max_by.groovy index 08c8e00e91a..90ad5cfb98d 100644 --- a/regression-test/suites/mv_p0/agg_state/test_agg_state_max_by.groovy +++ 
b/regression-test/suites/mv_p0/agg_state/test_agg_state_max_by.groovy @@ -44,6 +44,14 @@ suite ("test_agg_state_max_by") { sql "insert into d_table select 1,-4,-4,'d';" + + streamLoad { + table "d_table" + set 'column_separator', ',' + file './test' + time 10000 // limit inflight 10s + } + qt_select_star "select * from d_table order by 1,2;" explain { sql("select k1,max_by(k2,k3) from d_table group by k1 order by 1,2;")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org
