This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f00750f7fe8 [feature](merge-cloud) Support auto suspend and auto start cluster in cloud (#32764)
f00750f7fe8 is described below

commit f00750f7fe89b374c4d299ee69950fa7ac1a0ea1
Author: deardeng <[email protected]>
AuthorDate: Mon Mar 25 23:16:37 2024 +0800

    [feature](merge-cloud) Support auto suspend and auto start cluster in cloud (#32764)
---
 be/src/agent/heartbeat_server.cpp                  |   5 +
 be/src/agent/task_worker_pool.cpp                  |  23 ++++-
 be/src/runtime/fragment_mgr.cpp                    |  41 ++++++++
 be/src/runtime/fragment_mgr.h                      |   5 +
 .../org/apache/doris/cloud/catalog/CloudEnv.java   |   1 -
 .../doris/cloud/system/CloudSystemInfoService.java | 109 ++++++++++++++++++++-
 .../java/org/apache/doris/qe/ConnectContext.java   |  79 +++++++++++++++
 7 files changed, 258 insertions(+), 5 deletions(-)
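
The BE half of this change exports two bvar metrics, fragment_executing_count and fragment_last_active_time, and reports them in each heartbeat so the FE side can tell when a compute cluster has gone idle. A minimal, hypothetical Java sketch of how such heartbeat-reported activity could feed a suspend decision follows; the class and field names are illustrative only and are not part of this commit.

import java.util.List;

// Hypothetical idle check (illustrative only; not part of this commit).
public class IdleClusterChecker {
    // Assumed threshold of inactivity before a cluster becomes a suspend candidate.
    private static final long SUSPEND_AFTER_IDLE_MS = 30 * 60 * 1000L;

    public static final class BackendActivity {
        public final long fragmentExecutingCount;   // mirrors fragment_executing_count
        public final long fragmentLastActiveTimeMs; // mirrors fragment_last_active_time

        public BackendActivity(long count, long lastActiveMs) {
            this.fragmentExecutingCount = count;
            this.fragmentLastActiveTimeMs = lastActiveMs;
        }
    }

    // A cluster is idle only if every backend reports zero executing fragments
    // (queries or schema-change jobs) and has been inactive past the threshold.
    public static boolean isIdle(List<BackendActivity> backends, long nowMs) {
        for (BackendActivity be : backends) {
            if (be.fragmentExecutingCount > 0
                    || nowMs - be.fragmentLastActiveTimeMs < SUSPEND_AFTER_IDLE_MS) {
                return false;
            }
        }
        return true;
    }
}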

diff --git a/be/src/agent/heartbeat_server.cpp b/be/src/agent/heartbeat_server.cpp
index dfeb05a932d..e6d893a4a2d 100644
--- a/be/src/agent/heartbeat_server.cpp
+++ b/be/src/agent/heartbeat_server.cpp
@@ -30,6 +30,7 @@
 #include "common/status.h"
 #include "olap/storage_engine.h"
 #include "runtime/exec_env.h"
+#include "runtime/fragment_mgr.h"
 #include "runtime/heartbeat_flags.h"
 #include "service/backend_options.h"
 #include "util/debug_util.h"
@@ -83,6 +84,10 @@ void HeartbeatServer::heartbeat(THeartbeatResult& heartbeat_result,
         heartbeat_result.backend_info.__set_be_node_role(config::be_node_role);
         // If be is gracefully stopped, then k_doris_exit is set to true
         heartbeat_result.backend_info.__set_is_shutdown(doris::k_doris_exit);
+        heartbeat_result.backend_info.__set_fragment_executing_count(
+                get_fragment_executing_count());
+        heartbeat_result.backend_info.__set_fragment_last_active_time(
+                get_fragment_last_active_time());
     }
     watch.stop();
     if (watch.elapsed_time() > 1000L * 1000L * 1000L) {
diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index be4d3b137e6..ca6a9817737 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -76,6 +76,7 @@
 #include "olap/txn_manager.h"
 #include "olap/utils.h"
 #include "runtime/exec_env.h"
+#include "runtime/fragment_mgr.h"
 #include "runtime/snapshot_loader.h"
 #include "service/backend_options.h"
 #include "util/doris_metrics.h"
@@ -446,7 +447,6 @@ void add_task_count(const TAgentTaskRequest& task, int n) {
     ADD_TASK_COUNT(PUBLISH_VERSION)
     ADD_TASK_COUNT(CLEAR_TRANSACTION_TASK)
     ADD_TASK_COUNT(UPDATE_TABLET_META_INFO)
-    ADD_TASK_COUNT(ALTER)
     ADD_TASK_COUNT(CLONE)
     ADD_TASK_COUNT(STORAGE_MEDIUM_MIGRATE)
     ADD_TASK_COUNT(GC_BINLOG)
@@ -459,6 +459,17 @@ void add_task_count(const TAgentTaskRequest& task, int n) {
             DELETE_count << n;
         }
         return;
+    case TTaskType::ALTER:
+    {
+        ALTER_count << n;
+        // cloud auto stop needs to account for schema change (sc) jobs; a tablet's sc can also be considered a fragment
+        doris::g_fragment_executing_count << 1;
+        int64 now = duration_cast<std::chrono::milliseconds>(
+                            std::chrono::system_clock::now().time_since_epoch())
+                            .count();
+        g_fragment_last_active_time.set_value(now);
+        return;
+    }
     default:
         return;
     }
@@ -1851,6 +1862,11 @@ void alter_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req)
         alter_tablet(engine, req, signature, task_type, &finish_task_request);
         finish_task(finish_task_request);
     }
+    doris::g_fragment_executing_count << -1;
+    int64 now = duration_cast<std::chrono::milliseconds>(
+                        std::chrono::system_clock::now().time_since_epoch())
+                        .count();
+    g_fragment_last_active_time.set_value(now);
     remove_task_info(req.task_type, req.signature);
 }
 
@@ -1872,6 +1888,11 @@ void alter_cloud_tablet_callback(CloudStorageEngine& engine, const TAgentTaskReq
         alter_cloud_tablet(engine, req, signature, task_type, &finish_task_request);
         finish_task(finish_task_request);
     }
+    doris::g_fragment_executing_count << -1;
+    int64 now = duration_cast<std::chrono::milliseconds>(
+                        std::chrono::system_clock::now().time_since_epoch())
+                        .count();
+    g_fragment_last_active_time.set_value(now);
     remove_task_info(req.task_type, req.signature);
 }
 
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index d858737d780..190c0f5b0e1 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -99,6 +99,19 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_thread_pool_queue_size, MetricUnit::
 bvar::LatencyRecorder g_fragmentmgr_prepare_latency("doris_FragmentMgr", "prepare");
 bvar::Adder<int64_t> g_pipeline_fragment_instances_count("doris_pipeline_fragment_instances_count");
 
+bvar::Adder<uint64_t> g_fragment_executing_count("fragment_executing_count");
+bvar::Status<uint64_t> g_fragment_last_active_time(
+        "fragment_last_active_time", duration_cast<std::chrono::milliseconds>(
+                                             
std::chrono::system_clock::now().time_since_epoch())
+                                             .count());
+
+uint64_t get_fragment_executing_count() {
+    return g_fragment_executing_count.get_value();
+}
+uint64_t get_fragment_last_active_time() {
+    return g_fragment_last_active_time.get_value();
+}
+
 std::string to_load_error_http_path(const std::string& file_name) {
     if (file_name.empty()) {
         return "";
@@ -470,9 +483,15 @@ void FragmentMgr::_exec_actual(std::shared_ptr<PlanFragmentExecutor> fragment_ex
 
     // remove exec state after this fragment finished
     {
+        int64 now = duration_cast<std::chrono::milliseconds>(
+                            std::chrono::system_clock::now().time_since_epoch())
+                            .count();
         std::lock_guard<std::mutex> lock(_lock);
         _fragment_instance_map.erase(fragment_executor->fragment_instance_id());
 
+        g_fragment_executing_count << -1;
+        g_fragment_last_active_time.set_value(now);
+
         LOG_INFO("Instance {} finished", 
print_id(fragment_executor->fragment_instance_id()));
 
         if (all_done && query_ctx) {
@@ -584,6 +603,11 @@ void FragmentMgr::remove_pipeline_context(
         std::vector<TUniqueId> ins_ids;
         f_context->instance_ids(ins_ids);
         bool all_done = q_context->countdown(ins_ids.size());
+        int64 now = duration_cast<std::chrono::milliseconds>(
+                            std::chrono::system_clock::now().time_since_epoch())
+                            .count();
+        g_fragment_executing_count << -1;
+        g_fragment_last_active_time.set_value(now);
         for (const auto& ins_id : ins_ids) {
             LOG_INFO("Removing query {} instance {}, all done? {}", 
print_id(query_id),
                      print_id(ins_id), all_done);
@@ -733,7 +757,12 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
     static_cast<void>(_runtimefilter_controller.add_entity(
             params.params, params.params.query_id, params.query_options, &handler,
             RuntimeFilterParamsContext::create(fragment_executor->runtime_state())));
+    int64 now = duration_cast<std::chrono::milliseconds>(
+                        std::chrono::system_clock::now().time_since_epoch())
+                        .count();
     {
+        g_fragment_executing_count << 1;
+        g_fragment_last_active_time.set_value(now);
         std::lock_guard<std::mutex> lock(_lock);
         if (handler) {
             query_ctx->set_merge_controller_handler(handler);
@@ -753,6 +782,8 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
             // Remove the exec state added
             std::lock_guard<std::mutex> lock(_lock);
             _fragment_instance_map.erase(params.params.fragment_instance_id);
+            g_fragment_executing_count << -1;
+            g_fragment_last_active_time.set_value(now);
         }
         fragment_executor->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
                                   "push plan fragment to thread pool failed");
@@ -844,7 +875,12 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
                 query_ctx->set_ready_to_execute_only();
             }
         }
+        int64 now = duration_cast<std::chrono::milliseconds>(
+                            std::chrono::system_clock::now().time_since_epoch())
+                            .count();
         {
+            g_fragment_executing_count << 1;
+            g_fragment_last_active_time.set_value(now);
             std::lock_guard<std::mutex> lock(_lock);
             std::vector<TUniqueId> ins_ids;
             reinterpret_cast<pipeline::PipelineXFragmentContext*>(context.get())
@@ -905,7 +941,12 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
             if (i == 0 && handler) {
                 query_ctx->set_merge_controller_handler(handler);
             }
+            int64 now = duration_cast<std::chrono::milliseconds>(
+                                std::chrono::system_clock::now().time_since_epoch())
+                                .count();
             {
+                g_fragment_executing_count << 1;
+                g_fragment_last_active_time.set_value(now);
                 std::lock_guard<std::mutex> lock(_lock);
                 _pipeline_map.insert(std::make_pair(fragment_instance_id, context));
             }
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 3435d1f4f64..5aef45954d3 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -44,6 +44,8 @@ class IOBufAsZeroCopyInputStream;
 }
 
 namespace doris {
+extern bvar::Adder<uint64_t> g_fragment_executing_count;
+extern bvar::Status<uint64_t> g_fragment_last_active_time;
 
 namespace pipeline {
 class PipelineFragmentContext;
@@ -202,4 +204,7 @@ private:
             nullptr; // used for pipeline context report
 };
 
+uint64_t get_fragment_executing_count();
+uint64_t get_fragment_last_active_time();
+
 } // namespace doris
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
index 28e58f3cf4b..613fef3be68 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
@@ -392,7 +392,6 @@ public class CloudEnv extends Env {
 
     public void changeCloudCluster(String clusterName, ConnectContext ctx) throws DdlException {
         checkCloudClusterPriv(clusterName);
-        // TODO(merge-cloud): pick cloud auto start
         CloudSystemInfoService.waitForAutoStart(clusterName);
         try {
             ((CloudSystemInfoService) Env.getCurrentSystemInfo()).addCloudCluster(clusterName, "");
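
The hunk above makes changeCloudCluster block on CloudSystemInfoService.waitForAutoStart before registering the target cluster. A short, hypothetical caller sketch under the same contract (the helper name is illustrative and the import paths are assumed from the repository layout):

import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.DdlException;

// Hypothetical helper, mirroring the changeCloudCluster flow: block until the
// target cluster is resumed before routing any work to it.
public final class ClusterResumeHelper {
    public static void ensureResumed(String clusterName) throws DdlException {
        // No-op outside cloud mode; otherwise asks the meta-service to set the
        // cluster to TO_RESUME and polls until its status returns to NORMAL,
        // throwing DdlException if the retry budget is exhausted.
        CloudSystemInfoService.waitForAutoStart(clusterName);
    }
}
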
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
index 8d852c11097..5eb74590d56 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
@@ -38,9 +38,11 @@ import org.apache.doris.system.Frontend;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TStorageMedium;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+import org.apache.commons.lang3.time.StopWatch;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -578,9 +580,110 @@ public class CloudSystemInfoService extends SystemInfoService {
         }
     }
 
-    public static void waitForAutoStart(final String clusterName) throws DdlException {
-        // TODO(merge-cloud): merge from cloud.
-        // throw new DdlException("Env.waitForAutoStart unimplemented");
+    public static String getClusterNameAutoStart(final String clusterName) {
+        if (!Strings.isNullOrEmpty(clusterName)) {
+            return clusterName;
+        }
+
+        ConnectContext context = ConnectContext.get();
+        if (context == null) {
+            LOG.warn("auto start cant get context so new it");
+            context = new ConnectContext();
+        }
+        ConnectContext.CloudClusterResult cloudClusterTypeAndName = context.getCloudClusterByPolicy();
+        if (cloudClusterTypeAndName == null) {
+            LOG.warn("get cluster from ctx err");
+            return null;
+        }
+        if (cloudClusterTypeAndName.comment
+                == ConnectContext.CloudClusterResult.Comment.DEFAULT_CLUSTER_SET_BUT_NOT_EXIST) {
+            LOG.warn("get default cluster from ctx err");
+            return null;
+        }
+
+        Preconditions.checkState(!Strings.isNullOrEmpty(cloudClusterTypeAndName.clusterName),
+                "get cluster name empty");
+        LOG.info("get cluster to resume {}", cloudClusterTypeAndName);
+        return cloudClusterTypeAndName.clusterName;
+    }
+
+    public static void waitForAutoStart(String clusterName) throws DdlException {
+        if (Config.isNotCloudMode()) {
+            return;
+        }
+        clusterName = getClusterNameAutoStart(clusterName);
+        if (Strings.isNullOrEmpty(clusterName)) {
+            LOG.warn("auto start in cloud mode, but clusterName empty {}", 
clusterName);
+            return;
+        }
+        String clusterStatus = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudStatusByName(clusterName);
+        if (Strings.isNullOrEmpty(clusterStatus)) {
+            // for cluster rename or cluster dropped
+            LOG.warn("cant find clusterStatus in fe, clusterName {}", 
clusterName);
+            return;
+        }
+        // notify ms -> wait for clusterStatus to become NORMAL
+        LOG.debug("auto start wait cluster {} status {}-{}", clusterName, 
clusterStatus,
+                Cloud.ClusterStatus.valueOf(clusterStatus));
+        if (Cloud.ClusterStatus.valueOf(clusterStatus) != Cloud.ClusterStatus.NORMAL) {
+            Cloud.AlterClusterRequest.Builder builder = Cloud.AlterClusterRequest.newBuilder();
+            builder.setCloudUniqueId(Config.cloud_unique_id);
+            builder.setOp(Cloud.AlterClusterRequest.Operation.SET_CLUSTER_STATUS);
+
+            Cloud.ClusterPB.Builder clusterBuilder = Cloud.ClusterPB.newBuilder();
+            clusterBuilder.setClusterId(((CloudSystemInfoService)
+                    Env.getCurrentSystemInfo()).getCloudClusterIdByName(clusterName));
+            clusterBuilder.setClusterStatus(Cloud.ClusterStatus.TO_RESUME);
+            builder.setCluster(clusterBuilder);
+
+            Cloud.AlterClusterResponse response;
+            try {
+                response = MetaServiceProxy.getInstance().alterCluster(builder.build());
+                if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
+                    LOG.warn("notify to resume cluster not ok, cluster {}, 
response: {}", clusterName, response);
+                }
+                LOG.info("notify to resume cluster {}, response: {} ", 
clusterName, response);
+            } catch (RpcException e) {
+                LOG.warn("failed to notify to resume cluster {}", clusterName, 
e);
+                throw new DdlException("notify to resume cluster not ok");
+            }
+        }
+        // wait up to roughly 15 minutes (15 * 60 retries, ~1s average sleep per retry)
+        int retryTimes = 15 * 60;
+        int retryTime = 0;
+        StopWatch stopWatch = new StopWatch();
+        stopWatch.start();
+        boolean hasAutoStart = false;
+        while (!String.valueOf(Cloud.ClusterStatus.NORMAL).equals(clusterStatus)
+            && retryTime < retryTimes) {
+            hasAutoStart = true;
+            ++retryTime;
+            // sleep random millis [0.5, 1] s
+            int randomSeconds =  500 + (int) (Math.random() * (1000 - 500));
+            LOG.info("resume cluster {} retry times {}, wait randomMillis: {}, 
current status: {}",
+                    clusterName, retryTime, randomSeconds, clusterStatus);
+            try {
+                if (retryTime > retryTimes / 2) {
+                    // sleep random millis [1, 1.5] s
+                    randomSeconds =  1000 + (int) (Math.random() * (1000 - 500));
+                }
+                Thread.sleep(randomSeconds);
+            } catch (InterruptedException e) {
+                LOG.info("change cluster sleep wait InterruptedException: ", 
e);
+            }
+            clusterStatus = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudStatusByName(clusterName);
+        }
+        if (retryTime >= retryTimes) {
+            // auto start timeout
+            stopWatch.stop();
+            LOG.warn("auto start cluster {} timeout, wait {} ms", clusterName, 
stopWatch.getTime());
+            throw new DdlException("auto start cluster timeout");
+        }
+
+        stopWatch.stop();
+        if (hasAutoStart) {
+            LOG.info("auto start cluster {}, start cost {} ms", clusterName, 
stopWatch.getTime());
+        }
     }
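
The loop above retries up to 15 * 60 times with a randomized 0.5-1 s sleep (1-1.5 s after the halfway point), so the worst-case wait is roughly 11-19 minutes rather than exactly 15; it exits early as soon as the status returns to NORMAL. A back-of-the-envelope sketch of that bound, assuming those sleep ranges:

// Back-of-the-envelope bound on the wait loop above (assumes the sleep ranges in
// the commit: 0.5-1s for the first half of retries, 1-1.5s for the second half).
public final class AutoStartWaitBound {
    public static void main(String[] args) {
        int retries = 15 * 60;
        double minSeconds = (retries / 2) * 0.5 + (retries / 2) * 1.0; // ~11.25 min
        double maxSeconds = (retries / 2) * 1.0 + (retries / 2) * 1.5; // ~18.75 min
        System.out.printf("wait bound: %.1f - %.1f minutes%n", minSeconds / 60, maxSeconds / 60);
    }
}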
 
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index a20e40cdc67..2023395aa55 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -1070,6 +1070,85 @@ public class ConnectContext {
         return getCloudCluster(true);
     }
 
+    public static class CloudClusterResult {
+        public enum Comment {
+            FOUND_BY_DEFAULT_CLUSTER,
+            DEFAULT_CLUSTER_SET_BUT_NOT_EXIST,
+            FOUND_BY_FIRST_CLUSTER_WITH_ALIVE_BE,
+            FOUND_BY_FRIST_CLUSTER_HAS_AUTH,
+        }
+
+        public String clusterName;
+        public Comment comment;
+
+        public CloudClusterResult(final String name, Comment c) {
+            this.clusterName = name;
+            this.comment = c;
+        }
+
+        @Override
+        public String toString() {
+            return "CloudClusterResult{"
+                + "clusterName='" + clusterName + '\''
+                + ", comment=" + comment
+                + '}';
+        }
+    }
+
+
+    // can't get cluster from context, use the following strategy to obtain the cluster name
+    // When the user has privileges on multiple clusters, selection follows this policy:
+    // If the current MySQL user has not specified a cluster (no default and no USE), pick a cluster the user is authorized for.
+    // If multiple clusters satisfy the privilege check, prefer an alive one, chosen in alphabetical order.
+    // If none is alive, resume one, chosen in alphabetical order.
+    public CloudClusterResult getCloudClusterByPolicy() {
+        List<String> cloudClusterNames = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudClusterNames();
+        // try set default cluster
+        String defaultCloudCluster = Env.getCurrentEnv().getAuth().getDefaultCloudCluster(getQualifiedUser());
+        if (!Strings.isNullOrEmpty(defaultCloudCluster)) {
+            // check cluster validity
+            CloudClusterResult r;
+            if (cloudClusterNames.contains(defaultCloudCluster)) {
+                // valid
+                r = new CloudClusterResult(defaultCloudCluster,
+                    CloudClusterResult.Comment.FOUND_BY_DEFAULT_CLUSTER);
+                LOG.info("use default cluster {}", defaultCloudCluster);
+            } else {
+                // invalid
+                r = new CloudClusterResult(defaultCloudCluster,
+                    CloudClusterResult.Comment.DEFAULT_CLUSTER_SET_BUT_NOT_EXIST);
+                LOG.warn("default cluster {} currently invalid, please change it", r);
+            }
+            return r;
+        }
+
+        List<String> hasAuthCluster = new ArrayList<>();
+        // get all available cluster of the user
+        for (String cloudClusterName : cloudClusterNames) {
+            if (Env.getCurrentEnv().getAuth().checkCloudPriv(getCurrentUserIdentity(),
+                    cloudClusterName, PrivPredicate.USAGE, ResourceTypeEnum.CLUSTER)) {
+                hasAuthCluster.add(cloudClusterName);
+                // find a cluster that has at least one alive be
+                List<Backend> bes = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
+                        .getBackendsByClusterName(cloudClusterName);
+                AtomicBoolean hasAliveBe = new AtomicBoolean(false);
+                bes.stream().filter(Backend::isAlive).findAny().ifPresent(backend -> {
+                    LOG.debug("get a clusterName {}, it has at least one alive be {}", cloudClusterName, backend);
+                    hasAliveBe.set(true);
+                });
+                if (hasAliveBe.get()) {
+                    // set a cluster to context cloudCluster
+                    CloudClusterResult r = new CloudClusterResult(cloudClusterName,
+                            CloudClusterResult.Comment.FOUND_BY_FIRST_CLUSTER_WITH_ALIVE_BE);
+                    LOG.debug("set context {}", r);
+                    return r;
+                }
+            }
+        }
+        return hasAuthCluster.isEmpty() ? null
+            : new CloudClusterResult(hasAuthCluster.get(0), CloudClusterResult.Comment.FOUND_BY_FRIST_CLUSTER_HAS_AUTH);
+    }
+
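
The policy above reduces to a simple preference order. A standalone, hypothetical restatement over plain inputs (assuming the authorized-cluster list is already sorted alphabetically, as the comments describe; not part of this commit):

import java.util.List;
import java.util.Set;

// Hypothetical restatement of getCloudClusterByPolicy() over plain data, for clarity.
public final class ClusterSelectionPolicy {
    // Returns the chosen cluster name, or null when nothing is selectable
    // (default cluster set but missing, or no authorized clusters at all).
    public static String pick(String defaultCluster,
                              List<String> sortedAuthorizedClusters,
                              Set<String> existingClusters,
                              Set<String> clustersWithAliveBe) {
        if (defaultCluster != null && !defaultCluster.isEmpty()) {
            return existingClusters.contains(defaultCluster) ? defaultCluster : null;
        }
        // Prefer the first authorized cluster that already has an alive BE.
        for (String name : sortedAuthorizedClusters) {
            if (clustersWithAliveBe.contains(name)) {
                return name;
            }
        }
        // Otherwise fall back to the first authorized cluster; the caller may auto-start it.
        return sortedAuthorizedClusters.isEmpty() ? null : sortedAuthorizedClusters.get(0);
    }
}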
     /**
     * @param updateErr whether set this connect state to error when the returned cluster is null or empty.
      *


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
