github-actions[bot] commented on code in PR #61566:
URL: https://github.com/apache/doris/pull/61566#discussion_r2973447249


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/types/AggStateType.java:
##########
@@ -55,6 +55,7 @@ public class AggStateType extends DataType {
             .put("var_samp", "variance_samp")
             .put("hist", "histogram")
             .put("map_agg", "map_agg_v2")
+            .put("window_funnel", "window_funnel_v2")
             .build();

Review Comment:
   **Compatibility concern:** This alias means that existing `agg_state` 
columns created with `window_funnel` will now be interpreted as 
`window_funnel_v2`. Since V1 and V2 have **different serialization formats** 
(V1 stores raw events, V2 stores matched `(timestamp, event_index)` pairs), 
deserializing old V1 `agg_state` data with the V2 deserializer will produce 
incorrect results or errors.
   
   This should either be guarded by a version check or the alias should not be 
added until a migration path is provided.



##########
be/src/exprs/aggregate/aggregate_function_window_funnel_v2.h:
##########
@@ -0,0 +1,526 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <algorithm>
+#include <iterator>
+#include <utility>
+#include <vector>
+
+#include "common/cast_set.h"
+#include "common/exception.h"
+#include "common/status.h"
+#include "core/assert_cast.h"
+#include "core/column/column_string.h"
+#include "core/data_type/data_type_number.h"
+#include "core/types.h"
+#include "core/value/vdatetime_value.h"
+#include "exprs/aggregate/aggregate_function.h"
+#include "exprs/aggregate/aggregate_function_window_funnel.h" // for 
WindowFunnelMode, string_to_window_funnel_mode
+#include "util/var_int.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+class Arena;
+class BufferReadable;
+class BufferWritable;
+class IColumn;
+} // namespace doris
+
+namespace doris {
+
+/// Merge two event lists, utilizing sorted flags to optimize.
+/// After merge, all events are in `events_list` and it is sorted.
+template <typename T>
+void merge_events_list(T& events_list, size_t prefix_size, bool prefix_sorted, 
bool suffix_sorted) {
+    if (!prefix_sorted && !suffix_sorted) {
+        std::stable_sort(std::begin(events_list), std::end(events_list));
+    } else {
+        const auto begin = std::begin(events_list);
+        const auto middle = std::next(begin, prefix_size);
+        const auto end = std::end(events_list);
+
+        if (!prefix_sorted) {
+            std::stable_sort(begin, middle);
+        }
+        if (!suffix_sorted) {
+            std::stable_sort(middle, end);
+        }
+        std::inplace_merge(begin, middle, end);
+    }
+}
+
+/// V2 state: stores only matched events as (timestamp, event_index) pairs.
+/// Compared to V1 which stores all rows with N boolean columns, V2 only stores
+/// events that actually match at least one condition, dramatically reducing 
memory.
+///
+/// To prevent same-row multi-condition chain advancement (where a single row
+/// matching multiple conditions could incorrectly advance the funnel through
+/// multiple levels), we use the high bit of event_idx as a "same-row 
continuation"
+/// flag. When a row matches multiple conditions, the first event stored for 
that
+/// row has bit 7 = 0, and subsequent events from the same row have bit 7 = 1.
+/// The algorithm uses this to ensure each funnel step comes from a different 
row.
+///
+/// This approach adds ZERO storage overhead — each event remains 9 bytes 
(UInt64 + UInt8).
+struct WindowFunnelStateV2 {
+    /// (timestamp_int_val, 1-based event_index with continuation flag in bit 
7)
+    ///
+    /// Bit layout of event_idx:
+    ///   bit 7 (0x80): continuation flag — 1 means this event is from the 
same row
+    ///                  as the preceding event in events_list
+    ///   bits 0-6:     actual 1-based event index (supports up to 127 
conditions)
+    ///
+    /// Sorted by timestamp only (via operator<). stable_sort preserves 
insertion order
+    /// for equal timestamps, so same-row events remain consecutive after 
sorting.
+    struct TimestampEvent {
+        UInt64 timestamp;
+        UInt8 event_idx; // includes continuation flag in bit 7
+
+        /// Sort by timestamp only. For same timestamp, stable_sort preserves 
insertion
+        /// order, keeping same-row events consecutive.
+        bool operator<(const TimestampEvent& other) const { return timestamp < 
other.timestamp; }
+        bool operator<=(const TimestampEvent& other) const { return timestamp 
<= other.timestamp; }
+    };
+
+    static constexpr UInt8 CONTINUATION_FLAG = 0x80;
+    static constexpr UInt8 EVENT_IDX_MASK = 0x7F;
+
+    /// Extract the actual 1-based event index (stripping continuation flag).
+    static int get_event_idx(UInt8 raw) { return (raw & EVENT_IDX_MASK); }
+    /// Check if this event is a continuation of the same row as the previous 
event.
+    static bool is_continuation(UInt8 raw) { return (raw & CONTINUATION_FLAG) 
!= 0; }
+
+    static constexpr int64_t WINDOW_UNSET = -1;
+
+    int event_count = 0;
+    int64_t window = WINDOW_UNSET;
+    WindowFunnelMode window_funnel_mode = WindowFunnelMode::INVALID;
+    bool sorted = true;
+    std::vector<TimestampEvent> events_list;
+
+    WindowFunnelStateV2() = default;
+    WindowFunnelStateV2(int arg_event_count) : event_count(arg_event_count) {}
+
+    void reset() {
+        events_list.clear();
+        sorted = true;
+    }
+
+    void add(const IColumn** arg_columns, ssize_t row_num, int64_t win, 
WindowFunnelMode mode) {
+        window = win;
+        window_funnel_mode = mode;
+
+        // get_data() returns DateV2Value<DateTimeV2ValueType>; convert to 
packed UInt64
+        auto timestamp = assert_cast<const 
ColumnVector<TYPE_DATETIMEV2>&>(*arg_columns[2])
+                                 .get_data()[row_num]
+                                 .to_date_int_val();
+
+        // Iterate from last event to first (reverse order).
+        // This ensures that after stable_sort, events with the same timestamp
+        // appear in descending event_index order, which is important for 
correct
+        // matching when one row satisfies multiple conditions.
+        //
+        // The first event stored for this row has continuation=0;
+        // subsequent events from the same row have continuation=1 (bit 7 set).
+        bool first_match = true;
+        for (int i = event_count - 1; i >= 0; --i) {
+            auto event_val =
+                    assert_cast<const ColumnUInt8&>(*arg_columns[3 + 
i]).get_data()[row_num];
+            if (event_val) {
+                UInt8 packed_idx = cast_set<UInt8>(i + 1);
+                if (!first_match) {
+                    packed_idx |= CONTINUATION_FLAG;
+                }
+                first_match = false;
+                TimestampEvent new_event {timestamp, packed_idx};
+                if (sorted && !events_list.empty()) {
+                    sorted = events_list.back() <= new_event;
+                }
+                events_list.emplace_back(new_event);
+            }
+        }
+    }
+
+    void sort() {
+        if (!sorted) {
+            std::stable_sort(std::begin(events_list), std::end(events_list));
+            sorted = true;
+        }
+    }
+
+    void merge(const WindowFunnelStateV2& other) {
+        if (other.events_list.empty()) {
+            return;
+        }
+
+        if (events_list.empty()) {
+            events_list = other.events_list;
+            sorted = other.sorted;
+        } else {
+            const auto prefix_size = events_list.size();
+            events_list.insert(std::end(events_list), 
std::begin(other.events_list),
+                               std::end(other.events_list));
+            // Both stable_sort and inplace_merge preserve relative order of 
equal elements.
+            // Since same-row events have the same timestamp (and thus compare 
equal in
+            // the primary sort key), they remain consecutive after merge — 
preserving
+            // the validity of continuation flags.
+            merge_events_list(events_list, prefix_size, sorted, other.sorted);
+            sorted = true;
+        }
+
+        event_count = event_count > 0 ? event_count : other.event_count;
+        window = window != WINDOW_UNSET ? window : other.window;
+        window_funnel_mode = window_funnel_mode == WindowFunnelMode::INVALID
+                                     ? other.window_funnel_mode
+                                     : window_funnel_mode;
+    }
+
+    void write(BufferWritable& out) const {
+        write_var_int(event_count, out);
+        write_var_int(window, out);
+        
write_var_int(static_cast<std::underlying_type_t<WindowFunnelMode>>(window_funnel_mode),
+                      out);
+        write_var_int(sorted ? 1 : 0, out);
+        write_var_int(cast_set<Int64>(events_list.size()), out);
+        for (const auto& evt : events_list) {
+            // Use fixed-size binary write for timestamp (8 bytes) and 
event_idx (1 byte).
+            // The event_idx byte includes the continuation flag in bit 7.
+            out.write(reinterpret_cast<const char*>(&evt.timestamp), 
sizeof(evt.timestamp));
+            out.write(reinterpret_cast<const char*>(&evt.event_idx), 
sizeof(evt.event_idx));
+        }
+    }
+
+    void read(BufferReadable& in) {
+        Int64 tmp = 0;
+        read_var_int(tmp, in);
+        event_count = cast_set<int>(tmp);
+
+        read_var_int(window, in);
+
+        read_var_int(tmp, in);
+        window_funnel_mode = static_cast<WindowFunnelMode>(tmp);
+
+        read_var_int(tmp, in);
+        sorted = (tmp != 0);
+
+        Int64 size = 0;
+        read_var_int(size, in);
+        events_list.clear();
+        events_list.resize(size);
+        for (Int64 i = 0; i < size; ++i) {
+            in.read(reinterpret_cast<char*>(&events_list[i].timestamp),
+                    sizeof(events_list[i].timestamp));
+            in.read(reinterpret_cast<char*>(&events_list[i].event_idx),
+                    sizeof(events_list[i].event_idx));
+        }
+    }
+
+    using DateValueType = DateV2Value<DateTimeV2ValueType>;
+
+    /// Reconstruct DateV2Value from packed UInt64.
+    static DateValueType _ts_from_int(UInt64 packed) { return 
DateValueType(packed); }
+
+    /// Check if `current_ts` is within `window` seconds of `base_ts`.
+    /// Both are packed UInt64 from DateV2Value::to_date_int_val().
+    bool _within_window(UInt64 base_ts, UInt64 current_ts) const {
+        DateValueType end_ts = _ts_from_int(base_ts);
+        TimeInterval interval(SECOND, window, false);
+        end_ts.template date_add_interval<SECOND>(interval);
+        return current_ts <= end_ts.to_date_int_val();
+    }
+
+    /// Track (first_timestamp, last_timestamp, last_event_list_idx) for each 
event level.
+    /// Uses packed UInt64 values; 0 means unset for first_ts.
+    /// last_list_idx tracks the position in events_list of the event that set 
this level,
+    /// used to check continuation flag on subsequent events to detect 
same-row advancement.
+    struct TimestampPair {
+        UInt64 first_ts = 0;
+        UInt64 last_ts = 0;
+        size_t last_list_idx = 0;
+        bool has_value() const { return first_ts != 0; }
+        void reset() {
+            first_ts = 0;
+            last_ts = 0;
+            last_list_idx = 0;
+        }
+    };
+
+    int get() const {
+        if (event_count == 0 || events_list.empty()) {
+            return 0;
+        }
+        if (window < 0) {
+            throw Exception(ErrorCode::INVALID_ARGUMENT,
+                            "the sliding time window must be a positive 
integer, but got: {}",
+                            window);
+        }
+
+        switch (window_funnel_mode) {
+        case WindowFunnelMode::DEFAULT:
+            return _get_default(false);
+        case WindowFunnelMode::INCREASE:
+            return _get_default(true);
+        case WindowFunnelMode::DEDUPLICATION:
+            return _get_deduplication();
+        case WindowFunnelMode::FIXED:
+            return _get_fixed();
+        default:
+            throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid window_funnel 
mode");
+        }
+    }
+
+private:
+    /// DEFAULT and INCREASE mode: O(N) single-pass algorithm.
+    /// Uses events_timestamp array to track the (first, last) timestamps for 
each level.
+    /// For each event in sorted order:
+    ///   - If it's event 0, start a new potential chain
+    ///   - If its predecessor level has been matched, within time window, AND 
from a
+    ///     different row (checked via continuation flag), extend the chain
+    int _get_default(bool increase_mode) const {
+        std::vector<TimestampPair> events_timestamp(event_count);
+
+        for (size_t i = 0; i < events_list.size(); ++i) {
+            const auto& evt = events_list[i];
+            int event_idx = get_event_idx(evt.event_idx) - 1;
+
+            if (event_idx == 0) {
+                events_timestamp[0] = {evt.timestamp, evt.timestamp, i};
+            } else if (events_timestamp[event_idx - 1].has_value() &&
+                       !_is_same_row(events_timestamp[event_idx - 
1].last_list_idx, i)) {
+                // Must be from a DIFFERENT row than the predecessor level
+                bool matched =
+                        _within_window(events_timestamp[event_idx - 
1].first_ts, evt.timestamp);
+                if (increase_mode) {
+                    matched = matched && events_timestamp[event_idx - 
1].last_ts < evt.timestamp;
+                }

Review Comment:
   This strict-increase check interacts incorrectly with the event-0 overwrite 
at lines 302-303. When `events_timestamp[0]` is overwritten by a later occurrence 
of event 0, its `last_ts` holds the **new** (later) start timestamp rather than 
the start of the earlier chain. For the level-1 check (`event_idx == 1`, which 
reads `events_timestamp[0]`), the strict `<` comparison therefore rejects an 
event whose timestamp equals the new start, even though that event would have 
validly extended the earlier chain.



##########
be/src/exprs/aggregate/aggregate_function_window_funnel_v2.h:
##########
@@ -0,0 +1,526 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <algorithm>
+#include <iterator>
+#include <utility>
+#include <vector>
+
+#include "common/cast_set.h"
+#include "common/exception.h"
+#include "common/status.h"
+#include "core/assert_cast.h"
+#include "core/column/column_string.h"
+#include "core/data_type/data_type_number.h"
+#include "core/types.h"
+#include "core/value/vdatetime_value.h"
+#include "exprs/aggregate/aggregate_function.h"
+#include "exprs/aggregate/aggregate_function_window_funnel.h" // for 
WindowFunnelMode, string_to_window_funnel_mode
+#include "util/var_int.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+class Arena;
+class BufferReadable;
+class BufferWritable;
+class IColumn;
+} // namespace doris
+
+namespace doris {
+
+/// Merge two event lists, utilizing sorted flags to optimize.
+/// After merge, all events are in `events_list` and it is sorted.
+template <typename T>
+void merge_events_list(T& events_list, size_t prefix_size, bool prefix_sorted, 
bool suffix_sorted) {
+    if (!prefix_sorted && !suffix_sorted) {
+        std::stable_sort(std::begin(events_list), std::end(events_list));
+    } else {
+        const auto begin = std::begin(events_list);
+        const auto middle = std::next(begin, prefix_size);
+        const auto end = std::end(events_list);
+
+        if (!prefix_sorted) {
+            std::stable_sort(begin, middle);
+        }
+        if (!suffix_sorted) {
+            std::stable_sort(middle, end);
+        }
+        std::inplace_merge(begin, middle, end);
+    }
+}
+
+/// V2 state: stores only matched events as (timestamp, event_index) pairs.
+/// Compared to V1 which stores all rows with N boolean columns, V2 only stores
+/// events that actually match at least one condition, dramatically reducing 
memory.
+///
+/// To prevent same-row multi-condition chain advancement (where a single row
+/// matching multiple conditions could incorrectly advance the funnel through
+/// multiple levels), we use the high bit of event_idx as a "same-row 
continuation"
+/// flag. When a row matches multiple conditions, the first event stored for 
that
+/// row has bit 7 = 0, and subsequent events from the same row have bit 7 = 1.
+/// The algorithm uses this to ensure each funnel step comes from a different 
row.
+///
+/// This approach adds ZERO storage overhead — each event remains 9 bytes 
(UInt64 + UInt8).
+struct WindowFunnelStateV2 {
+    /// (timestamp_int_val, 1-based event_index with continuation flag in bit 
7)
+    ///
+    /// Bit layout of event_idx:
+    ///   bit 7 (0x80): continuation flag — 1 means this event is from the 
same row
+    ///                  as the preceding event in events_list
+    ///   bits 0-6:     actual 1-based event index (supports up to 127 
conditions)
+    ///
+    /// Sorted by timestamp only (via operator<). stable_sort preserves 
insertion order
+    /// for equal timestamps, so same-row events remain consecutive after 
sorting.
+    struct TimestampEvent {
+        UInt64 timestamp;
+        UInt8 event_idx; // includes continuation flag in bit 7
+
+        /// Sort by timestamp only. For same timestamp, stable_sort preserves 
insertion
+        /// order, keeping same-row events consecutive.
+        bool operator<(const TimestampEvent& other) const { return timestamp < 
other.timestamp; }
+        bool operator<=(const TimestampEvent& other) const { return timestamp 
<= other.timestamp; }
+    };
+
+    static constexpr UInt8 CONTINUATION_FLAG = 0x80;
+    static constexpr UInt8 EVENT_IDX_MASK = 0x7F;
+
+    /// Extract the actual 1-based event index (stripping continuation flag).
+    static int get_event_idx(UInt8 raw) { return (raw & EVENT_IDX_MASK); }
+    /// Check if this event is a continuation of the same row as the previous 
event.
+    static bool is_continuation(UInt8 raw) { return (raw & CONTINUATION_FLAG) 
!= 0; }
+
+    static constexpr int64_t WINDOW_UNSET = -1;
+
+    int event_count = 0;
+    int64_t window = WINDOW_UNSET;
+    WindowFunnelMode window_funnel_mode = WindowFunnelMode::INVALID;
+    bool sorted = true;
+    std::vector<TimestampEvent> events_list;
+
+    WindowFunnelStateV2() = default;
+    WindowFunnelStateV2(int arg_event_count) : event_count(arg_event_count) {}
+
+    void reset() {
+        events_list.clear();
+        sorted = true;
+    }
+
+    void add(const IColumn** arg_columns, ssize_t row_num, int64_t win, 
WindowFunnelMode mode) {
+        window = win;
+        window_funnel_mode = mode;
+
+        // get_data() returns DateV2Value<DateTimeV2ValueType>; convert to 
packed UInt64
+        auto timestamp = assert_cast<const 
ColumnVector<TYPE_DATETIMEV2>&>(*arg_columns[2])
+                                 .get_data()[row_num]
+                                 .to_date_int_val();
+
+        // Iterate from last event to first (reverse order).
+        // This ensures that after stable_sort, events with the same timestamp
+        // appear in descending event_index order, which is important for 
correct
+        // matching when one row satisfies multiple conditions.
+        //
+        // The first event stored for this row has continuation=0;
+        // subsequent events from the same row have continuation=1 (bit 7 set).
+        bool first_match = true;
+        for (int i = event_count - 1; i >= 0; --i) {
+            auto event_val =
+                    assert_cast<const ColumnUInt8&>(*arg_columns[3 + 
i]).get_data()[row_num];
+            if (event_val) {
+                UInt8 packed_idx = cast_set<UInt8>(i + 1);
+                if (!first_match) {
+                    packed_idx |= CONTINUATION_FLAG;
+                }
+                first_match = false;
+                TimestampEvent new_event {timestamp, packed_idx};
+                if (sorted && !events_list.empty()) {
+                    sorted = events_list.back() <= new_event;
+                }
+                events_list.emplace_back(new_event);
+            }
+        }
+    }
+
+    void sort() {
+        if (!sorted) {
+            std::stable_sort(std::begin(events_list), std::end(events_list));
+            sorted = true;
+        }
+    }
+
+    void merge(const WindowFunnelStateV2& other) {
+        if (other.events_list.empty()) {
+            return;
+        }
+
+        if (events_list.empty()) {
+            events_list = other.events_list;
+            sorted = other.sorted;
+        } else {
+            const auto prefix_size = events_list.size();
+            events_list.insert(std::end(events_list), 
std::begin(other.events_list),
+                               std::end(other.events_list));
+            // Both stable_sort and inplace_merge preserve relative order of 
equal elements.
+            // Since same-row events have the same timestamp (and thus compare 
equal in
+            // the primary sort key), they remain consecutive after merge — 
preserving
+            // the validity of continuation flags.
+            merge_events_list(events_list, prefix_size, sorted, other.sorted);
+            sorted = true;
+        }
+
+        event_count = event_count > 0 ? event_count : other.event_count;
+        window = window != WINDOW_UNSET ? window : other.window;
+        window_funnel_mode = window_funnel_mode == WindowFunnelMode::INVALID
+                                     ? other.window_funnel_mode
+                                     : window_funnel_mode;
+    }
+
+    void write(BufferWritable& out) const {
+        write_var_int(event_count, out);
+        write_var_int(window, out);
+        
write_var_int(static_cast<std::underlying_type_t<WindowFunnelMode>>(window_funnel_mode),
+                      out);
+        write_var_int(sorted ? 1 : 0, out);
+        write_var_int(cast_set<Int64>(events_list.size()), out);
+        for (const auto& evt : events_list) {
+            // Use fixed-size binary write for timestamp (8 bytes) and 
event_idx (1 byte).
+            // The event_idx byte includes the continuation flag in bit 7.
+            out.write(reinterpret_cast<const char*>(&evt.timestamp), 
sizeof(evt.timestamp));
+            out.write(reinterpret_cast<const char*>(&evt.event_idx), 
sizeof(evt.event_idx));
+        }
+    }
+
+    void read(BufferReadable& in) {
+        Int64 tmp = 0;
+        read_var_int(tmp, in);
+        event_count = cast_set<int>(tmp);
+
+        read_var_int(window, in);
+
+        read_var_int(tmp, in);
+        window_funnel_mode = static_cast<WindowFunnelMode>(tmp);
+
+        read_var_int(tmp, in);
+        sorted = (tmp != 0);
+
+        Int64 size = 0;
+        read_var_int(size, in);
+        events_list.clear();
+        events_list.resize(size);
+        for (Int64 i = 0; i < size; ++i) {
+            in.read(reinterpret_cast<char*>(&events_list[i].timestamp),
+                    sizeof(events_list[i].timestamp));
+            in.read(reinterpret_cast<char*>(&events_list[i].event_idx),
+                    sizeof(events_list[i].event_idx));
+        }
+    }
+
+    using DateValueType = DateV2Value<DateTimeV2ValueType>;
+
+    /// Reconstruct DateV2Value from packed UInt64.
+    static DateValueType _ts_from_int(UInt64 packed) { return 
DateValueType(packed); }
+
+    /// Check if `current_ts` is within `window` seconds of `base_ts`.
+    /// Both are packed UInt64 from DateV2Value::to_date_int_val().
+    bool _within_window(UInt64 base_ts, UInt64 current_ts) const {
+        DateValueType end_ts = _ts_from_int(base_ts);
+        TimeInterval interval(SECOND, window, false);
+        end_ts.template date_add_interval<SECOND>(interval);
+        return current_ts <= end_ts.to_date_int_val();
+    }
+
+    /// Track (first_timestamp, last_timestamp, last_event_list_idx) for each 
event level.
+    /// Uses packed UInt64 values; 0 means unset for first_ts.
+    /// last_list_idx tracks the position in events_list of the event that set 
this level,
+    /// used to check continuation flag on subsequent events to detect 
same-row advancement.
+    struct TimestampPair {
+        UInt64 first_ts = 0;
+        UInt64 last_ts = 0;
+        size_t last_list_idx = 0;
+        bool has_value() const { return first_ts != 0; }
+        void reset() {
+            first_ts = 0;
+            last_ts = 0;
+            last_list_idx = 0;
+        }
+    };
+
+    int get() const {
+        if (event_count == 0 || events_list.empty()) {
+            return 0;
+        }
+        if (window < 0) {
+            throw Exception(ErrorCode::INVALID_ARGUMENT,
+                            "the sliding time window must be a positive 
integer, but got: {}",
+                            window);
+        }
+
+        switch (window_funnel_mode) {
+        case WindowFunnelMode::DEFAULT:
+            return _get_default(false);
+        case WindowFunnelMode::INCREASE:
+            return _get_default(true);
+        case WindowFunnelMode::DEDUPLICATION:
+            return _get_deduplication();
+        case WindowFunnelMode::FIXED:
+            return _get_fixed();
+        default:
+            throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid window_funnel 
mode");
+        }
+    }
+
+private:
+    /// DEFAULT and INCREASE mode: O(N) single-pass algorithm.
+    /// Uses events_timestamp array to track the (first, last) timestamps for 
each level.
+    /// For each event in sorted order:
+    ///   - If it's event 0, start a new potential chain
+    ///   - If its predecessor level has been matched, within time window, AND 
from a
+    ///     different row (checked via continuation flag), extend the chain
+    int _get_default(bool increase_mode) const {
+        std::vector<TimestampPair> events_timestamp(event_count);
+
+        for (size_t i = 0; i < events_list.size(); ++i) {
+            const auto& evt = events_list[i];
+            int event_idx = get_event_idx(evt.event_idx) - 1;
+
+            if (event_idx == 0) {
+                events_timestamp[0] = {evt.timestamp, evt.timestamp, i};
+            } else if (events_timestamp[event_idx - 1].has_value() &&

Review Comment:
   **Critical Bug: INCREASE mode correctness**
   
   Unconditionally overwriting `events_timestamp[0]` here without clearing 
higher levels causes incorrect results in INCREASE mode.
   
   **Concrete counterexample:**
   - 3 funnel conditions, window=100, INCREASE mode
   - Sorted events: `(t=1, e=1), (t=50, e=1), (t=50, e=2), (t=60, e=3)`
   
   **V1 behavior:** Starts chain at t=1 → e2@t=50 (50>1 ✓) → e3@t=60 (60>50 ✓) 
→ returns **3**
   
   **V2 behavior:**
   1. e=1@t=1: sets `ts[0] = {1, 1, 0}`
   2. e=1@t=50: **overwrites** `ts[0] = {50, 50, 1}` (this line)
   3. e=2@t=50: checks `ts[0].last_ts(50) < 50` → **false** → not matched
   4. e=3@t=60: checks `ts[1].has_value()` → **false** → not matched
   5. Returns **1** (only level 0 matched)
   
   **Root cause:** When a new event-0 overwrites `events_timestamp[0]` with a 
later timestamp, any previously matched higher levels (e.g., 
`events_timestamp[1]`) still reference the old chain. The INCREASE check at 
line 310 then compares the new (later) `last_ts` against the current event's 
timestamp, which can fail even though a valid chain existed.
   
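   **Suggested fix:** Either (a) clear all higher-level entries when restarting 
from event-0, or (b) only overwrite `events_timestamp[0]` if it hasn't been set 
yet or if the new timestamp is earlier (keeping the earliest start to maximize 
chain opportunities), or (c) keep both candidates.
   
   Caveat on (a) and (b): (a) alone does not fix the counterexample above. After 
clearing, step 3 still compares `ts[0].last_ts(50) < 50`, which fails, so the 
result is still 1. Because events are processed in non-decreasing timestamp 
order, a later event-0 is never earlier, so (b) effectively always keeps the first 
start. That fixes this counterexample but breaks others. With window=100 and 
events `(t=1, e=1), (t=200, e=1), (t=250, e=2)`, the chain starting at t=200 
reaches level 2. Under (b), however, e=2@t=250 is checked against the start at 
t=1, which is outside the window, so the result is 1. Keeping multiple candidate 
starts, as in (c), is needed to handle both cases.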
   
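   Note: DEFAULT mode is **not** affected by this bug. Events are processed in 
non-decreasing timestamp order, so overwriting ts[0] with a later start only moves 
the window end (`start + window`) later for subsequent events. The only check 
DEFAULT mode applies is the `<=` window-end comparison in `_within_window`, which 
a later start can only make easier to satisfy. DEFAULT mode has no strict `<` 
ordering check that an event at the same timestamp as the new start could fail.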



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java:
##########
@@ -193,7 +194,8 @@ private BuiltinAggregateFunctions() {
                 agg(TopNWeighted.class, "topn_weighted"),
                 agg(Variance.class, "var_pop", "variance_pop", "variance"),
                 agg(VarianceSamp.class, "var_samp", "variance_samp"),
-                agg(WindowFunnel.class, "window_funnel")
+                agg(WindowFunnel.class, "window_funnel_v1"),
+                agg(WindowFunnelV2.class, "window_funnel_v2", "window_funnel")
         );

Review Comment:
   **Rolling upgrade concern:** By mapping `window_funnel` to `WindowFunnelV2` 
here, a newly upgraded FE will send `"window_funnel_v2"` to BE. During a 
rolling upgrade where BE is still on the old version, old BE won't recognize 
`"window_funnel_v2"` and queries using `window_funnel(...)` will fail.
   
   Doris's standard upgrade path is FE-first, so this is a real scenario. 
Consider either:
   1. Gating the V2 dispatch behind `BeExecVersionManager` so that old BEs get 
`window_funnel` (V1) until all BEs are upgraded
   2. Registering `"window_funnel"` as an alias for V2 on the BE side (so old 
BEs still handle it via V1, new BEs handle it via V2)
   3. Making the switch opt-in via a session variable initially



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to