This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 633f2d52a4 [minor](log) add some logs (#17287)
633f2d52a4 is described below

commit 633f2d52a41a60c6d2c706a0684fa3bb3a4e8bec
Author: Gabriel <[email protected]>
AuthorDate: Wed Mar 1 22:41:50 2023 +0800

    [minor](log) add some logs (#17287)
---
 be/src/pipeline/exec/exchange_sink_buffer.cpp | 124 +++++++++++++++++---------
 be/src/vec/sink/vtablet_sink.cpp              |  10 ++-
 2 files changed, 87 insertions(+), 47 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index aa3948e0a1..da536d66ec 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -197,53 +197,91 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
         return Status::OK();
     }
 
-#define DO_RPC(QUEUE, BLOCK, HOLDER)                                           
                    \
-    auto& request = QUEUE.front();                                             
                    \
-    if (!_instance_to_request[id]) {                                           
                    \
-        _construct_request(id);                                                
                    \
-    }                                                                          
                    \
-    auto brpc_request = _instance_to_request[id];                              
                    \
-    brpc_request->set_eos(request.eos);                                        
                    \
-    brpc_request->set_packet_seq(_instance_to_seq[id]++);                      
                    \
-    if (request.BLOCK) {                                                       
                    \
-        brpc_request->set_allocated_block(request.BLOCK);                      
                    \
-    }                                                                          
                    \
-    auto* _closure = new SelfDeleteClosure<PTransmitDataResult>(id, 
request.eos, HOLDER);          \
-    _closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms);          
                    \
-    _closure->addFailedHandler(                                                
                    \
-            [&](const InstanceLoId& id, const std::string& err) { _failed(id, 
err); });            \
-    _closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos,   
                    \
