This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new b935b738822 [fix](window_funnel) fix wrong result of fixed mode (#40459) (#40960)
b935b738822 is described below

commit b935b73882212c812415600245295209d32df402
Author: TengJianPing <[email protected]>
AuthorDate: Thu Sep 19 10:56:08 2024 +0800

    [fix](window_funnel) fix wrong result of fixed mode (#40459) (#40960)
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    Backport of #40146 and #40459
---
 .../aggregate_function_window_funnel.h             | 121 +++++++++------------
 1 file changed, 49 insertions(+), 72 deletions(-)

diff --git a/be/src/vec/aggregate_functions/aggregate_function_window_funnel.h b/be/src/vec/aggregate_functions/aggregate_function_window_funnel.h
index 5a31b88a69b..2afa82e9a6d 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_window_funnel.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_window_funnel.h
@@ -83,14 +83,11 @@ struct WindowFunnelState {
     int64_t window;
     bool enable_mode;
     WindowFunnelMode window_funnel_mode;
-    mutable MutableColumnPtr timestamp_column;
-    mutable MutableColumns event_columns;
+    mutable vectorized::MutableBlock mutable_block;
     ColumnVector<NativeType>::Container* timestamp_column_data;
     std::vector<ColumnVector<UInt8>::Container*> event_columns_datas;
-    Block block;
     SortDescription sort_description {1};
     bool sorted;
-    bool is_merge;
 
     WindowFunnelState() {
         event_count = 0;
@@ -101,30 +98,38 @@ struct WindowFunnelState {
         sort_description[0].direction = 1;
         sort_description[0].nulls_direction = -1;
         sorted = false;
-        is_merge = false;
     }
     WindowFunnelState(int arg_event_count) : WindowFunnelState() {
-        timestamp_column = ColumnVector<NativeType>::create();
+        event_count = arg_event_count;
+        auto timestamp_column = ColumnVector<NativeType>::create();
         timestamp_column_data =
                 
&assert_cast<ColumnVector<NativeType>&>(*timestamp_column).get_data();
-        event_count = arg_event_count;
-        event_columns.resize(event_count);
+
+        MutableColumns event_columns;
         for (int i = 0; i < event_count; i++) {
-            event_columns[i] = ColumnVector<UInt8>::create();
+            auto event_column = ColumnVector<UInt8>::create();
             event_columns_datas.emplace_back(
-                    
&assert_cast<ColumnVector<UInt8>&>(*event_columns[i]).get_data());
+                    
&assert_cast<ColumnVector<UInt8>&>(*event_column).get_data());
+            event_columns.emplace_back(std::move(event_column));
         }
+        Block tmp_block;
+        tmp_block.insert({std::move(timestamp_column),
+                          
DataTypeFactory::instance().create_data_type(TYPE_INDEX), "timestamp"});
+        for (int i = 0; i < event_count; i++) {
+            tmp_block.insert({std::move(event_columns[i]),
+                              
DataTypeFactory::instance().create_data_type(TypeIndex::UInt8),
+                              "event_" + std::to_string(i)});
+        }
+
+        mutable_block = MutableBlock(std::move(tmp_block));
     }
 
     void reset() {
         window = 0;
-        timestamp_column->clear();
-        for (auto& column : event_columns) {
-            column->clear();
-        }
-        block.clear_column_data();
+        mutable_block.clear();
+        timestamp_column_data = nullptr;
+        event_columns_datas.clear();
         sorted = false;
-        is_merge = false;
     }
 
     void add(const IColumn** arg_columns, ssize_t row_num, int64_t win, 
WindowFunnelMode mode) {
@@ -144,24 +149,11 @@ struct WindowFunnelState {
         if (sorted) {
             return;
         }
-        if (!is_merge) {
-            Block tmp_block;
-            tmp_block.insert({std::move(timestamp_column),
-                              
DataTypeFactory::instance().create_data_type(TYPE_INDEX),
-                              "timestamp"});
-            for (int i = 0; i < event_count; i++) {
-                tmp_block.insert({std::move(event_columns[i]),
-                                  
DataTypeFactory::instance().create_data_type(TypeIndex::UInt8),
-                                  "event_" + std::to_string(i)});
-            }
 
-            block = tmp_block.clone_without_columns();
-            sort_block(tmp_block, block, sort_description, 0);
-        } else {
-            auto tmp_block = block.clone_without_columns();
-            sort_block(block, tmp_block, sort_description, 0);
-            block = std::move(tmp_block);
-        }
+        Block tmp_block = mutable_block.to_block();
+        auto block = tmp_block.clone_without_columns();
+        sort_block(tmp_block, block, sort_description, 0);
+        mutable_block = MutableBlock(std::move(block));
         sorted = true;
     }
 
@@ -174,9 +166,9 @@ struct WindowFunnelState {
         TimeInterval interval(SECOND, window, false);
 
         int column_idx = 1;
-        const auto& first_event_column = block.get_by_position(column_idx);
+        const auto& first_event_column = 
mutable_block.get_column_by_position(column_idx);
         const auto& first_event_data =
-                assert_cast<const 
ColumnVector<UInt8>&>(*first_event_column.column).get_data();
+                assert_cast<const 
ColumnVector<UInt8>&>(*first_event_column).get_data();
         auto match_row = simd::find_one(first_event_data.data(), start_row, 
row_count);
         start_row = match_row + 1;
         if (match_row < row_count) {
@@ -188,12 +180,13 @@ struct WindowFunnelState {
 
             column_idx++;
             auto last_match_row = match_row;
-            for (; column_idx < event_count + 1; column_idx++) {
-                const auto& event_column = block.get_by_position(column_idx);
+            ++match_row;
+            for (; column_idx < event_count + 1 && match_row < row_count;
+                 column_idx++, match_row++) {
+                const auto& event_column = 
mutable_block.get_column_by_position(column_idx);
                 const auto& event_data =
-                        assert_cast<const 
ColumnVector<UInt8>&>(*event_column.column).get_data();
+                        assert_cast<const 
ColumnVector<UInt8>&>(*event_column).get_data();
                 if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::FIXED) {
-                    ++match_row;
                     if (event_data[match_row] == 1) {
                         auto current_timestamp =
                                 binary_cast<NativeType, 
DateValueType>(timestamp_data[match_row]);
@@ -204,7 +197,7 @@ struct WindowFunnelState {
                     }
                     break;
                 }
-                match_row = simd::find_one(event_data.data(), match_row + 1, 
row_count);
+                match_row = simd::find_one(event_data.data(), match_row, 
row_count);
                 if (match_row < row_count) {
                     auto current_timestamp =
                             binary_cast<NativeType, 
DateValueType>(timestamp_data[match_row]);
@@ -227,10 +220,9 @@ struct WindowFunnelState {
                             for (int tmp_column_idx = 1; tmp_column_idx < 
column_idx;
                                  tmp_column_idx++) {
                                 const auto& tmp_event_column =
-                                        block.get_by_position(tmp_column_idx);
+                                        
mutable_block.get_column_by_position(tmp_column_idx);
                                 const auto& tmp_event_data =
-                                        assert_cast<const 
ColumnVector<UInt8>&>(
-                                                *tmp_event_column.column)
+                                        assert_cast<const 
ColumnVector<UInt8>&>(*tmp_event_column)
                                                 .get_data();
                                 auto dup_match_row = 
simd::find_one(tmp_event_data.data(),
                                                                     
last_match_row + 1, match_row);
@@ -258,11 +250,11 @@ struct WindowFunnelState {
     int _get_internal() const {
         size_t start_row = 0;
         int max_found_event_count = 0;
-        const auto& ts_column = block.get_by_position(0).column->get_ptr();
+        const auto& ts_column = 
mutable_block.get_column_by_position(0)->get_ptr();
         const auto& timestamp_data =
                 assert_cast<const 
ColumnVector<NativeType>&>(*ts_column).get_data().data();
 
-        auto row_count = block.rows();
+        auto row_count = mutable_block.rows();
         while (start_row < row_count) {
             auto found_event_count =
                     _match_event_list<WINDOW_FUNNEL_MODE>(start_row, 
row_count, timestamp_data);
@@ -274,7 +266,7 @@ struct WindowFunnelState {
         return max_found_event_count;
     }
     int get() const {
-        auto row_count = block.rows();
+        auto row_count = mutable_block.rows();
         if (event_count == 0 || row_count == 0) {
             return 0;
         }
@@ -294,16 +286,13 @@ struct WindowFunnelState {
     }
 
     void merge(const WindowFunnelState& other) {
-        is_merge = true;
-        MutableBlock mutable_block(&block);
-        if (!other.block.empty()) {
-            auto st = mutable_block.merge(other.block);
+        if (!other.mutable_block.empty()) {
+            auto st = mutable_block.merge(other.mutable_block.to_block());
             if (!st.ok()) {
                 throw doris::Exception(ErrorCode::INTERNAL_ERROR, 
st.to_string());
                 return;
             }
         }
-        block = mutable_block.to_block();
 
         event_count = event_count > 0 ? event_count : other.event_count;
         window = window > 0 ? window : other.window;
@@ -328,27 +317,13 @@ struct WindowFunnelState {
         size_t compressed_bytes = 0;
         Status status;
         std::string buff;
-        if (is_merge) {
-            // as the branch-2.1 is used the new impl of window funnel, and 
the be_exec_version is 5
-            // but in branch-3.0 this be_exec_version have update to 7, so 
when upgrade from branch-2.1 to branch-3.0
-            // maybe have error send the branch-3.0 version of version 7 to 
branch-2.1([0---version--5])
-            status = block.serialize(
-                    5, &pblock, &uncompressed_bytes, &compressed_bytes,
-                    segment_v2::CompressionTypePB::ZSTD); // ZSTD for better 
compression ratio
-        } else {
-            Block tmp_block;
-            tmp_block.insert({std::move(timestamp_column),
-                              
DataTypeFactory::instance().create_data_type(TYPE_INDEX),
-                              "timestamp"});
-            for (int i = 0; i < event_count; i++) {
-                tmp_block.insert({std::move(event_columns[i]),
-                                  
DataTypeFactory::instance().create_data_type(TypeIndex::UInt8),
-                                  "event_" + std::to_string(i)});
-            }
-            status = tmp_block.serialize(
-                    5, &pblock, &uncompressed_bytes, &compressed_bytes,
-                    segment_v2::CompressionTypePB::ZSTD); // ZSTD for better 
compression ratio
-        }
+        Block block = mutable_block.to_block();
+        // as the branch-2.1 is used the new impl of window funnel, and the 
be_exec_version is 5
+        // but in branch-3.0 this be_exec_version have update to 7, so when 
upgrade from branch-2.1 to branch-3.0
+        // maybe have error send the branch-3.0 version of version 7 to 
branch-2.1([0---version--5])
+        status = block.serialize(
+                5, &pblock, &uncompressed_bytes, &compressed_bytes,
+                segment_v2::CompressionTypePB::ZSTD); // ZSTD for better 
compression ratio
         if (!status.ok()) {
             throw doris::Exception(ErrorCode::INTERNAL_ERROR, 
status.to_string());
             return;
@@ -385,10 +360,12 @@ struct WindowFunnelState {
             throw doris::Exception(ErrorCode::INTERNAL_ERROR,
                                    "Failed to parse window_funnel data to 
block");
         }
+        Block block;
         auto status = block.deserialize(pblock);
         if (!status.ok()) {
             throw doris::Exception(ErrorCode::INTERNAL_ERROR, 
status.to_string());
         }
+        mutable_block = MutableBlock(std::move(block));
     }
 };
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to