This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new aa30b5a6115 [fix](catalog) set timeout for split fetch (#39346)
aa30b5a6115 is described below
commit aa30b5a6115d07bda2c0e0973ff77baefb83c78c
Author: Mingyu Chen <[email protected]>
AuthorDate: Fri Aug 16 09:40:57 2024 +0800
[fix](catalog) set timeout for split fetch (#39346)
When fetching splits in batches, the BE sends an RPC to the FE to fetch each
batch of splits.
The FE may block while listing files from HDFS, which causes the BE to block too.
This PR adds a timeout on the FE side so that the BE does not block.
---
be/src/vec/exec/scan/split_source_connector.cpp | 5 +++++
.../src/main/java/org/apache/doris/datasource/SplitSource.java | 9 +++++++--
.../main/java/org/apache/doris/service/FrontendServiceImpl.java | 6 +++++-
gensrc/thrift/FrontendService.thrift | 1 +
4 files changed, 18 insertions(+), 3 deletions(-)
diff --git a/be/src/vec/exec/scan/split_source_connector.cpp
b/be/src/vec/exec/scan/split_source_connector.cpp
index 478af522e76..cefe5b70216 100644
--- a/be/src/vec/exec/scan/split_source_connector.cpp
+++ b/be/src/vec/exec/scan/split_source_connector.cpp
@@ -47,6 +47,7 @@ Status RemoteSplitSourceConnector::get_next(bool* has_next,
TFileRangeDesc* rang
if (_scan_index == _scan_ranges.size() && !_last_batch) {
SCOPED_TIMER(_get_split_timer);
Status coord_status;
+ // No need to set timeout because on FE side, there is a max fetch time
FrontendServiceConnection
coord(_state->exec_env()->frontend_client_cache(),
_state->get_query_ctx()->coord_addr,
&coord_status);
RETURN_IF_ERROR(coord_status);
@@ -56,6 +57,10 @@ Status RemoteSplitSourceConnector::get_next(bool* has_next,
TFileRangeDesc* rang
TFetchSplitBatchResult result;
try {
coord->fetchSplitBatch(result, request);
+ if (result.__isset.status && result.status.status_code !=
TStatusCode::OK) {
+ return Status::IOError<false>("Failed to get batch of split
source: {}",
+ result.status.error_msgs[0]);
+ }
} catch (std::exception& e) {
return Status::IOError<false>("Failed to get batch of split
source: {}", e.what());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java
index 8515e686f36..e24af768781 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java
@@ -73,7 +73,7 @@ public class SplitSource {
}
List<TScanRangeLocations> scanRanges =
Lists.newArrayListWithExpectedSize(maxBatchSize);
long startTime = System.currentTimeMillis();
- while (scanRanges.size() < maxBatchSize) {
+ while (scanRanges.size() < maxBatchSize && System.currentTimeMillis()
- startTime < maxWaitTime) {
BlockingQueue<Collection<TScanRangeLocations>> splits =
splitAssignment.getAssignedSplits(backend);
if (splits == null) {
isLastBatch.set(true);
@@ -92,10 +92,15 @@ public class SplitSource {
break;
}
} catch (InterruptedException e) {
- throw new UserException("Failed to get next batch of
splits", e);
+ throw new UserException(e.getMessage(), e);
}
}
}
+
+ if (scanRanges.isEmpty() && !isLastBatch.get()) {
+ // This is timeout
+ throw new UserException("Timeout. Max wait time(ms): " +
maxWaitTime);
+ }
return scanRanges;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index c7df696c7a0..1e6d8e987c1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -985,10 +985,14 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
try {
List<TScanRangeLocations> locations =
splitSource.getNextBatch(request.getMaxNumSplits());
result.setSplits(locations);
+ result.status = new TStatus(TStatusCode.OK);
return result;
} catch (Exception e) {
- throw new TException("Failed to get split source " +
request.getSplitSourceId(), e);
+ LOG.warn("failed to fetch split batch with source id {}",
request.getSplitSourceId(), e);
+ result.status = new TStatus(TStatusCode.INTERNAL_ERROR);
+ result.status.addToErrorMsgs(e.getMessage());
}
+ return result;
}
@Override
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index a77aa2362b0..a3d2ad26967 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1558,6 +1558,7 @@ struct TFetchSplitBatchRequest {
struct TFetchSplitBatchResult {
1: optional list<Planner.TScanRangeLocations> splits
+ 2: optional Status.TStatus status
}
service FrontendService {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]