This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 11b6fb9d10 [PipelineX](feature) Support data gen operator (#24823)
11b6fb9d10 is described below

commit 11b6fb9d1082347720dc57f0e6e6dd2991c16ba9
Author: Gabriel <[email protected]>
AuthorDate: Sat Sep 23 19:28:47 2023 +0800

    [PipelineX](feature) Support data gen operator (#24823)
---
 be/src/pipeline/exec/datagen_operator.cpp          | 82 ++++++++++++++++++++++
 be/src/pipeline/exec/datagen_operator.h            | 40 +++++++++++
 be/src/pipeline/pipeline_x/operator.cpp            |  2 +
 .../pipeline_x/pipeline_x_fragment_context.cpp     |  5 ++
 4 files changed, 129 insertions(+)

diff --git a/be/src/pipeline/exec/datagen_operator.cpp b/be/src/pipeline/exec/datagen_operator.cpp
index 817c8e26d9..43049eab5b 100644
--- a/be/src/pipeline/exec/datagen_operator.cpp
+++ b/be/src/pipeline/exec/datagen_operator.cpp
@@ -21,6 +21,8 @@
 
 #include "pipeline/exec/operator.h"
 #include "util/runtime_profile.h"
+#include "vec/exec/data_gen_functions/vdata_gen_function_inf.h"
+#include "vec/exec/data_gen_functions/vnumbers_tvf.h"
 #include "vec/exec/vdata_gen_scan_node.h"
 
 namespace doris {
@@ -42,4 +44,84 @@ Status DataGenOperator::close(RuntimeState* state) {
     return Status::OK();
 }
 
+DataGenSourceOperatorX::DataGenSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode,
+                                               const DescriptorTbl& descs)
+        : OperatorX<DataGenLocalState>(pool, tnode, descs),
+          _tuple_id(tnode.data_gen_scan_node.tuple_id),
+          _tuple_desc(nullptr),
+          _runtime_filter_descs(tnode.runtime_filters) {}
+
+Status DataGenSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
+    RETURN_IF_ERROR(OperatorX<DataGenLocalState>::init(tnode, state));
+    // set _table_func here
+    switch (tnode.data_gen_scan_node.func_name) {
+    case TDataGenFunctionName::NUMBERS:
+        break;
+    default:
+        return Status::InternalError("Unsupported function type");
+    }
+    return Status::OK();
+}
+
+Status DataGenSourceOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(OperatorX<DataGenLocalState>::prepare(state));
+    // get tuple desc
+    _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
+
+    if (nullptr == _tuple_desc) {
+        return Status::InternalError("Failed to get tuple descriptor.");
+    }
+    return Status::OK();
+}
+
+Status DataGenSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block,
+                                         SourceState& source_state) {
+    if (state == nullptr || block == nullptr) {
+        return Status::InternalError("input is NULL pointer");
+    }
+    RETURN_IF_CANCELLED(state);
+    CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    bool eos = false;
+    Status res = local_state._table_func->get_next(state, block, &eos);
+    source_state = eos ? SourceState::FINISHED : source_state;
+    RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block,
+                                                           block->columns()));
+    local_state.reached_limit(block, source_state);
+    return res;
+}
+
+Status DataGenLocalState::init(RuntimeState* state, LocalStateInfo& info) {
+    RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
+    auto& p = _parent->cast<DataGenSourceOperatorX>();
+    _table_func = std::make_shared<vectorized::VNumbersTVF>(p._tuple_id, p._tuple_desc);
+    _table_func->set_tuple_desc(p._tuple_desc);
+    RETURN_IF_ERROR(_table_func->set_scan_ranges(info.scan_ranges));
+
+    // TODO: use runtime filter to filter result block, maybe this node needs to derive from vscan_node.
+    for (const auto& filter_desc : p._runtime_filter_descs) {
+        IRuntimeFilter* runtime_filter = nullptr;
+        if (filter_desc.__isset.opt_remote_rf && filter_desc.opt_remote_rf) {
+            RETURN_IF_ERROR(state->get_query_ctx()->runtime_filter_mgr()->register_consumer_filter(
+                    filter_desc, state->query_options(), p.id(), false));
+            RETURN_IF_ERROR(state->get_query_ctx()->runtime_filter_mgr()->get_consume_filter(
+                    filter_desc.filter_id, p.id(), &runtime_filter));
+        } else {
+            RETURN_IF_ERROR(state->runtime_filter_mgr()->register_consumer_filter(
+                    filter_desc, state->query_options(), p.id(), false));
+            RETURN_IF_ERROR(state->runtime_filter_mgr()->get_consume_filter(
+                    filter_desc.filter_id, p.id(), &runtime_filter));
+        }
+        runtime_filter->init_profile(_runtime_profile.get());
+    }
+    return Status::OK();
+}
+
+Status DataGenLocalState::close(RuntimeState* state) {
+    if (_closed) {
+        return Status::OK();
+    }
+    _table_func->close(state);
+    return PipelineXLocalState<>::close(state);
+}
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/datagen_operator.h b/be/src/pipeline/exec/datagen_operator.h
index 48d525e80b..f2d97536a0 100644
--- a/be/src/pipeline/exec/datagen_operator.h
+++ b/be/src/pipeline/exec/datagen_operator.h
@@ -21,6 +21,7 @@
 
 #include "common/status.h"
 #include "operator.h"
