This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8a0cd3f2da6 [Chore](exec-version) fix some missing be_exec_version
request (#34653)
8a0cd3f2da6 is described below
commit 8a0cd3f2da60cabdd6744b5266f1c88dd7d6ebea
Author: Pxl <[email protected]>
AuthorDate: Tue Jun 11 10:59:33 2024 +0800
[Chore](exec-version) fix some missing be_exec_version request (#34653)
fix some missing be_exec_version request
---
be/src/agent/be_exec_version_manager.h | 12 ++++++------
be/src/pipeline/exec/exchange_sink_buffer.cpp | 13 +++++++++++--
be/src/pipeline/exec/exchange_sink_buffer.h | 4 +++-
be/src/pipeline/exec/exchange_sink_operator.cpp | 4 ++--
be/src/pipeline/exec/result_file_sink_operator.cpp | 17 +++++++----------
be/src/runtime/fragment_mgr.cpp | 1 +
be/src/runtime/runtime_state.cpp | 1 +
be/src/runtime/runtime_state.h | 6 +++---
be/src/vec/core/block.cpp | 3 ++-
be/src/vec/data_types/data_type_string.cpp | 5 -----
be/src/vec/sink/vdata_stream_sender.cpp | 2 +-
be/src/vec/sink/vdata_stream_sender.h | 22 ++++++++--------------
.../java/org/apache/doris/qe/SessionVariable.java | 1 +
13 files changed, 46 insertions(+), 45 deletions(-)
diff --git a/be/src/agent/be_exec_version_manager.h
b/be/src/agent/be_exec_version_manager.h
index 746542088be..8f8b43a2728 100644
--- a/be/src/agent/be_exec_version_manager.h
+++ b/be/src/agent/be_exec_version_manager.h
@@ -20,23 +20,23 @@
#include <fmt/format.h>
#include <glog/logging.h>
+#include "common/status.h"
+
namespace doris {
class BeExecVersionManager {
public:
BeExecVersionManager() = delete;
- static bool check_be_exec_version(int be_exec_version) {
+ static Status check_be_exec_version(int be_exec_version) {
if (be_exec_version > max_be_exec_version || be_exec_version <
min_be_exec_version) {
- LOG(WARNING) << fmt::format(
+ return Status::InternalError(
"Received be_exec_version is not supported,
be_exec_version={}, "
"min_be_exec_version={}, max_be_exec_version={}, maybe due
to FE version not "
- "match "
- "with BE.",
+ "match with BE.",
be_exec_version, min_be_exec_version, max_be_exec_version);
- return false;
}
- return true;
+ return Status::OK();
}
static int get_newest_version() { return max_be_exec_version; }
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index abbcf580271..9a4764a24b9 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -157,6 +157,10 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&&
request) {
_rpc_channel_is_idle[ins_id] = false;
_busy_channels++;
}
+ if (request.block) {
+ RETURN_IF_ERROR(
+
BeExecVersionManager::check_be_exec_version(request.block->be_exec_version()));
+ }
_instance_to_package_queue[ins_id].emplace(std::move(request));
_total_queue_size++;
if (_queue_dependency && _total_queue_size > _queue_capacity) {
@@ -187,6 +191,10 @@ Status
ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
_rpc_channel_is_idle[ins_id] = false;
_busy_channels++;
}
+ if (request.block_holder->get_block()) {
+ RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(
+ request.block_holder->get_block()->be_exec_version()));
+ }
_instance_to_broadcast_package_queue[ins_id].emplace(request);
}
if (send_now) {
@@ -216,7 +224,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
auto& brpc_request = _instance_to_request[id];
brpc_request->set_eos(request.eos);
brpc_request->set_packet_seq(_instance_to_seq[id]++);
- if (request.block) {
+ if (request.block && !request.block->column_metas().empty()) {
brpc_request->set_allocated_block(request.block.get());
}
if (!request.exec_status.ok()) {
@@ -295,7 +303,8 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
auto& brpc_request = _instance_to_request[id];
brpc_request->set_eos(request.eos);
brpc_request->set_packet_seq(_instance_to_seq[id]++);
- if (request.block_holder->get_block()) {
+ if (request.block_holder->get_block() &&
+ !request.block_holder->get_block()->column_metas().empty()) {
brpc_request->set_allocated_block(request.block_holder->get_block());
}
auto send_callback = request.channel->get_send_callback(id,
request.eos);
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h
b/be/src/pipeline/exec/exchange_sink_buffer.h
index cd948f654af..2a0d75e42cb 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -79,6 +79,8 @@ public:
PBlock* get_block() { return _pblock.get(); }
+ void reset_block() { _pblock->Clear(); }
+
private:
friend class BroadcastPBlockHolderMemLimiter;
std::unique_ptr<PBlock> _pblock;
@@ -190,7 +192,7 @@ class ExchangeSinkBuffer final : public HasTaskExecutionCtx
{
public:
ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int
send_id, int be_number,
RuntimeState* state, ExchangeSinkLocalState* parent);
- ~ExchangeSinkBuffer() = default;
+ ~ExchangeSinkBuffer() override = default;
void register_sink(TUniqueId);
Status add_block(TransmitInfo&& request);
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 11633f4fcf2..0ccded0b825 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -381,11 +381,11 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
if (serialized) {
auto cur_block =
local_state._serializer.get_block()->to_block();
if (!cur_block.empty()) {
-
static_cast<void>(local_state._serializer.serialize_block(
+
RETURN_IF_ERROR(local_state._serializer.serialize_block(
&cur_block, block_holder->get_block(),
local_state.channels.size()));
} else {
- block_holder->get_block()->Clear();
+ block_holder->reset_block();
}
local_state._broadcast_pb_mem_limiter->acquire(*block_holder);
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp
b/be/src/pipeline/exec/result_file_sink_operator.cpp
index 93272f64911..381e234e78b 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -37,10 +37,7 @@
ResultFileSinkLocalState::ResultFileSinkLocalState(DataSinkOperatorXBase* parent
ResultFileSinkOperatorX::ResultFileSinkOperatorX(int operator_id, const
RowDescriptor& row_desc,
const std::vector<TExpr>&
t_output_expr)
- : DataSinkOperatorX(operator_id, 0),
- _row_desc(row_desc),
- _t_output_expr(t_output_expr),
- _is_top_sink(true) {}
+ : DataSinkOperatorX(operator_id, 0), _row_desc(row_desc),
_t_output_expr(t_output_expr) {}
ResultFileSinkOperatorX::ResultFileSinkOperatorX(
int operator_id, const RowDescriptor& row_desc, const TResultFileSink&
sink,
@@ -59,9 +56,9 @@ ResultFileSinkLocalState::~ResultFileSinkLocalState() =
default;
Status ResultFileSinkOperatorX::init(const TDataSink& tsink) {
RETURN_IF_ERROR(DataSinkOperatorX<ResultFileSinkLocalState>::init(tsink));
- auto& sink = tsink.result_file_sink;
+ const auto& sink = tsink.result_file_sink;
CHECK(sink.__isset.file_options);
- _file_opts.reset(new ResultFileOptions(sink.file_options));
+ _file_opts = std::make_unique<ResultFileOptions>(sink.file_options);
CHECK(sink.__isset.storage_backend_type);
_storage_type = sink.storage_backend_type;
@@ -186,7 +183,7 @@ Status ResultFileSinkLocalState::close(RuntimeState* state,
Status exec_status)
} else {
if (final_status.ok()) {
bool all_receiver_eof = true;
- for (auto channel : _channels) {
+ for (auto* channel : _channels) {
if (!channel->is_receiver_eof()) {
all_receiver_eof = false;
break;
@@ -201,7 +198,7 @@ Status ResultFileSinkLocalState::close(RuntimeState* state,
Status exec_status)
if (_only_local_exchange) {
if (!_output_block->empty()) {
Status status;
- for (auto channel : _channels) {
+ for (auto* channel : _channels) {
if (!channel->is_receiver_eof()) {
status =
channel->send_local_block(_output_block.get());
HANDLE_CHANNEL_STATUS(state, channel, status);
@@ -221,10 +218,10 @@ Status ResultFileSinkLocalState::close(RuntimeState*
state, Status exec_status)
RETURN_IF_ERROR(_serializer->serialize_block(
&cur_block, _block_holder->get_block(),
_channels.size()));
} else {
- _block_holder->get_block()->Clear();
+ _block_holder->reset_block();
}
Status status;
- for (auto channel : _channels) {
+ for (auto* channel : _channels) {
if (!channel->is_receiver_eof()) {
if (channel->is_local()) {
status =
channel->send_local_block(&cur_block);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 6dab87e1592..2b0625207e2 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1025,6 +1025,7 @@ Status FragmentMgr::exec_external_plan_fragment(const
TScanOpenParams& params,
query_options.execution_timeout = params.execution_timeout;
query_options.mem_limit = params.mem_limit;
query_options.query_type = TQueryType::EXTERNAL;
+ query_options.be_exec_version = BeExecVersionManager::get_newest_version();
query_options.__set_enable_pipeline_x_engine(true);
exec_fragment_params.__set_query_options(query_options);
VLOG_ROW << "external exec_plan_fragment params is "
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 504f6deabf3..17349c93079 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -243,6 +243,7 @@ RuntimeState::RuntimeState()
_unreported_error_idx(0),
_per_fragment_instance_idx(0) {
_query_options.batch_size = DEFAULT_BATCH_SIZE;
+ _query_options.be_exec_version =
BeExecVersionManager::get_newest_version();
_timezone = TimezoneUtils::default_time_zone;
_timestamp_ms = 0;
_nano_seconds = 0;
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 4df2b0a45a5..af628ff9e89 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -35,6 +35,7 @@
#include <utility>
#include <vector>
+#include "agent/be_exec_version_manager.h"
#include "cctz/time_zone.h"
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/factory_creator.h"
@@ -350,9 +351,8 @@ public:
int32_t runtime_filter_max_in_num() const { return
_query_options.runtime_filter_max_in_num; }
int be_exec_version() const {
- if (!_query_options.__isset.be_exec_version) {
- return 0;
- }
+ DCHECK(_query_options.__isset.be_exec_version &&
+
BeExecVersionManager::check_be_exec_version(_query_options.be_exec_version));
return _query_options.be_exec_version;
}
bool enable_local_shuffle() const {
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 95af060dfc7..676674e8ec0 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -86,7 +86,7 @@ Block::Block(const std::vector<SlotDescriptor*>& slots,
size_t block_size,
Status Block::deserialize(const PBlock& pblock) {
swap(Block());
int be_exec_version = pblock.has_be_exec_version() ?
pblock.be_exec_version() : 0;
- CHECK(BeExecVersionManager::check_be_exec_version(be_exec_version));
+
RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(be_exec_version));
const char* buf = nullptr;
std::string compression_scratch;
@@ -885,6 +885,7 @@ Status Block::serialize(int be_exec_version, PBlock* pblock,
/*std::string* compressed_buffer,*/ size_t*
uncompressed_bytes,
size_t* compressed_bytes,
segment_v2::CompressionTypePB compression_type,
bool allow_transfer_large_data) const {
+
RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(be_exec_version));
pblock->set_be_exec_version(be_exec_version);
// calc uncompressed size for allocation
diff --git a/be/src/vec/data_types/data_type_string.cpp
b/be/src/vec/data_types/data_type_string.cpp
index 644bfac4c31..05c763c0e3a 100644
--- a/be/src/vec/data_types/data_type_string.cpp
+++ b/be/src/vec/data_types/data_type_string.cpp
@@ -101,11 +101,6 @@ int64_t
DataTypeString::get_uncompressed_serialized_bytes(const IColumn& column,
auto ptr = column.convert_to_full_column_if_const();
const auto& data_column = assert_cast<const ColumnString&>(*ptr.get());
- if (be_exec_version == 0) {
- return sizeof(IColumn::Offset) * (column.size() + 1) +
sizeof(uint64_t) +
- data_column.get_chars().size() + column.size();
- }
-
return sizeof(IColumn::Offset) * (column.size() + 1) +
sizeof(uint64_t) +
data_column.get_chars().size();
}
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp
b/be/src/vec/sink/vdata_stream_sender.cpp
index 69053c6bdac..dd221c6aaa3 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -244,7 +244,7 @@ Status Channel<Parent>::send_remote_block(PBlock* block,
bool eos, Status exec_s
if (!exec_status.ok()) {
exec_status.to_protobuf(_brpc_request->mutable_exec_status());
}
- if (block != nullptr) {
+ if (block != nullptr && !block->column_metas().empty()) {
_brpc_request->set_allocated_block(block);
}
_brpc_request->set_packet_seq(_packet_seq++);
diff --git a/be/src/vec/sink/vdata_stream_sender.h
b/be/src/vec/sink/vdata_stream_sender.h
index 2dda2baf3da..0ceec97f1fc 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -25,7 +25,6 @@
#include <gen_cpp/data.pb.h>
#include <gen_cpp/internal_service.pb.h>
#include <gen_cpp/types.pb.h>
-#include <stdint.h>
#include <atomic>
#include <cstddef>
@@ -33,6 +32,7 @@
#include <memory>
#include <ostream>
#include <string>
+#include <utility>
#include <vector>
#include "common/config.h"
@@ -113,17 +113,15 @@ public:
// combination. buffer_size is specified in bytes and a soft limit on
// how much tuple data is getting accumulated before being sent; it only
applies
// when data is added via add_row() and not sent directly via send_batch().
- Channel(Parent* parent, const RowDescriptor& row_desc, const
TNetworkAddress& brpc_dest,
- const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id)
+ Channel(Parent* parent, const RowDescriptor& row_desc, TNetworkAddress
brpc_dest,
+ TUniqueId fragment_instance_id, PlanNodeId dest_node_id)
: _parent(parent),
_row_desc(row_desc),
- _fragment_instance_id(fragment_instance_id),
+ _fragment_instance_id(std::move(fragment_instance_id)),
_dest_node_id(dest_node_id),
- _num_data_bytes_sent(0),
- _packet_seq(0),
_need_close(false),
_closed(false),
- _brpc_dest_addr(brpc_dest),
+ _brpc_dest_addr(std::move(brpc_dest)),
_is_local((_brpc_dest_addr.hostname ==
BackendOptions::get_localhost()) &&
(_brpc_dest_addr.port == config::brpc_port)),
_serializer(_parent, _is_local) {
@@ -224,8 +222,8 @@ protected:
PlanNodeId _dest_node_id;
// the number of RowBatch.data bytes sent successfully
- int64_t _num_data_bytes_sent;
- int64_t _packet_seq;
+ int64_t _num_data_bytes_sent {};
+ int64_t _packet_seq {};
bool _need_close;
bool _closed;
@@ -275,11 +273,7 @@ public:
ch_roll_pb_block();
}
- ~PipChannel() override {
- if (Channel<pipeline::ExchangeSinkLocalState>::_ch_cur_pb_block) {
- delete Channel<pipeline::ExchangeSinkLocalState>::_ch_cur_pb_block;
- }
- }
+ ~PipChannel() override { delete
Channel<pipeline::ExchangeSinkLocalState>::_ch_cur_pb_block; }
void ch_roll_pb_block() override {
// We have two choices here.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index a7f2a453d2e..ef3c27bad96 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -3606,6 +3606,7 @@ public class SessionVariable implements Serializable,
Writable {
queryOptions.setQueryTimeout(queryTimeoutS);
queryOptions.setInsertTimeout(insertTimeoutS);
queryOptions.setAnalyzeTimeout(analyzeTimeoutS);
+ queryOptions.setBeExecVersion(Config.be_exec_version);
return queryOptions;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]