This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4f5821407f7 [case] Load data with load_parallelism > 1 and stream load with compress type (#27306)
4f5821407f7 is described below
commit 4f5821407f74b9b59f5a773ffc2b088dd42e5198
Author: Zhiyu Hu <[email protected]>
AuthorDate: Wed Dec 13 18:41:14 2023 +0800
[case] Load data with load_parallelism > 1 and stream load with compress type (#27306)
---
 .../test_s3_load_with_load_parallelism.out         |   4 +
 .../load_p0/stream_load/test_compress_type.out     |   4 +
 .../test_s3_load_with_load_parallelism.groovy      | 179 +++++++++++++
 .../suites/load_p0/stream_load/ddl/basic_data.sql  |  29 +++
 .../load_p0/stream_load/test_compress_type.groovy  | 280 +++++++++++++++++++++
5 files changed, 496 insertions(+)
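The broker-load suite added below exercises the statement shape sketched here: a single S3 load whose "load_parallelism" property raises the number of parallel load-execution instances (the test sets it to 3). A minimal sketch; the label, bucket, endpoint, and credentials in angle brackets are placeholders, not the test's real values:

    LOAD LABEL example_db.<label> (
        DATA INFILE("s3://<bucket>/path/data.orc")
        INTO TABLE test_load_parallelism
        FORMAT AS "ORC"
    )
    WITH S3 (
        "AWS_ACCESS_KEY" = "<ak>",
        "AWS_SECRET_KEY" = "<sk>",
        "AWS_ENDPOINT" = "<endpoint>",
        "AWS_REGION" = "<region>"
    )
    PROPERTIES ("use_new_load_scan_node" = "true", "load_parallelism" = "3");

The suite then polls SHOW LOAD until the job reaches FINISHED or CANCELLED and asserts on the final row count.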
diff --git a/regression-test/data/load_p0/broker_load/test_s3_load_with_load_parallelism.out b/regression-test/data/load_p0/broker_load/test_s3_load_with_load_parallelism.out
new file mode 100644
index 00000000000..7f63e400c3d
--- /dev/null
+++ b/regression-test/data/load_p0/broker_load/test_s3_load_with_load_parallelism.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+5000000
+
diff --git a/regression-test/data/load_p0/stream_load/test_compress_type.out b/regression-test/data/load_p0/stream_load/test_compress_type.out
new file mode 100644
index 00000000000..f76aa4d7415
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_compress_type.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+120
+
diff --git a/regression-test/suites/load_p0/broker_load/test_s3_load_with_load_parallelism.groovy b/regression-test/suites/load_p0/broker_load/test_s3_load_with_load_parallelism.groovy
new file mode 100644
index 00000000000..be0d7f9c348
--- /dev/null
+++ b/regression-test/suites/load_p0/broker_load/test_s3_load_with_load_parallelism.groovy
@@ -0,0 +1,179 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_s3_load_with_load_parallelism", "load_p0") {
+
+ def tableName = "test_load_parallelism"
+
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName}(
+ `col_0` BIGINT NOT NULL, `col_1` VARCHAR(20), `col_2` VARCHAR(20), `col_3` VARCHAR(20), `col_4` VARCHAR(20),
+ `col_5` VARCHAR(20), `col_6` VARCHAR(20), `col_7` VARCHAR(20), `col_8` VARCHAR(20), `col_9` VARCHAR(20),
+ `col_10` VARCHAR(20), `col_11` VARCHAR(20), `col_12` VARCHAR(20), `col_13` VARCHAR(20), `col_14` VARCHAR(20),
+ `col_15` VARCHAR(20), `col_16` VARCHAR(20), `col_17` VARCHAR(20), `col_18` VARCHAR(20), `col_19` VARCHAR(20),
+ `col_20` VARCHAR(20), `col_21` VARCHAR(20), `col_22` VARCHAR(20), `col_23` VARCHAR(20), `col_24` VARCHAR(20),
+ `col_25` VARCHAR(20), `col_26` VARCHAR(20), `col_27` VARCHAR(20), `col_28` VARCHAR(20), `col_29` VARCHAR(20),
+ `col_30` VARCHAR(20), `col_31` VARCHAR(20), `col_32` VARCHAR(20), `col_33` VARCHAR(20), `col_34` VARCHAR(20),
+ `col_35` VARCHAR(20), `col_36` VARCHAR(20), `col_37` VARCHAR(20), `col_38` VARCHAR(20), `col_39` VARCHAR(20),
+ `col_40` VARCHAR(20), `col_41` VARCHAR(20), `col_42` VARCHAR(20), `col_43` VARCHAR(20), `col_44` VARCHAR(20),
+ `col_45` VARCHAR(20), `col_46` VARCHAR(20), `col_47` VARCHAR(20), `col_48` VARCHAR(20), `col_49` VARCHAR(20)
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`col_0`) DISTRIBUTED BY HASH(`col_0`) BUCKETS 1
+ PROPERTIES ( "replication_num" = "1" );
+ """
+
+ def attributesList = []
+
+ attributesList.add(new LoadAttributes("s3://test-for-student-1308700295/regression/segcompaction/segcompaction.orc",
+ "${tableName}", "", "", "FORMAT AS \"ORC\"",
+ "(col_0, col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8, col_9, col_10, col_11, col_12, col_13, col_14, col_15, col_16, col_17, col_18, col_19, col_20, col_21, col_22, col_23, col_24, col_25, col_26, col_27, col_28, col_29, col_30, col_31, col_32, col_33, col_34, col_35, col_36, col_37, col_38, col_39, col_40, col_41, col_42, col_43, col_44, col_45, col_46, col_47, col_48, col_49)",
+ "").addProperties("load_parallelism", "3"))
+
+ def ak = getS3AK()
+ def sk = getS3SK()
+
+ def i = 0
+ for (LoadAttributes attributes : attributesList) {
+ def label = "test_s3_load_with_load_parallelism_" + UUID.randomUUID().toString().replace("-", "_") + "_" + i
+ attributes.label = label
+ def prop = attributes.getPropertiesStr()
+
+ sql """
+ LOAD LABEL $label (
+ DATA INFILE("$attributes.dataDesc.path")
+ INTO TABLE $attributes.dataDesc.tableName
+ $attributes.dataDesc.columnTermClause
+ $attributes.dataDesc.lineTermClause
+ $attributes.dataDesc.formatClause
+ $attributes.dataDesc.columns
+ $attributes.dataDesc.whereExpr
+ )
+ WITH S3 (
+ "AWS_ACCESS_KEY" = "$ak",
+ "AWS_SECRET_KEY" = "$sk",
+ "AWS_ENDPOINT" = "cos.ap-beijing.myqcloud.com",
+ "AWS_REGION" = "ap-beijing"
+ )
+ ${prop}
+ """
+ // "AWS_ENDPOINT" = "cos.ap-beijing.myqcloud.com",
+ // "AWS_ACCESS_KEY" = "AKIDd9RVMzIOI0V7Wlnbr9JG0WrhJk28zc2H",
+ // "AWS_SECRET_KEY"="4uWxMhqnW3Plz97sPjqlSUXO1RhokRuO",
+ // "AWS_REGION" = "ap-beijing"
+
+ def max_try_milli_secs = 600000
+ while (max_try_milli_secs > 0) {
+ String[][] result = sql """ show load where label="$attributes.label" order by createtime desc limit 1; """
+
+ if (result[0][2].equals("FINISHED")) {
+ if (attributes.isExpectFailed) {
+ assertTrue(false, "load should have failed but succeeded: $result")
+ }
+ logger.info("Load FINISHED " + attributes.label + ": $result")
+ break
+ }
+ if (result[0][2].equals("CANCELLED")) {
+ def fileName = attributes.dataDesc.path.split("/")[-1]
+ if (fileName == "enclose_incomplete.csv" || fileName == "enclose_without_escape.csv") {
+ break
+ }
+ if (attributes.isExpectFailed) {
+ logger.info("Load CANCELLED as expected " + attributes.label)
+ break
+ }
+ assertTrue(false, "load failed: $result")
+ break
+ }
+ Thread.sleep(1000)
+ max_try_milli_secs -= 1000
+ if (max_try_milli_secs <= 0) {
+ assertTrue(false, "load Timeout: $attributes.label")
+ }
+ }
+ }
+ sql "sync"
+ qt_sql """
+ SELECT count(*) FROM ${tableName}
+ """
+}
+
+class DataDesc {
+ public String mergeType = ""
+ public String path
+ public String tableName
+ public String lineTermClause
+ public String columnTermClause
+ public String formatClause
+ public String columns
+ public String whereExpr
+}
+
+class LoadAttributes {
+ LoadAttributes(String path, String tableName, String lineTermClause, String columnTermClause, String formatClause, String columns, String whereExpr, boolean isExpectFailed = false) {
+ this.dataDesc = new DataDesc()
+ this.dataDesc.path = path
+ this.dataDesc.tableName = tableName
+ this.dataDesc.lineTermClause = lineTermClause
+ this.dataDesc.columnTermClause = columnTermClause
+ this.dataDesc.formatClause = formatClause
+ this.dataDesc.columns = columns
+ this.dataDesc.whereExpr = whereExpr
+
+ this.isExpectFailed = isExpectFailed
+
+ properties = new HashMap<>()
+ properties.put("use_new_load_scan_node", "true")
+ }
+
+ LoadAttributes addProperties(String k, String v) {
+ properties.put(k, v)
+ return this
+ }
+
+ String getPropertiesStr() {
+ if (properties.isEmpty()) {
+ return ""
+ }
+ String prop = "PROPERTIES ("
+ properties.each { k, v ->
+ prop += "\"${k}\" = \"${v}\","
+ }
+ prop = prop.substring(0, prop.size() - 1)
+ prop += ")"
+ return prop
+ }
+
+ LoadAttributes withPathStyle() {
+ usePathStyle = "true"
+ return this
+ }
+
+ public DataDesc dataDesc
+ public Map<String, String> properties
+ public String label
+ public String usePathStyle = "false"
+ public boolean isExpectFailed
+}
diff --git a/regression-test/suites/load_p0/stream_load/ddl/basic_data.sql b/regression-test/suites/load_p0/stream_load/ddl/basic_data.sql
new file mode 100644
index 00000000000..41c3660e11c
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/ddl/basic_data.sql
@@ -0,0 +1,29 @@
+CREATE TABLE basic_data
+(
+ k00 INT NOT NULL,
+ k01 DATE NOT NULL,
+ k02 BOOLEAN NULL,
+ k03 TINYINT NULL,
+ k04 SMALLINT NULL,
+ k05 INT NULL,
+ k06 BIGINT NULL,
+ k07 LARGEINT NULL,
+ k08 FLOAT NULL,
+ k09 DOUBLE NULL,
+ k10 DECIMAL(9,1) NULL,
+ k11 DECIMALV3(9,1) NULL,
+ k12 DATETIME NULL,
+ k13 DATEV2 NULL,
+ k14 DATETIMEV2 NULL,
+ k15 CHAR NULL,
+ k16 VARCHAR NULL,
+ k17 STRING NULL,
+ k18 JSON NULL
+)
+DUPLICATE KEY(k00)
+DISTRIBUTED BY HASH(k00) BUCKETS 32
+PROPERTIES (
+ "bloom_filter_columns"="k05",
+ "replication_num" = "1"
+);
\ No newline at end of file
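The stream-load suite added below drives everything through the regression framework's streamLoad DSL; each "set k, v" there travels as an HTTP header on the stream-load request. A roughly equivalent manual invocation for the GZ case might look like the following sketch, where user, password, host, and database are placeholders and 8030 assumes the FE's default HTTP port:

    curl --location-trusted -u <user>:<password> \
        -H "format: CSV" \
        -H "compress_type: GZ" \
        -H "column_separator: |" \
        -H "trim_double_quotes: true" \
        -T basic_data.csv.gz \
        http://<fe_host>:8030/api/<db>/basic_data/_stream_load

The suite's negative cases send the same compressed files without a compress_type header and expect the load to fail with "too many filtered rows".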
diff --git a/regression-test/suites/load_p0/stream_load/test_compress_type.groovy b/regression-test/suites/load_p0/stream_load/test_compress_type.groovy
new file mode 100644
index 00000000000..d1123027bac
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_compress_type.groovy
@@ -0,0 +1,280 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_compress_type", "load_p0") {
+ def tableName = "basic_data"
+
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ // GZ/LZO/BZ2/LZ4FRAME/DEFLATE/LZOP
+ sql new File("""${context.file.parent}/ddl/${tableName}.sql""").text
+
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '|'
+ set 'trim_double_quotes', 'true'
+ set 'format', "CSV"
+ set 'compress_type', 'GZ'
+
+ file "basic_data.csv.gz"
+
+ check {
+ result, exception, startTime, endTime ->
+ assertTrue(exception == null)
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("Success", json.Status)
+ assertEquals(20, json.NumberTotalRows)
+ assertEquals(20, json.NumberLoadedRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ assertTrue(json.LoadBytes > 0)
+ }
+ }
+
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '|'
+ set 'trim_double_quotes', 'true'
+ set 'format', "CSV"
+ set 'compress_type', 'BZ2'
+
+ file "basic_data.csv.bz2"
+ check {
+ result, exception, startTime, endTime ->
+ assertTrue(exception == null)
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("Success", json.Status)
+ assertEquals(20, json.NumberTotalRows)
+ assertEquals(20, json.NumberLoadedRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ assertTrue(json.LoadBytes > 0)
+ }
+ }
+
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '|'
+ set 'trim_double_quotes', 'true'
+ set 'format', 'CSV'
+ set 'compress_type', 'LZ4'
+
+ file "basic_data.csv.lz4"
+ check {
+ result, exception, startTime, endTime ->
+ assertTrue(exception == null)
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("Success", json.Status)
+ assertEquals(20, json.NumberTotalRows)
+ assertEquals(20, json.NumberLoadedRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ assertTrue(json.LoadBytes > 0)
+ }
+ }
+
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '|'
+ set 'trim_double_quotes', 'true'
+ set 'compress_type', 'GZ'
+
+ file "basic_data.csv.gz"
+ check {
+ result, exception, startTime, endTime ->
+ assertTrue(exception == null)
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("Success", json.Status)
+ assertEquals(20, json.NumberTotalRows)
+ assertEquals(20, json.NumberLoadedRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ assertTrue(json.LoadBytes > 0)
+ }
+ }
+
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '|'
+ set 'trim_double_quotes', 'true'
+ set 'compress_type', 'BZ2'
+
+ file "basic_data.csv.bz2"
+ check {
+ result, exception, startTime, endTime ->
+ assertTrue(exception == null)
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("Success", json.Status)
+ assertEquals(20, json.NumberTotalRows)
+ assertEquals(20, json.NumberLoadedRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ assertTrue(json.LoadBytes > 0)
+ }
+ }
+
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '|'
+ set 'trim_double_quotes', 'true'
+ set 'compress_type', 'LZ4'
+
+ file "basic_data.csv.lz4"
+ check {
+ result, exception, startTime, endTime ->
+ assertTrue(exception == null)
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("Success", json.Status)
+ assertEquals(20, json.NumberTotalRows)
+ assertEquals(20, json.NumberLoadedRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ assertTrue(json.LoadBytes > 0)
+ }
+ }
+
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '|'
+ set 'trim_double_quotes', 'true'
+ set 'format', "CSV"
+ file "basic_data.csv.gz"
+
+ check {
+ result, exception, startTime, endTime ->
+ assertTrue(exception == null)
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("Fail", json.Status)
+ assertTrue(json.Message.contains("too many filtered rows"))
+ assertEquals(13, json.NumberTotalRows)
+ assertEquals(0, json.NumberLoadedRows)
+ assertEquals(13, json.NumberFilteredRows)
+ assertTrue(json.LoadBytes > 0)
+ }
+ }
+
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '|'
+ set 'trim_double_quotes', 'true'
+ set 'format', "CSV"
+ file "basic_data.csv.bz2"
+
+ check {
+ result, exception, startTime, endTime ->
+ assertTrue(exception == null)
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("Fail", json.Status)
+ assertTrue(json.Message.contains("too many filtered rows"))
+ assertEquals(9, json.NumberTotalRows)
+ assertEquals(0, json.NumberLoadedRows)
+ assertEquals(9, json.NumberFilteredRows)
+ assertTrue(json.LoadBytes > 0)
+ }
+ }
+
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '|'
+ set 'trim_double_quotes', 'true'
+ set 'format', "CSV"
+ file "basic_data.csv.lz4"
+
+ check {
+ result, exception, startTime, endTime ->
+ assertTrue(exception == null)
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("Fail", json.Status)
+ assertTrue(json.Message.contains("too many filtered rows"))
+ assertEquals(31, json.NumberTotalRows)
+ assertEquals(0, json.NumberLoadedRows)
+ assertEquals(31, json.NumberFilteredRows)
+ assertTrue(json.LoadBytes > 0)
+ }
+ }
+
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '|'
+ set 'trim_double_quotes', 'true'
+ file "basic_data.csv.gz"
+
+ check {
+ result, exception, startTime, endTime ->
+ assertTrue(exception == null)
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("Fail", json.Status)
+ assertTrue(json.Message.contains("too many filtered rows"))
+ assertEquals(13, json.NumberTotalRows)
+ assertEquals(0, json.NumberLoadedRows)
+ assertEquals(13, json.NumberFilteredRows)
+ assertTrue(json.LoadBytes > 0)
+ }
+ }
+
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '|'
+ set 'trim_double_quotes', 'true'
+ file "basic_data.csv.bz2"
+
+ check {
+ result, exception, startTime, endTime ->
+ assertTrue(exception == null)
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("Fail", json.Status)
+ assertTrue(json.Message.contains("too many filtered rows"))
+ assertEquals(9, json.NumberTotalRows)
+ assertEquals(0, json.NumberLoadedRows)
+ assertEquals(9, json.NumberFilteredRows)
+ assertTrue(json.LoadBytes > 0)
+ }
+ }
+
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '|'
+ set 'trim_double_quotes', 'true'
+ file "basic_data.csv.lz4"
+
+ check {
+ result, exception, startTime, endTime ->
+ assertTrue(exception == null)
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("Fail", json.Status)
+ assertTrue(json.Message.contains("too many filtered rows"))
+ assertEquals(31, json.NumberTotalRows)
+ assertEquals(0, json.NumberLoadedRows)
+ assertEquals(31, json.NumberFilteredRows)
+ assertTrue(json.LoadBytes > 0)
+ }
+ }
+
+ qt_sql """ select count(*) from ${tableName} """
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]