Copilot commented on code in PR #61566: URL: https://github.com/apache/doris/pull/61566#discussion_r3013157244
########## be/src/exprs/aggregate/aggregate_function_window_funnel_v2.h: ########## @@ -0,0 +1,599 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <algorithm> +#include <iterator> +#include <utility> +#include <vector> + +#include "common/cast_set.h" +#include "common/exception.h" +#include "common/status.h" +#include "core/assert_cast.h" +#include "core/column/column_string.h" +#include "core/data_type/data_type_number.h" +#include "core/types.h" +#include "core/value/vdatetime_value.h" +#include "exprs/aggregate/aggregate_function.h" +#include "exprs/aggregate/aggregate_function_window_funnel.h" // for WindowFunnelMode, string_to_window_funnel_mode +#include "util/var_int.h" + +namespace doris { +#include "common/compile_check_begin.h" +class Arena; +class BufferReadable; +class BufferWritable; +class IColumn; +} // namespace doris + +namespace doris { + +/// Merge two event lists, utilizing sorted flags to optimize. +/// After merge, all events are in `events_list` and it is sorted. 
+template <typename T> +void merge_events_list(T& events_list, size_t prefix_size, bool prefix_sorted, bool suffix_sorted) { + if (!prefix_sorted && !suffix_sorted) { + std::stable_sort(std::begin(events_list), std::end(events_list)); + } else { + const auto begin = std::begin(events_list); + const auto middle = std::next(begin, prefix_size); + const auto end = std::end(events_list); + + if (!prefix_sorted) { + std::stable_sort(begin, middle); + } + if (!suffix_sorted) { + std::stable_sort(middle, end); + } + std::inplace_merge(begin, middle, end); + } +} + +/// V2 state: stores only matched events as (timestamp, event_index) pairs. +/// Compared to V1 which stores all rows with N boolean columns, V2 only stores +/// events that actually match at least one condition, dramatically reducing memory. +/// +/// To prevent same-row multi-condition chain advancement (where a single row +/// matching multiple conditions could incorrectly advance the funnel through +/// multiple levels), we use the high bit of event_idx as a "same-row continuation" +/// flag. When a row matches multiple conditions, the first event stored for that +/// row has bit 7 = 0, and subsequent events from the same row have bit 7 = 1. +/// The algorithm uses this to ensure each funnel step comes from a different row. +/// +/// This approach adds ZERO storage overhead — each event remains 9 bytes (UInt64 + UInt8). +struct WindowFunnelStateV2 { + /// (timestamp_int_val, 1-based event_index with continuation flag in bit 7) + /// + /// Bit layout of event_idx: + /// bit 7 (0x80): continuation flag — 1 means this event is from the same row + /// as the preceding event in events_list + /// bits 0-6: actual 1-based event index (supports up to 127 conditions) + /// + /// Sorted by timestamp only (via operator<). stable_sort preserves insertion order + /// for equal timestamps, so same-row events remain consecutive after sorting. 
+ struct TimestampEvent { + UInt64 timestamp; + UInt8 event_idx; // includes continuation flag in bit 7 + + /// Sort by timestamp only. For same timestamp, stable_sort preserves insertion + /// order, keeping same-row events consecutive. + bool operator<(const TimestampEvent& other) const { return timestamp < other.timestamp; } + bool operator<=(const TimestampEvent& other) const { return timestamp <= other.timestamp; } + }; + + static constexpr UInt8 CONTINUATION_FLAG = 0x80; + static constexpr UInt8 EVENT_IDX_MASK = 0x7F; + + /// Extract the actual 1-based event index (stripping continuation flag). + static int get_event_idx(UInt8 raw) { return (raw & EVENT_IDX_MASK); } + /// Check if this event is a continuation of the same row as the previous event. + static bool is_continuation(UInt8 raw) { return (raw & CONTINUATION_FLAG) != 0; } + + static constexpr int64_t WINDOW_UNSET = -1; + + int event_count = 0; + int64_t window = WINDOW_UNSET; + WindowFunnelMode window_funnel_mode = WindowFunnelMode::INVALID; + bool sorted = true; + std::vector<TimestampEvent> events_list; + + WindowFunnelStateV2() = default; + WindowFunnelStateV2(int arg_event_count) : event_count(arg_event_count) {} + + void reset() { + events_list.clear(); + sorted = true; + } + + void add(const IColumn** arg_columns, ssize_t row_num, int64_t win, WindowFunnelMode mode) { + window = win; + window_funnel_mode = mode; + + // get_data() returns DateV2Value<DateTimeV2ValueType>; convert to packed UInt64 + auto timestamp = assert_cast<const ColumnVector<TYPE_DATETIMEV2>&>(*arg_columns[2]) + .get_data()[row_num] + .to_date_int_val(); + + // Iterate from last event to first (reverse order). + // This ensures that after stable_sort, events with the same timestamp + // appear in descending event_index order, which is important for correct + // matching when one row satisfies multiple conditions. 
+ // + // The first event stored for this row has continuation=0; + // subsequent events from the same row have continuation=1 (bit 7 set). + bool first_match = true; + for (int i = event_count - 1; i >= 0; --i) { + auto event_val = + assert_cast<const ColumnUInt8&>(*arg_columns[3 + i]).get_data()[row_num]; + if (event_val) { + UInt8 packed_idx = cast_set<UInt8>(i + 1); + if (!first_match) { + packed_idx |= CONTINUATION_FLAG; + } + first_match = false; Review Comment: `event_idx` packs the 1-based condition index into the low 7 bits and uses the high bit as a continuation flag, but `packed_idx = cast_set<UInt8>(i + 1)` does not keep the index out of that bit: with more than 127 funnel conditions, indices 128 and above set bit 7 themselves, so they collide with `CONTINUATION_FLAG` and are silently decoded as a different event index (and misreported as same-row continuations). Please add a hard validation (e.g., reject/throw when `event_count > 127` or use a wider type) so queries with many conditions don’t return incorrect results without an error. ########## be/src/exprs/aggregate/aggregate_function_window_funnel_v2.cpp: ########## @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#include "exprs/aggregate/aggregate_function_window_funnel_v2.h" + +#include <string> + +#include "common/logging.h" +#include "core/data_type/data_type.h" +#include "core/types.h" +#include "exprs/aggregate/aggregate_function_simple_factory.h" +#include "exprs/aggregate/helpers.h" + +namespace doris { +#include "common/compile_check_begin.h" + +AggregateFunctionPtr create_aggregate_function_window_funnel_v2(const std::string& name, + const DataTypes& argument_types, + const DataTypePtr& result_type, + const bool result_is_nullable, + const AggregateFunctionAttr& attr) { + if (argument_types.size() < 4) { + LOG(WARNING) << "window_funnel_v2 requires at least 4 arguments (window, mode, timestamp, " + "and at least 1 boolean condition), but got " + << argument_types.size() << "."; + return nullptr; + } + if (argument_types[2]->get_primitive_type() == TYPE_DATETIMEV2) { + return creator_without_type::create<AggregateFunctionWindowFunnelV2>( + argument_types, result_is_nullable, attr); + } else { + LOG(WARNING) << "Only support DateTime type as window argument!"; + return nullptr; + } Review Comment: The warning message is misleading: the code checks the *3rd* argument type (`argument_types[2]`, the timestamp) but logs "window argument". Consider updating the message to refer to the timestamp parameter and (ideally) include the actual type received to aid debugging. ```suggestion LOG(WARNING) << "Only support DateTimeV2 type as timestamp argument, but got " << argument_types[2]->get_name() << "."; return nullptr; ``` ########## be/test/exprs/aggregate/vec_window_funnel_v2_test.cpp: ########## @@ -0,0 +1,1167 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <glog/logging.h> +#include <gtest/gtest-message.h> +#include <gtest/gtest-test-part.h> +#include <stddef.h> + +#include <memory> +#include <ostream> + +#include "core/column/column_string.h" +#include "core/column/column_vector.h" +#include "core/data_type/data_type_date_or_datetime_v2.h" +#include "core/data_type/data_type_number.h" +#include "core/data_type/data_type_string.h" +#include "core/string_buffer.hpp" +#include "core/value/vdatetime_value.h" +#include "exprs/aggregate/aggregate_function.h" +#include "exprs/aggregate/aggregate_function_simple_factory.h" +#include "gtest/gtest_pred_impl.h" + +namespace doris { +class IColumn; +} // namespace doris + +namespace doris { + +void register_aggregate_function_window_funnel_v2(AggregateFunctionSimpleFactory& factory); + +class VWindowFunnelV2Test : public testing::Test { +public: + AggregateFunctionPtr agg_function; + + VWindowFunnelV2Test() {} + + void SetUp() { + AggregateFunctionSimpleFactory factory = AggregateFunctionSimpleFactory::instance(); + DataTypes data_types = { + std::make_shared<DataTypeInt64>(), std::make_shared<DataTypeString>(), + std::make_shared<DataTypeDateTimeV2>(), std::make_shared<DataTypeUInt8>(), + std::make_shared<DataTypeUInt8>(), std::make_shared<DataTypeUInt8>(), + std::make_shared<DataTypeUInt8>()}; + agg_function = factory.get("window_funnel_v2", data_types, nullptr, false, + 
BeExecVersionManager::get_newest_version()); + EXPECT_NE(agg_function, nullptr); + } + + void TearDown() {} + + Arena arena; +}; + +TEST_F(VWindowFunnelV2Test, testEmpty) { + std::unique_ptr<char[]> memory(new char[agg_function->size_of_data()]); + AggregateDataPtr place = memory.get(); + agg_function->create(place); + + ColumnString buf; + VectorBufferWriter buf_writer(buf); + agg_function->serialize(place, buf_writer); + buf_writer.commit(); + LOG(INFO) << "buf size : " << buf.size(); + VectorBufferReader buf_reader(buf.get_data_at(0)); + agg_function->deserialize(place, buf_reader, arena); + + std::unique_ptr<char[]> memory2(new char[agg_function->size_of_data()]); + AggregateDataPtr place2 = memory2.get(); + agg_function->create(place2); + + agg_function->merge(place, place2, arena); + ColumnInt32 column_result; + agg_function->insert_result_into(place, column_result); + EXPECT_EQ(column_result.get_data()[0], 0); + + ColumnInt32 column_result2; + agg_function->insert_result_into(place2, column_result2); + EXPECT_EQ(column_result2.get_data()[0], 0); + + agg_function->destroy(place); + agg_function->destroy(place2); +} + +TEST_F(VWindowFunnelV2Test, testSerialize) { + const int NUM_CONDS = 4; + auto column_mode = ColumnString::create(); + for (int i = 0; i < NUM_CONDS; i++) { + column_mode->insert(Field::create_field<TYPE_STRING>("default")); + } + + auto column_timestamp = ColumnDateTimeV2::create(); + for (int i = 0; i < NUM_CONDS; i++) { + VecDateTimeValue time_value; + time_value.unchecked_set_time(2022, 2, 28, 0, 0, i); + auto dtv2 = time_value.to_datetime_v2(); + column_timestamp->insert_data((char*)&dtv2, 0); + } + auto column_event1 = ColumnUInt8::create(); + column_event1->insert(Field::create_field<TYPE_BOOLEAN>(1)); + column_event1->insert(Field::create_field<TYPE_BOOLEAN>(0)); + column_event1->insert(Field::create_field<TYPE_BOOLEAN>(0)); + column_event1->insert(Field::create_field<TYPE_BOOLEAN>(0)); + + auto column_event2 = ColumnUInt8::create(); + 
column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0)); + column_event2->insert(Field::create_field<TYPE_BOOLEAN>(1)); + column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0)); + column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0)); + + auto column_event3 = ColumnUInt8::create(); + column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0)); + column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0)); + column_event3->insert(Field::create_field<TYPE_BOOLEAN>(1)); + column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0)); + + auto column_event4 = ColumnUInt8::create(); + column_event4->insert(Field::create_field<TYPE_BOOLEAN>(0)); + column_event4->insert(Field::create_field<TYPE_BOOLEAN>(0)); + column_event4->insert(Field::create_field<TYPE_BOOLEAN>(0)); + column_event4->insert(Field::create_field<TYPE_BOOLEAN>(1)); + + auto column_window = ColumnInt64::create(); + for (int i = 0; i < NUM_CONDS; i++) { + column_window->insert(Field::create_field<TYPE_BIGINT>(2)); + } + + std::unique_ptr<char[]> memory(new char[agg_function->size_of_data()]); + AggregateDataPtr place = memory.get(); + agg_function->create(place); + const IColumn* column[7] = {column_window.get(), column_mode.get(), column_timestamp.get(), + column_event1.get(), column_event2.get(), column_event3.get(), + column_event4.get()}; + for (int i = 0; i < NUM_CONDS; i++) { + agg_function->add(place, column, i, arena); + } + + ColumnInt32 column_result; + agg_function->insert_result_into(place, column_result); + EXPECT_EQ(column_result.get_data()[0], 3); + + ColumnString buf; + VectorBufferWriter buf_writer(buf); + agg_function->serialize(place, buf_writer); + buf_writer.commit(); + agg_function->destroy(place); + + std::unique_ptr<char[]> memory2(new char[agg_function->size_of_data()]); + AggregateDataPtr place2 = memory2.get(); + agg_function->create(place2); + + VectorBufferReader buf_reader(buf.get_data_at(0)); + agg_function->deserialize(place2, buf_reader, arena); + + ColumnInt32 
column_result2; + agg_function->insert_result_into(place2, column_result2); + EXPECT_EQ(column_result2.get_data()[0], 3); + agg_function->destroy(place2); +} + +TEST_F(VWindowFunnelV2Test, testDefaultSortedNoMerge) { + const int NUM_CONDS = 4; + auto column_mode = ColumnString::create(); + for (int i = 0; i < NUM_CONDS; i++) { + column_mode->insert(Field::create_field<TYPE_STRING>("default")); + } + auto column_timestamp = ColumnDateTimeV2::create(); + for (int i = 0; i < NUM_CONDS; i++) { + VecDateTimeValue time_value; + time_value.unchecked_set_time(2022, 2, 28, 0, 0, i); + auto dtv2 = time_value.to_datetime_v2(); + column_timestamp->insert_data((char*)&dtv2, 0); + } + auto column_event1 = ColumnUInt8::create(); + column_event1->insert(Field::create_field<TYPE_BOOLEAN>(1)); + column_event1->insert(Field::create_field<TYPE_BOOLEAN>(0)); + column_event1->insert(Field::create_field<TYPE_BOOLEAN>(0)); + column_event1->insert(Field::create_field<TYPE_BOOLEAN>(0)); + + auto column_event2 = ColumnUInt8::create(); + column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0)); + column_event2->insert(Field::create_field<TYPE_BOOLEAN>(1)); + column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0)); + column_event2->insert(Field::create_field<TYPE_BOOLEAN>(0)); + + auto column_event3 = ColumnUInt8::create(); + column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0)); + column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0)); + column_event3->insert(Field::create_field<TYPE_BOOLEAN>(1)); + column_event3->insert(Field::create_field<TYPE_BOOLEAN>(0)); + + auto column_event4 = ColumnUInt8::create(); + column_event4->insert(Field::create_field<TYPE_BOOLEAN>(0)); + column_event4->insert(Field::create_field<TYPE_BOOLEAN>(0)); + column_event4->insert(Field::create_field<TYPE_BOOLEAN>(0)); + column_event4->insert(Field::create_field<TYPE_BOOLEAN>(1)); + + for (int win = 0; win < NUM_CONDS + 1; win++) { + auto column_window = ColumnInt64::create(); + for (int i = 0; i < 
NUM_CONDS; i++) { + column_window->insert(Field::create_field<TYPE_BIGINT>(win)); + } + + std::unique_ptr<char[]> memory(new char[agg_function->size_of_data()]); + AggregateDataPtr place = memory.get(); + agg_function->create(place); + const IColumn* column[7] = {column_window.get(), column_mode.get(), + column_timestamp.get(), column_event1.get(), + column_event2.get(), column_event3.get(), + column_event4.get()}; + for (int i = 0; i < NUM_CONDS; i++) { + agg_function->add(place, column, i, arena); + } + + ColumnInt32 column_result; + agg_function->insert_result_into(place, column_result); + EXPECT_EQ(column_result.get_data()[0], + win < 0 ? 1 : (win < NUM_CONDS ? win + 1 : NUM_CONDS)); Review Comment: This expectation contains a dead branch: `win` is iterated from 0 to `NUM_CONDS`, so `win < 0` is always false. Either simplify the expected expression or extend the loop / add a separate test if negative windows are intended to be covered. ```suggestion win < NUM_CONDS ? win + 1 : NUM_CONDS); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
