Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 4fe38ccdf -> 08f4a2b8c


MINIFICPP-427 - add PublishKafka headers support

This closes #360.

Signed-off-by: Marc Parisi <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/08f4a2b8
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/08f4a2b8
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/08f4a2b8

Branch: refs/heads/master
Commit: 08f4a2b8c484c0fff190518fb285ba66d027b46e
Parents: 4fe38cc
Author: Dustin Rodrigues <[email protected]>
Authored: Sat Jun 16 12:46:09 2018 -0400
Committer: Marc Parisi <[email protected]>
Committed: Mon Jul 30 09:32:24 2018 -0400

----------------------------------------------------------------------
 PROCESSORS.md                          |  1 +
 README.md                              |  2 ++
 extensions/librdkafka/PublishKafka.cpp |  9 ++++++-
 extensions/librdkafka/PublishKafka.h   | 39 ++++++++++++++++++++++++++---
 4 files changed, 47 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/08f4a2b8/PROCESSORS.md
----------------------------------------------------------------------
diff --git a/PROCESSORS.md b/PROCESSORS.md
index 0b79963..214d2af 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -1023,6 +1023,7 @@ default values, and whether a property supports the NiFi 
Expression Language.
 | Request Timeout | | | The ack timeout of the producer request in 
milliseconds |
 | Client Name | | | Client Name to use when communicating with Kafka |
 | Batch Size | | | Maximum number of messages batched in one MessageSet |
+| Attributes to Send as Headers | | | Any attribute whose name matches the 
regex will be added to the Kafka messages as a Header |
 | Queue Buffering Max Time | | | Delay to wait for messages in the producer 
queue to accumulate before constructing message batches |
 | Queue Max Buffer Size | | | Maximum total message size sum allowed on the 
producer queue |
 | Queue Max Message | | | Maximum number of messages allowed on the producer 
queue |

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/08f4a2b8/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index c3352d5..3da9a34 100644
--- a/README.md
+++ b/README.md
@@ -118,6 +118,8 @@ or greater is recommended.
 
 **NOTE** if bustache (ApplyTemplate) support is enabled, a recent version of a 
compiler supporting c++-11 must be used. GCC versions >= 6.3.1 are known to 
work.
 
+**NOTE** if Kafka support is enabled, a recent version of a compiler 
supporting C++-11 regexes must be used. GCC versions >= 4.9.x are recommended.
+
 **NOTE** if Expression Language support is enabled, FlexLexer must be in the 
