szaszm commented on a change in pull request #940: URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r556352310
########## File path: extensions/librdkafka/rdkafka_utils.h ########## @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <algorithm> +#include <chrono> +#include <memory> +#include <string> +#include <thread> +#include <utility> +#include <vector> + +#include "core/logging/LoggerConfiguration.h" +#include "utils/OptionalUtils.h" +#include "rdkafka.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +enum class KafkaEncoding { + UTF8, + HEX +}; + +struct rd_kafka_conf_deleter { + void operator()(rd_kafka_conf_t* ptr) const noexcept { rd_kafka_conf_destroy(ptr); } +}; + +struct rd_kafka_producer_deleter { + void operator()(rd_kafka_t* ptr) const noexcept { + rd_kafka_resp_err_t flush_ret = rd_kafka_flush(ptr, 10000 /* ms */); // Matching the wait time of KafkaConnection.cpp + // If concerned, we could log potential errors here: + // if (RD_KAFKA_RESP_ERR__TIMED_OUT == flush_ret) { + // std::cerr << "Deleting producer failed: time-out while trying to flush" << std::endl; + // } + rd_kafka_destroy(ptr); + } +}; + +struct rd_kafka_consumer_deleter { + void operator()(rd_kafka_t* ptr) const noexcept { + 
rd_kafka_consumer_close(ptr); + rd_kafka_destroy(ptr); + } +}; + +struct rd_kafka_topic_partition_list_deleter { + void operator()(rd_kafka_topic_partition_list_t* ptr) const noexcept { rd_kafka_topic_partition_list_destroy(ptr); } +}; + +struct rd_kafka_topic_conf_deleter { + void operator()(rd_kafka_topic_conf_t* ptr) const noexcept { rd_kafka_topic_conf_destroy(ptr); } +}; +struct rd_kafka_topic_deleter { + void operator()(rd_kafka_topic_t* ptr) const noexcept { rd_kafka_topic_destroy(ptr); } +}; + +struct rd_kafka_message_deleter { + void operator()(rd_kafka_message_t* ptr) const noexcept { rd_kafka_message_destroy(ptr); } +}; + +struct rd_kafka_headers_deleter { + void operator()(rd_kafka_headers_t* ptr) const noexcept { rd_kafka_headers_destroy(ptr); } +}; + +template <typename T> +void kafka_headers_for_each(const rd_kafka_headers_t* headers, T key_value_handle) { + const char *key; // Null terminated, not to be freed + const void *value; + std::size_t size; + for (std::size_t i = 0; RD_KAFKA_RESP_ERR_NO_ERROR == rd_kafka_header_get_all(headers, i, &key, &value, &size); ++i) { + key_value_handle(std::string(key), std::string(static_cast<const char*>(value), size)); Review comment: We can avoid constructing (and potentially allocating) two `std::string`s on every iteration by passing the raw `const char*` pointers (plus the value size) down to the `key_value_handle` callback. ########## File path: extensions/librdkafka/rdkafka_utils.cpp ########## @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <array> + +#include "rdkafka_utils.h" + +#include "Exception.h" +#include "utils/StringUtils.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +void setKafkaConfigurationField(rd_kafka_conf_t* configuration, const std::string& field_name, const std::string& value) { + static std::array<char, 512U> errstr{}; + rd_kafka_conf_res_t result; + result = rd_kafka_conf_set(configuration, field_name.c_str(), value.c_str(), errstr.data(), errstr.size()); + if (RD_KAFKA_CONF_OK != result) { + const std::string error_msg { errstr.data() }; + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "rd_kafka configuration error: " + error_msg); + } +} + +void print_topics_list(logging::Logger& logger, rd_kafka_topic_partition_list_t* kf_topic_partition_list) { + for (std::size_t i = 0; i < kf_topic_partition_list->cnt; ++i) { + logger.log_debug("kf_topic_partition_list: topic: %s, partition: %d, offset:%lld", + kf_topic_partition_list->elems[i].topic, kf_topic_partition_list->elems[i].partition, kf_topic_partition_list->elems[i].offset); + } +} + +void print_kafka_message(const rd_kafka_message_t* rkmessage, logging::Logger& logger) { Review comment: Did you consider extracting the message formatting part? It may be useful outside of logging. ########## File path: extensions/librdkafka/rdkafka_utils.cpp ########## @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <array> + +#include "rdkafka_utils.h" + +#include "Exception.h" +#include "utils/StringUtils.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +void setKafkaConfigurationField(rd_kafka_conf_t* configuration, const std::string& field_name, const std::string& value) { + static std::array<char, 512U> errstr{}; + rd_kafka_conf_res_t result; + result = rd_kafka_conf_set(configuration, field_name.c_str(), value.c_str(), errstr.data(), errstr.size()); + if (RD_KAFKA_CONF_OK != result) { + const std::string error_msg { errstr.data() }; + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "rd_kafka configuration error: " + error_msg); + } +} + +void print_topics_list(logging::Logger& logger, rd_kafka_topic_partition_list_t* kf_topic_partition_list) { + for (std::size_t i = 0; i < kf_topic_partition_list->cnt; ++i) { + logger.log_debug("kf_topic_partition_list: topic: %s, partition: %d, offset:%lld", + kf_topic_partition_list->elems[i].topic, kf_topic_partition_list->elems[i].partition, kf_topic_partition_list->elems[i].offset); + } +} + +void print_kafka_message(const rd_kafka_message_t* rkmessage, logging::Logger& logger) { + if (RD_KAFKA_RESP_ERR_NO_ERROR != rkmessage->err) { + const std::string error_msg = "ConsumeKafka: 
received error message from broker. Librdkafka error msg: " + std::string(rd_kafka_err2str(rkmessage->err)); + throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, error_msg); + } + std::string topicName = rd_kafka_topic_name(rkmessage->rkt); + std::string message(reinterpret_cast<char*>(rkmessage->payload), rkmessage->len); + const char* key = reinterpret_cast<const char*>(rkmessage->key); + const std::size_t key_len = rkmessage->key_len; + rd_kafka_timestamp_type_t tstype; + int64_t timestamp; + timestamp = rd_kafka_message_timestamp(rkmessage, &tstype); + const char *tsname = "?"; + if (tstype != RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) { + if (tstype == RD_KAFKA_TIMESTAMP_CREATE_TIME) { + tsname = "create time"; + } else if (tstype == RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME) { + tsname = "log append time"; + } + } Review comment: The outer `if` is unnecessary: when `tstype` is `RD_KAFKA_TIMESTAMP_NOT_AVAILABLE`, neither inner comparison matches, so `tsname` keeps its `"?"` default anyway. ```suggestion if (tstype == RD_KAFKA_TIMESTAMP_CREATE_TIME) { tsname = "create time"; } else if (tstype == RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME) { tsname = "log append time"; } ``` ########## File path: extensions/librdkafka/ConsumeKafka.cpp ########## @@ -0,0 +1,569 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +#include "ConsumeKafka.h" + +#include <algorithm> +#include <limits> + +#include "core/PropertyValidation.h" +#include "utils/ProcessorConfigUtils.h" +#include "utils/gsl.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +// The upper limit for Max Poll Time is 4 seconds. This is because Watchdog would potentially start +// reporting issues with the processor health otherwise +class ConsumeKafkaMaxPollTimeValidator : public TimePeriodValidator { + public: + ConsumeKafkaMaxPollTimeValidator(const std::string &name) // NOLINT + : TimePeriodValidator(name) { + } + ~ConsumeKafkaMaxPollTimeValidator() override = default; + + ValidationResult validate(const std::string& subject, const std::string& input) const override { + uint64_t value; + TimeUnit timeUnit; + uint64_t value_as_ms; + return ValidationResult::Builder::createBuilder().withSubject(subject).withInput(input).isValid( + core::TimePeriodValue::StringToTime(input, value, timeUnit) && + org::apache::nifi::minifi::core::Property::ConvertTimeUnitToMS(value, timeUnit, value_as_ms) && + 0 < value_as_ms && value_as_ms <= 4000).build(); + } +}; +} // namespace core +namespace processors { + +constexpr const std::size_t ConsumeKafka::DEFAULT_MAX_POLL_RECORDS; +constexpr char const* ConsumeKafka::DEFAULT_MAX_POLL_TIME; + +constexpr char const* ConsumeKafka::TOPIC_FORMAT_NAMES; +constexpr char const* ConsumeKafka::TOPIC_FORMAT_PATTERNS; + +core::Property ConsumeKafka::KafkaBrokers(core::PropertyBuilder::createProperty("Kafka Brokers") + ->withDescription("A comma-separated list of known Kafka Brokers in the format <host>:<port>.") + ->withDefaultValue("localhost:9092", core::StandardValidators::get().NON_BLANK_VALIDATOR) + ->supportsExpressionLanguage(true) + ->isRequired(true) + ->build()); + +core::Property ConsumeKafka::SecurityProtocol(core::PropertyBuilder::createProperty("Security Protocol") + ->withDescription("This property is currently not supported. 
Protocol used to communicate with brokers. Corresponds to Kafka's 'security.protocol' property.") + ->withAllowableValues<std::string>({SECURITY_PROTOCOL_PLAINTEXT/*, SECURITY_PROTOCOL_SSL, SECURITY_PROTOCOL_SASL_PLAINTEXT, SECURITY_PROTOCOL_SASL_SSL*/ }) + ->withDefaultValue(SECURITY_PROTOCOL_PLAINTEXT) + ->isRequired(true) + ->build()); + +core::Property ConsumeKafka::TopicNames(core::PropertyBuilder::createProperty("Topic Names") + ->withDescription("The name of the Kafka Topic(s) to pull from. Multiple topic names are supported as a comma separated list.") + ->supportsExpressionLanguage(true) + ->isRequired(true) + ->build()); + +core::Property ConsumeKafka::TopicNameFormat(core::PropertyBuilder::createProperty("Topic Name Format") + ->withDescription("Specifies whether the Topic(s) provided are a comma separated list of names or a single regular expression.") + ->withAllowableValues<std::string>({TOPIC_FORMAT_NAMES, TOPIC_FORMAT_PATTERNS}) + ->withDefaultValue(TOPIC_FORMAT_NAMES) + ->isRequired(true) + ->build()); + +core::Property ConsumeKafka::HonorTransactions(core::PropertyBuilder::createProperty("Honor Transactions") + ->withDescription( + "Specifies whether or not MiNiFi should honor transactional guarantees when communicating with Kafka. If false, the Processor will use an \"isolation level\" of " + "read_uncomitted. This means that messages will be received as soon as they are written to Kafka but will be pulled, even if the producer cancels the transactions. 
" + "If this value is true, MiNiFi will not receive any messages for which the producer's transaction was canceled, but this can result in some latency since the consumer " + "must wait for the producer to finish its entire transaction instead of pulling as the messages become available.") + ->withDefaultValue<bool>(true) + ->isRequired(true) + ->build()); + +core::Property ConsumeKafka::GroupID(core::PropertyBuilder::createProperty("Group ID") + ->withDescription("A Group ID is used to identify consumers that are within the same consumer group. Corresponds to Kafka's 'group.id' property.") + ->supportsExpressionLanguage(true) + ->isRequired(true) + ->build()); + +core::Property ConsumeKafka::OffsetReset(core::PropertyBuilder::createProperty("Offset Reset") + ->withDescription("Allows you to manage the condition when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that " + "data has been deleted). Corresponds to Kafka's 'auto.offset.reset' property.") + ->withAllowableValues<std::string>({OFFSET_RESET_EARLIEST, OFFSET_RESET_LATEST, OFFSET_RESET_NONE}) + ->withDefaultValue(OFFSET_RESET_LATEST) + ->isRequired(true) + ->build()); + +core::Property ConsumeKafka::KeyAttributeEncoding(core::PropertyBuilder::createProperty("Key Attribute Encoding") + ->withDescription("FlowFiles that are emitted have an attribute named 'kafka.key'. 
This property dictates how the value of the attribute should be encoded.") + ->withAllowableValues<std::string>({KEY_ATTR_ENCODING_UTF_8, KEY_ATTR_ENCODING_HEX}) + ->withDefaultValue(KEY_ATTR_ENCODING_UTF_8) + ->isRequired(true) + ->build()); + +core::Property ConsumeKafka::MessageDemarcator(core::PropertyBuilder::createProperty("Message Demarcator") + ->withDescription("Since KafkaConsumer receives messages in batches, you have an option to output FlowFiles which contains all Kafka messages in a single batch " + "for a given topic and partition and this property allows you to provide a string (interpreted as UTF-8) to use for demarcating apart multiple Kafka messages. " + "This is an optional property and if not provided each Kafka message received will result in a single FlowFile which time it is triggered. ") + ->supportsExpressionLanguage(true) + ->build()); + +core::Property ConsumeKafka::MessageHeaderEncoding(core::PropertyBuilder::createProperty("Message Header Encoding") + ->withDescription("Any message header that is found on a Kafka message will be added to the outbound FlowFile as an attribute. This property indicates the Character Encoding " + "to use for deserializing the headers.") + ->withAllowableValues<std::string>({MSG_HEADER_ENCODING_UTF_8, MSG_HEADER_ENCODING_HEX}) + ->withDefaultValue(MSG_HEADER_ENCODING_UTF_8) + ->build()); + +core::Property ConsumeKafka::HeadersToAddAsAttributes(core::PropertyBuilder::createProperty("Headers To Add As Attributes") + ->withDescription("A Regular Expression that is matched against all message headers. Any message header whose name matches the regex will be added to the FlowFile " + "as an Attribute. If not specified, no Header values will be added as FlowFile attributes. If two messages have a different value for the same header and that " + "header is selected by the provided regex, then those two messages must be added to different FlowFiles. 
As a result, users should be cautious about using a " + "regex like \".*\" if messages are expected to have header values that are unique per message, such as an identifier or timestamp, because it will prevent MiNiFi " + "from bundling the messages together efficiently.") + ->build()); + +core::Property ConsumeKafka::DuplicateHeaderHandling(core::PropertyBuilder::createProperty("Duplicate Header Handling") + ->withDescription("For headers to be added as attributes, this option specifies how to handle cases where multiple headers are present with the same key. " + "For example in case of receiving these two headers: \"Accept: text/html\" and \"Accept: application/xml\" and we want to attach the value of \"Accept\" " + "as a FlowFile attribute:\n" + " - \"Keep First\" attaches: \"Accept -> text/html\"\n" + " - \"Keep Latest\" attaches: \"Accept -> application/xml\"\n" + " - \"Comma-separated Merge\" attaches: \"Accept -> text/html, application/xml\"\n") + ->withAllowableValues<std::string>({MSG_HEADER_KEEP_FIRST, MSG_HEADER_KEEP_LATEST, MSG_HEADER_COMMA_SEPARATED_MERGE}) + ->withDefaultValue(MSG_HEADER_KEEP_LATEST) // Mirroring NiFi behaviour + ->build()); + +core::Property ConsumeKafka::MaxPollRecords(core::PropertyBuilder::createProperty("Max Poll Records") + ->withDescription("Specifies the maximum number of records Kafka should return when polling each time the processor is triggered.") + ->withDefaultValue<unsigned int>(DEFAULT_MAX_POLL_RECORDS) + ->build()); + +core::Property ConsumeKafka::MaxPollTime(core::PropertyBuilder::createProperty("Max Poll Time") + ->withDescription("Specifies the maximum amount of time the consumer can use for polling data from the brokers. 
" + "Polling is a blocking operation, so the upper limit of this value is specified in 4 seconds.") + ->withDefaultValue(DEFAULT_MAX_POLL_TIME, std::make_shared<core::ConsumeKafkaMaxPollTimeValidator>(std::string("ConsumeKafkaMaxPollTimeValidator"))) + ->isRequired(true) + ->build()); + +core::Property ConsumeKafka::SessionTimeout(core::PropertyBuilder::createProperty("Session Timeout") + ->withDescription("Client group session and failure detection timeout. The consumer sends periodic heartbeats " + "to indicate its liveness to the broker. If no hearts are received by the broker for a group member within " + "the session timeout, the broker will remove the consumer from the group and trigger a rebalance. " + "The allowed range is configured with the broker configuration properties group.min.session.timeout.ms and group.max.session.timeout.ms.") + ->withDefaultValue<core::TimePeriodValue>("60 seconds") + ->build()); + +const core::Relationship ConsumeKafka::Success("success", "Incoming kafka messages as flowfiles. 
Depending on the demarcation strategy, this can be one or multiple flowfiles per message."); + +void ConsumeKafka::initialize() { + setSupportedProperties({ + KafkaBrokers, + SecurityProtocol, + TopicNames, + TopicNameFormat, + HonorTransactions, + GroupID, + OffsetReset, + KeyAttributeEncoding, + MessageDemarcator, + MessageHeaderEncoding, + HeadersToAddAsAttributes, + DuplicateHeaderHandling, + MaxPollRecords, + MaxPollTime, + SessionTimeout + }); + setSupportedRelationships({ + Success, + }); +} + +void ConsumeKafka::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /* sessionFactory */) { + // Required properties + kafka_brokers_ = utils::getRequiredPropertyOrThrow(context, KafkaBrokers.getName()); + security_protocol_ = utils::getRequiredPropertyOrThrow(context, SecurityProtocol.getName()); + topic_names_ = utils::listFromRequiredCommaSeparatedProperty(context, TopicNames.getName()); + topic_name_format_ = utils::getRequiredPropertyOrThrow(context, TopicNameFormat.getName()); + honor_transactions_ = utils::parseBooleanPropertyOrThrow(context, HonorTransactions.getName()); + group_id_ = utils::getRequiredPropertyOrThrow(context, GroupID.getName()); + offset_reset_ = utils::getRequiredPropertyOrThrow(context, OffsetReset.getName()); + key_attribute_encoding_ = utils::getRequiredPropertyOrThrow(context, KeyAttributeEncoding.getName()); + max_poll_time_milliseconds_ = utils::parseTimePropertyMSOrThrow(context, MaxPollTime.getName()); + session_timeout_milliseconds_ = utils::parseTimePropertyMSOrThrow(context, SessionTimeout.getName()); + + // Optional properties + context->getProperty(MessageDemarcator.getName(), message_demarcator_); + context->getProperty(MessageHeaderEncoding.getName(), message_header_encoding_); + context->getProperty(DuplicateHeaderHandling.getName(), duplicate_header_handling_); + + headers_to_add_as_attributes_ = utils::listFromCommaSeparatedProperty(context, HeadersToAddAsAttributes.getName()); + max_poll_records_ = 
gsl::narrow<std::size_t>(utils::getOptionalUintProperty(context, MaxPollRecords.getName()).value_or(DEFAULT_MAX_POLL_RECORDS)); + + // For now security protocols are not yet supported + if (!utils::StringUtils::equalsIgnoreCase(SECURITY_PROTOCOL_PLAINTEXT, security_protocol_)) { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Security protocols are not supported yet."); + } + + if (!utils::StringUtils::equalsIgnoreCase(KEY_ATTR_ENCODING_UTF_8, key_attribute_encoding_) && !utils::StringUtils::equalsIgnoreCase(KEY_ATTR_ENCODING_HEX, key_attribute_encoding_)) { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Unsupported key attribute encoding: " + key_attribute_encoding_); + } + + if (!utils::StringUtils::equalsIgnoreCase(MSG_HEADER_ENCODING_UTF_8, message_header_encoding_) && !utils::StringUtils::equalsIgnoreCase(MSG_HEADER_ENCODING_HEX, message_header_encoding_)) { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Unsupported message header encoding: " + key_attribute_encoding_); + } + + configure_new_connection(context); +} + +namespace { +void rebalance_cb(rd_kafka_t* rk, rd_kafka_resp_err_t trigger, rd_kafka_topic_partition_list_t* partitions, void* /*opaque*/) { + // Cooperative, incremental assignment is not supported in the current librdkafka version + std::shared_ptr<logging::Logger> logger{logging::LoggerFactory<ConsumeKafka>::getLogger()}; + logger->log_debug("Rebalance triggered."); + rd_kafka_resp_err_t assign_error = RD_KAFKA_RESP_ERR_NO_ERROR; + switch (trigger) { + case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: + logger->log_debug("assigned"); + if (logger -> should_log(core::logging::LOG_LEVEL::info)) { + utils::print_topics_list(logger, partitions); + } + assign_error = rd_kafka_assign(rk, partitions); + break; + + case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: + logger->log_debug("revoked:"); + rd_kafka_commit(rk, partitions, /* async = */ 0); // Sync commit, maybe unneccessary + if (logger -> should_log(core::logging::LOG_LEVEL::info)) { + 
utils::print_topics_list(logger, partitions); + } + assign_error = rd_kafka_assign(rk, NULL); + break; + + default: + logger->log_debug("failed: %s", rd_kafka_err2str(trigger)); + assign_error = rd_kafka_assign(rk, NULL); + break; + } + logger->log_debug("assign failure: %s", rd_kafka_err2str(assign_error)); +} +} // namespace + +void ConsumeKafka::create_topic_partition_list() { + kf_topic_partition_list_ = { rd_kafka_topic_partition_list_new(topic_names_.size()), utils::rd_kafka_topic_partition_list_deleter() }; + + // On subscriptions any topics prefixed with ^ will be regex matched + if (utils::StringUtils::equalsIgnoreCase(TOPIC_FORMAT_PATTERNS, topic_name_format_)) { + for (const std::string& topic : topic_names_) { + const std::string regex_format = "^" + topic; + rd_kafka_topic_partition_list_add(kf_topic_partition_list_.get(), regex_format.c_str(), RD_KAFKA_PARTITION_UA); + } + } else { + for (const std::string& topic : topic_names_) { + rd_kafka_topic_partition_list_add(kf_topic_partition_list_.get(), topic.c_str(), RD_KAFKA_PARTITION_UA); + } + } + + // Subscribe to topic set using balanced consumer groups + // Subscribing from the same process without an inbetween unsubscribe call + // Does not seem to be triggering a rebalance (maybe librdkafka bug?) 
+ // This might happen until the cross-overship between processors and connections are settled + rd_kafka_resp_err_t subscribe_response = rd_kafka_subscribe(consumer_.get(), kf_topic_partition_list_.get()); + if (RD_KAFKA_RESP_ERR_NO_ERROR != subscribe_response) { + logger_->log_error("rd_kafka_subscribe error %d: %s", subscribe_response, rd_kafka_err2str(subscribe_response)); + } +} + +void ConsumeKafka::extend_config_from_dynamic_properties(const core::ProcessContext* context) { + using utils::setKafkaConfigurationField; + + const std::vector<std::string> dynamic_prop_keys = context->getDynamicPropertyKeys(); + if (dynamic_prop_keys.empty()) { + return; + } + logger_->log_info("Loading %d extra kafka configuration fields from ConsumeKafka dynamic properties:", dynamic_prop_keys.size()); + for (const std::string& key : dynamic_prop_keys) { + std::string value; + gsl_Expects(context->getDynamicProperty(key, value)); + logger_->log_info("%s: %s", key.c_str(), value.c_str()); + setKafkaConfigurationField(conf_.get(), key, value); + } +} + +void ConsumeKafka::configure_new_connection(const core::ProcessContext* context) { + using utils::setKafkaConfigurationField; + + conf_ = { rd_kafka_conf_new(), utils::rd_kafka_conf_deleter() }; + if (conf_ == nullptr) { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed to create rd_kafka_conf_t object"); + } + + // Set rebalance callback for use with coordinated consumer group balancing + // Rebalance handlers are needed for the initial configuration of the consumer + // If they are not set, offset reset is ignored and polling produces messages + // Registering a rebalance_cb turns off librdkafka's automatic partition assignment/revocation and instead delegates that responsibility to the application's rebalance_cb. 
+ rd_kafka_conf_set_rebalance_cb(conf_.get(), rebalance_cb); + + // Uncomment this for librdkafka debug logs: + // setKafkaConfigurationField(conf_.get(), "debug", "all"); + + setKafkaConfigurationField(conf_.get(), "bootstrap.servers", kafka_brokers_); + setKafkaConfigurationField(conf_.get(), "auto.offset.reset", "latest"); + setKafkaConfigurationField(conf_.get(), "enable.auto.commit", "false"); + setKafkaConfigurationField(conf_.get(), "enable.auto.offset.store", "false"); + setKafkaConfigurationField(conf_.get(), "isolation.level", honor_transactions_ ? "read_committed" : "read_uncommitted"); + setKafkaConfigurationField(conf_.get(), "group.id", group_id_); + setKafkaConfigurationField(conf_.get(), "session.timeout.ms", std::to_string(session_timeout_milliseconds_.count())); + setKafkaConfigurationField(conf_.get(), "max.poll.interval.ms", "600000"); // Twice the default, arbitrarily chosen + + // This is a librdkafka option, but the communication timeout is also specified in each of the + // relevant API calls. Could be redundant, but it probably does not hurt to set this + setKafkaConfigurationField(conf_.get(), "metadata.request.timeout.ms", std::to_string(METADATA_COMMUNICATIONS_TIMEOUT_MS)); + + extend_config_from_dynamic_properties(context); + + std::array<char, 512U> errstr{}; + consumer_ = { rd_kafka_new(RD_KAFKA_CONSUMER, conf_.release(), errstr.data(), errstr.size()), utils::rd_kafka_consumer_deleter() }; + if (consumer_ == nullptr) { + const std::string error_msg { errstr.begin(), errstr.end() }; + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed to create Kafka consumer %s" + error_msg); + } + + create_topic_partition_list(); + + // Changing the partition list should happen only as part as the initialization of offsets + // a function like `rd_kafka_position()` might have unexpected effects + // for instance when a consumer gets assigned a partition it used to + // consume at an earlier rebalance. 
+ // + // As far as I understand, instead of rd_kafka_position() an rd_kafka_committed() call if preferred here, + // as it properly fetches offsets from the broker + if (RD_KAFKA_RESP_ERR_NO_ERROR != rd_kafka_committed(consumer_.get(), kf_topic_partition_list_.get(), METADATA_COMMUNICATIONS_TIMEOUT_MS)) { + logger_ -> log_error("Retrieving committed offsets for topics+partitions failed."); + } + + rd_kafka_resp_err_t poll_set_consumer_response = rd_kafka_poll_set_consumer(consumer_.get()); + if (RD_KAFKA_RESP_ERR_NO_ERROR != poll_set_consumer_response) { + logger_->log_error("rd_kafka_poll_set_consumer error %d: %s", poll_set_consumer_response, rd_kafka_err2str(poll_set_consumer_response)); + } + + // There is no rd_kafka_seek alternative for rd_kafka_topic_partition_list_t, only rd_kafka_topic_t + // rd_kafka_topic_partition_list_set_offset should reset the offsets to the latest (or whatever is set in the config), + // Also, rd_kafka_committed should also fetch and set latest the latest offset + // In reality, neither of them seem to work (not even with calling rd_kafka_position()) + logger_->log_info("Resetting offset manually."); + while (true) { + std::unique_ptr<rd_kafka_message_t, utils::rd_kafka_message_deleter> + message_wrapper{ rd_kafka_consumer_poll(consumer_.get(), max_poll_time_milliseconds_.count()), utils::rd_kafka_message_deleter() }; + + if (!message_wrapper || RD_KAFKA_RESP_ERR_NO_ERROR != message_wrapper->err) { + break; + } + utils::print_kafka_message(message_wrapper.get(), logger_); + // Commit offsets on broker for the provided list of partitions + logger_->log_info("Committing offset: %" PRId64 ".", message_wrapper->offset); + rd_kafka_commit_message(consumer_.get(), message_wrapper.get(), /* async = */ 0); + } + logger_->log_info("Done resetting offset manually."); +} + +std::string ConsumeKafka::extract_message(const rd_kafka_message_t* rkmessage) const { + if (RD_KAFKA_RESP_ERR_NO_ERROR != rkmessage->err) { + throw 
minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "ConsumeKafka: received error message from broker: " + std::to_string(rkmessage->err) + " " + rd_kafka_err2str(rkmessage->err)); Review comment: I wouldn't call `fmt` a framework, and while `join_pack`'s implementation is complex, its interface couldn't be simpler. I won't reject the PR if you leave it like this. ########## File path: extensions/librdkafka/tests/ConsumeKafkaTests.cpp ########## @@ -0,0 +1,590 @@ + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +#define CATCH_CONFIG_MAIN + +#include <algorithm> +#include <memory> +#include <string> +#include <set> + +#include "TestBase.h" + +#include "../ConsumeKafka.h" +#include "../rdkafka_utils.h" +#include "../../standard-processors/processors/ExtractText.h" +#include "utils/file/FileUtils.h" +#include "utils/OptionalUtils.h" +#include "utils/RegexUtils.h" +#include "utils/StringUtils.h" +#include "utils/TestUtils.h" + +#include "utils/IntegrationTestUtils.h" + +namespace { +using org::apache::nifi::minifi::utils::optional; + +class KafkaTestProducer { + public: + enum class PublishEvent { + PUBLISH, + TRANSACTION_START, + TRANSACTION_COMMIT, + CANCEL + }; + KafkaTestProducer(const std::string& kafka_brokers, const std::string& topic, const bool transactional) : + logger_(logging::LoggerFactory<KafkaTestProducer>::getLogger()) { + using utils::setKafkaConfigurationField; + + std::unique_ptr<rd_kafka_conf_t, utils::rd_kafka_conf_deleter> conf = { rd_kafka_conf_new(), utils::rd_kafka_conf_deleter() }; + + setKafkaConfigurationField(conf.get(), "bootstrap.servers", kafka_brokers); + setKafkaConfigurationField(conf.get(), "compression.codec", "snappy"); + setKafkaConfigurationField(conf.get(), "batch.num.messages", "1"); + + if (transactional) { + setKafkaConfigurationField(conf.get(), "transactional.id", "ConsumeKafkaTest_transaction_id"); + } + + static std::array<char, 512U> errstr{}; + producer_ = { rd_kafka_new(RD_KAFKA_PRODUCER, conf.release(), errstr.data(), errstr.size()), utils::rd_kafka_producer_deleter() }; + if (producer_ == nullptr) { + auto error_msg = "Failed to create Kafka producer" + std::string{ errstr.data() }; + throw std::runtime_error(error_msg); + } + + // The last argument is a config here, but it is already owned by the producer. 
I assume that this would mean an override on the original config if used + topic_ = { rd_kafka_topic_new(producer_.get(), topic.c_str(), nullptr), utils::rd_kafka_topic_deleter() }; + + if (transactional) { + rd_kafka_init_transactions(producer_.get(), TRANSACTIONS_TIMEOUT_MS.count()); + } + } + + // Uses all the headers for every published message + void publish_messages_to_topic( + const std::vector<std::string>& messages_on_topic, const std::string& message_key, std::vector<PublishEvent> events, + const std::vector<std::pair<std::string, std::string>>& message_headers, const optional<std::string>& message_header_encoding) { + auto next_message = messages_on_topic.cbegin(); + for (const PublishEvent event : events) { + switch (event) { + case PublishEvent::PUBLISH: + REQUIRE(messages_on_topic.cend() != next_message); + publish_message(*next_message, message_key, message_headers, message_header_encoding); + std::advance(next_message, 1); + break; + case PublishEvent::TRANSACTION_START: + logger_->log_debug("Starting new transaction..."); + rd_kafka_begin_transaction(producer_.get()); + break; + case PublishEvent::TRANSACTION_COMMIT: + logger_->log_debug("Committing transaction..."); + rd_kafka_commit_transaction(producer_.get(), TRANSACTIONS_TIMEOUT_MS.count()); + break; + case PublishEvent::CANCEL: + logger_->log_debug("Cancelling transaction..."); + rd_kafka_abort_transaction(producer_.get(), TRANSACTIONS_TIMEOUT_MS.count()); + } + } + } + + private: + void publish_message( + const std::string& message, const std::string& message_key, const std::vector<std::pair<std::string, std::string>>& message_headers, const optional<std::string>& message_header_encoding) { + logger_->log_debug("Producing: %s", message.c_str()); + std::unique_ptr<rd_kafka_headers_t, utils::rd_kafka_headers_deleter> headers(rd_kafka_headers_new(message_headers.size()), utils::rd_kafka_headers_deleter()); + if (!headers) { + throw std::runtime_error("Generating message headers failed."); + } + 
for (const std::pair<std::string, std::string>& message_header : message_headers) { + rd_kafka_header_add(headers.get(), + const_cast<char*>(message_header.first.c_str()), message_header.first.size(), + const_cast<char*>(message_header.second.c_str()), message_header.second.size()); + } + + if (RD_KAFKA_RESP_ERR_NO_ERROR != rd_kafka_producev( + producer_.get(), + RD_KAFKA_V_RKT(topic_.get()), + RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), + RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), + RD_KAFKA_V_VALUE(const_cast<char*>(&message[0]), message.size()), + RD_KAFKA_V_HEADERS(headers.release()), + RD_KAFKA_V_KEY(message_key.c_str(), message_key.size()), + RD_KAFKA_V_END)) { + logger_->log_error("Producer failure: %d: %s", rd_kafka_last_error(), rd_kafka_err2str(rd_kafka_last_error())); + } + } + + static const std::chrono::milliseconds TRANSACTIONS_TIMEOUT_MS; + + std::unique_ptr<rd_kafka_t, utils::rd_kafka_producer_deleter> producer_; + std::unique_ptr<rd_kafka_topic_t, utils::rd_kafka_topic_deleter> topic_; + + std::shared_ptr<logging::Logger> logger_; +}; + +const std::chrono::milliseconds KafkaTestProducer::TRANSACTIONS_TIMEOUT_MS{ 2000 }; + +class ConsumeKafkaTest { + public: + using Processor = org::apache::nifi::minifi::core::Processor; + using ConsumeKafka = org::apache::nifi::minifi::processors::ConsumeKafka; + using ExtractText = org::apache::nifi::minifi::processors::ExtractText; + + const KafkaTestProducer::PublishEvent PUBLISH = KafkaTestProducer::PublishEvent::PUBLISH; + const KafkaTestProducer::PublishEvent TRANSACTION_START = KafkaTestProducer::PublishEvent::TRANSACTION_START; + const KafkaTestProducer::PublishEvent TRANSACTION_COMMIT = KafkaTestProducer::PublishEvent::TRANSACTION_COMMIT; + const KafkaTestProducer::PublishEvent CANCEL = KafkaTestProducer::PublishEvent::CANCEL; + + const std::vector<KafkaTestProducer::PublishEvent> NON_TRANSACTIONAL_MESSAGES { PUBLISH, PUBLISH }; + const std::vector<KafkaTestProducer::PublishEvent> 
SINGLE_COMMITTED_TRANSACTION { TRANSACTION_START, PUBLISH, PUBLISH, TRANSACTION_COMMIT }; + const std::vector<KafkaTestProducer::PublishEvent> TWO_SEPARATE_TRANSACTIONS { TRANSACTION_START, PUBLISH, TRANSACTION_COMMIT, TRANSACTION_START, PUBLISH, TRANSACTION_COMMIT }; + const std::vector<KafkaTestProducer::PublishEvent> NON_COMMITTED_TRANSACTION { TRANSACTION_START, PUBLISH, PUBLISH }; + const std::vector<KafkaTestProducer::PublishEvent> CANCELLED_TRANSACTION { TRANSACTION_START, PUBLISH, CANCEL }; + + const std::string KEEP_FIRST = ConsumeKafka::MSG_HEADER_KEEP_FIRST; + const std::string KEEP_LATEST = ConsumeKafka::MSG_HEADER_KEEP_LATEST; + const std::string COMMA_SEPARATED_MERGE = ConsumeKafka::MSG_HEADER_COMMA_SEPARATED_MERGE; + + static const std::string PRODUCER_TOPIC; + static const std::string TEST_MESSAGE_KEY; + + // Relationships + const core::Relationship success {"success", "description"}; + const core::Relationship failure {"failure", "description"}; + + ConsumeKafkaTest() : + logTestController_(LogTestController::getInstance()), + logger_(logging::LoggerFactory<ConsumeKafkaTest>::getLogger()) { + reInitialize(); + } + + virtual ~ConsumeKafkaTest() { + logTestController_.reset(); + } + + protected: + void reInitialize() { + testController_.reset(new TestController()); + plan_ = testController_->createPlan(); + logTestController_.setError<LogTestController>(); + logTestController_.setError<TestPlan>(); + logTestController_.setTrace<ConsumeKafka>(); + logTestController_.setTrace<ConsumeKafkaTest>(); + logTestController_.setTrace<KafkaTestProducer>(); + logTestController_.setDebug<ExtractText>(); + logTestController_.setDebug<core::ProcessContext>(); + } + + void optional_set_property(const std::shared_ptr<core::Processor>& processor, const std::string& property_name, const optional<std::string>& opt_value) { + if (opt_value) { + plan_->setProperty(processor, property_name, opt_value.value()); + } + } + + std::string decode_key(const std::string& key, 
const optional<std::string>& key_attribute_encoding) { + if (!key_attribute_encoding || utils::StringUtils::equalsIgnoreCase(ConsumeKafka::KEY_ATTR_ENCODING_UTF_8, key_attribute_encoding.value())) { + return key; + } + if (utils::StringUtils::equalsIgnoreCase(ConsumeKafka::ConsumeKafka::KEY_ATTR_ENCODING_HEX, key_attribute_encoding.value())) { + return utils::StringUtils::from_hex(key); + } + throw std::runtime_error("Message Header Encoding does not match any of the presets in the test."); + } + + std::vector<std::string> sort_and_split_messages(const std::vector<std::string>& messages_on_topic, const optional<std::string>& message_demarcator) { + if (message_demarcator) { + std::vector<std::string> sorted_split_messages; + for (const auto& message : messages_on_topic) { + std::vector<std::string> split_message = utils::StringUtils::split(message, message_demarcator.value()); + std::move(split_message.begin(), split_message.end(), std::back_inserter(sorted_split_messages)); + } + std::sort(sorted_split_messages.begin(), sorted_split_messages.end()); + return sorted_split_messages; + } + std::vector<std::string> sorted_messages{ messages_on_topic.cbegin(), messages_on_topic.cend() }; + std::sort(sorted_messages.begin(), sorted_messages.end()); + return sorted_messages; + } + + static const std::chrono::seconds MAX_CONSUMEKAFKA_POLL_TIME_SECONDS; + static const std::string ATTRIBUTE_FOR_CAPTURING_CONTENT; + static const std::string TEST_FILE_NAME_POSTFIX; + + std::unique_ptr<TestController> testController_; + std::shared_ptr<TestPlan> plan_; + LogTestController& logTestController_; + std::shared_ptr<logging::Logger> logger_; +}; + +class ConsumeKafkaPropertiesTest : public ConsumeKafkaTest { + public: + ConsumeKafkaPropertiesTest() : ConsumeKafkaTest() {} + virtual ~ConsumeKafkaPropertiesTest() { + logTestController_.reset(); + } + + void single_consumer_with_plain_text_test( + bool expect_config_valid, + bool expect_fixed_message_order, + const 
std::vector<std::pair<std::string, std::string>>& expect_header_attributes, + const std::vector<std::string>& messages_on_topic, + const std::vector<KafkaTestProducer::PublishEvent>& transaction_events, + const std::vector<std::pair<std::string, std::string>>& message_headers, + const std::string& kafka_brokers, + const std::string& security_protocol, + const std::string& topic_names, + const optional<std::string>& topic_name_format, + const optional<bool>& honor_transactions, + const optional<std::string>& group_id, + const optional<std::string>& offset_reset, + const optional<std::string>& key_attribute_encoding, + const optional<std::string>& message_demarcator, + const optional<std::string>& message_header_encoding, + const optional<std::string>& headers_to_add_as_attributes, + const optional<std::string>& duplicate_header_handling, + const optional<std::string>& max_poll_records, + const optional<std::string>& max_poll_time, + const optional<std::string>& session_timeout) { + reInitialize(); + + // Consumer chain + std::shared_ptr<core::Processor> consume_kafka = plan_->addProcessor("ConsumeKafka", "consume_kafka", {success}, false); + std::shared_ptr<core::Processor> extract_text = plan_->addProcessor("ExtractText", "extract_text", {success}, false); + + // Set up connections + plan_->addConnection(consume_kafka, success, extract_text); + extract_text->setAutoTerminatedRelationships({success}); + + const auto bool_to_string = [] (const bool b) -> std::string { return b ? 
"true" : "false"; }; + + plan_->setProperty(consume_kafka, ConsumeKafka::KafkaBrokers.getName(), kafka_brokers); + plan_->setProperty(consume_kafka, ConsumeKafka::SecurityProtocol.getName(), security_protocol); + plan_->setProperty(consume_kafka, ConsumeKafka::TopicNames.getName(), topic_names); + + optional_set_property(consume_kafka, ConsumeKafka::TopicNameFormat.getName(), topic_name_format); + optional_set_property(consume_kafka, ConsumeKafka::HonorTransactions.getName(), honor_transactions | utils::map(bool_to_string)); + optional_set_property(consume_kafka, ConsumeKafka::GroupID.getName(), group_id); + optional_set_property(consume_kafka, ConsumeKafka::OffsetReset.getName(), offset_reset); + optional_set_property(consume_kafka, ConsumeKafka::KeyAttributeEncoding.getName(), key_attribute_encoding); + optional_set_property(consume_kafka, ConsumeKafka::MessageDemarcator.getName(), message_demarcator); + optional_set_property(consume_kafka, ConsumeKafka::MessageHeaderEncoding.getName(), message_header_encoding); + optional_set_property(consume_kafka, ConsumeKafka::HeadersToAddAsAttributes.getName(), headers_to_add_as_attributes); + optional_set_property(consume_kafka, ConsumeKafka::DuplicateHeaderHandling.getName(), duplicate_header_handling); + optional_set_property(consume_kafka, ConsumeKafka::MaxPollRecords.getName(), max_poll_records); + optional_set_property(consume_kafka, ConsumeKafka::MaxPollTime.getName(), max_poll_time); + optional_set_property(consume_kafka, ConsumeKafka::SessionTimeout.getName(), session_timeout); + + plan_->setProperty(extract_text, ExtractText::Attribute.getName(), ATTRIBUTE_FOR_CAPTURING_CONTENT); + + if (!expect_config_valid) { + REQUIRE_THROWS(plan_->scheduleProcessor(consume_kafka)); + return; + } else { + plan_->scheduleProcessors(); + } + + std::unique_ptr<rd_kafka_conf_t, utils::rd_kafka_conf_deleter> conf_; + std::unique_ptr<rd_kafka_t, utils::rd_kafka_consumer_deleter> consumer_; + + const bool is_transactional = 
std::count(transaction_events.cbegin(), transaction_events.cend(), KafkaTestProducer::PublishEvent::TRANSACTION_START); + const bool transactions_committed = transaction_events.back() == KafkaTestProducer::PublishEvent::TRANSACTION_COMMIT; + + KafkaTestProducer producer(kafka_brokers, PRODUCER_TOPIC, is_transactional); + producer.publish_messages_to_topic(messages_on_topic, TEST_MESSAGE_KEY, transaction_events, message_headers, message_header_encoding); + + + std::vector<std::shared_ptr<core::FlowFile>> flow_files_produced; + for (std::size_t num_expected_messages_processed = 0; num_expected_messages_processed < messages_on_topic.size(); num_expected_messages_processed += std::stoi(max_poll_records.value_or("1"))) { + plan_->increment_location(); + if ((honor_transactions && false == honor_transactions.value()) || (is_transactional && !transactions_committed)) { + INFO("Non-committed messages received."); + REQUIRE(false == plan_->runCurrentProcessorUntilFlowfileIsProduced(MAX_CONSUMEKAFKA_POLL_TIME_SECONDS)); + return; + } + { + SCOPED_INFO("ConsumeKafka timed out when waiting to receive the message published to the kafka broker."); + REQUIRE(plan_->runCurrentProcessorUntilFlowfileIsProduced(MAX_CONSUMEKAFKA_POLL_TIME_SECONDS)); + } + std::size_t num_flow_files_produced = plan_->getNumFlowFileProducedByCurrentProcessor(); + plan_->increment_location(); + for (std::size_t times_extract_text_run = 0; times_extract_text_run < num_flow_files_produced; ++times_extract_text_run) { + plan_->runCurrentProcessor(); // ExtractText + std::shared_ptr<core::FlowFile> flow_file = plan_->getFlowFileProducedByCurrentProcessor(); + for (const auto& exp_header : expect_header_attributes) { + SCOPED_INFO("ConsumeKafka did not produce the expected flowfile attribute from message header: " << exp_header.first << "."); + const auto header_attr_opt = flow_file->getAttribute(exp_header.first); + REQUIRE(header_attr_opt); + REQUIRE(exp_header.second == header_attr_opt.value()); + } + { + 
SCOPED_INFO("Message key is missing or incorrect (potential encoding mismatch)."); + REQUIRE(TEST_MESSAGE_KEY == decode_key(flow_file->getAttribute(ConsumeKafka::KAFKA_MESSAGE_KEY_ATTR).value(), key_attribute_encoding)); + REQUIRE("1" == flow_file->getAttribute(ConsumeKafka::KAFKA_COUNT_ATTR).value()); + REQUIRE(flow_file->getAttribute(ConsumeKafka::KAFKA_OFFSET_ATTR)); + REQUIRE(flow_file->getAttribute(ConsumeKafka::KAFKA_PARTITION_ATTR)); + REQUIRE(PRODUCER_TOPIC == flow_file->getAttribute(ConsumeKafka::KAFKA_TOPIC_ATTR).value()); + } + flow_files_produced.emplace_back(std::move(flow_file)); + } + plan_->reset_location(); + } + + const auto contentOrderOfFlowFile = [&] (const std::shared_ptr<core::FlowFile>& lhs, const std::shared_ptr<core::FlowFile>& rhs) { + return lhs->getAttribute(ATTRIBUTE_FOR_CAPTURING_CONTENT).value() < rhs->getAttribute(ATTRIBUTE_FOR_CAPTURING_CONTENT).value(); + }; + { + SCOPED_INFO("The flowfiles generated by ConsumeKafka are invalid (probably nullptr)."); + REQUIRE_NOTHROW(std::sort(flow_files_produced.begin(), flow_files_produced.end(), contentOrderOfFlowFile)); + } + std::vector<std::string> sorted_split_messages = sort_and_split_messages(messages_on_topic, message_demarcator); + const auto flow_file_content_matches_message = [&] (const std::shared_ptr<core::FlowFile>& flowfile, const std::string message) { + return flowfile->getAttribute(ATTRIBUTE_FOR_CAPTURING_CONTENT).value() == message; + }; + + logger_->log_debug("************"); + std::string expected = "Expected: "; + for (int i = 0; i < sorted_split_messages.size(); ++i) { + expected += sorted_split_messages[i] + ", "; + } + std::string actual = " Actual: "; + for (int i = 0; i < sorted_split_messages.size(); ++i) { + actual += flow_files_produced[i]->getAttribute(ATTRIBUTE_FOR_CAPTURING_CONTENT).value() + ", "; + } + logger_->log_debug("%s", expected.c_str()); + logger_->log_debug("%s", actual.c_str()); + logger_->log_debug("************"); + + INFO("The messages received by 
ConsumeKafka do not match those published"); + REQUIRE(std::equal(flow_files_produced.begin(), flow_files_produced.end(), sorted_split_messages.begin(), flow_file_content_matches_message)); + } +}; + +class ConsumeKafkaContinuousPublishingTest : public ConsumeKafkaTest { + public: + ConsumeKafkaContinuousPublishingTest() : ConsumeKafkaTest() {} + virtual ~ConsumeKafkaContinuousPublishingTest() { + logTestController_.reset(); + } + + void single_consumer_with_continuous_message_producing( + const uint64_t msg_periodicity_ms, + const std::string& kafka_brokers, + const optional<std::string>& group_id, + const optional<std::string>& max_poll_records, + const optional<std::string>& max_poll_time, + const optional<std::string>& session_timeout) { + reInitialize(); + + std::shared_ptr<core::Processor> consume_kafka = plan_->addProcessor("ConsumeKafka", "consume_kafka", {success}, false); + + plan_->setProperty(consume_kafka, "allow.auto.create.topics", "true", true); // Seems like the topic tests work without this + + plan_->setProperty(consume_kafka, ConsumeKafka::KafkaBrokers.getName(), kafka_brokers); + plan_->setProperty(consume_kafka, ConsumeKafka::TopicNames.getName(), PRODUCER_TOPIC); + optional_set_property(consume_kafka, ConsumeKafka::GroupID.getName(), group_id); + + optional_set_property(consume_kafka, ConsumeKafka::MaxPollRecords.getName(), max_poll_records); + optional_set_property(consume_kafka, ConsumeKafka::MaxPollTime.getName(), max_poll_time); + optional_set_property(consume_kafka, ConsumeKafka::SessionTimeout.getName(), session_timeout); + + consume_kafka->setAutoTerminatedRelationships({success}); + + KafkaTestProducer producer("localhost:9092", PRODUCER_TOPIC, /* transactional = */ false); + + std::atomic_bool producer_loop_stop{ false }; + const auto producer_loop = [&] { + std::size_t num_messages_sent = 0; + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + while (!producer_loop_stop) { + producer.publish_messages_to_topic({ "Message 
after " + std::to_string(msg_periodicity_ms * ++num_messages_sent) + " ms"}, TEST_MESSAGE_KEY, { PUBLISH }, {}, {}); + std::this_thread::sleep_for(std::chrono::milliseconds(msg_periodicity_ms)); + } + }; + + plan_->scheduleProcessors(); + + const auto get_time_property_ms = [] (const std::string& property_string) { + int64_t value; + org::apache::nifi::minifi::core::TimeUnit unit; + REQUIRE(org::apache::nifi::minifi::core::Property::StringToTime(property_string, value, unit)); + int64_t value_as_ms; + REQUIRE(org::apache::nifi::minifi::core::Property::ConvertTimeUnitToMS(value, unit, value_as_ms)); + return value_as_ms; + }; + + std::thread producer_thread(producer_loop); + CHECK_NOTHROW(plan_->runNextProcessor()); + producer_loop_stop = true; + producer_thread.join(); + + std::size_t num_flow_files_produced = plan_->getNumFlowFileProducedByCurrentProcessor(); + + const uint64_t max_poll_time_ms = get_time_property_ms(max_poll_time.value_or(ConsumeKafka::DEFAULT_MAX_POLL_TIME)); + const uint64_t max_poll_records_value = max_poll_records ? std::stoi(max_poll_records.value()) : ConsumeKafka::DEFAULT_MAX_POLL_RECORDS; + const uint64_t exp_lower_bound = std::min(max_poll_time_ms / msg_periodicity_ms - 2, max_poll_records_value); + const uint64_t exp_upper_bound = std::min(max_poll_time_ms / msg_periodicity_ms + 2, max_poll_records_value); + logger_->log_debug("Max poll time: %d, Max poll records: %d, Exp. 
flowfiles produced: (min: %d, max: %d), actual: %d", + max_poll_time_ms, max_poll_records_value, exp_lower_bound, exp_upper_bound, num_flow_files_produced); + + REQUIRE(exp_lower_bound <= num_flow_files_produced); + REQUIRE(num_flow_files_produced <= exp_upper_bound); + } +}; + +const std::string ConsumeKafkaTest::TEST_FILE_NAME_POSTFIX{ "target_kafka_message.txt" }; +const std::string ConsumeKafkaTest::TEST_MESSAGE_KEY{ "consume_kafka_test_key" }; +const std::string ConsumeKafkaTest::PRODUCER_TOPIC{ "ConsumeKafkaTest" }; +const std::string ConsumeKafkaTest::ATTRIBUTE_FOR_CAPTURING_CONTENT{ "flowfile_content" }; +const std::chrono::seconds ConsumeKafkaTest::MAX_CONSUMEKAFKA_POLL_TIME_SECONDS{ 5 }; + +TEST_CASE_METHOD(ConsumeKafkaPropertiesTest, "ConsumeKafka parses and uses kafka topics.", "[ConsumeKafka][Kafka][Topic]") { + auto run_tests = [&] (const std::vector<std::string>& messages_on_topic, const std::string& topic_names, const optional<std::string>& topic_name_format) { + single_consumer_with_plain_text_test(true, false, {}, messages_on_topic, NON_TRANSACTIONAL_MESSAGES, {}, "localhost:9092", "PLAINTEXT", topic_names, topic_name_format, {}, "test_group_id", {}, {}, {}, {}, {}, {}, "1", "2 sec", "60 sec"); // NOLINT + }; + run_tests({ "Ulysses", "James Joyce" }, "ConsumeKafkaTest", {}); + run_tests({ "The Great Gatsby", "F. 
Scott Fitzgerald" }, "ConsumeKafkaTest", ConsumeKafka::TOPIC_FORMAT_NAMES); + run_tests({ "War and Peace", "Lev Tolstoy" }, "a,b,c,ConsumeKafkaTest,d", ConsumeKafka::TOPIC_FORMAT_NAMES); + run_tests({ "Nineteen Eighty Four", "George Orwell" }, "ConsumeKafkaTest", ConsumeKafka::TOPIC_FORMAT_PATTERNS); + run_tests({ "Hamlet", "William Shakespeare" }, "Cons[emu]*KafkaTest", ConsumeKafka::TOPIC_FORMAT_PATTERNS); +} + +TEST_CASE_METHOD(ConsumeKafkaPropertiesTest, "Offsets are reset to the latest when a consumer starts with non-processed messages.", "[ConsumeKafka][Kafka][OffsetReset]") { + auto run_tests = [&] ( + const std::vector<std::string>& messages_on_topic, + const std::vector<KafkaTestProducer::PublishEvent>& transaction_events) { + single_consumer_with_plain_text_test(true, false, {}, messages_on_topic, transaction_events, {}, "localhost:9092", "PLAINTEXT", "ConsumeKafkaTest", {}, {}, "test_group_id", {}, {}, {}, {}, {}, {}, "1", "2 sec", "60 sec"); // NOLINT + }; + KafkaTestProducer producer("localhost:9092", PRODUCER_TOPIC, false); + producer.publish_messages_to_topic({"Dummy messages", "that should be ignored", "due to offset reset on ConsumeKafka startup"}, TEST_MESSAGE_KEY, {PUBLISH, PUBLISH, PUBLISH}, {}, {}); + run_tests({"Brave New World", "Aldous Huxley"}, NON_TRANSACTIONAL_MESSAGES); + producer.publish_messages_to_topic({"Dummy messages", "that should be ignored", "due to offset reset on ConsumeKafka startup"}, TEST_MESSAGE_KEY, {PUBLISH, PUBLISH, PUBLISH}, {}, {}); + run_tests({"Call of the Wild", "Jack London"}, NON_TRANSACTIONAL_MESSAGES); +} + +TEST_CASE_METHOD(ConsumeKafkaPropertiesTest, "Key attribute is encoded according to the \"Key Attribute Encoding\" property.", "[ConsumeKafka][Kafka][KeyAttributeEncoding]") { + auto run_tests = [&] (const std::vector<std::string>& messages_on_topic, const optional<std::string>& key_attribute_encoding) { + single_consumer_with_plain_text_test(true, false, {}, messages_on_topic, NON_TRANSACTIONAL_MESSAGES, 
{}, "localhost:9092", "PLAINTEXT", "ConsumeKafkaTest", {}, {}, "test_group_id", {}, key_attribute_encoding, {}, {}, {}, {}, "1", "2 sec", "60 sec"); // NOLINT + }; + + run_tests({ "The Odyssey", "Ὅμηρος" }, {}); + run_tests({ "Lolita", "Владимир Владимирович Набоков" }, "utf-8"); + run_tests({ "Crime and Punishment", "Фёдор Михайлович Достоевский" }, "hex"); + run_tests({ "Paradise Lost", "John Milton" }, "hEX"); +} + +TEST_CASE_METHOD(ConsumeKafkaPropertiesTest, "Transactional behaviour is supported.", "[ConsumeKafka][Kafka][Transaction]") { + auto run_tests = [&] (const std::vector<std::string>& messages_on_topic, const std::vector<KafkaTestProducer::PublishEvent>& transaction_events, const optional<bool>& honor_transactions) { + single_consumer_with_plain_text_test(true, false, {}, messages_on_topic, transaction_events, {}, "localhost:9092", "PLAINTEXT", "ConsumeKafkaTest", {}, {}, "test_group_id", {}, {}, {}, {}, {}, {}, "1", "2 sec", "60 sec"); // NOLINT + }; + run_tests({ "Pride and Prejudice", "Jane Austen" }, SINGLE_COMMITTED_TRANSACTION, {}); + run_tests({ "Dune", "Frank Herbert" }, TWO_SEPARATE_TRANSACTIONS, {}); + run_tests({ "The Black Sheep", "Honore De Balzac" }, NON_COMMITTED_TRANSACTION, {}); + run_tests({ "Gospel of Thomas" }, CANCELLED_TRANSACTION, {}); + run_tests({ "Operation Dark Heart" }, CANCELLED_TRANSACTION, true); + run_tests({ "Brexit" }, CANCELLED_TRANSACTION, false); +} + +TEST_CASE_METHOD(ConsumeKafkaPropertiesTest, "Headers on consumed Kafka messages are extracted into attributes if requested on ConsumeKafka.", "[ConsumeKafka][Kafka][Headers]") { + auto run_tests = [&] ( + const std::vector<std::string>& messages_on_topic, + const std::vector<std::pair<std::string, std::string>>& expect_header_attributes, + const std::vector<std::pair<std::string, std::string>>& message_headers, + const optional<std::string>& headers_to_add_as_attributes, + const optional<std::string>& duplicate_header_handling) { + 
single_consumer_with_plain_text_test(true, false, expect_header_attributes, messages_on_topic, NON_TRANSACTIONAL_MESSAGES, message_headers, "localhost:9092", "PLAINTEXT", "ConsumeKafkaTest", {}, {}, "test_group_id", {}, {}, {}, {}, headers_to_add_as_attributes, duplicate_header_handling, "1", "2 sec", "60 sec"); // NOLINT + }; + run_tests({ "Homeland", "R. A. Salvatore"}, {}, {{{"Contains dark elves"}, {"Yes"}}}, {}, {}); + run_tests({ "Magician", "Raymond E. Feist"}, {{{"Rating"}, {"10/10"}}}, {{{"Rating"}, {"10/10"}}}, {"Rating"}, {}); + run_tests({ "Mistborn", "Brandon Sanderson"}, {{{"Metal"}, {"Copper"}}}, {{{"Metal"}, {"Copper"}}, {{"Metal"}, {"Iron"}}}, {"Metal"}, KEEP_FIRST); + run_tests({ "Mistborn", "Brandon Sanderson"}, {{{"Metal"}, {"Iron"}}}, {{{"Metal"}, {"Copper"}}, {{"Metal"}, {"Iron"}}}, {"Metal"}, KEEP_LATEST); + run_tests({ "Mistborn", "Brandon Sanderson"}, {{{"Metal"}, {"Copper, Iron"}}}, {{{"Metal"}, {"Copper"}}, {{"Metal"}, {"Iron"}}}, {"Metal"}, COMMA_SEPARATED_MERGE); + run_tests({"The Lord of the Rings", "J. R. R. 
Tolkien"}, {{{"Parts"}, {"First, second, third"}}}, {{{"Parts"}, {"First, second, third"}}}, {"Parts"}, {}); +} + +TEST_CASE_METHOD(ConsumeKafkaPropertiesTest, "Messages are separated into multiple flowfiles if the message demarcator is present in the message.", "[ConsumeKafka][Kafka][MessageDemarcator]") { + auto run_tests = [&] ( + const std::vector<std::string>& messages_on_topic, + const optional<std::string>& message_demarcator) { + single_consumer_with_plain_text_test(true, false, {}, messages_on_topic, NON_TRANSACTIONAL_MESSAGES, {}, "localhost:9092", "PLAINTEXT", "ConsumeKafkaTest", {}, {}, "test_group_id", {}, {}, message_demarcator, {}, {}, {}, "1", "2 sec", "60 sec"); // NOLINT + }; + run_tests({"Barbapapa", "Anette Tison and Talus Taylor"}, "a"); +} + +TEST_CASE_METHOD(ConsumeKafkaPropertiesTest, "The maximum poll records allows ConsumeKafka to combine multiple messages into a single flowfile.", "[ConsumeKafka][Kafka][Batching][MaxPollRecords]") { + auto run_tests = [&] ( + const std::vector<std::string>& messages_on_topic, + const std::vector<KafkaTestProducer::PublishEvent>& transaction_events, + const optional<std::string>& max_poll_records) { + single_consumer_with_plain_text_test(true, false, {}, messages_on_topic, transaction_events, {}, "localhost:9092", "PLAINTEXT", "ConsumeKafkaTest", {}, {}, "test_group_id", {}, {}, {}, {}, {}, {}, max_poll_records, "2 sec", "60 sec"); // NOLINT + }; + run_tests({"The Count of Monte Cristo", "Alexandre Dumas"}, NON_TRANSACTIONAL_MESSAGES, "2"); + + const std::vector<std::string> content { + "Make const member functions thread safe", + "Understand special member function generation", + "Use std::unique_ptr for exclusive-ownership resource management", + "Use std::shared_ptr for shared-ownership resource management", + "Use std::weak_ptr for std::shared_ptr-like pointers that can dangle", + "Prefer std::make_unique and std::make_shared to direct use of new", + "When using the Pimpl Idiom, define special member 
functions inthe implementation file", + "Understand std::move and std::forward", + "Distinguish universal references from rvalue references", + "Use std::move on rvalue references, std::forward on universal references", + "Avoid overloading on universal references", + "Familiarize yourself with alternatives to overloading on universal references", + "Understand reference collapsing", + "Assume that move operations are not present, not cheap, and not used", + "Familiarize yourself with perfect forwarding failure cases", + "Avoid default capture modes", + "Use init capture to move objects into closures", + "Use decltype on auto&& parameters to std::forward them", + "Prefer lambdas to std::bind", + "Prefer task-based programming to thread-based" }; + const std::vector<KafkaTestProducer::PublishEvent> transaction_events(content.size(), PUBLISH); + run_tests(content, transaction_events, "5"); +} + +TEST_CASE_METHOD(ConsumeKafkaPropertiesTest, "Non-plain text security context throws scheduling exceptions.", "[ConsumeKafka][Kafka][SecurityProtocol]") { + single_consumer_with_plain_text_test(false, false, {}, { "Miyamoto Musashi", "Eiji Yoshikawa" }, NON_TRANSACTIONAL_MESSAGES, {}, "localhost:9092", ConsumeKafka::SECURITY_PROTOCOL_SSL, "ConsumeKafkaTest", {}, {}, "test_group_id", {}, {}, {}, {}, {}, {}, "1", "2 sec", "60 sec"); // NOLINT +} + +TEST_CASE_METHOD(ConsumeKafkaPropertiesTest, "Acceptable values for message header and key attribute encoding are \"UTF-8\" and \"hex\".", "[ConsumeKafka][Kafka][InvalidEncoding]") { + single_consumer_with_plain_text_test(false, false, {}, { "Shogun", "James Clavell" }, NON_TRANSACTIONAL_MESSAGES, {}, "localhost:9092", "PLAINTEXT", "ConsumeKafkaTest", {}, {}, "test_group_id", {}, "UTF-32", {}, {}, {}, {}, "1", "2 sec", "60 sec"); // NOLINT + single_consumer_with_plain_text_test(false, false, {}, { "Alice's Adventures in Wonderland", "Lewis Carroll" }, NON_TRANSACTIONAL_MESSAGES, {}, "localhost:9092", "PLAINTEXT", "ConsumeKafkaTest", 
{}, {}, "test_group_id", {}, {}, {}, "UTF-32", {}, {}, "1", "2 sec", "60 sec"); // NOLINT +} + +TEST_CASE_METHOD(ConsumeKafkaContinuousPublishingTest, "ConsumeKafka can spend no more time polling than allowed in the maximum poll time property.", "[ConsumeKafka][Kafka][Batching][MaxPollTime]") { + auto run_tests = [&] ( + const uint64_t msg_periodicity_ms, + const optional<std::string>& max_poll_records, + const optional<std::string>& max_poll_time, + const optional<std::string>& session_timeout) { + single_consumer_with_continuous_message_producing(msg_periodicity_ms, "localhost:9092", "test_group_id", max_poll_records, max_poll_time, session_timeout); + }; + // For some reason, a session time-out of a few seconds does not work at all, 10 seconds seems to be stable + run_tests(300, "20", "3 seconds", "10000 ms"); + // Running multiple tests does not work properly here. For some reason, producing messages + // while a rebalance is triggered causes this error, and a blocked poll when new + // messages are produced: + // Group "test_group_id" heartbeat error response in state up (join state wait-revoke-rebalance_cb, 1 partition(s) assigned): Broker: Group rebalance in progress + // + // I tried adding a wait time for more than "session.timeout.ms" inbetween tests, but it was not sufficiend Review comment: typo: `s/sufficiend/sufficient/` ########## File path: extensions/librdkafka/ConsumeKafka.cpp ########## @@ -0,0 +1,569 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ConsumeKafka.h" + +#include <algorithm> +#include <limits> + +#include "core/PropertyValidation.h" +#include "utils/ProcessorConfigUtils.h" +#include "utils/gsl.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +// The upper limit for Max Poll Time is 4 seconds. This is because Watchdog would potentially start +// reporting issues with the processor health otherwise +class ConsumeKafkaMaxPollTimeValidator : public TimePeriodValidator { + public: + ConsumeKafkaMaxPollTimeValidator(const std::string &name) // NOLINT + : TimePeriodValidator(name) { + } + ~ConsumeKafkaMaxPollTimeValidator() override = default; + + ValidationResult validate(const std::string& subject, const std::string& input) const override { + uint64_t value; + TimeUnit timeUnit; + uint64_t value_as_ms; + return ValidationResult::Builder::createBuilder().withSubject(subject).withInput(input).isValid( + core::TimePeriodValue::StringToTime(input, value, timeUnit) && + org::apache::nifi::minifi::core::Property::ConvertTimeUnitToMS(value, timeUnit, value_as_ms) && + 0 < value_as_ms && value_as_ms <= 4000).build(); + } +}; +} // namespace core +namespace processors { + +constexpr const std::size_t ConsumeKafka::DEFAULT_MAX_POLL_RECORDS; +constexpr char const* ConsumeKafka::DEFAULT_MAX_POLL_TIME; + +constexpr char const* ConsumeKafka::TOPIC_FORMAT_NAMES; +constexpr char const* ConsumeKafka::TOPIC_FORMAT_PATTERNS; + +core::Property ConsumeKafka::KafkaBrokers(core::PropertyBuilder::createProperty("Kafka Brokers") + 
->withDescription("A comma-separated list of known Kafka Brokers in the format <host>:<port>.") + ->withDefaultValue("localhost:9092", core::StandardValidators::get().NON_BLANK_VALIDATOR) + ->supportsExpressionLanguage(true) + ->isRequired(true) + ->build()); + +core::Property ConsumeKafka::SecurityProtocol(core::PropertyBuilder::createProperty("Security Protocol") + ->withDescription("This property is currently not supported. Protocol used to communicate with brokers. Corresponds to Kafka's 'security.protocol' property.") + ->withAllowableValues<std::string>({SECURITY_PROTOCOL_PLAINTEXT/*, SECURITY_PROTOCOL_SSL, SECURITY_PROTOCOL_SASL_PLAINTEXT, SECURITY_PROTOCOL_SASL_SSL*/ }) + ->withDefaultValue(SECURITY_PROTOCOL_PLAINTEXT) + ->isRequired(true) + ->build()); + +core::Property ConsumeKafka::TopicNames(core::PropertyBuilder::createProperty("Topic Names") + ->withDescription("The name of the Kafka Topic(s) to pull from. Multiple topic names are supported as a comma separated list.") + ->supportsExpressionLanguage(true) + ->isRequired(true) + ->build()); + +core::Property ConsumeKafka::TopicNameFormat(core::PropertyBuilder::createProperty("Topic Name Format") + ->withDescription("Specifies whether the Topic(s) provided are a comma separated list of names or a single regular expression.") + ->withAllowableValues<std::string>({TOPIC_FORMAT_NAMES, TOPIC_FORMAT_PATTERNS}) + ->withDefaultValue(TOPIC_FORMAT_NAMES) + ->isRequired(true) + ->build()); + +core::Property ConsumeKafka::HonorTransactions(core::PropertyBuilder::createProperty("Honor Transactions") + ->withDescription( + "Specifies whether or not MiNiFi should honor transactional guarantees when communicating with Kafka. If false, the Processor will use an \"isolation level\" of " + "read_uncomitted. This means that messages will be received as soon as they are written to Kafka but will be pulled, even if the producer cancels the transactions. 
" + "If this value is true, MiNiFi will not receive any messages for which the producer's transaction was canceled, but this can result in some latency since the consumer " + "must wait for the producer to finish its entire transaction instead of pulling as the messages become available.") + ->withDefaultValue<bool>(true) + ->isRequired(true) + ->build()); + +core::Property ConsumeKafka::GroupID(core::PropertyBuilder::createProperty("Group ID") + ->withDescription("A Group ID is used to identify consumers that are within the same consumer group. Corresponds to Kafka's 'group.id' property.") + ->supportsExpressionLanguage(true) + ->isRequired(true) + ->build()); + +core::Property ConsumeKafka::OffsetReset(core::PropertyBuilder::createProperty("Offset Reset") + ->withDescription("Allows you to manage the condition when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that " + "data has been deleted). Corresponds to Kafka's 'auto.offset.reset' property.") + ->withAllowableValues<std::string>({OFFSET_RESET_EARLIEST, OFFSET_RESET_LATEST, OFFSET_RESET_NONE}) + ->withDefaultValue(OFFSET_RESET_LATEST) + ->isRequired(true) + ->build()); + +core::Property ConsumeKafka::KeyAttributeEncoding(core::PropertyBuilder::createProperty("Key Attribute Encoding") + ->withDescription("FlowFiles that are emitted have an attribute named 'kafka.key'. 
This property dictates how the value of the attribute should be encoded.") + ->withAllowableValues<std::string>({KEY_ATTR_ENCODING_UTF_8, KEY_ATTR_ENCODING_HEX}) + ->withDefaultValue(KEY_ATTR_ENCODING_UTF_8) + ->isRequired(true) + ->build()); + +core::Property ConsumeKafka::MessageDemarcator(core::PropertyBuilder::createProperty("Message Demarcator") + ->withDescription("Since KafkaConsumer receives messages in batches, you have an option to output FlowFiles which contains all Kafka messages in a single batch " + "for a given topic and partition and this property allows you to provide a string (interpreted as UTF-8) to use for demarcating apart multiple Kafka messages. " + "This is an optional property and if not provided each Kafka message received will result in a single FlowFile which time it is triggered. ") + ->supportsExpressionLanguage(true) + ->build()); + +core::Property ConsumeKafka::MessageHeaderEncoding(core::PropertyBuilder::createProperty("Message Header Encoding") + ->withDescription("Any message header that is found on a Kafka message will be added to the outbound FlowFile as an attribute. This property indicates the Character Encoding " + "to use for deserializing the headers.") + ->withAllowableValues<std::string>({MSG_HEADER_ENCODING_UTF_8, MSG_HEADER_ENCODING_HEX}) + ->withDefaultValue(MSG_HEADER_ENCODING_UTF_8) + ->build()); + +core::Property ConsumeKafka::HeadersToAddAsAttributes(core::PropertyBuilder::createProperty("Headers To Add As Attributes") + ->withDescription("A Regular Expression that is matched against all message headers. Any message header whose name matches the regex will be added to the FlowFile " + "as an Attribute. If not specified, no Header values will be added as FlowFile attributes. If two messages have a different value for the same header and that " + "header is selected by the provided regex, then those two messages must be added to different FlowFiles. 
As a result, users should be cautious about using a " + "regex like \".*\" if messages are expected to have header values that are unique per message, such as an identifier or timestamp, because it will prevent MiNiFi " + "from bundling the messages together efficiently.") + ->build()); + +core::Property ConsumeKafka::DuplicateHeaderHandling(core::PropertyBuilder::createProperty("Duplicate Header Handling") + ->withDescription("For headers to be added as attributes, this option specifies how to handle cases where multiple headers are present with the same key. " + "For example in case of receiving these two headers: \"Accept: text/html\" and \"Accept: application/xml\" and we want to attach the value of \"Accept\" " + "as a FlowFile attribute:\n" + " - \"Keep First\" attaches: \"Accept -> text/html\"\n" + " - \"Keep Latest\" attaches: \"Accept -> application/xml\"\n" + " - \"Comma-separated Merge\" attaches: \"Accept -> text/html, application/xml\"\n") + ->withAllowableValues<std::string>({MSG_HEADER_KEEP_FIRST, MSG_HEADER_KEEP_LATEST, MSG_HEADER_COMMA_SEPARATED_MERGE}) + ->withDefaultValue(MSG_HEADER_KEEP_LATEST) // Mirroring NiFi behaviour + ->build()); + +core::Property ConsumeKafka::MaxPollRecords(core::PropertyBuilder::createProperty("Max Poll Records") + ->withDescription("Specifies the maximum number of records Kafka should return when polling each time the processor is triggered.") + ->withDefaultValue<unsigned int>(DEFAULT_MAX_POLL_RECORDS) + ->build()); + +core::Property ConsumeKafka::MaxPollTime(core::PropertyBuilder::createProperty("Max Poll Time") + ->withDescription("Specifies the maximum amount of time the consumer can use for polling data from the brokers. 
" + "Polling is a blocking operation, so the upper limit of this value is specified in 4 seconds.") + ->withDefaultValue(DEFAULT_MAX_POLL_TIME, std::make_shared<core::ConsumeKafkaMaxPollTimeValidator>(std::string("ConsumeKafkaMaxPollTimeValidator"))) + ->isRequired(true) + ->build()); + +core::Property ConsumeKafka::SessionTimeout(core::PropertyBuilder::createProperty("Session Timeout") + ->withDescription("Client group session and failure detection timeout. The consumer sends periodic heartbeats " + "to indicate its liveness to the broker. If no hearts are received by the broker for a group member within " + "the session timeout, the broker will remove the consumer from the group and trigger a rebalance. " + "The allowed range is configured with the broker configuration properties group.min.session.timeout.ms and group.max.session.timeout.ms.") + ->withDefaultValue<core::TimePeriodValue>("60 seconds") + ->build()); + +const core::Relationship ConsumeKafka::Success("success", "Incoming kafka messages as flowfiles. 
Depending on the demarcation strategy, this can be one or multiple flowfiles per message."); + +void ConsumeKafka::initialize() { + setSupportedProperties({ + KafkaBrokers, + SecurityProtocol, + TopicNames, + TopicNameFormat, + HonorTransactions, + GroupID, + OffsetReset, + KeyAttributeEncoding, + MessageDemarcator, + MessageHeaderEncoding, + HeadersToAddAsAttributes, + DuplicateHeaderHandling, + MaxPollRecords, + MaxPollTime, + SessionTimeout + }); + setSupportedRelationships({ + Success, + }); +} + +void ConsumeKafka::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /* sessionFactory */) { + // Required properties + kafka_brokers_ = utils::getRequiredPropertyOrThrow(context, KafkaBrokers.getName()); + security_protocol_ = utils::getRequiredPropertyOrThrow(context, SecurityProtocol.getName()); + topic_names_ = utils::listFromRequiredCommaSeparatedProperty(context, TopicNames.getName()); + topic_name_format_ = utils::getRequiredPropertyOrThrow(context, TopicNameFormat.getName()); + honor_transactions_ = utils::parseBooleanPropertyOrThrow(context, HonorTransactions.getName()); + group_id_ = utils::getRequiredPropertyOrThrow(context, GroupID.getName()); + offset_reset_ = utils::getRequiredPropertyOrThrow(context, OffsetReset.getName()); + key_attribute_encoding_ = utils::getRequiredPropertyOrThrow(context, KeyAttributeEncoding.getName()); + max_poll_time_milliseconds_ = utils::parseTimePropertyMSOrThrow(context, MaxPollTime.getName()); + session_timeout_milliseconds_ = utils::parseTimePropertyMSOrThrow(context, SessionTimeout.getName()); + + // Optional properties + context->getProperty(MessageDemarcator.getName(), message_demarcator_); + context->getProperty(MessageHeaderEncoding.getName(), message_header_encoding_); + context->getProperty(DuplicateHeaderHandling.getName(), duplicate_header_handling_); + + headers_to_add_as_attributes_ = utils::listFromCommaSeparatedProperty(context, HeadersToAddAsAttributes.getName()); + max_poll_records_ = 
gsl::narrow<std::size_t>(utils::getOptionalUintProperty(context, MaxPollRecords.getName()).value_or(DEFAULT_MAX_POLL_RECORDS)); + + // For now security protocols are not yet supported + if (!utils::StringUtils::equalsIgnoreCase(SECURITY_PROTOCOL_PLAINTEXT, security_protocol_)) { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Security protocols are not supported yet."); + } + + if (!utils::StringUtils::equalsIgnoreCase(KEY_ATTR_ENCODING_UTF_8, key_attribute_encoding_) && !utils::StringUtils::equalsIgnoreCase(KEY_ATTR_ENCODING_HEX, key_attribute_encoding_)) { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Unsupported key attribute encoding: " + key_attribute_encoding_); + } + + if (!utils::StringUtils::equalsIgnoreCase(MSG_HEADER_ENCODING_UTF_8, message_header_encoding_) && !utils::StringUtils::equalsIgnoreCase(MSG_HEADER_ENCODING_HEX, message_header_encoding_)) { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Unsupported message header encoding: " + key_attribute_encoding_); + } + + configure_new_connection(context); +} + +namespace { +void rebalance_cb(rd_kafka_t* rk, rd_kafka_resp_err_t trigger, rd_kafka_topic_partition_list_t* partitions, void* /*opaque*/) { + // Cooperative, incremental assignment is not supported in the current librdkafka version + std::shared_ptr<logging::Logger> logger{logging::LoggerFactory<ConsumeKafka>::getLogger()}; + logger->log_debug("Rebalance triggered."); + rd_kafka_resp_err_t assign_error = RD_KAFKA_RESP_ERR_NO_ERROR; + switch (trigger) { + case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: + logger->log_debug("assigned"); + if (logger -> should_log(core::logging::LOG_LEVEL::info)) { + utils::print_topics_list(logger, partitions); + } + assign_error = rd_kafka_assign(rk, partitions); + break; + + case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: + logger->log_debug("revoked:"); + rd_kafka_commit(rk, partitions, /* async = */ 0); // Sync commit, maybe unneccessary + if (logger -> should_log(core::logging::LOG_LEVEL::info)) { + 
utils::print_topics_list(logger, partitions); + } + assign_error = rd_kafka_assign(rk, NULL); + break; + + default: + logger->log_debug("failed: %s", rd_kafka_err2str(trigger)); + assign_error = rd_kafka_assign(rk, NULL); + break; + } + logger->log_debug("assign failure: %s", rd_kafka_err2str(assign_error)); +} +} // namespace + +void ConsumeKafka::create_topic_partition_list() { + kf_topic_partition_list_ = { rd_kafka_topic_partition_list_new(topic_names_.size()), utils::rd_kafka_topic_partition_list_deleter() }; + + // On subscriptions any topics prefixed with ^ will be regex matched + if (utils::StringUtils::equalsIgnoreCase(TOPIC_FORMAT_PATTERNS, topic_name_format_)) { + for (const std::string& topic : topic_names_) { + const std::string regex_format = "^" + topic; + rd_kafka_topic_partition_list_add(kf_topic_partition_list_.get(), regex_format.c_str(), RD_KAFKA_PARTITION_UA); + } + } else { + for (const std::string& topic : topic_names_) { + rd_kafka_topic_partition_list_add(kf_topic_partition_list_.get(), topic.c_str(), RD_KAFKA_PARTITION_UA); + } + } + + // Subscribe to topic set using balanced consumer groups + // Subscribing from the same process without an inbetween unsubscribe call + // Does not seem to be triggering a rebalance (maybe librdkafka bug?) 
+ // This might happen until the cross-overship between processors and connections are settled + rd_kafka_resp_err_t subscribe_response = rd_kafka_subscribe(consumer_.get(), kf_topic_partition_list_.get()); + if (RD_KAFKA_RESP_ERR_NO_ERROR != subscribe_response) { + logger_->log_error("rd_kafka_subscribe error %d: %s", subscribe_response, rd_kafka_err2str(subscribe_response)); + } +} + +void ConsumeKafka::extend_config_from_dynamic_properties(const core::ProcessContext* context) { + using utils::setKafkaConfigurationField; + + const std::vector<std::string> dynamic_prop_keys = context->getDynamicPropertyKeys(); + if (dynamic_prop_keys.empty()) { + return; + } + logger_->log_info("Loading %d extra kafka configuration fields from ConsumeKafka dynamic properties:", dynamic_prop_keys.size()); + for (const std::string& key : dynamic_prop_keys) { + std::string value; + gsl_Expects(context->getDynamicProperty(key, value)); + logger_->log_info("%s: %s", key.c_str(), value.c_str()); + setKafkaConfigurationField(conf_.get(), key, value); + } +} + +void ConsumeKafka::configure_new_connection(const core::ProcessContext* context) { + using utils::setKafkaConfigurationField; + + conf_ = { rd_kafka_conf_new(), utils::rd_kafka_conf_deleter() }; + if (conf_ == nullptr) { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed to create rd_kafka_conf_t object"); + } + + // Set rebalance callback for use with coordinated consumer group balancing + // Rebalance handlers are needed for the initial configuration of the consumer + // If they are not set, offset reset is ignored and polling produces messages + // Registering a rebalance_cb turns off librdkafka's automatic partition assignment/revocation and instead delegates that responsibility to the application's rebalance_cb. 
+ rd_kafka_conf_set_rebalance_cb(conf_.get(), rebalance_cb); + + // Uncomment this for librdkafka debug logs: + // setKafkaConfigurationField(conf_.get(), "debug", "all"); + + setKafkaConfigurationField(conf_.get(), "bootstrap.servers", kafka_brokers_); + setKafkaConfigurationField(conf_.get(), "auto.offset.reset", "latest"); + setKafkaConfigurationField(conf_.get(), "enable.auto.commit", "false"); + setKafkaConfigurationField(conf_.get(), "enable.auto.offset.store", "false"); + setKafkaConfigurationField(conf_.get(), "isolation.level", honor_transactions_ ? "read_committed" : "read_uncommitted"); + setKafkaConfigurationField(conf_.get(), "group.id", group_id_); + setKafkaConfigurationField(conf_.get(), "session.timeout.ms", std::to_string(session_timeout_milliseconds_.count())); + setKafkaConfigurationField(conf_.get(), "max.poll.interval.ms", "600000"); // Twice the default, arbitrarily chosen + + // This is a librdkafka option, but the communication timeout is also specified in each of the + // relevant API calls. Could be redundant, but it probably does not hurt to set this + setKafkaConfigurationField(conf_.get(), "metadata.request.timeout.ms", std::to_string(METADATA_COMMUNICATIONS_TIMEOUT_MS)); + + extend_config_from_dynamic_properties(context); + + std::array<char, 512U> errstr{}; + consumer_ = { rd_kafka_new(RD_KAFKA_CONSUMER, conf_.release(), errstr.data(), errstr.size()), utils::rd_kafka_consumer_deleter() }; + if (consumer_ == nullptr) { + const std::string error_msg { errstr.begin(), errstr.end() }; + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed to create Kafka consumer %s" + error_msg); + } + + create_topic_partition_list(); + + // Changing the partition list should happen only as part as the initialization of offsets + // a function like `rd_kafka_position()` might have unexpected effects + // for instance when a consumer gets assigned a partition it used to + // consume at an earlier rebalance. 
+ // + // As far as I understand, instead of rd_kafka_position() an rd_kafka_committed() call if preferred here, + // as it properly fetches offsets from the broker + if (RD_KAFKA_RESP_ERR_NO_ERROR != rd_kafka_committed(consumer_.get(), kf_topic_partition_list_.get(), METADATA_COMMUNICATIONS_TIMEOUT_MS)) { + logger_ -> log_error("Retrieving committed offsets for topics+partitions failed."); + } + + rd_kafka_resp_err_t poll_set_consumer_response = rd_kafka_poll_set_consumer(consumer_.get()); + if (RD_KAFKA_RESP_ERR_NO_ERROR != poll_set_consumer_response) { + logger_->log_error("rd_kafka_poll_set_consumer error %d: %s", poll_set_consumer_response, rd_kafka_err2str(poll_set_consumer_response)); + } + + // There is no rd_kafka_seek alternative for rd_kafka_topic_partition_list_t, only rd_kafka_topic_t + // rd_kafka_topic_partition_list_set_offset should reset the offsets to the latest (or whatever is set in the config), + // Also, rd_kafka_committed should also fetch and set latest the latest offset + // In reality, neither of them seem to work (not even with calling rd_kafka_position()) + logger_->log_info("Resetting offset manually."); + while (true) { + std::unique_ptr<rd_kafka_message_t, utils::rd_kafka_message_deleter> + message_wrapper{ rd_kafka_consumer_poll(consumer_.get(), max_poll_time_milliseconds_.count()), utils::rd_kafka_message_deleter() }; + + if (!message_wrapper || RD_KAFKA_RESP_ERR_NO_ERROR != message_wrapper->err) { + break; + } + utils::print_kafka_message(message_wrapper.get(), logger_); + // Commit offsets on broker for the provided list of partitions + logger_->log_info("Committing offset: %" PRId64 ".", message_wrapper->offset); + rd_kafka_commit_message(consumer_.get(), message_wrapper.get(), /* async = */ 0); + } + logger_->log_info("Done resetting offset manually."); +} + +std::string ConsumeKafka::extract_message(const rd_kafka_message_t* rkmessage) const { + if (RD_KAFKA_RESP_ERR_NO_ERROR != rkmessage->err) { + throw 
minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "ConsumeKafka: received error message from broker: " + std::to_string(rkmessage->err) + " " + rd_kafka_err2str(rkmessage->err)); + } + return { reinterpret_cast<char*>(rkmessage->payload), rkmessage->len }; +} + +std::vector<std::unique_ptr<rd_kafka_message_t, utils::rd_kafka_message_deleter>> ConsumeKafka::poll_kafka_messages() { + std::vector<std::unique_ptr<rd_kafka_message_t, utils::rd_kafka_message_deleter>> messages; + messages.reserve(max_poll_records_); + const auto start = std::chrono::steady_clock::now(); + auto elapsed = std::chrono::steady_clock::now() - start; + while (messages.size() < max_poll_records_ && elapsed < max_poll_time_milliseconds_) { + logger_-> log_debug("Polling for new messages for %d milliseconds...", max_poll_time_milliseconds_.count()); Review comment: this debug log line has not been removed yet. ########## File path: extensions/librdkafka/rdkafka_utils.cpp ########## @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +#include <array> + +#include "rdkafka_utils.h" + +#include "Exception.h" +#include "utils/StringUtils.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +void setKafkaConfigurationField(rd_kafka_conf_t* configuration, const std::string& field_name, const std::string& value) { + static std::array<char, 512U> errstr{}; + rd_kafka_conf_res_t result; + result = rd_kafka_conf_set(configuration, field_name.c_str(), value.c_str(), errstr.data(), errstr.size()); + if (RD_KAFKA_CONF_OK != result) { + const std::string error_msg { errstr.data() }; + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "rd_kafka configuration error: " + error_msg); + } +} + +void print_topics_list(logging::Logger& logger, rd_kafka_topic_partition_list_t* kf_topic_partition_list) { + for (std::size_t i = 0; i < kf_topic_partition_list->cnt; ++i) { + logger.log_debug("kf_topic_partition_list: topic: %s, partition: %d, offset:%lld", Review comment: `offset` is `int64_t`, not `long long int`. The format specifier should be `"%" PRId64`. ########## File path: extensions/librdkafka/tests/ConsumeKafkaTests.cpp ########## @@ -0,0 +1,590 @@ + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#define CATCH_CONFIG_MAIN + +#include <algorithm> +#include <memory> +#include <string> +#include <set> + +#include "TestBase.h" + +#include "../ConsumeKafka.h" +#include "../rdkafka_utils.h" +#include "../../standard-processors/processors/ExtractText.h" +#include "utils/file/FileUtils.h" +#include "utils/OptionalUtils.h" +#include "utils/RegexUtils.h" +#include "utils/StringUtils.h" +#include "utils/TestUtils.h" + +#include "utils/IntegrationTestUtils.h" + +namespace { +using org::apache::nifi::minifi::utils::optional; + +class KafkaTestProducer { + public: + enum class PublishEvent { + PUBLISH, + TRANSACTION_START, + TRANSACTION_COMMIT, + CANCEL + }; + KafkaTestProducer(const std::string& kafka_brokers, const std::string& topic, const bool transactional) : + logger_(logging::LoggerFactory<KafkaTestProducer>::getLogger()) { + using utils::setKafkaConfigurationField; + + std::unique_ptr<rd_kafka_conf_t, utils::rd_kafka_conf_deleter> conf = { rd_kafka_conf_new(), utils::rd_kafka_conf_deleter() }; + + setKafkaConfigurationField(conf.get(), "bootstrap.servers", kafka_brokers); + setKafkaConfigurationField(conf.get(), "compression.codec", "snappy"); + setKafkaConfigurationField(conf.get(), "batch.num.messages", "1"); + + if (transactional) { + setKafkaConfigurationField(conf.get(), "transactional.id", "ConsumeKafkaTest_transaction_id"); + } + + static std::array<char, 512U> errstr{}; + producer_ = { rd_kafka_new(RD_KAFKA_PRODUCER, conf.release(), errstr.data(), errstr.size()), utils::rd_kafka_producer_deleter() }; + if (producer_ == nullptr) { + auto error_msg = "Failed to create Kafka producer" + std::string{ errstr.data() }; + throw std::runtime_error(error_msg); + } + + // The last argument is a config here, but it is already owned by the producer. 
I assume that this would mean an override on the original config if used + topic_ = { rd_kafka_topic_new(producer_.get(), topic.c_str(), nullptr), utils::rd_kafka_topic_deleter() }; + + if (transactional) { + rd_kafka_init_transactions(producer_.get(), TRANSACTIONS_TIMEOUT_MS.count()); + } + } + + // Uses all the headers for every published message + void publish_messages_to_topic( + const std::vector<std::string>& messages_on_topic, const std::string& message_key, std::vector<PublishEvent> events, + const std::vector<std::pair<std::string, std::string>>& message_headers, const optional<std::string>& message_header_encoding) { + auto next_message = messages_on_topic.cbegin(); + for (const PublishEvent event : events) { + switch (event) { + case PublishEvent::PUBLISH: + REQUIRE(messages_on_topic.cend() != next_message); + publish_message(*next_message, message_key, message_headers, message_header_encoding); + std::advance(next_message, 1); + break; + case PublishEvent::TRANSACTION_START: + logger_->log_debug("Starting new transaction..."); + rd_kafka_begin_transaction(producer_.get()); + break; + case PublishEvent::TRANSACTION_COMMIT: + logger_->log_debug("Committing transaction..."); + rd_kafka_commit_transaction(producer_.get(), TRANSACTIONS_TIMEOUT_MS.count()); + break; + case PublishEvent::CANCEL: + logger_->log_debug("Cancelling transaction..."); + rd_kafka_abort_transaction(producer_.get(), TRANSACTIONS_TIMEOUT_MS.count()); + } + } + } + + private: + void publish_message( + const std::string& message, const std::string& message_key, const std::vector<std::pair<std::string, std::string>>& message_headers, const optional<std::string>& message_header_encoding) { + logger_->log_debug("Producing: %s", message.c_str()); + std::unique_ptr<rd_kafka_headers_t, utils::rd_kafka_headers_deleter> headers(rd_kafka_headers_new(message_headers.size()), utils::rd_kafka_headers_deleter()); + if (!headers) { + throw std::runtime_error("Generating message headers failed."); + } + 
for (const std::pair<std::string, std::string>& message_header : message_headers) { + rd_kafka_header_add(headers.get(), + const_cast<char*>(message_header.first.c_str()), message_header.first.size(), + const_cast<char*>(message_header.second.c_str()), message_header.second.size()); + } + + if (RD_KAFKA_RESP_ERR_NO_ERROR != rd_kafka_producev( + producer_.get(), + RD_KAFKA_V_RKT(topic_.get()), + RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), + RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), + RD_KAFKA_V_VALUE(const_cast<char*>(&message[0]), message.size()), + RD_KAFKA_V_HEADERS(headers.release()), + RD_KAFKA_V_KEY(message_key.c_str(), message_key.size()), + RD_KAFKA_V_END)) { + logger_->log_error("Producer failure: %d: %s", rd_kafka_last_error(), rd_kafka_err2str(rd_kafka_last_error())); + } + } + + static const std::chrono::milliseconds TRANSACTIONS_TIMEOUT_MS; + + std::unique_ptr<rd_kafka_t, utils::rd_kafka_producer_deleter> producer_; + std::unique_ptr<rd_kafka_topic_t, utils::rd_kafka_topic_deleter> topic_; + + std::shared_ptr<logging::Logger> logger_; +}; + +const std::chrono::milliseconds KafkaTestProducer::TRANSACTIONS_TIMEOUT_MS{ 2000 }; + +class ConsumeKafkaTest { + public: + using Processor = org::apache::nifi::minifi::core::Processor; + using ConsumeKafka = org::apache::nifi::minifi::processors::ConsumeKafka; + using ExtractText = org::apache::nifi::minifi::processors::ExtractText; + + const KafkaTestProducer::PublishEvent PUBLISH = KafkaTestProducer::PublishEvent::PUBLISH; + const KafkaTestProducer::PublishEvent TRANSACTION_START = KafkaTestProducer::PublishEvent::TRANSACTION_START; + const KafkaTestProducer::PublishEvent TRANSACTION_COMMIT = KafkaTestProducer::PublishEvent::TRANSACTION_COMMIT; + const KafkaTestProducer::PublishEvent CANCEL = KafkaTestProducer::PublishEvent::CANCEL; + + const std::vector<KafkaTestProducer::PublishEvent> NON_TRANSACTIONAL_MESSAGES { PUBLISH, PUBLISH }; + const std::vector<KafkaTestProducer::PublishEvent> 
SINGLE_COMMITTED_TRANSACTION { TRANSACTION_START, PUBLISH, PUBLISH, TRANSACTION_COMMIT }; + const std::vector<KafkaTestProducer::PublishEvent> TWO_SEPARATE_TRANSACTIONS { TRANSACTION_START, PUBLISH, TRANSACTION_COMMIT, TRANSACTION_START, PUBLISH, TRANSACTION_COMMIT }; + const std::vector<KafkaTestProducer::PublishEvent> NON_COMMITTED_TRANSACTION { TRANSACTION_START, PUBLISH, PUBLISH }; + const std::vector<KafkaTestProducer::PublishEvent> CANCELLED_TRANSACTION { TRANSACTION_START, PUBLISH, CANCEL }; + + const std::string KEEP_FIRST = ConsumeKafka::MSG_HEADER_KEEP_FIRST; + const std::string KEEP_LATEST = ConsumeKafka::MSG_HEADER_KEEP_LATEST; + const std::string COMMA_SEPARATED_MERGE = ConsumeKafka::MSG_HEADER_COMMA_SEPARATED_MERGE; + + static const std::string PRODUCER_TOPIC; + static const std::string TEST_MESSAGE_KEY; + + // Relationships + const core::Relationship success {"success", "description"}; + const core::Relationship failure {"failure", "description"}; + + ConsumeKafkaTest() : + logTestController_(LogTestController::getInstance()), + logger_(logging::LoggerFactory<ConsumeKafkaTest>::getLogger()) { + reInitialize(); + } + + virtual ~ConsumeKafkaTest() { + logTestController_.reset(); + } + + protected: + void reInitialize() { + testController_.reset(new TestController()); + plan_ = testController_->createPlan(); + logTestController_.setError<LogTestController>(); + logTestController_.setError<TestPlan>(); + logTestController_.setTrace<ConsumeKafka>(); + logTestController_.setTrace<ConsumeKafkaTest>(); + logTestController_.setTrace<KafkaTestProducer>(); + logTestController_.setDebug<ExtractText>(); + logTestController_.setDebug<core::ProcessContext>(); + } + + void optional_set_property(const std::shared_ptr<core::Processor>& processor, const std::string& property_name, const optional<std::string>& opt_value) { + if (opt_value) { + plan_->setProperty(processor, property_name, opt_value.value()); + } + } + + std::string decode_key(const std::string& key, 
const optional<std::string>& key_attribute_encoding) { + if (!key_attribute_encoding || utils::StringUtils::equalsIgnoreCase(ConsumeKafka::KEY_ATTR_ENCODING_UTF_8, key_attribute_encoding.value())) { + return key; + } + if (utils::StringUtils::equalsIgnoreCase(ConsumeKafka::ConsumeKafka::KEY_ATTR_ENCODING_HEX, key_attribute_encoding.value())) { + return utils::StringUtils::from_hex(key); + } + throw std::runtime_error("Message Header Encoding does not match any of the presets in the test."); + } + + std::vector<std::string> sort_and_split_messages(const std::vector<std::string>& messages_on_topic, const optional<std::string>& message_demarcator) { + if (message_demarcator) { + std::vector<std::string> sorted_split_messages; + for (const auto& message : messages_on_topic) { + std::vector<std::string> split_message = utils::StringUtils::split(message, message_demarcator.value()); + std::move(split_message.begin(), split_message.end(), std::back_inserter(sorted_split_messages)); + } + std::sort(sorted_split_messages.begin(), sorted_split_messages.end()); + return sorted_split_messages; + } + std::vector<std::string> sorted_messages{ messages_on_topic.cbegin(), messages_on_topic.cend() }; + std::sort(sorted_messages.begin(), sorted_messages.end()); + return sorted_messages; + } + + static const std::chrono::seconds MAX_CONSUMEKAFKA_POLL_TIME_SECONDS; + static const std::string ATTRIBUTE_FOR_CAPTURING_CONTENT; + static const std::string TEST_FILE_NAME_POSTFIX; + + std::unique_ptr<TestController> testController_; + std::shared_ptr<TestPlan> plan_; + LogTestController& logTestController_; + std::shared_ptr<logging::Logger> logger_; +}; + +class ConsumeKafkaPropertiesTest : public ConsumeKafkaTest { + public: + ConsumeKafkaPropertiesTest() : ConsumeKafkaTest() {} + virtual ~ConsumeKafkaPropertiesTest() { + logTestController_.reset(); + } + + void single_consumer_with_plain_text_test( + bool expect_config_valid, + bool expect_fixed_message_order, + const 
std::vector<std::pair<std::string, std::string>>& expect_header_attributes, + const std::vector<std::string>& messages_on_topic, + const std::vector<KafkaTestProducer::PublishEvent>& transaction_events, + const std::vector<std::pair<std::string, std::string>>& message_headers, + const std::string& kafka_brokers, + const std::string& security_protocol, + const std::string& topic_names, + const optional<std::string>& topic_name_format, + const optional<bool>& honor_transactions, + const optional<std::string>& group_id, + const optional<std::string>& offset_reset, + const optional<std::string>& key_attribute_encoding, + const optional<std::string>& message_demarcator, + const optional<std::string>& message_header_encoding, + const optional<std::string>& headers_to_add_as_attributes, + const optional<std::string>& duplicate_header_handling, + const optional<std::string>& max_poll_records, + const optional<std::string>& max_poll_time, + const optional<std::string>& session_timeout) { + reInitialize(); + + // Consumer chain + std::shared_ptr<core::Processor> consume_kafka = plan_->addProcessor("ConsumeKafka", "consume_kafka", {success}, false); + std::shared_ptr<core::Processor> extract_text = plan_->addProcessor("ExtractText", "extract_text", {success}, false); + + // Set up connections + plan_->addConnection(consume_kafka, success, extract_text); + extract_text->setAutoTerminatedRelationships({success}); + + const auto bool_to_string = [] (const bool b) -> std::string { return b ? 
"true" : "false"; }; + + plan_->setProperty(consume_kafka, ConsumeKafka::KafkaBrokers.getName(), kafka_brokers); + plan_->setProperty(consume_kafka, ConsumeKafka::SecurityProtocol.getName(), security_protocol); + plan_->setProperty(consume_kafka, ConsumeKafka::TopicNames.getName(), topic_names); + + optional_set_property(consume_kafka, ConsumeKafka::TopicNameFormat.getName(), topic_name_format); + optional_set_property(consume_kafka, ConsumeKafka::HonorTransactions.getName(), honor_transactions | utils::map(bool_to_string)); + optional_set_property(consume_kafka, ConsumeKafka::GroupID.getName(), group_id); + optional_set_property(consume_kafka, ConsumeKafka::OffsetReset.getName(), offset_reset); + optional_set_property(consume_kafka, ConsumeKafka::KeyAttributeEncoding.getName(), key_attribute_encoding); + optional_set_property(consume_kafka, ConsumeKafka::MessageDemarcator.getName(), message_demarcator); + optional_set_property(consume_kafka, ConsumeKafka::MessageHeaderEncoding.getName(), message_header_encoding); + optional_set_property(consume_kafka, ConsumeKafka::HeadersToAddAsAttributes.getName(), headers_to_add_as_attributes); + optional_set_property(consume_kafka, ConsumeKafka::DuplicateHeaderHandling.getName(), duplicate_header_handling); + optional_set_property(consume_kafka, ConsumeKafka::MaxPollRecords.getName(), max_poll_records); + optional_set_property(consume_kafka, ConsumeKafka::MaxPollTime.getName(), max_poll_time); + optional_set_property(consume_kafka, ConsumeKafka::SessionTimeout.getName(), session_timeout); + + plan_->setProperty(extract_text, ExtractText::Attribute.getName(), ATTRIBUTE_FOR_CAPTURING_CONTENT); + + if (!expect_config_valid) { + REQUIRE_THROWS(plan_->scheduleProcessor(consume_kafka)); + return; + } else { + plan_->scheduleProcessors(); + } + + std::unique_ptr<rd_kafka_conf_t, utils::rd_kafka_conf_deleter> conf_; + std::unique_ptr<rd_kafka_t, utils::rd_kafka_consumer_deleter> consumer_; + + const bool is_transactional = 
std::count(transaction_events.cbegin(), transaction_events.cend(), KafkaTestProducer::PublishEvent::TRANSACTION_START); + const bool transactions_committed = transaction_events.back() == KafkaTestProducer::PublishEvent::TRANSACTION_COMMIT; + + KafkaTestProducer producer(kafka_brokers, PRODUCER_TOPIC, is_transactional); + producer.publish_messages_to_topic(messages_on_topic, TEST_MESSAGE_KEY, transaction_events, message_headers, message_header_encoding); + + + std::vector<std::shared_ptr<core::FlowFile>> flow_files_produced; + for (std::size_t num_expected_messages_processed = 0; num_expected_messages_processed < messages_on_topic.size(); num_expected_messages_processed += std::stoi(max_poll_records.value_or("1"))) { + plan_->increment_location(); + if ((honor_transactions && false == honor_transactions.value()) || (is_transactional && !transactions_committed)) { + INFO("Non-committed messages received."); + REQUIRE(false == plan_->runCurrentProcessorUntilFlowfileIsProduced(MAX_CONSUMEKAFKA_POLL_TIME_SECONDS)); + return; + } + { + SCOPED_INFO("ConsumeKafka timed out when waiting to receive the message published to the kafka broker."); + REQUIRE(plan_->runCurrentProcessorUntilFlowfileIsProduced(MAX_CONSUMEKAFKA_POLL_TIME_SECONDS)); + } + std::size_t num_flow_files_produced = plan_->getNumFlowFileProducedByCurrentProcessor(); + plan_->increment_location(); + for (std::size_t times_extract_text_run = 0; times_extract_text_run < num_flow_files_produced; ++times_extract_text_run) { + plan_->runCurrentProcessor(); // ExtractText + std::shared_ptr<core::FlowFile> flow_file = plan_->getFlowFileProducedByCurrentProcessor(); + for (const auto& exp_header : expect_header_attributes) { + SCOPED_INFO("ConsumeKafka did not produce the expected flowfile attribute from message header: " << exp_header.first << "."); + const auto header_attr_opt = flow_file->getAttribute(exp_header.first); + REQUIRE(header_attr_opt); + REQUIRE(exp_header.second == header_attr_opt.value()); + } + { + 
SCOPED_INFO("Message key is missing or incorrect (potential encoding mismatch)."); + REQUIRE(TEST_MESSAGE_KEY == decode_key(flow_file->getAttribute(ConsumeKafka::KAFKA_MESSAGE_KEY_ATTR).value(), key_attribute_encoding)); + REQUIRE("1" == flow_file->getAttribute(ConsumeKafka::KAFKA_COUNT_ATTR).value()); + REQUIRE(flow_file->getAttribute(ConsumeKafka::KAFKA_OFFSET_ATTR)); + REQUIRE(flow_file->getAttribute(ConsumeKafka::KAFKA_PARTITION_ATTR)); + REQUIRE(PRODUCER_TOPIC == flow_file->getAttribute(ConsumeKafka::KAFKA_TOPIC_ATTR).value()); + } + flow_files_produced.emplace_back(std::move(flow_file)); + } + plan_->reset_location(); + } + + const auto contentOrderOfFlowFile = [&] (const std::shared_ptr<core::FlowFile>& lhs, const std::shared_ptr<core::FlowFile>& rhs) { + return lhs->getAttribute(ATTRIBUTE_FOR_CAPTURING_CONTENT).value() < rhs->getAttribute(ATTRIBUTE_FOR_CAPTURING_CONTENT).value(); + }; + { + SCOPED_INFO("The flowfiles generated by ConsumeKafka are invalid (probably nullptr)."); + REQUIRE_NOTHROW(std::sort(flow_files_produced.begin(), flow_files_produced.end(), contentOrderOfFlowFile)); + } + std::vector<std::string> sorted_split_messages = sort_and_split_messages(messages_on_topic, message_demarcator); + const auto flow_file_content_matches_message = [&] (const std::shared_ptr<core::FlowFile>& flowfile, const std::string message) { + return flowfile->getAttribute(ATTRIBUTE_FOR_CAPTURING_CONTENT).value() == message; + }; + + logger_->log_debug("************"); + std::string expected = "Expected: "; + for (int i = 0; i < sorted_split_messages.size(); ++i) { + expected += sorted_split_messages[i] + ", "; + } + std::string actual = " Actual: "; + for (int i = 0; i < sorted_split_messages.size(); ++i) { + actual += flow_files_produced[i]->getAttribute(ATTRIBUTE_FOR_CAPTURING_CONTENT).value() + ", "; + } + logger_->log_debug("%s", expected.c_str()); + logger_->log_debug("%s", actual.c_str()); + logger_->log_debug("************"); + + INFO("The messages received by 
ConsumeKafka do not match those published"); + REQUIRE(std::equal(flow_files_produced.begin(), flow_files_produced.end(), sorted_split_messages.begin(), flow_file_content_matches_message)); + } +}; + +class ConsumeKafkaContinuousPublishingTest : public ConsumeKafkaTest { + public: + ConsumeKafkaContinuousPublishingTest() : ConsumeKafkaTest() {} + virtual ~ConsumeKafkaContinuousPublishingTest() { + logTestController_.reset(); + } + + void single_consumer_with_continuous_message_producing( + const uint64_t msg_periodicity_ms, + const std::string& kafka_brokers, + const optional<std::string>& group_id, + const optional<std::string>& max_poll_records, + const optional<std::string>& max_poll_time, + const optional<std::string>& session_timeout) { + reInitialize(); + + std::shared_ptr<core::Processor> consume_kafka = plan_->addProcessor("ConsumeKafka", "consume_kafka", {success}, false); + + plan_->setProperty(consume_kafka, "allow.auto.create.topics", "true", true); // Seems like the topic tests work without this + + plan_->setProperty(consume_kafka, ConsumeKafka::KafkaBrokers.getName(), kafka_brokers); + plan_->setProperty(consume_kafka, ConsumeKafka::TopicNames.getName(), PRODUCER_TOPIC); + optional_set_property(consume_kafka, ConsumeKafka::GroupID.getName(), group_id); + + optional_set_property(consume_kafka, ConsumeKafka::MaxPollRecords.getName(), max_poll_records); + optional_set_property(consume_kafka, ConsumeKafka::MaxPollTime.getName(), max_poll_time); + optional_set_property(consume_kafka, ConsumeKafka::SessionTimeout.getName(), session_timeout); + + consume_kafka->setAutoTerminatedRelationships({success}); + + KafkaTestProducer producer("localhost:9092", PRODUCER_TOPIC, /* transactional = */ false); + + std::atomic_bool producer_loop_stop{ false }; + const auto producer_loop = [&] { + std::size_t num_messages_sent = 0; + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + while (!producer_loop_stop) { + producer.publish_messages_to_topic({ "Message 
after " + std::to_string(msg_periodicity_ms * ++num_messages_sent) + " ms"}, TEST_MESSAGE_KEY, { PUBLISH }, {}, {}); + std::this_thread::sleep_for(std::chrono::milliseconds(msg_periodicity_ms)); + } + }; + + plan_->scheduleProcessors(); + + const auto get_time_property_ms = [] (const std::string& property_string) { + int64_t value; + org::apache::nifi::minifi::core::TimeUnit unit; + REQUIRE(org::apache::nifi::minifi::core::Property::StringToTime(property_string, value, unit)); + int64_t value_as_ms; + REQUIRE(org::apache::nifi::minifi::core::Property::ConvertTimeUnitToMS(value, unit, value_as_ms)); + return value_as_ms; + }; + + std::thread producer_thread(producer_loop); + CHECK_NOTHROW(plan_->runNextProcessor()); + producer_loop_stop = true; + producer_thread.join(); + + std::size_t num_flow_files_produced = plan_->getNumFlowFileProducedByCurrentProcessor(); + + const uint64_t max_poll_time_ms = get_time_property_ms(max_poll_time.value_or(ConsumeKafka::DEFAULT_MAX_POLL_TIME)); + const uint64_t max_poll_records_value = max_poll_records ? std::stoi(max_poll_records.value()) : ConsumeKafka::DEFAULT_MAX_POLL_RECORDS; + const uint64_t exp_lower_bound = std::min(max_poll_time_ms / msg_periodicity_ms - 2, max_poll_records_value); + const uint64_t exp_upper_bound = std::min(max_poll_time_ms / msg_periodicity_ms + 2, max_poll_records_value); + logger_->log_debug("Max poll time: %d, Max poll records: %d, Exp. 
flowfiles produced: (min: %d, max: %d), actual: %d", + max_poll_time_ms, max_poll_records_value, exp_lower_bound, exp_upper_bound, num_flow_files_produced); + + REQUIRE(exp_lower_bound <= num_flow_files_produced); + REQUIRE(num_flow_files_produced <= exp_upper_bound); + } +}; + +const std::string ConsumeKafkaTest::TEST_FILE_NAME_POSTFIX{ "target_kafka_message.txt" }; +const std::string ConsumeKafkaTest::TEST_MESSAGE_KEY{ "consume_kafka_test_key" }; +const std::string ConsumeKafkaTest::PRODUCER_TOPIC{ "ConsumeKafkaTest" }; +const std::string ConsumeKafkaTest::ATTRIBUTE_FOR_CAPTURING_CONTENT{ "flowfile_content" }; +const std::chrono::seconds ConsumeKafkaTest::MAX_CONSUMEKAFKA_POLL_TIME_SECONDS{ 5 }; + +TEST_CASE_METHOD(ConsumeKafkaPropertiesTest, "ConsumeKafka parses and uses kafka topics.", "[ConsumeKafka][Kafka][Topic]") { + auto run_tests = [&] (const std::vector<std::string>& messages_on_topic, const std::string& topic_names, const optional<std::string>& topic_name_format) { + single_consumer_with_plain_text_test(true, false, {}, messages_on_topic, NON_TRANSACTIONAL_MESSAGES, {}, "localhost:9092", "PLAINTEXT", topic_names, topic_name_format, {}, "test_group_id", {}, {}, {}, {}, {}, {}, "1", "2 sec", "60 sec"); // NOLINT + }; + run_tests({ "Ulysses", "James Joyce" }, "ConsumeKafkaTest", {}); + run_tests({ "The Great Gatsby", "F. 
Scott Fitzgerald" }, "ConsumeKafkaTest", ConsumeKafka::TOPIC_FORMAT_NAMES); + run_tests({ "War and Peace", "Lev Tolstoy" }, "a,b,c,ConsumeKafkaTest,d", ConsumeKafka::TOPIC_FORMAT_NAMES); + run_tests({ "Nineteen Eighty Four", "George Orwell" }, "ConsumeKafkaTest", ConsumeKafka::TOPIC_FORMAT_PATTERNS); + run_tests({ "Hamlet", "William Shakespeare" }, "Cons[emu]*KafkaTest", ConsumeKafka::TOPIC_FORMAT_PATTERNS); +} + +TEST_CASE_METHOD(ConsumeKafkaPropertiesTest, "Offsets are reset to the latest when a consumer starts with non-processed messages.", "[ConsumeKafka][Kafka][OffsetReset]") { + auto run_tests = [&] ( + const std::vector<std::string>& messages_on_topic, + const std::vector<KafkaTestProducer::PublishEvent>& transaction_events) { + single_consumer_with_plain_text_test(true, false, {}, messages_on_topic, transaction_events, {}, "localhost:9092", "PLAINTEXT", "ConsumeKafkaTest", {}, {}, "test_group_id", {}, {}, {}, {}, {}, {}, "1", "2 sec", "60 sec"); // NOLINT + }; + KafkaTestProducer producer("localhost:9092", PRODUCER_TOPIC, false); + producer.publish_messages_to_topic({"Dummy messages", "that should be ignored", "due to offset reset on ConsumeKafka startup"}, TEST_MESSAGE_KEY, {PUBLISH, PUBLISH, PUBLISH}, {}, {}); + run_tests({"Brave New World", "Aldous Huxley"}, NON_TRANSACTIONAL_MESSAGES); + producer.publish_messages_to_topic({"Dummy messages", "that should be ignored", "due to offset reset on ConsumeKafka startup"}, TEST_MESSAGE_KEY, {PUBLISH, PUBLISH, PUBLISH}, {}, {}); + run_tests({"Call of the Wild", "Jack London"}, NON_TRANSACTIONAL_MESSAGES); +} + +TEST_CASE_METHOD(ConsumeKafkaPropertiesTest, "Key attribute is encoded according to the \"Key Attribute Encoding\" property.", "[ConsumeKafka][Kafka][KeyAttributeEncoding]") { + auto run_tests = [&] (const std::vector<std::string>& messages_on_topic, const optional<std::string>& key_attribute_encoding) { + single_consumer_with_plain_text_test(true, false, {}, messages_on_topic, NON_TRANSACTIONAL_MESSAGES, 
{}, "localhost:9092", "PLAINTEXT", "ConsumeKafkaTest", {}, {}, "test_group_id", {}, key_attribute_encoding, {}, {}, {}, {}, "1", "2 sec", "60 sec"); // NOLINT + }; + + run_tests({ "The Odyssey", "Ὅμηρος" }, {}); + run_tests({ "Lolita", "Владимир Владимирович Набоков" }, "utf-8"); + run_tests({ "Crime and Punishment", "Фёдор Михайлович Достоевский" }, "hex"); + run_tests({ "Paradise Lost", "John Milton" }, "hEX"); +} + +TEST_CASE_METHOD(ConsumeKafkaPropertiesTest, "Transactional behaviour is supported.", "[ConsumeKafka][Kafka][Transaction]") { + auto run_tests = [&] (const std::vector<std::string>& messages_on_topic, const std::vector<KafkaTestProducer::PublishEvent>& transaction_events, const optional<bool>& honor_transactions) { + single_consumer_with_plain_text_test(true, false, {}, messages_on_topic, transaction_events, {}, "localhost:9092", "PLAINTEXT", "ConsumeKafkaTest", {}, {}, "test_group_id", {}, {}, {}, {}, {}, {}, "1", "2 sec", "60 sec"); // NOLINT + }; + run_tests({ "Pride and Prejudice", "Jane Austen" }, SINGLE_COMMITTED_TRANSACTION, {}); + run_tests({ "Dune", "Frank Herbert" }, TWO_SEPARATE_TRANSACTIONS, {}); + run_tests({ "The Black Sheep", "Honore De Balzac" }, NON_COMMITTED_TRANSACTION, {}); + run_tests({ "Gospel of Thomas" }, CANCELLED_TRANSACTION, {}); + run_tests({ "Operation Dark Heart" }, CANCELLED_TRANSACTION, true); + run_tests({ "Brexit" }, CANCELLED_TRANSACTION, false); +} + +TEST_CASE_METHOD(ConsumeKafkaPropertiesTest, "Headers on consumed Kafka messages are extracted into attributes if requested on ConsumeKafka.", "[ConsumeKafka][Kafka][Headers]") { + auto run_tests = [&] ( + const std::vector<std::string>& messages_on_topic, + const std::vector<std::pair<std::string, std::string>>& expect_header_attributes, + const std::vector<std::pair<std::string, std::string>>& message_headers, + const optional<std::string>& headers_to_add_as_attributes, + const optional<std::string>& duplicate_header_handling) { + 
single_consumer_with_plain_text_test(true, false, expect_header_attributes, messages_on_topic, NON_TRANSACTIONAL_MESSAGES, message_headers, "localhost:9092", "PLAINTEXT", "ConsumeKafkaTest", {}, {}, "test_group_id", {}, {}, {}, {}, headers_to_add_as_attributes, duplicate_header_handling, "1", "2 sec", "60 sec"); // NOLINT + }; + run_tests({ "Homeland", "R. A. Salvatore"}, {}, {{{"Contains dark elves"}, {"Yes"}}}, {}, {}); + run_tests({ "Magician", "Raymond E. Feist"}, {{{"Rating"}, {"10/10"}}}, {{{"Rating"}, {"10/10"}}}, {"Rating"}, {}); + run_tests({ "Mistborn", "Brandon Sanderson"}, {{{"Metal"}, {"Copper"}}}, {{{"Metal"}, {"Copper"}}, {{"Metal"}, {"Iron"}}}, {"Metal"}, KEEP_FIRST); + run_tests({ "Mistborn", "Brandon Sanderson"}, {{{"Metal"}, {"Iron"}}}, {{{"Metal"}, {"Copper"}}, {{"Metal"}, {"Iron"}}}, {"Metal"}, KEEP_LATEST); + run_tests({ "Mistborn", "Brandon Sanderson"}, {{{"Metal"}, {"Copper, Iron"}}}, {{{"Metal"}, {"Copper"}}, {{"Metal"}, {"Iron"}}}, {"Metal"}, COMMA_SEPARATED_MERGE); + run_tests({"The Lord of the Rings", "J. R. R. 
Tolkien"}, {{{"Parts"}, {"First, second, third"}}}, {{{"Parts"}, {"First, second, third"}}}, {"Parts"}, {}); +} + +TEST_CASE_METHOD(ConsumeKafkaPropertiesTest, "Messages are separated into multiple flowfiles if the message demarcator is present in the message.", "[ConsumeKafka][Kafka][MessageDemarcator]") { + auto run_tests = [&] ( + const std::vector<std::string>& messages_on_topic, + const optional<std::string>& message_demarcator) { + single_consumer_with_plain_text_test(true, false, {}, messages_on_topic, NON_TRANSACTIONAL_MESSAGES, {}, "localhost:9092", "PLAINTEXT", "ConsumeKafkaTest", {}, {}, "test_group_id", {}, {}, message_demarcator, {}, {}, {}, "1", "2 sec", "60 sec"); // NOLINT + }; + run_tests({"Barbapapa", "Anette Tison and Talus Taylor"}, "a"); +} + +TEST_CASE_METHOD(ConsumeKafkaPropertiesTest, "The maximum poll records allows ConsumeKafka to combine multiple messages into a single flowfile.", "[ConsumeKafka][Kafka][Batching][MaxPollRecords]") { + auto run_tests = [&] ( + const std::vector<std::string>& messages_on_topic, + const std::vector<KafkaTestProducer::PublishEvent>& transaction_events, + const optional<std::string>& max_poll_records) { + single_consumer_with_plain_text_test(true, false, {}, messages_on_topic, transaction_events, {}, "localhost:9092", "PLAINTEXT", "ConsumeKafkaTest", {}, {}, "test_group_id", {}, {}, {}, {}, {}, {}, max_poll_records, "2 sec", "60 sec"); // NOLINT + }; + run_tests({"The Count of Monte Cristo", "Alexandre Dumas"}, NON_TRANSACTIONAL_MESSAGES, "2"); + + const std::vector<std::string> content { + "Make const member functions thread safe", + "Understand special member function generation", + "Use std::unique_ptr for exclusive-ownership resource management", + "Use std::shared_ptr for shared-ownership resource management", + "Use std::weak_ptr for std::shared_ptr-like pointers that can dangle", + "Prefer std::make_unique and std::make_shared to direct use of new", + "When using the Pimpl Idiom, define special member 
functions inthe implementation file", + "Understand std::move and std::forward", + "Distinguish universal references from rvalue references", + "Use std::move on rvalue references, std::forward on universal references", + "Avoid overloading on universal references", + "Familiarize yourself with alternatives to overloading on universal references", + "Understand reference collapsing", + "Assume that move operations are not present, not cheap, and not used", + "Familiarize yourself with perfect forwarding failure cases", + "Avoid default capture modes", + "Use init capture to move objects into closures", + "Use decltype on auto&& parameters to std::forward them", + "Prefer lambdas to std::bind", + "Prefer task-based programming to thread-based" }; Review comment: Great guidelines, what's the source? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org
