This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 36d6788  [Optimize] Use compact mode to send query plan thrift data 
structure. (#6702)
36d6788 is described below

commit 36d6788bc3e068512fb90bb61beed70113db9f22
Author: Mingyu Chen <[email protected]>
AuthorDate: Sat Sep 25 12:13:29 2021 +0800

    [Optimize] Use compact mode to send query plan thrift data structure. 
(#6702)
    
    In some cases, the query plan thrift structure of a query may be very large
    (for example, when there are many columns in SQL), resulting in a large 
number
    of "send fragment timeout" errors.
    
    This PR adds an FE config to control whether to transmit the query plan in
the thrift compact (compressed) format.
    
    Using compressed format transmission can reduce the size by ~50%. But it 
may reduce
    the concurrency by ~10%. Therefore, in high-concurrency, small-query
scenarios, you can choose to turn off the compact mode.
---
 be/src/service/internal_service.cpp                |  9 ++--
 be/src/service/internal_service.h                  |  2 +-
 docs/en/administrator-guide/config/fe_config.md    |  7 +++
 docs/zh-CN/administrator-guide/config/fe_config.md |  7 +++
 .../main/java/org/apache/doris/common/Config.java  |  8 +++
 .../main/java/org/apache/doris/qe/Coordinator.java |  5 +-
 .../org/apache/doris/rpc/BackendServiceClient.java | 36 ++++++++++++-
 .../org/apache/doris/rpc/BackendServiceProxy.java  | 61 ++++++++++++----------
 gensrc/proto/internal_service.proto                |  1 +
 9 files changed, 100 insertions(+), 36 deletions(-)

diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 58d743a..89dd7ef 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -88,10 +88,11 @@ void 
PInternalServiceImpl<T>::exec_plan_fragment(google::protobuf::RpcController
     brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
     auto st = Status::OK();
     if (request->has_request()) {
-        st = _exec_plan_fragment(request->request());
+        bool compact = request->has_compact() ? request->compact() : false;
+        st = _exec_plan_fragment(request->request(), compact);
     } else {
         // TODO(yangzhengguo) this is just for compatible with old version, 
this should be removed in the release 0.15
-        st = _exec_plan_fragment(cntl->request_attachment().to_string());
+        st = _exec_plan_fragment(cntl->request_attachment().to_string(), 
false);
     }
     if (!st.ok()) {
         LOG(WARNING) << "exec plan fragment failed, errmsg=" << 
st.get_error_msg();
@@ -148,12 +149,12 @@ void 
PInternalServiceImpl<T>::tablet_writer_cancel(google::protobuf::RpcControll
 }
 
 template <typename T>
-Status PInternalServiceImpl<T>::_exec_plan_fragment(const std::string& 
ser_request) {
+Status PInternalServiceImpl<T>::_exec_plan_fragment(const std::string& 
ser_request, bool compact) {
     TExecPlanFragmentParams t_request;
     {
         const uint8_t* buf = (const uint8_t*)ser_request.data();
         uint32_t len = ser_request.size();
-        RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, false, &t_request));
+        RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, compact, 
&t_request));
     }
     // LOG(INFO) << "exec plan fragment, fragment_instance_id=" << 
print_id(t_request.params.fragment_instance_id)
     //  << ", coord=" << t_request.coord << ", backend=" << 
t_request.backend_num;
diff --git a/be/src/service/internal_service.h 
b/be/src/service/internal_service.h
index 5bbebce..f68243d 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -107,7 +107,7 @@ public:
                             google::protobuf::Closure* done) override;
 
 private:
-    Status _exec_plan_fragment(const std::string& s_request);
+    Status _exec_plan_fragment(const std::string& s_request, bool compact);
 
     Status _fold_constant_expr(const std::string& ser_request, 
PConstantExprResult* response);
 
diff --git a/docs/en/administrator-guide/config/fe_config.md 
b/docs/en/administrator-guide/config/fe_config.md
index 0dbadd6..fd58307 100644
--- a/docs/en/administrator-guide/config/fe_config.md
+++ b/docs/en/administrator-guide/config/fe_config.md
@@ -2038,3 +2038,10 @@ the transaction will be cleaned after 
transaction_clean_interval_second seconds
 The default value when user property max_query_instances is equal or less than 
0. This config is used to limit the max number of instances for a user. This 
parameter is less than or equal to 0 means unlimited.
 
 The default value is -1。
+
+### use_compact_thrift_rpc
+
+Default: true
+
+Whether to use compressed format to send query plan structure. After it is 
turned on, the size of the query plan structure can be reduced by about 50%, 
thereby avoiding some "send fragment timeout" errors.
+However, in some high-concurrency small query scenarios, the concurrency may 
be reduced by about 10%.
diff --git a/docs/zh-CN/administrator-guide/config/fe_config.md 
b/docs/zh-CN/administrator-guide/config/fe_config.md
index 7897212..3737c08 100644
--- a/docs/zh-CN/administrator-guide/config/fe_config.md
+++ b/docs/zh-CN/administrator-guide/config/fe_config.md
@@ -2048,3 +2048,10 @@ load 标签清理器将每隔 `label_clean_interval_second` 运行一次以清
 默认值:-1
 
 用户属性max_query_instances小于等于0时,使用该配置,用来限制单个用户同一时刻可使用的查询instance个数。该参数小于等于0表示无限制。
+
+### use_compact_thrift_rpc
+
+默认值:true
+
+是否使用压缩格式发送查询计划结构体。开启后,可以降低约50%的查询计划结构体大小,从而避免一些 "send fragment timeout" 错误。
+但是在某些高并发小查询场景下,可能会降低约10%的并发度。
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 645e1d4..5504cde 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1500,4 +1500,12 @@ public class Config extends ConfigBase {
      */
     @ConfField(mutable = false, masterOnly = true)
     public static int default_schema_change_scheduler_interval_millisecond = 
500;
+
+    /*
+     * If set to true, the thrift structure of query plan will be sent to BE 
in compact mode.
+     * This will significantly reduce the size of rpc data, which can reduce 
the chance of rpc timeout.
+     * But this may slightly decrease the concurrency of queries, because 
compress and decompress cost more CPU.
+     */
+    @ConfField(mutable = true, masterOnly = false)
+    public static boolean use_compact_thrift_rpc = true;
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index e4196dc..4e67f81 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -596,7 +596,8 @@ public class Coordinator {
                         
cancelInternal(InternalService.PPlanFragmentCancelReason.INTERNAL_ERROR);
                         switch (code) {
                             case TIMEOUT:
-                                throw new 
RpcException(pair.first.backend.getHost(), "send fragment timeout. backend id: 
" + pair.first.backend.getId());
+                                throw new 
RpcException(pair.first.backend.getHost(), "send fragment timeout. backend id: "
+                                        + pair.first.backend.getId());
                             case THRIFT_RPC_ERROR:
                                 
SimpleScheduler.addToBlacklist(pair.first.backend.getId(), errMsg);
                                 throw new 
RpcException(pair.first.backend.getHost(), "rpc failed");
@@ -1903,7 +1904,6 @@ public class Coordinator {
                 if (this.hasCanceled) {
                     return false;
                 }
-                brpcAddress = toBrpcHost(address);
 
                 try {
                     
BackendServiceProxy.getInstance().cancelPlanFragmentAsync(brpcAddress,
@@ -1940,7 +1940,6 @@ public class Coordinator {
         }
 
         public Future<InternalService.PExecPlanFragmentResult> 
execRemoteFragmentAsync() throws TException, RpcException {
-            TNetworkAddress brpcAddress = null;
             try {
                 brpcAddress = new TNetworkAddress(backend.getHost(), 
backend.getBrpcPort());
             } catch (Exception e) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java 
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
index 143b066..370f425 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
@@ -22,18 +22,26 @@ import org.apache.doris.proto.InternalService;
 import org.apache.doris.proto.PBackendServiceGrpc;
 import org.apache.doris.thrift.TNetworkAddress;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import io.grpc.ManagedChannel;
 import io.grpc.netty.NettyChannelBuilder;
 
 public class BackendServiceClient {
-    private static final int MAX_RETRY_NUM = 3;
+    public static final Logger LOG = 
LogManager.getLogger(BackendServiceClient.class);
+
+    private static final int MAX_RETRY_NUM = 0;
+    private final TNetworkAddress address;
     private final PBackendServiceGrpc.PBackendServiceFutureStub stub;
     private final PBackendServiceGrpc.PBackendServiceBlockingStub blockingStub;
     private final ManagedChannel channel;
 
     public BackendServiceClient(TNetworkAddress address) {
+        this.address = address;
         channel = NettyChannelBuilder.forAddress(address.getHostname(), 
address.getPort())
                 .flowControlWindow(Config.grpc_max_message_size_bytes)
                 .maxInboundMessageSize(Config.grpc_max_message_size_bytes)
@@ -92,4 +100,30 @@ public class BackendServiceClient {
     public Future<InternalService.PConstantExprResult> 
foldConstantExpr(InternalService.PConstantExprRequest request) {
         return stub.foldConstantExpr(request);
     }
+
+    public void shutdown() {
+        if (!channel.isShutdown()) {
+            channel.shutdown();
+            try {
+                if (!channel.awaitTermination(5, TimeUnit.SECONDS)) {
+                    LOG.warn("Timed out gracefully shutting down connection: 
{}. ", channel);
+                }
+            } catch (InterruptedException e) {
+                return;
+            }
+        }
+
+        if (!channel.isTerminated()) {
+            channel.shutdownNow();
+            try {
+                if (!channel.awaitTermination(5, TimeUnit.SECONDS)) {
+                    LOG.warn("Timed out forcefully shutting down connection: 
{}. ", channel);
+                }
+            } catch (InterruptedException e) {
+                return;
+            }
+        }
+
+        LOG.warn("shut down backend service client: {}", address);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java 
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index 53b648f..e010fe8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.rpc;
 
+import org.apache.doris.common.Config;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.proto.Types;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
@@ -31,57 +32,55 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TCompactProtocol;
 
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
 
 public class BackendServiceProxy {
     private static final Logger LOG = 
LogManager.getLogger(BackendServiceProxy.class);
-    private static volatile BackendServiceProxy INSTANCE;
+    // use exclusive lock to make sure only one thread can add or remove 
client from serviceMap.
+    // use concurrent map to allow access serviceMap in multi thread.
+    private ReentrantLock lock = new ReentrantLock();
     private final Map<TNetworkAddress, BackendServiceClient> serviceMap;
-    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
     public BackendServiceProxy() {
-        serviceMap = Maps.newHashMap();
+        serviceMap = Maps.newConcurrentMap();
+    }
+
+    private static class SingletonHolder {
+        private static final BackendServiceProxy INSTANCE = new 
BackendServiceProxy();
     }
 
     public static BackendServiceProxy getInstance() {
-        if (INSTANCE == null) {
-            synchronized (BackendServiceProxy.class) {
-                if (INSTANCE == null) {
-                    INSTANCE = new BackendServiceProxy();
-                }
-            }
-        }
-        return INSTANCE;
+        return SingletonHolder.INSTANCE;
     }
 
     public void removeProxy(TNetworkAddress address) {
-        lock.writeLock().lock();
+        LOG.warn("begin to remove proxy: {}", address);
+        BackendServiceClient service;
+        lock.lock();
         try {
-            serviceMap.remove(address);
+            service = serviceMap.remove(address);
         } finally {
-            lock.writeLock().unlock();
+            lock.unlock();
         }
-    }
 
-    private BackendServiceClient getProxy(TNetworkAddress address) {
-        BackendServiceClient service;
-        lock.readLock().lock();
-        try {
-            service = serviceMap.get(address);
-        } finally {
-            lock.readLock().unlock();
+        if (service != null) {
+            service.shutdown();
         }
+    }
 
+    private BackendServiceClient getProxy(TNetworkAddress address) {
+        BackendServiceClient service = serviceMap.get(address);
         if (service != null) {
             return service;
         }
 
         // not exist, create one and return.
-        lock.writeLock().lock();
+        lock.lock();
         try {
             service = serviceMap.get(address);
             if (service == null) {
@@ -90,15 +89,23 @@ public class BackendServiceProxy {
             }
             return service;
         } finally {
-            lock.writeLock().unlock();
+            lock.unlock();
         }
     }
 
     public Future<InternalService.PExecPlanFragmentResult> 
execPlanFragmentAsync(
             TNetworkAddress address, TExecPlanFragmentParams tRequest)
             throws TException, RpcException {
-        final InternalService.PExecPlanFragmentRequest pRequest = 
InternalService.PExecPlanFragmentRequest.newBuilder()
-                .setRequest(ByteString.copyFrom(new 
TSerializer().serialize(tRequest))).build();
+        InternalService.PExecPlanFragmentRequest.Builder builder = 
InternalService.PExecPlanFragmentRequest.newBuilder();
+        if (Config.use_compact_thrift_rpc) {
+            builder.setRequest(ByteString.copyFrom(new TSerializer(new 
TCompactProtocol.Factory()).serialize(tRequest)));
+            builder.setCompact(true);
+        } else {
+            builder.setRequest(ByteString.copyFrom(new 
TSerializer().serialize(tRequest))).build();
+            builder.setCompact(false);
+        }
+
+        final InternalService.PExecPlanFragmentRequest pRequest = 
builder.build();
         try {
             final BackendServiceClient client = getProxy(address);
             return client.execPlanFragmentAsync(pRequest);
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 0b81b49..016f4d1 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -116,6 +116,7 @@ message PTabletWriterCancelResult {
 
 message PExecPlanFragmentRequest {
     optional bytes request = 1;
+    optional bool compact = 2;
 };
 
 message PExecPlanFragmentResult {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to