github-actions[bot] commented on code in PR #24818:
URL: https://github.com/apache/doris/pull/24818#discussion_r1334944110
##########
be/src/pipeline/pipeline.h:
##########
@@ -112,11 +112,11 @@ class Pipeline : public
std::enable_shared_from_this<Pipeline> {
RuntimeProfile* pipeline_profile() { return _pipeline_profile.get(); }
- const RowDescriptor& output_row_desc() const {
- return operatorXs[operatorXs.size() - 1]->row_desc();
+ [[nodiscard]] const RowDescriptor& output_row_desc() const {
Review Comment:
warning: function 'output_row_desc' should be marked [[nodiscard]]
[modernize-use-nodiscard]
```suggestion
[[nodiscard]] const RowDescriptor& output_row_desc() const {
```
##########
be/src/pipeline/exec/table_function_operator.cpp:
##########
@@ -41,4 +42,287 @@ Status TableFunctionOperator::close(doris::RuntimeState*
state) {
return StatefulOperator::close(state);
}
+TableFunctionLocalState::TableFunctionLocalState(RuntimeState* state,
OperatorXBase* parent)
+ : PipelineXLocalState<>(state, parent),
_child_block(vectorized::Block::create_unique()) {}
+
+Status TableFunctionLocalState::init(RuntimeState* state, LocalStateInfo&
info) {
+ RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
+ auto& p = _parent->cast<TableFunctionOperatorX>();
+ _vfn_ctxs.resize(p._vfn_ctxs.size());
+ for (size_t i = 0; i < _vfn_ctxs.size(); i++) {
+ RETURN_IF_ERROR(p._vfn_ctxs[i]->clone(state, _vfn_ctxs[i]));
+
+ const std::string& tf_name =
_vfn_ctxs[i]->root()->fn().name.function_name;
+ vectorized::TableFunction* fn = nullptr;
+ RETURN_IF_ERROR(vectorized::TableFunctionFactory::get_fn(tf_name,
state->obj_pool(), &fn));
+ fn->set_expr_context(_vfn_ctxs[i]);
+ _fns.push_back(fn);
+ }
+
+ _cur_child_offset = -1;
+ return Status::OK();
+}
+
+void TableFunctionLocalState::_copy_output_slots(
+ std::vector<vectorized::MutableColumnPtr>& columns) {
+ if (!_current_row_insert_times) {
+ return;
+ }
+ auto& p = _parent->cast<TableFunctionOperatorX>();
+ for (auto index : p._output_slot_indexs) {
+ auto src_column = _child_block->get_by_position(index).column;
+ columns[index]->insert_many_from(*src_column, _cur_child_offset,
_current_row_insert_times);
+ }
+ _current_row_insert_times = 0;
+}
+
+// Scans `_fns` from back to front and returns the index of the last eos fn met in
+// that scan, i.e. the start of the trailing run of eos fns
+// eg: there are 3 functions in `_fns`
+// eos: false, true, true
+// return: 1
+//
+// eos: false, false, true
+// return: 2
+//
+// eos: false, false, false
+// return: -1
+//
+// eos: true, true, true
+// return: 0
+//
+// return:
+//   0: all fns are eos
+//  -1: the last fn is not eos (e.g. no fn is eos)
+//  >0: the fns from this index to the end are eos
+int TableFunctionLocalState::_find_last_fn_eos_idx() const {
+ for (int i = _parent->cast<TableFunctionOperatorX>()._fn_num - 1; i >= 0;
--i) {
Review Comment:
warning: variable 'i' is not initialized [cppcoreguidelines-init-variables]
```suggestion
for (int i = 0 = _parent->cast<TableFunctionOperatorX>()._fn_num - 1; i
>= 0; --i) {
```
##########
be/src/pipeline/exec/table_function_operator.h:
##########
@@ -45,4 +46,115 @@ class TableFunctionOperator final : public
StatefulOperator<TableFunctionOperato
Status close(RuntimeState* state) override;
};
+
+class TableFunctionOperatorX;
+class TableFunctionLocalState final : public PipelineXLocalState<> {
+public:
+ using Parent = TableFunctionOperatorX;
+ ENABLE_FACTORY_CREATOR(TableFunctionLocalState);
+ TableFunctionLocalState(RuntimeState* state, OperatorXBase* parent);
+ ~TableFunctionLocalState() override = default;
+
+ Status init(RuntimeState* state, LocalStateInfo& info) override;
+ Status process_next_child_row();
+ Status get_expanded_block(RuntimeState* state, vectorized::Block*
output_block,
+ SourceState& source_state);
+
+private:
+ friend class TableFunctionOperatorX;
+ friend class StatefulOperatorX<TableFunctionLocalState>;
+
+ void _copy_output_slots(std::vector<vectorized::MutableColumnPtr>&
columns);
+ bool _roll_table_functions(int last_eos_idx);
+ // return:
+ // 0: all fns are eos
+ // -1: all fns are not eos
+ // >0: some of fns are eos
+ int _find_last_fn_eos_idx() const;
+ bool _is_inner_and_empty();
+
+ std::vector<vectorized::TableFunction*> _fns;
+ vectorized::VExprContextSPtrs _vfn_ctxs;
+ int64_t _cur_child_offset = 0;
+ std::unique_ptr<vectorized::Block> _child_block;
+ int _current_row_insert_times = 0;
+ SourceState _child_source_state;
+};
+
+class TableFunctionOperatorX final : public
StatefulOperatorX<TableFunctionLocalState> {
+public:
+ using Base = StatefulOperatorX<TableFunctionLocalState>;
+ TableFunctionOperatorX(ObjectPool* pool, const TPlanNode& tnode, const
DescriptorTbl& descs);
+ Status init(const TPlanNode& tnode, RuntimeState* state) override;
+ Status prepare(doris::RuntimeState* state) override;
+ Status open(doris::RuntimeState* state) override;
+
+ bool need_more_input_data(RuntimeState* state) const override {
+ auto& local_state =
state->get_local_state(id())->cast<TableFunctionLocalState>();
+ return !local_state._child_block->rows() &&
+ local_state._child_source_state != SourceState::FINISHED;
+ }
+
+ Status push(RuntimeState* state, vectorized::Block* input_block,
+ SourceState source_state) const override {
+ CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ if (input_block->rows() == 0) {
+ return Status::OK();
+ }
+
+ for (auto* fn : local_state._fns) {
+ RETURN_IF_ERROR(fn->process_init(input_block, state));
+ }
+ RETURN_IF_ERROR(local_state.process_next_child_row());
+ return Status::OK();
+ }
+
+ Status pull(RuntimeState* state, vectorized::Block* output_block,
+ SourceState& source_state) const override {
+ CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ RETURN_IF_ERROR(local_state.get_expanded_block(state, output_block,
source_state));
+ local_state.reached_limit(output_block, source_state);
+ return Status::OK();
+ }
+
+private:
+ friend class TableFunctionLocalState;
+
+ Status _prepare_output_slot_ids(const TPlanNode& tnode);
+
+ /* Now the output tuple for the table function node is base_table_tuple +
tf1 + tf2 + ...
+ But not all slots are used, the real used slots are inside
table_function_node.outputSlotIds.
+ For case like explode_bitmap:
+ SELECT a2,count(*) as a3 FROM A WHERE a1 IN
+ (SELECT c1 FROM B LATERAL VIEW explode_bitmap(b1) C as c1)
+ GROUP BY a2 ORDER BY a3;
+ Actually we only need to output column c1, no need to output columns
in bitmap table B.
+ Copying large bitmap columns is very expensive and slow.
+
+ Here we check whether the slot is really used; if it is not, we avoid copying it and
just insert a default value.
+
+ A better solution is:
+ 1. FE: create a new output tuple
based on the real output slots;
+ 2. BE: refactor (V)TableFunctionNode to output rows based on the new tuple;
+ */
+ [[nodiscard]] inline bool _slot_need_copy(SlotId slot_id) const {
Review Comment:
warning: function '_slot_need_copy' should be marked [[nodiscard]]
[modernize-use-nodiscard]
```suggestion
[[nodiscard]] inline bool _slot_need_copy(SlotId slot_id) const {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]