fgerlits commented on code in PR #1885:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1885#discussion_r1820314800


##########
extensions/kafka/PublishKafka.cpp:
##########
@@ -406,139 +354,123 @@ void PublishKafka::notifyStop() {
     // After onTrigger finishes, we can take connection_mutex_ and close the 
connection without needing to wait for message finishes/timeouts in onTrigger.
     // A possible new onTrigger between our critical sections won't produce 
more messages because we set interrupted_ = true above.
     std::lock_guard<std::mutex> lock(messages_mutex_);
-    for (auto& messages : messages_set_) {
-      messages->interrupt();
-    }
+    for (auto& messages: messages_set_) { messages->interrupt(); }
   }
   std::lock_guard<std::mutex> conn_lock(connection_mutex_);
   conn_.reset();
 }
 
-
 bool PublishKafka::configureNewConnection(core::ProcessContext& context) {
-  std::string value;
-  int64_t valInt = 0;
-  std::string valueConf;
-  std::array<char, 512U> errstr{};
+  std::array<char, 512U> err_chars{};
   rd_kafka_conf_res_t result = RD_KAFKA_CONF_OK;
-  const char* const PREFIX_ERROR_MSG = "PublishKafka: configure error result: 
";
+  constexpr std::string_view PREFIX_ERROR_MSG = "PublishKafka: configure error 
result: ";
 
-  std::unique_ptr<rd_kafka_conf_t, rd_kafka_conf_deleter> conf_{ 
rd_kafka_conf_new() };
-  if (conf_ == nullptr) {
-    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed to create 
rd_kafka_conf_t object");
-  }
+  utils::rd_kafka_conf_unique_ptr conf_{rd_kafka_conf_new()};
+  if (conf_ == nullptr) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed 
to create rd_kafka_conf_t object"); }
 
   const auto* const key = conn_->getKey();
 
-  if (key->brokers_.empty()) {
-    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "There are no brokers");
-  }
-  result = rd_kafka_conf_set(conf_.get(), "bootstrap.servers", 
key->brokers_.c_str(), errstr.data(), errstr.size());
+  if (key->brokers_.empty()) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, 
"There are no brokers"); }
+  result = rd_kafka_conf_set(conf_.get(), "bootstrap.servers", 
key->brokers_.c_str(), err_chars.data(), err_chars.size());
   logger_->log_debug("PublishKafka: bootstrap.servers [{}]", key->brokers_);
   if (result != RD_KAFKA_CONF_OK) {
-    auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, errstr.data());
+    auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, 
err_chars.data());
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
   }
 
-  if (key->client_id_.empty()) {
-    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Client id is empty");
-  }
-  result = rd_kafka_conf_set(conf_.get(), "client.id", 
key->client_id_.c_str(), errstr.data(), errstr.size());
+  if (key->client_id_.empty()) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, 
"Client id is empty"); }
+  result = rd_kafka_conf_set(conf_.get(), "client.id", 
key->client_id_.c_str(), err_chars.data(), err_chars.size());
   logger_->log_debug("PublishKafka: client.id [{}]", key->client_id_);
   if (result != RD_KAFKA_CONF_OK) {
-    auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, errstr.data());
+    auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, 
err_chars.data());
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
   }
 
