This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a5761a25c5 [feature](move-memtable)[7/7] add regression tests (#23515)
a5761a25c5 is described below

commit a5761a25c5d8e4a27ce350640e84d606b6e134d7
Author: Kaijie Chen <[email protected]>
AuthorDate: Sat Aug 26 17:52:10 2023 +0800

    [feature](move-memtable)[7/7] add regression tests (#23515)
    
    Co-authored-by: laihui <[email protected]>
---
 .../load_p0/insert/test_insert_move_memtable.out   |  17 +
 .../stream_load/test_stream_load_move_memtable.out | 101 +++
 .../test_stream_load_new_move_memtable.out         | 126 +++
 .../test_materialized_view_move_memtable.out       |  52 ++
 .../insert/test_insert_move_memtable.groovy        |  93 ++
 .../test_stream_load_move_memtable.groovy          | 969 +++++++++++++++++++++
 .../test_stream_load_new_move_memtable.groovy      | 559 ++++++++++++
 .../test_materialized_view_move_memtable.groovy    | 222 +++++
 8 files changed, 2139 insertions(+)

diff --git a/regression-test/data/load_p0/insert/test_insert_move_memtable.out 
b/regression-test/data/load_p0/insert/test_insert_move_memtable.out
new file mode 100644
index 0000000000..dcb5b854a7
--- /dev/null
+++ b/regression-test/data/load_p0/insert/test_insert_move_memtable.out
@@ -0,0 +1,17 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql1 --
+false  -2147483647     7       7
+false  103     9       16
+false  1002    2       18
+false  5014    5       23
+false  2147483647      8       31
+true   -2147483647     4       4
+true   3021    1       15
+true   3021    10      15
+true   25699   6       21
+true   2147483647      3       24
+
+-- !select --
+true   10      10000   10000000        92233720368547758       
19223372036854775807    3.14159 hello world, today is 15/06/2023        
2023-06-15      2023-06-15T16:10:15
+true   10      10000   10000000        92233720368547758       
19223372036854775807    3.14159 hello world, today is 15/06/2023        
2023-06-15      2023-06-15T16:10:15
+
diff --git 
a/regression-test/data/load_p0/stream_load/test_stream_load_move_memtable.out 
b/regression-test/data/load_p0/stream_load/test_stream_load_move_memtable.out
new file mode 100644
index 0000000000..8816fce94b
--- /dev/null
+++ 
b/regression-test/data/load_p0/stream_load/test_stream_load_move_memtable.out
@@ -0,0 +1,101 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+-2     -51     \N      1       \N      \N      \N      \N      \N      \N      
\N      2       \N      \N
+-2     -50     \N      1       \N      \N      \N      \N      \N      \N      
\N      \N      j       \N
+
+-- !sql1 --
+2019   9       9       9       7.700   a       2019-09-09      
1970-01-01T08:33:39     k7      9.0     9.0
+
+-- !all11 --
+2500
+
+-- !all12 --
+11
+
+-- !all21 --
+2500
+
+-- !all22 --
+0
+
+-- !all23 --
+2500
+
+-- !all24 --
+2500
+
+-- !all31 --
+11
+
+-- !all32 --
+11
+
+-- !all33 --
+11
+
+-- !all41 --
+2500
+
+-- !all51 --
+0
+
+-- !all61 --
+0
+
+-- !all71 --
+1      2       1025    1028
+
+-- !all81 --
+2
+
+-- !all91 --
+1
+
+-- !all101 --
+1      [1, 2, 3, 4, 5] [32767, 32768, 32769]   [65534, 65535, 65536]   ["a", 
"b", "c", "d", "e"]       ["hello", "world"]      [1991-01-01, 1992-02-02, 
1993-03-03]    [1991-01-01 00:00:00]   [0.33, 0.67]    [3.1415926, 0.878787878] 
       [1.000000, 1.200000, 1.300000]
+2      [1, 2, 3, 4, 5] [32767, 32768, 32769]   [65534, 65535, 65536]   ["a", 
"b", "c", "d", "e"]       ["hello", "world"]      [1991-01-01, 1992-02-02, 
1993-03-03]    \N      \N      \N      [1.000000, NULL, 1.300000]
+3      \N      \N      \N      \N      \N      \N      \N      \N      \N      
\N
+4      \N      \N      \N      \N      \N      \N      \N      \N      \N      
\N
+5      \N      \N      \N      \N      \N      \N      \N      \N      \N      
\N
+
+-- !all102 --
+1      [1, 2, 3, 4, 5] [32767, 32768, 32769]   [65534, 65535, 65536]   ["a", 
"b", "c", "d", "e"]       ["hello", "world"]      [1991-01-01, 1992-02-02, 
1993-03-03]    [1991-01-01 00:00:00]   [0.33, 0.67]    [3.1415926, 0.878787878] 
       [1.000000, 1.200000, 1.300000]
+2      [1, 2, 3, 4, 5] [32767, 32768, 32769]   [65534, 65535, 65536]   ["a", 
"b", "c", "d", "e"]       ["hello", "world"]      [1991-01-01, 1992-02-02, 
1993-03-03]    \N      \N      \N      [1.000000, NULL, 1.300000]
+3      \N      \N      \N      \N      \N      \N      \N      \N      \N      
\N
+4      []      []      []      []      []      []      []      []      []      
[]
+5      [NULL]  [NULL]  [NULL]  [NULL]  [NULL]  [NULL]  [NULL]  [NULL]  [NULL]  
[NULL]
+6      [NULL, NULL]    [NULL, NULL]    [NULL, NULL]    [NULL, NULL]    [NULL, 
"null"]  [NULL, NULL]    [NULL, NULL]    [NULL, NULL]    [NULL, NULL]    [NULL, 
NULL, NULL, NULL, NULL, NULL]
+6      [NULL, NULL]    [NULL, NULL]    [NULL, NULL]    [NULL, NULL]    [NULL, 
NULL]    [NULL, NULL]    [NULL, NULL]    [NULL, NULL]    [NULL, NULL]    [NULL, 
NULL, NULL, NULL, NULL, NULL]
+7      [1, 2, 3, 4, 5] \N      \N      \N      \N      \N      \N      \N      
\N      \N
+8      [1, 2, 3, 4, 5] \N      \N      \N      \N      \N      [NULL]  \N      
[NULL]  \N
+
+-- !all111 --
+1      {1, 100, 100000, "a", "doris", 2023-02-26, 2023-02-26 17:58:00, 1.01, 
3.1415926, 1.100000}
+2      {1, 100, 100000, "a", "doris", 2023-02-26, NULL, NULL, NULL, 1.100000}
+3      \N
+4      \N
+5      \N
+
+-- !all112 --
+1      {1, 100, 100000, "a", "doris", 2023-02-26, 2023-02-26 17:58:00, 1.01, 
3.1415926, 1.100000}
+2      {1, 100, 100000, "a", "doris", 2023-02-26, NULL, NULL, NULL, 1.100000}
+3      {1, 100, 100000, "a", "doris", 2023-02-26, NULL, NULL, NULL, 1.100000}
+4      {1, 100, 100000, "a", "doris", 2023-02-26, 2023-02-26 17:58:00, 1.01, 
3.1415926, 1.100000}
+5      {1, 100, 100000, "a", "doris", 2023-02-26, 2023-02-26 17:58:00, 1.01, 
3.1415926, 1.100000}
+6      {1, 100, 100000, "a", "doris", 2023-02-26, 2023-02-26 17:58:00, 1.01, 
3.1415926, 1.100000}
+7      {1, 100, 100000, "a", "doris", 2023-02-26, 2023-02-26 17:58:00, 1.01, 
3.1415926, 1.100000}
+8      {1, NULL, NULL, NULL, "doris", 2023-02-26, 2023-02-26 17:58:00, 1.01, 
3.1415926, 1.100000}
+9      {NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL}
+10     {1, 100, 100000, "a", "doris", 2023-02-26, 2023-02-26 17:58:00, 1.01, 
3.1415926, 1.100000}
+11     {1, 100, 100000, "a", "doris", 2023-02-26, NULL, NULL, NULL, 1.100000}
+12     {NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL}
+13     \N
+
+-- !sql1 --
+-2     -50     1       \N      44
+2      -51     1       2       \N
+25     8       9       2       4
+33     10      9       2       7
+33     9       8       2       1
+38     1       9       2       5
+
diff --git 
a/regression-test/data/load_p0/stream_load/test_stream_load_new_move_memtable.out
 
b/regression-test/data/load_p0/stream_load/test_stream_load_new_move_memtable.out
new file mode 100644
index 0000000000..52440d9843
--- /dev/null
+++ 
b/regression-test/data/load_p0/stream_load/test_stream_load_new_move_memtable.out
@@ -0,0 +1,126 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql1 --
+10000  aa
+10001  bb
+10002  cc
+10003  dd
+10004  ee
+10005  ff
+10006  gg
+10007  hh
+10008  ii
+10009  jj
+10010  kk
+
+-- !sql2 --
+10000  4444    aa      5555555 111.111 3.14
+10001  3333    bb      666     222.222 5.32
+10002  2222    cc      453     333.333 4.321
+10003  1111    dd      -3241   444.444 1.34
+10004  -9999   ee      21342   555.555 1.22
+10005  8888    ff      64562   666.666 9.133
+10006  -7777   gg      -12313342       777.777 8.1
+10007  6666    hh      314234  888.888 34.124
+10008  -5555   ii      1341    999.999 342.12
+10009  4444    jj      -123    111.111 11.22
+10010  -3333   kk      12314   222.222 13.0
+
+-- !sql3 --
+10000  aa      2017    10      1
+10001  bb      2017    10      2
+10002  cc      2017    10      3
+10003  dd      2017    10      4
+10004  ee      2017    10      5
+10005  ff      2017    10      6
+10006  gg      2017    10      7
+10007  hh      2017    10      8
+10008  ii      2017    10      9
+10009  jj      2017    10      10
+10010  kk      2017    10      11
+
+-- !sql5 --
+10000  aa      2017-10-01      0       99999
+10001  bb      2017-10-02      0       99999
+10002  cc      2017-10-03      0       99999
+10003  dd      2017-10-04      0       99999
+10004  ee      2017-10-05      0       99999
+10005  ff      2017-10-06      0       99999
+10006  gg      2017-10-07      0       99999
+10007  hh      2017-10-08      0       99999
+10008  ii      2017-10-09      0       99999
+10009  jj      2017-10-10      0       99999
+10010  kk      2017-10-11      0       99999
+
+-- !sql6 --
+10000  aa      北京      false   11      4444    5555555 41232314        3.14    
123.3423        111.111 111.111 2017-10-01      2017-10-01      
2017-10-01T06:00        2017-10-01T06:00
+10001  bb      北京      false   22      3333    666     2768658 5.32    
123111.3242     222.222 222.222 2017-10-02      2017-10-02      
2017-10-02T07:00        2017-10-02T07:00
+10002  cc      北京      true    33      2222    453     5463456 4.321   
11111.23423     333.333 333.333 2017-10-03      2017-10-03      
2017-10-03T17:05:45     2017-10-03T17:05:45
+10003  dd      上海      true    44      1111    -3241   -45235  1.34    
54626.324       444.444 444.444 2017-10-04      2017-10-04      
2017-10-04T12:59:12     2017-10-04T12:59:12
+10004  ee      成都      false   55      -9999   21342   4513456 1.22    111.33  
555.555 555.555 2017-10-05      2017-10-05      2017-10-05T11:20        
2017-10-05T11:20
+10005  ff      西安      false   66      8888    64562   4356    9.133   
23423.45        666.666 666.666 2017-10-06      2017-10-06      
2017-10-06T12:00:15     2017-10-06T12:00:15
+10006  gg      深圳      true    77      -7777   -12313342       34534   8.1     
12.0    777.777 777.777 2017-10-07      2017-10-07      2017-10-07T13:20:22     
2017-10-07T13:20:22
+10007  hh      杭州      false   88      6666    314234  43535356        34.124  
324.0   888.888 888.888 2017-10-08      2017-10-08      2017-10-08T14:58:10     
2017-10-08T14:58:10
+10008  ii      上海      true    99      -5555   1341    23434534        342.12  
34234.1 999.999 999.999 2017-10-09      2017-10-09      \N      \N
+10009  jj      南京      false   11      4444    -123    53623567        11.22   
324.33  111.111 111.111 2017-10-10      2017-10-10      2017-10-10T16:25:42     
2017-10-10T16:25:42
+10010  kk      成都      false   22      -3333   12314   674567  13.0    
45464.435       222.222 222.222 2017-10-11      2017-10-11      
2017-10-11T17:22:24     2017-10-11T17:22:24
+
+-- !sql7 --
+10000  aa      西安      22      0       1234567 陕西西安    2016-02-21T07:05:01
+10000  aa      北京      21      0       1234567 北京      2017-03-11T06:01:02
+10001  bb      上海      20      1       1234567 上海      2012-05-22T12:59:12
+10001  bb      天津      33      1       1234567 天津      2019-01-11T17:05:45
+10002  bb      上海      20      1       1234567 上海      2013-06-02T12:59:12
+10003  cc      广州      32      0       1234567 广东广州    2015-08-12T11:25
+10003  cc      广州      32      0       1234567 广东广州    2014-07-02T11:20
+10004  dd      杭州      47      0       1234567 浙江杭州    2017-11-23T13:26:22
+10004  dd      深圳      33      1       1234567 广东深圳    2016-12-01T14:04:15
+10005  dd      深圳      19      0       1234567 广东深圳    2018-10-03T12:27:22
+10005  ee      成都      21      1       1234567 四川成都    2019-09-03T11:24:22
+
+-- !sql8 --
+10000  aa      西安      22      0       1234567 陕西西安    2016-02-21T07:05:01
+10001  bb      上海      20      1       1234567 上海      2012-05-22T12:59:12
+10002  bb      上海      20      1       1234567 上海      2013-06-02T12:59:12
+10003  cc      广州      32      0       1234567 广东广州    2015-08-12T11:25
+10004  dd      杭州      47      0       1234567 浙江杭州    2017-11-23T13:26:22
+10005  dd      深圳      19      0       1234567 广东深圳    2018-10-03T12:27:22
+10005  ee      成都      21      1       1234567 四川成都    2019-09-03T11:24:22
+
+-- !sql9 --
+10000  aa      西安      22      0       1234567 陕西西安    2016-02-21T07:05:01
+10001  bb      上海      20      1       1234567 上海      2012-05-22T12:59:12
+10002  bb      上海      20      1       1234567 上海      2013-06-02T12:59:12
+10003  cc      广州      32      0       1234567 广东广州    2015-08-12T11:25
+10004  dd      杭州      47      0       1234567 浙江杭州    2017-11-23T13:26:22
+10005  dd      深圳      19      0       1234567 广东深圳    2018-10-03T12:27:22
+10005  ee      成都      21      1       1234567 四川成都    2019-09-03T11:24:22
+
+-- !sql10 --
+1500
+
+-- !sql11 --
+10000  aa
+10001  bb
+10002  cc
+10003  dd
+10004  ee
+10005  ff
+10006  gg
+10007  hh
+10008  ii
+10009  jj
+10010  kk
+
+-- !sql12 --
+10000  aa
+10001  bb
+10002  cc
+10003  dd
+10004  ee
+10005  ff
+10006  gg
+10007  hh
+10008  ii
+10009  jj
+10010  kk
+
diff --git 
a/regression-test/data/rollup_p0/test_materialized_view_move_memtable.out 
b/regression-test/data/rollup_p0/test_materialized_view_move_memtable.out
new file mode 100644
index 0000000000..7caf3713fe
--- /dev/null
+++ b/regression-test/data/rollup_p0/test_materialized_view_move_memtable.out
@@ -0,0 +1,52 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+test_materialized_view1        DUP_KEYS        record_id       INT     INT     
Yes     true    \N              true            
+               seller_id       INT     INT     Yes     true    \N              
true            
+               store_id        INT     INT     Yes     true    \N              
true            
+               sale_date       DATE    DATEV2  Yes     false   \N      NONE    
true            
+               sale_amt        BIGINT  BIGINT  Yes     false   \N      NONE    
true            
+                                                                               
        
+amt_sum        AGG_KEYS        mv_store_id     INT     INT     Yes     true    
\N              true    `store_id`      
+               mva_SUM__`sale_amt`     BIGINT  BIGINT  Yes     false   \N      
SUM     true    `sale_amt`      
+
+-- !sql --
+test_materialized_view2        DUP_KEYS        record_id       INT     INT     
Yes     true    \N              true            
+               seller_id       INT     INT     Yes     true    \N              
true            
+               store_id        INT     INT     Yes     true    \N              
true            
+               sale_date       DATE    DATEV2  Yes     false   \N      NONE    
true            
+               sale_amt        BIGINT  BIGINT  Yes     false   \N      NONE    
true            
+                                                                               
        
+seller_id_order        DUP_KEYS        mv_store_id     INT     INT     Yes     
true    \N              true    `store_id`      
+               mv_seller_id    INT     INT     Yes     true    \N              
true    `seller_id`     
+               mv_sale_amt     BIGINT  BIGINT  Yes     false   \N      NONE    
true    `sale_amt`      
+
+-- !sql --
+1      1       1       2020-05-30      100
+2      1       1       2020-05-30      100
+
+-- !sql --
+1      200
+
+-- !sql --
+1      1       1       2020-05-30      100
+2      1       1       2020-05-30      100
+
+-- !sql --
+1      200
+
+-- !sql --
+                                                                               
        
+                                                                               
        
+               mva_SUM__CASE WHEN `sale_amt` IS NULL THEN 0 ELSE 1 END BIGINT  
BIGINT  No      false   \N      SUM     true    CASE WHEN `sale_amt` IS NULL 
THEN 0 ELSE 1 END  
+               mva_SUM__`sale_amt`     BIGINT  BIGINT  Yes     false   \N      
SUM     true    `sale_amt`      
+               sale_amt        BIGINT  BIGINT  Yes     false   \N      NONE    
true            
+               sale_date       DATE    DATEV2  Yes     false   \N      NONE    
true            
+               seller_id       INT     INT     Yes     true    \N              
true            
+               store_id        INT     INT     Yes     true    \N              
true            
+amt_count      AGG_KEYS        mv_store_id     INT     INT     Yes     true    
\N              true    `store_id`      
+amt_sum        AGG_KEYS        mv_store_id     INT     INT     Yes     true    
\N              true    `store_id`      
+test_materialized_view1        DUP_KEYS        record_id       INT     INT     
Yes     true    \N              true            
+
+-- !sql --
+1      2
+
diff --git 
a/regression-test/suites/load_p0/insert/test_insert_move_memtable.groovy 
b/regression-test/suites/load_p0/insert/test_insert_move_memtable.groovy
new file mode 100644
index 0000000000..bc9db5add0
--- /dev/null
+++ b/regression-test/suites/load_p0/insert/test_insert_move_memtable.groovy
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_insert_move_memtable") {
+    sql """ set enable_memtable_on_sink_node=true """
+    // todo: test insert, such as insert values, insert select, insert txn
+    sql "show load"
+    def test_baseall = "test_query_db.baseall";
+    def test_bigtable = "test_query_db.bigtable";
+    def insert_tbl = "test_insert_tbl_mm";
+
+    sql """ DROP TABLE IF EXISTS ${insert_tbl}"""
+    sql """
+     CREATE TABLE ${insert_tbl} (
+       `k1` char(5) NULL,
+       `k2` int(11) NULL,
+       `k3` tinyint(4) NULL,
+       `k4` int(11) NULL
+     ) ENGINE=OLAP
+     DUPLICATE KEY(`k1`, `k2`, `k3`, `k4`)
+     COMMENT 'OLAP'
+     DISTRIBUTED BY HASH(`k1`) BUCKETS 5
+     PROPERTIES (
+       "replication_num"="1"
+     );
+    """
+
+    sql """ 
+    INSERT INTO ${insert_tbl}
+    SELECT a.k6, a.k3, b.k1
+       , sum(b.k1) OVER (PARTITION BY a.k6 ORDER BY a.k3) AS w_sum
+    FROM ${test_baseall} a
+       JOIN ${test_bigtable} b ON a.k1 = b.k1 + 5
+    ORDER BY a.k6, a.k3, a.k1, w_sum
+    """
+
+    qt_sql1 "select * from ${insert_tbl} order by 1, 2, 3, 4"
+
+    def insert_tbl_dft = "test_insert_dft_tbl_mm"
+    sql """ DROP TABLE IF EXISTS ${insert_tbl_dft}"""
+    
+    // `k7` should be float type, and bug exists now, 
https://github.com/apache/doris/pull/20867
+    // `k9` should be char(16), and bug exists now as error msg raised:"can 
not cast from origin type TINYINT to target type=CHAR(16)" when doing insert
+    // "`k13` datetime default CURRENT_TIMESTAMP" might have cast error in 
strict mode when doing insert:
+    // [INTERNAL_ERROR]Invalid value in strict mode for function CAST, source 
column String, from type String to type DateTimeV2
+    sql """
+        CREATE TABLE ${insert_tbl_dft} (
+            `k1` boolean default "true",
+            `k2` tinyint default "10",
+            `k3` smallint default "10000",
+            `k4` int default "10000000",
+            `k5` bigint default "92233720368547758",
+            `k6` largeint default "19223372036854775807",
+                 
+            `k8` double default "3.14159",
+
+            `k10` varchar(64) default "hello world, today is 15/06/2023",
+            `k11` date default "2023-06-15",
+            `k12` datetime default "2023-06-15 16:10:15" 
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`k1`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 5
+        PROPERTIES (
+            "replication_num"="1"
+        );
+    """
+    
+    sql """ set enable_nereids_planner=true """
+    sql """ set enable_nereids_dml=true """
+    sql """ insert into ${insert_tbl_dft} values() """
+
+    sql """ set enable_nereids_planner=false """
+    sql """ set enable_nereids_dml=false """
+    sql """ insert into ${insert_tbl_dft} values() """
+    
+    qt_select """ select k1,k2,k3,k4,k5,k6,k8,k10,k11,k12 from 
${insert_tbl_dft} """
+    sql """ set enable_memtable_on_sink_node=false """
+}
diff --git 
a/regression-test/suites/load_p0/stream_load/test_stream_load_move_memtable.groovy
 
b/regression-test/suites/load_p0/stream_load/test_stream_load_move_memtable.groovy
new file mode 100644
index 0000000000..08feba85cc
--- /dev/null
+++ 
b/regression-test/suites/load_p0/stream_load/test_stream_load_move_memtable.groovy
@@ -0,0 +1,969 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+import java.util.Date
+import java.text.SimpleDateFormat
+
+suite("test_stream_load_move_memtable", "p0") {
+    sql "show tables"
+
+    def tableName = "test_stream_load_strict_mm"
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `k1` bigint(20) NULL,
+            `k2` bigint(20) NULL,
+            `v1` tinyint(4) SUM NULL,
+            `v2` tinyint(4) REPLACE NULL,
+            `v3` tinyint(4) REPLACE_IF_NOT_NULL NULL,
+            `v4` smallint(6) REPLACE_IF_NOT_NULL NULL,
+            `v5` int(11) REPLACE_IF_NOT_NULL NULL,
+            `v6` bigint(20) REPLACE_IF_NOT_NULL NULL,
+            `v7` largeint(40) REPLACE_IF_NOT_NULL NULL,
+            `v8` datetime REPLACE_IF_NOT_NULL NULL,
+            `v9` date REPLACE_IF_NOT_NULL NULL,
+            `v10` char(10) REPLACE_IF_NOT_NULL NULL,
+            `v11` varchar(6) REPLACE_IF_NOT_NULL NULL,
+            `v12` decimal(27, 9) REPLACE_IF_NOT_NULL NULL
+        ) ENGINE=OLAP
+        AGGREGATE KEY(`k1`, `k2`)
+        COMMENT 'OLAP'
+        PARTITION BY RANGE(`k1`)
+        (PARTITION partition_a VALUES [("-9223372036854775808"), ("100000")),
+        PARTITION partition_b VALUES [("100000"), ("1000000000")),
+        PARTITION partition_c VALUES [("1000000000"), ("10000000000")),
+        PARTITION partition_d VALUES [("10000000000"), (MAXVALUE)))
+        DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+    
+    // test strict_mode success
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', '\t'
+        set 'columns', 'k1, k2, v2, v10, v11'
+        set 'partitions', 'partition_a, partition_b, partition_c, partition_d'
+        set 'strict_mode', 'true'
+        set 'memtable_on_sink_node', 'true'
+
+        file 'test_strict_mode.csv'
+        time 10000 // limit inflight 10s
+    }
+
+    sql "sync"
+    qt_sql "select * from ${tableName} order by k1, k2"
+
+    // test strict_mode fail
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', '\t'
+        set 'columns', 'k1, k2, v2, v10, v11'
+        set 'partitions', 'partition_a, partition_b, partition_c, partition_d'
+        set 'strict_mode', 'true'
+        set 'memtable_on_sink_node', 'true'
+
+        file 'test_strict_mode_fail.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("fail", json.Status.toLowerCase())
+            assertEquals(2, json.NumberTotalRows)
+            assertEquals(1, json.NumberFilteredRows)
+        }
+    }
+
+    sql "sync"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+          `id` int(11) NULL,
+          `value` varchar(64) NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1",
+        "in_memory" = "false",
+        "storage_format" = "V2",
+        "disable_auto_compaction" = "false"
+        );
+    """
+
+    streamLoad {
+        table "${tableName}"
+
+        set 'line_delimiter', 'weizuo'
+        set 'column_separator', '|'
+        set 'columns', 'id, value'
+
+        file 'test_line_delimiter.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(3, json.NumberTotalRows)
+            assertEquals(0, json.NumberFilteredRows)
+        }
+    }
+
+    sql "sync"
+    rowCount = sql "select count(1) from ${tableName}"
+    assertEquals(3, rowCount[0][0])
+
+    // test load_nullable_to_not_nullable
+    def tableName2 = "load_nullable_to_not_nullable"
+    sql """ DROP TABLE IF EXISTS ${tableName2} """
+    sql """
+    CREATE TABLE IF NOT EXISTS `${tableName2}` (
+        k1 int(32) NOT NULL,
+        k2 smallint NOT NULL,
+        k3 int NOT NULL,
+        k4 bigint NOT NULL,
+        k5 decimal(9, 3) NOT NULL,
+        k6 char(5) NOT NULL,
+        k10 date NOT NULL,
+        k11 datetime NOT NULL,
+        k7 varchar(20) NOT NULL,
+        k8 double max NOT NULL,
+        k9 float sum NOT NULL )
+    AGGREGATE KEY(k1,k2,k3,k4,k5,k6,k10,k11,k7)
+    PARTITION BY RANGE(k2) (
+        PARTITION partition_a VALUES LESS THAN MAXVALUE
+    )
+    DISTRIBUTED BY HASH(k1, k2, k5)
+    BUCKETS 3
+    PROPERTIES ( "replication_allocation" = "tag.location.default: 1");
+    """
+
+    streamLoad {
+        table "${tableName2}"
+
+        set 'column_separator', '\t'
+        set 'columns', 
'col,k1=year(col),k2=month(col),k3=month(col),k4=day(col),k5=7.7,k6="a",k10=date(col),k11=FROM_UNIXTIME(2019,"%Y-%m-%dT%H:%i:%s"),k7="k7",k8=month(col),k9=day(col)'
+        set 'memtable_on_sink_node', 'true'
+
+        file 'test_time.data'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(1, json.NumberTotalRows)
+            assertEquals(0, json.NumberFilteredRows)
+        }
+    }
+    sql "sync"
+    order_qt_sql1 " SELECT * FROM ${tableName2}"
+
+    // test common case
+    def tableName3 = "test_all"
+    def tableName4 = "test_less_col"
+    def tableName5 = "test_bitmap_and_hll"
+    def tableName6 = "test_unique_key"
+    def tableName7 = "test_unique_key_with_delete"
+    def tableName8 = "test_array"
+    def tableName10 = "test_struct"
+    sql """ DROP TABLE IF EXISTS ${tableName3} """
+    sql """ DROP TABLE IF EXISTS ${tableName4} """
+    sql """ DROP TABLE IF EXISTS ${tableName5} """
+    sql """ DROP TABLE IF EXISTS ${tableName6} """
+    sql """ DROP TABLE IF EXISTS ${tableName7} """
+    sql """ DROP TABLE IF EXISTS ${tableName8} """
+    sql """ DROP TABLE IF EXISTS ${tableName10} """
+    sql """
+    CREATE TABLE IF NOT EXISTS ${tableName3} (
+      `k1` int(11) NULL,
+      `k2` tinyint(4) NULL,
+      `k3` smallint(6) NULL,
+      `k4` bigint(20) NULL,
+      `k5` largeint(40) NULL,
+      `k6` float NULL,
+      `k7` double NULL,
+      `k8` decimal(9, 0) NULL,
+      `k9` char(10) NULL,
+      `k10` varchar(1024) NULL,
+      `k11` text NULL,
+      `k12` date NULL,
+      `k13` datetime NULL
+    ) ENGINE=OLAP
+    DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+    PROPERTIES (
+    "replication_allocation" = "tag.location.default: 1"
+    );
+    """
+
+    sql """
+    CREATE TABLE IF NOT EXISTS ${tableName4} (
+      `k1` int(11) NULL,
+      `k2` tinyint(4) NULL,
+      `k3` smallint(6) NULL,
+      `k4` bigint(20) NULL,
+      `k5` largeint(40) NULL
+    ) ENGINE=OLAP
+    DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+    PROPERTIES (
+    "replication_allocation" = "tag.location.default: 1"
+    );
+    """
+
+    sql """
+    CREATE TABLE IF NOT EXISTS ${tableName5} (
+      `k1` int(11) NULL,
+      `k2` tinyint(4) NULL,
+      `v1` bitmap bitmap_union,
+      `v2` hll hll_union
+    ) ENGINE=OLAP
+    DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+    PROPERTIES (
+    "replication_allocation" = "tag.location.default: 1"
+    );
+    """
+
+    sql """
+    CREATE TABLE IF NOT EXISTS ${tableName6} (
+      `k1` int(11) NULL,
+      `k2` tinyint(4) NULL,
+      `v1` varchar(1024)
+    ) ENGINE=OLAP
+    UNIQUE KEY(k1, k2)
+    DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+    PROPERTIES (
+    "replication_allocation" = "tag.location.default: 1"
+    );
+    """
+
+    sql """
+    CREATE TABLE IF NOT EXISTS ${tableName7} (
+      `k1` int(11) NULL,
+      `k2` tinyint(4) NULL,
+      `v1` varchar(1024)
+    ) ENGINE=OLAP
+    UNIQUE KEY(k1, k2)
+    DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+    PROPERTIES (
+    "function_column.sequence_type" = "int",
+    "replication_allocation" = "tag.location.default: 1"
+    );
+    """
+    sql """
+    CREATE TABLE IF NOT EXISTS ${tableName8} (
+      `k1` INT(11) NULL COMMENT "",
+      `k2` ARRAY<SMALLINT> NULL COMMENT "",
+      `k3` ARRAY<INT(11)> NULL COMMENT "",
+      `k4` ARRAY<BIGINT> NULL COMMENT "",
+      `k5` ARRAY<CHAR> NULL COMMENT "",
+      `k6` ARRAY<VARCHAR(20)> NULL COMMENT "",
+      `k7` ARRAY<DATE> NULL COMMENT "", 
+      `k8` ARRAY<DATETIME> NULL COMMENT "",
+      `k9` ARRAY<FLOAT> NULL COMMENT "",
+      `k10` ARRAY<DOUBLE> NULL COMMENT "",
+      `k11` ARRAY<DECIMAL(20, 6)> NULL COMMENT ""
+    ) ENGINE=OLAP
+    DUPLICATE KEY(`k1`)
+    DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+    PROPERTIES (
+    "replication_allocation" = "tag.location.default: 1"
+    );
+    """
+
+    sql """
+    CREATE TABLE IF NOT EXISTS ${tableName10} (
+      `k1` INT(11) NULL COMMENT "",
+      `k2` STRUCT<
+               f1:SMALLINT,
+               f2:INT(11),
+               f3:BIGINT,
+               f4:CHAR,
+               f5:VARCHAR(20),
+               f6:DATE,
+               f7:DATETIME,
+               f8:FLOAT,
+               f9:DOUBLE,
+               f10:DECIMAL(20, 6)> NULL COMMENT ""
+    ) ENGINE=OLAP
+    DUPLICATE KEY(`k1`)
+    DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+    PROPERTIES (
+    "replication_allocation" = "tag.location.default: 1"
+    );
+    """
+
+    // load all columns
+    streamLoad {
+        table "${tableName3}"
+
+        set 'column_separator', ','
+        set 'memtable_on_sink_node', 'true'
+
+        file 'all_types.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(2500, json.NumberTotalRows)
+            assertEquals(0, json.NumberFilteredRows)
+        }
+    }
+    sql "sync"
+    order_qt_all11 "SELECT count(*) FROM ${tableName3}" // 2500
+    order_qt_all12 "SELECT count(*) FROM ${tableName3} where k1 <= 10"  // 11
+    sql """truncate table ${tableName3}"""
+    sql """sync"""
+
+    // load part of columns
+    streamLoad {
+        table "${tableName3}"
+
+        set 'column_separator', ','
+        set 'columns', 'k1, k2'
+        set 'memtable_on_sink_node', 'true'
+
+        file 'all_types.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("fail", json.Status.toLowerCase())
+            assertEquals(0, json.NumberLoadedRows)
+        }
+    }
+    sql "sync"
+
+    // load with skip 2 columns, with gzip
+    streamLoad {
+        table "${tableName3}"
+
+        set 'column_separator', ','
+        set 'columns', 'k1, k2, k3, k4, tmp1, tmp2, k7, k8, k9, k10, k11, k12, 
k13'
+        set 'compress_type', 'gz'
+        set 'memtable_on_sink_node', 'true'
+
+        file 'all_types.csv.gz'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(2500, json.NumberTotalRows)
+            assertEquals(2500, json.NumberLoadedRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+    sql "sync"
+    order_qt_all21 "SELECT count(*) FROM ${tableName3}" // 2500
+    order_qt_all22 "SELECT count(*) FROM ${tableName3} where k1 is null"  // 0
+    order_qt_all23 "SELECT count(*) FROM ${tableName3} where k5 is null"  // 
2500
+    order_qt_all24 "SELECT count(*) FROM ${tableName3} where k6 is null"  // 
2500
+    sql """truncate table ${tableName3}"""
+    sql """sync"""
+
+    // load with column mapping and where predicate
+    streamLoad {
+        table "${tableName3}"
+
+        set 'column_separator', ','
+        set 'columns', 'k1, k2, k3, k4, tmp5, k6, tmpk7, k8, k9, k10, k11, 
k12, k13, k7=tmpk7+1'
+        set 'where', 'k1 <= 10'
+        set 'memtable_on_sink_node', 'true'
+
+        file 'all_types.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(2500, json.NumberTotalRows)
+            assertEquals(11, json.NumberLoadedRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(2489, json.NumberUnselectedRows)
+        }
+    }
+    sql "sync"
+    order_qt_all31 "SELECT count(*) FROM ${tableName3}" // 11
+    order_qt_all32 "SELECT count(*) FROM ${tableName3} where k7 >= 7"  // 11
+    order_qt_all33 "SELECT count(*) FROM ${tableName3} where k5 is null"  // 11
+    sql """truncate table ${tableName3}"""
+    sql """sync"""
+
+    // load without strict_mode
+    streamLoad {
+        table "${tableName3}"
+
+        set 'column_separator', ','
+        set 'columns', 'tmpk1, k2, k3, k4, k5, k6, k7, k8, k9, k10, k11, k12, 
k13, k1=k13'
+        set 'memtable_on_sink_node', 'true'
+
+        file 'all_types.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(2500, json.NumberTotalRows)
+            assertEquals(2500, json.NumberLoadedRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+    sql "sync"
+    order_qt_all41 "SELECT count(*) FROM ${tableName3} where k1 is null" // 
2500
+    sql """truncate table ${tableName3}"""
+    sql """sync"""
+
+    // load with strict_mode false and max_filter_ratio
+    streamLoad {
+        table "${tableName4}"
+
+        set 'column_separator', ','
+        set 'columns', 'k1, k2, k3, k4, tmpk5, tmpk6, tmpk7, tmpk8, tmpk9, 
tmpk10, tmpk11, tmpk12, k5'
+        set 'max_filter_ratio', '1'
+        set 'strict_mode', 'true'
+        set 'memtable_on_sink_node', 'true'
+
+        file 'all_types.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(2500, json.NumberTotalRows)
+            assertEquals(0, json.NumberLoadedRows)
+            assertEquals(2500, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+    sql "sync"
+    order_qt_all51 "SELECT count(*) FROM ${tableName4}" // 0
+    sql """truncate table ${tableName4}"""
+    sql """sync"""
+
+    // load with strict_mode true and max_filter_ratio
+    streamLoad {
+        table "${tableName4}"
+
+        set 'column_separator', ','
+        set 'columns', 'k1, k2, k3, k4, tmpk5, tmpk6, tmpk7, tmpk8, tmpk9, 
tmpk10, tmpk11, tmpk12, k5'
+        set 'max_filter_ratio', '0'
+        set 'strict_mode', 'true'
+        set 'memtable_on_sink_node', 'true'
+
+        file 'all_types.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("fail", json.Status.toLowerCase())
+            assertEquals(0, json.NumberLoadedRows)
+        }
+    }
+    sql "sync"
+    order_qt_all61 "SELECT count(*) FROM ${tableName4}" // 0
+    sql """truncate table ${tableName4}"""
+    sql """sync"""
+
+    // load bitmap and hll with bzip2
+    streamLoad {
+        table "${tableName5}"
+
+        set 'column_separator', ','
+        set 'columns', 'k1, k2, tmp1, tmp2, v1=to_bitmap(tmp1), 
v2=hll_hash(tmp2)'
+        set 'compress_type', 'bz2'
+        set 'memtable_on_sink_node', 'true'
+
+        file 'bitmap_hll.csv.bz2'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(1025, json.NumberTotalRows)
+            assertEquals(1025, json.NumberLoadedRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+    sql "sync"
+    order_qt_all71 "SELECT k1, k2, bitmap_union_count(v1), HLL_UNION_AGG(v2) 
FROM ${tableName5} group by k1, k2" // 1,2,1025,1028
+    sql """truncate table ${tableName5}"""
+    sql """sync"""
+
+    // load unique key
+    streamLoad {
+        table "${tableName6}"
+
+        set 'column_separator', ','
+        set 'compress_type', 'lz4'
+        set 'memtable_on_sink_node', 'true'
+
+        file 'unique_key.csv.lz4'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(8001, json.NumberTotalRows)
+            assertEquals(8001, json.NumberLoadedRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+    sql "sync"
+    order_qt_all81 "SELECT count(*) from ${tableName6}" // 2
+    sql """truncate table ${tableName6}"""
+    sql """sync"""
+
+    // load unique key with delete and sequence
+    streamLoad {
+        table "${tableName7}"
+
+        set 'column_separator', ','
+        set 'columns', 'k1,k2,v1,del,seq'
+        set 'delete', 'del=1'
+        set 'merge_type', 'merge'
+        set 'function_column.sequence_col', 'seq'
+        set 'memtable_on_sink_node', 'true'
+
+        file 'unique_key_with_delete.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(6, json.NumberTotalRows)
+            assertEquals(6, json.NumberLoadedRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+    sql "sync"
+    order_qt_all91 "SELECT count(*) from ${tableName7}" // 2
+    sql """truncate table ${tableName7}"""
+    sql """sync"""
+
+    // ===== test array stream load
+    // malformat without strictmode
+    streamLoad {
+        table "${tableName8}"
+
+        set 'column_separator', '|'
+        set 'memtable_on_sink_node', 'true'
+
+        file 'array_malformat.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(5, json.NumberTotalRows)
+            assertEquals(5, json.NumberLoadedRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+    sql "sync"
+    order_qt_all101 "SELECT * from ${tableName8}" // 5
+    sql """truncate table ${tableName8}"""
+    sql """sync"""
+
+    // malformat with strictmode
+    streamLoad {
+        table "${tableName8}"
+
+        set 'column_separator', '|'
+        set 'strict_mode', 'true'
+        set 'memtable_on_sink_node', 'true'
+
+        file 'array_malformat.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("fail", json.Status.toLowerCase())
+            assertEquals(5, json.NumberTotalRows)
+            assertEquals(3, json.NumberLoadedRows)
+            assertEquals(2, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+    sql "sync"
+
+    // normal load
+    streamLoad {
+        table "${tableName8}"
+
+        set 'column_separator', '|'
+        set 'memtable_on_sink_node', 'true'
+
+        file 'array_normal.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(9, json.NumberTotalRows)
+            assertEquals(9, json.NumberLoadedRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+    sql "sync"
+    order_qt_all102 "SELECT * from ${tableName8}" // 8
+    sql """truncate table ${tableName8}"""
+    sql """sync"""
+
+    // malformat with mismatch array type
+    streamLoad {
+        table "${tableName8}"
+
+        set 'column_separator', '|'
+        set 'columns', 
'k1,k2,k3,k4,k5,k6,k7,k8,k9,b10,k11,k10=array_remove(cast(k5 as array<bigint>), 
1)'
+        set 'memtable_on_sink_node', 'true'
+
+        file 'array_normal.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("fail", json.Status.toLowerCase())
+            assertTrue(json.Message.contains('Don\'t support load from type'))
+        }
+    }
+    sql "sync"
+
+    // ===== test struct stream load
+    // malformat without strictmode
+    streamLoad {
+        table "${tableName10}"
+
+        set 'column_separator', '|'
+        set 'memtable_on_sink_node', 'true'
+
+        file 'struct_malformat.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(5, json.NumberTotalRows)
+            assertEquals(5, json.NumberLoadedRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+    sql "sync"
+    qt_all111 "SELECT * from ${tableName10} order by k1" // 5
+    sql """truncate table ${tableName10}"""
+    sql """sync"""
+
+    // malformat with strictmode
+    streamLoad {
+        table "${tableName10}"
+
+        set 'column_separator', '|'
+        set 'strict_mode', 'true'
+        set 'memtable_on_sink_node', 'true'
+
+        file 'struct_malformat.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("fail", json.Status.toLowerCase())
+            assertEquals(5, json.NumberTotalRows)
+            assertEquals(3, json.NumberLoadedRows)
+            assertEquals(2, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+    sql "sync"
+
+    // normal load
+    streamLoad {
+        table "${tableName10}"
+
+        set 'column_separator', '|'
+        set 'memtable_on_sink_node', 'true'
+
+        file 'struct_normal.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(13, json.NumberTotalRows)
+            assertEquals(13, json.NumberLoadedRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+    sql "sync"
+    qt_all112 "SELECT * from ${tableName10} order by k1" // 10
+    sql """truncate table ${tableName10}"""
+    sql """sync"""
+
+    // test immutable partition success
+    def tableName9 = "test_immutable_partition"
+    sql """ DROP TABLE IF EXISTS ${tableName9} """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName9} (
+            `k1` bigint(20) NULL,
+            `k2` bigint(20) NULL,
+            `v1` tinyint(4) SUM NULL,
+            `v2` tinyint(4) REPLACE NULL,
+            `v3` tinyint(4) REPLACE_IF_NOT_NULL NULL
+        ) ENGINE=OLAP
+        AGGREGATE KEY(`k1`, `k2`)
+        COMMENT 'OLAP'
+        PARTITION BY RANGE(`k1`)
+        (PARTITION partition_a VALUES [("-9223372036854775808"), ("10")),
+        PARTITION partition_b VALUES [("10"), ("20")),
+        PARTITION partition_c VALUES [("20"), ("30")),
+        PARTITION partition_d VALUES [("30"), ("40")))
+        DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+
+    sql """ALTER TABLE ${tableName9} ADD PARTITION partition_e VALUES less 
than ('3000') properties ('mutable' = 'false')"""
+    sql """ALTER TABLE ${tableName9} MODIFY PARTITION partition_b set 
('mutable' = 'false')"""
+
+    streamLoad {
+        table "${tableName9}"
+
+        set 'column_separator', '\t'
+        set 'columns', 'k1, k2, v1, v2, v3'
+        set 'partitions', 'partition_a, partition_b, partition_c, partition_d, 
partition_e'
+        set 'strict_mode', 'true'
+        set 'memtable_on_sink_node', 'true'
+
+        file 'test_immutable_partition.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(11, json.NumberTotalRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(5, json.NumberUnselectedRows)
+        }
+    }
+    
+    sql "sync"
+    order_qt_sql1 "select * from ${tableName9} order by k1, k2"
+
+    // test common user
+    def tableName13 = "test_common_user"
+    sql """ DROP TABLE IF EXISTS ${tableName13} """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName13} (
+            `k1` bigint(20) NULL,
+            `k2` bigint(20) NULL,
+            `v1` tinyint(4) SUM NULL,
+            `v2` tinyint(4) REPLACE NULL,
+            `v3` tinyint(4) REPLACE_IF_NOT_NULL NULL
+        ) ENGINE=OLAP
+        AGGREGATE KEY(`k1`, `k2`)
+        COMMENT 'OLAP'
+        PARTITION BY RANGE(`k1`)
+        (PARTITION partition_a VALUES [("-9223372036854775808"), ("10")),
+        PARTITION partition_b VALUES [("10"), ("20")),
+        PARTITION partition_c VALUES [("20"), ("30")),
+        PARTITION partition_d VALUES [("30"), ("40")))
+        DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+    
+    sql """create USER common_user@'%' IDENTIFIED BY '123456'"""
+    sql """GRANT LOAD_PRIV ON *.* TO 'common_user'@'%';"""
+
+    streamLoad {
+        table "${tableName13}"
+
+        set 'column_separator', '|'
+        set 'columns', 'k1, k2, v1, v2, v3'
+        set 'strict_mode', 'true'
+        set 'Authorization', 'Basic  Y29tbW9uX3VzZXI6MTIzNDU2'
+        set 'memtable_on_sink_node', 'true'
+
+        file 'test_auth.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(2, json.NumberTotalRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+    
+    sql "sync"
+    sql """DROP USER 'common_user'@'%'"""
+
+    // test default value
+    def tableName14 = "test_default_value"
+    sql """ DROP TABLE IF EXISTS ${tableName14} """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName14} (
+            `k1` bigint(20) NULL DEFAULT "1",
+            `k2` bigint(20) NULL ,
+            `v1` tinyint(4) NULL,
+            `v2` tinyint(4) NULL,
+            `v3` tinyint(4) NULL,
+            `v4` DATETIME NULL DEFAULT CURRENT_TIMESTAMP
+        ) ENGINE=OLAP
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+
+    streamLoad {
+        table "${tableName14}"
+
+        set 'column_separator', '|'
+        set 'columns', 'k2, v1, v2, v3'
+        set 'strict_mode', 'true'
+        set 'memtable_on_sink_node', 'true'
+
+        file 'test_default_value.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(2, json.NumberTotalRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+    
+    sql "sync"
+    def res = sql "select * from ${tableName14}"
+    def time = res[0][5].toString().split("T")[0].split("-")
+    def year = time[0].toString()
+    SimpleDateFormat sdf= new SimpleDateFormat("yyyy-MM-dd")
+    def now = sdf.format(new Date()).toString().split("-")
+
+    // parse time is correct
+    // Due to the time difference in parsing, should deal with three 
situations:
+    // 2023-6-29 -> 2023-6-30
+    // 2023-6-30 -> 2023-7-1
+    // 2023-12-31 -> 2024-1-1
+    // now only compare year simply, you can retry if this test is error.
+    assertEquals(year, now[0])
+    // parse k1 default value
+    assertEquals(res[0][0], 1)
+    assertEquals(res[1][0], 1)
+}
+
diff --git 
a/regression-test/suites/load_p0/stream_load/test_stream_load_new_move_memtable.groovy
 
b/regression-test/suites/load_p0/stream_load/test_stream_load_new_move_memtable.groovy
new file mode 100644
index 0000000000..1fb43b21c7
--- /dev/null
+++ 
b/regression-test/suites/load_p0/stream_load/test_stream_load_new_move_memtable.groovy
@@ -0,0 +1,559 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.Random;
+
+suite("test_stream_load_new_move_memtable", "p0") {
+
+    sql """set enable_memtable_on_sink_node=true"""
+
+    // 1. test column with currenttimestamp default value
+    def tableName1 = "test_stream_load_new_current_timestamp_mm"
+
+    try {
+        sql """
+        CREATE TABLE IF NOT EXISTS ${tableName1} (
+            id int,
+            name CHAR(10),
+            dt_1 DATETIME DEFAULT CURRENT_TIMESTAMP,
+            dt_2 DATETIMEV2 DEFAULT CURRENT_TIMESTAMP,
+            dt_3 DATETIMEV2(3) DEFAULT CURRENT_TIMESTAMP,
+            dt_4 DATETIMEV2(6) DEFAULT CURRENT_TIMESTAMP
+        )
+        DISTRIBUTED BY HASH(id) BUCKETS 1
+        PROPERTIES (
+          "replication_num" = "1"
+        )
+        """
+
+        streamLoad {
+            set 'column_separator', ','
+            set 'columns', 'id, name'
+            set 'memtable_on_sink_node', 'true'
+            table "${tableName1}"
+            time 10000
+            file 'test_stream_load_new_current_timestamp.csv'
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                assertEquals(11, json.NumberTotalRows)
+                assertEquals(0, json.NumberFilteredRows)
+            }
+        }
+
+        qt_sql1 "select id, name from ${tableName1}"
+    } finally {
+        try_sql "DROP TABLE IF EXISTS ${tableName1}"
+    }
+
+    // 2. test change column order
+    def tableName2 = "test_stream_load_new_change_column_order_mm"
+
+    try {
+        sql """
+        CREATE TABLE IF NOT EXISTS ${tableName2} (
+            k1 int,
+            k2 smallint,
+            k3 CHAR(10),
+            k4 bigint,
+            k5 decimal(6, 3),
+            k6 float
+        )
+        DISTRIBUTED BY HASH(k1) BUCKETS 1
+        PROPERTIES (
+          "replication_num" = "1"
+        )
+        """
+
+        streamLoad {
+            set 'column_separator', ','
+            set 'columns', 'k1, k3, k2, k4, k6, k5'
+            set 'memtable_on_sink_node', 'true'
+            table "${tableName2}"
+            time 10000
+            file 'test_stream_load_new_change_column_order.csv'
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                assertEquals(11, json.NumberTotalRows)
+                assertEquals(0, json.NumberFilteredRows)
+            }
+        }
+
+        qt_sql2 "select * from ${tableName2}"
+    } finally {
+        try_sql "DROP TABLE IF EXISTS ${tableName2}"
+    }
+
+    // 3. test with function
+    def tableName3 = "test_stream_load_new_function_mm"
+
+    try {
+        sql """
+        CREATE TABLE IF NOT EXISTS ${tableName3} (
+            id int,
+            name CHAR(10),
+            year int,
+            month int,
+            day int
+        )
+        DISTRIBUTED BY HASH(id) BUCKETS 1
+        PROPERTIES (
+          "replication_num" = "1"
+        )
+        """
+
+        streamLoad {
+            set 'column_separator', ','
+            set 'columns', 'id, name, tmp_c3, year = year(tmp_c3), month = 
month(tmp_c3), day = day(tmp_c3)'
+            set 'memtable_on_sink_node', 'true'
+            table "${tableName3}"
+            time 10000
+            file 'test_stream_load_new_function.csv'
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                assertEquals(11, json.NumberTotalRows)
+                assertEquals(0, json.NumberFilteredRows)
+            }
+        }
+
+        qt_sql3 "select * from ${tableName3}"
+    } finally {
+        try_sql "DROP TABLE IF EXISTS ${tableName3}"
+    }
+
+    // 4. test column number mismatch
+    def tableName4 = "test_stream_load_new_column_number_mismatch_mm"
+
+    try {
+        sql """
+        CREATE TABLE IF NOT EXISTS ${tableName4} (
+            k1 int NOT NULL,
+            k2 CHAR(10) NOT NULL,
+            k3 smallint NOT NULL
+        )
+        DISTRIBUTED BY HASH(id) BUCKETS 1
+        PROPERTIES (
+          "replication_num" = "1"
+        )
+        """
+
+        streamLoad {
+            set 'column_separator', ','
+            set 'memtable_on_sink_node', 'true'
+            table "${tableName4}"
+            time 10000
+            file 'test_stream_load_new_column_number_mismatch.csv'
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("fail", json.Status.toLowerCase())
+            }
+        }
+    } catch (Exception e) {
+        assertTrue(e.getMessage().contains("Distribution column(id) doesn't 
exist"), e.getMessage())
+    } finally {
+        try_sql "DROP TABLE IF EXISTS ${tableName4}"
+    }
+
+    // 5. test with default value
+    def tableName5 = "test_stream_load_new_default_value_mm"
+
+    try {
+        sql """
+        CREATE TABLE IF NOT EXISTS ${tableName5} (
+            id int NOT NULL,
+            name CHAR(10) NOT NULL,
+            date DATE NOT NULL, 
+            max_dwell_time INT DEFAULT "0",
+            min_dwell_time INT DEFAULT "99999" 
+        )
+        DISTRIBUTED BY HASH(id) BUCKETS 1
+        PROPERTIES (
+          "replication_num" = "1"
+        )
+        """
+
+        streamLoad {
+            set 'column_separator', ','
+            set 'columns', 'id, name, date'
+            set 'memtable_on_sink_node', 'true'
+            table "${tableName5}"
+            time 10000
+            file 'test_stream_load_new_default_value.csv'
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                assertEquals(11, json.NumberTotalRows)
+                assertEquals(0, json.NumberFilteredRows)
+            }
+        }
+
+        qt_sql5 "select * from ${tableName5}"
+    } finally {
+        try_sql "DROP TABLE IF EXISTS ${tableName5}"
+    }
+
+    // 6. test some column type
+    def tableName6 = "test_stream_load_new_column_type_mm"
+
+    try {
+        sql """
+        CREATE TABLE IF NOT EXISTS ${tableName6} (
+            c_int int(11) NULL,
+            c_char char(15) NULL,
+            c_varchar varchar(100) NULL,
+            c_bool boolean NULL,
+            c_tinyint tinyint(4) NULL,
+            c_smallint smallint(6) NULL,
+            c_bigint bigint(20) NULL,
+            c_largeint largeint(40) NULL,
+            c_float float NULL,
+            c_double double NULL,
+            c_decimal decimal(6, 3) NULL,
+            c_decimalv3 decimalv3(6, 3) NULL,
+            c_date date NULL,
+            c_datev2 datev2 NULL,
+            c_datetime datetime NULL,
+            c_datetimev2 datetimev2(0) NULL
+        )
+        DISTRIBUTED BY HASH(c_int) BUCKETS 1
+        PROPERTIES (
+          "replication_num" = "1"
+        )
+        """
+
+        streamLoad {
+            set 'column_separator', ','
+            set 'memtable_on_sink_node', 'true'
+            table "${tableName6}"
+            time 10000
+            file 'test_stream_load_new_column_type.csv'
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                assertEquals(11, json.NumberTotalRows)
+                assertEquals(0, json.NumberFilteredRows)
+            }
+        }
+
+        qt_sql6 "select * from ${tableName6}"
+    } finally {
+        try_sql "DROP TABLE IF EXISTS ${tableName6}"
+    }
+
+    // 7. test duplicate key
+    def tableName7 = "test_stream_load_duplicate_key_mm"
+
+    try {
+        sql """
+        CREATE TABLE IF NOT EXISTS ${tableName7}
+        (
+            user_id LARGEINT NOT NULL,
+            username VARCHAR(50) NOT NULL,
+            city VARCHAR(20),
+            age SMALLINT,
+            sex TINYINT,
+            phone LARGEINT,
+            address VARCHAR(500),
+            register_time DATETIME
+        )
+        DUPLICATE KEY(`user_id`, `username`)
+        DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1"
+        )
+        """
+
+        streamLoad {
+            set 'column_separator', ','
+            set 'memtable_on_sink_node', 'true'
+            table "${tableName7}"
+            time 10000
+            file 'test_stream_load_data_model.csv'
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                assertEquals(11, json.NumberTotalRows)
+                assertEquals(0, json.NumberFilteredRows)
+            }
+        }
+
+        qt_sql7 "select * from ${tableName7}"
+    } finally {
+        try_sql "DROP TABLE IF EXISTS ${tableName7}"
+    }
+
+    // 8. test merge on read unique key
+    def tableName8 = "test_stream_load_unique_key_merge_on_read_mm"
+
+    try {
+        sql """
+        CREATE TABLE IF NOT EXISTS ${tableName8}
+        (
+            user_id LARGEINT NOT NULL,
+            username VARCHAR(50) NOT NULL,
+            city VARCHAR(20),
+            age SMALLINT,
+            sex TINYINT,
+            phone LARGEINT,
+            address VARCHAR(500),
+            register_time DATETIME
+        )
+        UNIQUE KEY(`user_id`, `username`)
+        DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1"
+        )
+        """
+
+        streamLoad {
+            set 'column_separator', ','
+            set 'memtable_on_sink_node', 'true'
+            table "${tableName8}"
+            time 10000
+            file 'test_stream_load_data_model.csv'
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                assertEquals(11, json.NumberTotalRows)
+                assertEquals(0, json.NumberFilteredRows)
+            }
+        }
+
+        qt_sql8 "select * from ${tableName8}"
+    } finally {
+        try_sql "DROP TABLE IF EXISTS ${tableName8}"
+    }
+
+    // 9. test merge on write unique key
+    def tableName9 = "test_stream_load_unique_key_merge_on_write_mm"
+
+    try {
+        sql """
+        CREATE TABLE IF NOT EXISTS ${tableName9}
+        (
+            user_id LARGEINT NOT NULL,
+            username VARCHAR(50) NOT NULL,
+            city VARCHAR(20),
+            age SMALLINT,
+            sex TINYINT,
+            phone LARGEINT,
+            address VARCHAR(500),
+            register_time DATETIME
+        )
+        UNIQUE KEY(`user_id`, `username`)
+        DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1",
+        "enable_unique_key_merge_on_write" = "true"
+        )
+        """
+
+        streamLoad {
+            set 'column_separator', ','
+            set 'memtable_on_sink_node', 'true'
+            table "${tableName9}"
+            time 10000
+            file 'test_stream_load_data_model.csv'
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                assertEquals(11, json.NumberTotalRows)
+                assertEquals(0, json.NumberFilteredRows)
+            }
+        }
+
+        qt_sql9 "select * from ${tableName9}"
+    } finally {
+        try_sql "DROP TABLE IF EXISTS ${tableName9}"
+    }
+
+    // 10. test stream load multiple times
+    def tableName10 = "test_stream_load_multiple_times_mm"
+    Random rd = new Random()
+    def disable_auto_compaction = "false"
+    if (rd.nextBoolean()) {
+        disable_auto_compaction = "true"
+    }
+    log.info("disable_auto_compaction: ${disable_auto_compaction}".toString())
+    try {
+        sql """
+        CREATE TABLE IF NOT EXISTS ${tableName10}
+        (
+            user_id LARGEINT NOT NULL,
+            username VARCHAR(50) NOT NULL,
+            money INT
+        )
+        DUPLICATE KEY(`user_id`, `username`)
+        DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1",
+        "disable_auto_compaction" = "${disable_auto_compaction}"
+        )
+        """
+        for (int i = 0; i < 3; ++i) {
+            streamLoad {
+                set 'column_separator', ','
+                set 'memtable_on_sink_node', 'true'
+                table "${tableName10}"
+                time 10000
+                file 'test_stream_load_multiple_times.csv'
+                check { result, exception, startTime, endTime ->
+                    if (exception != null) {
+                        throw exception
+                    }
+                    log.info("Stream load result: ${result}".toString())
+                    def json = parseJson(result)
+                    assertEquals("success", json.Status.toLowerCase())
+                    assertEquals(500, json.NumberTotalRows)
+                    assertEquals(0, json.NumberFilteredRows)
+                }
+            }
+        }
+
+        qt_sql10 "select count(*) from ${tableName10}"
+    } finally {
+        try_sql "DROP TABLE IF EXISTS ${tableName10}"
+    }
+
+    // 11. test stream load column separator
+    def tableName11 = "test_stream_load_column_separator_mm"
+
+    try {
+        sql """
+        CREATE TABLE IF NOT EXISTS ${tableName11} (
+            id int,
+            name CHAR(10),
+            dt_1 DATETIME DEFAULT CURRENT_TIMESTAMP,
+            dt_2 DATETIMEV2 DEFAULT CURRENT_TIMESTAMP,
+            dt_3 DATETIMEV2(3) DEFAULT CURRENT_TIMESTAMP,
+            dt_4 DATETIMEV2(6) DEFAULT CURRENT_TIMESTAMP
+        )
+        DISTRIBUTED BY HASH(id) BUCKETS 1
+        PROPERTIES (
+          "replication_num" = "1"
+        )
+        """
+
+        streamLoad {
+            set 'column_separator', '--'
+            set 'columns', 'id, name'
+            set 'memtable_on_sink_node', 'true'
+            table "${tableName11}"
+            time 10000
+            file 'test_stream_load_new_column_separator.csv'
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                assertEquals(11, json.NumberTotalRows)
+                assertEquals(0, json.NumberFilteredRows)
+            }
+        }
+
+        qt_sql11 "select id, name from ${tableName11}"
+    } finally {
+        try_sql "DROP TABLE IF EXISTS ${tableName11}"
+    }
+
+    // 12. test stream load line delimiter
+    def tableName12 = "test_stream_load_line_delimiter_mm"
+    
+    try {
+        sql """
+        CREATE TABLE IF NOT EXISTS ${tableName12} (
+            id int,
+            name CHAR(10),
+            dt_1 DATETIME DEFAULT CURRENT_TIMESTAMP,
+            dt_2 DATETIMEV2 DEFAULT CURRENT_TIMESTAMP,
+            dt_3 DATETIMEV2(3) DEFAULT CURRENT_TIMESTAMP,
+            dt_4 DATETIMEV2(6) DEFAULT CURRENT_TIMESTAMP
+        )
+        DISTRIBUTED BY HASH(id) BUCKETS 1
+        PROPERTIES (
+          "replication_num" = "1"
+        )
+        """
+
+        streamLoad {
+            set 'column_separator', ','
+            set 'line_delimiter', '||'
+            set 'columns', 'id, name'
+            set 'memtable_on_sink_node', 'true'
+            table "${tableName12}"
+            time 10000
+            file 'test_stream_load_new_line_delimiter.csv'
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                assertEquals(11, json.NumberTotalRows)
+                assertEquals(0, json.NumberFilteredRows)
+            }
+        }
+
+        qt_sql12 "select id, name from ${tableName12}"
+    } finally {
+        try_sql "DROP TABLE IF EXISTS ${tableName12}"
+    }
+
+    sql """set enable_memtable_on_sink_node=false"""
+}
+
diff --git 
a/regression-test/suites/rollup_p0/test_materialized_view_move_memtable.groovy 
b/regression-test/suites/rollup_p0/test_materialized_view_move_memtable.groovy
new file mode 100644
index 0000000000..917fbdc08a
--- /dev/null
+++ 
b/regression-test/suites/rollup_p0/test_materialized_view_move_memtable.groovy
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_materialized_view_move_memtable", "rollup") {
+
+    // because nereids cannot support rollup correctly forbid it temporary
+    sql """set enable_nereids_planner=false"""
+    sql """set enable_memtable_on_sink_node=true"""
+
+    def tbName1 = "test_materialized_view_mm"
+    def tbName2 = "test_materialized_view_dynamic_partition_mm"
+    def tbName3 = "test_materialized_view_schema_change_mm"
+    def tbName4 = "test_materialized_view_dynamic_partition_schema_change_mm"
+
+    def getJobState = { tableName ->
+        def jobStateResult = sql """  SHOW ALTER TABLE MATERIALIZED VIEW WHERE 
TableName='${tableName}' ORDER BY CreateTime DESC LIMIT 1; """
+        return jobStateResult[0][8]
+    }
+
+    sql "DROP TABLE IF EXISTS ${tbName1}"
+    sql """
+            CREATE TABLE IF NOT EXISTS ${tbName1}(
+                k1 DATE,
+                k2 DECIMAL(10, 2),
+                k3 CHAR(10),
+                k4 INT NOT NULL
+            )
+            DUPLICATE KEY(k1, k2)
+            PARTITION BY RANGE(k1)
+            (
+               PARTITION p1 VALUES LESS THAN ("2000-01-01"),
+               PARTITION p2 VALUES LESS THAN ("2010-01-01"),
+               PARTITION p3 VALUES LESS THAN ("2020-01-01")
+            )
+            DISTRIBUTED BY HASH(k1) BUCKETS 32 
+            properties(
+                "light_schema_change" = "false",
+                "replication_num" = "1"
+            );
+        """
+
+    sql "DROP TABLE IF EXISTS ${tbName2}"
+    sql """
+            CREATE TABLE IF NOT EXISTS ${tbName2}(
+                k1 DATE,
+                k2 DECIMAL(10, 2),
+                k3 CHAR(10),
+                k4 INT NOT NULL
+            )
+            PARTITION BY RANGE(k1)
+            (
+               PARTITION p1 VALUES LESS THAN ("2000-01-01"),
+               PARTITION p2 VALUES LESS THAN ("2010-01-01"),
+               PARTITION p3 VALUES LESS THAN ("2020-01-01")
+            )
+            DISTRIBUTED BY HASH(k1)
+            PROPERTIES
+            (
+                "dynamic_partition.enable" = "true",
+                "dynamic_partition.time_unit" = "DAY",
+                "dynamic_partition.start" = "-2147483648",
+                "dynamic_partition.end" = "3",
+                "dynamic_partition.prefix" = "p",
+                "dynamic_partition.buckets" = "32",
+                "light_schema_change" = "false",
+                "replication_num"="1"
+            );
+        """
+    
+    sql "DROP TABLE IF EXISTS ${tbName3}"
+    sql """
+            CREATE TABLE IF NOT EXISTS ${tbName3}(
+                k1 DATE,
+                k2 DECIMAL(10, 2),
+                k3 CHAR(10),
+                k4 INT NOT NULL
+            )
+            DUPLICATE KEY(k1, k2)
+            PARTITION BY RANGE(k1)
+            (
+               PARTITION p1 VALUES LESS THAN ("2000-01-01"),
+               PARTITION p2 VALUES LESS THAN ("2010-01-01"),
+               PARTITION p3 VALUES LESS THAN ("2020-01-01")
+            )
+            DISTRIBUTED BY HASH(k1) BUCKETS 32 
+
+            properties(
+                "light_schema_change" = "true",
+                "replication_num" = "1"
+            );
+        """
+    
+    sql "DROP TABLE IF EXISTS ${tbName4}"
+    sql """
+            CREATE TABLE IF NOT EXISTS ${tbName4}(
+                k1 DATE,
+                k2 DECIMAL(10, 2),
+                k3 CHAR(10),
+                k4 INT NOT NULL
+            )
+            PARTITION BY RANGE(k1)
+            (
+               PARTITION p1 VALUES LESS THAN ("2000-01-01"),
+               PARTITION p2 VALUES LESS THAN ("2010-01-01"),
+               PARTITION p3 VALUES LESS THAN ("2020-01-01")
+            )
+            DISTRIBUTED BY HASH(k1)
+            PROPERTIES
+            (
+                "dynamic_partition.enable" = "true",
+                "dynamic_partition.time_unit" = "DAY",
+                "dynamic_partition.start" = "-2147483648",
+                "dynamic_partition.end" = "3",
+                "dynamic_partition.prefix" = "p",
+                "dynamic_partition.buckets" = "32",
+                "light_schema_change" = "true",
+                "replication_num"="1"
+            );
+        """
+    
+    sql "CREATE materialized VIEW test_load_open AS SELECT k1 FROM ${tbName1} 
GROUP BY k1;"
+    int max_try_secs = 60
+    while (max_try_secs--) {
+        String res = getJobState(tbName1)
+        if (res == "FINISHED" || res == "CANCELLED") {
+            assertEquals("FINISHED", res)
+            sleep(3000)
+            break
+        } else {
+            Thread.sleep(2000)
+            if (max_try_secs < 1) {
+                println "test timeout," + "state:" + res
+                assertEquals("FINISHED",res)
+            }
+        }
+    }
+
+    sql "CREATE materialized VIEW test_load_open_dynamic_partition AS SELECT 
k1 FROM ${tbName2} GROUP BY k1;"
+    max_try_secs = 60
+    while (max_try_secs--) {
+        String res = getJobState(tbName2)
+        if (res == "FINISHED" || res == "CANCELLED") {
+            assertEquals("FINISHED", res)
+            sleep(3000)
+            break
+        } else {
+            Thread.sleep(2000)
+            if (max_try_secs < 1) {
+                println "test timeout," + "state:" + res
+                assertEquals("FINISHED",res)
+            }
+        }
+    }
+
+    sql "CREATE materialized VIEW test_load_open_schema_change AS SELECT k1 
FROM ${tbName3} GROUP BY k1;"
+    max_try_secs = 60
+    while (max_try_secs--) {
+        String res = getJobState(tbName3)
+        if (res == "FINISHED" || res == "CANCELLED") {
+            assertEquals("FINISHED", res)
+            sleep(3000)
+            break
+        } else {
+            Thread.sleep(2000)
+            if (max_try_secs < 1) {
+                println "test timeout," + "state:" + res
+                assertEquals("FINISHED",res)
+            }
+        }
+    }
+
+    sql "CREATE materialized VIEW 
test_load_open_dynamic_partition_schema_change AS SELECT k1 FROM ${tbName4} 
GROUP BY k1;"
+    max_try_secs = 60
+    while (max_try_secs--) {
+        String res = getJobState(tbName4)
+        if (res == "FINISHED" || res == "CANCELLED") {
+            assertEquals("FINISHED", res)
+            sleep(3000)
+            break
+        } else {
+            Thread.sleep(2000)
+            if (max_try_secs < 1) {
+                println "test timeout," + "state:" + res
+                assertEquals("FINISHED",res)
+            }
+        }
+    }
+
+    sql "insert into ${tbName1} values('2000-05-20', 1.5, 'test', 1);"
+    sql "insert into ${tbName1} values('2010-05-20', 1.5, 'test', 1);"
+
+    sql "insert into ${tbName2} values('2000-05-20', 1.5, 'test', 1);"
+    sql "insert into ${tbName2} values('2010-05-20', 1.5, 'test', 1);"
+
+    sql "insert into ${tbName3} values('2000-05-20', 1.5, 'test', 1);"
+    sql "ALTER table ${tbName3} ADD COLUMN new_column INT;"
+    sql "insert into ${tbName3} values('2010-05-20', 1.5, 'test', 1, 1);"
+
+    sql "insert into ${tbName4} values('2000-05-20', 1.5, 'test', 1);"
+    sql "ALTER table ${tbName4} ADD COLUMN new_column INT;"
+    sql "insert into ${tbName4} values('2010-05-20', 1.5, 'test', 1, 1);"
+
+    sql "DROP TABLE ${tbName1} FORCE;"
+    sql "DROP TABLE ${tbName2} FORCE;"
+    sql "DROP TABLE ${tbName3} FORCE;"
+    sql "DROP TABLE ${tbName4} FORCE;"
+    sql """set enable_memtable_on_sink_node=false"""
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to