This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch tubemq-client-cpp in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/tubemq-client-cpp by this push: new 989d197 [TUBEMQ-269] Create C/C++ RmtDataCache class (#198) 989d197 is described below commit 989d19719bd3830f84fc88e0c72dd455bd914a86 Author: gosonzhang <4675...@qq.com> AuthorDate: Fri Jul 10 01:01:47 2020 +0000 [TUBEMQ-269] Create C/C++ RmtDataCache class (#198) Co-authored-by: gosonzhang <gosonzh...@tencent.com> --- .../tubemq-client-cpp/include/tubemq/atomic_def.h | 1 - .../tubemq-client-cpp/include/tubemq/meta_info.h | 86 +++---- .../include/tubemq/rmt_data_cache.h | 62 ++++- .../tubemq-client-cpp/src/meta_info.cc | 274 ++++++++++----------- .../tubemq-client-cpp/src/rmt_data_cache.cc | 132 ++++++++++ 5 files changed, 369 insertions(+), 186 deletions(-) diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/atomic_def.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/atomic_def.h index c8030db..30830d9 100644 --- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/atomic_def.h +++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/atomic_def.h @@ -24,7 +24,6 @@ namespace tubemq { -using namespace std; class AtomicInteger { public: diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/meta_info.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/meta_info.h index 02d5a63..813a9c5 100644 --- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/meta_info.h +++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/meta_info.h @@ -91,14 +91,54 @@ class Partition { string partition_info_; }; +class PartitionExt : public Partition { + public: + PartitionExt(); + PartitionExt(const string& partition_info); + PartitionExt(const NodeInfo& broker_info, const string& part_str); + ~PartitionExt(); + void BookConsumeData(int32_t errcode, int32_t msg_size, bool req_esc_limit, + int64_t rsp_dlt_limit, int64_t last_datadlt, bool require_slow); + int64_t ProcConsumeResult(const FlowCtrlRuleHandler& def_flowctrl_handler, + const FlowCtrlRuleHandler& group_flowctrl_handler, bool filter_consume, bool last_consumed); + int64_t ProcConsumeResult(const FlowCtrlRuleHandler& def_flowctrl_handler, + const FlowCtrlRuleHandler& group_flowctrl_handler, bool filter_consume, bool last_consumed, + int32_t errcode, int32_t msg_size, bool req_esc_limit, int64_t rsp_dlt_limit, + int64_t last_datadlt, bool require_slow); + void SetLastConsumed(bool last_consumed); + bool IsLastConsumed(); + + private: + void resetParameters(); + void updateStrategyData(const FlowCtrlRuleHandler& def_flowctrl_handler, + const FlowCtrlRuleHandler& group_flowctrl_handler, int32_t msg_size, int64_t last_datadlt); + + private: + bool is_last_consumed_; + FlowCtrlResult cur_flowctrl_; + FlowCtrlItem cur_freqctrl_; + int64_t next_stage_updtime_; + int64_t next_slice_updtime_; + int64_t limit_slice_msgsize_; + int64_t cur_stage_msgsize_; + int64_t cur_slice_msgsize_; + int32_t total_zero_cnt_; + int64_t booked_time_; + int32_t booked_errcode_; + bool booked_esc_limit_; + int32_t booked_msgsize_; + int64_t booked_dlt_limit_; + int64_t booked_curdata_dlt_; + bool booked_require_slow_; +}; + class SubscribeInfo { public: SubscribeInfo(const string& sub_info); - SubscribeInfo(const string& consumer_id, const string& group, const Partition& partition); SubscribeInfo& operator=(const SubscribeInfo& target); const string& GetConsumerId() const; const string& GetGroup() const; - const Partition& GetPartition() const; + const PartitionExt& GetPartitionExt() const; const uint32_t GgetBrokerId() const; const string& GetBrokerHost() const; const uint32_t GetBrokerPort() const; @@ -112,7 +152,7 @@ class SubscribeInfo { private: string consumer_id_; string group_; - Partition partition_; + PartitionExt partitionext_; string sub_info_; }; @@ -138,46 +178,6 @@ class ConsumerEvent { list<SubscribeInfo> subscribe_list_; }; -class PartitionExt : public Partition { - public: - PartitionExt(); - PartitionExt(const string& partition_info); - PartitionExt(const NodeInfo& broker_info, const string& part_str); - ~PartitionExt(); - void BookConsumeData(int32_t errcode, int32_t msg_size, bool req_esc_limit, - int64_t rsp_dlt_limit, int64_t last_datadlt, bool require_slow); - int64_t ProcConsumeResult(const FlowCtrlRuleHandler& def_flowctrl_handler, - const FlowCtrlRuleHandler& group_flowctrl_handler, bool filter_consume, bool last_consumed); - int64_t ProcConsumeResult(const FlowCtrlRuleHandler& def_flowctrl_handler, - const FlowCtrlRuleHandler& group_flowctrl_handler, bool filter_consume, bool last_consumed, - int32_t errcode, int32_t msg_size, bool req_esc_limit, int64_t rsp_dlt_limit, - int64_t last_datadlt, bool require_slow); - void SetLastConsumed(bool last_consumed); - bool IsLastConsumed(); - - private: - void resetParameters(); - void updateStrategyData(const FlowCtrlRuleHandler& def_flowctrl_handler, - const FlowCtrlRuleHandler& group_flowctrl_handler, int32_t msg_size, int64_t last_datadlt); - - private: - bool is_last_consumed_; - FlowCtrlResult cur_flowctrl_; - FlowCtrlItem cur_freqctrl_; - int64_t next_stage_updtime_; - int64_t next_slice_updtime_; - int64_t limit_slice_msgsize_; - int64_t cur_stage_msgsize_; - int64_t cur_slice_msgsize_; - int32_t total_zero_cnt_; - int64_t booked_time_; - int32_t booked_errcode_; - bool booked_esc_limit_; - int32_t booked_msgsize_; - int64_t booked_dlt_limit_; - int64_t booked_curdata_dlt_; - bool booked_require_slow_; -}; } // namespace tubemq diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h index d97cbac..11f9018 100644 --- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h +++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/rmt_data_cache.h @@ -21,15 +21,73 @@ #define TUBEMQ_CLIENT_RMT_DATA_CACHE_H_ #include <stdint.h> +#include <pthread.h> #include <atomic> +#include <list> +#include <map> +#include <set> + +#include "tubemq/flowctrl_def.h" +#include "tubemq/meta_info.h" + + + namespace tubemq { -using namespace std; +using std::map; +using std::set; +using std::list; + // consumer remote data cache -class RmtDataCacheCsm {} +class RmtDataCacheCsm { + public: + RmtDataCacheCsm(); + ~RmtDataCacheCsm(); + void AddNewPartition(const PartitionExt& partition_ext); + void OfferEvent(const ConsumerEvent& event); + void TakeEvent(ConsumerEvent& event); + void ClearEvent(); + void OfferEventResult(const ConsumerEvent& event); + bool PollEventResult(ConsumerEvent& event); + + + private: + // timer begin + + // timer end + // flow ctrl + FlowCtrlRuleHandler group_flowctrl_handler_; + FlowCtrlRuleHandler def_flowctrl_handler_; + pthread_rwlock_t meta_rw_lock_; + // partiton allocated map + map<string, PartitionExt> partitions_; + // topic partiton map + map<string, set<string> > topic_partition_; + // broker parition map + map<NodeInfo, set<string> > broker_partition_; + // for partiton idle map + list<string> index_partitions_; + // for partition used map + map<string, int64_t> partition_useds_; + // for partiton timer map + map<string, int64_t> partition_timeouts_; + // data book + pthread_mutex_t data_book_mutex_; + // for partition offset cache + map<string, int64_t> partition_offset_; + // for partiton register booked + map<string, bool> part_reg_booked_; + // event + pthread_mutex_t event_read_mutex_; + pthread_cond_t event_read_cond_; + list<ConsumerEvent> rebalance_events_; + pthread_mutex_t event_write_mutex_; + list<ConsumerEvent> rebalance_results_; +}; + } // namespace tubemq diff --git a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc index 5c4e66b..4f0f860 100644 --- a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc +++ b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc @@ -244,145 +244,6 @@ void Partition::buildPartitionKey() { this->partition_info_ = ss2.str(); } -// sub_info = consumerId@group#broker_info#topic:partitionId -SubscribeInfo::SubscribeInfo(const string& sub_info) { - string::size_type pos = 0; - string seg_key = delimiter::kDelimiterPound; - string at_key = delimiter::kDelimiterAt; - this->consumer_id_ = " "; - this->group_ = " "; - // parse sub_info - pos = sub_info.find(seg_key); - if (pos != string::npos) { - string consumer_info = sub_info.substr(0, pos); - consumer_info = Utils::Trim(consumer_info); - string partition_info = sub_info.substr(pos + seg_key.size(), sub_info.size()); - partition_info = Utils::Trim(partition_info); - this->partition_ = Partition(partition_info); - pos = consumer_info.find(at_key); - this->consumer_id_ = consumer_info.substr(0, pos); - this->consumer_id_ = Utils::Trim(this->consumer_id_); - this->group_ = consumer_info.substr(pos + at_key.size(), consumer_info.size()); - this->group_ = Utils::Trim(this->group_); - } - buildSubInfo(); -} - -SubscribeInfo::SubscribeInfo(const string& consumer_id, const string& group, - const Partition& partition) { - this->consumer_id_ = consumer_id; - this->group_ = group; - this->partition_ = partition; - buildSubInfo(); -} - -SubscribeInfo& SubscribeInfo::operator=(const SubscribeInfo& target) { - if (this != &target) { - this->consumer_id_ = target.consumer_id_; - this->group_ = target.group_; - this->partition_ = target.partition_; - } - return *this; -} - -const string& SubscribeInfo::GetConsumerId() const { return this->consumer_id_; } - -const string& SubscribeInfo::GetGroup() const { return this->group_; } - -const Partition& SubscribeInfo::GetPartition() const { return this->partition_; } - -const uint32_t SubscribeInfo::GgetBrokerId() const { return this->partition_.GetBrokerId(); } - -const string& SubscribeInfo::GetBrokerHost() const { return this->partition_.GetBrokerHost(); } - -const uint32_t SubscribeInfo::GetBrokerPort() const { return this->partition_.GetBrokerPort(); } - -const string& SubscribeInfo::GetTopic() const { return this->partition_.GetTopic(); } - -const uint32_t SubscribeInfo::GetPartitionId() const { return this->partition_.GetPartitionId(); } - -const string& SubscribeInfo::ToString() const { return this->sub_info_; } - -void SubscribeInfo::buildSubInfo() { - stringstream ss; - ss << this->consumer_id_; - ss << delimiter::kDelimiterAt; - ss << this->group_; - ss << delimiter::kDelimiterPound; - ss << this->partition_.ToString(); - this->sub_info_ = ss.str(); -} - -ConsumerEvent::ConsumerEvent() { - this->rebalance_id_ = tb_config::kInvalidValue; - this->event_type_ = tb_config::kInvalidValue; - this->event_status_ = tb_config::kInvalidValue; -} - -ConsumerEvent::ConsumerEvent(const ConsumerEvent& target) { - this->rebalance_id_ = target.rebalance_id_; - this->event_type_ = target.event_type_; - this->event_status_ = target.event_status_; - this->subscribe_list_ = target.subscribe_list_; -} - -ConsumerEvent::ConsumerEvent(int64_t rebalance_id, int32_t event_type, - const list<SubscribeInfo>& subscribeInfo_lst, int32_t event_status) { - list<SubscribeInfo>::const_iterator it; - this->rebalance_id_ = rebalance_id; - this->event_type_ = event_type; - this->event_status_ = event_status; - for (it = subscribeInfo_lst.begin(); it != subscribeInfo_lst.end(); ++it) { - this->subscribe_list_.push_back(*it); - } -} - -ConsumerEvent& ConsumerEvent::operator=(const ConsumerEvent& target) { - if (this != &target) { - this->rebalance_id_ = target.rebalance_id_; - this->event_type_ = target.event_type_; - this->event_status_ = target.event_status_; - this->subscribe_list_ = target.subscribe_list_; - } - return *this; -} - -const int64_t ConsumerEvent::GetRebalanceId() const { return this->rebalance_id_; } - -const int32_t ConsumerEvent::GetEventType() const { return this->event_type_; } - -const int32_t ConsumerEvent::GetEventStatus() const { return this->event_status_; } - -void ConsumerEvent::SetEventType(int32_t event_type) { this->event_type_ = event_type; } - -void ConsumerEvent::SetEventStatus(int32_t event_status) { this->event_status_ = event_status; } - -const list<SubscribeInfo>& ConsumerEvent::GetSubscribeInfoList() const { - return this->subscribe_list_; -} - -string ConsumerEvent::ToString() { - uint32_t count = 0; - stringstream ss; - list<SubscribeInfo>::const_iterator it; - ss << "ConsumerEvent [rebalanceId="; - ss << this->rebalance_id_; - ss << ", type="; - ss << this->event_type_; - ss << ", status="; - ss << this->event_status_; - ss << ", subscribeInfoList=["; - for (it = this->subscribe_list_.begin(); it != this->subscribe_list_.end(); ++it) { - if (count++ > 0) { - ss << ","; - } - ss << it->ToString(); - } - ss << "]]"; - return ss.str(); -} - - PartitionExt::PartitionExt() : Partition() { resetParameters(); } @@ -505,7 +366,7 @@ void PartitionExt::updateStrategyData(const FlowCtrlRuleHandler& def_flowctrl_ha bool result = false; // Accumulated data received this->cur_stage_msgsize_ += msg_size; - this->cur_slice_msgsize_ += msg_size; + this->cur_slice_msgsize_ += msg_size; int64_t curr_time = Utils::GetCurrentTimeMillis(); // Update strategy data values if (curr_time > this->next_stage_updtime_) { @@ -535,4 +396,137 @@ void PartitionExt::updateStrategyData(const FlowCtrlRuleHandler& def_flowctrl_ha } +// sub_info = consumerId@group#broker_info#topic:partitionId +SubscribeInfo::SubscribeInfo(const string& sub_info) { + string::size_type pos = 0; + string seg_key = delimiter::kDelimiterPound; + string at_key = delimiter::kDelimiterAt; + this->consumer_id_ = " "; + this->group_ = " "; + // parse sub_info + pos = sub_info.find(seg_key); + if (pos != string::npos) { + string consumer_info = sub_info.substr(0, pos); + consumer_info = Utils::Trim(consumer_info); + string partition_info = sub_info.substr(pos + seg_key.size(), sub_info.size()); + partition_info = Utils::Trim(partition_info); + this->partitionext_ = PartitionExt(partition_info); + pos = consumer_info.find(at_key); + this->consumer_id_ = consumer_info.substr(0, pos); + this->consumer_id_ = Utils::Trim(this->consumer_id_); + this->group_ = consumer_info.substr(pos + at_key.size(), consumer_info.size()); + this->group_ = Utils::Trim(this->group_); + } + buildSubInfo(); +} + +SubscribeInfo& SubscribeInfo::operator=(const SubscribeInfo& target) { + if (this != &target) { + this->consumer_id_ = target.consumer_id_; + this->group_ = target.group_; + this->partitionext_ = target.partitionext_; + } + return *this; +} + +const string& SubscribeInfo::GetConsumerId() const { return this->consumer_id_; } + +const string& SubscribeInfo::GetGroup() const { return this->group_; } + +const PartitionExt& SubscribeInfo::GetPartitionExt() const { return this->partitionext_; } + +const uint32_t SubscribeInfo::GgetBrokerId() const { return this->partitionext_.GetBrokerId(); } + +const string& SubscribeInfo::GetBrokerHost() const { return this->partitionext_.GetBrokerHost(); } + +const uint32_t SubscribeInfo::GetBrokerPort() const { return this->partitionext_.GetBrokerPort(); } + +const string& SubscribeInfo::GetTopic() const { return this->partitionext_.GetTopic(); } + +const uint32_t SubscribeInfo::GetPartitionId() const { return this->partitionext_.GetPartitionId(); } + +const string& SubscribeInfo::ToString() const { return this->sub_info_; } + +void SubscribeInfo::buildSubInfo() { + stringstream ss; + ss << this->consumer_id_; + ss << delimiter::kDelimiterAt; + ss << this->group_; + ss << delimiter::kDelimiterPound; + ss << this->partitionext_.ToString(); + this->sub_info_ = ss.str(); +} + +ConsumerEvent::ConsumerEvent() { + this->rebalance_id_ = tb_config::kInvalidValue; + this->event_type_ = tb_config::kInvalidValue; + this->event_status_ = tb_config::kInvalidValue; +} + +ConsumerEvent::ConsumerEvent(const ConsumerEvent& target) { + this->rebalance_id_ = target.rebalance_id_; + this->event_type_ = target.event_type_; + this->event_status_ = target.event_status_; + this->subscribe_list_ = target.subscribe_list_; +} + +ConsumerEvent::ConsumerEvent(int64_t rebalance_id, int32_t event_type, + const list<SubscribeInfo>& subscribeInfo_lst, int32_t event_status) { + list<SubscribeInfo>::const_iterator it; + this->rebalance_id_ = rebalance_id; + this->event_type_ = event_type; + this->event_status_ = event_status; + for (it = subscribeInfo_lst.begin(); it != subscribeInfo_lst.end(); ++it) { + this->subscribe_list_.push_back(*it); + } +} + +ConsumerEvent& ConsumerEvent::operator=(const ConsumerEvent& target) { + if (this != &target) { + this->rebalance_id_ = target.rebalance_id_; + this->event_type_ = target.event_type_; + this->event_status_ = target.event_status_; + this->subscribe_list_ = target.subscribe_list_; + } + return *this; +} + +const int64_t ConsumerEvent::GetRebalanceId() const { return this->rebalance_id_; } + +const int32_t ConsumerEvent::GetEventType() const { return this->event_type_; } + +const int32_t ConsumerEvent::GetEventStatus() const { return this->event_status_; } + +void ConsumerEvent::SetEventType(int32_t event_type) { this->event_type_ = event_type; } + +void ConsumerEvent::SetEventStatus(int32_t event_status) { this->event_status_ = event_status; } + +const list<SubscribeInfo>& ConsumerEvent::GetSubscribeInfoList() const { + return this->subscribe_list_; +} + +string ConsumerEvent::ToString() { + uint32_t count = 0; + stringstream ss; + list<SubscribeInfo>::const_iterator it; + ss << "ConsumerEvent [rebalanceId="; + ss << this->rebalance_id_; + ss << ", type="; + ss << this->event_type_; + ss << ", status="; + ss << this->event_status_; + ss << ", subscribeInfoList=["; + for (it = this->subscribe_list_.begin(); it != this->subscribe_list_.end(); ++it) { + if (count++ > 0) { + ss << ","; + } + ss << it->ToString(); + } + ss << "]]"; + return ss.str(); +} + + + + }; // namespace tubemq diff --git a/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc new file mode 100644 index 0000000..d239d26 --- /dev/null +++ b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "tubemq/rmt_data_cache.h" +#include "tubemq/meta_info.h" + + + +namespace tubemq { + + +RmtDataCacheCsm::RmtDataCacheCsm() { + pthread_rwlock_init(&meta_rw_lock_, NULL); + pthread_mutex_init(&data_book_mutex_, NULL); + pthread_mutex_init(&event_read_mutex_, NULL); + pthread_cond_init(&event_read_cond_, NULL); + pthread_mutex_init(&event_write_mutex_, NULL); +} + +RmtDataCacheCsm::~RmtDataCacheCsm() { + pthread_mutex_destroy(&event_write_mutex_); + pthread_mutex_destroy(&event_read_mutex_); + pthread_mutex_destroy(&data_book_mutex_); + pthread_cond_destroy(&event_read_cond_); + pthread_rwlock_destroy(&meta_rw_lock_); +} + +void RmtDataCacheCsm::AddNewPartition(const PartitionExt& partition_ext) { + // + map<string, PartitionExt>::iterator it_map; + map<string, set<string> >::iterator it_topic; + map<NodeInfo, set<string> >::iterator it_broker; + // + string partition_key = partition_ext.GetPartitionKey(); + pthread_rwlock_wrlock(&meta_rw_lock_); + it_map = partitions_.find(partition_key); + if (it_map == partitions_.end()) { + partitions_[partition_key] = partition_ext; + it_topic = topic_partition_.find(partition_ext.GetTopic()); + if (it_topic == topic_partition_.end()) { + set<string> tmp_part_set; + tmp_part_set.insert(partition_key); + topic_partition_[partition_ext.GetTopic()] = tmp_part_set; + } else { + if (it_topic->second.find(partition_key) == it_topic->second.end()) { + it_topic->second.insert(partition_key); + } + } + it_broker = broker_partition_.find(partition_ext.GetBrokerInfo()); + if (it_broker == broker_partition_.end()) { + set<string> tmp_part_set; + tmp_part_set.insert(partition_key); + broker_partition_[partition_ext.GetBrokerInfo()] = tmp_part_set; + } else { + if (it_broker->second.find(partition_key) == it_broker->second.end()) { + it_broker->second.insert(partition_key); + } + } + } + // check partition_key status + if (partition_useds_.find(partition_key) == partition_useds_.end() + && partition_timeouts_.find(partition_key) == partition_timeouts_.end()) { + index_partitions_.remove(partition_key); + index_partitions_.push_back(partition_key); + } + pthread_rwlock_unlock(&meta_rw_lock_); +} + +void RmtDataCacheCsm::OfferEvent(const ConsumerEvent& event) { + pthread_mutex_lock(&event_read_mutex_); + this->rebalance_events_.push_back(event); + pthread_cond_broadcast(&event_read_cond_); + pthread_mutex_unlock(&event_read_mutex_); +} + +void RmtDataCacheCsm::TakeEvent(ConsumerEvent& event) { + pthread_mutex_lock(&event_read_mutex_); + while (this->rebalance_events_.empty()) { + pthread_cond_wait(&event_read_cond_, &event_read_mutex_); + } + event = rebalance_events_.front(); + rebalance_events_.pop_front(); + pthread_mutex_unlock(&event_read_mutex_); +} + +void RmtDataCacheCsm::ClearEvent() { + pthread_mutex_lock(&event_read_mutex_); + rebalance_events_.clear(); + pthread_mutex_unlock(&event_read_mutex_); +} + +void RmtDataCacheCsm::OfferEventResult(const ConsumerEvent& event) { + pthread_mutex_lock(&event_write_mutex_); + this->rebalance_events_.push_back(event); + pthread_mutex_unlock(&event_write_mutex_); +} + +bool RmtDataCacheCsm::PollEventResult(ConsumerEvent& event) { + bool result = false; + pthread_mutex_lock(&event_write_mutex_); + if (!rebalance_events_.empty()) { + event = rebalance_events_.front(); + rebalance_events_.pop_front(); + result = true; + } + pthread_mutex_unlock(&event_write_mutex_); + return result; +} + + + + + + + +} // namespace tubemq