github-actions[bot] commented on code in PR #27366:
URL: https://github.com/apache/doris/pull/27366#discussion_r1400419901
##########
be/src/pipeline/exec/aggregation_source_operator.cpp:
##########
@@ -79,37 +83,46 @@ Status AggLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
return Status::OK();
}
+Status AggLocalState::_destroy_agg_status(vectorized::AggregateDataPtr data) {
Review Comment:
warning: method '_destroy_agg_status' can be made static
[readability-convert-member-functions-to-static]
be/src/pipeline/exec/aggregation_source_operator.h:110:
```diff
- Status _destroy_agg_status(vectorized::AggregateDataPtr data);
+ static Status _destroy_agg_status(vectorized::AggregateDataPtr data);
```
##########
be/src/pipeline/exec/aggregation_source_operator.cpp:
##########
@@ -376,9 +438,9 @@
key_columns[0]->insert_data(nullptr, 0);
auto mapped = agg_method.hash_table->template
get_null_key_data<
vectorized::AggregateDataPtr>();
- for (size_t i = 0; i <
_shared_state->aggregate_evaluators.size(); ++i)
-
_shared_state->aggregate_evaluators[i]->insert_result_info(
- mapped +
_dependency->offsets_of_aggregate_states()[i],
+ for (size_t i = 0; i <
shared_state.aggregate_evaluators.size(); ++i)
Review Comment:
warning: statement should be inside braces
[readability-braces-around-statements]
```suggestion
for (size_t i = 0; i <
shared_state.aggregate_evaluators.size(); ++i) {
```
be/src/pipeline/exec/aggregation_source_operator.cpp:443:
```diff
- value_columns[i].get());
+ value_columns[i].get());
+ }
```
##########
be/src/pipeline/exec/analytic_sink_operator.cpp:
##########
@@ -66,6 +66,126 @@
return Status::OK();
}
+// Decides whether input for the next partition must be fetched before the
+// current partition can be processed.
+// - false: input has reached EOS, or rows remain before the cached
+//   partition_by_end ("still have partition data").
+// - true: there are no PARTITION BY expressions (keep fetching until EOS),
+//   found_partition_end.pos is 0, or PARTITION BY expressions exist and the
+//   found end equals all_block_end.
+// NOTE(review): the `!shared_state.input_eos` terms in the second and third
+// checks are always true here, since the first check returns on input_eos.
+bool AnalyticSinkLocalState::_whether_need_next_partition(
+ vectorized::BlockRowPos& found_partition_end) {
+ auto& shared_state = *_shared_state;
+ if (shared_state.input_eos ||
+ (shared_state.current_row_position <
+ shared_state.partition_by_end.pos)) { //now still have partition data
+ return false;
+ }
+ if ((shared_state.partition_by_eq_expr_ctxs.empty() &&
!shared_state.input_eos) ||
+ (found_partition_end.pos == 0)) { //no partition, get until fetch to
EOS
+ return true;
+ }
+ if (!shared_state.partition_by_eq_expr_ctxs.empty() &&
+ found_partition_end.pos == shared_state.all_block_end.pos &&
+ !shared_state.input_eos) { //current partition data calculate done
+ return true;
+ }
+ return false;
+}
+
+//_partition_by_columns,_order_by_columns save in blocks, so if need to
calculate the boundary, may find in which blocks firstly
+vectorized::BlockRowPos AnalyticSinkLocalState::_compare_row_to_find_end(
Review Comment:
warning: method '_compare_row_to_find_end' can be made static
[readability-convert-member-functions-to-static]
be/src/pipeline/exec/analytic_sink_operator.h:81:
```diff
- vectorized::BlockRowPos _compare_row_to_find_end(int idx,
vectorized::BlockRowPos start,
+ static vectorized::BlockRowPos _compare_row_to_find_end(int idx,
vectorized::BlockRowPos start,
```
##########
be/src/pipeline/exec/data_queue.cpp:
##########
@@ -41,7 +41,8 @@ DataQueue::DataQueue(int child_count, WriteDependency*
dependency)
_cur_bytes_in_queue(child_count),
_cur_blocks_nums_in_queue(child_count),
_flag_queue_idx(0),
- _dependency(dependency) {
+ _source_dependency(nullptr),
Review Comment:
warning: member initializer for '_source_dependency' is redundant
[modernize-use-default-member-init]
```suggestion
,
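```
Note: applied on its own, this suggestion turns the `_source_dependency(nullptr),` line into a lone `,` after `_flag_queue_idx(0),`, which does not compile. Delete the whole line instead. Combined with the `_sink_dependency` warning below, the initializer list should end with `_flag_queue_idx(0) {`.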
##########
be/src/pipeline/exec/analytic_sink_operator.h:
##########
@@ -45,20 +45,45 @@ class AnalyticSinkOperator final : public
StreamingOperator<AnalyticSinkOperator
bool can_write() override { return _node->can_write(); }
};
+// Sink-side dependency of the analytic operator. The SharedState alias names
+// the state type (AnalyticSharedState) used with this dependency.
+// NOTE(review): the meaning of the trailing `true` passed to Dependency's
+// constructor is not visible here; confirm it against Dependency.
+class AnalyticSinkDependency final : public Dependency {
+public:
+ using SharedState = AnalyticSharedState;
+ AnalyticSinkDependency(int id, int node_id)
+ : Dependency(id, node_id, "AnalyticSinkDependency", true) {}
+ ~AnalyticSinkDependency() override = default;
+};
+
class AnalyticSinkOperatorX;
-class AnalyticSinkLocalState : public
PipelineXSinkLocalState<AnalyticDependency> {
+class AnalyticSinkLocalState : public
PipelineXSinkLocalState<AnalyticSinkDependency> {
ENABLE_FACTORY_CREATOR(AnalyticSinkLocalState);
public:
AnalyticSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
- : PipelineXSinkLocalState<AnalyticDependency>(parent, state) {}
+ : PipelineXSinkLocalState<AnalyticSinkDependency>(parent, state) {}
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
private:
friend class AnalyticSinkOperatorX;
+ bool _refresh_need_more_input() {
Review Comment:
warning: method '_refresh_need_more_input' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static bool _refresh_need_more_input() {
```
##########
be/src/pipeline/exec/analytic_source_operator.h:
##########
@@ -71,6 +79,22 @@ class AnalyticLocalState final : public
PipelineXLocalState<AnalyticDependency>
void _insert_result_info(int64_t current_block_rows);
void _update_order_by_range();
+ bool _refresh_need_more_input() {
Review Comment:
warning: method '_refresh_need_more_input' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static bool _refresh_need_more_input() {
```
##########
be/src/pipeline/exec/streaming_aggregation_sink_operator.h:
##########
@@ -93,6 +93,16 @@ class StreamingAggSinkLocalState final
Status _pre_agg_with_serialized_key(doris::vectorized::Block* in_block,
doris::vectorized::Block* out_block);
bool _should_expand_preagg_hash_tables();
+ void _make_nullable_output_key(vectorized::Block* block) {
Review Comment:
warning: method '_make_nullable_output_key' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static void _make_nullable_output_key(vectorized::Block* block) {
```
##########
be/src/pipeline/exec/analytic_sink_operator.cpp:
##########
@@ -66,6 +66,126 @@ Status AnalyticSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& inf
return Status::OK();
}
+bool AnalyticSinkLocalState::_whether_need_next_partition(
Review Comment:
warning: method '_whether_need_next_partition' can be made static
[readability-convert-member-functions-to-static]
be/src/pipeline/exec/analytic_sink_operator.h:84:
```diff
- bool _whether_need_next_partition(vectorized::BlockRowPos&
found_partition_end);
+ static bool _whether_need_next_partition(vectorized::BlockRowPos&
found_partition_end);
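```
Note: the definition of this method in analytic_sink_operator.cpp dereferences `_shared_state`. If that is an instance member, a static method cannot access it, and this warning is likely a false positive.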
##########
be/src/pipeline/exec/analytic_source_operator.cpp:
##########
@@ -37,8 +37,129 @@ AnalyticLocalState::AnalyticLocalState(RuntimeState* state,
OperatorXBase* paren
_agg_functions_size(0),
_agg_functions_created(false) {}
+//_partition_by_columns,_order_by_columns save in blocks, so if need to
calculate the boundary, may find in which blocks firstly
+vectorized::BlockRowPos AnalyticLocalState::_compare_row_to_find_end(int idx,
Review Comment:
warning: method '_compare_row_to_find_end' can be made static
[readability-convert-member-functions-to-static]
be/src/pipeline/exec/analytic_source_operator.h:93:
```diff
- vectorized::BlockRowPos _compare_row_to_find_end(int idx,
vectorized::BlockRowPos start,
+ static vectorized::BlockRowPos _compare_row_to_find_end(int idx,
vectorized::BlockRowPos start,
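```
Note: the definition of this method in analytic_source_operator.cpp dereferences `_shared_state`. If that is an instance member, a static method cannot access it, and this warning is likely a false positive.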
##########
be/src/pipeline/exec/aggregation_source_operator.cpp:
##########
@@ -301,15 +341,37 @@
return Status::OK();
}
+Status AggLocalState::_merge_spilt_data() {
Review Comment:
warning: method '_merge_spilt_data' can be made static
[readability-convert-member-functions-to-static]
be/src/pipeline/exec/aggregation_source_operator.h:112:
```diff
- Status _merge_spilt_data();
+ static Status _merge_spilt_data();
```
##########
be/src/pipeline/exec/analytic_source_operator.cpp:
##########
@@ -37,8 +37,129 @@
_agg_functions_size(0),
_agg_functions_created(false) {}
+//_partition_by_columns,_order_by_columns save in blocks, so if need to
calculate the boundary, may find in which blocks firstly
+// Starting at `start`, finds the first row whose value in column `idx`
+// differs from the value at `start`, with the search range bounded by `end`.
+// A binary search over blocks comes first, then a binary search over rows
+// within the chosen block.
+// NOTE(review): both searches assume rows equal to the start value are
+// contiguous from `start`; confirm this ordering guarantee with callers.
+// need_check_first: if `end` points at row 0 of a block, first pull it back to
+// the end of the previous block (see the issue 15951 comment below).
+vectorized::BlockRowPos AnalyticLocalState::_compare_row_to_find_end(int idx,
+
vectorized::BlockRowPos start,
+
vectorized::BlockRowPos end,
+ bool
need_check_first) {
+ auto& shared_state = *_shared_state;
+ int64_t start_init_row_num = start.row_num;
+ vectorized::ColumnPtr start_column =
+
shared_state.input_blocks[start.block_num].get_by_position(idx).column;
+ vectorized::ColumnPtr start_next_block_column = start_column;
+
+ // NOTE(review): input_blocks[start.block_num] is already indexed above, so
+ // these DCHECKs fire too late to guard it; also `size() - 1` wraps around
+ // when input_blocks is empty.
+ DCHECK_LE(start.block_num, end.block_num);
+ DCHECK_LE(start.block_num, shared_state.input_blocks.size() - 1);
+ int64_t start_block_num = start.block_num;
+ int64_t end_block_num = end.block_num;
+ int64_t mid_blcok_num = end.block_num;
+ // To fix this problem: https://github.com/apache/doris/issues/15951
+ // in this case, the partition by column is last row of block, so it's
pointed to a new block at row = 0, range is: [left, right)
+ // From the perspective of order by column, the two values are exactly
equal.
+ // so the range will be get wrong because it's compare_at == 0 with next
block at row = 0
+ if (need_check_first && end.block_num > 0 && end.row_num == 0) {
+ end.block_num--;
+ end_block_num--;
+ end.row_num = shared_state.input_blocks[end_block_num].rows();
+ }
+ //binary search find in which block
+ while (start_block_num < end_block_num) {
+ mid_blcok_num = (start_block_num + end_block_num + 1) >> 1;
+ start_next_block_column =
+
shared_state.input_blocks[mid_blcok_num].get_by_position(idx).column;
+ //Compares (*this)[n] and rhs[m], this: start[init_row] rhs: mid[0]
+ if (start_column->compare_at(start_init_row_num, 0,
*start_next_block_column, 1) == 0) {
+ start_block_num = mid_blcok_num;
+ } else {
+ end_block_num = mid_blcok_num - 1;
+ }
+ }
+
+ // have check the start.block_num: start_column[start_init_row_num] with
mid_blcok_num start_next_block_column[0]
+ // now next block must not be result, so need check with end_block_num:
start_next_block_column[last_row]
+ // If the last row of end_block_num still equals the start value, the
+ // boundary is exactly row 0 of the following block, so return that directly.
+ if (end_block_num == mid_blcok_num - 1) {
+ start_next_block_column =
+
shared_state.input_blocks[end_block_num].get_by_position(idx).column;
+ int64_t block_size = shared_state.input_blocks[end_block_num].rows();
+ if ((start_column->compare_at(start_init_row_num, block_size - 1,
*start_next_block_column,
+ 1) == 0)) {
+ start.block_num = end_block_num + 1;
+ start.row_num = 0;
+ return start;
+ }
+ }
+
+ //check whether need get column again, maybe same as first init
+ // if the start_block_num have move to forword, so need update start block
num and compare it from row_num=0
+ if (start_block_num != start.block_num) {
+ start_init_row_num = 0;
+ start.block_num = start_block_num;
+ start_column =
shared_state.input_blocks[start.block_num].get_by_position(idx).column;
+ }
+ //binary search, set start and end pos
+ int64_t start_pos = start_init_row_num;
+ int64_t end_pos = shared_state.input_blocks[start.block_num].rows();
+ //if end_block_num haven't moved, only start_block_num go to the end block
+ //so could use the end.row_num for binary search
+ if (start.block_num == end.block_num) {
+ end_pos = end.row_num;
+ }
+ // Lower-bound style search for the first row in [start_pos, end_pos) where
+ // compare_at against start_column[start_init_row_num] is non-zero.
+ while (start_pos < end_pos) {
+ int64_t mid_pos = (start_pos + end_pos) >> 1;
+ if (start_column->compare_at(start_init_row_num, mid_pos,
*start_column, 1)) {
+ end_pos = mid_pos;
+ } else {
+ start_pos = mid_pos + 1;
+ }
+ }
+ start.row_num = start_pos; //update row num, return the find end
+ return start;
+}
+
+// Returns the end position of the partition that starts at the cached
+// shared_state.partition_by_end.
+// - Unprocessed rows remain before partition_by_end: return it unchanged.
+// - No PARTITION BY expressions, or no input rows: return all_block_end.
+// - Otherwise each partition-by column narrows the end in turn, starting from
+//   all_block_end. `pos` is then recomputed as the block's first-row position
+//   plus row_num.
+vectorized::BlockRowPos AnalyticLocalState::_get_partition_by_end() {
+ auto& shared_state = *_shared_state;
+ if (shared_state.current_row_position <
+ shared_state.partition_by_end.pos) { //still have data, return
partition_by_end directly
+ return shared_state.partition_by_end;
+ }
+
+ if (shared_state.partition_by_eq_expr_ctxs.empty() ||
+ (shared_state.input_total_rows == 0)) { //no partition_by, the all
block is end
+ return shared_state.all_block_end;
+ }
+
+ vectorized::BlockRowPos cal_end = shared_state.all_block_end;
+ for (size_t i = 0; i < shared_state.partition_by_eq_expr_ctxs.size();
+ ++i) { //have partition_by, binary search the partiton end
+ cal_end =
_compare_row_to_find_end(shared_state.partition_by_column_idxs[i],
+ shared_state.partition_by_end,
cal_end);
+ }
+ cal_end.pos =
shared_state.input_block_first_row_positions[cal_end.block_num] +
cal_end.row_num;
+ return cal_end;
+}
+
+bool AnalyticLocalState::_whether_need_next_partition(
Review Comment:
warning: method '_whether_need_next_partition' can be made static
[readability-convert-member-functions-to-static]
be/src/pipeline/exec/analytic_source_operator.h:96:
```diff
- bool _whether_need_next_partition(vectorized::BlockRowPos&
found_partition_end);
+ static bool _whether_need_next_partition(vectorized::BlockRowPos&
found_partition_end);
```
##########
be/src/pipeline/exec/aggregation_source_operator.h:
##########
@@ -90,6 +108,24 @@ class AggLocalState final : public
PipelineXLocalState<AggDependency> {
Status _serialize_with_serialized_key_result_with_spilt_data(RuntimeState*
state,
vectorized::Block* block,
SourceState&
source_state);
+ Status _destroy_agg_status(vectorized::AggregateDataPtr data);
+ Status _reset_hash_table();
+ Status _merge_spilt_data();
+ void _make_nullable_output_key(vectorized::Block* block) {
Review Comment:
warning: method '_make_nullable_output_key' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static void _make_nullable_output_key(vectorized::Block* block) {
```
##########
be/src/pipeline/exec/hashjoin_build_sink.cpp:
##########
@@ -175,6 +175,68 @@ void
HashJoinBuildSinkLocalState::init_short_circuit_for_probe() {
p._join_op == TJoinOp::LEFT_ANTI_JOIN);
}
+Status HashJoinBuildSinkLocalState::_do_evaluate(vectorized::Block& block,
Review Comment:
warning: method '_do_evaluate' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static Status HashJoinBuildSinkLocalState::_do_evaluate(vectorized::Block&
block,
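```
Note: C++ does not allow `static` on an out-of-class member function definition, so this suggestion will not compile as written. To apply the change, add `static` to the in-class declaration in the header and leave this definition unchanged.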
##########
be/src/pipeline/exec/data_queue.cpp:
##########
@@ -41,7 +41,8 @@
_cur_bytes_in_queue(child_count),
_cur_blocks_nums_in_queue(child_count),
_flag_queue_idx(0),
- _dependency(dependency) {
+ _source_dependency(nullptr),
+ _sink_dependency(nullptr) {
Review Comment:
warning: member initializer for '_sink_dependency' is redundant
[modernize-use-default-member-init]
```suggestion
{
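```
Note: applied on its own, this suggestion leaves `_source_dependency(nullptr),` followed directly by `{`. The trailing comma does not compile. Combined with the `_source_dependency` warning above, the initializer list should end with `_flag_queue_idx(0) {`.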
##########
be/src/pipeline/pipeline_x/dependency.h:
##########
@@ -222,33 +199,28 @@ class RuntimeFilterDependency final : public Dependency {
public:
RuntimeFilterDependency(int id, int node_id, std::string name)
: Dependency(id, node_id, name) {}
- RuntimeFilterDependency* filter_blocked_by(PipelineXTask* task);
+ Dependency* is_blocked_by(PipelineXTask* task);
Review Comment:
warning: annotate this function with 'override' or (rarely) 'final'
[modernize-use-override]
```suggestion
Dependency* is_blocked_by(PipelineXTask* task) override;
```
##########
be/src/pipeline/exec/hashjoin_build_sink.cpp:
##########
@@ -175,6 +175,68 @@
p._join_op == TJoinOp::LEFT_ANTI_JOIN);
}
+// Executes each expression in `exprs` against `block`. For each one it:
+// - times only the execute() call with expr_call_timer;
+// - replaces the result column with convert_to_full_column_if_const();
+// - records the result column position in res_col_ids[i].
+// Errors from execute() are propagated through RETURN_IF_ERROR.
+// NOTE(review): res_col_ids is written by index rather than appended, so the
+// caller must size it to at least exprs.size().
+Status HashJoinBuildSinkLocalState::_do_evaluate(vectorized::Block& block,
+
vectorized::VExprContextSPtrs& exprs,
+ RuntimeProfile::Counter&
expr_call_timer,
+ std::vector<int>&
res_col_ids) {
+ for (size_t i = 0; i < exprs.size(); ++i) {
+ int result_col_id = -1;
+ // execute build column
+ {
+ SCOPED_TIMER(&expr_call_timer);
+ RETURN_IF_ERROR(exprs[i]->execute(&block, &result_col_id));
+ }
+
+ // TODO: opt the column is const
+ block.get_by_position(result_col_id).column =
+
block.get_by_position(result_col_id).column->convert_to_full_column_if_const();
+ res_col_ids[i] = result_col_id;
+ }
+ return Status::OK();
+}
+
+// Wraps every column of `block` whose type is not nullable with make_nullable.
+// Both the column and its type are replaced in place. Returns the indices of
+// the columns that were converted.
+// NOTE(review): indices are stored as uint16_t, so a block with more than
+// 65535 columns would have them truncated.
+std::vector<uint16_t> HashJoinBuildSinkLocalState::_convert_block_to_null(
+ vectorized::Block& block) {
+ std::vector<uint16_t> results;
+ for (int i = 0; i < block.columns(); ++i) {
+ if (auto& column_type = block.safe_get_by_position(i);
!column_type.type->is_nullable()) {
+ DCHECK(!column_type.column->is_nullable());
+ column_type.column = make_nullable(column_type.column);
+ column_type.type = make_nullable(column_type.type);
+ results.emplace_back(i);
+ }
+ }
+ return results;
+}
+
+Status HashJoinBuildSinkLocalState::_extract_join_column(
Review Comment:
warning: method '_extract_join_column' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static Status HashJoinBuildSinkLocalState::_extract_join_column(
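```
Note: C++ does not allow `static` on an out-of-class member function definition, so this suggestion will not compile as written. To apply the change, add `static` to the in-class declaration in the header and leave this definition unchanged.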
##########
be/src/pipeline/exec/hashjoin_probe_operator.cpp:
##########
@@ -402,6 +402,48 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState*
state, vectorized::Bloc
return Status::OK();
}
+Status HashJoinProbeLocalState::_extract_join_column(vectorized::Block& block,
Review Comment:
warning: method '_extract_join_column' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static Status
HashJoinProbeLocalState::_extract_join_column(vectorized::Block& block,
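```
Note: C++ does not allow `static` on an out-of-class member function definition, so this suggestion will not compile as written. To apply the change, add `static` to the in-class declaration in the header and leave this definition unchanged.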
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]