martinzink commented on code in PR #2175:
URL: https://github.com/apache/nifi-minifi-cpp/pull/2175#discussion_r3241070961
##########
extensions/kafka/ConsumeKafka.cpp:
##########
@@ -41,33 +38,27 @@ struct
std::hash<org::apache::nifi::minifi::processors::ConsumeKafka::KafkaMessa
};
namespace org::apache::nifi::minifi::processors {
-// The upper limit for Max Poll Time is 4 seconds. This is because Watchdog
would potentially start
-// reporting issues with the processor health otherwise
-bool consume_kafka::ConsumeKafkaMaxPollTimePropertyValidator::validate(const
std::string_view input) const {
Review Comment:
Removed this custom validator (because C Api only supports standard
validators ATM), and added a custom validation into onSchedule
##########
extensions/kafka/ConsumeKafka.cpp:
##########
@@ -180,15 +131,15 @@ void
ConsumeKafka::configureNewConnection(core::ProcessContext& context) {
setKafkaConfigurationField(*conf_, "enable.auto.offset.store",
std::to_string(commit_policy_ == consume_kafka::CommitPolicyEnum::AutoCommit));
setKafkaConfigurationField(*conf_, "isolation.level",
utils::parseBoolProperty(context, HonorTransactions) ? "read_committed" :
"read_uncommitted");
setKafkaConfigurationField(*conf_, "group.id", utils::parseProperty(context,
GroupID));
- setKafkaConfigurationField(*conf_, "client.id", this->getUUIDStr());
+ // setKafkaConfigurationField(*conf_, "client.id", client_id); No need to
set id since its autogenerated, and we don't access it anywhere from minifi
Review Comment:
We dont have access to the processor's UUID, but this seems unnessary
anyways, any other ideas are welcome :D
##########
extensions/kafka/ConsumeKafka.cpp:
##########
@@ -76,40 +67,9 @@ void ConsumeKafka::onSchedule(core::ProcessContext& context,
core::ProcessSessio
configureNewConnection(context);
if (commit_policy_ ==
consume_kafka::CommitPolicyEnum::CommitFromIncomingFlowFiles) {
setTriggerWhenEmpty(true);
- } else if (context.hasIncomingConnections()) {
- logger_->log_error("Incoming connections are not allowed with {}",
magic_enum::enum_name(commit_policy_));
- throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Incoming
connections are not allowed with {}", magic_enum::enum_name(commit_policy_)));
- }
-}
-
-namespace {
-void rebalance_cb(rd_kafka_t* rk, rd_kafka_resp_err_t trigger,
rd_kafka_topic_partition_list_t* partitions, void* /*opaque*/) {
Review Comment:
I've moved this rebalence_cb into a helper class thats available through an
opaque handle (needed for logging which is not available anymore through statis
functions)
https://github.com/apache/nifi-minifi-cpp/pull/2175/changes#diff-93908b3b2326640601fa590e78c35c046654a1ac90c6bf70dcd7dd1bcf0d2c5cR73
##########
extensions/kafka/KafkaConnection.cpp:
##########
@@ -89,37 +80,4 @@ void KafkaConnection::putTopic(const std::string& topicName,
const std::shared_p
topics_[topicName] = topic;
}
-void KafkaConnection::logCallback(const rd_kafka_t* rk, const int level, const
char* /*fac*/, const char* buf) {
Review Comment:
This is also been moved into KafkaOpaque class
https://github.com/apache/nifi-minifi-cpp/pull/2175/changes#diff-93908b3b2326640601fa590e78c35c046654a1ac90c6bf70dcd7dd1bcf0d2c5cR46
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]