bakaid commented on a change in pull request #712: MINIFICPP-1047 Add property
"Drop empty flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712#discussion_r373458007
##########
File path: extensions/librdkafka/PublishKafka.h
##########
@@ -193,121 +186,146 @@ class PublishKafka : public core::Processor {
// Nest Callback Class for read stream
class ReadCallback : public InputStreamCallback {
+ void allocate_message_object(const size_t segment_num) const {
+ messages_->modifyResult(flow_file_index_, [segment_num](FlowFileResult&
flow_file) {
+ // allocate message object to be filled in by the callback in produce()
+ if (flow_file.messages.size() < segment_num + 1) {
+ flow_file.messages.resize(segment_num + 1);
+ }
+ });
+ }
+
+ utils::owner<rd_kafka_headers_s*> make_headers() const {
+ rd_kafka_headers_s* const result = rd_kafka_headers_new(8);
+ for (const auto& kv : flowFile_->getAttributes()) {
+ if(attributeNameRegex_.match(kv.first)) {
+ rd_kafka_header_add(result, kv.first.c_str(), kv.first.size(),
kv.second.c_str(), kv.second.size());
+ }
+ }
+ return result;
+ }
+
+ rd_kafka_resp_err_t produce(const size_t segment_num, std::vector<unsigned
char>& buffer, const size_t buflen) const {
+ const auto messages_copy = this->messages_;
+ const auto flow_file_index_copy = this->flow_file_index_;
+ const auto produce_callback = [messages_copy, flow_file_index_copy,
segment_num](rd_kafka_t * /*rk*/, const rd_kafka_message_t *rkmessage) {
+ messages_copy->modifyResult(flow_file_index_copy, [segment_num,
rkmessage](FlowFileResult &flow_file) {
+ auto &message = flow_file.messages.at(segment_num);
+ message.err_code = rkmessage->err;
+ message.status = message.err_code == 0 ?
MessageStatus::MESSAGESTATUS_SUCCESS : MessageStatus::MESSAGESTATUS_ERROR;
+ });
+ };
+ // release()d below, deallocated in PublishKafka::messageDeliveryCallback
+ auto callback_ptr = utils::make_unique<std::function<void(rd_kafka_t*,
const rd_kafka_message_t*)>>(std::move(produce_callback));
+
+ allocate_message_object(segment_num);
+
+ if (hdrs) {
+ const utils::owner<rd_kafka_headers_t*> hdrs_copy =
rd_kafka_headers_copy(hdrs);
+ const auto err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_),
RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA),
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(),
buflen),
+ RD_KAFKA_V_HEADERS(hdrs_copy),
RD_KAFKA_V_KEY(key_.c_str(), key_.size()),
RD_KAFKA_V_OPAQUE(callback_ptr.release()), RD_KAFKA_V_END);
+ if (err) {
+ // the message only takes ownership of the headers in case of success
+ rd_kafka_headers_destroy(hdrs_copy);
+ }
+ return err;
+ } else {
+ return rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_),
RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA),
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(),
buflen),
+ RD_KAFKA_V_KEY(key_.c_str(), key_.size()),
RD_KAFKA_V_OPAQUE(callback_ptr.release()), RD_KAFKA_V_END);
+ }
+ }
+
public:
- ReadCallback(uint64_t max_seg_size,
- const std::string &key,
- rd_kafka_topic_t *rkt,
- rd_kafka_t *rk,
- const std::shared_ptr<core::FlowFile> flowFile,
+ ReadCallback(const uint64_t max_seg_size,
+ std::string key,
+ rd_kafka_topic_t * const rkt,
+ rd_kafka_t * const rk,
+ std::shared_ptr<core::FlowFile> flowFile,
utils::Regex &attributeNameRegex,
std::shared_ptr<Messages> messages,
- size_t flow_file_index)
- : max_seg_size_(max_seg_size),
- key_(key),
+ const size_t flow_file_index,
+ const bool fail_empty_flow_files)
+ : flowFile_(std::move(flowFile)),
+ flow_size_(flowFile_->getSize()),
+ max_seg_size_(max_seg_size == 0 || flow_size_ < max_seg_size ?
flow_size_ : max_seg_size),
+ key_(std::move(key)),
rkt_(rkt),
rk_(rk),
- flowFile_(flowFile),
+ hdrs(make_headers()),
messages_(std::move(messages)),
flow_file_index_(flow_file_index),
- attributeNameRegex_(attributeNameRegex) {
- flow_size_ = flowFile_->getSize();
- status_ = 0;
- read_size_ = 0;
- hdrs = nullptr;
- }
+ status_(0),
+ read_size_(0),
+ attributeNameRegex_(attributeNameRegex),
+ fail_empty_flow_files_(fail_empty_flow_files)
+ { }
~ReadCallback() {
- if (hdrs) {
- rd_kafka_headers_destroy(hdrs);
- }
+ rd_kafka_headers_destroy(hdrs);
Review comment:
I am not a big fan of unnecessarily checking for nullptr in cases where it
really does not make sense, like before `free`.
In this case, however, `rd_kafka_headers_destroy` will segfault if it gets a
nullptr:
https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_header.c#L36,
and generally, I like more defensiveness around third-party APIs.
Even if in the current state of the code we can be sure that hdrs won't be
nullptr, a later refactor might break that implicit assumption and cause an
issue here.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services