This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f00750f7fe8 [feature](merge-cloud) Support auto suspend and auto start
cluster in cloud (#32764)
f00750f7fe8 is described below
commit f00750f7fe89b374c4d299ee69950fa7ac1a0ea1
Author: deardeng <[email protected]>
AuthorDate: Mon Mar 25 23:16:37 2024 +0800
[feature](merge-cloud) Support auto suspend and auto start cluster in cloud
(#32764)
---
be/src/agent/heartbeat_server.cpp | 5 +
be/src/agent/task_worker_pool.cpp | 23 ++++-
be/src/runtime/fragment_mgr.cpp | 41 ++++++++
be/src/runtime/fragment_mgr.h | 5 +
.../org/apache/doris/cloud/catalog/CloudEnv.java | 1 -
.../doris/cloud/system/CloudSystemInfoService.java | 109 ++++++++++++++++++++-
.../java/org/apache/doris/qe/ConnectContext.java | 79 +++++++++++++++
7 files changed, 258 insertions(+), 5 deletions(-)
diff --git a/be/src/agent/heartbeat_server.cpp
b/be/src/agent/heartbeat_server.cpp
index dfeb05a932d..e6d893a4a2d 100644
--- a/be/src/agent/heartbeat_server.cpp
+++ b/be/src/agent/heartbeat_server.cpp
@@ -30,6 +30,7 @@
#include "common/status.h"
#include "olap/storage_engine.h"
#include "runtime/exec_env.h"
+#include "runtime/fragment_mgr.h"
#include "runtime/heartbeat_flags.h"
#include "service/backend_options.h"
#include "util/debug_util.h"
@@ -83,6 +84,10 @@ void HeartbeatServer::heartbeat(THeartbeatResult&
heartbeat_result,
heartbeat_result.backend_info.__set_be_node_role(config::be_node_role);
// If be is gracefully stop, then k_doris_exist is set to true
heartbeat_result.backend_info.__set_is_shutdown(doris::k_doris_exit);
+ heartbeat_result.backend_info.__set_fragment_executing_count(
+ get_fragment_executing_count());
+ heartbeat_result.backend_info.__set_fragment_last_active_time(
+ get_fragment_last_active_time());
}
watch.stop();
if (watch.elapsed_time() > 1000L * 1000L * 1000L) {
diff --git a/be/src/agent/task_worker_pool.cpp
b/be/src/agent/task_worker_pool.cpp
index be4d3b137e6..ca6a9817737 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -76,6 +76,7 @@
#include "olap/txn_manager.h"
#include "olap/utils.h"
#include "runtime/exec_env.h"
+#include "runtime/fragment_mgr.h"
#include "runtime/snapshot_loader.h"
#include "service/backend_options.h"
#include "util/doris_metrics.h"
@@ -446,7 +447,6 @@ void add_task_count(const TAgentTaskRequest& task, int n) {
ADD_TASK_COUNT(PUBLISH_VERSION)
ADD_TASK_COUNT(CLEAR_TRANSACTION_TASK)
ADD_TASK_COUNT(UPDATE_TABLET_META_INFO)
- ADD_TASK_COUNT(ALTER)
ADD_TASK_COUNT(CLONE)
ADD_TASK_COUNT(STORAGE_MEDIUM_MIGRATE)
ADD_TASK_COUNT(GC_BINLOG)
@@ -459,6 +459,17 @@ void add_task_count(const TAgentTaskRequest& task, int n) {
DELETE_count << n;
}
return;
+ case TTaskType::ALTER:
+ {
+ ALTER_count << n;
+ // cloud auto stop needs sc jobs; a tablet's sc can also be considered
a fragment
+ doris::g_fragment_executing_count << 1;
+ int64 now = duration_cast<std::chrono::milliseconds>(
+
std::chrono::system_clock::now().time_since_epoch())
+ .count();
+ g_fragment_last_active_time.set_value(now);
+ return;
+ }
default:
return;
}
@@ -1851,6 +1862,11 @@ void alter_tablet_callback(StorageEngine& engine, const
TAgentTaskRequest& req)
alter_tablet(engine, req, signature, task_type, &finish_task_request);
finish_task(finish_task_request);
}
+ doris::g_fragment_executing_count << -1;
+ int64 now = duration_cast<std::chrono::milliseconds>(
+ std::chrono::system_clock::now().time_since_epoch())
+ .count();
+ g_fragment_last_active_time.set_value(now);
remove_task_info(req.task_type, req.signature);
}
@@ -1872,6 +1888,11 @@ void alter_cloud_tablet_callback(CloudStorageEngine&
engine, const TAgentTaskReq
alter_cloud_tablet(engine, req, signature, task_type,
&finish_task_request);
finish_task(finish_task_request);
}
+ doris::g_fragment_executing_count << -1;
+ int64 now = duration_cast<std::chrono::milliseconds>(
+ std::chrono::system_clock::now().time_since_epoch())
+ .count();
+ g_fragment_last_active_time.set_value(now);
remove_task_info(req.task_type, req.signature);
}
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index d858737d780..190c0f5b0e1 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -99,6 +99,19 @@
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_thread_pool_queue_size, MetricUnit::
bvar::LatencyRecorder g_fragmentmgr_prepare_latency("doris_FragmentMgr",
"prepare");
bvar::Adder<int64_t>
g_pipeline_fragment_instances_count("doris_pipeline_fragment_instances_count");
+bvar::Adder<uint64_t> g_fragment_executing_count("fragment_executing_count");
+bvar::Status<uint64_t> g_fragment_last_active_time(
+ "fragment_last_active_time", duration_cast<std::chrono::milliseconds>(
+
std::chrono::system_clock::now().time_since_epoch())
+ .count());
+
+uint64_t get_fragment_executing_count() {
+ return g_fragment_executing_count.get_value();
+}
+uint64_t get_fragment_last_active_time() {
+ return g_fragment_last_active_time.get_value();
+}
+
std::string to_load_error_http_path(const std::string& file_name) {
if (file_name.empty()) {
return "";
@@ -470,9 +483,15 @@ void
FragmentMgr::_exec_actual(std::shared_ptr<PlanFragmentExecutor> fragment_ex
// remove exec state after this fragment finished
{
+ int64 now = duration_cast<std::chrono::milliseconds>(
+
std::chrono::system_clock::now().time_since_epoch())
+ .count();
std::lock_guard<std::mutex> lock(_lock);
_fragment_instance_map.erase(fragment_executor->fragment_instance_id());
+ g_fragment_executing_count << -1;
+ g_fragment_last_active_time.set_value(now);
+
LOG_INFO("Instance {} finished",
print_id(fragment_executor->fragment_instance_id()));
if (all_done && query_ctx) {
@@ -584,6 +603,11 @@ void FragmentMgr::remove_pipeline_context(
std::vector<TUniqueId> ins_ids;
f_context->instance_ids(ins_ids);
bool all_done = q_context->countdown(ins_ids.size());
+ int64 now = duration_cast<std::chrono::milliseconds>(
+
std::chrono::system_clock::now().time_since_epoch())
+ .count();
+ g_fragment_executing_count << -1;
+ g_fragment_last_active_time.set_value(now);
for (const auto& ins_id : ins_ids) {
LOG_INFO("Removing query {} instance {}, all done? {}",
print_id(query_id),
print_id(ins_id), all_done);
@@ -733,7 +757,12 @@ Status FragmentMgr::exec_plan_fragment(const
TExecPlanFragmentParams& params,
static_cast<void>(_runtimefilter_controller.add_entity(
params.params, params.params.query_id, params.query_options,
&handler,
RuntimeFilterParamsContext::create(fragment_executor->runtime_state())));
+ int64 now = duration_cast<std::chrono::milliseconds>(
+ std::chrono::system_clock::now().time_since_epoch())
+ .count();
{
+ g_fragment_executing_count << 1;
+ g_fragment_last_active_time.set_value(now);
std::lock_guard<std::mutex> lock(_lock);
if (handler) {
query_ctx->set_merge_controller_handler(handler);
@@ -753,6 +782,8 @@ Status FragmentMgr::exec_plan_fragment(const
TExecPlanFragmentParams& params,
// Remove the exec state added
std::lock_guard<std::mutex> lock(_lock);
_fragment_instance_map.erase(params.params.fragment_instance_id);
+ g_fragment_executing_count << -1;
+ g_fragment_last_active_time.set_value(now);
}
fragment_executor->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
"push plan fragment to thread pool failed");
@@ -844,7 +875,12 @@ Status FragmentMgr::exec_plan_fragment(const
TPipelineFragmentParams& params,
query_ctx->set_ready_to_execute_only();
}
}
+ int64 now = duration_cast<std::chrono::milliseconds>(
+
std::chrono::system_clock::now().time_since_epoch())
+ .count();
{
+ g_fragment_executing_count << 1;
+ g_fragment_last_active_time.set_value(now);
std::lock_guard<std::mutex> lock(_lock);
std::vector<TUniqueId> ins_ids;
reinterpret_cast<pipeline::PipelineXFragmentContext*>(context.get())
@@ -905,7 +941,12 @@ Status FragmentMgr::exec_plan_fragment(const
TPipelineFragmentParams& params,
if (i == 0 && handler) {
query_ctx->set_merge_controller_handler(handler);
}
+ int64 now = duration_cast<std::chrono::milliseconds>(
+
std::chrono::system_clock::now().time_since_epoch())
+ .count();
{
+ g_fragment_executing_count << 1;
+ g_fragment_last_active_time.set_value(now);
std::lock_guard<std::mutex> lock(_lock);
_pipeline_map.insert(std::make_pair(fragment_instance_id,
context));
}
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 3435d1f4f64..5aef45954d3 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -44,6 +44,8 @@ class IOBufAsZeroCopyInputStream;
}
namespace doris {
+extern bvar::Adder<uint64_t> g_fragment_executing_count;
+extern bvar::Status<uint64_t> g_fragment_last_active_time;
namespace pipeline {
class PipelineFragmentContext;
@@ -202,4 +204,7 @@ private:
nullptr; // used for pipeline context report
};
+uint64_t get_fragment_executing_count();
+uint64_t get_fragment_last_active_time();
+
} // namespace doris
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
index 28e58f3cf4b..613fef3be68 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
@@ -392,7 +392,6 @@ public class CloudEnv extends Env {
public void changeCloudCluster(String clusterName, ConnectContext ctx)
throws DdlException {
checkCloudClusterPriv(clusterName);
- // TODO(merge-cloud): pick cloud auto start
CloudSystemInfoService.waitForAutoStart(clusterName);
try {
((CloudSystemInfoService)
Env.getCurrentSystemInfo()).addCloudCluster(clusterName, "");
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
index 8d852c11097..5eb74590d56 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
@@ -38,9 +38,11 @@ import org.apache.doris.system.Frontend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TStorageMedium;
+import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
+import org.apache.commons.lang3.time.StopWatch;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -578,9 +580,110 @@ public class CloudSystemInfoService extends
SystemInfoService {
}
}
- public static void waitForAutoStart(final String clusterName) throws
DdlException {
- // TODO(merge-cloud): merge from cloud.
- // throw new DdlException("Env.waitForAutoStart unimplemented");
+ public static String getClusterNameAutoStart(final String clusterName) {
+ if (!Strings.isNullOrEmpty(clusterName)) {
+ return clusterName;
+ }
+
+ ConnectContext context = ConnectContext.get();
+ if (context == null) {
+ LOG.warn("auto start cant get context so new it");
+ context = new ConnectContext();
+ }
+ ConnectContext.CloudClusterResult cloudClusterTypeAndName =
context.getCloudClusterByPolicy();
+ if (cloudClusterTypeAndName == null) {
+ LOG.warn("get cluster from ctx err");
+ return null;
+ }
+ if (cloudClusterTypeAndName.comment
+ ==
ConnectContext.CloudClusterResult.Comment.DEFAULT_CLUSTER_SET_BUT_NOT_EXIST) {
+ LOG.warn("get default cluster from ctx err");
+ return null;
+ }
+
+
Preconditions.checkState(!Strings.isNullOrEmpty(cloudClusterTypeAndName.clusterName),
+ "get cluster name empty");
+ LOG.info("get cluster to resume {}", cloudClusterTypeAndName);
+ return cloudClusterTypeAndName.clusterName;
+ }
+
+ public static void waitForAutoStart(String clusterName) throws
DdlException {
+ if (Config.isNotCloudMode()) {
+ return;
+ }
+ clusterName = getClusterNameAutoStart(clusterName);
+ if (Strings.isNullOrEmpty(clusterName)) {
+ LOG.warn("auto start in cloud mode, but clusterName empty {}",
clusterName);
+ return;
+ }
+ String clusterStatus = ((CloudSystemInfoService)
Env.getCurrentSystemInfo()).getCloudStatusByName(clusterName);
+ if (Strings.isNullOrEmpty(clusterStatus)) {
+ // for cluster rename or cluster dropped
+ LOG.warn("cant find clusterStatus in fe, clusterName {}",
clusterName);
+ return;
+ }
+ // notify ms -> wait for clusterStatus to become normal
+ LOG.debug("auto start wait cluster {} status {}-{}", clusterName,
clusterStatus,
+ Cloud.ClusterStatus.valueOf(clusterStatus));
+ if (Cloud.ClusterStatus.valueOf(clusterStatus) !=
Cloud.ClusterStatus.NORMAL) {
+ Cloud.AlterClusterRequest.Builder builder =
Cloud.AlterClusterRequest.newBuilder();
+ builder.setCloudUniqueId(Config.cloud_unique_id);
+
builder.setOp(Cloud.AlterClusterRequest.Operation.SET_CLUSTER_STATUS);
+
+ Cloud.ClusterPB.Builder clusterBuilder =
Cloud.ClusterPB.newBuilder();
+ clusterBuilder.setClusterId(((CloudSystemInfoService)
+
Env.getCurrentSystemInfo()).getCloudClusterIdByName(clusterName));
+ clusterBuilder.setClusterStatus(Cloud.ClusterStatus.TO_RESUME);
+ builder.setCluster(clusterBuilder);
+
+ Cloud.AlterClusterResponse response;
+ try {
+ response =
MetaServiceProxy.getInstance().alterCluster(builder.build());
+ if (response.getStatus().getCode() !=
Cloud.MetaServiceCode.OK) {
+ LOG.warn("notify to resume cluster not ok, cluster {},
response: {}", clusterName, response);
+ }
+ LOG.info("notify to resume cluster {}, response: {} ",
clusterName, response);
+ } catch (RpcException e) {
+ LOG.warn("failed to notify to resume cluster {}", clusterName,
e);
+ throw new DdlException("notify to resume cluster not ok");
+ }
+ }
+ // wait 15 mins?
+ int retryTimes = 15 * 60;
+ int retryTime = 0;
+ StopWatch stopWatch = new StopWatch();
+ stopWatch.start();
+ boolean hasAutoStart = false;
+ while
(!String.valueOf(Cloud.ClusterStatus.NORMAL).equals(clusterStatus)
+ && retryTime < retryTimes) {
+ hasAutoStart = true;
+ ++retryTime;
+ // sleep random millis [0.5, 1] s
+ int randomSeconds = 500 + (int) (Math.random() * (1000 - 500));
+ LOG.info("resume cluster {} retry times {}, wait randomMillis: {},
current status: {}",
+ clusterName, retryTime, randomSeconds, clusterStatus);
+ try {
+ if (retryTime > retryTimes / 2) {
+ // sleep random millis [1, 1.5] s
+ randomSeconds = 1000 + (int) (Math.random() * (1000 -
500));
+ }
+ Thread.sleep(randomSeconds);
+ } catch (InterruptedException e) {
+ LOG.info("change cluster sleep wait InterruptedException: ",
e);
+ }
+ clusterStatus = ((CloudSystemInfoService)
Env.getCurrentSystemInfo()).getCloudStatusByName(clusterName);
+ }
+ if (retryTime >= retryTimes) {
+ // auto start timeout
+ stopWatch.stop();
+ LOG.warn("auto start cluster {} timeout, wait {} ms", clusterName,
stopWatch.getTime());
+ throw new DdlException("auto start cluster timeout");
+ }
+
+ stopWatch.stop();
+ if (hasAutoStart) {
+ LOG.info("auto start cluster {}, start cost {} ms", clusterName,
stopWatch.getTime());
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index a20e40cdc67..2023395aa55 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -1070,6 +1070,85 @@ public class ConnectContext {
return getCloudCluster(true);
}
+ public static class CloudClusterResult {
+ public enum Comment {
+ FOUND_BY_DEFAULT_CLUSTER,
+ DEFAULT_CLUSTER_SET_BUT_NOT_EXIST,
+ FOUND_BY_FIRST_CLUSTER_WITH_ALIVE_BE,
+ FOUND_BY_FRIST_CLUSTER_HAS_AUTH,
+ }
+
+ public String clusterName;
+ public Comment comment;
+
+ public CloudClusterResult(final String name, Comment c) {
+ this.clusterName = name;
+ this.comment = c;
+ }
+
+ @Override
+ public String toString() {
+ return "CloudClusterResult{"
+ + "clusterName='" + clusterName + '\''
+ + ", comment=" + comment
+ + '}';
+ }
+ }
+
+
+ // can't get cluster from context, use the following strategy to obtain
the cluster name
+ // 当用户有多个集群的权限时,会按照如下策略进行拉取:
+ // 如果当前mysql用户没有指定cluster(没有default 或者 use), 选择有权限的cluster。
+ // 如果有多个cluster满足权限条件,优先选活的,按字母序选
+ // 如果没有活的,则拉起一个,按字母序选
+ public CloudClusterResult getCloudClusterByPolicy() {
+ List<String> cloudClusterNames = ((CloudSystemInfoService)
Env.getCurrentSystemInfo()).getCloudClusterNames();
+ // try set default cluster
+ String defaultCloudCluster =
Env.getCurrentEnv().getAuth().getDefaultCloudCluster(getQualifiedUser());
+ if (!Strings.isNullOrEmpty(defaultCloudCluster)) {
+ // check cluster validity
+ CloudClusterResult r;
+ if (cloudClusterNames.contains(defaultCloudCluster)) {
+ // valid
+ r = new CloudClusterResult(defaultCloudCluster,
+ CloudClusterResult.Comment.FOUND_BY_DEFAULT_CLUSTER);
+ LOG.info("use default cluster {}", defaultCloudCluster);
+ } else {
+ // invalid
+ r = new CloudClusterResult(defaultCloudCluster,
+
CloudClusterResult.Comment.DEFAULT_CLUSTER_SET_BUT_NOT_EXIST);
+ LOG.warn("default cluster {} current invalid, please change
it", r);
+ }
+ return r;
+ }
+
+ List<String> hasAuthCluster = new ArrayList<>();
+ // get all available cluster of the user
+ for (String cloudClusterName : cloudClusterNames) {
+ if
(Env.getCurrentEnv().getAuth().checkCloudPriv(getCurrentUserIdentity(),
+ cloudClusterName, PrivPredicate.USAGE,
ResourceTypeEnum.CLUSTER)) {
+ hasAuthCluster.add(cloudClusterName);
+ // find a cluster that has at least one alive be
+ List<Backend> bes = ((CloudSystemInfoService)
Env.getCurrentSystemInfo())
+ .getBackendsByClusterName(cloudClusterName);
+ AtomicBoolean hasAliveBe = new AtomicBoolean(false);
+
bes.stream().filter(Backend::isAlive).findAny().ifPresent(backend -> {
+ LOG.debug("get a clusterName {}, it's has more than one
alive be {}", cloudCluster, backend);
+ hasAliveBe.set(true);
+ });
+ if (hasAliveBe.get()) {
+ // set a cluster to context cloudCluster
+ CloudClusterResult r = new
CloudClusterResult(cloudClusterName,
+
CloudClusterResult.Comment.FOUND_BY_FIRST_CLUSTER_WITH_ALIVE_BE);
+ LOG.debug("set context {}", r);
+ return r;
+ }
+ }
+ }
+ return hasAuthCluster.isEmpty() ? null
+ : new CloudClusterResult(hasAuthCluster.get(0),
CloudClusterResult.Comment.FOUND_BY_FRIST_CLUSTER_HAS_AUTH);
+ }
+
/**
* @param updateErr whether set this connect state to error when the
returned cluster is null or empty.
*
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]