This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 1217886d675 branch-4.1: [Improvement](function) support window funnel
v2 #61566 (#61935)
1217886d675 is described below
commit 1217886d67558683b14b37947fab7c3a11e2941a
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Apr 1 13:36:21 2026 +0800
branch-4.1: [Improvement](function) support window funnel v2 #61566 (#61935)
Cherry-picked from #61566
Co-authored-by: Pxl <[email protected]>
---
.../aggregate_function_simple_factory.cpp | 2 +
.../aggregate/aggregate_function_window_funnel.cpp | 3 +-
.../aggregate/aggregate_function_window_funnel.h | 2 +-
...cpp => aggregate_function_window_funnel_v2.cpp} | 28 +-
.../aggregate_function_window_funnel_v2.h | 613 ++++++++++
.../exprs/aggregate/vec_window_funnel_v2_test.cpp | 1167 ++++++++++++++++++++
.../doris/catalog/BuiltinAggregateFunctions.java | 4 +-
.../expressions/functions/agg/WindowFunnel.java | 2 +-
.../agg/{WindowFunnel.java => WindowFunnelV2.java} | 33 +-
.../visitor/AggregateFunctionVisitor.java | 5 +
.../apache/doris/nereids/types/AggStateType.java | 1 +
.../analysis/CheckExpressionLegalityTest.java | 27 +
.../data/nereids_p0/aggregate/window_funnel.out | 2 +-
.../data/nereids_p0/aggregate/window_funnel_v2.out | 88 ++
.../test_aggregate_window_functions.out | 10 +-
.../nereids_p0/aggregate/window_funnel_v2.groovy | 492 +++++++++
16 files changed, 2443 insertions(+), 36 deletions(-)
diff --git a/be/src/exprs/aggregate/aggregate_function_simple_factory.cpp
b/be/src/exprs/aggregate/aggregate_function_simple_factory.cpp
index c939a08dd08..d6d60250d0d 100644
--- a/be/src/exprs/aggregate/aggregate_function_simple_factory.cpp
+++ b/be/src/exprs/aggregate/aggregate_function_simple_factory.cpp
@@ -57,6 +57,7 @@ void
register_aggregate_function_percentile(AggregateFunctionSimpleFactory& fact
void
register_aggregate_function_percentile_old(AggregateFunctionSimpleFactory&
factory);
void register_aggregate_function_window_funnel(AggregateFunctionSimpleFactory&
factory);
void
register_aggregate_function_window_funnel_old(AggregateFunctionSimpleFactory&
factory);
+void
register_aggregate_function_window_funnel_v2(AggregateFunctionSimpleFactory&
factory);
void register_aggregate_function_regr_union(AggregateFunctionSimpleFactory&
factory);
void register_aggregate_function_retention(AggregateFunctionSimpleFactory&
factory);
void
register_aggregate_function_percentile_approx(AggregateFunctionSimpleFactory&
factory);
@@ -111,6 +112,7 @@ AggregateFunctionSimpleFactory&
AggregateFunctionSimpleFactory::instance() {
register_aggregate_function_percentile_approx(instance);
register_aggregate_function_window_funnel(instance);
register_aggregate_function_window_funnel_old(instance);
+ register_aggregate_function_window_funnel_v2(instance);
register_aggregate_function_regr_union(instance);
register_aggregate_function_retention(instance);
register_aggregate_function_orthogonal_bitmap(instance);
diff --git a/be/src/exprs/aggregate/aggregate_function_window_funnel.cpp
b/be/src/exprs/aggregate/aggregate_function_window_funnel.cpp
index 3ef25d7de11..fbd9fde900e 100644
--- a/be/src/exprs/aggregate/aggregate_function_window_funnel.cpp
+++ b/be/src/exprs/aggregate/aggregate_function_window_funnel.cpp
@@ -47,7 +47,8 @@ AggregateFunctionPtr
create_aggregate_function_window_funnel(const std::string&
}
void register_aggregate_function_window_funnel(AggregateFunctionSimpleFactory&
factory) {
- factory.register_function_both("window_funnel",
create_aggregate_function_window_funnel);
+ factory.register_function_both("window_funnel_v1",
create_aggregate_function_window_funnel);
+ factory.register_alias("window_funnel_v1", "window_funnel");
}
void
register_aggregate_function_window_funnel_old(AggregateFunctionSimpleFactory&
factory) {
BeExecVersionManager::registe_restrict_function_compatibility("window_funnel");
diff --git a/be/src/exprs/aggregate/aggregate_function_window_funnel.h
b/be/src/exprs/aggregate/aggregate_function_window_funnel.h
index 9908c8c8808..4e739474ad1 100644
--- a/be/src/exprs/aggregate/aggregate_function_window_funnel.h
+++ b/be/src/exprs/aggregate/aggregate_function_window_funnel.h
@@ -56,7 +56,7 @@ namespace doris {
enum class WindowFunnelMode : Int64 { INVALID, DEFAULT, DEDUPLICATION, FIXED,
INCREASE };
-WindowFunnelMode string_to_window_funnel_mode(const String& string) {
+inline WindowFunnelMode string_to_window_funnel_mode(const String& string) {
if (string == "default") {
return WindowFunnelMode::DEFAULT;
} else if (string == "deduplication") {
diff --git a/be/src/exprs/aggregate/aggregate_function_window_funnel.cpp
b/be/src/exprs/aggregate/aggregate_function_window_funnel_v2.cpp
similarity index 58%
copy from be/src/exprs/aggregate/aggregate_function_window_funnel.cpp
copy to be/src/exprs/aggregate/aggregate_function_window_funnel_v2.cpp
index 3ef25d7de11..8cb73675c34 100644
--- a/be/src/exprs/aggregate/aggregate_function_window_funnel.cpp
+++ b/be/src/exprs/aggregate/aggregate_function_window_funnel_v2.cpp
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-#include "exprs/aggregate/aggregate_function_window_funnel.h"
+#include "exprs/aggregate/aggregate_function_window_funnel_v2.h"
#include <string>
@@ -28,17 +28,19 @@
namespace doris {
#include "common/compile_check_begin.h"
-AggregateFunctionPtr create_aggregate_function_window_funnel(const
std::string& name,
- const DataTypes&
argument_types,
- const
DataTypePtr& result_type,
- const bool
result_is_nullable,
- const
AggregateFunctionAttr& attr) {
- if (argument_types.size() < 3) {
- LOG(WARNING) << "window_funnel's argument less than 3.";
+AggregateFunctionPtr create_aggregate_function_window_funnel_v2(const
std::string& name,
+ const
DataTypes& argument_types,
+ const
DataTypePtr& result_type,
+ const bool
result_is_nullable,
+ const
AggregateFunctionAttr& attr) {
+ if (argument_types.size() < 4) {
+ LOG(WARNING) << "window_funnel_v2 requires at least 4 arguments
(window, mode, timestamp, "
+ "and at least 1 boolean condition), but got "
+ << argument_types.size() << ".";
return nullptr;
}
if (argument_types[2]->get_primitive_type() == TYPE_DATETIMEV2) {
- return creator_without_type::create<AggregateFunctionWindowFunnel>(
+ return creator_without_type::create<AggregateFunctionWindowFunnelV2>(
argument_types, result_is_nullable, attr);
} else {
LOG(WARNING) << "Only support DateTime type as window argument!";
@@ -46,10 +48,8 @@ AggregateFunctionPtr
create_aggregate_function_window_funnel(const std::string&
}
}
-void register_aggregate_function_window_funnel(AggregateFunctionSimpleFactory&
factory) {
- factory.register_function_both("window_funnel",
create_aggregate_function_window_funnel);
-}
-void
register_aggregate_function_window_funnel_old(AggregateFunctionSimpleFactory&
factory) {
-
BeExecVersionManager::registe_restrict_function_compatibility("window_funnel");
+void
register_aggregate_function_window_funnel_v2(AggregateFunctionSimpleFactory&
factory) {
+ factory.register_function_both("window_funnel_v2",
create_aggregate_function_window_funnel_v2);
}
+
} // namespace doris
diff --git a/be/src/exprs/aggregate/aggregate_function_window_funnel_v2.h
b/be/src/exprs/aggregate/aggregate_function_window_funnel_v2.h
new file mode 100644
index 00000000000..8038c0bef47
--- /dev/null
+++ b/be/src/exprs/aggregate/aggregate_function_window_funnel_v2.h
@@ -0,0 +1,613 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <algorithm>
+#include <iterator>
+#include <utility>
+#include <vector>
+
+#include "common/cast_set.h"
+#include "common/exception.h"
+#include "common/status.h"
+#include "core/assert_cast.h"
+#include "core/column/column_string.h"
+#include "core/data_type/data_type_number.h"
+#include "core/types.h"
+#include "core/value/vdatetime_value.h"
+#include "exprs/aggregate/aggregate_function.h"
+#include "exprs/aggregate/aggregate_function_window_funnel.h" // for
WindowFunnelMode, string_to_window_funnel_mode
+#include "util/var_int.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+class Arena;
+class BufferReadable;
+class BufferWritable;
+class IColumn;
+} // namespace doris
+
+namespace doris {
+
+/// Merge two event lists, utilizing sorted flags to optimize.
+/// After merge, all events are in `events_list` and it is sorted.
+template <typename T>
+void merge_events_list(T& events_list, size_t prefix_size, bool prefix_sorted,
bool suffix_sorted) {
+ if (!prefix_sorted && !suffix_sorted) {
+ std::stable_sort(std::begin(events_list), std::end(events_list));
+ } else {
+ const auto begin = std::begin(events_list);
+ const auto middle = std::next(begin, prefix_size);
+ const auto end = std::end(events_list);
+
+ if (!prefix_sorted) {
+ std::stable_sort(begin, middle);
+ }
+ if (!suffix_sorted) {
+ std::stable_sort(middle, end);
+ }
+ std::inplace_merge(begin, middle, end);
+ }
+}
+
+/// V2 state: stores only matched events as (timestamp, event_index) pairs.
+/// Compared to V1 which stores all rows with N boolean columns, V2 only stores
+/// events that actually match at least one condition, dramatically reducing
memory.
+///
+/// To prevent same-row multi-condition chain advancement (where a single row
+/// matching multiple conditions could incorrectly advance the funnel through
+/// multiple levels), we use the high bit of event_idx as a "same-row
continuation"
+/// flag. When a row matches multiple conditions, the first event stored for
that
+/// row has bit 7 = 0, and subsequent events from the same row have bit 7 = 1.
+/// The algorithm uses this to ensure each funnel step comes from a different
row.
+///
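+/// This approach adds ZERO serialized-size overhead — each event is still
+/// written as 9 bytes (UInt64 + UInt8). The in-memory TimestampEvent may be
+/// larger because of alignment padding.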
+struct WindowFunnelStateV2 {
+ /// (timestamp_int_val, 1-based event_index with continuation flag in bit
7)
+ ///
+ /// Bit layout of event_idx:
+ /// bit 7 (0x80): continuation flag — 1 means this event is from the
same row
+ /// as the preceding event in events_list
+ /// bits 0-6: actual 1-based event index (supports up to 127
conditions)
+ ///
+ /// Sorted by timestamp only (via operator<). stable_sort preserves
insertion order
+ /// for equal timestamps, so same-row events remain consecutive after
sorting.
+ struct TimestampEvent {
+ UInt64 timestamp;
+ UInt8 event_idx; // includes continuation flag in bit 7
+
+ /// Sort by timestamp only. For same timestamp, stable_sort preserves
insertion
+ /// order, keeping same-row events consecutive.
+ bool operator<(const TimestampEvent& other) const { return timestamp <
other.timestamp; }
+ bool operator<=(const TimestampEvent& other) const { return timestamp
<= other.timestamp; }
+ };
+
+ static constexpr UInt8 CONTINUATION_FLAG = 0x80;
+ static constexpr UInt8 EVENT_IDX_MASK = 0x7F;
+ static constexpr int MAX_EVENT_CONDITIONS = EVENT_IDX_MASK;
+
+ static void validate_event_count(int count) {
+ if (count < 0 || count > MAX_EVENT_CONDITIONS) {
+ throw Exception(
+ ErrorCode::INVALID_ARGUMENT,
+ "window_funnel_v2 supports at most {} conditions because
event indexes are "
+ "encoded in 7 bits, but got {}",
+ MAX_EVENT_CONDITIONS, count);
+ }
+ }
+
+ /// Extract the actual 1-based event index (stripping continuation flag).
+ static int get_event_idx(UInt8 raw) { return (raw & EVENT_IDX_MASK); }
+ /// Check if this event is a continuation of the same row as the previous
event.
+ static bool is_continuation(UInt8 raw) { return (raw & CONTINUATION_FLAG)
!= 0; }
+
+ static constexpr int64_t WINDOW_UNSET = -1;
+
+ int event_count = 0;
+ int64_t window = WINDOW_UNSET;
+ WindowFunnelMode window_funnel_mode = WindowFunnelMode::INVALID;
+ bool sorted = true;
+ std::vector<TimestampEvent> events_list;
+
+ WindowFunnelStateV2() = default;
+ WindowFunnelStateV2(int arg_event_count) : event_count(arg_event_count) {}
+
+ void reset() {
+ events_list.clear();
+ sorted = true;
+ }
+
+ void add(const IColumn** arg_columns, ssize_t row_num, int64_t win,
WindowFunnelMode mode) {
+ window = win;
+ window_funnel_mode = mode;
+
+ // get_data() returns DateV2Value<DateTimeV2ValueType>; convert to
packed UInt64
+ auto timestamp = assert_cast<const
ColumnVector<TYPE_DATETIMEV2>&>(*arg_columns[2])
+ .get_data()[row_num]
+ .to_date_int_val();
+
+ // Iterate from last event to first (reverse order).
+ // Because stable_sort compares timestamps only and preserves insertion
+ // order, the events of this row stay in descending event_index order,
+ // which is important for correct matching when one row satisfies
+ // multiple conditions.
+ //
+ // The first event stored for this row has continuation=0;
+ // subsequent events from the same row have continuation=1 (bit 7 set).
+ bool first_match = true;
+ for (int i = event_count - 1; i >= 0; --i) {
+ auto event_val =
+ assert_cast<const ColumnUInt8&>(*arg_columns[3 +
i]).get_data()[row_num];
+ if (event_val) {
+ UInt8 packed_idx = cast_set<UInt8>(i + 1);
+ if (!first_match) {
+ packed_idx |= CONTINUATION_FLAG;
+ }
+ first_match = false;
+ TimestampEvent new_event {timestamp, packed_idx};
+ if (sorted && !events_list.empty()) {
+ sorted = events_list.back() <= new_event;
+ }
+ events_list.emplace_back(new_event);
+ }
+ }
+ }
+
+ void sort() {
+ if (!sorted) {
+ std::stable_sort(std::begin(events_list), std::end(events_list));
+ sorted = true;
+ }
+ }
+
+ void merge(const WindowFunnelStateV2& other) {
+ if (other.events_list.empty()) {
+ return;
+ }
+
+ if (events_list.empty()) {
+ events_list = other.events_list;
+ sorted = other.sorted;
+ } else {
+ const auto prefix_size = events_list.size();
+ events_list.insert(std::end(events_list),
std::begin(other.events_list),
+ std::end(other.events_list));
+ // Both stable_sort and inplace_merge preserve relative order of
equal elements.
+ // Since same-row events have the same timestamp (and thus compare
equal in
+ // the primary sort key), they remain consecutive after merge —
preserving
+ // the validity of continuation flags.
+ merge_events_list(events_list, prefix_size, sorted, other.sorted);
+ sorted = true;
+ }
+
+ event_count = event_count > 0 ? event_count : other.event_count;
+ window = window != WINDOW_UNSET ? window : other.window;
+ window_funnel_mode = window_funnel_mode == WindowFunnelMode::INVALID
+ ? other.window_funnel_mode
+ : window_funnel_mode;
+ }
+
+ void write(BufferWritable& out) const {
+ write_var_int(event_count, out);
+ write_var_int(window, out);
+
write_var_int(static_cast<std::underlying_type_t<WindowFunnelMode>>(window_funnel_mode),
+ out);
+ write_var_int(sorted ? 1 : 0, out);
+ write_var_int(cast_set<Int64>(events_list.size()), out);
+ for (const auto& evt : events_list) {
+ // Use fixed-size binary write for timestamp (8 bytes) and
event_idx (1 byte).
+ // The event_idx byte includes the continuation flag in bit 7.
+ out.write(reinterpret_cast<const char*>(&evt.timestamp),
sizeof(evt.timestamp));
+ out.write(reinterpret_cast<const char*>(&evt.event_idx),
sizeof(evt.event_idx));
+ }
+ }
+
+ void read(BufferReadable& in) {
+ Int64 tmp = 0;
+ read_var_int(tmp, in);
+ event_count = cast_set<int>(tmp);
+
+ read_var_int(window, in);
+
+ read_var_int(tmp, in);
+ window_funnel_mode = static_cast<WindowFunnelMode>(tmp);
+
+ read_var_int(tmp, in);
+ sorted = (tmp != 0);
+
+ Int64 size = 0;
+ read_var_int(size, in);
+ events_list.clear();
+ events_list.resize(size);
+ for (Int64 i = 0; i < size; ++i) {
+ in.read(reinterpret_cast<char*>(&events_list[i].timestamp),
+ sizeof(events_list[i].timestamp));
+ in.read(reinterpret_cast<char*>(&events_list[i].event_idx),
+ sizeof(events_list[i].event_idx));
+ }
+ }
+
+ using DateValueType = DateV2Value<DateTimeV2ValueType>;
+
+ /// Reconstruct DateV2Value from packed UInt64.
+ static DateValueType _ts_from_int(UInt64 packed) { return
DateValueType(packed); }
+
+ /// Check if `current_ts` is within `window` seconds of `base_ts`.
+ /// Both are packed UInt64 from DateV2Value::to_date_int_val().
+ bool _within_window(UInt64 base_ts, UInt64 current_ts) const {
+ DateValueType end_ts = _ts_from_int(base_ts);
+ TimeInterval interval(SECOND, window, false);
+ end_ts.template date_add_interval<SECOND>(interval);
+ return current_ts <= end_ts.to_date_int_val();
+ }
+
+ /// Track (first_timestamp, last_timestamp, last_event_list_idx) for each
event level.
+ /// Uses packed UInt64 values; 0 means unset for first_ts.
+ /// last_list_idx tracks the position in events_list of the event that set
this level,
+ /// used to check continuation flag on subsequent events to detect
same-row advancement.
+ struct TimestampPair {
+ UInt64 first_ts = 0;
+ UInt64 last_ts = 0;
+ size_t last_list_idx = 0;
+ bool has_value() const { return first_ts != 0; }
+ void reset() {
+ first_ts = 0;
+ last_ts = 0;
+ last_list_idx = 0;
+ }
+ };
+
+ int get() const {
+ if (event_count == 0 || events_list.empty()) {
+ return 0;
+ }
+ if (window < 0) {
+ throw Exception(ErrorCode::INVALID_ARGUMENT,
+ "the sliding time window must be a positive
integer, but got: {}",
+ window);
+ }
+
+ switch (window_funnel_mode) {
+ case WindowFunnelMode::DEFAULT:
+ return _get_default();
+ case WindowFunnelMode::INCREASE:
+ return _get_increase();
+ case WindowFunnelMode::DEDUPLICATION:
+ return _get_deduplication();
+ case WindowFunnelMode::FIXED:
+ return _get_fixed();
+ default:
+ throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid window_funnel
mode");
+ }
+ }
+
+private:
+ /// DEFAULT mode: O(N) single-pass algorithm.
+ /// Uses events_timestamp array to track the (first, last) timestamps for
each level.
+ /// For each event in sorted order:
+ /// - If it's event 0, start a new potential chain
+ /// - If its predecessor level has been matched, within time window, AND
from a
+ /// different row (checked via continuation flag), extend the chain
+ ///
+ /// In DEFAULT mode, unconditionally overwriting events_timestamp[0] when
a new event-0
+ /// appears is safe: timestamps are monotonically non-decreasing, higher
levels retain
+ /// the old chain's first_ts, and the <= window check still succeeds.
+ int _get_default() const {
+ std::vector<TimestampPair> events_timestamp(event_count);
+
+ for (size_t i = 0; i < events_list.size(); ++i) {
+ const auto& evt = events_list[i];
+ int event_idx = get_event_idx(evt.event_idx) - 1;
+
+ if (event_idx == 0) {
+ events_timestamp[0] = {evt.timestamp, evt.timestamp, i};
+ } else if (events_timestamp[event_idx - 1].has_value() &&
+ !_is_same_row(events_timestamp[event_idx -
1].last_list_idx, i)) {
+ // Must be from a DIFFERENT row than the predecessor level
+ if (_within_window(events_timestamp[event_idx - 1].first_ts,
evt.timestamp)) {
+ events_timestamp[event_idx] = {events_timestamp[event_idx
- 1].first_ts,
+ evt.timestamp, i};
+ if (event_idx + 1 == event_count) {
+ return event_count;
+ }
+ }
+ }
+ }
+
+ for (int event = event_count; event > 0; --event) {
+ if (events_timestamp[event - 1].has_value()) {
+ return event;
+ }
+ }
+ return 0;
+ }
+
+ /// INCREASE mode: multi-pass algorithm matching V1 semantics.
+ ///
+ /// A single-pass approach cannot correctly handle INCREASE mode because
when a new
+ /// event-0 appears, the old chain and the new chain have different
trade-offs:
+ /// - The old chain has an earlier last_ts (better for the strict-increase
check)
+ /// - The new chain has a later first_ts (better for the time-window check)
+ /// Neither dominates the other, so both must be tried independently.
+ ///
+ /// This method iterates over each event-0 occurrence as a potential chain
start,
+ /// then scans forward to build the longest matching chain from that start.
+ /// The maximum chain length across all starts is returned.
+ ///
+ /// Complexity: O(M_event0 × N_matched) worst-case, where M_event0 is the
count of
+ /// event-0 occurrences and N_matched is the total matched-event count. In
practice
+ /// N_matched is much smaller than total rows (V2 only stores matched
events), and
+ /// the remaining-events pruning eliminates start points that cannot
improve max_level,
+ /// so the typical case is significantly better than worst-case.
+ int _get_increase() const {
+ int max_level = 0;
+ const size_t list_size = events_list.size();
+
+ for (size_t start = 0; start < list_size; ++start) {
+ // Remaining-events pruning: from this start point, at most
+ // (list_size - start) events remain. If that can't beat
max_level, stop.
+ // This also prunes all subsequent start points since they have
even fewer.
+ if (static_cast<int>(list_size - start) <= max_level) {
+ break;
+ }
+
+ int start_event = get_event_idx(events_list[start].event_idx) - 1;
+ if (start_event != 0) {
+ continue;
+ }
+
+ // Try building a chain from this event-0
+ std::vector<TimestampPair> events_timestamp(event_count);
+ events_timestamp[0] = {events_list[start].timestamp,
events_list[start].timestamp,
+ start};
+ int curr_level = 0;
+
+ for (size_t i = start + 1; i < list_size; ++i) {
+ const auto& evt = events_list[i];
+ int event_idx = get_event_idx(evt.event_idx) - 1;
+
+ if (event_idx == 0) {
+ // Another event-0: this chain's event-0 phase is done;
skip
+ continue;
+ }
+
+ if (events_timestamp[event_idx - 1].has_value() &&
+ !_is_same_row(events_timestamp[event_idx -
1].last_list_idx, i)) {
+ bool matched =
+ _within_window(events_timestamp[event_idx -
1].first_ts, evt.timestamp);
+ matched = matched && events_timestamp[event_idx -
1].last_ts < evt.timestamp;
+ if (matched) {
+ events_timestamp[event_idx] =
{events_timestamp[event_idx - 1].first_ts,
+ evt.timestamp, i};
+ if (event_idx > curr_level) {
+ curr_level = event_idx;
+ }
+ if (event_idx + 1 == event_count) {
+ return event_count;
+ }
+ }
+ }
+ }
+
+ if (curr_level + 1 > max_level) {
+ max_level = curr_level + 1;
+ }
+ }
+ return max_level;
+ }
+
+ /// DEDUPLICATION mode: if a previously matched event level appears again,
+ /// the current chain is terminated and max_level is updated.
+ /// This preserves V1 semantics where duplicate events break the chain.
+ int _get_deduplication() const {
+ std::vector<TimestampPair> events_timestamp(event_count);
+ int max_level = -1;
+ int curr_level = -1;
+
+ for (size_t i = 0; i < events_list.size(); ++i) {
+ const auto& evt = events_list[i];
+ int event_idx = get_event_idx(evt.event_idx) - 1;
+
+ if (event_idx == 0) {
+ // Duplicate of event 0: terminate current chain first
+ if (events_timestamp[0].has_value()) {
+ if (curr_level > max_level) {
+ max_level = curr_level;
+ }
+ _eliminate_chain(curr_level, events_timestamp);
+ }
+ // Start a new chain
+ events_timestamp[0] = {evt.timestamp, evt.timestamp, i};
+ curr_level = 0;
+ } else if (events_timestamp[event_idx].has_value()) {
+ // Duplicate event detected: this level was already matched
+ if (curr_level > max_level) {
+ max_level = curr_level;
+ }
+ // Eliminate current chain
+ _eliminate_chain(curr_level, events_timestamp);
+ } else if (events_timestamp[event_idx - 1].has_value() &&
+ !_is_same_row(events_timestamp[event_idx -
1].last_list_idx, i)) {
+ // Must be from a DIFFERENT row than the predecessor level
+ if (_promote_level(events_timestamp, evt.timestamp, i,
event_idx, curr_level,
+ false)) {
+ return event_count;
+ }
+ }
+ }
+
+ if (curr_level > max_level) {
+ return curr_level + 1;
+ }
+ return max_level + 1;
+ }
+
+ /// FIXED mode (StarRocks-style semantics): if a matched event appears
whose
+ /// predecessor level has NOT been matched, the chain is broken (event
level jumped).
+ /// Note: V2 semantics differ from V1. V1 checks physical row adjacency;
+ /// V2 checks event level continuity (unmatched rows don't break the
chain).
+ int _get_fixed() const {
+ std::vector<TimestampPair> events_timestamp(event_count);
+ int max_level = -1;
+ int curr_level = -1;
+ bool first_event = false;
+
+ for (size_t i = 0; i < events_list.size(); ++i) {
+ const auto& evt = events_list[i];
+ int event_idx = get_event_idx(evt.event_idx) - 1;
+
+ if (event_idx == 0) {
+ // Save current chain before starting a new one
+ if (events_timestamp[0].has_value()) {
+ if (curr_level > max_level) {
+ max_level = curr_level;
+ }
+ _eliminate_chain(curr_level, events_timestamp);
+ }
+ events_timestamp[0] = {evt.timestamp, evt.timestamp, i};
+ curr_level = 0;
+ first_event = true;
+ } else if (first_event && !events_timestamp[event_idx -
1].has_value()) {
+ // Event level jumped: predecessor was not matched
+ if (curr_level >= 0) {
+ if (curr_level > max_level) {
+ max_level = curr_level;
+ }
+ _eliminate_chain(curr_level, events_timestamp);
+ }
+ } else if (events_timestamp[event_idx - 1].has_value() &&
+ !_is_same_row(events_timestamp[event_idx -
1].last_list_idx, i)) {
+ // Must be from a DIFFERENT row than the predecessor level
+ if (_promote_level(events_timestamp, evt.timestamp, i,
event_idx, curr_level,
+ false)) {
+ return event_count;
+ }
+ }
+ }
+
+ if (curr_level > max_level) {
+ return curr_level + 1;
+ }
+ return max_level + 1;
+ }
+
+ /// Clear the current event chain back to the beginning.
+ static void _eliminate_chain(int& curr_level, std::vector<TimestampPair>&
events_timestamp) {
+ for (; curr_level >= 0; --curr_level) {
+ events_timestamp[curr_level].reset();
+ }
+ }
+
+ /// Check if the event at `current_idx` in events_list is from the same
+ /// original row as the event at `prev_idx`. Same-row events are
+ /// consecutive in the sorted list with continuation flags set. We scan
+ /// forward from prev_idx+1 through current_idx (inclusive), checking
+ /// that every event in that range has the continuation flag set.
+ bool _is_same_row(size_t prev_idx, size_t current_idx) const {
+ if (current_idx <= prev_idx) {
+ return false;
+ }
+ // Check that all events from prev_idx+1 to current_idx have the
continuation flag.
+ // If any of them doesn't, there's a row boundary in between.
+ for (size_t j = prev_idx + 1; j <= current_idx; ++j) {
+ if (!is_continuation(events_list[j].event_idx)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /// Try to promote the chain to the next level.
+ /// Returns true if we've matched all events (early termination).
+ bool _promote_level(std::vector<TimestampPair>& events_timestamp, UInt64
ts, size_t list_idx,
+ int event_idx, int& curr_level, bool increase_mode)
const {
+ bool matched = _within_window(events_timestamp[event_idx -
1].first_ts, ts);
+ if (increase_mode) {
+ matched = matched && events_timestamp[event_idx - 1].last_ts < ts;
+ }
+ if (matched) {
+ events_timestamp[event_idx] = {events_timestamp[event_idx -
1].first_ts, ts, list_idx};
+ if (event_idx > curr_level) {
+ curr_level = event_idx;
+ }
+ if (event_idx + 1 == event_count) {
+ return true;
+ }
+ }
+ return false;
+ }
+};
+
+class AggregateFunctionWindowFunnelV2
+ : public IAggregateFunctionDataHelper<WindowFunnelStateV2,
AggregateFunctionWindowFunnelV2>,
+ MultiExpression,
+ NullableAggregateFunction {
+public:
+ AggregateFunctionWindowFunnelV2(const DataTypes& argument_types_)
+ : IAggregateFunctionDataHelper<WindowFunnelStateV2,
AggregateFunctionWindowFunnelV2>(
+ argument_types_) {
+ WindowFunnelStateV2::validate_event_count(
+ cast_set<int>(IAggregateFunction::get_argument_types().size()
- 3));
+ }
+
+ void create(AggregateDataPtr __restrict place) const override {
+ new (place) WindowFunnelStateV2(
+ cast_set<int>(IAggregateFunction::get_argument_types().size()
- 3));
+ }
+
+ String get_name() const override { return "window_funnel_v2"; }
+
+ DataTypePtr get_return_type() const override { return
std::make_shared<DataTypeInt32>(); }
+
+ void reset(AggregateDataPtr __restrict place) const override {
this->data(place).reset(); }
+
+ void add(AggregateDataPtr __restrict place, const IColumn** columns,
ssize_t row_num,
+ Arena&) const override {
+ const auto& win = assert_cast<const
ColumnInt64&>(*columns[0]).get_data()[row_num];
+ StringRef mode = columns[1]->get_data_at(row_num);
+ this->data(place).add(columns, row_num, win,
+ string_to_window_funnel_mode(mode.to_string()));
+ }
+
+ void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
+ Arena&) const override {
+ this->data(place).merge(this->data(rhs));
+ }
+
+ void serialize(ConstAggregateDataPtr __restrict place, BufferWritable&
buf) const override {
+ this->data(place).write(buf);
+ }
+
+ void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
+ Arena&) const override {
+ this->data(place).read(buf);
+ }
+
+ void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn&
to) const override {
+ this->data(const_cast<AggregateDataPtr>(place)).sort();
+ assert_cast<ColumnInt32&>(to).get_data().push_back(
+ IAggregateFunctionDataHelper<WindowFunnelStateV2,
+
AggregateFunctionWindowFunnelV2>::data(place)
+ .get());
+ }
+
+protected:
+ using IAggregateFunction::version;
+};
+
+} // namespace doris
+
+#include "common/compile_check_end.h"
diff --git a/be/test/exprs/aggregate/vec_window_funnel_v2_test.cpp
b/be/test/exprs/aggregate/vec_window_funnel_v2_test.cpp
new file mode 100644
index 00000000000..6e82d430513
--- /dev/null
+++ b/be/test/exprs/aggregate/vec_window_funnel_v2_test.cpp
@@ -0,0 +1,1167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <glog/logging.h>
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+#include <stddef.h>
+
+#include <memory>
+#include <ostream>
+
+#include "core/column/column_string.h"
+#include "core/column/column_vector.h"
+#include "core/data_type/data_type_date_or_datetime_v2.h"
+#include "core/data_type/data_type_number.h"
+#include "core/data_type/data_type_string.h"
+#include "core/string_buffer.hpp"
+#include "core/value/vdatetime_value.h"
+#include "exprs/aggregate/aggregate_function.h"
+#include "exprs/aggregate/aggregate_function_simple_factory.h"
+#include "gtest/gtest_pred_impl.h"
+
+namespace doris {
+class IColumn;
+} // namespace doris
+
+namespace doris {
+
+// Registration entry point for the window_funnel_v2 aggregate function; only declared
+// here, the definition lives outside this test file.
+void register_aggregate_function_window_funnel_v2(AggregateFunctionSimpleFactory& factory);
+
+// Test fixture that resolves the four-condition window_funnel_v2 aggregate function
+// before every test and provides an Arena for the add/merge/deserialize calls.
+class VWindowFunnelV2Test : public testing::Test {
+public:
+    // Function under test; set in SetUp().
+    AggregateFunctionPtr agg_function;
+
+    VWindowFunnelV2Test() = default;
+
+    // Looks up window_funnel_v2 with the signature
+    // (Int64 window, String mode, DateTimeV2 timestamp, 4 x UInt8 condition).
+    void SetUp() override {
+        DataTypes data_types = {
+                std::make_shared<DataTypeInt64>(),      std::make_shared<DataTypeString>(),
+                std::make_shared<DataTypeDateTimeV2>(), std::make_shared<DataTypeUInt8>(),
+                std::make_shared<DataTypeUInt8>(),      std::make_shared<DataTypeUInt8>(),
+                std::make_shared<DataTypeUInt8>()};
+        // Query the singleton directly instead of copying the whole factory object
+        // into a local just to perform one lookup.
+        agg_function = AggregateFunctionSimpleFactory::instance().get(
+                "window_funnel_v2", data_types, nullptr, false,
+                BeExecVersionManager::get_newest_version());
+        // Fatal on purpose: every test dereferences agg_function, so continuing with
+        // a null pointer would crash instead of reporting a clean failure.
+        ASSERT_NE(agg_function, nullptr);
+    }
+
+    void TearDown() override {}
+
+    Arena arena;
+};
+
+// An empty state is serialized and deserialized back into itself, then merged with a
+// second empty state; both states must report funnel level 0.
+TEST_F(VWindowFunnelV2Test, testEmpty) {
+    const size_t state_size = agg_function->size_of_data();
+
+    std::unique_ptr<char[]> lhs_storage(new char[state_size]);
+    AggregateDataPtr lhs = lhs_storage.get();
+    agg_function->create(lhs);
+
+    // Round-trip the empty state through its serialized form.
+    ColumnString serialized;
+    VectorBufferWriter writer(serialized);
+    agg_function->serialize(lhs, writer);
+    writer.commit();
+    LOG(INFO) << "buf size : " << serialized.size();
+    VectorBufferReader reader(serialized.get_data_at(0));
+    agg_function->deserialize(lhs, reader, arena);
+
+    std::unique_ptr<char[]> rhs_storage(new char[state_size]);
+    AggregateDataPtr rhs = rhs_storage.get();
+    agg_function->create(rhs);
+
+    agg_function->merge(lhs, rhs, arena);
+
+    ColumnInt32 lhs_result;
+    agg_function->insert_result_into(lhs, lhs_result);
+    EXPECT_EQ(lhs_result.get_data()[0], 0);
+
+    ColumnInt32 rhs_result;
+    agg_function->insert_result_into(rhs, rhs_result);
+    EXPECT_EQ(rhs_result.get_data()[0], 0);
+
+    agg_function->destroy(lhs);
+    agg_function->destroy(rhs);
+}
+
+// Four rows one second apart, row k matching only condition k+1, window = 2 in
+// 'default' mode. The result must be 3 both before serialization and after the
+// serialized state is read back into a fresh state.
+TEST_F(VWindowFunnelV2Test, testSerialize) {
+    constexpr int kRows = 4;
+
+    // Builds a UInt8 condition column that is 1 only at `hot_row`.
+    auto one_hot = [](int rows, int hot_row) {
+        auto flags = ColumnUInt8::create();
+        for (int row = 0; row < rows; ++row) {
+            flags->insert(row == hot_row ? Field::create_field<TYPE_BOOLEAN>(1)
+                                         : Field::create_field<TYPE_BOOLEAN>(0));
+        }
+        return flags;
+    };
+
+    auto modes = ColumnString::create();
+    auto timestamps = ColumnDateTimeV2::create();
+    auto windows = ColumnInt64::create();
+    for (int row = 0; row < kRows; ++row) {
+        modes->insert(Field::create_field<TYPE_STRING>("default"));
+
+        VecDateTimeValue ts;
+        ts.unchecked_set_time(2022, 2, 28, 0, 0, row);
+        auto ts_v2 = ts.to_datetime_v2();
+        timestamps->insert_data(reinterpret_cast<const char*>(&ts_v2), 0);
+
+        windows->insert(Field::create_field<TYPE_BIGINT>(2));
+    }
+    auto cond1 = one_hot(kRows, 0);
+    auto cond2 = one_hot(kRows, 1);
+    auto cond3 = one_hot(kRows, 2);
+    auto cond4 = one_hot(kRows, 3);
+
+    std::unique_ptr<char[]> src_storage(new char[agg_function->size_of_data()]);
+    AggregateDataPtr src = src_storage.get();
+    agg_function->create(src);
+    const IColumn* columns[7] = {windows.get(), modes.get(), timestamps.get(), cond1.get(),
+                                 cond2.get(),   cond3.get(), cond4.get()};
+    for (int row = 0; row < kRows; ++row) {
+        agg_function->add(src, columns, row, arena);
+    }
+
+    ColumnInt32 direct_result;
+    agg_function->insert_result_into(src, direct_result);
+    EXPECT_EQ(direct_result.get_data()[0], 3);
+
+    // Serialize the populated state, then drop it.
+    ColumnString serialized;
+    VectorBufferWriter writer(serialized);
+    agg_function->serialize(src, writer);
+    writer.commit();
+    agg_function->destroy(src);
+
+    // A fresh state restored from the bytes must yield the same answer.
+    std::unique_ptr<char[]> dst_storage(new char[agg_function->size_of_data()]);
+    AggregateDataPtr dst = dst_storage.get();
+    agg_function->create(dst);
+
+    VectorBufferReader reader(serialized.get_data_at(0));
+    agg_function->deserialize(dst, reader, arena);
+
+    ColumnInt32 restored_result;
+    agg_function->insert_result_into(dst, restored_result);
+    EXPECT_EQ(restored_result.get_data()[0], 3);
+    agg_function->destroy(dst);
+}
+
+// 'default' mode, rows fed in ascending timestamp order (one second apart), row k
+// matching only condition k+1. For every window size in [0, NUM_CONDS] the expected
+// level is win + 1, capped at NUM_CONDS.
+TEST_F(VWindowFunnelV2Test, testDefaultSortedNoMerge) {
+    const int NUM_CONDS = 4;
+
+    // Builds a UInt8 condition column that is 1 only at `hot_row`.
+    auto one_hot = [](int rows, int hot_row) {
+        auto flags = ColumnUInt8::create();
+        for (int row = 0; row < rows; ++row) {
+            flags->insert(row == hot_row ? Field::create_field<TYPE_BOOLEAN>(1)
+                                         : Field::create_field<TYPE_BOOLEAN>(0));
+        }
+        return flags;
+    };
+
+    auto column_mode = ColumnString::create();
+    auto column_timestamp = ColumnDateTimeV2::create();
+    for (int i = 0; i < NUM_CONDS; i++) {
+        column_mode->insert(Field::create_field<TYPE_STRING>("default"));
+
+        VecDateTimeValue time_value;
+        time_value.unchecked_set_time(2022, 2, 28, 0, 0, i);
+        auto dtv2 = time_value.to_datetime_v2();
+        column_timestamp->insert_data(reinterpret_cast<const char*>(&dtv2), 0);
+    }
+    auto column_event1 = one_hot(NUM_CONDS, 0);
+    auto column_event2 = one_hot(NUM_CONDS, 1);
+    auto column_event3 = one_hot(NUM_CONDS, 2);
+    auto column_event4 = one_hot(NUM_CONDS, 3);
+
+    for (int win = 0; win < NUM_CONDS + 1; win++) {
+        auto column_window = ColumnInt64::create();
+        for (int i = 0; i < NUM_CONDS; i++) {
+            column_window->insert(Field::create_field<TYPE_BIGINT>(win));
+        }
+
+        std::unique_ptr<char[]> memory(new char[agg_function->size_of_data()]);
+        AggregateDataPtr place = memory.get();
+        agg_function->create(place);
+        const IColumn* column[7] = {column_window.get(),    column_mode.get(),
+                                    column_timestamp.get(), column_event1.get(),
+                                    column_event2.get(),    column_event3.get(),
+                                    column_event4.get()};
+        for (int i = 0; i < NUM_CONDS; i++) {
+            agg_function->add(place, column, i, arena);
+        }
+
+        ColumnInt32 column_result;
+        agg_function->insert_result_into(place, column_result);
+        // win is never negative in this loop, so only the cap at NUM_CONDS is needed.
+        EXPECT_EQ(column_result.get_data()[0], win < NUM_CONDS ? win + 1 : NUM_CONDS);
+        agg_function->destroy(place);
+    }
+}
+
+// Same data as the sorted no-merge case ('default' mode, ascending timestamps, row k
+// matching only condition k+1), but the populated state is merged into an empty
+// state and the result is read from the merge target.
+TEST_F(VWindowFunnelV2Test, testDefaultSortedMerge) {
+    const int NUM_CONDS = 4;
+
+    // Builds a UInt8 condition column that is 1 only at `hot_row`.
+    auto one_hot = [](int rows, int hot_row) {
+        auto flags = ColumnUInt8::create();
+        for (int row = 0; row < rows; ++row) {
+            flags->insert(row == hot_row ? Field::create_field<TYPE_BOOLEAN>(1)
+                                         : Field::create_field<TYPE_BOOLEAN>(0));
+        }
+        return flags;
+    };
+
+    auto column_mode = ColumnString::create();
+    auto column_timestamp = ColumnDateTimeV2::create();
+    for (int i = 0; i < NUM_CONDS; i++) {
+        column_mode->insert(Field::create_field<TYPE_STRING>("default"));
+
+        VecDateTimeValue time_value;
+        time_value.unchecked_set_time(2022, 2, 28, 0, 0, i);
+        auto dtv2 = time_value.to_datetime_v2();
+        column_timestamp->insert_data(reinterpret_cast<const char*>(&dtv2), 0);
+    }
+    auto column_event1 = one_hot(NUM_CONDS, 0);
+    auto column_event2 = one_hot(NUM_CONDS, 1);
+    auto column_event3 = one_hot(NUM_CONDS, 2);
+    auto column_event4 = one_hot(NUM_CONDS, 3);
+
+    for (int win = 0; win < NUM_CONDS + 1; win++) {
+        auto column_window = ColumnInt64::create();
+        for (int i = 0; i < NUM_CONDS; i++) {
+            column_window->insert(Field::create_field<TYPE_BIGINT>(win));
+        }
+
+        std::unique_ptr<char[]> memory(new char[agg_function->size_of_data()]);
+        AggregateDataPtr place = memory.get();
+        agg_function->create(place);
+        const IColumn* column[7] = {column_window.get(),    column_mode.get(),
+                                    column_timestamp.get(), column_event1.get(),
+                                    column_event2.get(),    column_event3.get(),
+                                    column_event4.get()};
+        for (int i = 0; i < NUM_CONDS; i++) {
+            agg_function->add(place, column, i, arena);
+        }
+
+        // Merge the populated state into a freshly created, empty one.
+        std::unique_ptr<char[]> memory2(new char[agg_function->size_of_data()]);
+        AggregateDataPtr place2 = memory2.get();
+        agg_function->create(place2);
+        agg_function->merge(place2, place, arena);
+
+        ColumnInt32 column_result;
+        agg_function->insert_result_into(place2, column_result);
+        // win is never negative in this loop, so only the cap at NUM_CONDS is needed.
+        EXPECT_EQ(column_result.get_data()[0], win < NUM_CONDS ? win + 1 : NUM_CONDS);
+        agg_function->destroy(place);
+        agg_function->destroy(place2);
+    }
+}
+
+// 'default' mode with rows fed in descending timestamp order: row i carries second
+// NUM_CONDS - i and matches only condition NUM_CONDS - i. The expected level is the
+// same as for sorted input: win + 1, capped at NUM_CONDS.
+TEST_F(VWindowFunnelV2Test, testDefaultReverseSortedNoMerge) {
+    const int NUM_CONDS = 4;
+
+    // Builds a UInt8 condition column that is 1 only at `hot_row`.
+    auto one_hot = [](int rows, int hot_row) {
+        auto flags = ColumnUInt8::create();
+        for (int row = 0; row < rows; ++row) {
+            flags->insert(row == hot_row ? Field::create_field<TYPE_BOOLEAN>(1)
+                                         : Field::create_field<TYPE_BOOLEAN>(0));
+        }
+        return flags;
+    };
+
+    auto column_mode = ColumnString::create();
+    auto column_timestamp = ColumnDateTimeV2::create();
+    for (int i = 0; i < NUM_CONDS; i++) {
+        column_mode->insert(Field::create_field<TYPE_STRING>("default"));
+
+        VecDateTimeValue time_value;
+        time_value.unchecked_set_time(2022, 2, 28, 0, 0, NUM_CONDS - i);
+        auto dtv2 = time_value.to_datetime_v2();
+        column_timestamp->insert_data(reinterpret_cast<const char*>(&dtv2), 0);
+    }
+    // The earliest event sits in the last row, so the hot rows run backwards.
+    auto column_event1 = one_hot(NUM_CONDS, 3);
+    auto column_event2 = one_hot(NUM_CONDS, 2);
+    auto column_event3 = one_hot(NUM_CONDS, 1);
+    auto column_event4 = one_hot(NUM_CONDS, 0);
+
+    for (int win = 0; win < NUM_CONDS + 1; win++) {
+        auto column_window = ColumnInt64::create();
+        for (int i = 0; i < NUM_CONDS; i++) {
+            column_window->insert(Field::create_field<TYPE_BIGINT>(win));
+        }
+
+        std::unique_ptr<char[]> memory(new char[agg_function->size_of_data()]);
+        AggregateDataPtr place = memory.get();
+        agg_function->create(place);
+        const IColumn* column[7] = {column_window.get(),    column_mode.get(),
+                                    column_timestamp.get(), column_event1.get(),
+                                    column_event2.get(),    column_event3.get(),
+                                    column_event4.get()};
+        for (int i = 0; i < NUM_CONDS; i++) {
+            agg_function->add(place, column, i, arena);
+        }
+
+        LOG(INFO) << "win " << win;
+        ColumnInt32 column_result;
+        agg_function->insert_result_into(place, column_result);
+        // win is never negative in this loop, so only the cap at NUM_CONDS is needed.
+        EXPECT_EQ(column_result.get_data()[0], win < NUM_CONDS ? win + 1 : NUM_CONDS);
+        agg_function->destroy(place);
+    }
+}
+
+// Same descending-timestamp data as the reverse-sorted no-merge case, but the
+// populated state is merged into an empty state and the result is read from the
+// merge target.
+TEST_F(VWindowFunnelV2Test, testDefaultReverseSortedMerge) {
+    const int NUM_CONDS = 4;
+
+    // Builds a UInt8 condition column that is 1 only at `hot_row`.
+    auto one_hot = [](int rows, int hot_row) {
+        auto flags = ColumnUInt8::create();
+        for (int row = 0; row < rows; ++row) {
+            flags->insert(row == hot_row ? Field::create_field<TYPE_BOOLEAN>(1)
+                                         : Field::create_field<TYPE_BOOLEAN>(0));
+        }
+        return flags;
+    };
+
+    auto column_mode = ColumnString::create();
+    auto column_timestamp = ColumnDateTimeV2::create();
+    for (int i = 0; i < NUM_CONDS; i++) {
+        column_mode->insert(Field::create_field<TYPE_STRING>("default"));
+
+        VecDateTimeValue time_value;
+        time_value.unchecked_set_time(2022, 2, 28, 0, 0, NUM_CONDS - i);
+        auto dtv2 = time_value.to_datetime_v2();
+        column_timestamp->insert_data(reinterpret_cast<const char*>(&dtv2), 0);
+    }
+    // The earliest event sits in the last row, so the hot rows run backwards.
+    auto column_event1 = one_hot(NUM_CONDS, 3);
+    auto column_event2 = one_hot(NUM_CONDS, 2);
+    auto column_event3 = one_hot(NUM_CONDS, 1);
+    auto column_event4 = one_hot(NUM_CONDS, 0);
+
+    for (int win = 0; win < NUM_CONDS + 1; win++) {
+        auto column_window = ColumnInt64::create();
+        for (int i = 0; i < NUM_CONDS; i++) {
+            column_window->insert(Field::create_field<TYPE_BIGINT>(win));
+        }
+
+        std::unique_ptr<char[]> memory(new char[agg_function->size_of_data()]);
+        AggregateDataPtr place = memory.get();
+        agg_function->create(place);
+        const IColumn* column[7] = {column_window.get(),    column_mode.get(),
+                                    column_timestamp.get(), column_event1.get(),
+                                    column_event2.get(),    column_event3.get(),
+                                    column_event4.get()};
+        for (int i = 0; i < NUM_CONDS; i++) {
+            agg_function->add(place, column, i, arena);
+        }
+
+        // Merge the populated state into a freshly created, empty one.
+        std::unique_ptr<char[]> memory2(new char[agg_function->size_of_data()]);
+        AggregateDataPtr place2 = memory2.get();
+        agg_function->create(place2);
+        agg_function->merge(place2, place, arena);
+
+        ColumnInt32 column_result;
+        agg_function->insert_result_into(place2, column_result);
+        // win is never negative in this loop, so only the cap at NUM_CONDS is needed.
+        EXPECT_EQ(column_result.get_data()[0], win < NUM_CONDS ? win + 1 : NUM_CONDS);
+        agg_function->destroy(place);
+        agg_function->destroy(place2);
+    }
+}
+
+// Rows that match no condition must not affect the funnel result.
+// Six rows one second apart, window = 10, 'default' mode:
+//   row 0: event1, row 1: event2, row 2: no match,
+//   row 3: event3, row 4: no match, row 5: event4.
+// Note: this asserts only the resulting level (4); it does not inspect what the
+// state actually stores.
+TEST_F(VWindowFunnelV2Test, testOnlyMatchedEventsStored) {
+    constexpr int kRows = 6;
+
+    // Builds a UInt8 condition column that is 1 only at `hot_row`.
+    auto one_hot = [](int rows, int hot_row) {
+        auto flags = ColumnUInt8::create();
+        for (int row = 0; row < rows; ++row) {
+            flags->insert(row == hot_row ? Field::create_field<TYPE_BOOLEAN>(1)
+                                         : Field::create_field<TYPE_BOOLEAN>(0));
+        }
+        return flags;
+    };
+
+    auto modes = ColumnString::create();
+    auto timestamps = ColumnDateTimeV2::create();
+    auto windows = ColumnInt64::create();
+    for (int row = 0; row < kRows; ++row) {
+        modes->insert(Field::create_field<TYPE_STRING>("default"));
+
+        VecDateTimeValue ts;
+        ts.unchecked_set_time(2022, 2, 28, 0, 0, row);
+        auto ts_v2 = ts.to_datetime_v2();
+        timestamps->insert_data(reinterpret_cast<const char*>(&ts_v2), 0);
+
+        windows->insert(Field::create_field<TYPE_BIGINT>(10));
+    }
+    auto cond1 = one_hot(kRows, 0);
+    auto cond2 = one_hot(kRows, 1);
+    auto cond3 = one_hot(kRows, 3);
+    auto cond4 = one_hot(kRows, 5);
+
+    std::unique_ptr<char[]> storage(new char[agg_function->size_of_data()]);
+    AggregateDataPtr state = storage.get();
+    agg_function->create(state);
+    const IColumn* columns[7] = {windows.get(), modes.get(), timestamps.get(), cond1.get(),
+                                 cond2.get(),   cond3.get(), cond4.get()};
+    for (int row = 0; row < kRows; ++row) {
+        agg_function->add(state, columns, row, arena);
+    }
+
+    ColumnInt32 result;
+    agg_function->insert_result_into(state, result);
+    // All four events occur in order inside the window.
+    EXPECT_EQ(result.get_data()[0], 4);
+    agg_function->destroy(state);
+}
+
+// 'increase' mode: timestamps along the chain must be strictly increasing.
+// Rows carry seconds 0, 1, 1, 3 and row k matches only condition k+1, so event2 and
+// event3 share a timestamp. The chain event1(t=0) -> event2(t=1) cannot advance to
+// event3, and the expected level is 2.
+TEST_F(VWindowFunnelV2Test, testIncreaseMode) {
+    constexpr int kRows = 4;
+    const int seconds[kRows] = {0, 1, 1, 3};
+
+    // Builds a UInt8 condition column that is 1 only at `hot_row`.
+    auto one_hot = [](int rows, int hot_row) {
+        auto flags = ColumnUInt8::create();
+        for (int row = 0; row < rows; ++row) {
+            flags->insert(row == hot_row ? Field::create_field<TYPE_BOOLEAN>(1)
+                                         : Field::create_field<TYPE_BOOLEAN>(0));
+        }
+        return flags;
+    };
+
+    auto modes = ColumnString::create();
+    auto timestamps = ColumnDateTimeV2::create();
+    auto windows = ColumnInt64::create();
+    for (int row = 0; row < kRows; ++row) {
+        modes->insert(Field::create_field<TYPE_STRING>("increase"));
+
+        VecDateTimeValue ts;
+        ts.unchecked_set_time(2022, 2, 28, 0, 0, seconds[row]);
+        auto ts_v2 = ts.to_datetime_v2();
+        timestamps->insert_data(reinterpret_cast<const char*>(&ts_v2), 0);
+
+        windows->insert(Field::create_field<TYPE_BIGINT>(10));
+    }
+    auto cond1 = one_hot(kRows, 0);
+    auto cond2 = one_hot(kRows, 1);
+    auto cond3 = one_hot(kRows, 2);
+    auto cond4 = one_hot(kRows, 3);
+
+    std::unique_ptr<char[]> storage(new char[agg_function->size_of_data()]);
+    AggregateDataPtr state = storage.get();
+    agg_function->create(state);
+    const IColumn* columns[7] = {windows.get(), modes.get(), timestamps.get(), cond1.get(),
+                                 cond2.get(),   cond3.get(), cond4.get()};
+    for (int row = 0; row < kRows; ++row) {
+        agg_function->add(state, columns, row, arena);
+    }
+
+    ColumnInt32 result;
+    agg_function->insert_result_into(state, result);
+    EXPECT_EQ(result.get_data()[0], 2);
+    agg_function->destroy(state);
+}
+
+// 'deduplication' mode: a repeated event breaks the current chain.
+// Five rows one second apart, window = 10:
+//   t=0 event1, t=1 event2, t=2 event1 (duplicate), t=3 event3, t=4 event4.
+// The first chain reaches level 2 and is broken by the duplicate event1 at t=2. The
+// chain restarted at t=2 stays at level 1 because no event2 follows it. Expected
+// result: max(2, 1) = 2.
+TEST_F(VWindowFunnelV2Test, testDeduplicationMode) {
+    constexpr int kRows = 5;
+
+    // Builds a UInt8 condition column from per-row hit flags.
+    auto make_flags = [](const auto& hits) {
+        auto flags = ColumnUInt8::create();
+        for (const bool hit : hits) {
+            flags->insert(hit ? Field::create_field<TYPE_BOOLEAN>(1)
+                              : Field::create_field<TYPE_BOOLEAN>(0));
+        }
+        return flags;
+    };
+
+    auto modes = ColumnString::create();
+    auto timestamps = ColumnDateTimeV2::create();
+    auto windows = ColumnInt64::create();
+    for (int row = 0; row < kRows; ++row) {
+        modes->insert(Field::create_field<TYPE_STRING>("deduplication"));
+
+        VecDateTimeValue ts;
+        ts.unchecked_set_time(2022, 2, 28, 0, 0, row);
+        auto ts_v2 = ts.to_datetime_v2();
+        timestamps->insert_data(reinterpret_cast<const char*>(&ts_v2), 0);
+
+        windows->insert(Field::create_field<TYPE_BIGINT>(10));
+    }
+
+    const bool event1_hits[kRows] = {true, false, true, false, false}; // row 2 is the duplicate
+    const bool event2_hits[kRows] = {false, true, false, false, false};
+    const bool event3_hits[kRows] = {false, false, false, true, false};
+    const bool event4_hits[kRows] = {false, false, false, false, true};
+    auto cond1 = make_flags(event1_hits);
+    auto cond2 = make_flags(event2_hits);
+    auto cond3 = make_flags(event3_hits);
+    auto cond4 = make_flags(event4_hits);
+
+    std::unique_ptr<char[]> storage(new char[agg_function->size_of_data()]);
+    AggregateDataPtr state = storage.get();
+    agg_function->create(state);
+    const IColumn* columns[7] = {windows.get(), modes.get(), timestamps.get(), cond1.get(),
+                                 cond2.get(),   cond3.get(), cond4.get()};
+    for (int row = 0; row < kRows; ++row) {
+        agg_function->add(state, columns, row, arena);
+    }
+
+    ColumnInt32 result;
+    agg_function->insert_result_into(state, result);
+    EXPECT_EQ(result.get_data()[0], 2);
+    agg_function->destroy(state);
+}
+
+// 'fixed' mode (StarRocks-style): the event level must not jump.
+// Five rows one second apart, window = 10:
+//   t=0 event1, t=1 event2, t=2 event4 (jump, event3 not yet matched),
+//   t=3 event3, t=4 event4.
+// The chain event1 -> event2 is broken by the out-of-order event4 at t=2, so the
+// expected level is 2.
+TEST_F(VWindowFunnelV2Test, testFixedMode) {
+    constexpr int kRows = 5;
+
+    // Builds a UInt8 condition column from per-row hit flags.
+    auto make_flags = [](const auto& hits) {
+        auto flags = ColumnUInt8::create();
+        for (const bool hit : hits) {
+            flags->insert(hit ? Field::create_field<TYPE_BOOLEAN>(1)
+                              : Field::create_field<TYPE_BOOLEAN>(0));
+        }
+        return flags;
+    };
+
+    auto modes = ColumnString::create();
+    auto timestamps = ColumnDateTimeV2::create();
+    auto windows = ColumnInt64::create();
+    for (int row = 0; row < kRows; ++row) {
+        modes->insert(Field::create_field<TYPE_STRING>("fixed"));
+
+        VecDateTimeValue ts;
+        ts.unchecked_set_time(2022, 2, 28, 0, 0, row);
+        auto ts_v2 = ts.to_datetime_v2();
+        timestamps->insert_data(reinterpret_cast<const char*>(&ts_v2), 0);
+
+        windows->insert(Field::create_field<TYPE_BIGINT>(10));
+    }
+
+    const bool event1_hits[kRows] = {true, false, false, false, false};
+    const bool event2_hits[kRows] = {false, true, false, false, false};
+    const bool event3_hits[kRows] = {false, false, false, true, false};
+    const bool event4_hits[kRows] = {false, false, true, false, true}; // row 2 is the jump
+    auto cond1 = make_flags(event1_hits);
+    auto cond2 = make_flags(event2_hits);
+    auto cond3 = make_flags(event3_hits);
+    auto cond4 = make_flags(event4_hits);
+
+    std::unique_ptr<char[]> storage(new char[agg_function->size_of_data()]);
+    AggregateDataPtr state = storage.get();
+    agg_function->create(state);
+    const IColumn* columns[7] = {windows.get(), modes.get(), timestamps.get(), cond1.get(),
+                                 cond2.get(),   cond3.get(), cond4.get()};
+    for (int row = 0; row < kRows; ++row) {
+        agg_function->add(state, columns, row, arena);
+    }
+
+    ColumnInt32 result;
+    agg_function->insert_result_into(state, result);
+    EXPECT_EQ(result.get_data()[0], 2);
+    agg_function->destroy(state);
+}
+
+// Test that same-row multi-condition matching does NOT advance the chain
+// through multiple levels. This tests the continuation flag logic.
+// Scenario: window_funnel(86400, 'default', ts, xwhat=1, xwhat!=2, xwhat=3)
+// Row 0 (xwhat=1): matches cond0=T (xwhat=1), cond1=T (1!=2), cond2=F → 2
conditions on same row
+// Row 1 (xwhat=2): matches nothing (cond1: 2!=2=F)
+// Row 2 (xwhat=3): matches cond1=T (3!=2), cond2=T (xwhat=3) → 2 conditions
on same row
+// Correct result: 2 (cond0 from row0, cond1 from row2). NOT 3.
+// Without the continuation flag, row0 would advance through both cond0 and
cond1,
+// then row2 would match cond2 → result 3 (wrong).
+TEST_F(VWindowFunnelV2Test, testSameRowMultiConditionDefault) {
+ // 3 conditions (instead of 4), so we need a different setup
+ AggregateFunctionSimpleFactory factory =
AggregateFunctionSimpleFactory::instance();
+ DataTypes data_types_3 = {
+ std::make_shared<DataTypeInt64>(),
std::make_shared<DataTypeString>(),
+ std::make_shared<DataTypeDateTimeV2>(),
std::make_shared<DataTypeUInt8>(),
+ std::make_shared<DataTypeUInt8>(),
std::make_shared<DataTypeUInt8>()};
+ auto agg_func_3 = factory.get("window_funnel_v2", data_types_3, nullptr,
false,
+ BeExecVersionManager::get_newest_version());
+ ASSERT_NE(agg_func_3, nullptr);
+
+ const int NUM_ROWS = 4;
+ auto column_mode = ColumnString::create();
+ for (int i = 0; i < NUM_ROWS; i++) {
+ column_mode->insert(Field::create_field<TYPE_STRING>("default"));
+ }
+ auto column_timestamp = ColumnDateTimeV2::create();
+ // Row 0: ts=10:41:00 (xwhat=1)
+ // Row 1: ts=13:28:02 (xwhat=2)
+ // Row 2: ts=16:15:01 (xwhat=3)
+ // Row 3: ts=19:05:04 (xwhat=4)
+ VecDateTimeValue tv0, tv1, tv2, tv3;
+ tv0.unchecked_set_time(2022, 3, 12, 10, 41, 0);
+ tv1.unchecked_set_time(2022, 3, 12, 13, 28, 2);
+ tv2.unchecked_set_time(2022, 3, 12, 16, 15, 1);
+ tv3.unchecked_set_time(2022, 3, 12, 19, 5, 4);
+ auto dtv2_0 = tv0.to_datetime_v2();
+ auto dtv2_1 = tv1.to_datetime_v2();
+ auto dtv2_2 = tv2.to_datetime_v2();
+ auto dtv2_3 = tv3.to_datetime_v2();
+ column_timestamp->insert_data((char*)&dtv2_0, 0);
+ column_timestamp->insert_data((char*)&dtv2_1, 0);
+ column_timestamp->insert_data((char*)&dtv2_2, 0);
+ column_timestamp->insert_data((char*)&dtv2_3, 0);
+
+ // cond0: xwhat=1 (only row0 matches)
+ auto column_cond0 = ColumnUInt8::create();
+ column_cond0->insert(Field::create_field<TYPE_BOOLEAN>(1)); // row0:
xwhat=1 → T
+ column_cond0->insert(Field::create_field<TYPE_BOOLEAN>(0)); // row1:
xwhat=2 → F
+ column_cond0->insert(Field::create_field<TYPE_BOOLEAN>(0)); // row2:
xwhat=3 → F
+ column_cond0->insert(Field::create_field<TYPE_BOOLEAN>(0)); // row3:
xwhat=4 → F
+
+ // cond1: xwhat!=2 (rows 0,2,3 match)
+ auto column_cond1 = ColumnUInt8::create();
+ column_cond1->insert(Field::create_field<TYPE_BOOLEAN>(1)); // row0: 1!=2
→ T
+ column_cond1->insert(Field::create_field<TYPE_BOOLEAN>(0)); // row1: 2!=2
→ F
+ column_cond1->insert(Field::create_field<TYPE_BOOLEAN>(1)); // row2: 3!=2
→ T
+ column_cond1->insert(Field::create_field<TYPE_BOOLEAN>(1)); // row3: 4!=2
→ T
+
+ // cond2: xwhat=3 (only row2 matches)
+ auto column_cond2 = ColumnUInt8::create();
+ column_cond2->insert(Field::create_field<TYPE_BOOLEAN>(0)); // row0: F
+ column_cond2->insert(Field::create_field<TYPE_BOOLEAN>(0)); // row1: F
+ column_cond2->insert(Field::create_field<TYPE_BOOLEAN>(1)); // row2: T
+ column_cond2->insert(Field::create_field<TYPE_BOOLEAN>(0)); // row3: F
+
+ // window = 86400 seconds (24 hours)
+ auto column_window = ColumnInt64::create();
+ for (int i = 0; i < NUM_ROWS; i++) {
+ column_window->insert(Field::create_field<TYPE_BIGINT>(86400));
+ }
+
+ std::unique_ptr<char[]> memory(new char[agg_func_3->size_of_data()]);
+ AggregateDataPtr place = memory.get();
+ agg_func_3->create(place);
+ const IColumn* column[6] = {column_window.get(), column_mode.get(),
column_timestamp.get(),
+ column_cond0.get(), column_cond1.get(),
column_cond2.get()};
+ for (int i = 0; i < NUM_ROWS; i++) {
+ agg_func_3->add(place, column, i, arena);
+ }
+
+ ColumnInt32 column_result;
+ agg_func_3->insert_result_into(place, column_result);
+ // Without continuation flag: row0 matches cond0+cond1 (same row advances
both),
+ // row2 matches cond2 → result=3 (WRONG)
+ // With continuation flag: row0 sets cond0 only (cond1 is same-row
continuation),
+ // row2's cond1 extends chain (different row), but cond2 from row2 is
same-row → stops at 2
+ EXPECT_EQ(column_result.get_data()[0], 2);
+ agg_func_3->destroy(place);
+}
+
+// Test same-row multi-condition with ALL conditions matching the same event
name
+// window_funnel(big_window, 'default', ts, event='登录', event='登录',
event='登录', ...)
+// A single row matching all 4 conditions should only count as level 1 (not 4).
+TEST_F(VWindowFunnelV2Test, testSameRowAllConditionsMatch) {
+ auto column_mode = ColumnString::create();
+ column_mode->insert(Field::create_field<TYPE_STRING>("default"));
+
+ auto column_timestamp = ColumnDateTimeV2::create();
+ VecDateTimeValue tv0;
+ tv0.unchecked_set_time(2022, 2, 28, 0, 0, 0);
+ auto dtv2_0 = tv0.to_datetime_v2();
+ column_timestamp->insert_data((char*)&dtv2_0, 0);
+
+ // All 4 conditions match the single row
+ auto column_event1 = ColumnUInt8::create();
+ column_event1->insert(Field::create_field<TYPE_BOOLEAN>(1));
+ auto column_event2 = ColumnUInt8::create();
+ column_event2->insert(Field::create_field<TYPE_BOOLEAN>(1));
+ auto column_event3 = ColumnUInt8::create();
+ column_event3->insert(Field::create_field<TYPE_BOOLEAN>(1));
+ auto column_event4 = ColumnUInt8::create();
+ column_event4->insert(Field::create_field<TYPE_BOOLEAN>(1));
+
+ auto column_window = ColumnInt64::create();
+ column_window->insert(Field::create_field<TYPE_BIGINT>(86400));
+
+ std::unique_ptr<char[]> memory(new char[agg_function->size_of_data()]);
+ AggregateDataPtr place = memory.get();
+ agg_function->create(place);
+ const IColumn* column[7] = {column_window.get(), column_mode.get(),
column_timestamp.get(),
+ column_event1.get(), column_event2.get(),
column_event3.get(),
+ column_event4.get()};
+ agg_function->add(place, column, 0, arena);
+
+ ColumnInt32 column_result;
+ agg_function->insert_result_into(place, column_result);
+ // Only 1 row matching all conditions → can only reach level 1
+ // because each funnel step must come from a different row
+ EXPECT_EQ(column_result.get_data()[0], 1);
+ agg_function->destroy(place);
+}
+
+// Test INCREASE mode: event-0 re-occurrence should not break an already-valid
chain.
+// Counterexample from code review:
+// 3 conditions, window=100s, INCREASE mode
+// Row A (t=0s): event1 only
+// Row B (t=50s): event1 only (new event-0 overwrites old)
+// Row C (t=50s): event2 only
+// Row D (t=60s): event3 only
+// Correct result: 3 (chain starting from t=0: e1@0 -> e2@50 -> e3@60)
+// Bug (before fix): returned 1 because overwriting events_timestamp[0] with
t=50
+// caused the INCREASE check (50 < 50) to fail for e2.
+TEST_F(VWindowFunnelV2Test, testIncreaseModeEvent0Overwrite) {
+ AggregateFunctionSimpleFactory factory =
AggregateFunctionSimpleFactory::instance();
+ DataTypes data_types_3 = {
+ std::make_shared<DataTypeInt64>(),
std::make_shared<DataTypeString>(),
+ std::make_shared<DataTypeDateTimeV2>(),
std::make_shared<DataTypeUInt8>(),
+ std::make_shared<DataTypeUInt8>(),
std::make_shared<DataTypeUInt8>()};
+ auto agg_func_3 = factory.get("window_funnel_v2", data_types_3, nullptr,
false,
+ BeExecVersionManager::get_newest_version());
+ ASSERT_NE(agg_func_3, nullptr);
+
+ const int NUM_ROWS = 4;
+ auto column_mode = ColumnString::create();
+ for (int i = 0; i < NUM_ROWS; i++) {
+ column_mode->insert(Field::create_field<TYPE_STRING>("increase"));
+ }
+ auto column_timestamp = ColumnDateTimeV2::create();
+ // Row 0: t=0s, Row 1: t=50s, Row 2: t=50s, Row 3: t=60s
+ VecDateTimeValue tv0, tv1, tv2, tv3;
+ tv0.unchecked_set_time(2022, 2, 28, 0, 0, 0);
+ tv1.unchecked_set_time(2022, 2, 28, 0, 0, 50);
+ tv2.unchecked_set_time(2022, 2, 28, 0, 0, 50);
+ tv3.unchecked_set_time(2022, 2, 28, 0, 1, 0);
+ auto dtv2_0 = tv0.to_datetime_v2();
+ auto dtv2_1 = tv1.to_datetime_v2();
+ auto dtv2_2 = tv2.to_datetime_v2();
+ auto dtv2_3 = tv3.to_datetime_v2();
+ column_timestamp->insert_data((char*)&dtv2_0, 0);
+ column_timestamp->insert_data((char*)&dtv2_1, 0);
+ column_timestamp->insert_data((char*)&dtv2_2, 0);
+ column_timestamp->insert_data((char*)&dtv2_3, 0);
+
+ // Row 0: event1=T, event2=F, event3=F
+ // Row 1: event1=T, event2=F, event3=F (duplicate event-0)
+ // Row 2: event1=F, event2=T, event3=F
+ // Row 3: event1=F, event2=F, event3=T
+ auto column_event1 = ColumnUInt8::create();
+ column_event1->insert(Field::create_field<TYPE_BOOLEAN>(1));
+ column_event1->insert(Field::create_field<TYPE_BOOLEAN>(1));
+ column_event1->insert(Field::create_field<TYPE_BOOLEAN>(0));
+ column_event1->insert(Field::create_field<TYPE_BOOLEAN>(0));
+
+ auto column_event2 = ColumnUInt8::create();
+ column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0));
+ column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0));
+ column_event2->insert(Field::create_field<TYPE_BOOLEAN>(1));
+ column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0));
+
+ auto column_event3 = ColumnUInt8::create();
+ column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0));
+ column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0));
+ column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0));
+ column_event3->insert(Field::create_field<TYPE_BOOLEAN>(1));
+
+ auto column_window = ColumnInt64::create();
+ for (int i = 0; i < NUM_ROWS; i++) {
+ column_window->insert(Field::create_field<TYPE_BIGINT>(100));
+ }
+
+ std::unique_ptr<char[]> memory(new char[agg_func_3->size_of_data()]);
+ AggregateDataPtr place = memory.get();
+ agg_func_3->create(place);
+ const IColumn* column[6] = {column_window.get(), column_mode.get(),
column_timestamp.get(),
+ column_event1.get(), column_event2.get(),
column_event3.get()};
+ for (int i = 0; i < NUM_ROWS; i++) {
+ agg_func_3->add(place, column, i, arena);
+ }
+
+ ColumnInt32 column_result;
+ agg_func_3->insert_result_into(place, column_result);
+ // Chain from t=0: e1@0 -> e2@50 (50>0 ✓) -> e3@60 (60>50 ✓) = 3
+ EXPECT_EQ(column_result.get_data()[0], 3);
+ agg_func_3->destroy(place);
+}
+
+// Test INCREASE mode: later event-0 starts a better chain when early chain
cannot complete.
+// 3 conditions, window=5s, INCREASE mode
+// Row 0 (t=0s): event1
+// Row 1 (t=50s): event1 (new start — old chain can't reach e2 within 5s)
+// Row 2 (t=51s): event2
+// Row 3 (t=52s): event3
+// Correct result: 3 (chain starting from t=50: e1@50 -> e2@51 -> e3@52)
+TEST_F(VWindowFunnelV2Test, testIncreaseModeNewChainBetter) {
+ AggregateFunctionSimpleFactory factory =
AggregateFunctionSimpleFactory::instance();
+ DataTypes data_types_3 = {
+ std::make_shared<DataTypeInt64>(),
std::make_shared<DataTypeString>(),
+ std::make_shared<DataTypeDateTimeV2>(),
std::make_shared<DataTypeUInt8>(),
+ std::make_shared<DataTypeUInt8>(),
std::make_shared<DataTypeUInt8>()};
+ auto agg_func_3 = factory.get("window_funnel_v2", data_types_3, nullptr,
false,
+ BeExecVersionManager::get_newest_version());
+ ASSERT_NE(agg_func_3, nullptr);
+
+ const int NUM_ROWS = 4;
+ auto column_mode = ColumnString::create();
+ for (int i = 0; i < NUM_ROWS; i++) {
+ column_mode->insert(Field::create_field<TYPE_STRING>("increase"));
+ }
+ auto column_timestamp = ColumnDateTimeV2::create();
+ VecDateTimeValue tv0, tv1, tv2, tv3;
+ tv0.unchecked_set_time(2022, 2, 28, 0, 0, 0);
+ tv1.unchecked_set_time(2022, 2, 28, 0, 0, 50);
+ tv2.unchecked_set_time(2022, 2, 28, 0, 0, 51);
+ tv3.unchecked_set_time(2022, 2, 28, 0, 0, 52);
+ auto dtv2_0 = tv0.to_datetime_v2();
+ auto dtv2_1 = tv1.to_datetime_v2();
+ auto dtv2_2 = tv2.to_datetime_v2();
+ auto dtv2_3 = tv3.to_datetime_v2();
+ column_timestamp->insert_data((char*)&dtv2_0, 0);
+ column_timestamp->insert_data((char*)&dtv2_1, 0);
+ column_timestamp->insert_data((char*)&dtv2_2, 0);
+ column_timestamp->insert_data((char*)&dtv2_3, 0);
+
+ auto column_event1 = ColumnUInt8::create();
+ column_event1->insert(Field::create_field<TYPE_BOOLEAN>(1));
+ column_event1->insert(Field::create_field<TYPE_BOOLEAN>(1));
+ column_event1->insert(Field::create_field<TYPE_BOOLEAN>(0));
+ column_event1->insert(Field::create_field<TYPE_BOOLEAN>(0));
+
+ auto column_event2 = ColumnUInt8::create();
+ column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0));
+ column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0));
+ column_event2->insert(Field::create_field<TYPE_BOOLEAN>(1));
+ column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0));
+
+ auto column_event3 = ColumnUInt8::create();
+ column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0));
+ column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0));
+ column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0));
+ column_event3->insert(Field::create_field<TYPE_BOOLEAN>(1));
+
+ auto column_window = ColumnInt64::create();
+ for (int i = 0; i < NUM_ROWS; i++) {
+ column_window->insert(Field::create_field<TYPE_BIGINT>(5));
+ }
+
+ std::unique_ptr<char[]> memory(new char[agg_func_3->size_of_data()]);
+ AggregateDataPtr place = memory.get();
+ agg_func_3->create(place);
+ const IColumn* column[6] = {column_window.get(), column_mode.get(),
column_timestamp.get(),
+ column_event1.get(), column_event2.get(),
column_event3.get()};
+ for (int i = 0; i < NUM_ROWS; i++) {
+ agg_func_3->add(place, column, i, arena);
+ }
+
+ ColumnInt32 column_result;
+ agg_func_3->insert_result_into(place, column_result);
+ // Old chain from t=0 reaches only level 1 (e2@51 is 51s away, outside 5s
window)
+ // New chain from t=50: e1@50 -> e2@51 (51>50 ✓, within 5s) -> e3@52
(52>51 ✓) = 3
+ EXPECT_EQ(column_result.get_data()[0], 3);
+ agg_func_3->destroy(place);
+}
+
+// Test INCREASE mode: old chain is better than new chain (max_level
preserved).
+// 3 conditions, window=100s, INCREASE mode
+// Row 0 (t=0s): event1
+// Row 1 (t=10s): event2
+// Row 2 (t=50s): event1 (restarts chain, old chain had level=2)
+// No more events after the restart — new chain can only reach level 1.
+// Correct result: 2 (old chain: e1@0 -> e2@10)
+TEST_F(VWindowFunnelV2Test, testIncreaseModeOldChainBetter) {
+ AggregateFunctionSimpleFactory factory =
AggregateFunctionSimpleFactory::instance();
+ DataTypes data_types_3 = {
+ std::make_shared<DataTypeInt64>(),
std::make_shared<DataTypeString>(),
+ std::make_shared<DataTypeDateTimeV2>(),
std::make_shared<DataTypeUInt8>(),
+ std::make_shared<DataTypeUInt8>(),
std::make_shared<DataTypeUInt8>()};
+ auto agg_func_3 = factory.get("window_funnel_v2", data_types_3, nullptr,
false,
+ BeExecVersionManager::get_newest_version());
+ ASSERT_NE(agg_func_3, nullptr);
+
+ const int NUM_ROWS = 3;
+ auto column_mode = ColumnString::create();
+ for (int i = 0; i < NUM_ROWS; i++) {
+ column_mode->insert(Field::create_field<TYPE_STRING>("increase"));
+ }
+ {
+ auto column_timestamp = ColumnDateTimeV2::create();
+ VecDateTimeValue tv0, tv1, tv2;
+ tv0.unchecked_set_time(2022, 2, 28, 0, 0, 0);
+ tv1.unchecked_set_time(2022, 2, 28, 0, 0, 10);
+ tv2.unchecked_set_time(2022, 2, 28, 0, 0, 50);
+ auto dtv2_0 = tv0.to_datetime_v2();
+ auto dtv2_1 = tv1.to_datetime_v2();
+ auto dtv2_2 = tv2.to_datetime_v2();
+ column_timestamp->insert_data((char*)&dtv2_0, 0);
+ column_timestamp->insert_data((char*)&dtv2_1, 0);
+ column_timestamp->insert_data((char*)&dtv2_2, 0);
+
+ // Row 0: e1=T, Row 1: e2=T, Row 2: e1=T (restart)
+ auto column_event1 = ColumnUInt8::create();
+ column_event1->insert(Field::create_field<TYPE_BOOLEAN>(1));
+ column_event1->insert(Field::create_field<TYPE_BOOLEAN>(0));
+ column_event1->insert(Field::create_field<TYPE_BOOLEAN>(1));
+
+ auto column_event2 = ColumnUInt8::create();
+ column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0));
+ column_event2->insert(Field::create_field<TYPE_BOOLEAN>(1));
+ column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0));
+
+ auto column_event3 = ColumnUInt8::create();
+ column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0));
+ column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0));
+ column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0));
+
+ auto column_window = ColumnInt64::create();
+ for (int i = 0; i < NUM_ROWS; i++) {
+ column_window->insert(Field::create_field<TYPE_BIGINT>(100));
+ }
+
+ std::unique_ptr<char[]> memory(new char[agg_func_3->size_of_data()]);
+ AggregateDataPtr place = memory.get();
+ agg_func_3->create(place);
+ const IColumn* column[6] = {column_window.get(), column_mode.get(),
+ column_timestamp.get(),
column_event1.get(),
+ column_event2.get(),
column_event3.get()};
+ for (int i = 0; i < NUM_ROWS; i++) {
+ agg_func_3->add(place, column, i, arena);
+ }
+
+ ColumnInt32 column_result;
+ agg_func_3->insert_result_into(place, column_result);
+ // Old chain: e1@0 -> e2@10 = level 2
+ // New chain from t=50: only e1@50, no further events = level 1
+ // max(2, 1) = 2
+ EXPECT_EQ(column_result.get_data()[0], 2);
+ agg_func_3->destroy(place);
+ }
+ {
+ auto column_timestamp = ColumnDateTimeV2::create();
+ VecDateTimeValue tv0, tv1, tv2, tv3;
+ tv0.unchecked_set_time(2022, 2, 28, 0, 0, 0);
+ tv1.unchecked_set_time(2022, 2, 28, 0, 0, 10);
+ tv2.unchecked_set_time(2022, 2, 28, 0, 0, 50);
+ tv3.unchecked_set_time(2022, 2, 28, 0, 0, 50);
+ auto dtv2_0 = tv0.to_datetime_v2();
+ auto dtv2_1 = tv1.to_datetime_v2();
+ auto dtv2_2 = tv2.to_datetime_v2();
+ auto dtv2_3 = tv3.to_datetime_v2();
+ column_timestamp->insert_data((char*)&dtv2_0, 0);
+ column_timestamp->insert_data((char*)&dtv2_1, 0);
+ column_timestamp->insert_data((char*)&dtv2_2, 0);
+ column_timestamp->insert_data((char*)&dtv2_3, 0);
+
+ auto column_event1 = ColumnUInt8::create();
+ column_event1->insert(Field::create_field<TYPE_BOOLEAN>(1));
+ column_event1->insert(Field::create_field<TYPE_BOOLEAN>(0));
+ column_event1->insert(Field::create_field<TYPE_BOOLEAN>(1));
+ column_event1->insert(Field::create_field<TYPE_BOOLEAN>(0));
+
+ auto column_event2 = ColumnUInt8::create();
+ column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0));
+ column_event2->insert(Field::create_field<TYPE_BOOLEAN>(1));
+ column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0));
+ column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0));
+
+ auto column_event3 = ColumnUInt8::create();
+ column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0));
+ column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0));
+ column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0));
+ column_event3->insert(Field::create_field<TYPE_BOOLEAN>(1));
+
+ auto column_window = ColumnInt64::create();
+ for (int i = 0; i < NUM_ROWS; i++) {
+ column_window->insert(Field::create_field<TYPE_BIGINT>(100));
+ }
+
+ std::unique_ptr<char[]> memory(new char[agg_func_3->size_of_data()]);
+ AggregateDataPtr place = memory.get();
+ agg_func_3->create(place);
+ const IColumn* column[6] = {column_window.get(), column_mode.get(),
+ column_timestamp.get(),
column_event1.get(),
+ column_event2.get(),
column_event3.get()};
+ for (int i = 0; i < NUM_ROWS; i++) {
+ agg_func_3->add(place, column, i, arena);
+ }
+
+ ColumnInt32 column_result;
+ agg_func_3->insert_result_into(place, column_result);
+ // Old chain: e1@0 -> e2@10 = level 2
+ // New chain from t=50: only e1@50. NOTE(review): NUM_ROWS is 3, so row 3
+ // (e3@50) is never passed to add() and is not exercised here. max(2, 1) = 2
+ EXPECT_EQ(column_result.get_data()[0], 2);
+ agg_func_3->destroy(place);
+ }
+}
+
+} // namespace doris
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java
index f7f01eaeda0..f98c3e14df8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinAggregateFunctions.java
@@ -92,6 +92,7 @@ import
org.apache.doris.nereids.trees.expressions.functions.agg.TopNWeighted;
import org.apache.doris.nereids.trees.expressions.functions.agg.Variance;
import org.apache.doris.nereids.trees.expressions.functions.agg.VarianceSamp;
import org.apache.doris.nereids.trees.expressions.functions.agg.WindowFunnel;
+import org.apache.doris.nereids.trees.expressions.functions.agg.WindowFunnelV2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -187,7 +188,8 @@ public class BuiltinAggregateFunctions implements
FunctionHelper {
agg(TopNWeighted.class, "topn_weighted"),
agg(Variance.class, "var_pop", "variance_pop", "variance"),
agg(VarianceSamp.class, "var_samp", "variance_samp"),
- agg(WindowFunnel.class, "window_funnel")
+ agg(WindowFunnel.class, "window_funnel_v1"),
+ agg(WindowFunnelV2.class, "window_funnel_v2", "window_funnel")
);
ImmutableMap.Builder<String, Boolean> aggFuncNameNullableMapBuilder
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnel.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnel.java
index fe68663ff5f..848d3cfd6d5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnel.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnel.java
@@ -65,7 +65,7 @@ public class WindowFunnel extends NullableAggregateFunction
public WindowFunnel(boolean distinct, boolean alwaysNullable, Expression
arg0, Expression arg1, Expression arg2,
Expression arg3, Expression... varArgs) {
- super("window_funnel", distinct, alwaysNullable,
+ super("window_funnel_v1", distinct, alwaysNullable,
ExpressionUtils.mergeArguments(arg0, arg1, arg2, arg3,
varArgs));
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnel.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnelV2.java
similarity index 74%
copy from
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnel.java
copy to
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnelV2.java
index fe68663ff5f..fa4f5d8e469 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnel.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnelV2.java
@@ -36,11 +36,14 @@ import com.google.common.collect.ImmutableList;
import java.util.List;
/**
- * AggregateFunction 'window_funnel'. This class is generated by
GenerateFunction.
+ * AggregateFunction 'window_funnel_v2'. V2 implementation that only stores
matched events
+ * as (timestamp, event_index) pairs, dramatically reducing memory usage
compared to V1.
*/
-public class WindowFunnel extends NullableAggregateFunction
+public class WindowFunnelV2 extends NullableAggregateFunction
implements ExplicitlyCastableSignature {
+ public static final int MAX_EVENT_CONDITIONS = 127;
+
public static final List<FunctionSignature> SIGNATURES = ImmutableList.of(
FunctionSignature.ret(IntegerType.INSTANCE)
.varArgs(BigIntType.INSTANCE, StringType.INSTANCE,
DateTimeV2Type.WILDCARD,
@@ -51,32 +54,38 @@ public class WindowFunnel extends NullableAggregateFunction
/**
* constructor with 4 or more arguments.
*/
- public WindowFunnel(Expression arg0, Expression arg1, Expression arg2,
Expression arg3, Expression... varArgs) {
+ public WindowFunnelV2(Expression arg0, Expression arg1, Expression arg2,
Expression arg3,
+ Expression... varArgs) {
this(false, arg0, arg1, arg2, arg3, varArgs);
}
/**
* constructor with 4 or more arguments.
*/
- public WindowFunnel(boolean distinct, Expression arg0, Expression arg1,
Expression arg2,
+ public WindowFunnelV2(boolean distinct, Expression arg0, Expression arg1,
Expression arg2,
Expression arg3, Expression... varArgs) {
this(distinct, false, arg0, arg1, arg2, arg3, varArgs);
}
- public WindowFunnel(boolean distinct, boolean alwaysNullable, Expression
arg0, Expression arg1, Expression arg2,
+ public WindowFunnelV2(boolean distinct, boolean alwaysNullable, Expression
arg0, Expression arg1, Expression arg2,
Expression arg3, Expression... varArgs) {
- super("window_funnel", distinct, alwaysNullable,
+ super("window_funnel_v2", distinct, alwaysNullable,
ExpressionUtils.mergeArguments(arg0, arg1, arg2, arg3,
varArgs));
}
/** constructor for withChildren and reuse signature */
- private WindowFunnel(NullableAggregateFunctionParams functionParams) {
+ private WindowFunnelV2(NullableAggregateFunctionParams functionParams) {
super(functionParams);
}
@Override
public void checkLegalityBeforeTypeCoercion() {
String functionName = getName();
+ int eventCount = arity() - 3;
+ if (eventCount > MAX_EVENT_CONDITIONS) {
+ throw new AnalysisException("The " + functionName + " function
supports at most "
+ + MAX_EVENT_CONDITIONS + " event conditions, but got " +
eventCount);
+ }
if (!getArgumentType(0).isIntegerLikeType()) {
throw new AnalysisException("The window params of " + functionName
+ " function must be integer");
}
@@ -109,19 +118,19 @@ public class WindowFunnel extends
NullableAggregateFunction
* withDistinctAndChildren.
*/
@Override
- public WindowFunnel withDistinctAndChildren(boolean distinct,
List<Expression> children) {
+ public WindowFunnelV2 withDistinctAndChildren(boolean distinct,
List<Expression> children) {
Preconditions.checkArgument(children.size() >= 4);
- return new WindowFunnel(getFunctionParams(distinct, children));
+ return new WindowFunnelV2(getFunctionParams(distinct, children));
}
@Override
- public WindowFunnel withAlwaysNullable(boolean alwaysNullable) {
- return new
WindowFunnel(getAlwaysNullableFunctionParams(alwaysNullable));
+ public WindowFunnelV2 withAlwaysNullable(boolean alwaysNullable) {
+ return new
WindowFunnelV2(getAlwaysNullableFunctionParams(alwaysNullable));
}
@Override
public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
- return visitor.visitWindowFunnel(this, context);
+ return visitor.visitWindowFunnelV2(this, context);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java
index 5493bdc44f9..e8b3ae193db 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/AggregateFunctionVisitor.java
@@ -90,6 +90,7 @@ import
org.apache.doris.nereids.trees.expressions.functions.agg.TopNWeighted;
import org.apache.doris.nereids.trees.expressions.functions.agg.Variance;
import org.apache.doris.nereids.trees.expressions.functions.agg.VarianceSamp;
import org.apache.doris.nereids.trees.expressions.functions.agg.WindowFunnel;
+import org.apache.doris.nereids.trees.expressions.functions.agg.WindowFunnelV2;
import
org.apache.doris.nereids.trees.expressions.functions.combinator.ForEachCombinator;
import
org.apache.doris.nereids.trees.expressions.functions.combinator.MergeCombinator;
import
org.apache.doris.nereids.trees.expressions.functions.combinator.UnionCombinator;
@@ -388,6 +389,10 @@ public interface AggregateFunctionVisitor<R, C> {
return visitNullableAggregateFunction(windowFunnel, context);
}
+ default R visitWindowFunnelV2(WindowFunnelV2 windowFunnelV2, C context) {
+ return visitNullableAggregateFunction(windowFunnelV2, context);
+ }
+
default R visitMergeCombinator(MergeCombinator combinator, C context) {
return visitAggregateFunction(combinator, context);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/types/AggStateType.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/types/AggStateType.java
index d70364e4f83..dd159db263b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/types/AggStateType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/types/AggStateType.java
@@ -60,6 +60,7 @@ public class AggStateType extends DataType {
.put("var_samp", "variance_samp")
.put("hist", "histogram")
.put("map_agg", "map_agg_v2")
+ .put("window_funnel", "window_funnel_v2")
.build();
private final List<DataType> subTypes;
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/CheckExpressionLegalityTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/CheckExpressionLegalityTest.java
index 51ef79d2874..c70f3844e59 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/CheckExpressionLegalityTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/CheckExpressionLegalityTest.java
@@ -21,15 +21,24 @@ import org.apache.doris.common.ExceptionChecker;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.rules.expression.CheckLegalityAfterRewrite;
import org.apache.doris.nereids.rules.expression.ExpressionRewrite;
+import org.apache.doris.nereids.trees.expressions.Expression;
import
org.apache.doris.nereids.trees.expressions.functions.agg.BitmapUnionCount;
import org.apache.doris.nereids.trees.expressions.functions.agg.Count;
+import org.apache.doris.nereids.trees.expressions.functions.agg.WindowFunnelV2;
+import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral;
+import org.apache.doris.nereids.trees.expressions.literal.DateTimeLiteral;
+import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral;
+import org.apache.doris.nereids.trees.expressions.literal.StringLiteral;
import org.apache.doris.nereids.util.MemoPatternMatchSupported;
import org.apache.doris.nereids.util.MemoTestUtils;
import org.apache.doris.nereids.util.PlanChecker;
import org.apache.doris.qe.ConnectContext;
+import com.google.common.collect.ImmutableList;
import org.junit.jupiter.api.Test;
+import java.util.List;
+
public class CheckExpressionLegalityTest implements MemoPatternMatchSupported {
@Test
public void testAvg() {
@@ -71,4 +80,22 @@ public class CheckExpressionLegalityTest implements
MemoPatternMatchSupported {
.applyBottomUp(new
ExpressionRewrite(CheckLegalityAfterRewrite.INSTANCE))
);
}
+
+ @Test
+ public void testWindowFunnelV2TooManyConditions() {
+ List<Expression> arguments = ImmutableList.<Expression>builder()
+ .add(new IntegerLiteral(10))
+ .add(new StringLiteral("default"))
+ .add(new DateTimeLiteral("2022-02-28 00:00:00"))
+ .addAll(ImmutableList.copyOf(java.util.Collections.nCopies(
+ WindowFunnelV2.MAX_EVENT_CONDITIONS + 1,
BooleanLiteral.TRUE)))
+ .build();
+ WindowFunnelV2 expression = new WindowFunnelV2(arguments.get(0),
arguments.get(1),
+ arguments.get(2), arguments.get(3), arguments.subList(4,
arguments.size())
+ .toArray(new Expression[0]));
+
+ ExceptionChecker.expectThrowsWithMsg(AnalysisException.class,
+ "supports at most " + WindowFunnelV2.MAX_EVENT_CONDITIONS + "
event conditions",
+ expression::checkLegalityBeforeTypeCoercion);
+ }
}
diff --git a/regression-test/data/nereids_p0/aggregate/window_funnel.out
b/regression-test/data/nereids_p0/aggregate/window_funnel.out
index f9c24637999..4c8c09490bb 100644
--- a/regression-test/data/nereids_p0/aggregate/window_funnel.out
+++ b/regression-test/data/nereids_p0/aggregate/window_funnel.out
@@ -117,7 +117,7 @@
100127 2
-- !window_funnel_fixed1 --
-100123 2
+100123 4
100125 3
100126 2
100127 2
diff --git a/regression-test/data/nereids_p0/aggregate/window_funnel_v2.out
b/regression-test/data/nereids_p0/aggregate/window_funnel_v2.out
new file mode 100644
index 00000000000..5d91239ad73
--- /dev/null
+++ b/regression-test/data/nereids_p0/aggregate/window_funnel_v2.out
@@ -0,0 +1,88 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !v2_default_small_window --
+1
+
+-- !v2_default_large_window --
+2
+
+-- !v2_datetimev2_small --
+1
+
+-- !v2_datetimev2_large --
+2
+
+-- !v2_multi_user_default0 --
+100123 4
+100125 3
+100126 2
+100127 2
+
+-- !v2_multi_user_default1 --
+100123 3
+100125 3
+100126 2
+100127 2
+
+-- !v2_multi_user_default2 --
+100123 1
+100125 1
+100126 1
+100127 1
+
+-- !v2_default_neq --
+100123 4
+100125 3
+100126 2
+100127 2
+
+-- !v2_default_complex --
+100123 4
+100125 2
+100126 0
+100127 1
+
+-- !v2_deduplication0 --
+100123 3
+100125 3
+100126 2
+100127 2
+
+-- !v2_deduplication1 --
+100123 3
+100125 3
+100126 2
+100127 2
+
+-- !v2_fixed0 --
+100123 4
+100125 3
+100126 2
+100127 2
+
+-- !v2_fixed_reorder --
+2
+
+-- !v2_increase0 --
+100123 3
+100125 3
+100126 2
+100127 2
+
+-- !v2_increase_same_ts --
+2
+
+-- !v2_fixed_vs_v1 --
+100123 4
+
+-- !v2_explicit_name --
+100123 4
+
+-- !v2_increase_event0_overwrite --
+3
+
+-- !v2_increase_new_chain_better --
+3
+
+-- !v2_increase_old_chain_better --
+2
+
diff --git
a/regression-test/data/nereids_p0/sql_functions/aggregate_functions/test_aggregate_window_functions.out
b/regression-test/data/nereids_p0/sql_functions/aggregate_functions/test_aggregate_window_functions.out
index eb048bacbb6..3c3f674ce5f 100644
---
a/regression-test/data/nereids_p0/sql_functions/aggregate_functions/test_aggregate_window_functions.out
+++
b/regression-test/data/nereids_p0/sql_functions/aggregate_functions/test_aggregate_window_functions.out
@@ -807,11 +807,11 @@ shanxi windows 1
2 5
-- !agg_window_window_funnel --
-100123 2
-100123 2
-100123 2
-100123 2
-100123 2
+100123 4
+100123 4
+100123 4
+100123 4
+100123 4
100125 3
100125 3
100125 3
diff --git
a/regression-test/suites/nereids_p0/aggregate/window_funnel_v2.groovy
b/regression-test/suites/nereids_p0/aggregate/window_funnel_v2.groovy
new file mode 100644
index 00000000000..b43b2584337
--- /dev/null
+++ b/regression-test/suites/nereids_p0/aggregate/window_funnel_v2.groovy
@@ -0,0 +1,492 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("window_funnel_v2") {
+ sql "SET enable_nereids_planner=true"
+ sql "SET enable_fallback_to_original_planner=false"
+
+ // ==================== Basic DEFAULT mode tests ====================
+ sql """ DROP TABLE IF EXISTS windowfunnel_v2_test """
+ sql """
+ CREATE TABLE IF NOT EXISTS windowfunnel_v2_test (
+ xwho varchar(50) NULL COMMENT 'xwho',
+ xwhen datetime COMMENT 'xwhen',
+ xwhat int NULL COMMENT 'xwhat'
+ )
+ DUPLICATE KEY(xwho)
+ DISTRIBUTED BY HASH(xwho) BUCKETS 3
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+ sql "INSERT into windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1',
'2022-03-12 10:41:00', 1)"
+ sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1',
'2022-03-12 13:28:02', 2)"
+ sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1',
'2022-03-12 16:15:01', 3)"
+ sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1',
'2022-03-12 19:05:04', 4)"
+
+ // window=1 second, only event1 matches (events too far apart)
+ order_qt_v2_default_small_window """
+ select
+ window_funnel(
+ 1,
+ 'default',
+ t.xwhen,
+ t.xwhat = 1,
+ t.xwhat = 2
+ ) AS level
+ from windowfunnel_v2_test t;
+ """
+ // window=20000 seconds, both events match
+ order_qt_v2_default_large_window """
+ select
+ window_funnel(
+ 20000,
+ 'default',
+ t.xwhen,
+ t.xwhat = 1,
+ t.xwhat = 2
+ ) AS level
+ from windowfunnel_v2_test t;
+ """
+
+ // ==================== DateTimeV2 precision test ====================
+ sql """ DROP TABLE IF EXISTS windowfunnel_v2_test """
+ sql """
+ CREATE TABLE IF NOT EXISTS windowfunnel_v2_test (
+ xwho varchar(50) NULL COMMENT 'xwho',
+ xwhen datetimev2(3) COMMENT 'xwhen',
+ xwhat int NULL COMMENT 'xwhat'
+ )
+ DUPLICATE KEY(xwho)
+ DISTRIBUTED BY HASH(xwho) BUCKETS 3
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+ sql "INSERT into windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1',
'2022-03-12 10:41:00.111111', 1)"
+ sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1',
'2022-03-12 13:28:02.111111', 2)"
+ sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1',
'2022-03-12 16:15:01.111111', 3)"
+ sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1',
'2022-03-12 19:05:04.111111', 4)"
+
+ order_qt_v2_datetimev2_small """
+ select
+ window_funnel(
+ 1,
+ 'default',
+ t.xwhen,
+ t.xwhat = 1,
+ t.xwhat = 2
+ ) AS level
+ from windowfunnel_v2_test t;
+ """
+ order_qt_v2_datetimev2_large """
+ select
+ window_funnel(
+ 20000,
+ 'default',
+ t.xwhen,
+ t.xwhat = 1,
+ t.xwhat = 2
+ ) AS level
+ from windowfunnel_v2_test t;
+ """
+
+ // ==================== Multi-user default mode tests ====================
+ sql """ DROP TABLE IF EXISTS windowfunnel_v2_test """
+ sql """
+ CREATE TABLE windowfunnel_v2_test(
+ user_id BIGINT,
+ event_name VARCHAR(64),
+ event_timestamp datetime,
+ phone_brand varchar(64),
+ tab_num int
+ ) distributed by hash(user_id) buckets 3
properties("replication_num"="1");
+ """
+ sql """
+ INSERT INTO windowfunnel_v2_test VALUES
+ (100123, '登录', '2022-05-14 10:01:00', 'HONOR', 1),
+ (100123, '访问', '2022-05-14 10:02:00', 'HONOR', 2),
+ (100123, '下单', '2022-05-14 10:04:00', "HONOR", 3),
+ (100123, '付款', '2022-05-14 10:10:00', 'HONOR', 4),
+ (100125, '登录', '2022-05-15 11:00:00', 'XIAOMI', 1),
+ (100125, '访问', '2022-05-15 11:01:00', 'XIAOMI', 2),
+ (100125, '下单', '2022-05-15 11:02:00', 'XIAOMI', 6),
+ (100126, '登录', '2022-05-15 12:00:00', 'IPHONE', 1),
+ (100126, '访问', '2022-05-15 12:01:00', 'HONOR', 2),
+ (100127, '登录', '2022-05-15 11:30:00', 'VIVO', 1),
+ (100127, '访问', '2022-05-15 11:31:00', 'VIVO', 5);
+ """
+ // 3 hour window
+ order_qt_v2_multi_user_default0 """
+ SELECT
+ user_id,
+ window_funnel(3600 * 3, "default", event_timestamp, event_name =
'登录', event_name = '访问', event_name = '下单', event_name = '付款') AS level
+ FROM windowfunnel_v2_test
+ GROUP BY user_id
+ order BY user_id
+ """
+ // 5 minute window
+ order_qt_v2_multi_user_default1 """
+ SELECT
+ user_id,
+ window_funnel(300, "default", event_timestamp, event_name = '登录',
event_name = '访问', event_name = '下单', event_name = '付款') AS level
+ FROM windowfunnel_v2_test
+ GROUP BY user_id
+ order BY user_id
+ """
+ // 30 second window
+ order_qt_v2_multi_user_default2 """
+ SELECT
+ user_id,
+ window_funnel(30, "default", event_timestamp, event_name = '登录',
event_name = '访问', event_name = '下单', event_name = '付款') AS level
+ FROM windowfunnel_v2_test
+ GROUP BY user_id
+ order BY user_id
+ """
+ // complicated expressions with a != condition ('登陆' never appears in the data, so condition 2 matches every row)
+ order_qt_v2_default_neq """
+ SELECT
+ user_id,
+ window_funnel(3600000000, "default", event_timestamp, event_name =
'登录', event_name != '登陆', event_name = '下单', event_name = '付款') AS level
+ FROM windowfunnel_v2_test
+ GROUP BY user_id
+ order BY user_id;
+ """
+ // Complex filter conditions
+ order_qt_v2_default_complex """
+ SELECT
+ user_id,
+ window_funnel(3600000000, "default", event_timestamp,
+ event_name = '登录' AND phone_brand in ('HONOR',
'XIAOMI', 'VIVO') AND tab_num not in (4, 5),
+ event_name = '访问' AND tab_num not in (4, 5),
+ event_name = '下单' AND tab_num not in (6, 7),
+ event_name = '付款') AS level
+ FROM windowfunnel_v2_test
+ GROUP BY user_id
+ order BY user_id;
+ """
+
+ // ==================== DEDUPLICATION mode tests ====================
+ sql """ DROP TABLE IF EXISTS windowfunnel_v2_test """
+ sql """
+ CREATE TABLE windowfunnel_v2_test(
+ user_id BIGINT,
+ event_name VARCHAR(64),
+ event_timestamp datetime,
+ phone_brand varchar(64),
+ tab_num int
+ ) distributed by hash(user_id) buckets 3
properties("replication_num"="1");
+ """
+ sql """
+ INSERT INTO windowfunnel_v2_test VALUES
+ (100123, '登录', '2022-05-14 10:01:00', 'HONOR', 1),
+ (100123, '访问', '2022-05-14 10:02:00', 'HONOR', 2),
+ (100123, '下单', '2022-05-14 10:04:00', "HONOR", 4),
+ (100123, '登录', '2022-05-14 10:04:00', 'HONOR', 3),
+ (100123, '登录1', '2022-05-14 10:04:00', 'HONOR', 3),
+ (100123, '登录2', '2022-05-14 10:04:00', 'HONOR', 3),
+ (100123, '登录3', '2022-05-14 10:04:00', 'HONOR', 3),
+ (100123, '登录4', '2022-05-14 10:04:00', 'HONOR', 3),
+ (100123, '登录5', '2022-05-14 10:04:00', 'HONOR', 3),
+ (100123, '付款', '2022-05-14 10:10:00', 'HONOR', 4),
+ (100125, '登录', '2022-05-15 11:00:00', 'XIAOMI', 1),
+ (100125, '访问', '2022-05-15 11:01:00', 'XIAOMI', 2),
+ (100125, '下单', '2022-05-15 11:02:00', 'XIAOMI', 6),
+ (100126, '登录', '2022-05-15 12:00:00', 'IPHONE', 1),
+ (100126, '访问', '2022-05-15 12:01:00', 'HONOR', 2),
+ (100127, '登录', '2022-05-15 11:30:00', 'VIVO', 1),
+ (100127, '访问', '2022-05-15 11:31:00', 'VIVO', 5);
+ """
+ order_qt_v2_deduplication0 """
+ SELECT
+ user_id,
+ window_funnel(3600, "deduplication", event_timestamp, event_name =
'登录', event_name = '访问', event_name = '下单', event_name = '付款') AS level
+ FROM windowfunnel_v2_test
+ GROUP BY user_id
+ order BY user_id
+ """
+
+ // Test dedup with duplicate event2 (访问)
+ sql """ truncate table windowfunnel_v2_test; """
+ sql """
+ INSERT INTO windowfunnel_v2_test VALUES
+ (100123, '登录', '2022-05-14 10:01:00', 'HONOR', 1),
+ (100123, '访问', '2022-05-14 10:02:00', 'HONOR', 2),
+ (100123, '下单', '2022-05-14 10:04:00', "HONOR", 4),
+ (100123, '登录1', '2022-05-14 10:04:00', 'HONOR', 3),
+ (100123, '访问', '2022-05-14 10:04:00', 'HONOR', 3),
+ (100123, '付款', '2022-05-14 10:10:00', 'HONOR', 4),
+ (100125, '登录', '2022-05-15 11:00:00', 'XIAOMI', 1),
+ (100125, '访问', '2022-05-15 11:01:00', 'XIAOMI', 2),
+ (100125, '下单', '2022-05-15 11:02:00', 'XIAOMI', 6),
+ (100126, '登录', '2022-05-15 12:00:00', 'IPHONE', 1),
+ (100126, '访问', '2022-05-15 12:01:00', 'HONOR', 2),
+ (100127, '登录', '2022-05-15 11:30:00', 'VIVO', 1),
+ (100127, '访问', '2022-05-15 11:31:00', 'VIVO', 5);
+ """
+ order_qt_v2_deduplication1 """
+ SELECT
+ user_id,
+ window_funnel(3600, "deduplication", event_timestamp, event_name =
'登录', event_name = '访问', event_name = '下单', event_name = '付款') AS level
+ FROM windowfunnel_v2_test
+ GROUP BY user_id
+ order BY user_id
+ """
+
+ // ==================== FIXED mode tests (StarRocks-style semantics)
====================
+ sql """ DROP TABLE IF EXISTS windowfunnel_v2_test """
+ sql """
+ CREATE TABLE windowfunnel_v2_test(
+ user_id BIGINT,
+ event_name VARCHAR(64),
+ event_timestamp datetime,
+ phone_brand varchar(64),
+ tab_num int
+ ) distributed by hash(user_id) buckets 3
properties("replication_num"="1");
+ """
+ sql """
+ INSERT INTO windowfunnel_v2_test VALUES
+ (100123, '登录', '2022-05-14 10:01:00', 'HONOR', 1),
+ (100123, '访问', '2022-05-14 10:02:00', 'HONOR', 2),
+ (100123, '下单', '2022-05-14 10:04:00', "HONOR", 4),
+ (100123, '付款', '2022-05-14 10:10:00', 'HONOR', 4),
+ (100125, '登录', '2022-05-15 11:00:00', 'XIAOMI', 1),
+ (100125, '访问', '2022-05-15 11:01:00', 'XIAOMI', 2),
+ (100125, '下单', '2022-05-15 11:02:00', 'XIAOMI', 6),
+ (100126, '登录', '2022-05-15 12:00:00', 'IPHONE', 1),
+ (100126, '访问', '2022-05-15 12:01:00', 'HONOR', 2),
+ (100127, '登录', '2022-05-15 11:30:00', 'VIVO', 1),
+ (100127, '访问', '2022-05-15 11:31:00', 'VIVO', 5);
+ """
+ // Note: In V2 fixed mode (StarRocks-style), unmatched rows don't break
the chain.
+ // The chain only breaks when a matched event's predecessor level hasn't
been matched.
+ order_qt_v2_fixed0 """
+ SELECT
+ user_id,
+ window_funnel(3600, "fixed", event_timestamp, event_name = '登录',
event_name = '访问', event_name = '下单', event_name = '付款') AS level
+ FROM windowfunnel_v2_test
+ GROUP BY user_id
+ order BY user_id
+ """
+
+ // Test fixed mode where event order in conditions doesn't match data order
+ sql """ truncate table windowfunnel_v2_test; """
+ sql """
+ INSERT INTO windowfunnel_v2_test VALUES
+ (100123, '登录', '2022-05-14 10:01:00', 'HONOR', 1),
+ (100123, '访问', '2022-05-14 10:02:00', 'HONOR', 2),
+ (100123, '下单', '2022-05-14 10:04:00', "HONOR", 4),
+ (100123, '付款', '2022-05-14 10:10:00', 'HONOR', 4);
+ """
+ order_qt_v2_fixed_reorder """
+ select
+ window_funnel(
+ 20000,
+ 'fixed',
+ t.event_timestamp,
+ t.event_name = '登录',
+ t.event_name = '访问',
+ t.event_name = '付款',
+ t.event_name = '下单'
+ ) AS level
+ from windowfunnel_v2_test t;
+ """
+
+ // ==================== INCREASE mode tests ====================
+ sql """ DROP TABLE IF EXISTS windowfunnel_v2_test """
+ sql """
+ CREATE TABLE windowfunnel_v2_test(
+ user_id BIGINT,
+ event_name VARCHAR(64),
+ event_timestamp datetime,
+ phone_brand varchar(64),
+ tab_num int
+ ) distributed by hash(user_id) buckets 3
properties("replication_num"="1");
+ """
+ sql """
+ INSERT INTO windowfunnel_v2_test VALUES
+ (100123, '登录', '2022-05-14 10:01:00', 'HONOR', 1),
+ (100123, '访问', '2022-05-14 10:02:00', 'HONOR', 2),
+ (100123, '下单', '2022-05-14 10:04:00', "HONOR", 4),
+ (100123, '付款', '2022-05-14 10:04:00', 'HONOR', 4),
+ (100125, '登录', '2022-05-15 11:00:00', 'XIAOMI', 1),
+ (100125, '访问', '2022-05-15 11:01:00', 'XIAOMI', 2),
+ (100125, '下单', '2022-05-15 11:02:00', 'XIAOMI', 6),
+ (100126, '登录', '2022-05-15 12:00:00', 'IPHONE', 1),
+ (100126, '访问', '2022-05-15 12:01:00', 'HONOR', 2),
+ (100127, '登录', '2022-05-15 11:30:00', 'VIVO', 1),
+ (100127, '访问', '2022-05-15 11:31:00', 'VIVO', 5);
+ """
+ order_qt_v2_increase0 """
+ SELECT
+ user_id,
+ window_funnel(3600, "increase", event_timestamp, event_name =
'登录', event_name = '访问', event_name = '下单', event_name = '付款') AS level
+ FROM windowfunnel_v2_test
+ GROUP BY user_id
+ order BY user_id
+ """
+
+ // Test increase mode with same-timestamp events
+ sql """ DROP TABLE IF EXISTS windowfunnel_v2_test """
+ sql """
+ CREATE TABLE IF NOT EXISTS windowfunnel_v2_test (
+ xwho varchar(50) NULL COMMENT 'xwho',
+ xwhen datetimev2(3) COMMENT 'xwhen',
+ xwhat int NULL COMMENT 'xwhat'
+ )
+ DUPLICATE KEY(xwho)
+ DISTRIBUTED BY HASH(xwho) BUCKETS 3
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+ sql "INSERT into windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1',
'2022-03-12 10:41:00.111111', 1)"
+ sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1',
'2022-03-12 13:28:02.111111', 2)"
+ sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1',
'2022-03-12 13:28:02.111111', 3)"
+ sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1',
'2022-03-12 15:05:04.111111', 4)"
+ order_qt_v2_increase_same_ts """
+ select
+ window_funnel(
+ 20000,
+ 'increase',
+ t.xwhen,
+ t.xwhat = 1,
+ t.xwhat = 2,
+ t.xwhat = 3,
+ t.xwhat = 4
+ ) AS level
+ from windowfunnel_v2_test t;
+ """
+
+ // ==================== V2 FIXED mode key difference from V1
====================
+ // In V1, unmatched rows (rows that match no event condition) break the
chain in FIXED mode.
+ // In V2, unmatched rows are not stored, so only matched events with level
jumps break the chain.
+ // This test shows the behavioral difference.
+ sql """ DROP TABLE IF EXISTS windowfunnel_v2_test """
+ sql """
+ CREATE TABLE windowfunnel_v2_test(
+ user_id BIGINT,
+ event_name VARCHAR(64),
+ event_timestamp datetime,
+ phone_brand varchar(64),
+ tab_num int
+ ) distributed by hash(user_id) buckets 3
properties("replication_num"="1");
+ """
+ sql """
+ INSERT INTO windowfunnel_v2_test VALUES
+ (100123, '登录', '2022-05-14 10:01:00', 'HONOR', 1),
+ (100123, '访问', '2022-05-14 10:02:00', 'HONOR', 2),
+ (100123, '登录2', '2022-05-14 10:03:00', 'HONOR', 3),
+ (100123, '下单', '2022-05-14 10:04:00', "HONOR", 4),
+ (100123, '付款', '2022-05-14 10:10:00', 'HONOR', 4);
+ """
+ // V2 fixed mode: 登录2 doesn't match any condition, so it's not stored.
+ // The chain 登录->访问->下单->付款 is unbroken because there are no level jumps.
+ // V1 would return 2 here (登录2 physically breaks adjacency), V2 returns 4.
+ order_qt_v2_fixed_vs_v1 """
+ SELECT
+ user_id,
+ window_funnel(3600, "fixed", event_timestamp, event_name = '登录',
event_name = '访问', event_name = '下单', event_name = '付款') AS level
+ FROM windowfunnel_v2_test
+ GROUP BY user_id
+ order BY user_id
+ """
+
+ // ==================== Explicit window_funnel_v2 name (reuses the table and rows of the test above)
====================
+ order_qt_v2_explicit_name """
+ SELECT
+ user_id,
+ window_funnel_v2(3600, "fixed", event_timestamp, event_name =
'登录', event_name = '访问', event_name = '下单', event_name = '付款') AS level
+ FROM windowfunnel_v2_test
+ GROUP BY user_id
+ order BY user_id
+ """
+
+ // ==================== INCREASE mode: event-0 re-occurrence bug fix
====================
+ // Regression test for the bug where a later event-0 overwrites
events_timestamp[0]
+ // and breaks the INCREASE mode strict-increase check for an already-valid
chain.
+ sql """ DROP TABLE IF EXISTS windowfunnel_v2_test """
+ sql """
+ CREATE TABLE IF NOT EXISTS windowfunnel_v2_test (
+ xwho varchar(50) NULL COMMENT 'xwho',
+ xwhen datetimev2(3) COMMENT 'xwhen',
+ xwhat int NULL COMMENT 'xwhat'
+ )
+ DUPLICATE KEY(xwho)
+ DISTRIBUTED BY HASH(xwho) BUCKETS 3
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+ // Case 1: Old chain (from t=0) is valid and completes all 3 levels.
+ // The duplicate event-0 (xwhat = 1) at t=50s, which shares its timestamp with event-1, should not destroy it.
+ sql "INSERT into windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1',
'2022-03-12 10:00:00.000', 1)"
+ sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1',
'2022-03-12 10:00:50.000', 1)"
+ sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1',
'2022-03-12 10:00:50.000', 2)"
+ sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1',
'2022-03-12 10:01:00.000', 3)"
+ order_qt_v2_increase_event0_overwrite """
+ select
+ window_funnel(
+ 100,
+ 'increase',
+ t.xwhen,
+ t.xwhat = 1,
+ t.xwhat = 2,
+ t.xwhat = 3
+ ) AS level
+ from windowfunnel_v2_test t;
+ """
+
+ // Case 2: Old chain can't complete (window too small), new chain is
better.
+ sql """ truncate table windowfunnel_v2_test; """
+ sql "INSERT into windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1',
'2022-03-12 10:00:00.000', 1)"
+ sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1',
'2022-03-12 10:00:50.000', 1)"
+ sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1',
'2022-03-12 10:00:51.000', 2)"
+ sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1',
'2022-03-12 10:00:52.000', 3)"
+ order_qt_v2_increase_new_chain_better """
+ select
+ window_funnel(
+ 5,
+ 'increase',
+ t.xwhen,
+ t.xwhat = 1,
+ t.xwhat = 2,
+ t.xwhat = 3
+ ) AS level
+ from windowfunnel_v2_test t;
+ """
+
+ // Case 3: Old chain is better (reached level 2), new chain only reaches
level 1.
+ sql """ truncate table windowfunnel_v2_test; """
+ sql "INSERT into windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1',
'2022-03-12 10:00:00.000', 1)"
+ sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1',
'2022-03-12 10:00:10.000', 2)"
+ sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1',
'2022-03-12 10:00:50.000', 1)"
+ order_qt_v2_increase_old_chain_better """
+ select
+ window_funnel(
+ 100,
+ 'increase',
+ t.xwhen,
+ t.xwhat = 1,
+ t.xwhat = 2,
+ t.xwhat = 3
+ ) AS level
+ from windowfunnel_v2_test t;
+ """
+
+ sql """ DROP TABLE IF EXISTS windowfunnel_v2_test """
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]