+#include "pipeline/pipeline_x/operator.h"
 #include "vec/exec/vdata_gen_scan_node.h"
 
 namespace doris {
@@ -48,4 +49,43 @@ public:
     Status close(RuntimeState* state) override;
 };
 
+class DataGenSourceOperatorX;
+class DataGenLocalState final : public PipelineXLocalState<> {
+public:
+    ENABLE_FACTORY_CREATOR(DataGenLocalState);
+
+    DataGenLocalState(RuntimeState* state, OperatorXBase* parent)
+            : PipelineXLocalState<>(state, parent) {}
+    ~DataGenLocalState() = default;
+
+    Status init(RuntimeState* state, LocalStateInfo& info) override;
+    Status close(RuntimeState* state) override;
+
+private:
+    friend class DataGenSourceOperatorX;
+    std::shared_ptr<vectorized::VDataGenFunctionInf> _table_func;
+};
+
+class DataGenSourceOperatorX final : public OperatorX<DataGenLocalState> {
+public:
+    DataGenSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+
+    Status init(const TPlanNode& tnode, RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
+    Status get_block(RuntimeState* state, vectorized::Block* block,
+                     SourceState& source_state) override;
+
+    [[nodiscard]] bool is_source() const override { return true; }
+
+private:
+    friend class DataGenLocalState;
+    // Tuple id resolved in prepare() to set _tuple_desc;
+    TupleId _tuple_id;
+
+    // Descriptor of tuples generated
+    const TupleDescriptor* _tuple_desc;
+
+    std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
+};
+
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp
index 068551da90..42c5842d0f 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -25,6 +25,7 @@
 #include "pipeline/exec/analytic_sink_operator.h"
 #include "pipeline/exec/analytic_source_operator.h"
 #include "pipeline/exec/assert_num_rows_operator.h"
+#include "pipeline/exec/datagen_operator.h"
 #include "pipeline/exec/distinct_streaming_aggregation_sink_operator.h"
 #include "pipeline/exec/empty_set_operator.h"
 #include "pipeline/exec/exchange_sink_operator.h"
@@ -404,6 +405,7 @@ DECLARE_OPERATOR_X(EmptySetLocalState)
 DECLARE_OPERATOR_X(UnionSourceLocalState)
 DECLARE_OPERATOR_X(MultiCastDataStreamSourceLocalState)
 DECLARE_OPERATOR_X(PartitionSortSourceLocalState)
+DECLARE_OPERATOR_X(DataGenLocalState)
 
 #undef DECLARE_OPERATOR_X
 
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index e8dc9f3f29..1f7289aced 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -756,6 +756,11 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
         RETURN_IF_ERROR(cur_pipe->add_operator(op));
         break;
     }
+    case TPlanNodeType::DATA_GEN_SCAN_NODE: {
+        op.reset(new DataGenSourceOperatorX(pool, tnode, descs));
+        RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        break;
+    }
     case TPlanNodeType::SELECT_NODE: {
         op.reset(new SelectOperatorX(pool, tnode, descs));
         RETURN_IF_ERROR(cur_pipe->add_operator(op));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to