This is an automated email from the ASF dual-hosted git repository. aboda pushed a commit to branch MINIFICPP-1348-RC1 in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 61a46d1aa934acdfbe6aa21bd429c6315a7a8e7a
Author: Marton Szasz <[email protected]>
AuthorDate: Thu Sep 3 18:36:14 2020 +0200

    MINIFICPP-1351 fix PublishKafka::notifyStop race condition over connection

    Signed-off-by: Arpad Boda <[email protected]>

    This closes #894
---
 extensions/librdkafka/PublishKafka.cpp | 1 +
 extensions/librdkafka/PublishKafka.h   | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index 1b92edc..5b007a3 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -542,6 +542,7 @@ void PublishKafka::onSchedule(const std::shared_ptr<core::ProcessContext> &conte
 void PublishKafka::notifyStop() {
   logger_->log_debug("notifyStop called");
   interrupted_ = true;
+  std::lock_guard<std::mutex> conn_lock(connection_mutex_);
   std::lock_guard<std::mutex> lock(messages_mutex_);
   for (auto& messages : messages_set_) {
     messages->interrupt();
diff --git a/extensions/librdkafka/PublishKafka.h b/extensions/librdkafka/PublishKafka.h
index 7cc9101..44e2634 100644
--- a/extensions/librdkafka/PublishKafka.h
+++ b/extensions/librdkafka/PublishKafka.h
@@ -125,7 +125,7 @@ class PublishKafka : public core::Processor {
   utils::Regex attributeNameRegex_;
 
   std::atomic<bool> interrupted_{false};
-  std::mutex messages_mutex_;
+  std::mutex messages_mutex_; // If both connection_mutex_ and messages_mutex_ are needed, always take connection_mutex_ first to avoid deadlock
   std::set<std::shared_ptr<Messages>> messages_set_;
 };
 
