This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new add160b768e [improvement](regression-test) add more group commit
regression-test (#26952)
add160b768e is described below
commit add160b768ea68776ecd256772acd9de861cee34
Author: huanghaibin <[email protected]>
AuthorDate: Wed Nov 15 00:01:13 2023 +0800
[improvement](regression-test) add more group commit regression-test
(#26952)
---
...commit_http_stream_lineitem_multiple_client.out | 4 +
..._commit_http_stream_lineitem_multiple_table.out | 31 ++
...st_group_commit_http_stream_lineitem_normal.out | 10 +
...p_commit_http_stream_lineitem_schema_change.out | 16 +
...commit_insert_into_lineitem_multiple_client.out | 4 +
..._commit_insert_into_lineitem_multiple_table.out | 31 ++
...st_group_commit_insert_into_lineitem_normal.out | 4 +
...p_commit_insert_into_lineitem_scheme_change.out | 16 +
...commit_stream_load_lineitem_multiple_client.out | 4 +
..._commit_stream_load_lineitem_multiple_table.out | 31 ++
...st_group_commit_stream_load_lineitem_normal.out | 10 +
...p_commit_stream_load_lineitem_schema_change.out | 16 +
.../org/apache/doris/regression/suite/Suite.groovy | 6 +
...mit_http_stream_lineitem_multiple_client.groovy | 149 +++++++
...mmit_http_stream_lineitem_multiple_table.groovy | 140 +++++++
...group_commit_http_stream_lineitem_normal.groovy | 122 ++++++
...ommit_http_stream_lineitem_schema_change.groovy | 385 ++++++++++++++++++
...mit_insert_into_lineitem_multiple_client.groovy | 219 ++++++++++
...mmit_insert_into_lineitem_multiple_table.groovy | 225 +++++++++++
...group_commit_insert_into_lineitem_normal.groovy | 192 +++++++++
...ommit_insert_into_lineitem_scheme_change.groovy | 448 +++++++++++++++++++++
...mit_stream_load_lineitem_multiple_client.groovy | 146 +++++++
...mmit_stream_load_lineitem_multiple_table.groovy | 137 +++++++
...group_commit_stream_load_lineitem_normal.groovy | 119 ++++++
...ommit_stream_load_lineitem_schema_change.groovy | 353 ++++++++++++++++
25 files changed, 2818 insertions(+)
diff --git
a/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_multiple_client.out
b/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_multiple_client.out
new file mode 100644
index 00000000000..ef23823e23e
--- /dev/null
+++
b/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_multiple_client.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+6001215
+
diff --git
a/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_multiple_table.out
b/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_multiple_table.out
new file mode 100644
index 00000000000..7495e1a6b6b
--- /dev/null
+++
b/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_multiple_table.out
@@ -0,0 +1,31 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+600572
+
+-- !sql --
+599397
+
+-- !sql --
+600124
+
+-- !sql --
+599647
+
+-- !sql --
+599931
+
+-- !sql --
+601365
+
+-- !sql --
+599301
+
+-- !sql --
+600504
+
+-- !sql --
+599715
+
+-- !sql --
+600659
+
diff --git
a/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_normal.out
b/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_normal.out
new file mode 100644
index 00000000000..bac125e32fc
--- /dev/null
+++
b/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_normal.out
@@ -0,0 +1,10 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+6001215
+
+-- !sql --
+12002430
+
+-- !sql --
+18003645
+
diff --git
a/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_schema_change.out
b/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_schema_change.out
new file mode 100644
index 00000000000..1be3e26c88e
--- /dev/null
+++
b/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_schema_change.out
@@ -0,0 +1,16 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+3601475
+
+-- !sql --
+6001215
+
+-- !sql --
+6001215
+
+-- !sql --
+6001215
+
+-- !sql --
+6001215
+
diff --git
a/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_multiple_client.out
b/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_multiple_client.out
new file mode 100644
index 00000000000..ef23823e23e
--- /dev/null
+++
b/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_multiple_client.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+6001215
+
diff --git
a/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_multiple_table.out
b/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_multiple_table.out
new file mode 100644
index 00000000000..86534c17502
--- /dev/null
+++
b/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_multiple_table.out
@@ -0,0 +1,31 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+600572
+
+-- !sql --
+600659
+
+-- !sql --
+599397
+
+-- !sql --
+600124
+
+-- !sql --
+599647
+
+-- !sql --
+599931
+
+-- !sql --
+601365
+
+-- !sql --
+599301
+
+-- !sql --
+600504
+
+-- !sql --
+599715
+
diff --git
a/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_normal.out
b/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_normal.out
new file mode 100644
index 00000000000..ef23823e23e
--- /dev/null
+++
b/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_normal.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+6001215
+
diff --git
a/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_scheme_change.out
b/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_scheme_change.out
new file mode 100644
index 00000000000..4f530919417
--- /dev/null
+++
b/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_scheme_change.out
@@ -0,0 +1,16 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+3000816
+
+-- !sql --
+6001215
+
+-- !sql --
+4673070
+
+-- !sql --
+6001215
+
+-- !sql --
+6001215
+
diff --git
a/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_multiple_client.out
b/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_multiple_client.out
new file mode 100644
index 00000000000..ef23823e23e
--- /dev/null
+++
b/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_multiple_client.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+6001215
+
diff --git
a/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_multiple_table.out
b/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_multiple_table.out
new file mode 100644
index 00000000000..7495e1a6b6b
--- /dev/null
+++
b/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_multiple_table.out
@@ -0,0 +1,31 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+600572
+
+-- !sql --
+599397
+
+-- !sql --
+600124
+
+-- !sql --
+599647
+
+-- !sql --
+599931
+
+-- !sql --
+601365
+
+-- !sql --
+599301
+
+-- !sql --
+600504
+
+-- !sql --
+599715
+
+-- !sql --
+600659
+
diff --git
a/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_normal.out
b/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_normal.out
new file mode 100644
index 00000000000..bac125e32fc
--- /dev/null
+++
b/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_normal.out
@@ -0,0 +1,10 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+6001215
+
+-- !sql --
+12002430
+
+-- !sql --
+18003645
+
diff --git
a/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_schema_change.out
b/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_schema_change.out
new file mode 100644
index 00000000000..1be3e26c88e
--- /dev/null
+++
b/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_schema_change.out
@@ -0,0 +1,16 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+3601475
+
+-- !sql --
+6001215
+
+-- !sql --
+6001215
+
+-- !sql --
+6001215
+
+-- !sql --
+6001215
+
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index c9e8fe4215e..9913a5381c8 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -266,6 +266,12 @@ class Suite implements GroovyInterceptable {
return result
}
+ List<List<Object>> insert_into_sql(String sqlStr, int num) {
+ logger.info("insert into " + num + " records")
+ def (result, meta) = JdbcUtils.executeToList(context.getConnection(),
sqlStr)
+ return result
+ }
+
def sql_return_maparray(String sqlStr) {
logger.info("Execute sql: ${sqlStr}".toString())
def (result, meta) = JdbcUtils.executeToList(context.getConnection(),
sqlStr)
diff --git
a/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_multiple_client.groovy
b/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_multiple_client.groovy
new file mode 100644
index 00000000000..1bf941fb57a
--- /dev/null
+++
b/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_multiple_client.groovy
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+suite("test_group_commit_http_stream_lineitem_multiple_client") {
+ def db = "regression_test_insert_p2"
+ def stream_load_table = "test_http_stream_lineitem_multiple_client_sf1"
+ int[] rowCountArray = new int[]{600572, 599397, 600124, 599647, 599931,
601365, 599301, 600504, 599715, 600659};
+ def total = 0;
+ def rwLock = new ReentrantReadWriteLock();
+ def wlock = rwLock.writeLock();
+ def getRowCount = { expectedRowCount, table_name ->
+ def retry = 0
+ while (retry < 60) {
+ try {
+ def rowCount = sql "select count(*) from ${table_name}"
+ logger.info("rowCount: " + rowCount + ", retry: " + retry)
+ if (rowCount[0][0] >= expectedRowCount) {
+ break
+ }
+ } catch (Exception e) {
+ logger.info("select count get exception", e);
+ }
+ retry++
+ sleep(5000)
+ }
+ }
+
+ def checkStreamLoadResult = { exception, result, total_rows, loaded_rows,
filtered_rows, unselected_rows ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertTrue(json.GroupCommit)
+ assertTrue(json.Label.startsWith("group_commit_"))
+ assertEquals(total_rows, json.NumberTotalRows)
+ assertEquals(loaded_rows, json.NumberLoadedRows)
+ assertEquals(filtered_rows, json.NumberFilteredRows)
+ assertEquals(unselected_rows, json.NumberUnselectedRows)
+ if (filtered_rows > 0) {
+ assertFalse(json.ErrorURL.isEmpty())
+ } else {
+ assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
+ }
+ }
+
+ def create_stream_load_table = {
+ sql """ drop table if exists ${stream_load_table}; """
+
+ sql """
+ CREATE TABLE ${stream_load_table} (
+ l_shipdate DATEV2 NOT NULL,
+ l_orderkey bigint NOT NULL,
+ l_linenumber int not null,
+ l_partkey int NOT NULL,
+ l_suppkey int not null,
+ l_quantity decimalv3(15, 2) NOT NULL,
+ l_extendedprice decimalv3(15, 2) NOT NULL,
+ l_discount decimalv3(15, 2) NOT NULL,
+ l_tax decimalv3(15, 2) NOT NULL,
+ l_returnflag VARCHAR(1) NOT NULL,
+ l_linestatus VARCHAR(1) NOT NULL,
+ l_commitdate DATEV2 NOT NULL,
+ l_receiptdate DATEV2 NOT NULL,
+ l_shipinstruct VARCHAR(25) NOT NULL,
+ l_shipmode VARCHAR(10) NOT NULL,
+ l_comment VARCHAR(44) NOT NULL
+)ENGINE=OLAP
+DUPLICATE KEY(`l_shipdate`, `l_orderkey`)
+COMMENT "OLAP"
+DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
+PROPERTIES (
+ "replication_num" = "1"
+);
+ """
+ }
+
+ def do_stream_load = { i ->
+ logger.info("file:" + i)
+ streamLoad {
+ set 'version', '1'
+ set 'sql', """
+ insert into ${db}.${stream_load_table}(l_orderkey,
l_partkey, l_suppkey, l_linenumber, l_quantity,
+l_extendedprice, l_discount, l_tax, l_returnflag,l_linestatus,
l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,
+l_shipmode,l_comment) select c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11,
c12, c13, c14, c15, c16 from http_stream
+ ("format"="csv", "column_separator"="|")
+ """
+
+ set 'group_commit', 'true'
+ file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl.""" + i
+ unset 'label'
+
+ check { result, exception, startTime, endTime ->
+ checkStreamLoadResult(exception, result, rowCountArray[i - 1],
rowCountArray[i - 1], 0, 0)
+ }
+ }
+
+ logger.info("load num: " + rowCountArray[i - 1])
+ wlock.lock()
+ total += rowCountArray[i - 1];
+ wlock.unlock()
+
+ }
+
+ def process = {
+ def threads = []
+ for (int k = 1; k <= 10; k++) {
+ int n = k;
+ logger.info("insert into file:" + n)
+ threads.add(Thread.startDaemon {
+ do_stream_load(n)
+ })
+ }
+ for (Thread th in threads) {
+ th.join()
+ }
+
+ logger.info("total:" + total)
+ getRowCount(total, stream_load_table)
+
+ qt_sql """ select count(*) from ${stream_load_table}; """
+ }
+
+ try {
+ create_stream_load_table()
+ process()
+ } finally {
+
+ }
+}
\ No newline at end of file
diff --git
a/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_multiple_table.groovy
b/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_multiple_table.groovy
new file mode 100644
index 00000000000..03932d96cd3
--- /dev/null
+++
b/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_multiple_table.groovy
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_group_commit_http_stream_lineitem_multiple_table") {
+ def db = "regression_test_insert_p2"
+ def stream_load_table_base = "test_http_stream_lineitem_multiple_table"
+ int[] rowCountArray = new int[]{600572, 599397, 600124, 599647, 599931,
601365, 599301, 600504, 599715, 600659};
+ def getRowCount = { expectedRowCount, table_name ->
+ def retry = 0
+ while (retry < 60) {
+ try {
+ def rowCount = sql "select count(*) from ${table_name}"
+ logger.info("rowCount: " + rowCount + ", retry: " + retry)
+ if (rowCount[0][0] >= expectedRowCount) {
+ break
+ }
+ } catch (Exception e) {
+ logger.info("select count get exception", e);
+ }
+ retry++
+ sleep(5000)
+ }
+ }
+
+ def create_stream_load_table = { table_name ->
+ sql """ drop table if exists ${table_name}; """
+
+ sql """
+ CREATE TABLE ${table_name} (
+ l_shipdate DATEV2 NOT NULL,
+ l_orderkey bigint NOT NULL,
+ l_linenumber int not null,
+ l_partkey int NOT NULL,
+ l_suppkey int not null,
+ l_quantity decimalv3(15, 2) NOT NULL,
+ l_extendedprice decimalv3(15, 2) NOT NULL,
+ l_discount decimalv3(15, 2) NOT NULL,
+ l_tax decimalv3(15, 2) NOT NULL,
+ l_returnflag VARCHAR(1) NOT NULL,
+ l_linestatus VARCHAR(1) NOT NULL,
+ l_commitdate DATEV2 NOT NULL,
+ l_receiptdate DATEV2 NOT NULL,
+ l_shipinstruct VARCHAR(25) NOT NULL,
+ l_shipmode VARCHAR(10) NOT NULL,
+ l_comment VARCHAR(44) NOT NULL
+)ENGINE=OLAP
+DUPLICATE KEY(`l_shipdate`, `l_orderkey`)
+COMMENT "OLAP"
+DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
+PROPERTIES (
+ "replication_num" = "1"
+);
+ """
+ }
+
+ def checkStreamLoadResult = { exception, result, total_rows, loaded_rows,
filtered_rows, unselected_rows ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertTrue(json.GroupCommit)
+ assertTrue(json.Label.startsWith("group_commit_"))
+ assertEquals(total_rows, json.NumberTotalRows)
+ assertEquals(loaded_rows, json.NumberLoadedRows)
+ assertEquals(filtered_rows, json.NumberFilteredRows)
+ assertEquals(unselected_rows, json.NumberUnselectedRows)
+ if (filtered_rows > 0) {
+ assertFalse(json.ErrorURL.isEmpty())
+ } else {
+ assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
+ }
+ }
+
+ def do_stream_load = { i, table_name ->
+ logger.info("file:" + i)
+
+ streamLoad {
+ set 'version', '1'
+ set 'sql', """
+ insert into ${db}.${table_name}(l_orderkey, l_partkey,
l_suppkey, l_linenumber, l_quantity,
+l_extendedprice, l_discount, l_tax, l_returnflag,l_linestatus,
l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,
+l_shipmode,l_comment) select c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11,
c12, c13, c14, c15, c16 from http_stream
+ ("format"="csv", "column_separator"="|")
+ """
+
+ set 'group_commit', 'true'
+ file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl.""" + i
+ unset 'label'
+
+ check { result, exception, startTime, endTime ->
+ checkStreamLoadResult(exception, result, rowCountArray[i - 1],
rowCountArray[i - 1], 0, 0)
+ }
+ }
+ getRowCount(rowCountArray[i - 1], table_name)
+ }
+
+ def process = {
+ def threads = []
+ for (int k = 1; k <= 10; k++) {
+ int n = k;
+ String table_name = stream_load_table_base + "_" + n;
+ create_stream_load_table(table_name)
+ threads.add(Thread.startDaemon {
+ do_stream_load(n, table_name)
+ })
+ }
+ for (Thread th in threads) {
+ th.join()
+ }
+
+ for (int k = 1; k <= 10; k++) {
+ String table_name = stream_load_table_base + "_" + k;
+ qt_sql """ select count(*) from ${table_name}; """
+ }
+ }
+
+ try {
+ process()
+ } finally {
+
+ }
+
+
+}
\ No newline at end of file
diff --git
a/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_normal.groovy
b/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_normal.groovy
new file mode 100644
index 00000000000..76ff680af11
--- /dev/null
+++
b/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_normal.groovy
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_group_commit_http_stream_lineitem_normal") {
+ def getRowCount = { expectedRowCount, table_name ->
+ def retry = 0
+ while (retry < 60) {
+ try {
+ def rowCount = sql "select count(*) from ${table_name}"
+ logger.info("rowCount: " + rowCount + ", retry: " + retry)
+ if (rowCount[0][0] >= expectedRowCount) {
+ break
+ }
+ } catch (Exception e) {
+ logger.info("select count get exception", e);
+ }
+ retry++
+ Thread.sleep(5000)
+ }
+ }
+ def checkStreamLoadResult = { exception, result, total_rows, loaded_rows,
filtered_rows, unselected_rows ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertTrue(json.GroupCommit)
+ assertTrue(json.Label.startsWith("group_commit_"))
+ assertEquals(total_rows, json.NumberTotalRows)
+ assertEquals(loaded_rows, json.NumberLoadedRows)
+ assertEquals(filtered_rows, json.NumberFilteredRows)
+ assertEquals(unselected_rows, json.NumberUnselectedRows)
+ if (filtered_rows > 0) {
+ assertFalse(json.ErrorURL.isEmpty())
+ } else {
+ assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
+ }
+ }
+ def db = "regression_test_insert_p2"
+ def stream_load_table = "test_stream_load_lineitem_normal_sf1"
+ def create_stream_load_table = {
+ sql """ drop table if exists ${stream_load_table}; """
+
+ sql """
+ CREATE TABLE ${stream_load_table} (
+ l_shipdate DATEV2 NOT NULL,
+ l_orderkey bigint NOT NULL,
+ l_linenumber int not null,
+ l_partkey int NOT NULL,
+ l_suppkey int not null,
+ l_quantity decimalv3(15, 2) NOT NULL,
+ l_extendedprice decimalv3(15, 2) NOT NULL,
+ l_discount decimalv3(15, 2) NOT NULL,
+ l_tax decimalv3(15, 2) NOT NULL,
+ l_returnflag VARCHAR(1) NOT NULL,
+ l_linestatus VARCHAR(1) NOT NULL,
+ l_commitdate DATEV2 NOT NULL,
+ l_receiptdate DATEV2 NOT NULL,
+ l_shipinstruct VARCHAR(25) NOT NULL,
+ l_shipmode VARCHAR(10) NOT NULL,
+ l_comment VARCHAR(44) NOT NULL
+)ENGINE=OLAP
+DUPLICATE KEY(`l_shipdate`, `l_orderkey`)
+COMMENT "OLAP"
+DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
+PROPERTIES (
+ "replication_num" = "1"
+);
+ """
+ }
+ int[] rowCountArray = new int[]{600572, 599397, 600124, 599647, 599931,
601365, 599301, 600504, 599715, 600659};
+ def process = {
+ int total = 0;
+ for (int k = 0; k < 3; k++) {
+ for (int i = 1; i <= 10; i++) {
+ streamLoad {
+ set 'version', '1'
+ set 'sql', """
+ insert into ${db}.${stream_load_table}(l_orderkey,
l_partkey, l_suppkey, l_linenumber, l_quantity,
+l_extendedprice, l_discount, l_tax, l_returnflag,l_linestatus,
l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,
+l_shipmode,l_comment) select c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11,
c12, c13, c14, c15, c16 from http_stream
+ ("format"="csv", "column_separator"="|")
+ """
+
+ set 'group_commit', 'true'
+ file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl."""
+ i
+ unset 'label'
+
+ check { result, exception, startTime, endTime ->
+ checkStreamLoadResult(exception, result,
rowCountArray[i - 1], rowCountArray[i - 1], 0, 0)
+ }
+ }
+ total += rowCountArray[i - 1];
+ }
+ getRowCount(total, stream_load_table)
+ qt_sql """ select count(*) from ${stream_load_table} """
+ }
+ }
+ try {
+ create_stream_load_table()
+ process()
+ } finally {
+
+ }
+
+
+}
\ No newline at end of file
diff --git
a/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_schema_change.groovy
b/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_schema_change.groovy
new file mode 100644
index 00000000000..4a8359fb9c4
--- /dev/null
+++
b/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_schema_change.groovy
@@ -0,0 +1,385 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+enum SC {
+ TRUNCATE_TABLE(1),
+ ADD_COLUMN(2),
+ DELETE(3),
+ DROP_COLUMN(4),
+ CHANGE_ORDER(5)
+ private int value
+
+ SC(int value) {
+ this.value = value
+ }
+
+ int getValue() {
+ return value
+ }
+}
+
+enum STATE {
+ NORMAL(1),
+ BEFORE_ADD_COLUMN(2),
+ DROP_COLUMN(3)
+ private int value
+
+ STATE(int value) {
+ this.value = value
+ }
+
+ int getValue() {
+ return value
+ }
+}
+
+suite("test_group_commit_http_stream_lineitem_schema_change") {
+ def db = "regression_test_insert_p2"
+ def stream_load_table = "test_http_stream_lineitem_schema_change_sf1"
+ int[] rowCountArray = new int[]{600572, 599397, 600124, 599647, 599931,
601365, 599301, 600504, 599715, 600659};
+ def total = 0;
+ def getRowCount = { expectedRowCount, table_name ->
+ def retry = 0
+ while (retry < 60) {
+ try {
+ def rowCount = sql "select count(*) from ${table_name}"
+ logger.info("rowCount: " + rowCount + ", retry: " + retry)
+ if (rowCount[0][0] >= expectedRowCount) {
+ break
+ }
+ } catch (Exception e) {
+ logger.info("select count get exception", e);
+ }
+ retry++
+ Thread.sleep(5000)
+ }
+ }
+ def checkStreamLoadResult = { exception, result, total_rows, loaded_rows,
filtered_rows, unselected_rows ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertTrue(json.GroupCommit)
+ assertTrue(json.Label.startsWith("group_commit_"))
+ assertEquals(total_rows, json.NumberTotalRows)
+ assertEquals(loaded_rows, json.NumberLoadedRows)
+ assertEquals(filtered_rows, json.NumberFilteredRows)
+ assertEquals(unselected_rows, json.NumberUnselectedRows)
+ if (filtered_rows > 0) {
+ assertFalse(json.ErrorURL.isEmpty())
+ } else {
+ assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
+ }
+ }
+
+ def create_stream_load_table = { table_name ->
+ // create table
+ sql """ drop table if exists ${table_name}; """
+
+ sql """
+ CREATE TABLE ${table_name} (
+ l_shipdate DATEV2 NOT NULL,
+ l_orderkey bigint NOT NULL,
+ l_linenumber int not null,
+ l_partkey int NOT NULL,
+ l_suppkey int not null,
+ l_quantity decimalv3(15, 2) NOT NULL,
+ l_extendedprice decimalv3(15, 2) NOT NULL,
+ l_discount decimalv3(15, 2) NOT NULL,
+ l_tax decimalv3(15, 2) NOT NULL,
+ l_returnflag VARCHAR(1) NOT NULL,
+ l_linestatus VARCHAR(1) NOT NULL,
+ l_commitdate DATEV2 NOT NULL,
+ l_receiptdate DATEV2 NOT NULL,
+ l_shipinstruct VARCHAR(25) NOT NULL,
+ l_shipmode VARCHAR(10) NOT NULL,
+ l_comment VARCHAR(44) NOT NULL
+)ENGINE=OLAP
+DUPLICATE KEY(`l_shipdate`, `l_orderkey`)
+COMMENT "OLAP"
+DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
+PROPERTIES (
+ "replication_num" = "1"
+);
+ """
+ }
+
+ def create_stream_load_table_less_column = { table_name ->
+ // create table
+ sql """ drop table if exists ${table_name}; """
+
+ sql """
+ CREATE TABLE ${table_name} (
+ l_shipdate DATEV2 NOT NULL,
+ l_orderkey bigint NOT NULL,
+ l_linenumber int not null,
+ l_partkey int NOT NULL,
+ l_suppkey int not null,
+ l_quantity decimalv3(15, 2) NOT NULL,
+ l_extendedprice decimalv3(15, 2) NOT NULL,
+ l_discount decimalv3(15, 2) NOT NULL,
+ l_tax decimalv3(15, 2) NOT NULL,
+ l_returnflag VARCHAR(1) NOT NULL,
+ l_linestatus VARCHAR(1) NOT NULL,
+ l_commitdate DATEV2 NOT NULL,
+ l_shipinstruct VARCHAR(25) NOT NULL,
+ l_shipmode VARCHAR(10) NOT NULL,
+ l_comment VARCHAR(44) NOT NULL
+)ENGINE=OLAP
+DUPLICATE KEY(`l_shipdate`, `l_orderkey`)
+COMMENT "OLAP"
+DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
+PROPERTIES (
+ "replication_num" = "1"
+);
+ """
+
+ }
+
+ def insert_data = { i, table_name ->
+ streamLoad {
+ set 'version', '1'
+ set 'sql', """
+ insert into ${db}.${stream_load_table}(l_orderkey,
l_partkey, l_suppkey, l_linenumber, l_quantity,
+l_extendedprice, l_discount, l_tax, l_returnflag,l_linestatus,
l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,
+l_shipmode,l_comment) select c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11,
c12, c13, c14, c15, c16 from http_stream
+ ("format"="csv", "column_separator"="|")
+ """
+
+ set 'group_commit', 'true'
+ file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl.""" + i
+ unset 'label'
+
+ check { result, exception, startTime, endTime ->
+ checkStreamLoadResult(exception, result, rowCountArray[i - 1],
rowCountArray[i - 1], 0, 0)
+ }
+ }
+ total += rowCountArray[i - 1];
+ }
+
+ def insert_data_less_column = { i, table_name ->
+ streamLoad {
+ set 'version', '1'
+ set 'sql', """
+ insert into ${db}.${stream_load_table}(l_orderkey,
l_partkey, l_suppkey, l_linenumber, l_quantity,
+l_extendedprice, l_discount, l_tax, l_returnflag,l_linestatus,
l_shipdate,l_commitdate,l_shipinstruct,l_shipmode,
+l_comment) select c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c14, c15,
c16 from http_stream
+ ("format"="csv", "column_separator"="|")
+ """
+
+ set 'group_commit', 'true'
+ file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl.""" + i
+ unset 'label'
+
+ check { result, exception, startTime, endTime ->
+ checkStreamLoadResult(exception, result, rowCountArray[i - 1],
rowCountArray[i - 1], 0, 0)
+ }
+ }
+ total += rowCountArray[i - 1];
+ }
+
+ def getAlterTableState = { table_name ->
+ def retry = 0
+ while (true) {
+ def state = sql "show alter table column where tablename =
'${table_name}' order by CreateTime desc "
+ logger.info("alter table state: ${state}")
+ logger.info("state:" + state[0][9]);
+ if (state.size() > 0 && state[0][9] == "FINISHED") {
+ return true
+ }
+ retry++
+ if (retry >= 60) {
+ return false
+ }
+ Thread.sleep(5000)
+ }
+ return false
+ }
+
+ def truncate = { table_name ->
+ create_stream_load_table(table_name)
+ total = 0;
+ for (int i = 1; i <= 10; i++) {
+ logger.info("process file:" + i)
+ if (i == 5) {
+ getRowCount(total, table_name)
+ def retry = 0
+ while (retry < 10) {
+ try {
+ sql """ truncate table ${table_name}; """
+ break
+ } catch (Exception e) {
+ logger.info("select count get exception", e);
+ }
+ Thread.sleep(2000)
+ retry++
+ }
+ total = 0;
+ }
+ insert_data(i, table_name)
+ }
+ logger.info("process truncate total:" + total)
+ getRowCount(total, table_name)
+ qt_sql """ select count(*) from ${table_name}; """
+ }
+
+ def delete = { table_name ->
+ create_stream_load_table(table_name)
+ total = 0;
+ for (int i = 1; i <= 10; i++) {
+ logger.info("process file:" + i)
+ if (i == 5) {
+ def retry = 0
+ while (retry < 10) {
+ try {
+ def rowCount = sql """select count(*) from
${table_name} where l_orderkey >=1000000 and l_orderkey <=5000000;"""
+ logger.info("rowCount:" + rowCount)
+ sql """ delete from ${table_name} where l_orderkey
>=1000000 and l_orderkey <=5000000; """
+ total -= rowCount[0][0]
+ break
+ } catch (Exception e) {
+ log.info("exception:", e)
+ }
+ Thread.sleep(2000)
+ retry++
+ }
+ }
+ insert_data(i, table_name)
+ }
+ logger.info("process delete total:" + total)
+ getRowCount(total, table_name)
+ qt_sql """ select count(*) from ${table_name}; """
+ }
+
+ def drop_column = { table_name ->
+ create_stream_load_table(table_name)
+ total = 0;
+ for (int i = 1; i <= 10; i++) {
+ logger.info("process file:" + i)
+ if (i == 5) {
+ def retry = 0
+ while (retry < 10) {
+ try {
+ sql """ alter table ${table_name} DROP column
l_receiptdate; """
+ break
+ } catch (Exception e) {
+ log.info("exception:", e)
+ }
+ Thread.sleep(2000)
+ retry++
+ }
+ }
+ if (i < 5) {
+ insert_data(i, table_name)
+ } else {
+ insert_data_less_column(i, table_name)
+ }
+ }
+ logger.info("process drop column total:" + total)
+ assertTrue(getAlterTableState(table_name), "drop column should
success")
+ getRowCount(total, table_name)
+ qt_sql """ select count(*) from ${table_name}; """
+ }
+
+ def add_column = { table_name ->
+ create_stream_load_table_less_column(table_name)
+ total = 0;
+ for (int i = 1; i <= 10; i++) {
+ logger.info("process file:" + i)
+ if (i == 5) {
+ def retry = 0
+ while (retry < 10) {
+ try {
+ sql """ alter table ${table_name} ADD column
l_receiptdate DATEV2 after l_commitdate; """
+ break
+ } catch (Exception e) {
+ log.info("exception:", e)
+ }
+ Thread.sleep(2000)
+ retry++
+ }
+ }
+ if (i < 5) {
+ insert_data_less_column(i, table_name)
+ } else {
+ insert_data(i, table_name)
+ }
+ }
+ logger.info("process add column total:" + total)
+ assertTrue(getAlterTableState(table_name), "add column should success")
+ getRowCount(total, table_name)
+ qt_sql """ select count(*) from ${table_name}; """
+ }
+
+ def change_order = { table_name ->
+ create_stream_load_table(table_name)
+ total = 0;
+ for (int i = 1; i <= 10; i++) {
+ logger.info("process file:" + i)
+ if (i == 2) {
+ def retry = 0
+ while (retry < 10) {
+ try {
+ sql """ alter table ${table_name} order by
(l_orderkey,l_shipdate,l_linenumber,
l_partkey,l_suppkey,l_quantity,l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment);
"""
+ break
+ } catch (Exception e) {
+ log.info("exception:", e)
+ }
+ Thread.sleep(2000)
+ retry++
+ }
+ }
+ insert_data(i, table_name)
+ }
+ logger.info("process change order total:" + total)
+ assertTrue(getAlterTableState(table_name), "modify column order should
success")
+ getRowCount(total, table_name)
+ qt_sql """ select count(*) from ${table_name}; """
+ }
+
+
+ def process = { table_name ->
+ for (int i = 1; i <= 5; i++) {
+ switch (i) {
+ case SC.TRUNCATE_TABLE.value:
+ truncate(table_name)
+ break
+ case SC.DELETE.value:
+ delete(table_name)
+ break
+ case SC.DROP_COLUMN.value:
+ drop_column(table_name)
+ break
+ case SC.ADD_COLUMN.value:
+ add_column(table_name)
+ break
+ case SC.CHANGE_ORDER.value:
+ change_order(table_name)
+ break
+ }
+ }
+ }
+
+ try {
+ process(stream_load_table)
+ } finally {
+
+ }
+
+}
\ No newline at end of file
diff --git
a/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_multiple_client.groovy
b/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_multiple_client.groovy
new file mode 100644
index 00000000000..64bb09f271c
--- /dev/null
+++
b/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_multiple_client.groovy
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
// Return the paths of the data files under dirName, sorted lexicographically.
// Fails fast when the directory is unreadable or holds a different number of
// entries than expected (`num`).
String[] getFiles(String dirName, int num) {
    File[] datas = new File(dirName).listFiles()
    if (datas == null) {
        // listFiles() returns null when the path does not exist or is not a
        // directory; surface a clear error instead of an NPE on datas.length.
        throw new Exception("directory not readable: " + dirName)
    }
    if (num != datas.length) {
        throw new Exception("num not equals,expect:" + num + " vs real:" + datas.length)
    }
    String[] array = new String[datas.length]
    for (int i = 0; i < datas.length; i++) {
        array[i] = datas[i].getPath()
    }
    Arrays.sort(array)
    return array
}
+
+suite("test_group_commit_insert_into_lineitem_multiple_client") {
+ String[] file_array;
+ def prepare = {
+ def dataDir = "${context.config.cacheDataPath}/lineitem/"
+ File dir = new File(dataDir)
+ if (!dir.exists()) {
+ new File("${context.config.cacheDataPath}/lineitem/").mkdir()
+ for (int i = 1; i <= 10; i++) {
+ logger.info("download lineitem.tbl.${i}")
+ def download_file = """/usr/bin/curl
${getS3Url()}/regression/tpch/sf1/lineitem.tbl.${i}
+--output
${context.config.cacheDataPath}/lineitem/lineitem.tbl.${i}""".execute().getText()
+ }
+ }
+ file_array = getFiles(dataDir, 10)
+ for (String s : file_array) {
+ logger.info(s)
+ }
+ }
+ def insert_table = "test_insert_into_lineitem_multiple_client_sf1"
+ def batch = 100;
+ def total = 0;
+ def rwLock = new ReentrantReadWriteLock();
+ def wlock = rwLock.writeLock();
+
+ def getRowCount = { expectedRowCount, table_name ->
+ def retry = 0
+ while (retry < 60) {
+ try {
+ def rowCount = sql "select count(*) from ${table_name}"
+ logger.info("rowCount: " + rowCount + ", retry: " + retry)
+ if (rowCount[0][0] >= expectedRowCount) {
+ break
+ }
+ } catch (Exception e) {
+ logger.info("select count get exception", e);
+ }
+ Thread.sleep(5000)
+ retry++
+ }
+ }
+
    // (Re)create the target lineitem table and switch this connection into
    // group-commit, legacy-planner DML mode.
    def create_insert_table = {
        // create table
        sql """ drop table if exists ${insert_table}; """

        sql """
        CREATE TABLE ${insert_table} (
            l_shipdate DATEV2 NOT NULL,
            l_orderkey bigint NOT NULL,
            l_linenumber int not null,
            l_partkey int NOT NULL,
            l_suppkey int not null,
            l_quantity decimalv3(15, 2) NOT NULL,
            l_extendedprice decimalv3(15, 2) NOT NULL,
            l_discount decimalv3(15, 2) NOT NULL,
            l_tax decimalv3(15, 2) NOT NULL,
            l_returnflag VARCHAR(1) NOT NULL,
            l_linestatus VARCHAR(1) NOT NULL,
            l_commitdate DATEV2 NOT NULL,
            l_receiptdate DATEV2 NOT NULL,
            l_shipinstruct VARCHAR(25) NOT NULL,
            l_shipmode VARCHAR(10) NOT NULL,
            l_comment VARCHAR(44) NOT NULL
        )ENGINE=OLAP
        DUPLICATE KEY(`l_shipdate`, `l_orderkey`)
        COMMENT "OLAP"
        DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
        PROPERTIES (
            "replication_num" = "1"
        );
        """
        // Session switches: route inserts through group commit, and use the
        // legacy (non-nereids) planner for DML in this suite.
        sql """ set enable_insert_group_commit = true; """
        sql """ set enable_nereids_dml = false; """
    }
+
+ def do_insert_into = { file_name ->
+ logger.info("file:" + file_name)
+ sql """ set enable_insert_group_commit = true; """
+ sql """ set enable_nereids_dml = false; """
+ //read and insert
+ BufferedReader reader;
+ try {
+ reader = new BufferedReader(new FileReader(file_name));
+ } catch (FileNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+
+ String s = null;
+ StringBuilder sb = null;
+ int c = 0;
+ int t = 0;
+ while (true) {
+ try {
+ if (c == batch) {
+ sb.append(";");
+ String exp = sb.toString();
+ while (true) {
+ try {
+ def result = insert_into_sql(exp, c);
+ logger.info("result:" + result);
+ break
+ } catch (Exception e) {
+ logger.info("got exception:" + e)
+ }
+ }
+ c = 0;
+ }
+ s = reader.readLine();
+ if (s != null) {
+ if (c == 0) {
+ sb = new StringBuilder();
+ sb.append("insert into ${insert_table} (l_orderkey,
l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount,
l_tax, l_returnflag,l_linestatus,
l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment)VALUES");
+ }
+ if (c > 0) {
+ sb.append(",");
+ }
+ String[] array = s.split("\\|");
+ sb.append("(");
+ for (int i = 0; i < array.length; i++) {
+ sb.append("\"" + array[i] + "\"");
+ if (i != array.length - 1) {
+ sb.append(",");
+ }
+ }
+ sb.append(")");
+ c++;
+ t++;
+ } else if (c > 0) {
+ sb.append(";");
+ String exp = sb.toString();
+ while (true) {
+ try {
+ def result = insert_into_sql(exp, c);
+ logger.info("result:" + result);
+ break
+ } catch (Exception e) {
+ logger.info("got exception:" + e)
+ }
+ }
+ break;
+ } else {
+ break;
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ {
+ logger.info("t: " + t)
+ wlock.lock()
+ total += t;
+ wlock.unlock()
+ }
+
+ if (reader != null) {
+ reader.close();
+ }
+ }
+
+ def process = {
+ def threads = []
+ for (int k = 0; k < file_array.length; k++) {
+ int n = k;
+ String file_name = file_array[n]
+ logger.info("insert into file:" + file_name)
+ threads.add(Thread.startDaemon {
+ do_insert_into(file_name)
+ })
+ }
+ for (Thread th in threads) {
+ th.join()
+ }
+
+ getRowCount(total, insert_table)
+ qt_sql """ select count(*) from ${insert_table}; """
+ }
+
    // Download data, create the table, then load all files concurrently.
    try {
        prepare()
        create_insert_table()
        process()
    } finally {

    }
+}
\ No newline at end of file
diff --git
a/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_multiple_table.groovy
b/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_multiple_table.groovy
new file mode 100644
index 00000000000..722b6973356
--- /dev/null
+++
b/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_multiple_table.groovy
@@ -0,0 +1,225 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
// Return the paths of the data files under dirName, sorted lexicographically.
// Fails fast when the directory is unreadable or holds a different number of
// entries than expected (`num`).
String[] getFiles(String dirName, int num) {
    File[] datas = new File(dirName).listFiles()
    if (datas == null) {
        // listFiles() returns null when the path does not exist or is not a
        // directory; surface a clear error instead of an NPE on datas.length.
        throw new Exception("directory not readable: " + dirName)
    }
    if (num != datas.length) {
        throw new Exception("num not equals,expect:" + num + " vs real:" + datas.length)
    }
    String[] array = new String[datas.length]
    for (int i = 0; i < datas.length; i++) {
        array[i] = datas[i].getPath()
    }
    Arrays.sort(array)
    return array
}
+
+suite("test_group_commit_insert_into_lineitem_multiple_table") {
+ String[] file_array;
+ def prepare = {
+ def dataDir = "${context.config.cacheDataPath}/lineitem/"
+ File dir = new File(dataDir)
+ if (!dir.exists()) {
+ new File("${context.config.cacheDataPath}/lineitem/").mkdir()
+ for (int i = 1; i <= 10; i++) {
+ logger.info("download lineitem.tbl.${i}")
+ def download_file = """/usr/bin/curl
${getS3Url()}/regression/tpch/sf1/lineitem.tbl.${i}
+--output
${context.config.cacheDataPath}/lineitem/lineitem.tbl.${i}""".execute().getText()
+ }
+ }
+ file_array = getFiles(dataDir, 10)
+ for (String s : file_array) {
+ logger.info(s)
+ }
+ }
+ def insert_table_base = "test_insert_into_lineitem_multiple_table_sf1"
+ def batch = 100;
+ def total = 0;
+ def rwLock = new ReentrantReadWriteLock();
+ def wlock = rwLock.writeLock();
+
+ def getRowCount = { expectedRowCount, table_name ->
+ def retry = 0
+ while (retry < 60) {
+ try {
+ def rowCount = sql "select count(*) from ${table_name}"
+ logger.info("rowCount: " + rowCount + ", retry: " + retry)
+ if (rowCount[0][0] >= expectedRowCount) {
+ break
+ }
+ } catch (Exception e) {
+ logger.info("select count get exception", e);
+ }
+ Thread.sleep(5000)
+ retry++
+ }
+ }
+
    // (Re)create one per-thread lineitem table and switch this connection into
    // group-commit, legacy-planner DML mode.
    def create_insert_table = { table_name ->
        // create table
        sql """ drop table if exists ${table_name}; """

        sql """
        CREATE TABLE ${table_name} (
            l_shipdate DATEV2 NOT NULL,
            l_orderkey bigint NOT NULL,
            l_linenumber int not null,
            l_partkey int NOT NULL,
            l_suppkey int not null,
            l_quantity decimalv3(15, 2) NOT NULL,
            l_extendedprice decimalv3(15, 2) NOT NULL,
            l_discount decimalv3(15, 2) NOT NULL,
            l_tax decimalv3(15, 2) NOT NULL,
            l_returnflag VARCHAR(1) NOT NULL,
            l_linestatus VARCHAR(1) NOT NULL,
            l_commitdate DATEV2 NOT NULL,
            l_receiptdate DATEV2 NOT NULL,
            l_shipinstruct VARCHAR(25) NOT NULL,
            l_shipmode VARCHAR(10) NOT NULL,
            l_comment VARCHAR(44) NOT NULL
        )ENGINE=OLAP
        DUPLICATE KEY(`l_shipdate`, `l_orderkey`)
        COMMENT "OLAP"
        DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
        PROPERTIES (
            "replication_num" = "1"
        );
        """
        // Session switches: route inserts through group commit, and use the
        // legacy (non-nereids) planner for DML in this suite.
        sql """ set enable_insert_group_commit = true; """
        sql """ set enable_nereids_dml = false; """
    }
+
+ def do_insert_into = { file_name, table_name ->
+ sql """ set enable_insert_group_commit = true; """
+ sql """ set enable_nereids_dml = false; """
+ logger.info("file:" + file_name)
+ //read and insert
+ BufferedReader reader;
+ try {
+ reader = new BufferedReader(new FileReader(file_name));
+ } catch (FileNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+
+ String s = null;
+ StringBuilder sb = null;
+ int c = 0;
+ int t = 0;
+ while (true) {
+ try {
+ if (c == batch) {
+ sb.append(";");
+ String exp = sb.toString();
+ while (true) {
+ try {
+ def result = insert_into_sql(exp, c);
+ logger.info("result:" + result);
+ break
+ } catch (Exception e) {
+ logger.info("got exception:" + e)
+ }
+ }
+ c = 0;
+ }
+ s = reader.readLine();
+ if (s != null) {
+ if (c == 0) {
+ sb = new StringBuilder();
+ sb.append("insert into ${table_name} (l_orderkey,
l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount,
l_tax, l_returnflag,l_linestatus,
l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment)VALUES");
+ }
+ if (c > 0) {
+ sb.append(",");
+ }
+ String[] array = s.split("\\|");
+ sb.append("(");
+ for (int i = 0; i < array.length; i++) {
+ sb.append("\"" + array[i] + "\"");
+ if (i != array.length - 1) {
+ sb.append(",");
+ }
+ }
+ sb.append(")");
+ c++;
+ t++;
+ } else if (c > 0) {
+ sb.append(";");
+ String exp = sb.toString();
+ while (true) {
+ try {
+ def result = insert_into_sql(exp, c);
+ logger.info("result:" + result);
+ break
+ } catch (Exception e) {
+ logger.info("got exception:" + e)
+ }
+ }
+ break;
+ } else {
+ break;
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ {
+ logger.info("t: " + t)
+ wlock.lock()
+ total += t;
+ wlock.unlock()
+ }
+
+ if (reader != null) {
+ reader.close();
+ }
+ getRowCount(t, table_name)
+ }
+
+ def process = {
+ def threads = []
+ for (int k = 0; k < file_array.length; k++) {
+ int n = k;
+ String file_name = file_array[n]
+ String table_name = insert_table_base + "_" + n;
+ create_insert_table(table_name)
+ logger.info("insert into file:" + file_name)
+ threads.add(Thread.startDaemon {
+ do_insert_into(file_name, table_name)
+ })
+ }
+ for (Thread th in threads) {
+ th.join()
+ }
+
+ for (int k = 0; k < file_array.length; k++) {
+ String table_name = insert_table_base + "_" + k;
+ qt_sql """ select count(*) from ${table_name}; """
+ }
+ }
+
    // Download data, then load each file into its own table concurrently.
    try {
        prepare()
        process()
    } finally {

    }
+
+
+}
\ No newline at end of file
diff --git
a/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_normal.groovy
b/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_normal.groovy
new file mode 100644
index 00000000000..250255958ee
--- /dev/null
+++
b/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_normal.groovy
@@ -0,0 +1,192 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
// Return the paths of the data files under dirName, sorted lexicographically.
// Fails fast when the directory is unreadable or holds a different number of
// entries than expected (`num`).
String[] getFiles(String dirName, int num) {
    File[] datas = new File(dirName).listFiles()
    if (datas == null) {
        // listFiles() returns null when the path does not exist or is not a
        // directory; surface a clear error instead of an NPE on datas.length.
        throw new Exception("directory not readable: " + dirName)
    }
    if (num != datas.length) {
        throw new Exception("num not equals,expect:" + num + " vs real:" + datas.length)
    }
    String[] array = new String[datas.length]
    for (int i = 0; i < datas.length; i++) {
        array[i] = datas[i].getPath()
    }
    Arrays.sort(array)
    return array
}
+
+suite("test_group_commit_insert_into_lineitem_normal") {
+ String[] file_array;
+ def prepare = {
+ def dataDir = "${context.config.cacheDataPath}/lineitem/"
+ File dir = new File(dataDir)
+ if (!dir.exists()) {
+ new File("${context.config.cacheDataPath}/lineitem/").mkdir()
+ for (int i = 1; i <= 10; i++) {
+ logger.info("download lineitem.tbl.${i}")
+ def download_file = """/usr/bin/curl
${getS3Url()}/regression/tpch/sf1/lineitem.tbl.${i}
+--output
${context.config.cacheDataPath}/lineitem/lineitem.tbl.${i}""".execute().getText()
+ }
+ }
+ file_array = getFiles(dataDir, 10)
+ for (String s : file_array) {
+ logger.info(s)
+ }
+ }
+ def insert_table = "test_insert_into_lineitem_sf1"
+ def batch = 100;
+ def count = 0;
+ def total = 0;
+
+ def getRowCount = { expectedRowCount, table_name ->
+ def retry = 0
+ while (retry < 60) {
+ try {
+ def rowCount = sql "select count(*) from ${table_name}"
+ logger.info("rowCount: " + rowCount + ", retry: " + retry)
+ if (rowCount[0][0] >= expectedRowCount) {
+ break
+ }
+ } catch (Exception e) {
+ logger.info("select count get exception", e);
+ }
+ Thread.sleep(5000)
+ retry++
+ }
+ }
+
    // (Re)create the target lineitem table and switch this connection into
    // group-commit, legacy-planner DML mode.
    def create_insert_table = {
        // create table
        sql """ drop table if exists ${insert_table}; """

        sql """
        CREATE TABLE ${insert_table} (
            l_shipdate DATEV2 NOT NULL,
            l_orderkey bigint NOT NULL,
            l_linenumber int not null,
            l_partkey int NOT NULL,
            l_suppkey int not null,
            l_quantity decimalv3(15, 2) NOT NULL,
            l_extendedprice decimalv3(15, 2) NOT NULL,
            l_discount decimalv3(15, 2) NOT NULL,
            l_tax decimalv3(15, 2) NOT NULL,
            l_returnflag VARCHAR(1) NOT NULL,
            l_linestatus VARCHAR(1) NOT NULL,
            l_commitdate DATEV2 NOT NULL,
            l_receiptdate DATEV2 NOT NULL,
            l_shipinstruct VARCHAR(25) NOT NULL,
            l_shipmode VARCHAR(10) NOT NULL,
            l_comment VARCHAR(44) NOT NULL
        )ENGINE=OLAP
        DUPLICATE KEY(`l_shipdate`, `l_orderkey`)
        COMMENT "OLAP"
        DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
        PROPERTIES (
            "replication_num" = "1"
        );
        """
        // Session switches: route inserts through group commit, and use the
        // legacy (non-nereids) planner for DML in this suite.
        sql """ set enable_insert_group_commit = true; """
        sql """ set enable_nereids_dml = false; """
    }
+
+ def process = {
+ for (String file : file_array) {
+ logger.info("insert into file: " + file)
+ BufferedReader reader;
+ try {
+ reader = new BufferedReader(new FileReader(file));
+ } catch (FileNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+
+ String s = null;
+ StringBuilder sb = null;
+ count = 0;
+ while (true) {
+ try {
+ if (count == batch) {
+ sb.append(";");
+ String exp = sb.toString();
+ while (true) {
+ try {
+ def result = insert_into_sql(exp, count);
+ logger.info("result:" + result);
+ break
+ } catch (Exception e) {
+ logger.info("got exception:" + e)
+ }
+ }
+ count = 0;
+ }
+ s = reader.readLine();
+ if (s != null) {
+ if (count == 0) {
+ sb = new StringBuilder();
+ sb.append("insert into ${insert_table}
(l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice,
l_discount, l_tax, l_returnflag,l_linestatus,
l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment)VALUES");
+ }
+ if (count > 0) {
+ sb.append(",");
+ }
+ String[] array = s.split("\\|");
+ sb.append("(");
+ for (int i = 0; i < array.length; i++) {
+ sb.append("\"" + array[i] + "\"");
+ if (i != array.length - 1) {
+ sb.append(",");
+ }
+ }
+ sb.append(")");
+ count++;
+ total++;
+ } else if (count > 0) {
+ sb.append(";");
+ String exp = sb.toString();
+ while (true) {
+ try {
+ def result = insert_into_sql(exp, count);
+ logger.info("result:" + result);
+ break
+ } catch (Exception e) {
+ logger.info("got exception:" + e)
+ }
+ }
+ break;
+ } else {
+ break;
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ if (reader != null) {
+ reader.close();
+ }
+ }
+ logger.info("total: " + total)
+ getRowCount(total, insert_table)
+
+ qt_sql """select count(*) from ${insert_table};"""
+ }
+
    // Download data, create the table, then run the sequential load once
    // (the loop bound makes it easy to repeat the load when testing locally).
    try {
        prepare()
        create_insert_table()
        for (int i = 0; i < 1; i++) {
            process()
        }
    } finally {
    }
+}
\ No newline at end of file
diff --git
a/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_scheme_change.groovy
b/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_scheme_change.groovy
new file mode 100644
index 00000000000..fba562f819e
--- /dev/null
+++
b/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_scheme_change.groovy
@@ -0,0 +1,448 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
// Schema-change scenarios; the numeric value matches the step index the
// process closure iterates over.
enum SC {
    TRUNCATE_TABLE(1),
    ADD_COLUMN(2),
    DELETE(3),
    DROP_COLUMN(4),
    CHANGE_ORDER(5);

    private final int value

    SC(int value) {
        this.value = value
    }

    int getValue() {
        return value
    }
}
+
// Column layout the insert statements must target: the full table (NORMAL),
// the table before l_receiptdate was added, or after l_suppkey was dropped.
enum STATE {
    NORMAL(1),
    BEFORE_ADD_COLUMN(2),
    DROP_COLUMN(3);

    private final int value

    STATE(int value) {
        this.value = value
    }

    int getValue() {
        return value
    }
}
+
// Return the paths of the data files under dirName, sorted lexicographically.
// Fails fast when the directory is unreadable or holds a different number of
// entries than expected (`num`).
String[] getFiles(String dirName, int num) {
    File[] datas = new File(dirName).listFiles()
    if (datas == null) {
        // listFiles() returns null when the path does not exist or is not a
        // directory; surface a clear error instead of an NPE on datas.length.
        throw new Exception("directory not readable: " + dirName)
    }
    if (num != datas.length) {
        throw new Exception("num not equals,expect:" + num + " vs real:" + datas.length)
    }
    String[] array = new String[datas.length]
    for (int i = 0; i < datas.length; i++) {
        array[i] = datas[i].getPath()
    }
    Arrays.sort(array)
    return array
}
+
+suite("test_group_commit_insert_into_lineitem_scheme_change") {
+ String[] file_array;
+ def prepare = {
+ def dataDir = "${context.config.cacheDataPath}/lineitem/"
+ File dir = new File(dataDir)
+ if (!dir.exists()) {
+ new File("${context.config.cacheDataPath}/lineitem/").mkdir()
+ for (int i = 1; i <= 10; i++) {
+ logger.info("download lineitem.tbl.${i}")
+ def download_file = """/usr/bin/curl
${getS3Url()}/regression/tpch/sf1/lineitem.tbl.${i}
+--output
${context.config.cacheDataPath}/lineitem/lineitem.tbl.${i}""".execute().getText()
+ }
+ }
+ file_array = getFiles(dataDir, 10)
+ for (String s : file_array) {
+ logger.info(s)
+ }
+ }
+ def insert_table = "test_lineitem_scheme_change_sf1"
+ def batch = 100;
+ def count = 0;
+ def total = 0;
+
+ def getRowCount = { expectedRowCount, table_name ->
+ def retry = 0
+ while (retry < 60) {
+ try {
+ def rowCount = sql "select count(*) from ${table_name}"
+ logger.info("rowCount: " + rowCount + ", retry: " + retry)
+ if (rowCount[0][0] >= expectedRowCount) {
+ break
+ }
+ } catch (Exception e) {
+ logger.info("select count get exception", e);
+ }
+ Thread.sleep(5000)
+ retry++
+ }
+ }
+
    // (Re)create the full-schema lineitem table and switch this connection
    // into group-commit, legacy-planner DML mode.
    def create_insert_table = { table_name ->
        // create table
        sql """ drop table if exists ${table_name}; """

        sql """
        CREATE TABLE ${table_name} (
            l_shipdate DATEV2 NOT NULL,
            l_orderkey bigint NOT NULL,
            l_linenumber int not null,
            l_partkey int NOT NULL,
            l_suppkey int not null,
            l_quantity decimalv3(15, 2) NOT NULL,
            l_extendedprice decimalv3(15, 2) NOT NULL,
            l_discount decimalv3(15, 2) NOT NULL,
            l_tax decimalv3(15, 2) NOT NULL,
            l_returnflag VARCHAR(1) NOT NULL,
            l_linestatus VARCHAR(1) NOT NULL,
            l_commitdate DATEV2 NOT NULL,
            l_receiptdate DATEV2 NOT NULL,
            l_shipinstruct VARCHAR(25) NOT NULL,
            l_shipmode VARCHAR(10) NOT NULL,
            l_comment VARCHAR(44) NOT NULL
        )ENGINE=OLAP
        DUPLICATE KEY(`l_shipdate`, `l_orderkey`)
        COMMENT "OLAP"
        DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
        PROPERTIES (
            "replication_num" = "1"
        );
        """
        // Session switches: route inserts through group commit, and use the
        // legacy (non-nereids) planner for DML in this suite.
        sql """ set enable_insert_group_commit = true; """
        sql """ set enable_nereids_dml = false; """
    }
+
    // Variant of create_insert_table WITHOUT l_receiptdate; used by the
    // add-column scenario, which later adds that column via ALTER TABLE.
    def create_insert_table_less_column = { table_name ->
        // create table
        sql """ drop table if exists ${table_name}; """

        sql """
        CREATE TABLE ${table_name} (
            l_shipdate DATEV2 NOT NULL,
            l_orderkey bigint NOT NULL,
            l_linenumber int not null,
            l_partkey int NOT NULL,
            l_suppkey int not null,
            l_quantity decimalv3(15, 2) NOT NULL,
            l_extendedprice decimalv3(15, 2) NOT NULL,
            l_discount decimalv3(15, 2) NOT NULL,
            l_tax decimalv3(15, 2) NOT NULL,
            l_returnflag VARCHAR(1) NOT NULL,
            l_linestatus VARCHAR(1) NOT NULL,
            l_commitdate DATEV2 NOT NULL,
            l_shipinstruct VARCHAR(25) NOT NULL,
            l_shipmode VARCHAR(10) NOT NULL,
            l_comment VARCHAR(44) NOT NULL
        )ENGINE=OLAP
        DUPLICATE KEY(`l_shipdate`, `l_orderkey`)
        COMMENT "OLAP"
        DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
        PROPERTIES (
            "replication_num" = "1"
        );
        """
        // Session switches: route inserts through group commit, and use the
        // legacy (non-nereids) planner for DML in this suite.
        sql """ set enable_insert_group_commit = true; """
        sql """ set enable_nereids_dml = false; """

    }
+
+ def insert_data = { file_name, table_name, index ->
+ BufferedReader reader;
+ try {
+ reader = new BufferedReader(new FileReader(file_name));
+ } catch (FileNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+
+ String s = null;
+ StringBuilder sb = null;
+ count = 0;
+ while (true) {
+ try {
+ if (count == batch) {
+ sb.append(";");
+ String exp = sb.toString();
+ while (true) {
+ try {
+ def result = insert_into_sql(exp, count);
+ logger.info("result:" + result);
+ break
+ } catch (Exception e) {
+ logger.info("got exception:" + e)
+ }
+ }
+ count = 0;
+ }
+ s = reader.readLine();
+ if (s != null) {
+ if (count == 0) {
+ sb = new StringBuilder();
+ if (index == STATE.BEFORE_ADD_COLUMN.value) {
+ sb.append("insert into ${table_name} (l_orderkey,
l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount,
l_tax, l_returnflag,l_linestatus,
l_shipdate,l_commitdate,l_shipinstruct,l_shipmode,l_comment)VALUES");
+ } else if (index == STATE.NORMAL.value) {
+ sb.append("insert into ${table_name} (l_orderkey,
l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount,
l_tax, l_returnflag,l_linestatus,
l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment)VALUES");
+ } else if (index == STATE.DROP_COLUMN.value) {
+ sb.append("insert into ${table_name} (l_orderkey,
l_partkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax,
l_returnflag,l_linestatus,
l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment)VALUES");
+ }
+ }
+ if (count > 0) {
+ sb.append(",");
+ }
+ String[] array = s.split("\\|");
+ sb.append("(");
+ for (int i = 0; i < array.length; i++) {
+ if (index == STATE.BEFORE_ADD_COLUMN.value && i == 11)
{
+ continue;
+ } else if (index == STATE.DROP_COLUMN.value && i == 2)
{
+ continue;
+ }
+ sb.append("\"" + array[i] + "\"");
+ if (i != array.length - 1) {
+ sb.append(",");
+ }
+ }
+ sb.append(")");
+ count++;
+ total++;
+ } else if (count > 0) {
+ sb.append(";");
+ String exp = sb.toString();
+ while (true) {
+ try {
+ def result = insert_into_sql(exp, count);
+ logger.info("result:" + result);
+ break
+ } catch (Exception e) {
+ logger.info("got exception:" + e)
+ }
+ }
+ break;
+ } else {
+ break;
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ if (reader != null) {
+ reader.close();
+ }
+ }
+
+ def getAlterTableState = { table_name ->
+ def retry = 0
+ while (true) {
+ def state = sql "show alter table column where tablename =
'${table_name}' order by CreateTime desc "
+ logger.info("alter table state: ${state}")
+ logger.info("state:" + state[0][9]);
+ if (state.size() > 0 && state[0][9] == "FINISHED") {
+ return true
+ }
+ retry++
+ if (retry >= 60) {
+ return false
+ }
+ Thread.sleep(5000)
+ }
+ return false
+ }
+
+ def truncate = { table_name ->
+ create_insert_table(table_name)
+ total = 0;
+ for (int i = 0; i < file_array.length; i++) {
+ String fileName = file_array[i]
+ logger.info("process file:" + fileName)
+ if (i == 5) {
+ getRowCount(total, table_name)
+ def retry = 0
+ while (retry < 10) {
+ try {
+ sql """ truncate table ${table_name}; """
+ break
+ } catch (Exception e) {
+ logger.info("select count get exception", e);
+ }
+ retry++
+ Thread.sleep(2000)
+ }
+ total = 0;
+ }
+ insert_data(fileName, table_name, STATE.NORMAL.value)
+ }
+ logger.info("process truncate total:" + total)
+ getRowCount(total, table_name)
+ qt_sql """ select count(*) from ${table_name}; """
+ }
+
+ def delete = { table_name ->
+ create_insert_table(table_name)
+ total = 0;
+ for (int i = 0; i < file_array.length; i++) {
+ String fileName = file_array[i]
+ logger.info("process file:" + fileName)
+ if (i == 5) {
+ def retry = 0
+ while (retry < 10) {
+ try {
+ def rowCount = sql """select count(*) from
${table_name} where l_orderkey >=1000000 and l_orderkey <=5000000;"""
+ log.info("rowCount:" + rowCount[0][0])
+ sql """ delete from ${table_name} where l_orderkey
>=1000000 and l_orderkey <=5000000; """
+ total -= rowCount[0][0]
+ break
+ } catch (Exception e) {
+ log.info("exception:", e)
+ }
+ retry++
+ Thread.sleep(2000)
+ }
+ }
+ insert_data(fileName, table_name, STATE.NORMAL.value)
+ }
+ logger.info("process delete total:" + total)
+ getRowCount(total, table_name)
+ qt_sql """ select count(*) from ${table_name}; """
+ }
+
+ def drop_column = { table_name ->
+ create_insert_table(table_name)
+ total = 0;
+ for (int i = 0; i < file_array.length; i++) {
+ String fileName = file_array[i]
+ logger.info("process file:" + fileName)
+ if (i == 5) {
+ def retry = 0
+ while (retry < 10) {
+ try {
+ sql """ alter table ${table_name} DROP column
l_suppkey; """
+ break
+ } catch (Exception e) {
+ log.info("exception:", e)
+ }
+ retry++
+ Thread.sleep(2000)
+ }
+ }
+ if (i < 5) {
+ insert_data(fileName, table_name, STATE.NORMAL.value)
+ } else {
+ insert_data(fileName, table_name, STATE.DROP_COLUMN.value)
+ }
+ }
+ logger.info("process drop column total:" + total)
+ assertTrue(getAlterTableState(table_name), "drop column should
success")
+ getRowCount(total, table_name)
+ qt_sql """ select count(*) from ${table_name}; """
+ }
+
+ def add_column = { table_name ->
+ create_insert_table_less_column(table_name)
+ total = 0;
+ for (int i = 0; i < file_array.length; i++) {
+ String fileName = file_array[i]
+ logger.info("process file:" + fileName)
+ if (i == 5) {
+ def retry = 0
+ while (retry < 10) {
+ try {
+ sql """ alter table ${table_name} ADD column
l_receiptdate DATEV2 after l_commitdate; """
+ break
+ } catch (Exception e) {
+ log.info("exception:", e)
+ }
+ retry++
+ Thread.sleep(2000)
+ }
+ }
+ if (i < 5) {
+ insert_data(fileName, table_name,
STATE.BEFORE_ADD_COLUMN.value)
+ } else {
+ insert_data(fileName, table_name, STATE.NORMAL.value)
+ }
+ }
+ logger.info("process add column total:" + total)
+ assertTrue(getAlterTableState(table_name), "add column should success")
+ getRowCount(total, table_name)
+ qt_sql """ select count(*) from ${table_name}; """
+ }
+
+ def change_order = { table_name ->
+ create_insert_table(table_name)
+ total = 0;
+ for (int i = 0; i < file_array.length; i++) {
+ String fileName = file_array[i]
+ logger.info("process file:" + fileName)
+ if (i == 5) {
+ def retry = 0
+ while (retry < 10) {
+ try {
+ sql """ alter table ${table_name} order by
(l_orderkey,l_shipdate,l_linenumber,
l_partkey,l_suppkey,l_quantity,l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment);
"""
+ break
+ } catch (Exception e) {
+ log.info("exception:", e)
+ }
+ retry++
+ Thread.sleep(2000)
+ }
+ }
+ insert_data(fileName, table_name, STATE.NORMAL.value)
+ }
+ logger.info("process change order total:" + total)
+ assertTrue(getAlterTableState(table_name), "modify column order should
success")
+ getRowCount(total, table_name)
+ qt_sql """ select count(*) from ${table_name}; """
+ }
+
+
+ def process = { table_name ->
+ for (int i = 1; i <= 5; i++) {
+ switch (i) {
+ case SC.TRUNCATE_TABLE.value:
+ truncate(table_name)
+ break
+ case SC.DELETE.value:
+ delete(table_name)
+ break
+ case SC.DROP_COLUMN.value:
+ drop_column(table_name)
+ break
+ case SC.ADD_COLUMN.value:
+ add_column(table_name)
+ break
+ case SC.CHANGE_ORDER.value:
+ change_order(table_name)
+ break
+ }
+ }
+ }
+
    // Download data, then run the full schema-change scenario matrix.
    try {
        prepare()
        process(insert_table)
    } finally {

    }
+
+}
\ No newline at end of file
diff --git
a/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_multiple_client.groovy
b/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_multiple_client.groovy
new file mode 100644
index 00000000000..aa7c6ffc571
--- /dev/null
+++
b/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_multiple_client.groovy
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
suite("test_group_commit_stream_load_lineitem_multiple_client") {
    // Target table loaded concurrently by 10 stream-load client threads.
    def stream_load_table = "test_stream_load_lineitem_multiple_client_sf1"
    // Column mapping for the pipe-separated lineitem splits.
    def columns = """l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount,
l_tax, l_returnflag,l_linestatus, l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment"""
    // Expected row count of each lineitem.tbl.<i> split (1-based file index).
    int[] rowCountArray = new int[]{600572, 599397, 600124, 599647, 599931, 601365, 599301, 600504, 599715, 600659};
    // Total rows expected so far; updated from multiple threads under wlock.
    def total = 0;
    def rwLock = new ReentrantReadWriteLock();
    def wlock = rwLock.writeLock();
+ def getRowCount = { expectedRowCount, table_name ->
+ def retry = 0
+ while (retry < 60) {
+ try {
+ def rowCount = sql "select count(*) from ${table_name}"
+ logger.info("rowCount: " + rowCount + ", retry: " + retry)
+ if (rowCount[0][0] >= expectedRowCount) {
+ break
+ }
+ } catch (Exception e) {
+ logger.info("select count get exception", e);
+ }
+ retry++
+ sleep(5000)
+ }
+ }
+
    // Validate a group-commit stream load HTTP response: rethrows any transport
    // exception, then asserts status, group-commit label, and the four row counters.
    def checkStreamLoadResult = { exception, result, total_rows, loaded_rows, filtered_rows, unselected_rows ->
        if (exception != null) {
            throw exception
        }
        log.info("Stream load result: ${result}".toString())
        def json = parseJson(result)
        assertEquals("success", json.Status.toLowerCase())
        assertTrue(json.GroupCommit)
        assertTrue(json.Label.startsWith("group_commit_"))
        assertEquals(total_rows, json.NumberTotalRows)
        assertEquals(loaded_rows, json.NumberLoadedRows)
        assertEquals(filtered_rows, json.NumberFilteredRows)
        assertEquals(unselected_rows, json.NumberUnselectedRows)
        // ErrorURL is only populated when rows were filtered out.
        if (filtered_rows > 0) {
            assertFalse(json.ErrorURL.isEmpty())
        } else {
            assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
        }
    }
+
    // (Re-)create the shared lineitem target table (single replica, 96 buckets).
    def create_stream_load_table = {
        sql """ drop table if exists ${stream_load_table}; """

        sql """
        CREATE TABLE ${stream_load_table} (
        l_shipdate DATEV2 NOT NULL,
        l_orderkey bigint NOT NULL,
        l_linenumber int not null,
        l_partkey int NOT NULL,
        l_suppkey int not null,
        l_quantity decimalv3(15, 2) NOT NULL,
        l_extendedprice decimalv3(15, 2) NOT NULL,
        l_discount decimalv3(15, 2) NOT NULL,
        l_tax decimalv3(15, 2) NOT NULL,
        l_returnflag VARCHAR(1) NOT NULL,
        l_linestatus VARCHAR(1) NOT NULL,
        l_commitdate DATEV2 NOT NULL,
        l_receiptdate DATEV2 NOT NULL,
        l_shipinstruct VARCHAR(25) NOT NULL,
        l_shipmode VARCHAR(10) NOT NULL,
        l_comment VARCHAR(44) NOT NULL
)ENGINE=OLAP
DUPLICATE KEY(`l_shipdate`, `l_orderkey`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
PROPERTIES (
    "replication_num" = "1"
);
        """
    }
+
+ def do_stream_load = { i ->
+ logger.info("file:" + i)
+ streamLoad {
+ table stream_load_table
+
+ set 'column_separator', '|'
+ set 'columns', columns + ",lo_dummy"
+ set 'group_commit', 'true'
+ unset 'label'
+ file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl.""" + i
+
+ check { result, exception, startTime, endTime ->
+ checkStreamLoadResult(exception, result, rowCountArray[i - 1],
rowCountArray[i - 1], 0, 0)
+ }
+ }
+
+ logger.info("load num: " + rowCountArray[i - 1])
+ wlock.lock()
+ total += rowCountArray[i - 1];
+ wlock.unlock()
+
+ }
+
+ def process = {
+ def threads = []
+ for (int k = 1; k <= 10; k++) {
+ int n = k;
+ logger.info("insert into file:" + n)
+ threads.add(Thread.startDaemon {
+ do_stream_load(n)
+ })
+ }
+ for (Thread th in threads) {
+ th.join()
+ }
+
+ logger.info("total:" + total)
+ getRowCount(total, stream_load_table)
+
+ qt_sql """ select count(*) from ${stream_load_table}; """
+ }
+
    try {
        create_stream_load_table()
        process()
    } finally {
        // intentionally empty: no cleanup needed
    }
}
\ No newline at end of file
diff --git
a/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_multiple_table.groovy
b/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_multiple_table.groovy
new file mode 100644
index 00000000000..58d00e1f163
--- /dev/null
+++
b/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_multiple_table.groovy
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
suite("test_group_commit_stream_load_lineitem_multiple_table") {
    // Base name; one table per loader thread: <base>_1 .. <base>_10.
    def stream_load_table_base = "test_stream_load_lineitem_multiple_table"
    // Column mapping for the pipe-separated lineitem splits.
    def columns = """l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount,
l_tax, l_returnflag,l_linestatus, l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment"""
    // Expected row count of each lineitem.tbl.<i> split (1-based file index).
    int[] rowCountArray = new int[]{600572, 599397, 600124, 599647, 599931, 601365, 599301, 600504, 599715, 600659};
+ def getRowCount = { expectedRowCount, table_name ->
+ def retry = 0
+ while (retry < 60) {
+ try {
+ def rowCount = sql "select count(*) from ${table_name}"
+ logger.info("rowCount: " + rowCount + ", retry: " + retry)
+ if (rowCount[0][0] >= expectedRowCount) {
+ break
+ }
+ } catch (Exception e) {
+ logger.info("select count get exception", e);
+ }
+ retry++
+ sleep(5000)
+ }
+ }
+
    // (Re-)create one per-thread lineitem target table (single replica, 96 buckets).
    def create_stream_load_table = { table_name ->
        sql """ drop table if exists ${table_name}; """

        sql """
        CREATE TABLE ${table_name} (
        l_shipdate DATEV2 NOT NULL,
        l_orderkey bigint NOT NULL,
        l_linenumber int not null,
        l_partkey int NOT NULL,
        l_suppkey int not null,
        l_quantity decimalv3(15, 2) NOT NULL,
        l_extendedprice decimalv3(15, 2) NOT NULL,
        l_discount decimalv3(15, 2) NOT NULL,
        l_tax decimalv3(15, 2) NOT NULL,
        l_returnflag VARCHAR(1) NOT NULL,
        l_linestatus VARCHAR(1) NOT NULL,
        l_commitdate DATEV2 NOT NULL,
        l_receiptdate DATEV2 NOT NULL,
        l_shipinstruct VARCHAR(25) NOT NULL,
        l_shipmode VARCHAR(10) NOT NULL,
        l_comment VARCHAR(44) NOT NULL
)ENGINE=OLAP
DUPLICATE KEY(`l_shipdate`, `l_orderkey`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
PROPERTIES (
    "replication_num" = "1"
);
        """
    }
+
    // Validate a group-commit stream load HTTP response: rethrows any transport
    // exception, then asserts status, group-commit label, and the four row counters.
    def checkStreamLoadResult = { exception, result, total_rows, loaded_rows, filtered_rows, unselected_rows ->
        if (exception != null) {
            throw exception
        }
        log.info("Stream load result: ${result}".toString())
        def json = parseJson(result)
        assertEquals("success", json.Status.toLowerCase())
        assertTrue(json.GroupCommit)
        assertTrue(json.Label.startsWith("group_commit_"))
        assertEquals(total_rows, json.NumberTotalRows)
        assertEquals(loaded_rows, json.NumberLoadedRows)
        assertEquals(filtered_rows, json.NumberFilteredRows)
        assertEquals(unselected_rows, json.NumberUnselectedRows)
        // ErrorURL is only populated when rows were filtered out.
        if (filtered_rows > 0) {
            assertFalse(json.ErrorURL.isEmpty())
        } else {
            assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
        }
    }
+
    // Group-commit stream load of split i (1-based) into table_name, then wait
    // until that split's expected rows are visible in the table.
    def do_stream_load = { i, table_name ->
        logger.info("file:" + i)

        streamLoad {
            table table_name

            set 'column_separator', '|'
            // lo_dummy absorbs the trailing '|' field — presumably; confirm format.
            set 'columns', columns + ",lo_dummy"
            set 'group_commit', 'true'
            unset 'label'
            file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl.""" + i

            check { result, exception, startTime, endTime ->
                checkStreamLoadResult(exception, result, rowCountArray[i - 1], rowCountArray[i - 1], 0, 0)
            }
        }
        getRowCount(rowCountArray[i - 1], table_name)
    }
+
+ def process = {
+ def threads = []
+ for (int k = 1; k <= 10; k++) {
+ int n = k;
+ String table_name = stream_load_table_base + "_" + n;
+ create_stream_load_table(table_name)
+ threads.add(Thread.startDaemon {
+ do_stream_load(n, table_name)
+ })
+ }
+ for (Thread th in threads) {
+ th.join()
+ }
+
+ for (int k = 1; k <= 10; k++) {
+ String table_name = stream_load_table_base + "_" + k;
+ qt_sql """ select count(*) from ${table_name}; """
+ }
+ }
+
    try {
        process()
    } finally {
        // intentionally empty: no cleanup needed
    }

}
\ No newline at end of file
diff --git
a/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_normal.groovy
b/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_normal.groovy
new file mode 100644
index 00000000000..1924396718a
--- /dev/null
+++
b/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_normal.groovy
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
suite("test_group_commit_stream_load_lineitem_normal") {
    // Poll the table's row count until it reaches expectedRowCount or ~5 minutes pass.
    def getRowCount = { expectedRowCount, table_name ->
        for (int attempt = 0; attempt < 60; attempt++) {
            try {
                def rowCount = sql "select count(*) from ${table_name}"
                logger.info("rowCount: " + rowCount + ", retry: " + attempt)
                if (rowCount[0][0] >= expectedRowCount) {
                    return
                }
            } catch (Exception e) {
                logger.info("select count get exception", e);
            }
            Thread.sleep(5000)
        }
    }
    // Validate a group-commit stream load HTTP response: rethrows any transport
    // exception, then asserts status, group-commit label, and the four row counters.
    def checkStreamLoadResult = { exception, result, total_rows, loaded_rows, filtered_rows, unselected_rows ->
        if (exception != null) {
            throw exception
        }
        log.info("Stream load result: ${result}".toString())
        def json = parseJson(result)
        assertEquals("success", json.Status.toLowerCase())
        assertTrue(json.GroupCommit)
        assertTrue(json.Label.startsWith("group_commit_"))
        assertEquals(total_rows, json.NumberTotalRows)
        assertEquals(loaded_rows, json.NumberLoadedRows)
        assertEquals(filtered_rows, json.NumberFilteredRows)
        assertEquals(unselected_rows, json.NumberUnselectedRows)
        // ErrorURL is only populated when rows were filtered out.
        if (filtered_rows > 0) {
            assertFalse(json.ErrorURL.isEmpty())
        } else {
            assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
        }
    }
    // Single target table for the normal (non-concurrent) load scenario.
    def stream_load_table = "test_stream_load_lineitem_normal_sf1"
    // (Re-)create the lineitem target table (single replica, 96 buckets).
    def create_stream_load_table = {
        sql """ drop table if exists ${stream_load_table}; """

        sql """
        CREATE TABLE ${stream_load_table} (
        l_shipdate DATEV2 NOT NULL,
        l_orderkey bigint NOT NULL,
        l_linenumber int not null,
        l_partkey int NOT NULL,
        l_suppkey int not null,
        l_quantity decimalv3(15, 2) NOT NULL,
        l_extendedprice decimalv3(15, 2) NOT NULL,
        l_discount decimalv3(15, 2) NOT NULL,
        l_tax decimalv3(15, 2) NOT NULL,
        l_returnflag VARCHAR(1) NOT NULL,
        l_linestatus VARCHAR(1) NOT NULL,
        l_commitdate DATEV2 NOT NULL,
        l_receiptdate DATEV2 NOT NULL,
        l_shipinstruct VARCHAR(25) NOT NULL,
        l_shipmode VARCHAR(10) NOT NULL,
        l_comment VARCHAR(44) NOT NULL
)ENGINE=OLAP
DUPLICATE KEY(`l_shipdate`, `l_orderkey`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
PROPERTIES (
    "replication_num" = "1"
);
        """
    }
    // Expected row count of each lineitem.tbl.<i> split (1-based file index).
    int[] rowCountArray = new int[]{600572, 599397, 600124, 599647, 599931, 601365, 599301, 600504, 599715, 600659};
    // Column mapping for the pipe-separated lineitem splits.
    def columns = """l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount,
l_tax, l_returnflag,l_linestatus, l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment"""
    // Load all 10 lineitem splits three times into the same table, accumulating
    // the expected total across rounds.
    def process = {
        int total = 0;
        for (int k = 0; k < 3; k++) {
            for (int i = 1; i <= 10; i++) {
                streamLoad {
                    table stream_load_table

                    set 'column_separator', '|'
                    // lo_dummy absorbs the trailing '|' field — presumably; confirm format.
                    set 'columns', columns + ",lo_dummy"
                    set 'group_commit', 'true'
                    unset 'label'
                    file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl.""" + i

                    check { result, exception, startTime, endTime ->
                        checkStreamLoadResult(exception, result, rowCountArray[i - 1], rowCountArray[i - 1], 0, 0)
                    }
                }
                total += rowCountArray[i - 1];
            }
            // NOTE(review): the cumulative count is verified and snapshotted after
            // every round, so qt_sql runs three times — the .out file should
            // contain three results; confirm against the expected output.
            getRowCount(total, stream_load_table)
            qt_sql """ select count(*) from ${stream_load_table} """
        }
    }
    try {
        create_stream_load_table()
        process()
    } finally {
        // intentionally empty: no cleanup needed
    }

}
\ No newline at end of file
diff --git
a/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_schema_change.groovy
b/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_schema_change.groovy
new file mode 100644
index 00000000000..1be1d9180b5
--- /dev/null
+++
b/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_schema_change.groovy
@@ -0,0 +1,353 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
// Schema-change scenarios exercised by the suite; the process closure runs
// them in ascending value order (1..5).
enum SC {
    TRUNCATE_TABLE(1),
    ADD_COLUMN(2),
    DELETE(3),
    DROP_COLUMN(4),
    CHANGE_ORDER(5)
    private int value

    SC(int value) {
        this.value = value
    }

    int getValue() {
        return value
    }
}
+
// Table-shape states used to select which column list a load should send.
enum STATE {
    NORMAL(1),
    BEFORE_ADD_COLUMN(2),
    DROP_COLUMN(3)
    private int value

    STATE(int value) {
        this.value = value
    }

    int getValue() {
        return value
    }
}
+
suite("test_group_commit_stream_load_lineitem_schema_change") {
    def stream_load_table = "test_stream_load_lineitem_schema_change_sf1"
    // Column mapping for the pipe-separated lineitem splits.
    def columns = """l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount,
l_tax, l_returnflag,l_linestatus, l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment"""
    // Expected row count of each lineitem.tbl.<i> split (1-based file index).
    int[] rowCountArray = new int[]{600572, 599397, 600124, 599647, 599931, 601365, 599301, 600504, 599715, 600659};
    // Running expected total; reset at the start of each scenario closure.
    def total = 0;
+ def getRowCount = { expectedRowCount, table_name ->
+ def retry = 0
+ while (retry < 60) {
+ try {
+ def rowCount = sql "select count(*) from ${table_name}"
+ logger.info("rowCount: " + rowCount + ", retry: " + retry)
+ if (rowCount[0][0] >= expectedRowCount) {
+ break
+ }
+ } catch (Exception e) {
+ logger.info("select count get exception", e);
+ }
+ retry++
+ Thread.sleep(5000)
+ }
+ }
    // Validate a group-commit stream load HTTP response: rethrows any transport
    // exception, then asserts status, group-commit label, and the four row counters.
    def checkStreamLoadResult = { exception, result, total_rows, loaded_rows, filtered_rows, unselected_rows ->
        if (exception != null) {
            throw exception
        }
        log.info("Stream load result: ${result}".toString())
        def json = parseJson(result)
        assertEquals("success", json.Status.toLowerCase())
        assertTrue(json.GroupCommit)
        assertTrue(json.Label.startsWith("group_commit_"))
        assertEquals(total_rows, json.NumberTotalRows)
        assertEquals(loaded_rows, json.NumberLoadedRows)
        assertEquals(filtered_rows, json.NumberFilteredRows)
        assertEquals(unselected_rows, json.NumberUnselectedRows)
        // ErrorURL is only populated when rows were filtered out.
        if (filtered_rows > 0) {
            assertFalse(json.ErrorURL.isEmpty())
        } else {
            assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
        }
    }
+
    // (Re-)create the full-schema lineitem table (single replica, 96 buckets).
    def create_stream_load_table = { table_name ->
        // create table
        sql """ drop table if exists ${table_name}; """

        sql """
        CREATE TABLE ${table_name} (
        l_shipdate DATEV2 NOT NULL,
        l_orderkey bigint NOT NULL,
        l_linenumber int not null,
        l_partkey int NOT NULL,
        l_suppkey int not null,
        l_quantity decimalv3(15, 2) NOT NULL,
        l_extendedprice decimalv3(15, 2) NOT NULL,
        l_discount decimalv3(15, 2) NOT NULL,
        l_tax decimalv3(15, 2) NOT NULL,
        l_returnflag VARCHAR(1) NOT NULL,
        l_linestatus VARCHAR(1) NOT NULL,
        l_commitdate DATEV2 NOT NULL,
        l_receiptdate DATEV2 NOT NULL,
        l_shipinstruct VARCHAR(25) NOT NULL,
        l_shipmode VARCHAR(10) NOT NULL,
        l_comment VARCHAR(44) NOT NULL
)ENGINE=OLAP
DUPLICATE KEY(`l_shipdate`, `l_orderkey`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
PROPERTIES (
    "replication_num" = "1"
);
        """
    }
+
    // (Re-)create the table WITHOUT l_receiptdate, so the ADD_COLUMN scenario
    // can add it back mid-load.
    def create_stream_load_table_less_column = { table_name ->
        // create table
        sql """ drop table if exists ${table_name}; """

        sql """
        CREATE TABLE ${table_name} (
        l_shipdate DATEV2 NOT NULL,
        l_orderkey bigint NOT NULL,
        l_linenumber int not null,
        l_partkey int NOT NULL,
        l_suppkey int not null,
        l_quantity decimalv3(15, 2) NOT NULL,
        l_extendedprice decimalv3(15, 2) NOT NULL,
        l_discount decimalv3(15, 2) NOT NULL,
        l_tax decimalv3(15, 2) NOT NULL,
        l_returnflag VARCHAR(1) NOT NULL,
        l_linestatus VARCHAR(1) NOT NULL,
        l_commitdate DATEV2 NOT NULL,
        l_shipinstruct VARCHAR(25) NOT NULL,
        l_shipmode VARCHAR(10) NOT NULL,
        l_comment VARCHAR(44) NOT NULL
)ENGINE=OLAP
DUPLICATE KEY(`l_shipdate`, `l_orderkey`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
PROPERTIES (
    "replication_num" = "1"
);
        """

    }
+
    // Stream-load split i (1-based) into table_name with group commit enabled
    // and add its expected row count to the shared running total.
    def insert_data = { i, table_name ->
        streamLoad {
            table table_name

            set 'column_separator', '|'
            // lo_dummy absorbs the trailing '|' field — presumably; confirm format.
            set 'columns', columns + ",lo_dummy"
            set 'group_commit', 'true'
            unset 'label'
            file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl.""" + i

            check { result, exception, startTime, endTime ->
                checkStreamLoadResult(exception, result, rowCountArray[i - 1], rowCountArray[i - 1], 0, 0)
            }
        }
        total += rowCountArray[i - 1];
    }
+
+ def getAlterTableState = { table_name ->
+ def retry = 0
+ while (true) {
+ def state = sql "show alter table column where tablename =
'${table_name}' order by CreateTime desc "
+ logger.info("alter table state: ${state}")
+ logger.info("state:" + state[0][9]);
+ if (state.size() > 0 && state[0][9] == "FINISHED") {
+ return true
+ }
+ retry++
+ if (retry >= 60) {
+ return false
+ }
+ Thread.sleep(5000)
+ }
+ return false
+ }
+
+ def truncate = { table_name ->
+ create_stream_load_table(table_name)
+ total = 0;
+ for (int i = 1; i <= 10; i++) {
+ logger.info("process file:" + i)
+ if (i == 5) {
+ getRowCount(total, table_name)
+ def retry = 0
+ while (retry < 10) {
+ try {
+ sql """ truncate table ${table_name}; """
+ break
+ } catch (Exception e) {
+ logger.info("select count get exception", e);
+ }
+ Thread.sleep(2000)
+ retry++
+ }
+ total = 0;
+ }
+ insert_data(i, table_name)
+ }
+ logger.info("process truncate total:" + total)
+ getRowCount(total, table_name)
+ qt_sql """ select count(*) from ${table_name}; """
+ }
+
+ def delete = { table_name ->
+ create_stream_load_table(table_name)
+ total = 0;
+ for (int i = 1; i <= 10; i++) {
+ logger.info("process file:" + i)
+ if (i == 5) {
+ def retry = 0
+ while (retry < 10) {
+ try {
+ def rowCount = sql """select count(*) from
${table_name} where l_orderkey >=1000000 and l_orderkey <=5000000;"""
+ logger.info("rowCount:" + rowCount)
+ sql """ delete from ${table_name} where l_orderkey
>=1000000 and l_orderkey <=5000000; """
+ total -= rowCount[0][0]
+ break
+ } catch (Exception e) {
+ log.info("exception:", e)
+ }
+ Thread.sleep(2000)
+ retry++
+ }
+ }
+ insert_data(i, table_name)
+ }
+ logger.info("process delete total:" + total)
+ getRowCount(total, table_name)
+ qt_sql """ select count(*) from ${table_name}; """
+ }
+
+ def drop_column = { table_name ->
+ create_stream_load_table(table_name)
+ total = 0;
+ for (int i = 1; i <= 10; i++) {
+ logger.info("process file:" + i)
+ if (i == 5) {
+ def retry = 0
+ while (retry < 10) {
+ try {
+ sql """ alter table ${table_name} DROP column
l_suppkey; """
+ break
+ } catch (Exception e) {
+ log.info("exception:", e)
+ }
+ Thread.sleep(2000)
+ retry++
+ }
+ }
+ insert_data(i, table_name)
+ }
+ logger.info("process drop column total:" + total)
+ assertTrue(getAlterTableState(table_name), "drop column should
success")
+ getRowCount(total, table_name)
+ qt_sql """ select count(*) from ${table_name}; """
+ }
+
+ def add_column = { table_name ->
+ create_stream_load_table_less_column(table_name)
+ total = 0;
+ for (int i = 1; i <= 10; i++) {
+ logger.info("process file:" + i)
+ if (i == 5) {
+ def retry = 0
+ while (retry < 10) {
+ try {
+ sql """ alter table ${table_name} ADD column
l_receiptdate DATEV2 after l_commitdate; """
+ break
+ } catch (Exception e) {
+ log.info("exception:", e)
+ }
+ Thread.sleep(2000)
+ retry++
+ }
+ }
+ insert_data(i, table_name)
+ }
+ logger.info("process add column total:" + total)
+ assertTrue(getAlterTableState(table_name), "add column should success")
+ getRowCount(total, table_name)
+ qt_sql """ select count(*) from ${table_name}; """
+ }
+
+ def change_order = { table_name ->
+ create_stream_load_table(table_name)
+ total = 0;
+ for (int i = 1; i <= 10; i++) {
+ logger.info("process file:" + i)
+ if (i == 2) {
+ def retry = 0
+ while (retry < 10) {
+ try {
+ sql """ alter table ${table_name} order by
(l_orderkey,l_shipdate,l_linenumber,
l_partkey,l_suppkey,l_quantity,l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment);
"""
+ break
+ } catch (Exception e) {
+ log.info("exception:", e)
+ }
+ Thread.sleep(2000)
+ retry++
+ }
+ }
+ insert_data(i, table_name)
+ }
+ logger.info("process change order total:" + total)
+ assertTrue(getAlterTableState(table_name), "modify column order should
success")
+ getRowCount(total, table_name)
+ qt_sql """ select count(*) from ${table_name}; """
+ }
+
+
+ def process = { table_name ->
+ for (int i = 1; i <= 5; i++) {
+ switch (i) {
+ case SC.TRUNCATE_TABLE.value:
+ truncate(table_name)
+ break
+ case SC.DELETE.value:
+ delete(table_name)
+ break
+ case SC.DROP_COLUMN.value:
+ drop_column(table_name)
+ break
+ case SC.ADD_COLUMN.value:
+ add_column(table_name)
+ break
+ case SC.CHANGE_ORDER.value:
+ change_order(table_name)
+ break
+ }
+ }
+ }
+
    try {
        process(stream_load_table)
    } finally {
        // intentionally empty: no cleanup needed
    }

}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]