This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch tubemq-client-cpp
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/tubemq-client-cpp by this push:
     new 989d197  [TUBEMQ-269] Create C/C++ RmtDataCache class (#198)
989d197 is described below

commit 989d19719bd3830f84fc88e0c72dd455bd914a86
Author: gosonzhang <4675...@qq.com>
AuthorDate: Fri Jul 10 01:01:47 2020 +0000

    [TUBEMQ-269] Create C/C++ RmtDataCache class (#198)
    
    Co-authored-by: gosonzhang <gosonzh...@tencent.com>
---
 .../tubemq-client-cpp/include/tubemq/atomic_def.h  |   1 -
 .../tubemq-client-cpp/include/tubemq/meta_info.h   |  86 +++----
 .../include/tubemq/rmt_data_cache.h                |  62 ++++-
 .../tubemq-client-cpp/src/meta_info.cc             | 274 ++++++++++-----------
 .../tubemq-client-cpp/src/rmt_data_cache.cc        | 132 ++++++++++
 5 files changed, 369 insertions(+), 186 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/atomic_def.h 
b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/atomic_def.h
index c8030db..30830d9 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/atomic_def.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/atomic_def.h
@@ -24,7 +24,6 @@
 
 namespace tubemq {
 
-using namespace std;
 
 class AtomicInteger {
  public:
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/meta_info.h 
b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/meta_info.h
index 02d5a63..813a9c5 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/meta_info.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/meta_info.h
@@ -91,14 +91,54 @@ class Partition {
   string partition_info_;
 };
 
+class PartitionExt : public Partition {
+ public:
+  PartitionExt();
+  PartitionExt(const string& partition_info);
+  PartitionExt(const NodeInfo& broker_info, const string& part_str);
+  ~PartitionExt();
+  void BookConsumeData(int32_t errcode, int32_t msg_size, bool req_esc_limit,
+    int64_t rsp_dlt_limit, int64_t last_datadlt, bool require_slow);
+  int64_t ProcConsumeResult(const FlowCtrlRuleHandler& def_flowctrl_handler,
+    const FlowCtrlRuleHandler& group_flowctrl_handler, bool filter_consume, 
bool last_consumed);
+  int64_t ProcConsumeResult(const FlowCtrlRuleHandler& def_flowctrl_handler,
+    const FlowCtrlRuleHandler& group_flowctrl_handler, bool filter_consume, 
bool last_consumed,
+    int32_t errcode, int32_t msg_size, bool req_esc_limit, int64_t 
rsp_dlt_limit,
+    int64_t last_datadlt, bool require_slow);
+  void SetLastConsumed(bool last_consumed);
+  bool IsLastConsumed();
+
+ private:
+  void resetParameters();
+  void updateStrategyData(const FlowCtrlRuleHandler& def_flowctrl_handler,
+    const FlowCtrlRuleHandler& group_flowctrl_handler, int32_t msg_size, 
int64_t last_datadlt);
+
+ private:
+  bool is_last_consumed_;
+  FlowCtrlResult cur_flowctrl_;
+  FlowCtrlItem cur_freqctrl_;
+  int64_t next_stage_updtime_;
+  int64_t next_slice_updtime_;
+  int64_t limit_slice_msgsize_;
+  int64_t cur_stage_msgsize_;
+  int64_t cur_slice_msgsize_;
+  int32_t total_zero_cnt_;
+  int64_t booked_time_;
+  int32_t booked_errcode_;
+  bool    booked_esc_limit_;
+  int32_t booked_msgsize_;
+  int64_t booked_dlt_limit_;
+  int64_t booked_curdata_dlt_;
+  bool    booked_require_slow_;
+};
+
 class SubscribeInfo {
  public:
   SubscribeInfo(const string& sub_info);
-  SubscribeInfo(const string& consumer_id, const string& group, const 
Partition& partition);
   SubscribeInfo& operator=(const SubscribeInfo& target);
   const string& GetConsumerId() const;
   const string& GetGroup() const;
-  const Partition& GetPartition() const;
+  const PartitionExt& GetPartitionExt() const;
   const uint32_t GgetBrokerId() const;
   const string& GetBrokerHost() const;
   const uint32_t GetBrokerPort() const;
@@ -112,7 +152,7 @@ class SubscribeInfo {
  private:
   string consumer_id_;
   string group_;
-  Partition partition_;
+  PartitionExt partitionext_;
   string sub_info_;
 };
 
@@ -138,46 +178,6 @@ class ConsumerEvent {
   list<SubscribeInfo> subscribe_list_;
 };
 
-class PartitionExt : public Partition {
- public:
-  PartitionExt();
-  PartitionExt(const string& partition_info);
-  PartitionExt(const NodeInfo& broker_info, const string& part_str);
-  ~PartitionExt();
-  void BookConsumeData(int32_t errcode, int32_t msg_size, bool req_esc_limit,
-    int64_t rsp_dlt_limit, int64_t last_datadlt, bool require_slow);
-  int64_t ProcConsumeResult(const FlowCtrlRuleHandler& def_flowctrl_handler,
-    const FlowCtrlRuleHandler& group_flowctrl_handler, bool filter_consume, 
bool last_consumed);
-  int64_t ProcConsumeResult(const FlowCtrlRuleHandler& def_flowctrl_handler,
-    const FlowCtrlRuleHandler& group_flowctrl_handler, bool filter_consume, 
bool last_consumed,
-    int32_t errcode, int32_t msg_size, bool req_esc_limit, int64_t 
rsp_dlt_limit,
-    int64_t last_datadlt, bool require_slow);
-  void SetLastConsumed(bool last_consumed);
-  bool IsLastConsumed();
-
- private:
-  void resetParameters();
-  void updateStrategyData(const FlowCtrlRuleHandler& def_flowctrl_handler,
-    const FlowCtrlRuleHandler& group_flowctrl_handler, int32_t msg_size, 
int64_t last_datadlt);
-
- private:
-  bool is_last_consumed_;
-  FlowCtrlResult cur_flowctrl_;
-  FlowCtrlItem cur_freqctrl_;
-  int64_t next_stage_updtime_;
-  int64_t next_slice_updtime_;
-  int64_t limit_slice_msgsize_;
-  int64_t cur_stage_msgsize_;
-  int64_t cur_slice_msgsize_;
-  int32_t total_zero_cnt_;
-  int64_t booked_time_;
-  int32_t booked_errcode_;
-  bool    booked_esc_limit_;
-  int32_t booked_msgsize_;
-  int64_t booked_dlt_limit_;
-  int64_t booked_curdata_dlt_;
-  bool    booked_require_slow_;
-};
 
 }  // namespace tubemq
 
diff --git 
a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h 
b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h
index d97cbac..11f9018 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h
@@ -21,15 +21,73 @@
 #define TUBEMQ_CLIENT_RMT_DATA_CACHE_H_
 
 #include <stdint.h>
+#include <pthread.h>
 
 #include <atomic>
+#include <list>
+#include <map>
+#include <set>
+
+#include "tubemq/flowctrl_def.h"
+#include "tubemq/meta_info.h"
+
+
+
 
 namespace tubemq {
 
-using namespace std;
+using std::map;
+using std::set;
+using std::list;
+
 
 // consumer remote data cache
-class RmtDataCacheCsm {}
+class RmtDataCacheCsm {
+ public:
+  RmtDataCacheCsm();
+  ~RmtDataCacheCsm();
+  void AddNewPartition(const PartitionExt& partition_ext);
+  void OfferEvent(const ConsumerEvent& event);
+  void TakeEvent(ConsumerEvent& event);
+  void ClearEvent();
+  void OfferEventResult(const ConsumerEvent& event);
+  bool PollEventResult(ConsumerEvent& event);
+
+
+ private:
+  // timer begin
+
+  // timer end
+  // flow ctrl
+  FlowCtrlRuleHandler group_flowctrl_handler_;
+  FlowCtrlRuleHandler def_flowctrl_handler_;
+  pthread_rwlock_t meta_rw_lock_;
+  // partiton allocated map
+  map<string, PartitionExt> partitions_;
+  // topic partiton map
+  map<string, set<string> > topic_partition_;
+  // broker parition map
+  map<NodeInfo, set<string> > broker_partition_;
+  // for partiton idle map
+  list<string> index_partitions_;
+  // for partition used map
+  map<string, int64_t> partition_useds_;
+  // for partiton timer map
+  map<string, int64_t> partition_timeouts_;
+  // data book
+  pthread_mutex_t data_book_mutex_;
+  // for partition offset cache
+  map<string, int64_t> partition_offset_;
+  // for partiton register booked
+  map<string, bool> part_reg_booked_;
+  // event
+  pthread_mutex_t  event_read_mutex_;
+  pthread_cond_t   event_read_cond_;
+  list<ConsumerEvent> rebalance_events_;
+  pthread_mutex_t  event_write_mutex_;
+  list<ConsumerEvent> rebalance_results_;
+};
+
 
 }  // namespace tubemq
 
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc 
b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
index 5c4e66b..4f0f860 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
@@ -244,145 +244,6 @@ void Partition::buildPartitionKey() {
   this->partition_info_ = ss2.str();
 }
 
-// sub_info = consumerId@group#broker_info#topic:partitionId
-SubscribeInfo::SubscribeInfo(const string& sub_info) {
-  string::size_type pos = 0;
-  string seg_key = delimiter::kDelimiterPound;
-  string at_key = delimiter::kDelimiterAt;
-  this->consumer_id_ = " ";
-  this->group_ = " ";
-  // parse sub_info
-  pos = sub_info.find(seg_key);
-  if (pos != string::npos) {
-    string consumer_info = sub_info.substr(0, pos);
-    consumer_info = Utils::Trim(consumer_info);
-    string partition_info = sub_info.substr(pos + seg_key.size(), 
sub_info.size());
-    partition_info = Utils::Trim(partition_info);
-    this->partition_ = Partition(partition_info);
-    pos = consumer_info.find(at_key);
-    this->consumer_id_ = consumer_info.substr(0, pos);
-    this->consumer_id_ = Utils::Trim(this->consumer_id_);
-    this->group_ = consumer_info.substr(pos + at_key.size(), 
consumer_info.size());
-    this->group_ = Utils::Trim(this->group_);
-  }
-  buildSubInfo();
-}
-
-SubscribeInfo::SubscribeInfo(const string& consumer_id, const string& group,
-                             const Partition& partition) {
-  this->consumer_id_ = consumer_id;
-  this->group_ = group;
-  this->partition_ = partition;
-  buildSubInfo();
-}
-
-SubscribeInfo& SubscribeInfo::operator=(const SubscribeInfo& target) {
-  if (this != &target) {
-    this->consumer_id_ = target.consumer_id_;
-    this->group_ = target.group_;
-    this->partition_ = target.partition_;
-  }
-  return *this;
-}
-
-const string& SubscribeInfo::GetConsumerId() const { return 
this->consumer_id_; }
-
-const string& SubscribeInfo::GetGroup() const { return this->group_; }
-
-const Partition& SubscribeInfo::GetPartition() const { return 
this->partition_; }
-
-const uint32_t SubscribeInfo::GgetBrokerId() const { return 
this->partition_.GetBrokerId(); }
-
-const string& SubscribeInfo::GetBrokerHost() const { return 
this->partition_.GetBrokerHost(); }
-
-const uint32_t SubscribeInfo::GetBrokerPort() const { return 
this->partition_.GetBrokerPort(); }
-
-const string& SubscribeInfo::GetTopic() const { return 
this->partition_.GetTopic(); }
-
-const uint32_t SubscribeInfo::GetPartitionId() const { return 
this->partition_.GetPartitionId(); }
-
-const string& SubscribeInfo::ToString() const { return this->sub_info_; }
-
-void SubscribeInfo::buildSubInfo() {
-  stringstream ss;
-  ss << this->consumer_id_;
-  ss << delimiter::kDelimiterAt;
-  ss << this->group_;
-  ss << delimiter::kDelimiterPound;
-  ss << this->partition_.ToString();
-  this->sub_info_ = ss.str();
-}
-
-ConsumerEvent::ConsumerEvent() {
-  this->rebalance_id_ = tb_config::kInvalidValue;
-  this->event_type_ = tb_config::kInvalidValue;
-  this->event_status_ = tb_config::kInvalidValue;
-}
-
-ConsumerEvent::ConsumerEvent(const ConsumerEvent& target) {
-  this->rebalance_id_ = target.rebalance_id_;
-  this->event_type_ = target.event_type_;
-  this->event_status_ = target.event_status_;
-  this->subscribe_list_ = target.subscribe_list_;
-}
-
-ConsumerEvent::ConsumerEvent(int64_t rebalance_id, int32_t event_type,
-                             const list<SubscribeInfo>& subscribeInfo_lst, 
int32_t event_status) {
-  list<SubscribeInfo>::const_iterator it;
-  this->rebalance_id_ = rebalance_id;
-  this->event_type_ = event_type;
-  this->event_status_ = event_status;
-  for (it = subscribeInfo_lst.begin(); it != subscribeInfo_lst.end(); ++it) {
-    this->subscribe_list_.push_back(*it);
-  }
-}
-
-ConsumerEvent& ConsumerEvent::operator=(const ConsumerEvent& target) {
-  if (this != &target) {
-    this->rebalance_id_ = target.rebalance_id_;
-    this->event_type_ = target.event_type_;
-    this->event_status_ = target.event_status_;
-    this->subscribe_list_ = target.subscribe_list_;
-  }
-  return *this;
-}
-
-const int64_t ConsumerEvent::GetRebalanceId() const { return 
this->rebalance_id_; }
-
-const int32_t ConsumerEvent::GetEventType() const { return this->event_type_; }
-
-const int32_t ConsumerEvent::GetEventStatus() const { return 
this->event_status_; }
-
-void ConsumerEvent::SetEventType(int32_t event_type) { this->event_type_ = 
event_type; }
-
-void ConsumerEvent::SetEventStatus(int32_t event_status) { this->event_status_ 
= event_status; }
-
-const list<SubscribeInfo>& ConsumerEvent::GetSubscribeInfoList() const {
-  return this->subscribe_list_;
-}
-
-string ConsumerEvent::ToString() {
-  uint32_t count = 0;
-  stringstream ss;
-  list<SubscribeInfo>::const_iterator it;
-  ss << "ConsumerEvent [rebalanceId=";
-  ss << this->rebalance_id_;
-  ss << ", type=";
-  ss << this->event_type_;
-  ss << ", status=";
-  ss << this->event_status_;
-  ss << ", subscribeInfoList=[";
-  for (it = this->subscribe_list_.begin(); it != this->subscribe_list_.end(); 
++it) {
-    if (count++ > 0) {
-      ss << ",";
-    }
-    ss << it->ToString();
-  }
-  ss << "]]";
-  return ss.str();
-}
-
-
 PartitionExt::PartitionExt() : Partition() {
   resetParameters();
 }
@@ -505,7 +366,7 @@ void PartitionExt::updateStrategyData(const 
FlowCtrlRuleHandler& def_flowctrl_ha
   bool result = false;
   // Accumulated data received
   this->cur_stage_msgsize_ += msg_size;
-  this->cur_slice_msgsize_ += msg_size;  
+  this->cur_slice_msgsize_ += msg_size;
   int64_t curr_time = Utils::GetCurrentTimeMillis();
   // Update strategy data values
   if (curr_time > this->next_stage_updtime_) {
@@ -535,4 +396,137 @@ void PartitionExt::updateStrategyData(const 
FlowCtrlRuleHandler& def_flowctrl_ha
 }
 
 
+// sub_info = consumerId@group#broker_info#topic:partitionId
+SubscribeInfo::SubscribeInfo(const string& sub_info) {
+  string::size_type pos = 0;
+  string seg_key = delimiter::kDelimiterPound;
+  string at_key = delimiter::kDelimiterAt;
+  this->consumer_id_ = " ";
+  this->group_ = " ";
+  // parse sub_info
+  pos = sub_info.find(seg_key);
+  if (pos != string::npos) {
+    string consumer_info = sub_info.substr(0, pos);
+    consumer_info = Utils::Trim(consumer_info);
+    string partition_info = sub_info.substr(pos + seg_key.size(), 
sub_info.size());
+    partition_info = Utils::Trim(partition_info);
+    this->partitionext_ = PartitionExt(partition_info);
+    pos = consumer_info.find(at_key);
+    this->consumer_id_ = consumer_info.substr(0, pos);
+    this->consumer_id_ = Utils::Trim(this->consumer_id_);
+    this->group_ = consumer_info.substr(pos + at_key.size(), 
consumer_info.size());
+    this->group_ = Utils::Trim(this->group_);
+  }
+  buildSubInfo();
+}
+
+SubscribeInfo& SubscribeInfo::operator=(const SubscribeInfo& target) {
+  if (this != &target) {
+    this->consumer_id_ = target.consumer_id_;
+    this->group_ = target.group_;
+    this->partitionext_ = target.partitionext_;
+  }
+  return *this;
+}
+
+const string& SubscribeInfo::GetConsumerId() const { return 
this->consumer_id_; }
+
+const string& SubscribeInfo::GetGroup() const { return this->group_; }
+
+const PartitionExt& SubscribeInfo::GetPartitionExt() const { return 
this->partitionext_; }
+
+const uint32_t SubscribeInfo::GgetBrokerId() const { return 
this->partitionext_.GetBrokerId(); }
+
+const string& SubscribeInfo::GetBrokerHost() const { return 
this->partitionext_.GetBrokerHost(); }
+
+const uint32_t SubscribeInfo::GetBrokerPort() const { return 
this->partitionext_.GetBrokerPort(); }
+
+const string& SubscribeInfo::GetTopic() const { return 
this->partitionext_.GetTopic(); }
+
+const uint32_t SubscribeInfo::GetPartitionId() const { return 
this->partitionext_.GetPartitionId(); }
+
+const string& SubscribeInfo::ToString() const { return this->sub_info_; }
+
+void SubscribeInfo::buildSubInfo() {
+  stringstream ss;
+  ss << this->consumer_id_;
+  ss << delimiter::kDelimiterAt;
+  ss << this->group_;
+  ss << delimiter::kDelimiterPound;
+  ss << this->partitionext_.ToString();
+  this->sub_info_ = ss.str();
+}
+
+ConsumerEvent::ConsumerEvent() {
+  this->rebalance_id_ = tb_config::kInvalidValue;
+  this->event_type_ = tb_config::kInvalidValue;
+  this->event_status_ = tb_config::kInvalidValue;
+}
+
+ConsumerEvent::ConsumerEvent(const ConsumerEvent& target) {
+  this->rebalance_id_ = target.rebalance_id_;
+  this->event_type_ = target.event_type_;
+  this->event_status_ = target.event_status_;
+  this->subscribe_list_ = target.subscribe_list_;
+}
+
+ConsumerEvent::ConsumerEvent(int64_t rebalance_id, int32_t event_type,
+                             const list<SubscribeInfo>& subscribeInfo_lst, 
int32_t event_status) {
+  list<SubscribeInfo>::const_iterator it;
+  this->rebalance_id_ = rebalance_id;
+  this->event_type_ = event_type;
+  this->event_status_ = event_status;
+  for (it = subscribeInfo_lst.begin(); it != subscribeInfo_lst.end(); ++it) {
+    this->subscribe_list_.push_back(*it);
+  }
+}
+
+ConsumerEvent& ConsumerEvent::operator=(const ConsumerEvent& target) {
+  if (this != &target) {
+    this->rebalance_id_ = target.rebalance_id_;
+    this->event_type_ = target.event_type_;
+    this->event_status_ = target.event_status_;
+    this->subscribe_list_ = target.subscribe_list_;
+  }
+  return *this;
+}
+
+const int64_t ConsumerEvent::GetRebalanceId() const { return 
this->rebalance_id_; }
+
+const int32_t ConsumerEvent::GetEventType() const { return this->event_type_; }
+
+const int32_t ConsumerEvent::GetEventStatus() const { return 
this->event_status_; }
+
+void ConsumerEvent::SetEventType(int32_t event_type) { this->event_type_ = 
event_type; }
+
+void ConsumerEvent::SetEventStatus(int32_t event_status) { this->event_status_ 
= event_status; }
+
+const list<SubscribeInfo>& ConsumerEvent::GetSubscribeInfoList() const {
+  return this->subscribe_list_;
+}
+
+string ConsumerEvent::ToString() {
+  uint32_t count = 0;
+  stringstream ss;
+  list<SubscribeInfo>::const_iterator it;
+  ss << "ConsumerEvent [rebalanceId=";
+  ss << this->rebalance_id_;
+  ss << ", type=";
+  ss << this->event_type_;
+  ss << ", status=";
+  ss << this->event_status_;
+  ss << ", subscribeInfoList=[";
+  for (it = this->subscribe_list_.begin(); it != this->subscribe_list_.end(); 
++it) {
+    if (count++ > 0) {
+      ss << ",";
+    }
+    ss << it->ToString();
+  }
+  ss << "]]";
+  return ss.str();
+}
+
+
+
+
 };  // namespace tubemq
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc 
b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
new file mode 100644
index 0000000..d239d26
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "tubemq/rmt_data_cache.h"
+#include "tubemq/meta_info.h"
+
+
+
+namespace tubemq {
+ 
+
+RmtDataCacheCsm::RmtDataCacheCsm() {
+  pthread_rwlock_init(&meta_rw_lock_, NULL);
+  pthread_mutex_init(&data_book_mutex_, NULL);
+  pthread_mutex_init(&event_read_mutex_, NULL);
+  pthread_cond_init(&event_read_cond_, NULL);
+  pthread_mutex_init(&event_write_mutex_, NULL);
+}
+
+RmtDataCacheCsm::~RmtDataCacheCsm() {
+  pthread_mutex_destroy(&event_write_mutex_);
+  pthread_mutex_destroy(&event_read_mutex_);
+  pthread_mutex_destroy(&data_book_mutex_);
+  pthread_cond_destroy(&event_read_cond_);
+  pthread_rwlock_destroy(&meta_rw_lock_);
+}
+
+void RmtDataCacheCsm::AddNewPartition(const PartitionExt& partition_ext) {
+  //
+  map<string, PartitionExt>::iterator it_map;
+  map<string, set<string> >::iterator it_topic;
+  map<NodeInfo, set<string> >::iterator it_broker;
+  //
+  string partition_key = partition_ext.GetPartitionKey();
+  pthread_rwlock_wrlock(&meta_rw_lock_);
+  it_map = partitions_.find(partition_key);
+  if (it_map == partitions_.end()) {
+    partitions_[partition_key] = partition_ext;
+    it_topic = topic_partition_.find(partition_ext.GetTopic());
+    if (it_topic == topic_partition_.end()) {
+      set<string> tmp_part_set;
+      tmp_part_set.insert(partition_key);
+      topic_partition_[partition_ext.GetTopic()] = tmp_part_set;
+    } else {
+      if (it_topic->second.find(partition_key) == it_topic->second.end()) {
+        it_topic->second.insert(partition_key);
+      }
+    }
+    it_broker = broker_partition_.find(partition_ext.GetBrokerInfo());
+    if (it_broker == broker_partition_.end()) {
+      set<string> tmp_part_set;
+      tmp_part_set.insert(partition_key);
+      broker_partition_[partition_ext.GetBrokerInfo()] = tmp_part_set;
+    } else {
+      if (it_broker->second.find(partition_key) == it_broker->second.end()) {
+        it_broker->second.insert(partition_key);
+      }
+    }
+  }
+  // check partition_key status
+  if (partition_useds_.find(partition_key) == partition_useds_.end() 
+    && partition_timeouts_.find(partition_key) == partition_timeouts_.end()) {
+    index_partitions_.remove(partition_key);
+    index_partitions_.push_back(partition_key);
+  }
+  pthread_rwlock_unlock(&meta_rw_lock_);
+}
+
+void RmtDataCacheCsm::OfferEvent(const ConsumerEvent& event) {
+  pthread_mutex_lock(&event_read_mutex_);
+  this->rebalance_events_.push_back(event);
+  pthread_cond_broadcast(&event_read_cond_);
+  pthread_mutex_unlock(&event_read_mutex_);
+}
+
+void RmtDataCacheCsm::TakeEvent(ConsumerEvent& event) {
+  pthread_mutex_lock(&event_read_mutex_);
+  while (this->rebalance_events_.empty()) {
+    pthread_cond_wait(&event_read_cond_, &event_read_mutex_);
+  }
+  event = rebalance_events_.front();
+  rebalance_events_.pop_front();
+  pthread_mutex_unlock(&event_read_mutex_);
+}
+
+void RmtDataCacheCsm::ClearEvent() {
+  pthread_mutex_lock(&event_read_mutex_);
+  rebalance_events_.clear();
+  pthread_mutex_unlock(&event_read_mutex_);
+}
+
+void RmtDataCacheCsm::OfferEventResult(const ConsumerEvent& event) {
+  pthread_mutex_lock(&event_write_mutex_);
+  this->rebalance_events_.push_back(event);
+  pthread_mutex_unlock(&event_write_mutex_);
+}
+
+bool RmtDataCacheCsm::PollEventResult(ConsumerEvent& event) {
+  bool result = false;
+  pthread_mutex_lock(&event_write_mutex_);
+  if (!rebalance_events_.empty()) {
+    event = rebalance_events_.front();
+    rebalance_events_.pop_front();
+    result = true;
+  }
+  pthread_mutex_unlock(&event_write_mutex_);
+  return result;
+}
+
+
+
+
+
+
+
+}  // namespace tubemq

Reply via email to