arpadboda commented on a change in pull request #651: MINIFICPP-1033 - PublishKafka sends success flow even though kafka se…
URL: https://github.com/apache/nifi-minifi-cpp/pull/651#discussion_r326586028
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.cpp
 ##########
 @@ -345,6 +345,21 @@ void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
       }
     }
 
+    // Check if brokers are down, if yes then yield.
+    const struct rd_kafka_metadata *metadata;
+    /* Fetch metadata */
+    // TODO: What is the time complexity of this??
rd_kafka_resp_err_t err = rd_kafka_metadata(conn->getConnection(), 0, nullptr,
 +                            &metadata, 500);
+    if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
+      logger_->log_error("Failed to acquire metadata: %s\n", rd_kafka_err2str(err));
+      session->transfer(flowFile, Failure);
 
 Review comment:
   Honestly I'm not sure what's the right thing to do here.
   
   By default I would prefer to yield and not do anything when the operation cannot be performed, but broker and topic are flowfile-dependent values here, so we have to handle invalid values gracefully.
   Maybe instead of routing to failure I would consider penalizing the flowfile.
   
   With penalization we give a second chance to transmit in case of connection errors or other issues on the remote side, although we would then keep flowfiles with invalid attributes in our queue forever.
   
   Optionally we could add a map<uuid, uint> to count the penalizations per flowfile and route to failure once a threshold is exceeded.
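   A rough sketch of that map<uuid, uint> idea, assuming we key by the flow file's UUID string (the class name, method names, and threshold handling below are hypothetical, not existing MiNiFi C++ API):

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical helper: counts delivery failures per flow file UUID.
// recordFailureAndCheck() returns true once the failure count exceeds the
// threshold, signalling the caller to route the flow file to Failure instead
// of penalizing it again; the count is dropped at that point (and on success)
// so the map does not grow without bound.
class PenaltyTracker {
 public:
  explicit PenaltyTracker(unsigned threshold) : threshold_(threshold) {}

  bool recordFailureAndCheck(const std::string& uuid) {
    unsigned count = ++counts_[uuid];
    if (count > threshold_) {
      counts_.erase(uuid);  // give up on this flow file; forget its count
      return true;          // caller should route to Failure
    }
    return false;           // caller should penalize and let it retry later
  }

  void onSuccess(const std::string& uuid) { counts_.erase(uuid); }

 private:
  unsigned threshold_;
  std::map<std::string, unsigned> counts_;
};
```

   In onTrigger this would roughly become: on a metadata/delivery error, call `recordFailureAndCheck(flowFile->getUUIDStr())` and either `session->penalize(flowFile)` or `session->transfer(flowFile, Failure)` depending on the result.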
   
   @bakaid what do you think? 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
