Copilot commented on code in PR #2175:
URL: https://github.com/apache/nifi-minifi-cpp/pull/2175#discussion_r3242728654
##########
libminifi/src/minifi-c.cpp:
##########
@@ -613,19 +613,21 @@ void
MinifiProcessContextGetDynamicProperties(MinifiProcessContext* context, Min
}
}
-MinifiStatus MinifiProcessContextGetSslData(MinifiProcessContext*
process_context, MinifiStringView controller_service_name,
+MinifiStatus MinifiProcessContextGetSslDataFromProperty(MinifiProcessContext*
process_context, MinifiStringView property_name,
void (*cb)(void* user_ctx, const MinifiSslData* ssl_data), void* user_ctx)
{
gsl_Assert(process_context != MINIFI_NULL);
try {
const auto context =
reinterpret_cast<minifi::core::ProcessContext*>(process_context);
- const auto name_str = std::string{toStringView(controller_service_name)};
- const auto service_shared_ptr = context->getControllerService(name_str,
context->getProcessorInfo().getUUID());
+ const auto property_name_str = std::string{toStringView(property_name)};
+ const auto name_str = context->getProperty(property_name_str, nullptr);
+ if (!name_str) { return MINIFI_STATUS_PROPERTY_NOT_SET; }
+ const auto service_shared_ptr = context->getControllerService(*name_str,
context->getProcessorInfo().getUUID());
if (!service_shared_ptr) { return MINIFI_STATUS_VALIDATION_FAILED; }
if (const auto ssl_context_service =
dynamic_cast<minifi::controllers::SSLContextServiceInterface*>(service_shared_ptr.get()))
{
- const std::string ca_cert_file =
ssl_context_service->getCACertificate().string();
- const std::string passphrase = ssl_context_service->getPassphrase();
- const std::string cert_file =
ssl_context_service->getCertificateFile().string();
- const std::string private_key_file =
ssl_context_service->getPrivateKeyFile().string();
+ const std::string ca_cert_file =
ssl_context_service->getCACertificate().string();
+ const std::string passphrase = ssl_context_service->getPassphrase();
+ const std::string cert_file =
ssl_context_service->getCertificateFile().string();
+ const std::string private_key_file =
ssl_context_service->getPrivateKeyFile().string();
Review Comment:
The body of the `if (const auto ssl_context_service = dynamic_cast<...>(...))`
block is not indented, which makes the control flow harder to read and looks
like an accidental formatting regression. Please indent the statements inside
the block consistently with the surrounding style.
##########
extensions/kafka/rdkafka_utils.cpp:
##########
@@ -20,6 +20,7 @@
#include <array>
#include "minifi-cpp/Exception.h"
+#include "minifi-cpp/core/logging/Logger.h"
#include "utils/StringUtils.h"
Review Comment:
This file uses `std::vector` but doesn't include `<vector>` directly (it
currently relies on transitive includes from other headers). Please add an
explicit `#include <vector>` here to avoid fragile build dependencies.
##########
extensions/kafka/ConsumeKafka.cpp:
##########
@@ -41,33 +38,27 @@ struct
std::hash<org::apache::nifi::minifi::processors::ConsumeKafka::KafkaMessa
};
namespace org::apache::nifi::minifi::processors {
-// The upper limit for Max Poll Time is 4 seconds. This is because Watchdog
would potentially start
-// reporting issues with the processor health otherwise
-bool consume_kafka::ConsumeKafkaMaxPollTimePropertyValidator::validate(const
std::string_view input) const {
- const auto parsed_time =
parsing::parseDurationMinMax<std::chrono::nanoseconds>(input, 0ms, 4s);
- return parsed_time.has_value();
-}
-
-void ConsumeKafka::initialize() {
- setSupportedProperties(Properties);
- setSupportedRelationships(Relationships);
-}
-void ConsumeKafka::onSchedule(core::ProcessContext& context,
core::ProcessSessionFactory&) {
+MinifiStatus ConsumeKafka::onScheduleImpl(api::core::ProcessContext& context) {
+ using utils::KafkaEncoding;
+ namespace utils = api::utils;
// Required properties
- topic_names_ = utils::string::splitAndTrim(utils::parseProperty(context,
TopicNames), ",");
+ topic_names_ =
minifi::utils::string::splitAndTrim(utils::parseProperty(context, TopicNames),
",");
topic_name_format_ =
utils::parseEnumProperty<consume_kafka::TopicNameFormatEnum>(context,
TopicNameFormat);
commit_policy_ =
utils::parseEnumProperty<consume_kafka::CommitPolicyEnum>(context,
CommitPolicy);
- key_attribute_encoding_ =
utils::parseEnumProperty<utils::KafkaEncoding>(context, KeyAttributeEncoding);
- message_header_encoding_ =
utils::parseEnumProperty<utils::KafkaEncoding>(context, MessageHeaderEncoding);
+ key_attribute_encoding_ = utils::parseEnumProperty<KafkaEncoding>(context,
KeyAttributeEncoding);
+ message_header_encoding_ = utils::parseEnumProperty<KafkaEncoding>(context,
MessageHeaderEncoding);
duplicate_header_handling_ =
utils::parseEnumProperty<consume_kafka::MessageHeaderPolicyEnum>(context,
DuplicateHeaderHandling);
max_poll_time_milliseconds_ = utils::parseDurationProperty(context,
MaxPollTime);
+ if (max_poll_time_milliseconds_ > 4s) {
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "MaxPollTime is too large (it
should be less than 4s)");
Review Comment:
The schedule exception text says MaxPollTime "should be less than 4s", but
the check (`> 4s`) only rejects values greater than 4s, so exactly 4s is
accepted. Please update the error message to match the actual constraint
(e.g., "must be <= 4s" / "must not exceed 4s").
##########
extensions/kafka/PublishKafka.cpp:
##########
@@ -158,12 +159,13 @@ class ReadCallback {
});
}
- static utils::rd_kafka_headers_unique_ptr make_headers(const core::FlowFile&
flow_file,
+ static utils::rd_kafka_headers_unique_ptr make_headers(const
api::core::FlowFile& flow_file,
+ const api::core::ProcessSession& session,
const std::optional<utils::Regex>& attribute_name_regex) {
utils::rd_kafka_headers_unique_ptr result{rd_kafka_headers_new(8)};
if (!result) { throw std::bad_alloc{}; }
- for (const auto& [attribute_key, attribute_value]:
flow_file.getAttributes()) {
+ for (const auto& [attribute_key, attribute_value]:
session.getAttributes(flow_file)) {
Review Comment:
`session.getAttributes(flow_file)` returns a `std::map` by value, so this
loop copies all attributes just to build headers. This can be a significant
overhead for large FlowFiles / many attributes. Consider adding/using an
attribute-iteration API (or another zero-copy view) so headers can be built
without copying the entire attribute map.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]