This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch tubemq-client-cpp in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/tubemq-client-cpp by this push: new a7ece60 [TUBEMQ-285]Replace C/C++ pthread's mutex to std::mutex (#210) a7ece60 is described below commit a7ece606488fcc0cea0ac7ade03fe4d4cd9a87ef Author: gosonzhang <4675...@qq.com> AuthorDate: Fri Jul 17 11:46:14 2020 +0000 [TUBEMQ-285]Replace C/C++ pthread's mutex to std::mutex (#210) Co-authored-by: gosonzhang <gosonzh...@tencent.com> --- .../include/tubemq/flowctrl_def.h | 16 ++-- .../include/tubemq/rmt_data_cache.h | 19 ++-- .../include/tubemq/tubemq_config.h | 5 + .../tubemq-client-cpp/src/flowctrl_def.cc | 71 ++++++++------ .../tubemq-client-cpp/src/meta_info.cc | 4 +- .../tubemq-client-cpp/src/rmt_data_cache.cc | 102 ++++++--------------- 6 files changed, 101 insertions(+), 116 deletions(-) diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/flowctrl_def.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/flowctrl_def.h index c0289ca..8b99cbd 100644 --- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/flowctrl_def.h +++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/flowctrl_def.h @@ -26,6 +26,7 @@ #include <algorithm> #include <list> #include <map> +#include <mutex> #include <string> #include <vector> @@ -34,6 +35,7 @@ namespace tubemq { using std::map; +using std::mutex; using std::string; using std::vector; @@ -94,12 +96,12 @@ class FlowCtrlRuleHandler { const string& flowctrl_info); bool GetCurDataLimit(int64_t last_datadlt, FlowCtrlResult& flowctrl_result) const; int32_t GetCurFreqLimitTime(int32_t msg_zero_cnt, int32_t received_limit) const; + void GetFilterCtrlItem(FlowCtrlItem& result) const; + void GetFlowCtrlInfo(string& flowctrl_info) const; int32_t GetMinZeroCnt() const { return this->min_zero_cnt_.Get(); } int32_t GetQryPriorityId() const { return this->qrypriority_id_.Get(); } void SetQryPriorityId(int32_t qrypriority_id) { this->qrypriority_id_.Set(qrypriority_id); } int64_t GetFlowCtrlId() const { return this->flowctrl_id_.Get(); } - const FlowCtrlItem& GetFilterCtrlItem() const { return this->filter_ctrl_item_; } - const string& GetFlowCtrlInfo() const { return this->flowctrl_info_; } private: void initialStatisData(); @@ -124,17 +126,17 @@ class FlowCtrlRuleHandler { int32_t& value); private: + mutable mutex config_lock_; + string flowctrl_info_; + FlowCtrlItem filter_ctrl_item_; + map<int32_t, vector<FlowCtrlItem> > flowctrl_rules_; + int64_t last_update_time_; AtomicLong flowctrl_id_; AtomicInteger qrypriority_id_; - string flowctrl_info_; AtomicInteger min_zero_cnt_; AtomicLong min_datadlt_limt_; AtomicInteger datalimit_start_time_; AtomicInteger datalimit_end_time_; - FlowCtrlItem filter_ctrl_item_; - map<int32_t, vector<FlowCtrlItem> > flowctrl_rules_; - pthread_rwlock_t configrw_lock_; - int64_t last_update_time_; }; } // namespace tubemq diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h index af12ce4..98f192e 100644 --- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h +++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h @@ -20,11 +20,12 @@ #ifndef TUBEMQ_CLIENT_RMT_DATA_CACHE_H_ #define TUBEMQ_CLIENT_RMT_DATA_CACHE_H_ -#include <pthread.h> #include <stdint.h> +#include <condition_variable> #include <list> #include <map> +#include <mutex> #include <set> #include <string> #include <tuple> @@ -41,9 +42,11 @@ namespace tubemq { +using std::condition_variable; using std::map; using std::set; using std::list; +using std::mutex; using std::string; using std::tuple; @@ -110,7 +113,7 @@ class RmtDataCacheCsm { private: // timer executor - ExecutorPtr executor_; + ExecutorPool executor_; // string consumer_id_; string group_name_; @@ -120,7 +123,7 @@ class RmtDataCacheCsm { AtomicBoolean under_groupctrl_; AtomicLong last_checktime_; // meta info - pthread_rwlock_t meta_rw_lock_; + mutable mutex meta_lock_; // partiton allocated map map<string, PartitionExt> partitions_; // topic partiton map @@ -129,25 +132,23 @@ class RmtDataCacheCsm { map<NodeInfo, set<string> > broker_partition_; map<string, SubscribeInfo> part_subinfo_; // for idle partitions occupy - pthread_mutex_t part_mutex_; - // for partiton idle map list<string> index_partitions_; // for partition used map map<string, int64_t> partition_useds_; // for partiton timer map map<string, tuple<int64_t, SteadyTimerPtr> > partition_timeouts_; // data book - pthread_mutex_t data_book_mutex_; + mutable mutex data_book_mutex_; // for partition offset cache map<string, int64_t> partition_offset_; // for partiton register booked map<string, bool> part_reg_booked_; // event - pthread_mutex_t event_read_mutex_; - pthread_cond_t event_read_cond_; + mutable mutex event_read_mutex_; + condition_variable event_read_cond_; list<ConsumerEvent> rebalance_events_; - pthread_mutex_t event_write_mutex_; + mutable mutex event_write_mutex_; list<ConsumerEvent> rebalance_results_; }; diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_config.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_config.h index df3f09c..3660cbb 100644 --- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_config.h +++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_config.h @@ -99,6 +99,11 @@ class ConsumerConfig : public BaseConfig { const map<string, set<string> >& subscribed_topic_and_filter_map, const string& session_key, uint32_t source_count, bool is_select_big, const map<string, int64_t>& part_offset_map); + bool IsBoundConsume() { return is_bound_consume_; } + const string& GetSessionKey() const { return session_key_; } + const uint32_t GetSourceCount() const { return source_count_; } + bool IsSelectBig() { return is_select_big_; } + const map<string, int64_t>& GetPartOffsetInfo() const { return part_offset_map_; } const string& GetGroupName() const; const map<string, set<string> >& GetSubTopicAndFilterMap() const; void SetConsumePosition(ConsumePosition consume_from_where); diff --git a/tubemq-client-twins/tubemq-client-cpp/src/flowctrl_def.cc b/tubemq-client-twins/tubemq-client-cpp/src/flowctrl_def.cc index 370e6e6..ef3357f 100644 --- a/tubemq-client-twins/tubemq-client-cpp/src/flowctrl_def.cc +++ b/tubemq-client-twins/tubemq-client-cpp/src/flowctrl_def.cc @@ -32,6 +32,7 @@ namespace tubemq { using std::stringstream; +using std::lock_guard; FlowCtrlResult::FlowCtrlResult() { this->datasize_limit_ = tb_config::kMaxIntValue; @@ -171,10 +172,11 @@ FlowCtrlRuleHandler::FlowCtrlRuleHandler() { this->datalimit_start_time_.Set(2500); this->datalimit_end_time_.Set(tb_config::kInvalidValue); this->last_update_time_ = Utils::GetCurrentTimeMillis(); - pthread_rwlock_init(&configrw_lock_, NULL); } -FlowCtrlRuleHandler::~FlowCtrlRuleHandler() { pthread_rwlock_destroy(&configrw_lock_); } +FlowCtrlRuleHandler::~FlowCtrlRuleHandler() { + // +} void FlowCtrlRuleHandler::UpdateDefFlowCtrlInfo(bool is_default, int32_t qrypriority_id, int64_t flowctrl_id, const string& flowctrl_info) { @@ -186,7 +188,7 @@ void FlowCtrlRuleHandler::UpdateDefFlowCtrlInfo(bool is_default, int32_t qryprio if (flowctrl_info.length() > 0) { parseFlowCtrlInfo(flowctrl_info, tmp_flowctrl_map); } - pthread_rwlock_wrlock(&this->configrw_lock_); + lock_guard<mutex> lck(config_lock_); this->flowctrl_id_.Set(flowctrl_id); this->qrypriority_id_.Set(qrypriority_id); clearStatisData(); @@ -199,7 +201,6 @@ void FlowCtrlRuleHandler::UpdateDefFlowCtrlInfo(bool is_default, int32_t qryprio initialStatisData(); } this->last_update_time_ = Utils::GetCurrentTimeMillis(); - pthread_rwlock_unlock(&this->configrw_lock_); if (is_default) { LOG_INFO("[Flow Ctrl] Default FlowCtrl's flowctrl_id from %ld to %ld\n", curr_flowctrl_id, flowctrl_id); @@ -268,51 +269,69 @@ void FlowCtrlRuleHandler::clearStatisData() { bool FlowCtrlRuleHandler::GetCurDataLimit(int64_t last_datadlt, FlowCtrlResult& flowctrl_result) const { struct tm utc_tm; + bool result = false; vector<FlowCtrlItem>::const_iterator it_vec; map<int, vector<FlowCtrlItem> >::const_iterator it_map; + // get current data limit time_t cur_time = time(NULL); - gmtime_r(&cur_time, &utc_tm); int curr_time = (utc_tm.tm_hour + 8) % 24 * 100 + utc_tm.tm_min; - if ((last_datadlt < this->min_datadlt_limt_.Get()) || - (curr_time < this->datalimit_start_time_.Get()) || - (curr_time > this->datalimit_end_time_.Get())) { + if ((last_datadlt < this->min_datadlt_limt_.Get()) + || (curr_time < this->datalimit_start_time_.Get()) + || (curr_time > this->datalimit_end_time_.Get())) { return false; } + // search total flowctrl rule + lock_guard<mutex> lck(config_lock_); it_map = this->flowctrl_rules_.find(0); - if (it_map == this->flowctrl_rules_.end()) { - return false; - } - for (it_vec = it_map->second.begin(); it_vec != it_map->second.end(); ++it_vec) { - if (it_vec->GetDataLimit(last_datadlt, curr_time, flowctrl_result)) { - return true; + if (it_map != this->flowctrl_rules_.end()) { + for (it_vec = it_map->second.begin();it_vec != it_map->second.end(); ++it_vec) { + if (it_vec->GetDataLimit(last_datadlt, curr_time, flowctrl_result)) { + result = true; + break; + } } } - return false; + return result; } int32_t FlowCtrlRuleHandler::GetCurFreqLimitTime(int32_t msg_zero_cnt, int32_t received_limit) const { - int32_t rule_val = -2; + int32_t limit_data = received_limit; vector<FlowCtrlItem>::const_iterator it_vec; map<int, vector<FlowCtrlItem> >::const_iterator it_map; - + // check min zero count if (msg_zero_cnt < this->min_zero_cnt_.Get()) { - return received_limit; + return limit_data; } + // search rule allow value + lock_guard<mutex> lck(config_lock_); it_map = this->flowctrl_rules_.find(1); - if (it_map == this->flowctrl_rules_.end()) { - return received_limit; - } - for (it_vec = it_map->second.begin(); it_vec != it_map->second.end(); ++it_vec) { - rule_val = it_vec->GetFreLimit(msg_zero_cnt); - if (rule_val >= 0) { - return rule_val; + if (it_map != this->flowctrl_rules_.end()) { + for (it_vec = it_map->second.begin(); it_vec != it_map->second.end(); ++it_vec) { + limit_data = it_vec->GetFreLimit(msg_zero_cnt); + if (limit_data >= 0) { + break; + } } } - return received_limit; + return limit_data; +} + +void FlowCtrlRuleHandler::GetFilterCtrlItem(FlowCtrlItem& result) const { + result.Clear(); + lock_guard<mutex> lck(config_lock_); + result = this->filter_ctrl_item_; +} + +void FlowCtrlRuleHandler::GetFlowCtrlInfo(string& flowctrl_info) const { + flowctrl_info.clear(); + lock_guard<mutex> lck(config_lock_); + flowctrl_info = this->flowctrl_info_; } + + bool FlowCtrlRuleHandler::compareDataLimitQueue(const FlowCtrlItem& o1, const FlowCtrlItem& o2) { if (o1.GetStartTime() >= o2.GetStartTime()) { return true; diff --git a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc index 55e8cdc..9299f71 100644 --- a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc +++ b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc @@ -408,9 +408,9 @@ void PartitionExt::updateStrategyData(const FlowCtrlRuleHandler& def_flowctrl_ha this->cur_flowctrl_.SetDataDltAndFreqLimit(tb_config::kMaxLongValue, 0); } } - this->cur_freqctrl_ = group_flowctrl_handler.GetFilterCtrlItem(); + group_flowctrl_handler.GetFilterCtrlItem(this->cur_freqctrl_); if (this->cur_freqctrl_.GetFreqMsLimit() < 0) { - this->cur_freqctrl_ = def_flowctrl_handler.GetFilterCtrlItem(); + def_flowctrl_handler.GetFilterCtrlItem(this->cur_freqctrl_); } curr_time = Utils::GetCurrentTimeMillis(); } diff --git a/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc index 8b1e76b..c9c499d 100644 --- a/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc +++ b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc @@ -30,27 +30,18 @@ namespace tubemq { - +using std::lock_guard; +using std::unique_lock; +using namespace std::placeholders; RmtDataCacheCsm::RmtDataCacheCsm() { under_groupctrl_.Set(false); last_checktime_.Set(0); - pthread_rwlock_init(&meta_rw_lock_, NULL); - pthread_mutex_init(&part_mutex_, NULL); - pthread_mutex_init(&data_book_mutex_, NULL); - pthread_mutex_init(&event_read_mutex_, NULL); - pthread_cond_init(&event_read_cond_, NULL); - pthread_mutex_init(&event_write_mutex_, NULL); } RmtDataCacheCsm::~RmtDataCacheCsm() { - pthread_mutex_destroy(&event_write_mutex_); - pthread_mutex_destroy(&event_read_mutex_); - pthread_mutex_destroy(&data_book_mutex_); - pthread_cond_destroy(&event_read_cond_); - pthread_mutex_destroy(&part_mutex_); - pthread_rwlock_destroy(&meta_rw_lock_); + // } void RmtDataCacheCsm::SetConsumerInfo(const string& client_id, @@ -103,7 +94,8 @@ void RmtDataCacheCsm::AddNewPartition(const PartitionExt& partition_ext) { // SubscribeInfo sub_info(consumer_id_, group_name_, partition_ext); string partition_key = partition_ext.GetPartitionKey(); - pthread_rwlock_wrlock(&meta_rw_lock_); + // lock operate + lock_guard<mutex> lck(meta_lock_); it_map = partitions_.find(partition_key); if (it_map == partitions_.end()) { partitions_[partition_key] = partition_ext; @@ -130,10 +122,7 @@ void RmtDataCacheCsm::AddNewPartition(const PartitionExt& partition_ext) { part_subinfo_[partition_key] = sub_info; } // check partition_key status - pthread_mutex_lock(&part_mutex_); resetIdlePartition(partition_key, true); - pthread_mutex_unlock(&part_mutex_); - pthread_rwlock_unlock(&meta_rw_lock_); } bool RmtDataCacheCsm::SelectPartition(string &err_info, @@ -142,13 +131,12 @@ bool RmtDataCacheCsm::SelectPartition(string &err_info, int64_t booked_time = 0; string partition_key; map<string, PartitionExt>::iterator it_map; - - pthread_rwlock_rdlock(&meta_rw_lock_); + // lock operate + lock_guard<mutex> lck(meta_lock_); if (partitions_.empty()) { err_info = "No partition info in local cache, please retry later!"; result = false; } else { - pthread_mutex_lock(&part_mutex_); if (index_partitions_.empty()) { err_info = "No idle partition to consume, please retry later!"; result = false; @@ -167,9 +155,7 @@ bool RmtDataCacheCsm::SelectPartition(string &err_info, err_info = "Ok"; } } - pthread_mutex_unlock(&part_mutex_); } - pthread_rwlock_unlock(&meta_rw_lock_); return result; } @@ -180,18 +166,16 @@ void RmtDataCacheCsm::BookedPartionInfo(const string& partition_key, map<string, PartitionExt>::iterator it_part; // book partition offset info if (curr_offset >= 0) { - pthread_mutex_lock(&data_book_mutex_); + lock_guard<mutex> lck1(data_book_mutex_); partition_offset_[partition_key] = curr_offset; - pthread_mutex_unlock(&data_book_mutex_); } // book partition temp info - pthread_rwlock_rdlock(&meta_rw_lock_); + lock_guard<mutex> lck2(meta_lock_); it_part = partitions_.find(partition_key); if (it_part != partitions_.end()) { it_part->second.BookConsumeData(err_code, msg_size, esc_limit, limit_dlt, cur_data_dlt, require_slow); } - pthread_rwlock_unlock(&meta_rw_lock_); } // success process release partition @@ -233,7 +217,7 @@ void RmtDataCacheCsm::FilterPartitions(const list<SubscribeInfo>& subscribe_info // initial return; subscribed_partitions.clear(); unsub_partitions.clear(); - pthread_rwlock_rdlock(&meta_rw_lock_); + lock_guard<mutex> lck(meta_lock_); if (partitions_.empty()) { for (it_lst = subscribe_info_lst.begin(); it_lst != subscribe_info_lst.end(); it_lst++) { unsub_partitions.push_back(it_lst->GetPartitionExt()); @@ -248,17 +232,15 @@ void RmtDataCacheCsm::FilterPartitions(const list<SubscribeInfo>& subscribe_info } } } - pthread_rwlock_unlock(&meta_rw_lock_); } void RmtDataCacheCsm::GetSubscribedInfo(list<SubscribeInfo>& subscribe_info_lst) { map<string, SubscribeInfo>::iterator it_sub; subscribe_info_lst.clear(); - pthread_rwlock_rdlock(&meta_rw_lock_); + lock_guard<mutex> lck(meta_lock_); for (it_sub = part_subinfo_.begin(); it_sub != part_subinfo_.end(); ++it_sub) { subscribe_info_lst.push_back(it_sub->second); } - pthread_rwlock_unlock(&meta_rw_lock_); } void RmtDataCacheCsm::GetAllBrokerPartitions( @@ -267,7 +249,7 @@ void RmtDataCacheCsm::GetAllBrokerPartitions( map<NodeInfo, list<PartitionExt> >::iterator it_broker; broker_parts.clear(); - pthread_rwlock_rdlock(&meta_rw_lock_); + lock_guard<mutex> lck(meta_lock_); for (it_part = partitions_.begin(); it_part != partitions_.end(); ++it_part) { it_broker = broker_parts.find(it_part->second.GetBrokerInfo()); if (it_broker == broker_parts.end()) { @@ -278,20 +260,18 @@ void RmtDataCacheCsm::GetAllBrokerPartitions( it_broker->second.push_back(it_part->second); } } - pthread_rwlock_unlock(&meta_rw_lock_); } bool RmtDataCacheCsm::GetPartitionExt(const string& part_key, PartitionExt& partition_ext) { bool result = false; map<string, PartitionExt>::iterator it_map; - pthread_rwlock_rdlock(&meta_rw_lock_); + lock_guard<mutex> lck(meta_lock_); it_map = partitions_.find(part_key); if (it_map != partitions_.end()) { result = true; partition_ext = it_map->second; } - pthread_rwlock_unlock(&meta_rw_lock_); return result; } @@ -299,11 +279,10 @@ void RmtDataCacheCsm::GetRegBrokers(list<NodeInfo>& brokers) { map<NodeInfo, set<string> >::iterator it; brokers.clear(); - pthread_rwlock_rdlock(&meta_rw_lock_); + lock_guard<mutex> lck(meta_lock_); for (it = broker_partition_.begin(); it != broker_partition_.end(); ++it) { brokers.push_back(it->first); } - pthread_rwlock_unlock(&meta_rw_lock_); } void RmtDataCacheCsm::GetPartitionByBroker(const NodeInfo& broker_info, @@ -313,7 +292,7 @@ void RmtDataCacheCsm::GetPartitionByBroker(const NodeInfo& broker_info, map<string, PartitionExt>::iterator it_part; partition_list.clear(); - pthread_rwlock_rdlock(&meta_rw_lock_); + lock_guard<mutex> lck(meta_lock_); it_broker = broker_partition_.find(broker_info); if (it_broker != broker_partition_.end()) { for (it_key = it_broker->second.begin(); @@ -324,7 +303,6 @@ void RmtDataCacheCsm::GetPartitionByBroker(const NodeInfo& broker_info, } } } - pthread_rwlock_unlock(&meta_rw_lock_); } @@ -332,11 +310,10 @@ void RmtDataCacheCsm::GetCurPartitionOffsets(map<string, int64_t> part_offset_ma map<string, int64_t>::iterator it; part_offset_map.clear(); - pthread_mutex_lock(&data_book_mutex_); + lock_guard<mutex> lck(data_book_mutex_); for (it = partition_offset_.begin(); it != partition_offset_.end(); ++it) { part_offset_map[it->first] = it->second; } - pthread_mutex_unlock(&data_book_mutex_); } @@ -373,15 +350,12 @@ void RmtDataCacheCsm::RemovePartition(const list<PartitionExt>& partition_list) void RmtDataCacheCsm::RemovePartition(const set<string>& partition_keys) { set<string>::const_iterator it_lst; - pthread_rwlock_wrlock(&meta_rw_lock_); + lock_guard<mutex> lck(meta_lock_); for (it_lst = partition_keys.begin(); it_lst != partition_keys.end(); it_lst++) { - pthread_mutex_lock(&part_mutex_); resetIdlePartition(*it_lst, false); - pthread_mutex_unlock(&part_mutex_); // remove meta info set info rmvMetaInfo(*it_lst); } - pthread_rwlock_unlock(&meta_rw_lock_); } void RmtDataCacheCsm::RemoveAndGetPartition(const list<SubscribeInfo>& subscribe_infos, @@ -397,8 +371,7 @@ void RmtDataCacheCsm::RemoveAndGetPartition(const list<SubscribeInfo>& subscribe if (subscribe_infos.empty()) { return; } - pthread_rwlock_wrlock(&meta_rw_lock_); - pthread_mutex_lock(&part_mutex_); + lock_guard<mutex> lck(meta_lock_); for (it = subscribe_infos.begin(); it != subscribe_infos.end(); ++it) { part_key = it->GetPartitionExt().GetPartitionKey(); it_part = partitions_.find(part_key); @@ -422,8 +395,6 @@ void RmtDataCacheCsm::RemoveAndGetPartition(const list<SubscribeInfo>& subscribe } resetIdlePartition(part_key, false); } - pthread_mutex_unlock(&part_mutex_); - pthread_rwlock_unlock(&meta_rw_lock_); } @@ -431,64 +402,56 @@ void RmtDataCacheCsm::RemoveAndGetPartition(const list<SubscribeInfo>& subscribe bool RmtDataCacheCsm::BookPartition(const string& partition_key) { bool result = false; map<string, bool>::iterator it; - pthread_mutex_lock(&data_book_mutex_); + + lock_guard<mutex> lck(data_book_mutex_); it = part_reg_booked_.find(partition_key); if (it == part_reg_booked_.end()) { part_reg_booked_[partition_key] = true; } - pthread_mutex_unlock(&data_book_mutex_); return result; } void RmtDataCacheCsm::OfferEvent(const ConsumerEvent& event) { - pthread_mutex_lock(&event_read_mutex_); + unique_lock<mutex> lck(event_read_mutex_); this->rebalance_events_.push_back(event); - pthread_cond_broadcast(&event_read_cond_); - pthread_mutex_unlock(&event_read_mutex_); + event_read_cond_.notify_all(); } void RmtDataCacheCsm::TakeEvent(ConsumerEvent& event) { - pthread_mutex_lock(&event_read_mutex_); + unique_lock<mutex> lck(event_read_mutex_); while (this->rebalance_events_.empty()) { - pthread_cond_wait(&event_read_cond_, &event_read_mutex_); + event_read_cond_.wait(lck); } event = rebalance_events_.front(); rebalance_events_.pop_front(); - pthread_mutex_unlock(&event_read_mutex_); } void RmtDataCacheCsm::ClearEvent() { - pthread_mutex_lock(&event_read_mutex_); + unique_lock<mutex> lck(event_read_mutex_); rebalance_events_.clear(); - pthread_mutex_unlock(&event_read_mutex_); } void RmtDataCacheCsm::OfferEventResult(const ConsumerEvent& event) { - pthread_mutex_lock(&event_write_mutex_); + lock_guard<mutex> lck(event_write_mutex_); this->rebalance_events_.push_back(event); - pthread_mutex_unlock(&event_write_mutex_); } bool RmtDataCacheCsm::PollEventResult(ConsumerEvent& event) { bool result = false; - pthread_mutex_lock(&event_write_mutex_); + lock_guard<mutex> lck(event_write_mutex_); if (!rebalance_events_.empty()) { event = rebalance_events_.front(); rebalance_events_.pop_front(); result = true; } - pthread_mutex_unlock(&event_write_mutex_); return result; } void RmtDataCacheCsm::HandleTimeout(const string partition_key, const asio::error_code& error) { if (!error) { - pthread_rwlock_rdlock(&meta_rw_lock_); - pthread_mutex_lock(&part_mutex_); + lock_guard<mutex> lck(meta_lock_); resetIdlePartition(partition_key, true); - pthread_mutex_unlock(&part_mutex_); - pthread_rwlock_unlock(&meta_rw_lock_); } } @@ -579,18 +542,15 @@ bool RmtDataCacheCsm::inRelPartition(string &err_info, bool need_delay_check, if (!result) { return false; } - pthread_rwlock_rdlock(&meta_rw_lock_); + lock_guard<mutex> lck(meta_lock_); it_part = partitions_.find(partition_key); if (it_part == partitions_.end()) { // partition is unregister, release partition - pthread_mutex_lock(&part_mutex_); partition_useds_.erase(partition_key); index_partitions_.remove(partition_key); - pthread_mutex_unlock(&part_mutex_); err_info = "Not found the partition in Consume Partition set!"; result = false; } else { - pthread_mutex_lock(&part_mutex_); it_used = partition_useds_.find(partition_key); if (it_used == partition_useds_.end()) { // partition is release but registered @@ -619,9 +579,7 @@ bool RmtDataCacheCsm::inRelPartition(string &err_info, bool need_delay_check, result = false; } } - pthread_mutex_unlock(&part_mutex_); } - pthread_rwlock_unlock(&meta_rw_lock_); return result; }