This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch tubemq-client-cpp
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/tubemq-client-cpp by this push:
     new a7ece60  [TUBEMQ-285]Replace C/C++ pthread's mutex to std::mutex (#210)
a7ece60 is described below

commit a7ece606488fcc0cea0ac7ade03fe4d4cd9a87ef
Author: gosonzhang <4675...@qq.com>
AuthorDate: Fri Jul 17 11:46:14 2020 +0000

    [TUBEMQ-285]Replace C/C++ pthread's mutex to std::mutex (#210)
    
    Co-authored-by: gosonzhang <gosonzh...@tencent.com>
---
 .../include/tubemq/flowctrl_def.h                  |  16 ++--
 .../include/tubemq/rmt_data_cache.h                |  19 ++--
 .../include/tubemq/tubemq_config.h                 |   5 +
 .../tubemq-client-cpp/src/flowctrl_def.cc          |  71 ++++++++------
 .../tubemq-client-cpp/src/meta_info.cc             |   4 +-
 .../tubemq-client-cpp/src/rmt_data_cache.cc        | 102 ++++++---------------
 6 files changed, 101 insertions(+), 116 deletions(-)
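
The whole patch applies one mechanical pattern: each pthread_rwlock_t or pthread_mutex_t member becomes a std::mutex, and each explicit lock/unlock pair becomes a scoped RAII guard that releases the mutex on every return path, including exceptional ones. A minimal sketch of the before/after shape (the Counter class is illustrative, not taken from the patch):

    #include <cstdint>
    #include <mutex>

    class Counter {
     public:
      void Incr() {
        // lock_guard locks here and unlocks when lck leaves scope
        std::lock_guard<std::mutex> lck(mutex_);
        ++count_;
      }
      int64_t Get() const {
        std::lock_guard<std::mutex> lck(mutex_);
        return count_;
      }

     private:
      mutable std::mutex mutex_;  // mutable: const readers may lock
      int64_t count_ = 0;
    };

One behavioral trade-off to note: pthread_rwlock_t admitted concurrent readers, while std::mutex serializes them, so read-heavy paths give up some parallelism in exchange for simpler, exception-safe locking.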

diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/flowctrl_def.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/flowctrl_def.h
index c0289ca..8b99cbd 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/flowctrl_def.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/flowctrl_def.h
@@ -26,6 +26,7 @@
 #include <algorithm>
 #include <list>
 #include <map>
+#include <mutex>
 #include <string>
 #include <vector>
 
@@ -34,6 +35,7 @@
 namespace tubemq {
 
 using std::map;
+using std::mutex;
 using std::string;
 using std::vector;
 
@@ -94,12 +96,12 @@ class FlowCtrlRuleHandler {
                              const string& flowctrl_info);
   bool GetCurDataLimit(int64_t last_datadlt, FlowCtrlResult& flowctrl_result) const;
   int32_t GetCurFreqLimitTime(int32_t msg_zero_cnt, int32_t received_limit) const;
+  void GetFilterCtrlItem(FlowCtrlItem& result) const;
+  void GetFlowCtrlInfo(string& flowctrl_info) const;
   int32_t GetMinZeroCnt() const { return this->min_zero_cnt_.Get(); }
   int32_t GetQryPriorityId() const { return this->qrypriority_id_.Get(); }
   void SetQryPriorityId(int32_t qrypriority_id) { this->qrypriority_id_.Set(qrypriority_id); }
   int64_t GetFlowCtrlId() const { return this->flowctrl_id_.Get(); }
-  const FlowCtrlItem& GetFilterCtrlItem() const { return this->filter_ctrl_item_; }
-  const string& GetFlowCtrlInfo() const { return this->flowctrl_info_; }
 
  private:
   void initialStatisData();
@@ -124,17 +126,17 @@ class FlowCtrlRuleHandler {
                        int32_t& value);
 
  private:
+  mutable mutex config_lock_;
+  string flowctrl_info_;
+  FlowCtrlItem filter_ctrl_item_;
+  map<int32_t, vector<FlowCtrlItem> > flowctrl_rules_;
+  int64_t last_update_time_;
   AtomicLong flowctrl_id_;
   AtomicInteger qrypriority_id_;
-  string flowctrl_info_;
   AtomicInteger min_zero_cnt_;
   AtomicLong min_datadlt_limt_;
   AtomicInteger datalimit_start_time_;
   AtomicInteger datalimit_end_time_;
-  FlowCtrlItem filter_ctrl_item_;
-  map<int32_t, vector<FlowCtrlItem> > flowctrl_rules_;
-  pthread_rwlock_t configrw_lock_;
-  int64_t last_update_time_;
 };
 
 }  // namespace tubemq
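
Note that config_lock_ above is declared mutable: the read-side accessors (GetCurDataLimit, GetFilterCtrlItem, GetFlowCtrlInfo) are const member functions, and locking a mutex is a mutating operation, so without mutable they would not compile. A minimal sketch of the idiom, assuming nothing beyond the standard library:

    #include <mutex>
    #include <string>

    class Settings {
     public:
      std::string Get() const {
        // compiles only because lock_ is mutable below
        std::lock_guard<std::mutex> lck(lock_);
        return value_;  // copy out while the lock is held
      }

     private:
      mutable std::mutex lock_;  // const readers still need to lock
      std::string value_;
    };

The same reasoning explains why the two reference-returning getters were dropped: a const& into data guarded by config_lock_ would outlive the lock, so the new GetFilterCtrlItem/GetFlowCtrlInfo copy into a caller-supplied out-parameter instead.
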
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h
index af12ce4..98f192e 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h
@@ -20,11 +20,12 @@
 #ifndef TUBEMQ_CLIENT_RMT_DATA_CACHE_H_
 #define TUBEMQ_CLIENT_RMT_DATA_CACHE_H_
 
-#include <pthread.h>
 #include <stdint.h>
 
+#include <condition_variable>
 #include <list>
 #include <map>
+#include <mutex>
 #include <set>
 #include <string>
 #include <tuple>
@@ -41,9 +42,11 @@
 
 namespace tubemq {
 
+using std::condition_variable;
 using std::map;
 using std::set;
 using std::list;
+using std::mutex;
 using std::string;
 using std::tuple;
 
@@ -110,7 +113,7 @@ class RmtDataCacheCsm {
 
  private:
   // timer executor
-  ExecutorPtr executor_;
+  ExecutorPool executor_;
   // 
   string consumer_id_;
   string group_name_;
@@ -120,7 +123,7 @@ class RmtDataCacheCsm {
   AtomicBoolean under_groupctrl_;
   AtomicLong last_checktime_;
   // meta info
-  pthread_rwlock_t meta_rw_lock_;
+  mutable mutex meta_lock_;
   // partition allocated map
   map<string, PartitionExt> partitions_;
   // topic partition map
@@ -129,25 +132,23 @@ class RmtDataCacheCsm {
   map<NodeInfo, set<string> > broker_partition_;
   map<string, SubscribeInfo>  part_subinfo_;
   // for occupying idle partitions
-  pthread_mutex_t  part_mutex_;
-  // for partiton idle map
   list<string> index_partitions_;
   // for partition used map
   map<string, int64_t> partition_useds_;
   // for partition timer map
   map<string, tuple<int64_t, SteadyTimerPtr> > partition_timeouts_;
   // data book
-  pthread_mutex_t data_book_mutex_;
+  mutable mutex data_book_mutex_;
   // for partition offset cache
   map<string, int64_t> partition_offset_;
   // for partition registration booking
   map<string, bool> part_reg_booked_;
 
   // event
-  pthread_mutex_t  event_read_mutex_;
-  pthread_cond_t   event_read_cond_;
+  mutable mutex event_read_mutex_;
+  condition_variable event_read_cond_;
   list<ConsumerEvent> rebalance_events_;
-  pthread_mutex_t  event_write_mutex_;
+  mutable mutex event_write_mutex_;
   list<ConsumerEvent> rebalance_results_;
 };
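
The event members migrate as a pair: pthread_cond_t becomes std::condition_variable, which must be waited on through a std::unique_lock<std::mutex> (a lock_guard cannot be used, since wait() has to release and re-acquire the mutex). A minimal sketch of the wait/notify protocol this header sets up (EventQueue is illustrative):

    #include <condition_variable>
    #include <list>
    #include <mutex>

    class EventQueue {
     public:
      void Offer(int event) {
        std::unique_lock<std::mutex> lck(mutex_);
        events_.push_back(event);
        cond_.notify_all();  // wake all waiters, as the patch does
      }
      int Take() {
        std::unique_lock<std::mutex> lck(mutex_);
        while (events_.empty()) {
          cond_.wait(lck);  // unlocks while blocked, re-locks on wakeup
        }
        int event = events_.front();
        events_.pop_front();
        return event;
      }

     private:
      std::mutex mutex_;
      std::condition_variable cond_;
      std::list<int> events_;
    };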
 
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_config.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_config.h
index df3f09c..3660cbb 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_config.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/tubemq_config.h
@@ -99,6 +99,11 @@ class ConsumerConfig : public BaseConfig {
                              const map<string, set<string> >& subscribed_topic_and_filter_map,
                              const string& session_key, uint32_t source_count, bool is_select_big,
                              const map<string, int64_t>& part_offset_map);
+  bool IsBoundConsume() { return is_bound_consume_; }
+  const string& GetSessionKey() const { return session_key_; }
+  const uint32_t GetSourceCount() const { return source_count_; }
+  bool IsSelectBig() { return is_select_big_; }
+  const map<string, int64_t>& GetPartOffsetInfo() const { return part_offset_map_; }
   const string& GetGroupName() const;
   const map<string, set<string> >& GetSubTopicAndFilterMap() const;
   void SetConsumePosition(ConsumePosition consume_from_where);
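
These accessors expose the bound-consume settings recorded by the SetBoundConsume(...) overload declared above. A hypothetical caller (the consumer_config variable and the surrounding logic are illustrative) might read them back like this:

    // assumes a ConsumerConfig already populated via SetBoundConsume(...)
    if (consumer_config.IsBoundConsume()) {
      const string& session_key = consumer_config.GetSessionKey();
      uint32_t source_count = consumer_config.GetSourceCount();
      const map<string, int64_t>& offsets = consumer_config.GetPartOffsetInfo();
      // ... register the consumer with session_key, source_count, offsets ...
    }
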
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/flowctrl_def.cc b/tubemq-client-twins/tubemq-client-cpp/src/flowctrl_def.cc
index 370e6e6..ef3357f 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/flowctrl_def.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/flowctrl_def.cc
@@ -32,6 +32,7 @@
 namespace tubemq {
 
 using std::stringstream;
+using std::lock_guard;
 
 FlowCtrlResult::FlowCtrlResult() {
   this->datasize_limit_ = tb_config::kMaxIntValue;
@@ -171,10 +172,11 @@ FlowCtrlRuleHandler::FlowCtrlRuleHandler() {
   this->datalimit_start_time_.Set(2500);
   this->datalimit_end_time_.Set(tb_config::kInvalidValue);
   this->last_update_time_ = Utils::GetCurrentTimeMillis();
-  pthread_rwlock_init(&configrw_lock_, NULL);
 }
 
-FlowCtrlRuleHandler::~FlowCtrlRuleHandler() { pthread_rwlock_destroy(&configrw_lock_); }
+FlowCtrlRuleHandler::~FlowCtrlRuleHandler() {
+  // nothing to release: std::mutex cleans up in its own destructor
+}
 
 void FlowCtrlRuleHandler::UpdateDefFlowCtrlInfo(bool is_default, int32_t qrypriority_id,
                                                 int64_t flowctrl_id, const string& flowctrl_info) {
@@ -186,7 +188,7 @@ void FlowCtrlRuleHandler::UpdateDefFlowCtrlInfo(bool is_default, int32_t qryprio
   if (flowctrl_info.length() > 0) {
     parseFlowCtrlInfo(flowctrl_info, tmp_flowctrl_map);
   }
-  pthread_rwlock_wrlock(&this->configrw_lock_);
+  lock_guard<mutex> lck(config_lock_);
   this->flowctrl_id_.Set(flowctrl_id);
   this->qrypriority_id_.Set(qrypriority_id);
   clearStatisData();
@@ -199,7 +201,6 @@ void FlowCtrlRuleHandler::UpdateDefFlowCtrlInfo(bool is_default, int32_t qryprio
     initialStatisData();
   }
   this->last_update_time_ = Utils::GetCurrentTimeMillis();
-  pthread_rwlock_unlock(&this->configrw_lock_);
   if (is_default) {
    LOG_INFO("[Flow Ctrl] Default FlowCtrl's flowctrl_id from %ld to %ld\n", curr_flowctrl_id,
              flowctrl_id);
@@ -268,51 +269,69 @@ void FlowCtrlRuleHandler::clearStatisData() {
 bool FlowCtrlRuleHandler::GetCurDataLimit(int64_t last_datadlt,
                                           FlowCtrlResult& flowctrl_result) const {
   struct tm utc_tm;
+  bool result = false;
   vector<FlowCtrlItem>::const_iterator it_vec;
   map<int, vector<FlowCtrlItem> >::const_iterator it_map;
+  // get current data limit
   time_t cur_time = time(NULL);
-
   gmtime_r(&cur_time, &utc_tm);
   int curr_time = (utc_tm.tm_hour + 8) % 24 * 100 + utc_tm.tm_min;
-  if ((last_datadlt < this->min_datadlt_limt_.Get()) ||
-      (curr_time < this->datalimit_start_time_.Get()) ||
-      (curr_time > this->datalimit_end_time_.Get())) {
+  if ((last_datadlt < this->min_datadlt_limt_.Get())
+      || (curr_time < this->datalimit_start_time_.Get())
+      || (curr_time > this->datalimit_end_time_.Get())) {
     return false;
   }
+  // search total flowctrl rule
+  lock_guard<mutex> lck(config_lock_);
   it_map = this->flowctrl_rules_.find(0);
-  if (it_map == this->flowctrl_rules_.end()) {
-    return false;
-  }
-  for (it_vec = it_map->second.begin(); it_vec != it_map->second.end(); ++it_vec) {
-    if (it_vec->GetDataLimit(last_datadlt, curr_time, flowctrl_result)) {
-      return true;
+  if (it_map != this->flowctrl_rules_.end()) {
+    for (it_vec = it_map->second.begin(); it_vec != it_map->second.end(); ++it_vec) {
+      if (it_vec->GetDataLimit(last_datadlt, curr_time, flowctrl_result)) {
+        result = true;
+        break;
+      }
     }
   }
-  return false;
+  return result;
 }
 
 int32_t FlowCtrlRuleHandler::GetCurFreqLimitTime(int32_t msg_zero_cnt,
                                                  int32_t received_limit) const {
-  int32_t rule_val = -2;
+  int32_t limit_data = received_limit;
   vector<FlowCtrlItem>::const_iterator it_vec;
   map<int, vector<FlowCtrlItem> >::const_iterator it_map;
-
+  // check min zero count
   if (msg_zero_cnt < this->min_zero_cnt_.Get()) {
-    return received_limit;
+    return limit_data;
   }
+  // search the rules for the allowed value
+  lock_guard<mutex> lck(config_lock_);
   it_map = this->flowctrl_rules_.find(1);
-  if (it_map == this->flowctrl_rules_.end()) {
-    return received_limit;
-  }
-  for (it_vec = it_map->second.begin(); it_vec != it_map->second.end(); ++it_vec) {
-    rule_val = it_vec->GetFreLimit(msg_zero_cnt);
-    if (rule_val >= 0) {
-      return rule_val;
+  if (it_map != this->flowctrl_rules_.end()) {
+    for (it_vec = it_map->second.begin(); it_vec != it_map->second.end(); ++it_vec) {
+      limit_data = it_vec->GetFreLimit(msg_zero_cnt);
+      if (limit_data >= 0) {
+        break;
+      }
     }
   }
-  return received_limit;
+  return limit_data;
+}
+
+void FlowCtrlRuleHandler::GetFilterCtrlItem(FlowCtrlItem& result) const {
+  result.Clear();
+  lock_guard<mutex> lck(config_lock_);
+  result = this->filter_ctrl_item_;
+}
+
+void FlowCtrlRuleHandler::GetFlowCtrlInfo(string& flowctrl_info) const {
+  flowctrl_info.clear();
+  lock_guard<mutex> lck(config_lock_);
+  flowctrl_info = this->flowctrl_info_;
 }
 
+
+
 bool FlowCtrlRuleHandler::compareDataLimitQueue(const FlowCtrlItem& o1, const FlowCtrlItem& o2) {
   if (o1.GetStartTime() >= o2.GetStartTime()) {
     return true;
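
Beyond the mutex swap, GetCurDataLimit and GetCurFreqLimitTime also change shape: the cheap checks on atomic members return before any lock is taken, and once the lock_guard exists the function funnels into a single exit. Early returns under a lock_guard would still unlock correctly; the single-exit form is a readability choice that keeps the guarded region obvious. A condensed sketch of the structure (GetLimit, kDefault, min_key_, and rules_ stand in for the real names):

    int32_t GetLimit(int32_t key) const {
      int32_t result = kDefault;
      if (key < min_key_.Get()) {
        return result;  // fast path: atomic check, no lock needed
      }
      // slow path: everything below runs under config_lock_
      lock_guard<mutex> lck(config_lock_);
      map<int32_t, int32_t>::const_iterator it = rules_.find(key);
      if (it != rules_.end()) {
        result = it->second;
      }
      return result;  // single exit; lck unlocks here
    }
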
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
index 55e8cdc..9299f71 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
@@ -408,9 +408,9 @@ void PartitionExt::updateStrategyData(const FlowCtrlRuleHandler& def_flowctrl_ha
           this->cur_flowctrl_.SetDataDltAndFreqLimit(tb_config::kMaxLongValue, 0);
         }
       }
-      this->cur_freqctrl_ = group_flowctrl_handler.GetFilterCtrlItem();
+      group_flowctrl_handler.GetFilterCtrlItem(this->cur_freqctrl_);
       if (this->cur_freqctrl_.GetFreqMsLimit() < 0) {
-        this->cur_freqctrl_ = def_flowctrl_handler.GetFilterCtrlItem();
+        def_flowctrl_handler.GetFilterCtrlItem(this->cur_freqctrl_);
       }
       curr_time = Utils::GetCurrentTimeMillis();
     }
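
The caller-side change is the mirror image of the getter change in flowctrl_def.h: instead of copying from a returned reference, the copy now happens inside the handler while config_lock_ is held.

    // before: copies from a reference that was never guarded
    // this->cur_freqctrl_ = group_flowctrl_handler.GetFilterCtrlItem();

    // after: the snapshot is taken under the handler's config_lock_
    group_flowctrl_handler.GetFilterCtrlItem(this->cur_freqctrl_);
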
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
index 8b1e76b..c9c499d 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
@@ -30,27 +30,18 @@
 
 namespace tubemq {
 
-
+using std::lock_guard;
+using std::unique_lock;
+using namespace std::placeholders;
 
 
 RmtDataCacheCsm::RmtDataCacheCsm() {
   under_groupctrl_.Set(false);
   last_checktime_.Set(0);
-  pthread_rwlock_init(&meta_rw_lock_, NULL);
-  pthread_mutex_init(&part_mutex_, NULL);
-  pthread_mutex_init(&data_book_mutex_, NULL);
-  pthread_mutex_init(&event_read_mutex_, NULL);
-  pthread_cond_init(&event_read_cond_, NULL);
-  pthread_mutex_init(&event_write_mutex_, NULL);
 }
 
 RmtDataCacheCsm::~RmtDataCacheCsm() {
-  pthread_mutex_destroy(&event_write_mutex_);
-  pthread_mutex_destroy(&event_read_mutex_);
-  pthread_mutex_destroy(&data_book_mutex_);
-  pthread_cond_destroy(&event_read_cond_);
-  pthread_mutex_destroy(&part_mutex_);
-  pthread_rwlock_destroy(&meta_rw_lock_);
+  // nothing to destroy: the std synchronization members clean up themselves
 }
 
 void RmtDataCacheCsm::SetConsumerInfo(const string& client_id,
@@ -103,7 +94,8 @@ void RmtDataCacheCsm::AddNewPartition(const PartitionExt& partition_ext) {
   //
   SubscribeInfo sub_info(consumer_id_, group_name_, partition_ext);
   string partition_key = partition_ext.GetPartitionKey();
-  pthread_rwlock_wrlock(&meta_rw_lock_);
+  // hold the metadata lock for the whole update
+  lock_guard<mutex> lck(meta_lock_);
   it_map = partitions_.find(partition_key);
   if (it_map == partitions_.end()) {
     partitions_[partition_key] = partition_ext;
@@ -130,10 +122,7 @@ void RmtDataCacheCsm::AddNewPartition(const PartitionExt& partition_ext) {
     part_subinfo_[partition_key] = sub_info;
   }
   // check partition_key status
-  pthread_mutex_lock(&part_mutex_);
   resetIdlePartition(partition_key, true);
-  pthread_mutex_unlock(&part_mutex_);
-  pthread_rwlock_unlock(&meta_rw_lock_);
 }
 
 bool RmtDataCacheCsm::SelectPartition(string &err_info,
@@ -142,13 +131,12 @@ bool RmtDataCacheCsm::SelectPartition(string &err_info,
   int64_t booked_time = 0;
   string partition_key;
   map<string, PartitionExt>::iterator it_map;
-
-  pthread_rwlock_rdlock(&meta_rw_lock_);
+  // take the metadata lock before inspecting the cache
+  lock_guard<mutex> lck(meta_lock_);
   if (partitions_.empty()) {
     err_info = "No partition info in local cache, please retry later!";
     result = false;
   } else {
-    pthread_mutex_lock(&part_mutex_);
     if (index_partitions_.empty()) {
       err_info = "No idle partition to consume, please retry later!";
       result = false;
@@ -167,9 +155,7 @@ bool RmtDataCacheCsm::SelectPartition(string &err_info,
         err_info = "Ok";
       }
     }
-    pthread_mutex_unlock(&part_mutex_);
   }
-  pthread_rwlock_unlock(&meta_rw_lock_);
   return result;
 }
 
@@ -180,18 +166,16 @@ void RmtDataCacheCsm::BookedPartionInfo(const string& partition_key,
   map<string, PartitionExt>::iterator it_part;
   // book partition offset info
   if (curr_offset >= 0) {
-    pthread_mutex_lock(&data_book_mutex_);
+    lock_guard<mutex> lck1(data_book_mutex_);
     partition_offset_[partition_key] = curr_offset;
-    pthread_mutex_unlock(&data_book_mutex_);
   }
   // book partition temp info
-  pthread_rwlock_rdlock(&meta_rw_lock_);
+  lock_guard<mutex> lck2(meta_lock_);
   it_part = partitions_.find(partition_key);
   if (it_part != partitions_.end()) {
     it_part->second.BookConsumeData(err_code, msg_size,
               esc_limit, limit_dlt, cur_data_dlt, require_slow);
   }
-  pthread_rwlock_unlock(&meta_rw_lock_);
 }
 
 // success process release partition
@@ -233,7 +217,7 @@ void RmtDataCacheCsm::FilterPartitions(const list<SubscribeInfo>& subscribe_info
   // initial return;
   subscribed_partitions.clear();
   unsub_partitions.clear();
-  pthread_rwlock_rdlock(&meta_rw_lock_);
+  lock_guard<mutex> lck(meta_lock_);
   if (partitions_.empty()) {
     for (it_lst = subscribe_info_lst.begin(); it_lst != subscribe_info_lst.end(); it_lst++) {
       unsub_partitions.push_back(it_lst->GetPartitionExt());
@@ -248,17 +232,15 @@ void RmtDataCacheCsm::FilterPartitions(const list<SubscribeInfo>& subscribe_info
       }
     }
   }
-  pthread_rwlock_unlock(&meta_rw_lock_);
 }
 
 void RmtDataCacheCsm::GetSubscribedInfo(list<SubscribeInfo>& subscribe_info_lst) {
   map<string, SubscribeInfo>::iterator it_sub;
   subscribe_info_lst.clear();
-  pthread_rwlock_rdlock(&meta_rw_lock_);
+  lock_guard<mutex> lck(meta_lock_);
   for (it_sub = part_subinfo_.begin(); it_sub != part_subinfo_.end(); ++it_sub) {
     subscribe_info_lst.push_back(it_sub->second);
   }
-  pthread_rwlock_unlock(&meta_rw_lock_);
 }
 
 void RmtDataCacheCsm::GetAllBrokerPartitions(
@@ -267,7 +249,7 @@ void RmtDataCacheCsm::GetAllBrokerPartitions(
   map<NodeInfo, list<PartitionExt> >::iterator it_broker;
 
   broker_parts.clear();
-  pthread_rwlock_rdlock(&meta_rw_lock_);
+  lock_guard<mutex> lck(meta_lock_);
   for (it_part = partitions_.begin(); it_part != partitions_.end(); ++it_part) {
     it_broker = broker_parts.find(it_part->second.GetBrokerInfo());
     if (it_broker == broker_parts.end()) {
@@ -278,20 +260,18 @@ void RmtDataCacheCsm::GetAllBrokerPartitions(
       it_broker->second.push_back(it_part->second);
     }
   }
-  pthread_rwlock_unlock(&meta_rw_lock_);
 }
 
 bool RmtDataCacheCsm::GetPartitionExt(const string& part_key, PartitionExt& partition_ext) {
   bool result = false;
   map<string, PartitionExt>::iterator it_map;
 
-  pthread_rwlock_rdlock(&meta_rw_lock_);
+  lock_guard<mutex> lck(meta_lock_);
   it_map = partitions_.find(part_key);
   if (it_map != partitions_.end()) {
     result = true;
     partition_ext = it_map->second;
   }
-  pthread_rwlock_unlock(&meta_rw_lock_);
   return result;
 }
 
@@ -299,11 +279,10 @@ void RmtDataCacheCsm::GetRegBrokers(list<NodeInfo>& brokers) {
   map<NodeInfo, set<string> >::iterator it;
 
   brokers.clear();
-  pthread_rwlock_rdlock(&meta_rw_lock_);
+  lock_guard<mutex> lck(meta_lock_);
   for (it = broker_partition_.begin(); it != broker_partition_.end(); ++it) {
     brokers.push_back(it->first);
   }
-  pthread_rwlock_unlock(&meta_rw_lock_);
 }
 
 void RmtDataCacheCsm::GetPartitionByBroker(const NodeInfo& broker_info,
@@ -313,7 +292,7 @@ void RmtDataCacheCsm::GetPartitionByBroker(const NodeInfo& broker_info,
   map<string, PartitionExt>::iterator it_part;
 
   partition_list.clear();
-  pthread_rwlock_rdlock(&meta_rw_lock_);
+  lock_guard<mutex> lck(meta_lock_);
   it_broker = broker_partition_.find(broker_info);
   if (it_broker != broker_partition_.end()) {
     for (it_key = it_broker->second.begin();
@@ -324,7 +303,6 @@ void RmtDataCacheCsm::GetPartitionByBroker(const NodeInfo& broker_info,
       }
     }
   }
-  pthread_rwlock_unlock(&meta_rw_lock_);
 }
 
 
@@ -332,11 +310,10 @@ void RmtDataCacheCsm::GetCurPartitionOffsets(map<string, int64_t> part_offset_ma
   map<string, int64_t>::iterator it;
 
   part_offset_map.clear();
-  pthread_mutex_lock(&data_book_mutex_);
+  lock_guard<mutex> lck(data_book_mutex_);
   for (it = partition_offset_.begin(); it != partition_offset_.end(); ++it) {
     part_offset_map[it->first] = it->second;
   }
-  pthread_mutex_unlock(&data_book_mutex_);
 }
 
 
@@ -373,15 +350,12 @@ void RmtDataCacheCsm::RemovePartition(const list<PartitionExt>& partition_list)
 void RmtDataCacheCsm::RemovePartition(const set<string>& partition_keys) {
   set<string>::const_iterator it_lst;
 
-  pthread_rwlock_wrlock(&meta_rw_lock_);
+  lock_guard<mutex> lck(meta_lock_);
   for (it_lst = partition_keys.begin(); it_lst != partition_keys.end(); it_lst++) {
-    pthread_mutex_lock(&part_mutex_);
     resetIdlePartition(*it_lst, false);
-    pthread_mutex_unlock(&part_mutex_);
     // remove meta info set info
     rmvMetaInfo(*it_lst);
   }
-  pthread_rwlock_unlock(&meta_rw_lock_);
 }
 
 void RmtDataCacheCsm::RemoveAndGetPartition(const list<SubscribeInfo>& subscribe_infos,
@@ -397,8 +371,7 @@ void RmtDataCacheCsm::RemoveAndGetPartition(const list<SubscribeInfo>& subscribe
   if (subscribe_infos.empty()) {
     return;
   }
-  pthread_rwlock_wrlock(&meta_rw_lock_);
-  pthread_mutex_lock(&part_mutex_);
+  lock_guard<mutex> lck(meta_lock_);
   for (it = subscribe_infos.begin(); it != subscribe_infos.end(); ++it) {
     part_key = it->GetPartitionExt().GetPartitionKey();
     it_part = partitions_.find(part_key);
@@ -422,8 +395,6 @@ void RmtDataCacheCsm::RemoveAndGetPartition(const list<SubscribeInfo>& subscribe
     }
     resetIdlePartition(part_key, false);
   }
-  pthread_mutex_unlock(&part_mutex_);
-  pthread_rwlock_unlock(&meta_rw_lock_);
 }
 
 
@@ -431,64 +402,56 @@ void RmtDataCacheCsm::RemoveAndGetPartition(const list<SubscribeInfo>& subscribe
 bool RmtDataCacheCsm::BookPartition(const string& partition_key) {
   bool result = false;
   map<string, bool>::iterator it;
-  pthread_mutex_lock(&data_book_mutex_);
+
+  lock_guard<mutex> lck(data_book_mutex_);
   it = part_reg_booked_.find(partition_key);
   if (it == part_reg_booked_.end()) {
     part_reg_booked_[partition_key] = true;
   }
-  pthread_mutex_unlock(&data_book_mutex_);
   return result;
 }
 
 void RmtDataCacheCsm::OfferEvent(const ConsumerEvent& event) {
-  pthread_mutex_lock(&event_read_mutex_);
+  unique_lock<mutex> lck(event_read_mutex_);
   this->rebalance_events_.push_back(event);
-  pthread_cond_broadcast(&event_read_cond_);
-  pthread_mutex_unlock(&event_read_mutex_);
+  event_read_cond_.notify_all();
 }
 
 void RmtDataCacheCsm::TakeEvent(ConsumerEvent& event) {
-  pthread_mutex_lock(&event_read_mutex_);
+  unique_lock<mutex> lck(event_read_mutex_);
   while (this->rebalance_events_.empty()) {
-    pthread_cond_wait(&event_read_cond_, &event_read_mutex_);
+    event_read_cond_.wait(lck);
   }
   event = rebalance_events_.front();
   rebalance_events_.pop_front();
-  pthread_mutex_unlock(&event_read_mutex_);
 }
 
 void RmtDataCacheCsm::ClearEvent() {
-  pthread_mutex_lock(&event_read_mutex_);
+  unique_lock<mutex> lck(event_read_mutex_);
   rebalance_events_.clear();
-  pthread_mutex_unlock(&event_read_mutex_);
 }
 
 void RmtDataCacheCsm::OfferEventResult(const ConsumerEvent& event) {
-  pthread_mutex_lock(&event_write_mutex_);
+  lock_guard<mutex> lck(event_write_mutex_);
   this->rebalance_results_.push_back(event);
-  pthread_mutex_unlock(&event_write_mutex_);
 }
 
 bool RmtDataCacheCsm::PollEventResult(ConsumerEvent& event) {
   bool result = false;
-  pthread_mutex_lock(&event_write_mutex_);
+  lock_guard<mutex> lck(event_write_mutex_);
   if (!rebalance_results_.empty()) {
     event = rebalance_results_.front();
     rebalance_results_.pop_front();
     result = true;
   }
-  pthread_mutex_unlock(&event_write_mutex_);
   return result;
 }
 
 void RmtDataCacheCsm::HandleTimeout(const string partition_key,
                                           const asio::error_code& error) {
   if (!error) {
-    pthread_rwlock_rdlock(&meta_rw_lock_);
-    pthread_mutex_lock(&part_mutex_);
+    lock_guard<mutex> lck(meta_lock_);
     resetIdlePartition(partition_key, true);
-    pthread_mutex_unlock(&part_mutex_);    
-    pthread_rwlock_unlock(&meta_rw_lock_);
   }
 }
 
@@ -579,18 +542,15 @@ bool RmtDataCacheCsm::inRelPartition(string &err_info, bool need_delay_check,
   if (!result) {
     return false;
   }
-  pthread_rwlock_rdlock(&meta_rw_lock_);
+  lock_guard<mutex> lck(meta_lock_);
   it_part = partitions_.find(partition_key);
   if (it_part == partitions_.end()) {
     // partition is unregister, release partition
-    pthread_mutex_lock(&part_mutex_);
     partition_useds_.erase(partition_key);
     index_partitions_.remove(partition_key);
-    pthread_mutex_unlock(&part_mutex_);
     err_info = "Not found the partition in Consume Partition set!";
     result = false;
   } else {
-    pthread_mutex_lock(&part_mutex_);
     it_used = partition_useds_.find(partition_key);
     if (it_used == partition_useds_.end()) {
       // partition is release but registered
@@ -619,9 +579,7 @@ bool RmtDataCacheCsm::inRelPartition(string &err_info, bool need_delay_check,
         result = false;
       }
     }
-    pthread_mutex_unlock(&part_mutex_);
   }
-  pthread_rwlock_unlock(&meta_rw_lock_);
   return result;
 }
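
TakeEvent above keeps the explicit while loop from the pthread version, which is still correct: condition variables can wake spuriously, so the emptiness check must be re-run after every wakeup. The standard library's predicate overload folds that loop into the call; a functionally equivalent variant (a sketch, not what this patch uses):

    void RmtDataCacheCsm::TakeEvent(ConsumerEvent& event) {
      unique_lock<mutex> lck(event_read_mutex_);
      // wait(lck, pred) re-checks the predicate after each wakeup,
      // exactly like the hand-written while loop
      event_read_cond_.wait(lck, [this] { return !rebalance_events_.empty(); });
      event = rebalance_events_.front();
      rebalance_events_.pop_front();
    }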
 
