This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch cp_1215_2
in repository https://gitbox.apache.org/repos/asf/doris.git

commit fa5cc0eb12dbce850ec095ba045df0c7d3aa1770
Author: Pxl <[email protected]>
AuthorDate: Wed Dec 6 20:41:29 2023 +0800

    [Bug](agg-state) fix stream load failed on agg-state column (#28050)
---
 .../src/main/java/org/apache/doris/load/Load.java  | 32 +++++++++++++++++++---
 regression-test/data/mv_p0/agg_state/test          |  2 ++
 .../data/mv_p0/agg_state/test_agg_state_max_by.out | 10 +++++++
 .../mv_p0/agg_state/test_agg_state_max_by.groovy   |  8 ++++++
 4 files changed, 48 insertions(+), 4 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
index 8e8506ec5ac..6b03a388b4d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
@@ -517,8 +517,9 @@ public class Load {
                      * ->
                      * (A, B, C) SET (__doris_shadow_B = B)
                      */
-                    ImportColumnDesc importColumnDesc = new 
ImportColumnDesc(column.getName(),
-                            new SlotRef(null, originCol));
+                    SlotRef slot = new SlotRef(null, originCol);
+                    slot.setType(column.getType());
+                    ImportColumnDesc importColumnDesc = new 
ImportColumnDesc(column.getName(), slot);
                     shadowColumnDescs.add(importColumnDesc);
                 }
             } else {
@@ -772,6 +773,30 @@ public class Load {
         LOG.debug("after init column, exprMap: {}", exprsByName);
     }
 
+    private static Expr getExprFromDesc(Analyzer analyzer, SlotDescriptor 
slotDesc, SlotRef slot)
+            throws AnalysisException {
+        SlotRef newSlot = new SlotRef(slotDesc);
+        newSlot.setType(slotDesc.getType());
+        Expr rhs = newSlot;
+        rhs = rhs.castTo(slot.getType());
+
+        if (slot.getDesc() == null) {
+            // shadow column
+            return rhs;
+        }
+
+        if (newSlot.isNullable() && !slot.isNullable()) {
+            rhs = new FunctionCallExpr("non_nullable", 
Lists.newArrayList(rhs));
+            rhs.setType(slotDesc.getType());
+            rhs.analyze(analyzer);
+        } else if (!newSlot.isNullable() && slot.isNullable()) {
+            rhs = new FunctionCallExpr("nullable", Lists.newArrayList(rhs));
+            rhs.setType(slotDesc.getType());
+            rhs.analyze(analyzer);
+        }
+        return rhs;
+    }
+
     private static void analyzeAllExprs(Table tbl, Analyzer analyzer, 
Map<String, Expr> exprsByName,
             Map<String, Expr> mvDefineExpr, Map<String, SlotDescriptor> 
slotDescByName) throws UserException {
         // analyze all exprs
@@ -829,8 +854,7 @@ public class Load {
             for (SlotRef slot : slots) {
                 if (slotDescByName.get(slot.getColumnName()) != null) {
                     smap.getLhs().add(slot);
-                    smap.getRhs().add(new 
CastExpr(tbl.getColumn(slot.getColumnName()).getType(),
-                            new 
SlotRef(slotDescByName.get(slot.getColumnName()))));
+                    smap.getRhs().add(getExprFromDesc(analyzer, 
slotDescByName.get(slot.getColumnName()), slot));
                 } else if (exprsByName.get(slot.getColumnName()) != null) {
                     smap.getLhs().add(slot);
                     smap.getRhs().add(new 
CastExpr(tbl.getColumn(slot.getColumnName()).getType(),
diff --git a/regression-test/data/mv_p0/agg_state/test 
b/regression-test/data/mv_p0/agg_state/test
new file mode 100644
index 00000000000..668ff5ffcf4
--- /dev/null
+++ b/regression-test/data/mv_p0/agg_state/test
@@ -0,0 +1,2 @@
+100,200,300,lalala
+111,-444,-4444,ddd
\ No newline at end of file
diff --git a/regression-test/data/mv_p0/agg_state/test_agg_state_max_by.out 
b/regression-test/data/mv_p0/agg_state/test_agg_state_max_by.out
index 406e9fa334d..fef8545d8c0 100644
--- a/regression-test/data/mv_p0/agg_state/test_agg_state_max_by.out
+++ b/regression-test/data/mv_p0/agg_state/test_agg_state_max_by.out
@@ -5,20 +5,30 @@
 1      -3      \N      c
 1      1       1       a
 1      2       2       b
+100    200     300     lalala
+111    -444    -4444   ddd
 
 -- !select_mv --
 \N     \N
 1      2
+100    200
+111    -444
 
 -- !select_mv --
 \N     \N
 1      4
+100    500
+111    -4888
 
 -- !select_mv --
 \N     \N
 1      4
+100    500
+111    -4888
 
 -- !select_mv --
 \N     \N
 1      -4
+100    200
+111    -444
 
diff --git 
a/regression-test/suites/mv_p0/agg_state/test_agg_state_max_by.groovy 
b/regression-test/suites/mv_p0/agg_state/test_agg_state_max_by.groovy
index 08c8e00e91a..90ad5cfb98d 100644
--- a/regression-test/suites/mv_p0/agg_state/test_agg_state_max_by.groovy
+++ b/regression-test/suites/mv_p0/agg_state/test_agg_state_max_by.groovy
@@ -44,6 +44,14 @@ suite ("test_agg_state_max_by") {
 
     sql "insert into d_table select 1,-4,-4,'d';"
 
+
+    streamLoad {
+        table "d_table"
+        set 'column_separator', ','
+        file './test'
+        time 10000 // limit inflight 10s
+    }
+
     qt_select_star "select * from d_table order by 1,2;"
     explain {
         sql("select k1,max_by(k2,k3) from d_table group by k1 order by 1,2;")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to