szaszm commented on code in PR #1946:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1946#discussion_r2085024024


##########
docker/DockerVerify.sh:
##########
@@ -177,7 +177,7 @@ TEST_DIRECTORY="${docker_dir}/test/integration"
 export TEST_DIRECTORY
 
 # Add --no-logcapture to see logs interleaved with the test output
-BEHAVE_OPTS=(--logging-level INFO --parallel-processes 
"${_arg_parallel_processes}" --parallel-scheme feature -o 
"${PWD}/behavex_output" -t "${_arg_tags_to_run}")
+BEHAVE_OPTS=(--logging-level INFO --parallel-processes 
"${_arg_parallel_processes}" --parallel-scheme feature -o 
"${PWD}/behavex_output" -t "ENABLE_KAFKA")

Review Comment:
   Was this intentionally changed from the script argument 
(`_arg_tags_to_run`) to the hardcoded value `ENABLE_KAFKA`? I don't really 
understand how this works, or how it worked before.



##########
extensions/kafka/ConsumeKafka.cpp:
##########
@@ -274,97 +270,165 @@ std::vector<std::string> 
ConsumeKafka::get_matching_headers(const rd_kafka_messa
   }
   std::vector<std::string> matching_headers;
   for (std::size_t header_idx = 0;; ++header_idx) {
-    const char* value = nullptr;  // Not to be freed
+    const char *value = nullptr;  // Not to be freed

Review Comment:
   ```suggestion
       const char* value = nullptr;  // Not to be freed
   ```



##########
utils/include/utils/expected.h:
##########
@@ -229,3 +238,34 @@ auto try_expression(F&& action, Args&&... args) noexcept {
 }
 
 }  // namespace org::apache::nifi::minifi::utils
+
+#ifndef WIN32  // on windows this conflicts because nonstd::expected === 
std::expected
+// based on fmt::formatter<std::expected<T, E>, Char

Review Comment:
   ```suggestion
   // based on fmt::formatter<std::expected<T, E>, Char>
   ```



