This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new a3fd13fee67 [fix](catalog) set timeout for split fetch (#39346) (#39624)
a3fd13fee67 is described below

commit a3fd13fee671d9667e85d8d1be454f2bdb2a2b94
Author: Mingyu Chen <[email protected]>
AuthorDate: Tue Aug 20 21:59:55 2024 +0800

    [fix](catalog) set timeout for split fetch (#39346) (#39624)
    
    bp #39346
---
 be/src/vec/exec/scan/split_source_connector.cpp                  | 5 +++++
 .../src/main/java/org/apache/doris/datasource/SplitSource.java   | 9 +++++++--
 .../main/java/org/apache/doris/service/FrontendServiceImpl.java  | 6 +++++-
 gensrc/thrift/FrontendService.thrift                             | 1 +
 4 files changed, 18 insertions(+), 3 deletions(-)

diff --git a/be/src/vec/exec/scan/split_source_connector.cpp b/be/src/vec/exec/scan/split_source_connector.cpp
index 478af522e76..cefe5b70216 100644
--- a/be/src/vec/exec/scan/split_source_connector.cpp
+++ b/be/src/vec/exec/scan/split_source_connector.cpp
@@ -47,6 +47,7 @@ Status RemoteSplitSourceConnector::get_next(bool* has_next, TFileRangeDesc* rang
     if (_scan_index == _scan_ranges.size() && !_last_batch) {
         SCOPED_TIMER(_get_split_timer);
         Status coord_status;
+        // No need to set timeout because on FE side, there is a max fetch time
         FrontendServiceConnection 
coord(_state->exec_env()->frontend_client_cache(),
                                         _state->get_query_ctx()->coord_addr, 
&coord_status);
         RETURN_IF_ERROR(coord_status);
@@ -56,6 +57,10 @@ Status RemoteSplitSourceConnector::get_next(bool* has_next, TFileRangeDesc* rang
         TFetchSplitBatchResult result;
         try {
             coord->fetchSplitBatch(result, request);
+            if (result.__isset.status && result.status.status_code != 
TStatusCode::OK) {
+                return Status::IOError<false>("Failed to get batch of split 
source: {}",
+                                              result.status.error_msgs[0]);
+            }
         } catch (std::exception& e) {
             return Status::IOError<false>("Failed to get batch of split 
source: {}", e.what());
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java
index 8515e686f36..e24af768781 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java
@@ -73,7 +73,7 @@ public class SplitSource {
         }
         List<TScanRangeLocations> scanRanges = 
Lists.newArrayListWithExpectedSize(maxBatchSize);
         long startTime = System.currentTimeMillis();
-        while (scanRanges.size() < maxBatchSize) {
+        while (scanRanges.size() < maxBatchSize && System.currentTimeMillis() 
- startTime < maxWaitTime) {
             BlockingQueue<Collection<TScanRangeLocations>> splits = 
splitAssignment.getAssignedSplits(backend);
             if (splits == null) {
                 isLastBatch.set(true);
@@ -92,10 +92,15 @@ public class SplitSource {
                         break;
                     }
                 } catch (InterruptedException e) {
-                    throw new UserException("Failed to get next batch of 
splits", e);
+                    throw new UserException(e.getMessage(), e);
                 }
             }
         }
+
+        if (scanRanges.isEmpty() && !isLastBatch.get()) {
+            // This is timeout
+            throw new UserException("Timeout. Max wait time(ms): " + 
maxWaitTime);
+        }
         return scanRanges;
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index b04b5aa3892..7f727c51bab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -976,10 +976,14 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         try {
             List<TScanRangeLocations> locations = 
splitSource.getNextBatch(request.getMaxNumSplits());
             result.setSplits(locations);
+            result.status = new TStatus(TStatusCode.OK);
             return result;
         } catch (Exception e) {
-            throw new TException("Failed to get split source " + 
request.getSplitSourceId(), e);
+            LOG.warn("failed to fetch split batch with source id {}", 
request.getSplitSourceId(), e);
+            result.status = new TStatus(TStatusCode.INTERNAL_ERROR);
+            result.status.addToErrorMsgs(e.getMessage());
         }
+        return result;
     }
 
     @Override
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 7d5b94bd9fe..39edf990a63 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1478,6 +1478,7 @@ struct TFetchSplitBatchRequest {
 
 struct TFetchSplitBatchResult {
     1: optional list<Planner.TScanRangeLocations> splits
+    2: optional Status.TStatus status
 }
 
 struct TFetchRunningQueriesResult {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to