HappenLee commented on code in PR #46181: URL: https://github.com/apache/doris/pull/46181#discussion_r1912710329
########## be/src/pipeline/exec/analytic_sink_operator.cpp: ########## @@ -42,180 +110,521 @@ Status AnalyticSinkLocalState::open(RuntimeState* state) { SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); auto& p = _parent->cast<AnalyticSinkOperatorX>(); - _shared_state->partition_by_column_idxs.resize(p._partition_by_eq_expr_ctxs.size()); - _shared_state->ordey_by_column_idxs.resize(p._order_by_eq_expr_ctxs.size()); - - size_t agg_size = p._agg_expr_ctxs.size(); - _agg_expr_ctxs.resize(agg_size); - _shared_state->agg_input_columns.resize(agg_size); - for (int i = 0; i < agg_size; ++i) { - _shared_state->agg_input_columns[i].resize(p._num_agg_input[i]); + + _agg_functions_size = p._agg_functions_size; + _agg_expr_ctxs.resize(_agg_functions_size); + _agg_functions.resize(_agg_functions_size); + _agg_input_columns.resize(_agg_functions_size); + _offsets_of_aggregate_states.resize(_agg_functions_size); + _result_column_nullable_flags.resize(_agg_functions_size); + + for (int i = 0; i < _agg_functions_size; ++i) { + _agg_functions[i] = p._agg_functions[i]->clone(state, state->obj_pool()); + _agg_input_columns[i].resize(p._num_agg_input[i]); _agg_expr_ctxs[i].resize(p._agg_expr_ctxs[i].size()); for (int j = 0; j < p._agg_expr_ctxs[i].size(); ++j) { RETURN_IF_ERROR(p._agg_expr_ctxs[i][j]->clone(state, _agg_expr_ctxs[i][j])); + _agg_input_columns[i][j] = _agg_expr_ctxs[i][j]->root()->data_type()->create_column(); } - - for (size_t j = 0; j < _agg_expr_ctxs[i].size(); ++j) { - _shared_state->agg_input_columns[i][j] = - _agg_expr_ctxs[i][j]->root()->data_type()->create_column(); - } + _offsets_of_aggregate_states[i] = p._offsets_of_aggregate_states[i]; + _result_column_nullable_flags[i] = + !_agg_functions[i]->function()->get_return_type()->is_nullable() && + _agg_functions[i]->data_type()->is_nullable(); } - _partition_by_eq_expr_ctxs.resize(p._partition_by_eq_expr_ctxs.size()); - for (size_t i = 0; i < _partition_by_eq_expr_ctxs.size(); i++) { + + 
_partition_exprs_size = p._partition_by_eq_expr_ctxs.size(); + _partition_by_eq_expr_ctxs.resize(_partition_exprs_size); + _partition_by_columns.resize(_partition_exprs_size); + for (size_t i = 0; i < _partition_exprs_size; i++) { RETURN_IF_ERROR( p._partition_by_eq_expr_ctxs[i]->clone(state, _partition_by_eq_expr_ctxs[i])); + _partition_by_columns[i] = + _partition_by_eq_expr_ctxs[i]->root()->data_type()->create_column(); } - _order_by_eq_expr_ctxs.resize(p._order_by_eq_expr_ctxs.size()); - for (size_t i = 0; i < _order_by_eq_expr_ctxs.size(); i++) { + + _order_by_exprs_size = p._order_by_eq_expr_ctxs.size(); + _order_by_eq_expr_ctxs.resize(_order_by_exprs_size); + _order_by_columns.resize(_order_by_exprs_size); + for (size_t i = 0; i < _order_by_exprs_size; i++) { RETURN_IF_ERROR(p._order_by_eq_expr_ctxs[i]->clone(state, _order_by_eq_expr_ctxs[i])); + _order_by_columns[i] = _order_by_eq_expr_ctxs[i]->root()->data_type()->create_column(); + } + + // only support one order by column, so need two columns upper and lower bound + // _range_result_columns.resize(2); + _range_result_columns.resize(_order_by_exprs_size); + // should change the order by exprs to range column, IF FE have support range window + for (size_t i = 0; i < _order_by_exprs_size; i++) { + // RETURN_IF_ERROR(p._order_by_eq_expr_ctxs[i]->clone(state, _order_by_eq_expr_ctxs[i])); + _range_result_columns[i] = _order_by_eq_expr_ctxs[i]->root()->data_type()->create_column(); } + + _fn_place_ptr = _agg_arena_pool->aligned_alloc(p._total_size_of_aggregate_states, + p._align_aggregate_states); + _create_agg_status(); return Status::OK(); } -bool AnalyticSinkLocalState::_whether_need_next_partition(BlockRowPos& found_partition_end) { - auto& shared_state = *_shared_state; - if (shared_state.input_eos || - (shared_state.current_row_position < - shared_state.partition_by_end.pos)) { //now still have partition data - return false; +Status AnalyticSinkLocalState::close(RuntimeState* state, Status exec_status) { 
+ SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_close_timer); + if (_closed) { + return Status::OK(); + } + + _destroy_agg_status(); + _agg_arena_pool = nullptr; + + std::vector<vectorized::MutableColumnPtr> tmp_result_window_columns; + _result_window_columns.swap(tmp_result_window_columns); + return PipelineXSinkLocalState<AnalyticSharedState>::close(state, exec_status); +} + +bool AnalyticSinkLocalState::_get_next_for_sliding_rows(int64_t batch_rows, + int64_t current_block_base_pos) { + while (_current_row_position < _partition_by_pose.end) { + int64_t current_row_start = 0; + int64_t current_row_end = _current_row_position + _rows_end_offset + 1; + + _reset_agg_status(); + if (!_parent->cast<AnalyticSinkOperatorX>()._window.__isset.window_start) { + current_row_start = _partition_by_pose.start; + } else { + current_row_start = _current_row_position + _rows_start_offset; + } + // Make sure range_start <= range_end + current_row_start = std::min(current_row_start, current_row_end); + _execute_for_function(_partition_by_pose.start, _partition_by_pose.end, current_row_start, + current_row_end); + _insert_result_info(1); + _current_row_position++; + if (_current_row_position - current_block_base_pos >= batch_rows) { + return true; + } + } + return false; +} + +bool AnalyticSinkLocalState::_get_next_for_unbounded_rows(int64_t batch_rows, + int64_t current_block_base_pos) { + while (_current_row_position < _partition_by_pose.end) { + // [preceding, current_row], [current_row, following] rewrite it's same + // as could reuse the previous calculate result, so don't call _reset_agg_status function + // going on calculate, add up data, no need to reset state + _execute_for_function(_partition_by_pose.start, _partition_by_pose.end, + _current_row_position, _current_row_position + 1); + _insert_result_info(1); + _current_row_position++; + if (_current_row_position - current_block_base_pos >= batch_rows) { + return true; + } } - if ((_partition_by_eq_expr_ctxs.empty() 
&& !shared_state.input_eos) || - (found_partition_end.pos == 0)) { //no partition, get until fetch to EOS - return true; + return false; +} + +bool AnalyticSinkLocalState::_get_next_for_partition(int64_t batch_rows, + int64_t current_block_base_pos) { + if (_current_row_position == _partition_by_pose.start) { + _execute_for_function(_partition_by_pose.start, _partition_by_pose.end, + _partition_by_pose.start, _partition_by_pose.end); } - if (!_partition_by_eq_expr_ctxs.empty() && - found_partition_end.pos == shared_state.all_block_end.pos && - !shared_state.input_eos) { //current partition data calculate done - return true; + + // the end pos maybe after multis blocks, but should output by batch size and should not exceed partition end + auto window_end_pos = _current_row_position + batch_rows; + window_end_pos = std::min<int64_t>(window_end_pos, _partition_by_pose.end); + + auto previous_window_frame_width = _current_row_position - current_block_base_pos; + auto current_window_frame_width = window_end_pos - current_block_base_pos; + // should not exceed block batch size + current_window_frame_width = std::min<int64_t>(current_window_frame_width, batch_rows); + auto real_deal_with_width = current_window_frame_width - previous_window_frame_width; + + _insert_result_info(real_deal_with_width); + _current_row_position += real_deal_with_width; + return _current_row_position - current_block_base_pos >= batch_rows; +} + +bool AnalyticSinkLocalState::_get_next_for_unbounded_range(int64_t batch_rows, + int64_t current_block_base_pos) { + while (_current_row_position < _partition_by_pose.end) { + _update_order_by_range(); + if (_current_row_position == _order_by_pose.start) { + _execute_for_function(_partition_by_pose.start, _partition_by_pose.end, + _order_by_pose.start, _order_by_pose.end); + } + auto previous_window_frame_width = _current_row_position - current_block_base_pos; + auto current_window_frame_width = _order_by_pose.end - current_block_base_pos; + 
current_window_frame_width = std::min<int64_t>(current_window_frame_width, batch_rows); + auto real_deal_with_width = current_window_frame_width - previous_window_frame_width; + + _insert_result_info(real_deal_with_width); + _current_row_position += real_deal_with_width; + if (_current_row_position - current_block_base_pos >= batch_rows) { + return true; + } } return false; } -//_partition_by_columns,_order_by_columns save in blocks, so if need to calculate the boundary, may find in which blocks firstly -BlockRowPos AnalyticSinkLocalState::_compare_row_to_find_end(int64_t idx, BlockRowPos start, - BlockRowPos end, - bool need_check_first) { - auto& shared_state = *_shared_state; - int64_t start_init_row_num = start.row_num; - vectorized::ColumnPtr start_column = - shared_state.input_blocks[start.block_num].get_by_position(idx).column; - vectorized::ColumnPtr start_next_block_column = start_column; - - DCHECK_LE(start.block_num, end.block_num); - DCHECK_LE(start.block_num, shared_state.input_blocks.size() - 1); - int64_t start_block_num = start.block_num; - int64_t end_block_num = end.block_num; - int64_t mid_blcok_num = end.block_num; - // To fix this problem: https://github.com/apache/doris/issues/15951 - // in this case, the partition by column is last row of block, so it's pointed to a new block at row = 0, range is: [left, right) - // From the perspective of order by column, the two values are exactly equal. 
- // so the range will be get wrong because it's compare_at == 0 with next block at row = 0 - if (need_check_first && end.block_num > 0 && end.row_num == 0) { - end.block_num--; - end_block_num--; - end.row_num = shared_state.input_blocks[end_block_num].rows(); - } - //binary search find in which block - while (start_block_num < end_block_num) { - mid_blcok_num = (start_block_num + end_block_num + 1) >> 1; - start_next_block_column = - shared_state.input_blocks[mid_blcok_num].get_by_position(idx).column; - //Compares (*this)[n] and rhs[m], this: start[init_row] rhs: mid[0] - if (start_column->compare_at(start_init_row_num, 0, *start_next_block_column, 1) == 0) { - start_block_num = mid_blcok_num; +bool AnalyticSinkLocalState::_get_next_for_range_between(int64_t batch_rows, + int64_t current_block_base_pos) { + while (_current_row_position < _partition_by_pose.end) { + _reset_agg_status(); + if (!_parent->cast<AnalyticSinkOperatorX>()._window.__isset.window_start) { + _order_by_pose.start = _partition_by_pose.start; } else { - end_block_num = mid_blcok_num - 1; - } - } - - // have check the start.block_num: start_column[start_init_row_num] with mid_blcok_num start_next_block_column[0] - // now next block must not be result, so need check with end_block_num: start_next_block_column[last_row] - if (end_block_num == mid_blcok_num - 1) { - start_next_block_column = - shared_state.input_blocks[end_block_num].get_by_position(idx).column; - int64_t block_size = shared_state.input_blocks[end_block_num].rows(); - if ((start_column->compare_at(start_init_row_num, block_size - 1, *start_next_block_column, - 1) == 0)) { - start.block_num = end_block_num + 1; - start.row_num = 0; - return start; - } - } - - //check whether need get column again, maybe same as first init - // if the start_block_num have move to forword, so need update start block num and compare it from row_num=0 - if (start_block_num != start.block_num) { - start_init_row_num = 0; - start.block_num = 
start_block_num; - start_column = shared_state.input_blocks[start.block_num].get_by_position(idx).column; - } - //binary search, set start and end pos - int64_t start_pos = start_init_row_num; - int64_t end_pos = shared_state.input_blocks[start.block_num].rows(); - //if end_block_num haven't moved, only start_block_num go to the end block - //so could use the end.row_num for binary search - if (start.block_num == end.block_num) { - end_pos = end.row_num; - } - while (start_pos < end_pos) { - int64_t mid_pos = (start_pos + end_pos) >> 1; - if (start_column->compare_at(start_init_row_num, mid_pos, *start_column, 1)) { - end_pos = mid_pos; + _order_by_pose.start = find_first_not_equal( + _range_result_columns[0].get(), _order_by_columns[0].get(), + _current_row_position, _order_by_pose.start, _partition_by_pose.end); + } + + if (!_parent->cast<AnalyticSinkOperatorX>()._window.__isset.window_end) { + _order_by_pose.end = _partition_by_pose.end; } else { - start_pos = mid_pos + 1; + _order_by_pose.end = find_first_not_equal( + _range_result_columns[1].get(), _order_by_columns[0].get(), + _current_row_position, _order_by_pose.end, _partition_by_pose.end); } + // Make sure range_start <= range_end + // current_row_start = std::min(current_row_start, current_row_end); + _execute_for_function(_partition_by_pose.start, _partition_by_pose.end, + _order_by_pose.start, _order_by_pose.end); + _insert_result_info(1); + _current_row_position++; + if (_current_row_position - current_block_base_pos >= batch_rows) { + return true; + } + } + if (_current_row_position == _partition_by_pose.end) { + _order_by_pose.start = _partition_by_pose.end; // update to next partition pos + _order_by_pose.end = _partition_by_pose.end; } - start.row_num = start_pos; //update row num, return the find end - return start; + return false; } -BlockRowPos AnalyticSinkLocalState::_get_partition_by_end() { - auto& shared_state = *_shared_state; - if (shared_state.current_row_position < - 
shared_state.partition_by_end.pos) { //still have data, return partition_by_end directly - return shared_state.partition_by_end; +Status AnalyticSinkLocalState::_execute_impl() { + while (_output_block_index < _input_blocks.size()) { + { + _get_partition_by_end(); + if (!_partition_by_pose.is_ended) { + break; + } + _init_result_columns(); + auto batch_rows = _input_blocks[_output_block_index].rows(); + auto current_block_base_pos = + _input_block_first_row_positions[_output_block_index] - _have_removed_rows; + bool should_output = false; + + { + SCOPED_TIMER(_evaluation_timer); + should_output = + (this->*_executor.get_next_impl)(batch_rows, current_block_base_pos); + } + + if (should_output) { + vectorized::Block block; + _output_current_block(&block); + _refresh_buffer_and_dependency_state(&block); + } + if (_current_row_position == _partition_by_pose.end) { + _reset_state_for_next_partition(); + } + } } + return Status::OK(); +} + +void AnalyticSinkLocalState::_execute_for_function(int64_t partition_start, int64_t partition_end, + int64_t frame_start, int64_t frame_end) { + // here is the core function, should not add timer + for (size_t i = 0; i < _agg_functions_size; ++i) { + std::vector<const vectorized::IColumn*> agg_columns; + for (int j = 0; j < _agg_input_columns[i].size(); ++j) { + agg_columns.push_back(_agg_input_columns[i][j].get()); + } + _agg_functions[i]->function()->add_range_single_place( + partition_start, partition_end, frame_start, frame_end, + _fn_place_ptr + _offsets_of_aggregate_states[i], agg_columns.data(), + _agg_arena_pool.get()); - if (_partition_by_eq_expr_ctxs.empty() || - (shared_state.input_total_rows == 0)) { //no partition_by, the all block is end - return shared_state.all_block_end; + // If the end is not greater than the start, the current window should be empty. 
+ // _current_window_empty = false; + _current_window_empty = + std::min(frame_end, partition_end) <= std::max(frame_start, partition_start); } +} - BlockRowPos cal_end = shared_state.all_block_end; - for (size_t i = 0; i < _partition_by_eq_expr_ctxs.size(); - ++i) { //have partition_by, binary search the partiton end - cal_end = _compare_row_to_find_end(shared_state.partition_by_column_idxs[i], - shared_state.partition_by_end, cal_end); +void AnalyticSinkLocalState::_insert_result_info(int64_t real_deal_with_width) { + // here is the core function, should not add timer + for (size_t i = 0; i < _agg_functions_size; ++i) { + for (size_t j = 0; j < real_deal_with_width; ++j) { + if (_result_column_nullable_flags[i]) { + if (_current_window_empty) { + _result_window_columns[i]->insert_default(); + } else { + auto* dst = assert_cast<vectorized::ColumnNullable*>( + _result_window_columns[i].get()); + dst->get_null_map_data().push_back(0); + _agg_functions[i]->insert_result_info( + _fn_place_ptr + _offsets_of_aggregate_states[i], + &dst->get_nested_column()); + } + continue; Review Comment: Why not use if/else? What are you doing? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org