This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new bd582aee75a [pipelineX](minor) refine code (#25015)
bd582aee75a is described below
commit bd582aee75adc5d82e65280dca18b731ecf08755
Author: Gabriel <[email protected]>
AuthorDate: Sat Oct 7 10:45:33 2023 +0800
[pipelineX](minor) refine code (#25015)
---
be/src/pipeline/exec/analytic_sink_operator.cpp | 2 +
be/src/pipeline/exec/analytic_sink_operator.h | 2 +-
...ream_sink.h => multi_cast_data_stream_sink.cpp} | 31 +++------
be/src/pipeline/exec/multi_cast_data_stream_sink.h | 74 +++++++++++++++++++++-
.../exec/multi_cast_data_stream_source.cpp | 12 ++--
.../pipeline/exec/multi_cast_data_stream_source.h | 71 +--------------------
be/src/pipeline/exec/union_source_operator.cpp | 2 +-
be/src/pipeline/exec/union_source_operator.h | 2 +-
be/src/pipeline/pipeline_x/dependency.h | 6 +-
be/src/pipeline/pipeline_x/operator.cpp | 19 ++----
be/src/pipeline/pipeline_x/operator.h | 4 --
.../pipeline_x/pipeline_x_fragment_context.cpp | 1 +
12 files changed, 104 insertions(+), 122 deletions(-)
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp
index 2b35e1b6a2c..d839be0dc16 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -215,4 +215,6 @@ Status AnalyticSinkOperatorX::_insert_range_column(vectorized::Block* block,
return Status::OK();
}
+template class DataSinkOperatorX<AnalyticSinkLocalState>;
+
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h
index 41d276205be..c8583925865 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -102,4 +102,4 @@ private:
};
} // namespace pipeline
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.h b/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp
similarity index 57%
copy from be/src/pipeline/exec/multi_cast_data_stream_sink.h
copy to be/src/pipeline/exec/multi_cast_data_stream_sink.cpp
index e137a7e6558..b44f15d13e9 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_sink.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp
@@ -15,33 +15,20 @@
// specific language governing permissions and limitations
// under the License.
-#pragma once
-
-#include "operator.h"
-#include "vec/sink/multi_cast_data_stream_sink.h"
+#include "multi_cast_data_stream_sink.h"
namespace doris::pipeline {
-class MultiCastDataStreamSinkOperatorBuilder final
- : public DataSinkOperatorBuilder<vectorized::MultiCastDataStreamSink> {
-public:
- MultiCastDataStreamSinkOperatorBuilder(int32_t id, DataSink* sink)
- : DataSinkOperatorBuilder(id, "MultiCastDataStreamSinkOperator",
sink) {}
-
- OperatorPtr build_operator() override;
-};
-
-class MultiCastDataStreamSinkOperator final
- : public DataSinkOperator<MultiCastDataStreamSinkOperatorBuilder> {
-public:
- MultiCastDataStreamSinkOperator(OperatorBuilderBase* operator_builder,
DataSink* sink)
- : DataSinkOperator(operator_builder, sink) {}
-
- bool can_write() override { return true; }
-};
-
OperatorPtr MultiCastDataStreamSinkOperatorBuilder::build_operator() {
return std::make_shared<MultiCastDataStreamSinkOperator>(this, _sink);
}
+Status MultiCastDataStreamSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& info) {
+ RETURN_IF_ERROR(Base::init(state, info));
+ auto& p = _parent->cast<MultiCastDataStreamSinkOperatorX>();
+ _shared_state->multi_cast_data_streamer =
std::make_shared<pipeline::MultiCastDataStreamer>(
+ p._row_desc, p._pool, p._cast_sender_count);
+ return Status::OK();
+}
+
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.h b/be/src/pipeline/exec/multi_cast_data_stream_sink.h
index e137a7e6558..f949b624c7b 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_sink.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.h
@@ -18,6 +18,7 @@
#pragma once
#include "operator.h"
+#include "pipeline/pipeline_x/operator.h"
#include "vec/sink/multi_cast_data_stream_sink.h"
namespace doris::pipeline {
@@ -40,8 +41,75 @@ public:
bool can_write() override { return true; }
};
-OperatorPtr MultiCastDataStreamSinkOperatorBuilder::build_operator() {
- return std::make_shared<MultiCastDataStreamSinkOperator>(this, _sink);
-}
+class MultiCastDataStreamSinkOperatorX;
+class MultiCastDataStreamSinkLocalState final
+ : public PipelineXSinkLocalState<MultiCastDependency> {
+ ENABLE_FACTORY_CREATOR(MultiCastDataStreamSinkLocalState);
+ MultiCastDataStreamSinkLocalState(DataSinkOperatorXBase* parent,
RuntimeState* state)
+ : Base(parent, state) {}
+
+ Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+ friend class MultiCastDataStreamSinkOperatorX;
+ friend class DataSinkOperatorX<MultiCastDataStreamSinkLocalState>;
+ using Base = PipelineXSinkLocalState<MultiCastDependency>;
+ using Parent = MultiCastDataStreamSinkOperatorX;
+
+private:
+ std::shared_ptr<pipeline::MultiCastDataStreamer> _multi_cast_data_streamer;
+};
+
+class MultiCastDataStreamSinkOperatorX final
+ : public DataSinkOperatorX<MultiCastDataStreamSinkLocalState> {
+ using Base = DataSinkOperatorX<MultiCastDataStreamSinkLocalState>;
+
+public:
+ MultiCastDataStreamSinkOperatorX(int sink_id, std::vector<int>& sources,
+ const int cast_sender_count, ObjectPool*
pool,
+ const TMultiCastDataStreamSink& sink,
+ const RowDescriptor& row_desc)
+ : Base(sink_id, sources),
+ _pool(pool),
+ _row_desc(row_desc),
+ _cast_sender_count(cast_sender_count) {}
+ ~MultiCastDataStreamSinkOperatorX() override = default;
+ Status init(const TDataSink& tsink) override { return Status::OK(); }
+
+ Status open(doris::RuntimeState* state) override { return Status::OK(); };
+
+ Status prepare(RuntimeState* state) override { return Status::OK(); }
+
+ Status sink(RuntimeState* state, vectorized::Block* in_block,
+ SourceState source_state) override {
+ CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ SCOPED_TIMER(local_state.profile()->total_time_counter());
+ COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
+ if (in_block->rows() > 0 || source_state == SourceState::FINISHED) {
+ COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
+ auto st =
local_state._shared_state->multi_cast_data_streamer->push(
+ state, in_block, source_state == SourceState::FINISHED);
+ // TODO: improvement: if sink returned END_OF_FILE, pipeline task
can be finished
+ if (st.template is<ErrorCode::END_OF_FILE>()) {
+ return Status::OK();
+ }
+ return st;
+ }
+ return Status::OK();
+ }
+
+ RowDescriptor& row_desc() override { return _row_desc; }
+
+ std::shared_ptr<pipeline::MultiCastDataStreamer>
create_multi_cast_data_streamer() {
+ auto multi_cast_data_streamer =
std::make_shared<pipeline::MultiCastDataStreamer>(
+ _row_desc, _pool, _cast_sender_count);
+ return multi_cast_data_streamer;
+ }
+
+private:
+ friend class MultiCastDataStreamSinkLocalState;
+ ObjectPool* _pool;
+ RowDescriptor _row_desc;
+ int _cast_sender_count;
+ friend class MultiCastDataStreamSinkLocalState;
+};
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index c0e7b146594..c70d87f59e0 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -130,11 +130,9 @@ Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState
SCOPED_TIMER(profile()->total_time_counter());
SCOPED_TIMER(_open_timer);
auto& p = _parent->cast<Parent>();
- if (p._t_data_stream_sink.__isset.output_exprs) {
- _output_expr_contexts.resize(p._output_expr_contexts.size());
- for (size_t i = 0; i < p._output_expr_contexts.size(); i++) {
- RETURN_IF_ERROR(p._output_expr_contexts[i]->clone(state,
_output_expr_contexts[i]));
- }
+ _output_expr_contexts.resize(p._output_expr_contexts.size());
+ for (size_t i = 0; i < p._output_expr_contexts.size(); i++) {
+ RETURN_IF_ERROR(p._output_expr_contexts[i]->clone(state,
_output_expr_contexts[i]));
}
return Status::OK();
}
@@ -150,7 +148,7 @@ Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state,
if (!local_state._output_expr_contexts.empty()) {
output_block = &tmp_block;
}
- local_state._shared_state->_multi_cast_data_streamer->pull(_consumer_id,
output_block, &eos);
+ local_state._shared_state->multi_cast_data_streamer->pull(_consumer_id,
output_block, &eos);
if (!local_state._conjuncts.empty()) {
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts,
output_block,
@@ -162,9 +160,11 @@ Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state,
local_state._output_expr_contexts, *output_block, block));
materialize_block_inplace(*block);
}
+ COUNTER_UPDATE(local_state._rows_returned_counter, block->rows());
if (eos) {
source_state = SourceState::FINISHED;
}
return Status::OK();
}
+
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h
index aa20272d07b..3d2b8157fa7 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h
@@ -108,6 +108,7 @@ public:
private:
vectorized::VExprContextSPtrs _output_expr_contexts;
};
+
class MultiCastDataStreamerSourceOperatorX final
: public OperatorX<MultiCastDataStreamSourceLocalState> {
public:
@@ -169,73 +170,5 @@ private:
const RowDescriptor& _row_desc() { return _row_descriptor; }
};
-// sink operator
-
-class MultiCastDataStreamSinkOperatorX;
-class MultiCastDataStreamSinkLocalState final
- : public PipelineXSinkLocalState<MultiCastDependency> {
- ENABLE_FACTORY_CREATOR(MultiCastDataStreamSinkLocalState);
- MultiCastDataStreamSinkLocalState(DataSinkOperatorXBase* parent,
RuntimeState* state)
- : Base(parent, state) {}
- friend class MultiCastDataStreamSinkOperatorX;
- friend class DataSinkOperatorX<MultiCastDataStreamSinkLocalState>;
- using Base = PipelineXSinkLocalState<MultiCastDependency>;
- using Parent = MultiCastDataStreamSinkOperatorX;
-};
-
-class MultiCastDataStreamSinkOperatorX final
- : public DataSinkOperatorX<MultiCastDataStreamSinkLocalState> {
- using Base = DataSinkOperatorX<MultiCastDataStreamSinkLocalState>;
-
-public:
- friend class UnionSinkLocalState;
- MultiCastDataStreamSinkOperatorX(int sink_id, std::vector<int>& sources,
- const int cast_sender_count, ObjectPool*
pool,
- const TMultiCastDataStreamSink& sink,
- const RowDescriptor& row_desc)
- : Base(sink_id, sources),
- _pool(pool),
- _row_desc(row_desc),
- _cast_sender_count(cast_sender_count) {}
- ~MultiCastDataStreamSinkOperatorX() override = default;
- Status init(const TDataSink& tsink) override { return Status::OK(); }
-
- Status open(doris::RuntimeState* state) override { return Status::OK(); };
-
- Status prepare(RuntimeState* state) override { return Status::OK(); }
-
- Status sink(RuntimeState* state, vectorized::Block* in_block,
- SourceState source_state) override {
- CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
- SCOPED_TIMER(local_state.profile()->total_time_counter());
- COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
- if (in_block->rows() > 0 || source_state == SourceState::FINISHED) {
- COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
- auto st =
local_state._shared_state->_multi_cast_data_streamer->push(
- state, in_block, source_state == SourceState::FINISHED);
- // TODO: improvement: if sink returned END_OF_FILE, pipeline task
can be finished
- if (st.template is<ErrorCode::END_OF_FILE>()) {
- return Status::OK();
- }
- return st;
- }
- return Status::OK();
- }
-
- std::shared_ptr<pipeline::MultiCastDataStreamer>
multi_cast_data_streamer() {
- auto multi_cast_data_streamer =
std::make_shared<pipeline::MultiCastDataStreamer>(
- _row_desc, _pool, _cast_sender_count);
- return multi_cast_data_streamer;
- }
-
- RowDescriptor& row_desc() override { return _row_desc; }
-
-private:
- ObjectPool* _pool;
- RowDescriptor _row_desc;
- int _cast_sender_count;
- friend class MultiCastDataStreamSinkLocalState;
-};
-
} // namespace pipeline
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp
index 67a8ac6d4af..54a9603e7f6 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -124,7 +124,7 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
return Status::OK();
}
-std::shared_ptr<DataQueue> UnionSourceLocalState::data_queue() {
+std::shared_ptr<DataQueue> UnionSourceLocalState::create_data_queue() {
auto& p = _parent->cast<Parent>();
std::shared_ptr<DataQueue> data_queue =
std::make_shared<DataQueue>(p._child_size, _dependency);
return data_queue;
diff --git a/be/src/pipeline/exec/union_source_operator.h b/be/src/pipeline/exec/union_source_operator.h
index a22a0e8c0a2..d02176fc8de 100644
--- a/be/src/pipeline/exec/union_source_operator.h
+++ b/be/src/pipeline/exec/union_source_operator.h
@@ -78,7 +78,7 @@ public:
UnionSourceLocalState(RuntimeState* state, OperatorXBase* parent) :
Base(state, parent) {};
Status init(RuntimeState* state, LocalStateInfo& info) override;
- std::shared_ptr<DataQueue> data_queue();
+ std::shared_ptr<DataQueue> create_data_queue();
private:
friend class UnionSourceOperatorX;
diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h
index 2ea0992bebc..c69b49870d5 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -111,7 +111,7 @@ protected:
class WriteDependency : public Dependency {
public:
WriteDependency(int id, std::string name) : Dependency(id, name),
_ready_for_write(true) {}
- virtual ~WriteDependency() = default;
+ ~WriteDependency() override = default;
bool is_write_dependency() override { return true; }
@@ -428,7 +428,7 @@ private:
struct MultiCastSharedState {
public:
- std::shared_ptr<pipeline::MultiCastDataStreamer> _multi_cast_data_streamer;
+ std::shared_ptr<pipeline::MultiCastDataStreamer> multi_cast_data_streamer;
};
class MultiCastDependency final : public WriteDependency {
@@ -438,7 +438,7 @@ public:
~MultiCastDependency() override = default;
void* shared_state() override { return (void*)&_multi_cast_state; };
MultiCastDependency* can_read(const int consumer_id) {
- if
(_multi_cast_state._multi_cast_data_streamer->can_read(consumer_id)) {
+ if (_multi_cast_state.multi_cast_data_streamer->can_read(consumer_id))
{
return nullptr;
} else {
return this;
diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp
index c6331b04fbe..81bee3063de 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -37,6 +37,7 @@
#include "pipeline/exec/jdbc_scan_operator.h"
#include "pipeline/exec/jdbc_table_sink_operator.h"
#include "pipeline/exec/meta_scan_operator.h"
+#include "pipeline/exec/multi_cast_data_stream_sink.h"
#include "pipeline/exec/multi_cast_data_stream_source.h"
#include "pipeline/exec/nested_loop_join_build_operator.h"
#include "pipeline/exec/nested_loop_join_probe_operator.h"
@@ -257,20 +258,14 @@ Status DataSinkOperatorXBase::init(const TPlanNode& tnode, RuntimeState* state)
return Status::OK();
}
-template <typename LocalStateType>
-Status DataSinkOperatorX<LocalStateType>::setup_local_state(RuntimeState*
state,
-
LocalSinkStateInfo& info) {
- auto local_state = LocalStateType::create_shared(this, state);
- state->emplace_sink_local_state(id(), local_state);
- return local_state->init(state, info);
-}
-
template <typename LocalStateType>
Status DataSinkOperatorX<LocalStateType>::setup_local_states(
RuntimeState* state, std::vector<LocalSinkStateInfo>& infos) {
DCHECK(infos.size() == 1);
for (auto& info : infos) {
- RETURN_IF_ERROR(setup_local_state(state, info));
+ auto local_state = LocalStateType::create_shared(this, state);
+ state->emplace_sink_local_state(id(), local_state);
+ RETURN_IF_ERROR(local_state->init(state, info));
}
return Status::OK();
}
@@ -279,12 +274,12 @@ template <>
Status
DataSinkOperatorX<MultiCastDataStreamSinkLocalState>::setup_local_states(
RuntimeState* state, std::vector<LocalSinkStateInfo>& infos) {
auto multi_cast_data_streamer =
-
static_cast<MultiCastDataStreamSinkOperatorX*>(this)->multi_cast_data_streamer();
+
static_cast<MultiCastDataStreamSinkOperatorX*>(this)->create_multi_cast_data_streamer();
for (auto& info : infos) {
auto local_state =
MultiCastDataStreamSinkLocalState::create_shared(this, state);
state->emplace_sink_local_state(id(), local_state);
RETURN_IF_ERROR(local_state->init(state, info));
- local_state->_shared_state->_multi_cast_data_streamer =
multi_cast_data_streamer;
+ local_state->_shared_state->multi_cast_data_streamer =
multi_cast_data_streamer;
}
return Status::OK();
@@ -331,7 +326,7 @@ Status OperatorX<UnionSourceLocalState>::setup_local_states(RuntimeState* state,
RETURN_IF_ERROR(local_state->init(state, info));
if (child_count != 0) {
if (!data_queue) {
- data_queue = local_state->data_queue();
+ data_queue = local_state->create_data_queue();
}
local_state->_shared_state->data_queue = data_queue;
}
diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h
index 53a294412a2..149d28265e3 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -419,8 +419,6 @@ public:
Status prepare(RuntimeState* state) override { return Status::OK(); }
Status open(RuntimeState* state) override { return Status::OK(); }
- virtual Status setup_local_state(RuntimeState* state, LocalSinkStateInfo&
info) = 0;
-
virtual Status setup_local_states(RuntimeState* state,
std::vector<LocalSinkStateInfo>& infos)
= 0;
@@ -529,8 +527,6 @@ public:
: DataSinkOperatorXBase(id, sources) {}
~DataSinkOperatorX() override = default;
- Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info)
override;
-
Status setup_local_states(RuntimeState* state,
std::vector<LocalSinkStateInfo>& infos) override;
void get_dependency(std::vector<DependencySPtr>& dependency) override;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 4dedbce44f1..85a9527eeb6 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -60,6 +60,7 @@
#include "pipeline/exec/jdbc_scan_operator.h"
#include "pipeline/exec/jdbc_table_sink_operator.h"
#include "pipeline/exec/meta_scan_operator.h"
+#include "pipeline/exec/multi_cast_data_stream_sink.h"
#include "pipeline/exec/multi_cast_data_stream_source.h"
#include "pipeline/exec/nested_loop_join_build_operator.h"
#include "pipeline/exec/nested_loop_join_probe_operator.h"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]