This is an automated email from the ASF dual-hosted git repository.
mymeiyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c23c9914ad4 [fix](fe) set cloud version_cache_ttl to 0 temporarily if
retry a query with -230 (#63721)
c23c9914ad4 is described below
commit c23c9914ad4b776884e9359b0206ab91f8b7fca7
Author: meiyi <[email protected]>
AuthorDate: Tue Jun 30 21:38:03 2026 +0800
[fix](fe) set cloud version_cache_ttl to 0 temporarily if retry a query
with -230 (#63721)
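If a query gets an E-230 error and `cloud_partition_version_cache_ttl_ms`
(or `cloud_table_version_cache_ttl_ms`) is not set to 0, this PR temporarily
sets these session variables to 0 for the retry so that it fetches the newest
version, then restores their original values.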
---
.../java/org/apache/doris/qe/StmtExecutor.java | 36 +++++++++++++-
.../java/org/apache/doris/qe/StmtExecutorTest.java | 48 +++++++++++++++++++
.../cloud_p0/query_retry/test_retry_e-230.groovy | 55 ++++++++++++++++++++++
3 files changed, 138 insertions(+), 1 deletion(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 8d4f29cda86..dac8b50ac05 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -593,6 +593,7 @@ public class StmtExecutor {
TUniqueId firstQueryId = queryId;
int retryTime = Config.max_query_retry_time;
retryTime = retryTime <= 0 ? 1 : retryTime + 1;
+ boolean disableCloudVersionCacheOnRetry = false;
// If the query is an `outfile` statement,
// we execute it only once to avoid exporting redundant data.
if (parsedStmt instanceof Queriable) {
@@ -600,7 +601,11 @@ public class StmtExecutor {
}
for (int i = 1; i <= retryTime; i++) {
try {
- execute(queryId);
+ if (disableCloudVersionCacheOnRetry) {
+ executeWithVersionCacheDisabled(queryId);
+ } else {
+ execute(queryId);
+ }
return;
} catch (UserException e) {
if (!SystemInfoService.needRetryWithReplan(e.getMessage()) ||
i == retryTime) {
@@ -612,6 +617,7 @@ public class StmtExecutor {
if (this.coord != null && this.coord.isQueryCancelled()) {
throw e;
}
+ disableCloudVersionCacheOnRetry =
shouldDisableCloudVersionCacheOnRetry(e.getMessage());
TUniqueId lastQueryId = queryId;
queryId = UniqueIdUtils.fastUniqueId();
int randomMillis = 10 + (int) (Math.random() * 10);
@@ -632,6 +638,34 @@ public class StmtExecutor {
}
}
+ // Temporarily disable the cloud version cache for this single attempt so
that the retry
+ // re-fetches the visible version from meta-service, then restore the
user-set TTLs.
+ private void executeWithVersionCacheDisabled(TUniqueId queryId) throws
Exception {
+ SessionVariable sessionVariable = context.getSessionVariable();
+ long oldPartitionTtl = sessionVariable.cloudPartitionVersionCacheTtlMs;
+ long oldTableTtl = sessionVariable.cloudTableVersionCacheTtlMs;
+ try {
+ sessionVariable.cloudPartitionVersionCacheTtlMs = 0;
+ sessionVariable.cloudTableVersionCacheTtlMs = 0;
+ LOG.info("temporarily set {} from {} to 0 and {} from {} to 0
before retry. {}",
+ SessionVariable.CLOUD_PARTITION_VERSION_CACHE_TTL_MS,
oldPartitionTtl,
+ SessionVariable.CLOUD_TABLE_VERSION_CACHE_TTL_MS,
oldTableTtl,
+ context.getQueryIdentifier());
+ execute(queryId);
+ } finally {
+ sessionVariable.cloudPartitionVersionCacheTtlMs = oldPartitionTtl;
+ sessionVariable.cloudTableVersionCacheTtlMs = oldTableTtl;
+ }
+ }
+
+ boolean shouldDisableCloudVersionCacheOnRetry(String errorMessage) {
+ return Config.isCloudMode()
+ && errorMessage != null
+ && errorMessage.contains(SystemInfoService.ERROR_E230)
+ &&
(context.getSessionVariable().cloudPartitionVersionCacheTtlMs != 0
+ || context.getSessionVariable().cloudTableVersionCacheTtlMs !=
0);
+ }
+
public void execute(TUniqueId queryId) throws Exception {
SessionVariable sessionVariable = context.getSessionVariable();
if (context.getConnectType() == ConnectType.ARROW_FLIGHT_SQL) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
index 5ebcb2e2d59..b7b12bc91c3 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
@@ -388,4 +388,52 @@ public class StmtExecutorTest extends TestWithFeService {
"ParsedStatement should be a LogicalPlanAdapter after
parseByNereids(), but was: "
+ (parsedStatement == null ? "null" :
parsedStatement.getClass().getName()));
}
+
+ @Test
+ public void testShouldDisableCloudVersionCacheOnRetryForE230() {
+ String originalCloudUniqueId = Config.cloud_unique_id;
+ String originalDeployMode = Config.deploy_mode;
+ long originalPartitionTtl =
connectContext.getSessionVariable().cloudPartitionVersionCacheTtlMs;
+ long originalTableTtl =
connectContext.getSessionVariable().cloudTableVersionCacheTtlMs;
+ try {
+ Config.cloud_unique_id = "test-cloud-id";
+ StmtExecutor executor = new StmtExecutor(connectContext, "select
1");
+
+
connectContext.getSessionVariable().cloudPartitionVersionCacheTtlMs = 1000L;
+ connectContext.getSessionVariable().cloudTableVersionCacheTtlMs =
1000L;
+
Assertions.assertTrue(executor.shouldDisableCloudVersionCacheOnRetry(
+ "errCode = 2, detailMessage = E-230 versions are already
compacted"));
+
Assertions.assertFalse(executor.shouldDisableCloudVersionCacheOnRetry(
+ "errCode = 2, detailMessage = some other error"));
+ // null error message must not trigger the disable.
+
Assertions.assertFalse(executor.shouldDisableCloudVersionCacheOnRetry(null));
+
+ // Non-cloud mode must never disable the version cache, even on
E-230.
+ Config.cloud_unique_id = "";
+ Config.deploy_mode = "";
+
Assertions.assertFalse(executor.shouldDisableCloudVersionCacheOnRetry(
+ "errCode = 2, detailMessage = E-230 versions are already
compacted"));
+ Config.cloud_unique_id = "test-cloud-id";
+
+
connectContext.getSessionVariable().cloudPartitionVersionCacheTtlMs = 0L;
+ connectContext.getSessionVariable().cloudTableVersionCacheTtlMs =
1000L;
+
Assertions.assertTrue(executor.shouldDisableCloudVersionCacheOnRetry(
+ "errCode = 2, detailMessage = E-230 versions are already
compacted"));
+
+
connectContext.getSessionVariable().cloudPartitionVersionCacheTtlMs = 1000L;
+ connectContext.getSessionVariable().cloudTableVersionCacheTtlMs =
0L;
+
Assertions.assertTrue(executor.shouldDisableCloudVersionCacheOnRetry(
+ "errCode = 2, detailMessage = E-230 versions are already
compacted"));
+
+
connectContext.getSessionVariable().cloudPartitionVersionCacheTtlMs = 0L;
+ connectContext.getSessionVariable().cloudTableVersionCacheTtlMs =
0L;
+
Assertions.assertFalse(executor.shouldDisableCloudVersionCacheOnRetry(
+ "errCode = 2, detailMessage = E-230 versions are already
compacted"));
+ } finally {
+ Config.cloud_unique_id = originalCloudUniqueId;
+ Config.deploy_mode = originalDeployMode;
+
connectContext.getSessionVariable().cloudPartitionVersionCacheTtlMs =
originalPartitionTtl;
+ connectContext.getSessionVariable().cloudTableVersionCacheTtlMs =
originalTableTtl;
+ }
+ }
}
diff --git
a/regression-test/suites/cloud_p0/query_retry/test_retry_e-230.groovy
b/regression-test/suites/cloud_p0/query_retry/test_retry_e-230.groovy
index b2ad94dd2a2..8d87400e253 100644
--- a/regression-test/suites/cloud_p0/query_retry/test_retry_e-230.groovy
+++ b/regression-test/suites/cloud_p0/query_retry/test_retry_e-230.groovy
@@ -162,7 +162,62 @@ suite("test_retry_e-230", 'docker') {
// fe StmtExecutor retry time, at most 25 * 1.5s + 25 * 2.5s
assertTrue(cost > 4000 && cost < 100000)
+ def restoreTbl = 'test_retry_e_230_restore_tbl'
+ sql """ DROP TABLE IF EXISTS ${restoreTbl} """
+ sql """
+ CREATE TABLE ${restoreTbl} (
+ `k1` int(11) NULL,
+ `k2` int(11) NULL
+ )
+ DUPLICATE KEY(`k1`, `k2`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+ PROPERTIES (
+ "replication_num"="1"
+ );
+ """
+ for (def i = 1; i <= 3; i++) {
+ insert_sql """INSERT INTO ${restoreTbl} VALUES (${i},
${100 * i})""", 1
+ }
+
+ sql """ set cloud_partition_version_cache_ttl_ms = 3600000 """
+ sql """ set cloud_table_version_cache_ttl_ms = 3600000 """
+ def row_cnt = sql """select count() from ${restoreTbl}"""
+ assertEquals(row_cnt[0][0], 3)
+
+ cluster.injectDebugPoints(NodeType.BE,
['CloudTablet.capture_rs_readers.return.e-230' : null])
+ cluster.injectDebugPoints(NodeType.FE,
['StmtExecutor.retry.longtime' : null])
+ insert_sql """INSERT INTO ${restoreTbl} VALUES (4, 400)""", 1
+
+ def futrue5 = thread {
+ Thread.sleep(4000)
+ cluster.clearBackendDebugPoints()
+ }
+
+ begin = System.currentTimeMillis();
+ def futrue6 = thread {
+ def result = sql """select * from ${restoreTbl} order by
k1"""
+ log.info("select result: {}", result)
+ }
+
+ futrue6.get()
+ cost = System.currentTimeMillis() - begin;
+ log.info("time cost restore check : {}", cost)
+ futrue5.get()
+ assertTrue(cost > 4000 && cost < 100000)
+
+ def ttlRows = sql_return_maparray """
+ select
+ @@session.cloud_partition_version_cache_ttl_ms as
partition_ttl,
+ @@session.cloud_table_version_cache_ttl_ms as table_ttl
+ """
+ assertEquals("3600000", ttlRows[0].partition_ttl.toString())
+ assertEquals("3600000", ttlRows[0].table_ttl.toString())
+ row_cnt = sql """select count() from ${restoreTbl}"""
+ assertEquals(row_cnt[0][0], 4)
} finally {
+ sql """ set cloud_partition_version_cache_ttl_ms = DEFAULT """
+ sql """ set cloud_table_version_cache_ttl_ms = DEFAULT """
cluster.clearFrontendDebugPoints()
cluster.clearBackendDebugPoints()
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]