This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new fe480b3  MINIFICPP-959: Review librdkafka thread safety
fe480b3 is described below

commit fe480b3ad2cd26da5aaa52105612ae8ede78fb5b
Author: Nghia Le <[email protected]>
AuthorDate: Wed Aug 7 20:39:26 2019 +0200

    MINIFICPP-959: Review librdkafka thread safety
    
    Signed-off-by: Arpad Boda <[email protected]>
    
    This closes #621
---
 extensions/librdkafka/KafkaConnection.cpp | 144 +++++++++++++++++
 extensions/librdkafka/KafkaConnection.h   | 129 ++++++++++++++++
 extensions/librdkafka/KafkaPool.h         |  77 ++++++++++
 extensions/librdkafka/KafkaTopic.h        |  65 ++++++++
 extensions/librdkafka/PublishKafka.cpp    |  25 +--
 extensions/librdkafka/PublishKafka.h      | 246 +-----------------------------
 6 files changed, 430 insertions(+), 256 deletions(-)

diff --git a/extensions/librdkafka/KafkaConnection.cpp 
b/extensions/librdkafka/KafkaConnection.cpp
new file mode 100644
index 0000000..28b0b45
--- /dev/null
+++ b/extensions/librdkafka/KafkaConnection.cpp
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "KafkaConnection.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+/**
+ * Creates a connection object for the given key.
+ *
+ * No producer handle or configuration is attached yet; the object starts out
+ * unleased and uninitialized.
+ *
+ * @param key identifies the connection; a copy of it is stored.
+ */
+KafkaConnection::KafkaConnection(const KafkaConnectionKey &key)
+    : logger_(logging::LoggerFactory<KafkaConnection>::getLogger()),
+      lease_(false),
+      initialized_(false),
+      key_(key),
+      conf_(nullptr),
+      kafka_connection_(nullptr) {
+}
+
+void KafkaConnection::remove() {  // Drops all stored topics, then releases the producer handle and configuration.
+  topics_.clear();  // release the map's topic references before the producer handle is destroyed below
+  removeConnection();
+}
+
+void KafkaConnection::removeConnection() {  // Releases the producer handle and configuration (if set) and marks the connection uninitialized.
+  if (kafka_connection_) {
+    rd_kafka_flush(kafka_connection_, 10 * 1000); /* wait for max 10 seconds */
+    rd_kafka_destroy(kafka_connection_);
+    modifyLoggers([&](std::unordered_map<const rd_kafka_t*, 
std::weak_ptr<logging::Logger>>& loggers) {
+      loggers.erase(kafka_connection_);  // unregister the handle so logCallback no longer finds a logger for it
+    });
+    kafka_connection_ = nullptr;
+  }
+  if (conf_) {
+    rd_kafka_conf_destroy(conf_);
+    conf_ = nullptr;
+  }
+  initialized_ = false;  // reset unconditionally, even when nothing had to be released
+}
+
+bool KafkaConnection::initialized() const {  // True after setConnection(), false again after removeConnection().
+  return initialized_;
+}
+
+void KafkaConnection::setConnection(rd_kafka_t *producer, rd_kafka_conf_t 
*conf) {  // Stores producer/conf in this object (released later by removeConnection()) and registers the logger for producer.
+  removeConnection();  // release whatever handle/configuration was held before
+  kafka_connection_ = producer;
+  conf_ = conf;
+  initialized_ = true;
+  modifyLoggers([&](std::unordered_map<const rd_kafka_t*, 
std::weak_ptr<logging::Logger>>& loggers) {
+    loggers[producer] = logger_;  // lets logCallback map this handle back to our logger
+  });
+}
+
+rd_kafka_conf_t *KafkaConnection::getConf() const {  // Returns the stored configuration pointer; nullptr when none is set.
+  return conf_;
+}
+
+rd_kafka_t *KafkaConnection::getConnection() const {  // Returns the stored producer handle; nullptr when none is set.
+  return kafka_connection_;
+}
+
+/** Tells whether a topic has been stored under the given name via putTopic(). */
+bool KafkaConnection::hasTopic(const std::string &topic) const {
+  return topics_.find(topic) != topics_.end();
+}
+
+/**
+ * Looks up a stored topic by name.
+ *
+ * @param topic name the topic was stored under.
+ * @return the stored topic, or an empty pointer when the name is unknown.
+ */
+std::shared_ptr<KafkaTopic> KafkaConnection::getTopic(const std::string &topic) const {
+  const auto entry = topics_.find(topic);
+  if (entry == topics_.end()) {
+    return nullptr;
+  }
+  return entry->second;
+}
+
+KafkaConnectionKey const * const KafkaConnection::getKey() const {  // Returns a pointer to the key stored in this object (never null).
+  return &key_;
+}
+
+void KafkaConnection::putTopic(const std::string &topicName, const 
std::shared_ptr<KafkaTopic> &topic) {  // Stores topic under topicName.
+  topics_[topicName] = topic;  // an entry already stored under this name is overwritten
+}
+
+/**
+ * Forwards a log message to the logger registered for the given handle.
+ *
+ * The handle-to-logger table is filled by setConnection() and emptied by
+ * removeConnection(). Messages for a handle that is not (or no longer)
+ * registered, or whose logger has expired, are dropped.
+ *
+ * @param rk    handle the message belongs to; used as lookup key only.
+ * @param level syslog-style severity (0 = emergency ... 7 = debug); other values are ignored.
+ * @param buf   message text.
+ */
+void KafkaConnection::logCallback(const rd_kafka_t* rk, int level, const char* /*fac*/, const char* buf) {
+  std::shared_ptr<logging::Logger> logger;
+  try {
+    modifyLoggers([&](std::unordered_map<const rd_kafka_t*, std::weak_ptr<logging::Logger>>& loggers) {
+      // An unregistered handle is an ordinary case here, so look it up with
+      // find() instead of paying for a throw/catch round trip via at().
+      const auto entry = loggers.find(rk);
+      if (entry != loggers.end()) {
+        logger = entry->second.lock();
+      }
+    });
+  } catch (...) {
+    // Deliberately best effort: an unexpected failure during the lookup is
+    // treated like "no logger registered" rather than propagated to the caller.
+  }
+
+  if (!logger) {
+    return;
+  }
+
+  switch (level) {
+    case 0:  // LOG_EMERG
+    case 1:  // LOG_ALERT
+    case 2:  // LOG_CRIT
+    case 3:  // LOG_ERR
+      logging::LOG_ERROR(logger) << buf;
+      break;
+    case 4:  // LOG_WARNING
+      logging::LOG_WARN(logger) << buf;
+      break;
+    case 5:  // LOG_NOTICE
+    case 6:  // LOG_INFO
+      logging::LOG_INFO(logger) << buf;
+      break;
+    case 7:  // LOG_DEBUG
+      logging::LOG_DEBUG(logger) << buf;
+      break;
+  }
+}
+
+/**
+ * Attempts to mark this connection as leased.
+ *
+ * @return true if the connection was free and is now leased to the caller,
+ *         false if it was already leased.
+ */
+bool KafkaConnection::tryUse() {
+  std::lock_guard<std::mutex> guard(lease_mutex_);
+  const bool was_free = !lease_;
+  lease_ = true;  // already true when the attempt fails, so this store changes nothing in that case
+  return was_free;
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/librdkafka/KafkaConnection.h 
b/extensions/librdkafka/KafkaConnection.h
new file mode 100644
index 0000000..6e158fe
--- /dev/null
+++ b/extensions/librdkafka/KafkaConnection.h
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_KAFKACONNECTION_H
+#define NIFI_MINIFI_CPP_KAFKACONNECTION_H
+
+#include <mutex>
+#include <string>
+#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/Logger.h"
+#include "rdkafka.h"
+#include "KafkaTopic.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+/**
+ * Identifies a Kafka connection by broker list and client id.
+ *
+ * Provides a strict weak ordering so the key can be used in ordered containers.
+ */
+class KafkaConnectionKey {
+  public:
+    std::string client_id_;
+    std::string brokers_;
+
+    /**
+     * Orders keys by brokers_ first and by client_id_ second.
+     *
+     * Written as an explicit comparison so that only <string> is required.
+     */
+    bool operator <(const KafkaConnectionKey& rhs) const {
+      if (brokers_ != rhs.brokers_) {
+        return brokers_ < rhs.brokers_;
+      }
+      return client_id_ < rhs.client_id_;
+    }
+};
+
+class KafkaConnection {  // Holds one producer handle, its configuration and a name-to-topic map for a given KafkaConnectionKey.
+ public:
+
+  explicit KafkaConnection(const KafkaConnectionKey &key);  // stores a copy of key; no producer handle is attached yet
+
+  ~KafkaConnection() {
+    remove();  // releases topics, producer handle and configuration
+  }
+
+  void remove();  // clears the topic map, then calls removeConnection()
+
+  void removeConnection();  // releases producer handle and configuration; initialized() becomes false
+
+  bool initialized() const;  // true after setConnection(), false after removeConnection()
+
+  void setConnection(rd_kafka_t *producer, rd_kafka_conf_t *conf);  // replaces the held handle/configuration; both are released later by removeConnection()
+
+  rd_kafka_conf_t *getConf() const;  // stored configuration pointer, may be nullptr
+
+  rd_kafka_t *getConnection() const;  // stored producer handle, may be nullptr
+
+  bool hasTopic(const std::string &topic) const;  // whether a topic is stored under this name
+
+  std::shared_ptr<KafkaTopic> getTopic(const std::string &topic) const;  // stored topic, or empty pointer if unknown
+
+  KafkaConnectionKey const * const getKey() const;  // pointer to the key held by this object
+
+  void putTopic(const std::string &topicName, const 
std::shared_ptr<KafkaTopic> &topic);  // stores topic under topicName, overwriting an existing entry
+
+  static void logCallback(const rd_kafka_t* rk, int level, const char* 
/*fac*/, const char* buf);  // forwards buf to the logger registered for rk, if any
+
+  bool tryUse();  // sets the lease flag; returns false if it was already set
+
+  friend class KafkaLease;  // KafkaLease clears lease_ under lease_mutex_ in its destructor
+
+ private:
+
+  std::shared_ptr<logging::Logger> logger_;
+
+  std::mutex lease_mutex_;  // guards lease_
+
+  bool lease_;  // true while the connection is handed out (set by tryUse(), cleared by ~KafkaLease)
+
+  bool initialized_;  // true between setConnection() and removeConnection()
+
+  KafkaConnectionKey key_;
+
+  std::map<std::string, std::shared_ptr<KafkaTopic>> topics_;  // topics stored via putTopic(), keyed by name
+
+  rd_kafka_conf_t *conf_;  // destroyed in removeConnection()
+  rd_kafka_t *kafka_connection_;  // flushed and destroyed in removeConnection()
+
+  static void modifyLoggers(const std::function<void(std::unordered_map<const 
rd_kafka_t*, std::weak_ptr<logging::Logger>>&)>& func) {  // runs func on the shared handle-to-logger table while holding its mutex
+    static std::mutex loggers_mutex;
+    static std::unordered_map<const rd_kafka_t*, 
std::weak_ptr<logging::Logger>> loggers;  // one table shared by all KafkaConnection objects
+
+    std::lock_guard<std::mutex> lock(loggers_mutex);
+    func(loggers);
+  }
+};
+
+/**
+ * Scoped token marking a KafkaConnection as in use.
+ *
+ * Destroying the lease clears the connection's lease flag (under the
+ * connection's lease mutex) so the connection can be handed out again.
+ * The constructor is private: only the befriended KafkaPool creates leases.
+ */
+class KafkaLease {
+  public:
+    // The destructor releases the lease, so a copy would release it a second
+    // time while the original holder still believes it owns the connection.
+    KafkaLease(const KafkaLease&) = delete;
+    KafkaLease& operator=(const KafkaLease&) = delete;
+
+    ~KafkaLease() {
+      if (conn_) {  // tolerate a lease constructed around an empty pointer
+        std::lock_guard<std::mutex> lock(conn_->lease_mutex_);
+        conn_->lease_ = false;
+      }
+    }
+    /** Returns the leased connection (shared ownership). */
+    std::shared_ptr<KafkaConnection> getConn() const {
+      return conn_;
+    }
+    friend class KafkaPool;
+  private:
+    // Private on purpose: only KafkaPool (friend) may create a lease.
+    explicit KafkaLease(std::shared_ptr<KafkaConnection> conn)
+            : conn_(conn) {
+    }
+
+    std::shared_ptr<KafkaConnection> conn_;
+};
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif //NIFI_MINIFI_CPP_KAFKACONNECTION_H
diff --git a/extensions/librdkafka/KafkaPool.h 
b/extensions/librdkafka/KafkaPool.h
new file mode 100644
index 0000000..3fe2d42
--- /dev/null
+++ b/extensions/librdkafka/KafkaPool.h
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_KAFKAPOOL_H
+#define NIFI_MINIFI_CPP_KAFKAPOOL_H
+
+#include "KafkaConnection.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+/**
+ * Mutex-protected, size-limited map from KafkaConnectionKey to KafkaConnection.
+ *
+ * Connections are handed out wrapped in a KafkaLease, and only when
+ * KafkaConnection::tryUse() succeeds, so a connection has at most one user
+ * at a time.
+ */
+class KafkaPool {
+ public:
+
+  /**
+   * @param max number of connections at which an existing entry is evicted
+   *            before a new one is added. Note that the value is stored as
+   *            size_t, so a negative argument becomes a very large limit.
+   */
+  explicit KafkaPool(int max)
+      : max_(max) {
+  }
+
+  /**
+   * Removes the connection stored under key.
+   *
+   * @return true if an entry existed and was removed, false otherwise.
+   */
+  bool removeConnection(const KafkaConnectionKey &key) {
+    // Declared before the lock so it is destroyed after the lock is released:
+    // destroying a KafkaConnection may flush for up to 10 seconds and must not
+    // stall other users of the pool.
+    std::shared_ptr<KafkaConnection> removed;
+    std::lock_guard<std::mutex> lock(mutex_);
+    auto entry = map_.find(key);
+    if (entry == map_.end()) {
+      return false;
+    }
+    removed.swap(entry->second);
+    map_.erase(entry);
+    return true;
+  }
+
+  /**
+   * Returns a lease on the connection for key, creating the connection if it
+   * is not in the pool yet.
+   *
+   * When the pool is full, the entry with the smallest key is evicted to make
+   * room; a connection evicted while leased stays alive until its lease ends.
+   *
+   * @return the lease, or an empty pointer if the connection is currently
+   *         leased to someone else.
+   */
+  std::unique_ptr<KafkaLease> getOrCreateConnection(const KafkaConnectionKey &key) {
+    // Declared before the lock so an evicted connection is destroyed only
+    // after the pool mutex has been released (see removeConnection()).
+    std::shared_ptr<KafkaConnection> evicted;
+    std::lock_guard<std::mutex> lock(mutex_);
+    auto connection = map_.find(key);
+    std::shared_ptr<KafkaConnection> conn;
+    if (connection == map_.end()) {
+      // Not found, create new connection.
+      conn = std::make_shared<KafkaConnection>(key);
+      // Reached pool limit, remove the first one. The emptiness check matters
+      // for a limit of 0: erasing begin() of an empty map is undefined behaviour.
+      if (!map_.empty() && map_.size() >= max_) {
+        auto first = map_.begin();
+        evicted.swap(first->second);
+        map_.erase(first);
+      }
+      map_[key] = conn;
+    } else {
+      conn = connection->second;
+    }
+    std::unique_ptr<KafkaLease> lease;
+    if (conn->tryUse()) {
+      lease = std::unique_ptr<KafkaLease>(new KafkaLease(conn));
+    }
+    return lease;
+  }
+
+ private:
+  std::mutex mutex_;  // guards map_
+
+  size_t max_;
+
+  std::map<KafkaConnectionKey, std::shared_ptr<KafkaConnection>> map_;
+};
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif //NIFI_MINIFI_CPP_KAFKAPOOL_H
diff --git a/extensions/librdkafka/KafkaTopic.h 
b/extensions/librdkafka/KafkaTopic.h
new file mode 100644
index 0000000..30c34a0
--- /dev/null
+++ b/extensions/librdkafka/KafkaTopic.h
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_KAFKATOPIC_H
+#define NIFI_MINIFI_CPP_KAFKATOPIC_H
+
+#include "rdkafka.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+/**
+ * Holds a topic handle and a topic configuration pointer and destroys both,
+ * when non-null, in its destructor.
+ */
+class KafkaTopic {
+ public:
+  /**
+   * @param topic_reference topic handle to take over; may be nullptr.
+   * @param topic_conf      topic configuration to take over; may be nullptr.
+   */
+  KafkaTopic(rd_kafka_topic_t *topic_reference, rd_kafka_topic_conf_t *topic_conf)
+      : topic_conf_(topic_conf),
+        topic_reference_(topic_reference) {
+
+  }
+
+  // A copy would hold the same raw pointers and destroy them a second time.
+  KafkaTopic(const KafkaTopic&) = delete;
+  KafkaTopic& operator=(const KafkaTopic&) = delete;
+
+  ~KafkaTopic() {
+    if (topic_reference_) {
+      rd_kafka_topic_destroy(topic_reference_);
+    }
+    if (topic_conf_) {
+      rd_kafka_topic_conf_destroy(topic_conf_);
+    }
+  }
+
+  /** Returns the held topic configuration pointer (still owned by this object). */
+  rd_kafka_topic_conf_t *getTopicConf() const {
+    return topic_conf_;
+  }
+
+  /** Returns the held topic handle (still owned by this object). */
+  rd_kafka_topic_t *getTopic() const {
+    return topic_reference_;
+  }
+
+ private:
+  rd_kafka_topic_conf_t *topic_conf_;
+  rd_kafka_topic_t *topic_reference_;
+};
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif //NIFI_MINIFI_CPP_KAFKATOPIC_H
diff --git a/extensions/librdkafka/PublishKafka.cpp 
b/extensions/librdkafka/PublishKafka.cpp
index 75cb55c..b248c1e 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -79,7 +79,8 @@ core::Property PublishKafka::KerberosKeytabPath("Kerberos 
Keytab Path",
 core::Property PublishKafka::MessageKeyField("Message Key Field", "The name of 
a field in the Input Records that should be used as the Key for the Kafka 
message.\n"
                                              "Supports Expression Language: 
true (will be evaluated using flow file attributes)",
                                              "");
-core::Property PublishKafka::DebugContexts("Debug contexts", "A 
comma-separated list of debug contexts to enable. Including: generic, broker, 
topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, 
interceptor, plugin, consumer, admin, eos, all", "");
+core::Property PublishKafka::DebugContexts("Debug contexts", "A comma-separated list of debug contexts to enable. "  // trailing space: adjacent literals are joined verbatim
+                                           "Including: generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, all", "");
 core::Relationship PublishKafka::Success("success", "Any FlowFile that is 
successfully sent to Kafka will be routed to this Relationship");
 core::Relationship PublishKafka::Failure("failure", "Any FlowFile that cannot 
be sent to Kafka will be routed to this Relationship");
 
@@ -140,7 +141,7 @@ bool PublishKafka::configureNewConnection(const 
std::shared_ptr<KafkaConnection>
 
   if (!key->brokers_.empty()) {
     result = rd_kafka_conf_set(conf_, "bootstrap.servers", 
key->brokers_.c_str(), errstr, sizeof(errstr));
-    logger_->log_debug("PublishKafka: bootstrap.servers [%s]", 
key->brokers_.c_str());
+    logger_->log_debug("PublishKafka: bootstrap.servers [%s]", key->brokers_);
     if (result != RD_KAFKA_CONF_OK)
       logger_->log_error("PublishKafka: configure error result [%s]", errstr);
   } else {
@@ -150,7 +151,7 @@ bool PublishKafka::configureNewConnection(const 
std::shared_ptr<KafkaConnection>
 
   if (!key->client_id_.empty()) {
     rd_kafka_conf_set(conf_, "client.id", key->client_id_.c_str(), errstr, 
sizeof(errstr));
-    logger_->log_debug("PublishKafka: client.id [%s]", 
key->client_id_.c_str());
+    logger_->log_debug("PublishKafka: client.id [%s]", key->client_id_);
     if (result != RD_KAFKA_CONF_OK)
       logger_->log_error("PublishKafka: configure error result [%s]", errstr);
   } else {
@@ -213,7 +214,7 @@ bool PublishKafka::configureNewConnection(const 
std::shared_ptr<KafkaConnection>
   max_seg_size_ = ULLONG_MAX;
   if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty() 
&& core::Property::StringToInt(value, valInt)) {
     max_seg_size_ = valInt;
-    logger_->log_debug("PublishKafka: max flow segment size [%d]", 
max_seg_size_);
+    logger_->log_debug("PublishKafka: max flow segment size [%llu]", 
max_seg_size_);
   }
   value = "";
   if (context->getProperty(QueueBufferMaxTime.getName(), value) && 
!value.empty()) {
@@ -319,27 +320,29 @@ void PublishKafka::onTrigger(const 
std::shared_ptr<core::ProcessContext> &contex
 
   std::string client_id, brokers, topic;
 
-  std::shared_ptr<KafkaConnection> conn = nullptr;
+  std::unique_ptr<KafkaLease> lease;
+  std::shared_ptr<KafkaConnection> conn;
 // get the client ID, brokers, and topic from either the flowfile, the 
configuration, or the properties
   if (context->getProperty(ClientName, client_id, flowFile) && 
context->getProperty(SeedBrokers, brokers, flowFile) && 
context->getProperty(Topic, topic, flowFile)) {
     KafkaConnectionKey key;
     key.brokers_ = brokers;
     key.client_id_ = client_id;
 
-    conn = connection_pool_.getOrCreateConnection(key);
+    lease = connection_pool_.getOrCreateConnection(key);
+    if (lease == nullptr) {
+      logger_->log_info("This connection is used by another thread.");
+      context->yield();
+      return;
+    }
+    conn = lease->getConn();
 
     if (!conn->initialized()) {
       logger_->log_trace("Connection not initialized to %s, %s, %s", 
client_id, brokers, topic);
-      // get the ownership so we can configure this connection
-      KafkaLease lease = conn->obtainOwnership();
-
       if (!configureNewConnection(conn, context, flowFile)) {
         logger_->log_error("Could not configure Kafka Connection");
         session->transfer(flowFile, Failure);
         return;
       }
-
-      // lease will go away
     }
 
     if (!conn->hasTopic(topic)) {
diff --git a/extensions/librdkafka/PublishKafka.h 
b/extensions/librdkafka/PublishKafka.h
index 8f6806f..4d0a759 100644
--- a/extensions/librdkafka/PublishKafka.h
+++ b/extensions/librdkafka/PublishKafka.h
@@ -29,6 +29,7 @@
 #include "core/logging/LoggerConfiguration.h"
 #include "core/logging/Logger.h"
 #include "rdkafka.h"
+#include "KafkaPool.h"
 #include <regex>
 
 namespace org {
@@ -52,251 +53,6 @@ namespace processors {
 #define SECURITY_PROTOCOL_SASL_SSL "sasl_ssl"
 #define KAFKA_KEY_ATTRIBUTE "kafka.key"
 
-class KafkaConnectionKey {
- public:
-  std::string client_id_;
-  std::string brokers_;
-};
-
-class KafkaTopic {
- public:
-  KafkaTopic(rd_kafka_topic_t *topic_reference, rd_kafka_topic_conf_t 
*topic_conf)
-      : topic_conf_(topic_conf),
-        topic_reference_(topic_reference) {
-
-  }
-
-  ~KafkaTopic() {
-    if (topic_reference_) {
-      rd_kafka_topic_destroy(topic_reference_);
-    }
-    if (topic_conf_) {
-      rd_kafka_topic_conf_destroy(topic_conf_);
-    }
-  }
-
-  rd_kafka_topic_conf_t *getTopicConf() {
-    return topic_conf_;
-  }
-
-  rd_kafka_topic_t *getTopic() {
-    return topic_reference_;
-  }
-
- private:
-  rd_kafka_topic_conf_t *topic_conf_;
-  rd_kafka_topic_t *topic_reference_;
-};
-
-struct KafkaConnectionComparator {
-
-  bool operator()(const KafkaConnectionKey& a, const KafkaConnectionKey& b) 
const {
-    if (a.brokers_ != b.brokers_) {
-      return a.brokers_ < b.brokers_;
-    } else {
-      return a.client_id_ < b.client_id_;
-    }
-  }
-};
-
-class KafkaLease {
- public:
-  KafkaLease(std::atomic<bool> *lease)
-      : lease_(lease) {
-    *lease_ = true;
-  }
-  ~KafkaLease() {
-    *lease_ = false;
-  }
- private:
-  std::atomic<bool> *lease_;
-};
-
-class KafkaConnection {
-
- public:
-
-  explicit KafkaConnection(const KafkaConnectionKey &key)
-      : logger_(logging::LoggerFactory<KafkaConnection>::getLogger()),
-        conf_(nullptr),
-        kafka_connection_(nullptr) {
-    lease_ = false;
-    initialized_ = false;
-    key_ = key;
-  }
-
-  ~KafkaConnection() {
-    remove();
-  }
-
-  void remove() {
-    topics_.clear();
-    removeConnection();
-  }
-
-  void removeConnection() {
-    if (kafka_connection_) {
-      rd_kafka_flush(kafka_connection_, 10 * 1000); /* wait for max 10 seconds 
*/
-      rd_kafka_destroy(kafka_connection_);
-      modifyLoggers([&](std::unordered_map<const rd_kafka_t*, 
std::weak_ptr<logging::Logger>>& loggers) {
-        loggers.erase(kafka_connection_);
-      });
-      kafka_connection_ = nullptr;
-    }
-    if (conf_) {
-      rd_kafka_conf_destroy(conf_);
-      conf_ = nullptr;
-    }
-    initialized_ = false;
-  }
-
-  bool initialized() {
-    return initialized_;
-  }
-
-  void setConnection(rd_kafka_t *producer, rd_kafka_conf_t *conf) {
-    std::lock_guard<std::mutex> lock(mutex_);
-    removeConnection();
-    kafka_connection_ = producer;
-    conf_ = conf;
-    initialized_ = true;
-    modifyLoggers([&](std::unordered_map<const rd_kafka_t*, 
std::weak_ptr<logging::Logger>>& loggers) {
-      loggers[producer] = logger_;
-    });
-  }
-
-  rd_kafka_conf_t *getConf() {
-    return conf_;
-  }
-
-  rd_kafka_t *getConnection() {
-    return kafka_connection_;
-  }
-
-  KafkaLease obtainOwnership() {
-    while (lease_) {
-    }
-    std::lock_guard<std::mutex> lock(mutex_);
-    return KafkaLease(&lease_);
-  }
-
-  bool hasTopic(const std::string &topic) {
-    return topics_.find(topic) != std::end(topics_);
-  }
-
-  std::shared_ptr<KafkaTopic> getTopic(const std::string &topic) {
-    auto topicObj = topics_.find(topic);
-    if (topicObj != topics_.end()) {
-      return topicObj->second;
-    }
-    return nullptr;
-  }
-
-  KafkaConnectionKey const * const getKey() const {
-    return &key_;
-  }
-
-  void putTopic(const std::string &topicName, const 
std::shared_ptr<KafkaTopic> topic) {
-    topics_.insert(std::make_pair(topicName, topic));
-  }
-
-  static void logCallback(const rd_kafka_t* rk, int level, const char* 
/*fac*/, const char* buf) {
-    std::shared_ptr<logging::Logger> logger;
-    try {
-      modifyLoggers([&](std::unordered_map<const rd_kafka_t*, 
std::weak_ptr<logging::Logger>>& loggers) {
-        logger = loggers.at(rk).lock();
-      });
-    } catch (...) {
-    }
-
-    if (!logger) {
-      return;
-    }
-
-    switch (level) {
-      case 0: // LOG_EMERG
-      case 1: // LOG_ALERT
-      case 2: // LOG_CRIT
-      case 3: // LOG_ERR
-        logging::LOG_ERROR(logger) << buf;
-        break;
-      case 4: // LOG_WARNING
-        logging::LOG_WARN(logger) << buf;
-        break;
-      case 5: // LOG_NOTICE
-      case 6: // LOG_INFO
-        logging::LOG_INFO(logger) << buf;
-        break;
-      case 7: // LOG_DEBUG
-        logging::LOG_DEBUG(logger) << buf;
-        break;
-    }
-  }
-
- private:
-
-  std::shared_ptr<logging::Logger> logger_;
-
-  std::mutex mutex_;
-
-  std::atomic<bool> lease_;
-
-  std::atomic<bool> initialized_;
-
-  KafkaConnectionKey key_;
-
-  std::map<std::string, std::shared_ptr<KafkaTopic>> topics_;
-
-  rd_kafka_conf_t *conf_;
-  rd_kafka_t *kafka_connection_;
-
-  static void modifyLoggers(const std::function<void(std::unordered_map<const 
rd_kafka_t*, std::weak_ptr<logging::Logger>>&)>& func) {
-    static std::mutex loggers_mutex;
-    static std::unordered_map<const rd_kafka_t*, 
std::weak_ptr<logging::Logger>> loggers;
-
-    std::lock_guard<std::mutex> lock(loggers_mutex);
-    func(loggers);
-  }
-};
-
-class KafkaPool {
- public:
-
-  explicit KafkaPool(int max)
-      : max_(max) {
-  }
-
-  bool removeConnection(const KafkaConnectionKey &key) {
-    std::lock_guard<std::mutex> lock(mutex_);
-    return map_.erase(key) == 1;
-  }
-
-  std::shared_ptr<KafkaConnection> getOrCreateConnection(const 
KafkaConnectionKey &key) {
-    std::lock_guard<std::mutex> lock(mutex_);
-    auto connection = map_.find(key);
-    if (connection != map_.end()) {
-      return connection->second;
-    }
-    if (map_.size() > max_) {
-      auto advanceIter = map_.begin();
-      auto elemToRemove = map_.size() - max_;
-      std::advance(advanceIter, elemToRemove);
-      map_.erase(map_.begin(), advanceIter);
-    }
-    auto newConnection = std::make_shared<KafkaConnection>(key);
-    map_.insert(std::make_pair(key, newConnection));
-    return newConnection;
-  }
-
- private:
-  std::mutex mutex_;
-
-  int max_;
-
-  std::map<KafkaConnectionKey, std::shared_ptr<KafkaConnection>, 
KafkaConnectionComparator> map_;
-
-};
-
 // PublishKafka Class
 class PublishKafka : public core::Processor {
  public:

Reply via email to