This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch cloud-node-mgr in repository https://gitbox.apache.org/repos/asf/doris.git
commit 31fc01961bb1e5910a482479703269dc018d85b2 Author: Yongqiang YANG <[email protected]> AuthorDate: Fri Aug 30 12:28:41 2024 +0800 [improvement](cloud) manage node via sql like non cloud mode --- be/src/agent/heartbeat_server.cpp | 13 ++ be/src/cloud/config.cpp | 5 +- be/src/cloud/config.h | 16 +- be/src/common/config.cpp | 4 +- cloud/src/meta-service/meta_service_resource.cpp | 5 +- cloud/src/resource-manager/resource_manager.cpp | 4 +- .../main/java/org/apache/doris/common/Config.java | 12 +- .../main/java/org/apache/doris/alter/Alter.java | 14 +- .../java/org/apache/doris/alter/SystemHandler.java | 6 +- .../main/java/org/apache/doris/catalog/Env.java | 32 ++-- .../doris/catalog/InternalSchemaInitializer.java | 25 ++- .../org/apache/doris/cloud/catalog/CloudEnv.java | 56 ++++++- .../apache/doris/cloud/rpc/MetaServiceClient.java | 5 + .../apache/doris/cloud/rpc/MetaServiceProxy.java | 10 ++ .../doris/cloud/system/CloudSystemInfoService.java | 172 +++++++++++++++++++++ .../doris/common/RandomIdentifierGenerator.java | 52 +++++++ .../apache/doris/common/util/PropertyAnalyzer.java | 2 +- .../org/apache/doris/deploy/DeployManager.java | 4 +- .../doris/httpv2/rest/manager/NodeAction.java | 2 +- .../main/java/org/apache/doris/qe/DdlExecutor.java | 7 +- .../main/java/org/apache/doris/resource/Tag.java | 3 + .../doris/statistics/util/StatisticsUtil.java | 3 + .../java/org/apache/doris/system/HeartbeatMgr.java | 2 + .../org/apache/doris/clone/DecommissionTest.java | 2 +- .../doris/cluster/DecommissionBackendTest.java | 8 +- gensrc/thrift/HeartbeatService.thrift | 2 + 26 files changed, 405 insertions(+), 61 deletions(-) diff --git a/be/src/agent/heartbeat_server.cpp b/be/src/agent/heartbeat_server.cpp index 02c9bda41b6..12031279413 100644 --- a/be/src/agent/heartbeat_server.cpp +++ b/be/src/agent/heartbeat_server.cpp @@ -26,6 +26,7 @@ #include <ostream> #include <string> +#include "cloud/config.h" #include "common/config.h" #include "common/status.h" #include 
"olap/storage_engine.h" @@ -244,6 +245,18 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) { _engine.notify_listeners(); } + if (master_info.__isset.meta_service_endpoint && config::meta_service_endpoint.empty()) { + auto st = config::set_config("meta_service_endpoint", master_info.meta_service_endpoint, + true); + LOG(INFO) << "set config meta_service_endpoing " << st; + } + + if (master_info.__isset.cloud_instance_id && config::cloud_instance_id.empty()) { + auto st = config::set_config("cloud_instance_id", master_info.cloud_instance_id, true); + LOG(INFO) << "set config cloud_instance_id " << st; + config::set_cloud_unique_id(); + } + return Status::OK(); } diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp index 1b8256bf932..bd74d3a4f3d 100644 --- a/be/src/cloud/config.cpp +++ b/be/src/cloud/config.cpp @@ -19,8 +19,9 @@ namespace doris::config { -DEFINE_String(cloud_unique_id, ""); -DEFINE_String(meta_service_endpoint, ""); +DEFINE_mString(cloud_instance_id, ""); +DEFINE_mString(cloud_unique_id, ""); +DEFINE_mString(meta_service_endpoint, ""); DEFINE_Bool(meta_service_use_load_balancer, "false"); DEFINE_mInt32(meta_service_rpc_timeout_ms, "10000"); DEFINE_Bool(meta_service_connection_pooled, "true"); diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h index 104ead04996..1855fc1bffa 100644 --- a/be/src/cloud/config.h +++ b/be/src/cloud/config.h @@ -21,10 +21,20 @@ namespace doris::config { -DECLARE_String(cloud_unique_id); +DECLARE_mString(cloud_instance_id); + +DECLARE_mString(cloud_unique_id); static inline bool is_cloud_mode() { - return !cloud_unique_id.empty(); + return !cloud_unique_id.empty() || !cloud_instance_id.empty(); +} + +static inline void set_cloud_unique_id() { + if (cloud_unique_id.empty()) { + if (!cloud_instance_id.empty()) { + cloud_unique_id = "1:" + cloud_instance_id + ":compute"; + } + } } // Set the endpoint of meta service. 
@@ -40,7 +50,7 @@ static inline bool is_cloud_mode() { // If you want to access a group of meta services directly, set the addresses of meta services to this config, // separated by a comma, like "host:port,host:port,host:port", then BE will choose a server to connect in randomly. // In this mode, The config meta_service_connection_pooled is still useful, but the other two configs will be ignored. -DECLARE_String(meta_service_endpoint); +DECLARE_mString(meta_service_endpoint); // Set the underlying connection type to pooled. DECLARE_Bool(meta_service_connection_pooled); DECLARE_mInt64(meta_service_connection_pool_size); diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 2b3b2ffb09e..a17d0c9d815 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -37,10 +37,10 @@ #include <utility> #include <vector> +#include "cloud/config.h" #include "common/config.h" #include "common/logging.h" #include "common/status.h" -#include "config.h" #include "io/fs/file_writer.h" #include "io/fs/local_file_system.h" #include "util/cpu_info.h" @@ -1654,6 +1654,8 @@ bool init(const char* conf_file, bool fill_conf_map, bool must_exist, bool set_t SET_FIELD(it.second, std::vector<std::string>, fill_conf_map, set_to_default); } + set_cloud_unique_id(); + return true; } diff --git a/cloud/src/meta-service/meta_service_resource.cpp b/cloud/src/meta-service/meta_service_resource.cpp index 90a88f86006..283bbe7392c 100644 --- a/cloud/src/meta-service/meta_service_resource.cpp +++ b/cloud/src/meta-service/meta_service_resource.cpp @@ -2336,9 +2336,8 @@ void MetaServiceImpl::get_cluster(google::protobuf::RpcController* controller, std::find_if(instance.storage_vault_names().begin(), instance.storage_vault_names().end(), [](const std::string& name) { return name == BUILT_IN_STORAGE_VAULT_NAME; }) == instance.storage_vault_names().end()) { - code = MetaServiceCode::STORAGE_VAULT_NOT_FOUND; - msg = "instance has no built in storage vault"; - return; + 
LOG_EVERY_N(INFO, 100) << "There is no builtin vault in instance " + << instance.instance_id(); } auto get_cluster_mysql_user = [](const ClusterPB& c, std::set<std::string>* mysql_users) { diff --git a/cloud/src/resource-manager/resource_manager.cpp b/cloud/src/resource-manager/resource_manager.cpp index 25848294aa7..282be20baab 100644 --- a/cloud/src/resource-manager/resource_manager.cpp +++ b/cloud/src/resource-manager/resource_manager.cpp @@ -168,7 +168,9 @@ bool ResourceManager::check_cluster_params_valid(const ClusterPB& cluster, std:: if (ClusterPB::COMPUTE == cluster.type() && n.has_heartbeat_port() && n.heartbeat_port()) { continue; } - ss << "check cluster params failed, node : " << proto_to_json(n); + ss << "check cluster params failed, edit_log_port is required for frontends while " + "heatbeat_port is required for banckens, node : " + << proto_to_json(n); *err = ss.str(); no_err = false; break; diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 9b61a734246..8fdcaf42512 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1831,7 +1831,7 @@ public class Config extends ConfigBase { public static boolean enable_date_conversion = true; @ConfField(mutable = false, masterOnly = true) - public static boolean enable_multi_tags = false; + public static boolean enable_multi_tags = true; /** * If set to TRUE, FE will convert DecimalV2 to DecimalV3 automatically. @@ -2799,15 +2799,21 @@ public class Config extends ConfigBase { @ConfField public static int warn_sys_accumulated_file_size = 2; @ConfField public static int audit_sys_accumulated_file_size = 4; + // compatibily with elder version. + // cloud_unique_id is introduced before cloud_instance_id, so it has higher priority. 
@ConfField public static String cloud_unique_id = ""; + // If cloud_unique_id is empty, cloud_instance_id works, otherwise cloud_unique_id works. + @ConfField + public static String cloud_instance_id = ""; + public static boolean isCloudMode() { - return !cloud_unique_id.isEmpty(); + return !cloud_unique_id.isEmpty() || !cloud_instance_id.isEmpty(); } public static boolean isNotCloudMode() { - return cloud_unique_id.isEmpty(); + return !isCloudMode(); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java index b4700ec32d4..672932f1877 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java @@ -103,18 +103,18 @@ public class Alter { private AlterHandler schemaChangeHandler; private AlterHandler materializedViewHandler; - private SystemHandler clusterHandler; + private SystemHandler systemHandler; public Alter() { schemaChangeHandler = Config.isCloudMode() ? 
new CloudSchemaChangeHandler() : new SchemaChangeHandler(); materializedViewHandler = new MaterializedViewHandler(); - clusterHandler = new SystemHandler(); + systemHandler = new SystemHandler(); } public void start() { schemaChangeHandler.start(); materializedViewHandler.start(); - clusterHandler.start(); + systemHandler.start(); } public void processCreateMaterializedView(CreateMaterializedViewStmt stmt) @@ -746,8 +746,8 @@ public class Alter { } } - public void processAlterCluster(AlterSystemStmt stmt) throws UserException { - clusterHandler.process(Collections.singletonList(stmt.getAlterClause()), null, null); + public void processAlterSystem(AlterSystemStmt stmt) throws UserException { + systemHandler.process(Collections.singletonList(stmt.getAlterClause()), null, null); } private void processRename(Database db, OlapTable table, List<AlterClause> alterClauses) throws DdlException { @@ -994,8 +994,8 @@ public class Alter { return materializedViewHandler; } - public AlterHandler getClusterHandler() { - return clusterHandler; + public AlterHandler getSystemHandler() { + return systemHandler; } public void processAlterMTMV(AlterMTMV alterMTMV, boolean isReplay) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java index 28b3684ed04..6e221bf5ce7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java @@ -72,7 +72,7 @@ public class SystemHandler extends AlterHandler { private static final Logger LOG = LogManager.getLogger(SystemHandler.class); public SystemHandler() { - super("cluster"); + super("system"); } @Override @@ -164,7 +164,7 @@ public class SystemHandler extends AlterHandler { } else if (alterClause instanceof AddObserverClause) { AddObserverClause clause = (AddObserverClause) alterClause; Env.getCurrentEnv().addFrontend(FrontendNodeType.OBSERVER, clause.getHost(), 
- clause.getPort(), ""); + clause.getPort()); } else if (alterClause instanceof DropObserverClause) { DropObserverClause clause = (DropObserverClause) alterClause; Env.getCurrentEnv().dropFrontend(FrontendNodeType.OBSERVER, clause.getHost(), @@ -172,7 +172,7 @@ public class SystemHandler extends AlterHandler { } else if (alterClause instanceof AddFollowerClause) { AddFollowerClause clause = (AddFollowerClause) alterClause; Env.getCurrentEnv().addFrontend(FrontendNodeType.FOLLOWER, clause.getHost(), - clause.getPort(), ""); + clause.getPort()); } else if (alterClause instanceof DropFollowerClause) { DropFollowerClause clause = (DropFollowerClause) alterClause; Env.getCurrentEnv().dropFrontend(FrontendNodeType.FOLLOWER, clause.getHost(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 597fee4dd8a..8a26ee193e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -424,7 +424,7 @@ public class Env { // node name is used for bdbje NodeName. protected String nodeName; protected FrontendNodeType role; - private FrontendNodeType feType; + protected FrontendNodeType feType; // replica and observer use this value to decide provide read service or not private long synchronizedTimeMs; private MasterInfo masterInfo; @@ -2978,11 +2978,19 @@ public class Env { }; } + public void addFrontend(FrontendNodeType role, String host, int editLogPort) throws DdlException { + addFrontend(role, host, editLogPort, ""); + } + public void addFrontend(FrontendNodeType role, String host, int editLogPort, String nodeName) throws DdlException { if (!tryLock(false)) { throw new DdlException("Failed to acquire env lock. 
Try again"); } try { + if (Strings.isNullOrEmpty(nodeName)) { + nodeName = genFeNodeName(host, editLogPort, true /* new name style */); + } + Frontend fe = checkFeExist(host, editLogPort); if (fe != null) { throw new DdlException("frontend already exists " + fe); @@ -2991,10 +2999,6 @@ public class Env { throw new DdlException("frontend's hostName should not be empty while enable_fqdn_mode is true"); } - if (Strings.isNullOrEmpty(nodeName)) { - nodeName = genFeNodeName(host, editLogPort, false /* new name style */); - } - if (removedFrontends.contains(nodeName)) { throw new DdlException("frontend name already exists " + nodeName + ". Try again"); } @@ -3027,7 +3031,7 @@ public class Env { modifyFrontendHost(fe.getNodeName(), destHost); } - public void modifyFrontendHost(String nodeName, String destHost) throws DdlException { + private void modifyFrontendHost(String nodeName, String destHost) throws DdlException { if (!tryLock(false)) { throw new DdlException("Failed to acquire env lock. Try again"); } @@ -4304,8 +4308,8 @@ public class Env { return cooldownConfHandler; } - public SystemHandler getClusterHandler() { - return (SystemHandler) this.alter.getClusterHandler(); + public SystemHandler getSystemHandler() { + return (SystemHandler) this.alter.getSystemHandler(); } public BackupHandler getBackupHandler() { @@ -5461,15 +5465,15 @@ public class Env { } /* - * used for handling AlterClusterStmt - * (for client is the ALTER CLUSTER command). + * used for handling AlterSystemStmt + * (for client is the ALTER SYSTEM command). 
*/ - public void alterCluster(AlterSystemStmt stmt) throws DdlException, UserException { - this.alter.processAlterCluster(stmt); + public void alterSystem(AlterSystemStmt stmt) throws DdlException, UserException { + this.alter.processAlterSystem(stmt); } - public void cancelAlterCluster(CancelAlterSystemStmt stmt) throws DdlException { - this.alter.getClusterHandler().cancel(stmt); + public void cancelAlterSystem(CancelAlterSystemStmt stmt) throws DdlException { + this.alter.getSystemHandler().cancel(stmt); } // Switch catalog of this session diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java index 87e8a0fc3b0..f07ea1bf5a1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java @@ -34,6 +34,7 @@ import org.apache.doris.cloud.catalog.CloudEnv; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.datasource.InternalCatalog; @@ -186,6 +187,22 @@ public class InternalSchemaInitializer extends Thread { } } + private static void trySetStorageVault(Map<String, String> properties) throws UserException { + CloudEnv cloudEnv = (CloudEnv) Env.getCurrentEnv(); + if (Config.isCloudMode() && cloudEnv.getEnableStorageVault()) { + String storageVaultName; + Pair<String, String> info = cloudEnv.getStorageVaultMgr().getDefaultStorageVaultInfo(); + if (info != null) { + storageVaultName = info.first; + } else { + throw new UserException("No default storage vault." 
+ + " You can use `SHOW STORAGE VAULT` to get all available vaults," + + " and pick one set default vault with `SET <vault_name> AS DEFAULT STORAGE VAULT`"); + } + properties.put(PropertyAnalyzer.PROPERTIES_STORAGE_VAULT_NAME, storageVaultName); + } + } + private static CreateTableStmt buildStatisticsTblStmt(String statsTableName, List<String> uniqueKeys) throws UserException { TableName tableName = new TableName("", FeConstants.INTERNAL_DB_NAME, statsTableName); @@ -200,9 +217,7 @@ public class InternalSchemaInitializer extends Thread { } }; - if (Config.isCloudMode() && ((CloudEnv) Env.getCurrentEnv()).getEnableStorageVault()) { - properties.put(PropertyAnalyzer.PROPERTIES_STORAGE_VAULT_NAME, FeConstants.BUILT_IN_STORAGE_VAULT_NAME); - } + trySetStorageVault(properties); PropertyAnalyzer.getInstance().rewriteForceProperties(properties); CreateTableStmt createTableStmt = new CreateTableStmt(true, false, @@ -238,9 +253,7 @@ public class InternalSchemaInitializer extends Thread { } }; - if (Config.isCloudMode() && ((CloudEnv) Env.getCurrentEnv()).getEnableStorageVault()) { - properties.put(PropertyAnalyzer.PROPERTIES_STORAGE_VAULT_NAME, FeConstants.BUILT_IN_STORAGE_VAULT_NAME); - } + trySetStorageVault(properties); PropertyAnalyzer.getInstance().rewriteForceProperties(properties); CreateTableStmt createTableStmt = new CreateTableStmt(true, false, diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java index 535b8ea582f..565bf0b928a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java @@ -38,6 +38,7 @@ import org.apache.doris.common.ErrorCode; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; import org.apache.doris.common.io.CountingDataOutputStream; +import org.apache.doris.ha.FrontendNodeType; import 
org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; import org.apache.doris.system.SystemInfoService.HostInfo; @@ -88,6 +89,18 @@ public class CloudEnv extends Env { return this.upgradeMgr; } + @Override + public void initialize(String[] args) throws Exception { + if (Config.cloud_unique_id == null || Config.cloud_unique_id.isEmpty()) { + LOG.info("cloud_unique_id is not set, setting it using instance_id"); + Config.cloud_unique_id = "1:" + Config.cloud_instance_id + ":sql_server00"; + } + + LOG.info("Initializing CloudEnv with cloud_unique_id: {}", Config.cloud_unique_id); + + super.initialize(args); + } + @Override protected void startMasterOnlyDaemonThreads() { LOG.info("start cloud Master only daemon threads"); @@ -116,9 +129,13 @@ public class CloudEnv extends Env { return cacheHotspotMgr; } + private CloudSystemInfoService getCloudSystemInfoService() { + return (CloudSystemInfoService) systemInfo; + } + private Cloud.NodeInfoPB getLocalTypeFromMetaService() { // get helperNodes from ms - Cloud.GetClusterResponse response = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) + Cloud.GetClusterResponse response = getCloudSystemInfoService() .getCloudCluster(Config.cloud_sql_server_cluster_name, Config.cloud_sql_server_cluster_id, ""); if (!response.hasStatus() || !response.getStatus().hasCode() || response.getStatus().getCode() != Cloud.MetaServiceCode.OK) { @@ -167,9 +184,18 @@ public class CloudEnv extends Env { if (nodeInfoPB == null) { LOG.warn("failed to get local fe's type, sleep 5 s, try again."); try { - Thread.sleep(5000); + try { + getCloudSystemInfoService().tryCreateInstance(Config.cloud_instance_id, + Config.cloud_instance_id, false); + } catch (Exception e) { + Thread.sleep(5000); + throw e; + } + addFrontend(FrontendNodeType.MASTER, selfNode.getHost(), selfNode.getPort()); } catch (InterruptedException e) { LOG.warn("thread sleep Exception", e); + } catch (DdlException e) { + LOG.warn("get ddl exception ", 
e); } continue; } @@ -207,7 +233,7 @@ public class CloudEnv extends Env { + "' for cloud cluster '" + clusterName + "'", ErrorCode.ERR_CLUSTER_NO_PERMISSIONS); } - if (!((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudClusterNames().contains(clusterName)) { + if (!getCloudSystemInfoService().getCloudClusterNames().contains(clusterName)) { if (LOG.isDebugEnabled()) { LOG.debug("current instance does not have a cluster name :{}", clusterName); } @@ -218,9 +244,9 @@ public class CloudEnv extends Env { public void changeCloudCluster(String clusterName, ConnectContext ctx) throws DdlException { checkCloudClusterPriv(clusterName); - ((CloudSystemInfoService) Env.getCurrentSystemInfo()).waitForAutoStart(clusterName); + getCloudSystemInfoService().waitForAutoStart(clusterName); try { - ((CloudSystemInfoService) Env.getCurrentSystemInfo()).addCloudCluster(clusterName, ""); + getCloudSystemInfoService().addCloudCluster(clusterName, ""); } catch (UserException e) { throw new DdlException(e.getMessage(), e.getMysqlErrorCode()); } @@ -330,4 +356,24 @@ public class CloudEnv extends Env { public void cancelCloudWarmUp(CancelCloudWarmUpStmt stmt) throws DdlException { getCacheHotspotMgr().cancel(stmt); } + + @Override + public void addFrontend(FrontendNodeType role, String host, int editLogPort) throws DdlException { + getCloudSystemInfoService().addFrontend(role, host, editLogPort); + } + + @Override + public void dropFrontend(FrontendNodeType role, String host, int port) throws DdlException { + if (port == selfNode.getPort() && feType == FrontendNodeType.MASTER + && selfNode.getHost().equals(host)) { + throw new DdlException("can not drop current master node."); + } + + getCloudSystemInfoService().dropFrontend(role, host, port); + } + + @Override + public void modifyFrontendHostName(String srcHost, int srcPort, String destHost) throws DdlException { + throw new DdlException("modify frontend host name is not supported in cloud mode"); + } } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java index c9c799687ab..47108b147f5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java @@ -403,4 +403,9 @@ public class MetaServiceClient { } return blockingStub.finishTabletJob(request); } + + public Cloud.CreateInstanceResponse + createInstance(Cloud.CreateInstanceRequest request) { + return blockingStub.createInstance(request); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java index 7d47ec70c1b..ec295902f88 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java @@ -106,6 +106,7 @@ public class MetaServiceProxy { } String address = Config.meta_service_endpoint; + address = address.replaceAll("^[\"']|[\"']$", ""); MetaServiceClient service = serviceMap.get(address); if (service != null && service.isNormalState()) { return service; @@ -547,4 +548,13 @@ public class MetaServiceProxy { throw new RpcException("", e.getMessage(), e); } } + + public Cloud.CreateInstanceResponse createInstance(Cloud.CreateInstanceRequest request) throws RpcException { + try { + final MetaServiceClient client = getProxy(); + return client.createInstance(request); + } catch (Exception e) { + throw new RpcException("", e.getMessage(), e); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java index 48728efb003..3254fdead69 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java @@ -29,6 +29,7 @@ import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; +import org.apache.doris.common.RandomIdentifierGenerator; import org.apache.doris.common.UserException; import org.apache.doris.ha.FrontendNodeType; import org.apache.doris.metric.MetricRepo; @@ -299,6 +300,75 @@ public class CloudSystemInfoService extends SystemInfoService { } } + private void alterBackendCluster(List<HostInfo> hostInfos, String clusterName, + Cloud.AlterClusterRequest.Operation operation) throws DdlException { + // Issue rpc to meta to alter node, then fe master would add this node to its frontends + Cloud.ClusterPB clusterPB = Cloud.ClusterPB.newBuilder() + .setClusterName(clusterName) + .setType(Cloud.ClusterPB.Type.COMPUTE) + .build(); + + for (HostInfo hostInfo : hostInfos) { + Cloud.NodeInfoPB nodeInfoPB = Cloud.NodeInfoPB.newBuilder() + .setIp(hostInfo.getHost()) + .setHost(hostInfo.getHost()) + .setHeartbeatPort(hostInfo.getPort()) + .setCtime(System.currentTimeMillis() / 1000) + .build(); + clusterPB = clusterPB.toBuilder().addNodes(nodeInfoPB).build(); + } + + Cloud.AlterClusterRequest request = Cloud.AlterClusterRequest.newBuilder() + .setInstanceId(Config.cloud_instance_id) + .setOp(operation) + .setCluster(clusterPB) + .build(); + + Cloud.AlterClusterResponse response; + try { + response = MetaServiceProxy.getInstance().alterCluster(request); + if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) { + LOG.warn("alter backends not ok, response: {}", response); + throw new DdlException("failed to alter backends errorCode: " + response.getStatus().getCode() + + " msg: " + response.getStatus().getMsg()); + } + } catch (RpcException e) { + throw new DdlException("failed to alter backends", e); + } + } + + /** + * @param hostInfos : backend's ip, hostName and port + 
* @throws DdlException + */ + @Override + public void addBackends(List<HostInfo> hostInfos, Map<String, String> tagMap) throws UserException { + // issue rpc to meta to add this node, then fe master would add this node to its backends + + String clusterName = tagMap.getOrDefault(Tag.CLOUD_CLUSTER_NAME, Tag.VALUE_DEFAULT_CLOUD_CLUSTER_NAME); + if (clusterName.isEmpty()) { + throw new UserException("clusterName empty"); + } + + tryCreateCluster(clusterName, RandomIdentifierGenerator.generateRandomIdentifier(8)); + alterBackendCluster(hostInfos, clusterName, Cloud.AlterClusterRequest.Operation.ADD_NODE); + } + + @Override + public void dropBackends(List<HostInfo> hostInfos) throws DdlException { + alterBackendCluster(hostInfos, "", Cloud.AlterClusterRequest.Operation.DROP_NODE); + } + + @Override + public void dropBackendsByIds(List<String> ids) throws DdlException { + for (String id : ids) { + if (getBackend(Long.parseLong(id)) == null) { + throw new DdlException("backend does not exists[" + id + "]"); + } + dropBackend(Long.parseLong(id)); + } + } + @Override public void replayAddBackend(Backend newBackend) { super.replayAddBackend(newBackend); @@ -654,6 +724,87 @@ public class CloudSystemInfoService extends SystemInfoService { } } + // FrontendCluster = SqlServerCluster + private void alterFrontendCluster(FrontendNodeType role, String host, int editLogPort, + Cloud.AlterClusterRequest.Operation op) throws DdlException { + // Issue rpc to meta to add this node, then fe master would add this node to its frontends + Cloud.NodeInfoPB nodeInfoPB = Cloud.NodeInfoPB.newBuilder() + .setIp(host) + .setHost(host) + .setEditLogPort(editLogPort) + .setNodeType(role == FrontendNodeType.MASTER ? 
Cloud.NodeInfoPB.NodeType.FE_MASTER + : Cloud.NodeInfoPB.NodeType.FE_OBSERVER) + .setCtime(System.currentTimeMillis() / 1000) + .build(); + + Cloud.ClusterPB clusterPB = Cloud.ClusterPB.newBuilder() + .setClusterId(Config.cloud_sql_server_cluster_id) + .setClusterName(Config.cloud_sql_server_cluster_name) + .setType(Cloud.ClusterPB.Type.SQL) + .addNodes(nodeInfoPB) + .build(); + + Cloud.AlterClusterRequest request = Cloud.AlterClusterRequest.newBuilder() + .setInstanceId(Config.cloud_instance_id) + .setOp(op) + .setCluster(clusterPB) + .build(); + + Cloud.AlterClusterResponse response; + try { + response = MetaServiceProxy.getInstance().alterCluster(request); + if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) { + LOG.warn("alter frontend not ok, response: {}", response); + throw new DdlException("failed to alter frontend errorCode: " + response.getStatus().getCode() + + " msg: " + response.getStatus().getMsg()); + } + } catch (RpcException e) { + throw new DdlException("failed to alter frontend", e); + } + } + + public void addFrontend(FrontendNodeType role, String host, int editLogPort) throws DdlException { + if (role != FrontendNodeType.MASTER && role != FrontendNodeType.OBSERVER) { + throw new DdlException("unsupported frontend role: " + role); + } + + Cloud.AlterClusterRequest.Operation op; + op = role == FrontendNodeType.MASTER ? 
Cloud.AlterClusterRequest.Operation.ADD_CLUSTER + : Cloud.AlterClusterRequest.Operation.ADD_NODE; + alterFrontendCluster(role, host, editLogPort, op); + } + + public void dropFrontend(FrontendNodeType role, String host, int editLogPort) throws DdlException { + alterFrontendCluster(role, host, editLogPort, Cloud.AlterClusterRequest.Operation.DROP_NODE); + } + + private void tryCreateCluster(String clusterName, String clusterId) throws UserException { + Cloud.ClusterPB clusterPB = Cloud.ClusterPB.newBuilder() + .setClusterId(clusterId) + .setClusterName(clusterName) + .setType(Cloud.ClusterPB.Type.COMPUTE) + .build(); + + Cloud.AlterClusterRequest request = Cloud.AlterClusterRequest.newBuilder() + .setCloudUniqueId(Config.cloud_unique_id) + .setOp(Cloud.AlterClusterRequest.Operation.ADD_CLUSTER) + .setCluster(clusterPB) + .build(); + + Cloud.AlterClusterResponse response; + try { + response = MetaServiceProxy.getInstance().alterCluster(request); + if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK + && response.getStatus().getCode() != Cloud.MetaServiceCode.ALREADY_EXISTED) { + LOG.warn("create cluster not ok, response: {}", response); + throw new UserException("failed to create cluster errorCode: " + response.getStatus().getCode() + + " msg: " + response.getStatus().getMsg()); + } + } catch (RpcException e) { + throw new UserException("failed to create cluster", e); + } + } + public List<Pair<String, Integer>> getCurrentObFrontends() { List<Frontend> frontends = Env.getCurrentEnv().getFrontends(FrontendNodeType.OBSERVER); List<Pair<String, Integer>> frontendsPair = new ArrayList<>(); @@ -816,4 +967,25 @@ public class CloudSystemInfoService extends SystemInfoService { LOG.info("auto start cluster {}, start cost {} ms", clusterName, stopWatch.getTime()); } } + + public void tryCreateInstance(String instanceId, String name, boolean sseEnabled) throws DdlException { + Cloud.CreateInstanceRequest.Builder builder = Cloud.CreateInstanceRequest.newBuilder(); + 
builder.setInstanceId(instanceId); + builder.setName(name); + builder.setSseEnabled(sseEnabled); + + Cloud.CreateInstanceResponse response; + try { + response = MetaServiceProxy.getInstance().createInstance(builder.build()); + if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK + && response.getStatus().getCode() != Cloud.MetaServiceCode.ALREADY_EXISTED) { + LOG.warn("Failed to create instance {}, response: {}", instanceId, response); + throw new DdlException("Failed to create instance: " + response.getStatus().getMsg()); + } + LOG.info("Successfully created instance {}, response: {}", instanceId, response); + } catch (RpcException e) { + LOG.warn("Failed to create instance {}", instanceId, e); + throw new DdlException("Failed to create instance: " + e.getMessage()); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/RandomIdentifierGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/common/RandomIdentifierGenerator.java new file mode 100644 index 00000000000..452698e1c18 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/RandomIdentifierGenerator.java @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ + package org.apache.doris.common; + + import java.util.UUID; + import java.security.SecureRandom; + + public class RandomIdentifierGenerator { + + private static final String ALPHABET = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; + private static final String ALPHANUMERIC = ALPHABET + "0123456789_"; + private static final SecureRandom RANDOM = new SecureRandom(); + + /** + * Generates a random identifier matching the pattern "^[a-zA-Z][a-zA-Z0-9_]*$". + * + * @param length The desired length of the identifier (must be at least 1) + * @return A randomly generated identifier + */ + public static String generateRandomIdentifier(int length) { + if (length < 1) { + throw new IllegalArgumentException("Length must be at least 1"); + } + + StringBuilder sb = new StringBuilder(length); + + // First character must be a letter + sb.append(ALPHABET.charAt(RANDOM.nextInt(ALPHABET.length()))); + + // Remaining characters can be alphanumeric or underscore + for (int i = 1; i < length; i++) { + sb.append(ALPHANUMERIC.charAt(RANDOM.nextInt(ALPHANUMERIC.length()))); + } + + return sb.toString(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java index 8441c9307ca..04140d664c1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java @@ -1240,7 +1240,7 @@ public class PropertyAnalyzer { tagMap.put(tag.type, tag.value); iter.remove(); } - if (tagMap.isEmpty() && defaultValue != null) { + if (defaultValue != null && !tagMap.containsKey(defaultValue.type)) { tagMap.put(defaultValue.type, defaultValue.value); } return tagMap; diff --git a/fe/fe-core/src/main/java/org/apache/doris/deploy/DeployManager.java b/fe/fe-core/src/main/java/org/apache/doris/deploy/DeployManager.java index 52d0b814b00..11230daf973 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/deploy/DeployManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/deploy/DeployManager.java @@ -549,10 +549,10 @@ public class DeployManager extends MasterDaemon { try { switch (nodeType) { case ELECTABLE: - env.addFrontend(FrontendNodeType.FOLLOWER, remoteHost, remotePort, ""); + env.addFrontend(FrontendNodeType.FOLLOWER, remoteHost, remotePort); break; case OBSERVER: - env.addFrontend(FrontendNodeType.OBSERVER, remoteHost, remotePort, ""); + env.addFrontend(FrontendNodeType.OBSERVER, remoteHost, remotePort); break; case BACKEND: case BACKEND_CN: diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java index 814b00b49ac..4658f936db5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java @@ -661,7 +661,7 @@ public class NodeAction extends RestBaseController { } HostInfo info = SystemInfoService.getHostAndPort(reqInfo.getHostPort()); if ("ADD".equals(action)) { - currentEnv.addFrontend(frontendNodeType, info.getHost(), info.getPort(), ""); + currentEnv.addFrontend(frontendNodeType, info.getHost(), info.getPort()); } else if ("DROP".equals(action)) { currentEnv.dropFrontend(frontendNodeType, info.getHost(), info.getPort()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java index bdc7b765101..baf31653eff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -263,10 +263,10 @@ public class DdlExecutor { env.getAuth().updateUserProperty((SetUserPropertyStmt) ddlStmt); } else if (ddlStmt instanceof AlterSystemStmt) { AlterSystemStmt stmt = (AlterSystemStmt) ddlStmt; - env.alterCluster(stmt); + 
env.alterSystem(stmt); } else if (ddlStmt instanceof CancelAlterSystemStmt) { CancelAlterSystemStmt stmt = (CancelAlterSystemStmt) ddlStmt; - env.cancelAlterCluster(stmt); + env.cancelAlterSystem(stmt); } else if (ddlStmt instanceof AlterDatabaseQuotaStmt) { env.alterDatabaseQuota((AlterDatabaseQuotaStmt) ddlStmt); } else if (ddlStmt instanceof AlterDatabaseRename) { @@ -564,8 +564,7 @@ public class DdlExecutor { || ddlStmt instanceof AdminRebalanceDiskStmt || ddlStmt instanceof AdminCancelRebalanceDiskStmt || ddlStmt instanceof AlterResourceStmt - || ddlStmt instanceof AlterPolicyStmt - || ddlStmt instanceof AlterSystemStmt) { + || ddlStmt instanceof AlterPolicyStmt) { LOG.info("stmt={}, not supported in cloud mode", ddlStmt.toString()); throw new DdlException("Unsupported operation"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java b/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java index a51755412b9..21abce18ade 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java @@ -73,6 +73,9 @@ public class Tag implements Writable { public static final String CLOUD_CLUSTER_PRIVATE_ENDPOINT = "cloud_cluster_private_endpoint"; public static final String CLOUD_CLUSTER_STATUS = "cloud_cluster_status"; + public static final String COMPUTE_GROUP_NAME = "name"; + public static final String VALUE_DEFAULT_CLOUD_CLUSTER_NAME = "default_cluster"; + public static final String WORKLOAD_GROUP = "workload_group"; public static final ImmutableSet<String> RESERVED_TAG_TYPE = ImmutableSet.of( diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index fd8d6fc2bf5..0dca003069e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ 
-480,10 +480,13 @@ public class StatisticsUtil { // dbName, // StatisticConstants.HISTOGRAM_TBL_NAME)); } catch (Throwable t) { + LOG.info("stat table does not exist, db_name: {}, table_name: {}", dbName, + StatisticConstants.TABLE_STATISTIC_TBL_NAME); return false; } if (Config.isCloudMode()) { if (!((CloudSystemInfoService) Env.getCurrentSystemInfo()).availableBackendsExists()) { + LOG.info("there are no available backends"); return false; } try (AutoCloseConnectContext r = buildConnectContext()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java index 515db76b096..d2d05e97995 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -95,6 +95,8 @@ public class HeartbeatMgr extends MasterDaemon { tMasterInfo.setHttpPort(Config.http_port); long flags = heartbeatFlags.getHeartbeatFlags(); tMasterInfo.setHeartbeatFlags(flags); + tMasterInfo.setCloudInstanceId(Config.cloud_instance_id); + tMasterInfo.setMetaServiceEndpoint(Config.meta_service_endpoint); masterInfo.set(tMasterInfo); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java index 81e29d3abf3..6901d3b604c 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java @@ -144,7 +144,7 @@ public class DecommissionTest { + ":" + backend.getHeartbeatPort() + "\""; AlterSystemStmt decommissionStmt = (AlterSystemStmt) UtFrameUtils.parseAndAnalyzeStmt(decommissionStmtStr, connectContext); - Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt); + Env.getCurrentEnv().getAlterInstance().processAlterSystem(decommissionStmt); Assert.assertEquals(true, backend.isDecommissioned()); diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java index fc80ecbd97d..53355f6faa4 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java @@ -100,7 +100,7 @@ public class DecommissionBackendTest extends TestWithFeService { Assertions.assertNotNull(srcBackend); String decommissionStmtStr = "alter system decommission backend \"" + srcBackend.getAddress() + "\""; AlterSystemStmt decommissionStmt = (AlterSystemStmt) parseAndAnalyzeStmt(decommissionStmtStr); - Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt); + Env.getCurrentEnv().getAlterInstance().processAlterSystem(decommissionStmt); Assertions.assertTrue(srcBackend.isDecommissioned()); long startTimestamp = System.currentTimeMillis(); @@ -153,7 +153,7 @@ public class DecommissionBackendTest extends TestWithFeService { // decommission backend by id String decommissionByIdStmtStr = "alter system decommission backend \"" + srcBackend.getId() + "\""; AlterSystemStmt decommissionByIdStmt = (AlterSystemStmt) parseAndAnalyzeStmt(decommissionByIdStmtStr); - Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionByIdStmt); + Env.getCurrentEnv().getAlterInstance().processAlterSystem(decommissionByIdStmt); Assertions.assertTrue(srcBackend.isDecommissioned()); long startTimestamp = System.currentTimeMillis(); @@ -215,7 +215,7 @@ public class DecommissionBackendTest extends TestWithFeService { // 6. 
execute decommission String decommissionStmtStr = "alter system decommission backend \"" + srcBackend.getAddress() + "\""; AlterSystemStmt decommissionStmt = (AlterSystemStmt) parseAndAnalyzeStmt(decommissionStmtStr); - Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt); + Env.getCurrentEnv().getAlterInstance().processAlterSystem(decommissionStmt); Assertions.assertTrue(srcBackend.isDecommissioned()); long startTimestamp = System.currentTimeMillis(); @@ -315,7 +315,7 @@ public class DecommissionBackendTest extends TestWithFeService { String decommissionStmtStr = "alter system decommission backend \"" + srcBackend.getAddress() + "\""; AlterSystemStmt decommissionStmt = (AlterSystemStmt) parseAndAnalyzeStmt(decommissionStmtStr); - Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt); + Env.getCurrentEnv().getAlterInstance().processAlterSystem(decommissionStmt); Assertions.assertTrue(srcBackend.isDecommissioned()); long startTimestamp = System.currentTimeMillis(); diff --git a/gensrc/thrift/HeartbeatService.thrift b/gensrc/thrift/HeartbeatService.thrift index 4daea779735..7c94bc9ce0c 100644 --- a/gensrc/thrift/HeartbeatService.thrift +++ b/gensrc/thrift/HeartbeatService.thrift @@ -39,6 +39,8 @@ struct TMasterInfo { 7: optional i64 heartbeat_flags 8: optional i64 backend_id 9: optional list<TFrontendInfo> frontend_infos + 10: optional string meta_service_endpoint; + 11: optional string cloud_instance_id; } struct TBackendInfo { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
