This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 038be784cce [bugfix](jni) using light pool to do transmit block 
(#35256)
038be784cce is described below

commit 038be784cce3dc0105340b68745b877ff3af1371
Author: yiguolei <[email protected]>
AuthorDate: Fri May 24 14:31:22 2024 +0800

    [bugfix](jni) using light pool to do transmit block (#35256)
    
    * [bugfix](jni) using light pool to do transmit block
    
    * f
    
    * f
    
    ---------
    
    Co-authored-by: yiguolei <[email protected]>
---
 be/src/common/config.cpp            |  1 +
 be/src/common/config.h              |  1 +
 be/src/service/internal_service.cpp | 30 +++++++++++++++++++++++-------
 3 files changed, 25 insertions(+), 7 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 0a1045d736e..2c6f200ab25 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -499,6 +499,7 @@ DEFINE_Int32(brpc_heavy_work_pool_threads, "-1");
 DEFINE_Int32(brpc_light_work_pool_threads, "-1");
 DEFINE_Int32(brpc_heavy_work_pool_max_queue_size, "-1");
 DEFINE_Int32(brpc_light_work_pool_max_queue_size, "-1");
+DEFINE_mBool(enable_bthread_transmit_block, "true");
 
 // The maximum amount of data that can be processed by a stream load
 DEFINE_mInt64(streaming_load_max_mb, "10240");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 50e629b6795..fcd22ae24c9 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -549,6 +549,7 @@ DECLARE_Int32(brpc_heavy_work_pool_threads);
 DECLARE_Int32(brpc_light_work_pool_threads);
 DECLARE_Int32(brpc_heavy_work_pool_max_queue_size);
 DECLARE_Int32(brpc_light_work_pool_max_queue_size);
+DECLARE_mBool(enable_bthread_transmit_block);
 
 // The maximum amount of data that can be processed by a stream load
 DECLARE_mInt64(streaming_load_max_mb);
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 34de650809a..a91ef8b4dc1 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -1498,13 +1498,29 @@ void 
PInternalService::transmit_block(google::protobuf::RpcController* controlle
                                       const PTransmitDataParams* request,
                                       PTransmitDataResult* response,
                                       google::protobuf::Closure* done) {
-    int64_t receive_time = GetCurrentTimeNanos();
-    response->set_receive_time(receive_time);
-
-    // under high concurrency, thread pool will have a lot of lock contention.
-    // May offer failed to the thread pool, so that we should avoid using 
thread
-    // pool here.
-    _transmit_block(controller, request, response, done, Status::OK());
+    if (config::enable_bthread_transmit_block) {
+        int64_t receive_time = GetCurrentTimeNanos();
+        response->set_receive_time(receive_time);
+        // under high concurrency, thread pool will have a lot of lock 
contention.
+        // May offer failed to the thread pool, so that we should avoid using 
thread
+        // pool here.
+        _transmit_block(controller, request, response, done, Status::OK());
+    } else {
+        bool ret = _light_work_pool.try_offer([this, controller, request, 
response, done]() {
+            int64_t receive_time = GetCurrentTimeNanos();
+            response->set_receive_time(receive_time);
+            // Sometimes transmit block function is the last owner of 
PlanFragmentExecutor
+            // It will release the object. And the object maybe a JNIContext.
+            // JNIContext will hold some TLS object. It could not work 
correctly under bthread
+            // Context. So that put the logic into pthread.
+            // But this is rarely happens, so this config is disabled by 
default.
+            _transmit_block(controller, request, response, done, Status::OK());
+        });
+        if (!ret) {
+            offer_failed(response, done, _light_work_pool);
+            return;
+        }
+    }
 }
 
 void PInternalService::transmit_block_by_http(google::protobuf::RpcController* 
controller,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to