This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a0cd3f2da6 [Chore](exec-version) fix some missing be_exec_version request (#34653)
8a0cd3f2da6 is described below

commit 8a0cd3f2da60cabdd6744b5266f1c88dd7d6ebea
Author: Pxl <[email protected]>
AuthorDate: Tue Jun 11 10:59:33 2024 +0800

    [Chore](exec-version) fix some missing be_exec_version request (#34653)
    
    fix some missing be_exec_version request
---
 be/src/agent/be_exec_version_manager.h             | 12 ++++++------
 be/src/pipeline/exec/exchange_sink_buffer.cpp      | 13 +++++++++++--
 be/src/pipeline/exec/exchange_sink_buffer.h        |  4 +++-
 be/src/pipeline/exec/exchange_sink_operator.cpp    |  4 ++--
 be/src/pipeline/exec/result_file_sink_operator.cpp | 17 +++++++----------
 be/src/runtime/fragment_mgr.cpp                    |  1 +
 be/src/runtime/runtime_state.cpp                   |  1 +
 be/src/runtime/runtime_state.h                     |  6 +++---
 be/src/vec/core/block.cpp                          |  3 ++-
 be/src/vec/data_types/data_type_string.cpp         |  5 -----
 be/src/vec/sink/vdata_stream_sender.cpp            |  2 +-
 be/src/vec/sink/vdata_stream_sender.h              | 22 ++++++++--------------
 .../java/org/apache/doris/qe/SessionVariable.java  |  1 +
 13 files changed, 46 insertions(+), 45 deletions(-)

diff --git a/be/src/agent/be_exec_version_manager.h b/be/src/agent/be_exec_version_manager.h
index 746542088be..8f8b43a2728 100644
--- a/be/src/agent/be_exec_version_manager.h
+++ b/be/src/agent/be_exec_version_manager.h
@@ -20,23 +20,23 @@
 #include <fmt/format.h>
 #include <glog/logging.h>
 
+#include "common/status.h"
+
 namespace doris {
 
 class BeExecVersionManager {
 public:
     BeExecVersionManager() = delete;
 
-    static bool check_be_exec_version(int be_exec_version) {
+    static Status check_be_exec_version(int be_exec_version) {
         if (be_exec_version > max_be_exec_version || be_exec_version < 
min_be_exec_version) {
-            LOG(WARNING) << fmt::format(
+            return Status::InternalError(
                     "Received be_exec_version is not supported, 
be_exec_version={}, "
                     "min_be_exec_version={}, max_be_exec_version={}, maybe due 
to FE version not "
-                    "match "
-                    "with BE.",
+                    "match with BE.",
                     be_exec_version, min_be_exec_version, max_be_exec_version);
-            return false;
         }
-        return true;
+        return Status::OK();
     }
 
     static int get_newest_version() { return max_be_exec_version; }
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index abbcf580271..9a4764a24b9 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -157,6 +157,10 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& 
request) {
             _rpc_channel_is_idle[ins_id] = false;
             _busy_channels++;
         }
+        if (request.block) {
+            RETURN_IF_ERROR(
+                    
BeExecVersionManager::check_be_exec_version(request.block->be_exec_version()));
+        }
         _instance_to_package_queue[ins_id].emplace(std::move(request));
         _total_queue_size++;
         if (_queue_dependency && _total_queue_size > _queue_capacity) {
@@ -187,6 +191,10 @@ Status 
ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
             _rpc_channel_is_idle[ins_id] = false;
             _busy_channels++;
         }
+        if (request.block_holder->get_block()) {
+            RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(
+                    request.block_holder->get_block()->be_exec_version()));
+        }
         _instance_to_broadcast_package_queue[ins_id].emplace(request);
     }
     if (send_now) {
@@ -216,7 +224,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
         auto& brpc_request = _instance_to_request[id];
         brpc_request->set_eos(request.eos);
         brpc_request->set_packet_seq(_instance_to_seq[id]++);
-        if (request.block) {
+        if (request.block && !request.block->column_metas().empty()) {
             brpc_request->set_allocated_block(request.block.get());
         }
         if (!request.exec_status.ok()) {
@@ -295,7 +303,8 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
         auto& brpc_request = _instance_to_request[id];
         brpc_request->set_eos(request.eos);
         brpc_request->set_packet_seq(_instance_to_seq[id]++);
-        if (request.block_holder->get_block()) {
+        if (request.block_holder->get_block() &&
+            !request.block_holder->get_block()->column_metas().empty()) {
             
brpc_request->set_allocated_block(request.block_holder->get_block());
         }
         auto send_callback = request.channel->get_send_callback(id, 
request.eos);
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h
index cd948f654af..2a0d75e42cb 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -79,6 +79,8 @@ public:
 
     PBlock* get_block() { return _pblock.get(); }
 
+    void reset_block() { _pblock->Clear(); }
+
 private:
     friend class BroadcastPBlockHolderMemLimiter;
     std::unique_ptr<PBlock> _pblock;
@@ -190,7 +192,7 @@ class ExchangeSinkBuffer final : public HasTaskExecutionCtx 
{
 public:
     ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int 
send_id, int be_number,
                        RuntimeState* state, ExchangeSinkLocalState* parent);
-    ~ExchangeSinkBuffer() = default;
+    ~ExchangeSinkBuffer() override = default;
     void register_sink(TUniqueId);
 
     Status add_block(TransmitInfo&& request);
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 11633f4fcf2..0ccded0b825 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -381,11 +381,11 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
                 if (serialized) {
                     auto cur_block = 
local_state._serializer.get_block()->to_block();
                     if (!cur_block.empty()) {
-                        
static_cast<void>(local_state._serializer.serialize_block(
+                        
RETURN_IF_ERROR(local_state._serializer.serialize_block(
                                 &cur_block, block_holder->get_block(),
                                 local_state.channels.size()));
                     } else {
-                        block_holder->get_block()->Clear();
+                        block_holder->reset_block();
                     }
 
                     
local_state._broadcast_pb_mem_limiter->acquire(*block_holder);
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp
index 93272f64911..381e234e78b 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -37,10 +37,7 @@ 
ResultFileSinkLocalState::ResultFileSinkLocalState(DataSinkOperatorXBase* parent
 
 ResultFileSinkOperatorX::ResultFileSinkOperatorX(int operator_id, const 
RowDescriptor& row_desc,
                                                  const std::vector<TExpr>& 
t_output_expr)
-        : DataSinkOperatorX(operator_id, 0),
-          _row_desc(row_desc),
-          _t_output_expr(t_output_expr),
-          _is_top_sink(true) {}
+        : DataSinkOperatorX(operator_id, 0), _row_desc(row_desc), 
_t_output_expr(t_output_expr) {}
 
 ResultFileSinkOperatorX::ResultFileSinkOperatorX(
         int operator_id, const RowDescriptor& row_desc, const TResultFileSink& 
sink,
@@ -59,9 +56,9 @@ ResultFileSinkLocalState::~ResultFileSinkLocalState() = 
default;
 
 Status ResultFileSinkOperatorX::init(const TDataSink& tsink) {
     RETURN_IF_ERROR(DataSinkOperatorX<ResultFileSinkLocalState>::init(tsink));
-    auto& sink = tsink.result_file_sink;
+    const auto& sink = tsink.result_file_sink;
     CHECK(sink.__isset.file_options);
-    _file_opts.reset(new ResultFileOptions(sink.file_options));
+    _file_opts = std::make_unique<ResultFileOptions>(sink.file_options);
     CHECK(sink.__isset.storage_backend_type);
     _storage_type = sink.storage_backend_type;
 
@@ -186,7 +183,7 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, 
Status exec_status)
     } else {
         if (final_status.ok()) {
             bool all_receiver_eof = true;
-            for (auto channel : _channels) {
+            for (auto* channel : _channels) {
                 if (!channel->is_receiver_eof()) {
                     all_receiver_eof = false;
                     break;
@@ -201,7 +198,7 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, 
Status exec_status)
             if (_only_local_exchange) {
                 if (!_output_block->empty()) {
                     Status status;
-                    for (auto channel : _channels) {
+                    for (auto* channel : _channels) {
                         if (!channel->is_receiver_eof()) {
                             status = 
channel->send_local_block(_output_block.get());
                             HANDLE_CHANNEL_STATUS(state, channel, status);
@@ -221,10 +218,10 @@ Status ResultFileSinkLocalState::close(RuntimeState* 
state, Status exec_status)
                             RETURN_IF_ERROR(_serializer->serialize_block(
                                     &cur_block, _block_holder->get_block(), 
_channels.size()));
                         } else {
-                            _block_holder->get_block()->Clear();
+                            _block_holder->reset_block();
                         }
                         Status status;
-                        for (auto channel : _channels) {
+                        for (auto* channel : _channels) {
                             if (!channel->is_receiver_eof()) {
                                 if (channel->is_local()) {
                                     status = 
channel->send_local_block(&cur_block);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 6dab87e1592..2b0625207e2 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1025,6 +1025,7 @@ Status FragmentMgr::exec_external_plan_fragment(const 
TScanOpenParams& params,
     query_options.execution_timeout = params.execution_timeout;
     query_options.mem_limit = params.mem_limit;
     query_options.query_type = TQueryType::EXTERNAL;
+    query_options.be_exec_version = BeExecVersionManager::get_newest_version();
     query_options.__set_enable_pipeline_x_engine(true);
     exec_fragment_params.__set_query_options(query_options);
     VLOG_ROW << "external exec_plan_fragment params is "
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 504f6deabf3..17349c93079 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -243,6 +243,7 @@ RuntimeState::RuntimeState()
           _unreported_error_idx(0),
           _per_fragment_instance_idx(0) {
     _query_options.batch_size = DEFAULT_BATCH_SIZE;
+    _query_options.be_exec_version = 
BeExecVersionManager::get_newest_version();
     _timezone = TimezoneUtils::default_time_zone;
     _timestamp_ms = 0;
     _nano_seconds = 0;
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 4df2b0a45a5..af628ff9e89 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -35,6 +35,7 @@
 #include <utility>
 #include <vector>
 
+#include "agent/be_exec_version_manager.h"
 #include "cctz/time_zone.h"
 #include "common/compiler_util.h" // IWYU pragma: keep
 #include "common/factory_creator.h"
@@ -350,9 +351,8 @@ public:
     int32_t runtime_filter_max_in_num() const { return 
_query_options.runtime_filter_max_in_num; }
 
     int be_exec_version() const {
-        if (!_query_options.__isset.be_exec_version) {
-            return 0;
-        }
+        DCHECK(_query_options.__isset.be_exec_version &&
+               
BeExecVersionManager::check_be_exec_version(_query_options.be_exec_version));
         return _query_options.be_exec_version;
     }
     bool enable_local_shuffle() const {
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 95af060dfc7..676674e8ec0 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -86,7 +86,7 @@ Block::Block(const std::vector<SlotDescriptor*>& slots, 
size_t block_size,
 Status Block::deserialize(const PBlock& pblock) {
     swap(Block());
     int be_exec_version = pblock.has_be_exec_version() ? 
pblock.be_exec_version() : 0;
-    CHECK(BeExecVersionManager::check_be_exec_version(be_exec_version));
+    
RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(be_exec_version));
 
     const char* buf = nullptr;
     std::string compression_scratch;
@@ -885,6 +885,7 @@ Status Block::serialize(int be_exec_version, PBlock* pblock,
                         /*std::string* compressed_buffer,*/ size_t* 
uncompressed_bytes,
                         size_t* compressed_bytes, 
segment_v2::CompressionTypePB compression_type,
                         bool allow_transfer_large_data) const {
+    
RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(be_exec_version));
     pblock->set_be_exec_version(be_exec_version);
 
     // calc uncompressed size for allocation
diff --git a/be/src/vec/data_types/data_type_string.cpp b/be/src/vec/data_types/data_type_string.cpp
index 644bfac4c31..05c763c0e3a 100644
--- a/be/src/vec/data_types/data_type_string.cpp
+++ b/be/src/vec/data_types/data_type_string.cpp
@@ -101,11 +101,6 @@ int64_t 
DataTypeString::get_uncompressed_serialized_bytes(const IColumn& column,
         auto ptr = column.convert_to_full_column_if_const();
         const auto& data_column = assert_cast<const ColumnString&>(*ptr.get());
 
-        if (be_exec_version == 0) {
-            return sizeof(IColumn::Offset) * (column.size() + 1) + 
sizeof(uint64_t) +
-                   data_column.get_chars().size() + column.size();
-        }
-
         return sizeof(IColumn::Offset) * (column.size() + 1) + 
sizeof(uint64_t) +
                data_column.get_chars().size();
     }
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index 69053c6bdac..dd221c6aaa3 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -244,7 +244,7 @@ Status Channel<Parent>::send_remote_block(PBlock* block, 
bool eos, Status exec_s
     if (!exec_status.ok()) {
         exec_status.to_protobuf(_brpc_request->mutable_exec_status());
     }
-    if (block != nullptr) {
+    if (block != nullptr && !block->column_metas().empty()) {
         _brpc_request->set_allocated_block(block);
     }
     _brpc_request->set_packet_seq(_packet_seq++);
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index 2dda2baf3da..0ceec97f1fc 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -25,7 +25,6 @@
 #include <gen_cpp/data.pb.h>
 #include <gen_cpp/internal_service.pb.h>
 #include <gen_cpp/types.pb.h>
-#include <stdint.h>
 
 #include <atomic>
 #include <cstddef>
@@ -33,6 +32,7 @@
 #include <memory>
 #include <ostream>
 #include <string>
+#include <utility>
 #include <vector>
 
 #include "common/config.h"
@@ -113,17 +113,15 @@ public:
     // combination. buffer_size is specified in bytes and a soft limit on
     // how much tuple data is getting accumulated before being sent; it only 
applies
     // when data is added via add_row() and not sent directly via send_batch().
-    Channel(Parent* parent, const RowDescriptor& row_desc, const 
TNetworkAddress& brpc_dest,
-            const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id)
+    Channel(Parent* parent, const RowDescriptor& row_desc, TNetworkAddress 
brpc_dest,
+            TUniqueId fragment_instance_id, PlanNodeId dest_node_id)
             : _parent(parent),
               _row_desc(row_desc),
-              _fragment_instance_id(fragment_instance_id),
+              _fragment_instance_id(std::move(fragment_instance_id)),
               _dest_node_id(dest_node_id),
-              _num_data_bytes_sent(0),
-              _packet_seq(0),
               _need_close(false),
               _closed(false),
-              _brpc_dest_addr(brpc_dest),
+              _brpc_dest_addr(std::move(brpc_dest)),
               _is_local((_brpc_dest_addr.hostname == 
BackendOptions::get_localhost()) &&
                         (_brpc_dest_addr.port == config::brpc_port)),
               _serializer(_parent, _is_local) {
@@ -224,8 +222,8 @@ protected:
     PlanNodeId _dest_node_id;
 
     // the number of RowBatch.data bytes sent successfully
-    int64_t _num_data_bytes_sent;
-    int64_t _packet_seq;
+    int64_t _num_data_bytes_sent {};
+    int64_t _packet_seq {};
 
     bool _need_close;
     bool _closed;
@@ -275,11 +273,7 @@ public:
         ch_roll_pb_block();
     }
 
-    ~PipChannel() override {
-        if (Channel<pipeline::ExchangeSinkLocalState>::_ch_cur_pb_block) {
-            delete Channel<pipeline::ExchangeSinkLocalState>::_ch_cur_pb_block;
-        }
-    }
+    ~PipChannel() override { delete 
Channel<pipeline::ExchangeSinkLocalState>::_ch_cur_pb_block; }
 
     void ch_roll_pb_block() override {
         // We have two choices here.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index a7f2a453d2e..ef3c27bad96 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -3606,6 +3606,7 @@ public class SessionVariable implements Serializable, 
Writable {
         queryOptions.setQueryTimeout(queryTimeoutS);
         queryOptions.setInsertTimeout(insertTimeoutS);
         queryOptions.setAnalyzeTimeout(analyzeTimeoutS);
+        queryOptions.setBeExecVersion(Config.be_exec_version);
         return queryOptions;
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to