bakaid commented on a change in pull request #712: MINIFICPP-1047 Add property 
"Drop empty flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712#discussion_r373458007
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.h
 ##########
 @@ -193,121 +186,146 @@ class PublishKafka : public core::Processor {
 
   // Nest Callback Class for read stream
   class ReadCallback : public InputStreamCallback {
+    void allocate_message_object(const size_t segment_num) const {
+      messages_->modifyResult(flow_file_index_, [segment_num](FlowFileResult& 
flow_file) {
+        // allocate message object to be filled in by the callback in produce()
+        if (flow_file.messages.size() < segment_num + 1) {
+          flow_file.messages.resize(segment_num + 1);
+        }
+      });
+    }
+
+    utils::owner<rd_kafka_headers_s*> make_headers() const {
+      rd_kafka_headers_s* const result = rd_kafka_headers_new(8);
+      for (const auto& kv : flowFile_->getAttributes()) {
+        if(attributeNameRegex_.match(kv.first)) {
+          rd_kafka_header_add(result, kv.first.c_str(), kv.first.size(), 
kv.second.c_str(), kv.second.size());
+        }
+      }
+      return result;
+    }
+
+    rd_kafka_resp_err_t produce(const size_t segment_num, std::vector<unsigned 
char>& buffer, const size_t buflen) const {
+      const auto messages_copy = this->messages_;
+      const auto flow_file_index_copy = this->flow_file_index_;
+      const auto produce_callback = [messages_copy, flow_file_index_copy, 
segment_num](rd_kafka_t * /*rk*/, const rd_kafka_message_t *rkmessage) {
+        messages_copy->modifyResult(flow_file_index_copy, [segment_num, 
rkmessage](FlowFileResult &flow_file) {
+          auto &message = flow_file.messages.at(segment_num);
+          message.err_code = rkmessage->err;
+          message.status = message.err_code == 0 ? 
MessageStatus::MESSAGESTATUS_SUCCESS : MessageStatus::MESSAGESTATUS_ERROR;
+        });
+      };
+      // release()d below, deallocated in PublishKafka::messageDeliveryCallback
+      auto callback_ptr = utils::make_unique<std::function<void(rd_kafka_t*, 
const rd_kafka_message_t*)>>(std::move(produce_callback));
+
+      allocate_message_object(segment_num);
+
+      if (hdrs) {
+        const utils::owner<rd_kafka_headers_t*> hdrs_copy = 
rd_kafka_headers_copy(hdrs);
+        const auto err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), 
RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), 
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(), 
buflen),
+                                           RD_KAFKA_V_HEADERS(hdrs_copy), 
RD_KAFKA_V_KEY(key_.c_str(), key_.size()), 
RD_KAFKA_V_OPAQUE(callback_ptr.release()), RD_KAFKA_V_END);
+        if (err) {
+          // the message only takes ownership of the headers in case of success
+          rd_kafka_headers_destroy(hdrs_copy);
+        }
+        return err;
+      } else {
+        return rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), 
RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), 
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(), 
buflen),
+                                RD_KAFKA_V_KEY(key_.c_str(), key_.size()), 
RD_KAFKA_V_OPAQUE(callback_ptr.release()), RD_KAFKA_V_END);
+      }
+    }
+
    public:
-    ReadCallback(uint64_t max_seg_size,
-                 const std::string &key,
-                 rd_kafka_topic_t *rkt,
-                 rd_kafka_t *rk,
-                 const std::shared_ptr<core::FlowFile> flowFile,
+    ReadCallback(const uint64_t max_seg_size,
+                 std::string key,
+                 rd_kafka_topic_t * const rkt,
+                 rd_kafka_t * const rk,
+                 std::shared_ptr<core::FlowFile> flowFile,
                  utils::Regex &attributeNameRegex,
                  std::shared_ptr<Messages> messages,
-                 size_t flow_file_index)
-        : max_seg_size_(max_seg_size),
-          key_(key),
+                 const size_t flow_file_index,
+                 const bool fail_empty_flow_files)
+        : flowFile_(std::move(flowFile)),
+          flow_size_(flowFile_->getSize()),
+          max_seg_size_(max_seg_size == 0 || flow_size_ < max_seg_size ? 
flow_size_ : max_seg_size),
+          key_(std::move(key)),
           rkt_(rkt),
           rk_(rk),
-          flowFile_(flowFile),
+          hdrs(make_headers()),
           messages_(std::move(messages)),
           flow_file_index_(flow_file_index),
-          attributeNameRegex_(attributeNameRegex) {
-      flow_size_ = flowFile_->getSize();
-      status_ = 0;
-      read_size_ = 0;
-      hdrs = nullptr;
-    }
+          status_(0),
+          read_size_(0),
+          attributeNameRegex_(attributeNameRegex),
+          fail_empty_flow_files_(fail_empty_flow_files)
+    { }
 
     ~ReadCallback() {
-      if (hdrs) {
-        rd_kafka_headers_destroy(hdrs);
-      }
+      rd_kafka_headers_destroy(hdrs);
 
 Review comment:
   I am not a big fan of unnecessarily checking for nullptr in cases where it 
really does not make sense, like before `free`.
  In this case, however, `rd_kafka_headers_destroy` will segfault if it gets a 
nullptr: 
https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_header.c#L36, 
and generally, I like more defensiveness around third-party APIs.
  Even if in the current state of the code we can be sure that hdrs won't be 
nullptr, a later refactor might break that implicit assumption and cause an 
issue here.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to