This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new fe480b3 MINIFICPP-959: Review librdkafka thread safety
fe480b3 is described below
commit fe480b3ad2cd26da5aaa52105612ae8ede78fb5b
Author: Nghia Le <[email protected]>
AuthorDate: Wed Aug 7 20:39:26 2019 +0200
MINIFICPP-959: Review librdkafka thread safety
Signed-off-by: Arpad Boda <[email protected]>
This closes #621
---
extensions/librdkafka/KafkaConnection.cpp | 144 +++++++++++++++++
extensions/librdkafka/KafkaConnection.h | 129 ++++++++++++++++
extensions/librdkafka/KafkaPool.h | 77 ++++++++++
extensions/librdkafka/KafkaTopic.h | 65 ++++++++
extensions/librdkafka/PublishKafka.cpp | 25 +--
extensions/librdkafka/PublishKafka.h | 246 +-----------------------------
6 files changed, 430 insertions(+), 256 deletions(-)
diff --git a/extensions/librdkafka/KafkaConnection.cpp
b/extensions/librdkafka/KafkaConnection.cpp
new file mode 100644
index 0000000..28b0b45
--- /dev/null
+++ b/extensions/librdkafka/KafkaConnection.cpp
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "KafkaConnection.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+/**
+ * Builds an unleased, uninitialized connection entry for the given key.
+ * No producer handle or configuration is attached until setConnection() is called.
+ */
+KafkaConnection::KafkaConnection(const KafkaConnectionKey &key)
+    : logger_(logging::LoggerFactory<KafkaConnection>::getLogger()),
+      lease_(false),
+      initialized_(false),
+      key_(key),
+      conf_(nullptr),
+      kafka_connection_(nullptr) {
+}
+
+// Drops this connection's references to its cached topics, then tears down the
+// producer handle and configuration via removeConnection().
+void KafkaConnection::remove() {
+ topics_.clear();
+ removeConnection();
+}
+
+/**
+ * Tears down the current producer (if any) and the stored configuration object,
+ * and marks the connection uninitialized so setConnection() can install a new one.
+ */
+void KafkaConnection::removeConnection() {
+  if (kafka_connection_ != nullptr) {
+    const int flush_timeout_ms = 10 * 1000;  // wait for max 10 seconds
+    rd_kafka_flush(kafka_connection_, flush_timeout_ms);
+    rd_kafka_destroy(kafka_connection_);
+    // Unregister the handle so logCallback() no longer resolves it to logger_.
+    modifyLoggers([this](std::unordered_map<const rd_kafka_t*, std::weak_ptr<logging::Logger>>& registry) {
+      registry.erase(kafka_connection_);
+    });
+    kafka_connection_ = nullptr;
+  }
+  if (conf_ != nullptr) {
+    rd_kafka_conf_destroy(conf_);
+    conf_ = nullptr;
+  }
+  initialized_ = false;
+}
+
+// True once setConnection() has installed a producer, until removeConnection() clears it again.
+bool KafkaConnection::initialized() const {
+ return initialized_;
+}
+
+/**
+ * Installs a producer handle and its configuration on this connection.
+ * Whatever was installed before is torn down first; afterwards the new handle is
+ * registered so that logCallback() can route its log lines to logger_.
+ */
+void KafkaConnection::setConnection(rd_kafka_t *producer, rd_kafka_conf_t *conf) {
+  removeConnection();
+
+  kafka_connection_ = producer;
+  conf_ = conf;
+  initialized_ = true;
+
+  modifyLoggers([this, producer](std::unordered_map<const rd_kafka_t*, std::weak_ptr<logging::Logger>>& registry) {
+    registry[producer] = logger_;
+  });
+}
+
+// Returns the configuration pointer stored by setConnection(), or nullptr when none is installed.
+rd_kafka_conf_t *KafkaConnection::getConf() const {
+ return conf_;
+}
+
+// Returns the producer handle stored by setConnection(), or nullptr when none is installed.
+rd_kafka_t *KafkaConnection::getConnection() const {
+ return kafka_connection_;
+}
+
+// Reports whether a topic object is cached on this connection under the given name.
+bool KafkaConnection::hasTopic(const std::string &topic) const {
+  return topics_.find(topic) != topics_.end();
+}
+
+// Looks up the topic cached under the given name; yields nullptr when there is none.
+std::shared_ptr<KafkaTopic> KafkaConnection::getTopic(const std::string &topic) const {
+  const auto entry = topics_.find(topic);
+  if (entry == topics_.end()) {
+    return nullptr;
+  }
+  return entry->second;
+}
+
+// Returns a pointer to this connection's own key member; it is only valid while this object exists.
+KafkaConnectionKey const * const KafkaConnection::getKey() const {
+ return &key_;
+}
+
+// Caches the topic object under the given name, replacing any entry already stored there.
+void KafkaConnection::putTopic(const std::string &topicName, const std::shared_ptr<KafkaTopic> &topic) {
+  std::shared_ptr<KafkaTopic> &slot = topics_[topicName];
+  slot = topic;
+}
+
+/**
+ * Log callback with librdkafka's log-callback signature.
+ *
+ * Forwards the message `buf` emitted for producer `rk` to the logger that
+ * setConnection() registered for that handle, mapping the syslog-style `level`
+ * onto the local log levels. The message is dropped when no logger is registered
+ * for `rk`, when the registered logger has already expired, or when `level` is
+ * outside 0..7.
+ */
+void KafkaConnection::logCallback(const rd_kafka_t* rk, int level, const char* /*fac*/, const char* buf) {
+  std::shared_ptr<logging::Logger> logger;
+  try {
+    modifyLoggers([&](std::unordered_map<const rd_kafka_t*, std::weak_ptr<logging::Logger>>& loggers) {
+      // A miss is an expected outcome (handles are registered only by setConnection()),
+      // so look the handle up with find() instead of throwing from at() on every such line.
+      const auto entry = loggers.find(rk);
+      if (entry != loggers.end()) {
+        logger = entry->second.lock();
+      }
+    });
+  } catch (...) {
+    // Best effort: a log line is not worth propagating an exception out of a callback.
+  }
+
+  if (!logger) {
+    return;
+  }
+
+  switch (level) {
+    case 0:  // LOG_EMERG
+    case 1:  // LOG_ALERT
+    case 2:  // LOG_CRIT
+    case 3:  // LOG_ERR
+      logging::LOG_ERROR(logger) << buf;
+      break;
+    case 4:  // LOG_WARNING
+      logging::LOG_WARN(logger) << buf;
+      break;
+    case 5:  // LOG_NOTICE
+    case 6:  // LOG_INFO
+      logging::LOG_INFO(logger) << buf;
+      break;
+    case 7:  // LOG_DEBUG
+      logging::LOG_DEBUG(logger) << buf;
+      break;
+  }
+}
+
+/**
+ * Attempts to take the lease on this connection.
+ * Returns true when the lease was free and is now held by the caller, false when
+ * it was already held. The flag is read and written under lease_mutex_.
+ */
+bool KafkaConnection::tryUse() {
+  std::lock_guard<std::mutex> guard(lease_mutex_);
+  const bool acquired = !lease_;
+  lease_ = true;
+  return acquired;
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/librdkafka/KafkaConnection.h
b/extensions/librdkafka/KafkaConnection.h
new file mode 100644
index 0000000..6e158fe
--- /dev/null
+++ b/extensions/librdkafka/KafkaConnection.h
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_KAFKACONNECTION_H
+#define NIFI_MINIFI_CPP_KAFKACONNECTION_H
+
+#include <mutex>
+#include <string>
+#include "core/logging/LoggerConfiguration.h"
+#include "core/logging/Logger.h"
+#include "rdkafka.h"
+#include "KafkaTopic.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+/**
+ * Identifies a pooled connection by broker list and client id.
+ * Keys are ordered by brokers_ first and client_id_ second, which makes the
+ * class usable as a std::map key.
+ */
+class KafkaConnectionKey {
+ public:
+  std::string client_id_;
+  std::string brokers_;
+
+  // Lexicographic order on (brokers_, client_id_). Written out by hand so the
+  // comparison does not depend on <tuple> being included.
+  bool operator <(const KafkaConnectionKey& rhs) const {
+    const int by_brokers = brokers_.compare(rhs.brokers_);
+    if (by_brokers != 0) {
+      return by_brokers < 0;
+    }
+    return client_id_ < rhs.client_id_;
+  }
+};
+
+/**
+ * A pooled producer handle together with its configuration object and the topic
+ * objects cached for it. Exclusive use is arbitrated by a lease flag: tryUse()
+ * takes it and KafkaLease's destructor clears it again.
+ */
+class KafkaConnection {
+ public:
+
+  // Creates an unleased, uninitialized connection for the given key.
+  explicit KafkaConnection(const KafkaConnectionKey &key);
+
+  // The destructor releases raw handles, so a copy would release them twice.
+  // Copying was already impossible because of the std::mutex member; state it explicitly.
+  KafkaConnection(const KafkaConnection&) = delete;
+  KafkaConnection& operator=(const KafkaConnection&) = delete;
+
+  ~KafkaConnection() {
+    remove();
+  }
+
+  // Clears the cached topics, then calls removeConnection().
+  void remove();
+
+  // Flushes (max 10 seconds) and destroys the producer, destroys the stored
+  // configuration, and marks the connection uninitialized.
+  void removeConnection();
+
+  // True once setConnection() has installed a producer, until it is removed again.
+  bool initialized() const;
+
+  // Replaces any installed producer/configuration with the given ones and registers
+  // the producer so logCallback() can find this connection's logger.
+  void setConnection(rd_kafka_t *producer, rd_kafka_conf_t *conf);
+
+  // Stored configuration pointer, or nullptr when none is installed.
+  rd_kafka_conf_t *getConf() const;
+
+  // Stored producer handle, or nullptr when none is installed.
+  rd_kafka_t *getConnection() const;
+
+  // Whether a topic object is cached under the given name.
+  bool hasTopic(const std::string &topic) const;
+
+  // Cached topic object for the given name, or nullptr when there is none.
+  std::shared_ptr<KafkaTopic> getTopic(const std::string &topic) const;
+
+  // Pointer to this connection's own key; valid only while the connection exists.
+  KafkaConnectionKey const * const getKey() const;
+
+  // Caches the topic object under the given name, replacing any previous entry.
+  void putTopic(const std::string &topicName, const std::shared_ptr<KafkaTopic> &topic);
+
+  // Log callback: routes `buf` to the logger registered for `rk`, by syslog-style level.
+  static void logCallback(const rd_kafka_t* rk, int level, const char* /*fac*/, const char* buf);
+
+  // Tries to take the lease; returns false when it is already held.
+  bool tryUse();
+
+  friend class KafkaLease;
+
+ private:
+
+  std::shared_ptr<logging::Logger> logger_;
+
+  // Guards lease_ only.
+  std::mutex lease_mutex_;
+
+  bool lease_;
+
+  // NOTE(review): not guarded by lease_mutex_; presumably only touched by the
+  // thread that currently holds the lease - confirm against callers.
+  bool initialized_;
+
+  KafkaConnectionKey key_;
+
+  std::map<std::string, std::shared_ptr<KafkaTopic>> topics_;
+
+  rd_kafka_conf_t *conf_;
+  rd_kafka_t *kafka_connection_;
+
+  // Runs `func` on the process-wide handle-to-logger registry while holding the
+  // registry's mutex. Both the registry and its mutex are function-local statics.
+  static void modifyLoggers(const std::function<void(std::unordered_map<const rd_kafka_t*, std::weak_ptr<logging::Logger>>&)>& func) {
+    static std::mutex loggers_mutex;
+    static std::unordered_map<const rd_kafka_t*, std::weak_ptr<logging::Logger>> loggers;
+
+    std::lock_guard<std::mutex> lock(loggers_mutex);
+    func(loggers);
+  }
+};
+
+/**
+ * Scoped token for exclusive use of a KafkaConnection. Only KafkaPool can create
+ * one (the constructor is private and KafkaPool is a friend); destroying it clears
+ * the connection's lease flag under the connection's lease mutex.
+ */
+class KafkaLease {
+ public:
+  // Every lease clears the flag once when it is destroyed, so a copied lease would
+  // clear it a second time - possibly while another thread holds the connection.
+  KafkaLease(const KafkaLease&) = delete;
+  KafkaLease& operator=(const KafkaLease&) = delete;
+
+  ~KafkaLease() {
+    std::lock_guard<std::mutex> lock(conn_->lease_mutex_);
+    conn_->lease_ = false;
+  }
+
+  // The leased connection; the returned pointer shares ownership with the lease.
+  std::shared_ptr<KafkaConnection> getConn() const {
+    return conn_;
+  }
+  friend class KafkaPool;
+ private:
+  // This one should be private, and only KafkaPool can call (friend).
+  explicit KafkaLease(std::shared_ptr<KafkaConnection> conn)
+      : conn_(conn) {
+  }
+
+  std::shared_ptr<KafkaConnection> conn_;
+};
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif //NIFI_MINIFI_CPP_KAFKACONNECTION_H
diff --git a/extensions/librdkafka/KafkaPool.h
b/extensions/librdkafka/KafkaPool.h
new file mode 100644
index 0000000..3fe2d42
--- /dev/null
+++ b/extensions/librdkafka/KafkaPool.h
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_KAFKAPOOL_H
+#define NIFI_MINIFI_CPP_KAFKAPOOL_H
+
+#include "KafkaConnection.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+/**
+ * Bounded, mutex-protected cache of KafkaConnection objects keyed by
+ * KafkaConnectionKey. Connections are handed out wrapped in a KafkaLease so that
+ * only one caller at a time uses a given connection.
+ */
+class KafkaPool {
+ public:
+
+  // `max` is the number of connections the pool holds before it starts evicting.
+  explicit KafkaPool(int max)
+      : max_(static_cast<size_t>(max)) {
+  }
+
+  // Drops the pool's entry for `key`; returns true when an entry was removed.
+  bool removeConnection(const KafkaConnectionKey &key) {
+    std::lock_guard<std::mutex> lock(mutex_);
+    return map_.erase(key) == 1;
+  }
+
+  /**
+   * Finds the connection for `key`, creating (and, at the limit, evicting) as needed,
+   * and tries to lease it.
+   *
+   * Returns a lease on success, or an empty pointer when the connection is
+   * currently leased by someone else.
+   */
+  std::unique_ptr<KafkaLease> getOrCreateConnection(const KafkaConnectionKey &key) {
+    std::lock_guard<std::mutex> lock(mutex_);
+    auto connection = map_.find(key);
+    std::shared_ptr<KafkaConnection> conn;
+    if (connection == map_.end()) {
+      // Not found, create new connection.
+      conn = std::make_shared<KafkaConnection>(key);
+      // Reached pool limit, remove the first one (the smallest key, leased or not).
+      // The emptiness check matters when the limit is 0: erasing begin() of an
+      // empty map would erase end(), which is undefined behaviour.
+      // If nobody else holds the evicted connection it is destroyed right here,
+      // while the pool mutex is held.
+      if (!map_.empty() && map_.size() >= max_) {
+        map_.erase(map_.begin());
+      }
+      map_[key] = conn;
+    } else {
+      conn = connection->second;
+    }
+    std::unique_ptr<KafkaLease> lease;
+    if (conn->tryUse()) {
+      lease = std::unique_ptr<KafkaLease>(new KafkaLease(conn));
+    }
+    return lease;
+  }
+
+ private:
+  // Guards map_.
+  std::mutex mutex_;
+
+  size_t max_;
+
+  std::map<KafkaConnectionKey, std::shared_ptr<KafkaConnection>> map_;
+};
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif //NIFI_MINIFI_CPP_KAFKAPOOL_H
diff --git a/extensions/librdkafka/KafkaTopic.h
b/extensions/librdkafka/KafkaTopic.h
new file mode 100644
index 0000000..30c34a0
--- /dev/null
+++ b/extensions/librdkafka/KafkaTopic.h
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_KAFKATOPIC_H
+#define NIFI_MINIFI_CPP_KAFKATOPIC_H
+
+#include "rdkafka.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+/**
+ * Holds a topic handle and the topic configuration passed in with it, and
+ * destroys both (when non-null) in its destructor.
+ */
+class KafkaTopic {
+ public:
+  KafkaTopic(rd_kafka_topic_t *topic_reference, rd_kafka_topic_conf_t *topic_conf)
+      : topic_conf_(topic_conf),
+        topic_reference_(topic_reference) {
+  }
+
+  // The destructor frees both raw handles, so a copy would free them twice.
+  KafkaTopic(const KafkaTopic&) = delete;
+  KafkaTopic& operator=(const KafkaTopic&) = delete;
+
+  ~KafkaTopic() {
+    if (topic_reference_) {
+      rd_kafka_topic_destroy(topic_reference_);
+    }
+    // NOTE(review): if topic_conf_ is the same object that was handed to
+    // rd_kafka_topic_new(), librdkafka may already own it - confirm at the call site.
+    if (topic_conf_) {
+      rd_kafka_topic_conf_destroy(topic_conf_);
+    }
+  }
+
+  // Topic configuration pointer given at construction (may be nullptr).
+  rd_kafka_topic_conf_t *getTopicConf() const {
+    return topic_conf_;
+  }
+
+  // Topic handle given at construction (may be nullptr).
+  rd_kafka_topic_t *getTopic() const {
+    return topic_reference_;
+  }
+
+ private:
+  rd_kafka_topic_conf_t *topic_conf_;
+  rd_kafka_topic_t *topic_reference_;
+};
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif //NIFI_MINIFI_CPP_KAFKATOPIC_H
diff --git a/extensions/librdkafka/PublishKafka.cpp
b/extensions/librdkafka/PublishKafka.cpp
index 75cb55c..b248c1e 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -79,7 +79,8 @@ core::Property PublishKafka::KerberosKeytabPath("Kerberos
Keytab Path",
core::Property PublishKafka::MessageKeyField("Message Key Field", "The name of
a field in the Input Records that should be used as the Key for the Kafka
message.\n"
"Supports Expression Language:
true (will be evaluated using flow file attributes)",
"");
-core::Property PublishKafka::DebugContexts("Debug contexts", "A
comma-separated list of debug contexts to enable. Including: generic, broker,
topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch,
interceptor, plugin, consumer, admin, eos, all", "");
+core::Property PublishKafka::DebugContexts("Debug contexts", "A comma-separated list of debug contexts to enable. "
+                                           "Including: generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, all", "");
core::Relationship PublishKafka::Success("success", "Any FlowFile that is
successfully sent to Kafka will be routed to this Relationship");
core::Relationship PublishKafka::Failure("failure", "Any FlowFile that cannot
be sent to Kafka will be routed to this Relationship");
@@ -140,7 +141,7 @@ bool PublishKafka::configureNewConnection(const
std::shared_ptr<KafkaConnection>
if (!key->brokers_.empty()) {
result = rd_kafka_conf_set(conf_, "bootstrap.servers",
key->brokers_.c_str(), errstr, sizeof(errstr));
- logger_->log_debug("PublishKafka: bootstrap.servers [%s]",
key->brokers_.c_str());
+ logger_->log_debug("PublishKafka: bootstrap.servers [%s]", key->brokers_);
if (result != RD_KAFKA_CONF_OK)
logger_->log_error("PublishKafka: configure error result [%s]", errstr);
} else {
@@ -150,7 +151,7 @@ bool PublishKafka::configureNewConnection(const
std::shared_ptr<KafkaConnection>
if (!key->client_id_.empty()) {
rd_kafka_conf_set(conf_, "client.id", key->client_id_.c_str(), errstr,
sizeof(errstr));
- logger_->log_debug("PublishKafka: client.id [%s]",
key->client_id_.c_str());
+ logger_->log_debug("PublishKafka: client.id [%s]", key->client_id_);
if (result != RD_KAFKA_CONF_OK)
logger_->log_error("PublishKafka: configure error result [%s]", errstr);
} else {
@@ -213,7 +214,7 @@ bool PublishKafka::configureNewConnection(const
std::shared_ptr<KafkaConnection>
max_seg_size_ = ULLONG_MAX;
if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty()
&& core::Property::StringToInt(value, valInt)) {
max_seg_size_ = valInt;
- logger_->log_debug("PublishKafka: max flow segment size [%d]",
max_seg_size_);
+ logger_->log_debug("PublishKafka: max flow segment size [%llu]",
max_seg_size_);
}
value = "";
if (context->getProperty(QueueBufferMaxTime.getName(), value) &&
!value.empty()) {
@@ -319,27 +320,29 @@ void PublishKafka::onTrigger(const
std::shared_ptr<core::ProcessContext> &contex
std::string client_id, brokers, topic;
- std::shared_ptr<KafkaConnection> conn = nullptr;
+ std::unique_ptr<KafkaLease> lease;
+ std::shared_ptr<KafkaConnection> conn;
// get the client ID, brokers, and topic from either the flowfile, the
configuration, or the properties
if (context->getProperty(ClientName, client_id, flowFile) &&
context->getProperty(SeedBrokers, brokers, flowFile) &&
context->getProperty(Topic, topic, flowFile)) {
KafkaConnectionKey key;
key.brokers_ = brokers;
key.client_id_ = client_id;
- conn = connection_pool_.getOrCreateConnection(key);
+ lease = connection_pool_.getOrCreateConnection(key);
+ if (lease == nullptr) {
+ logger_->log_info("This connection is used by another thread.");
+ context->yield();
+ return;
+ }
+ conn = lease->getConn();
if (!conn->initialized()) {
logger_->log_trace("Connection not initialized to %s, %s, %s",
client_id, brokers, topic);
- // get the ownership so we can configure this connection
- KafkaLease lease = conn->obtainOwnership();
-
if (!configureNewConnection(conn, context, flowFile)) {
logger_->log_error("Could not configure Kafka Connection");
session->transfer(flowFile, Failure);
return;
}
-
- // lease will go away
}
if (!conn->hasTopic(topic)) {
diff --git a/extensions/librdkafka/PublishKafka.h
b/extensions/librdkafka/PublishKafka.h
index 8f6806f..4d0a759 100644
--- a/extensions/librdkafka/PublishKafka.h
+++ b/extensions/librdkafka/PublishKafka.h
@@ -29,6 +29,7 @@
#include "core/logging/LoggerConfiguration.h"
#include "core/logging/Logger.h"
#include "rdkafka.h"
+#include "KafkaPool.h"
#include <regex>
namespace org {
@@ -52,251 +53,6 @@ namespace processors {
#define SECURITY_PROTOCOL_SASL_SSL "sasl_ssl"
#define KAFKA_KEY_ATTRIBUTE "kafka.key"
-class KafkaConnectionKey {
- public:
- std::string client_id_;
- std::string brokers_;
-};
-
-class KafkaTopic {
- public:
- KafkaTopic(rd_kafka_topic_t *topic_reference, rd_kafka_topic_conf_t
*topic_conf)
- : topic_conf_(topic_conf),
- topic_reference_(topic_reference) {
-
- }
-
- ~KafkaTopic() {
- if (topic_reference_) {
- rd_kafka_topic_destroy(topic_reference_);
- }
- if (topic_conf_) {
- rd_kafka_topic_conf_destroy(topic_conf_);
- }
- }
-
- rd_kafka_topic_conf_t *getTopicConf() {
- return topic_conf_;
- }
-
- rd_kafka_topic_t *getTopic() {
- return topic_reference_;
- }
-
- private:
- rd_kafka_topic_conf_t *topic_conf_;
- rd_kafka_topic_t *topic_reference_;
-};
-
-struct KafkaConnectionComparator {
-
- bool operator()(const KafkaConnectionKey& a, const KafkaConnectionKey& b)
const {
- if (a.brokers_ != b.brokers_) {
- return a.brokers_ < b.brokers_;
- } else {
- return a.client_id_ < b.client_id_;
- }
- }
-};
-
-class KafkaLease {
- public:
- KafkaLease(std::atomic<bool> *lease)
- : lease_(lease) {
- *lease_ = true;
- }
- ~KafkaLease() {
- *lease_ = false;
- }
- private:
- std::atomic<bool> *lease_;
-};
-
-class KafkaConnection {
-
- public:
-
- explicit KafkaConnection(const KafkaConnectionKey &key)
- : logger_(logging::LoggerFactory<KafkaConnection>::getLogger()),
- conf_(nullptr),
- kafka_connection_(nullptr) {
- lease_ = false;
- initialized_ = false;
- key_ = key;
- }
-
- ~KafkaConnection() {
- remove();
- }
-
- void remove() {
- topics_.clear();
- removeConnection();
- }
-
- void removeConnection() {
- if (kafka_connection_) {
- rd_kafka_flush(kafka_connection_, 10 * 1000); /* wait for max 10 seconds
*/
- rd_kafka_destroy(kafka_connection_);
- modifyLoggers([&](std::unordered_map<const rd_kafka_t*,
std::weak_ptr<logging::Logger>>& loggers) {
- loggers.erase(kafka_connection_);
- });
- kafka_connection_ = nullptr;
- }
- if (conf_) {
- rd_kafka_conf_destroy(conf_);
- conf_ = nullptr;
- }
- initialized_ = false;
- }
-
- bool initialized() {
- return initialized_;
- }
-
- void setConnection(rd_kafka_t *producer, rd_kafka_conf_t *conf) {
- std::lock_guard<std::mutex> lock(mutex_);
- removeConnection();
- kafka_connection_ = producer;
- conf_ = conf;
- initialized_ = true;
- modifyLoggers([&](std::unordered_map<const rd_kafka_t*,
std::weak_ptr<logging::Logger>>& loggers) {
- loggers[producer] = logger_;
- });
- }
-
- rd_kafka_conf_t *getConf() {
- return conf_;
- }
-
- rd_kafka_t *getConnection() {
- return kafka_connection_;
- }
-
- KafkaLease obtainOwnership() {
- while (lease_) {
- }
- std::lock_guard<std::mutex> lock(mutex_);
- return KafkaLease(&lease_);
- }
-
- bool hasTopic(const std::string &topic) {
- return topics_.find(topic) != std::end(topics_);
- }
-
- std::shared_ptr<KafkaTopic> getTopic(const std::string &topic) {
- auto topicObj = topics_.find(topic);
- if (topicObj != topics_.end()) {
- return topicObj->second;
- }
- return nullptr;
- }
-
- KafkaConnectionKey const * const getKey() const {
- return &key_;
- }
-
- void putTopic(const std::string &topicName, const
std::shared_ptr<KafkaTopic> topic) {
- topics_.insert(std::make_pair(topicName, topic));
- }
-
- static void logCallback(const rd_kafka_t* rk, int level, const char*
/*fac*/, const char* buf) {
- std::shared_ptr<logging::Logger> logger;
- try {
- modifyLoggers([&](std::unordered_map<const rd_kafka_t*,
std::weak_ptr<logging::Logger>>& loggers) {
- logger = loggers.at(rk).lock();
- });
- } catch (...) {
- }
-
- if (!logger) {
- return;
- }
-
- switch (level) {
- case 0: // LOG_EMERG
- case 1: // LOG_ALERT
- case 2: // LOG_CRIT
- case 3: // LOG_ERR
- logging::LOG_ERROR(logger) << buf;
- break;
- case 4: // LOG_WARNING
- logging::LOG_WARN(logger) << buf;
- break;
- case 5: // LOG_NOTICE
- case 6: // LOG_INFO
- logging::LOG_INFO(logger) << buf;
- break;
- case 7: // LOG_DEBUG
- logging::LOG_DEBUG(logger) << buf;
- break;
- }
- }
-
- private:
-
- std::shared_ptr<logging::Logger> logger_;
-
- std::mutex mutex_;
-
- std::atomic<bool> lease_;
-
- std::atomic<bool> initialized_;
-
- KafkaConnectionKey key_;
-
- std::map<std::string, std::shared_ptr<KafkaTopic>> topics_;
-
- rd_kafka_conf_t *conf_;
- rd_kafka_t *kafka_connection_;
-
- static void modifyLoggers(const std::function<void(std::unordered_map<const
rd_kafka_t*, std::weak_ptr<logging::Logger>>&)>& func) {
- static std::mutex loggers_mutex;
- static std::unordered_map<const rd_kafka_t*,
std::weak_ptr<logging::Logger>> loggers;
-
- std::lock_guard<std::mutex> lock(loggers_mutex);
- func(loggers);
- }
-};
-
-class KafkaPool {
- public:
-
- explicit KafkaPool(int max)
- : max_(max) {
- }
-
- bool removeConnection(const KafkaConnectionKey &key) {
- std::lock_guard<std::mutex> lock(mutex_);
- return map_.erase(key) == 1;
- }
-
- std::shared_ptr<KafkaConnection> getOrCreateConnection(const
KafkaConnectionKey &key) {
- std::lock_guard<std::mutex> lock(mutex_);
- auto connection = map_.find(key);
- if (connection != map_.end()) {
- return connection->second;
- }
- if (map_.size() > max_) {
- auto advanceIter = map_.begin();
- auto elemToRemove = map_.size() - max_;
- std::advance(advanceIter, elemToRemove);
- map_.erase(map_.begin(), advanceIter);
- }
- auto newConnection = std::make_shared<KafkaConnection>(key);
- map_.insert(std::make_pair(key, newConnection));
- return newConnection;
- }
-
- private:
- std::mutex mutex_;
-
- int max_;
-
- std::map<KafkaConnectionKey, std::shared_ptr<KafkaConnection>,
KafkaConnectionComparator> map_;
-
-};
-
// PublishKafka Class
class PublishKafka : public core::Processor {
public: