This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1d1a9e2bfc [improvement](graceful shutdown) waiting for all query
finished when graceful shutdown (#23865)
1d1a9e2bfc is described below
commit 1d1a9e2bfcc64c162b788e02c12592dc80362f1c
Author: yiguolei <[email protected]>
AuthorDate: Tue Sep 5 09:52:28 2023 +0800
[improvement](graceful shutdown) waiting for all query finished when
graceful shutdown (#23865)
In some cloud-native deployment scenarios, BEs (especially Compute Node
BEs) are added to and removed from the cluster very frequently. A user's
query fails if one of its fragments is running on a BE that is shutting down.
Users can run stop_be.sh --grace; the BE then waits for all running queries to
finish so that they do not fail, but if the wait exceeds the configured
limit, the BE exits directly. During this period, FE will not send any
queries to BE and waiting for all runn [...]
---
be/src/agent/heartbeat_server.cpp | 2 ++
be/src/common/config.cpp | 2 ++
be/src/common/config.h | 5 +++++
be/src/runtime/exec_env.cpp | 20 ++++++++++++++++++++
be/src/runtime/exec_env.h | 2 ++
be/src/runtime/fragment_mgr.h | 5 +++++
be/src/service/doris_main.cpp | 3 +++
docs/en/docs/admin-manual/config/be-config.md | 7 ++++++-
docs/zh-CN/docs/admin-manual/config/be-config.md | 7 ++++++-
.../main/java/org/apache/doris/system/Backend.java | 12 +++++++++++-
.../org/apache/doris/system/BackendHbResponse.java | 9 ++++++++-
.../java/org/apache/doris/system/HeartbeatMgr.java | 6 +++++-
.../apache/doris/system/SystemInfoServiceTest.java | 2 +-
gensrc/thrift/HeartbeatService.thrift | 1 +
14 files changed, 77 insertions(+), 6 deletions(-)
diff --git a/be/src/agent/heartbeat_server.cpp
b/be/src/agent/heartbeat_server.cpp
index 981756efa6..326758ffeb 100644
--- a/be/src/agent/heartbeat_server.cpp
+++ b/be/src/agent/heartbeat_server.cpp
@@ -79,6 +79,8 @@ void HeartbeatServer::heartbeat(THeartbeatResult&
heartbeat_result,
heartbeat_result.backend_info.__set_version(get_short_version());
heartbeat_result.backend_info.__set_be_start_time(_be_epoch);
heartbeat_result.backend_info.__set_be_node_role(config::be_node_role);
+ // If BE is being gracefully stopped, k_doris_exit is set to true
+ heartbeat_result.backend_info.__set_is_shutdown(doris::k_doris_exit);
}
watch.stop();
if (watch.elapsed_time() > 1000L * 1000L * 1000L) {
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 741f4a0856..41f608056d 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1086,6 +1086,8 @@ DEFINE_Int32(partition_topn_partition_threshold, "1024");
DEFINE_Int32(fe_expire_duration_seconds, "60");
+DEFINE_Int32(grace_shutdown_wait_seconds, "120");
+
#ifdef BE_TEST
// test s3
DEFINE_String(test_s3_resource, "resource");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 2007bf3e4f..7bff893c78 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1149,6 +1149,11 @@ DECLARE_Int32(partition_topn_partition_threshold);
// as an abnormal fe, this will cause be to cancel this fe's related query.
DECLARE_Int32(fe_expire_duration_seconds);
+// If stop_be.sh --grace is used, BE waits for all running queries to finish to avoid query failures,
+// but if the wait exceeds this limit (in seconds), BE exits directly.
+// During this period, FE does not send any new queries to this BE.
+DECLARE_Int32(grace_shutdown_wait_seconds);
+
#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp
index eb0337d331..c30fb20db8 100644
--- a/be/src/runtime/exec_env.cpp
+++ b/be/src/runtime/exec_env.cpp
@@ -23,6 +23,7 @@
#include <utility>
#include "common/config.h"
+#include "runtime/fragment_mgr.h"
#include "runtime/frontend_info.h"
#include "time.h"
#include "util/debug_util.h"
@@ -135,4 +136,23 @@ std::map<TNetworkAddress, FrontendInfo>
ExecEnv::get_running_frontends() {
return res;
}
+void ExecEnv::wait_for_all_tasks_done() {
+    // Graceful shutdown: poll once per second until no query is running, giving up after grace_shutdown_wait_seconds.
+    int32_t wait_seconds_passed = 0;
+    while (true) {
+        int num_queries = _fragment_mgr->running_query_num();
+        if (num_queries < 1) {
+            break;
+        }
+        if (wait_seconds_passed >= doris::config::grace_shutdown_wait_seconds) { // '>=': never wait past the limit; 0 means don't wait
+            LOG(INFO) << "There are still " << num_queries << " queries running, but "
+                      << wait_seconds_passed << " seconds passed, has to exit now";
+            break;
+        }
+        LOG(INFO) << "There are still " << num_queries << " queries running, waiting...";
+        sleep(1);
+        ++wait_seconds_passed;
+    }
+}
+
} // namespace doris
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 30c6839166..212456b69b 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -199,6 +199,8 @@ public:
this->_stream_load_executor = stream_load_executor;
}
+ void wait_for_all_tasks_done();
+
void update_frontends(const std::vector<TFrontendInfo>& new_infos);
std::map<TNetworkAddress, FrontendInfo> get_frontends();
std::map<TNetworkAddress, FrontendInfo> get_running_frontends();
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 5691e08e31..e31a5925da 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -144,6 +144,11 @@ public:
ThreadPool* get_thread_pool() { return _thread_pool.get(); }
+    int32_t running_query_num() { // number of entries currently in _query_ctx_map
+        std::lock_guard<std::mutex> ctx_lock(_lock); // _query_ctx_map is read under _lock
+        return static_cast<int32_t>(_query_ctx_map.size());
+    }
+
private:
void cancel_unlocked_impl(const TUniqueId& id, const
PPlanFragmentCancelReason& reason,
const std::unique_lock<std::mutex>& state_lock,
bool is_pipeline,
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index 8d24f39936..36f9cb02dd 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -612,6 +612,9 @@ int main(int argc, char** argv) {
sleep(3);
}
+ // For graceful shutdown, need to wait for all running queries to stop
+ exec_env->wait_for_all_tasks_done();
+
return 0;
}
diff --git a/docs/en/docs/admin-manual/config/be-config.md
b/docs/en/docs/admin-manual/config/be-config.md
index a22186cbf0..08c1206746 100644
--- a/docs/en/docs/admin-manual/config/be-config.md
+++ b/docs/en/docs/admin-manual/config/be-config.md
@@ -1488,4 +1488,9 @@ Indicates how many tablets failed to load in the data
directory. At the same tim
#### `brpc_streaming_client_batch_bytes`
* Description: The batch size for sending data by brpc streaming client
-* Default value: 262144
\ No newline at end of file
+* Default value: 262144
+
+#### `grace_shutdown_wait_seconds`
+
+* Description: In cloud-native deployment scenarios, BEs are added to and removed from the cluster very frequently. A user's query fails if one of its fragments is running on a BE that is shutting down. Users can stop a BE with stop_be.sh --grace; the BE then waits for all running queries to finish so that they do not fail, but if the wait exceeds this limit (in seconds), the BE exits directly. During this period, FE does not send any new queries to this BE.
+* Default value: 120
\ No newline at end of file
diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md
b/docs/zh-CN/docs/admin-manual/config/be-config.md
index 14eefd09d6..7501a80019 100644
--- a/docs/zh-CN/docs/admin-manual/config/be-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/be-config.md
@@ -1517,4 +1517,9 @@ load tablets from header failed, failed tablets size:
xxx, path=xxx
#### `brpc_streaming_client_batch_bytes`
* 描述: brpc streaming 客户端发送数据时的攒批大小(字节)
-* 默认值: 262144
\ No newline at end of file
+* 默认值: 262144
+
+#### `grace_shutdown_wait_seconds`
+
+* 描述: 在云原生的部署模式下,为了节省资源,一个 BE 可能会被频繁地加入集群或者从集群中移除。如果这个 BE 上有正在运行的 Query,那么这个 Query 会失败。用户可以使用 stop_be.sh --grace 的方式来关闭一个 BE 节点,此时 BE 会等待当前正在这个 BE 上运行的所有查询都结束才会退出。同时,在这个时间范围内 FE 也不会分发新的 Query 到这个机器上。如果等待时间超过 grace_shutdown_wait_seconds 这个阈值(单位: 秒),那么 BE 也会直接退出,防止一些查询长期不结束导致节点无法快速下线。
+* 默认值: 120
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
index 99cd99dca0..a4bf77d3b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
@@ -134,6 +134,10 @@ public class Backend implements Writable {
// No need to persist, because only master FE handle heartbeat.
private int heartbeatFailureCounter = 0;
+ // No need to serialize this field: if FE restarts, the state is reset to false, so FE
+ // may send some queries to this BE until the next heartbeat; this is not an important problem.
+ private AtomicBoolean isShutDown = new AtomicBoolean(false);
+
public Backend() {
this.host = "";
this.version = "";
@@ -330,7 +334,7 @@ public class Backend implements Writable {
}
public boolean isQueryAvailable() {
- return isAlive() && !isQueryDisabled();
+ return isAlive() && !isQueryDisabled() && !isShutDown.get();
}
public boolean isScheduleAvailable() {
@@ -659,6 +663,12 @@ public class Backend implements Writable {
this.brpcPort = hbResponse.getBrpcPort();
}
+ if (this.isShutDown.get() != hbResponse.isShutDown()) {
+ isChanged = true;
+ LOG.info("{} shutdown state is changed", this.toString());
+ this.isShutDown.set(hbResponse.isShutDown());
+ }
+
if (!this.getNodeRoleTag().value.equals(hbResponse.getNodeRole())
&& Tag.validNodeRoleTag(
hbResponse.getNodeRole())) {
isChanged = true;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java
b/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java
index 0e09707a4a..a0baf60e9f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java
@@ -42,13 +42,15 @@ public class BackendHbResponse extends HeartbeatResponse
implements Writable {
private long beStartTime;
private String host;
private String version = "";
+ @SerializedName(value = "isShutDown")
+ private boolean isShutDown = false;
public BackendHbResponse() {
super(HeartbeatResponse.Type.BACKEND);
}
public BackendHbResponse(long beId, int bePort, int httpPort, int
brpcPort, long hbTime, long beStartTime,
- String version, String nodeRole) {
+ String version, String nodeRole, boolean isShutDown) {
super(HeartbeatResponse.Type.BACKEND);
this.beId = beId;
this.status = HbStatus.OK;
@@ -59,6 +61,7 @@ public class BackendHbResponse extends HeartbeatResponse
implements Writable {
this.beStartTime = beStartTime;
this.version = version;
this.nodeRole = nodeRole;
+ this.isShutDown = isShutDown;
}
public BackendHbResponse(long beId, String errMsg) {
@@ -104,6 +107,10 @@ public class BackendHbResponse extends HeartbeatResponse
implements Writable {
return nodeRole;
}
+ public boolean isShutDown() {
+ return isShutDown;
+ }
+
@Override
protected void readFields(DataInput in) throws IOException {
super.readFields(in);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
index cc2730328a..7dc1275afe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
@@ -262,8 +262,12 @@ public class HeartbeatMgr extends MasterDaemon {
if (tBackendInfo.isSetBeNodeRole()) {
nodeRole = tBackendInfo.getBeNodeRole();
}
+ boolean isShutDown = false;
+ if (tBackendInfo.isSetIsShutdown()) {
+ isShutDown = tBackendInfo.isIsShutdown();
+ }
return new BackendHbResponse(backendId, bePort, httpPort,
brpcPort,
- System.currentTimeMillis(), beStartTime, version,
nodeRole);
+ System.currentTimeMillis(), beStartTime, version,
nodeRole, isShutDown);
} else {
return new BackendHbResponse(backendId, backend.getHost(),
result.getStatus().getErrorMsgs().isEmpty()
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
index d207e0ce2a..3c50bd47c8 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
@@ -97,7 +97,7 @@ public class SystemInfoServiceTest {
System.out.println(Env.getCurrentEnvJournalVersion());
BackendHbResponse writeResponse = new BackendHbResponse(1L, 1234,
1234, 1234, 1234, 1234, "test",
- Tag.VALUE_COMPUTATION);
+ Tag.VALUE_COMPUTATION, false);
// Write objects to file
File file1 = new File("./BackendHbResponseSerialization");
diff --git a/gensrc/thrift/HeartbeatService.thrift
b/gensrc/thrift/HeartbeatService.thrift
index b9b18a2b85..0d6badac3b 100644
--- a/gensrc/thrift/HeartbeatService.thrift
+++ b/gensrc/thrift/HeartbeatService.thrift
@@ -49,6 +49,7 @@ struct TBackendInfo {
5: optional string version
6: optional i64 be_start_time
7: optional string be_node_role
+ 8: optional bool is_shutdown
}
struct THeartbeatResult {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]