adamdebreceni commented on a change in pull request #811:
URL: https://github.com/apache/nifi-minifi-cpp/pull/811#discussion_r439985423



##########
File path: extensions/librdkafka/PublishKafka.h
##########
@@ -233,9 +233,13 @@ class PublishKafka : public core::Processor {
 
       const gsl::owner<rd_kafka_headers_t*> hdrs_copy = rd_kafka_headers_copy(hdrs.get());
       const auto err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(), buflen),
-                                         RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.release()), RD_KAFKA_V_END);
-      if (err) {
-        // the message only takes ownership of the headers in case of success
+                                         RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.get()), RD_KAFKA_V_END);
+      if (err == RD_KAFKA_RESP_ERR_NO_ERROR) {

Review comment:
       👍

##########
File path: extensions/librdkafka/PublishKafka.h
##########
@@ -233,9 +233,13 @@ class PublishKafka : public core::Processor {
 
       const gsl::owner<rd_kafka_headers_t*> hdrs_copy = rd_kafka_headers_copy(hdrs.get());
       const auto err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(), buflen),
-                                         RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.release()), RD_KAFKA_V_END);
-      if (err) {
-        // the message only takes ownership of the headers in case of success
+                                         RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.get()), RD_KAFKA_V_END);
+      if (err == RD_KAFKA_RESP_ERR_NO_ERROR) {
+        // in case of failure, messageDeliveryCallback is not called and callback_ptr will delete the callback
+        // in case of success, messageDeliveryCallback takes ownership of the callback, so we no longer need to delete it

Review comment:
       I'm not familiar with this processor, but I assume messageDeliveryCallback
is called asynchronously. Could there be a scenario where messageDeliveryCallback
is never called?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

