This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 329c22da00 [pipelineX](feature) Support table function operator
(#24818)
329c22da00 is described below
commit 329c22da00521100499c37e7524ebde4acde0df9
Author: Gabriel <[email protected]>
AuthorDate: Sat Sep 23 19:26:53 2023 +0800
[pipelineX](feature) Support table function operator (#24818)
---
be/src/pipeline/exec/aggregation_sink_operator.h | 10 +-
be/src/pipeline/exec/aggregation_source_operator.h | 3 +-
be/src/pipeline/exec/exchange_sink_operator.h | 6 +-
be/src/pipeline/exec/exchange_source_operator.h | 12 +-
be/src/pipeline/exec/join_build_sink_operator.h | 4 +-
be/src/pipeline/exec/join_probe_operator.h | 8 +-
.../exec/streaming_aggregation_sink_operator.h | 2 +
be/src/pipeline/exec/table_function_operator.cpp | 284 +++++++++++++++++++++
be/src/pipeline/exec/table_function_operator.h | 112 ++++++++
be/src/pipeline/pipeline.h | 6 +-
be/src/pipeline/pipeline_x/operator.cpp | 3 +
be/src/pipeline/pipeline_x/operator.h | 31 ++-
.../pipeline_x/pipeline_x_fragment_context.cpp | 6 +
13 files changed, 449 insertions(+), 38 deletions(-)
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h
b/be/src/pipeline/exec/aggregation_sink_operator.h
index 34ba9e2449..bae9e1a8ec 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -52,9 +52,9 @@ template <typename DependencyType, typename Derived>
class AggSinkLocalState : public PipelineXSinkLocalState<DependencyType> {
public:
using Base = PipelineXSinkLocalState<DependencyType>;
- virtual ~AggSinkLocalState() = default;
+ ~AggSinkLocalState() override = default;
- virtual Status init(RuntimeState* state, LocalSinkStateInfo& info)
override;
+ Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status open(RuntimeState* state) override;
Status close(RuntimeState* state, Status exec_status) override;
@@ -332,7 +332,7 @@ template <typename LocalStateType =
BlockingAggSinkLocalState>
class AggSinkOperatorX : public DataSinkOperatorX<LocalStateType> {
public:
AggSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, const
DescriptorTbl& descs);
- virtual ~AggSinkOperatorX() = default;
+ ~AggSinkOperatorX() override = default;
Status init(const TDataSink& tsink) override {
return Status::InternalError("{} should not init with TPlanNode",
DataSinkOperatorX<LocalStateType>::_name);
@@ -343,8 +343,8 @@ public:
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;
- virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
- SourceState source_state) override;
+ Status sink(RuntimeState* state, vectorized::Block* in_block,
+ SourceState source_state) override;
using DataSinkOperatorX<LocalStateType>::id;
diff --git a/be/src/pipeline/exec/aggregation_source_operator.h
b/be/src/pipeline/exec/aggregation_source_operator.h
index 274424f1f9..2d5ca45dec 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.h
+++ b/be/src/pipeline/exec/aggregation_source_operator.h
@@ -50,11 +50,12 @@ public:
class AggSourceOperatorX;
-class AggLocalState : public PipelineXLocalState<AggDependency> {
+class AggLocalState final : public PipelineXLocalState<AggDependency> {
public:
using Base = PipelineXLocalState<AggDependency>;
ENABLE_FACTORY_CREATOR(AggLocalState);
AggLocalState(RuntimeState* state, OperatorXBase* parent);
+ ~AggLocalState() override = default;
Status init(RuntimeState* state, LocalStateInfo& info) override;
Status close(RuntimeState* state) override;
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h
b/be/src/pipeline/exec/exchange_sink_operator.h
index 0b05491901..2c899ec49a 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -120,7 +120,7 @@ public:
: WriteDependency(id, "ChannelDependency"),
_sender_id(sender_id),
_local_recvr(local_recvr) {}
- virtual ~ChannelDependency() = default;
+ ~ChannelDependency() override = default;
void* shared_state() override { return nullptr; }
@@ -150,7 +150,7 @@ private:
vectorized::VDataStreamRecvr* _local_recvr;
};
-class ExchangeSinkLocalState : public PipelineXSinkLocalState<> {
+class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
ENABLE_FACTORY_CREATOR(ExchangeSinkLocalState);
public:
@@ -312,4 +312,4 @@ private:
};
} // namespace pipeline
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/pipeline/exec/exchange_source_operator.h
b/be/src/pipeline/exec/exchange_source_operator.h
index df6d44494e..a7e146b54d 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -50,7 +50,7 @@ public:
bool is_pending_finish() const override;
};
-struct ExchangeDataDependency : public Dependency {
+struct ExchangeDataDependency final : public Dependency {
public:
ENABLE_FACTORY_CREATOR(ExchangeDataDependency);
ExchangeDataDependency(int id, vectorized::VDataStreamRecvr::SenderQueue*
sender_queue)
@@ -89,7 +89,7 @@ private:
};
class ExchangeSourceOperatorX;
-class ExchangeLocalState : public PipelineXLocalState<> {
+class ExchangeLocalState final : public PipelineXLocalState<> {
ENABLE_FACTORY_CREATOR(ExchangeLocalState);
ExchangeLocalState(RuntimeState* state, OperatorXBase* parent);
@@ -123,12 +123,12 @@ public:
SourceState& source_state) override;
Status close(RuntimeState* state) override;
- bool is_source() const override { return true; }
+ [[nodiscard]] bool is_source() const override { return true; }
- RowDescriptor input_row_desc() const { return _input_row_desc; }
+ [[nodiscard]] RowDescriptor input_row_desc() const { return
_input_row_desc; }
- int num_senders() const { return _num_senders; }
- bool is_merging() const { return _is_merging; }
+ [[nodiscard]] int num_senders() const { return _num_senders; }
+ [[nodiscard]] bool is_merging() const { return _is_merging; }
std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr() {
return _sub_plan_query_statistics_recvr;
diff --git a/be/src/pipeline/exec/join_build_sink_operator.h
b/be/src/pipeline/exec/join_build_sink_operator.h
index 361982dd97..55553df462 100644
--- a/be/src/pipeline/exec/join_build_sink_operator.h
+++ b/be/src/pipeline/exec/join_build_sink_operator.h
@@ -36,7 +36,7 @@ public:
protected:
JoinBuildSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
: PipelineXSinkLocalState<DependencyType>(parent, state) {}
- virtual ~JoinBuildSinkLocalState() = default;
+ ~JoinBuildSinkLocalState() override = default;
template <typename LocalStateType>
friend class JoinBuildSinkOperatorX;
@@ -53,7 +53,7 @@ template <typename LocalStateType>
class JoinBuildSinkOperatorX : public DataSinkOperatorX<LocalStateType> {
public:
JoinBuildSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, const
DescriptorTbl& descs);
- virtual ~JoinBuildSinkOperatorX() = default;
+ ~JoinBuildSinkOperatorX() override = default;
protected:
void _init_join_op();
diff --git a/be/src/pipeline/exec/join_probe_operator.h
b/be/src/pipeline/exec/join_probe_operator.h
index e3ed33e4d9..1a7090b232 100644
--- a/be/src/pipeline/exec/join_probe_operator.h
+++ b/be/src/pipeline/exec/join_probe_operator.h
@@ -31,8 +31,8 @@ template <typename DependencyType, typename Derived>
class JoinProbeLocalState : public PipelineXLocalState<DependencyType> {
public:
using Base = PipelineXLocalState<DependencyType>;
- virtual Status init(RuntimeState* state, LocalStateInfo& info) override;
- virtual Status close(RuntimeState* state) override;
+ Status init(RuntimeState* state, LocalStateInfo& info) override;
+ Status close(RuntimeState* state) override;
virtual void add_tuple_is_null_column(vectorized::Block* block) = 0;
protected:
@@ -42,7 +42,7 @@ protected:
: Base(state, parent),
_child_block(vectorized::Block::create_unique()),
_child_source_state(SourceState::DEPEND_ON_SOURCE) {}
- virtual ~JoinProbeLocalState() = default;
+ ~JoinProbeLocalState() override = default;
void _construct_mutable_join_block();
Status _build_output_block(vectorized::Block* origin_block,
vectorized::Block* output_block,
bool keep_origin = true);
@@ -67,7 +67,7 @@ class JoinProbeOperatorX : public
StatefulOperatorX<LocalStateType> {
public:
using Base = StatefulOperatorX<LocalStateType>;
JoinProbeOperatorX(ObjectPool* pool, const TPlanNode& tnode, const
DescriptorTbl& descs);
- virtual Status init(const TPlanNode& tnode, RuntimeState* state) override;
+ Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status open(doris::RuntimeState* state) override;
[[nodiscard]] const RowDescriptor& row_desc() override { return
*_output_row_desc; }
diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
index 1796716804..e92c5ff4e6 100644
--- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
@@ -81,6 +81,7 @@ public:
using Base = AggSinkLocalState<AggDependency, StreamingAggSinkLocalState>;
ENABLE_FACTORY_CREATOR(StreamingAggSinkLocalState);
StreamingAggSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState*
state);
+ ~StreamingAggSinkLocalState() override = default;
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status close(RuntimeState* state, Status exec_status) override;
@@ -108,6 +109,7 @@ private:
class StreamingAggSinkOperatorX final : public
AggSinkOperatorX<StreamingAggSinkLocalState> {
public:
StreamingAggSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, const
DescriptorTbl& descs);
+ ~StreamingAggSinkOperatorX() override = default;
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
diff --git a/be/src/pipeline/exec/table_function_operator.cpp
b/be/src/pipeline/exec/table_function_operator.cpp
index 3f6133c3fc..7ce7ac3410 100644
--- a/be/src/pipeline/exec/table_function_operator.cpp
+++ b/be/src/pipeline/exec/table_function_operator.cpp
@@ -21,6 +21,7 @@
#include "pipeline/exec/operator.h"
#include "vec/core/block.h"
+#include "vec/exprs/table_function/table_function_factory.h"
namespace doris {
class RuntimeState;
@@ -41,4 +42,287 @@ Status TableFunctionOperator::close(doris::RuntimeState*
state) {
return StatefulOperator::close(state);
}
+TableFunctionLocalState::TableFunctionLocalState(RuntimeState* state,
OperatorXBase* parent)
+ : PipelineXLocalState<>(state, parent),
_child_block(vectorized::Block::create_unique()) {} // Forwards to the base local state and allocates `_child_block`, the block whose rows process_next_child_row() walks.
+
+Status TableFunctionLocalState::init(RuntimeState* state, LocalStateInfo&
info) { // Builds this local state's own expression contexts and table functions from the parent operator's contexts.
+ RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
+ auto& p = _parent->cast<TableFunctionOperatorX>();
+ _vfn_ctxs.resize(p._vfn_ctxs.size());
+ for (size_t i = 0; i < _vfn_ctxs.size(); i++) {
+ RETURN_IF_ERROR(p._vfn_ctxs[i]->clone(state, _vfn_ctxs[i])); // clone the parent's i-th context into the local slot
+ // Look up the table function by the cloned expression's function name and bind it to the cloned context.
+ const std::string& tf_name =
_vfn_ctxs[i]->root()->fn().name.function_name;
+ vectorized::TableFunction* fn = nullptr;
+ RETURN_IF_ERROR(vectorized::TableFunctionFactory::get_fn(tf_name,
state->obj_pool(), &fn)); // NOTE(review): fn is presumably owned by state->obj_pool() — confirm against TableFunctionFactory
+ fn->set_expr_context(_vfn_ctxs[i]);
+ _fns.push_back(fn);
+ }
+
+ _cur_child_offset = -1; // -1 is the "no current child row" value that get_expanded_block() checks for
+ return Status::OK();
+}
+
+void TableFunctionLocalState::_copy_output_slots(
+ std::vector<vectorized::MutableColumnPtr>& columns) { // Flushes the pending copies of the current child row into the output columns listed in `_output_slot_indexs`.
+ if (!_current_row_insert_times) { // nothing is pending for the current child row
+ return;
+ }
+ auto& p = _parent->cast<TableFunctionOperatorX>();
+ for (auto index : p._output_slot_indexs) {
+ auto src_column = _child_block->get_by_position(index).column;
+ columns[index]->insert_many_from(*src_column, _cur_child_offset,
_current_row_insert_times); // presumably repeats the row at _cur_child_offset that many times — confirm against IColumn::insert_many_from
+ }
+ _current_row_insert_times = 0; // the pending count has been flushed
+}
+
+// Scans `_fns` from back to front and returns the index of the first fn of the
+// trailing run of eos fns. E.g. with 3 functions in `_fns`:
+// eos: false, true, true
+// return: 1
+//
+// eos: false, false, true
+// return: 2
+//
+// eos: false, false, false
+// return: -1
+//
+// eos: true, true, true
+// return: 0
+//
+// return:
+// 0: all fns are eos (also returned when `_fn_num` is 0)
+// -1: the last fn is not eos (e.g. eos: true, false, false)
+// >0: the fns from the returned index to the end are eos
+int TableFunctionLocalState::_find_last_fn_eos_idx() const {
+ for (int i = _parent->cast<TableFunctionOperatorX>()._fn_num - 1; i >= 0;
--i) {
+ if (!_fns[i]->eos()) {
+ if (i == _parent->cast<TableFunctionOperatorX>()._fn_num - 1) {
+ return -1;
+ } else {
+ return i + 1;
+ }
+ }
+ }
+ // all eos
+ return 0;
+}
+
+// Forwards the fns in front of `last_eos_idx` and resets the fns behind the one that moved.
+// E.g.:
+// There are 3 functions f1, f2 and f3 in `_fns`.
+// If `last_eos_idx` is 1, f2 and f3 are eos,
+// so we need to forward f1, and reset f2 and f3.
+bool TableFunctionLocalState::_roll_table_functions(int last_eos_idx) {
+ int i = last_eos_idx - 1;
+ for (; i >= 0; --i) {
+ _fns[i]->forward();
+ if (!_fns[i]->eos()) {
+ break;
+ }
+ }
+ if (i == -1) {
+ // after forward, all functions are eos.
+ // we should process next child row to get more table function results.
+ return false;
+ }
+
+ for (int j = i + 1; j < _parent->cast<TableFunctionOperatorX>()._fn_num;
++j) {
+ _fns[j]->reset();
+ }
+
+ return true; // fn i was forwarded without reaching eos and every fn after it was reset
+}
+
+bool TableFunctionLocalState::_is_inner_and_empty() { // Returns true if any fn is not outer and reports current_empty().
+ for (int i = 0; i < _parent->cast<TableFunctionOperatorX>()._fn_num; i++) {
+ // if any table function is not outer and has empty result, go to next
child row
+ if (!_fns[i]->is_outer() && _fns[i]->current_empty()) {
+ return true;
+ }
+ }
+ return false;
+}
+
+Status TableFunctionLocalState::get_expanded_block(RuntimeState* state,
+ vectorized::Block*
output_block,
+ SourceState& source_state) { // Expands buffered child rows into output_block until batch_size() rows are produced or the child block is used up.
+ auto& p = _parent->cast<TableFunctionOperatorX>();
+ vectorized::MutableBlock m_block =
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
+ output_block, p._output_slots);
+ vectorized::MutableColumns& columns = m_block.mutable_columns();
+ // The table function columns are indexed after the child slots; propagate their nullability to the fns.
+ for (int i = 0; i < p._fn_num; i++) {
+ if (columns[i + p._child_slots.size()]->is_nullable()) {
+ _fns[i]->set_nullable();
+ }
+ }
+ // NOTE(review): columns[p._child_slots.size()] assumes at least one table function column exists — confirm _fn_num > 0 is guaranteed.
+ while (columns[p._child_slots.size()]->size() < state->batch_size()) {
+ RETURN_IF_CANCELLED(state);
+ RETURN_IF_ERROR(state->check_query_state("VTableFunctionNode, while
getting next batch."));
+
+ if (_child_block->rows() == 0) { // no buffered child rows to expand
+ break;
+ }
+
+ bool skip_child_row = false;
+ while (columns[p._child_slots.size()]->size() < state->batch_size()) {
+ int idx = _find_last_fn_eos_idx();
+ if (idx == 0 || skip_child_row) {
+ _copy_output_slots(columns);
+ // all table functions' results are exhausted, process next
child row.
+ RETURN_IF_ERROR(process_next_child_row());
+ if (_cur_child_offset == -1) { // the child block has been used up
+ break;
+ }
+ } else if (idx < p._fn_num && idx != -1) {
+ // some of table functions' results are exhausted.
+ if (!_roll_table_functions(idx)) {
+ // continue to process next child row.
+ continue;
+ }
+ }
+
+ // if any table function is not outer and has empty result, go to
next child row
+ if (skip_child_row = _is_inner_and_empty(); skip_child_row) {
+ continue;
+ }
+ if (p._fn_num == 1) { // single fn: let it emit as many values as still fit into the batch
+ _current_row_insert_times += _fns[0]->get_value(
+ columns[p._child_slots.size()],
+ state->batch_size() -
columns[p._child_slots.size()]->size());
+ } else { // several fns: emit one combined row, then forward the last fn
+ for (int i = 0; i < p._fn_num; i++) {
+ _fns[i]->get_value(columns[i + p._child_slots.size()]);
+ }
+ _current_row_insert_times++;
+ _fns[p._fn_num - 1]->forward();
+ }
+ }
+ }
+
+ _copy_output_slots(columns); // flush the copies still pending for the last processed child row
+ // Child slots that are not copied are padded with defaults up to the produced row count.
+ size_t row_size = columns[p._child_slots.size()]->size();
+ for (auto index : p._useless_slot_indexs) {
+ columns[index]->insert_many_defaults(row_size -
columns[index]->size());
+ }
+
+ // Apply this operator's conjuncts to the expanded output block.
+ RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts,
output_block,
+
output_block->columns()));
+ // Report FINISHED only once the child has finished and no child row is being processed.
+ if (_child_source_state == SourceState::FINISHED && _cur_child_offset ==
-1) {
+ source_state = SourceState::FINISHED;
+ }
+ return Status::OK();
+}
+
+Status TableFunctionLocalState::process_next_child_row() { // Moves to the next row of `_child_block`, or closes the fns and clears the block once it is used up.
+ _cur_child_offset++;
+
+ if (_cur_child_offset >= _child_block->rows()) {
+ // release block use count.
+ for (vectorized::TableFunction* fn : _fns) {
+ RETURN_IF_ERROR(fn->process_close());
+ }
+
+ _child_block->clear_column_data(_parent->cast<TableFunctionOperatorX>()
+ ._child_x->row_desc()
+ .num_materialized_slots());
+ _cur_child_offset = -1; // back to the "no current child row" value
+ return Status::OK();
+ }
+
+ for (vectorized::TableFunction* fn : _fns) {
+ RETURN_IF_ERROR(fn->process_row(_cur_child_offset)); // hand the new row offset to every fn
+ }
+
+ return Status::OK();
+}
+
+TableFunctionOperatorX::TableFunctionOperatorX(ObjectPool* pool, const
TPlanNode& tnode,
+ const DescriptorTbl& descs)
+ : Base(pool, tnode, descs) {} // Construction is delegated to Base; the table functions are created in init().
+
+Status TableFunctionOperatorX::_prepare_output_slot_ids(const TPlanNode&
tnode) {
+ // Build `_output_slot_ids`: a bitmap indexed by slot id that is true for every id in outputSlotIds.
+ if (tnode.table_function_node.outputSlotIds.empty()) {
+ return Status::InternalError("Output slots of table function node is
empty");
+ }
+ SlotId max_id = -1;
+ for (auto slot_id : tnode.table_function_node.outputSlotIds) {
+ if (slot_id > max_id) {
+ max_id = slot_id;
+ }
+ }
+ _output_slot_ids = std::vector<bool>(max_id + 1, false); // sized so that the largest listed id is a valid index
+ for (auto slot_id : tnode.table_function_node.outputSlotIds) {
+ _output_slot_ids[slot_id] = true; // NOTE(review): assumes slot ids are non-negative — TODO confirm
+ }
+
+ return Status::OK();
+}
+
+Status TableFunctionOperatorX::init(const TPlanNode& tnode, RuntimeState*
state) { // Creates one expression context and one TableFunction per entry of fnCallExprList, then builds the output-slot bitmap.
+ RETURN_IF_ERROR(Base::init(tnode, state));
+
+ for (const TExpr& texpr : tnode.table_function_node.fnCallExprList) {
+ vectorized::VExprContextSPtr ctx;
+ RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(texpr, ctx));
+ _vfn_ctxs.push_back(ctx);
+
+ auto root = ctx->root();
+ const std::string& tf_name = root->fn().name.function_name;
+ vectorized::TableFunction* fn = nullptr;
+ RETURN_IF_ERROR(vectorized::TableFunctionFactory::get_fn(tf_name,
_pool, &fn));
+ fn->set_expr_context(ctx);
+ _fns.push_back(fn);
+ }
+ _fn_num = _fns.size(); // cached count; TableFunctionLocalState uses it as its loop bound
+
+ // Prepare output slot ids
+ RETURN_IF_ERROR(_prepare_output_slot_ids(tnode));
+ return Status::OK();
+}
+
+Status TableFunctionOperatorX::prepare(RuntimeState* state) { // Prepares the fns and expressions, then classifies each child slot position as copied or defaulted.
+ RETURN_IF_ERROR(Base::prepare(state));
+
+ for (auto fn : _fns) {
+ RETURN_IF_ERROR(fn->prepare());
+ }
+ RETURN_IF_ERROR(vectorized::VExpr::prepare(_vfn_ctxs, state,
_row_descriptor));
+
+ // collect every slot of this operator's row descriptor
+ for (const auto& tuple_desc : _row_descriptor.tuple_descriptors()) {
+ for (const auto& slot_desc : tuple_desc->slots()) {
+ _output_slots.push_back(slot_desc);
+ }
+ }
+
+ // collect every slot of the child's row descriptor
+ for (const auto& child_tuple_desc :
_child_x->row_desc().tuple_descriptors()) {
+ for (const auto& child_slot_desc : child_tuple_desc->slots()) {
+ _child_slots.push_back(child_slot_desc);
+ }
+ }
+ // Positions that pass _slot_need_copy() are copied from the child block; the others are filled with defaults.
+ for (size_t i = 0; i < _child_slots.size(); i++) {
+ if (_slot_need_copy(i)) {
+ _output_slot_indexs.push_back(i);
+ } else {
+ _useless_slot_indexs.push_back(i);
+ }
+ }
+
+ return Status::OK();
+}
+
+Status TableFunctionOperatorX::open(doris::RuntimeState* state) { // Opens the base operator, then the table function expression contexts.
+ RETURN_IF_ERROR(Base::open(state));
+ return vectorized::VExpr::open(_vfn_ctxs, state);
+}
+
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/table_function_operator.h
b/be/src/pipeline/exec/table_function_operator.h
index a4b4e77141..541938ec72 100644
--- a/be/src/pipeline/exec/table_function_operator.h
+++ b/be/src/pipeline/exec/table_function_operator.h
@@ -21,6 +21,7 @@
#include "common/status.h"
#include "operator.h"
+#include "pipeline/pipeline_x/operator.h"
#include "vec/exec/vtable_function_node.h"
namespace doris {
@@ -45,4 +46,115 @@ public:
Status close(RuntimeState* state) override;
};
+
+class TableFunctionOperatorX;
+// Per-task state of TableFunctionOperatorX: it owns the cloned expression
+// contexts, the table function instances and the child block being expanded.
+class TableFunctionLocalState final : public PipelineXLocalState<> {
+public:
+    using Parent = TableFunctionOperatorX;
+    ENABLE_FACTORY_CREATOR(TableFunctionLocalState);
+    TableFunctionLocalState(RuntimeState* state, OperatorXBase* parent);
+    ~TableFunctionLocalState() override = default;
+
+    // Clones the parent's expression contexts and creates one table function per context.
+    Status init(RuntimeState* state, LocalStateInfo& info) override;
+    // Moves to the next row of `_child_block`; sets `_cur_child_offset` to -1 once
+    // the block is used up.
+    Status process_next_child_row();
+    // Expands buffered child rows into `output_block`; sets `source_state` to
+    // FINISHED when the child is finished and no child row is being processed.
+    Status get_expanded_block(RuntimeState* state, vectorized::Block* output_block,
+                              SourceState& source_state);
+
+private:
+    friend class TableFunctionOperatorX;
+    friend class StatefulOperatorX<TableFunctionLocalState>;
+
+    void _copy_output_slots(std::vector<vectorized::MutableColumnPtr>& columns);
+    bool _roll_table_functions(int last_eos_idx);
+    // return:
+    // 0: all fns are eos
+    // -1: the last fn is not eos
+    // >0: the fns from the returned index to the end are eos
+    int _find_last_fn_eos_idx() const;
+    bool _is_inner_and_empty();
+
+    std::vector<vectorized::TableFunction*> _fns;
+    vectorized::VExprContextSPtrs _vfn_ctxs;
+    // Offset of the child row being expanded; -1 means no row is being processed.
+    // init() assigns the same value, so the default now matches that sentinel.
+    int64_t _cur_child_offset = -1;
+    std::unique_ptr<vectorized::Block> _child_block;
+    // Number of output rows produced for the current child row and not yet copied
+    // by _copy_output_slots().
+    int _current_row_insert_times = 0;
+    // Must be initialized: TableFunctionOperatorX::need_more_input_data() reads it
+    // while `_child_block` is still empty, and nothing in this file assigns it
+    // earlier. An indeterminate value that happened to equal FINISHED would make
+    // the operator never ask for input. JoinProbeLocalState uses the same initial
+    // value.
+    SourceState _child_source_state = SourceState::DEPEND_ON_SOURCE;
+};
+
+class TableFunctionOperatorX final : public
StatefulOperatorX<TableFunctionLocalState> { // PipelineX operator created for TABLE_FUNCTION_NODE plan nodes; per-task state lives in TableFunctionLocalState.
+public:
+ using Base = StatefulOperatorX<TableFunctionLocalState>;
+ TableFunctionOperatorX(ObjectPool* pool, const TPlanNode& tnode, const
DescriptorTbl& descs);
+ Status init(const TPlanNode& tnode, RuntimeState* state) override;
+ Status prepare(doris::RuntimeState* state) override;
+ Status open(doris::RuntimeState* state) override;
+
+ bool need_more_input_data(RuntimeState* state) const override { // true only while the buffered child block is empty and the child is not FINISHED
+ auto& local_state =
state->get_local_state(id())->cast<TableFunctionLocalState>();
+ return !local_state._child_block->rows() &&
+ local_state._child_source_state != SourceState::FINISHED;
+ }
+
+ Status push(RuntimeState* state, vectorized::Block* input_block,
+ SourceState source_state) const override { // `source_state` is not used in this body
+ CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ if (input_block->rows() == 0) { // an empty block leaves the fns untouched
+ return Status::OK();
+ }
+
+ for (auto* fn : local_state._fns) {
+ RETURN_IF_ERROR(fn->process_init(input_block, state));
+ }
+ RETURN_IF_ERROR(local_state.process_next_child_row()); // position on the first row of the new block
+ return Status::OK();
+ }
+
+ Status pull(RuntimeState* state, vectorized::Block* output_block,
+ SourceState& source_state) const override { // produce one expanded block, then run the limit check on it
+ CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ RETURN_IF_ERROR(local_state.get_expanded_block(state, output_block,
source_state));
+ local_state.reached_limit(output_block, source_state);
+ return Status::OK();
+ }
+
+private:
+ friend class TableFunctionLocalState;
+
+ Status _prepare_output_slot_ids(const TPlanNode& tnode);
+
+ /* The output tuples of the table function node are base_table_tuple + tf1 + tf2 + ...
+ But not all slots are used; the slots that are really used are listed in
+ table_function_node.outputSlotIds.
+ For a case like explode_bitmap:
+ SELECT a2,count(*) as a3 FROM A WHERE a1 IN
+ (SELECT c1 FROM B LATERAL VIEW explode_bitmap(b1) C as c1)
+ GROUP BY a2 ORDER BY a3;
+ we only need to output column c1; there is no need to output the columns
+ of bitmap table B. Copying large bitmap columns is very expensive and slow.
+
+ Here we check whether the slot is really used; if it is not, we avoid copying
+ it and just insert a default value.
+
+ A better solution is:
+ 1. FE: create a new output tuple based on the real output slots;
+ 2. BE: refactor (V)TableFunctionNode to output rows based on the new tuple;
+
+ Note: `slot_id` is used as a position in `_output_slots`, not as a SlotId value.
+ */
+ [[nodiscard]] inline bool _slot_need_copy(SlotId slot_id) const {
+ auto id = _output_slots[slot_id]->id();
+ return (id < _output_slot_ids.size()) && (_output_slot_ids[id]);
+ }
+
+ std::vector<SlotDescriptor*> _child_slots; // every slot of the child's row descriptor, filled in prepare()
+ std::vector<SlotDescriptor*> _output_slots; // every slot of this operator's row descriptor, filled in prepare()
+
+ vectorized::VExprContextSPtrs _vfn_ctxs;
+
+ std::vector<vectorized::TableFunction*> _fns;
+ int _fn_num = 0; // set to _fns.size() in init()
+
+ std::vector<bool> _output_slot_ids; // indexed by slot id; true when the id is listed in outputSlotIds
+ std::vector<int> _output_slot_indexs; // child slot positions that are copied to the output
+ std::vector<int> _useless_slot_indexs; // child slot positions that are filled with defaults
+
+ std::vector<int> _child_slot_sizes; // NOTE(review): not referenced in the visible part of this patch — confirm it is needed
+};
+
} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index b29aaeb00b..c67ef6d29e 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -112,11 +112,11 @@ public:
RuntimeProfile* pipeline_profile() { return _pipeline_profile.get(); }
- const RowDescriptor& output_row_desc() const {
- return operatorXs[operatorXs.size() - 1]->row_desc();
+ [[nodiscard]] const RowDescriptor& output_row_desc() const {
+ return operatorXs.back()->row_desc();
}
- PipelineId id() const { return _pipeline_id; }
+ [[nodiscard]] PipelineId id() const { return _pipeline_id; }
private:
void _init_profile();
diff --git a/be/src/pipeline/pipeline_x/operator.cpp
b/be/src/pipeline/pipeline_x/operator.cpp
index a9490bd967..068551da90 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -45,6 +45,7 @@
#include "pipeline/exec/sort_source_operator.h"
#include "pipeline/exec/streaming_aggregation_sink_operator.h"
#include "pipeline/exec/streaming_aggregation_source_operator.h"
+#include "pipeline/exec/table_function_operator.h"
#include "pipeline/exec/union_sink_operator.h"
#include "pipeline/exec/union_source_operator.h"
#include "util/debug_util.h"
@@ -394,6 +395,7 @@ DECLARE_OPERATOR_X(OlapScanLocalState)
DECLARE_OPERATOR_X(AnalyticLocalState)
DECLARE_OPERATOR_X(SortLocalState)
DECLARE_OPERATOR_X(AggLocalState)
+DECLARE_OPERATOR_X(TableFunctionLocalState)
DECLARE_OPERATOR_X(ExchangeLocalState)
DECLARE_OPERATOR_X(RepeatLocalState)
DECLARE_OPERATOR_X(NestedLoopJoinProbeLocalState)
@@ -411,6 +413,7 @@ template class StreamingOperatorX<SelectLocalState>;
template class StatefulOperatorX<HashJoinProbeLocalState>;
template class StatefulOperatorX<RepeatLocalState>;
template class StatefulOperatorX<NestedLoopJoinProbeLocalState>;
+template class StatefulOperatorX<TableFunctionLocalState>;
template class PipelineXSinkLocalState<HashJoinDependency>;
template class PipelineXSinkLocalState<SortDependency>;
diff --git a/be/src/pipeline/pipeline_x/operator.h
b/be/src/pipeline/pipeline_x/operator.h
index f219b68f2e..92f2b0b9da 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -94,7 +94,7 @@ public:
// If use projection, we should clear `_origin_block`.
void clear_origin_block();
- bool reached_limit() const;
+ [[nodiscard]] bool reached_limit() const;
void reached_limit(vectorized::Block* block, SourceState& source_state);
RuntimeProfile* profile() { return _runtime_profile.get(); }
@@ -114,7 +114,7 @@ public:
void add_num_rows_returned(int64_t delta) { _num_rows_returned += delta; }
void set_num_rows_returned(int64_t value) { _num_rows_returned = value; }
- virtual std::string debug_string(int indentation_level = 0) const;
+ [[nodiscard]] virtual std::string debug_string(int indentation_level = 0)
const;
protected:
friend class OperatorXBase;
@@ -181,9 +181,9 @@ public:
}
[[nodiscard]] std::string get_name() const override { return _op_name; }
- virtual Status prepare(RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
- virtual Status open(RuntimeState* state) override;
+ Status open(RuntimeState* state) override;
Status finalize(RuntimeState* state) override { return Status::OK(); }
@@ -201,7 +201,7 @@ public:
return false;
}
- bool is_pending_finish() const override {
+ [[nodiscard]] bool is_pending_finish() const override {
LOG(FATAL) << "should not reach here!";
return false;
}
@@ -228,7 +228,7 @@ public:
return _row_descriptor;
}
- std::string debug_string() const override { return ""; }
+ [[nodiscard]] std::string debug_string() const override { return ""; }
virtual std::string debug_string(int indentation_level = 0) const;
@@ -305,7 +305,7 @@ public:
OperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl&
descs)
: OperatorXBase(pool, tnode, descs) {}
OperatorX(ObjectPool* pool, int id) : OperatorXBase(pool, id) {};
- virtual ~OperatorX() = default;
+ ~OperatorX() override = default;
Status setup_local_state(RuntimeState* state, LocalStateInfo& info)
override;
Status setup_local_states(RuntimeState* state,
std::vector<LocalStateInfo>& info) override;
@@ -317,7 +317,7 @@ class PipelineXLocalState : public PipelineXLocalStateBase {
public:
PipelineXLocalState(RuntimeState* state, OperatorXBase* parent)
: PipelineXLocalStateBase(state, parent) {}
- virtual ~PipelineXLocalState() {}
+ ~PipelineXLocalState() override = default;
Status init(RuntimeState* state, LocalStateInfo& info) override {
_runtime_profile.reset(new RuntimeProfile(_parent->get_name() +
@@ -399,7 +399,7 @@ public:
virtual Status close(RuntimeState* state, Status exec_status) = 0;
virtual Status try_close(RuntimeState* state, Status exec_status) = 0;
- virtual std::string debug_string(int indentation_level) const;
+ [[nodiscard]] virtual std::string debug_string(int indentation_level)
const;
template <class TARGET>
TARGET& cast() {
@@ -421,7 +421,9 @@ public:
RuntimeProfile* profile() { return _profile; }
MemTracker* mem_tracker() { return _mem_tracker.get(); }
QueryStatistics* query_statistics() { return _query_statistics.get(); }
- RuntimeProfile* faker_runtime_profile() const { return
_faker_runtime_profile.get(); }
+ [[nodiscard]] RuntimeProfile* faker_runtime_profile() const {
+ return _faker_runtime_profile.get();
+ }
RuntimeProfile::Counter* rows_input_counter() { return
_rows_input_counter; }
@@ -508,7 +510,7 @@ public:
return false;
}
- bool is_pending_finish() const override {
+ [[nodiscard]] bool is_pending_finish() const override {
LOG(FATAL) << "should not reach here!";
return false;
}
@@ -519,9 +521,10 @@ public:
[[nodiscard]] std::string debug_string() const override { return ""; }
- virtual std::string debug_string(int indentation_level) const;
+ [[nodiscard]] virtual std::string debug_string(int indentation_level)
const;
- virtual std::string debug_string(RuntimeState* state, int
indentation_level) const;
+ [[nodiscard]] virtual std::string debug_string(RuntimeState* state,
+ int indentation_level)
const;
[[nodiscard]] bool is_sink() const override { return true; }
@@ -531,7 +534,7 @@ public:
return state->get_sink_local_state(id())->close(state, exec_status);
}
- virtual Status try_close(RuntimeState* state, Status exec_status) {
+ [[nodiscard]] virtual Status try_close(RuntimeState* state, Status
exec_status) {
return state->get_sink_local_state(id())->try_close(state,
exec_status);
}
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index f8ea574c7c..e8dc9f3f29 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -70,6 +70,7 @@
#include "pipeline/exec/sort_source_operator.h"
#include "pipeline/exec/streaming_aggregation_sink_operator.h"
#include "pipeline/exec/streaming_aggregation_source_operator.h"
+#include "pipeline/exec/table_function_operator.h"
#include "pipeline/exec/union_sink_operator.h"
#include "pipeline/exec/union_source_operator.h"
#include "pipeline/task_scheduler.h"
@@ -740,6 +741,11 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
RETURN_IF_ERROR(cur_pipe->add_operator(op));
break;
}
+ case TPlanNodeType::TABLE_FUNCTION_NODE: {
+ op.reset(new TableFunctionOperatorX(pool, tnode, descs));
+ RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ break;
+ }
case TPlanNodeType::ASSERT_NUM_ROWS_NODE: {
op.reset(new AssertNumRowsOperatorX(pool, tnode, descs));
RETURN_IF_ERROR(cur_pipe->add_operator(op));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]