bakaid commented on a change in pull request #621: MINIFICPP-959: Review 
librdkafka thread safety
URL: https://github.com/apache/nifi-minifi-cpp/pull/621#discussion_r312985818
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.h
 ##########
 @@ -275,9 +80,12 @@ class KafkaPool {
     std::lock_guard<std::mutex> lock(mutex_);
     auto connection = map_.find(key);
     if (connection != map_.end()) {
-      return connection->second;
+      auto conn = connection->second;
+      return conn->inUse() ? nullptr : conn;
 
 Review comment:
   @nghiaxlee  Unfortunately this is still not thread-safe.
   If two threads request a connection with the same key while the connection 
is unused, both will get the KafkaConnection, and both will try to create a 
KafkaLease for it.
   KafkaLease does not check whether the connection is in use; it just marks it 
as used. Both threads can do that, so both end up accessing the 
KafkaConnection. (We have a mutex in this function, but the two threads can 
acquire it one after the other, and each can be handed the connection before 
either has created its KafkaLease and thereby set inUse. The original problem 
with this code, that there is no synchronization between this mutex and the 
atomic<bool>, still exists.)
   For this to work properly you have to
    - make sure KafkaLease checks whether the connection is in use (with an 
atomic test-and-set such as atomic_flag_test_and_set, or just use a mutex), and 
fails to construct if it is
    - preferably make getOrCreateConnection itself return the lease rather than 
delegating the responsibility of getting a lease to the user
    - make the user handle the failure of a KafkaLease creation (either wait or 
yield; as we discussed, yielding is the better option here)
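
   A minimal sketch of what the three points above could look like together. 
This is not the actual PublishKafka.h code: the KafkaConnection here is a 
stripped-down stand-in that models only the in-use flag, and tryAcquire, 
release, tryCreate, getConn and getOrCreateLease are hypothetical names chosen 
for illustration. It uses exchange() on the existing atomic<bool> as the 
test-and-set, so exactly one caller can claim a connection.

```cpp
#include <atomic>
#include <cassert>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical stand-in for KafkaConnection; only the in-use flag is modeled.
class KafkaConnection {
 public:
  // Atomic test-and-set: exchange() returns the previous value, so only the
  // one caller that flips the flag from false to true gets `true` back.
  bool tryAcquire() { return !in_use_.exchange(true); }
  void release() { in_use_.store(false); }
  bool inUse() const { return in_use_.load(); }

 private:
  std::atomic<bool> in_use_{false};
};

// RAII lease: owns the claim on the connection and releases it on destruction.
class KafkaLease {
 public:
  // Returns nullptr instead of a lease if the connection is already claimed.
  static std::unique_ptr<KafkaLease> tryCreate(std::shared_ptr<KafkaConnection> conn) {
    if (!conn || !conn->tryAcquire()) {
      return nullptr;
    }
    return std::unique_ptr<KafkaLease>(new KafkaLease(std::move(conn)));
  }

  ~KafkaLease() {
    assert(conn_->inUse());  // a live lease always holds the claim
    conn_->release();
  }

  KafkaLease(const KafkaLease&) = delete;
  KafkaLease& operator=(const KafkaLease&) = delete;

  std::shared_ptr<KafkaConnection> getConn() const { return conn_; }

 private:
  explicit KafkaLease(std::shared_ptr<KafkaConnection> conn) : conn_(std::move(conn)) {}
  std::shared_ptr<KafkaConnection> conn_;
};

class KafkaPool {
 public:
  // Looks up (or creates) the connection and claims it in the same call, so
  // the caller never sees a connection it does not hold a lease for.
  std::unique_ptr<KafkaLease> getOrCreateLease(const std::string& key) {
    std::lock_guard<std::mutex> lock(mutex_);
    auto& conn = map_[key];
    if (!conn) {
      conn = std::make_shared<KafkaConnection>();
    }
    return KafkaLease::tryCreate(conn);
  }

 private:
  std::mutex mutex_;
  std::map<std::string, std::shared_ptr<KafkaConnection>> map_;
};
```

   With something along these lines, the caller checks the returned pointer 
and, if it is null, yields and retries on a later trigger instead of touching 
the connection.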

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