-                                    const PTransmitDataResult& result) {       
                    \
-        Status s = Status(result.status());                                    
                    \
-        if (!s.ok()) {                                                         
                    \
-            _failed(id,                                                        
                    \
-                    fmt::format("exchange req success but status isn't ok: 
{}", s.to_string()));   \
-        } else if (eos) {                                                      
                    \
-            _ended(id);                                                        
                    \
-        } else {                                                               
                    \
-            _send_rpc(id);                                                     
                    \
-        }                                                                      
                    \
-    });                                                                        
                    \
-    {                                                                          
                    \
-        
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
    \
-        if (enable_http_send_block(*brpc_request)) {                           
                    \
-            RETURN_IF_ERROR(transmit_block_http(_context->get_runtime_state(), 
_closure,           \
-                                                *brpc_request, 
request.channel->_brpc_dest_addr)); \
-        } else {                                                               
                    \
-            transmit_block(*request.channel->_brpc_stub, _closure, 
*brpc_request);                 \
-        }                                                                      
                    \
-    }                                                                          
                    \
-    if (request.BLOCK) {                                                       
                    \
-        brpc_request->release_block();                                         
                    \
-    }                                                                          
                    \
-    QUEUE.pop();
-
     if (!q.empty()) {
         // If we have data to shuffle which is not broadcasted
-        DO_RPC(q, block.get(), nullptr)
+        auto& request = q.front();
+        if (!_instance_to_request[id]) {
+            _construct_request(id);
+        }
+        auto brpc_request = _instance_to_request[id];
+        brpc_request->set_eos(request.eos);
+        brpc_request->set_packet_seq(_instance_to_seq[id]++);
+        if (request.block) {
+            brpc_request->set_allocated_block(request.block.get());
+        }
+        auto* _closure = new SelfDeleteClosure<PTransmitDataResult>(id, 
request.eos, nullptr);
+        _closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms);
+        _closure->addFailedHandler(
+                [&](const InstanceLoId& id, const std::string& err) { 
_failed(id, err); });
+        _closure->addSuccessHandler([&](const InstanceLoId& id, const bool& 
eos,
+                                        const PTransmitDataResult& result) {
+            Status s = Status(result.status());
+            if (!s.ok()) {
+                _failed(id,
+                        fmt::format("exchange req success but status isn't ok: 
{}", s.to_string()));
+            } else if (eos) {
+                _ended(id);
+            } else {
+                _send_rpc(id);
+            }
+        });
+        {
+            
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
+            if (enable_http_send_block(*brpc_request)) {
+                
RETURN_IF_ERROR(transmit_block_http(_context->get_runtime_state(), _closure,
+                                                    *brpc_request,
+                                                    
request.channel->_brpc_dest_addr));
+            } else {
+                transmit_block(*request.channel->_brpc_stub, _closure, 
*brpc_request);
+            }
+        }
+        if (request.block) {
+            brpc_request->release_block();
+        }
+        q.pop();
     } else if (!broadcast_q.empty()) {
         // If we have data to shuffle which is broadcasted
-        DO_RPC(broadcast_q, block_holder->get_block(), request.block_holder)
+        auto& request = broadcast_q.front();
+        if (!_instance_to_request[id]) {
+            _construct_request(id);
+        }
+        auto brpc_request = _instance_to_request[id];
+        brpc_request->set_eos(request.eos);
+        brpc_request->set_packet_seq(_instance_to_seq[id]++);
+        if (request.block_holder->get_block()) {
+            
brpc_request->set_allocated_block(request.block_holder->get_block());
+        }
+        auto* _closure =
+                new SelfDeleteClosure<PTransmitDataResult>(id, request.eos, 
request.block_holder);
+        _closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms);
+        _closure->addFailedHandler(
+                [&](const InstanceLoId& id, const std::string& err) { 
_failed(id, err); });
+        _closure->addSuccessHandler([&](const InstanceLoId& id, const bool& 
eos,
+                                        const PTransmitDataResult& result) {
+            Status s = Status(result.status());
+            if (!s.ok()) {
+                _failed(id,
+                        fmt::format("exchange req success but status isn't ok: 
{}", s.to_string()));
+            } else if (eos) {
+                _ended(id);
+            } else {
+                _send_rpc(id);
+            }
+        });
+        {
+            
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
+            if (enable_http_send_block(*brpc_request)) {
+                
RETURN_IF_ERROR(transmit_block_http(_context->get_runtime_state(), _closure,
+                                                    *brpc_request,
+                                                    
request.channel->_brpc_dest_addr));
+            } else {
+                transmit_block(*request.channel->_brpc_stub, _closure, 
*brpc_request);
+            }
+        }
+        if (request.block_holder->get_block()) {
+            brpc_request->release_block();
+        }
+        broadcast_q.pop();
     } else {
         _instance_to_sending_by_pipeline[id] = true;
         return Status::OK();
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index bd35929f85..ac58b4f3e7 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -1384,13 +1384,15 @@ Status VOlapTableSink::_validate_column(RuntimeState* state, const TypeDescripto
                         invalid = true;
                     }
                 }
-
-                if (dec_val > _get_decimalv2_min_or_max<false>(type) ||
-                    dec_val < _get_decimalv2_min_or_max<true>(type)) {
+                const auto& max_decimalv2 = 
_get_decimalv2_min_or_max<false>(type);
+                const auto& min_decimalv2 = 
_get_decimalv2_min_or_max<true>(type);
+                if (dec_val > max_decimalv2 || dec_val < min_decimalv2) {
                     fmt::format_to(error_msg, "{}", "decimal value is not 
valid for definition");
                     fmt::format_to(error_msg, ", value={}", 
dec_val.to_string());
-                    fmt::format_to(error_msg, ", precision={}, scale={}; ", 
type.precision,
+                    fmt::format_to(error_msg, ", precision={}, scale={}", 
type.precision,
                                    type.scale);
+                    fmt::format_to(error_msg, ", min={}, max={}; ", 
min_decimalv2.to_string(),
+                                   max_decimalv2.to_string());
                     invalid = true;
                 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to