This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 06dcd369097 [Chore](executor) do not retry query when catched
UserException and cloud mode is off (#36202)
06dcd369097 is described below
commit 06dcd36909735b0ba150c8f9803a69407d037d5f
Author: Pxl <[email protected]>
AuthorDate: Fri Jun 14 19:07:20 2024 +0800
[Chore](executor) do not retry query when catched UserException and cloud
mode is off (#36202)
## Proposed changes
Do not retry the query when a UserException is caught and cloud mode is off.
With cloud mode off, the query is now retried only when the exception is an RpcException.
---
be/src/pipeline/local_exchange/local_exchanger.cpp | 16 ++++------------
.../src/main/java/org/apache/doris/qe/StmtExecutor.java | 6 ++++--
2 files changed, 8 insertions(+), 14 deletions(-)
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index 466ca860398..51d2c8268e7 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -306,24 +306,16 @@ Status
LocalMergeSortExchanger::build_merger(RuntimeState* state,
vectorized::BlockSupplier block_supplier = [&, id =
channel_id](vectorized::Block* block,
bool*
eos) {
vectorized::Block next_block;
- if (_running_sink_operators == 0) {
- if (_data_queue[id].try_dequeue(next_block)) {
- block->swap(next_block);
- if (_free_block_limit == 0 ||
- _free_blocks.size_approx() < _free_block_limit *
_num_sources) {
- _free_blocks.enqueue(std::move(next_block));
- }
- sub_mem_usage(local_state, id, block->allocated_bytes());
- } else {
- *eos = true;
- }
- } else if (_data_queue[id].try_dequeue(next_block)) {
+ bool all_finished = _running_sink_operators == 0;
+ if (_data_queue[id].try_dequeue(next_block)) {
block->swap(next_block);
if (_free_block_limit == 0 ||
_free_blocks.size_approx() < _free_block_limit *
_num_sources) {
_free_blocks.enqueue(std::move(next_block));
}
sub_mem_usage(local_state, id, block->allocated_bytes());
+ } else if (all_finished) {
+ *eos = true;
}
return Status::OK();
};
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 9d2568c973d..6fb5a062c07 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -847,13 +847,13 @@ public class StmtExecutor {
// cloud mode retry
LOG.debug("due to exception {} retry {} rpc {} user {}",
e.getMessage(), i, e instanceof RpcException, e
instanceof UserException);
- String msg = e.getMessage();
- boolean isNeedRetry = true;
+ boolean isNeedRetry = false;
if (Config.isCloudMode()) {
isNeedRetry = false;
// errCode = 2, detailMessage = There is no scanNode
Backend available.[10003: not alive]
List<String> bes =
Env.getCurrentSystemInfo().getAllBackendIds().stream()
.map(id ->
Long.toString(id)).collect(Collectors.toList());
+ String msg = e.getMessage();
if (e instanceof UserException
&&
msg.contains(SystemInfoService.NO_SCAN_NODE_BACKEND_AVAILABLE_MSG)) {
Matcher matcher = beIpPattern.matcher(msg);
@@ -878,6 +878,8 @@ public class StmtExecutor {
}
}
}
+ } else {
+ isNeedRetry = e instanceof RpcException;
}
if (i != retryTime - 1 && isNeedRetry
&& context.getConnectType().equals(ConnectType.MYSQL)
&& !context.getMysqlChannel().isSend()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]