This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new b935b738822 [fix](window_funnel) fix wrong result of fixed mode
(#40459) (#40960)
b935b738822 is described below
commit b935b73882212c812415600245295209d32df402
Author: TengJianPing <[email protected]>
AuthorDate: Thu Sep 19 10:56:08 2024 +0800
[fix](window_funnel) fix wrong result of fixed mode (#40459) (#40960)
## Proposed changes
Issue Number: close #xxx
Backport of #40146 and #40459
---
.../aggregate_function_window_funnel.h | 121 +++++++++------------
1 file changed, 49 insertions(+), 72 deletions(-)
diff --git a/be/src/vec/aggregate_functions/aggregate_function_window_funnel.h
b/be/src/vec/aggregate_functions/aggregate_function_window_funnel.h
index 5a31b88a69b..2afa82e9a6d 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_window_funnel.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_window_funnel.h
@@ -83,14 +83,11 @@ struct WindowFunnelState {
int64_t window;
bool enable_mode;
WindowFunnelMode window_funnel_mode;
- mutable MutableColumnPtr timestamp_column;
- mutable MutableColumns event_columns;
+ mutable vectorized::MutableBlock mutable_block;
ColumnVector<NativeType>::Container* timestamp_column_data;
std::vector<ColumnVector<UInt8>::Container*> event_columns_datas;
- Block block;
SortDescription sort_description {1};
bool sorted;
- bool is_merge;
WindowFunnelState() {
event_count = 0;
@@ -101,30 +98,38 @@ struct WindowFunnelState {
sort_description[0].direction = 1;
sort_description[0].nulls_direction = -1;
sorted = false;
- is_merge = false;
}
WindowFunnelState(int arg_event_count) : WindowFunnelState() {
- timestamp_column = ColumnVector<NativeType>::create();
+ event_count = arg_event_count;
+ auto timestamp_column = ColumnVector<NativeType>::create();
timestamp_column_data =
&assert_cast<ColumnVector<NativeType>&>(*timestamp_column).get_data();
- event_count = arg_event_count;
- event_columns.resize(event_count);
+
+ MutableColumns event_columns;
for (int i = 0; i < event_count; i++) {
- event_columns[i] = ColumnVector<UInt8>::create();
+ auto event_column = ColumnVector<UInt8>::create();
event_columns_datas.emplace_back(
-
&assert_cast<ColumnVector<UInt8>&>(*event_columns[i]).get_data());
+
&assert_cast<ColumnVector<UInt8>&>(*event_column).get_data());
+ event_columns.emplace_back(std::move(event_column));
}
+ Block tmp_block;
+ tmp_block.insert({std::move(timestamp_column),
+
DataTypeFactory::instance().create_data_type(TYPE_INDEX), "timestamp"});
+ for (int i = 0; i < event_count; i++) {
+ tmp_block.insert({std::move(event_columns[i]),
+
DataTypeFactory::instance().create_data_type(TypeIndex::UInt8),
+ "event_" + std::to_string(i)});
+ }
+
+ mutable_block = MutableBlock(std::move(tmp_block));
}
void reset() {
window = 0;
- timestamp_column->clear();
- for (auto& column : event_columns) {
- column->clear();
- }
- block.clear_column_data();
+ mutable_block.clear();
+ timestamp_column_data = nullptr;
+ event_columns_datas.clear();
sorted = false;
- is_merge = false;
}
void add(const IColumn** arg_columns, ssize_t row_num, int64_t win,
WindowFunnelMode mode) {
@@ -144,24 +149,11 @@ struct WindowFunnelState {
if (sorted) {
return;
}
- if (!is_merge) {
- Block tmp_block;
- tmp_block.insert({std::move(timestamp_column),
-
DataTypeFactory::instance().create_data_type(TYPE_INDEX),
- "timestamp"});
- for (int i = 0; i < event_count; i++) {
- tmp_block.insert({std::move(event_columns[i]),
-
DataTypeFactory::instance().create_data_type(TypeIndex::UInt8),
- "event_" + std::to_string(i)});
- }
- block = tmp_block.clone_without_columns();
- sort_block(tmp_block, block, sort_description, 0);
- } else {
- auto tmp_block = block.clone_without_columns();
- sort_block(block, tmp_block, sort_description, 0);
- block = std::move(tmp_block);
- }
+ Block tmp_block = mutable_block.to_block();
+ auto block = tmp_block.clone_without_columns();
+ sort_block(tmp_block, block, sort_description, 0);
+ mutable_block = MutableBlock(std::move(block));
sorted = true;
}
@@ -174,9 +166,9 @@ struct WindowFunnelState {
TimeInterval interval(SECOND, window, false);
int column_idx = 1;
- const auto& first_event_column = block.get_by_position(column_idx);
+ const auto& first_event_column =
mutable_block.get_column_by_position(column_idx);
const auto& first_event_data =
- assert_cast<const
ColumnVector<UInt8>&>(*first_event_column.column).get_data();
+ assert_cast<const
ColumnVector<UInt8>&>(*first_event_column).get_data();
auto match_row = simd::find_one(first_event_data.data(), start_row,
row_count);
start_row = match_row + 1;
if (match_row < row_count) {
@@ -188,12 +180,13 @@ struct WindowFunnelState {
column_idx++;
auto last_match_row = match_row;
- for (; column_idx < event_count + 1; column_idx++) {
- const auto& event_column = block.get_by_position(column_idx);
+ ++match_row;
+ for (; column_idx < event_count + 1 && match_row < row_count;
+ column_idx++, match_row++) {
+ const auto& event_column =
mutable_block.get_column_by_position(column_idx);
const auto& event_data =
- assert_cast<const
ColumnVector<UInt8>&>(*event_column.column).get_data();
+ assert_cast<const
ColumnVector<UInt8>&>(*event_column).get_data();
if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::FIXED) {
- ++match_row;
if (event_data[match_row] == 1) {
auto current_timestamp =
binary_cast<NativeType,
DateValueType>(timestamp_data[match_row]);
@@ -204,7 +197,7 @@ struct WindowFunnelState {
}
break;
}
- match_row = simd::find_one(event_data.data(), match_row + 1,
row_count);
+ match_row = simd::find_one(event_data.data(), match_row,
row_count);
if (match_row < row_count) {
auto current_timestamp =
binary_cast<NativeType,
DateValueType>(timestamp_data[match_row]);
@@ -227,10 +220,9 @@ struct WindowFunnelState {
for (int tmp_column_idx = 1; tmp_column_idx <
column_idx;
tmp_column_idx++) {
const auto& tmp_event_column =
- block.get_by_position(tmp_column_idx);
+
mutable_block.get_column_by_position(tmp_column_idx);
const auto& tmp_event_data =
- assert_cast<const
ColumnVector<UInt8>&>(
- *tmp_event_column.column)
+ assert_cast<const
ColumnVector<UInt8>&>(*tmp_event_column)
.get_data();
auto dup_match_row =
simd::find_one(tmp_event_data.data(),
last_match_row + 1, match_row);
@@ -258,11 +250,11 @@ struct WindowFunnelState {
int _get_internal() const {
size_t start_row = 0;
int max_found_event_count = 0;
- const auto& ts_column = block.get_by_position(0).column->get_ptr();
+ const auto& ts_column =
mutable_block.get_column_by_position(0)->get_ptr();
const auto& timestamp_data =
assert_cast<const
ColumnVector<NativeType>&>(*ts_column).get_data().data();
- auto row_count = block.rows();
+ auto row_count = mutable_block.rows();
while (start_row < row_count) {
auto found_event_count =
_match_event_list<WINDOW_FUNNEL_MODE>(start_row,
row_count, timestamp_data);
@@ -274,7 +266,7 @@ struct WindowFunnelState {
return max_found_event_count;
}
int get() const {
- auto row_count = block.rows();
+ auto row_count = mutable_block.rows();
if (event_count == 0 || row_count == 0) {
return 0;
}
@@ -294,16 +286,13 @@ struct WindowFunnelState {
}
void merge(const WindowFunnelState& other) {
- is_merge = true;
- MutableBlock mutable_block(&block);
- if (!other.block.empty()) {
- auto st = mutable_block.merge(other.block);
+ if (!other.mutable_block.empty()) {
+ auto st = mutable_block.merge(other.mutable_block.to_block());
if (!st.ok()) {
throw doris::Exception(ErrorCode::INTERNAL_ERROR,
st.to_string());
return;
}
}
- block = mutable_block.to_block();
event_count = event_count > 0 ? event_count : other.event_count;
window = window > 0 ? window : other.window;
@@ -328,27 +317,13 @@ struct WindowFunnelState {
size_t compressed_bytes = 0;
Status status;
std::string buff;
- if (is_merge) {
- // as the branch-2.1 is used the new impl of window funnel, and
the be_exec_version is 5
- // but in branch-3.0 this be_exec_version have update to 7, so
when upgrade from branch-2.1 to branch-3.0
- // maybe have error send the branch-3.0 version of version 7 to
branch-2.1([0---version--5])
- status = block.serialize(
- 5, &pblock, &uncompressed_bytes, &compressed_bytes,
- segment_v2::CompressionTypePB::ZSTD); // ZSTD for better
compression ratio
- } else {
- Block tmp_block;
- tmp_block.insert({std::move(timestamp_column),
-
DataTypeFactory::instance().create_data_type(TYPE_INDEX),
- "timestamp"});
- for (int i = 0; i < event_count; i++) {
- tmp_block.insert({std::move(event_columns[i]),
-
DataTypeFactory::instance().create_data_type(TypeIndex::UInt8),
- "event_" + std::to_string(i)});
- }
- status = tmp_block.serialize(
- 5, &pblock, &uncompressed_bytes, &compressed_bytes,
- segment_v2::CompressionTypePB::ZSTD); // ZSTD for better
compression ratio
- }
+ Block block = mutable_block.to_block();
+ // branch-2.1 already uses this new window_funnel implementation with be_exec_version 5,
+ // but branch-3.0 has raised be_exec_version to 7; during an upgrade from branch-2.1 to branch-3.0,
+ // a branch-3.0 node could send a version-7 block to a branch-2.1 node (which only accepts versions in [0, 5]).
+ status = block.serialize(
+ 5, &pblock, &uncompressed_bytes, &compressed_bytes,
+ segment_v2::CompressionTypePB::ZSTD); // ZSTD for better
compression ratio
if (!status.ok()) {
throw doris::Exception(ErrorCode::INTERNAL_ERROR,
status.to_string());
return;
@@ -385,10 +360,12 @@ struct WindowFunnelState {
throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"Failed to parse window_funnel data to
block");
}
+ Block block;
auto status = block.deserialize(pblock);
if (!status.ok()) {
throw doris::Exception(ErrorCode::INTERNAL_ERROR,
status.to_string());
}
+ mutable_block = MutableBlock(std::move(block));
}
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]