-  value = "";
-  if (context.getProperty(DebugContexts, value) && !value.empty()) {
-    result = rd_kafka_conf_set(conf_.get(), "debug", value.c_str(), 
errstr.data(), errstr.size());
-    logger_->log_debug("PublishKafka: debug [{}]", value);
+  if (auto debug_contexts = context.getProperty(DebugContexts); debug_contexts 
&& !debug_contexts->empty()) {
+    result = rd_kafka_conf_set(conf_.get(), "debug", debug_contexts->c_str(), 
err_chars.data(), err_chars.size());
+    logger_->log_debug("PublishKafka: debug [{}]", *debug_contexts);
     if (result != RD_KAFKA_CONF_OK) {
-      auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, 
errstr.data());
+      auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, 
err_chars.data());
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
 
-  value = "";
-  if (context.getProperty(MaxMessageSize, value) && !value.empty()) {
-    result = rd_kafka_conf_set(conf_.get(), "message.max.bytes", 
value.c_str(), errstr.data(), errstr.size());
-    logger_->log_debug("PublishKafka: message.max.bytes [{}]", value);
+  if (auto max_message_size = context.getProperty(MaxMessageSize); 
max_message_size && !max_message_size->empty()) {
+    result = rd_kafka_conf_set(conf_.get(), "message.max.bytes", 
max_message_size->c_str(), err_chars.data(), err_chars.size());
+    logger_->log_debug("PublishKafka: message.max.bytes [{}]", 
max_message_size);
     if (result != RD_KAFKA_CONF_OK) {
-      auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, 
errstr.data());
+      auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, 
err_chars.data());
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
-  value = "";
-  uint32_t int_val = 0;
-  if (context.getProperty(QueueBufferMaxMessage, int_val)) {
-    if (int_val < batch_size_) {
-      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid configuration: 
Batch Size cannot be larger than Queue Max Message");
-    }
 
-    value = std::to_string(int_val);
-    result = rd_kafka_conf_set(conf_.get(), "queue.buffering.max.messages", 
value.c_str(), errstr.data(), errstr.size());
-    logger_->log_debug("PublishKafka: queue.buffering.max.messages [{}]", 
value);
+  if (auto queue_buffer_max_message = 
context.getProperty<uint32_t>(QueueBufferMaxMessage)) {
+    if (*queue_buffer_max_message < batch_size_) { throw 
Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid configuration: Batch Size cannot 
be larger than Queue Max Message"); }
+
+    auto queue_buffer_max_message_str = 
std::to_string(*queue_buffer_max_message);
+    result = rd_kafka_conf_set(conf_.get(), "queue.buffering.max.messages", 
queue_buffer_max_message_str.c_str(), err_chars.data(), err_chars.size());
+    logger_->log_debug("PublishKafka: queue.buffering.max.messages [{}]", 
queue_buffer_max_message_str);
     if (result != RD_KAFKA_CONF_OK) {
-      auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, 
errstr.data());
+      auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, 
err_chars.data());
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
-  value = "";
-  if (context.getProperty(QueueBufferMaxSize, value) && !value.empty() && 
core::Property::StringToInt(value, valInt)) {
-    valInt = valInt / 1024;
-    valueConf = std::to_string(valInt);
-    result = rd_kafka_conf_set(conf_.get(), "queue.buffering.max.kbytes", 
valueConf.c_str(), errstr.data(), errstr.size());
-    logger_->log_debug("PublishKafka: queue.buffering.max.kbytes [{}]", 
valueConf);
+
+  if (auto queue_buffer_max_size = 
context.getProperty<core::DataSizeValue>(QueueBufferMaxSize)) {
+    auto queue_buffer_max_size_kb = queue_buffer_max_size->getValue() / 1024;
+    auto queue_buffer_max_size_kb_str = 
std::to_string(queue_buffer_max_size_kb);
+    result = rd_kafka_conf_set(conf_.get(), "queue.buffering.max.kbytes", 
queue_buffer_max_size_kb_str.c_str(), err_chars.data(), err_chars.size());
+    logger_->log_debug("PublishKafka: queue.buffering.max.kbytes [{}]", 
queue_buffer_max_size_kb_str);
     if (result != RD_KAFKA_CONF_OK) {
-      auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, 
errstr.data());
+      auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, 
err_chars.data());
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
 
   if (auto queue_buffer_max_time = 
context.getProperty<core::TimePeriodValue>(QueueBufferMaxTime)) {
-    valueConf = 
std::to_string(queue_buffer_max_time->getMilliseconds().count());
-    result = rd_kafka_conf_set(conf_.get(), "queue.buffering.max.ms", 
valueConf.c_str(), errstr.data(), errstr.size());
-    logger_->log_debug("PublishKafka: queue.buffering.max.ms [{}]", valueConf);
+    auto queue_buffer_max_time_ms_str = 
std::to_string(queue_buffer_max_time->getMilliseconds().count());
+    result = rd_kafka_conf_set(conf_.get(), "queue.buffering.max.ms", 
queue_buffer_max_time_ms_str.c_str(), err_chars.data(), err_chars.size());
+    logger_->log_debug("PublishKafka: queue.buffering.max.ms [{}]", 
queue_buffer_max_time_ms_str);
     if (result != RD_KAFKA_CONF_OK) {
-      auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, 
errstr.data());
+      auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, 
err_chars.data());
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
-  value = "";
-  if (context.getProperty(BatchSize, value) && !value.empty()) {
-    result = rd_kafka_conf_set(conf_.get(), "batch.num.messages", 
value.c_str(), errstr.data(), errstr.size());
-    logger_->log_debug("PublishKafka: batch.num.messages [{}]", value);
+
+  if (auto batch_size = context.getProperty(BatchSize); batch_size && 
!batch_size->empty()) {
+    result = rd_kafka_conf_set(conf_.get(), "batch.num.messages", 
batch_size->c_str(), err_chars.data(), err_chars.size());
+    logger_->log_debug("PublishKafka: batch.num.messages [{}]", *batch_size);
     if (result != RD_KAFKA_CONF_OK) {
-      auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, 
errstr.data());
+      auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, 
err_chars.data());
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
-  value = "";
-  if (context.getProperty(CompressCodec, value) && !value.empty() && value != 
"none") {
-    result = rd_kafka_conf_set(conf_.get(), "compression.codec", 
value.c_str(), errstr.data(), errstr.size());
-    logger_->log_debug("PublishKafka: compression.codec [{}]", value);
+
+  if (auto compress_codec = 
utils::parseOptionalEnumProperty<CompressionCodecEnum>(context, CompressCodec)) 
{

Review Comment:
   We need to check that the value is not "none" before setting `compression.codec`, to preserve the old behavior: the removed code only applied the setting when the property value was non-empty and not equal to "none".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to