This is an automated email from the ASF dual-hosted git repository.

morrySnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new df232fe3aa7 [opt](point query) reduce point-query network overhead by resending query context on cache miss (#62836)
df232fe3aa7 is described below

commit df232fe3aa7f4bb54784b17b9c9583ce012823cc
Author: HonestManXin <[email protected]>
AuthorDate: Wed May 6 15:48:19 2026 +0800

    [opt](point query) reduce point-query network overhead by resending query context on cache miss (#62836)
    
    During each point query, the FE sends the descriptor table of the queried
    table, the output expressions, and the query options to the BE. When the
    hit rate of the BE lookup connection cache is very high, these request
    parameters are redundant, because the BE can reuse the context it has
    already cached.
    With this change, the FE may omit these parameters from the request. When
    the BE detects a cache miss and the parameters are absent, it asks the FE
    to resend the request with them included. When the hit rate of the BE
    lookup connection cache is very high, the savings are significant.
    Generally, an additional resend request is needed only when a connection
    is first established (this can also be optimized in the future). The
    behavior is controlled by the FE config `enable_lightweight_lookup_request`,
    which is disabled by default.
    In our production environment, when running 340,000 QPS of point queries on
    an 11-column table, the outbound network traffic of the FE reached as high
    as 2.29 GB/s, while the inbound network traffic of the client was only
    430 MB/s; that is, about 1.9 GB/s of the traffic was request traffic sent
    by the FE to the BE. After applying this optimization, the outbound network
    traffic of the FE dropped to only 550 MB/s, and query throughput at the
    same concurrency increased by 10%.
---
 be/src/service/internal_service.cpp                |   3 +
 be/src/service/point_query_executor.cpp            |  19 ++
 .../main/java/org/apache/doris/common/Config.java  |  21 +++
 .../org/apache/doris/qe/PointQueryExecutor.java    | 194 ++++++++++++++-------
 gensrc/proto/internal_service.proto                |   2 +
 .../data/point_query_p0/test_point_query.out       |  12 ++
 .../suites/point_query_p0/test_point_query.groovy  |  26 +++
 7 files changed, 214 insertions(+), 63 deletions(-)

diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 44373ad49a4..8d560e538c8 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -963,6 +963,9 @@ Status PInternalService::_tablet_fetch_data(const 
PTabletKeyLookupRequest* reque
                                             PTabletKeyLookupResponse* 
response) {
     PointQueryExecutor executor;
     RETURN_IF_ERROR(executor.init(request, response));
+    if (response->has_need_resend_query_context() && 
response->need_resend_query_context()) {
+        return Status::OK();
+    }
     RETURN_IF_ERROR(executor.lookup_up());
     executor.print_profile();
     return Status::OK();
diff --git a/be/src/service/point_query_executor.cpp 
b/be/src/service/point_query_executor.cpp
index 53dd2900052..441284a251b 100644
--- a/be/src/service/point_query_executor.cpp
+++ b/be/src/service/point_query_executor.cpp
@@ -285,6 +285,25 @@ Status PointQueryExecutor::init(const 
PTabletKeyLookupRequest* request,
         _reusable = cache_handle;
         _profile_metrics.hit_lookup_cache = true;
     } else {
+        // Lightweight request: FE may omit reusable query context and rely on 
uuid cache.
+        // If cache miss and required parameters are absent, ask FE to resend 
a full request.
+        if (uuid != 0 && (!request->has_desc_tbl() || 
request->desc_tbl().empty() ||
+                          !request->has_output_expr() || 
request->output_expr().empty() ||
+                          !request->has_query_options() || 
request->query_options().empty())) {
+            if (VLOG_DEBUG_IS_ON) {
+                VLOG_DEBUG << "lookup connection cache miss, ask FE to resend 
query context"
+                           << ", tablet_id=" << request->tablet_id()
+                           << ", uuid_high=" << request->uuid().uuid_high()
+                           << ", uuid_low=" << request->uuid().uuid_low();
+            }
+            response->set_need_resend_query_context(true);
+            return Status::OK();
+        }
+        if (uuid == 0 && (!request->has_desc_tbl() || 
request->desc_tbl().empty() ||
+                          !request->has_output_expr() || 
request->output_expr().empty())) {
+            return Status::InvalidArgument(
+                    "tablet_fetch_data requires desc_tbl/output_expr when uuid 
is not set");
+        }
         // init handle
         auto reusable_ptr = std::make_shared<Reusable>();
         TDescriptorTable t_desc_tbl;
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index ef00c3508ea..63cec327c20 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -888,6 +888,27 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true)
     public static int max_point_query_retry_time = 2;
 
+    /**
+     * If set to true, FE may omit heavy reusable parameters 
(desc_tbl/output_expr/query_options)
+     * in point lookup requests (PTabletKeyLookupRequest) when executing 
prepared statements.
+     * BE will first try to find reusable context from LookupConnectionCache 
by uuid; if missing,
+     * BE asks FE to resend a full request with these parameters via
+     * response.need_resend_query_context.
+     *
+     * This can greatly reduce FE outbound network throughput when cache hit 
rate is high.
+     */
+    @ConfField(mutable = true, description = {
+            "是否启用 point query 轻量请求。开启后,FE 在 PreparedStatement 执行阶段会优先省略"
+                    + " desc_tbl/output_expr/query_options,BE 若未命中可复用缓存则会要求 FE 
补发完整请求。"
+                    + "当 BE 侧缓存命中率较高时,可以显著降低 FE 的出网带宽。",
+            "Whether to enable lightweight point-query requests. When enabled, 
FE will omit"
+                    + " desc_tbl/output_expr/query_options on the first 
PreparedStatement execute"
+                    + " request, and BE will ask FE to resend the full request 
if reusable cache"
+                    + " is missing. This can significantly reduce FE outbound 
bandwidth when the"
+                    + " BE-side reusable cache hit rate is high."
+    })
+    public static boolean enable_lightweight_lookup_request = false;
+
     /**
      * The tryLock timeout configuration of catalog lock.
      * Normally it does not need to change, unless you need to test something.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExecutor.java
index 2327fd6669c..95a534e777d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExecutor.java
@@ -307,80 +307,63 @@ public class PointQueryExecutor implements CoordInterface 
{
         long timeoutTs = System.currentTimeMillis() + timeoutMs;
         RowBatch rowBatch = new RowBatch();
         InternalService.PTabletKeyLookupResponse pResult = null;
+        boolean includeQueryContext = true;
         try {
             
Preconditions.checkNotNull(shortCircuitQueryContext.serializedDescTable);
 
-            InternalService.PTabletKeyLookupRequest.Builder requestBuilder
-                    = InternalService.PTabletKeyLookupRequest.newBuilder()
-                    .setTabletId(tabletID)
-                    .setDescTbl(shortCircuitQueryContext.serializedDescTable)
-                    
.setOutputExpr(shortCircuitQueryContext.serializedOutputExpr)
-                    
.setQueryOptions(shortCircuitQueryContext.serializedQueryOptions)
-                    .setIsBinaryRow(ConnectContext.get().command == 
MysqlCommand.COM_STMT_EXECUTE);
-            // Set timezone for functions like from_unixtime
-            String timeZone = 
ConnectContext.get().getSessionVariable().getTimeZone();
-            if ("CST".equals(timeZone)) {
-                timeZone = "Asia/Shanghai";
+            boolean allowLightweight = Config.enable_lightweight_lookup_request
+                    && ConnectContext.get().command == 
MysqlCommand.COM_STMT_EXECUTE
+                    && shortCircuitQueryContext.cacheID != null;
+
+            includeQueryContext = !allowLightweight;
+
+            // First try: lightweight request (omit 
desc_tbl/output_expr/query_options) when enabled.
+            InternalService.PTabletKeyLookupRequest request = 
buildLookupRequest(includeQueryContext);
+            pResult = fetchTabletData(status, backend, request, timeoutTs);
+            if (pResult == null) {
+                return null;
             }
-            requestBuilder.setTimeZone(timeZone);
-            if (snapshotVisibleVersions != null && 
!snapshotVisibleVersions.isEmpty()) {
-                requestBuilder.setVersion(snapshotVisibleVersions.get(0));
+        } catch (TException e) {
+            throw e;
+        }
+
+        Status resultStatus = new Status(pResult.getStatus());
+        if (resultStatus.getErrorCode() != TStatusCode.OK) {
+            status.updateStatus(resultStatus.getErrorCode(), 
resultStatus.getErrorMsg());
+            return null;
+        }
+
+        // Lightweight request miss: resend a full request with query context.
+        if (pResult.hasNeedResendQueryContext() && 
pResult.getNeedResendQueryContext()) {
+            if (includeQueryContext) {
+                LOG.warn("backend {} requests query context resend for a full 
point-query request, tablet_id={}",
+                        backend.getBrpcAddress(), tabletID);
+                status.updateStatus(TStatusCode.INTERNAL_ERROR,
+                        "backend requests query context resend but request 
already contains query context");
+                return null;
             }
-            // Only set cacheID for prepared statement excute phase,
-            // otherwise leading to many redundant cost in BE side
-            if (shortCircuitQueryContext.cacheID != null
-                        && ConnectContext.get().command == 
MysqlCommand.COM_STMT_EXECUTE) {
-                InternalService.UUID.Builder uuidBuilder = 
InternalService.UUID.newBuilder();
-                
uuidBuilder.setUuidHigh(shortCircuitQueryContext.cacheID.getMostSignificantBits());
-                
uuidBuilder.setUuidLow(shortCircuitQueryContext.cacheID.getLeastSignificantBits());
-                requestBuilder.setUuid(uuidBuilder);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("lookup query context miss on backend {}, resend 
full request, tablet_id={}",
+                        backend.getBrpcAddress(), tabletID);
             }
-            addKeyTuples(requestBuilder);
-
-            InternalService.PTabletKeyLookupRequest request = 
requestBuilder.build();
-            Future<InternalService.PTabletKeyLookupResponse> futureResponse =
-                    
BackendServiceProxy.getInstance().fetchTabletDataAsync(backend.getBrpcAddress(),
 request);
-            long currentTs = System.currentTimeMillis();
-            if (currentTs >= timeoutTs) {
-                LOG.warn("fetch result timeout {}", backend.getBrpcAddress());
-                status.updateStatus(TStatusCode.INTERNAL_ERROR, "query request 
timeout");
+            InternalService.PTabletKeyLookupRequest fullRequest = 
buildLookupRequest(true);
+            pResult = fetchTabletData(status, backend, fullRequest, timeoutTs);
+            if (pResult == null) {
                 return null;
             }
-            try {
-                pResult = futureResponse.get(timeoutTs - currentTs, 
TimeUnit.MILLISECONDS);
-            } catch (InterruptedException e) {
-                // continue to get result
-                LOG.warn("future get interrupted Exception");
-                if (isCancel) {
-                    status.updateStatus(TStatusCode.CANCELLED, "cancelled");
-                    return null;
-                }
-            } catch (TimeoutException e) {
-                futureResponse.cancel(true);
-                LOG.warn("fetch result timeout {}, addr {}", timeoutTs - 
currentTs, backend.getBrpcAddress());
-                status.updateStatus(TStatusCode.INTERNAL_ERROR, "query fetch 
result timeout");
+            resultStatus = new Status(pResult.getStatus());
+            if (resultStatus.getErrorCode() != TStatusCode.OK) {
+                status.updateStatus(resultStatus.getErrorCode(), 
resultStatus.getErrorMsg());
                 return null;
             }
-        } catch (RpcException e) {
-            LOG.warn("query fetch rpc exception {}, e {}", 
backend.getBrpcAddress(), e);
-            status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, e.getMessage());
-            SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage());
-            return null;
-        } catch (ExecutionException e) {
-            LOG.warn("query fetch execution exception {}, addr {}", e, 
backend.getBrpcAddress());
-            if (e.getMessage().contains("time out")) {
-                // if timeout, we set error code to TIMEOUT, and it will not 
retry querying.
-                status.updateStatus(TStatusCode.TIMEOUT, e.getMessage());
-            } else {
-                status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, 
e.getMessage());
-                SimpleScheduler.addToBlacklist(backend.getId(), 
e.getMessage());
+            if (pResult.hasNeedResendQueryContext() && 
pResult.getNeedResendQueryContext()) {
+                LOG.warn("backend {} still requests query context resend after 
full point-query request, "
+                                + "tablet_id={}",
+                        backend.getBrpcAddress(), tabletID);
+                status.updateStatus(TStatusCode.INTERNAL_ERROR,
+                        "backend still requests query context resend after 
full request");
+                return null;
             }
-            return null;
-        }
-        Status resultStatus = new Status(pResult.getStatus());
-        if (resultStatus.getErrorCode() != TStatusCode.OK) {
-            status.updateStatus(resultStatus.getErrorCode(), 
resultStatus.getErrorMsg());
-            return null;
         }
 
         if (pResult.hasEmptyBatch() && pResult.getEmptyBatch()) {
@@ -416,6 +399,91 @@ public class PointQueryExecutor implements CoordInterface {
         return rowBatch;
     }
 
+    private InternalService.PTabletKeyLookupRequest buildLookupRequest(boolean 
includeQueryContext)
+            throws TException {
+        InternalService.PTabletKeyLookupRequest.Builder requestBuilder
+                = InternalService.PTabletKeyLookupRequest.newBuilder()
+                .setTabletId(tabletID)
+                .setIsBinaryRow(ConnectContext.get().command == 
MysqlCommand.COM_STMT_EXECUTE);
+
+        if (includeQueryContext) {
+            
requestBuilder.setDescTbl(shortCircuitQueryContext.serializedDescTable)
+                    
.setOutputExpr(shortCircuitQueryContext.serializedOutputExpr)
+                    
.setQueryOptions(shortCircuitQueryContext.serializedQueryOptions);
+        }
+
+        // Set timezone for functions like from_unixtime
+        String timeZone = 
ConnectContext.get().getSessionVariable().getTimeZone();
+        if ("CST".equals(timeZone)) {
+            timeZone = "Asia/Shanghai";
+        }
+        requestBuilder.setTimeZone(timeZone);
+
+        if (snapshotVisibleVersions != null && 
!snapshotVisibleVersions.isEmpty()) {
+            requestBuilder.setVersion(snapshotVisibleVersions.get(0));
+        }
+
+        // Only set cacheID for prepared statement execute phase,
+        // otherwise leading to many redundant cost in BE side
+        if (shortCircuitQueryContext.cacheID != null
+                && ConnectContext.get().command == 
MysqlCommand.COM_STMT_EXECUTE) {
+            InternalService.UUID.Builder uuidBuilder = 
InternalService.UUID.newBuilder();
+            
uuidBuilder.setUuidHigh(shortCircuitQueryContext.cacheID.getMostSignificantBits());
+            
uuidBuilder.setUuidLow(shortCircuitQueryContext.cacheID.getLeastSignificantBits());
+            requestBuilder.setUuid(uuidBuilder);
+        }
+
+        addKeyTuples(requestBuilder);
+        return requestBuilder.build();
+    }
+
+    private InternalService.PTabletKeyLookupResponse fetchTabletData(
+            Status status, Backend backend, 
InternalService.PTabletKeyLookupRequest request, long timeoutTs) {
+        Future<InternalService.PTabletKeyLookupResponse> futureResponse;
+        try {
+            futureResponse = BackendServiceProxy.getInstance()
+                    .fetchTabletDataAsync(backend.getBrpcAddress(), request);
+        } catch (RpcException e) {
+            LOG.warn("query fetch rpc exception {}, e {}", 
backend.getBrpcAddress(), e);
+            status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, e.getMessage());
+            SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage());
+            return null;
+        }
+        long currentTs = System.currentTimeMillis();
+        if (currentTs >= timeoutTs) {
+            LOG.warn("fetch result timeout {}", backend.getBrpcAddress());
+            status.updateStatus(TStatusCode.INTERNAL_ERROR, "query request 
timeout");
+            return null;
+        }
+        try {
+            return futureResponse.get(timeoutTs - currentTs, 
TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            futureResponse.cancel(true);
+            Thread.currentThread().interrupt();
+            if (isCancel) {
+                status.updateStatus(TStatusCode.CANCELLED, "cancelled");
+            } else {
+                status.updateStatus(TStatusCode.INTERNAL_ERROR, "interrupted 
while waiting for point query result");
+            }
+            return null;
+        } catch (TimeoutException e) {
+            futureResponse.cancel(true);
+            LOG.warn("fetch result timeout {}, addr {}", timeoutTs - 
currentTs, backend.getBrpcAddress());
+            status.updateStatus(TStatusCode.INTERNAL_ERROR, "query fetch 
result timeout");
+            return null;
+        } catch (ExecutionException e) {
+            LOG.warn("query fetch execution exception {}, addr {}", e, 
backend.getBrpcAddress());
+            if (e.getMessage() != null && e.getMessage().contains("time out")) 
{
+                // if timeout, we set error code to TIMEOUT, and it will not 
retry querying.
+                status.updateStatus(TStatusCode.TIMEOUT, e.getMessage());
+            } else {
+                status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, 
e.getMessage());
+                SimpleScheduler.addToBlacklist(backend.getId(), 
e.getMessage());
+            }
+            return null;
+        }
+    }
+
     public void cancel() {
         isCancel = true;
     }
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 50274c17627..43e77cd037e 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -393,6 +393,8 @@ message PTabletKeyLookupResponse {
     required PStatus status = 1;
     optional bytes row_batch = 2;
     optional bool empty_batch = 3;
+    // BE asks FE to resend once with full query context when lookup cache 
misses.
+    optional bool need_resend_query_context = 4;
 }
 
 //Add message definition to fetch and update cache
diff --git a/regression-test/data/point_query_p0/test_point_query.out 
b/regression-test/data/point_query_p0/test_point_query.out
index e03c0741865..1fbd2e227ac 100644
--- a/regression-test/data/point_query_p0/test_point_query.out
+++ b/regression-test/data/point_query_p0/test_point_query.out
@@ -149,6 +149,18 @@
 -- !sql --
 0      1       2       3
 
+-- !point_select_lightweight --
+1231   119291.110000000        ddd     laooq   \N      2020-01-01T12:36:38     
\N      1022-01-01      \N      1.111   [119181.111100000, 819019.119100000, 
null]      \N      0       0
+
+-- !point_select_lightweight --
+1231   119291.110000000        ddd     laooq   \N      2020-01-01T12:36:38     
\N      1022-01-01      \N      1.111   [119181.111100000, 819019.119100000, 
null]      \N      0       0
+
+-- !point_select_lightweight --
+1237   120939.111300000        a    ddd        laooq   2030-01-02      
2020-01-01T12:36:38     22.822  7022-01-01      false   90696620686827832.374   
[1.100000000, 2.200000000, 3.300000000, 4.400000000, 5.500000000]       []      
0       0
+
+-- !point_select_lightweight --
+1237   120939.111300000        a    ddd        laooq   2030-01-02      
2020-01-01T12:36:38     22.822  7022-01-01      false   90696620686827832.374   
[1.100000000, 2.200000000, 3.300000000, 4.400000000, 5.500000000]       []      
0       0
+
 -- !sql --
 0      1111111
 
diff --git a/regression-test/suites/point_query_p0/test_point_query.groovy 
b/regression-test/suites/point_query_p0/test_point_query.groovy
index 431687c63a7..6bb17da5a09 100644
--- a/regression-test/suites/point_query_p0/test_point_query.groovy
+++ b/regression-test/suites/point_query_p0/test_point_query.groovy
@@ -32,6 +32,8 @@ suite("test_point_query") {
     def user = context.config.jdbcUser
     def password = context.config.jdbcPassword
     def realDb = "regression_test_serving_p0"
+    def lightweightLookupConfig = sql """ ADMIN SHOW FRONTEND CONFIG LIKE 
'enable_lightweight_lookup_request'; """
+    String oldLightweightLookupValue = lightweightLookupConfig[0][1]
     // Parse url
     String jdbcUrl = context.config.jdbcUrl
     String urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3)
@@ -271,6 +273,29 @@ suite("test_point_query") {
                 qt_sql """select /*+ SET_VAR(enable_nereids_planner=true) */ * 
from ${tableName} where customer_key = 0"""
             }
         }
+
+        sql """ADMIN SET FRONTEND CONFIG ("enable_lightweight_lookup_request" 
= "true")"""
+        try {
+            connect(user, password, prepare_url) {
+                def lightweightStmt = prepareStatement "select /*+ 
SET_VAR(enable_nereids_planner=true) */ * from ${realDb}.tbl_point_query0 where 
k1 = ? and k2 = ? and k3 = ?"
+                assertEquals(lightweightStmt.class, 
com.mysql.cj.jdbc.ServerPreparedStatement);
+                lightweightStmt.setInt(1, 1231)
+                lightweightStmt.setBigDecimal(2, new BigDecimal("119291.11"))
+                lightweightStmt.setString(3, "ddd")
+                qe_point_select_lightweight lightweightStmt
+                qe_point_select_lightweight lightweightStmt
+
+                lightweightStmt.setInt(1, 1237)
+                lightweightStmt.setBigDecimal(2, new 
BigDecimal("120939.11130"))
+                lightweightStmt.setString(3, "a    ddd")
+                qe_point_select_lightweight lightweightStmt
+                qe_point_select_lightweight lightweightStmt
+                lightweightStmt.close()
+            }
+        } finally {
+            sql """ADMIN SET FRONTEND CONFIG 
("enable_lightweight_lookup_request" = "${oldLightweightLookupValue}")"""
+        }
+
         sql "DROP TABLE IF EXISTS test_ODS_EBA_LLREPORT";
         sql """
             CREATE TABLE `test_ODS_EBA_LLREPORT` (
@@ -312,6 +337,7 @@ suite("test_point_query") {
             WHERE
              aaaid = '1111111'"""
     } finally {
+        sql """ADMIN SET FRONTEND CONFIG ("enable_lightweight_lookup_request" 
= "${oldLightweightLookupValue}")"""
     }
 
     // test partial update/delete


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to