This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 4f66e0b9472ae70f9fe3605c6c3ebd7cd7cc44c4 Author: Kang <[email protected]> AuthorDate: Sun Sep 17 10:53:07 2023 +0800 Revert "Revert "[fix](agg) windown_funnel compatibility issue with multi backends (#24385)"" This reverts commit 66e28778831cbf9bb85154ffab61b0acdeb2a527. --- be/src/common/config.cpp | 3 - be/src/common/config.h | 3 - .../aggregate_function_window_funnel.h | 26 ++++-- .../data/query_p0/aggregate/window_funnel.out | 3 + .../nereids_p0/aggregate/window_funnel.groovy | 35 ++++++-- .../suites/query_p0/aggregate/window_funnel.groovy | 100 +++++++++++++++++++-- 6 files changed, 142 insertions(+), 28 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index e13195396a..8eb278bf82 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1058,9 +1058,6 @@ DEFINE_mInt64(lookup_connection_cache_bytes_limit, "4294967296"); // level of compression when using LZ4_HC, whose defalut value is LZ4HC_CLEVEL_DEFAULT DEFINE_mInt64(LZ4_HC_compression_level, "9"); -// enable window_funnel_function with different modes -DEFINE_mBool(enable_window_funnel_function_v2, "false"); - DEFINE_Bool(enable_hdfs_hedged_read, "false"); DEFINE_Int32(hdfs_hedged_read_thread_num, "128"); DEFINE_Int32(hdfs_hedged_read_threshold_time, "500"); diff --git a/be/src/common/config.h b/be/src/common/config.h index ab74bf7503..dbf6734aa6 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1104,9 +1104,6 @@ DECLARE_mInt64(lookup_connection_cache_bytes_limit); // level of compression when using LZ4_HC, whose defalut value is LZ4HC_CLEVEL_DEFAULT DECLARE_mInt64(LZ4_HC_compression_level); -// enable window_funnel_function with different modes -DECLARE_mBool(enable_window_funnel_function_v2); - // whether to enable hdfs hedged read. // If set to true, it will be enabled even if user not enable it when creating catalog DECLARE_Bool(enable_hdfs_hedged_read); diff --git a/be/src/vec/aggregate_functions/aggregate_function_window_funnel.h b/be/src/vec/aggregate_functions/aggregate_function_window_funnel.h index a69e5d0cd6..253677bbc3 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_window_funnel.h +++ b/be/src/vec/aggregate_functions/aggregate_function_window_funnel.h @@ -78,6 +78,7 @@ struct WindowFunnelState { bool sorted; int64_t window; WindowFunnelMode window_funnel_mode; + bool enable_mode; WindowFunnelState() { sorted = true; @@ -97,7 +98,7 @@ struct WindowFunnelState { WindowFunnelMode mode) { window = win; max_event_level = event_num; - window_funnel_mode = mode; + window_funnel_mode = enable_mode ? mode : WindowFunnelMode::DEFAULT; if (sorted && events.size() > 0) { if (events.back().first == timestamp) { @@ -203,16 +204,20 @@ struct WindowFunnelState { std::inplace_merge(begin, middle, end); max_event_level = max_event_level > 0 ? max_event_level : other.max_event_level; window = window > 0 ? window : other.window; - window_funnel_mode = window_funnel_mode == WindowFunnelMode::INVALID - ? other.window_funnel_mode - : window_funnel_mode; + if (enable_mode) { + window_funnel_mode = window_funnel_mode == WindowFunnelMode::INVALID + ? other.window_funnel_mode + : window_funnel_mode; + } else { + window_funnel_mode = WindowFunnelMode::DEFAULT; + } sorted = true; } void write(BufferWritable& out) const { write_var_int(max_event_level, out); write_var_int(window, out); - if (config::enable_window_funnel_function_v2) { + if (enable_mode) { write_var_int(static_cast<std::underlying_type_t<WindowFunnelMode>>(window_funnel_mode), out); } @@ -232,7 +237,7 @@ struct WindowFunnelState { max_event_level = (int)event_level; read_var_int(window, in); window_funnel_mode = WindowFunnelMode::DEFAULT; - if (config::enable_window_funnel_function_v2) { + if (enable_mode) { int64_t mode; read_var_int(mode, in); window_funnel_mode = static_cast<WindowFunnelMode>(mode); @@ -262,6 +267,12 @@ public: WindowFunnelState<DateValueType, NativeType>, AggregateFunctionWindowFunnel<DateValueType, NativeType>>(argument_types_) {} + void create(AggregateDataPtr __restrict place) const override { + auto data = new (place) WindowFunnelState<DateValueType, NativeType>(); + /// support window funnel mode from 2.0. See `BeExecVersionManager::max_be_exec_version` + data->enable_mode = version >= 3; + } + String get_name() const override { return "window_funnel"; } DataTypePtr get_return_type() const override { return std::make_shared<DataTypeInt32>(); } @@ -310,6 +321,9 @@ public: AggregateFunctionWindowFunnel<DateValueType, NativeType>>::data(place) .get()); } + +protected: + using IAggregateFunction::version; }; } // namespace doris::vectorized diff --git a/regression-test/data/query_p0/aggregate/window_funnel.out b/regression-test/data/query_p0/aggregate/window_funnel.out index 3396dd90e8..fa239a9e71 100644 --- a/regression-test/data/query_p0/aggregate/window_funnel.out +++ b/regression-test/data/query_p0/aggregate/window_funnel.out @@ -11,6 +11,9 @@ -- !window_funnel -- 2 +-- !window_funnel_deduplication_compat -- +4 + -- !window_funnel_deduplication -- 2 diff --git a/regression-test/suites/nereids_p0/aggregate/window_funnel.groovy b/regression-test/suites/nereids_p0/aggregate/window_funnel.groovy index 6ad68bd2a3..02562c49f4 100644 --- a/regression-test/suites/nereids_p0/aggregate/window_funnel.groovy +++ b/regression-test/suites/nereids_p0/aggregate/window_funnel.groovy @@ -19,6 +19,8 @@ // /testing/trino-product-tests/src/main/resources/sql-tests/testcases/aggregate // and modified by Doris. +import org.codehaus.groovy.runtime.IOGroovyMethods + suite("window_funnel") { sql "SET enable_nereids_planner=true" sql "SET enable_fallback_to_original_planner=false" @@ -105,14 +107,31 @@ suite("window_funnel") { """ sql """ DROP TABLE IF EXISTS ${tableName} """ - String backend_id; - def backendId_to_backendIP = [:] - def backendId_to_backendHttpPort = [:] - getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); - for (String backendId in backendId_to_backendIP.keySet()) { - String be_host = backendId_to_backendIP[backendId] - String be_http_port = backendId_to_backendHttpPort[backendId] - curl("POST", "http://${be_host}:${be_http_port}/api/update_config?enable_window_funnel_function_v2=true") + StringBuilder strBuilder = new StringBuilder() + strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword) + strBuilder.append(" http://" + context.config.feHttpAddress + "/rest/v1/config/fe?conf_item=be_exec_version") + + String command = strBuilder.toString() + def process = command.toString().execute() + def code = process.waitFor() + def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream()))); + def out = process.getText() + logger.info("Request FE Config: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def response = parseJson(out.trim()) + assertEquals(response.code, 0) + assertEquals(response.msg, "success") + def configJson = response.data.rows + def beExecVersion = 0 + for (Object conf: configJson) { + assert conf instanceof Map + if (((Map<String, String>) conf).get("Name").toLowerCase() == "be_exec_version") { + beExecVersion = ((Map<String, String>) conf).get("Value").toInteger() + } + } + if (beExecVersion < 3) { + logger.warn("Be exec version(${beExecVersion}) is less than 3, skip window_funnel mode test") + return } sql """ DROP TABLE IF EXISTS ${tableName} """ diff --git a/regression-test/suites/query_p0/aggregate/window_funnel.groovy b/regression-test/suites/query_p0/aggregate/window_funnel.groovy index 1fc4cf555b..edefcbc939 100644 --- a/regression-test/suites/query_p0/aggregate/window_funnel.groovy +++ b/regression-test/suites/query_p0/aggregate/window_funnel.groovy @@ -19,6 +19,8 @@ // /testing/trino-product-tests/src/main/resources/sql-tests/testcases/aggregate // and modified by Doris. +import org.codehaus.groovy.runtime.IOGroovyMethods + suite("window_funnel") { sql "SET enable_nereids_planner=false" def tableName = "windowfunnel_test" @@ -104,14 +106,96 @@ suite("window_funnel") { """ sql """ DROP TABLE IF EXISTS ${tableName} """ - String backend_id; - def backendId_to_backendIP = [:] - def backendId_to_backendHttpPort = [:] - getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); - for (String backendId in backendId_to_backendIP.keySet()) { - String be_host = backendId_to_backendIP[backendId] - String be_http_port = backendId_to_backendHttpPort[backendId] - curl("POST", "http://${be_host}:${be_http_port}/api/update_config?enable_window_funnel_function_v2=true") + StringBuilder strBuilder = new StringBuilder() + strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword) + strBuilder.append(" http://" + context.config.feHttpAddress + "/rest/v1/config/fe?conf_item=be_exec_version") + + def command = strBuilder.toString() + def process = command.toString().execute() + def code = process.waitFor() + def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream()))); + def out = process.getText() + logger.info("Request FE Config: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def response = parseJson(out.trim()) + assertEquals(response.code, 0) + assertEquals(response.msg, "success") + def configJson = response.data.rows + def beExecVersion = 0 + for (Object conf: configJson) { + assert conf instanceof Map + if (((Map<String, String>) conf).get("Name").toLowerCase() == "be_exec_version") { + beExecVersion = ((Map<String, String>) conf).get("Value").toInteger() + } + } + if (beExecVersion < 3) { + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + xwho varchar(50) NULL COMMENT 'xwho', + xwhen datetimev2(3) COMMENT 'xwhen', + xwhat int NULL COMMENT 'xwhat' + ) + DUPLICATE KEY(xwho) + DISTRIBUTED BY HASH(xwho) BUCKETS 3 + PROPERTIES ( + "replication_num" = "1" + ); + """ + sql "INSERT into ${tableName} (xwho, xwhen, xwhat) VALUES('1', '2022-03-12 10:41:00.111111', 1)" + sql "INSERT INTO ${tableName} (xwho, xwhen, xwhat) VALUES('1', '2022-03-12 13:28:02.111111', 2)" + sql "INSERT INTO ${tableName} (xwho, xwhen, xwhat) VALUES('1', '2022-03-12 13:28:03.111111', 2)" + sql "INSERT INTO ${tableName} (xwho, xwhen, xwhat) VALUES('1', '2022-03-12 14:15:01.111111', 3)" + sql "INSERT INTO ${tableName} (xwho, xwhen, xwhat) VALUES('1', '2022-03-12 15:05:04.111111', 4)" + qt_window_funnel_deduplication_compat """ + select + window_funnel( + 20000, + 'deduplication', + t.xwhen, + t.xwhat = 1, + t.xwhat = 2, + t.xwhat = 3, + t.xwhat = 4 + ) AS level + from ${tableName} t; + """ + sql """ DROP TABLE IF EXISTS ${tableName} """ + logger.warn("Be exec version(${beExecVersion}) is less than 3, skip window_funnel mode test") + return + } else { + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + xwho varchar(50) NULL COMMENT 'xwho', + xwhen datetimev2(3) COMMENT 'xwhen', + xwhat int NULL COMMENT 'xwhat' + ) + DUPLICATE KEY(xwho) + DISTRIBUTED BY HASH(xwho) BUCKETS 3 + PROPERTIES ( + "replication_num" = "1" + ); + """ + sql "INSERT into ${tableName} (xwho, xwhen, xwhat) VALUES('1', '2022-03-12 10:41:00.111111', 1)" + sql "INSERT INTO ${tableName} (xwho, xwhen, xwhat) VALUES('1', '2022-03-12 13:28:02.111111', 2)" + sql "INSERT INTO ${tableName} (xwho, xwhen, xwhat) VALUES('1', '2022-03-12 13:28:03.111111', 2)" + sql "INSERT INTO ${tableName} (xwho, xwhen, xwhat) VALUES('1', '2022-03-12 14:15:01.111111', 3)" + sql "INSERT INTO ${tableName} (xwho, xwhen, xwhat) VALUES('1', '2022-03-12 15:05:04.111111', 4)" + qt_window_funnel_deduplication_compat """ + select + window_funnel( + 20000, + 'default', + t.xwhen, + t.xwhat = 1, + t.xwhat = 2, + t.xwhat = 3, + t.xwhat = 4 + ) AS level + from ${tableName} t; + """ + sql """ DROP TABLE IF EXISTS ${tableName} """ } sql """ DROP TABLE IF EXISTS ${tableName} """ --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
