lordgamez commented on a change in pull request #1076:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1076#discussion_r647485533



##########
File path: extensions/librdkafka/ConsumeKafka.cpp
##########
@@ -359,25 +362,6 @@ void ConsumeKafka::configure_new_connection(const 
core::ProcessContext& context)
   if (RD_KAFKA_RESP_ERR_NO_ERROR != poll_set_consumer_response) {
     logger_->log_error("rd_kafka_poll_set_consumer error %d: %s", 
poll_set_consumer_response, rd_kafka_err2str(poll_set_consumer_response));
   }
-
-  // There is no rd_kafka_seek alternative for 
rd_kafka_topic_partition_list_t, only rd_kafka_topic_t
-  // rd_kafka_topic_partition_list_set_offset should reset the offsets to the 
latest (or whatever is set in the config),
-  // Also, rd_kafka_committed should also fetch and set latest the latest 
offset
-  // In reality, neither of them seem to work (not even with calling 
rd_kafka_position())
-  logger_->log_info("Resetting offset manually.");
-  while (true) {
-    std::unique_ptr<rd_kafka_message_t, utils::rd_kafka_message_deleter>
-        message_wrapper{ rd_kafka_consumer_poll(consumer_.get(), 
max_poll_time_milliseconds_.count()), utils::rd_kafka_message_deleter() };
-
-    if (!message_wrapper || RD_KAFKA_RESP_ERR_NO_ERROR != 
message_wrapper->err) {
-      break;
-    }
-    utils::print_kafka_message(*message_wrapper, *logger_);
-    // Commit offsets on broker for the provided list of partitions
-    logger_->log_info("Committing offset: %" PRId64 ".", 
message_wrapper->offset);
-    rd_kafka_commit_message(consumer_.get(), message_wrapper.get(), /* async = 
*/ 0);
-  }
-  logger_->log_info("Done resetting offset manually.");

Review comment:
       I could not find out the reason for that. It seems to always set the 
offset to the latest message in the kafka topic, which contradicts the 
`OffsetReset` property. When I tested it manually with the `earliest` offset 
set it still retrieved the messages published to the Kafka broker after the 
Minifi agent was restarted. I tried to ask Adam Hunyadi about this 
functionality, but I did not get a reply about this in e-mail. To me it seems 
to be unneeded and faulty after testing the functionality.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to