Copilot commented on code in PR #2175:
URL: https://github.com/apache/nifi-minifi-cpp/pull/2175#discussion_r3242728654


##########
libminifi/src/minifi-c.cpp:
##########
@@ -613,19 +613,21 @@ void 
MinifiProcessContextGetDynamicProperties(MinifiProcessContext* context, Min
   }
 }
 
-MinifiStatus MinifiProcessContextGetSslData(MinifiProcessContext* 
process_context, MinifiStringView controller_service_name,
+MinifiStatus MinifiProcessContextGetSslDataFromProperty(MinifiProcessContext* 
process_context, MinifiStringView property_name,
     void (*cb)(void* user_ctx, const MinifiSslData* ssl_data), void* user_ctx) 
{
   gsl_Assert(process_context != MINIFI_NULL);
   try {
     const auto context = 
reinterpret_cast<minifi::core::ProcessContext*>(process_context);
-    const auto name_str = std::string{toStringView(controller_service_name)};
-    const auto service_shared_ptr = context->getControllerService(name_str, 
context->getProcessorInfo().getUUID());
+    const auto property_name_str = std::string{toStringView(property_name)};
+    const auto name_str = context->getProperty(property_name_str, nullptr);
+    if (!name_str) { return MINIFI_STATUS_PROPERTY_NOT_SET; }
+    const auto service_shared_ptr = context->getControllerService(*name_str, 
context->getProcessorInfo().getUUID());
     if (!service_shared_ptr) { return MINIFI_STATUS_VALIDATION_FAILED; }
     if (const auto ssl_context_service = 
dynamic_cast<minifi::controllers::SSLContextServiceInterface*>(service_shared_ptr.get()))
 {
-      const std::string ca_cert_file = 
ssl_context_service->getCACertificate().string();
-      const std::string passphrase = ssl_context_service->getPassphrase();
-      const std::string cert_file = 
ssl_context_service->getCertificateFile().string();
-      const std::string private_key_file = 
ssl_context_service->getPrivateKeyFile().string();
+    const std::string ca_cert_file = 
ssl_context_service->getCACertificate().string();
+    const std::string passphrase = ssl_context_service->getPassphrase();
+    const std::string cert_file = 
ssl_context_service->getCertificateFile().string();
+    const std::string private_key_file = 
ssl_context_service->getPrivateKeyFile().string();

Review Comment:
   The body of the `if (ssl_context_service)` block is not indented, which 
makes the control flow harder to read and looks like an accidental formatting 
regression. Please indent the statements inside the block consistently with the 
surrounding style.
   



##########
extensions/kafka/rdkafka_utils.cpp:
##########
@@ -20,6 +20,7 @@
 #include <array>
 
 #include "minifi-cpp/Exception.h"
+#include "minifi-cpp/core/logging/Logger.h"
 #include "utils/StringUtils.h"
 

Review Comment:
   This file uses `std::vector` but doesn't include `<vector>` directly (it 
currently relies on transitive includes from other headers). Please add an 
explicit `#include <vector>` here to avoid fragile build dependencies.



##########
extensions/kafka/ConsumeKafka.cpp:
##########
@@ -41,33 +38,27 @@ struct 
std::hash<org::apache::nifi::minifi::processors::ConsumeKafka::KafkaMessa
 };
 
 namespace org::apache::nifi::minifi::processors {
-// The upper limit for Max Poll Time is 4 seconds. This is because Watchdog 
would potentially start
-// reporting issues with the processor health otherwise
-bool consume_kafka::ConsumeKafkaMaxPollTimePropertyValidator::validate(const 
std::string_view input) const {
-  const auto parsed_time = 
parsing::parseDurationMinMax<std::chrono::nanoseconds>(input, 0ms, 4s);
-  return parsed_time.has_value();
-}
-
-void ConsumeKafka::initialize() {
-  setSupportedProperties(Properties);
-  setSupportedRelationships(Relationships);
-}
 
-void ConsumeKafka::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory&) {
+MinifiStatus ConsumeKafka::onScheduleImpl(api::core::ProcessContext& context) {
+  using utils::KafkaEncoding;
+  namespace utils = api::utils;
   // Required properties
-  topic_names_ = utils::string::splitAndTrim(utils::parseProperty(context, 
TopicNames), ",");
+  topic_names_ = 
minifi::utils::string::splitAndTrim(utils::parseProperty(context, TopicNames), 
",");
   topic_name_format_ = 
utils::parseEnumProperty<consume_kafka::TopicNameFormatEnum>(context, 
TopicNameFormat);
   commit_policy_ = 
utils::parseEnumProperty<consume_kafka::CommitPolicyEnum>(context, 
CommitPolicy);
-  key_attribute_encoding_ = 
utils::parseEnumProperty<utils::KafkaEncoding>(context, KeyAttributeEncoding);
-  message_header_encoding_ = 
utils::parseEnumProperty<utils::KafkaEncoding>(context, MessageHeaderEncoding);
+  key_attribute_encoding_ = utils::parseEnumProperty<KafkaEncoding>(context, 
KeyAttributeEncoding);
+  message_header_encoding_ = utils::parseEnumProperty<KafkaEncoding>(context, 
MessageHeaderEncoding);
   duplicate_header_handling_ = 
utils::parseEnumProperty<consume_kafka::MessageHeaderPolicyEnum>(context, 
DuplicateHeaderHandling);
   max_poll_time_milliseconds_ = utils::parseDurationProperty(context, 
MaxPollTime);
+  if (max_poll_time_milliseconds_ > 4s) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "MaxPollTime is too large (it 
should be less than 4s)");

Review Comment:
   The schedule exception text says MaxPollTime "should be less than 4s", but 
the check allows exactly 4s (`> 4s`). Please update the error message to match 
the actual constraint (e.g., "must be <= 4s" / "must not exceed 4s").
   



##########
extensions/kafka/PublishKafka.cpp:
##########
@@ -158,12 +159,13 @@ class ReadCallback {
     });
   }
 
-  static utils::rd_kafka_headers_unique_ptr make_headers(const core::FlowFile& 
flow_file,
+  static utils::rd_kafka_headers_unique_ptr make_headers(const 
api::core::FlowFile& flow_file,
+      const api::core::ProcessSession& session,
       const std::optional<utils::Regex>& attribute_name_regex) {
     utils::rd_kafka_headers_unique_ptr result{rd_kafka_headers_new(8)};
     if (!result) { throw std::bad_alloc{}; }
 
-    for (const auto& [attribute_key, attribute_value]: 
flow_file.getAttributes()) {
+    for (const auto& [attribute_key, attribute_value]: 
session.getAttributes(flow_file)) {

Review Comment:
   `session.getAttributes(flow_file)` returns a `std::map` by value, so this 
loop copies all attributes just to build headers. This can be a significant 
overhead for large FlowFiles / many attributes. Consider adding/using an 
attribute-iteration API (or another zero-copy view) so headers can be built 
without copying the entire attribute map.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to