arpadboda commented on a change in pull request #712: MINIFICPP-1047 Add 
property "Drop empty flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712#discussion_r368581719
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.h
 ##########
 @@ -193,121 +186,150 @@ class PublishKafka : public core::Processor {
 
   // Nest Callback Class for read stream
   class ReadCallback : public InputStreamCallback {
+    void allocate_message_object(const size_t segment_num) const {
+      messages_->modifyResult(flow_file_index_, [segment_num](FlowFileResult& 
flow_file) {
+        // allocate message object to be filled in by the callback in produce()
+        if (flow_file.messages.size() < segment_num + 1) {
+          flow_file.messages.resize(segment_num + 1);
+        }
+      });
+    }
+
+    rd_kafka_headers_s* make_headers() const {
+      rd_kafka_headers_s* const result = rd_kafka_headers_new(8);
+      for (const auto& kv : flowFile_->getAttributes()) {
+        if(attributeNameRegex_.match(kv.first)) {
+          rd_kafka_header_add(result, kv.first.c_str(), kv.first.size(), 
kv.second.c_str(), kv.second.size());
+        }
+      }
+      return result;
+    }
+
+    rd_kafka_resp_err_t produce(const size_t segment_num, std::vector<unsigned 
char>& buffer, const size_t buflen) const {
+      rd_kafka_resp_err_t err{};
+
+      const auto messages_copy = this->messages_;
+      const auto flow_file_index_copy = this->flow_file_index_;
+      const auto produce_callback = [messages_copy, flow_file_index_copy, 
segment_num](rd_kafka_t * /*rk*/, const rd_kafka_message_t *rkmessage) {
+        messages_copy->modifyResult(flow_file_index_copy, [segment_num, 
rkmessage](FlowFileResult &flow_file) {
+          auto &message = flow_file.messages.at(segment_num);
+          message.err_code = rkmessage->err;
+          message.status = message.err_code == 0 ? 
MessageStatus::MESSAGESTATUS_SUCCESS : MessageStatus::MESSAGESTATUS_ERROR;
+        });
+      };
+      // release()d below, deallocated in PublishKafka::messageDeliveryCallback
+      auto callback_ptr = utils::make_unique<std::function<void(rd_kafka_t*, 
const rd_kafka_message_t*)>>(std::move(produce_callback));
+
+      allocate_message_object(segment_num);
+
+      if (hdrs) {
+        rd_kafka_headers_t * const hdrs_copy = rd_kafka_headers_copy(hdrs);
+        err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), 
RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), 
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(), 
buflen),
+                                RD_KAFKA_V_HEADERS(hdrs_copy), 
RD_KAFKA_V_KEY(key_.c_str(), key_.size()), 
RD_KAFKA_V_OPAQUE(callback_ptr.release()), RD_KAFKA_V_END);
+        if (err) {
+          // the message only takes ownership of the headers in case of success
+          rd_kafka_headers_destroy(hdrs_copy);
+        }
+      } else {
+        err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), 
RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), 
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(), 
buflen),
+                                RD_KAFKA_V_KEY(key_.c_str(), key_.size()), 
RD_KAFKA_V_OPAQUE(callback_ptr.release()), RD_KAFKA_V_END);
+      }
+      return err;
+    }
+
    public:
-    ReadCallback(uint64_t max_seg_size,
-                 const std::string &key,
-                 rd_kafka_topic_t *rkt,
-                 rd_kafka_t *rk,
-                 const std::shared_ptr<core::FlowFile> flowFile,
+    ReadCallback(const uint64_t max_seg_size,
+                 std::string key,
+                 rd_kafka_topic_t * const rkt,
+                 rd_kafka_t * const rk,
+                 std::shared_ptr<core::FlowFile> flowFile,
                  utils::Regex &attributeNameRegex,
                  std::shared_ptr<Messages> messages,
-                 size_t flow_file_index)
-        : max_seg_size_(max_seg_size),
-          key_(key),
+                 const size_t flow_file_index)
+        : flowFile_(std::move(flowFile)),
+          flow_size_(flowFile_->getSize()),
+          max_seg_size_(max_seg_size == 0 || flow_size_ < max_seg_size ? 
flow_size_ : max_seg_size),
+          key_(std::move(key)),
           rkt_(rkt),
           rk_(rk),
-          flowFile_(flowFile),
+          hdrs(make_headers()),
           messages_(std::move(messages)),
           flow_file_index_(flow_file_index),
-          attributeNameRegex_(attributeNameRegex) {
-      flow_size_ = flowFile_->getSize();
-      status_ = 0;
-      read_size_ = 0;
-      hdrs = nullptr;
-    }
+          status_(0),
+          read_size_(0),
+          attributeNameRegex_(attributeNameRegex)
+    { }
 
    // Frees the header list owned by this object. Messages that were
    // successfully produced received their own copy via rd_kafka_headers_copy
    // in produce(), so destroying the original here is always safe.
    ~ReadCallback() {
      if (hdrs) {
        rd_kafka_headers_destroy(hdrs);
      }
    }
 
-    int64_t process(std::shared_ptr<io::BaseStream> stream) {
-      if (max_seg_size_ == 0U || flow_size_ < max_seg_size_) {
-        max_seg_size_ = flow_size_;
-      }
+    int64_t process(const std::shared_ptr<io::BaseStream> stream) {
       std::vector<unsigned char> buffer;
-      buffer.reserve(max_seg_size_);
+      buffer.resize(max_seg_size_);
       read_size_ = 0;
       status_ = 0;
       rd_kafka_resp_err_t err;
 
-      for (auto kv : flowFile_->getAttributes()) {
-        if(attributeNameRegex_.match(kv.first)) {
-          if (!hdrs) {
-            hdrs = rd_kafka_headers_new(8);
-          }
-          err = rd_kafka_header_add(hdrs, kv.first.c_str(), kv.first.size(), 
kv.second.c_str(), kv.second.size());
-        }
-      }
+      called_ = true;
 
       size_t segment_num = 0U;
+
+      assert(flow_size_ == 0 || max_seg_size_ != 0 && "at this point, 
max_seg_size_ is only zero if flow_size_ is zero");
 
 Review comment:
   Same still applies: I hardly ever compile code in debug mode, so I don't think this assertion would warn me even if I touch the processor code. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to