This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 329c22da00 [pipelineX](feature) Support table function operator 
(#24818)
329c22da00 is described below

commit 329c22da00521100499c37e7524ebde4acde0df9
Author: Gabriel <[email protected]>
AuthorDate: Sat Sep 23 19:26:53 2023 +0800

    [pipelineX](feature) Support table function operator (#24818)
---
 be/src/pipeline/exec/aggregation_sink_operator.h   |  10 +-
 be/src/pipeline/exec/aggregation_source_operator.h |   3 +-
 be/src/pipeline/exec/exchange_sink_operator.h      |   6 +-
 be/src/pipeline/exec/exchange_source_operator.h    |  12 +-
 be/src/pipeline/exec/join_build_sink_operator.h    |   4 +-
 be/src/pipeline/exec/join_probe_operator.h         |   8 +-
 .../exec/streaming_aggregation_sink_operator.h     |   2 +
 be/src/pipeline/exec/table_function_operator.cpp   | 284 +++++++++++++++++++++
 be/src/pipeline/exec/table_function_operator.h     | 112 ++++++++
 be/src/pipeline/pipeline.h                         |   6 +-
 be/src/pipeline/pipeline_x/operator.cpp            |   3 +
 be/src/pipeline/pipeline_x/operator.h              |  31 ++-
 .../pipeline_x/pipeline_x_fragment_context.cpp     |   6 +
 13 files changed, 449 insertions(+), 38 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h 
b/be/src/pipeline/exec/aggregation_sink_operator.h
index 34ba9e2449..bae9e1a8ec 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -52,9 +52,9 @@ template <typename DependencyType, typename Derived>
 class AggSinkLocalState : public PipelineXSinkLocalState<DependencyType> {
 public:
     using Base = PipelineXSinkLocalState<DependencyType>;
-    virtual ~AggSinkLocalState() = default;
+    ~AggSinkLocalState() override = default;
 
-    virtual Status init(RuntimeState* state, LocalSinkStateInfo& info) 
override;
+    Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
     Status open(RuntimeState* state) override;
     Status close(RuntimeState* state, Status exec_status) override;
 
@@ -332,7 +332,7 @@ template <typename LocalStateType = 
BlockingAggSinkLocalState>
 class AggSinkOperatorX : public DataSinkOperatorX<LocalStateType> {
 public:
     AggSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, const 
DescriptorTbl& descs);
-    virtual ~AggSinkOperatorX() = default;
+    ~AggSinkOperatorX() override = default;
     Status init(const TDataSink& tsink) override {
         return Status::InternalError("{} should not init with TPlanNode",
                                      DataSinkOperatorX<LocalStateType>::_name);
@@ -343,8 +343,8 @@ public:
     Status prepare(RuntimeState* state) override;
     Status open(RuntimeState* state) override;
 
-    virtual Status sink(RuntimeState* state, vectorized::Block* in_block,
-                        SourceState source_state) override;
+    Status sink(RuntimeState* state, vectorized::Block* in_block,
+                SourceState source_state) override;
 
     using DataSinkOperatorX<LocalStateType>::id;
 
diff --git a/be/src/pipeline/exec/aggregation_source_operator.h 
b/be/src/pipeline/exec/aggregation_source_operator.h
index 274424f1f9..2d5ca45dec 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.h
+++ b/be/src/pipeline/exec/aggregation_source_operator.h
@@ -50,11 +50,12 @@ public:
 
 class AggSourceOperatorX;
 
-class AggLocalState : public PipelineXLocalState<AggDependency> {
+class AggLocalState final : public PipelineXLocalState<AggDependency> {
 public:
     using Base = PipelineXLocalState<AggDependency>;
     ENABLE_FACTORY_CREATOR(AggLocalState);
     AggLocalState(RuntimeState* state, OperatorXBase* parent);
+    ~AggLocalState() override = default;
 
     Status init(RuntimeState* state, LocalStateInfo& info) override;
     Status close(RuntimeState* state) override;
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h 
b/be/src/pipeline/exec/exchange_sink_operator.h
index 0b05491901..2c899ec49a 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -120,7 +120,7 @@ public:
             : WriteDependency(id, "ChannelDependency"),
               _sender_id(sender_id),
               _local_recvr(local_recvr) {}
-    virtual ~ChannelDependency() = default;
+    ~ChannelDependency() override = default;
 
     void* shared_state() override { return nullptr; }
 
@@ -150,7 +150,7 @@ private:
     vectorized::VDataStreamRecvr* _local_recvr;
 };
 
-class ExchangeSinkLocalState : public PipelineXSinkLocalState<> {
+class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
     ENABLE_FACTORY_CREATOR(ExchangeSinkLocalState);
 
 public:
@@ -312,4 +312,4 @@ private:
 };
 
 } // namespace pipeline
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/pipeline/exec/exchange_source_operator.h 
b/be/src/pipeline/exec/exchange_source_operator.h
index df6d44494e..a7e146b54d 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -50,7 +50,7 @@ public:
     bool is_pending_finish() const override;
 };
 
-struct ExchangeDataDependency : public Dependency {
+struct ExchangeDataDependency final : public Dependency {
 public:
     ENABLE_FACTORY_CREATOR(ExchangeDataDependency);
     ExchangeDataDependency(int id, vectorized::VDataStreamRecvr::SenderQueue* 
sender_queue)
@@ -89,7 +89,7 @@ private:
 };
 
 class ExchangeSourceOperatorX;
-class ExchangeLocalState : public PipelineXLocalState<> {
+class ExchangeLocalState final : public PipelineXLocalState<> {
     ENABLE_FACTORY_CREATOR(ExchangeLocalState);
     ExchangeLocalState(RuntimeState* state, OperatorXBase* parent);
 
@@ -123,12 +123,12 @@ public:
                      SourceState& source_state) override;
 
     Status close(RuntimeState* state) override;
-    bool is_source() const override { return true; }
+    [[nodiscard]] bool is_source() const override { return true; }
 
-    RowDescriptor input_row_desc() const { return _input_row_desc; }
+    [[nodiscard]] RowDescriptor input_row_desc() const { return 
_input_row_desc; }
 
-    int num_senders() const { return _num_senders; }
-    bool is_merging() const { return _is_merging; }
+    [[nodiscard]] int num_senders() const { return _num_senders; }
+    [[nodiscard]] bool is_merging() const { return _is_merging; }
 
     std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr() {
         return _sub_plan_query_statistics_recvr;
diff --git a/be/src/pipeline/exec/join_build_sink_operator.h 
b/be/src/pipeline/exec/join_build_sink_operator.h
index 361982dd97..55553df462 100644
--- a/be/src/pipeline/exec/join_build_sink_operator.h
+++ b/be/src/pipeline/exec/join_build_sink_operator.h
@@ -36,7 +36,7 @@ public:
 protected:
     JoinBuildSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
             : PipelineXSinkLocalState<DependencyType>(parent, state) {}
-    virtual ~JoinBuildSinkLocalState() = default;
+    ~JoinBuildSinkLocalState() override = default;
     template <typename LocalStateType>
     friend class JoinBuildSinkOperatorX;
 
@@ -53,7 +53,7 @@ template <typename LocalStateType>
 class JoinBuildSinkOperatorX : public DataSinkOperatorX<LocalStateType> {
 public:
     JoinBuildSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, const 
DescriptorTbl& descs);
-    virtual ~JoinBuildSinkOperatorX() = default;
+    ~JoinBuildSinkOperatorX() override = default;
 
 protected:
     void _init_join_op();
diff --git a/be/src/pipeline/exec/join_probe_operator.h 
b/be/src/pipeline/exec/join_probe_operator.h
index e3ed33e4d9..1a7090b232 100644
--- a/be/src/pipeline/exec/join_probe_operator.h
+++ b/be/src/pipeline/exec/join_probe_operator.h
@@ -31,8 +31,8 @@ template <typename DependencyType, typename Derived>
 class JoinProbeLocalState : public PipelineXLocalState<DependencyType> {
 public:
     using Base = PipelineXLocalState<DependencyType>;
-    virtual Status init(RuntimeState* state, LocalStateInfo& info) override;
-    virtual Status close(RuntimeState* state) override;
+    Status init(RuntimeState* state, LocalStateInfo& info) override;
+    Status close(RuntimeState* state) override;
     virtual void add_tuple_is_null_column(vectorized::Block* block) = 0;
 
 protected:
@@ -42,7 +42,7 @@ protected:
             : Base(state, parent),
               _child_block(vectorized::Block::create_unique()),
               _child_source_state(SourceState::DEPEND_ON_SOURCE) {}
-    virtual ~JoinProbeLocalState() = default;
+    ~JoinProbeLocalState() override = default;
     void _construct_mutable_join_block();
     Status _build_output_block(vectorized::Block* origin_block, 
vectorized::Block* output_block,
                                bool keep_origin = true);
@@ -67,7 +67,7 @@ class JoinProbeOperatorX : public 
StatefulOperatorX<LocalStateType> {
 public:
     using Base = StatefulOperatorX<LocalStateType>;
     JoinProbeOperatorX(ObjectPool* pool, const TPlanNode& tnode, const 
DescriptorTbl& descs);
-    virtual Status init(const TPlanNode& tnode, RuntimeState* state) override;
+    Status init(const TPlanNode& tnode, RuntimeState* state) override;
 
     Status open(doris::RuntimeState* state) override;
     [[nodiscard]] const RowDescriptor& row_desc() override { return 
*_output_row_desc; }
diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h 
b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
index 1796716804..e92c5ff4e6 100644
--- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
@@ -81,6 +81,7 @@ public:
     using Base = AggSinkLocalState<AggDependency, StreamingAggSinkLocalState>;
     ENABLE_FACTORY_CREATOR(StreamingAggSinkLocalState);
     StreamingAggSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* 
state);
+    ~StreamingAggSinkLocalState() override = default;
 
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
     Status close(RuntimeState* state, Status exec_status) override;
@@ -108,6 +109,7 @@ private:
 class StreamingAggSinkOperatorX final : public 
AggSinkOperatorX<StreamingAggSinkLocalState> {
 public:
     StreamingAggSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, const 
DescriptorTbl& descs);
+    ~StreamingAggSinkOperatorX() override = default;
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
     Status sink(RuntimeState* state, vectorized::Block* in_block,
                 SourceState source_state) override;
diff --git a/be/src/pipeline/exec/table_function_operator.cpp 
b/be/src/pipeline/exec/table_function_operator.cpp
index 3f6133c3fc..7ce7ac3410 100644
--- a/be/src/pipeline/exec/table_function_operator.cpp
+++ b/be/src/pipeline/exec/table_function_operator.cpp
@@ -21,6 +21,7 @@
 
 #include "pipeline/exec/operator.h"
 #include "vec/core/block.h"
+#include "vec/exprs/table_function/table_function_factory.h"
 
 namespace doris {
 class RuntimeState;
@@ -41,4 +42,287 @@ Status TableFunctionOperator::close(doris::RuntimeState* 
state) {
     return StatefulOperator::close(state);
 }
 
+// Constructs the local state: forwards `state` and `parent` to the
+// PipelineXLocalState<> base and creates `_child_block` through
+// vectorized::Block::create_unique().
+TableFunctionLocalState::TableFunctionLocalState(RuntimeState* state, 
OperatorXBase* parent)
+        : PipelineXLocalState<>(state, parent), 
_child_block(vectorized::Block::create_unique()) {}
+
+// Initializes the base local state, then, for every expression context owned by the
+// parent TableFunctionOperatorX, clones the context into `_vfn_ctxs` and asks
+// TableFunctionFactory for a TableFunction matching the expression's function name
+// (passing state->obj_pool()). Each function is bound to its cloned context and
+// appended to `_fns`. Finally `_cur_child_offset` is set to -1.
+// Returns the first non-OK Status produced by any step.
+Status TableFunctionLocalState::init(RuntimeState* state, LocalStateInfo& info) {
+    RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
+
+    auto& op = _parent->cast<TableFunctionOperatorX>();
+    const size_t ctx_count = op._vfn_ctxs.size();
+    _vfn_ctxs.resize(ctx_count);
+    for (size_t idx = 0; idx < ctx_count; ++idx) {
+        auto& local_ctx = _vfn_ctxs[idx];
+        RETURN_IF_ERROR(op._vfn_ctxs[idx]->clone(state, local_ctx));
+
+        const std::string& fn_name = local_ctx->root()->fn().name.function_name;
+        vectorized::TableFunction* table_fn = nullptr;
+        RETURN_IF_ERROR(
+                vectorized::TableFunctionFactory::get_fn(fn_name, state->obj_pool(), &table_fn));
+        table_fn->set_expr_context(local_ctx);
+        _fns.push_back(table_fn);
+    }
+
+    _cur_child_offset = -1;
+    return Status::OK();
+}
+
+// Appends the child row at `_cur_child_offset` `_current_row_insert_times` times to
+// every column whose index is listed in the parent's `_output_slot_indexs`, then
+// resets the pending counter to 0. Does nothing when the counter is already 0.
+void TableFunctionLocalState::_copy_output_slots(
+        std::vector<vectorized::MutableColumnPtr>& columns) {
+    if (_current_row_insert_times != 0) {
+        auto& op = _parent->cast<TableFunctionOperatorX>();
+        for (auto slot_idx : op._output_slot_indexs) {
+            auto child_column = _child_block->get_by_position(slot_idx).column;
+            columns[slot_idx]->insert_many_from(*child_column, _cur_child_offset,
+                                                _current_row_insert_times);
+        }
+        _current_row_insert_times = 0;
+    }
+}
+
+// Scans `_fns` from back to front and reports where the trailing run of exhausted
+// (eos) functions begins.
+// e.g. with 3 functions in `_fns`:
+//      eos: false, true,  true   -> 1
+//      eos: false, false, true   -> 2
+//      eos: false, false, false  -> -1
+//      eos: true,  true,  true   -> 0
+//
+// return:
+//  0: every function is eos
+// -1: the last function is not eos
+// >0: index of the first function of the trailing eos run
+int TableFunctionLocalState::_find_last_fn_eos_idx() const {
+    const int last_idx = _parent->cast<TableFunctionOperatorX>()._fn_num - 1;
+    int cursor = last_idx;
+    while (cursor >= 0 && _fns[cursor]->eos()) {
+        --cursor;
+    }
+    if (cursor < 0) {
+        // No function with remaining results was found.
+        return 0;
+    }
+    return cursor == last_idx ? -1 : cursor + 1;
+}
+
+// Advances the functions in front of the exhausted tail and restarts the tail.
+// Example with f1, f2, f3 in `_fns` and `last_eos_idx` == 1 (f2 and f3 are eos):
+// f1 is forwarded; if f1 still has results, f2 and f3 are reset.
+//
+// Walking backwards from `last_eos_idx - 1`, each function is forwarded until one
+// is found that is not eos afterwards. Returns false when none is found (the
+// caller then has to move on to the next child row); otherwise resets every
+// function behind the one that still has results and returns true.
+bool TableFunctionLocalState::_roll_table_functions(int last_eos_idx) {
+    int advanced = last_eos_idx - 1;
+    while (advanced >= 0) {
+        _fns[advanced]->forward();
+        if (!_fns[advanced]->eos()) {
+            break;
+        }
+        --advanced;
+    }
+    if (advanced == -1) {
+        // Every forwarded function ran out of results as well.
+        return false;
+    }
+
+    const int fn_count = _parent->cast<TableFunctionOperatorX>()._fn_num;
+    for (int tail = advanced + 1; tail < fn_count; ++tail) {
+        _fns[tail]->reset();
+    }
+    return true;
+}
+
+// Returns true as soon as one table function is not outer and reports an empty
+// result for the current row (the caller then skips to the next child row);
+// returns false if no such function exists.
+bool TableFunctionLocalState::_is_inner_and_empty() {
+    const int fn_count = _parent->cast<TableFunctionOperatorX>()._fn_num;
+    for (int idx = 0; idx < fn_count; ++idx) {
+        auto* table_fn = _fns[idx];
+        if (table_fn->is_outer()) {
+            continue;
+        }
+        if (table_fn->current_empty()) {
+            return true;
+        }
+    }
+    return false;
+}
+
+// Fills `output_block` with expanded rows built from the buffered `_child_block`.
+//
+// The loops run until the first table function column (index
+// `p._child_slots.size()`) holds state->batch_size() rows or the child block is
+// used up. Afterwards the pending child-row copies are flushed, the columns listed
+// in `_useless_slot_indexs` are padded with defaults, `_conjuncts` are applied to
+// `output_block`, and `source_state` is set to FINISHED once the child is finished
+// and no child row is in progress (`_cur_child_offset == -1`).
+// Returns the first non-OK Status raised by a helper, otherwise OK.
+Status TableFunctionLocalState::get_expanded_block(RuntimeState* state,
+                                                   vectorized::Block* 
output_block,
+                                                   SourceState& source_state) {
+    auto& p = _parent->cast<TableFunctionOperatorX>();
+    vectorized::MutableBlock m_block = 
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
+            output_block, p._output_slots);
+    vectorized::MutableColumns& columns = m_block.mutable_columns();
+
+    // The column of table function i sits right behind the child slots; tell the
+    // function when that column is nullable.
+    for (int i = 0; i < p._fn_num; i++) {
+        if (columns[i + p._child_slots.size()]->is_nullable()) {
+            _fns[i]->set_nullable();
+        }
+    }
+
+    while (columns[p._child_slots.size()]->size() < state->batch_size()) {
+        RETURN_IF_CANCELLED(state);
+        RETURN_IF_ERROR(state->check_query_state("VTableFunctionNode, while 
getting next batch."));
+
+        // Nothing buffered from the child: stop producing.
+        if (_child_block->rows() == 0) {
+            break;
+        }
+
+        bool skip_child_row = false;
+        while (columns[p._child_slots.size()]->size() < state->batch_size()) {
+            int idx = _find_last_fn_eos_idx();
+            if (idx == 0 || skip_child_row) {
+                // Flush the copies owed for the row we are leaving before the
+                // offset moves on.
+                _copy_output_slots(columns);
+                // all table functions' results are exhausted, process next child row.
+                RETURN_IF_ERROR(process_next_child_row());
+                // -1 means the child block has been fully consumed.
+                if (_cur_child_offset == -1) {
+                    break;
+                }
+            } else if (idx < p._fn_num && idx != -1) {
+                // some of table functions' results are exhausted.
+                if (!_roll_table_functions(idx)) {
+                    // continue to process next child row.
+                    continue;
+                }
+            }
+
+            // if any table function is not outer and has empty result, go to next child row
+            if (skip_child_row = _is_inner_and_empty(); skip_child_row) {
+                continue;
+            }
+            if (p._fn_num == 1) {
+                // Single function: get_value() receives the remaining capacity of the
+                // batch and its return value is added to the pending copy count.
+                _current_row_insert_times += _fns[0]->get_value(
+                        columns[p._child_slots.size()],
+                        state->batch_size() - 
columns[p._child_slots.size()]->size());
+            } else {
+                // Several functions: emit one output row per iteration and advance
+                // only the last function here.
+                for (int i = 0; i < p._fn_num; i++) {
+                    _fns[i]->get_value(columns[i + p._child_slots.size()]);
+                }
+                _current_row_insert_times++;
+                _fns[p._fn_num - 1]->forward();
+            }
+        }
+    }
+
+    // Flush the copies still owed for the row in progress.
+    _copy_output_slots(columns);
+
+    // Pad the columns that are not copied from the child so that every column has
+    // as many rows as the first table function column.
+    size_t row_size = columns[p._child_slots.size()]->size();
+    for (auto index : p._useless_slot_indexs) {
+        columns[index]->insert_many_defaults(row_size - 
columns[index]->size());
+    }
+
+    // Evaluate the conjuncts on the produced block.
+    RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, 
output_block,
+                                                           
output_block->columns()));
+
+    if (_child_source_state == SourceState::FINISHED && _cur_child_offset == 
-1) {
+        source_state = SourceState::FINISHED;
+    }
+    return Status::OK();
+}
+
+// Moves `_cur_child_offset` to the next row of `_child_block`.
+// While a row is available, every table function is handed the new offset through
+// process_row(). Once the block is used up, every function gets process_close(),
+// the block's column data is cleared and the offset is reset to -1.
+// Returns the first non-OK Status reported by a table function, otherwise OK.
+Status TableFunctionLocalState::process_next_child_row() {
+    ++_cur_child_offset;
+
+    if (_cur_child_offset < _child_block->rows()) {
+        for (vectorized::TableFunction* table_fn : _fns) {
+            RETURN_IF_ERROR(table_fn->process_row(_cur_child_offset));
+        }
+        return Status::OK();
+    }
+
+    // Child block exhausted: release block use count.
+    for (vectorized::TableFunction* table_fn : _fns) {
+        RETURN_IF_ERROR(table_fn->process_close());
+    }
+
+    auto& op = _parent->cast<TableFunctionOperatorX>();
+    _child_block->clear_column_data(op._child_x->row_desc().num_materialized_slots());
+    _cur_child_offset = -1;
+    return Status::OK();
+}
+
+// Forwards all arguments unchanged to the `Base` constructor; the members of this
+// class are filled in by init() and prepare().
+TableFunctionOperatorX::TableFunctionOperatorX(ObjectPool* pool, const 
TPlanNode& tnode,
+                                               const DescriptorTbl& descs)
+        : Base(pool, tnode, descs) {}
+
+// Builds `_output_slot_ids`, a lookup table indexed by slot id in which every id
+// listed in tnode.table_function_node.outputSlotIds is marked true. The table is
+// sized to the largest listed id + 1.
+// Returns InternalError when the list is empty, otherwise OK.
+Status TableFunctionOperatorX::_prepare_output_slot_ids(const TPlanNode& tnode) {
+    const auto& wanted_ids = tnode.table_function_node.outputSlotIds;
+    if (wanted_ids.empty()) {
+        return Status::InternalError("Output slots of table function node is empty");
+    }
+
+    SlotId largest_id = -1;
+    for (auto wanted : wanted_ids) {
+        largest_id = wanted > largest_id ? wanted : largest_id;
+    }
+
+    _output_slot_ids.assign(largest_id + 1, false);
+    for (auto wanted : wanted_ids) {
+        _output_slot_ids[wanted] = true;
+    }
+    return Status::OK();
+}
+
+// Initializes the base operator, then for every expression in
+// tnode.table_function_node.fnCallExprList builds an expression tree, stores its
+// context in `_vfn_ctxs`, obtains the TableFunction named by the expression root
+// from TableFunctionFactory (passing `_pool`), binds it to the context and stores
+// it in `_fns`. `_fn_num` is set to the number of functions and the output slot id
+// table is prepared last.
+// Returns the first non-OK Status produced by any step.
+Status TableFunctionOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
+    RETURN_IF_ERROR(Base::init(tnode, state));
+
+    for (const TExpr& call_expr : tnode.table_function_node.fnCallExprList) {
+        vectorized::VExprContextSPtr expr_ctx;
+        RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(call_expr, expr_ctx));
+        _vfn_ctxs.push_back(expr_ctx);
+
+        const std::string& fn_name = expr_ctx->root()->fn().name.function_name;
+        vectorized::TableFunction* table_fn = nullptr;
+        RETURN_IF_ERROR(vectorized::TableFunctionFactory::get_fn(fn_name, _pool, &table_fn));
+        table_fn->set_expr_context(expr_ctx);
+        _fns.push_back(table_fn);
+    }
+    _fn_num = _fns.size();
+
+    return _prepare_output_slot_ids(tnode);
+}
+
+// Prepares the base operator, every table function and the function expression
+// contexts, then collects the slot descriptors of the output row (`_output_slots`)
+// and of the child row (`_child_slots`). Each child slot position is finally
+// classified as "copy from child" (`_output_slot_indexs`) or "fill with default"
+// (`_useless_slot_indexs`) according to _slot_need_copy().
+// Returns the first non-OK Status produced by any step.
+Status TableFunctionOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(Base::prepare(state));
+
+    for (auto* table_fn : _fns) {
+        RETURN_IF_ERROR(table_fn->prepare());
+    }
+    RETURN_IF_ERROR(vectorized::VExpr::prepare(_vfn_ctxs, state, _row_descriptor));
+
+    // Flatten the slots of every output tuple.
+    for (const auto& out_tuple : _row_descriptor.tuple_descriptors()) {
+        for (const auto& out_slot : out_tuple->slots()) {
+            _output_slots.push_back(out_slot);
+        }
+    }
+
+    // Flatten the slots of every child (input) tuple.
+    for (const auto& in_tuple : _child_x->row_desc().tuple_descriptors()) {
+        for (const auto& in_slot : in_tuple->slots()) {
+            _child_slots.push_back(in_slot);
+        }
+    }
+
+    const size_t child_slot_count = _child_slots.size();
+    for (size_t pos = 0; pos < child_slot_count; ++pos) {
+        auto& bucket = _slot_need_copy(pos) ? _output_slot_indexs : _useless_slot_indexs;
+        bucket.push_back(pos);
+    }
+
+    return Status::OK();
+}
+
+// Opens the base operator, then the table function expression contexts
+// (`_vfn_ctxs`) through vectorized::VExpr::open(). A failure of the base open is
+// returned before the contexts are touched.
+Status TableFunctionOperatorX::open(doris::RuntimeState* state) {
+    RETURN_IF_ERROR(Base::open(state));
+    return vectorized::VExpr::open(_vfn_ctxs, state);
+}
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/table_function_operator.h 
b/be/src/pipeline/exec/table_function_operator.h
index a4b4e77141..541938ec72 100644
--- a/be/src/pipeline/exec/table_function_operator.h
+++ b/be/src/pipeline/exec/table_function_operator.h
@@ -21,6 +21,7 @@
 
 #include "common/status.h"
 #include "operator.h"
+#include "pipeline/pipeline_x/operator.h"
 #include "vec/exec/vtable_function_node.h"
 
 namespace doris {
@@ -45,4 +46,115 @@ public:
 
     Status close(RuntimeState* state) override;
 };
+
+class TableFunctionOperatorX;
+// Local state used by TableFunctionOperatorX: it holds the buffered child block,
+// the table functions with their expression contexts, and the cursor into the
+// child block that is currently being expanded.
+class TableFunctionLocalState final : public PipelineXLocalState<> {
+public:
+    using Parent = TableFunctionOperatorX;
+    ENABLE_FACTORY_CREATOR(TableFunctionLocalState);
+    TableFunctionLocalState(RuntimeState* state, OperatorXBase* parent);
+    ~TableFunctionLocalState() override = default;
+
+    // Clones the parent's expression contexts, creates one table function per
+    // context and sets `_cur_child_offset` to -1.
+    Status init(RuntimeState* state, LocalStateInfo& info) override;
+    // Advances `_cur_child_offset` to the next row of `_child_block`; when the block
+    // is used up the functions are closed, the block is cleared and the offset
+    // becomes -1 again.
+    Status process_next_child_row();
+    // Expands buffered child rows into `output_block` and sets `source_state` to
+    // FINISHED once the child is finished and no child row is in progress.
+    Status get_expanded_block(RuntimeState* state, vectorized::Block* output_block,
+                              SourceState& source_state);
+
+private:
+    friend class TableFunctionOperatorX;
+    friend class StatefulOperatorX<TableFunctionLocalState>;
+
+    // Copies the current child row `_current_row_insert_times` times into the
+    // output columns that are taken from the child, then resets that counter.
+    void _copy_output_slots(std::vector<vectorized::MutableColumnPtr>& columns);
+    // Forwards the functions in front of `last_eos_idx` and resets the ones behind
+    // the first function that still has results; false if none has results left.
+    bool _roll_table_functions(int last_eos_idx);
+    // return:
+    //  0: every function is eos
+    // -1: the last function is not eos
+    // >0: index of the first function of the trailing eos run
+    int _find_last_fn_eos_idx() const;
+    // True if some function is not outer and has an empty result for the current row.
+    bool _is_inner_and_empty();
+
+    // Table functions, one per entry of `_vfn_ctxs` (filled by init()).
+    std::vector<vectorized::TableFunction*> _fns;
+    vectorized::VExprContextSPtrs _vfn_ctxs;
+    // Row of `_child_block` being expanded; -1 means no row is in progress.
+    int64_t _cur_child_offset = 0;
+    std::unique_ptr<vectorized::Block> _child_block;
+    // Number of output rows produced for the current child row whose child columns
+    // have not been copied yet.
+    int _current_row_insert_times = 0;
+    // Explicitly initialized: TableFunctionOperatorX::need_more_input_data() reads
+    // this field, and nothing in this class assigns it beforehand. The start value
+    // matches the one JoinProbeLocalState uses for the same field.
+    SourceState _child_source_state = SourceState::DEPEND_ON_SOURCE;
+};
+
+// PipelineX operator for table function plan nodes. Child blocks are handed in
+// through push(), expanded rows are produced by pull(); the per-execution data
+// lives in TableFunctionLocalState.
+class TableFunctionOperatorX final : public 
StatefulOperatorX<TableFunctionLocalState> {
+public:
+    using Base = StatefulOperatorX<TableFunctionLocalState>;
+    TableFunctionOperatorX(ObjectPool* pool, const TPlanNode& tnode, const 
DescriptorTbl& descs);
+    Status init(const TPlanNode& tnode, RuntimeState* state) override;
+    Status prepare(doris::RuntimeState* state) override;
+    Status open(doris::RuntimeState* state) override;
+
+    // True when the local child block holds no rows and the child has not
+    // reported FINISHED.
+    bool need_more_input_data(RuntimeState* state) const override {
+        auto& local_state = 
state->get_local_state(id())->cast<TableFunctionLocalState>();
+        return !local_state._child_block->rows() &&
+               local_state._child_source_state != SourceState::FINISHED;
+    }
+
+    // Starts expanding a new child block: an empty block is ignored, otherwise every
+    // table function is given the block through process_init() and the first child
+    // row is selected. `source_state` is not used in this body.
+    // NOTE(review): `local_state` is presumably bound by
+    // CREATE_LOCAL_STATE_RETURN_IF_ERROR - confirm against the macro definition.
+    Status push(RuntimeState* state, vectorized::Block* input_block,
+                SourceState source_state) const override {
+        CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+        if (input_block->rows() == 0) {
+            return Status::OK();
+        }
+
+        for (auto* fn : local_state._fns) {
+            RETURN_IF_ERROR(fn->process_init(input_block, state));
+        }
+        RETURN_IF_ERROR(local_state.process_next_child_row());
+        return Status::OK();
+    }
+
+    // Produces the next expanded block via get_expanded_block() and then passes it
+    // to the local state's reached_limit(block, source_state).
+    Status pull(RuntimeState* state, vectorized::Block* output_block,
+                SourceState& source_state) const override {
+        CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+        RETURN_IF_ERROR(local_state.get_expanded_block(state, output_block, 
source_state));
+        local_state.reached_limit(output_block, source_state);
+        return Status::OK();
+    }
+
+private:
+    friend class TableFunctionLocalState;
+
+    // Fills `_output_slot_ids` from tnode.table_function_node.outputSlotIds.
+    Status _prepare_output_slot_ids(const TPlanNode& tnode);
+
+    /*  The output tuple of a table function node is currently
+        base_table_tuple + tf1 + tf2 + ..., but not every slot is used; the slots
+        that are really used are listed in table_function_node.outputSlotIds.
+        For a case like explode_bitmap:
+            SELECT a2,count(*) as a3 FROM A WHERE a1 IN
+                (SELECT c1 FROM B LATERAL VIEW explode_bitmap(b1) C as c1)
+            GROUP BY a2 ORDER BY a3;
+        only column c1 has to be output; the columns of bitmap table B are not
+        needed, and copying large bitmap columns is very expensive and slow.
+
+        So we check whether a slot is really used; if it is not, we skip the copy
+        and just insert a default value.
+
+        A better solution would be:
+        1. FE: create a new output tuple based on the real output slots;
+        2. BE: refactor (V)TableFunctionNode to output rows based on the new tuple.
+    */
+    // Note: despite its type, `slot_id` is used as a position in `_output_slots`;
+    // the id of the slot found there is what gets looked up in `_output_slot_ids`.
+    [[nodiscard]] inline bool _slot_need_copy(SlotId slot_id) const {
+        auto id = _output_slots[slot_id]->id();
+        return (id < _output_slot_ids.size()) && (_output_slot_ids[id]);
+    }
+
+    // Slot descriptors of the child row and of this operator's output row
+    // (collected in prepare()).
+    std::vector<SlotDescriptor*> _child_slots;
+    std::vector<SlotDescriptor*> _output_slots;
+
+    // Expression contexts of the table function calls (built in init()).
+    vectorized::VExprContextSPtrs _vfn_ctxs;
+
+    // Table functions created in init(); `_fn_num` is their count.
+    std::vector<vectorized::TableFunction*> _fns;
+    int _fn_num = 0;
+
+    // Indexed by slot id: true for ids listed in outputSlotIds.
+    std::vector<bool> _output_slot_ids;
+    // Child slot positions that are copied from the child block.
+    std::vector<int> _output_slot_indexs;
+    // Child slot positions that are filled with default values instead.
+    std::vector<int> _useless_slot_indexs;
+
+    // Not referenced anywhere in the code shown in this commit.
+    std::vector<int> _child_slot_sizes;
+};
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index b29aaeb00b..c67ef6d29e 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -112,11 +112,11 @@ public:
 
     RuntimeProfile* pipeline_profile() { return _pipeline_profile.get(); }
 
-    const RowDescriptor& output_row_desc() const {
-        return operatorXs[operatorXs.size() - 1]->row_desc();
+    [[nodiscard]] const RowDescriptor& output_row_desc() const {
+        return operatorXs.back()->row_desc();
     }
 
-    PipelineId id() const { return _pipeline_id; }
+    [[nodiscard]] PipelineId id() const { return _pipeline_id; }
 
 private:
     void _init_profile();
diff --git a/be/src/pipeline/pipeline_x/operator.cpp 
b/be/src/pipeline/pipeline_x/operator.cpp
index a9490bd967..068551da90 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -45,6 +45,7 @@
 #include "pipeline/exec/sort_source_operator.h"
 #include "pipeline/exec/streaming_aggregation_sink_operator.h"
 #include "pipeline/exec/streaming_aggregation_source_operator.h"
+#include "pipeline/exec/table_function_operator.h"
 #include "pipeline/exec/union_sink_operator.h"
 #include "pipeline/exec/union_source_operator.h"
 #include "util/debug_util.h"
@@ -394,6 +395,7 @@ DECLARE_OPERATOR_X(OlapScanLocalState)
 DECLARE_OPERATOR_X(AnalyticLocalState)
 DECLARE_OPERATOR_X(SortLocalState)
 DECLARE_OPERATOR_X(AggLocalState)
+DECLARE_OPERATOR_X(TableFunctionLocalState)
 DECLARE_OPERATOR_X(ExchangeLocalState)
 DECLARE_OPERATOR_X(RepeatLocalState)
 DECLARE_OPERATOR_X(NestedLoopJoinProbeLocalState)
@@ -411,6 +413,7 @@ template class StreamingOperatorX<SelectLocalState>;
 template class StatefulOperatorX<HashJoinProbeLocalState>;
 template class StatefulOperatorX<RepeatLocalState>;
 template class StatefulOperatorX<NestedLoopJoinProbeLocalState>;
+template class StatefulOperatorX<TableFunctionLocalState>;
 
 template class PipelineXSinkLocalState<HashJoinDependency>;
 template class PipelineXSinkLocalState<SortDependency>;
diff --git a/be/src/pipeline/pipeline_x/operator.h 
b/be/src/pipeline/pipeline_x/operator.h
index f219b68f2e..92f2b0b9da 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -94,7 +94,7 @@ public:
     // If use projection, we should clear `_origin_block`.
     void clear_origin_block();
 
-    bool reached_limit() const;
+    [[nodiscard]] bool reached_limit() const;
     void reached_limit(vectorized::Block* block, SourceState& source_state);
     RuntimeProfile* profile() { return _runtime_profile.get(); }
 
@@ -114,7 +114,7 @@ public:
     void add_num_rows_returned(int64_t delta) { _num_rows_returned += delta; }
     void set_num_rows_returned(int64_t value) { _num_rows_returned = value; }
 
-    virtual std::string debug_string(int indentation_level = 0) const;
+    [[nodiscard]] virtual std::string debug_string(int indentation_level = 0) 
const;
 
 protected:
     friend class OperatorXBase;
@@ -181,9 +181,9 @@ public:
     }
     [[nodiscard]] std::string get_name() const override { return _op_name; }
 
-    virtual Status prepare(RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
 
-    virtual Status open(RuntimeState* state) override;
+    Status open(RuntimeState* state) override;
 
     Status finalize(RuntimeState* state) override { return Status::OK(); }
 
@@ -201,7 +201,7 @@ public:
         return false;
     }
 
-    bool is_pending_finish() const override {
+    [[nodiscard]] bool is_pending_finish() const override {
         LOG(FATAL) << "should not reach here!";
         return false;
     }
@@ -228,7 +228,7 @@ public:
         return _row_descriptor;
     }
 
-    std::string debug_string() const override { return ""; }
+    [[nodiscard]] std::string debug_string() const override { return ""; }
 
     virtual std::string debug_string(int indentation_level = 0) const;
 
@@ -305,7 +305,7 @@ public:
     OperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& 
descs)
             : OperatorXBase(pool, tnode, descs) {}
     OperatorX(ObjectPool* pool, int id) : OperatorXBase(pool, id) {};
-    virtual ~OperatorX() = default;
+    ~OperatorX() override = default;
 
     Status setup_local_state(RuntimeState* state, LocalStateInfo& info) 
override;
     Status setup_local_states(RuntimeState* state, 
std::vector<LocalStateInfo>& info) override;
@@ -317,7 +317,7 @@ class PipelineXLocalState : public PipelineXLocalStateBase {
 public:
     PipelineXLocalState(RuntimeState* state, OperatorXBase* parent)
             : PipelineXLocalStateBase(state, parent) {}
-    virtual ~PipelineXLocalState() {}
+    ~PipelineXLocalState() override = default;
 
     Status init(RuntimeState* state, LocalStateInfo& info) override {
         _runtime_profile.reset(new RuntimeProfile(_parent->get_name() +
@@ -399,7 +399,7 @@ public:
     virtual Status close(RuntimeState* state, Status exec_status) = 0;
     virtual Status try_close(RuntimeState* state, Status exec_status) = 0;
 
-    virtual std::string debug_string(int indentation_level) const;
+    [[nodiscard]] virtual std::string debug_string(int indentation_level) 
const;
 
     template <class TARGET>
     TARGET& cast() {
@@ -421,7 +421,9 @@ public:
     RuntimeProfile* profile() { return _profile; }
     MemTracker* mem_tracker() { return _mem_tracker.get(); }
     QueryStatistics* query_statistics() { return _query_statistics.get(); }
-    RuntimeProfile* faker_runtime_profile() const { return 
_faker_runtime_profile.get(); }
+    [[nodiscard]] RuntimeProfile* faker_runtime_profile() const {
+        return _faker_runtime_profile.get();
+    }
 
     RuntimeProfile::Counter* rows_input_counter() { return 
_rows_input_counter; }
 
@@ -508,7 +510,7 @@ public:
         return false;
     }
 
-    bool is_pending_finish() const override {
+    [[nodiscard]] bool is_pending_finish() const override {
         LOG(FATAL) << "should not reach here!";
         return false;
     }
@@ -519,9 +521,10 @@ public:
 
     [[nodiscard]] std::string debug_string() const override { return ""; }
 
-    virtual std::string debug_string(int indentation_level) const;
+    [[nodiscard]] virtual std::string debug_string(int indentation_level) 
const;
 
-    virtual std::string debug_string(RuntimeState* state, int 
indentation_level) const;
+    [[nodiscard]] virtual std::string debug_string(RuntimeState* state,
+                                                   int indentation_level) 
const;
 
     [[nodiscard]] bool is_sink() const override { return true; }
 
@@ -531,7 +534,7 @@ public:
         return state->get_sink_local_state(id())->close(state, exec_status);
     }
 
-    virtual Status try_close(RuntimeState* state, Status exec_status) {
+    [[nodiscard]] virtual Status try_close(RuntimeState* state, Status 
exec_status) {
         return state->get_sink_local_state(id())->try_close(state, 
exec_status);
     }
 
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index f8ea574c7c..e8dc9f3f29 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -70,6 +70,7 @@
 #include "pipeline/exec/sort_source_operator.h"
 #include "pipeline/exec/streaming_aggregation_sink_operator.h"
 #include "pipeline/exec/streaming_aggregation_source_operator.h"
+#include "pipeline/exec/table_function_operator.h"
 #include "pipeline/exec/union_sink_operator.h"
 #include "pipeline/exec/union_source_operator.h"
 #include "pipeline/task_scheduler.h"
@@ -740,6 +741,11 @@ Status 
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
         RETURN_IF_ERROR(cur_pipe->add_operator(op));
         break;
     }
+    case TPlanNodeType::TABLE_FUNCTION_NODE: {
+        op.reset(new TableFunctionOperatorX(pool, tnode, descs));
+        RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        break;
+    }
     case TPlanNodeType::ASSERT_NUM_ROWS_NODE: {
         op.reset(new AssertNumRowsOperatorX(pool, tnode, descs));
         RETURN_IF_ERROR(cur_pipe->add_operator(op));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to