szaszm commented on code in PR #1946:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1946#discussion_r2085024024
##########
docker/DockerVerify.sh:
##########
@@ -177,7 +177,7 @@ TEST_DIRECTORY="${docker_dir}/test/integration"
export TEST_DIRECTORY
# Add --no-logcapture to see logs interleaved with the test output
-BEHAVE_OPTS=(--logging-level INFO --parallel-processes
"${_arg_parallel_processes}" --parallel-scheme feature -o
"${PWD}/behavex_output" -t "${_arg_tags_to_run}")
+BEHAVE_OPTS=(--logging-level INFO --parallel-processes
"${_arg_parallel_processes}" --parallel-scheme feature -o
"${PWD}/behavex_output" -t "ENABLE_KAFKA")
Review Comment:
Is this intentionally changed from the `${_arg_tags_to_run}` argument to a
hardcoded value? I don't really understand how this works or worked.
##########
extensions/kafka/ConsumeKafka.cpp:
##########
@@ -274,97 +270,165 @@ std::vector<std::string>
ConsumeKafka::get_matching_headers(const rd_kafka_messa
}
std::vector<std::string> matching_headers;
for (std::size_t header_idx = 0;; ++header_idx) {
- const char* value = nullptr; // Not to be freed
+ const char *value = nullptr; // Not to be freed
Review Comment:
```suggestion
const char* value = nullptr; // Not to be freed
```
##########
utils/include/utils/expected.h:
##########
@@ -229,3 +238,34 @@ auto try_expression(F&& action, Args&&... args) noexcept {
}
} // namespace org::apache::nifi::minifi::utils
+
+#ifndef WIN32 // on windows this conflicts because nonstd::expected ===
std::expected
+// based on fmt::formatter<std::expected<T, E>, Char
Review Comment:
```suggestion
// based on fmt::formatter<std::expected<T, E>, Char>
```
##########
extensions/kafka/ConsumeKafka.cpp:
##########
@@ -160,30 +161,31 @@ void
ConsumeKafka::configure_new_connection(core::ProcessContext& context) {
// If they are not set, offset reset is ignored and polling produces messages
// Registering a rebalance_cb turns off librdkafka's automatic partition
assignment/revocation and instead delegates that
// responsibility to the application's rebalance_cb.
- rd_kafka_conf_set_rebalance_cb(conf_.get(), rebalance_cb);
+ if (commit_policy_ !=
consume_kafka::CommitPolicyEnum::CommitFromIncomingFlowFiles) {
+ rd_kafka_conf_set_rebalance_cb(conf_.get(), rebalance_cb);
+ }
// Uncomment this for librdkafka debug logs:
// logger_->log_info("Enabling all debug logs for kafka consumer.");
// setKafkaConfigurationField(*conf_, "debug", "all");
setKafkaAuthenticationParameters(context, gsl::make_not_null(conf_.get()));
- setKafkaConfigurationField(*conf_, "bootstrap.servers", kafka_brokers_);
+ setKafkaConfigurationField(*conf_, "bootstrap.servers",
utils::parseProperty(context, KafkaBrokers));
setKafkaConfigurationField(*conf_, "allow.auto.create.topics", "true");
- setKafkaConfigurationField(*conf_, "auto.offset.reset", offset_reset_);
+ setKafkaConfigurationField(*conf_,
+ "auto.offset.reset",
+
std::string(magic_enum::enum_name(utils::parseEnumProperty<consume_kafka::OffsetResetPolicyEnum>(context,
OffsetReset))));
setKafkaConfigurationField(*conf_, "enable.auto.commit", "false");
- setKafkaConfigurationField(*conf_, "enable.auto.offset.store", "false");
- setKafkaConfigurationField(*conf_, "isolation.level", honor_transactions_ ?
"read_committed" : "read_uncommitted");
- setKafkaConfigurationField(*conf_, "group.id", group_id_);
- setKafkaConfigurationField(*conf_, "session.timeout.ms",
std::to_string(session_timeout_milliseconds_.count()));
+ setKafkaConfigurationField(*conf_, "enable.auto.offset.store",
std::to_string(commit_policy_ == consume_kafka::CommitPolicyEnum::AutoCommit));
Review Comment:
Shouldn't we enable `enable.auto.commit` when `commit_policy_ == AutoCommit`?
##########
extensions/kafka/ConsumeKafka.cpp:
##########
@@ -201,32 +203,33 @@ void
ConsumeKafka::configure_new_connection(core::ProcessContext& context) {
//
// As far as I understand, instead of rd_kafka_position() an
rd_kafka_committed() call if preferred here,
// as it properly fetches offsets from the broker
- if (RD_KAFKA_RESP_ERR_NO_ERROR != rd_kafka_committed(consumer_.get(),
kf_topic_partition_list_.get(), METADATA_COMMUNICATIONS_TIMEOUT_MS)) {
- logger_->log_error("Retrieving committed offsets for topics+partitions
failed.");
+ if (const auto retrieved_committed = rd_kafka_committed(consumer_.get(),
kf_topic_partition_list_.get(), METADATA_COMMUNICATIONS_TIMEOUT_MS);
RD_KAFKA_RESP_ERR_NO_ERROR != retrieved_committed) {
+ logger_->log_error("Retrieving committed offsets for topics+partitions
failed {}: {}",
+ magic_enum::enum_underlying(retrieved_committed),
+ rd_kafka_err2str(retrieved_committed));
}
- rd_kafka_resp_err_t poll_set_consumer_response =
rd_kafka_poll_set_consumer(consumer_.get());
- if (RD_KAFKA_RESP_ERR_NO_ERROR != poll_set_consumer_response) {
+ if (rd_kafka_resp_err_t poll_set_consumer_response =
rd_kafka_poll_set_consumer(consumer_.get()); RD_KAFKA_RESP_ERR_NO_ERROR !=
poll_set_consumer_response) {
logger_->log_error("rd_kafka_poll_set_consumer error {}: {}",
magic_enum::enum_underlying(poll_set_consumer_response),
rd_kafka_err2str(poll_set_consumer_response));
}
}
-std::string ConsumeKafka::extract_message(const rd_kafka_message_t& rkmessage)
{
+std::string ConsumeKafka::extractMessage(const rd_kafka_message_t& rkmessage) {
if (RD_KAFKA_RESP_ERR_NO_ERROR != rkmessage.err) {
throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION,
"ConsumeKafka: received error message from broker: " +
std::to_string(rkmessage.err) + " " + rd_kafka_err2str(rkmessage.err));
}
- return {static_cast<char*>(rkmessage.payload), rkmessage.len};
+ return {static_cast<char *>(rkmessage.payload), rkmessage.len};
Review Comment:
```suggestion
return {static_cast<char*>(rkmessage.payload), rkmessage.len};
```
##########
extensions/kafka/ConsumeKafka.cpp:
##########
@@ -274,97 +270,165 @@ std::vector<std::string>
ConsumeKafka::get_matching_headers(const rd_kafka_messa
}
std::vector<std::string> matching_headers;
for (std::size_t header_idx = 0;; ++header_idx) {
- const char* value = nullptr; // Not to be freed
+ const char *value = nullptr; // Not to be freed
std::size_t size = 0;
if (RD_KAFKA_RESP_ERR_NO_ERROR !=
- rd_kafka_header_get(headers_raw, header_idx, header_name.c_str(),
reinterpret_cast<const void**>(&value), &size)) {
+ rd_kafka_header_get(headers_raw, header_idx, header_name.c_str(),
reinterpret_cast<const void **>(&value), &size)) {
Review Comment:
```suggestion
rd_kafka_header_get(headers_raw, header_idx, header_name.c_str(),
reinterpret_cast<const void**>(&value), &size)) {
```
##########
extensions/kafka/ConsumeKafka.cpp:
##########
@@ -274,97 +270,165 @@ std::vector<std::string>
ConsumeKafka::get_matching_headers(const rd_kafka_messa
}
std::vector<std::string> matching_headers;
for (std::size_t header_idx = 0;; ++header_idx) {
- const char* value = nullptr; // Not to be freed
+ const char *value = nullptr; // Not to be freed
std::size_t size = 0;
if (RD_KAFKA_RESP_ERR_NO_ERROR !=
- rd_kafka_header_get(headers_raw, header_idx, header_name.c_str(),
reinterpret_cast<const void**>(&value), &size)) {
+ rd_kafka_header_get(headers_raw, header_idx, header_name.c_str(),
reinterpret_cast<const void **>(&value), &size)) {
break;
}
- if (size < 200) {
- logger_->log_debug("{:.{}}", value, size);
- } else {
- logger_->log_debug("{:.{}}...", value, 200);
- }
matching_headers.emplace_back(value, size);
}
return matching_headers;
}
-std::vector<std::pair<std::string, std::string>>
ConsumeKafka::get_flowfile_attributes_from_message_header(const
rd_kafka_message_t& message) const {
+std::vector<std::pair<std::string, std::string>>
ConsumeKafka::getFlowFilesAttributesFromMessageHeaders(const
rd_kafka_message_t& message) const {
+ gsl_Assert(headers_to_add_as_attributes_);
std::vector<std::pair<std::string, std::string>> attributes_from_headers;
- for (const std::string& header_name: headers_to_add_as_attributes_) {
+ for (const std::string& header_name: *headers_to_add_as_attributes_) {
const std::vector<std::string> matching_headers =
get_matching_headers(message, header_name);
if (!matching_headers.empty()) {
attributes_from_headers.emplace_back(header_name,
-
utils::get_encoded_string(resolve_duplicate_headers(matching_headers),
message_header_encoding_attr_to_enum()));
+
utils::get_encoded_string(resolve_duplicate_headers(matching_headers),
message_header_encoding_));
}
}
return attributes_from_headers;
}
-void ConsumeKafka::add_kafka_attributes_to_flowfile(core::FlowFile& flow_file,
const rd_kafka_message_t& message) const {
- // We do not currently support batching messages into a single flowfile
- flow_file.setAttribute(KAFKA_COUNT_ATTR, "1");
- if (const auto message_key = utils::get_encoded_message_key(message,
key_attr_encoding_attr_to_enum())) {
- flow_file.setAttribute(KAFKA_MESSAGE_KEY_ATTR, *message_key);
+void ConsumeKafka::addAttributesToSingleMessageFlowFile(core::FlowFile&
flow_file, const rd_kafka_message_t& message) const {
+ flow_file.setAttribute(KafkaCountAttribute.name, "1");
+ if (const auto message_key = get_encoded_message_key(message,
key_attribute_encoding_)) {
+ flow_file.setAttribute(KafkaKeyAttribute.name, *message_key);
+ }
+ flow_file.setAttribute(KafkaOffsetAttribute.name,
std::to_string(message.offset));
+ flow_file.setAttribute(KafkaPartitionAttribute.name,
std::to_string(message.partition));
+ flow_file.setAttribute(KafkaTopicAttribute.name,
rd_kafka_topic_name(message.rkt));
+ if (headers_to_add_as_attributes_) {
+ for (const auto &[attr_key, attr_value]:
getFlowFilesAttributesFromMessageHeaders(message)) {
flow_file.setAttribute(attr_key, attr_value); }
Review Comment:
```suggestion
for (const auto& [attr_key, attr_value]:
getFlowFilesAttributesFromMessageHeaders(message)) {
flow_file.setAttribute(attr_key, attr_value); }
```
##########
extensions/kafka/ConsumeKafka.cpp:
##########
@@ -235,36 +238,29 @@ std::vector<utils::rd_kafka_message_unique_ptr>
ConsumeKafka::poll_kafka_message
logger_->log_error("Received message with error {}: {}",
magic_enum::enum_underlying(message->err), rd_kafka_err2str(message->err));
break;
}
- utils::print_kafka_message(*message, *logger_);
- messages.emplace_back(std::move(message));
+ const std::string_view topic_name = rd_kafka_topic_name(message->rkt);
+ const int32_t partition = message->partition;
+
+ message_bundles[KafkaMessageLocation{std::string(topic_name),
partition}].pushBack(std::move(message));
elapsed = std::chrono::steady_clock::now() - start;
+ ++message_count;
}
- return messages;
-}
-
-utils::KafkaEncoding ConsumeKafka::key_attr_encoding_attr_to_enum() const {
- if (utils::string::equalsIgnoreCase(key_attribute_encoding_,
KEY_ATTR_ENCODING_UTF_8)) { return utils::KafkaEncoding::UTF8; }
- if (utils::string::equalsIgnoreCase(key_attribute_encoding_,
KEY_ATTR_ENCODING_HEX)) { return utils::KafkaEncoding::HEX; }
- throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "\"Key Attribute
Encoding\" property not recognized.");
-}
-
-utils::KafkaEncoding ConsumeKafka::message_header_encoding_attr_to_enum()
const {
- if (utils::string::equalsIgnoreCase(message_header_encoding_,
MSG_HEADER_ENCODING_UTF_8)) { return utils::KafkaEncoding::UTF8; }
- if (utils::string::equalsIgnoreCase(message_header_encoding_,
MSG_HEADER_ENCODING_HEX)) { return utils::KafkaEncoding::HEX; }
- throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "\"Message
Header Encoding\" property not recognized.");
+ return message_bundles;
}
std::string ConsumeKafka::resolve_duplicate_headers(const
std::vector<std::string>& matching_headers) const {
- if (MSG_HEADER_KEEP_FIRST == duplicate_header_handling_) { return
matching_headers.front(); }
- if (MSG_HEADER_KEEP_LATEST == duplicate_header_handling_) { return
matching_headers.back(); }
- if (MSG_HEADER_COMMA_SEPARATED_MERGE == duplicate_header_handling_) { return
utils::string::join(", ", matching_headers); }
- throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "\"Duplicate
Header Handling\" property not recognized.");
+ switch (duplicate_header_handling_) {
+ case consume_kafka::MessageHeaderPolicyEnum::KEEP_FIRST: return
matching_headers.front();
+ case consume_kafka::MessageHeaderPolicyEnum::KEEP_LATEST: return
matching_headers.back();
+ case consume_kafka::MessageHeaderPolicyEnum::COMMA_SEPARATED_MERGE: return
utils::string::join(", ", matching_headers);
+ default: throw Exception(PROCESSOR_EXCEPTION, "\"Duplicate Header
Handling\" property not recognized.");
+ }
}
std::vector<std::string> ConsumeKafka::get_matching_headers(const
rd_kafka_message_t& message, const std::string& header_name) const {
// Headers fetched this way are freed when rd_kafka_message_destroy is called
// Detaching them using rd_kafka_message_detach_headers does not seem to work
- rd_kafka_headers_t* headers_raw = nullptr;
+ rd_kafka_headers_t *headers_raw = nullptr;
Review Comment:
```suggestion
rd_kafka_headers_t* headers_raw = nullptr;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]