##########
extensions/kafka/ConsumeKafka.cpp:
##########
@@ -160,30 +161,31 @@ void 
ConsumeKafka::configure_new_connection(core::ProcessContext& context) {
   // If they are not set, offset reset is ignored and polling produces messages
   // Registering a rebalance_cb turns off librdkafka's automatic partition 
assignment/revocation and instead delegates that
   // responsibility to the application's rebalance_cb.
-  rd_kafka_conf_set_rebalance_cb(conf_.get(), rebalance_cb);
+  if (commit_policy_ != 
consume_kafka::CommitPolicyEnum::CommitFromIncomingFlowFiles) {
+    rd_kafka_conf_set_rebalance_cb(conf_.get(), rebalance_cb);
+  }
 
   // Uncomment this for librdkafka debug logs:
   // logger_->log_info("Enabling all debug logs for kafka consumer.");
   // setKafkaConfigurationField(*conf_, "debug", "all");
 
   setKafkaAuthenticationParameters(context, gsl::make_not_null(conf_.get()));
 
-  setKafkaConfigurationField(*conf_, "bootstrap.servers", kafka_brokers_);
+  setKafkaConfigurationField(*conf_, "bootstrap.servers", 
utils::parseProperty(context, KafkaBrokers));
   setKafkaConfigurationField(*conf_, "allow.auto.create.topics", "true");
-  setKafkaConfigurationField(*conf_, "auto.offset.reset", offset_reset_);
+  setKafkaConfigurationField(*conf_,
+      "auto.offset.reset",
+      
std::string(magic_enum::enum_name(utils::parseEnumProperty<consume_kafka::OffsetResetPolicyEnum>(context,
 OffsetReset))));
   setKafkaConfigurationField(*conf_, "enable.auto.commit", "false");
-  setKafkaConfigurationField(*conf_, "enable.auto.offset.store", "false");
-  setKafkaConfigurationField(*conf_, "isolation.level", honor_transactions_ ? 
"read_committed" : "read_uncommitted");
-  setKafkaConfigurationField(*conf_, "group.id", group_id_);
-  setKafkaConfigurationField(*conf_, "session.timeout.ms", 
std::to_string(session_timeout_milliseconds_.count()));
+  setKafkaConfigurationField(*conf_, "enable.auto.offset.store", 
std::to_string(commit_policy_ == consume_kafka::CommitPolicyEnum::AutoCommit));

Review Comment:
   Shouldn't we enable `enable.auto.commit` when `commit_policy_ == AutoCommit`?



##########
extensions/kafka/ConsumeKafka.cpp:
##########
@@ -201,32 +203,33 @@ void 
ConsumeKafka::configure_new_connection(core::ProcessContext& context) {
   //
   // As far as I understand, instead of rd_kafka_position() an 
rd_kafka_committed() call if preferred here,
   // as it properly fetches offsets from the broker
-  if (RD_KAFKA_RESP_ERR_NO_ERROR != rd_kafka_committed(consumer_.get(), 
kf_topic_partition_list_.get(), METADATA_COMMUNICATIONS_TIMEOUT_MS)) {
-    logger_->log_error("Retrieving committed offsets for topics+partitions 
failed.");
+  if (const auto retrieved_committed = rd_kafka_committed(consumer_.get(), 
kf_topic_partition_list_.get(), METADATA_COMMUNICATIONS_TIMEOUT_MS); 
RD_KAFKA_RESP_ERR_NO_ERROR != retrieved_committed) {
+    logger_->log_error("Retrieving committed offsets for topics+partitions 
failed {}: {}",
+      magic_enum::enum_underlying(retrieved_committed),
+      rd_kafka_err2str(retrieved_committed));
   }
 
-  rd_kafka_resp_err_t poll_set_consumer_response = 
rd_kafka_poll_set_consumer(consumer_.get());
-  if (RD_KAFKA_RESP_ERR_NO_ERROR != poll_set_consumer_response) {
+  if (rd_kafka_resp_err_t poll_set_consumer_response = 
rd_kafka_poll_set_consumer(consumer_.get()); RD_KAFKA_RESP_ERR_NO_ERROR != 
poll_set_consumer_response) {
     logger_->log_error("rd_kafka_poll_set_consumer error {}: {}",
         magic_enum::enum_underlying(poll_set_consumer_response),
         rd_kafka_err2str(poll_set_consumer_response));
   }
 }
 
-std::string ConsumeKafka::extract_message(const rd_kafka_message_t& rkmessage) 
{
+std::string ConsumeKafka::extractMessage(const rd_kafka_message_t& rkmessage) {
   if (RD_KAFKA_RESP_ERR_NO_ERROR != rkmessage.err) {
     throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION,
         "ConsumeKafka: received error message from broker: " + 
std::to_string(rkmessage.err) + " " + rd_kafka_err2str(rkmessage.err));
   }
-  return {static_cast<char*>(rkmessage.payload), rkmessage.len};
+  return {static_cast<char *>(rkmessage.payload), rkmessage.len};

Review Comment:
   ```suggestion
     return {static_cast<char*>(rkmessage.payload), rkmessage.len};
   ```



##########
extensions/kafka/ConsumeKafka.cpp:
##########
@@ -274,97 +270,165 @@ std::vector<std::string> 
ConsumeKafka::get_matching_headers(const rd_kafka_messa
   }
   std::vector<std::string> matching_headers;
   for (std::size_t header_idx = 0;; ++header_idx) {
-    const char* value = nullptr;  // Not to be freed
+    const char *value = nullptr;  // Not to be freed
     std::size_t size = 0;
     if (RD_KAFKA_RESP_ERR_NO_ERROR !=
-        rd_kafka_header_get(headers_raw, header_idx, header_name.c_str(), 
reinterpret_cast<const void**>(&value), &size)) {
+        rd_kafka_header_get(headers_raw, header_idx, header_name.c_str(), 
reinterpret_cast<const void **>(&value), &size)) {

Review Comment:
   ```suggestion
           rd_kafka_header_get(headers_raw, header_idx, header_name.c_str(), 
reinterpret_cast<const void**>(&value), &size)) {
   ```



##########
extensions/kafka/ConsumeKafka.cpp:
##########
@@ -274,97 +270,165 @@ std::vector<std::string> 
ConsumeKafka::get_matching_headers(const rd_kafka_messa
   }
   std::vector<std::string> matching_headers;
   for (std::size_t header_idx = 0;; ++header_idx) {
-    const char* value = nullptr;  // Not to be freed
+    const char *value = nullptr;  // Not to be freed
     std::size_t size = 0;
     if (RD_KAFKA_RESP_ERR_NO_ERROR !=
-        rd_kafka_header_get(headers_raw, header_idx, header_name.c_str(), 
reinterpret_cast<const void**>(&value), &size)) {
+        rd_kafka_header_get(headers_raw, header_idx, header_name.c_str(), 
reinterpret_cast<const void **>(&value), &size)) {
       break;
     }
-    if (size < 200) {
-      logger_->log_debug("{:.{}}", value, size);
-    } else {
-      logger_->log_debug("{:.{}}...", value, 200);
-    }
     matching_headers.emplace_back(value, size);
   }
   return matching_headers;
 }
 
-std::vector<std::pair<std::string, std::string>> 
ConsumeKafka::get_flowfile_attributes_from_message_header(const 
rd_kafka_message_t& message) const {
+std::vector<std::pair<std::string, std::string>> 
ConsumeKafka::getFlowFilesAttributesFromMessageHeaders(const 
rd_kafka_message_t& message) const {
+  gsl_Assert(headers_to_add_as_attributes_);
   std::vector<std::pair<std::string, std::string>> attributes_from_headers;
-  for (const std::string& header_name: headers_to_add_as_attributes_) {
+  for (const std::string& header_name: *headers_to_add_as_attributes_) {
     const std::vector<std::string> matching_headers = 
get_matching_headers(message, header_name);
     if (!matching_headers.empty()) {
       attributes_from_headers.emplace_back(header_name,
-          
utils::get_encoded_string(resolve_duplicate_headers(matching_headers), 
message_header_encoding_attr_to_enum()));
+          
utils::get_encoded_string(resolve_duplicate_headers(matching_headers), 
message_header_encoding_));
     }
   }
   return attributes_from_headers;
 }
 
-void ConsumeKafka::add_kafka_attributes_to_flowfile(core::FlowFile& flow_file, 
const rd_kafka_message_t& message) const {
-  // We do not currently support batching messages into a single flowfile
-  flow_file.setAttribute(KAFKA_COUNT_ATTR, "1");
-  if (const auto message_key = utils::get_encoded_message_key(message, 
key_attr_encoding_attr_to_enum())) {
-    flow_file.setAttribute(KAFKA_MESSAGE_KEY_ATTR, *message_key);
+void ConsumeKafka::addAttributesToSingleMessageFlowFile(core::FlowFile& 
flow_file, const rd_kafka_message_t& message) const {
+  flow_file.setAttribute(KafkaCountAttribute.name, "1");
+  if (const auto message_key = get_encoded_message_key(message, 
key_attribute_encoding_)) {
+    flow_file.setAttribute(KafkaKeyAttribute.name, *message_key);
+  }
+  flow_file.setAttribute(KafkaOffsetAttribute.name, 
std::to_string(message.offset));
+  flow_file.setAttribute(KafkaPartitionAttribute.name, 
std::to_string(message.partition));
+  flow_file.setAttribute(KafkaTopicAttribute.name, 
rd_kafka_topic_name(message.rkt));
+  if (headers_to_add_as_attributes_) {
+    for (const auto &[attr_key, attr_value]: 
getFlowFilesAttributesFromMessageHeaders(message)) { 
flow_file.setAttribute(attr_key, attr_value); }

Review Comment:
   ```suggestion
       for (const auto& [attr_key, attr_value]: 
getFlowFilesAttributesFromMessageHeaders(message)) { 
flow_file.setAttribute(attr_key, attr_value); }
   ```



##########
extensions/kafka/ConsumeKafka.cpp:
##########
@@ -235,36 +238,29 @@ std::vector<utils::rd_kafka_message_unique_ptr> 
ConsumeKafka::poll_kafka_message
       logger_->log_error("Received message with error {}: {}", 
magic_enum::enum_underlying(message->err), rd_kafka_err2str(message->err));
       break;
     }
-    utils::print_kafka_message(*message, *logger_);
-    messages.emplace_back(std::move(message));
+    const std::string_view topic_name = rd_kafka_topic_name(message->rkt);
+    const int32_t partition = message->partition;
+
+    message_bundles[KafkaMessageLocation{std::string(topic_name), 
partition}].pushBack(std::move(message));
     elapsed = std::chrono::steady_clock::now() - start;
+    ++message_count;
   }
-  return messages;
-}
-
-utils::KafkaEncoding ConsumeKafka::key_attr_encoding_attr_to_enum() const {
-  if (utils::string::equalsIgnoreCase(key_attribute_encoding_, 
KEY_ATTR_ENCODING_UTF_8)) { return utils::KafkaEncoding::UTF8; }
-  if (utils::string::equalsIgnoreCase(key_attribute_encoding_, 
KEY_ATTR_ENCODING_HEX)) { return utils::KafkaEncoding::HEX; }
-  throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "\"Key Attribute 
Encoding\" property not recognized.");
-}
-
-utils::KafkaEncoding ConsumeKafka::message_header_encoding_attr_to_enum() 
const {
-  if (utils::string::equalsIgnoreCase(message_header_encoding_, 
MSG_HEADER_ENCODING_UTF_8)) { return utils::KafkaEncoding::UTF8; }
-  if (utils::string::equalsIgnoreCase(message_header_encoding_, 
MSG_HEADER_ENCODING_HEX)) { return utils::KafkaEncoding::HEX; }
-  throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "\"Message 
Header Encoding\" property not recognized.");
+  return message_bundles;
 }
 
 std::string ConsumeKafka::resolve_duplicate_headers(const 
std::vector<std::string>& matching_headers) const {
-  if (MSG_HEADER_KEEP_FIRST == duplicate_header_handling_) { return 
matching_headers.front(); }
-  if (MSG_HEADER_KEEP_LATEST == duplicate_header_handling_) { return 
matching_headers.back(); }
-  if (MSG_HEADER_COMMA_SEPARATED_MERGE == duplicate_header_handling_) { return 
utils::string::join(", ", matching_headers); }
-  throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "\"Duplicate 
Header Handling\" property not recognized.");
+  switch (duplicate_header_handling_) {
+    case consume_kafka::MessageHeaderPolicyEnum::KEEP_FIRST: return 
matching_headers.front();
+    case consume_kafka::MessageHeaderPolicyEnum::KEEP_LATEST: return 
matching_headers.back();
+    case consume_kafka::MessageHeaderPolicyEnum::COMMA_SEPARATED_MERGE: return 
utils::string::join(", ", matching_headers);
+    default: throw Exception(PROCESSOR_EXCEPTION, "\"Duplicate Header 
Handling\" property not recognized.");
+  }
 }
 
 std::vector<std::string> ConsumeKafka::get_matching_headers(const 
rd_kafka_message_t& message, const std::string& header_name) const {
   // Headers fetched this way are freed when rd_kafka_message_destroy is called
   // Detaching them using rd_kafka_message_detach_headers does not seem to work
-  rd_kafka_headers_t* headers_raw = nullptr;
+  rd_kafka_headers_t *headers_raw = nullptr;

Review Comment:
   ```suggestion
     rd_kafka_headers_t* headers_raw = nullptr;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to