This is an automated email from the ASF dual-hosted git repository.
hellostephen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 816e31beccf [fix](cloud) Fix auto-start functionality when
encountering TVF and external queries (#59963)
816e31beccf is described below
commit 816e31beccffc0d3a3326ad11915f35f7b73ced0
Author: deardeng <[email protected]>
AuthorDate: Tue Feb 24 14:20:43 2026 +0800
[fix](cloud) Fix auto-start functionality when encountering TVF and
external queries (#59963)
---
.../doris/cloud/system/CloudSystemInfoService.java | 212 ++++++++++++++++-----
.../doris/common/profile/SummaryProfile.java | 9 +
.../doris/nereids/errors/QueryPlanningErrors.java | 8 +-
.../doris/nereids/jobs/rewrite/RewriteJob.java | 14 +-
.../nereids/jobs/scheduler/SimpleJobScheduler.java | 14 +-
.../cloud_p0/multi_cluster/test_auto_start.groovy | 53 ++++++
6 files changed, 251 insertions(+), 59 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
index 24a09364ad1..4f01a4546f5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
@@ -34,11 +34,14 @@ import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.RandomIdentifierGenerator;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.nereids.trees.plans.commands.info.ModifyBackendOp;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.resource.Tag;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.service.FrontendOptions;
@@ -58,7 +61,6 @@ import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
@@ -67,6 +69,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@@ -800,12 +803,13 @@ public class CloudSystemInfoService extends
SystemInfoService {
if (Strings.isNullOrEmpty(cluster)) {
throw new AnalysisException("cluster name is empty");
}
+ waitForAutoStart(cluster);
List<Backend> backends = getBackendsByClusterName(cluster);
for (Backend be : backends) {
idToBackend.put(be.getId(), be);
}
- } catch (ComputeGroupException e) {
+ } catch (ComputeGroupException | DdlException e) {
throw new AnalysisException(e.getMessage());
}
@@ -1416,91 +1420,159 @@ public class CloudSystemInfoService extends
SystemInfoService {
}
public String waitForAutoStart(String clusterName) throws DdlException {
- if (Config.isNotCloudMode()) {
- return null;
- }
- if (!Config.enable_auto_start_for_cloud_cluster) {
+ if (Config.isNotCloudMode() ||
!Config.enable_auto_start_for_cloud_cluster) {
return null;
}
- clusterName = getClusterNameAutoStart(clusterName);
- if (Strings.isNullOrEmpty(clusterName)) {
- LOG.warn("auto start in cloud mode, but clusterName empty {}",
clusterName);
+ String resolvedClusterName = getClusterNameAutoStart(clusterName);
+ if (Strings.isNullOrEmpty(resolvedClusterName)) {
+ LOG.warn("auto start in cloud mode, but clusterName empty {}",
resolvedClusterName);
return null;
}
- String clusterStatus = getCloudStatusByName(clusterName);
- if (Strings.isNullOrEmpty(clusterStatus)) {
+ String clusterStatusStr = getCloudStatusByName(resolvedClusterName);
+ Cloud.ClusterStatus clusterStatus =
parseClusterStatusOrNull(clusterStatusStr, resolvedClusterName);
+ if (clusterStatus == null) {
+ LOG.warn("auto start in cloud mode, but clusterStatus empty {}",
clusterStatusStr);
// for cluster rename or cluster dropped
- LOG.warn("cant find clusterStatus in fe, clusterName {}",
clusterName);
return null;
}
- if (Cloud.ClusterStatus.valueOf(clusterStatus) ==
Cloud.ClusterStatus.MANUAL_SHUTDOWN) {
- LOG.warn("auto start cluster {} in manual shutdown status",
clusterName);
- throw new DdlException("cluster " + clusterName + " is in manual
shutdown");
+ if (clusterStatus == Cloud.ClusterStatus.MANUAL_SHUTDOWN) {
+ LOG.warn("auto start cluster {} in manual shutdown status",
resolvedClusterName);
+ throw new DdlException("cluster " + resolvedClusterName + " is in
manual shutdown");
}
- // nofity ms -> wait for clusterStatus to normal
- LOG.debug("auto start wait cluster {} status {}", clusterName,
clusterStatus);
- if (Cloud.ClusterStatus.valueOf(clusterStatus) !=
Cloud.ClusterStatus.NORMAL) {
+ // notify ms -> wait for clusterStatus to normal
+ LOG.debug("auto start wait cluster {} status {}", resolvedClusterName,
clusterStatus);
+ if (clusterStatus != Cloud.ClusterStatus.NORMAL) {
// ATTN: prevent `Automatic Analyzer` daemon threads from pulling
up clusters
// FeConstants.INTERNAL_DB_NAME ? see
StatisticsUtil.buildConnectContext
- List<String> ignoreDbNameList =
Arrays.asList(Config.auto_start_ignore_resume_db_names);
- if (ConnectContext.get() != null &&
ignoreDbNameList.contains(ConnectContext.get().getDatabase())) {
+ ConnectContext ctx = ConnectContext.get();
+ if (shouldIgnoreAutoStart(ctx)) {
LOG.warn("auto start daemon thread db {}, not resume cluster
{}-{}",
- ConnectContext.get().getDatabase(), clusterName,
clusterStatus);
+ ctx.getDatabase(), resolvedClusterName, clusterStatus);
return null;
}
- Cloud.AlterClusterRequest.Builder builder =
Cloud.AlterClusterRequest.newBuilder();
- builder.setCloudUniqueId(Config.cloud_unique_id);
- builder.setRequestIp(FrontendOptions.getLocalHostAddressCached());
-
builder.setOp(Cloud.AlterClusterRequest.Operation.SET_CLUSTER_STATUS);
+ notifyMetaServiceToResumeCluster(resolvedClusterName);
+ }
+ // wait 5 mins
+ int retryTimes = Config.auto_start_wait_to_resume_times < 0 ? 300 :
Config.auto_start_wait_to_resume_times;
+ String finalClusterName = resolvedClusterName;
+ String initialClusterStatus = clusterStatusStr;
+ withTemporaryNereidsTimeout(() -> {
+ waitForClusterToResume(finalClusterName, retryTimes,
initialClusterStatus);
+ });
+ return resolvedClusterName;
+ }
- ClusterPB.Builder clusterBuilder = ClusterPB.newBuilder();
- clusterBuilder.setClusterId(getCloudClusterIdByName(clusterName));
- clusterBuilder.setClusterStatus(Cloud.ClusterStatus.TO_RESUME);
- builder.setCluster(clusterBuilder);
+ private Cloud.ClusterStatus parseClusterStatusOrNull(String
clusterStatusStr, String clusterName) {
+ if (Strings.isNullOrEmpty(clusterStatusStr)) {
+ // for cluster rename or cluster dropped
+ LOG.warn("cant find clusterStatus in fe, clusterName {}",
clusterName);
+ return null;
+ }
+ try {
+ return Cloud.ClusterStatus.valueOf(clusterStatusStr);
+ } catch (Throwable t) {
+ LOG.warn("invalid clusterStatus {} for clusterName {}",
clusterStatusStr, clusterName, t);
+ return null;
+ }
+ }
- Cloud.AlterClusterResponse response;
- try {
- Cloud.AlterClusterRequest request = builder.build();
- response =
MetaServiceProxy.getInstance().alterCluster(request);
- LOG.info("alter cluster, request: {}, response: {}", request,
response);
- if (response.getStatus().getCode() !=
Cloud.MetaServiceCode.OK) {
- LOG.warn("notify to resume cluster not ok, cluster {},
response: {}", clusterName, response);
- }
- LOG.info("notify to resume cluster {}, response: {} ",
clusterName, response);
- } catch (RpcException e) {
- LOG.warn("failed to notify to resume cluster {}", clusterName,
e);
- throw new DdlException("notify to resume cluster not ok");
+ private boolean shouldIgnoreAutoStart(ConnectContext ctx) {
+ if (ctx == null) {
+ return false;
+ }
+ String dbName = ctx.getDatabase();
+ if (Strings.isNullOrEmpty(dbName) ||
Config.auto_start_ignore_resume_db_names == null) {
+ return false;
+ }
+ for (String ignore : Config.auto_start_ignore_resume_db_names) {
+ if (dbName.equals(ignore)) {
+ return true;
}
}
- // wait 5 mins
- int retryTimes = Config.auto_start_wait_to_resume_times < 0 ? 300 :
Config.auto_start_wait_to_resume_times;
+ return false;
+ }
+
+ private void notifyMetaServiceToResumeCluster(String clusterName) throws
DdlException {
+ Cloud.AlterClusterRequest.Builder builder =
Cloud.AlterClusterRequest.newBuilder();
+ builder.setCloudUniqueId(Config.cloud_unique_id);
+ builder.setRequestIp(FrontendOptions.getLocalHostAddressCached());
+ builder.setOp(Cloud.AlterClusterRequest.Operation.SET_CLUSTER_STATUS);
+
+ ClusterPB.Builder clusterBuilder = ClusterPB.newBuilder();
+ clusterBuilder.setClusterId(getCloudClusterIdByName(clusterName));
+ clusterBuilder.setClusterStatus(Cloud.ClusterStatus.TO_RESUME);
+ builder.setCluster(clusterBuilder);
+
+ try {
+ Cloud.AlterClusterRequest request = builder.build();
+ Cloud.AlterClusterResponse response =
MetaServiceProxy.getInstance().alterCluster(request);
+ LOG.info("alter cluster, request: {}, response: {}", request,
response);
+ if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
+ LOG.warn("notify to resume cluster not ok, cluster {},
response: {}", clusterName, response);
+ }
+ LOG.info("notify to resume cluster {}, response: {} ",
clusterName, response);
+ } catch (RpcException e) {
+ LOG.warn("failed to notify to resume cluster {}", clusterName, e);
+ throw new DdlException("notify to resume cluster not ok");
+ }
+ }
+
+ /**
+ * Wait for cluster to resume to NORMAL status with alive backends.
+ * @param clusterName the name of the cluster
+ * @param retryTimes maximum number of retry attempts
+ * @param initialClusterStatus the initial cluster status
+ * @throws DdlException if the cluster fails to resume within the retry
limit
+ */
+ private void waitForClusterToResume(String clusterName, int retryTimes,
String initialClusterStatus)
+ throws DdlException {
int retryTime = 0;
StopWatch stopWatch = new StopWatch();
stopWatch.start();
boolean hasAutoStart = false;
boolean existAliveBe = true;
- while
((!String.valueOf(Cloud.ClusterStatus.NORMAL).equals(clusterStatus) ||
!existAliveBe)
+ String clusterStatusStr = initialClusterStatus;
+ Cloud.ClusterStatus clusterStatus =
parseClusterStatusOrNull(clusterStatusStr, clusterName);
+ Cloud.ClusterStatus lastLoggedStatus = clusterStatus;
+ boolean lastLoggedExistAliveBe = existAliveBe;
+
+ while ((clusterStatus != Cloud.ClusterStatus.NORMAL || !existAliveBe)
&& retryTime < retryTimes) {
hasAutoStart = true;
++retryTime;
// sleep random millis [0.5, 1] s
- int randomSeconds = 500 + (int) (Math.random() * (1000 - 500));
- LOG.info("resume cluster {} retry times {}, wait randomMillis: {},
current status: {}",
- clusterName, retryTime, randomSeconds, clusterStatus);
+ int sleepMs = ThreadLocalRandom.current().nextInt(500, 1001);
try {
if (retryTime > retryTimes / 2) {
// sleep random millis [1, 1.5] s
- randomSeconds = 1000 + (int) (Math.random() * (1000 -
500));
+ sleepMs = ThreadLocalRandom.current().nextInt(1000, 1501);
}
- Thread.sleep(randomSeconds);
+ Thread.sleep(sleepMs);
} catch (InterruptedException e) {
LOG.info("change cluster sleep wait InterruptedException: ",
e);
}
- clusterStatus = getCloudStatusByName(clusterName);
+ clusterStatusStr = getCloudStatusByName(clusterName);
+ clusterStatus = parseClusterStatusOrNull(clusterStatusStr,
clusterName);
// Check that the bes node in the cluster have at least one alive
existAliveBe =
getBackendsByClusterName(clusterName).stream().anyMatch(Backend::isAlive);
+
+ // Reduce log spam: log when status changes / alive-be changes /
every 10 retries
+ boolean statusChanged = lastLoggedStatus != clusterStatus;
+ boolean aliveChanged = lastLoggedExistAliveBe != existAliveBe;
+ boolean periodicLog = (retryTime % 10 == 0);
+ if (statusChanged || aliveChanged || periodicLog) {
+ LOG.info("resume cluster {} retry {}/{}, sleepMs {}, status
{}(raw={}), existAliveBe {}",
+ clusterName, retryTime, retryTimes, sleepMs,
+ clusterStatus, clusterStatusStr, existAliveBe);
+ lastLoggedStatus = clusterStatus;
+ lastLoggedExistAliveBe = existAliveBe;
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug("resume cluster {} retry {}/{}, sleepMs {}, status
{}(raw={}), existAliveBe {}",
+ clusterName, retryTime, retryTimes, sleepMs,
+ clusterStatus, clusterStatusStr, existAliveBe);
+ }
}
if (retryTime >= retryTimes) {
// auto start timeout
@@ -1513,9 +1585,47 @@ public class CloudSystemInfoService extends
SystemInfoService {
if (hasAutoStart) {
LOG.info("auto start cluster {}, start cost {} ms", clusterName,
stopWatch.getTime());
}
- return clusterName;
}
+ /**
+ * Temporarily set nereids timeout and restore it after execution.
+ * @param runnable the code to execute with the temporary timeout
+ * @throws DdlException if the runnable throws DdlException
+ */
+ private void withTemporaryNereidsTimeout(RunnableWithException runnable)
throws DdlException {
+ ConnectContext ctx = ConnectContext.get();
+ if (ctx == null) {
+ runnable.run();
+ return;
+ }
+
+ SessionVariable sessionVariable = ctx.getSessionVariable();
+ if (!sessionVariable.enableNereidsTimeout) {
+ runnable.run();
+ return;
+ }
+
+ StmtExecutor executor = ctx.getExecutor();
+ if (executor == null) {
+ runnable.run();
+ return;
+ }
+
+ SummaryProfile profile = ctx.getExecutor().getSummaryProfile();
+ if (profile == null) {
+ runnable.run();
+ return;
+ }
+ profile.setWarmup(true);
+ runnable.run();
+ }
+
+ @FunctionalInterface
+ private interface RunnableWithException {
+ void run() throws DdlException;
+ }
+
+
public void tryCreateInstance(String instanceId, String name, boolean
sseEnabled) throws DdlException {
Cloud.CreateInstanceRequest.Builder builder =
Cloud.CreateInstanceRequest.newBuilder();
builder.setInstanceId(instanceId);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
index bf172ded5b2..1d3ba49b74b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
@@ -145,10 +145,19 @@ public class SummaryProfile {
public static final String SPLITS_ASSIGNMENT_WEIGHT = "Splits Assignment
Weight";
public static final String ICEBERG_SCAN_METRICS = "Iceberg Scan Metrics";
public static final String PAIMON_SCAN_METRICS = "Paimon Scan Metrics";
+ private boolean isWarmUp = false;
+ public void setWarmup(boolean isWarmUp) {
+ this.isWarmUp = isWarmUp;
+ }
+
+ public boolean isWarmup() {
+ return isWarmUp;
+ }
// These info will display on FE's web ui table, every one will be
displayed as
// a column, so that should not
// add many columns here. Add to ExecutionSummary list.
+
public static final ImmutableList<String> SUMMARY_CAPTIONS =
ImmutableList.of(PROFILE_ID, TASK_TYPE,
START_TIME, END_TIME, TOTAL_TIME, TASK_STATE, USER,
DEFAULT_CATALOG, DEFAULT_DB, SQL_STATEMENT);
public static final ImmutableList<String> SUMMARY_KEYS = new
ImmutableList.Builder<String>()
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/errors/QueryPlanningErrors.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/errors/QueryPlanningErrors.java
index 9647df55bd9..0e1dd8848a8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/errors/QueryPlanningErrors.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/errors/QueryPlanningErrors.java
@@ -36,8 +36,12 @@ public class QueryPlanningErrors {
* @return exception with timeout message
*/
public static AnalysisException planTimeoutError(long elapsedS, long
timeoutS, SummaryProfile profile) {
- long parseTime = profile.getParseSqlTimeMs();
- String planTime = profile.getPlanTime();
+ long parseTime = -1;
+ String planTime = "N/A";
+ if (profile != null) {
+ parseTime = profile.getParseSqlTimeMs();
+ planTime = profile.getPlanTime();
+ }
return new AnalysisException(String.format("Nereids cost too much time
(%ss > %ss)."
+ " You should increment timeout by set '%s'"
+ " or disable check timeout by set '%s' to false."
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/rewrite/RewriteJob.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/rewrite/RewriteJob.java
index 4dad89278ce..f9a85474001 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/rewrite/RewriteJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/rewrite/RewriteJob.java
@@ -17,6 +17,8 @@
package org.apache.doris.nereids.jobs.rewrite;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.errors.QueryPlanningErrors;
import org.apache.doris.nereids.jobs.JobContext;
@@ -36,9 +38,15 @@ public interface RewriteJob {
CascadesContext context = jobContext.getCascadesContext();
SessionVariable sessionVariable =
context.getConnectContext().getSessionVariable();
long elapsedS =
context.getStatementContext().getStopwatch().elapsed(TimeUnit.MILLISECONDS) /
1000;
- if (sessionVariable.enableNereidsTimeout && elapsedS >
sessionVariable.nereidsTimeoutSecond) {
- throw QueryPlanningErrors.planTimeoutError(elapsedS,
sessionVariable.nereidsTimeoutSecond,
-
context.getConnectContext().getExecutor().getSummaryProfile());
+ if (sessionVariable.enableNereidsTimeout) {
+ SummaryProfile summaryProfile =
SummaryProfile.getSummaryProfile(context.getConnectContext());
+ long timeoutS = sessionVariable.nereidsTimeoutSecond;
+ if (summaryProfile != null && summaryProfile.isWarmup()) {
+ timeoutS = Config.auto_start_wait_to_resume_times;
+ }
+ if (elapsedS > timeoutS) {
+ throw QueryPlanningErrors.planTimeoutError(elapsedS, timeoutS,
summaryProfile);
+ }
}
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/scheduler/SimpleJobScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/scheduler/SimpleJobScheduler.java
index 67bb56b84e2..71c3b484d2d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/scheduler/SimpleJobScheduler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/scheduler/SimpleJobScheduler.java
@@ -17,6 +17,8 @@
package org.apache.doris.nereids.jobs.scheduler;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.errors.QueryPlanningErrors;
import org.apache.doris.nereids.jobs.Job;
@@ -35,9 +37,15 @@ public class SimpleJobScheduler implements JobScheduler {
SessionVariable sessionVariable =
context.getConnectContext().getSessionVariable();
while (!pool.isEmpty()) {
long elapsedS =
context.getStatementContext().getStopwatch().elapsed(TimeUnit.MILLISECONDS) /
1000;
- if (sessionVariable.enableNereidsTimeout && elapsedS >
sessionVariable.nereidsTimeoutSecond) {
- throw QueryPlanningErrors.planTimeoutError(elapsedS,
sessionVariable.nereidsTimeoutSecond,
-
context.getConnectContext().getExecutor().getSummaryProfile());
+ if (sessionVariable.enableNereidsTimeout) {
+ SummaryProfile summaryProfile =
SummaryProfile.getSummaryProfile(context.getConnectContext());
+ long timeoutS = sessionVariable.nereidsTimeoutSecond;
+ if (summaryProfile != null && summaryProfile.isWarmup()) {
+ timeoutS = Config.auto_start_wait_to_resume_times;
+ }
+ if (elapsedS > timeoutS) {
+ throw QueryPlanningErrors.planTimeoutError(elapsedS,
timeoutS, summaryProfile);
+ }
}
Job job = pool.pop();
job.execute();
diff --git
a/regression-test/suites/cloud_p0/multi_cluster/test_auto_start.groovy
b/regression-test/suites/cloud_p0/multi_cluster/test_auto_start.groovy
index 4ed68da1132..d0ce463e252 100644
--- a/regression-test/suites/cloud_p0/multi_cluster/test_auto_start.groovy
+++ b/regression-test/suites/cloud_p0/multi_cluster/test_auto_start.groovy
@@ -137,6 +137,7 @@ suite('test_auto_start_in_cloud', 'multi_cluster, docker') {
cluster.stopBackends(1,2,3)
+ // Test 1: Regular SELECT query with auto-start
// select
def future1 = thread {
def begin = System.currentTimeMillis();
@@ -170,6 +171,58 @@ suite('test_auto_start_in_cloud', 'multi_cluster, docker')
{
future1.get()
future2.get()
+ // Wait for cluster to be fully NORMAL before next test
+ awaitUntil(5) {
+ tag = getCloudBeTagByName(clusterName)
+ jsonObject = jsonSlurper.parseText(tag)
+ String cluster_status = jsonObject.compute_group_status
+ cluster_status == "NORMAL"
+ }
+
+ // Test 2: TVF query with auto-start
+ // Re-suspend cluster for TVF test
+ set_cluster_status(uniqueId, cloudClusterId, "SUSPENDED", ms)
+ awaitUntil(5) {
+ tag = getCloudBeTagByName(clusterName)
+ jsonObject = jsonSlurper.parseText(tag)
+ String cluster_status = jsonObject.compute_group_status
+ cluster_status == "SUSPENDED"
+ }
+ cluster.stopBackends(1,2,3)
+
+    // TVF query should also trigger auto-start and wait until the cluster has
resumed.
+ // Regression for PR #59963: auto-start path should work for
TVF/external-like queries
+ // that may need to fetch backends during planning.
+ def futureTfv = thread {
+ def begin = System.currentTimeMillis();
+ def connInfo = context.threadLocalConn.get()
+ def tvfRet = connect('admin', '',
connInfo.conn.getMetaData().getURL()) {
+ sql """select * from numbers("number" = "10")"""
+ }
+ def cost = System.currentTimeMillis() - begin;
+ log.info("tvf result size {} time cost: {}", tvfRet.size(), cost)
+ assertTrue(cost > 5000)
+ assertEquals(10, tvfRet.size())
+ }
+
+ // cloud control for TVF test
+ def future2Tvf = thread {
+ // check cluster "TO_RESUME"
+ awaitUntil(5) {
+ tag = getCloudBeTagByName(clusterName)
+ logger.info("tag = {}", tag)
+ jsonObject = jsonSlurper.parseText(tag)
+ String cluster_status = jsonObject.compute_group_status
+ cluster_status == "TO_RESUME"
+ }
+ sleep(5 * 1000)
+ cluster.startBackends(1,2,3)
+ set_cluster_status(uniqueId, cloudClusterId, "NORMAL", ms)
+ }
+
+ futureTfv.get()
+ future2Tvf.get()
+
tag = getCloudBeTagByName(clusterName)
logger.info("tag check = {}", tag)
jsonObject = jsonSlurper.parseText(tag)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]