This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 11b6fb9d10 [PipelineX](feature) Support data gen operator (#24823)
11b6fb9d10 is described below
commit 11b6fb9d1082347720dc57f0e6e6dd2991c16ba9
Author: Gabriel <[email protected]>
AuthorDate: Sat Sep 23 19:28:47 2023 +0800
[PipelineX](feature) Support data gen operator (#24823)
---
be/src/pipeline/exec/datagen_operator.cpp | 82 ++++++++++++++++++++++
be/src/pipeline/exec/datagen_operator.h | 40 +++++++++++
be/src/pipeline/pipeline_x/operator.cpp | 2 +
.../pipeline_x/pipeline_x_fragment_context.cpp | 5 ++
4 files changed, 129 insertions(+)
diff --git a/be/src/pipeline/exec/datagen_operator.cpp
b/be/src/pipeline/exec/datagen_operator.cpp
index 817c8e26d9..43049eab5b 100644
--- a/be/src/pipeline/exec/datagen_operator.cpp
+++ b/be/src/pipeline/exec/datagen_operator.cpp
@@ -21,6 +21,8 @@
#include "pipeline/exec/operator.h"
#include "util/runtime_profile.h"
+#include "vec/exec/data_gen_functions/vdata_gen_function_inf.h"
+#include "vec/exec/data_gen_functions/vnumbers_tvf.h"
#include "vec/exec/vdata_gen_scan_node.h"
namespace doris {
@@ -42,4 +44,84 @@ Status DataGenOperator::close(RuntimeState* state) {
return Status::OK();
}
+// Builds the data-gen source operator from its TPlanNode. Only the tuple id is
+// captured here; the matching descriptor is looked up later in prepare().
+DataGenSourceOperatorX::DataGenSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode,
+                                               const DescriptorTbl& descs)
+        : OperatorX<DataGenLocalState>(pool, tnode, descs),
+          _tuple_id(tnode.data_gen_scan_node.tuple_id),
+          _tuple_desc(nullptr),
+          // Copied so that each DataGenLocalState can register them in its init().
+          _runtime_filter_descs(tnode.runtime_filters) {}
+
+// Validates the data-gen function requested by the planner. Only NUMBERS is
+// accepted; the function object itself is created in DataGenLocalState::init().
+Status DataGenSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
+    RETURN_IF_ERROR(OperatorX<DataGenLocalState>::init(tnode, state));
+    const auto requested = tnode.data_gen_scan_node.func_name;
+    if (requested != TDataGenFunctionName::NUMBERS) {
+        return Status::InternalError("Unsupported function type");
+    }
+    return Status::OK();
+}
+
+// Resolves _tuple_id against the descriptor table. The result is read later by
+// DataGenLocalState::init(); a missing descriptor is reported as an error.
+Status DataGenSourceOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(OperatorX<DataGenLocalState>::prepare(state));
+    _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
+    if (_tuple_desc != nullptr) {
+        return Status::OK();
+    }
+    return Status::InternalError("Failed to get tuple descriptor.");
+}
+
+// Pulls the next batch from this instance's table function, filters it with the
+// operator's conjuncts and applies the LIMIT via reached_limit(). source_state
+// is set to FINISHED once the table function reports end of stream.
+Status DataGenSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block,
+                                         SourceState& source_state) {
+    if (state == nullptr || block == nullptr) {
+        return Status::InternalError("input is NULL pointer");
+    }
+    RETURN_IF_CANCELLED(state);
+    CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    bool eos = false;
+    // Fail fast: filtering and limit accounting on a block the generator failed
+    // to fill would run on partial data, and an error from filter_block() would
+    // otherwise replace the generator's original error.
+    RETURN_IF_ERROR(local_state._table_func->get_next(state, block, &eos));
+    if (eos) {
+        source_state = SourceState::FINISHED;
+    }
+    RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block,
+                                                           block->columns()));
+    local_state.reached_limit(block, source_state);
+    return Status::OK();
+}
+
+// Per-instance setup. It creates the numbers table function bound to the
+// parent's tuple descriptor and passes it this instance's scan ranges. It then
+// registers every runtime filter attached to the plan node as a consumer.
+Status DataGenLocalState::init(RuntimeState* state, LocalStateInfo& info) {
+    RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
+    auto& parent = _parent->cast<DataGenSourceOperatorX>();
+    _table_func = std::make_shared<vectorized::VNumbersTVF>(parent._tuple_id, parent._tuple_desc);
+    _table_func->set_tuple_desc(parent._tuple_desc);
+    RETURN_IF_ERROR(_table_func->set_scan_ranges(info.scan_ranges));
+
+    // TODO: use runtime filter to filter result block, maybe this node needs to
+    // derive from vscan_node. For now the filters are only registered and profiled.
+    const auto node_id = parent.id();
+    for (const auto& desc : parent._runtime_filter_descs) {
+        // Filters flagged opt_remote_rf go to the query context's manager; all
+        // others go to the RuntimeState's own manager.
+        const bool remote = desc.__isset.opt_remote_rf && desc.opt_remote_rf;
+        auto* mgr = remote ? state->get_query_ctx()->runtime_filter_mgr()
+                           : state->runtime_filter_mgr();
+        IRuntimeFilter* consumer = nullptr;
+        RETURN_IF_ERROR(
+                mgr->register_consumer_filter(desc, state->query_options(), node_id, false));
+        RETURN_IF_ERROR(mgr->get_consume_filter(desc.filter_id, node_id, &consumer));
+        consumer->init_profile(_runtime_profile.get());
+    }
+    return Status::OK();
+}
+
+// Closes the table function, then the base local state. Returns early if this
+// local state is already marked _closed.
+Status DataGenLocalState::close(RuntimeState* state) {
+    if (_closed) {
+        return Status::OK();
+    }
+    // _table_func is only created after PipelineXLocalState<>::init() succeeds,
+    // so it is still null if init() failed early; guard against dereferencing it.
+    if (_table_func != nullptr) {
+        _table_func->close(state);
+    }
+    return PipelineXLocalState<>::close(state);
+}
+
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/datagen_operator.h
b/be/src/pipeline/exec/datagen_operator.h
index 48d525e80b..f2d97536a0 100644
--- a/be/src/pipeline/exec/datagen_operator.h
+++ b/be/src/pipeline/exec/datagen_operator.h
@@ -21,6 +21,7 @@
#include "common/status.h"
#include "operator.h"
+#include "pipeline/pipeline_x/operator.h"
#include "vec/exec/vdata_gen_scan_node.h"
namespace doris {
@@ -48,4 +49,43 @@ public:
Status close(RuntimeState* state) override;
};
+class DataGenSourceOperatorX;
+// Per-instance state for DataGenSourceOperatorX. It holds the data-gen table
+// function that produces this instance's rows.
+class DataGenLocalState final : public PipelineXLocalState<> {
+public:
+ ENABLE_FACTORY_CREATOR(DataGenLocalState);
+
+ DataGenLocalState(RuntimeState* state, OperatorXBase* parent)
+ : PipelineXLocalState<>(state, parent) {}
+ ~DataGenLocalState() = default;
+
+ // Creates the table function, sets its scan ranges and registers
+ // runtime-filter consumers for the parent node.
+ Status init(RuntimeState* state, LocalStateInfo& info) override;
+ // Closes the table function, then the base local state.
+ Status close(RuntimeState* state) override;
+
+private:
+ friend class DataGenSourceOperatorX;
+ // Row generator (a VNumbersTVF in the current implementation). Null until init().
+ std::shared_ptr<vectorized::VDataGenFunctionInf> _table_func;
+};
+
+// Pipeline-X source operator created for DATA_GEN_SCAN_NODE. It emits blocks
+// produced by a data-gen table function held in each DataGenLocalState.
+class DataGenSourceOperatorX final : public OperatorX<DataGenLocalState> {
+public:
+ DataGenSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, const
DescriptorTbl& descs);
+
+ // Rejects any data-gen function other than NUMBERS.
+ Status init(const TPlanNode& tnode, RuntimeState* state) override;
+ // Resolves _tuple_id to _tuple_desc; fails if the descriptor is missing.
+ Status prepare(RuntimeState* state) override;
+ // Fetches the next generated block, then applies conjuncts and the limit.
+ Status get_block(RuntimeState* state, vectorized::Block* block,
+ SourceState& source_state) override;
+
+ [[nodiscard]] bool is_source() const override { return true; }
+
+private:
+ friend class DataGenLocalState;
+ // Tuple id resolved in prepare() to set _tuple_desc;
+ TupleId _tuple_id;
+
+ // Descriptor of tuples generated
+ const TupleDescriptor* _tuple_desc;
+
+ // Runtime filters from the plan node; each DataGenLocalState registers them
+ // as consumers in its init().
+ std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
+};
+
} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_x/operator.cpp
b/be/src/pipeline/pipeline_x/operator.cpp
index 068551da90..42c5842d0f 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -25,6 +25,7 @@
#include "pipeline/exec/analytic_sink_operator.h"
#include "pipeline/exec/analytic_source_operator.h"
#include "pipeline/exec/assert_num_rows_operator.h"
+#include "pipeline/exec/datagen_operator.h"
#include "pipeline/exec/distinct_streaming_aggregation_sink_operator.h"
#include "pipeline/exec/empty_set_operator.h"
#include "pipeline/exec/exchange_sink_operator.h"
@@ -404,6 +405,7 @@ DECLARE_OPERATOR_X(EmptySetLocalState)
DECLARE_OPERATOR_X(UnionSourceLocalState)
DECLARE_OPERATOR_X(MultiCastDataStreamSourceLocalState)
DECLARE_OPERATOR_X(PartitionSortSourceLocalState)
+DECLARE_OPERATOR_X(DataGenLocalState)
#undef DECLARE_OPERATOR_X
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index e8dc9f3f29..1f7289aced 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -756,6 +756,11 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
RETURN_IF_ERROR(cur_pipe->add_operator(op));
break;
}
+ case TPlanNodeType::DATA_GEN_SCAN_NODE: {
+ op.reset(new DataGenSourceOperatorX(pool, tnode, descs));
+ RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ break;
+ }
case TPlanNodeType::SELECT_NODE: {
op.reset(new SelectOperatorX(pool, tnode, descs));
RETURN_IF_ERROR(cur_pipe->add_operator(op));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]