This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a5761a25c5 [feature](move-memtable)[7/7] add regression tests (#23515)
a5761a25c5 is described below
commit a5761a25c5d8e4a27ce350640e84d606b6e134d7
Author: Kaijie Chen <[email protected]>
AuthorDate: Sat Aug 26 17:52:10 2023 +0800
[feature](move-memtable)[7/7] add regression tests (#23515)
Co-authored-by: laihui <[email protected]>
---
.../load_p0/insert/test_insert_move_memtable.out | 17 +
.../stream_load/test_stream_load_move_memtable.out | 101 +++
.../test_stream_load_new_move_memtable.out | 126 +++
.../test_materialized_view_move_memtable.out | 52 ++
.../insert/test_insert_move_memtable.groovy | 93 ++
.../test_stream_load_move_memtable.groovy | 969 +++++++++++++++++++++
.../test_stream_load_new_move_memtable.groovy | 559 ++++++++++++
.../test_materialized_view_move_memtable.groovy | 222 +++++
8 files changed, 2139 insertions(+)
diff --git a/regression-test/data/load_p0/insert/test_insert_move_memtable.out b/regression-test/data/load_p0/insert/test_insert_move_memtable.out
new file mode 100644
index 0000000000..dcb5b854a7
--- /dev/null
+++ b/regression-test/data/load_p0/insert/test_insert_move_memtable.out
@@ -0,0 +1,17 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql1 --
+false -2147483647 7 7
+false 103 9 16
+false 1002 2 18
+false 5014 5 23
+false 2147483647 8 31
+true -2147483647 4 4
+true 3021 1 15
+true 3021 10 15
+true 25699 6 21
+true 2147483647 3 24
+
+-- !select --
+true	10	10000	10000000	92233720368547758	19223372036854775807	3.14159	hello world, today is 15/06/2023	2023-06-15	2023-06-15T16:10:15
+true	10	10000	10000000	92233720368547758	19223372036854775807	3.14159	hello world, today is 15/06/2023	2023-06-15	2023-06-15T16:10:15
+
diff --git a/regression-test/data/load_p0/stream_load/test_stream_load_move_memtable.out b/regression-test/data/load_p0/stream_load/test_stream_load_move_memtable.out
new file mode 100644
index 0000000000..8816fce94b
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_stream_load_move_memtable.out
@@ -0,0 +1,101 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+-2	-51	\N	1	\N	\N	\N	\N	\N	\N	\N	2	\N	\N
+-2	-50	\N	1	\N	\N	\N	\N	\N	\N	\N	\N	j	\N
+
+-- !sql1 --
+2019	9	9	9	7.700	a	2019-09-09	1970-01-01T08:33:39	k7	9.0	9.0
+
+-- !all11 --
+2500
+
+-- !all12 --
+11
+
+-- !all21 --
+2500
+
+-- !all22 --
+0
+
+-- !all23 --
+2500
+
+-- !all24 --
+2500
+
+-- !all31 --
+11
+
+-- !all32 --
+11
+
+-- !all33 --
+11
+
+-- !all41 --
+2500
+
+-- !all51 --
+0
+
+-- !all61 --
+0
+
+-- !all71 --
+1 2 1025 1028
+
+-- !all81 --
+2
+
+-- !all91 --
+1
+
+-- !all101 --
+1	[1, 2, 3, 4, 5]	[32767, 32768, 32769]	[65534, 65535, 65536]	["a", "b", "c", "d", "e"]	["hello", "world"]	[1991-01-01, 1992-02-02, 1993-03-03]	[1991-01-01 00:00:00]	[0.33, 0.67]	[3.1415926, 0.878787878]	[1.000000, 1.200000, 1.300000]
+2	[1, 2, 3, 4, 5]	[32767, 32768, 32769]	[65534, 65535, 65536]	["a", "b", "c", "d", "e"]	["hello", "world"]	[1991-01-01, 1992-02-02, 1993-03-03]	\N	\N	\N	[1.000000, NULL, 1.300000]
+3	\N	\N	\N	\N	\N	\N	\N	\N	\N	\N
+4	\N	\N	\N	\N	\N	\N	\N	\N	\N	\N
+5	\N	\N	\N	\N	\N	\N	\N	\N	\N	\N
+
+-- !all102 --
+1	[1, 2, 3, 4, 5]	[32767, 32768, 32769]	[65534, 65535, 65536]	["a", "b", "c", "d", "e"]	["hello", "world"]	[1991-01-01, 1992-02-02, 1993-03-03]	[1991-01-01 00:00:00]	[0.33, 0.67]	[3.1415926, 0.878787878]	[1.000000, 1.200000, 1.300000]
+2	[1, 2, 3, 4, 5]	[32767, 32768, 32769]	[65534, 65535, 65536]	["a", "b", "c", "d", "e"]	["hello", "world"]	[1991-01-01, 1992-02-02, 1993-03-03]	\N	\N	\N	[1.000000, NULL, 1.300000]
+3	\N	\N	\N	\N	\N	\N	\N	\N	\N	\N
+4	[]	[]	[]	[]	[]	[]	[]	[]	[]	[]
+5	[NULL]	[NULL]	[NULL]	[NULL]	[NULL]	[NULL]	[NULL]	[NULL]	[NULL]	[NULL]
+6	[NULL, NULL]	[NULL, NULL]	[NULL, NULL]	[NULL, NULL]	[NULL, "null"]	[NULL, NULL]	[NULL, NULL]	[NULL, NULL]	[NULL, NULL]	[NULL, NULL, NULL, NULL, NULL, NULL]
+6	[NULL, NULL]	[NULL, NULL]	[NULL, NULL]	[NULL, NULL]	[NULL, NULL]	[NULL, NULL]	[NULL, NULL]	[NULL, NULL]	[NULL, NULL]	[NULL, NULL, NULL, NULL, NULL, NULL]
+7	[1, 2, 3, 4, 5]	\N	\N	\N	\N	\N	\N	\N	\N	\N
+8	[1, 2, 3, 4, 5]	\N	\N	\N	\N	\N	[NULL]	\N	[NULL]	\N
+
+-- !all111 --
+1	{1, 100, 100000, "a", "doris", 2023-02-26, 2023-02-26 17:58:00, 1.01, 3.1415926, 1.100000}
+2 {1, 100, 100000, "a", "doris", 2023-02-26, NULL, NULL, NULL, 1.100000}
+3 \N
+4 \N
+5 \N
+
+-- !all112 --
+1	{1, 100, 100000, "a", "doris", 2023-02-26, 2023-02-26 17:58:00, 1.01, 3.1415926, 1.100000}
+2 {1, 100, 100000, "a", "doris", 2023-02-26, NULL, NULL, NULL, 1.100000}
+3 {1, 100, 100000, "a", "doris", 2023-02-26, NULL, NULL, NULL, 1.100000}
+4	{1, 100, 100000, "a", "doris", 2023-02-26, 2023-02-26 17:58:00, 1.01, 3.1415926, 1.100000}
+5	{1, 100, 100000, "a", "doris", 2023-02-26, 2023-02-26 17:58:00, 1.01, 3.1415926, 1.100000}
+6	{1, 100, 100000, "a", "doris", 2023-02-26, 2023-02-26 17:58:00, 1.01, 3.1415926, 1.100000}
+7	{1, 100, 100000, "a", "doris", 2023-02-26, 2023-02-26 17:58:00, 1.01, 3.1415926, 1.100000}
+8	{1, NULL, NULL, NULL, "doris", 2023-02-26, 2023-02-26 17:58:00, 1.01, 3.1415926, 1.100000}
+9	{NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL}
+9 {NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL}
+10	{1, 100, 100000, "a", "doris", 2023-02-26, 2023-02-26 17:58:00, 1.01, 3.1415926, 1.100000}
+11 {1, 100, 100000, "a", "doris", 2023-02-26, NULL, NULL, NULL, 1.100000}
+12 {NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL}
+13 \N
+
+-- !sql1 --
+-2 -50 1 \N 44
+2 -51 1 2 \N
+25 8 9 2 4
+33 10 9 2 7
+33 9 8 2 1
+38 1 9 2 5
+
diff --git a/regression-test/data/load_p0/stream_load/test_stream_load_new_move_memtable.out b/regression-test/data/load_p0/stream_load/test_stream_load_new_move_memtable.out
new file mode 100644
index 0000000000..52440d9843
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_stream_load_new_move_memtable.out
@@ -0,0 +1,126 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql1 --
+10000 aa
+10001 bb
+10002 cc
+10003 dd
+10004 ee
+10005 ff
+10006 gg
+10007 hh
+10008 ii
+10009 jj
+10010 kk
+
+-- !sql2 --
+10000 4444 aa 5555555 111.111 3.14
+10001 3333 bb 666 222.222 5.32
+10002 2222 cc 453 333.333 4.321
+10003 1111 dd -3241 444.444 1.34
+10004 -9999 ee 21342 555.555 1.22
+10005 8888 ff 64562 666.666 9.133
+10006 -7777 gg -12313342 777.777 8.1
+10007 6666 hh 314234 888.888 34.124
+10008 -5555 ii 1341 999.999 342.12
+10009 4444 jj -123 111.111 11.22
+10010 -3333 kk 12314 222.222 13.0
+
+-- !sql3 --
+10000 aa 2017 10 1
+10001 bb 2017 10 2
+10002 cc 2017 10 3
+10003 dd 2017 10 4
+10004 ee 2017 10 5
+10005 ff 2017 10 6
+10006 gg 2017 10 7
+10007 hh 2017 10 8
+10008 ii 2017 10 9
+10009 jj 2017 10 10
+10010 kk 2017 10 11
+
+-- !sql5 --
+10000 aa 2017-10-01 0 99999
+10001 bb 2017-10-02 0 99999
+10002 cc 2017-10-03 0 99999
+10003 dd 2017-10-04 0 99999
+10004 ee 2017-10-05 0 99999
+10005 ff 2017-10-06 0 99999
+10006 gg 2017-10-07 0 99999
+10007 hh 2017-10-08 0 99999
+10008 ii 2017-10-09 0 99999
+10009 jj 2017-10-10 0 99999
+10010 kk 2017-10-11 0 99999
+
+-- !sql6 --
+10000	aa	北京	false	11	4444	5555555	41232314	3.14	123.3423	111.111	111.111	2017-10-01	2017-10-01	2017-10-01T06:00	2017-10-01T06:00
+10001	bb	北京	false	22	3333	666	2768658	5.32	123111.3242	222.222	222.222	2017-10-02	2017-10-02	2017-10-02T07:00	2017-10-02T07:00
+10002	cc	北京	true	33	2222	453	5463456	4.321	11111.23423	333.333	333.333	2017-10-03	2017-10-03	2017-10-03T17:05:45	2017-10-03T17:05:45
+10003	dd	上海	true	44	1111	-3241	-45235	1.34	54626.324	444.444	444.444	2017-10-04	2017-10-04	2017-10-04T12:59:12	2017-10-04T12:59:12
+10004	ee	成都	false	55	-9999	21342	4513456	1.22	111.33	555.555	555.555	2017-10-05	2017-10-05	2017-10-05T11:20	2017-10-05T11:20
+10005	ff	西安	false	66	8888	64562	4356	9.133	23423.45	666.666	666.666	2017-10-06	2017-10-06	2017-10-06T12:00:15	2017-10-06T12:00:15
+10006	gg	深圳	true	77	-7777	-12313342	34534	8.1	12.0	777.777	777.777	2017-10-07	2017-10-07	2017-10-07T13:20:22	2017-10-07T13:20:22
+10007	hh	杭州	false	88	6666	314234	43535356	34.124	324.0	888.888	888.888	2017-10-08	2017-10-08	2017-10-08T14:58:10	2017-10-08T14:58:10
+10008	ii	上海	true	99	-5555	1341	23434534	342.12	34234.1	999.999	999.999	2017-10-09	2017-10-09	\N	\N
+10009	jj	南京	false	11	4444	-123	53623567	11.22	324.33	111.111	111.111	2017-10-10	2017-10-10	2017-10-10T16:25:42	2017-10-10T16:25:42
+10010	kk	成都	false	22	-3333	12314	674567	13.0	45464.435	222.222	222.222	2017-10-11	2017-10-11	2017-10-11T17:22:24	2017-10-11T17:22:24
+
+-- !sql7 --
+10000 aa 西安 22 0 1234567 陕西西安 2016-02-21T07:05:01
+10000 aa 北京 21 0 1234567 北京 2017-03-11T06:01:02
+10001 bb 上海 20 1 1234567 上海 2012-05-22T12:59:12
+10001 bb 天津 33 1 1234567 天津 2019-01-11T17:05:45
+10002 bb 上海 20 1 1234567 上海 2013-06-02T12:59:12
+10003 cc 广州 32 0 1234567 广东广州 2015-08-12T11:25
+10003 cc 广州 32 0 1234567 广东广州 2014-07-02T11:20
+10004 dd 杭州 47 0 1234567 浙江杭州 2017-11-23T13:26:22
+10004 dd 深圳 33 1 1234567 广东深圳 2016-12-01T14:04:15
+10005 dd 深圳 19 0 1234567 广东深圳 2018-10-03T12:27:22
+10005 ee 成都 21 1 1234567 四川成都 2019-09-03T11:24:22
+
+-- !sql8 --
+10000 aa 西安 22 0 1234567 陕西西安 2016-02-21T07:05:01
+10001 bb 上海 20 1 1234567 上海 2012-05-22T12:59:12
+10002 bb 上海 20 1 1234567 上海 2013-06-02T12:59:12
+10003 cc 广州 32 0 1234567 广东广州 2015-08-12T11:25
+10004 dd 杭州 47 0 1234567 浙江杭州 2017-11-23T13:26:22
+10005 dd 深圳 19 0 1234567 广东深圳 2018-10-03T12:27:22
+10005 ee 成都 21 1 1234567 四川成都 2019-09-03T11:24:22
+
+-- !sql9 --
+10000 aa 西安 22 0 1234567 陕西西安 2016-02-21T07:05:01
+10001 bb 上海 20 1 1234567 上海 2012-05-22T12:59:12
+10002 bb 上海 20 1 1234567 上海 2013-06-02T12:59:12
+10003 cc 广州 32 0 1234567 广东广州 2015-08-12T11:25
+10004 dd 杭州 47 0 1234567 浙江杭州 2017-11-23T13:26:22
+10005 dd 深圳 19 0 1234567 广东深圳 2018-10-03T12:27:22
+10005 ee 成都 21 1 1234567 四川成都 2019-09-03T11:24:22
+
+-- !sql10 --
+1500
+
+-- !sql11 --
+10000 aa
+10001 bb
+10002 cc
+10003 dd
+10004 ee
+10005 ff
+10006 gg
+10007 hh
+10008 ii
+10009 jj
+10010 kk
+
+-- !sql12 --
+10000 aa
+10001 bb
+10002 cc
+10003 dd
+10004 ee
+10005 ff
+10006 gg
+10007 hh
+10008 ii
+10009 jj
+10010 kk
+
diff --git a/regression-test/data/rollup_p0/test_materialized_view_move_memtable.out b/regression-test/data/rollup_p0/test_materialized_view_move_memtable.out
new file mode 100644
index 0000000000..7caf3713fe
--- /dev/null
+++ b/regression-test/data/rollup_p0/test_materialized_view_move_memtable.out
@@ -0,0 +1,52 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+test_materialized_view1	DUP_KEYS	record_id	INT	INT	Yes	true	\N		true
+		seller_id	INT	INT	Yes	true	\N		true
+		store_id	INT	INT	Yes	true	\N		true
+		sale_date	DATE	DATEV2	Yes	false	\N	NONE	true
+		sale_amt	BIGINT	BIGINT	Yes	false	\N	NONE	true
+
+amt_sum	AGG_KEYS	mv_store_id	INT	INT	Yes	true	\N		true	`store_id`
+		mva_SUM__`sale_amt`	BIGINT	BIGINT	Yes	false	\N	SUM	true	`sale_amt`
+
+-- !sql --
+test_materialized_view2	DUP_KEYS	record_id	INT	INT	Yes	true	\N		true
+		seller_id	INT	INT	Yes	true	\N		true
+		store_id	INT	INT	Yes	true	\N		true
+		sale_date	DATE	DATEV2	Yes	false	\N	NONE	true
+		sale_amt	BIGINT	BIGINT	Yes	false	\N	NONE	true
+
+seller_id_order	DUP_KEYS	mv_store_id	INT	INT	Yes	true	\N		true	`store_id`
+		mv_seller_id	INT	INT	Yes	true	\N		true	`seller_id`
+		mv_sale_amt	BIGINT	BIGINT	Yes	false	\N	NONE	true	`sale_amt`
+
+-- !sql --
+1 1 1 2020-05-30 100
+2 1 1 2020-05-30 100
+
+-- !sql --
+1 200
+
+-- !sql --
+1 1 1 2020-05-30 100
+2 1 1 2020-05-30 100
+
+-- !sql --
+1 200
+
+-- !sql --
+
+
+		mva_SUM__CASE WHEN `sale_amt` IS NULL THEN 0 ELSE 1 END	BIGINT	BIGINT	No	false	\N	SUM	true	CASE WHEN `sale_amt` IS NULL THEN 0 ELSE 1 END
+		mva_SUM__`sale_amt`	BIGINT	BIGINT	Yes	false	\N	SUM	true	`sale_amt`
+		sale_amt	BIGINT	BIGINT	Yes	false	\N	NONE	true
+		sale_date	DATE	DATEV2	Yes	false	\N	NONE	true
+		seller_id	INT	INT	Yes	true	\N		true
+		store_id	INT	INT	Yes	true	\N		true
+amt_count	AGG_KEYS	mv_store_id	INT	INT	Yes	true	\N		true	`store_id`
+amt_sum	AGG_KEYS	mv_store_id	INT	INT	Yes	true	\N		true	`store_id`
+test_materialized_view1	DUP_KEYS	record_id	INT	INT	Yes	true	\N		true
+
+-- !sql --
+1 2
+
diff --git a/regression-test/suites/load_p0/insert/test_insert_move_memtable.groovy b/regression-test/suites/load_p0/insert/test_insert_move_memtable.groovy
new file mode 100644
index 0000000000..bc9db5add0
--- /dev/null
+++ b/regression-test/suites/load_p0/insert/test_insert_move_memtable.groovy
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_insert_move_memtable") {
+ sql """ set enable_memtable_on_sink_node=true """
+ // todo: test insert, such as insert values, insert select, insert txn
+ sql "show load"
+ def test_baseall = "test_query_db.baseall";
+ def test_bigtable = "test_query_db.bigtable";
+ def insert_tbl = "test_insert_tbl_mm";
+
+ sql """ DROP TABLE IF EXISTS ${insert_tbl}"""
+ sql """
+ CREATE TABLE ${insert_tbl} (
+ `k1` char(5) NULL,
+ `k2` int(11) NULL,
+ `k3` tinyint(4) NULL,
+ `k4` int(11) NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`, `k2`, `k3`, `k4`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 5
+ PROPERTIES (
+ "replication_num"="1"
+ );
+ """
+
+ sql """
+ INSERT INTO ${insert_tbl}
+ SELECT a.k6, a.k3, b.k1
+ , sum(b.k1) OVER (PARTITION BY a.k6 ORDER BY a.k3) AS w_sum
+ FROM ${test_baseall} a
+ JOIN ${test_bigtable} b ON a.k1 = b.k1 + 5
+ ORDER BY a.k6, a.k3, a.k1, w_sum
+ """
+
+ qt_sql1 "select * from ${insert_tbl} order by 1, 2, 3, 4"
+
+ def insert_tbl_dft = "test_insert_dft_tbl_mm"
+ sql """ DROP TABLE IF EXISTS ${insert_tbl_dft}"""
+
+    // `k7` should be float type, but a bug exists now, see https://github.com/apache/doris/pull/20867
+    // `k9` should be char(16), but a bug exists now: insert raises "can not cast from origin type TINYINT to target type=CHAR(16)"
+    // "`k13` datetime default CURRENT_TIMESTAMP" might hit a cast error in strict mode when doing insert:
+    //   [INTERNAL_ERROR]Invalid value in strict mode for function CAST, source column String, from type String to type DateTimeV2
+ sql """
+ CREATE TABLE ${insert_tbl_dft} (
+ `k1` boolean default "true",
+ `k2` tinyint default "10",
+ `k3` smallint default "10000",
+ `k4` int default "10000000",
+ `k5` bigint default "92233720368547758",
+ `k6` largeint default "19223372036854775807",
+
+ `k8` double default "3.14159",
+
+ `k10` varchar(64) default "hello world, today is 15/06/2023",
+ `k11` date default "2023-06-15",
+ `k12` datetime default "2023-06-15 16:10:15"
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 5
+ PROPERTIES (
+ "replication_num"="1"
+ );
+ """
+
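+    // INSERT ... VALUES() with an empty value list inserts one row built entirely from
+    // the column defaults declared above; it is run under both Nereids and the legacy planner below.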
+ sql """ set enable_nereids_planner=true """
+ sql """ set enable_nereids_dml=true """
+ sql """ insert into ${insert_tbl_dft} values() """
+
+ sql """ set enable_nereids_planner=false """
+ sql """ set enable_nereids_dml=false """
+ sql """ insert into ${insert_tbl_dft} values() """
+
+    qt_select """ select k1,k2,k3,k4,k5,k6,k8,k10,k11,k12 from ${insert_tbl_dft} """
+ sql """ set enable_memtable_on_sink_node=false """
+}
diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load_move_memtable.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load_move_memtable.groovy
new file mode 100644
index 0000000000..08feba85cc
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load_move_memtable.groovy
@@ -0,0 +1,969 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+import java.util.Date
+import java.text.SimpleDateFormat
+
+suite("test_stream_load_move_memtable", "p0") {
+ sql "show tables"
+
+ def tableName = "test_stream_load_strict_mm"
+
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k1` bigint(20) NULL,
+ `k2` bigint(20) NULL,
+ `v1` tinyint(4) SUM NULL,
+ `v2` tinyint(4) REPLACE NULL,
+ `v3` tinyint(4) REPLACE_IF_NOT_NULL NULL,
+ `v4` smallint(6) REPLACE_IF_NOT_NULL NULL,
+ `v5` int(11) REPLACE_IF_NOT_NULL NULL,
+ `v6` bigint(20) REPLACE_IF_NOT_NULL NULL,
+ `v7` largeint(40) REPLACE_IF_NOT_NULL NULL,
+ `v8` datetime REPLACE_IF_NOT_NULL NULL,
+ `v9` date REPLACE_IF_NOT_NULL NULL,
+ `v10` char(10) REPLACE_IF_NOT_NULL NULL,
+ `v11` varchar(6) REPLACE_IF_NOT_NULL NULL,
+ `v12` decimal(27, 9) REPLACE_IF_NOT_NULL NULL
+ ) ENGINE=OLAP
+ AGGREGATE KEY(`k1`, `k2`)
+ COMMENT 'OLAP'
+ PARTITION BY RANGE(`k1`)
+ (PARTITION partition_a VALUES [("-9223372036854775808"), ("100000")),
+ PARTITION partition_b VALUES [("100000"), ("1000000000")),
+ PARTITION partition_c VALUES [("1000000000"), ("10000000000")),
+ PARTITION partition_d VALUES [("10000000000"), (MAXVALUE)))
+ DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ // test strict_mode success
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', '\t'
+ set 'columns', 'k1, k2, v2, v10, v11'
+ set 'partitions', 'partition_a, partition_b, partition_c, partition_d'
+ set 'strict_mode', 'true'
+ set 'memtable_on_sink_node', 'true'
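+        // memtable_on_sink_node=true exercises the move-memtable sink path added by this change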
+
+ file 'test_strict_mode.csv'
+ time 10000 // limit inflight 10s
+ }
+
+ sql "sync"
+ qt_sql "select * from ${tableName} order by k1, k2"
+
+ // test strict_mode fail
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', '\t'
+ set 'columns', 'k1, k2, v2, v10, v11'
+ set 'partitions', 'partition_a, partition_b, partition_c, partition_d'
+ set 'strict_mode', 'true'
+ set 'memtable_on_sink_node', 'true'
+
+ file 'test_strict_mode_fail.csv'
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("fail", json.Status.toLowerCase())
+ assertEquals(2, json.NumberTotalRows)
+ assertEquals(1, json.NumberFilteredRows)
+ }
+ }
+
+ sql "sync"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `id` int(11) NULL,
+ `value` varchar(64) NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "in_memory" = "false",
+ "storage_format" = "V2",
+ "disable_auto_compaction" = "false"
+ );
+ """
+
+ streamLoad {
+ table "${tableName}"
+
+ set 'line_delimiter', 'weizuo'
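+        // the test file uses the literal string 'weizuo' as its row delimiter (3 rows expected)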
+ set 'column_separator', '|'
+ set 'columns', 'id, value'
+
+ file 'test_line_delimiter.csv'
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(3, json.NumberTotalRows)
+ assertEquals(0, json.NumberFilteredRows)
+ }
+ }
+
+ sql "sync"
+    def rowCount = sql "select count(1) from ${tableName}"
+ assertEquals(3, rowCount[0][0])
+
+ // test load_nullable_to_not_nullable
+ def tableName2 = "load_nullable_to_not_nullable"
+ sql """ DROP TABLE IF EXISTS ${tableName2} """
+ sql """
+ CREATE TABLE IF NOT EXISTS `${tableName2}` (
+ k1 int(32) NOT NULL,
+ k2 smallint NOT NULL,
+ k3 int NOT NULL,
+ k4 bigint NOT NULL,
+ k5 decimal(9, 3) NOT NULL,
+ k6 char(5) NOT NULL,
+ k10 date NOT NULL,
+ k11 datetime NOT NULL,
+ k7 varchar(20) NOT NULL,
+ k8 double max NOT NULL,
+ k9 float sum NOT NULL )
+ AGGREGATE KEY(k1,k2,k3,k4,k5,k6,k10,k11,k7)
+ PARTITION BY RANGE(k2) (
+ PARTITION partition_a VALUES LESS THAN MAXVALUE
+ )
+ DISTRIBUTED BY HASH(k1, k2, k5)
+ BUCKETS 3
+ PROPERTIES ( "replication_allocation" = "tag.location.default: 1");
+ """
+
+ streamLoad {
+ table "${tableName2}"
+
+ set 'column_separator', '\t'
+        set 'columns', 'col,k1=year(col),k2=month(col),k3=month(col),k4=day(col),k5=7.7,k6="a",k10=date(col),k11=FROM_UNIXTIME(2019,"%Y-%m-%dT%H:%i:%s"),k7="k7",k8=month(col),k9=day(col)'
+ set 'memtable_on_sink_node', 'true'
+
+ file 'test_time.data'
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(1, json.NumberTotalRows)
+ assertEquals(0, json.NumberFilteredRows)
+ }
+ }
+ sql "sync"
+ order_qt_sql1 " SELECT * FROM ${tableName2}"
+
+ // test common case
+ def tableName3 = "test_all"
+ def tableName4 = "test_less_col"
+ def tableName5 = "test_bitmap_and_hll"
+ def tableName6 = "test_unique_key"
+ def tableName7 = "test_unique_key_with_delete"
+ def tableName8 = "test_array"
+ def tableName10 = "test_struct"
+ sql """ DROP TABLE IF EXISTS ${tableName3} """
+ sql """ DROP TABLE IF EXISTS ${tableName4} """
+ sql """ DROP TABLE IF EXISTS ${tableName5} """
+ sql """ DROP TABLE IF EXISTS ${tableName6} """
+ sql """ DROP TABLE IF EXISTS ${tableName7} """
+ sql """ DROP TABLE IF EXISTS ${tableName8} """
+ sql """ DROP TABLE IF EXISTS ${tableName10} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName3} (
+ `k1` int(11) NULL,
+ `k2` tinyint(4) NULL,
+ `k3` smallint(6) NULL,
+ `k4` bigint(20) NULL,
+ `k5` largeint(40) NULL,
+ `k6` float NULL,
+ `k7` double NULL,
+ `k8` decimal(9, 0) NULL,
+ `k9` char(10) NULL,
+ `k10` varchar(1024) NULL,
+ `k11` text NULL,
+ `k12` date NULL,
+ `k13` datetime NULL
+ ) ENGINE=OLAP
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName4} (
+ `k1` int(11) NULL,
+ `k2` tinyint(4) NULL,
+ `k3` smallint(6) NULL,
+ `k4` bigint(20) NULL,
+ `k5` largeint(40) NULL
+ ) ENGINE=OLAP
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName5} (
+ `k1` int(11) NULL,
+ `k2` tinyint(4) NULL,
+ `v1` bitmap bitmap_union,
+ `v2` hll hll_union
+ ) ENGINE=OLAP
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName6} (
+ `k1` int(11) NULL,
+ `k2` tinyint(4) NULL,
+ `v1` varchar(1024)
+ ) ENGINE=OLAP
+ UNIQUE KEY(k1, k2)
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName7} (
+ `k1` int(11) NULL,
+ `k2` tinyint(4) NULL,
+ `v1` varchar(1024)
+ ) ENGINE=OLAP
+ UNIQUE KEY(k1, k2)
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES (
+ "function_column.sequence_type" = "int",
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName8} (
+ `k1` INT(11) NULL COMMENT "",
+ `k2` ARRAY<SMALLINT> NULL COMMENT "",
+ `k3` ARRAY<INT(11)> NULL COMMENT "",
+ `k4` ARRAY<BIGINT> NULL COMMENT "",
+ `k5` ARRAY<CHAR> NULL COMMENT "",
+ `k6` ARRAY<VARCHAR(20)> NULL COMMENT "",
+ `k7` ARRAY<DATE> NULL COMMENT "",
+ `k8` ARRAY<DATETIME> NULL COMMENT "",
+ `k9` ARRAY<FLOAT> NULL COMMENT "",
+ `k10` ARRAY<DOUBLE> NULL COMMENT "",
+ `k11` ARRAY<DECIMAL(20, 6)> NULL COMMENT ""
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`)
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName10} (
+ `k1` INT(11) NULL COMMENT "",
+ `k2` STRUCT<
+ f1:SMALLINT,
+ f2:INT(11),
+ f3:BIGINT,
+ f4:CHAR,
+ f5:VARCHAR(20),
+ f6:DATE,
+ f7:DATETIME,
+ f8:FLOAT,
+ f9:DOUBLE,
+ f10:DECIMAL(20, 6)> NULL COMMENT ""
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`)
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+
+ // load all columns
+ streamLoad {
+ table "${tableName3}"
+
+ set 'column_separator', ','
+ set 'memtable_on_sink_node', 'true'
+
+ file 'all_types.csv'
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(2500, json.NumberTotalRows)
+ assertEquals(0, json.NumberFilteredRows)
+ }
+ }
+ sql "sync"
+ order_qt_all11 "SELECT count(*) FROM ${tableName3}" // 2500
+ order_qt_all12 "SELECT count(*) FROM ${tableName3} where k1 <= 10" // 11
+ sql """truncate table ${tableName3}"""
+ sql """sync"""
+
+ // load part of columns
+ streamLoad {
+ table "${tableName3}"
+
+ set 'column_separator', ','
+ set 'columns', 'k1, k2'
+ set 'memtable_on_sink_node', 'true'
+
+ file 'all_types.csv'
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("fail", json.Status.toLowerCase())
+ assertEquals(0, json.NumberLoadedRows)
+ }
+ }
+ sql "sync"
+
+ // load with skip 2 columns, with gzip
+ streamLoad {
+ table "${tableName3}"
+
+ set 'column_separator', ','
+        set 'columns', 'k1, k2, k3, k4, tmp1, tmp2, k7, k8, k9, k10, k11, k12, k13'
+ set 'compress_type', 'gz'
+ set 'memtable_on_sink_node', 'true'
+
+ file 'all_types.csv.gz'
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(2500, json.NumberTotalRows)
+ assertEquals(2500, json.NumberLoadedRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ }
+ }
+ sql "sync"
+ order_qt_all21 "SELECT count(*) FROM ${tableName3}" // 2500
+ order_qt_all22 "SELECT count(*) FROM ${tableName3} where k1 is null" // 0
+    order_qt_all23 "SELECT count(*) FROM ${tableName3} where k5 is null" // 2500
+    order_qt_all24 "SELECT count(*) FROM ${tableName3} where k6 is null" // 2500
+ sql """truncate table ${tableName3}"""
+ sql """sync"""
+
+ // load with column mapping and where predicate
+ streamLoad {
+ table "${tableName3}"
+
+ set 'column_separator', ','
+        set 'columns', 'k1, k2, k3, k4, tmp5, k6, tmpk7, k8, k9, k10, k11, k12, k13, k7=tmpk7+1'
+ set 'where', 'k1 <= 10'
+ set 'memtable_on_sink_node', 'true'
+
+ file 'all_types.csv'
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(2500, json.NumberTotalRows)
+ assertEquals(11, json.NumberLoadedRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(2489, json.NumberUnselectedRows)
+ }
+ }
+ sql "sync"
+ order_qt_all31 "SELECT count(*) FROM ${tableName3}" // 11
+ order_qt_all32 "SELECT count(*) FROM ${tableName3} where k7 >= 7" // 11
+ order_qt_all33 "SELECT count(*) FROM ${tableName3} where k5 is null" // 11
+ sql """truncate table ${tableName3}"""
+ sql """sync"""
+
+ // load without strict_mode
+ streamLoad {
+ table "${tableName3}"
+
+ set 'column_separator', ','
+        set 'columns', 'tmpk1, k2, k3, k4, k5, k6, k7, k8, k9, k10, k11, k12, k13, k1=k13'
+ set 'memtable_on_sink_node', 'true'
+
+ file 'all_types.csv'
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(2500, json.NumberTotalRows)
+ assertEquals(2500, json.NumberLoadedRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ }
+ }
+ sql "sync"
+    order_qt_all41 "SELECT count(*) FROM ${tableName3} where k1 is null" // 2500
+ sql """truncate table ${tableName3}"""
+ sql """sync"""
+
+ // load with strict_mode false and max_filter_ratio
+ streamLoad {
+ table "${tableName4}"
+
+ set 'column_separator', ','
+        set 'columns', 'k1, k2, k3, k4, tmpk5, tmpk6, tmpk7, tmpk8, tmpk9, tmpk10, tmpk11, tmpk12, k5'
+ set 'max_filter_ratio', '1'
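+        // max_filter_ratio=1 tolerates up to 100% filtered rows, so the load succeeds
+        // even though strict mode filters every row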
+ set 'strict_mode', 'true'
+ set 'memtable_on_sink_node', 'true'
+
+ file 'all_types.csv'
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(2500, json.NumberTotalRows)
+ assertEquals(0, json.NumberLoadedRows)
+ assertEquals(2500, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ }
+ }
+ sql "sync"
+ order_qt_all51 "SELECT count(*) FROM ${tableName4}" // 0
+ sql """truncate table ${tableName4}"""
+ sql """sync"""
+
+ // load with strict_mode true and max_filter_ratio
+ streamLoad {
+ table "${tableName4}"
+
+ set 'column_separator', ','
+        set 'columns', 'k1, k2, k3, k4, tmpk5, tmpk6, tmpk7, tmpk8, tmpk9, tmpk10, tmpk11, tmpk12, k5'
+ set 'max_filter_ratio', '0'
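+        // max_filter_ratio=0 tolerates no filtered rows, so the same data is expected to fail here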
+ set 'strict_mode', 'true'
+ set 'memtable_on_sink_node', 'true'
+
+ file 'all_types.csv'
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("fail", json.Status.toLowerCase())
+ assertEquals(0, json.NumberLoadedRows)
+ }
+ }
+ sql "sync"
+ order_qt_all61 "SELECT count(*) FROM ${tableName4}" // 0
+ sql """truncate table ${tableName4}"""
+ sql """sync"""
+
+ // load bitmap and hll with bzip2
+ streamLoad {
+ table "${tableName5}"
+
+ set 'column_separator', ','
+        set 'columns', 'k1, k2, tmp1, tmp2, v1=to_bitmap(tmp1), v2=hll_hash(tmp2)'
+ set 'compress_type', 'bz2'
+ set 'memtable_on_sink_node', 'true'
+
+ file 'bitmap_hll.csv.bz2'
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(1025, json.NumberTotalRows)
+ assertEquals(1025, json.NumberLoadedRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ }
+ }
+ sql "sync"
+    order_qt_all71 "SELECT k1, k2, bitmap_union_count(v1), HLL_UNION_AGG(v2) FROM ${tableName5} group by k1, k2" // 1,2,1025,1028
+ sql """truncate table ${tableName5}"""
+ sql """sync"""
+
+ // load unique key
+ streamLoad {
+ table "${tableName6}"
+
+ set 'column_separator', ','
+ set 'compress_type', 'lz4'
+ set 'memtable_on_sink_node', 'true'
+
+ file 'unique_key.csv.lz4'
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(8001, json.NumberTotalRows)
+ assertEquals(8001, json.NumberLoadedRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ }
+ }
+ sql "sync"
+ order_qt_all81 "SELECT count(*) from ${tableName6}" // 2
+ sql """truncate table ${tableName6}"""
+ sql """sync"""
+
+ // load unique key with delete and sequence
+ streamLoad {
+ table "${tableName7}"
+
+ set 'column_separator', ','
+ set 'columns', 'k1,k2,v1,del,seq'
+ set 'delete', 'del=1'
+ set 'merge_type', 'merge'
+ set 'function_column.sequence_col', 'seq'
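+        // merge_type=merge applies rows matching del=1 as deletes, and the sequence column
+        // keeps the row with the largest seq per key, leaving 2 rows (see all91 below)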
+ set 'memtable_on_sink_node', 'true'
+
+ file 'unique_key_with_delete.csv'
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(6, json.NumberTotalRows)
+ assertEquals(6, json.NumberLoadedRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ }
+ }
+ sql "sync"
+ order_qt_all91 "SELECT count(*) from ${tableName7}" // 2
+ sql """truncate table ${tableName7}"""
+ sql """sync"""
+
+ // ===== test array stream load
+ // malformat without strictmode
+ streamLoad {
+ table "${tableName8}"
+
+ set 'column_separator', '|'
+ set 'memtable_on_sink_node', 'true'
+
+ file 'array_malformat.csv'
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(5, json.NumberTotalRows)
+ assertEquals(5, json.NumberLoadedRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ }
+ }
+ sql "sync"
+ order_qt_all101 "SELECT * from ${tableName8}" // 5
+ sql """truncate table ${tableName8}"""
+ sql """sync"""
+
+ // malformat with strictmode
+ streamLoad {
+ table "${tableName8}"
+
+ set 'column_separator', '|'
+ set 'strict_mode', 'true'
+ set 'memtable_on_sink_node', 'true'
+
+ file 'array_malformat.csv'
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("fail", json.Status.toLowerCase())
+ assertEquals(5, json.NumberTotalRows)
+ assertEquals(3, json.NumberLoadedRows)
+ assertEquals(2, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ }
+ }
+ sql "sync"
+
+ // normal load
+ streamLoad {
+ table "${tableName8}"
+
+ set 'column_separator', '|'
+ set 'memtable_on_sink_node', 'true'
+
+ file 'array_normal.csv'
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(9, json.NumberTotalRows)
+ assertEquals(9, json.NumberLoadedRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ }
+ }
+ sql "sync"
+ order_qt_all102 "SELECT * from ${tableName8}" // 8
+ sql """truncate table ${tableName8}"""
+ sql """sync"""
+
+ // malformat with mismatch array type
+ streamLoad {
+ table "${tableName8}"
+
+ set 'column_separator', '|'
+        set 'columns', 'k1,k2,k3,k4,k5,k6,k7,k8,k9,b10,k11,k10=array_remove(cast(k5 as array<bigint>), 1)'
+ set 'memtable_on_sink_node', 'true'
+
+ file 'array_normal.csv'
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("fail", json.Status.toLowerCase())
+ assertTrue(json.Message.contains('Don\'t support load from type'))
+ }
+ }
+ sql "sync"
+
+ // ===== test struct stream load
+ // malformat without strictmode
+ streamLoad {
+ table "${tableName10}"
+
+ set 'column_separator', '|'
+ set 'memtable_on_sink_node', 'true'
+
+ file 'struct_malformat.csv'
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(5, json.NumberTotalRows)
+ assertEquals(5, json.NumberLoadedRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ }
+ }
+ sql "sync"
+ qt_all111 "SELECT * from ${tableName10} order by k1" // 5
+ sql """truncate table ${tableName10}"""
+ sql """sync"""
+
+ // malformat with strictmode
+ streamLoad {
+ table "${tableName10}"
+
+ set 'column_separator', '|'
+ set 'strict_mode', 'true'
+ set 'memtable_on_sink_node', 'true'
+
+ file 'struct_malformat.csv'
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("fail", json.Status.toLowerCase())
+ assertEquals(5, json.NumberTotalRows)
+ assertEquals(3, json.NumberLoadedRows)
+ assertEquals(2, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ }
+ }
+ sql "sync"
+
+ // normal load
+ streamLoad {
+ table "${tableName10}"
+
+ set 'column_separator', '|'
+ set 'memtable_on_sink_node', 'true'
+
+ file 'struct_normal.csv'
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(13, json.NumberTotalRows)
+ assertEquals(13, json.NumberLoadedRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ }
+ }
+ sql "sync"
+ qt_all112 "SELECT * from ${tableName10} order by k1" // 10
+ sql """truncate table ${tableName10}"""
+ sql """sync"""
+
+ // test immutable partition success
+ def tableName9 = "test_immutable_partition"
+ sql """ DROP TABLE IF EXISTS ${tableName9} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName9} (
+ `k1` bigint(20) NULL,
+ `k2` bigint(20) NULL,
+ `v1` tinyint(4) SUM NULL,
+ `v2` tinyint(4) REPLACE NULL,
+ `v3` tinyint(4) REPLACE_IF_NOT_NULL NULL
+ ) ENGINE=OLAP
+ AGGREGATE KEY(`k1`, `k2`)
+ COMMENT 'OLAP'
+ PARTITION BY RANGE(`k1`)
+ (PARTITION partition_a VALUES [("-9223372036854775808"), ("10")),
+ PARTITION partition_b VALUES [("10"), ("20")),
+ PARTITION partition_c VALUES [("20"), ("30")),
+ PARTITION partition_d VALUES [("30"), ("40")))
+ DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+    sql """ALTER TABLE ${tableName9} ADD PARTITION partition_e VALUES less than ('3000') properties ('mutable' = 'false')"""
+    sql """ALTER TABLE ${tableName9} MODIFY PARTITION partition_b set ('mutable' = 'false')"""
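+    // Rows routed to an immutable partition are skipped and counted as unselected,
+    // which is why the check below expects NumberUnselectedRows == 5.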
+
+ streamLoad {
+ table "${tableName9}"
+
+ set 'column_separator', '\t'
+ set 'columns', 'k1, k2, v1, v2, v3'
+        set 'partitions', 'partition_a, partition_b, partition_c, partition_d, partition_e'
+ set 'strict_mode', 'true'
+ set 'memtable_on_sink_node', 'true'
+
+ file 'test_immutable_partition.csv'
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(11, json.NumberTotalRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(5, json.NumberUnselectedRows)
+ }
+ }
+
+ sql "sync"
+ order_qt_sql1 "select * from ${tableName9} order by k1, k2"
+
+ // test common user
+ def tableName13 = "test_common_user"
+ sql """ DROP TABLE IF EXISTS ${tableName13} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName13} (
+ `k1` bigint(20) NULL,
+ `k2` bigint(20) NULL,
+ `v1` tinyint(4) SUM NULL,
+ `v2` tinyint(4) REPLACE NULL,
+ `v3` tinyint(4) REPLACE_IF_NOT_NULL NULL
+ ) ENGINE=OLAP
+ AGGREGATE KEY(`k1`, `k2`)
+ COMMENT 'OLAP'
+ PARTITION BY RANGE(`k1`)
+ (PARTITION partition_a VALUES [("-9223372036854775808"), ("10")),
+ PARTITION partition_b VALUES [("10"), ("20")),
+ PARTITION partition_c VALUES [("20"), ("30")),
+ PARTITION partition_d VALUES [("30"), ("40")))
+ DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ sql """create USER common_user@'%' IDENTIFIED BY '123456'"""
+ sql """GRANT LOAD_PRIV ON *.* TO 'common_user'@'%';"""
+
+ streamLoad {
+ table "${tableName13}"
+
+ set 'column_separator', '|'
+ set 'columns', 'k1, k2, v1, v2, v3'
+ set 'strict_mode', 'true'
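+        // the Basic token below is base64("common_user:123456"), matching the user created above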
+ set 'Authorization', 'Basic Y29tbW9uX3VzZXI6MTIzNDU2'
+ set 'memtable_on_sink_node', 'true'
+
+ file 'test_auth.csv'
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(2, json.NumberTotalRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ }
+ }
+
+ sql "sync"
+ sql """DROP USER 'common_user'@'%'"""
+
+ // test default value
+ def tableName14 = "test_default_value"
+ sql """ DROP TABLE IF EXISTS ${tableName14} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName14} (
+ `k1` bigint(20) NULL DEFAULT "1",
+ `k2` bigint(20) NULL ,
+ `v1` tinyint(4) NULL,
+ `v2` tinyint(4) NULL,
+ `v3` tinyint(4) NULL,
+ `v4` DATETIME NULL DEFAULT CURRENT_TIMESTAMP
+ ) ENGINE=OLAP
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ streamLoad {
+ table "${tableName14}"
+
+ set 'column_separator', '|'
+ set 'columns', 'k2, v1, v2, v3'
+ set 'strict_mode', 'true'
+ set 'memtable_on_sink_node', 'true'
+
+ file 'test_default_value.csv'
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(2, json.NumberTotalRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ }
+ }
+
+ sql "sync"
+ def res = sql "select * from ${tableName14}"
+ def time = res[0][5].toString().split("T")[0].split("-")
+ def year = time[0].toString()
+ SimpleDateFormat sdf= new SimpleDateFormat("yyyy-MM-dd")
+ def now = sdf.format(new Date()).toString().split("-")
+
+    // check that the parsed time is correct
+    // Due to the time difference between loading and checking, three day-boundary cases may occur:
+    //   2023-6-29 -> 2023-6-30
+    //   2023-6-30 -> 2023-7-1
+    //   2023-12-31 -> 2024-1-1
+    // For now we only compare the year; retry if this test fails around a year boundary.
+ assertEquals(year, now[0])
+ // parse k1 default value
+ assertEquals(res[0][0], 1)
+ assertEquals(res[1][0], 1)
+}
+
diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load_new_move_memtable.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load_new_move_memtable.groovy
new file mode 100644
index 0000000000..1fb43b21c7
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load_new_move_memtable.groovy
@@ -0,0 +1,559 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.Random;
+
+suite("test_stream_load_new_move_memtable", "p0") {
+
+ sql """set enable_memtable_on_sink_node=true"""
+
+ // 1. test column with currenttimestamp default value
+ def tableName1 = "test_stream_load_new_current_timestamp_mm"
+
+ try {
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName1} (
+ id int,
+ name CHAR(10),
+ dt_1 DATETIME DEFAULT CURRENT_TIMESTAMP,
+ dt_2 DATETIMEV2 DEFAULT CURRENT_TIMESTAMP,
+ dt_3 DATETIMEV2(3) DEFAULT CURRENT_TIMESTAMP,
+ dt_4 DATETIMEV2(6) DEFAULT CURRENT_TIMESTAMP
+ )
+ DISTRIBUTED BY HASH(id) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ )
+ """
+
+ streamLoad {
+ set 'column_separator', ','
+ set 'columns', 'id, name'
+ set 'memtable_on_sink_node', 'true'
+ table "${tableName1}"
+ time 10000
+ file 'test_stream_load_new_current_timestamp.csv'
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(11, json.NumberTotalRows)
+ assertEquals(0, json.NumberFilteredRows)
+ }
+ }
+
+ qt_sql1 "select id, name from ${tableName1}"
+ } finally {
+ try_sql "DROP TABLE IF EXISTS ${tableName1}"
+ }
+
+ // 2. test change column order
+ def tableName2 = "test_stream_load_new_change_column_order_mm"
+
+ try {
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName2} (
+ k1 int,
+ k2 smallint,
+ k3 CHAR(10),
+ k4 bigint,
+ k5 decimal(6, 3),
+ k6 float
+ )
+ DISTRIBUTED BY HASH(k1) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ )
+ """
+
+ streamLoad {
+ set 'column_separator', ','
+ set 'columns', 'k1, k3, k2, k4, k6, k5'
+ set 'memtable_on_sink_node', 'true'
+ table "${tableName2}"
+ time 10000
+ file 'test_stream_load_new_change_column_order.csv'
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(11, json.NumberTotalRows)
+ assertEquals(0, json.NumberFilteredRows)
+ }
+ }
+
+ qt_sql2 "select * from ${tableName2}"
+ } finally {
+ try_sql "DROP TABLE IF EXISTS ${tableName2}"
+ }
+
+ // 3. test with function
+ def tableName3 = "test_stream_load_new_function_mm"
+
+ try {
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName3} (
+ id int,
+ name CHAR(10),
+ year int,
+ month int,
+ day int
+ )
+ DISTRIBUTED BY HASH(id) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ )
+ """
+
+ streamLoad {
+ set 'column_separator', ','
+            set 'columns', 'id, name, tmp_c3, year = year(tmp_c3), month = month(tmp_c3), day = day(tmp_c3)'
+ set 'memtable_on_sink_node', 'true'
+ table "${tableName3}"
+ time 10000
+ file 'test_stream_load_new_function.csv'
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(11, json.NumberTotalRows)
+ assertEquals(0, json.NumberFilteredRows)
+ }
+ }
+
+ qt_sql3 "select * from ${tableName3}"
+ } finally {
+ try_sql "DROP TABLE IF EXISTS ${tableName3}"
+ }
+
+ // 4. test column number mismatch
+ def tableName4 = "test_stream_load_new_column_number_mismatch_mm"
+
+ try {
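+        // Note: the DDL below distributes by a column `id` that does not exist in the schema,
+        // so CREATE TABLE itself fails; the catch block asserts that error message.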
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName4} (
+ k1 int NOT NULL,
+ k2 CHAR(10) NOT NULL,
+ k3 smallint NOT NULL
+ )
+ DISTRIBUTED BY HASH(id) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ )
+ """
+
+ streamLoad {
+ set 'column_separator', ','
+ set 'memtable_on_sink_node', 'true'
+ table "${tableName4}"
+ time 10000
+ file 'test_stream_load_new_column_number_mismatch.csv'
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("fail", json.Status.toLowerCase())
+ }
+ }
+ } catch (Exception e) {
+        assertTrue(e.getMessage().contains("Distribution column(id) doesn't exist"), e.getMessage())
+ } finally {
+ try_sql "DROP TABLE IF EXISTS ${tableName4}"
+ }
+
+ // 5. test with default value
+ def tableName5 = "test_stream_load_new_default_value_mm"
+
+ try {
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName5} (
+ id int NOT NULL,
+ name CHAR(10) NOT NULL,
+ date DATE NOT NULL,
+ max_dwell_time INT DEFAULT "0",
+ min_dwell_time INT DEFAULT "99999"
+ )
+ DISTRIBUTED BY HASH(id) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ )
+ """
+
+ streamLoad {
+ set 'column_separator', ','
+ set 'columns', 'id, name, date'
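+            // max_dwell_time and min_dwell_time are omitted, so they should take their defaults (0 and 99999)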
+ set 'memtable_on_sink_node', 'true'
+ table "${tableName5}"
+ time 10000
+ file 'test_stream_load_new_default_value.csv'
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(11, json.NumberTotalRows)
+ assertEquals(0, json.NumberFilteredRows)
+ }
+ }
+
+ qt_sql5 "select * from ${tableName5}"
+ } finally {
+ try_sql "DROP TABLE IF EXISTS ${tableName5}"
+ }
+
+ // 6. test some column type
+ def tableName6 = "test_stream_load_new_column_type_mm"
+
+ try {
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName6} (
+ c_int int(11) NULL,
+ c_char char(15) NULL,
+ c_varchar varchar(100) NULL,
+ c_bool boolean NULL,
+ c_tinyint tinyint(4) NULL,
+ c_smallint smallint(6) NULL,
+ c_bigint bigint(20) NULL,
+ c_largeint largeint(40) NULL,
+ c_float float NULL,
+ c_double double NULL,
+ c_decimal decimal(6, 3) NULL,
+ c_decimalv3 decimalv3(6, 3) NULL,
+ c_date date NULL,
+ c_datev2 datev2 NULL,
+ c_datetime datetime NULL,
+ c_datetimev2 datetimev2(0) NULL
+ )
+ DISTRIBUTED BY HASH(c_int) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ )
+ """
+
+ streamLoad {
+ set 'column_separator', ','
+ set 'memtable_on_sink_node', 'true'
+ table "${tableName6}"
+ time 10000
+ file 'test_stream_load_new_column_type.csv'
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(11, json.NumberTotalRows)
+ assertEquals(0, json.NumberFilteredRows)
+ }
+ }
+
+ qt_sql6 "select * from ${tableName6}"
+ } finally {
+ try_sql "DROP TABLE IF EXISTS ${tableName6}"
+ }
+
+ // 7. test duplicate key
+ def tableName7 = "test_stream_load_duplicate_key_mm"
+
+ try {
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName7}
+ (
+ user_id LARGEINT NOT NULL,
+ username VARCHAR(50) NOT NULL,
+ city VARCHAR(20),
+ age SMALLINT,
+ sex TINYINT,
+ phone LARGEINT,
+ address VARCHAR(500),
+ register_time DATETIME
+ )
+ DUPLICATE KEY(`user_id`, `username`)
+ DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ )
+ """
+
+ streamLoad {
+ set 'column_separator', ','
+ set 'memtable_on_sink_node', 'true'
+ table "${tableName7}"
+ time 10000
+ file 'test_stream_load_data_model.csv'
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(11, json.NumberTotalRows)
+ assertEquals(0, json.NumberFilteredRows)
+ }
+ }
+
+ qt_sql7 "select * from ${tableName7}"
+ } finally {
+ try_sql "DROP TABLE IF EXISTS ${tableName7}"
+ }
+
+ // 8. test merge on read unique key
+ def tableName8 = "test_stream_load_unique_key_merge_on_read_mm"
+
+ try {
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName8}
+ (
+ user_id LARGEINT NOT NULL,
+ username VARCHAR(50) NOT NULL,
+ city VARCHAR(20),
+ age SMALLINT,
+ sex TINYINT,
+ phone LARGEINT,
+ address VARCHAR(500),
+ register_time DATETIME
+ )
+ UNIQUE KEY(`user_id`, `username`)
+ DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ )
+ """
+
+ streamLoad {
+ set 'column_separator', ','
+ set 'memtable_on_sink_node', 'true'
+ table "${tableName8}"
+ time 10000
+ file 'test_stream_load_data_model.csv'
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(11, json.NumberTotalRows)
+ assertEquals(0, json.NumberFilteredRows)
+ }
+ }
+
+ qt_sql8 "select * from ${tableName8}"
+ } finally {
+ try_sql "DROP TABLE IF EXISTS ${tableName8}"
+ }
+
+ // 9. test merge on write unique key
+ def tableName9 = "test_stream_load_unique_key_merge_on_write_mm"
+
+ try {
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName9}
+ (
+ user_id LARGEINT NOT NULL,
+ username VARCHAR(50) NOT NULL,
+ city VARCHAR(20),
+ age SMALLINT,
+ sex TINYINT,
+ phone LARGEINT,
+ address VARCHAR(500),
+ register_time DATETIME
+ )
+ UNIQUE KEY(`user_id`, `username`)
+ DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "enable_unique_key_merge_on_write" = "true"
+ )
+ """
+
+ streamLoad {
+ set 'column_separator', ','
+ set 'memtable_on_sink_node', 'true'
+ table "${tableName9}"
+ time 10000
+ file 'test_stream_load_data_model.csv'
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(11, json.NumberTotalRows)
+ assertEquals(0, json.NumberFilteredRows)
+ }
+ }
+
+ qt_sql9 "select * from ${tableName9}"
+ } finally {
+ try_sql "DROP TABLE IF EXISTS ${tableName9}"
+ }
+
+ // 10. test stream load multiple times
+ def tableName10 = "test_stream_load_multiple_times_mm"
+ Random rd = new Random()
+ def disable_auto_compaction = "false"
+ if (rd.nextBoolean()) {
+ disable_auto_compaction = "true"
+ }
+ log.info("disable_auto_compaction: ${disable_auto_compaction}".toString())
+ try {
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName10}
+ (
+ user_id LARGEINT NOT NULL,
+ username VARCHAR(50) NOT NULL,
+ money INT
+ )
+ DUPLICATE KEY(`user_id`, `username`)
+ DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "${disable_auto_compaction}"
+ )
+ """
+ for (int i = 0; i < 3; ++i) {
+ streamLoad {
+ set 'column_separator', ','
+ set 'memtable_on_sink_node', 'true'
+ table "${tableName10}"
+ time 10000
+ file 'test_stream_load_multiple_times.csv'
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(500, json.NumberTotalRows)
+ assertEquals(0, json.NumberFilteredRows)
+ }
+ }
+ }
+
+ qt_sql10 "select count(*) from ${tableName10}"
+ } finally {
+ try_sql "DROP TABLE IF EXISTS ${tableName10}"
+ }
+
+ // 11. test stream load column separator
+ def tableName11 = "test_stream_load_column_separator_mm"
+
+ try {
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName11} (
+ id int,
+ name CHAR(10),
+ dt_1 DATETIME DEFAULT CURRENT_TIMESTAMP,
+ dt_2 DATETIMEV2 DEFAULT CURRENT_TIMESTAMP,
+ dt_3 DATETIMEV2(3) DEFAULT CURRENT_TIMESTAMP,
+ dt_4 DATETIMEV2(6) DEFAULT CURRENT_TIMESTAMP
+ )
+ DISTRIBUTED BY HASH(id) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ )
+ """
+
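+        // Multi-character column separator: only id and name are mapped via
+        // 'columns', so the datetime columns take their CURRENT_TIMESTAMP defaults.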
+ streamLoad {
+ set 'column_separator', '--'
+ set 'columns', 'id, name'
+ set 'memtable_on_sink_node', 'true'
+ table "${tableName11}"
+ time 10000
+ file 'test_stream_load_new_column_separator.csv'
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(11, json.NumberTotalRows)
+ assertEquals(0, json.NumberFilteredRows)
+ }
+ }
+
+ qt_sql11 "select id, name from ${tableName11}"
+ } finally {
+ try_sql "DROP TABLE IF EXISTS ${tableName11}"
+ }
+
+ // 12. test stream load line delimiter
+ def tableName12 = "test_stream_load_line_delimiter_mm"
+
+ try {
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName12} (
+ id int,
+ name CHAR(10),
+ dt_1 DATETIME DEFAULT CURRENT_TIMESTAMP,
+ dt_2 DATETIMEV2 DEFAULT CURRENT_TIMESTAMP,
+ dt_3 DATETIMEV2(3) DEFAULT CURRENT_TIMESTAMP,
+ dt_4 DATETIMEV2(6) DEFAULT CURRENT_TIMESTAMP
+ )
+ DISTRIBUTED BY HASH(id) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ )
+ """
+
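+        // Same schema as above, but rows in this file are terminated by '||'
+        // rather than newlines.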
+ streamLoad {
+ set 'column_separator', ','
+ set 'line_delimiter', '||'
+ set 'columns', 'id, name'
+ set 'memtable_on_sink_node', 'true'
+ table "${tableName12}"
+ time 10000
+ file 'test_stream_load_new_line_delimiter.csv'
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(11, json.NumberTotalRows)
+ assertEquals(0, json.NumberFilteredRows)
+ }
+ }
+
+ qt_sql12 "select id, name from ${tableName12}"
+ } finally {
+ try_sql "DROP TABLE IF EXISTS ${tableName12}"
+ }
+
+ sql """set enable_memtable_on_sink_node=false"""
+}
+
diff --git a/regression-test/suites/rollup_p0/test_materialized_view_move_memtable.groovy b/regression-test/suites/rollup_p0/test_materialized_view_move_memtable.groovy
new file mode 100644
index 0000000000..917fbdc08a
--- /dev/null
+++ b/regression-test/suites/rollup_p0/test_materialized_view_move_memtable.groovy
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_materialized_view_move_memtable", "rollup") {
+
+    // Nereids cannot support rollup correctly yet, so disable it temporarily
+ sql """set enable_nereids_planner=false"""
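+    // Enable the move-memtable path for every load issued by this suite.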
+ sql """set enable_memtable_on_sink_node=true"""
+
+ def tbName1 = "test_materialized_view_mm"
+ def tbName2 = "test_materialized_view_dynamic_partition_mm"
+ def tbName3 = "test_materialized_view_schema_change_mm"
+ def tbName4 = "test_materialized_view_dynamic_partition_schema_change_mm"
+
+ def getJobState = { tableName ->
+    def getJobState = { tableName ->
+        def jobStateResult = sql """ SHOW ALTER TABLE MATERIALIZED VIEW WHERE TableName='${tableName}' ORDER BY CreateTime DESC LIMIT 1; """
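+        // Index 8 of the SHOW ALTER row is the job state (e.g. FINISHED or
+        // CANCELLED).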
+ return jobStateResult[0][8]
+ }
+
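+    // Four base-table variants: static vs. dynamic partitioning, each with
+    // light_schema_change off (tbName1/2) and on (tbName3/4) for the
+    // schema-change cases below.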
+ sql "DROP TABLE IF EXISTS ${tbName1}"
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tbName1}(
+ k1 DATE,
+ k2 DECIMAL(10, 2),
+ k3 CHAR(10),
+ k4 INT NOT NULL
+ )
+ DUPLICATE KEY(k1, k2)
+ PARTITION BY RANGE(k1)
+ (
+ PARTITION p1 VALUES LESS THAN ("2000-01-01"),
+ PARTITION p2 VALUES LESS THAN ("2010-01-01"),
+ PARTITION p3 VALUES LESS THAN ("2020-01-01")
+ )
+ DISTRIBUTED BY HASH(k1) BUCKETS 32
+ properties(
+ "light_schema_change" = "false",
+ "replication_num" = "1"
+ );
+ """
+
+ sql "DROP TABLE IF EXISTS ${tbName2}"
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tbName2}(
+ k1 DATE,
+ k2 DECIMAL(10, 2),
+ k3 CHAR(10),
+ k4 INT NOT NULL
+ )
+ PARTITION BY RANGE(k1)
+ (
+ PARTITION p1 VALUES LESS THAN ("2000-01-01"),
+ PARTITION p2 VALUES LESS THAN ("2010-01-01"),
+ PARTITION p3 VALUES LESS THAN ("2020-01-01")
+ )
+ DISTRIBUTED BY HASH(k1)
+ PROPERTIES
+ (
+ "dynamic_partition.enable" = "true",
+ "dynamic_partition.time_unit" = "DAY",
+ "dynamic_partition.start" = "-2147483648",
+ "dynamic_partition.end" = "3",
+ "dynamic_partition.prefix" = "p",
+ "dynamic_partition.buckets" = "32",
+ "light_schema_change" = "false",
+ "replication_num"="1"
+ );
+ """
+
+ sql "DROP TABLE IF EXISTS ${tbName3}"
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tbName3}(
+ k1 DATE,
+ k2 DECIMAL(10, 2),
+ k3 CHAR(10),
+ k4 INT NOT NULL
+ )
+ DUPLICATE KEY(k1, k2)
+ PARTITION BY RANGE(k1)
+ (
+ PARTITION p1 VALUES LESS THAN ("2000-01-01"),
+ PARTITION p2 VALUES LESS THAN ("2010-01-01"),
+ PARTITION p3 VALUES LESS THAN ("2020-01-01")
+ )
+ DISTRIBUTED BY HASH(k1) BUCKETS 32
+
+ properties(
+ "light_schema_change" = "true",
+ "replication_num" = "1"
+ );
+ """
+
+ sql "DROP TABLE IF EXISTS ${tbName4}"
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tbName4}(
+ k1 DATE,
+ k2 DECIMAL(10, 2),
+ k3 CHAR(10),
+ k4 INT NOT NULL
+ )
+ PARTITION BY RANGE(k1)
+ (
+ PARTITION p1 VALUES LESS THAN ("2000-01-01"),
+ PARTITION p2 VALUES LESS THAN ("2010-01-01"),
+ PARTITION p3 VALUES LESS THAN ("2020-01-01")
+ )
+ DISTRIBUTED BY HASH(k1)
+ PROPERTIES
+ (
+ "dynamic_partition.enable" = "true",
+ "dynamic_partition.time_unit" = "DAY",
+ "dynamic_partition.start" = "-2147483648",
+ "dynamic_partition.end" = "3",
+ "dynamic_partition.prefix" = "p",
+ "dynamic_partition.buckets" = "32",
+ "light_schema_change" = "true",
+ "replication_num"="1"
+ );
+ """
+
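+    // Create a rollup MV on each table, then poll the alter job until it
+    // finishes. Note: max_try_secs counts loop iterations (~2s apart), so
+    // the effective timeout is roughly two minutes, not 60 seconds.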
+    sql "CREATE materialized VIEW test_load_open AS SELECT k1 FROM ${tbName1} GROUP BY k1;"
+ int max_try_secs = 60
+ while (max_try_secs--) {
+ String res = getJobState(tbName1)
+ if (res == "FINISHED" || res == "CANCELLED") {
+ assertEquals("FINISHED", res)
+ sleep(3000)
+ break
+ } else {
+ Thread.sleep(2000)
+ if (max_try_secs < 1) {
+                println "test timeout, state: ${res}"
+                assertEquals("FINISHED", res)
+ }
+ }
+ }
+
+    sql "CREATE materialized VIEW test_load_open_dynamic_partition AS SELECT k1 FROM ${tbName2} GROUP BY k1;"
+ max_try_secs = 60
+ while (max_try_secs--) {
+ String res = getJobState(tbName2)
+ if (res == "FINISHED" || res == "CANCELLED") {
+ assertEquals("FINISHED", res)
+ sleep(3000)
+ break
+ } else {
+ Thread.sleep(2000)
+ if (max_try_secs < 1) {
+                println "test timeout, state: ${res}"
+                assertEquals("FINISHED", res)
+ }
+ }
+ }
+
+    sql "CREATE materialized VIEW test_load_open_schema_change AS SELECT k1 FROM ${tbName3} GROUP BY k1;"
+ max_try_secs = 60
+ while (max_try_secs--) {
+ String res = getJobState(tbName3)
+ if (res == "FINISHED" || res == "CANCELLED") {
+ assertEquals("FINISHED", res)
+ sleep(3000)
+ break
+ } else {
+ Thread.sleep(2000)
+ if (max_try_secs < 1) {
+                println "test timeout, state: ${res}"
+                assertEquals("FINISHED", res)
+ }
+ }
+ }
+
+    sql "CREATE materialized VIEW test_load_open_dynamic_partition_schema_change AS SELECT k1 FROM ${tbName4} GROUP BY k1;"
+ max_try_secs = 60
+ while (max_try_secs--) {
+ String res = getJobState(tbName4)
+ if (res == "FINISHED" || res == "CANCELLED") {
+ assertEquals("FINISHED", res)
+ sleep(3000)
+ break
+ } else {
+ Thread.sleep(2000)
+ if (max_try_secs < 1) {
+                println "test timeout, state: ${res}"
+                assertEquals("FINISHED", res)
+ }
+ }
+ }
+
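+    // Insert through the move-memtable path; tbName3 and tbName4 also get a
+    // row after a column is added, covering MV loads across a schema change.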
+ sql "insert into ${tbName1} values('2000-05-20', 1.5, 'test', 1);"
+ sql "insert into ${tbName1} values('2010-05-20', 1.5, 'test', 1);"
+
+ sql "insert into ${tbName2} values('2000-05-20', 1.5, 'test', 1);"
+ sql "insert into ${tbName2} values('2010-05-20', 1.5, 'test', 1);"
+
+ sql "insert into ${tbName3} values('2000-05-20', 1.5, 'test', 1);"
+ sql "ALTER table ${tbName3} ADD COLUMN new_column INT;"
+ sql "insert into ${tbName3} values('2010-05-20', 1.5, 'test', 1, 1);"
+
+ sql "insert into ${tbName4} values('2000-05-20', 1.5, 'test', 1);"
+ sql "ALTER table ${tbName4} ADD COLUMN new_column INT;"
+ sql "insert into ${tbName4} values('2010-05-20', 1.5, 'test', 1, 1);"
+
+ sql "DROP TABLE ${tbName1} FORCE;"
+ sql "DROP TABLE ${tbName2} FORCE;"
+ sql "DROP TABLE ${tbName3} FORCE;"
+ sql "DROP TABLE ${tbName4} FORCE;"
+ sql """set enable_memtable_on_sink_node=false"""
+}
\ No newline at end of file