This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new dc524a5f564 [refactor](cleancode) remove unused code from be (#35756)
dc524a5f564 is described below
commit dc524a5f564f180f2514bf19821a22af0afdcbcd
Author: yiguolei <[email protected]>
AuthorDate: Sat Jun 1 22:20:25 2024 +0800
[refactor](cleancode) remove unused code from be (#35756)
## Proposed changes
Issue Number: close #xxx
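<!--Describe your changes.-->
Remove the unused `can_write()`, `can_read()`, `is_pending_finish()`, `is_finished()` and `channel_all_can_write()` methods from the BE sinks, exec nodes and pipeline classes, and rename `PipelineTask::_finished` to `_finalized`.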
---------
Co-authored-by: yiguolei <[email protected]>
---
be/src/exec/data_sink.h | 4 ----
be/src/exec/exec_node.h | 2 --
be/src/pipeline/exec/exchange_sink_buffer.cpp | 10 ----------
be/src/pipeline/exec/exchange_sink_buffer.h | 1 -
be/src/pipeline/exec/multi_cast_data_streamer.h | 8 --------
be/src/pipeline/pipeline_fragment_context.cpp | 3 ---
be/src/pipeline/pipeline_task.cpp | 17 +++++++++--------
be/src/pipeline/pipeline_task.h | 6 ++----
be/src/vec/exec/vanalytic_eval_node.cpp | 13 -------------
be/src/vec/exec/vanalytic_eval_node.h | 2 --
be/src/vec/exec/vpartition_sort_node.cpp | 5 -----
be/src/vec/exec/vpartition_sort_node.h | 1 -
be/src/vec/sink/async_writer_sink.h | 4 ----
be/src/vec/sink/multi_cast_data_stream_sink.h | 3 ---
be/src/vec/sink/vdata_stream_sender.cpp | 16 ----------------
be/src/vec/sink/vdata_stream_sender.h | 13 -------------
be/src/vec/sink/vmemory_scratch_sink.cpp | 4 ----
be/src/vec/sink/vmemory_scratch_sink.h | 2 --
be/src/vec/sink/writer/async_result_writer.h | 7 -------
19 files changed, 11 insertions(+), 110 deletions(-)
diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h
index 5258929ba79..2d76078e7e5 100644
--- a/be/src/exec/data_sink.h
+++ b/be/src/exec/data_sink.h
@@ -71,8 +71,6 @@ public:
return send(state, block, eos);
}
- [[nodiscard]] virtual bool is_pending_finish() const { return false; }
-
// Releases all resources that were allocated in prepare()/send().
// Further send() calls are illegal after calling close().
// It must be okay to call this multiple times. Subsequent calls should
@@ -102,8 +100,6 @@ public:
const RowDescriptor& row_desc() { return _row_desc; }
- virtual bool can_write() { return true; }
-
std::shared_ptr<QueryStatistics> get_query_statistics_ptr();
protected:
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index 10b035835d7..2dedee61ba5 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -142,8 +142,6 @@ public:
return Status::OK();
}
- bool can_read() const { return _can_read; }
-
[[nodiscard]] virtual bool can_terminate_early() { return false; }
// Sink Data to ExecNode to do some stock work, both need impl with method: get_result
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 8893db54cc5..e29991890f5 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -106,16 +106,6 @@ void ExchangeSinkBuffer::close() {
//_instance_to_request.clear();
}
-bool ExchangeSinkBuffer::can_write() const {
- size_t max_package_size =
- config::exchg_buffer_queue_capacity_factor *
_instance_to_package_queue.size();
- size_t total_package_size = 0;
- for (auto& [_, q] : _instance_to_package_queue) {
- total_package_size += q.size();
- }
- return total_package_size <= max_package_size;
-}
-
void ExchangeSinkBuffer::_set_ready_to_finish(bool all_done) {
if (_finish_dependency && _should_stop && all_done) {
_finish_dependency->set_ready();
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h
b/be/src/pipeline/exec/exchange_sink_buffer.h
index 8eed559e712..683a485f2ca 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -202,7 +202,6 @@ public:
Status add_block(TransmitInfo&& request);
Status add_block(BroadcastTransmitInfo&& request);
- bool can_write() const;
void close();
void set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t
receive_rpc_time);
void update_profile(RuntimeProfile* profile);
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.h
b/be/src/pipeline/exec/multi_cast_data_streamer.h
index 2078a729227..e812067e52c 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.h
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.h
@@ -56,14 +56,6 @@ public:
Status push(RuntimeState* state, vectorized::Block* block, bool eos);
- // use sink to check can_write, now always true after we support spill to disk
- bool can_write() { return true; }
-
- bool can_read(int sender_idx) {
- std::lock_guard l(_mutex);
- return _sender_pos_to_read[sender_idx] != _multi_cast_blocks.end() || _eos;
- }
-
const RowDescriptor& row_desc() { return _row_desc; }
RuntimeProfile* profile() { return _profile; }
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index dbfdaba6d91..8347892c6bf 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -189,9 +189,6 @@ void PipelineFragmentContext::cancel(const Status reason) {
// _exec_env->result_queue_mgr()->update_queue_status(id,
Status::Aborted(msg));
for (auto& tasks : _tasks) {
for (auto& task : tasks) {
- if (task->is_finished()) {
- continue;
- }
task->clear_blocking_state();
}
}
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index 52a76828804..c43410e68a4 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -429,7 +429,7 @@ bool PipelineTask::should_revoke_memory(RuntimeState*
state, int64_t revocable_m
void PipelineTask::finalize() {
std::unique_lock<std::mutex> lc(_dependency_lock);
- _finished = true;
+ _finalized = true;
_sink_shared_state.reset();
_op_shared_states.clear();
_le_state_map.clear();
@@ -475,17 +475,18 @@ std::string PipelineTask::debug_string() {
debug_string_buffer,
"PipelineTask[this = {}, open = {}, eos = {}, finish = {}, dry run = {}, elapse time "
"= {}s], block dependency = {}, is running = {}\noperators: ",
- (void*)this, _opened, _eos, _finished, _dry_run, elapsed,
- cur_blocked_dep && !_finished ? cur_blocked_dep->debug_string() : "NULL", is_running());
+ (void*)this, _opened, _eos, _finalized, _dry_run, elapsed,
+ cur_blocked_dep && !_finalized ? cur_blocked_dep->debug_string() : "NULL",
+ is_running());
for (size_t i = 0; i < _operators.size(); i++) {
fmt::format_to(debug_string_buffer, "\n{}",
- _opened && !_finished ? _operators[i]->debug_string(_state, i)
- : _operators[i]->debug_string(i));
+ _opened && !_finalized ? _operators[i]->debug_string(_state, i)
+ : _operators[i]->debug_string(i));
}
fmt::format_to(debug_string_buffer, "\n{}\n",
- _opened && !_finished ? _sink->debug_string(_state, _operators.size())
- : _sink->debug_string(_operators.size()));
- if (_finished) {
+ _opened && !_finalized ? _sink->debug_string(_state, _operators.size())
+ : _sink->debug_string(_operators.size()));
+ if (_finalized) {
return fmt::to_string(debug_string_buffer);
}
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 6bc65905be6..20c83f6a97e 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -84,8 +84,6 @@ public:
void finalize();
- bool is_finished() const { return _finished.load(); }
-
std::string debug_string();
bool is_pending_finish() {
@@ -142,7 +140,7 @@ public:
void clear_blocking_state() {
// We use a lock to assure all dependencies are not deconstructed here.
std::unique_lock<std::mutex> lc(_dependency_lock);
- if (!_finished) {
+ if (!_finalized) {
_execution_dep->set_always_ready();
for (auto* dep : _filter_dependencies) {
dep->set_always_ready();
@@ -303,7 +301,7 @@ private:
Dependency* _execution_dep = nullptr;
- std::atomic<bool> _finished {false};
+ std::atomic<bool> _finalized {false};
std::mutex _dependency_lock;
std::atomic<bool> _running {false};
diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp
b/be/src/vec/exec/vanalytic_eval_node.cpp
index fbd49aa145a..410964c1969 100644
--- a/be/src/vec/exec/vanalytic_eval_node.cpp
+++ b/be/src/vec/exec/vanalytic_eval_node.cpp
@@ -298,19 +298,6 @@ void VAnalyticEvalNode::release_resource(RuntimeState*
state) {
return ExecNode::release_resource(state);
}
-//TODO: maybe could have better strategy, not noly when need data to sink data
-//even could get some resources in advance as soon as possible
-bool VAnalyticEvalNode::can_write() {
- return _need_more_input;
-}
-
-bool VAnalyticEvalNode::can_read() {
- if (_need_more_input) {
- return false;
- }
- return true;
-}
-
Status VAnalyticEvalNode::get_next(RuntimeState* state, vectorized::Block*
block, bool* eos) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_CANCELLED(state);
diff --git a/be/src/vec/exec/vanalytic_eval_node.h
b/be/src/vec/exec/vanalytic_eval_node.h
index 45f7ce5b1e8..9b302b32ed6 100644
--- a/be/src/vec/exec/vanalytic_eval_node.h
+++ b/be/src/vec/exec/vanalytic_eval_node.h
@@ -83,8 +83,6 @@ public:
void release_resource(RuntimeState* state) override;
Status sink(doris::RuntimeState* state, vectorized::Block* input_block,
bool eos) override;
Status pull(doris::RuntimeState* state, vectorized::Block* output_block,
bool* eos) override;
- bool can_read();
- bool can_write();
protected:
using ExecNode::debug_string;
diff --git a/be/src/vec/exec/vpartition_sort_node.cpp
b/be/src/vec/exec/vpartition_sort_node.cpp
index 15d8124c653..a8b986d130e 100644
--- a/be/src/vec/exec/vpartition_sort_node.cpp
+++ b/be/src/vec/exec/vpartition_sort_node.cpp
@@ -317,11 +317,6 @@ Status VPartitionSortNode::alloc_resource(RuntimeState*
state) {
return Status::OK();
}
-bool VPartitionSortNode::can_read() {
- std::lock_guard<std::mutex> lock(_buffer_mutex);
- return !_blocks_buffer.empty() || _can_read;
-}
-
Status VPartitionSortNode::pull(doris::RuntimeState* state, vectorized::Block*
output_block,
bool* eos) {
SCOPED_TIMER(_exec_timer);
diff --git a/be/src/vec/exec/vpartition_sort_node.h
b/be/src/vec/exec/vpartition_sort_node.h
index 481a99719fb..a9edca80df9 100644
--- a/be/src/vec/exec/vpartition_sort_node.h
+++ b/be/src/vec/exec/vpartition_sort_node.h
@@ -233,7 +233,6 @@ public:
Status sink(RuntimeState* state, vectorized::Block* input_block, bool eos)
override;
void debug_profile();
- bool can_read();
private:
Status _init_hash_method();
diff --git a/be/src/vec/sink/async_writer_sink.h
b/be/src/vec/sink/async_writer_sink.h
index 150ebeaf4f2..526064404a4 100644
--- a/be/src/vec/sink/async_writer_sink.h
+++ b/be/src/vec/sink/async_writer_sink.h
@@ -94,8 +94,6 @@ public:
return _writer->sink(block, eos);
}
- bool can_write() override { return _writer->can_write(); }
-
Status close(RuntimeState* state, Status exec_status) override {
// if the init failed, the _writer may be nullptr. so here need check
if (_writer) {
@@ -104,8 +102,6 @@ public:
return DataSink::close(state, exec_status);
}
- [[nodiscard]] bool is_pending_finish() const override { return
_writer->is_pending_finish(); }
-
protected:
const std::vector<TExpr>& _t_output_expr;
VExprContextSPtrs _output_vexpr_ctxs;
diff --git a/be/src/vec/sink/multi_cast_data_stream_sink.h
b/be/src/vec/sink/multi_cast_data_stream_sink.h
index 7cc057013aa..d6b85010c5a 100644
--- a/be/src/vec/sink/multi_cast_data_stream_sink.h
+++ b/be/src/vec/sink/multi_cast_data_stream_sink.h
@@ -41,9 +41,6 @@ public:
Status open(doris::RuntimeState* state) override { return Status::OK(); };
- // use sink to check can_write, now always true after we support spill to
disk
- bool can_write() override { return _multi_cast_data_streamer->can_write();
}
-
std::shared_ptr<pipeline::MultiCastDataStreamer>&
get_multi_cast_data_streamer() {
return _multi_cast_data_streamer;
}
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp
b/be/src/vec/sink/vdata_stream_sender.cpp
index 63f2aa19515..529a8256e77 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -925,22 +925,6 @@ void
VDataStreamSender::register_pipeline_channels(pipeline::ExchangeSinkBuffer*
}
}
-bool VDataStreamSender::channel_all_can_write() {
- if ((_part_type == TPartitionType::UNPARTITIONED || _channels.size() == 1) &&
- !_only_local_exchange) {
- // This condition means we need use broadcast buffer, so we should make sure
- // there are available buffer before running pipeline
- return !_broadcast_pb_blocks->empty();
- } else {
- for (auto channel : _channels) {
- if (!channel->can_write()) {
- return false;
- }
- }
- return true;
- }
-}
-
template class Channel<pipeline::ExchangeSinkLocalState>;
template class Channel<VDataStreamSender>;
template class Channel<pipeline::ResultFileSinkLocalState>;
diff --git a/be/src/vec/sink/vdata_stream_sender.h
b/be/src/vec/sink/vdata_stream_sender.h
index 5ca31bcbe44..b6346787ebb 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -132,8 +132,6 @@ public:
void register_pipeline_channels(pipeline::ExchangeSinkBuffer* buffer);
- bool channel_all_can_write();
-
int sender_id() const { return _sender_id; }
RuntimeProfile::Counter* brpc_wait_timer() { return _brpc_wait_timer; }
@@ -327,17 +325,6 @@ public:
virtual void ch_roll_pb_block();
- bool can_write() {
- if (!is_local()) {
- return true;
- }
-
- // if local recvr queue mem over the exchange node mem limit, we must ensure each queue
- // has one block to do merge sort in exchange node to prevent the logic dead lock
- return !_local_recvr || _local_recvr->is_closed() || !_local_recvr->exceeds_limit(0) ||
- _local_recvr->sender_queue_empty(_parent->sender_id());
- }
-
bool is_receiver_eof() const { return _receiver_status.is<ErrorCode::END_OF_FILE>(); }
void set_receiver_eof(Status st) { _receiver_status = st; }
diff --git a/be/src/vec/sink/vmemory_scratch_sink.cpp
b/be/src/vec/sink/vmemory_scratch_sink.cpp
index eca9e65ab49..95266ba6de0 100644
--- a/be/src/vec/sink/vmemory_scratch_sink.cpp
+++ b/be/src/vec/sink/vmemory_scratch_sink.cpp
@@ -99,10 +99,6 @@ Status MemoryScratchSink::open(RuntimeState* state) {
return VExpr::open(_output_vexpr_ctxs, state);
}
-bool MemoryScratchSink::can_write() {
- return _queue->size() < 10;
-}
-
Status MemoryScratchSink::close(RuntimeState* state, Status exec_status) {
if (_closed) {
return Status::OK();
diff --git a/be/src/vec/sink/vmemory_scratch_sink.h
b/be/src/vec/sink/vmemory_scratch_sink.h
index 3a1dd9991d4..7e2d042cb8d 100644
--- a/be/src/vec/sink/vmemory_scratch_sink.h
+++ b/be/src/vec/sink/vmemory_scratch_sink.h
@@ -62,8 +62,6 @@ public:
Status close(RuntimeState* state, Status exec_status) override;
- bool can_write() override;
-
private:
Status _prepare_vexpr(RuntimeState* state);
cctz::time_zone _timezone_obj;
diff --git a/be/src/vec/sink/writer/async_result_writer.h
b/be/src/vec/sink/writer/async_result_writer.h
index b1426a48806..5e21dc13e12 100644
--- a/be/src/vec/sink/writer/async_result_writer.h
+++ b/be/src/vec/sink/writer/async_result_writer.h
@@ -65,13 +65,6 @@ public:
virtual Status open(RuntimeState* state, RuntimeProfile* profile) = 0;
- bool can_write() {
- std::lock_guard l(_m);
- return _data_queue_is_available() || _is_finished();
- }
-
- [[nodiscard]] bool is_pending_finish() const { return
!_writer_thread_closed; }
-
// sink the block date to date queue, it is async
Status sink(Block* block, bool eos);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]