bakaid commented on a change in pull request #653: MINIFICPP-1033 -
PublishKafka fixes
URL: https://github.com/apache/nifi-minifi-cpp/pull/653#discussion_r328031893
##########
File path: extensions/librdkafka/PublishKafka.cpp
##########
@@ -289,122 +356,309 @@ bool PublishKafka::configureNewConnection(const
std::shared_ptr<KafkaConnection>
value = "";
if (context->getDynamicProperty(key, value) && !value.empty()) {
logger_->log_debug("PublishKafka: DynamicProperty: [%s] -> [%s]", key,
value);
- rd_kafka_conf_set(conf_, key.c_str(), value.c_str(), errstr,
sizeof(errstr));
+ result = rd_kafka_conf_set(conf_, key.c_str(), value.c_str(),
errstr.data(), errstr.size());
+ if (result != RD_KAFKA_CONF_OK) {
+ logger_->log_error("PublishKafka: configure error result [%s]",
errstr);
+ return false;
+ }
} else {
logger_->log_warn("PublishKafka Dynamic Property '%s' is empty and
therefore will not be configured", key);
}
}
+ // Set the delivery callback
+ rd_kafka_conf_set_dr_msg_cb(conf_, &PublishKafka::messageDeliveryCallback);
+
// Set the logger callback
- rd_kafka_conf_set_log_cb(conf_, KafkaConnection::logCallback);
+ rd_kafka_conf_set_log_cb(conf_, &KafkaConnection::logCallback);
- auto producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf_, errstr,
sizeof(errstr));
+ rd_kafka_t* producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf_, errstr.data(),
errstr.size());
- if (!producer) {
+ if (producer == nullptr) {
logger_->log_error("Failed to create Kafka producer %s", errstr);
return false;
}
- conn->setConnection(producer, conf_);
+ // The producer took ownership of the configuration, we must not free it
+ confGuard.disable();
+
+ conn->setConnection(producer);
+
+ return true;
+}
+
+bool PublishKafka::createNewTopic(const std::shared_ptr<KafkaConnection>
&conn, const std::shared_ptr<core::ProcessContext> &context, const std::string&
topic_name) {
+ rd_kafka_topic_conf_t* topic_conf_ = rd_kafka_topic_conf_new();
+ if (topic_conf_ == nullptr) {
+ logger_->log_error("Failed to create rd_kafka_topic_conf_t object");
+ return false;
+ }
+ utils::ScopeGuard confGuard([topic_conf_](){
+ rd_kafka_topic_conf_destroy(topic_conf_);
+ });
+
+ rd_kafka_conf_res_t result;
+ std::string value;
+ std::array<char, 512U> errstr;
+ int64_t valInt;
+ std::string valueConf;
+
+ value = "";
+ if (context->getProperty(DeliveryGuarantee.getName(), value) &&
!value.empty()) {
+ rd_kafka_topic_conf_set(topic_conf_, "request.required.acks",
value.c_str(), errstr.data(), errstr.size());
+ logger_->log_debug("PublishKafka: request.required.acks [%s]", value);
+ if (result != RD_KAFKA_CONF_OK) {
+ logger_->log_error("PublishKafka: configure request.required.acks error
result [%s]", errstr);
+ return false;
+ }
+ }
+ value = "";
+ if (context->getProperty(RequestTimeOut.getName(), value) && !value.empty())
{
+ core::TimeUnit unit;
+ if (core::Property::StringToTime(value, valInt, unit) &&
+ core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
+ valueConf = std::to_string(valInt);
+ rd_kafka_topic_conf_set(topic_conf_, "request.timeout.ms",
valueConf.c_str(), errstr.data(), errstr.size());
+ logger_->log_debug("PublishKafka: request.timeout.ms [%s]", valueConf);
+ if (result != RD_KAFKA_CONF_OK) {
+ logger_->log_error("PublishKafka: configure request.timeout.ms error
result [%s]", errstr);
+ return false;
+ }
+ }
+ }
+ value = "";
+ if (context->getProperty(MessageTimeOut.getName(), value) && !value.empty())
{
+ core::TimeUnit unit;
+ if (core::Property::StringToTime(value, valInt, unit) &&
+ core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
+ valueConf = std::to_string(valInt);
+ rd_kafka_topic_conf_set(topic_conf_, "message.timeout.ms",
valueConf.c_str(), errstr.data(), errstr.size());
+ logger_->log_debug("PublishKafka: message.timeout.ms [%s]", valueConf);
+ if (result != RD_KAFKA_CONF_OK) {
+ logger_->log_error("PublishKafka: configure message.timeout.ms error
result [%s]", errstr);
+ return false;
+ }
+ }
+ }
+
+ rd_kafka_topic_t* topic_reference =
rd_kafka_topic_new(conn->getConnection(), topic_name.c_str(), topic_conf_);
+ if (topic_reference == nullptr) {
+ rd_kafka_resp_err_t resp_err = rd_kafka_last_error();
+ logger_->log_error("PublishKafka: failed to create topic %s, error: %s",
topic_name.c_str(), rd_kafka_err2str(resp_err));
+ return false;
+ }
+
+ // The topic took ownership of the configuration, we must not free it
+ confGuard.disable();
+
+ std::shared_ptr<KafkaTopic> kafkaTopicref =
std::make_shared<KafkaTopic>(topic_reference);
+
+ conn->putTopic(topic_name, kafkaTopicref);
return true;
}
void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext>
&context, const std::shared_ptr<core::ProcessSession> &session) {
- logger_->log_trace("Enter trigger");
- std::shared_ptr<core::FlowFile> flowFile = session->get();
+ // Check whether we have been interrupted
+ if (interrupted_) {
+ logger_->log_info("The processor has been interrupted, not running
onTrigger");
+ context->yield();
+ return;
+ }
- if (!flowFile) {
+ // Try to get a KafkaConnection
+ std::string client_id, brokers;
+ if (!context->getProperty(ClientName.getName(), client_id)) {
+ logger_->log_error("Client Name property missing or invalid");
+ context->yield();
+ return;
+ }
+ if (!context->getProperty(SeedBrokers.getName(), brokers)) {
+ logger_->log_error("Knowb Brokers property missing or invalid");
+ context->yield();
return;
}
- std::string client_id, brokers, topic;
+ KafkaConnectionKey key;
+ key.brokers_ = brokers;
+ key.client_id_ = client_id;
- std::unique_ptr<KafkaLease> lease;
- std::shared_ptr<KafkaConnection> conn;
-// get the client ID, brokers, and topic from either the flowfile, the
configuration, or the properties
- if (context->getProperty(ClientName, client_id, flowFile) &&
context->getProperty(SeedBrokers, brokers, flowFile) &&
context->getProperty(Topic, topic, flowFile)) {
- KafkaConnectionKey key;
- key.brokers_ = brokers;
- key.client_id_ = client_id;
+ std::unique_ptr<KafkaLease> lease =
connection_pool_.getOrCreateConnection(key);
+ if (lease == nullptr) {
+ logger_->log_info("This connection is used by another thread.");
+ context->yield();
+ return;
+ }
- lease = connection_pool_.getOrCreateConnection(key);
- if (lease == nullptr) {
- logger_->log_info("This connection is used by another thread.");
+ std::shared_ptr<KafkaConnection> conn = lease->getConn();
+ if (!conn->initialized()) {
+ logger_->log_trace("Connection not initialized to %s, %s", client_id,
brokers);
+ if (!configureNewConnection(conn, context)) {
+ logger_->log_error("Could not configure Kafka Connection");
context->yield();
return;
}
- conn = lease->getConn();
-
- if (!conn->initialized()) {
- logger_->log_trace("Connection not initialized to %s, %s, %s",
client_id, brokers, topic);
- if (!configureNewConnection(conn, context, flowFile)) {
- logger_->log_error("Could not configure Kafka Connection");
- session->transfer(flowFile, Failure);
- return;
- }
- }
+ }
- if (!conn->hasTopic(topic)) {
- auto topic_conf_ = rd_kafka_topic_conf_new();
- auto topic_reference = rd_kafka_topic_new(conn->getConnection(),
topic.c_str(), topic_conf_);
- rd_kafka_conf_res_t result;
- std::string value;
- char errstr[512];
- int64_t valInt;
- std::string valueConf;
-
- if (context->getProperty(DeliveryGuarantee, value, flowFile) &&
!value.empty()) {
- rd_kafka_topic_conf_set(topic_conf_, "request.required.acks",
value.c_str(), errstr, sizeof(errstr));
- logger_->log_debug("PublishKafka: request.required.acks [%s]", value);
- if (result != RD_KAFKA_CONF_OK)
- logger_->log_error("PublishKafka: configure delivery guarantee error
result [%s]", errstr);
- }
- value = "";
- if (context->getProperty(RequestTimeOut, value, flowFile) &&
!value.empty()) {
- core::TimeUnit unit;
- if (core::Property::StringToTime(value, valInt, unit) &&
core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
- valueConf = std::to_string(valInt);
- rd_kafka_topic_conf_set(topic_conf_, "request.timeout.ms",
valueConf.c_str(), errstr, sizeof(errstr));
- logger_->log_debug("PublishKafka: request.timeout.ms [%s]",
valueConf);
- if (result != RD_KAFKA_CONF_OK)
- logger_->log_error("PublishKafka: configure timeout error result
[%s]", errstr);
- }
- }
+ // Get some properties not (only) used directly to set up librdkafka
+ std::string value;
+
+ // Batch Size
+ uint32_t batch_size;
+ value = "";
+ if (context->getProperty(BatchSize.getName(), value) && !value.empty() &&
core::Property::StringToInt(value, batch_size)) {
+ logger_->log_debug("PublishKafka: Batch Size [%lu]", batch_size);
+ } else {
+ batch_size = 10;
+ }
- std::shared_ptr<KafkaTopic> kafkaTopicref =
std::make_shared<KafkaTopic>(topic_reference, topic_conf_);
+ // Target Batch Payload Size
+ uint64_t target_batch_payload_size;
+ value = "";
+ if (context->getProperty(TargetBatchPayloadSize.getName(), value) &&
!value.empty() && core::Property::StringToInt(value,
target_batch_payload_size)) {
+ logger_->log_debug("PublishKafka: Target Batch Payload Size [%llu]",
target_batch_payload_size);
+ } else {
+ target_batch_payload_size = 512 * 1024U;
+ }
- conn->putTopic(topic, kafkaTopicref);
- }
+ // Max Flow Segment Size
+ uint64_t max_flow_seg_size;
+ value = "";
+ if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty()
&& core::Property::StringToInt(value, max_flow_seg_size)) {
+ logger_->log_debug("PublishKafka: Max Flow Segment Size [%llu]",
max_flow_seg_size);
} else {
- logger_->log_error("Do not have required properties");
- session->transfer(flowFile, Failure);
+ max_flow_seg_size = 0U;
+ }
+
+ // Attributes to Send as Headers
+ utils::Regex attributeNameRegex;
+ value = "";
+ if (context->getProperty(AttributeNameRegex.getName(), value) &&
!value.empty()) {
+ attributeNameRegex = utils::Regex(value);
+ logger_->log_debug("PublishKafka: AttributeNameRegex [%s]", value);
+ }
+
+ // Collect FlowFiles to process
+ uint64_t actual_bytes = 0U;
+ std::vector<std::shared_ptr<core::FlowFile>> flowFiles;
+ for (uint32_t i = 0; i < batch_size; i++) {
+ std::shared_ptr<core::FlowFile> flowFile = session->get();
+ if (flowFile == nullptr) {
+ break;
+ }
+ actual_bytes += flowFile->getSize();
+ flowFiles.emplace_back(std::move(flowFile));
+ if (target_batch_payload_size != 0U && actual_bytes >=
target_batch_payload_size) {
+ break;
+ }
+ }
+ if (flowFiles.empty()) {
return;
Review comment:
Agreed.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services