This is an automated email from the ASF dual-hosted git repository.

jianliangqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b6702a1e3 [Bug](point query) cancel future when meet timeout in 
PointQueryExec (#21573)
3b6702a1e3 is described below

commit 3b6702a1e314a8dec18407def0c6bda8995e909e
Author: lihangyu <[email protected]>
AuthorDate: Tue Jul 25 18:18:09 2023 +0800

    [Bug](point query) cancel future when meet timeout in PointQueryExec 
(#21573)
    
    1. Cancel the future when a timeout occurs, and add a config to modify the RPC timeout.
    2. Add a config to modify the number of BackendServiceProxy instances, since under
    high-concurrency workloads the GRPC channel will be blocked.
---
 .../src/main/java/org/apache/doris/common/Config.java   |  8 ++++++++
 .../main/java/org/apache/doris/qe/PointQueryExec.java   | 17 +++++++++--------
 .../java/org/apache/doris/rpc/BackendServiceProxy.java  |  2 +-
 3 files changed, 18 insertions(+), 9 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 8135696228..636fd8a46f 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -399,6 +399,10 @@ public class Config extends ConfigBase {
     @ConfField(description = {"MySQL 服务的最大任务线程数", "The max number of task 
threads in MySQL service"})
     public static int max_mysql_service_task_threads_num = 4096;
 
+    @ConfField(description = {"BackendServiceProxy数量, 用于池化GRPC channel",
+            "BackendServiceProxy pool size for pooling GRPC channels."})
+    public static int backend_proxy_num = 48;
+
     @ConfField(description = {
             "集群 ID,用于内部认证。通常在集群第一次启动时,会随机生成一个 cluster id. 用户也可以手动指定。",
             "Cluster id used for internal authentication. Usually a random 
integer generated when master FE "
@@ -459,6 +463,10 @@ public class Config extends ConfigBase {
             "The timeout of RPC between FE and Broker, in milliseconds"})
     public static int broker_timeout_ms = 10000; // 10s
 
+    @ConfField(description = {"主键高并发点查短路径超时时间。",
+            "The timeout of RPC for high concurrenty short circuit query"})
+    public static int point_query_timeout_ms = 10000; // 10s
+
     @ConfField(mutable = true, masterOnly = true, description = {"Insert load 
的默认超时时间,单位是秒。",
             "Default timeout for insert load job, in seconds."})
     public static int insert_load_default_timeout_second = 14400; // 4 hour
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
index e62e849be0..bda52b94ad 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
@@ -63,7 +63,7 @@ public class PointQueryExec {
     private ArrayList<Expr> outputExprs;
     private DescriptorTable descriptorTable;
     private long tabletID = 0;
-    private long timeoutMs = 1000; // default 1s
+    private long timeoutMs = Config.point_query_timeout_ms; // default 10s
 
     private boolean isCancel = false;
     private boolean isBinaryProtocol = false;
@@ -185,7 +185,7 @@ public class PointQueryExec {
             while (pResult == null) {
                 InternalService.PTabletKeyLookupRequest request = 
requestBuilder.build();
                 Future<InternalService.PTabletKeyLookupResponse> 
futureResponse =
-                        
BackendServiceProxy.getInstance().fetchTabletDataAsync(backend.getBrpcAdress(), 
request);
+                         
BackendServiceProxy.getInstance().fetchTabletDataAsync(backend.getBrpcAdress(), 
request);
                 long currentTs = System.currentTimeMillis();
                 if (currentTs >= timeoutTs) {
                     LOG.warn("fetch result timeout {}", 
backend.getBrpcAdress());
@@ -201,15 +201,20 @@ public class PointQueryExec {
                         status.setStatus(Status.CANCELLED);
                         return null;
                     }
+                } catch (TimeoutException e) {
+                    futureResponse.cancel(true);
+                    LOG.warn("fetch result timeout {}, addr {}", timeoutTs - 
currentTs, backend.getBrpcAdress());
+                    status.setStatus("query timeout");
+                    return null;
                 }
             }
         } catch (RpcException e) {
-            LOG.warn("fetch result rpc exception {}", backend.getBrpcAdress());
+            LOG.warn("fetch result rpc exception {}, e {}", 
backend.getBrpcAdress(), e);
             status.setRpcStatus(e.getMessage());
             SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage());
             return null;
         } catch (ExecutionException e) {
-            LOG.warn("fetch result execution exception {}", 
backend.getBrpcAdress());
+            LOG.warn("fetch result execution exception {}, addr {}", e, 
backend.getBrpcAdress());
             if (e.getMessage().contains("time out")) {
                 // if timeout, we set error code to TIMEOUT, and it will not 
retry querying.
                 status.setStatus(new Status(TStatusCode.TIMEOUT, 
e.getMessage()));
@@ -218,10 +223,6 @@ public class PointQueryExec {
                 SimpleScheduler.addToBlacklist(backend.getId(), 
e.getMessage());
             }
             return null;
-        } catch (TimeoutException e) {
-            LOG.warn("fetch result timeout {}", backend.getBrpcAdress());
-            status.setStatus("query timeout");
-            return null;
         }
         TStatusCode code = 
TStatusCode.findByValue(pResult.getStatus().getStatusCode());
         if (code != TStatusCode.OK) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java 
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index c7c6a144c6..39dfd7915f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -61,7 +61,7 @@ public class BackendServiceProxy {
     }
 
     private static class Holder {
-        private static final int PROXY_NUM = 20;
+        private static final int PROXY_NUM = Config.backend_proxy_num;
         private static BackendServiceProxy[] proxies = new 
BackendServiceProxy[PROXY_NUM];
         private static AtomicInteger count = new AtomicInteger();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to