Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 4fe38ccdf -> 08f4a2b8c
MINIFICPP-427 - add PublishKafka headers support This closes #360. Signed-off-by: Marc Parisi <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/08f4a2b8 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/08f4a2b8 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/08f4a2b8 Branch: refs/heads/master Commit: 08f4a2b8c484c0fff190518fb285ba66d027b46e Parents: 4fe38cc Author: Dustin Rodrigues <[email protected]> Authored: Sat Jun 16 12:46:09 2018 -0400 Committer: Marc Parisi <[email protected]> Committed: Mon Jul 30 09:32:24 2018 -0400 ---------------------------------------------------------------------- PROCESSORS.md | 1 + README.md | 2 ++ extensions/librdkafka/PublishKafka.cpp | 9 ++++++- extensions/librdkafka/PublishKafka.h | 39 ++++++++++++++++++++++++++--- 4 files changed, 47 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/08f4a2b8/PROCESSORS.md ---------------------------------------------------------------------- diff --git a/PROCESSORS.md b/PROCESSORS.md index 0b79963..214d2af 100644 --- a/PROCESSORS.md +++ b/PROCESSORS.md @@ -1023,6 +1023,7 @@ default values, and whether a property supports the NiFi Expression Language. | Request Timeout | | | The ack timeout of the producer request in milliseconds | | Client Name | | | Client Name to use when communicating with Kafka | | Batch Size | | | Maximum number of messages batched in one MessageSet | +| Attributes to Send as Headers | | | Any attribute whose name matches the regex will be added to the Kafka messages as a Header | | Queue Buffering Max Time | | | Delay to wait for messages in the producer queue to accumulate before constructing message batches | | Queue Max Buffer Size | | | Maximum total message size sum allowed on the producer queue | | Queue Max Message | | | Maximum number of messages allowed on the producer queue | http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/08f4a2b8/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index c3352d5..3da9a34 100644 --- a/README.md +++ b/README.md @@ -118,6 +118,8 @@ or greater is recommended. **NOTE** if bustache (ApplyTemplate) support is enabled, a recent version of a compiler supporting c++-11 must be used. GCC versions >= 6.3.1 are known to work. +**NOTE** if Kafka support is enabled, a recent version of a compiler supporting C++-11 regexes must be used. GCC versions >= 4.9.x are recommended. + **NOTE** if Expression Language support is enabled, FlexLexer must be in the include path and the version must be compatible with the version of flex used when generating lexer sources. Lexer source generation is automatically performed during CMake builds. To re-generate the sources, remove: * extensions/expression-language/Parser.cpp http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/08f4a2b8/extensions/librdkafka/PublishKafka.cpp ---------------------------------------------------------------------- diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp index 5f39e03..656eb32 100644 --- a/extensions/librdkafka/PublishKafka.cpp +++ b/extensions/librdkafka/PublishKafka.cpp @@ -42,6 +42,7 @@ core::Property PublishKafka::MaxMessageSize("Max Request Size", "Maximum Kafka p core::Property PublishKafka::RequestTimeOut("Request Timeout", "The ack timeout of the producer request in milliseconds", ""); core::Property PublishKafka::ClientName("Client Name", "Client Name to use when communicating with Kafka", ""); core::Property PublishKafka::BatchSize("Batch Size", "Maximum number of messages batched in one MessageSet", ""); +core::Property PublishKafka::AttributeNameRegex("Attributes to Send as Headers", "Any attribute whose name matches the regex will be added to the Kafka messages as a Header", ""); core::Property PublishKafka::QueueBufferMaxTime("Queue Buffering Max Time", "Delay to wait for messages in the producer queue to accumulate before constructing message batches", ""); core::Property PublishKafka::QueueBufferMaxSize("Queue Max Buffer Size", "Maximum total message size sum allowed on the producer queue", ""); core::Property PublishKafka::QueueBufferMaxMessage("Queue Max Message", "Maximum number of messages allowed on the producer queue", ""); @@ -64,6 +65,7 @@ void PublishKafka::initialize() { properties.insert(MaxMessageSize); properties.insert(RequestTimeOut); properties.insert(ClientName); + properties.insert(AttributeNameRegex); properties.insert(BatchSize); properties.insert(QueueBufferMaxTime); properties.insert(QueueBufferMaxSize); @@ -122,6 +124,11 @@ void PublishKafka::onSchedule(core::ProcessContext *context, core::ProcessSessio logger_->log_error("PublishKafka: configure error result [%s]", errstr); } value = ""; + if (context->getProperty(AttributeNameRegex.getName(), value) && !value.empty()) { + attributeNameRegex.assign(value); + logger_->log_debug("PublishKafka: AttributeNameRegex %s", value); + } + value = ""; if (context->getProperty(QueueBufferMaxSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) { valInt = valInt/1024; valueConf = std::to_string(valInt); @@ -262,7 +269,7 @@ void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &contex if (flowFile->getAttribute(KAFKA_KEY_ATTRIBUTE, value)) kafkaKey = value; - PublishKafka::ReadCallback callback(flowFile->getSize(), max_seg_size_, kafkaKey, rkt_); + PublishKafka::ReadCallback callback(max_seg_size_, kafkaKey, rkt_, rk_, flowFile, attributeNameRegex); session->read(flowFile, &callback); if (callback.status_ < 0) { logger_->log_error("Failed to send flow to kafka topic %s", topic_); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/08f4a2b8/extensions/librdkafka/PublishKafka.h ---------------------------------------------------------------------- diff --git a/extensions/librdkafka/PublishKafka.h b/extensions/librdkafka/PublishKafka.h index 65ed849..0814dd9 100644 --- a/extensions/librdkafka/PublishKafka.h +++ b/extensions/librdkafka/PublishKafka.h @@ -28,6 +28,7 @@ #include "core/Property.h" #include "core/logging/LoggerConfiguration.h" #include "rdkafka.h" +#include <regex> namespace org { namespace apache { @@ -83,6 +84,7 @@ public: static core::Property RequestTimeOut; static core::Property ClientName; static core::Property BatchSize; + static core::Property AttributeNameRegex; static core::Property QueueBufferMaxTime; static core::Property QueueBufferMaxSize; static core::Property QueueBufferMaxMessage; @@ -101,12 +103,17 @@ public: // Nest Callback Class for read stream class ReadCallback: public InputStreamCallback { public: - ReadCallback(uint64_t flow_size, uint64_t max_seg_size, const std::string &key, rd_kafka_topic_t *rkt) : - flow_size_(flow_size), max_seg_size_(max_seg_size), key_(key), rkt_(rkt) { + ReadCallback(uint64_t max_seg_size, const std::string &key, rd_kafka_topic_t *rkt, rd_kafka_t *rk, const std::shared_ptr<core::FlowFile> &flowFile, const std::regex &attributeNameRegex) : + max_seg_size_(max_seg_size), key_(key), rkt_(rkt), rk_(rk), flowFile_(flowFile), attributeNameRegex_(attributeNameRegex) { + flow_size_ = flowFile_->getSize(); status_ = 0; read_size_ = 0; + hdrs = nullptr; } ~ReadCallback() { + if (hdrs) { + rd_kafka_headers_destroy(hdrs); + } } int64_t process(std::shared_ptr<io::BaseStream> stream) { if (flow_size_ < max_seg_size_) @@ -115,6 +122,17 @@ public: buffer.reserve(max_seg_size_); read_size_ = 0; status_ = 0; + rd_kafka_resp_err_t err; + + for (auto kv : flowFile_->getAttributes()) { + if (regex_match(kv.first, attributeNameRegex_)) { + if (!hdrs) { + hdrs = rd_kafka_headers_new(8); + } + err = rd_kafka_header_add(hdrs, kv.first.c_str(), kv.first.size(), kv.second.c_str(), kv.second.size()); + } + } + while (read_size_ < flow_size_) { int readRet = stream->read(&buffer[0], max_seg_size_); if (readRet < 0) { @@ -122,7 +140,17 @@ public: return read_size_; } if (readRet > 0) { - if (rd_kafka_produce(rkt_, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, &buffer[0], readRet, key_.c_str(), key_.size(), NULL) == -1) { + if (hdrs) { + rd_kafka_headers_t *hdrs_copy; + hdrs_copy = rd_kafka_headers_copy(hdrs); + err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(&buffer[0], readRet), RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_END); + if (err) { + rd_kafka_headers_destroy(hdrs_copy); + } + } else { + err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(&buffer[0], readRet), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_END); + } + if (err) { status_ = -1; return read_size_; } @@ -137,8 +165,12 @@ public: uint64_t max_seg_size_; std::string key_; rd_kafka_topic_t *rkt_; + rd_kafka_t *rk_; + rd_kafka_headers_t *hdrs; + std::shared_ptr<core::FlowFile> flowFile_; int status_; int read_size_; + std::regex attributeNameRegex_; }; public: @@ -167,6 +199,7 @@ private: rd_kafka_topic_t *rkt_; std::string topic_; uint64_t max_seg_size_; + std::regex attributeNameRegex; }; REGISTER_RESOURCE (PublishKafka);
