This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new ad76d9deeff branch-4.0: [opt](scheduler) Improve Graceful Shutdown
Behavior for BE and FE, and Optimize Query Retry During BE Shutdown #56601
#58019 (#58526)
ad76d9deeff is described below
commit ad76d9deefff3e094e7255ca59e27d7e2a975b82
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Tue Dec 2 09:55:17 2025 +0800
branch-4.0: [opt](scheduler) Improve Graceful Shutdown Behavior for BE and
FE, and Optimize Query Retry During BE Shutdown #56601 #58019 (#58526)
bp #56601 #58019
---
be/src/common/config.cpp | 1 +
be/src/common/config.h | 6 ++
be/src/http/action/health_action.cpp | 27 +++++++-
be/src/runtime/exec_env.cpp | 7 ++
be/src/runtime/exec_env.h | 3 +
be/src/service/doris_main.cpp | 3 +
be/test/http/http_client_test.cpp | 4 +-
.../src/main/java/org/apache/doris/DorisFE.java | 52 +++++++++++++-
.../apache/doris/cloud/catalog/CloudReplica.java | 8 +--
.../java/org/apache/doris/common/FeConstants.java | 26 -------
.../java/org/apache/doris/httpv2/HttpServer.java | 24 ++++++-
.../doris/httpv2/entity/ResponseEntityBuilder.java | 5 ++
.../org/apache/doris/httpv2/rest/HealthAction.java | 6 ++
.../doris/httpv2/rest/RestApiStatusCode.java | 3 +-
.../apache/doris/job/extensions/mtmv/MTMVTask.java | 3 +-
.../org/apache/doris/load/StreamLoadHandler.java | 2 +-
.../commands/insert/AbstractInsertExecutor.java | 6 +-
.../plans/commands/insert/OlapInsertExecutor.java | 6 +-
.../java/org/apache/doris/qe/StmtExecutor.java | 21 ++++--
.../main/java/org/apache/doris/system/Backend.java | 52 ++++++++++----
.../org/apache/doris/system/SystemInfoService.java | 40 ++++++-----
.../doris/utframe/DemoMultiBackendsTest.java | 3 +-
regression-test/framework/pom.xml | 4 +-
.../query_retry/test_retry_no_scan_node.groovy | 80 ++++++++++++++++++++++
24 files changed, 304 insertions(+), 88 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 8f615cd47d7..e9fa7c5f6a9 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1312,6 +1312,7 @@ DEFINE_String(user_files_secure_path, "${DORIS_HOME}");
DEFINE_Int32(fe_expire_duration_seconds, "60");
DEFINE_Int32(grace_shutdown_wait_seconds, "120");
+DEFINE_Int32(grace_shutdown_post_delay_seconds, "30");
DEFINE_Int16(bitmap_serialize_version, "1");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 72602834d54..584f8b66abd 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1364,6 +1364,12 @@ DECLARE_Int32(fe_expire_duration_seconds);
// , but if the waiting time exceed the limit, then be will exit directly.
// During this period, FE will not send any queries to BE and waiting for all
running queries to stop.
DECLARE_Int32(grace_shutdown_wait_seconds);
+// When using the graceful stop feature, after the main process waits for
+// all currently running tasks to finish, it will continue to wait for
+// an additional period to ensure that queries still running on other nodes
have also completed.
+// Since a BE node cannot detect the task execution status on other BE nodes,
+// you may need to increase this threshold to allow for a longer waiting time.
+DECLARE_Int32(grace_shutdown_post_delay_seconds);
// BitmapValue serialize version.
DECLARE_Int16(bitmap_serialize_version);
diff --git a/be/src/http/action/health_action.cpp
b/be/src/http/action/health_action.cpp
index 91be5f78f30..8c1976dea2e 100644
--- a/be/src/http/action/health_action.cpp
+++ b/be/src/http/action/health_action.cpp
@@ -24,21 +24,42 @@
#include "http/http_headers.h"
#include "http/http_request.h"
#include "http/http_status.h"
+#include "runtime/exec_env.h"
namespace doris {
const static std::string HEADER_JSON = "application/json";
void HealthAction::handle(HttpRequest* req) {
+ std::string status;
+ std::string msg;
+ HttpStatus st;
+ // always return HttpStatus::OK
+ // because in k8s, we don't want the pod to be removed
+ // from service during shutdown
+ if (!doris::k_is_server_ready) {
+ status = "Server is not available";
+ msg = "Server is not ready";
+ st = HttpStatus::OK;
+ } else if (doris::k_doris_exit) {
+ status = "Server is not available";
+ msg = "Server is shutting down";
+ st = HttpStatus::OK;
+ } else {
+ status = "OK";
+ msg = "OK";
+ st = HttpStatus::OK;
+ }
+
std::stringstream ss;
ss << "{";
- ss << "\"status\": \"OK\",";
- ss << "\"msg\": \"To Be Added\"";
+ ss << "\"status\": \"" << status << "\",";
+ ss << "\"msg\": \"" << msg << "\"";
ss << "}";
std::string result = ss.str();
req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.c_str());
- HttpChannel::send_reply(req, HttpStatus::OK, result);
+ HttpChannel::send_reply(req, st, result);
}
} // end namespace doris
diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp
index 8c13157d03b..4bc3818b56b 100644
--- a/be/src/runtime/exec_env.cpp
+++ b/be/src/runtime/exec_env.cpp
@@ -177,6 +177,13 @@ void ExecEnv::wait_for_all_tasks_done() {
sleep(1);
++wait_seconds_passed;
}
+ // This is a conservative strategy.
+ // Because a query might still have fragments running on other BE nodes.
+ // In other words, the query hasn't truly terminated.
+ // If the current BE is shut down at this point,
+ // the FE will detect the downtime of a related BE and cancel the entire
query,
+ // defeating the purpose of a graceful stop.
+ sleep(config::grace_shutdown_post_delay_seconds);
}
bool ExecEnv::check_auth_token(const std::string& auth_token) {
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 6cd71f8e542..96324d00ff6 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -132,7 +132,10 @@ class IndexPolicyMgr;
struct SyncRowsetStats;
class DeleteBitmapAggCache;
+// set to true when BE is shutting down
inline bool k_doris_exit = false;
+// set to true once BE has finished starting up and is ready to serve
+inline bool k_is_server_ready = false;
// Execution environment for queries/plan fragments.
// Contains all required global structures, and handles to
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index 401456c6f3a..ead7426a564 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -602,12 +602,15 @@ int main(int argc, char** argv) {
exec_env->storage_engine().notify_listeners();
+ doris::k_is_server_ready = true;
+
while (!doris::k_doris_exit) {
#if defined(LEAK_SANITIZER)
__lsan_do_leak_check();
#endif
sleep(3);
}
+ doris::k_is_server_ready = false;
LOG(INFO) << "Doris main exiting.";
#if defined(LLVM_PROFILE)
__llvm_profile_write_file();
diff --git a/be/test/http/http_client_test.cpp
b/be/test/http/http_client_test.cpp
index 02699f0baef..d874df5bcd6 100644
--- a/be/test/http/http_client_test.cpp
+++ b/be/test/http/http_client_test.cpp
@@ -392,7 +392,7 @@ TEST_F(HttpClientTest, enable_http_auth) {
st = client.execute(&response);
EXPECT_TRUE(st.ok());
std::cout << "response = " << response << "\n";
- EXPECT_TRUE(response.find("To Be Added") != std::string::npos);
+ EXPECT_TRUE(response.find("Server is not ready") != std::string::npos);
}
{
@@ -423,7 +423,7 @@ TEST_F(HttpClientTest, enable_http_auth) {
st = client.execute(&response);
EXPECT_TRUE(st.ok());
std::cout << "response = " << response << "\n";
- EXPECT_TRUE(response.find("To Be Added") != std::string::npos);
+ EXPECT_TRUE(response.find("Server is not ready") != std::string::npos);
}
{
diff --git a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
index 787989be8c9..c7e361cbf53 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
@@ -36,6 +36,8 @@ import org.apache.doris.journal.bdbje.BDBDebugger;
import org.apache.doris.journal.bdbje.BDBTool;
import org.apache.doris.journal.bdbje.BDBToolOptions;
import org.apache.doris.persist.meta.MetaReader;
+import org.apache.doris.qe.Coordinator;
+import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.qe.QeService;
import org.apache.doris.qe.SimpleScheduler;
import org.apache.doris.service.ExecuteEnv;
@@ -63,7 +65,9 @@ import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.StandardOpenOption;
import java.time.LocalDate;
+import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
public class DorisFE {
private static final Logger LOG = LogManager.getLogger(DorisFE.class);
@@ -82,6 +86,12 @@ public class DorisFE {
private static FileChannel processLockFileChannel;
private static FileLock processFileLock;
+ // set to true when all servers are ready.
+ private static final AtomicBoolean serverReady = new AtomicBoolean(false);
+
+ // HTTP server instance, used for graceful shutdown
+ private static HttpServer httpServer;
+
public static void main(String[] args) {
// Every doris version should have a final meta version, it should not
change
// between small releases. Add a check here to avoid mistake.
@@ -144,7 +154,19 @@ public class DorisFE {
}
Log4jConfig.initLogging(dorisHomeDir + "/conf/");
- Runtime.getRuntime().addShutdownHook(new
Thread(LogManager::shutdown));
+ // Add shutdown hook for graceful exit
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ LOG.info("Received shutdown signal, starting graceful
shutdown...");
+ serverReady.set(false);
+ gracefulShutdown();
+
+ // Shutdown HTTP server after main process graceful shutdown
is complete
+ if (httpServer != null) {
+ httpServer.shutdown();
+ }
+
+ LogManager.shutdown();
+ }));
// set dns cache ttl
java.security.Security.setProperty("networkaddress.cache.ttl",
"60");
@@ -202,7 +224,7 @@ public class DorisFE {
feServer.start();
if (options.enableHttpServer) {
- HttpServer httpServer = new HttpServer();
+ httpServer = new HttpServer();
httpServer.setPort(Config.http_port);
httpServer.setHttpsPort(Config.https_port);
httpServer.setMaxHttpPostSize(Config.jetty_server_max_http_post_size);
@@ -231,11 +253,14 @@ public class DorisFE {
ThreadPoolManager.registerAllThreadPoolMetric();
startMonitor();
+
+ serverReady.set(true);
+ // JVM will exit when shutdown hook is completed
while (true) {
Thread.sleep(2000);
}
} catch (Throwable e) {
- // Some exception may thrown before LOG is inited.
+ // Some exceptions may be thrown before LOG is initialized.
// So need to print to stdout
e.printStackTrace();
LOG.error("", e);
@@ -584,4 +609,25 @@ public class DorisFE {
public boolean enableHttpServer = true;
public boolean enableQeService = true;
}
+
+ public static boolean isServerReady() {
+ return serverReady.get();
+ }
+
+ private static void gracefulShutdown() {
+ // wait for all queries to finish
+ try {
+ long now = System.currentTimeMillis();
+ List<Coordinator> allCoordinators =
QeProcessorImpl.INSTANCE.getAllCoordinators();
+ while (!allCoordinators.isEmpty() && System.currentTimeMillis() -
now < 300 * 1000L) {
+ Thread.sleep(1000);
+ allCoordinators =
QeProcessorImpl.INSTANCE.getAllCoordinators();
+ LOG.info("waiting {} queries to finish before shutdown",
allCoordinators.size());
+ }
+ } catch (Throwable t) {
+ LOG.error("", t);
+ }
+
+ LOG.info("graceful shutdown finished");
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
index 4a15a83b740..e2482ccad1f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
@@ -102,7 +102,7 @@ public class CloudReplica extends Replica {
public long getColocatedBeId(String clusterId) throws
ComputeGroupException {
CloudSystemInfoService infoService = ((CloudSystemInfoService)
Env.getCurrentSystemInfo());
List<Backend> bes =
infoService.getBackendsByClusterId(clusterId).stream()
- .filter(be ->
!be.isQueryDisabled()).collect(Collectors.toList());
+ .filter(be ->
be.isQueryAvailable()).collect(Collectors.toList());
String clusterName = infoService.getClusterNameByClusterId(clusterId);
if (bes.isEmpty()) {
LOG.warn("failed to get available be, cluster: {}-{}",
clusterName, clusterId);
@@ -420,11 +420,7 @@ public class CloudReplica extends Replica {
List<Backend> availableBes = new ArrayList<>();
List<Backend> decommissionAvailBes = new ArrayList<>();
for (Backend be : clusterBes) {
- long lastUpdateMs = be.getLastUpdateMs();
- long missTimeMs = Math.abs(lastUpdateMs -
System.currentTimeMillis());
- // be core or restart must in heartbeat_interval_second
- if ((be.isAlive() || missTimeMs <=
Config.heartbeat_interval_second * 1000L)
- && !be.isSmoothUpgradeSrc()) {
+ if (be.isQueryAvailable() && !be.isSmoothUpgradeSrc()) {
if (be.isDecommissioned()) {
decommissionAvailBes.add(be);
} else {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
index 5303d11b0a6..491a8e036af 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
@@ -36,16 +36,11 @@ public class FeConstants {
public static int checkpoint_interval_second = 60; // 1 minutes
- // dpp version
- public static String dpp_version = "3_2_0";
-
// bloom filter false positive probability
public static double default_bloom_filter_fpp = 0.05;
// set to true to skip some step when running FE unit test
public static boolean runningUnitTest = false;
- // use to set some mocked values for FE unit test
- public static Object unitTestConstant = null;
// set to false to disable internal schema db
public static boolean enableInternalSchemaDb = true;
@@ -68,29 +63,8 @@ public class FeConstants {
// use for copy into test
public static boolean disablePreHeat = false;
- public static final String FS_PREFIX_S3 = "s3";
- public static final String FS_PREFIX_S3A = "s3a";
- public static final String FS_PREFIX_S3N = "s3n";
- public static final String FS_PREFIX_OSS = "oss";
- public static final String FS_PREFIX_GCS = "gs";
- public static final String FS_PREFIX_BOS = "bos";
- public static final String FS_PREFIX_COS = "cos";
- public static final String FS_PREFIX_COSN = "cosn";
- public static final String FS_PREFIX_LAKEFS = "lakefs";
- public static final String FS_PREFIX_OBS = "obs";
- public static final String FS_PREFIX_OFS = "ofs";
- public static final String FS_PREFIX_GFS = "gfs";
- public static final String FS_PREFIX_JFS = "jfs";
- public static final String FS_PREFIX_HDFS = "hdfs";
- public static final String FS_PREFIX_VIEWFS = "viewfs";
- public static final String FS_PREFIX_FILE = "file";
-
public static final String INTERNAL_DB_NAME = "__internal_schema";
public static final String INTERNAL_FILE_CACHE_HOTSPOT_TABLE_NAME =
"cloud_cache_hotspot";
public static String METADATA_FAILURE_RECOVERY_KEY =
"metadata_failure_recovery";
-
- public static String CLOUD_RETRY_E230 = "E-230";
-
- public static String BUILT_IN_STORAGE_VAULT_NAME =
"built_in_storage_vault";
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/HttpServer.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/HttpServer.java
index 1902f471f1b..3c9a96140f5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/HttpServer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/HttpServer.java
@@ -23,11 +23,14 @@ import org.apache.doris.common.FeConstants;
import org.apache.doris.httpv2.config.SpringLog4j2Config;
import org.apache.doris.service.FrontendOptions;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import
org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.web.servlet.ServletComponentScan;
import
org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
+import org.springframework.context.ConfigurableApplicationContext;
import java.util.HashMap;
import java.util.Map;
@@ -36,6 +39,8 @@ import java.util.Map;
@EnableConfigurationProperties
@ServletComponentScan
public class HttpServer extends SpringBootServletInitializer {
+ private static final Logger LOG = LogManager.getLogger(HttpServer.class);
+ private ConfigurableApplicationContext applicationContext;
private int port;
private int httpsPort;
private int acceptors;
@@ -176,9 +181,26 @@ public class HttpServer extends
SpringBootServletInitializer {
} else {
properties.put("logging.config", Config.custom_config_dir + "/" +
SpringLog4j2Config.SPRING_LOG_XML_FILE);
}
- new SpringApplicationBuilder()
+ // Disable automatic shutdown hook registration
+ // This prevents Spring Boot from responding to SIGTERM automatically
+ // allowing the main process (DorisFE) to control when the HTTP server
shuts down
+ this.applicationContext = new SpringApplicationBuilder()
.sources(HttpServer.class)
.properties(properties)
+ // Disable the automatic shutdown hook registration, there is
a shutdown hook in DorisFE.
+ .registerShutdownHook(false)
.run();
}
+
+ /**
+ * Explicitly shutdown the HTTP server.
+ * This method should be called by the main process (DorisFE) after its
graceful shutdown is complete.
+ */
+ public void shutdown() {
+ if (applicationContext != null) {
+ LOG.info("Shutting down HTTP server gracefully...");
+ applicationContext.close();
+ LOG.info("HTTP server shutdown complete");
+ }
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/entity/ResponseEntityBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/entity/ResponseEntityBuilder.java
index 30564945505..7cc4b6774f7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/entity/ResponseEntityBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/entity/ResponseEntityBuilder.java
@@ -67,4 +67,9 @@ public class ResponseEntityBuilder {
ResponseBody body = new
ResponseBody().code(RestApiStatusCode.NOT_FOUND).msg("Not Found").data(data);
return ResponseEntity.status(HttpStatus.OK).body(body);
}
+
+ public static ResponseEntity serviceUnavailable(String msg) {
+ ResponseBody body = new
ResponseBody().code(RestApiStatusCode.SERVICE_UNAVAILABLE).msg(msg);
+ return
ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).body(body);
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/HealthAction.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/HealthAction.java
index 39cd23d16af..235eb4456f8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/HealthAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/HealthAction.java
@@ -17,8 +17,10 @@
package org.apache.doris.httpv2.rest;
+import org.apache.doris.DorisFE;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import jakarta.servlet.http.HttpServletRequest;
@@ -41,6 +43,10 @@ public class HealthAction extends RestBaseController {
executeCheckPassword(request, response);
}
+ if (!FeConstants.runningUnitTest && !DorisFE.isServerReady()) {
+ return ResponseEntityBuilder.serviceUnavailable("Server is not
ready");
+ }
+
Map<String, Object> result = new HashMap<>();
result.put(TOTAL_BACKEND_NUM,
Env.getCurrentSystemInfo().getAllBackendIds(false).size());
result.put(ONLINE_BACKEND_NUM,
Env.getCurrentSystemInfo().getAllBackendIds(true).size());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestApiStatusCode.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestApiStatusCode.java
index 6952f2abb7c..fc803b2d955 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestApiStatusCode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestApiStatusCode.java
@@ -23,7 +23,8 @@ public enum RestApiStatusCode {
UNAUTHORIZED(401),
BAD_REQUEST(403),
NOT_FOUND(404),
- INTERNAL_SERVER_ERROR(500);
+ INTERNAL_SERVER_ERROR(500),
+ SERVICE_UNAVAILABLE(503);
public int code;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
index 0710b1dea4b..40f65f0b3b7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
@@ -64,6 +64,7 @@ import org.apache.doris.qe.AuditLogHelper;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TRow;
import org.apache.doris.thrift.TStatusCode;
@@ -291,7 +292,7 @@ public class MTMVTask extends AbstractTask {
exec(execPartitionNames, tableWithPartKey);
break; // Exit loop if execution is successful
} catch (Exception e) {
- if (!(Config.isCloudMode() &&
e.getMessage().contains(FeConstants.CLOUD_RETRY_E230))) {
+ if (!(Config.isCloudMode() &&
SystemInfoService.needRetryWithReplan(e.getMessage()))) {
throw e; // Re-throw if it's not a retryable exception
}
lastException = e;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java
index 3bed51c1e47..6c0648c6022 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java
@@ -92,7 +92,7 @@ public class StreamLoadHandler {
public static Backend selectBackend(String clusterName) throws
LoadException {
List<Backend> backends = ((CloudSystemInfoService)
Env.getCurrentSystemInfo())
.getBackendsByClusterName(clusterName)
- .stream().filter(Backend::isAlive)
+ .stream().filter(Backend::isLoadAvailable)
.collect(Collectors.toList());
if (backends.isEmpty()) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
index f1afbd135ec..9f490c090e8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
@@ -23,7 +23,6 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
-import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
@@ -37,6 +36,7 @@ import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.LoadEtlTask;
import org.apache.doris.thrift.TQueryType;
import org.apache.doris.thrift.TStatusCode;
@@ -246,8 +246,8 @@ public abstract class AbstractInsertExecutor {
}
} catch (Throwable t) {
onFail(t);
- // retry insert into from select when meet E-230 in cloud
- if (Config.isCloudMode() &&
t.getMessage().contains(FeConstants.CLOUD_RETRY_E230)) {
+ // in cloud mode, rethrow so INSERT INTO ... SELECT is retried when it hits a
+ // "need re-plan" error or a no-scan-node error
+ if (Config.isCloudMode() &&
SystemInfoService.needRetryWithReplan(t.getMessage())) {
throw t;
}
return;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
index b078afd5abd..453e102aa72 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
@@ -26,7 +26,6 @@ import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
-import org.apache.doris.common.FeConstants;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugPointUtil;
@@ -51,6 +50,7 @@ import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TOlapTableLocationParam;
import org.apache.doris.thrift.TPartitionType;
import org.apache.doris.transaction.BeginTransactionException;
@@ -286,8 +286,8 @@ public class OlapInsertExecutor extends
AbstractInsertExecutor {
labelName, queryId, txnId, abortTxnException);
}
}
- // retry insert into from select when meet E-230 in cloud
- if (Config.isCloudMode() &&
t.getMessage().contains(FeConstants.CLOUD_RETRY_E230)) {
+ // in cloud mode, return so INSERT INTO ... SELECT can be retried when it hits a
+ // "need re-plan" error
+ if (Config.isCloudMode() &&
SystemInfoService.needRetryWithReplan(t.getMessage())) {
return;
}
String firstErrorMsgPart = "";
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 2bf55f150b6..0c2478a5ae0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -500,7 +500,10 @@ public class StmtExecutor {
execute(queryId);
return;
} catch (UserException e) {
- if (!e.getMessage().contains(FeConstants.CLOUD_RETRY_E230) ||
i == retryTime) {
+ if (!SystemInfoService.needRetryWithReplan(e.getMessage()) ||
i == retryTime) {
+ // We have retried internally(in handleQueryWithRetry())
for other kinds of exceptions.
+ // And for error in SystemInfoService.NEED_REPLAN_ERRORS,
they are not handled internally but here
+ // so we just handle these errors, and throw exception for
other errors.
throw e;
}
if (this.coord != null && this.coord.isQueryCancelled()) {
@@ -516,8 +519,8 @@ public class StmtExecutor {
if (DebugPointUtil.isEnable("StmtExecutor.retry.longtime")) {
randomMillis = 1000;
}
- LOG.warn("receive E-230 tried={} first queryId={} last
queryId={} new queryId={} sleep={}ms",
- i, DebugUtil.printId(firstQueryId),
DebugUtil.printId(lastQueryId),
+ LOG.warn("receive '{}' tried={} first queryId={} last
queryId={} new queryId={} sleep={}ms",
+ e.getMessage(), i, DebugUtil.printId(firstQueryId),
DebugUtil.printId(lastQueryId),
DebugUtil.printId(queryId), randomMillis);
Thread.sleep(randomMillis);
context.getState().reset();
@@ -685,7 +688,9 @@ public class StmtExecutor {
if (LOG.isDebugEnabled()) {
LOG.debug("Command({}) process failed.",
originStmt.originStmt, e);
}
- if (Config.isCloudMode() &&
e.getDetailMessage().contains(FeConstants.CLOUD_RETRY_E230)) {
+ if (Config.isCloudMode() &&
SystemInfoService.needRetryWithReplan(e.getDetailMessage())) {
+ // For errors in SystemInfoService.NEED_REPLAN_ERRORS,
+ // throw exception directly to trigger a replan retry
outside(in StmtExecutor.queryRetry())
throw e;
}
context.getState().setError(e.getMysqlErrorCode(),
e.getMessage());
@@ -907,14 +912,16 @@ public class StmtExecutor {
LOG.info("Query {} finished",
DebugUtil.printId(context.queryId));
break;
} catch (RpcException | UserException e) {
- if (Config.isCloudMode() &&
e.getMessage().contains(FeConstants.CLOUD_RETRY_E230)) {
+ if (Config.isCloudMode() &&
SystemInfoService.needRetryWithReplan(e.getMessage())) {
+ // For errors in SystemInfoService.NEED_REPLAN_ERRORS,
+ // throw exception directly to trigger a replan retry
outside(in StmtExecutor.queryRetry())
throw e;
}
// If the previous try is timeout or cancelled, then do not
need try again.
if (this.coord != null && (this.coord.isQueryCancelled() ||
this.coord.isTimeout())) {
throw e;
}
- LOG.warn("due to exception {} retry {} rpc {} user {}",
+ LOG.warn("retry due to exception {}. retried {} times. is rpc
error: {}, is user error: {}.",
e.getMessage(), i, e instanceof RpcException, e
instanceof UserException);
boolean isNeedRetry = false;
@@ -1406,6 +1413,8 @@ public class StmtExecutor {
DebugUtil.printId(context.queryId()), e.getMessage());
LOG.warn(internalErrorSt.getErrorMsg());
coordBase.cancel(internalErrorSt);
+ // set to null so that the retry logic will generate a new
coordinator
+ this.coord = null;
throw e;
} finally {
coordBase.close();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
index 0f3ac2e2226..3f54db60a86 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
@@ -26,6 +26,7 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.persist.gson.GsonUtils;
@@ -37,6 +38,7 @@ import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStorageMedium;
import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -49,6 +51,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.security.SecureRandom;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -153,10 +156,6 @@ public class Backend implements Writable {
// No need to persist, because only master FE handle heartbeat.
private int heartbeatFailureCounter = 0;
- // Not need serialize this field. If fe restart the state is reset to
false. Maybe fe will
- // send some queries to this BE, it is not an important problem.
- private AtomicBoolean isShutDown = new AtomicBoolean(false);
-
private long nextForceEditlogHeartbeatTime = System.currentTimeMillis() +
(new SecureRandom()).nextInt(60 * 1000);
public Backend() {
@@ -293,20 +292,38 @@ public class Backend implements Writable {
this.backendStatus.lastStreamLoadTime = lastStreamLoadTime;
}
+ // ATTN: This method only returns the value of "isQueryDisabled";
+ // it does not determine whether the backend is queryable. Use isQueryAvailable instead.
public boolean isQueryDisabled() {
return backendStatus.isQueryDisabled;
}
- public void setQueryDisabled(boolean isQueryDisabled) {
- this.backendStatus.isQueryDisabled = isQueryDisabled;
+ // return true if be status is changed
+ public boolean setQueryDisabled(boolean isQueryDisabled) {
+ if (this.backendStatus.isQueryDisabled != isQueryDisabled) {
+ this.backendStatus.isQueryDisabled = isQueryDisabled;
+ return true;
+ }
+ return false;
}
+ // ATTN: This method only returns the value of "isLoadDisabled";
+ // it does not determine whether the backend is loadable. Use isLoadAvailable instead.
public boolean isLoadDisabled() {
return backendStatus.isLoadDisabled;
}
- public void setLoadDisabled(boolean isLoadDisabled) {
- this.backendStatus.isLoadDisabled = isLoadDisabled;
+ // return true if be status is changed
+ public boolean setLoadDisabled(boolean isLoadDisabled) {
+ if (this.backendStatus.isLoadDisabled != isLoadDisabled) {
+ this.backendStatus.isLoadDisabled = isLoadDisabled;
+ return true;
+ }
+ return false;
+ }
+
+ private boolean isShutDown() {
+ return backendStatus.isShutdown;
}
public void setActive(boolean isActive) {
@@ -524,15 +541,21 @@ public class Backend implements Writable {
}
public boolean isQueryAvailable() {
- return isAlive() && !isQueryDisabled() && !isShutDown.get();
+ // A backend can serve queries only if it is alive, not query-disabled, and not shutting down.
+ // Debug point "Backend.isQueryAvailable" / "unavailableBeIds": a comma-separated list of
+ // backend ids to force-report as unavailable. Ids are trimmed and blank entries skipped so that
+ // values like "1, 2" or "1," do not throw NumberFormatException from this hot-path check.
+ String debugDeadBeIds = DebugPointUtil.getDebugParamOrDefault(
+ "Backend.isQueryAvailable", "unavailableBeIds", "");
+ if (!Strings.isNullOrEmpty(debugDeadBeIds)
+ && Arrays.stream(debugDeadBeIds.split(","))
+ .map(String::trim)
+ .filter(s -> !s.isEmpty())
+ .anyMatch(s -> Long.parseLong(s) == this.id)) {
+ return false;
+ }
+ return isAlive() && !isQueryDisabled() && !isShutDown();
}
public boolean isScheduleAvailable() {
- return isAlive() && !isDecommissioned();
+ // Schedulable: alive, not decommissioned, and not shutting down.
+ return isAlive() && !isDecommissioned() && !isShutDown();
}
public boolean isLoadAvailable() {
- return isAlive() && !isLoadDisabled();
+ // Loadable: alive, not load-disabled, and not shutting down.
+ return isAlive() && !isLoadDisabled() && !isShutDown();
}
public void setDisks(ImmutableMap<String, DiskInfo> disks) {
@@ -872,10 +895,10 @@ public class Backend implements Writable {
this.arrowFlightSqlPort = hbResponse.getArrowFlightSqlPort();
}
- if (this.isShutDown.get() != hbResponse.isShutDown()) {
+ if (this.backendStatus.isShutdown != hbResponse.isShutDown()) {
isChanged = true;
LOG.info("{} shutdown state is changed", this.toString());
- this.isShutDown.set(hbResponse.isShutDown());
+ this.backendStatus.isShutdown = hbResponse.isShutDown();
}
if (!this.getNodeRoleTag().value.equals(hbResponse.getNodeRole())
&& Tag.validNodeRoleTag(
@@ -990,6 +1013,8 @@ public class Backend implements Writable {
public volatile boolean isLoadDisabled = false;
@SerializedName("isActive")
public volatile boolean isActive = true;
+ @SerializedName("isShutdown")
+ public volatile boolean isShutdown = false;
// cloud mode, cloud control just query master, so not need
SerializedName
public volatile long currentFragmentNum = 0;
@@ -1101,3 +1126,4 @@ public class Backend implements Writable {
}
}
+
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index 1e41eb8d501..b9c63f813b1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -43,6 +43,7 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
@@ -79,6 +80,13 @@ public class SystemInfoService {
public static final String NOT_USING_VALID_CLUSTER_MSG =
"Not using valid cloud clusters, please use a cluster before
issuing any queries";
+ // Error-code marker that needRetryWithReplan() matches as a substring.
+ // NOTE(review): presumably an error code reported by BE; confirm its exact meaning.
+ public static final String ERROR_E230 = "E-230";
+
+ // Substrings that, when found in an error message, make needRetryWithReplan() return true.
+ public static final ImmutableSet<String> NEED_REPLAN_ERRORS =
ImmutableSet.of(
+ NO_SCAN_NODE_BACKEND_AVAILABLE_MSG,
+ ERROR_E230
+ );
+
protected volatile ImmutableMap<Long, Backend> idToBackendRef =
ImmutableMap.of();
protected volatile ImmutableMap<Long, AtomicLong> idToReportVersionRef =
ImmutableMap.of();
@@ -998,17 +1006,11 @@ public class SystemInfoService {
}
if (alterClause.isQueryDisabled() != null) {
- if
(!alterClause.isQueryDisabled().equals(be.isQueryDisabled())) {
- be.setQueryDisabled(alterClause.isQueryDisabled());
- shouldModify = true;
- }
+ // Accumulate with |= (not =): a later unchanged flag must not reset shouldModify
+ // back to false. |= is non-short-circuit, so the setter is always invoked.
+ shouldModify |= be.setQueryDisabled(alterClause.isQueryDisabled());
}
if (alterClause.isLoadDisabled() != null) {
- if (!alterClause.isLoadDisabled().equals(be.isLoadDisabled()))
{
- be.setLoadDisabled(alterClause.isLoadDisabled());
- shouldModify = true;
- }
+ shouldModify |= be.setLoadDisabled(alterClause.isLoadDisabled());
}
if (shouldModify) {
@@ -1052,17 +1054,11 @@ public class SystemInfoService {
}
if (op.isQueryDisabled() != null) {
- if (!op.isQueryDisabled().equals(be.isQueryDisabled())) {
- be.setQueryDisabled(op.isQueryDisabled());
- shouldModify = true;
- }
+ // Accumulate with |= so an unchanged load flag cannot hide a changed query flag.
+ shouldModify |= be.setQueryDisabled(op.isQueryDisabled());
}
if (op.isLoadDisabled() != null) {
- if (!op.isLoadDisabled().equals(be.isLoadDisabled())) {
- be.setLoadDisabled(op.isLoadDisabled());
- shouldModify = true;
- }
+ shouldModify |= be.setLoadDisabled(op.isLoadDisabled());
}
if (shouldModify) {
@@ -1156,4 +1152,16 @@ public class SystemInfoService {
return Env.getCurrentInvertedIndex().getTabletNumByBackendId(beId);
}
+ // Returns true if errorMsg contains any entry of NEED_REPLAN_ERRORS as a substring,
+ // meaning the query should be retried with a re-plan.
+ // A null or empty message never triggers a re-plan.
+ public static boolean needRetryWithReplan(String errorMsg) {
+ if (Strings.isNullOrEmpty(errorMsg)) {
+ return false;
+ }
+ for (String keyword : NEED_REPLAN_ERRORS) {
+ if (errorMsg.contains(keyword)) {
+ return true;
+ }
+ }
+ return false;
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java
b/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java
index 62c7d140bf1..800af43aaa1 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java
@@ -212,7 +212,8 @@ public class DemoMultiBackendsTest {
result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size()
- 10));
Assert.assertEquals(
"{\"lastSuccessReportTabletsTime\":\"N/A\",\"lastStreamLoadTime\":-1,\"isQueryDisabled\":false,"
- +
"\"isLoadDisabled\":false,\"isActive\":true,\"currentFragmentNum\":0,\"lastFragmentUpdateTime\":0}",
+ +
"\"isLoadDisabled\":false,\"isActive\":true,\"isShutdown\":false,\"currentFragmentNum\":0,"
+ + "\"lastFragmentUpdateTime\":0}",
result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size()
- 7));
Assert.assertEquals("0",
result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size() - 6));
Assert.assertEquals(Tag.VALUE_MIX,
result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size() - 5));
diff --git a/regression-test/framework/pom.xml
b/regression-test/framework/pom.xml
index 93fabc5f739..c946bb9bcf9 100644
--- a/regression-test/framework/pom.xml
+++ b/regression-test/framework/pom.xml
@@ -321,11 +321,11 @@ under the License.
<artifactId>aliyun-java-sdk-ram</artifactId>
<version>3.3.1</version>
</dependency>
- <dependency>
+ <!--dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.2.4</version>
- </dependency>
+ </dependency-->
<!---->
<!-- txcloud ram -->
<dependency>
diff --git
a/regression-test/suites/cloud_p0/query_retry/test_retry_no_scan_node.groovy
b/regression-test/suites/cloud_p0/query_retry/test_retry_no_scan_node.groovy
new file mode 100644
index 00000000000..ce5417924e5
--- /dev/null
+++ b/regression-test/suites/cloud_p0/query_retry/test_retry_no_scan_node.groovy
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.NodeType
+import org.apache.doris.regression.suite.SuiteCluster
+
+suite("test_retry_no_scan_node", "p0, docker") {
+    if (!isCloudMode()) {
+        return
+    }
+    def options = new ClusterOptions()
+    options.enableDebugPoints()
+    options.setFeNum(1)
+    options.feConfigs.add('max_query_retry_time=100')
+    options.feConfigs.add('sys_log_verbose_modules=org')
+    options.setBeNum(2)
+    options.cloudMode = true
+    // 1. connect to master
+    options.connectToFollower = false
+
+    // Failures thrown inside a Thread.start closure are NOT propagated by join(),
+    // so the background task records its first failure here and the main thread
+    // asserts on it after joining.
+    def queryFailure = new java.util.concurrent.atomic.AtomicReference<Throwable>()
+    def queryTask = {
+        for (int i = 0; i < 100; i++) {
+            try {
+                log.info("query count: {}", i)
+                sql """select * from test_no_scan_node_table"""
+                Thread.sleep(100)
+            } catch (Exception e) {
+                logger.warn("select failed: ${e.message}")
+                queryFailure.compareAndSet(null, e)
+                return
+            }
+        }
+    }
+
+    docker(options) {
+        def be1 = cluster.getBeByIndex(1)
+        def beId = be1.backendId;
+
+        try {
+            sql """
+                CREATE TABLE test_no_scan_node_table
+                ( k1 TINYINT, k2 INT not null )
+                DISTRIBUTED BY HASH(k2) BUCKETS 2 PROPERTIES ( "replication_num" = "1" );
+            """
+            sql """
+                INSERT INTO test_no_scan_node_table VALUES (1, 1), (2, 2), (3, 3);
+            """
+
+            def result = sql """select * from test_no_scan_node_table order by k2;"""
+            log.info("insert result : {}", result)
+            assertEquals([[1, 1], [2, 2], [3, 3]], result)
+
+            // this should be run at least 10 seconds (100 queries x 100ms sleep)
+            def queryThread = Thread.start(queryTask)
+
+            // inject query not available error
+            cluster.injectDebugPoints(NodeType.FE, ['Backend.isQueryAvailable' : [unavailableBeIds:beId]])
+            // query should have no failure: wait for the query thread to finish, then
+            // check both that it actually finished and that no query failed.
+            queryThread.join(60000)
+            assert !queryThread.isAlive() : "query thread did not finish within 60s"
+            assert queryFailure.get() == null : "query failed while BE ${beId} was unavailable: ${queryFailure.get()?.message}"
+        } finally {
+            cluster.clearFrontendDebugPoints()
+            cluster.clearBackendDebugPoints()
+        }
+    }
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]