Copilot commented on code in PR #2175:
URL: https://github.com/apache/nifi-minifi-cpp/pull/2175#discussion_r3241127224


##########
extensions/kafka/KafkaProcessorBase.h:
##########
@@ -35,12 +35,12 @@ enum class SecurityProtocolOption { plaintext, ssl, 
sasl_plaintext, sasl_ssl };
 enum class SASLMechanismOption { GSSAPI, PLAIN };
 }  // namespace kafka
 
-class KafkaProcessorBase : public core::ProcessorImpl {
+class KafkaProcessorBase : public api::core::ProcessorImpl {
  public:
   EXTENSIONAPI static constexpr auto SSLContextService =
       core::PropertyDefinitionBuilder<>::createProperty("SSL Context Service")
           .withDescription("SSL Context Service Name")
-          .withAllowedTypes<minifi::controllers::SSLContextServiceInterface>()
+          .withAllowedType<MINIFI_SSL_CONTEXT_SERVICE_PROPERTY_TYPE>()
           .build();

Review Comment:
   `MINIFI_SSL_CONTEXT_SERVICE_PROPERTY_TYPE` is referenced here but this 
header does not include `minifi-c/minifi-c.h` (where the macro is defined). 
This makes compilation depend on transitive include order and will fail in 
translation units that include this header first. Include the defining header 
(or replace the macro with a C++ constant/string literal in a dedicated header).



##########
extensions/kafka/KafkaProcessorBase.cpp:
##########
@@ -16,18 +16,25 @@
  */
 #include "KafkaProcessorBase.h"
 
-#include "minifi-cpp/controllers/SSLContextServiceInterface.h"
+#include "api/utils/ProcessorConfigUtils.h"
 #include "rdkafka_utils.h"
-#include "utils/ProcessorConfigUtils.h"
 
 namespace org::apache::nifi::minifi::processors {
 
-std::optional<utils::net::SslData> 
KafkaProcessorBase::getSslData(core::ProcessContext& context) const {
-  return utils::net::getSslData(context, SSLContextService, logger_);
+KafkaProcessorBase::KafkaProcessorBase(core::ProcessorMetadata metadata) : 
ProcessorImpl(std::move(metadata)), kafka_opaque_(*logger_) {
 }
 
-void 
KafkaProcessorBase::setKafkaAuthenticationParameters(core::ProcessContext& 
context, gsl::not_null<rd_kafka_conf_t*> config) {
-  security_protocol_ = 
utils::parseEnumProperty<kafka::SecurityProtocolOption>(context, 
SecurityProtocol);
+std::optional<api::utils::net::SslData> 
KafkaProcessorBase::getSslData(api::core::ProcessContext& context) const {
+  const auto controller_service_name = 
api::utils::parseOptionalProperty(context, SSLContextService);
+  if (!controller_service_name) {
+    return std::nullopt;
+  }
+
+  return context.getSslData(*controller_service_name) | utils::toOptional();

Review Comment:
   `CffiProcessContext::getSslData` now calls 
`MinifiProcessContextGetSslDataFromProperty`, which expects a *property name*; 
however this code parses the SSL Context Service property value and passes that 
*controller service name* to `context.getSslData(...)`. This will cause the C 
API call to look up a property named like the controller service id and 
typically return PROPERTY_NOT_SET, breaking SSL configuration for Kafka 
processors. Pass the property name (e.g. `SSLContextService.name`) to 
`getSslData` or adjust the API to accept controller service names again.
   



##########
extensions/kafka/PublishKafka.cpp:
##########
@@ -386,147 +383,147 @@ void PublishKafka::notifyStop() {
   conn_.reset();
 }
 
-bool PublishKafka::configureNewConnection(core::ProcessContext& context) {
+void PublishKafka::configureNewConnection(api::core::ProcessContext& context) {
   std::array<char, 512U> err_chars{};
   rd_kafka_conf_res_t result = RD_KAFKA_CONF_OK;
   constexpr std::string_view PREFIX_ERROR_MSG = "PublishKafka: configure error 
result: ";
 
-  utils::rd_kafka_conf_unique_ptr conf_{rd_kafka_conf_new()};
-  if (conf_ == nullptr) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed 
to create rd_kafka_conf_t object"); }
+  utils::rd_kafka_conf_unique_ptr conf{rd_kafka_conf_new()};
+  if (conf == nullptr) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed 
to create rd_kafka_conf_t object"); }
+
+  rd_kafka_conf_set_opaque(conf.get(), &kafka_opaque_);
 
   const auto* const key = conn_->getKey();
 
   if (key->brokers_.empty()) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, 
"There are no brokers"); }
-  result = rd_kafka_conf_set(conf_.get(), "bootstrap.servers", 
key->brokers_.c_str(), err_chars.data(), err_chars.size());
+  result = rd_kafka_conf_set(conf.get(), "bootstrap.servers", 
key->brokers_.c_str(), err_chars.data(), err_chars.size());
   logger_->log_debug("PublishKafka: bootstrap.servers [{}]", key->brokers_);
   if (result != RD_KAFKA_CONF_OK) {
     auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, 
err_chars.data());
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
   }
 
   if (key->client_id_.empty()) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, 
"Client id is empty"); }
-  result = rd_kafka_conf_set(conf_.get(), "client.id", 
key->client_id_.c_str(), err_chars.data(), err_chars.size());
+  result = rd_kafka_conf_set(conf.get(), "client.id", key->client_id_.c_str(), 
err_chars.data(), err_chars.size());
   logger_->log_debug("PublishKafka: client.id [{}]", key->client_id_);
   if (result != RD_KAFKA_CONF_OK) {
     auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, 
err_chars.data());
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
   }
 
-  if (const auto debug_context = context.getProperty(DebugContexts)) {
-    result = rd_kafka_conf_set(conf_.get(), "debug", debug_context->c_str(), 
err_chars.data(), err_chars.size());
+  if (const auto debug_context = context.getProperty(DebugContexts, nullptr)) {
+    result = rd_kafka_conf_set(conf.get(), "debug", debug_context->c_str(), 
err_chars.data(), err_chars.size());
     logger_->log_debug("PublishKafka: debug [{}]", *debug_context);
     if (result != RD_KAFKA_CONF_OK) {
       auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, 
err_chars.data());
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
 
-  if (const auto max_message_size = context.getProperty(MaxMessageSize); 
max_message_size && !max_message_size->empty()) {
-    result = rd_kafka_conf_set(conf_.get(), "message.max.bytes", 
max_message_size->c_str(), err_chars.data(), err_chars.size());
+  if (const auto max_message_size = context.getProperty(MaxMessageSize, 
nullptr); max_message_size && !max_message_size->empty()) {
+    result = rd_kafka_conf_set(conf.get(), "message.max.bytes", 
max_message_size->c_str(), err_chars.data(), err_chars.size());
     logger_->log_debug("PublishKafka: message.max.bytes [{}]", 
*max_message_size);
     if (result != RD_KAFKA_CONF_OK) {
       auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, 
err_chars.data());
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
 
-  if (const auto queue_buffer_max_message = 
utils::parseOptionalU64Property(context, QueueBufferMaxMessage)) {
+  if (const auto queue_buffer_max_message = 
api::utils::parseOptionalU64Property(context, QueueBufferMaxMessage)) {
     if (*queue_buffer_max_message < batch_size_) {
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid configuration: 
Batch Size cannot be larger than Queue Max Message");
     }
 
     const auto value = std::to_string(*queue_buffer_max_message);
-    result = rd_kafka_conf_set(conf_.get(), "queue.buffering.max.messages", 
value.c_str(), err_chars.data(), err_chars.size());
+    result = rd_kafka_conf_set(conf.get(), "queue.buffering.max.messages", 
value.c_str(), err_chars.data(), err_chars.size());
     logger_->log_debug("PublishKafka: queue.buffering.max.messages [{}]", 
value);
     if (result != RD_KAFKA_CONF_OK) {
       auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, 
err_chars.data());
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
 
-  if (const auto queue_buffer_max_size = 
utils::parseOptionalDataSizeProperty(context, QueueBufferMaxSize)) {
+  if (const auto queue_buffer_max_size = 
api::utils::parseOptionalDataSizeProperty(context, QueueBufferMaxSize)) {
     auto valInt = *queue_buffer_max_size / 1024;
     auto valueConf = std::to_string(valInt);
-    result = rd_kafka_conf_set(conf_.get(), "queue.buffering.max.kbytes", 
valueConf.c_str(), err_chars.data(), err_chars.size());
+    result = rd_kafka_conf_set(conf.get(), "queue.buffering.max.kbytes", 
valueConf.c_str(), err_chars.data(), err_chars.size());
     logger_->log_debug("PublishKafka: queue.buffering.max.kbytes [{}]", 
valueConf);
     if (result != RD_KAFKA_CONF_OK) {
       auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, 
err_chars.data());
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
 
-  if (const auto queue_buffer_max_time = 
utils::parseOptionalDurationProperty(context, QueueBufferMaxTime)) {
+  if (const auto queue_buffer_max_time = 
api::utils::parseOptionalDurationProperty(context, QueueBufferMaxTime)) {
     auto valueConf = std::to_string(queue_buffer_max_time->count());
-    result = rd_kafka_conf_set(conf_.get(), "queue.buffering.max.ms", 
valueConf.c_str(), err_chars.data(), err_chars.size());
+    result = rd_kafka_conf_set(conf.get(), "queue.buffering.max.ms", 
valueConf.c_str(), err_chars.data(), err_chars.size());
     logger_->log_debug("PublishKafka: queue.buffering.max.ms [{}]", valueConf);
     if (result != RD_KAFKA_CONF_OK) {
       auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, 
err_chars.data());
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
 
-  if (const auto batch_size = utils::parseOptionalU64Property(context, 
BatchSize)) {
+  if (const auto batch_size = api::utils::parseOptionalU64Property(context, 
BatchSize)) {
     auto value = std::to_string(*batch_size);
-    result = rd_kafka_conf_set(conf_.get(), "batch.num.messages", 
value.c_str(), err_chars.data(), err_chars.size());
+    result = rd_kafka_conf_set(conf.get(), "batch.num.messages", 
value.c_str(), err_chars.data(), err_chars.size());
     logger_->log_debug("PublishKafka: batch.num.messages [{}]", value);
     if (result != RD_KAFKA_CONF_OK) {
       auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, 
err_chars.data());
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
 
-  if (const auto compress_codec = context.getProperty(CompressCodec); 
compress_codec && !compress_codec->empty() && *compress_codec != "none") {
-    result = rd_kafka_conf_set(conf_.get(), "compression.codec", 
compress_codec->c_str(), err_chars.data(), err_chars.size());
+  if (const auto compress_codec = context.getProperty(CompressCodec, nullptr); 
compress_codec && !compress_codec->empty() && *compress_codec != "none") {
+    result = rd_kafka_conf_set(conf.get(), "compression.codec", 
compress_codec->c_str(), err_chars.data(), err_chars.size());
     logger_->log_debug("PublishKafka: compression.codec [{}]", 
*compress_codec);
     if (result != RD_KAFKA_CONF_OK) {
       auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, 
err_chars.data());
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
 
-  setKafkaAuthenticationParameters(context, gsl::make_not_null(conf_.get()));
+  setKafkaAuthenticationParameters(context, gsl::make_not_null(conf.get()));
 
   // Add all the dynamic properties as librdkafka configurations
-  const auto& dynamic_prop_keys = context.getDynamicPropertyKeys();
-  logger_->log_info("PublishKafka registering {} librdkafka dynamic 
properties", dynamic_prop_keys.size());
-
-  for (const auto& prop_key : dynamic_prop_keys) {
-    if (const auto dynamic_property_value = 
context.getDynamicProperty(prop_key, nullptr); dynamic_property_value && 
!dynamic_property_value->empty()) {
-      logger_->log_debug("PublishKafka: DynamicProperty: [{}] -> [{}]", 
prop_key, *dynamic_property_value);
-      result = rd_kafka_conf_set(conf_.get(), prop_key.c_str(), 
dynamic_property_value->c_str(), err_chars.data(), err_chars.size());
-      if (result != RD_KAFKA_CONF_OK) {
-        auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, 
err_chars.data());
-        throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
-      }
-    } else {
+
+  std::optional<std::string> error_during_on_dyn_props = std::nullopt;
+  for (auto [prop_key, prop_value] : context.getDynamicProperties(nullptr)) {
+    if (prop_value.empty()) {
       logger_->log_warn(
-          "PublishKafka Dynamic Property '{}' is empty and therefore will not "
-          "be configured",
+          "PublishKafka Dynamic Property '{}' is empty and therefore will not 
be configured",
           prop_key);
+      return;

Review Comment:
   This branch returns from `configureNewConnection` when encountering an empty 
dynamic property value. Since `configureNewConnection` is responsible for 
finishing producer configuration, returning here silently leaves the connection 
unconfigured and will cause later failures that are harder to diagnose. 
Consider either skipping empty dynamic properties (continue) or throwing a 
schedule exception so the processor fails fast with a clear reason.
   



##########
extensions/kafka/PublishKafka.cpp:
##########
@@ -386,147 +383,147 @@ void PublishKafka::notifyStop() {
   conn_.reset();
 }
 
-bool PublishKafka::configureNewConnection(core::ProcessContext& context) {
+void PublishKafka::configureNewConnection(api::core::ProcessContext& context) {
   std::array<char, 512U> err_chars{};
   rd_kafka_conf_res_t result = RD_KAFKA_CONF_OK;
   constexpr std::string_view PREFIX_ERROR_MSG = "PublishKafka: configure error 
result: ";
 
-  utils::rd_kafka_conf_unique_ptr conf_{rd_kafka_conf_new()};
-  if (conf_ == nullptr) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed 
to create rd_kafka_conf_t object"); }
+  utils::rd_kafka_conf_unique_ptr conf{rd_kafka_conf_new()};
+  if (conf == nullptr) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed 
to create rd_kafka_conf_t object"); }
+
+  rd_kafka_conf_set_opaque(conf.get(), &kafka_opaque_);
 
   const auto* const key = conn_->getKey();
 
   if (key->brokers_.empty()) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, 
"There are no brokers"); }
-  result = rd_kafka_conf_set(conf_.get(), "bootstrap.servers", 
key->brokers_.c_str(), err_chars.data(), err_chars.size());
+  result = rd_kafka_conf_set(conf.get(), "bootstrap.servers", 
key->brokers_.c_str(), err_chars.data(), err_chars.size());
   logger_->log_debug("PublishKafka: bootstrap.servers [{}]", key->brokers_);
   if (result != RD_KAFKA_CONF_OK) {
     auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, 
err_chars.data());
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
   }
 
   if (key->client_id_.empty()) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, 
"Client id is empty"); }
-  result = rd_kafka_conf_set(conf_.get(), "client.id", 
key->client_id_.c_str(), err_chars.data(), err_chars.size());
+  result = rd_kafka_conf_set(conf.get(), "client.id", key->client_id_.c_str(), 
err_chars.data(), err_chars.size());
   logger_->log_debug("PublishKafka: client.id [{}]", key->client_id_);
   if (result != RD_KAFKA_CONF_OK) {
     auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, 
err_chars.data());
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
   }
 
-  if (const auto debug_context = context.getProperty(DebugContexts)) {
-    result = rd_kafka_conf_set(conf_.get(), "debug", debug_context->c_str(), 
err_chars.data(), err_chars.size());
+  if (const auto debug_context = context.getProperty(DebugContexts, nullptr)) {
+    result = rd_kafka_conf_set(conf.get(), "debug", debug_context->c_str(), 
err_chars.data(), err_chars.size());
     logger_->log_debug("PublishKafka: debug [{}]", *debug_context);
     if (result != RD_KAFKA_CONF_OK) {
       auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, 
err_chars.data());
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
 
-  if (const auto max_message_size = context.getProperty(MaxMessageSize); 
max_message_size && !max_message_size->empty()) {
-    result = rd_kafka_conf_set(conf_.get(), "message.max.bytes", 
max_message_size->c_str(), err_chars.data(), err_chars.size());
+  if (const auto max_message_size = context.getProperty(MaxMessageSize, 
nullptr); max_message_size && !max_message_size->empty()) {
+    result = rd_kafka_conf_set(conf.get(), "message.max.bytes", 
max_message_size->c_str(), err_chars.data(), err_chars.size());
     logger_->log_debug("PublishKafka: message.max.bytes [{}]", 
*max_message_size);
     if (result != RD_KAFKA_CONF_OK) {
       auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, 
err_chars.data());
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
 
-  if (const auto queue_buffer_max_message = 
utils::parseOptionalU64Property(context, QueueBufferMaxMessage)) {
+  if (const auto queue_buffer_max_message = 
api::utils::parseOptionalU64Property(context, QueueBufferMaxMessage)) {
     if (*queue_buffer_max_message < batch_size_) {
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid configuration: 
Batch Size cannot be larger than Queue Max Message");
     }
 
     const auto value = std::to_string(*queue_buffer_max_message);
-    result = rd_kafka_conf_set(conf_.get(), "queue.buffering.max.messages", 
value.c_str(), err_chars.data(), err_chars.size());
+    result = rd_kafka_conf_set(conf.get(), "queue.buffering.max.messages", 
value.c_str(), err_chars.data(), err_chars.size());
     logger_->log_debug("PublishKafka: queue.buffering.max.messages [{}]", 
value);
     if (result != RD_KAFKA_CONF_OK) {
       auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, 
err_chars.data());
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
 
-  if (const auto queue_buffer_max_size = 
utils::parseOptionalDataSizeProperty(context, QueueBufferMaxSize)) {
+  if (const auto queue_buffer_max_size = 
api::utils::parseOptionalDataSizeProperty(context, QueueBufferMaxSize)) {
     auto valInt = *queue_buffer_max_size / 1024;
     auto valueConf = std::to_string(valInt);
-    result = rd_kafka_conf_set(conf_.get(), "queue.buffering.max.kbytes", 
valueConf.c_str(), err_chars.data(), err_chars.size());
+    result = rd_kafka_conf_set(conf.get(), "queue.buffering.max.kbytes", 
valueConf.c_str(), err_chars.data(), err_chars.size());
     logger_->log_debug("PublishKafka: queue.buffering.max.kbytes [{}]", 
valueConf);
     if (result != RD_KAFKA_CONF_OK) {
       auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, 
err_chars.data());
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
 
-  if (const auto queue_buffer_max_time = 
utils::parseOptionalDurationProperty(context, QueueBufferMaxTime)) {
+  if (const auto queue_buffer_max_time = 
api::utils::parseOptionalDurationProperty(context, QueueBufferMaxTime)) {
     auto valueConf = std::to_string(queue_buffer_max_time->count());
-    result = rd_kafka_conf_set(conf_.get(), "queue.buffering.max.ms", 
valueConf.c_str(), err_chars.data(), err_chars.size());
+    result = rd_kafka_conf_set(conf.get(), "queue.buffering.max.ms", 
valueConf.c_str(), err_chars.data(), err_chars.size());
     logger_->log_debug("PublishKafka: queue.buffering.max.ms [{}]", valueConf);
     if (result != RD_KAFKA_CONF_OK) {
       auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, 
err_chars.data());
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
 
-  if (const auto batch_size = utils::parseOptionalU64Property(context, 
BatchSize)) {
+  if (const auto batch_size = api::utils::parseOptionalU64Property(context, 
BatchSize)) {
     auto value = std::to_string(*batch_size);
-    result = rd_kafka_conf_set(conf_.get(), "batch.num.messages", 
value.c_str(), err_chars.data(), err_chars.size());
+    result = rd_kafka_conf_set(conf.get(), "batch.num.messages", 
value.c_str(), err_chars.data(), err_chars.size());
     logger_->log_debug("PublishKafka: batch.num.messages [{}]", value);
     if (result != RD_KAFKA_CONF_OK) {
       auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, 
err_chars.data());
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
 
-  if (const auto compress_codec = context.getProperty(CompressCodec); 
compress_codec && !compress_codec->empty() && *compress_codec != "none") {
-    result = rd_kafka_conf_set(conf_.get(), "compression.codec", 
compress_codec->c_str(), err_chars.data(), err_chars.size());
+  if (const auto compress_codec = context.getProperty(CompressCodec, nullptr); 
compress_codec && !compress_codec->empty() && *compress_codec != "none") {
+    result = rd_kafka_conf_set(conf.get(), "compression.codec", 
compress_codec->c_str(), err_chars.data(), err_chars.size());
     logger_->log_debug("PublishKafka: compression.codec [{}]", 
*compress_codec);
     if (result != RD_KAFKA_CONF_OK) {
       auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, 
err_chars.data());
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
 
-  setKafkaAuthenticationParameters(context, gsl::make_not_null(conf_.get()));
+  setKafkaAuthenticationParameters(context, gsl::make_not_null(conf.get()));
 
   // Add all the dynamic properties as librdkafka configurations
-  const auto& dynamic_prop_keys = context.getDynamicPropertyKeys();
-  logger_->log_info("PublishKafka registering {} librdkafka dynamic 
properties", dynamic_prop_keys.size());
-
-  for (const auto& prop_key : dynamic_prop_keys) {
-    if (const auto dynamic_property_value = 
context.getDynamicProperty(prop_key, nullptr); dynamic_property_value && 
!dynamic_property_value->empty()) {
-      logger_->log_debug("PublishKafka: DynamicProperty: [{}] -> [{}]", 
prop_key, *dynamic_property_value);
-      result = rd_kafka_conf_set(conf_.get(), prop_key.c_str(), 
dynamic_property_value->c_str(), err_chars.data(), err_chars.size());
-      if (result != RD_KAFKA_CONF_OK) {
-        auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, 
err_chars.data());
-        throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
-      }
-    } else {
+
+  std::optional<std::string> error_during_on_dyn_props = std::nullopt;
+  for (auto [prop_key, prop_value] : context.getDynamicProperties(nullptr)) {
+    if (prop_value.empty()) {
       logger_->log_warn(
-          "PublishKafka Dynamic Property '{}' is empty and therefore will not "
-          "be configured",
+          "PublishKafka Dynamic Property '{}' is empty and therefore will not 
be configured",
           prop_key);
+      return;
+    }
+    logger_->log_debug("PublishKafka: DynamicProperty: [{}] -> [{}]", 
prop_key, prop_value);
+    result = rd_kafka_conf_set(conf.get(), prop_key.data(), prop_value.data(), 
err_chars.data(), err_chars.size());
+    if (result != RD_KAFKA_CONF_OK) {
+      error_during_on_dyn_props = utils::string::join_pack(PREFIX_ERROR_MSG, 
err_chars.data());
     }
   }
+  if (error_during_on_dyn_props) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, *error_during_on_dyn_props);
+; }

Review Comment:
   There is a stray `;` before the closing brace (`; }`). It is parsed as an 
empty statement rather than a syntax error, so the code still compiles, but it 
is clearly unintentional and breaks the formatting. Remove the extra semicolon 
and format the block normally.
   



##########
minifi-api/minifi-c-api.def:
##########
@@ -29,4 +29,4 @@ EXPORTS
   MinifiProcessSessionGetFlowFileSize
   MinifiProcessSessionGetFlowFileId
   MinifiProcessContextGetDynamicProperties

Review Comment:
   The .def export list drops `MinifiProcessContextGetSslData` in favor of 
`MinifiProcessContextGetSslDataFromProperty`, which will break existing Windows 
extensions linked against the previous symbol unless the API version is bumped 
or a compatibility export is kept. Consider exporting both (old as wrapper) 
during a deprecation period.
   



##########
extensions/kafka/PublishKafka.cpp:
##########
@@ -158,12 +159,12 @@ class ReadCallback {
     });
   }
 
-  static utils::rd_kafka_headers_unique_ptr make_headers(const core::FlowFile& 
flow_file,
+  static utils::rd_kafka_headers_unique_ptr make_headers(const 
api::core::FlowFile& flow_file,
+      const api::core::ProcessSession& session,
       const std::optional<utils::Regex>& attribute_name_regex) {
     utils::rd_kafka_headers_unique_ptr result{rd_kafka_headers_new(8)};
-    if (!result) { throw std::bad_alloc{}; }
 
-    for (const auto& [attribute_key, attribute_value]: 
flow_file.getAttributes()) {
+    for (const auto& [attribute_key, attribute_value]: 
session.getAttributes(flow_file)) {
       if (attribute_name_regex && utils::regexMatch(attribute_key, 
*attribute_name_regex)) {
         rd_kafka_header_add(result.get(),
             attribute_key.c_str(),

Review Comment:
   `rd_kafka_headers_new(8)` can return nullptr on allocation failure, and this 
change removes the previous `if (!result) { throw std::bad_alloc{}; }` guard, 
so `result.get()` is now passed unconditionally to `rd_kafka_header_add`. 
Restore the null check and fail fast (throw/return an error) to avoid a crash 
on OOM.



##########
minifi-api/include/minifi-c/minifi-c.h:
##########
@@ -280,7 +280,7 @@ typedef struct MinifiSslData {
   MinifiStringView passphrase;
 } MinifiSslData;
 
-MinifiStatus MinifiProcessContextGetSslData(MinifiProcessContext* 
process_context, MinifiStringView controller_service_name,
+MinifiStatus MinifiProcessContextGetSslDataFromProperty(MinifiProcessContext* 
process_context, MinifiStringView property_name,
     void (*cb)(void* user_ctx, const MinifiSslData* ssl_data), void* user_ctx);

Review Comment:
   Renaming the exported C API entry point from 
`MinifiProcessContextGetSslData` to 
`MinifiProcessContextGetSslDataFromProperty` removes the old symbol from the 
public header without a `MINIFI_API_VERSION` bump or compatibility shim. This 
breaks binary compatibility for extensions compiled against API v3 that still 
link to the old name. Either keep the old function as a deprecated 
wrapper/alias, or bump `MINIFI_API_VERSION` and document the breaking change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to