This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 202cc84f2f3 [Test](auto-partition) Add auto partition p1 test (#28028)
202cc84f2f3 is described below

commit 202cc84f2f39ca0e26dff0cf8e30f75c6f1a27bb
Author: zclllyybb <[email protected]>
AuthorDate: Wed Dec 6 10:20:39 2023 +0800

    [Test](auto-partition) Add auto partition p1 test (#28028)
    
    Add auto partition p1 test
---
 .../partition_p1/auto_partition/ddl/concurrent.sql |  19 ++
 .../doris_dbgen_conf/two_stream_load_conflict.yaml |  22 +++
 .../auto_partition/sql/multi_thread_load.groovy    | 200 +++++++++++++++++++++
 3 files changed, 241 insertions(+)

diff --git a/regression-test/suites/partition_p1/auto_partition/ddl/concurrent.sql b/regression-test/suites/partition_p1/auto_partition/ddl/concurrent.sql
new file mode 100644
index 00000000000..cb7694654e1
--- /dev/null
+++ b/regression-test/suites/partition_p1/auto_partition/ddl/concurrent.sql
@@ -0,0 +1,19 @@
+CREATE TABLE `concurrent`(
+  `col1` datetimev2 not null,
+  `col2` boolean,
+  `col3` tinyint,
+  `col4` date,
+  `col5` float,
+  `col6` double,
+  `col7` string,
+  `col8` varchar(128),
+  `col9` decimal(9, 3),
+  `col10` char(128)
+) DUPLICATE KEY(`col1`)
+AUTO PARTITION BY range date_trunc(`col1`, 'day')
+(
+)
+DISTRIBUTED BY HASH(`col1`) BUCKETS 10
+PROPERTIES (
+  "replication_num" = "1"
+);
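
Note: the DDL above starts with an empty partition list; with AUTO PARTITION
BY range on date_trunc(`col1`, 'day'), a daily partition is created on demand
the first time a row for that day arrives. A minimal sketch of that behavior
(the auto-generated partition name is not shown because it is
implementation-defined):

    -- col1 is the only NOT NULL column, so this row alone is loadable;
    -- it should trigger creation of a partition covering 2023-12-06
    INSERT INTO `concurrent`(`col1`) VALUES ('2023-12-06 10:20:39');
    SHOW PARTITIONS FROM `concurrent`; -- expect one daily partition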
diff --git a/regression-test/suites/partition_p1/auto_partition/doris_dbgen_conf/two_stream_load_conflict.yaml b/regression-test/suites/partition_p1/auto_partition/doris_dbgen_conf/two_stream_load_conflict.yaml
new file mode 100644
index 00000000000..cd9782a3b33
--- /dev/null
+++ b/regression-test/suites/partition_p1/auto_partition/doris_dbgen_conf/two_stream_load_conflict.yaml
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+tables:
+  small_data_high_concurrent_load_range:
+    col1:
+      range: {min: "2020-01-01 00:00:00", max: "2023-12-31 23:59:59"}
+      force_not_null: true
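
This config constrains generated col1 values to 2020-01-01 through 2023-12-31
and forces them non-null (matching the NOT NULL partition column in
ddl/concurrent.sql), so concurrent loads collide on the same daily partitions
while the total partition count stays bounded: four years cover at most 1461
distinct days. A hedged sanity check against that table:

    -- auto-created partitions can never outnumber the distinct days
    -- actually present in the loaded data
    SELECT COUNT(DISTINCT DATE_TRUNC(`col1`, 'day')) FROM `concurrent`;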
diff --git a/regression-test/suites/partition_p1/auto_partition/sql/multi_thread_load.groovy b/regression-test/suites/partition_p1/auto_partition/sql/multi_thread_load.groovy
new file mode 100644
index 00000000000..8fe96c934df
--- /dev/null
+++ b/regression-test/suites/partition_p1/auto_partition/sql/multi_thread_load.groovy
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import groovy.io.FileType
+import java.nio.file.Files
+import java.nio.file.Paths
+import java.net.URL
+import java.io.File
+import java.util.concurrent.locks.ReentrantLock
+
+suite("multi_thread_load") {
+    def lock = new ReentrantLock()
+
+    // download the doris-dbgen binary from object storage if not present locally
+    def dirPath = context.file.parent
+    def fatherPath = context.file.parentFile.parentFile.getPath()
+    def fileName = "doris-dbgen"
+    def fileUrl = "http://doris-build-1308700295.cos.ap-beijing.myqcloud.com/regression/doris-dbgen-23-10-18/doris-dbgen-23-10-20/doris-dbgen";
+    def filePath = Paths.get(dirPath, fileName)
+    if (!Files.exists(filePath)) {
+        new URL(fileUrl).withInputStream { inputStream ->
+            Files.copy(inputStream, filePath)
+        }
+        def file = new File(dirPath + "/" + fileName)
+        file.setExecutable(true)
+    }
+
+    def data_count = 20 // number of load tasks and threads
+    def rows = 100      // rows generated per load task
+
+    // generate datafiles via doris-dbgen
+    def doris_dbgen_create_data = { db_name, tb_name, part_type ->
+        def bulkSize = rows
+        def tableName = tb_name
+
+        def jdbcUrl = context.config.jdbcUrl
+        def urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3)
+        def sql_ip = urlWithoutSchema.substring(0, urlWithoutSchema.indexOf(":"))
+        def sql_port
+        if (urlWithoutSchema.indexOf("/") >= 0) {
+            // e.g. jdbc:mysql://localhost:8080/?a=b
+            sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1, urlWithoutSchema.indexOf("/"))
+        } else {
+            // e.g. jdbc:mysql://localhost:8080
+            sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1)
+        }
+        String feHttpAddress = context.config.feHttpAddress
+        def http_port = feHttpAddress.substring(feHttpAddress.indexOf(":") + 1)
+
+        String realDb = db_name
+        String user = context.config.jdbcUser
+        String password = context.config.jdbcPassword
+
+        for (int i = 0; i < data_count; i++) {
+            def cm
+            if (password) {
+                cm = """${dirPath}/doris-dbgen gen --host ${sql_ip} --sql-port 
${sql_port} --user ${user} --pass ${password} --database ${realDb} --table 
${tableName} --rows ${rows} --bulk-size ${bulkSize} --http-port ${http_port} 
--config ${fatherPath}/doris_dbgen_conf/two_stream_load_conflict.yaml 
--save-to-dir ${dirPath}/${part_type}/${part_type}_${i}/"""
+            } else {
+                cm = """${dirPath}/doris-dbgen gen --host ${sql_ip} --sql-port 
${sql_port} --user ${user} --database ${realDb} --table ${tableName} --rows 
${rows} --bulk-size ${bulkSize} --http-port ${http_port} --config 
${fatherPath}/doris_dbgen_conf/two_stream_load_conflict.yaml --save-to-dir 
${dirPath}/${part_type}/${part_type}_${i}/"""
+            }
+            logger.info("datagen: " + cm)
+            def proc = cm.execute()
+            def sout = new StringBuilder(), serr = new StringBuilder()
+            proc.consumeProcessOutput(sout, serr)
+            proc.waitForOrKill(7200000) // allow up to 2 hours per dbgen run
+            // logger.info("std out: " + sout + "std err: " + serr)
+        }
+    }
+
+    def write_to_file = { cur_path, content ->
+        File file = new File(cur_path)
+        file.write(content)
+    }
+
+    def cm_list = []
+    def doris_dbgen_load_data = { db_name, tb_name, part_type ->
+        def tableName = tb_name
+
+        def jdbcUrl = context.config.jdbcUrl
+        def urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3)
+        def sql_ip = urlWithoutSchema.substring(0, urlWithoutSchema.indexOf(":"))
+        def sql_port
+        if (urlWithoutSchema.indexOf("/") >= 0) {
+            // e.g. jdbc:mysql://localhost:8080/?a=b
+            sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1, urlWithoutSchema.indexOf("/"))
+        } else {
+            // e.g. jdbc:mysql://localhost:8080
+            sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1)
+        }
+        String feHttpAddress = context.config.feHttpAddress
+        def http_port = feHttpAddress.substring(feHttpAddress.indexOf(":") + 1)
+
+        String realDb = db_name
+        String user = context.config.jdbcUser
+        String password = context.config.jdbcPassword
+
+        for (int i = 0; i < data_count; i++) {
+            def cm = ""
+            def list = []
+            def dir = new File("""${dirPath}""" + "/" + part_type + "/" + part_type + "_" + i)
+            dir.eachFileRecurse (FileType.FILES) { file ->
+                list << file
+            }
+
+            if (password) {
+                cm = """curl --location-trusted -u ${user}:${password} -H 
"column_separator:|" -T ${list[0]} 
http://${sql_ip}:${http_port}/api/${realDb}/${tableName}/_stream_load""";
+            } else {
+                cm = """curl --location-trusted -u root: -H 
"column_separator:|" -T ${list[0]} 
http://${sql_ip}:${http_port}/api/${realDb}/${tableName}/_stream_load""";
+            }
+            logger.info("load data: " + cm)
+
+            def load_path = """${dirPath}/${part_type}/thread_load_${i}.sh"""
+            write_to_file(load_path, cm)
+            cm_list.add("""bash ${load_path}""")
+        }
+    }
+
+    def data_delete = { part_type ->
+        def rm_cmd = """rm -rf ${dirPath}/${part_type}"""
+        rm_cmd.execute().waitFor() // block until cleanup finishes so later writes don't race it
+    }
+
+    def database_name = "regression_test_auto_partition_concurrent"
+    def table_name = "concurrent"
+
+    sql """create database if not exists ${database_name};"""
+    sql """use ${database_name};"""
+    sql """drop table if exists ${table_name};"""
+    sql new File("""${fatherPath}/ddl/concurrent.sql""").text
+
+    data_delete("range")
+    doris_dbgen_create_data(database_name, table_name, "range")
+    doris_dbgen_load_data(database_name, table_name, "range")
+
+    def load_threads = []
+    def concurrent_load = { str ->
+        logger.info("load start:" + str)
+        def proc = str.execute()
+        def sout = new StringBuilder(), serr = new StringBuilder()
+        proc.consumeProcessOutput(sout, serr)
+        proc.waitForOrKill(600000) // 10 minutes
+    }
+
+    // start one daemon thread per load script; eachWithIndex gives every
+    // closure its own copy of the loop variable, avoiding the shared-variable
+    // capture pitfall of a plain for loop
+    cm_list.eachWithIndex { cmd, i ->
+        logger.info("try to run " + i + " : " + cmd)
+        load_threads.add(Thread.startDaemon { concurrent_load(cmd) })
+    }
+
+    // wait for all load threads to finish
+    for (Thread th in load_threads) {
+        th.join()
+    }
+
+    // check data count
+    def row_count_range = sql """select count() from ${table_name};"""
+    assertTrue(data_count*rows == row_count_range[0][0], "${data_count*rows}, ${row_count_range[0][0]}")
+    // check that no two partitions overlap
+    def partition_res_range = sql """show partitions from ${table_name} order by PartitionName;"""
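+    // (column 6 of the SHOW PARTITIONS output is assumed to hold the
+    // partition range; two equal values would indicate overlapping partitions)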
+    for (int i = 0; i < partition_res_range.size(); i++) {
+        for (int j = i+1; j < partition_res_range.size(); j++) {
+            if (partition_res_range[i][6] == partition_res_range[j][6]) {
+                assertTrue(false, "$i, $j")
+            }
+        }
+    }
+}
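
After all threads join, the suite asserts the exact row count and
pairwise-distinct partition ranges. An equivalent spot check from a MySQL
client, assuming the database and table names used above:

    SELECT COUNT() FROM regression_test_auto_partition_concurrent.concurrent;
    -- expect data_count * rows = 20 * 100 = 2000
    SHOW PARTITIONS FROM regression_test_auto_partition_concurrent.concurrent;
    -- the Range column must not repeat across rows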


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]