This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
commit d7e906eab5b6775c00508cdf03cb71d2fb87830f
Author: wangqt <[email protected]>
AuthorDate: Wed Sep 20 14:56:32 2023 +0800

    [Fix](broker load) broker load with OR predicate error fix #24157

    Co-authored-by: wangqingtao6 <[email protected]>
---
 .../doris/load/loadv2/LoadingTaskPlanner.java      |   9 ++
 .../load_p0/broker_load/broker_load_with_where.csv |   6 +
 .../data/load_p0/broker_load/test_array_load.out   |  12 ++
 .../data/load_p0/stream_load/test_json_load.out    |   6 +
 .../org/apache/doris/regression/suite/Suite.groovy |   3 +-
 .../load_p0/broker_load/test_array_load.groovy     |  10 +-
 .../broker_load/test_broker_load_with_where.groovy | 162 +++++++++++++++++++++
 .../load_p0/spark_load/test_spark_load.groovy      |   4 +-
 .../load_p0/stream_load/test_json_load.groovy      |   2 +-
 9 files changed, 205 insertions(+), 9 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
index cb06f4c42e..a11ea7634f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
@@ -187,6 +187,15 @@ public class LoadingTaskPlanner {
             LOG.debug("plan scanTupleDesc{}", scanTupleDesc.toString());
         }
 
+        // analyze each file group's where expr before rewrite, so slot refs and types are resolved
+        scanTupleDesc.setTable(table);
+        analyzer.registerTupleDescriptor(scanTupleDesc);
+        for (BrokerFileGroup fileGroup : fileGroups) {
+            if (fileGroup.getWhereExpr() != null) {
+                fileGroup.getWhereExpr().analyze(analyzer);
+            }
+        }
+
         // Generate plan trees
         // 1. Broker scan node
         ScanNode scanNode;
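Judging from the patch above, the planner previously handed each file group's where expression to expression rewrite without analyzing it first, so an OR predicate whose slot references and types were still unresolved errored out during planning. Below is a minimal, self-contained Java sketch of that ordering constraint only; the Expr class and every method on it are invented for illustration and are not the Doris FE types touched by this commit.

final class AnalyzeBeforeRewriteDemo {
    static final class Expr {
        final String op;              // e.g. "OR", "IN"
        boolean analyzed = false;     // analyze() resolves slot refs and types

        Expr(String op) {
            this.op = op;
        }

        void analyze() {
            analyzed = true;
        }

        Expr rewrite() {
            // Rewrite rules need resolved types; an unanalyzed OR predicate
            // is the failure mode this patch removes.
            if (!analyzed) {
                throw new IllegalStateException("rewrite before analyze: " + op);
            }
            return this;
        }
    }

    public static void main(String[] args) {
        Expr whereExpr = new Expr("OR");
        whereExpr.analyze();  // the fix: analyze first ...
        whereExpr.rewrite();  // ... so the rewrite succeeds
        System.out.println("rewrite ok, analyzed=" + whereExpr.analyzed);
    }
}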
diff --git a/regression-test/data/load_p0/broker_load/broker_load_with_where.csv b/regression-test/data/load_p0/broker_load/broker_load_with_where.csv
new file mode 100644
index 0000000000..6717da56c7
--- /dev/null
+++ b/regression-test/data/load_p0/broker_load/broker_load_with_where.csv
@@ -0,0 +1,6 @@
+11001,2023-09-01,1,1,10
+11001,2023-09-01,2,1,10
+11001,2023-09-01,1,2,10
+11001,2023-09-01,2,2,10
+11001,2023-09-01,1,3,10
+11001,2023-09-01,2,3,10
\ No newline at end of file
diff --git a/regression-test/data/load_p0/broker_load/test_array_load.out b/regression-test/data/load_p0/broker_load/test_array_load.out
index f80af4d698..816baf4c86 100644
--- a/regression-test/data/load_p0/broker_load/test_array_load.out
+++ b/regression-test/data/load_p0/broker_load/test_array_load.out
@@ -65,3 +65,15 @@
 
 -- !select_count --
 6 3
+-- !select --
+100 [1, 2, 3] [32767, 32768, 32769] [65534, 65535, 65536] ["a", "b", "c"] ["hello", "world"] [2022-07-13] [2022-07-13 12:30:00] [0.33, 0.67] [3.1415926, 0.878787878] [4.000000, 5.500000, 6.670000]
+
+-- !select --
+100 [1, 2, 3] [32767, 32768, 32769] [65534, 65535, 65536] ["a", "b", "c"] ["hello", "world"] [2022-07-13] [2022-07-13 12:30:00] [0.33, 0.67] [3.1415926, 0.878787878] [4.000000, 5.500000, 6.670000]
+
+-- !select --
+100 [1, 2, 3] [32767, 32768, 32769] [65534, 65535, 65536] ["a", "b", "c"] ["hello", "world"] [2022-07-13] [2022-07-13 12:30:00] [0.33, 0.67] [3.1415926, 0.878787878] [4.000000, 5.500000, 6.670000]
+
+-- !select --
+100 [1, 2, 3] [32767, 32768, 32769] [65534, 65535, 65536] ["a", "b", "c"] ["hello", "world"] [2022-07-13] [2022-07-13 12:30:00] [0.33, 0.67] [3.1415926, 0.878787878] [4.000000, 5.500000, 6.670000]
+
diff --git a/regression-test/data/load_p0/stream_load/test_json_load.out b/regression-test/data/load_p0/stream_load/test_json_load.out
index ad68ffa129..360c7867d3 100644
--- a/regression-test/data/load_p0/stream_load/test_json_load.out
+++ b/regression-test/data/load_p0/stream_load/test_json_load.out
@@ -205,3 +205,9 @@ John 30 New York {"email":"[email protected]","phone":"+1-123-456-7890"}
 
 -- !select22 --
 11324 1321313082437 1678834024274 20230315 {"base_mac_value_null":24,"base_1_value_respiratoryrate":11,"base_3_value_heartrate":51,"base_3_status_onoroutofbed":3,"base_null_count_circulation":84,"base_1_status_onoroutofbed":3,"base_1_value_heartrate":51,"base_3_value_respiratoryrate":11,"base_3_value_bodyactivityenergy":43652,"base_2_value_respiratoryrate":11,"base_2_value_bodyactivityenergy":28831,"base_2_status_onoroutofbed":3,"base_1_value_bodyactivityenergy":56758,"base_2_value_heart [...]
+-- !select --
+200 changsha 3456789
+
+-- !select --
+200 changsha 3456789
+
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index 464849cb22..3999fe5668 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -353,7 +353,8 @@ class Suite implements GroovyInterceptable {
     }
 
     String uploadToHdfs(String localFile) {
-        String dataDir = context.config.dataPath + "/" + group + "/"
+        // the group name can be rewritten, so resolve data files against dataPath, not the group dir
+        String dataDir = context.config.dataPath + "/"
        localFile = dataDir + localFile
         String hdfsFs = context.config.otherConfigs.get("hdfsFs")
         String hdfsUser = context.config.otherConfigs.get("hdfsUser")
diff --git a/regression-test/suites/load_p0/broker_load/test_array_load.groovy b/regression-test/suites/load_p0/broker_load/test_array_load.groovy
index 5d7e04d883..5ef0bf7d18 100644
--- a/regression-test/suites/load_p0/broker_load/test_array_load.groovy
+++ b/regression-test/suites/load_p0/broker_load/test_array_load.groovy
@@ -249,12 +249,12 @@ suite("test_array_load", "load_p0") {
         brokerName =getBrokerName()
         hdfsUser = getHdfsUser()
         hdfsPasswd = getHdfsPasswd()
-        def hdfs_json_file_path = uploadToHdfs "broker_load/simple_object_array.json"
-        def hdfs_csv_file_path = uploadToHdfs "broker_load/simple_array.csv"
-        def hdfs_orc_file_path = uploadToHdfs "broker_load/simple_array.orc"
+        def hdfs_json_file_path = uploadToHdfs "load_p0/broker_load/simple_object_array.json"
+        def hdfs_csv_file_path = uploadToHdfs "load_p0/broker_load/simple_array.csv"
+        def hdfs_orc_file_path = uploadToHdfs "load_p0/broker_load/simple_array.orc"
         // orc file with native array(list) type
-        def hdfs_orc_file_path2 = uploadToHdfs "broker_load/simple_array_list_type.orc"
-        def hdfs_parquet_file_path = uploadToHdfs "broker_load/simple_array.parquet"
+        def hdfs_orc_file_path2 = uploadToHdfs "load_p0/broker_load/simple_array_list_type.orc"
+        def hdfs_parquet_file_path = uploadToHdfs "load_p0/broker_load/simple_array.parquet"
 
         // case5: import array data by hdfs and enable vectorized engine
         try {
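The Suite.groovy and test_array_load.groovy diffs above go together: uploadToHdfs no longer prepends the (rewritable) group directory, so every caller now passes a group-qualified path such as "load_p0/broker_load/simple_array.csv". Here is a small, self-contained Java sketch of the before/after path resolution; the method names and the dataPath value are invented for the example.

public class UploadPathDemo {
    static String oldResolve(String dataPath, String group, String localFile) {
        return dataPath + "/" + group + "/" + localFile; // before: group-relative
    }

    static String newResolve(String dataPath, String localFile) {
        return dataPath + "/" + localFile; // after: dataPath-relative
    }

    public static void main(String[] args) {
        String dataPath = "/regression-test/data"; // example value only
        // Before the patch, suites passed "broker_load/simple_array.csv":
        System.out.println(oldResolve(dataPath, "load_p0", "broker_load/simple_array.csv"));
        // After it, they pass the group-qualified "load_p0/broker_load/simple_array.csv":
        System.out.println(newResolve(dataPath, "load_p0/broker_load/simple_array.csv"));
    }
}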
diff --git a/regression-test/suites/load_p0/broker_load/test_broker_load_with_where.groovy b/regression-test/suites/load_p0/broker_load/test_broker_load_with_where.groovy
new file mode 100644
index 0000000000..dd05905e4b
--- /dev/null
+++ b/regression-test/suites/load_p0/broker_load/test_broker_load_with_where.groovy
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_broker_load_with_where", "load_p0") {
+    // define a SQL table
+    def testTable = "tbl_test_broker_load_with_where"
+
+    def create_test_table = {testTablex ->
+        def result1 = sql """
+            CREATE TABLE IF NOT EXISTS ${testTable} (
+                `k1` BIGINT NOT NULL,
+                `k2` DATE NULL,
+                `k3` INT(11) NOT NULL,
+                `k4` INT(11) NOT NULL,
+                `v5` BIGINT SUM NULL DEFAULT "0"
+            ) ENGINE=OLAP
+            AGGREGATE KEY(`k1`, `k2`, `k3`, `k4`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 16
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "storage_format" = "V2"
+            );
+            """
+
+        // DDL/DML returns one row with one column; the only value is the affected row count
+        assertTrue(result1.size() == 1)
+        assertTrue(result1[0].size() == 1)
+        assertTrue(result1[0][0] == 0, "Create table should update 0 rows")
+
+        // insert one row to check whether the table is writable
+        def result2 = sql """ INSERT INTO ${testTable} VALUES
+            (1,'2023-09-01',1,1,1)
+            """
+        assertTrue(result2.size() == 1)
+        assertTrue(result2[0].size() == 1)
+        assertTrue(result2[0][0] == 1, "Insert should update 1 rows")
+    }
+
+    def load_from_hdfs_norm = {testTablex, label, hdfsFilePath, format, brokerName, hdfsUser, hdfsPasswd ->
+        def result1 = sql """
+            LOAD LABEL ${label} (
+                DATA INFILE("${hdfsFilePath}")
+                INTO TABLE ${testTablex}
+                COLUMNS TERMINATED BY ","
+                FORMAT as "${format}"
+            )
+            with BROKER "${brokerName}" (
+                "username"="${hdfsUser}",
+                "password"="${hdfsPasswd}")
+            PROPERTIES (
+                "timeout"="1200",
+                "max_filter_ratio"="0.1");
+            """
+
+        assertTrue(result1.size() == 1)
+        assertTrue(result1[0].size() == 1)
+        assertTrue(result1[0][0] == 0, "Query OK, 0 rows affected")
+    }
+
+    def load_from_hdfs_with_or_predicate = {testTablex, label, hdfsFilePath, format, brokerName, hdfsUser, hdfsPasswd ->
+        def result1 = sql """
+            LOAD LABEL ${label} (
+                DATA INFILE("${hdfsFilePath}")
+                INTO TABLE ${testTablex}
+                COLUMNS TERMINATED BY ","
+                FORMAT as "${format}"
+                WHERE
+                k1 in (11001,11002)
+                and (
+                    k3 in (1)
+                    or k4 in (1, 2)
+                )
+            )
+            with BROKER "${brokerName}" (
+                "username"="${hdfsUser}",
+                "password"="${hdfsPasswd}")
+            PROPERTIES (
+                "timeout"="1200",
+                "max_filter_ratio"="0.1");
+            """
+        assertTrue(result1.size() == 1)
+        assertTrue(result1[0].size() == 1)
+        assertTrue(result1[0][0] == 0, "Query OK, 0 rows affected")
+    }
+
+    def check_load_result = {checklabel, testTablex ->
+        def max_try_milli_secs = 10000
+        while (max_try_milli_secs > 0) {
+            def result = sql "show load where label = '${checklabel}'"
+            if (result[0][2] == "FINISHED") {
+                //sql "sync"
+                qt_select "select * from ${testTablex} order by k1"
+                break
+            } else {
+                sleep(1000) // wait 1 second between polls
+                max_try_milli_secs -= 1000
+                if (max_try_milli_secs <= 0) {
+                    assertTrue(false, "load with label ${checklabel} did not finish in time")
+                }
+            }
+        }
+    }
+
+    def check_data_correct = {table_name ->
+        sql "sync"
+        // select from the table and check whether the data is correct
+        qt_select "select k1,k3,k4,sum(v5) from ${table_name} group by k1,k3,k4 order by k1,k3,k4"
+    }
+
+    // if 'enableHdfs' in regression-conf.groovy has been set to true,
+    // the test will run the cases below.
+    if (enableHdfs()) {
+        brokerName = getBrokerName()
+        hdfsUser = getHdfsUser()
+        hdfsPasswd = getHdfsPasswd()
+        def hdfs_csv_file_path = uploadToHdfs "load_p0/broker_load/broker_load_with_where.csv"
+        //def hdfs_csv_file_path = "hdfs://ip:port/testfile"
+
+        // case1: import csv data from hdfs without a where clause
+        try {
+            sql "DROP TABLE IF EXISTS ${testTable}"
+            create_test_table.call(testTable)
+
+            def test_load_label = UUID.randomUUID().toString().replaceAll("-", "")
+            load_from_hdfs_norm.call(testTable, test_load_label, hdfs_csv_file_path, "csv",
+                    brokerName, hdfsUser, hdfsPasswd)
+
+            check_load_result.call(test_load_label, testTable)
+        } finally {
+            try_sql("DROP TABLE IF EXISTS ${testTable}")
+        }
+
+        // case2: import csv data from hdfs with an OR predicate in the where clause
+        try {
+            sql "DROP TABLE IF EXISTS ${testTable}"
+            create_test_table.call(testTable)
+
+            def test_load_label = UUID.randomUUID().toString().replaceAll("-", "")
+            load_from_hdfs_with_or_predicate.call(testTable, test_load_label, hdfs_csv_file_path, "csv",
+                    brokerName, hdfsUser, hdfsPasswd)
+
+            check_load_result.call(test_load_label, testTable)
+        } finally {
+            try_sql("DROP TABLE IF EXISTS ${testTable}")
+        }
+    }
+}
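check_load_result in the new suite is a standard poll-until-finished loop: re-run "show load" for the label once a second until the state is FINISHED or a 10-second budget runs out, then snapshot the table. Below is a minimal, self-contained Java rendering of that same pattern; waitForFinished and fetchState are invented stand-ins for the suite's closure and its show-load query.

import java.util.function.Supplier;

final class LoadPollDemo {
    static boolean waitForFinished(Supplier<String> fetchState, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            if ("FINISHED".equals(fetchState.get())) {
                return true;
            }
            try {
                Thread.sleep(1000); // wait 1 second between polls, like the suite
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false; // the Groovy closure fails the test at this point
    }

    public static void main(String[] args) {
        // Simulate a load that reaches FINISHED on the third poll.
        int[] polls = {0};
        boolean ok = waitForFinished(
                () -> ++polls[0] >= 3 ? "FINISHED" : "LOADING", 10_000);
        System.out.println("finished=" + ok + " polls=" + polls[0]);
    }
}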
diff --git a/regression-test/suites/load_p0/spark_load/test_spark_load.groovy b/regression-test/suites/load_p0/spark_load/test_spark_load.groovy
index 6cc3df7381..c798ad1fcf 100644
--- a/regression-test/suites/load_p0/spark_load/test_spark_load.groovy
+++ b/regression-test/suites/load_p0/spark_load/test_spark_load.groovy
@@ -130,8 +130,8 @@ suite("test_spark_load", "p0") {
 
     // if 'enableHdfs' in regression-conf.groovy has been set to true,
     if (enableHdfs()) {
-        def hdfs_txt_file_path1 = uploadToHdfs "spark_load/all_types1.txt"
-        def hdfs_txt_file_path2 = uploadToHdfs "spark_load/all_types2.txt"
+        def hdfs_txt_file_path1 = uploadToHdfs "load_p0/spark_load/all_types1.txt"
+        def hdfs_txt_file_path2 = uploadToHdfs "load_p0/spark_load/all_types2.txt"
         try {
             sql "DROP TABLE IF EXISTS ${testTable}"
             sql "DROP TABLE IF EXISTS ${testTable2}"
diff --git a/regression-test/suites/load_p0/stream_load/test_json_load.groovy b/regression-test/suites/load_p0/stream_load/test_json_load.groovy
index 539eafe6ed..7ad38874fb 100644
--- a/regression-test/suites/load_p0/stream_load/test_json_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_json_load.groovy
@@ -622,7 +622,7 @@ suite("test_json_load", "p0") {
         brokerName =getBrokerName()
         hdfsUser = getHdfsUser()
         hdfsPasswd = getHdfsPasswd()
-        def hdfs_file_path = uploadToHdfs "stream_load/simple_object_json.json"
+        def hdfs_file_path = uploadToHdfs "load_p0/stream_load/simple_object_json.json"
         def format = "json"
 
         // case18: import json use pre-filter exprs

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
