This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 98f0234  MINIFICPP-1351 fix PublishKafka::notifyStop race condition over connection
98f0234 is described below

commit 98f0234cc6e192a9caccb726b31df2bcc8cd353d
Author: Marton Szasz <[email protected]>
AuthorDate: Thu Sep 3 18:36:14 2020 +0200

    MINIFICPP-1351 fix PublishKafka::notifyStop race condition over connection
    
    Signed-off-by: Arpad Boda <[email protected]>
    
    This closes #894
---
 extensions/librdkafka/PublishKafka.cpp | 1 +
 extensions/librdkafka/PublishKafka.h   | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index 1b92edc..5b007a3 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -542,6 +542,7 @@ void PublishKafka::onSchedule(const std::shared_ptr<core::ProcessContext> &conte
 void PublishKafka::notifyStop() {
   logger_->log_debug("notifyStop called");
   interrupted_ = true;
+  std::lock_guard<std::mutex> conn_lock(connection_mutex_);
   std::lock_guard<std::mutex> lock(messages_mutex_);
   for (auto& messages : messages_set_) {
     messages->interrupt();
diff --git a/extensions/librdkafka/PublishKafka.h b/extensions/librdkafka/PublishKafka.h
index 7cc9101..44e2634 100644
--- a/extensions/librdkafka/PublishKafka.h
+++ b/extensions/librdkafka/PublishKafka.h
@@ -125,7 +125,7 @@ class PublishKafka : public core::Processor {
   utils::Regex attributeNameRegex_;
 
   std::atomic<bool> interrupted_{false};
-  std::mutex messages_mutex_;
+  std::mutex messages_mutex_;  // If both connection_mutex_ and messages_mutex_ are needed, always take connection_mutex_ first to avoid deadlock
   std::set<std::shared_ptr<Messages>> messages_set_;
 };
 

Reply via email to