This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f5821407f7 [case]Load data with load_parallelism=any > 1 and stream load with compress type (#27306)
4f5821407f7 is described below

commit 4f5821407f74b9b59f5a773ffc2b088dd42e5198
Author: Zhiyu Hu <[email protected]>
AuthorDate: Wed Dec 13 18:41:14 2023 +0800

    [case]Load data with load_parallelism=any > 1 and stream load with compress type (#27306)
---
 .../test_s3_load_with_load_parallelism.out         |   4 +
 .../load_p0/stream_load/test_compress_type.out     |   4 +
 .../test_s3_load_with_load_parallelism.groovy      | 179 +++++++++++++
 .../suites/load_p0/stream_load/ddl/basic_data.sql  |  29 +++
 .../load_p0/stream_load/test_compress_type.groovy  | 280 +++++++++++++++++++++
 5 files changed, 496 insertions(+)

diff --git 
a/regression-test/data/load_p0/broker_load/test_s3_load_with_load_parallelism.out
 
b/regression-test/data/load_p0/broker_load/test_s3_load_with_load_parallelism.out
new file mode 100644
index 00000000000..7f63e400c3d
--- /dev/null
+++ 
b/regression-test/data/load_p0/broker_load/test_s3_load_with_load_parallelism.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+5000000
+
diff --git a/regression-test/data/load_p0/stream_load/test_compress_type.out 
b/regression-test/data/load_p0/stream_load/test_compress_type.out
new file mode 100644
index 00000000000..f76aa4d7415
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_compress_type.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+120
+
diff --git 
a/regression-test/suites/load_p0/broker_load/test_s3_load_with_load_parallelism.groovy
 
b/regression-test/suites/load_p0/broker_load/test_s3_load_with_load_parallelism.groovy
new file mode 100644
index 00000000000..be0d7f9c348
--- /dev/null
+++ 
b/regression-test/suites/load_p0/broker_load/test_s3_load_with_load_parallelism.groovy
@@ -0,0 +1,179 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Submits one S3 load of an ORC file with the "load_parallelism" property set to 3,
+// polls `show load` until the job reaches a terminal state, and then checks the
+// row count of the target table through qt_sql.
+suite("test_s3_load_with_load_parallelism", "load_p0") {
+
+    def tableName = "test_load_parallelism"
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName}(
+            `col_0` BIGINT NOT NULL,`col_1` VARCHAR(20),`col_2` VARCHAR(20),`col_3` VARCHAR(20),`col_4` VARCHAR(20),
+            `col_5` VARCHAR(20),`col_6` VARCHAR(20),`col_7` VARCHAR(20),`col_8` VARCHAR(20),`col_9` VARCHAR(20),
+            `col_10` VARCHAR(20),`col_11` VARCHAR(20),`col_12` VARCHAR(20),`col_13` VARCHAR(20),`col_14` VARCHAR(20),
+            `col_15` VARCHAR(20),`col_16` VARCHAR(20),`col_17` VARCHAR(20),`col_18` VARCHAR(20),`col_19` VARCHAR(20),
+            `col_20` VARCHAR(20),`col_21` VARCHAR(20),`col_22` VARCHAR(20),`col_23` VARCHAR(20),`col_24` VARCHAR(20),
+            `col_25` VARCHAR(20),`col_26` VARCHAR(20),`col_27` VARCHAR(20),`col_28` VARCHAR(20),`col_29` VARCHAR(20),
+            `col_30` VARCHAR(20),`col_31` VARCHAR(20),`col_32` VARCHAR(20),`col_33` VARCHAR(20),`col_34` VARCHAR(20),
+            `col_35` VARCHAR(20),`col_36` VARCHAR(20),`col_37` VARCHAR(20),`col_38` VARCHAR(20),`col_39` VARCHAR(20),
+            `col_40` VARCHAR(20),`col_41` VARCHAR(20),`col_42` VARCHAR(20),`col_43` VARCHAR(20),`col_44` VARCHAR(20),
+            `col_45` VARCHAR(20),`col_46` VARCHAR(20),`col_47` VARCHAR(20),`col_48` VARCHAR(20),`col_49` VARCHAR(20)
+        ) ENGINE=OLAP
+            DUPLICATE KEY(`col_0`) DISTRIBUTED BY HASH(`col_0`) BUCKETS 1
+            PROPERTIES ( "replication_num" = "1" );
+    """
+
+    // "(col_0, col_1, ..., col_49)" -- the same column list the table above declares.
+    def columnList = "(" + (0..49).collect { "col_${it}" }.join(", ") + ")"
+
+    def attributesList = [
+        new LoadAttributes("s3://test-for-student-1308700295/regression/segcompaction/segcompaction.orc",
+            "${tableName}", "", "", "FORMAT AS \"ORC\"", columnList, "").addProperties("load_parallelism", "3")
+    ]
+
+    // Credentials come from the regression framework; never hard-code keys in a suite,
+    // not even inside comments.
+    def ak = getS3AK()
+    def sk = getS3SK()
+
+    for (int i = 0; i < attributesList.size(); i++) {
+        LoadAttributes attributes = attributesList[i]
+        // The random UUID keeps the label unique across runs; the index separates jobs within one run.
+        def label = "test_s3_load_with_load_parallelism_" + UUID.randomUUID().toString().replace("-", "_") + "_" + i
+        attributes.label = label
+        def prop = attributes.getPropertiesStr()
+
+        sql """
+            LOAD LABEL $label (
+                DATA INFILE("$attributes.dataDesc.path")
+                INTO TABLE $attributes.dataDesc.tableName
+                $attributes.dataDesc.columnTermClause
+                $attributes.dataDesc.lineTermClause
+                $attributes.dataDesc.formatClause
+                $attributes.dataDesc.columns
+                $attributes.dataDesc.whereExpr
+            )
+            WITH S3 (
+               "AWS_ACCESS_KEY" = "$ak",
+                "AWS_SECRET_KEY" = "$sk",
+                "AWS_ENDPOINT" = "cos.ap-beijing.myqcloud.com",
+                "AWS_REGION" = "ap-beijing"
+           )
+            ${prop}
+            """
+
+        // Poll once per second, for at most ten minutes, until the job is FINISHED or CANCELLED.
+        def max_try_milli_secs = 600000
+        while (max_try_milli_secs > 0) {
+            String[][] result = sql """ show load where label="$attributes.label" order by createtime desc limit 1; """
+
+            // Guard against an empty result so a job that is not listed yet is retried
+            // instead of failing with an index error.
+            def state = result.length > 0 ? result[0][2] : null
+
+            if ("FINISHED".equals(state)) {
+                if (attributes.isExceptFailed) {
+                    assertTrue(false, "load should be failed but was success: $result")
+                }
+                logger.info("Load FINISHED " + attributes.label + ": $result")
+                break
+            }
+            if ("CANCELLED".equals(state)) {
+                if (attributes.isExceptFailed) {
+                    logger.info("Load CANCELLED as expected " + attributes.label)
+                    break
+                }
+                assertTrue(false, "load failed: $result")
+                break
+            }
+            Thread.sleep(1000)
+            max_try_milli_secs -= 1000
+            if (max_try_milli_secs <= 0) {
+                assertTrue(false, "load Timeout: $attributes.label")
+            }
+        }
+    }
+    sql "sync"
+    qt_sql """
+        SELECT count(*) FROM ${tableName}
+    """
+}
+
+// Plain holder for the pieces of one data description of a LOAD statement.
+// The suite in this file interpolates path, tableName, columnTermClause,
+// lineTermClause, formatClause, columns and whereExpr verbatim into
+// `LOAD LABEL ... ( DATA INFILE(...) INTO TABLE ... )`, so each clause field must
+// already be a complete SQL fragment or an empty string.
+class DataDesc {
+    // Not read anywhere in this file; defaults to the empty string.
+    public String mergeType = ""
+    // Source location placed inside DATA INFILE("...").
+    public String path
+    // Target table placed after INTO TABLE.
+    public String tableName
+    // Optional line-terminator clause, or "" to omit it.
+    public String lineTermClause
+    // Optional column-terminator clause, or "" to omit it.
+    public String columnTermClause
+    // Optional format clause such as FORMAT AS "ORC", or "" to omit it.
+    public String formatClause
+    // Parenthesised column list, or "" to omit it.
+    public String columns
+    // Optional trailing filter fragment, or "" to omit it.
+    public String whereExpr
+}
+
+/**
+ * Describes one load job: the data description, the job label, the job-level
+ * PROPERTIES and whether the job is expected to fail.
+ */
+class LoadAttributes {
+    public DataDesc dataDesc
+    public Map<String, String> properties
+    public String label
+    public String usePathStyle = "false"
+    public boolean isExceptFailed
+
+    /**
+     * Builds the data description from the given SQL fragments and seeds the
+     * property map with "use_new_load_scan_node" = "true".
+     *
+     * @param isExceptFailed true when the load is expected to end up cancelled
+     */
+    LoadAttributes(String path, String tableName, String lineTermClause, String columnTermClause, String formatClause,
+                   String columns, String whereExpr, boolean isExceptFailed = false) {
+        DataDesc desc = new DataDesc()
+        desc.path = path
+        desc.tableName = tableName
+        desc.lineTermClause = lineTermClause
+        desc.columnTermClause = columnTermClause
+        desc.formatClause = formatClause
+        desc.columns = columns
+        desc.whereExpr = whereExpr
+        this.dataDesc = desc
+
+        this.isExceptFailed = isExceptFailed
+
+        this.properties = new HashMap<>()
+        this.properties.put("use_new_load_scan_node", "true")
+    }
+
+    /** Adds or overwrites one job property and returns this object for chaining. */
+    LoadAttributes addProperties(String k, String v) {
+        properties.put(k, v)
+        return this
+    }
+
+    /**
+     * Renders the property map as a PROPERTIES ("k" = "v",...) clause with the
+     * pairs separated by a single comma, or "" when the map is empty.
+     */
+    String getPropertiesStr() {
+        if (properties.isEmpty()) {
+            return ""
+        }
+        def pairs = properties.collect { key, value -> "\"${key}\" = \"${value}\"" }
+        return "PROPERTIES (" + pairs.join(",") + ")"
+    }
+
+    /** Sets usePathStyle to "true" and returns this object for chaining. */
+    LoadAttributes withPathStyle() {
+        usePathStyle = "true"
+        return this
+    }
+}
diff --git a/regression-test/suites/load_p0/stream_load/ddl/basic_data.sql 
b/regression-test/suites/load_p0/stream_load/ddl/basic_data.sql
new file mode 100644
index 00000000000..41c3660e11c
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/ddl/basic_data.sql
@@ -0,0 +1,29 @@
+-- DDL for the basic_data table that the test_compress_type stream load suite
+-- creates from this file before loading data.
+-- Duplicate-key table keyed and hash-distributed on k00, with a bloom filter on k05.
+CREATE TABLE basic_data
+(
+    k00 INT             NOT NULL,
+    k01 DATE            NOT NULL,
+    k02 BOOLEAN         NULL,
+    k03 TINYINT         NULL,
+    k04 SMALLINT        NULL,
+    k05 INT             NULL,
+    k06 BIGINT          NULL,
+    k07 LARGEINT        NULL,
+    k08 FLOAT           NULL,
+    k09 DOUBLE          NULL,
+    k10 DECIMAL(9,1)    NULL,
+    k11 DECIMALV3(9,1)  NULL,
+    k12 DATETIME        NULL,
+    k13 DATEV2          NULL,
+    k14 DATETIMEV2      NULL,
+    k15 CHAR            NULL,
+    k16 VARCHAR         NULL,
+    k17 STRING          NULL,
+    k18 JSON            NULL
+
+)
+DUPLICATE KEY(k00)
+DISTRIBUTED BY HASH(k00) BUCKETS 32
+PROPERTIES (
+    "bloom_filter_columns"="k05",
+    "replication_num" = "1"
+);
\ No newline at end of file
diff --git 
a/regression-test/suites/load_p0/stream_load/test_compress_type.groovy 
b/regression-test/suites/load_p0/stream_load/test_compress_type.groovy
new file mode 100644
index 00000000000..d1123027bac
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_compress_type.groovy
@@ -0,0 +1,280 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Stream-loads the same data set as gzip, bzip2 and lz4 files, with and without the
+// 'format' and 'compress_type' headers, and checks the reported load result of each
+// request. The final qt_sql checks the total row count of the table.
+suite("test_compress_type", "load_p0") {
+    def tableName = "basic_data"
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    // GZ/LZO/BZ2/LZ4FRAME/DEFLATE/LZOP
+    sql new File("""${context.file.parent}/ddl/${tableName}.sql""").text
+
+    // Runs one stream load of fileName into the table.
+    //   format       - value for the 'format' header, or null to leave the header unset
+    //   compressType - value for the 'compress_type' header, or null to leave it unset
+    //   verify       - closure given the parsed JSON response; holds the per-case assertions
+    def runLoad = { String fileName, String format, String compressType, Closure verify ->
+        streamLoad {
+            table "${tableName}"
+            set 'column_separator', '|'
+            set 'trim_double_quotes', 'true'
+            if (format != null) {
+                set 'format', format
+            }
+            if (compressType != null) {
+                set 'compress_type', compressType
+            }
+
+            file fileName
+
+            check { result, exception, startTime, endTime ->
+                assertTrue(exception == null)
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                verify(json)
+                assertTrue(json.LoadBytes > 0)
+            }
+        }
+    }
+
+    // Expected response when the compress type is declared: all 20 rows are loaded.
+    def expectSuccess = { json ->
+        assertEquals("Success", json.Status)
+        assertEquals(20, json.NumberTotalRows)
+        assertEquals(20, json.NumberLoadedRows)
+        assertEquals(0, json.NumberFilteredRows)
+        assertEquals(0, json.NumberUnselectedRows)
+    }
+
+    // Expected response when the compress type is omitted: the load fails and every
+    // row is filtered. NOTE(review): presumably the compressed bytes are parsed as plain
+    // CSV, which would explain why the row count differs per file -- confirm.
+    def expectTooManyFiltered = { int rows ->
+        return { json ->
+            assertEquals("Fail", json.Status)
+            assertTrue(json.Message.contains("too many filtered rows"))
+            assertEquals(rows, json.NumberTotalRows)
+            assertEquals(0, json.NumberLoadedRows)
+            assertEquals(rows, json.NumberFilteredRows)
+        }
+    }
+
+    // Explicit format together with the matching compress type.
+    runLoad("basic_data.csv.gz", "CSV", "GZ", expectSuccess)
+    runLoad("basic_data.csv.bz2", "CSV", "BZ2", expectSuccess)
+    runLoad("basic_data.csv.lz4", 'csv', "LZ4", expectSuccess)
+
+    // Compress type only; the 'format' header is left unset.
+    runLoad("basic_data.csv.gz", null, "GZ", expectSuccess)
+    runLoad("basic_data.csv.bz2", null, "BZ2", expectSuccess)
+    runLoad("basic_data.csv.lz4", null, "LZ4", expectSuccess)
+
+    // Explicit format but no compress type: the load is expected to fail.
+    runLoad("basic_data.csv.gz", "CSV", null, expectTooManyFiltered(13))
+    runLoad("basic_data.csv.bz2", "CSV", null, expectTooManyFiltered(9))
+    runLoad("basic_data.csv.lz4", "CSV", null, expectTooManyFiltered(31))
+
+    // Neither format nor compress type: the load is expected to fail the same way.
+    runLoad("basic_data.csv.gz", null, null, expectTooManyFiltered(13))
+    runLoad("basic_data.csv.bz2", null, null, expectTooManyFiltered(9))
+    runLoad("basic_data.csv.lz4", null, null, expectTooManyFiltered(31))
+
+    qt_sql """ select count(*) from ${tableName} """
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to