include path and the version must be compatible with the version of flex used 
when generating lexer sources. Lexer source generation is automatically 
performed during CMake builds. To re-generate the sources, remove: 
 
  * extensions/expression-language/Parser.cpp

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/08f4a2b8/extensions/librdkafka/PublishKafka.cpp
----------------------------------------------------------------------
diff --git a/extensions/librdkafka/PublishKafka.cpp 
b/extensions/librdkafka/PublishKafka.cpp
index 5f39e03..656eb32 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -42,6 +42,7 @@ core::Property PublishKafka::MaxMessageSize("Max Request 
Size", "Maximum Kafka p
 core::Property PublishKafka::RequestTimeOut("Request Timeout", "The ack 
timeout of the producer request in milliseconds", "");
 core::Property PublishKafka::ClientName("Client Name", "Client Name to use 
when communicating with Kafka", "");
 core::Property PublishKafka::BatchSize("Batch Size", "Maximum number of 
messages batched in one MessageSet", "");
+core::Property PublishKafka::AttributeNameRegex("Attributes to Send as 
Headers", "Any attribute whose name matches the regex will be added to the 
Kafka messages as a Header", "");
 core::Property PublishKafka::QueueBufferMaxTime("Queue Buffering Max Time", 
"Delay to wait for messages in the producer queue to accumulate before 
constructing message batches", "");
 core::Property PublishKafka::QueueBufferMaxSize("Queue Max Buffer Size", 
"Maximum total message size sum allowed on the producer queue", "");
 core::Property PublishKafka::QueueBufferMaxMessage("Queue Max Message", 
"Maximum number of messages allowed on the producer queue", "");
@@ -64,6 +65,7 @@ void PublishKafka::initialize() {
   properties.insert(MaxMessageSize);
   properties.insert(RequestTimeOut);
   properties.insert(ClientName);
+  properties.insert(AttributeNameRegex);
   properties.insert(BatchSize);
   properties.insert(QueueBufferMaxTime);
   properties.insert(QueueBufferMaxSize);
@@ -122,6 +124,11 @@ void PublishKafka::onSchedule(core::ProcessContext 
*context, core::ProcessSessio
       logger_->log_error("PublishKafka: configure error result [%s]", errstr);
   }
   value = "";
+  if (context->getProperty(AttributeNameRegex.getName(), value) && 
!value.empty()) {
+    attributeNameRegex.assign(value);
+    logger_->log_debug("PublishKafka: AttributeNameRegex %s", value);
+  }
+  value = "";
   if (context->getProperty(QueueBufferMaxSize.getName(), value) && 
!value.empty() && core::Property::StringToInt(value, valInt)) {
       valInt = valInt/1024;
       valueConf = std::to_string(valInt);
@@ -262,7 +269,7 @@ void PublishKafka::onTrigger(const 
std::shared_ptr<core::ProcessContext> &contex
   if (flowFile->getAttribute(KAFKA_KEY_ATTRIBUTE, value))
     kafkaKey = value;
 
-  PublishKafka::ReadCallback callback(flowFile->getSize(), max_seg_size_, 
kafkaKey, rkt_);
+  PublishKafka::ReadCallback callback(max_seg_size_, kafkaKey, rkt_, rk_, 
flowFile, attributeNameRegex);
   session->read(flowFile, &callback);
   if (callback.status_ < 0) {
     logger_->log_error("Failed to send flow to kafka topic %s", topic_);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/08f4a2b8/extensions/librdkafka/PublishKafka.h
----------------------------------------------------------------------
diff --git a/extensions/librdkafka/PublishKafka.h 
b/extensions/librdkafka/PublishKafka.h
index 65ed849..0814dd9 100644
--- a/extensions/librdkafka/PublishKafka.h
+++ b/extensions/librdkafka/PublishKafka.h
@@ -28,6 +28,7 @@
 #include "core/Property.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "rdkafka.h"
+#include <regex>
 
 namespace org {
 namespace apache {
@@ -83,6 +84,7 @@ public:
   static core::Property RequestTimeOut;
   static core::Property ClientName;
   static core::Property BatchSize;
+  static core::Property AttributeNameRegex;
   static core::Property QueueBufferMaxTime;
   static core::Property QueueBufferMaxSize;
   static core::Property QueueBufferMaxMessage;
@@ -101,12 +103,17 @@ public:
   // Nest Callback Class for read stream
   class ReadCallback: public InputStreamCallback {
   public:
-    ReadCallback(uint64_t flow_size, uint64_t max_seg_size, const std::string 
&key, rd_kafka_topic_t *rkt) :
-        flow_size_(flow_size), max_seg_size_(max_seg_size), key_(key), 
rkt_(rkt) {
+    ReadCallback(uint64_t max_seg_size, const std::string &key, 
rd_kafka_topic_t *rkt, rd_kafka_t *rk, const std::shared_ptr<core::FlowFile> 
&flowFile, const std::regex &attributeNameRegex)  :
+        max_seg_size_(max_seg_size), key_(key), rkt_(rkt), rk_(rk), 
flowFile_(flowFile), attributeNameRegex_(attributeNameRegex) {
+      flow_size_ = flowFile_->getSize();
       status_ = 0;
       read_size_ = 0;
+      hdrs = nullptr;
     }
     ~ReadCallback() {
+      if (hdrs) {
+        rd_kafka_headers_destroy(hdrs);
+      }
     }
     int64_t process(std::shared_ptr<io::BaseStream> stream) {
       if (flow_size_ < max_seg_size_)
@@ -115,6 +122,17 @@ public:
       buffer.reserve(max_seg_size_);
       read_size_ = 0;
       status_ = 0;
+      rd_kafka_resp_err_t err;
+
+      for (auto kv : flowFile_->getAttributes()) {
+        if (regex_match(kv.first, attributeNameRegex_)) {
+          if (!hdrs) {
+            hdrs = rd_kafka_headers_new(8);
+          }
+          err = rd_kafka_header_add(hdrs, kv.first.c_str(), kv.first.size(),  
kv.second.c_str(), kv.second.size());
+        }
+      }
+
       while (read_size_ < flow_size_) {
         int readRet = stream->read(&buffer[0], max_seg_size_);
         if (readRet < 0) {
@@ -122,7 +140,17 @@ public:
           return read_size_;
         }
         if (readRet > 0) {
-          if (rd_kafka_produce(rkt_, RD_KAFKA_PARTITION_UA, 
RD_KAFKA_MSG_F_COPY, &buffer[0], readRet, key_.c_str(), key_.size(), NULL) == 
-1) {
+          if (hdrs) {
+            rd_kafka_headers_t *hdrs_copy;
+            hdrs_copy = rd_kafka_headers_copy(hdrs);
+            err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), 
RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), 
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(&buffer[0], 
readRet), RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), 
key_.size()), RD_KAFKA_V_END);
+            if (err) {
+              rd_kafka_headers_destroy(hdrs_copy);
+            }
+          } else {
+            err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), 
RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), 
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(&buffer[0], 
readRet), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_END);
+          }
+          if (err) {
             status_ = -1;
             return read_size_;
           }
@@ -137,8 +165,12 @@ public:
     uint64_t max_seg_size_;
     std::string key_;
     rd_kafka_topic_t *rkt_;
+    rd_kafka_t *rk_;
+    rd_kafka_headers_t *hdrs;
+    std::shared_ptr<core::FlowFile> flowFile_;
     int status_;
     int read_size_;
+    std::regex attributeNameRegex_;
   };
 
 public:
@@ -167,6 +199,7 @@ private:
   rd_kafka_topic_t *rkt_;
   std::string topic_;
   uint64_t max_seg_size_;
+  std::regex attributeNameRegex;
 };
 
 REGISTER_RESOURCE (PublishKafka);

Reply via email to the mailing list.