Copilot commented on code in PR #2175:
URL: https://github.com/apache/nifi-minifi-cpp/pull/2175#discussion_r3241127224
##########
extensions/kafka/KafkaProcessorBase.h:
##########
@@ -35,12 +35,12 @@ enum class SecurityProtocolOption { plaintext, ssl, sasl_plaintext, sasl_ssl };
 enum class SASLMechanismOption { GSSAPI, PLAIN };
 }  // namespace kafka
-class KafkaProcessorBase : public core::ProcessorImpl {
+class KafkaProcessorBase : public api::core::ProcessorImpl {
  public:
   EXTENSIONAPI static constexpr auto SSLContextService = core::PropertyDefinitionBuilder<>::createProperty("SSL Context Service")
       .withDescription("SSL Context Service Name")
-      .withAllowedTypes<minifi::controllers::SSLContextServiceInterface>()
+      .withAllowedType<MINIFI_SSL_CONTEXT_SERVICE_PROPERTY_TYPE>()
       .build();
Review Comment:
`MINIFI_SSL_CONTEXT_SERVICE_PROPERTY_TYPE` is referenced here but this
header does not include `minifi-c/minifi-c.h` (where the macro is defined).
This makes compilation depend on transitive include order and will fail in
translation units that include this header first. Include the defining header
(or replace the macro with a C++ constant/string literal in a dedicated header).
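For illustration, a sketch of the first suggested fix; the identifiers come from this diff, but whether `minifi-c/minifi-c.h` is the correct include spelling for this target is an assumption:
```cpp
// KafkaProcessorBase.h -- include the header that defines the macro directly,
// so compilation no longer depends on transitive include order:
#include "minifi-c/minifi-c.h"  // defines MINIFI_SSL_CONTEXT_SERVICE_PROPERTY_TYPE

EXTENSIONAPI static constexpr auto SSLContextService = core::PropertyDefinitionBuilder<>::createProperty("SSL Context Service")
    .withDescription("SSL Context Service Name")
    .withAllowedType<MINIFI_SSL_CONTEXT_SERVICE_PROPERTY_TYPE>()
    .build();
```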
##########
extensions/kafka/KafkaProcessorBase.cpp:
##########
@@ -16,18 +16,25 @@
  */
 #include "KafkaProcessorBase.h"
-#include "minifi-cpp/controllers/SSLContextServiceInterface.h"
+#include "api/utils/ProcessorConfigUtils.h"
 #include "rdkafka_utils.h"
-#include "utils/ProcessorConfigUtils.h"
 namespace org::apache::nifi::minifi::processors {
-std::optional<utils::net::SslData> KafkaProcessorBase::getSslData(core::ProcessContext& context) const {
-  return utils::net::getSslData(context, SSLContextService, logger_);
+KafkaProcessorBase::KafkaProcessorBase(core::ProcessorMetadata metadata) : ProcessorImpl(std::move(metadata)), kafka_opaque_(*logger_) {
 }
-void KafkaProcessorBase::setKafkaAuthenticationParameters(core::ProcessContext& context, gsl::not_null<rd_kafka_conf_t*> config) {
-  security_protocol_ = utils::parseEnumProperty<kafka::SecurityProtocolOption>(context, SecurityProtocol);
+std::optional<api::utils::net::SslData> KafkaProcessorBase::getSslData(api::core::ProcessContext& context) const {
+  const auto controller_service_name = api::utils::parseOptionalProperty(context, SSLContextService);
+  if (!controller_service_name) {
+    return std::nullopt;
+  }
+
+  return context.getSslData(*controller_service_name) | utils::toOptional();
Review Comment:
`CffiProcessContext::getSslData` now calls
`MinifiProcessContextGetSslDataFromProperty`, which expects a *property name*;
however this code parses the SSL Context Service property value and passes that
*controller service name* to `context.getSslData(...)`. This will cause the C
API call to look up a property named like the controller service id and
typically return PROPERTY_NOT_SET, breaking SSL configuration for Kafka
processors. Pass the property name (e.g. `SSLContextService.name`) to
`getSslData` or adjust the API to accept controller service names again.
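A sketch of the first option, assuming the `.name` accessor suggested above exists on the property definition and that `getSslData` keeps its new property-name contract:
```cpp
std::optional<api::utils::net::SslData> KafkaProcessorBase::getSslData(api::core::ProcessContext& context) const {
  // Pass the property *name*; the C API resolves the configured controller
  // service from it. Passing the parsed property value (the controller
  // service id) makes the lookup return PROPERTY_NOT_SET.
  return context.getSslData(SSLContextService.name) | utils::toOptional();
}
```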
##########
extensions/kafka/PublishKafka.cpp:
##########
@@ -386,147 +383,147 @@ void PublishKafka::notifyStop() {
   conn_.reset();
 }
-bool PublishKafka::configureNewConnection(core::ProcessContext& context) {
+void PublishKafka::configureNewConnection(api::core::ProcessContext& context) {
   std::array<char, 512U> err_chars{};
   rd_kafka_conf_res_t result = RD_KAFKA_CONF_OK;
   constexpr std::string_view PREFIX_ERROR_MSG = "PublishKafka: configure error result: ";
-  utils::rd_kafka_conf_unique_ptr conf_{rd_kafka_conf_new()};
-  if (conf_ == nullptr) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed to create rd_kafka_conf_t object"); }
+  utils::rd_kafka_conf_unique_ptr conf{rd_kafka_conf_new()};
+  if (conf == nullptr) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed to create rd_kafka_conf_t object"); }
+
+  rd_kafka_conf_set_opaque(conf.get(), &kafka_opaque_);
   const auto* const key = conn_->getKey();
   if (key->brokers_.empty()) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "There are no brokers"); }
-  result = rd_kafka_conf_set(conf_.get(), "bootstrap.servers", key->brokers_.c_str(), err_chars.data(), err_chars.size());
+  result = rd_kafka_conf_set(conf.get(), "bootstrap.servers", key->brokers_.c_str(), err_chars.data(), err_chars.size());
   logger_->log_debug("PublishKafka: bootstrap.servers [{}]", key->brokers_);
   if (result != RD_KAFKA_CONF_OK) {
     auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
   }
   if (key->client_id_.empty()) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Client id is empty"); }
-  result = rd_kafka_conf_set(conf_.get(), "client.id", key->client_id_.c_str(), err_chars.data(), err_chars.size());
+  result = rd_kafka_conf_set(conf.get(), "client.id", key->client_id_.c_str(), err_chars.data(), err_chars.size());
   logger_->log_debug("PublishKafka: client.id [{}]", key->client_id_);
   if (result != RD_KAFKA_CONF_OK) {
     auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
   }
-  if (const auto debug_context = context.getProperty(DebugContexts)) {
-    result = rd_kafka_conf_set(conf_.get(), "debug", debug_context->c_str(), err_chars.data(), err_chars.size());
+  if (const auto debug_context = context.getProperty(DebugContexts, nullptr)) {
+    result = rd_kafka_conf_set(conf.get(), "debug", debug_context->c_str(), err_chars.data(), err_chars.size());
     logger_->log_debug("PublishKafka: debug [{}]", *debug_context);
     if (result != RD_KAFKA_CONF_OK) {
       auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
-  if (const auto max_message_size = context.getProperty(MaxMessageSize); max_message_size && !max_message_size->empty()) {
-    result = rd_kafka_conf_set(conf_.get(), "message.max.bytes", max_message_size->c_str(), err_chars.data(), err_chars.size());
+  if (const auto max_message_size = context.getProperty(MaxMessageSize, nullptr); max_message_size && !max_message_size->empty()) {
+    result = rd_kafka_conf_set(conf.get(), "message.max.bytes", max_message_size->c_str(), err_chars.data(), err_chars.size());
     logger_->log_debug("PublishKafka: message.max.bytes [{}]", *max_message_size);
     if (result != RD_KAFKA_CONF_OK) {
       auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
-  if (const auto queue_buffer_max_message = utils::parseOptionalU64Property(context, QueueBufferMaxMessage)) {
+  if (const auto queue_buffer_max_message = api::utils::parseOptionalU64Property(context, QueueBufferMaxMessage)) {
     if (*queue_buffer_max_message < batch_size_) {
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid configuration: Batch Size cannot be larger than Queue Max Message");
     }
     const auto value = std::to_string(*queue_buffer_max_message);
-    result = rd_kafka_conf_set(conf_.get(), "queue.buffering.max.messages", value.c_str(), err_chars.data(), err_chars.size());
+    result = rd_kafka_conf_set(conf.get(), "queue.buffering.max.messages", value.c_str(), err_chars.data(), err_chars.size());
     logger_->log_debug("PublishKafka: queue.buffering.max.messages [{}]", value);
     if (result != RD_KAFKA_CONF_OK) {
       auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
-  if (const auto queue_buffer_max_size = utils::parseOptionalDataSizeProperty(context, QueueBufferMaxSize)) {
+  if (const auto queue_buffer_max_size = api::utils::parseOptionalDataSizeProperty(context, QueueBufferMaxSize)) {
     auto valInt = *queue_buffer_max_size / 1024;
     auto valueConf = std::to_string(valInt);
-    result = rd_kafka_conf_set(conf_.get(), "queue.buffering.max.kbytes", valueConf.c_str(), err_chars.data(), err_chars.size());
+    result = rd_kafka_conf_set(conf.get(), "queue.buffering.max.kbytes", valueConf.c_str(), err_chars.data(), err_chars.size());
     logger_->log_debug("PublishKafka: queue.buffering.max.kbytes [{}]", valueConf);
     if (result != RD_KAFKA_CONF_OK) {
       auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
-  if (const auto queue_buffer_max_time = utils::parseOptionalDurationProperty(context, QueueBufferMaxTime)) {
+  if (const auto queue_buffer_max_time = api::utils::parseOptionalDurationProperty(context, QueueBufferMaxTime)) {
    auto valueConf = std::to_string(queue_buffer_max_time->count());
-    result = rd_kafka_conf_set(conf_.get(), "queue.buffering.max.ms", valueConf.c_str(), err_chars.data(), err_chars.size());
+    result = rd_kafka_conf_set(conf.get(), "queue.buffering.max.ms", valueConf.c_str(), err_chars.data(), err_chars.size());
     logger_->log_debug("PublishKafka: queue.buffering.max.ms [{}]", valueConf);
     if (result != RD_KAFKA_CONF_OK) {
       auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
-  if (const auto batch_size = utils::parseOptionalU64Property(context, BatchSize)) {
+  if (const auto batch_size = api::utils::parseOptionalU64Property(context, BatchSize)) {
     auto value = std::to_string(*batch_size);
-    result = rd_kafka_conf_set(conf_.get(), "batch.num.messages", value.c_str(), err_chars.data(), err_chars.size());
+    result = rd_kafka_conf_set(conf.get(), "batch.num.messages", value.c_str(), err_chars.data(), err_chars.size());
     logger_->log_debug("PublishKafka: batch.num.messages [{}]", value);
     if (result != RD_KAFKA_CONF_OK) {
       auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
-  if (const auto compress_codec = context.getProperty(CompressCodec); compress_codec && !compress_codec->empty() && *compress_codec != "none") {
-    result = rd_kafka_conf_set(conf_.get(), "compression.codec", compress_codec->c_str(), err_chars.data(), err_chars.size());
+  if (const auto compress_codec = context.getProperty(CompressCodec, nullptr); compress_codec && !compress_codec->empty() && *compress_codec != "none") {
+    result = rd_kafka_conf_set(conf.get(), "compression.codec", compress_codec->c_str(), err_chars.data(), err_chars.size());
     logger_->log_debug("PublishKafka: compression.codec [{}]", *compress_codec);
     if (result != RD_KAFKA_CONF_OK) {
       auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
-  setKafkaAuthenticationParameters(context, gsl::make_not_null(conf_.get()));
+  setKafkaAuthenticationParameters(context, gsl::make_not_null(conf.get()));
   // Add all the dynamic properties as librdkafka configurations
-  const auto& dynamic_prop_keys = context.getDynamicPropertyKeys();
-  logger_->log_info("PublishKafka registering {} librdkafka dynamic properties", dynamic_prop_keys.size());
-
-  for (const auto& prop_key : dynamic_prop_keys) {
-    if (const auto dynamic_property_value = context.getDynamicProperty(prop_key, nullptr); dynamic_property_value && !dynamic_property_value->empty()) {
-      logger_->log_debug("PublishKafka: DynamicProperty: [{}] -> [{}]", prop_key, *dynamic_property_value);
-      result = rd_kafka_conf_set(conf_.get(), prop_key.c_str(), dynamic_property_value->c_str(), err_chars.data(), err_chars.size());
-      if (result != RD_KAFKA_CONF_OK) {
-        auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
-        throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
-      }
-    } else {
+
+  std::optional<std::string> error_during_on_dyn_props = std::nullopt;
+  for (auto [prop_key, prop_value] : context.getDynamicProperties(nullptr)) {
+    if (prop_value.empty()) {
       logger_->log_warn(
-          "PublishKafka Dynamic Property '{}' is empty and therefore will not "
-          "be configured",
+          "PublishKafka Dynamic Property '{}' is empty and therefore will not be configured",
           prop_key);
+      return;
Review Comment:
This branch returns from `configureNewConnection` when encountering an empty
dynamic property value. Since `configureNewConnection` is responsible for
finishing producer configuration, returning here silently leaves the connection
unconfigured and will cause later failures that are harder to diagnose.
Consider either skipping empty dynamic properties (continue) or throwing a
schedule exception so the processor fails fast with a clear reason.
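A sketch of the `continue`-based option, using only names already present in this hunk:
```cpp
for (auto [prop_key, prop_value] : context.getDynamicProperties(nullptr)) {
  if (prop_value.empty()) {
    logger_->log_warn("PublishKafka Dynamic Property '{}' is empty and therefore will not be configured", prop_key);
    continue;  // skip only this property instead of abandoning the rest of the configuration
  }
  logger_->log_debug("PublishKafka: DynamicProperty: [{}] -> [{}]", prop_key, prop_value);
  result = rd_kafka_conf_set(conf.get(), prop_key.data(), prop_value.data(), err_chars.data(), err_chars.size());
  if (result != RD_KAFKA_CONF_OK) {
    error_during_on_dyn_props = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
  }
}
```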
##########
extensions/kafka/PublishKafka.cpp:
##########
@@ -386,147 +383,147 @@ void PublishKafka::notifyStop() {
   // Add all the dynamic properties as librdkafka configurations
-  const auto& dynamic_prop_keys = context.getDynamicPropertyKeys();
-  logger_->log_info("PublishKafka registering {} librdkafka dynamic properties", dynamic_prop_keys.size());
-
-  for (const auto& prop_key : dynamic_prop_keys) {
-    if (const auto dynamic_property_value = context.getDynamicProperty(prop_key, nullptr); dynamic_property_value && !dynamic_property_value->empty()) {
-      logger_->log_debug("PublishKafka: DynamicProperty: [{}] -> [{}]", prop_key, *dynamic_property_value);
-      result = rd_kafka_conf_set(conf_.get(), prop_key.c_str(), dynamic_property_value->c_str(), err_chars.data(), err_chars.size());
-      if (result != RD_KAFKA_CONF_OK) {
-        auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
-        throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
-      }
-    } else {
+
+  std::optional<std::string> error_during_on_dyn_props = std::nullopt;
+  for (auto [prop_key, prop_value] : context.getDynamicProperties(nullptr)) {
+    if (prop_value.empty()) {
       logger_->log_warn(
-          "PublishKafka Dynamic Property '{}' is empty and therefore will not "
-          "be configured",
+          "PublishKafka Dynamic Property '{}' is empty and therefore will not be configured",
           prop_key);
+      return;
+    }
+    logger_->log_debug("PublishKafka: DynamicProperty: [{}] -> [{}]", prop_key, prop_value);
+    result = rd_kafka_conf_set(conf.get(), prop_key.data(), prop_value.data(), err_chars.data(), err_chars.size());
+    if (result != RD_KAFKA_CONF_OK) {
+      error_during_on_dyn_props = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
     }
   }
+  if (error_during_on_dyn_props) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, *error_during_on_dyn_props);
+; }
Review Comment:
There is a stray `;` before the closing brace (`; }`). An empty statement like this is technically legal C++ and will compile, but it is clearly unintended leftover. Remove the extra semicolon and format the block normally.
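The cleaned-up block would read:
```cpp
if (error_during_on_dyn_props) {
  throw Exception(PROCESS_SCHEDULE_EXCEPTION, *error_during_on_dyn_props);
}
```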
##########
minifi-api/minifi-c-api.def:
##########
@@ -29,4 +29,4 @@ EXPORTS
MinifiProcessSessionGetFlowFileSize
MinifiProcessSessionGetFlowFileId
MinifiProcessContextGetDynamicProperties
Review Comment:
The .def export list drops `MinifiProcessContextGetSslData` in favor of
`MinifiProcessContextGetSslDataFromProperty`, which will break existing Windows
extensions linked against the previous symbol unless the API version is bumped
or a compatibility export is kept. Consider exporting both (old as wrapper)
during a deprecation period.
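A sketch of the wrapper approach (hypothetical, not in the PR): re-export the old entry point and forward to the renamed function, then list both names in `minifi-c-api.def`. Note the semantic caveat from the `getSslData` comment above: the old function took a controller service *name* while the new one takes a property *name*, so a real shim would need to translate between the two rather than forward blindly.
```cpp
extern "C" MinifiStatus MinifiProcessContextGetSslData(
    MinifiProcessContext* process_context,
    MinifiStringView controller_service_name,
    void (*cb)(void* user_ctx, const MinifiSslData* ssl_data),
    void* user_ctx) {
  // Naive forwarding shown for illustration only; see the caveat above about
  // controller service names vs. property names.
  return MinifiProcessContextGetSslDataFromProperty(process_context, controller_service_name, cb, user_ctx);
}
```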
##########
extensions/kafka/PublishKafka.cpp:
##########
@@ -158,12 +159,12 @@ class ReadCallback {
     });
   }
-  static utils::rd_kafka_headers_unique_ptr make_headers(const core::FlowFile& flow_file,
+  static utils::rd_kafka_headers_unique_ptr make_headers(const api::core::FlowFile& flow_file,
+      const api::core::ProcessSession& session,
       const std::optional<utils::Regex>& attribute_name_regex) {
     utils::rd_kafka_headers_unique_ptr result{rd_kafka_headers_new(8)};
-    if (!result) { throw std::bad_alloc{}; }
-    for (const auto& [attribute_key, attribute_value]: flow_file.getAttributes()) {
+    for (const auto& [attribute_key, attribute_value]: session.getAttributes(flow_file)) {
       if (attribute_name_regex && utils::regexMatch(attribute_key, *attribute_name_regex)) {
         rd_kafka_header_add(result.get(),
                             attribute_key.c_str(),
Review Comment:
`rd_kafka_headers_new(8)` can return nullptr on allocation failure, but
`result` is dereferenced unconditionally via `result.get()` in
`rd_kafka_header_add`. Add a null check and fail fast (throw/return an error)
to avoid a crash on OOM.
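The simplest fix is to restore the guard that this diff removes:
```cpp
utils::rd_kafka_headers_unique_ptr result{rd_kafka_headers_new(8)};
if (!result) { throw std::bad_alloc{}; }  // rd_kafka_headers_new may return nullptr on allocation failure
```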
##########
minifi-api/include/minifi-c/minifi-c.h:
##########
@@ -280,7 +280,7 @@ typedef struct MinifiSslData {
   MinifiStringView passphrase;
 } MinifiSslData;
-MinifiStatus MinifiProcessContextGetSslData(MinifiProcessContext* process_context, MinifiStringView controller_service_name,
+MinifiStatus MinifiProcessContextGetSslDataFromProperty(MinifiProcessContext* process_context, MinifiStringView property_name,
     void (*cb)(void* user_ctx, const MinifiSslData* ssl_data), void* user_ctx);
Review Comment:
Renaming the exported C API entry point from
`MinifiProcessContextGetSslData` to
`MinifiProcessContextGetSslDataFromProperty` removes the old symbol from the
public header without a `MINIFI_API_VERSION` bump or compatibility shim. This
breaks binary compatibility for extensions compiled against API v3 that still
link to the old name. Either keep the old function as a deprecated
wrapper/alias, or bump `MINIFI_API_VERSION` and document the breaking change.
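A sketch of the header-side compatibility option (the deprecation comment is illustrative; only the new declaration is in the PR):
```cpp
MinifiStatus MinifiProcessContextGetSslDataFromProperty(MinifiProcessContext* process_context, MinifiStringView property_name,
    void (*cb)(void* user_ctx, const MinifiSslData* ssl_data), void* user_ctx);

/* Deprecated: retained for binary compatibility with extensions built against
 * the previous API; prefer MinifiProcessContextGetSslDataFromProperty. */
MinifiStatus MinifiProcessContextGetSslData(MinifiProcessContext* process_context, MinifiStringView controller_service_name,
    void (*cb)(void* user_ctx, const MinifiSslData* ssl_data), void* user_ctx);
```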
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]