HappenLee commented on code in PR #46181:
URL: https://github.com/apache/doris/pull/46181#discussion_r1912703644


##########
be/src/pipeline/exec/analytic_sink_operator.cpp:
##########
@@ -42,180 +110,521 @@ Status AnalyticSinkLocalState::open(RuntimeState* state) 
{
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
     auto& p = _parent->cast<AnalyticSinkOperatorX>();
-    
_shared_state->partition_by_column_idxs.resize(p._partition_by_eq_expr_ctxs.size());
-    
_shared_state->ordey_by_column_idxs.resize(p._order_by_eq_expr_ctxs.size());
-
-    size_t agg_size = p._agg_expr_ctxs.size();
-    _agg_expr_ctxs.resize(agg_size);
-    _shared_state->agg_input_columns.resize(agg_size);
-    for (int i = 0; i < agg_size; ++i) {
-        _shared_state->agg_input_columns[i].resize(p._num_agg_input[i]);
+
+    _agg_functions_size = p._agg_functions_size;
+    _agg_expr_ctxs.resize(_agg_functions_size);
+    _agg_functions.resize(_agg_functions_size);
+    _agg_input_columns.resize(_agg_functions_size);
+    _offsets_of_aggregate_states.resize(_agg_functions_size);
+    _result_column_nullable_flags.resize(_agg_functions_size);
+
+    for (int i = 0; i < _agg_functions_size; ++i) {
+        _agg_functions[i] = p._agg_functions[i]->clone(state, 
state->obj_pool());
+        _agg_input_columns[i].resize(p._num_agg_input[i]);
         _agg_expr_ctxs[i].resize(p._agg_expr_ctxs[i].size());
         for (int j = 0; j < p._agg_expr_ctxs[i].size(); ++j) {
             RETURN_IF_ERROR(p._agg_expr_ctxs[i][j]->clone(state, 
_agg_expr_ctxs[i][j]));
+            _agg_input_columns[i][j] = 
_agg_expr_ctxs[i][j]->root()->data_type()->create_column();
         }
-
-        for (size_t j = 0; j < _agg_expr_ctxs[i].size(); ++j) {
-            _shared_state->agg_input_columns[i][j] =
-                    _agg_expr_ctxs[i][j]->root()->data_type()->create_column();
-        }
+        _offsets_of_aggregate_states[i] = p._offsets_of_aggregate_states[i];
+        _result_column_nullable_flags[i] =
+                
!_agg_functions[i]->function()->get_return_type()->is_nullable() &&
+                _agg_functions[i]->data_type()->is_nullable();
     }
-    _partition_by_eq_expr_ctxs.resize(p._partition_by_eq_expr_ctxs.size());
-    for (size_t i = 0; i < _partition_by_eq_expr_ctxs.size(); i++) {
+
+    _partition_exprs_size = p._partition_by_eq_expr_ctxs.size();
+    _partition_by_eq_expr_ctxs.resize(_partition_exprs_size);
+    _partition_by_columns.resize(_partition_exprs_size);
+    for (size_t i = 0; i < _partition_exprs_size; i++) {
         RETURN_IF_ERROR(
                 p._partition_by_eq_expr_ctxs[i]->clone(state, 
_partition_by_eq_expr_ctxs[i]));
+        _partition_by_columns[i] =
+                
_partition_by_eq_expr_ctxs[i]->root()->data_type()->create_column();
     }
-    _order_by_eq_expr_ctxs.resize(p._order_by_eq_expr_ctxs.size());
-    for (size_t i = 0; i < _order_by_eq_expr_ctxs.size(); i++) {
+
+    _order_by_exprs_size = p._order_by_eq_expr_ctxs.size();
+    _order_by_eq_expr_ctxs.resize(_order_by_exprs_size);
+    _order_by_columns.resize(_order_by_exprs_size);
+    for (size_t i = 0; i < _order_by_exprs_size; i++) {
         RETURN_IF_ERROR(p._order_by_eq_expr_ctxs[i]->clone(state, 
_order_by_eq_expr_ctxs[i]));
+        _order_by_columns[i] = 
_order_by_eq_expr_ctxs[i]->root()->data_type()->create_column();
+    }
+
+    // only support one order by column, so need two columns upper and lower 
bound
+    // _range_result_columns.resize(2);
+    _range_result_columns.resize(_order_by_exprs_size);
+    // should change the order by exprs to range column, IF FE have support 
range window
+    for (size_t i = 0; i < _order_by_exprs_size; i++) {
+        // RETURN_IF_ERROR(p._order_by_eq_expr_ctxs[i]->clone(state, 
_order_by_eq_expr_ctxs[i]));
+        _range_result_columns[i] = 
_order_by_eq_expr_ctxs[i]->root()->data_type()->create_column();
     }
+
+    _fn_place_ptr = 
_agg_arena_pool->aligned_alloc(p._total_size_of_aggregate_states,
+                                                   p._align_aggregate_states);
+    _create_agg_status();
     return Status::OK();
 }
 
-bool AnalyticSinkLocalState::_whether_need_next_partition(BlockRowPos& 
found_partition_end) {
-    auto& shared_state = *_shared_state;
-    if (shared_state.input_eos ||
-        (shared_state.current_row_position <
-         shared_state.partition_by_end.pos)) { //now still have partition data
-        return false;
+Status AnalyticSinkLocalState::close(RuntimeState* state, Status exec_status) {
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_close_timer);
+    if (_closed) {
+        return Status::OK();
+    }
+
+    _destroy_agg_status();
+    _agg_arena_pool = nullptr;
+
+    std::vector<vectorized::MutableColumnPtr> tmp_result_window_columns;
+    _result_window_columns.swap(tmp_result_window_columns);
+    return PipelineXSinkLocalState<AnalyticSharedState>::close(state, 
exec_status);
+}
+
+bool AnalyticSinkLocalState::_get_next_for_sliding_rows(int64_t batch_rows,
+                                                        int64_t 
current_block_base_pos) {
+    while (_current_row_position < _partition_by_pose.end) {
+        int64_t current_row_start = 0;
+        int64_t current_row_end = _current_row_position + _rows_end_offset + 1;
+
+        _reset_agg_status();
+        if 
(!_parent->cast<AnalyticSinkOperatorX>()._window.__isset.window_start) {
+            current_row_start = _partition_by_pose.start;
+        } else {
+            current_row_start = _current_row_position + _rows_start_offset;
+        }
+        // Make sure range_start <= range_end

Review Comment:
   In what case does this happen? Please give an example in the comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to