HappenLee commented on code in PR #46181: URL: https://github.com/apache/doris/pull/46181#discussion_r1912703644
########## be/src/pipeline/exec/analytic_sink_operator.cpp: ########## @@ -42,180 +110,521 @@ Status AnalyticSinkLocalState::open(RuntimeState* state) { SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); auto& p = _parent->cast<AnalyticSinkOperatorX>(); - _shared_state->partition_by_column_idxs.resize(p._partition_by_eq_expr_ctxs.size()); - _shared_state->ordey_by_column_idxs.resize(p._order_by_eq_expr_ctxs.size()); - - size_t agg_size = p._agg_expr_ctxs.size(); - _agg_expr_ctxs.resize(agg_size); - _shared_state->agg_input_columns.resize(agg_size); - for (int i = 0; i < agg_size; ++i) { - _shared_state->agg_input_columns[i].resize(p._num_agg_input[i]); + + _agg_functions_size = p._agg_functions_size; + _agg_expr_ctxs.resize(_agg_functions_size); + _agg_functions.resize(_agg_functions_size); + _agg_input_columns.resize(_agg_functions_size); + _offsets_of_aggregate_states.resize(_agg_functions_size); + _result_column_nullable_flags.resize(_agg_functions_size); + + for (int i = 0; i < _agg_functions_size; ++i) { + _agg_functions[i] = p._agg_functions[i]->clone(state, state->obj_pool()); + _agg_input_columns[i].resize(p._num_agg_input[i]); _agg_expr_ctxs[i].resize(p._agg_expr_ctxs[i].size()); for (int j = 0; j < p._agg_expr_ctxs[i].size(); ++j) { RETURN_IF_ERROR(p._agg_expr_ctxs[i][j]->clone(state, _agg_expr_ctxs[i][j])); + _agg_input_columns[i][j] = _agg_expr_ctxs[i][j]->root()->data_type()->create_column(); } - - for (size_t j = 0; j < _agg_expr_ctxs[i].size(); ++j) { - _shared_state->agg_input_columns[i][j] = - _agg_expr_ctxs[i][j]->root()->data_type()->create_column(); - } + _offsets_of_aggregate_states[i] = p._offsets_of_aggregate_states[i]; + _result_column_nullable_flags[i] = + !_agg_functions[i]->function()->get_return_type()->is_nullable() && + _agg_functions[i]->data_type()->is_nullable(); } - _partition_by_eq_expr_ctxs.resize(p._partition_by_eq_expr_ctxs.size()); - for (size_t i = 0; i < _partition_by_eq_expr_ctxs.size(); i++) { + + 
_partition_exprs_size = p._partition_by_eq_expr_ctxs.size(); + _partition_by_eq_expr_ctxs.resize(_partition_exprs_size); + _partition_by_columns.resize(_partition_exprs_size); + for (size_t i = 0; i < _partition_exprs_size; i++) { RETURN_IF_ERROR( p._partition_by_eq_expr_ctxs[i]->clone(state, _partition_by_eq_expr_ctxs[i])); + _partition_by_columns[i] = + _partition_by_eq_expr_ctxs[i]->root()->data_type()->create_column(); } - _order_by_eq_expr_ctxs.resize(p._order_by_eq_expr_ctxs.size()); - for (size_t i = 0; i < _order_by_eq_expr_ctxs.size(); i++) { + + _order_by_exprs_size = p._order_by_eq_expr_ctxs.size(); + _order_by_eq_expr_ctxs.resize(_order_by_exprs_size); + _order_by_columns.resize(_order_by_exprs_size); + for (size_t i = 0; i < _order_by_exprs_size; i++) { RETURN_IF_ERROR(p._order_by_eq_expr_ctxs[i]->clone(state, _order_by_eq_expr_ctxs[i])); + _order_by_columns[i] = _order_by_eq_expr_ctxs[i]->root()->data_type()->create_column(); + } + + // only support one order by column, so need two columns upper and lower bound + // _range_result_columns.resize(2); + _range_result_columns.resize(_order_by_exprs_size); + // should change the order by exprs to range column, IF FE have support range window + for (size_t i = 0; i < _order_by_exprs_size; i++) { + // RETURN_IF_ERROR(p._order_by_eq_expr_ctxs[i]->clone(state, _order_by_eq_expr_ctxs[i])); + _range_result_columns[i] = _order_by_eq_expr_ctxs[i]->root()->data_type()->create_column(); } + + _fn_place_ptr = _agg_arena_pool->aligned_alloc(p._total_size_of_aggregate_states, + p._align_aggregate_states); + _create_agg_status(); return Status::OK(); } -bool AnalyticSinkLocalState::_whether_need_next_partition(BlockRowPos& found_partition_end) { - auto& shared_state = *_shared_state; - if (shared_state.input_eos || - (shared_state.current_row_position < - shared_state.partition_by_end.pos)) { //now still have partition data - return false; +Status AnalyticSinkLocalState::close(RuntimeState* state, Status exec_status) { 
+ SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_close_timer); + if (_closed) { + return Status::OK(); + } + + _destroy_agg_status(); + _agg_arena_pool = nullptr; + + std::vector<vectorized::MutableColumnPtr> tmp_result_window_columns; + _result_window_columns.swap(tmp_result_window_columns); + return PipelineXSinkLocalState<AnalyticSharedState>::close(state, exec_status); +} + +bool AnalyticSinkLocalState::_get_next_for_sliding_rows(int64_t batch_rows, + int64_t current_block_base_pos) { + while (_current_row_position < _partition_by_pose.end) { + int64_t current_row_start = 0; + int64_t current_row_end = _current_row_position + _rows_end_offset + 1; + + _reset_agg_status(); + if (!_parent->cast<AnalyticSinkOperatorX>()._window.__isset.window_start) { + current_row_start = _partition_by_pose.start; + } else { + current_row_start = _current_row_position + _rows_start_offset; + } + // Make sure range_start <= range_end Review Comment: In what case can range_start end up greater than range_end here? Please add a concrete example (e.g. the window frame that triggers it) to the code comment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org