This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
commit 8baad35413c8aa7e2fc9276e91cad51cea375b38 Author: gosonzhang <[email protected]> AuthorDate: Thu Apr 8 16:34:31 2021 +0800 [TUBEMQ-594] Trpc-go tube sdk strongly rely on local config --- .../example/consumer/test_consumer.cc | 3 +- .../example/consumer/test_multithread_pull.cc | 3 +- .../include/tubemq/tubemq_client.h | 3 + .../include/tubemq/tubemq_config.h | 42 ++++++- .../tubemq-client-cpp/src/client_service.cc | 123 ++++++++++++--------- .../tubemq-client-cpp/src/client_service.h | 9 +- .../tubemq-client-cpp/src/const_config.h | 14 +++ .../tubemq-client-cpp/src/tubemq_client.cc | 6 + .../tubemq-client-cpp/src/tubemq_config.cc | 116 ++++++++++++++++++- .../tubemq-client-cpp/src/version.h | 2 +- 10 files changed, 253 insertions(+), 68 deletions(-) diff --git a/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_consumer.cc b/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_consumer.cc index d719fcb..9049ab0 100644 --- a/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_consumer.cc +++ b/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_consumer.cc @@ -73,6 +73,7 @@ int main(int argc, char* argv[]) { set<string> topic_list; topic_list.insert(topic_name); ConsumerConfig consumer_config; + TubeMQServiceConfig serviceConfig; consumer_config.SetRpcReadTimeoutMs(20000); result = consumer_config.SetMasterAddrInfo(err_info, master_addr); @@ -85,7 +86,7 @@ int main(int argc, char* argv[]) { printf("\n Set GroupConsume Target failure: %s", err_info.c_str()); return -1; } - result = StartTubeMQService(err_info, conf_file); + result = StartTubeMQService(err_info, serviceConfig); if (!result) { printf("\n StartTubeMQService failure: %s", err_info.c_str()); return -1; diff --git a/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_multithread_pull.cc b/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_multithread_pull.cc index f5a7d5a..af27bb7 100644 --- a/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_multithread_pull.cc +++ b/tubemq-client-twins/tubemq-client-cpp/example/consumer/test_multithread_pull.cc @@ -127,6 +127,7 @@ int main(int argc, char* argv[]) { set<string> topic_list; topic_list.insert(topic_name); ConsumerConfig consumer_config; + TubeMQServiceConfig serviceConfig; consumer_config.SetRpcReadTimeoutMs(20000); result = consumer_config.SetMasterAddrInfo(err_info, master_addr); @@ -139,7 +140,7 @@ int main(int argc, char* argv[]) { printf("\n Set GroupConsume Target failure: %s", err_info.c_str()); return -1; } - result = StartTubeMQService(err_info, conf_file); + result = StartTubeMQService(err_info, serviceConfig); if (!result) { printf("\n StartTubeMQService failure: %s", err_info.c_str()); return -1; diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_client.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_client.h index 1c1f965..1a2a4c3 100644 --- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_client.h +++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_client.h @@ -32,6 +32,9 @@ namespace tubemq { bool StartTubeMQService(string& err_info, + const TubeMQServiceConfig& serviceConfig); +// Deprecated method +bool StartTubeMQService(string& err_info, const string& conf_file = "../conf/client.conf"); bool StopTubeMQService(string& err_info); diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_config.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_config.h index 496e911..69f6434 100644 --- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_config.h +++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_config.h @@ -34,6 +34,42 @@ using std::set; using std::string; +// configure for log, thread pool etc. +class TubeMQServiceConfig { + public: + TubeMQServiceConfig(); + ~TubeMQServiceConfig(); + TubeMQServiceConfig& operator=(const TubeMQServiceConfig& target); + void setLogCofigInfo(int32_t log_max_num, + int32_t log_max_size, int32_t log_level, const string& log_path); + const int32_t getMaxLogFileNum() const; + const int32_t getMaxLogFileSize() const; + const int32_t getLogPrintLevel() const; + const string& GetLogStorePath() const; + void setDnsXfsPeriodInMs(int32_t dns_xfs_period_ms); + const int32_t getDnsXfsPeriodInMs() const; + void setServiceThreads(int32_t timer_threads, + int32_t network_threads, int32_t signal_threads); + const int32_t getTimerThreads() const; + const int32_t getNetWorkThreads() const; + const int32_t getSignalThreads() const; + const string ToString() const; + + private: + // max log file count + int32_t log_num_; + // unit MB + int32_t log_size_; + // 0:trace, 1:debug, 2:info, 3:warn, 4:error + int32_t log_level_; + // need include log filename + string log_path_; + int32_t dns_xfs_period_ms_; + int32_t timer_threads_; + int32_t network_threads_; + int32_t signal_threads_; +}; + class BaseConfig { public: BaseConfig(); @@ -63,7 +99,7 @@ class BaseConfig { int32_t GetMaxHeartBeatRetryTimes(); void SetHeartbeatPeriodAftFailMs(int32_t heartbeat_period_afterfail_ms); int32_t GetHeartbeatPeriodAftFailMs(); - string ToString(); + const string ToString() const; private: string master_addrinfo_; @@ -118,7 +154,7 @@ class ConsumerConfig : public BaseConfig { // be blocked forever and will not return until the consumption conditions are met; // 2. if If it is set to 0, it means that the GetMessage() calling thread will only block // the ConsumerConfig.GetPartCheckSliceMs() interval when the consumption conditions - // are not met and then return; + // are not met and then return; // 3. if it is set to a positive number, it will not meet the current user usage (including // unused partitions or allocated partitions, but these partitions do not meet the usage // conditions), the GetMessage() calling thread will be blocked until the total time of @@ -138,7 +174,7 @@ class ConsumerConfig : public BaseConfig { void SetMaxConfirmWaitPeriodMs(int32_t max_confirm_wait_period_ms); const int32_t GetShutdownRebWaitPeriodMs() const; void SetShutdownRebWaitPeriodMs(int32_t wait_period_when_shutdown_ms); - string ToString(); + const string ToString() const; private: bool setGroupConsumeTarget(string& err_info, bool is_bound_consume, const string& group_name, diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_service.cc b/tubemq-client-twins/tubemq-client-cpp/src/client_service.cc index fc0b1a6..4778673 100644 --- a/tubemq-client-twins/tubemq-client-cpp/src/client_service.cc +++ b/tubemq-client-twins/tubemq-client-cpp/src/client_service.cc @@ -68,20 +68,9 @@ TubeMQService::~TubeMQService() { Stop(err_info); } -bool TubeMQService::Start(string& err_info, string conf_file) { - // check configure file +bool TubeMQService::Start(string& err_info, + const TubeMQServiceConfig& serviceConfig) { bool result = false; - Fileini fileini; - string sector = "TubeMQ"; - - result = Utils::ValidConfigFile(err_info, conf_file); - if (!result) { - return result; - } - result = fileini.Loadini(err_info, conf_file); - if (!result) { - return result; - } result = Utils::GetLocalIPV4Address(err_info, local_host_); if (!result) { return result; @@ -90,16 +79,25 @@ bool TubeMQService::Start(string& err_info, string conf_file) { err_info = "TubeMQ Service has startted or Stopped!"; return false; } - iniLogger(fileini, sector); - iniPoolThreads(fileini, sector); - iniXfsThread(fileini, sector); + serviceConfig_ = serviceConfig; + iniServiceConfigure(); service_status_.Set(2); err_info = "Ok!"; - LOG_INFO("[TubeMQService] TubeMQ service startted!"); - + LOG_INFO("[TubeMQService] TubeMQ service startted! initial configure is %s ", + serviceConfig.ToString().c_str()); return true; } + +bool TubeMQService::Start(string& err_info, string conf_file) { + // check configure file + TubeMQServiceConfig serviceConfig; + if (!getServiceConfByFile(err_info, conf_file, serviceConfig)) { + return false; + } + return Start(err_info, serviceConfig); +} + bool TubeMQService::Stop(string& err_info) { if (service_status_.CompareAndSet(2, -1)) { LOG_INFO("[TubeMQService] TubeMQ service begin to stop!"); @@ -119,42 +117,18 @@ bool TubeMQService::Stop(string& err_info) { bool TubeMQService::IsRunning() { return (service_status_.Get() == 2); } -void TubeMQService::iniLogger(const Fileini& fileini, const string& sector) { - string err_info; - int32_t log_num = 10; - int32_t log_size = 10; - int32_t log_level = 4; - string log_path = "../log/tubemq"; - fileini.GetValue(err_info, sector, "log_num", log_num, 10); - fileini.GetValue(err_info, sector, "log_size", log_size, 100); - fileini.GetValue(err_info, sector, "log_path", log_path, "../log/tubemq"); - fileini.GetValue(err_info, sector, "log_level", log_level, 4); - log_level = TUBEMQ_MID(log_level, 4, 0); - GetLogger().Init(log_path, Logger::Level(log_level), log_size, log_num); -} - -void TubeMQService::iniXfsThread(const Fileini& fileini, const string& sector) { - string err_info; - int32_t dns_xfs_period_ms = 30 * 1000; - fileini.GetValue(err_info, sector, "dns_xfs_period_ms", dns_xfs_period_ms, 30 * 1000); - TUBEMQ_MID(dns_xfs_period_ms, tb_config::kMaxIntValue, 10000); - dns_xfs_thread_ = std::thread(&TubeMQService::thread_task_dnsxfs, this, dns_xfs_period_ms); -} - -void TubeMQService::iniPoolThreads(const Fileini& fileini, const string& sector) { - string err_info; - int32_t timer_threads = 2; - int32_t network_threads = 4; - int32_t signal_threads = 8; - fileini.GetValue(err_info, sector, "timer_threads", timer_threads, 2); - TUBEMQ_MID(timer_threads, 50, 2); - fileini.GetValue(err_info, sector, "network_threads", network_threads, 4); - TUBEMQ_MID(network_threads, 50, 4); - fileini.GetValue(err_info, sector, "signal_threads", signal_threads, 8); - TUBEMQ_MID(signal_threads, 50, 4); - timer_executor_->Resize(timer_threads); - network_executor_->Resize(network_threads); - thread_pool_ = std::make_shared<ThreadPool>(signal_threads); +void TubeMQService::iniServiceConfigure() { + // initial logger parameters + GetLogger().Init(serviceConfig_.GetLogStorePath(), + Logger::Level(serviceConfig_.getLogPrintLevel()), + serviceConfig_.getMaxLogFileSize(), serviceConfig_.getMaxLogFileNum()); + // initial dns translate thread + dns_xfs_thread_ = std::thread(&TubeMQService::thread_task_dnsxfs, + this, serviceConfig_.getDnsXfsPeriodInMs()); + // initial service thread pools + timer_executor_->Resize(serviceConfig_.getTimerThreads()); + network_executor_->Resize(serviceConfig_.getNetWorkThreads()); + thread_pool_ = std::make_shared<ThreadPool>(serviceConfig_.getSignalThreads()); connection_pool_ = std::make_shared<ConnectionPool>(network_executor_); } @@ -292,4 +266,45 @@ void TubeMQService::updMasterAddrByDns() { } } +bool TubeMQService::getServiceConfByFile(string& err_info, + string conf_file, TubeMQServiceConfig& serviceConfig) { + // check configure file + bool result = false; + Fileini fileini; + string sector = "TubeMQ"; + result = Utils::ValidConfigFile(err_info, conf_file); + if (!result) { + return result; + } + result = fileini.Loadini(err_info, conf_file); + if (!result) { + return result; + } + // get log paremeters + int32_t log_num = 10; + int32_t log_size = 100; + int32_t log_level = 4; + string log_path = "../log/tubemq"; + fileini.GetValue(err_info, sector, "log_num", log_num, 10); + fileini.GetValue(err_info, sector, "log_size", log_size, 100); + fileini.GetValue(err_info, sector, "log_path", log_path, "../log/tubemq"); + fileini.GetValue(err_info, sector, "log_level", log_level, 4); + log_level = TUBEMQ_MID(log_level, 4, 0); + serviceConfig.setLogCofigInfo(log_num, log_size, log_level, log_path); + // get dns translate period + int32_t dns_xfs_period_ms = 30 * 1000; + fileini.GetValue(err_info, sector, "dns_xfs_period_ms", dns_xfs_period_ms, 30 * 1000); + serviceConfig.setDnsXfsPeriodInMs(dns_xfs_period_ms); + // get thread pools paremeters + int32_t timer_threads = 2; + int32_t network_threads = 4; + int32_t signal_threads = 8; + fileini.GetValue(err_info, sector, "timer_threads", timer_threads, 2); + fileini.GetValue(err_info, sector, "network_threads", network_threads, 4); + fileini.GetValue(err_info, sector, "signal_threads", signal_threads, 8); + serviceConfig.setServiceThreads(timer_threads, network_threads, signal_threads); + err_info = "Ok"; + return true; +} + } // namespace tubemq diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_service.h b/tubemq-client-twins/tubemq-client-cpp/src/client_service.h index a84206b..8d50269 100644 --- a/tubemq-client-twins/tubemq-client-cpp/src/client_service.h +++ b/tubemq-client-twins/tubemq-client-cpp/src/client_service.h @@ -62,6 +62,8 @@ using BaseClientPtr = std::shared_ptr<BaseClient>; class TubeMQService : public noncopyable { public: static TubeMQService* Instance(); + bool Start(string& err_info, const TubeMQServiceConfig& serviceConfig); + // Deprecated method bool Start(string& err_info, string conf_file = "../conf/tubemqclient.conf"); bool Stop(string& err_info); bool IsRunning(); @@ -90,17 +92,18 @@ class TubeMQService : public noncopyable { private: TubeMQService(); ~TubeMQService(); - void iniLogger(const Fileini& fileini, const string& sector); - void iniPoolThreads(const Fileini& fileini, const string& sector); - void iniXfsThread(const Fileini& fileini, const string& sector); + void iniServiceConfigure(); void thread_task_dnsxfs(int dns_xfs_period_ms); void shutDownClinets() const; bool hasXfsTask(map<string, int32_t>& src_addr_map); bool addNeedDnsXfsAddr(map<string, int32_t>& src_addr_map); + bool getServiceConfByFile(string& err_info, + string conf_file, TubeMQServiceConfig& serviceConfig); private: static TubeMQService* _instance; string local_host_; + TubeMQServiceConfig serviceConfig_; AtomicInteger service_status_; AtomicInteger client_index_base_; mutable mutex mutex_; diff --git a/tubemq-client-twins/tubemq-client-cpp/src/const_config.h b/tubemq-client-twins/tubemq-client-cpp/src/const_config.h index d22f7e7..c854548 100644 --- a/tubemq-client-twins/tubemq-client-cpp/src/const_config.h +++ b/tubemq-client-twins/tubemq-client-cpp/src/const_config.h @@ -36,6 +36,20 @@ using std::string; // configuration value setting namespace tb_config { +// log setting default define +static const int32_t kLogNumDef = 10; +static const int32_t kLogSizeDefMB = 100; +static const int32_t kLogLevelDef = 4; +static const char kLogPathDef[] = "../log/tubemq"; + +// dns tranlate period in ms +static const int32_t kDnsXfsPeriodInMsDef = 30000; + +// frame threads define +static const int32_t kTimerThreadNumDef = 2; +static const int32_t kNetworkThreadNumDef = 4; +static const int32_t kSignalThreadNumDef = 8; + // rpc timeout define static const int32_t kRpcTimoutDefMs = 15000; static const int32_t kRpcTimoutMaxMs = 300000; diff --git a/tubemq-client-twins/tubemq-client-cpp/src/tubemq_client.cc b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_client.cc index dfa15ca..e3027be 100644 --- a/tubemq-client-twins/tubemq-client-cpp/src/tubemq_client.cc +++ b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_client.cc @@ -35,6 +35,12 @@ namespace tubemq { using std::lock_guard; using std::stringstream; +bool StartTubeMQService(string& err_info, + const TubeMQServiceConfig& serviceConfig) { + signal(SIGPIPE, SIG_IGN); + return TubeMQService::Instance()->Start(err_info, serviceConfig); +} + bool StartTubeMQService(string& err_info, const string& conf_file) { signal(SIGPIPE, SIG_IGN); return TubeMQService::Instance()->Start(err_info, conf_file); diff --git a/tubemq-client-twins/tubemq-client-cpp/src/tubemq_config.cc b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_config.cc index 877b7f1..809dd09 100644 --- a/tubemq-client-twins/tubemq-client-cpp/src/tubemq_config.cc +++ b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_config.cc @@ -32,6 +32,112 @@ using std::set; using std::stringstream; using std::vector; + + +TubeMQServiceConfig::TubeMQServiceConfig() { + log_num_ = tb_config::kLogNumDef; + log_size_ = tb_config::kLogSizeDefMB; + log_level_ = tb_config::kLogLevelDef; + log_path_ = tb_config::kLogPathDef; + dns_xfs_period_ms_ = tb_config::kDnsXfsPeriodInMsDef; + timer_threads_ = tb_config::kTimerThreadNumDef; + network_threads_ = tb_config::kNetworkThreadNumDef; + signal_threads_ = tb_config::kSignalThreadNumDef; +} + +TubeMQServiceConfig::~TubeMQServiceConfig() { + // +} + +TubeMQServiceConfig& TubeMQServiceConfig::operator=(const TubeMQServiceConfig& target) { + if (this != &target) { + log_num_ = target.log_num_; + log_size_ = target.log_size_; + log_level_ = target.log_level_; + log_path_ = target.log_path_; + dns_xfs_period_ms_ = target.dns_xfs_period_ms_; + timer_threads_ = target.timer_threads_; + network_threads_ = target.network_threads_; + signal_threads_ = target.signal_threads_; + } + return *this; +} + +void TubeMQServiceConfig::setLogCofigInfo(int32_t log_max_num, + int32_t log_max_size, int32_t log_level, const string& log_path) { + log_num_ = log_max_num; + log_size_ = log_max_size; + log_level_ = log_level; + log_path_ = log_path; + log_level_ = TUBEMQ_MID(log_level, 4, 0); +} + +void TubeMQServiceConfig::setDnsXfsPeriodInMs(int32_t dns_xfs_period_ms) { + dns_xfs_period_ms_ = + TUBEMQ_MID(dns_xfs_period_ms, tb_config::kMaxIntValue, 10000); +} + +void TubeMQServiceConfig::setServiceThreads(int32_t timer_threads, + int32_t network_threads, int32_t signal_threads) { + timer_threads_ = TUBEMQ_MID(timer_threads, 50, 2); + network_threads_ = TUBEMQ_MID(network_threads, 50, 4); + signal_threads_ = TUBEMQ_MID(signal_threads, 50, 4); +} + +const int32_t TubeMQServiceConfig::getMaxLogFileNum() const { + return log_num_; +} + +const int32_t TubeMQServiceConfig::getMaxLogFileSize() const { + return log_size_; +} + +const int32_t TubeMQServiceConfig::getLogPrintLevel() const { + return log_level_; +} + +const string& TubeMQServiceConfig::GetLogStorePath() const { + return log_path_; +} + +const int32_t TubeMQServiceConfig::getDnsXfsPeriodInMs() const { + return dns_xfs_period_ms_; +} + +const int32_t TubeMQServiceConfig::getTimerThreads() const { + return timer_threads_; +} + +const int32_t TubeMQServiceConfig::getNetWorkThreads() const { + return network_threads_; +} + +const int32_t TubeMQServiceConfig::getSignalThreads() const { + return signal_threads_; +} + +const string TubeMQServiceConfig::ToString() const { + stringstream ss; + ss << "TubeMQServiceConfig={log_num_="; + ss << log_num_; + ss << ", log_size_="; + ss << log_size_; + ss << ", log_level_="; + ss << log_level_; + ss << ", log_path_='"; + ss << log_path_; + ss << "', dns_xfs_period_ms_="; + ss << dns_xfs_period_ms_; + ss << ", timer_threads_="; + ss << timer_threads_; + ss << ", network_threads_="; + ss << network_threads_; + ss << ", signal_threads_="; + ss << signal_threads_; + ss << "}"; + return ss.str(); +} + BaseConfig::BaseConfig() { master_addrinfo_ = ""; auth_enable_ = false; @@ -188,7 +294,7 @@ void BaseConfig::SetHeartbeatPeriodAftFailMs(int32_t heartbeat_period_afterfail_ int32_t BaseConfig::GetHeartbeatPeriodAftFailMs() { return heartbeat_period_afterfail_ms_; } -string BaseConfig::ToString() { +const string BaseConfig::ToString() const { stringstream ss; ss << "BaseConfig={master_addrinfo_='"; ss << master_addrinfo_; @@ -538,11 +644,11 @@ void ConsumerConfig::SetShutdownRebWaitPeriodMs( shutdown_reb_wait_period_ms_ = wait_period_when_shutdown_ms; } -string ConsumerConfig::ToString() { +const string ConsumerConfig::ToString() const { int32_t i = 0; stringstream ss; - map<string, int64_t>::iterator it; - map<string, set<string> >::iterator it_map; + map<string, int64_t>::const_iterator it; + map<string, set<string> >::const_iterator it_map; // print info ss << "ConsumerConfig = {"; @@ -560,7 +666,7 @@ string ConsumerConfig::ToString() { ss << "'=["; int32_t j = 0; set<string> topic_set = it_map->second; - for (set<string>::iterator it = topic_set.begin(); it != topic_set.end(); ++it) { + for (set<string>::const_iterator it = topic_set.begin(); it != topic_set.end(); ++it) { if (j++ > 0) { ss << ","; } diff --git a/tubemq-client-twins/tubemq-client-cpp/src/version.h b/tubemq-client-twins/tubemq-client-cpp/src/version.h index c479ede..5fb7c74 100644 --- a/tubemq-client-twins/tubemq-client-cpp/src/version.h +++ b/tubemq-client-twins/tubemq-client-cpp/src/version.h @@ -26,7 +26,7 @@ namespace tubemq { using std::string; -static const char kTubeMQClientVersion[] = "0.1.0-0.5.0"; +static const char kTubeMQClientVersion[] = "0.1.1-0.5.0"; } // namespace tubemq
