This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 98f0234 MINIFICPP-1351 fix PublishKafka::notifyStop race condition over connection
98f0234 is described below
commit 98f0234cc6e192a9caccb726b31df2bcc8cd353d
Author: Marton Szasz <[email protected]>
AuthorDate: Thu Sep 3 18:36:14 2020 +0200
MINIFICPP-1351 fix PublishKafka::notifyStop race condition over connection
Signed-off-by: Arpad Boda <[email protected]>
This closes #894
---
extensions/librdkafka/PublishKafka.cpp | 1 +
extensions/librdkafka/PublishKafka.h | 2 +-
2 files changed, 2 insertions(+), 1 deletion(-)
diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index 1b92edc..5b007a3 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -542,6 +542,7 @@ void PublishKafka::onSchedule(const std::shared_ptr<core::ProcessContext> &conte
void PublishKafka::notifyStop() {
logger_->log_debug("notifyStop called");
interrupted_ = true;
+ std::lock_guard<std::mutex> conn_lock(connection_mutex_);
std::lock_guard<std::mutex> lock(messages_mutex_);
for (auto& messages : messages_set_) {
messages->interrupt();
diff --git a/extensions/librdkafka/PublishKafka.h b/extensions/librdkafka/PublishKafka.h
index 7cc9101..44e2634 100644
--- a/extensions/librdkafka/PublishKafka.h
+++ b/extensions/librdkafka/PublishKafka.h
@@ -125,7 +125,7 @@ class PublishKafka : public core::Processor {
utils::Regex attributeNameRegex_;
std::atomic<bool> interrupted_{false};
- std::mutex messages_mutex_;
+ std::mutex messages_mutex_; // If both connection_mutex_ and messages_mutex_ are needed, always take connection_mutex_ first to avoid deadlock
std::set<std::shared_ptr<Messages>> messages_set_;
};