Copilot commented on code in PR #61566:
URL: https://github.com/apache/doris/pull/61566#discussion_r3013157244


##########
be/src/exprs/aggregate/aggregate_function_window_funnel_v2.h:
##########
@@ -0,0 +1,599 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <algorithm>
+#include <iterator>
+#include <utility>
+#include <vector>
+
+#include "common/cast_set.h"
+#include "common/exception.h"
+#include "common/status.h"
+#include "core/assert_cast.h"
+#include "core/column/column_string.h"
+#include "core/data_type/data_type_number.h"
+#include "core/types.h"
+#include "core/value/vdatetime_value.h"
+#include "exprs/aggregate/aggregate_function.h"
+#include "exprs/aggregate/aggregate_function_window_funnel.h" // for 
WindowFunnelMode, string_to_window_funnel_mode
+#include "util/var_int.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+class Arena;
+class BufferReadable;
+class BufferWritable;
+class IColumn;
+} // namespace doris
+
+namespace doris {
+
+/// Merge two event lists, utilizing sorted flags to optimize.
+/// After merge, all events are in `events_list` and it is sorted.
+template <typename T>
+void merge_events_list(T& events_list, size_t prefix_size, bool prefix_sorted, 
bool suffix_sorted) {
+    if (!prefix_sorted && !suffix_sorted) {
+        std::stable_sort(std::begin(events_list), std::end(events_list));
+    } else {
+        const auto begin = std::begin(events_list);
+        const auto middle = std::next(begin, prefix_size);
+        const auto end = std::end(events_list);
+
+        if (!prefix_sorted) {
+            std::stable_sort(begin, middle);
+        }
+        if (!suffix_sorted) {
+            std::stable_sort(middle, end);
+        }
+        std::inplace_merge(begin, middle, end);
+    }
+}
+
+/// V2 state: stores only matched events as (timestamp, event_index) pairs.
+/// Compared to V1 which stores all rows with N boolean columns, V2 only stores
+/// events that actually match at least one condition, dramatically reducing 
memory.
+///
+/// To prevent same-row multi-condition chain advancement (where a single row
+/// matching multiple conditions could incorrectly advance the funnel through
+/// multiple levels), we use the high bit of event_idx as a "same-row 
continuation"
+/// flag. When a row matches multiple conditions, the first event stored for 
that
+/// row has bit 7 = 0, and subsequent events from the same row have bit 7 = 1.
+/// The algorithm uses this to ensure each funnel step comes from a different 
row.
+///
+/// This approach adds ZERO storage overhead — each event remains 9 bytes 
(UInt64 + UInt8).
+struct WindowFunnelStateV2 {
+    /// (timestamp_int_val, 1-based event_index with continuation flag in bit 
7)
+    ///
+    /// Bit layout of event_idx:
+    ///   bit 7 (0x80): continuation flag — 1 means this event is from the 
same row
+    ///                  as the preceding event in events_list
+    ///   bits 0-6:     actual 1-based event index (supports up to 127 
conditions)
+    ///
+    /// Sorted by timestamp only (via operator<). stable_sort preserves 
insertion order
+    /// for equal timestamps, so same-row events remain consecutive after 
sorting.
+    struct TimestampEvent {
+        UInt64 timestamp;
+        UInt8 event_idx; // includes continuation flag in bit 7
+
+        /// Sort by timestamp only. For same timestamp, stable_sort preserves 
insertion
+        /// order, keeping same-row events consecutive.
+        bool operator<(const TimestampEvent& other) const { return timestamp < 
other.timestamp; }
+        bool operator<=(const TimestampEvent& other) const { return timestamp 
<= other.timestamp; }
+    };
+
+    static constexpr UInt8 CONTINUATION_FLAG = 0x80;
+    static constexpr UInt8 EVENT_IDX_MASK = 0x7F;
+
+    /// Extract the actual 1-based event index (stripping continuation flag).
+    static int get_event_idx(UInt8 raw) { return (raw & EVENT_IDX_MASK); }
+    /// Check if this event is a continuation of the same row as the previous 
event.
+    static bool is_continuation(UInt8 raw) { return (raw & CONTINUATION_FLAG) 
!= 0; }
+
+    static constexpr int64_t WINDOW_UNSET = -1;
+
+    int event_count = 0;
+    int64_t window = WINDOW_UNSET;
+    WindowFunnelMode window_funnel_mode = WindowFunnelMode::INVALID;
+    bool sorted = true;
+    std::vector<TimestampEvent> events_list;
+
+    WindowFunnelStateV2() = default;
+    WindowFunnelStateV2(int arg_event_count) : event_count(arg_event_count) {}
+
+    void reset() {
+        events_list.clear();
+        sorted = true;
+    }
+
+    void add(const IColumn** arg_columns, ssize_t row_num, int64_t win, 
WindowFunnelMode mode) {
+        window = win;
+        window_funnel_mode = mode;
+
+        // get_data() returns DateV2Value<DateTimeV2ValueType>; convert to 
packed UInt64
+        auto timestamp = assert_cast<const 
ColumnVector<TYPE_DATETIMEV2>&>(*arg_columns[2])
+                                 .get_data()[row_num]
+                                 .to_date_int_val();
+
+        // Iterate from last event to first (reverse order).
+        // This ensures that after stable_sort, events with the same timestamp
+        // appear in descending event_index order, which is important for 
correct
+        // matching when one row satisfies multiple conditions.
+        //
+        // The first event stored for this row has continuation=0;
+        // subsequent events from the same row have continuation=1 (bit 7 set).
+        bool first_match = true;
+        for (int i = event_count - 1; i >= 0; --i) {
+            auto event_val =
+                    assert_cast<const ColumnUInt8&>(*arg_columns[3 + 
i]).get_data()[row_num];
+            if (event_val) {
+                UInt8 packed_idx = cast_set<UInt8>(i + 1);
+                if (!first_match) {
+                    packed_idx |= CONTINUATION_FLAG;
+                }
+                first_match = false;

Review Comment:
   `event_idx` packs the 1-based condition index into the low 7 bits and uses 
the high bit (0x80) as a continuation flag, but 
`packed_idx = cast_set<UInt8>(i + 1)` silently corrupts the stored value when 
there are more than 127 funnel conditions: any index of 128 or above already 
has bit 7 set, so it is read back as a continuation event and, after masking 
with 0x7F, with the wrong event index. Please add a hard validation (e.g., 
reject/throw when `event_count > 127`), or use a wider type for the index, so 
that queries with many conditions don’t return incorrect results without an 
error.



##########
be/src/exprs/aggregate/aggregate_function_window_funnel_v2.cpp:
##########
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exprs/aggregate/aggregate_function_window_funnel_v2.h"
+
+#include <string>
+
+#include "common/logging.h"
+#include "core/data_type/data_type.h"
+#include "core/types.h"
+#include "exprs/aggregate/aggregate_function_simple_factory.h"
+#include "exprs/aggregate/helpers.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+
+AggregateFunctionPtr create_aggregate_function_window_funnel_v2(const 
std::string& name,
+                                                                const 
DataTypes& argument_types,
+                                                                const 
DataTypePtr& result_type,
+                                                                const bool 
result_is_nullable,
+                                                                const 
AggregateFunctionAttr& attr) {
+    if (argument_types.size() < 4) {
+        LOG(WARNING) << "window_funnel_v2 requires at least 4 arguments 
(window, mode, timestamp, "
+                        "and at least 1 boolean condition), but got "
+                     << argument_types.size() << ".";
+        return nullptr;
+    }
+    if (argument_types[2]->get_primitive_type() == TYPE_DATETIMEV2) {
+        return creator_without_type::create<AggregateFunctionWindowFunnelV2>(
+                argument_types, result_is_nullable, attr);
+    } else {
+        LOG(WARNING) << "Only support DateTime type as window argument!";
+        return nullptr;
+    }

Review Comment:
   The warning message is misleading: the code checks the *3rd* argument type 
(`argument_types[2]`, the timestamp) but logs "window argument". Consider 
updating the message to refer to the timestamp parameter and (ideally) include 
the actual type received to aid debugging.
   ```suggestion
           LOG(WARNING) << "Only support DateTimeV2 type as timestamp argument, but got "
                        << argument_types[2]->get_name() << ".";
           return nullptr;
   ```



##########
be/test/exprs/aggregate/vec_window_funnel_v2_test.cpp:
##########
@@ -0,0 +1,1167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <glog/logging.h>
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+#include <stddef.h>
+
+#include <memory>
+#include <ostream>
+
+#include "core/column/column_string.h"
+#include "core/column/column_vector.h"
+#include "core/data_type/data_type_date_or_datetime_v2.h"
+#include "core/data_type/data_type_number.h"
+#include "core/data_type/data_type_string.h"
+#include "core/string_buffer.hpp"
+#include "core/value/vdatetime_value.h"
+#include "exprs/aggregate/aggregate_function.h"
+#include "exprs/aggregate/aggregate_function_simple_factory.h"
+#include "gtest/gtest_pred_impl.h"
+
+namespace doris {
+class IColumn;
+} // namespace doris
+
+namespace doris {
+
+void 
register_aggregate_function_window_funnel_v2(AggregateFunctionSimpleFactory& 
factory);
+
+class VWindowFunnelV2Test : public testing::Test {
+public:
+    AggregateFunctionPtr agg_function;
+
+    VWindowFunnelV2Test() {}
+
+    void SetUp() {
+        AggregateFunctionSimpleFactory factory = 
AggregateFunctionSimpleFactory::instance();
+        DataTypes data_types = {
+                std::make_shared<DataTypeInt64>(),      
std::make_shared<DataTypeString>(),
+                std::make_shared<DataTypeDateTimeV2>(), 
std::make_shared<DataTypeUInt8>(),
+                std::make_shared<DataTypeUInt8>(),      
std::make_shared<DataTypeUInt8>(),
+                std::make_shared<DataTypeUInt8>()};
+        agg_function = factory.get("window_funnel_v2", data_types, nullptr, 
false,
+                                   BeExecVersionManager::get_newest_version());
+        EXPECT_NE(agg_function, nullptr);
+    }
+
+    void TearDown() {}
+
+    Arena arena;
+};
+
+TEST_F(VWindowFunnelV2Test, testEmpty) {
+    std::unique_ptr<char[]> memory(new char[agg_function->size_of_data()]);
+    AggregateDataPtr place = memory.get();
+    agg_function->create(place);
+
+    ColumnString buf;
+    VectorBufferWriter buf_writer(buf);
+    agg_function->serialize(place, buf_writer);
+    buf_writer.commit();
+    LOG(INFO) << "buf size : " << buf.size();
+    VectorBufferReader buf_reader(buf.get_data_at(0));
+    agg_function->deserialize(place, buf_reader, arena);
+
+    std::unique_ptr<char[]> memory2(new char[agg_function->size_of_data()]);
+    AggregateDataPtr place2 = memory2.get();
+    agg_function->create(place2);
+
+    agg_function->merge(place, place2, arena);
+    ColumnInt32 column_result;
+    agg_function->insert_result_into(place, column_result);
+    EXPECT_EQ(column_result.get_data()[0], 0);
+
+    ColumnInt32 column_result2;
+    agg_function->insert_result_into(place2, column_result2);
+    EXPECT_EQ(column_result2.get_data()[0], 0);
+
+    agg_function->destroy(place);
+    agg_function->destroy(place2);
+}
+
+TEST_F(VWindowFunnelV2Test, testSerialize) {
+    const int NUM_CONDS = 4;
+    auto column_mode = ColumnString::create();
+    for (int i = 0; i < NUM_CONDS; i++) {
+        column_mode->insert(Field::create_field<TYPE_STRING>("default"));
+    }
+
+    auto column_timestamp = ColumnDateTimeV2::create();
+    for (int i = 0; i < NUM_CONDS; i++) {
+        VecDateTimeValue time_value;
+        time_value.unchecked_set_time(2022, 2, 28, 0, 0, i);
+        auto dtv2 = time_value.to_datetime_v2();
+        column_timestamp->insert_data((char*)&dtv2, 0);
+    }
+    auto column_event1 = ColumnUInt8::create();
+    column_event1->insert(Field::create_field<TYPE_BOOLEAN>(1));
+    column_event1->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event1->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event1->insert(Field::create_field<TYPE_BOOLEAN>(0));
+
+    auto column_event2 = ColumnUInt8::create();
+    column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event2->insert(Field::create_field<TYPE_BOOLEAN>(1));
+    column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0));
+
+    auto column_event3 = ColumnUInt8::create();
+    column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event3->insert(Field::create_field<TYPE_BOOLEAN>(1));
+    column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0));
+
+    auto column_event4 = ColumnUInt8::create();
+    column_event4->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event4->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event4->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event4->insert(Field::create_field<TYPE_BOOLEAN>(1));
+
+    auto column_window = ColumnInt64::create();
+    for (int i = 0; i < NUM_CONDS; i++) {
+        column_window->insert(Field::create_field<TYPE_BIGINT>(2));
+    }
+
+    std::unique_ptr<char[]> memory(new char[agg_function->size_of_data()]);
+    AggregateDataPtr place = memory.get();
+    agg_function->create(place);
+    const IColumn* column[7] = {column_window.get(), column_mode.get(),   
column_timestamp.get(),
+                                column_event1.get(), column_event2.get(), 
column_event3.get(),
+                                column_event4.get()};
+    for (int i = 0; i < NUM_CONDS; i++) {
+        agg_function->add(place, column, i, arena);
+    }
+
+    ColumnInt32 column_result;
+    agg_function->insert_result_into(place, column_result);
+    EXPECT_EQ(column_result.get_data()[0], 3);
+
+    ColumnString buf;
+    VectorBufferWriter buf_writer(buf);
+    agg_function->serialize(place, buf_writer);
+    buf_writer.commit();
+    agg_function->destroy(place);
+
+    std::unique_ptr<char[]> memory2(new char[agg_function->size_of_data()]);
+    AggregateDataPtr place2 = memory2.get();
+    agg_function->create(place2);
+
+    VectorBufferReader buf_reader(buf.get_data_at(0));
+    agg_function->deserialize(place2, buf_reader, arena);
+
+    ColumnInt32 column_result2;
+    agg_function->insert_result_into(place2, column_result2);
+    EXPECT_EQ(column_result2.get_data()[0], 3);
+    agg_function->destroy(place2);
+}
+
+TEST_F(VWindowFunnelV2Test, testDefaultSortedNoMerge) {
+    const int NUM_CONDS = 4;
+    auto column_mode = ColumnString::create();
+    for (int i = 0; i < NUM_CONDS; i++) {
+        column_mode->insert(Field::create_field<TYPE_STRING>("default"));
+    }
+    auto column_timestamp = ColumnDateTimeV2::create();
+    for (int i = 0; i < NUM_CONDS; i++) {
+        VecDateTimeValue time_value;
+        time_value.unchecked_set_time(2022, 2, 28, 0, 0, i);
+        auto dtv2 = time_value.to_datetime_v2();
+        column_timestamp->insert_data((char*)&dtv2, 0);
+    }
+    auto column_event1 = ColumnUInt8::create();
+    column_event1->insert(Field::create_field<TYPE_BOOLEAN>(1));
+    column_event1->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event1->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event1->insert(Field::create_field<TYPE_BOOLEAN>(0));
+
+    auto column_event2 = ColumnUInt8::create();
+    column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event2->insert(Field::create_field<TYPE_BOOLEAN>(1));
+    column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0));
+
+    auto column_event3 = ColumnUInt8::create();
+    column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event3->insert(Field::create_field<TYPE_BOOLEAN>(1));
+    column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0));
+
+    auto column_event4 = ColumnUInt8::create();
+    column_event4->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event4->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event4->insert(Field::create_field<TYPE_BOOLEAN>(0));
+    column_event4->insert(Field::create_field<TYPE_BOOLEAN>(1));
+
+    for (int win = 0; win < NUM_CONDS + 1; win++) {
+        auto column_window = ColumnInt64::create();
+        for (int i = 0; i < NUM_CONDS; i++) {
+            column_window->insert(Field::create_field<TYPE_BIGINT>(win));
+        }
+
+        std::unique_ptr<char[]> memory(new char[agg_function->size_of_data()]);
+        AggregateDataPtr place = memory.get();
+        agg_function->create(place);
+        const IColumn* column[7] = {column_window.get(),    column_mode.get(),
+                                    column_timestamp.get(), 
column_event1.get(),
+                                    column_event2.get(),    
column_event3.get(),
+                                    column_event4.get()};
+        for (int i = 0; i < NUM_CONDS; i++) {
+            agg_function->add(place, column, i, arena);
+        }
+
+        ColumnInt32 column_result;
+        agg_function->insert_result_into(place, column_result);
+        EXPECT_EQ(column_result.get_data()[0],
+                  win < 0 ? 1 : (win < NUM_CONDS ? win + 1 : NUM_CONDS));

Review Comment:
   This expectation contains a dead branch: `win` is iterated from 0 through 
`NUM_CONDS` inclusive, so `win < 0` is always false. Either simplify the 
expected expression, or extend the loop (or add a separate test) if negative 
windows are intended to be covered.
   ```suggestion
                     win < NUM_CONDS ? win + 1 : NUM_CONDS);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to