This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 570dfb41e65 [fix](cloud) Fix cloud meet e-230 not retry query from observer (#37625)
570dfb41e65 is described below
commit 570dfb41e65fe22facb64c5f51728919f7dd80ee
Author: deardeng <[email protected]>
AuthorDate: Fri Jul 12 15:55:34 2024 +0800
[fix](cloud) Fix cloud meet e-230 not retry query from observer (#37625)
---
.../java/org/apache/doris/qe/ConnectProcessor.java | 3 +-
.../java/org/apache/doris/qe/StmtExecutor.java | 10 +-
.../cloud_p0/query_retry/test_retry_e-230.groovy | 205 +++++++++++----------
3 files changed, 116 insertions(+), 102 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index fb13633319b..cf1573810c9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -758,8 +758,7 @@ public abstract class ConnectProcessor {
UUID uuid = UUID.randomUUID();
queryId = new TUniqueId(uuid.getMostSignificantBits(),
uuid.getLeastSignificantBits());
}
-
- executor.execute(queryId);
+ executor.queryRetry(queryId);
} catch (IOException e) {
// Client failed.
LOG.warn("Process one query failed because IOException: ", e);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index fcd0d25a130..783b6b95490 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -528,10 +528,15 @@ public class StmtExecutor {
public void execute() throws Exception {
UUID uuid = UUID.randomUUID();
TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits(),
uuid.getLeastSignificantBits());
- TUniqueId firstQueryId = queryId;
if (Config.enable_print_request_before_execution) {
LOG.info("begin to execute query {} {}", queryId, originStmt ==
null ? "null" : originStmt.originStmt);
}
+ queryRetry(queryId);
+ }
+
+ public void queryRetry(TUniqueId queryId) throws Exception {
+ TUniqueId firstQueryId = queryId;
+ UUID uuid;
int retryTime = Config.max_query_retry_time;
retryTime = retryTime <= 0 ? 1 : retryTime + 1;
for (int i = 1; i <= retryTime; i++) {
@@ -751,6 +756,9 @@ public class StmtExecutor {
if (LOG.isDebugEnabled()) {
LOG.debug("Command({}) process failed.",
originStmt.originStmt, e);
}
+ if (Config.isCloudMode() &&
e.getDetailMessage().contains(FeConstants.CLOUD_RETRY_E230)) {
+ throw e;
+ }
context.getState().setError(e.getMysqlErrorCode(),
e.getMessage());
throw new NereidsException("Command (" + originStmt.originStmt
+ ") process failed",
new AnalysisException(e.getMessage(), e));
diff --git a/regression-test/suites/cloud_p0/query_retry/test_retry_e-230.groovy b/regression-test/suites/cloud_p0/query_retry/test_retry_e-230.groovy
index 8f96d1fd9d2..2d8ca3f5296 100644
--- a/regression-test/suites/cloud_p0/query_retry/test_retry_e-230.groovy
+++ b/regression-test/suites/cloud_p0/query_retry/test_retry_e-230.groovy
@@ -24,120 +24,127 @@ suite("test_retry_e-230") {
}
def options = new ClusterOptions()
options.enableDebugPoints()
- options.setFeNum(1)
+ // one master, one observer
+ options.setFeNum(2)
options.feConfigs.add('max_query_retry_time=100')
options.feConfigs.add('sys_log_verbose_modules=org')
options.setBeNum(1)
options.cloudMode = true
- docker(options) {
- def tbl = 'test_retry_e_230_tbl'
- def tbl1 = 'table_1'
- def tbl2 = 'table_2'
- sql """ DROP TABLE IF EXISTS ${tbl} """
- sql """ DROP TABLE IF EXISTS ${tbl1} """
- sql """ DROP TABLE IF EXISTS ${tbl2} """
- try {
- sql """set global experimental_enable_pipeline_x_engine=false"""
- cluster.injectDebugPoints(NodeType.BE,
['CloudTablet.capture_rs_readers.return.e-230' : null])
+ // 1. connect to master
+ options.connectToFollower = false
+ for (def j = 0; j < 2; j++) {
+ docker(options) {
+ def tbl = 'test_retry_e_230_tbl'
+ def tbl1 = 'table_1'
+ def tbl2 = 'table_2'
+ sql """ DROP TABLE IF EXISTS ${tbl} """
+ sql """ DROP TABLE IF EXISTS ${tbl1} """
+ sql """ DROP TABLE IF EXISTS ${tbl2} """
+ try {
+ sql """set global
experimental_enable_pipeline_x_engine=false"""
+ cluster.injectDebugPoints(NodeType.BE,
['CloudTablet.capture_rs_readers.return.e-230' : null])
- sql """
- CREATE TABLE ${tbl} (
- `k1` int(11) NULL,
- `k2` int(11) NULL
- )
- DUPLICATE KEY(`k1`, `k2`)
- COMMENT 'OLAP'
- DISTRIBUTED BY HASH(`k1`) BUCKETS 1
- PROPERTIES (
- "replication_num"="1"
- );
- """
- for (def i = 1; i <= 5; i++) {
- sql "INSERT INTO ${tbl} VALUES (${i}, ${10 * i})"
- }
+ sql """
+ CREATE TABLE ${tbl} (
+ `k1` int(11) NULL,
+ `k2` int(11) NULL
+ )
+ DUPLICATE KEY(`k1`, `k2`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+ PROPERTIES (
+ "replication_num"="1"
+ );
+ """
+ for (def i = 1; i <= 5; i++) {
+ sql "INSERT INTO ${tbl} VALUES (${i}, ${10 * i})"
+ }
- cluster.injectDebugPoints(NodeType.FE, ['StmtExecutor.retry.longtime'
: null])
- def futrue1 = thread {
- Thread.sleep(3000)
- cluster.clearBackendDebugPoints()
- }
+ cluster.injectDebugPoints(NodeType.FE,
['StmtExecutor.retry.longtime' : null])
+ def futrue1 = thread {
+ Thread.sleep(3000)
+ cluster.clearBackendDebugPoints()
+ }
- def begin = System.currentTimeMillis();
- def futrue2 = thread {
- def result = try_sql """select * from ${tbl}"""
- }
+ def begin = System.currentTimeMillis();
+ def futrue2 = thread {
+ def result = try_sql """select * from ${tbl}"""
+ }
- futrue2.get()
- def cost = System.currentTimeMillis() - begin;
- log.info("time cost: {}", cost)
- futrue1.get()
- // fe StmtExecutor retry time, at most 25 * 1.5s + 25 * 2.5s
- assertTrue(cost > 3000 && cost < 100000)
+ futrue2.get()
+ def cost = System.currentTimeMillis() - begin;
+ log.info("time cost: {}", cost)
+ futrue1.get()
+ // fe StmtExecutor retry time, at most 25 * 1.5s + 25 * 2.5s
+ assertTrue(cost > 3000 && cost < 100000)
- sql """
- CREATE TABLE IF NOT EXISTS ${tbl1} (
- `siteid` int(11) NOT NULL COMMENT "",
- `citycode` int(11) NOT NULL COMMENT "",
- `userid` int(11) NOT NULL COMMENT "",
- `pv` int(11) NOT NULL COMMENT ""
- ) ENGINE=OLAP
- DUPLICATE KEY(`siteid`)
- COMMENT "OLAP"
- DISTRIBUTED BY HASH(`siteid`) BUCKETS 1
- PROPERTIES (
- "replication_allocation" = "tag.location.default: 1",
- "in_memory" = "false",
- "storage_format" = "V2"
- )
- """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tbl1} (
+ `siteid` int(11) NOT NULL COMMENT "",
+ `citycode` int(11) NOT NULL COMMENT "",
+ `userid` int(11) NOT NULL COMMENT "",
+ `pv` int(11) NOT NULL COMMENT ""
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`siteid`)
+ COMMENT "OLAP"
+ DISTRIBUTED BY HASH(`siteid`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "in_memory" = "false",
+ "storage_format" = "V2"
+ )
+ """
- sql """
- CREATE TABLE IF NOT EXISTS ${tbl2} (
- `siteid` int(11) NOT NULL COMMENT "",
- `citycode` int(11) NOT NULL COMMENT "",
- `userid` int(11) NOT NULL COMMENT "",
- `pv` int(11) NOT NULL COMMENT ""
- ) ENGINE=OLAP
- DUPLICATE KEY(`siteid`)
- COMMENT "OLAP"
- DISTRIBUTED BY HASH(`siteid`) BUCKETS 1
- PROPERTIES (
- "replication_allocation" = "tag.location.default: 1",
- "in_memory" = "false",
- "storage_format" = "V2"
- )
- """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tbl2} (
+ `siteid` int(11) NOT NULL COMMENT "",
+ `citycode` int(11) NOT NULL COMMENT "",
+ `userid` int(11) NOT NULL COMMENT "",
+ `pv` int(11) NOT NULL COMMENT ""
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`siteid`)
+ COMMENT "OLAP"
+ DISTRIBUTED BY HASH(`siteid`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "in_memory" = "false",
+ "storage_format" = "V2"
+ )
+ """
- sql """
- insert into ${tbl1} values (9,10,11,12), (1,2,3,4)
- """
+ sql """
+ insert into ${tbl1} values (9,10,11,12), (1,2,3,4)
+ """
- // dp again
- cluster.injectDebugPoints(NodeType.BE,
['CloudTablet.capture_rs_readers.return.e-230' : null])
+ // dp again
+ cluster.injectDebugPoints(NodeType.BE,
['CloudTablet.capture_rs_readers.return.e-230' : null])
- def futrue3 = thread {
- Thread.sleep(4000)
- cluster.clearBackendDebugPoints()
- }
+ def futrue3 = thread {
+ Thread.sleep(4000)
+ cluster.clearBackendDebugPoints()
+ }
- begin = System.currentTimeMillis();
- def futrue4 = thread {
- def result = try_sql """insert into ${tbl2} select * from
${tbl1}"""
- }
+ begin = System.currentTimeMillis();
+ def futrue4 = thread {
+ def result = try_sql """insert into ${tbl2} select * from
${tbl1}"""
+ }
- futrue4.get()
- cost = System.currentTimeMillis() - begin;
- log.info("time cost insert into select : {}", cost)
- futrue3.get()
- // fe StmtExecutor retry time, at most 25 * 1.5s + 25 * 2.5s
- assertTrue(cost > 4000 && cost < 100000)
+ futrue4.get()
+ cost = System.currentTimeMillis() - begin;
+ log.info("time cost insert into select : {}", cost)
+ futrue3.get()
+ // fe StmtExecutor retry time, at most 25 * 1.5s + 25 * 2.5s
+ assertTrue(cost > 4000 && cost < 100000)
- } finally {
- cluster.clearFrontendDebugPoints()
- cluster.clearBackendDebugPoints()
- sql """ DROP TABLE IF EXISTS ${tbl} """
- sql """ DROP TABLE IF EXISTS ${tbl1} """
- sql """ DROP TABLE IF EXISTS ${tbl2} """
- }
+ } finally {
+ cluster.clearFrontendDebugPoints()
+ cluster.clearBackendDebugPoints()
+ sql """ DROP TABLE IF EXISTS ${tbl} """
+ sql """ DROP TABLE IF EXISTS ${tbl1} """
+ sql """ DROP TABLE IF EXISTS ${tbl2} """
+ }
+ }
+ // 2. connect to follower
+ options.connectToFollower = true
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]