This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 4f66e0b9472ae70f9fe3605c6c3ebd7cd7cc44c4
Author: Kang <[email protected]>
AuthorDate: Sun Sep 17 10:53:07 2023 +0800

    Revert "Revert "[fix](agg) windown_funnel compatibility issue with multi 
backends (#24385)""
    
    This reverts commit 66e28778831cbf9bb85154ffab61b0acdeb2a527.
---
 be/src/common/config.cpp                           |   3 -
 be/src/common/config.h                             |   3 -
 .../aggregate_function_window_funnel.h             |  26 ++++--
 .../data/query_p0/aggregate/window_funnel.out      |   3 +
 .../nereids_p0/aggregate/window_funnel.groovy      |  35 ++++++--
 .../suites/query_p0/aggregate/window_funnel.groovy | 100 +++++++++++++++++++--
 6 files changed, 142 insertions(+), 28 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index e13195396a..8eb278bf82 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1058,9 +1058,6 @@ DEFINE_mInt64(lookup_connection_cache_bytes_limit, 
"4294967296");
 // level of compression when using LZ4_HC, whose defalut value is 
LZ4HC_CLEVEL_DEFAULT
 DEFINE_mInt64(LZ4_HC_compression_level, "9");
 
-// enable window_funnel_function with different modes
-DEFINE_mBool(enable_window_funnel_function_v2, "false");
-
 DEFINE_Bool(enable_hdfs_hedged_read, "false");
 DEFINE_Int32(hdfs_hedged_read_thread_num, "128");
 DEFINE_Int32(hdfs_hedged_read_threshold_time, "500");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index ab74bf7503..dbf6734aa6 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1104,9 +1104,6 @@ DECLARE_mInt64(lookup_connection_cache_bytes_limit);
 // level of compression when using LZ4_HC, whose defalut value is 
LZ4HC_CLEVEL_DEFAULT
 DECLARE_mInt64(LZ4_HC_compression_level);
 
-// enable window_funnel_function with different modes
-DECLARE_mBool(enable_window_funnel_function_v2);
-
 // whether to enable hdfs hedged read.
 // If set to true, it will be enabled even if user not enable it when creating 
catalog
 DECLARE_Bool(enable_hdfs_hedged_read);
diff --git a/be/src/vec/aggregate_functions/aggregate_function_window_funnel.h 
b/be/src/vec/aggregate_functions/aggregate_function_window_funnel.h
index a69e5d0cd6..253677bbc3 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_window_funnel.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_window_funnel.h
@@ -78,6 +78,7 @@ struct WindowFunnelState {
     bool sorted;
     int64_t window;
     WindowFunnelMode window_funnel_mode;
+    bool enable_mode;
 
     WindowFunnelState() {
         sorted = true;
@@ -97,7 +98,7 @@ struct WindowFunnelState {
              WindowFunnelMode mode) {
         window = win;
         max_event_level = event_num;
-        window_funnel_mode = mode;
+        window_funnel_mode = enable_mode ? mode : WindowFunnelMode::DEFAULT;
 
         if (sorted && events.size() > 0) {
             if (events.back().first == timestamp) {
@@ -203,16 +204,20 @@ struct WindowFunnelState {
         std::inplace_merge(begin, middle, end);
         max_event_level = max_event_level > 0 ? max_event_level : 
other.max_event_level;
         window = window > 0 ? window : other.window;
-        window_funnel_mode = window_funnel_mode == WindowFunnelMode::INVALID
-                                     ? other.window_funnel_mode
-                                     : window_funnel_mode;
+        if (enable_mode) {
+            window_funnel_mode = window_funnel_mode == 
WindowFunnelMode::INVALID
+                                         ? other.window_funnel_mode
+                                         : window_funnel_mode;
+        } else {
+            window_funnel_mode = WindowFunnelMode::DEFAULT;
+        }
         sorted = true;
     }
 
     void write(BufferWritable& out) const {
         write_var_int(max_event_level, out);
         write_var_int(window, out);
-        if (config::enable_window_funnel_function_v2) {
+        if (enable_mode) {
             
write_var_int(static_cast<std::underlying_type_t<WindowFunnelMode>>(window_funnel_mode),
                           out);
         }
@@ -232,7 +237,7 @@ struct WindowFunnelState {
         max_event_level = (int)event_level;
         read_var_int(window, in);
         window_funnel_mode = WindowFunnelMode::DEFAULT;
-        if (config::enable_window_funnel_function_v2) {
+        if (enable_mode) {
             int64_t mode;
             read_var_int(mode, in);
             window_funnel_mode = static_cast<WindowFunnelMode>(mode);
@@ -262,6 +267,12 @@ public:
                       WindowFunnelState<DateValueType, NativeType>,
                       AggregateFunctionWindowFunnel<DateValueType, 
NativeType>>(argument_types_) {}
 
+    void create(AggregateDataPtr __restrict place) const override {
+        auto data = new (place) WindowFunnelState<DateValueType, NativeType>();
+        /// support window funnel mode from 2.0. See 
`BeExecVersionManager::max_be_exec_version`
+        data->enable_mode = version >= 3;
+    }
+
     String get_name() const override { return "window_funnel"; }
 
     DataTypePtr get_return_type() const override { return 
std::make_shared<DataTypeInt32>(); }
@@ -310,6 +321,9 @@ public:
                         AggregateFunctionWindowFunnel<DateValueType, 
NativeType>>::data(place)
                         .get());
     }
+
+protected:
+    using IAggregateFunction::version;
 };
 
 } // namespace doris::vectorized
diff --git a/regression-test/data/query_p0/aggregate/window_funnel.out 
b/regression-test/data/query_p0/aggregate/window_funnel.out
index 3396dd90e8..fa239a9e71 100644
--- a/regression-test/data/query_p0/aggregate/window_funnel.out
+++ b/regression-test/data/query_p0/aggregate/window_funnel.out
@@ -11,6 +11,9 @@
 -- !window_funnel --
 2
 
+-- !window_funnel_deduplication_compat --
+4
+
 -- !window_funnel_deduplication --
 2
 
diff --git a/regression-test/suites/nereids_p0/aggregate/window_funnel.groovy 
b/regression-test/suites/nereids_p0/aggregate/window_funnel.groovy
index 6ad68bd2a3..02562c49f4 100644
--- a/regression-test/suites/nereids_p0/aggregate/window_funnel.groovy
+++ b/regression-test/suites/nereids_p0/aggregate/window_funnel.groovy
@@ -19,6 +19,8 @@
 // 
/testing/trino-product-tests/src/main/resources/sql-tests/testcases/aggregate
 // and modified by Doris.
 
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
 suite("window_funnel") {
     sql "SET enable_nereids_planner=true"
     sql "SET enable_fallback_to_original_planner=false"
@@ -105,14 +107,31 @@ suite("window_funnel") {
     """
     sql """ DROP TABLE IF EXISTS ${tableName} """
 
-    String backend_id;
-    def backendId_to_backendIP = [:]
-    def backendId_to_backendHttpPort = [:]
-    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
-    for (String backendId in backendId_to_backendIP.keySet()) {
-        String be_host = backendId_to_backendIP[backendId]
-        String be_http_port = backendId_to_backendHttpPort[backendId]
-        curl("POST", 
"http://${be_host}:${be_http_port}/api/update_config?enable_window_funnel_function_v2=true")
+    StringBuilder strBuilder = new StringBuilder()
+    strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser 
+ ":" + context.config.jdbcPassword)
+    strBuilder.append(" http://" + context.config.feHttpAddress + 
"/rest/v1/config/fe?conf_item=be_exec_version")
+
+    String command = strBuilder.toString()
+    def process = command.toString().execute()
+    def code = process.waitFor()
+    def err = IOGroovyMethods.getText(new BufferedReader(new 
InputStreamReader(process.getErrorStream())));
+    def out = process.getText()
+    logger.info("Request FE Config: code=" + code + ", out=" + out + ", err=" 
+ err)
+    assertEquals(code, 0)
+    def response = parseJson(out.trim())
+    assertEquals(response.code, 0)
+    assertEquals(response.msg, "success")
+    def configJson = response.data.rows
+    def beExecVersion = 0
+    for (Object conf: configJson) {
+        assert conf instanceof Map
+        if (((Map<String, String>) conf).get("Name").toLowerCase() == 
"be_exec_version") {
+            beExecVersion = ((Map<String, String>) 
conf).get("Value").toInteger()
+        }
+    }
+    if (beExecVersion < 3) {
+        logger.warn("Be exec version(${beExecVersion}) is less than 3, skip 
window_funnel mode test")
+        return
     }
 
     sql """ DROP TABLE IF EXISTS ${tableName} """
diff --git a/regression-test/suites/query_p0/aggregate/window_funnel.groovy 
b/regression-test/suites/query_p0/aggregate/window_funnel.groovy
index 1fc4cf555b..edefcbc939 100644
--- a/regression-test/suites/query_p0/aggregate/window_funnel.groovy
+++ b/regression-test/suites/query_p0/aggregate/window_funnel.groovy
@@ -19,6 +19,8 @@
 // 
/testing/trino-product-tests/src/main/resources/sql-tests/testcases/aggregate
 // and modified by Doris.
 
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
 suite("window_funnel") {
     sql "SET enable_nereids_planner=false"
     def tableName = "windowfunnel_test"
@@ -104,14 +106,96 @@ suite("window_funnel") {
     """
     sql """ DROP TABLE IF EXISTS ${tableName} """
 
-    String backend_id;
-    def backendId_to_backendIP = [:]
-    def backendId_to_backendHttpPort = [:]
-    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
-    for (String backendId in backendId_to_backendIP.keySet()) {
-        String be_host = backendId_to_backendIP[backendId]
-        String be_http_port = backendId_to_backendHttpPort[backendId]
-        curl("POST", 
"http://${be_host}:${be_http_port}/api/update_config?enable_window_funnel_function_v2=true")
+    StringBuilder strBuilder = new StringBuilder()
+    strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser 
+ ":" + context.config.jdbcPassword)
+    strBuilder.append(" http://" + context.config.feHttpAddress + 
"/rest/v1/config/fe?conf_item=be_exec_version")
+
+    def command = strBuilder.toString()
+    def process = command.toString().execute()
+    def code = process.waitFor()
+    def err = IOGroovyMethods.getText(new BufferedReader(new 
InputStreamReader(process.getErrorStream())));
+    def out = process.getText()
+    logger.info("Request FE Config: code=" + code + ", out=" + out + ", err=" 
+ err)
+    assertEquals(code, 0)
+    def response = parseJson(out.trim())
+    assertEquals(response.code, 0)
+    assertEquals(response.msg, "success")
+    def configJson = response.data.rows
+    def beExecVersion = 0
+    for (Object conf: configJson) {
+        assert conf instanceof Map
+        if (((Map<String, String>) conf).get("Name").toLowerCase() == 
"be_exec_version") {
+            beExecVersion = ((Map<String, String>) 
conf).get("Value").toInteger()
+        }
+    }
+    if (beExecVersion < 3) {
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                xwho varchar(50) NULL COMMENT 'xwho',
+                xwhen datetimev2(3) COMMENT 'xwhen',
+                xwhat int NULL COMMENT 'xwhat'
+            )
+            DUPLICATE KEY(xwho)
+            DISTRIBUTED BY HASH(xwho) BUCKETS 3
+            PROPERTIES (
+            "replication_num" = "1"
+            );
+        """
+        sql "INSERT into ${tableName} (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 10:41:00.111111', 1)"
+        sql "INSERT INTO ${tableName} (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 13:28:02.111111', 2)"
+        sql "INSERT INTO ${tableName} (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 13:28:03.111111', 2)"
+        sql "INSERT INTO ${tableName} (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 14:15:01.111111', 3)"
+        sql "INSERT INTO ${tableName} (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 15:05:04.111111', 4)"
+        qt_window_funnel_deduplication_compat """
+            select
+                window_funnel(
+                    20000,
+                    'deduplication',
+                    t.xwhen,
+                    t.xwhat = 1,
+                    t.xwhat = 2,
+                    t.xwhat = 3,
+                    t.xwhat = 4
+                    ) AS level
+            from ${tableName} t;
+        """
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        logger.warn("Be exec version(${beExecVersion}) is less than 3, skip 
window_funnel mode test")
+        return
+    } else {
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                xwho varchar(50) NULL COMMENT 'xwho',
+                xwhen datetimev2(3) COMMENT 'xwhen',
+                xwhat int NULL COMMENT 'xwhat'
+            )
+            DUPLICATE KEY(xwho)
+            DISTRIBUTED BY HASH(xwho) BUCKETS 3
+            PROPERTIES (
+            "replication_num" = "1"
+            );
+        """
+        sql "INSERT into ${tableName} (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 10:41:00.111111', 1)"
+        sql "INSERT INTO ${tableName} (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 13:28:02.111111', 2)"
+        sql "INSERT INTO ${tableName} (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 13:28:03.111111', 2)"
+        sql "INSERT INTO ${tableName} (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 14:15:01.111111', 3)"
+        sql "INSERT INTO ${tableName} (xwho, xwhen, xwhat) VALUES('1', 
'2022-03-12 15:05:04.111111', 4)"
+        qt_window_funnel_deduplication_compat """
+            select
+                window_funnel(
+                    20000,
+                    'default',
+                    t.xwhen,
+                    t.xwhat = 1,
+                    t.xwhat = 2,
+                    t.xwhat = 3,
+                    t.xwhat = 4
+                    ) AS level
+            from ${tableName} t;
+        """
+        sql """ DROP TABLE IF EXISTS ${tableName} """
     }
 
     sql """ DROP TABLE IF EXISTS ${tableName} """


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to