lordgamez commented on code in PR #2175:
URL: https://github.com/apache/nifi-minifi-cpp/pull/2175#discussion_r3356958754


##########
extensions/kafka/KafkaProcessorBase.cpp:
##########
@@ -16,22 +16,24 @@
  */
 #include "KafkaProcessorBase.h"
 
-#include "minifi-cpp/controllers/SSLContextServiceInterface.h"
+#include "api/utils/ProcessorConfigUtils.h"
 #include "rdkafka_utils.h"
-#include "utils/ProcessorConfigUtils.h"
 
 namespace org::apache::nifi::minifi::processors {
 
-std::optional<utils::net::SslData> 
KafkaProcessorBase::getSslData(core::ProcessContext& context) const {
-  return utils::net::getSslData(context, SSLContextService, logger_);
+KafkaProcessorBase::KafkaProcessorBase(core::ProcessorMetadata metadata) : 
ProcessorImpl(std::move(metadata)), kafka_opaque_(*logger_) {
 }
 
-void 
KafkaProcessorBase::setKafkaAuthenticationParameters(core::ProcessContext& 
context, gsl::not_null<rd_kafka_conf_t*> config) {
-  security_protocol_ = 
utils::parseEnumProperty<kafka::SecurityProtocolOption>(context, 
SecurityProtocol);
+std::optional<api::utils::net::SslData> 
KafkaProcessorBase::getSslData(api::core::ProcessContext& context) const {

Review Comment:
   Could this function be removed, and std::expected used instead of the 
optional in the Kafka processors?



##########
extensions/kafka/rdkafka_utils.h:
##########
@@ -87,18 +85,28 @@ template<typename T>
 void kafka_headers_for_each(const rd_kafka_headers_t& headers, T 
key_value_handle) {
   const char* key = nullptr;  // Null terminated, not to be freed
   const void* value = nullptr;
-  std::size_t size;
+  std::size_t size = 0;
   for (std::size_t i = 0; RD_KAFKA_RESP_ERR_NO_ERROR == 
rd_kafka_header_get_all(&headers, i, &key, &value, &size); ++i) {
     key_value_handle(std::string(key), std::span<const char>(static_cast<const 
char*>(value), size));
   }
 }
 
 void setKafkaConfigurationField(rd_kafka_conf_t& configuration, const 
std::string& field_name, const std::string& value);
-void print_topics_list(core::logging::Logger& logger, const 
rd_kafka_topic_partition_list_t& kf_topic_partition_list);
 void print_kafka_message(const rd_kafka_message_t& rkmessage, 
core::logging::Logger& logger);
 std::string get_encoded_string(const std::string& input, KafkaEncoding 
encoding);
 std::optional<std::string> get_encoded_message_key(const rd_kafka_message_t& 
message, KafkaEncoding encoding);
 
+class KafkaOpaque {
+ public:
+  explicit KafkaOpaque(core::logging::Logger& logger) : logger_(logger) {}
+
+  void print_topics_list(const rd_kafka_topic_partition_list_t& 
kf_topic_partition_list) const;
+  static void logCallback(const rd_kafka_t* rk, int level, const char* 
/*fac*/, const char* buf);
+  static void rebalance_cb(rd_kafka_t* rk, rd_kafka_resp_err_t trigger, 
rd_kafka_topic_partition_list_t* partitions, void* opaque_ptr);

Review Comment:
   The method naming style seems to be inconsistent; could we rename these 
so that they are all either snake_case or camelCase?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to