This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 6cfbf79 MINIFICPP-1373 - Implement ConsumeKafka
6cfbf79 is described below
commit 6cfbf797a6c060a4781e0185a46c7cd9c4229384
Author: Adam Hunyadi <[email protected]>
AuthorDate: Fri Feb 26 10:56:02 2021 +0100
MINIFICPP-1373 - Implement ConsumeKafka
Signed-off-by: Arpad Boda <[email protected]>
This closes #940
---
CMakeLists.txt | 2 +-
PROCESSORS.md | 33 +-
extensions/librdkafka/ConsumeKafka.cpp | 570 ++++++++++++++++++++
extensions/librdkafka/ConsumeKafka.h | 192 +++++++
.../{tests => docker_tests}/CMakeLists.txt | 2 +-
extensions/librdkafka/rdkafka_utils.cpp | 121 +++++
extensions/librdkafka/rdkafka_utils.h | 105 ++++
extensions/librdkafka/tests/CMakeLists.txt | 6 +-
extensions/librdkafka/tests/ConsumeKafkaTests.cpp | 590 +++++++++++++++++++++
libminifi/include/Connection.h | 4 +-
libminifi/include/core/FlowFile.h | 5 +-
libminifi/include/utils/GeneralUtils.h | 12 +
libminifi/include/utils/ProcessorConfigUtils.h | 81 +++
libminifi/include/utils/StringUtils.h | 8 +-
libminifi/src/Connection.cpp | 2 +-
libminifi/src/core/FlowFile.cpp | 17 +-
libminifi/src/utils/StringUtils.cpp | 17 +-
libminifi/test/TestBase.cpp | 169 ++++--
libminifi/test/TestBase.h | 51 +-
libminifi/test/unit/StringUtilsTests.cpp | 10 +
20 files changed, 1908 insertions(+), 89 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index ff0024f..e43b850 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -444,7 +444,7 @@ option(ENABLE_LIBRDKAFKA "Enables the librdkafka
extension." OFF)
if (ENABLE_ALL OR ENABLE_LIBRDKAFKA)
include(BundledLibRdKafka)
use_bundled_librdkafka(${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_BINARY_DIR})
- createExtension(RDKAFKA-EXTENSIONS "RDKAFKA EXTENSIONS" "This Enables
librdkafka functionality including PublishKafka" "extensions/librdkafka"
"extensions/librdkafka/tests")
+ createExtension(RDKAFKA-EXTENSIONS "RDKAFKA EXTENSIONS" "This Enables
librdkafka functionality including PublishKafka" "extensions/librdkafka"
"extensions/librdkafka/tests" "extensions/librdkafka/docker_tests")
endif()
## Scripting extensions
diff --git a/PROCESSORS.md b/PROCESSORS.md
index c2cd82a..0ca385d 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -9,6 +9,7 @@
- [CapturePacket](#capturepacket)
- [CaptureRTSPFrame](#capturertspframe)
- [CompressContent](#compresscontent)
+- [ConsumeKafka](#consumekafka)
- [ConsumeMQTT](#consumemqtt)
- [DeleteS3Object](#deletes3object)
- [ExecuteProcess](#executeprocess)
@@ -180,6 +181,37 @@ In the list below, the names of required properties appear
in bold. Any other pr
|failure|FlowFiles will be transferred to the failure relationship if they
fail to compress/decompress|
|success|FlowFiles will be transferred to the success relationship after
successfully being compressed or decompressed|
+## ConsumeKafka
+
+### Description
+
+Consumes messages from Apache Kafka and transforms them into MiNiFi FlowFiles.
The application should make sure that the processor is triggered at regular
intervals, even if no messages are expected, to serve any queued callbacks
waiting to be called. Rebalancing can also only happen on trigger.
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other
properties (not in bold) are considered optional. The table also indicates any
default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
+|Duplicate Header Handling|Keep Latest|Comma-separated Merge<br>Keep
First<br>Keep Latest<br>|For headers to be added as attributes, this option
specifies how to handle cases where multiple headers are present with the same
key. For example in case of receiving these two headers: "Accept: text/html"
and "Accept: application/xml" and we want to attach the value of "Accept" as a
FlowFile attribute:<br/> - "Keep First" attaches: "Accept -> text/html"<br/> -
"Keep Latest" attaches: "Accept - [...]
+|**Group ID**|||A Group ID is used to identify consumers that are within the
same consumer group. Corresponds to Kafka's 'group.id' property.<br/>**Supports
Expression Language: true**|
+|Headers To Add As Attributes|||A comma separated list to match against all
message headers. Any message header whose name matches an item from the list
will be added to the FlowFile as an Attribute. If not specified, no Header
values will be added as FlowFile attributes. The behaviour on when multiple
headers of the same name are present is set using the DuplicateHeaderHandling
attribute.|
+|**Honor Transactions**|true||Specifies whether or not MiNiFi should honor
transactional guarantees when communicating with Kafka. If false, the Processor
will use an "isolation level" of read_uncommitted. This means that messages will
be received as soon as they are written to Kafka but will be pulled, even if
the producer cancels the transactions. If this value is true, MiNiFi will not
receive any messages for which the producer's transaction was canceled, but
this can result in some la [...]
+|**Kafka Brokers**|localhost:9092||A comma-separated list of known Kafka
Brokers in the format <host>:<port>.<br/>**Supports Expression Language: true**|
+|**Key Attribute Encoding**|UTF-8|Hex<br>UTF-8<br>|FlowFiles that are emitted
have an attribute named 'kafka.key'. This property dictates how the value of
the attribute should be encoded.|
+|Max Poll Records|10000||Specifies the maximum number of records Kafka should
return when polling each time the processor is triggered.|
+|**Max Poll Time**|4 seconds||Specifies the maximum amount of time the
consumer can use for polling data from the brokers. Polling is a blocking
operation, so the upper limit of this value is 4 seconds.|
+|Message Demarcator|||Since KafkaConsumer receives messages in batches, you
have an option to output FlowFiles which contains all Kafka messages in a
single batch for a given topic and partition and this property allows you to
provide a string (interpreted as UTF-8) to use for demarcating apart multiple
Kafka messages. This is an optional property and if not provided each Kafka
message received will result in a single FlowFile each time the processor is triggered.
<br/>**Supports Expression Langua [...]
+|Message Header Encoding|UTF-8|Hex<br>UTF-8<br>|Any message header that is
found on a Kafka message will be added to the outbound FlowFile as an
attribute. This property indicates the Character Encoding to use for
deserializing the headers.|
+|**Offset Reset**|latest|earliest<br>latest<br>none<br>|Allows you to manage
the condition when there is no initial offset in Kafka or if the current offset
does not exist any more on the server (e.g. because that data has been
deleted). Corresponds to Kafka's 'auto.offset.reset' property.|
+|**Security Protocol**|PLAINTEXT|PLAINTEXT<br>|This property is currently not
supported. Protocol used to communicate with brokers. Corresponds to Kafka's
'security.protocol' property.|
+|Session Timeout|60 seconds||Client group session and failure detection
timeout. The consumer sends periodic heartbeats to indicate its liveness to the
broker. If no heartbeats are received by the broker for a group member within the
session timeout, the broker will remove the consumer from the group and trigger
a rebalance. The allowed range is configured with the broker configuration
properties group.min.session.timeout.ms and group.max.session.timeout.ms.|
+|**Topic Name Format**|Names|Names<br>Patterns<br>|Specifies whether the
Topic(s) provided are a comma separated list of names or a single regular
expression.|
+|**Topic Names**|||The name of the Kafka Topic(s) to pull from. Multiple topic
names are supported as a comma separated list.<br/>**Supports Expression
Language: true**|
+### Relationships
+
+| Name | Description |
+| - | - |
+|success|Incoming kafka messages as flowfiles. Depending on the demarcation
strategy, this can be one or multiple flowfiles per message.|
## ConsumeMQTT
@@ -922,7 +954,6 @@ Supports Expression Language: true (will be evaluated using
flow file attributes
|failure|Any FlowFile that cannot be sent to Kafka will be routed to this
Relationship|
|success|Any FlowFile that is successfully sent to Kafka will be routed to
this Relationship|
-
## PublishMQTT
### Description
diff --git a/extensions/librdkafka/ConsumeKafka.cpp
b/extensions/librdkafka/ConsumeKafka.cpp
new file mode 100644
index 0000000..290466c
--- /dev/null
+++ b/extensions/librdkafka/ConsumeKafka.cpp
@@ -0,0 +1,570 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ConsumeKafka.h"
+
+#include <algorithm>
+#include <limits>
+
+#include "core/PropertyValidation.h"
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/gsl.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+// The upper limit for Max Poll Time is 4 seconds. This is because Watchdog
would potentially start
+// reporting issues with the processor health otherwise
+class ConsumeKafkaMaxPollTimeValidator : public TimePeriodValidator {
+ public:
+ ConsumeKafkaMaxPollTimeValidator(const std::string &name) // NOLINT
+ : TimePeriodValidator(name) {
+ }
+ ~ConsumeKafkaMaxPollTimeValidator() override = default;
+
+ ValidationResult validate(const std::string& subject, const std::string&
input) const override {
+ uint64_t value;
+ TimeUnit timeUnit;
+ uint64_t value_as_ms;
+ return
ValidationResult::Builder::createBuilder().withSubject(subject).withInput(input).isValid(
+ core::TimePeriodValue::StringToTime(input, value, timeUnit) &&
+ org::apache::nifi::minifi::core::Property::ConvertTimeUnitToMS(value,
timeUnit, value_as_ms) &&
+ 0 < value_as_ms && value_as_ms <= 4000).build();
+ }
+};
+} // namespace core
+namespace processors {
+
+constexpr const std::size_t ConsumeKafka::DEFAULT_MAX_POLL_RECORDS;
+constexpr char const* ConsumeKafka::DEFAULT_MAX_POLL_TIME;
+
+constexpr char const* ConsumeKafka::TOPIC_FORMAT_NAMES;
+constexpr char const* ConsumeKafka::TOPIC_FORMAT_PATTERNS;
+
+core::Property
ConsumeKafka::KafkaBrokers(core::PropertyBuilder::createProperty("Kafka
Brokers")
+ ->withDescription("A comma-separated list of known Kafka Brokers in the
format <host>:<port>.")
+ ->withDefaultValue("localhost:9092",
core::StandardValidators::get().NON_BLANK_VALIDATOR)
+ ->supportsExpressionLanguage(true)
+ ->isRequired(true)
+ ->build());
+
+core::Property
ConsumeKafka::SecurityProtocol(core::PropertyBuilder::createProperty("Security
Protocol")
+ ->withDescription("This property is currently not supported. Protocol used
to communicate with brokers. Corresponds to Kafka's 'security.protocol'
property.")
+ ->withAllowableValues<std::string>({SECURITY_PROTOCOL_PLAINTEXT/*,
SECURITY_PROTOCOL_SSL, SECURITY_PROTOCOL_SASL_PLAINTEXT,
SECURITY_PROTOCOL_SASL_SSL*/ })
+ ->withDefaultValue(SECURITY_PROTOCOL_PLAINTEXT)
+ ->isRequired(true)
+ ->build());
+
+core::Property
ConsumeKafka::TopicNames(core::PropertyBuilder::createProperty("Topic Names")
+ ->withDescription("The name of the Kafka Topic(s) to pull from. Multiple
topic names are supported as a comma separated list.")
+ ->supportsExpressionLanguage(true)
+ ->isRequired(true)
+ ->build());
+
+core::Property
ConsumeKafka::TopicNameFormat(core::PropertyBuilder::createProperty("Topic Name
Format")
+ ->withDescription("Specifies whether the Topic(s) provided are a comma
separated list of names or a single regular expression.")
+ ->withAllowableValues<std::string>({TOPIC_FORMAT_NAMES,
TOPIC_FORMAT_PATTERNS})
+ ->withDefaultValue(TOPIC_FORMAT_NAMES)
+ ->isRequired(true)
+ ->build());
+
+core::Property
ConsumeKafka::HonorTransactions(core::PropertyBuilder::createProperty("Honor
Transactions")
+ ->withDescription(
+ "Specifies whether or not MiNiFi should honor transactional guarantees
when communicating with Kafka. If false, the Processor will use an \"isolation
level\" of "
+ "read_uncommitted. This means that messages will be received as soon as
they are written to Kafka but will be pulled, even if the producer cancels the
transactions. "
+ "If this value is true, MiNiFi will not receive any messages for which
the producer's transaction was canceled, but this can result in some latency
since the consumer "
+ "must wait for the producer to finish its entire transaction instead of
pulling as the messages become available.")
+ ->withDefaultValue<bool>(true)
+ ->isRequired(true)
+ ->build());
+
+core::Property
ConsumeKafka::GroupID(core::PropertyBuilder::createProperty("Group ID")
+ ->withDescription("A Group ID is used to identify consumers that are within
the same consumer group. Corresponds to Kafka's 'group.id' property.")
+ ->supportsExpressionLanguage(true)
+ ->isRequired(true)
+ ->build());
+
+core::Property
ConsumeKafka::OffsetReset(core::PropertyBuilder::createProperty("Offset Reset")
+ ->withDescription("Allows you to manage the condition when there is no
initial offset in Kafka or if the current offset does not exist any more on the
server (e.g. because that "
+ "data has been deleted). Corresponds to Kafka's 'auto.offset.reset'
property.")
+ ->withAllowableValues<std::string>({OFFSET_RESET_EARLIEST,
OFFSET_RESET_LATEST, OFFSET_RESET_NONE})
+ ->withDefaultValue(OFFSET_RESET_LATEST)
+ ->isRequired(true)
+ ->build());
+
+core::Property
ConsumeKafka::KeyAttributeEncoding(core::PropertyBuilder::createProperty("Key
Attribute Encoding")
+ ->withDescription("FlowFiles that are emitted have an attribute named
'kafka.key'. This property dictates how the value of the attribute should be
encoded.")
+ ->withAllowableValues<std::string>({KEY_ATTR_ENCODING_UTF_8,
KEY_ATTR_ENCODING_HEX})
+ ->withDefaultValue(KEY_ATTR_ENCODING_UTF_8)
+ ->isRequired(true)
+ ->build());
+
+core::Property
ConsumeKafka::MessageDemarcator(core::PropertyBuilder::createProperty("Message
Demarcator")
+ ->withDescription("Since KafkaConsumer receives messages in batches, you
have an option to output FlowFiles which contains all Kafka messages in a
single batch "
+ "for a given topic and partition and this property allows you to provide
a string (interpreted as UTF-8) to use for demarcating apart multiple Kafka
messages. "
+ "This is an optional property and if not provided each Kafka message
received will result in a single FlowFile which time it is triggered. ")
+ ->supportsExpressionLanguage(true)
+ ->build());
+
+core::Property
ConsumeKafka::MessageHeaderEncoding(core::PropertyBuilder::createProperty("Message
Header Encoding")
+ ->withDescription("Any message header that is found on a Kafka message will
be added to the outbound FlowFile as an attribute. This property indicates the
Character Encoding "
+ "to use for deserializing the headers.")
+ ->withAllowableValues<std::string>({MSG_HEADER_ENCODING_UTF_8,
MSG_HEADER_ENCODING_HEX})
+ ->withDefaultValue(MSG_HEADER_ENCODING_UTF_8)
+ ->build());
+
+core::Property
ConsumeKafka::HeadersToAddAsAttributes(core::PropertyBuilder::createProperty("Headers
To Add As Attributes")
+ ->withDescription("A comma separated list to match against all message
headers. Any message header whose name matches an item from the list will be
added to the FlowFile "
+ "as an Attribute. If not specified, no Header values will be added as
FlowFile attributes. The behaviour on when multiple headers of the same name
are present is set using "
+ "the DuplicateHeaderHandling attribute.")
+ ->build());
+
+core::Property
ConsumeKafka::DuplicateHeaderHandling(core::PropertyBuilder::createProperty("Duplicate
Header Handling")
+ ->withDescription("For headers to be added as attributes, this option
specifies how to handle cases where multiple headers are present with the same
key. "
+ "For example in case of receiving these two headers: \"Accept:
text/html\" and \"Accept: application/xml\" and we want to attach the value of
\"Accept\" "
+ "as a FlowFile attribute:\n"
+ " - \"Keep First\" attaches: \"Accept -> text/html\"\n"
+ " - \"Keep Latest\" attaches: \"Accept -> application/xml\"\n"
+ " - \"Comma-separated Merge\" attaches: \"Accept -> text/html,
application/xml\"\n")
+ ->withAllowableValues<std::string>({MSG_HEADER_KEEP_FIRST,
MSG_HEADER_KEEP_LATEST, MSG_HEADER_COMMA_SEPARATED_MERGE})
+ ->withDefaultValue(MSG_HEADER_KEEP_LATEST) // Mirroring NiFi behaviour
+ ->build());
+
+core::Property
ConsumeKafka::MaxPollRecords(core::PropertyBuilder::createProperty("Max Poll
Records")
+ ->withDescription("Specifies the maximum number of records Kafka should
return when polling each time the processor is triggered.")
+ ->withDefaultValue<unsigned int>(DEFAULT_MAX_POLL_RECORDS)
+ ->build());
+
+core::Property
ConsumeKafka::MaxPollTime(core::PropertyBuilder::createProperty("Max Poll Time")
+ ->withDescription("Specifies the maximum amount of time the consumer can use
for polling data from the brokers. "
+ "Polling is a blocking operation, so the upper limit of this value is 4
seconds.")
+ ->withDefaultValue(DEFAULT_MAX_POLL_TIME,
std::make_shared<core::ConsumeKafkaMaxPollTimeValidator>(std::string("ConsumeKafkaMaxPollTimeValidator")))
+ ->isRequired(true)
+ ->build());
+
+core::Property
ConsumeKafka::SessionTimeout(core::PropertyBuilder::createProperty("Session
Timeout")
+ ->withDescription("Client group session and failure detection timeout. The
consumer sends periodic heartbeats "
+ "to indicate its liveness to the broker. If no heartbeats are received by
the broker for a group member within "
+ "the session timeout, the broker will remove the consumer from the group
and trigger a rebalance. "
+ "The allowed range is configured with the broker configuration
properties group.min.session.timeout.ms and group.max.session.timeout.ms.")
+ ->withDefaultValue<core::TimePeriodValue>("60 seconds")
+ ->build());
+
+const core::Relationship ConsumeKafka::Success("success", "Incoming kafka
messages as flowfiles. Depending on the demarcation strategy, this can be one
or multiple flowfiles per message.");
+
+void ConsumeKafka::initialize() {
+ setSupportedProperties({
+ KafkaBrokers,
+ SecurityProtocol,
+ TopicNames,
+ TopicNameFormat,
+ HonorTransactions,
+ GroupID,
+ OffsetReset,
+ KeyAttributeEncoding,
+ MessageDemarcator,
+ MessageHeaderEncoding,
+ HeadersToAddAsAttributes,
+ DuplicateHeaderHandling,
+ MaxPollRecords,
+ MaxPollTime,
+ SessionTimeout
+ });
+ setSupportedRelationships({
+ Success,
+ });
+}
+
+void ConsumeKafka::onSchedule(core::ProcessContext* context,
core::ProcessSessionFactory* /* sessionFactory */) {
+ gsl_Expects(context);
+ // Required properties
+ kafka_brokers_ = utils::getRequiredPropertyOrThrow(context,
KafkaBrokers.getName());
+ security_protocol_ = utils::getRequiredPropertyOrThrow(context,
SecurityProtocol.getName());
+ topic_names_ =
utils::listFromRequiredCommaSeparatedProperty(context, TopicNames.getName());
+ topic_name_format_ = utils::getRequiredPropertyOrThrow(context,
TopicNameFormat.getName());
+ honor_transactions_ = utils::parseBooleanPropertyOrThrow(context,
HonorTransactions.getName());
+ group_id_ = utils::getRequiredPropertyOrThrow(context,
GroupID.getName());
+ offset_reset_ = utils::getRequiredPropertyOrThrow(context,
OffsetReset.getName());
+ key_attribute_encoding_ = utils::getRequiredPropertyOrThrow(context,
KeyAttributeEncoding.getName());
+ max_poll_time_milliseconds_ = utils::parseTimePropertyMSOrThrow(context,
MaxPollTime.getName());
+ session_timeout_milliseconds_ = utils::parseTimePropertyMSOrThrow(context,
SessionTimeout.getName());
+
+ // Optional properties
+ context->getProperty(MessageDemarcator.getName(), message_demarcator_);
+ context->getProperty(MessageHeaderEncoding.getName(),
message_header_encoding_);
+ context->getProperty(DuplicateHeaderHandling.getName(),
duplicate_header_handling_);
+
+ headers_to_add_as_attributes_ =
utils::listFromCommaSeparatedProperty(context,
HeadersToAddAsAttributes.getName());
+ max_poll_records_ =
gsl::narrow<std::size_t>(utils::getOptionalUintProperty(*context,
MaxPollRecords.getName()).value_or(DEFAULT_MAX_POLL_RECORDS));
+
+ // For now security protocols are not yet supported
+ if (!utils::StringUtils::equalsIgnoreCase(SECURITY_PROTOCOL_PLAINTEXT,
security_protocol_)) {
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Security protocols are not
supported yet.");
+ }
+
+ if (!utils::StringUtils::equalsIgnoreCase(KEY_ATTR_ENCODING_UTF_8,
key_attribute_encoding_) &&
!utils::StringUtils::equalsIgnoreCase(KEY_ATTR_ENCODING_HEX,
key_attribute_encoding_)) {
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Unsupported key attribute
encoding: " + key_attribute_encoding_);
+ }
+
+ if (!utils::StringUtils::equalsIgnoreCase(MSG_HEADER_ENCODING_UTF_8,
message_header_encoding_) &&
!utils::StringUtils::equalsIgnoreCase(MSG_HEADER_ENCODING_HEX,
message_header_encoding_)) {
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Unsupported message header
encoding: " + message_header_encoding_);
+ }
+
+ configure_new_connection(*context);
+}
+
+namespace {
+void rebalance_cb(rd_kafka_t* rk, rd_kafka_resp_err_t trigger,
rd_kafka_topic_partition_list_t* partitions, void* /*opaque*/) {
+ // Cooperative, incremental assignment is not supported in the current
librdkafka version
+ std::shared_ptr<logging::Logger>
logger{logging::LoggerFactory<ConsumeKafka>::getLogger()};
+ logger->log_debug("Rebalance triggered.");
+ rd_kafka_resp_err_t assign_error = RD_KAFKA_RESP_ERR_NO_ERROR;
+ switch (trigger) {
+ case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
+ logger->log_debug("assigned:");
+ if (logger->should_log(core::logging::LOG_LEVEL::debug)) {
+ utils::print_topics_list(*logger, *partitions);
+ }
+ assign_error = rd_kafka_assign(rk, partitions);
+ break;
+
+ case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
+ logger->log_debug("revoked:");
+ rd_kafka_commit(rk, partitions, /* async = */ 0); // Sync commit, maybe
unnecessary
+ if (logger->should_log(core::logging::LOG_LEVEL::debug)) {
+ utils::print_topics_list(*logger, *partitions);
+ }
+ assign_error = rd_kafka_assign(rk, NULL);
+ break;
+
+ default:
+ logger->log_debug("failed: %s", rd_kafka_err2str(trigger));
+ assign_error = rd_kafka_assign(rk, NULL);
+ break;
+ }
+ logger->log_debug("assign failure: %s", rd_kafka_err2str(assign_error));
+}
+} // namespace
+
+void ConsumeKafka::create_topic_partition_list() {
+ kf_topic_partition_list_ = {
rd_kafka_topic_partition_list_new(topic_names_.size()),
utils::rd_kafka_topic_partition_list_deleter() };
+
+ // On subscriptions any topics prefixed with ^ will be regex matched
+ if (utils::StringUtils::equalsIgnoreCase(TOPIC_FORMAT_PATTERNS,
topic_name_format_)) {
+ for (const std::string& topic : topic_names_) {
+ const std::string regex_format = "^" + topic;
+ rd_kafka_topic_partition_list_add(kf_topic_partition_list_.get(),
regex_format.c_str(), RD_KAFKA_PARTITION_UA);
+ }
+ } else {
+ for (const std::string& topic : topic_names_) {
+ rd_kafka_topic_partition_list_add(kf_topic_partition_list_.get(),
topic.c_str(), RD_KAFKA_PARTITION_UA);
+ }
+ }
+
+ // Subscribe to topic set using balanced consumer groups
+ // Subscribing from the same process without an in-between unsubscribe call
+ // Does not seem to be triggering a rebalance (maybe librdkafka bug?)
+ // This might happen until the cross-overship between processors and
connections are settled
+ rd_kafka_resp_err_t subscribe_response = rd_kafka_subscribe(consumer_.get(),
kf_topic_partition_list_.get());
+ if (RD_KAFKA_RESP_ERR_NO_ERROR != subscribe_response) {
+ logger_->log_error("rd_kafka_subscribe error %d: %s", subscribe_response,
rd_kafka_err2str(subscribe_response));
+ }
+}
+
+void ConsumeKafka::extend_config_from_dynamic_properties(const
core::ProcessContext& context) {
+ using utils::setKafkaConfigurationField;
+
+ const std::vector<std::string> dynamic_prop_keys =
context.getDynamicPropertyKeys();
+ if (dynamic_prop_keys.empty()) {
+ return;
+ }
+ logger_->log_info("Loading %d extra kafka configuration fields from
ConsumeKafka dynamic properties:", dynamic_prop_keys.size());
+ for (const std::string& key : dynamic_prop_keys) {
+ std::string value;
+ gsl_Expects(context.getDynamicProperty(key, value));
+ logger_->log_info("%s: %s", key.c_str(), value.c_str());
+ setKafkaConfigurationField(*conf_, key, value);
+ }
+}
+
+void ConsumeKafka::configure_new_connection(const core::ProcessContext&
context) {
+ using utils::setKafkaConfigurationField;
+
+ conf_ = { rd_kafka_conf_new(), utils::rd_kafka_conf_deleter() };
+ if (conf_ == nullptr) {
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed to create
rd_kafka_conf_t object");
+ }
+
+ // Set rebalance callback for use with coordinated consumer group balancing
+ // Rebalance handlers are needed for the initial configuration of the
consumer
+ // If they are not set, offset reset is ignored and polling produces messages
+ // Registering a rebalance_cb turns off librdkafka's automatic partition
assignment/revocation and instead delegates that responsibility to the
application's rebalance_cb.
+ rd_kafka_conf_set_rebalance_cb(conf_.get(), rebalance_cb);
+
+ // Uncomment this for librdkafka debug logs:
+ // setKafkaConfigurationField(conf_.get(), "debug", "all");
+
+ setKafkaConfigurationField(*conf_, "bootstrap.servers", kafka_brokers_);
+ setKafkaConfigurationField(*conf_, "auto.offset.reset", "latest");
+ setKafkaConfigurationField(*conf_, "enable.auto.commit", "false");
+ setKafkaConfigurationField(*conf_, "enable.auto.offset.store", "false");
+ setKafkaConfigurationField(*conf_, "isolation.level", honor_transactions_ ?
"read_committed" : "read_uncommitted");
+ setKafkaConfigurationField(*conf_, "group.id", group_id_);
+ setKafkaConfigurationField(*conf_, "session.timeout.ms",
std::to_string(session_timeout_milliseconds_.count()));
+ setKafkaConfigurationField(*conf_, "max.poll.interval.ms", "600000"); //
Twice the default, arbitrarily chosen
+
+ // This is a librdkafka option, but the communication timeout is also
specified in each of the
+ // relevant API calls. Could be redundant, but it probably does not hurt to
set this
+ setKafkaConfigurationField(*conf_, "metadata.request.timeout.ms",
std::to_string(METADATA_COMMUNICATIONS_TIMEOUT_MS));
+
+ extend_config_from_dynamic_properties(context);
+
+ std::array<char, 512U> errstr{};
+ consumer_ = { rd_kafka_new(RD_KAFKA_CONSUMER, conf_.release(),
errstr.data(), errstr.size()), utils::rd_kafka_consumer_deleter() };
+ if (consumer_ == nullptr) {
+ const std::string error_msg { errstr.data() };
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed to create Kafka
consumer " + error_msg);
+ }
+
+ create_topic_partition_list();
+
+ // Changing the partition list should happen only as part of the
initialization of offsets
+ // a function like `rd_kafka_position()` might have unexpected effects
+ // for instance when a consumer gets assigned a partition it used to
+ // consume at an earlier rebalance.
+ //
+ // As far as I understand, instead of rd_kafka_position() an
rd_kafka_committed() call is preferred here,
+ // as it properly fetches offsets from the broker
+ if (RD_KAFKA_RESP_ERR_NO_ERROR != rd_kafka_committed(consumer_.get(),
kf_topic_partition_list_.get(), METADATA_COMMUNICATIONS_TIMEOUT_MS)) {
+ logger_->log_error("Retrieving committed offsets for topics+partitions
failed.");
+ }
+
+ rd_kafka_resp_err_t poll_set_consumer_response =
rd_kafka_poll_set_consumer(consumer_.get());
+ if (RD_KAFKA_RESP_ERR_NO_ERROR != poll_set_consumer_response) {
+ logger_->log_error("rd_kafka_poll_set_consumer error %d: %s",
poll_set_consumer_response, rd_kafka_err2str(poll_set_consumer_response));
+ }
+
+ // There is no rd_kafka_seek alternative for
rd_kafka_topic_partition_list_t, only rd_kafka_topic_t
+ // rd_kafka_topic_partition_list_set_offset should reset the offsets to the
latest (or whatever is set in the config),
+ // Also, rd_kafka_committed should also fetch and set the latest offset
+ // In reality, neither of them seem to work (not even with calling
rd_kafka_position())
+ logger_->log_info("Resetting offset manually.");
+ while (true) {
+ std::unique_ptr<rd_kafka_message_t, utils::rd_kafka_message_deleter>
+ message_wrapper{ rd_kafka_consumer_poll(consumer_.get(),
max_poll_time_milliseconds_.count()), utils::rd_kafka_message_deleter() };
+
+ if (!message_wrapper || RD_KAFKA_RESP_ERR_NO_ERROR !=
message_wrapper->err) {
+ break;
+ }
+ utils::print_kafka_message(*message_wrapper, *logger_);
+ // Commit offsets on broker for the provided list of partitions
+ logger_->log_info("Committing offset: %" PRId64 ".",
message_wrapper->offset);
+ rd_kafka_commit_message(consumer_.get(), message_wrapper.get(), /* async =
*/ 0);
+ }
+ logger_->log_info("Done resetting offset manually.");
+}
+
+std::string ConsumeKafka::extract_message(const rd_kafka_message_t& rkmessage)
const {
+ if (RD_KAFKA_RESP_ERR_NO_ERROR != rkmessage.err) {
+ throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "ConsumeKafka:
received error message from broker: " + std::to_string(rkmessage.err) + " " +
rd_kafka_err2str(rkmessage.err));
+ }
+ return { reinterpret_cast<char*>(rkmessage.payload), rkmessage.len };
+}
+
+std::vector<std::unique_ptr<rd_kafka_message_t,
utils::rd_kafka_message_deleter>> ConsumeKafka::poll_kafka_messages() {
+ std::vector<std::unique_ptr<rd_kafka_message_t,
utils::rd_kafka_message_deleter>> messages;
+ messages.reserve(max_poll_records_);
+ const auto start = std::chrono::steady_clock::now();
+ auto elapsed = std::chrono::steady_clock::now() - start;
+ while (messages.size() < max_poll_records_ && elapsed <
max_poll_time_milliseconds_) {
+ logger_->log_debug("Polling for new messages for %d milliseconds...",
max_poll_time_milliseconds_.count());
+ std::unique_ptr<rd_kafka_message_t, utils::rd_kafka_message_deleter>
+ message { rd_kafka_consumer_poll(consumer_.get(),
std::chrono::duration_cast<std::chrono::milliseconds>(max_poll_time_milliseconds_
- elapsed).count()), utils::rd_kafka_message_deleter() };
+ if (!message) {
+ break;
+ }
+ if (RD_KAFKA_RESP_ERR_NO_ERROR != message->err) {
+ logger_->log_error("Received message with error %d: %s", message->err,
rd_kafka_err2str(message->err));
+ break;
+ }
+ utils::print_kafka_message(*message, *logger_);
+ messages.emplace_back(std::move(message));
+ elapsed = std::chrono::steady_clock::now() - start;
+ }
+ return messages;
+}
+
+utils::KafkaEncoding ConsumeKafka::key_attr_encoding_attr_to_enum() const {
+ if (utils::StringUtils::equalsIgnoreCase(key_attribute_encoding_,
KEY_ATTR_ENCODING_UTF_8)) {
+ return utils::KafkaEncoding::UTF8;
+ }
+ if (utils::StringUtils::equalsIgnoreCase(key_attribute_encoding_,
KEY_ATTR_ENCODING_HEX)) {
+ return utils::KafkaEncoding::HEX;
+ }
+ throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "\"Key Attribute
Encoding\" property not recognized.");
+}
+
+utils::KafkaEncoding ConsumeKafka::message_header_encoding_attr_to_enum()
const {
+ if (utils::StringUtils::equalsIgnoreCase(message_header_encoding_,
MSG_HEADER_ENCODING_UTF_8)) {
+ return utils::KafkaEncoding::UTF8;
+ }
+ if (utils::StringUtils::equalsIgnoreCase(message_header_encoding_,
MSG_HEADER_ENCODING_HEX)) {
+ return utils::KafkaEncoding::HEX;
+ }
+ throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "\"Message
Header Encoding\" property not recognized.");
+}
+
+std::string ConsumeKafka::resolve_duplicate_headers(const
std::vector<std::string>& matching_headers) const {
+ if (MSG_HEADER_KEEP_FIRST == duplicate_header_handling_) {
+ return matching_headers.front();
+ }
+ if (MSG_HEADER_KEEP_LATEST == duplicate_header_handling_) {
+ return matching_headers.back();
+ }
+ if (MSG_HEADER_COMMA_SEPARATED_MERGE == duplicate_header_handling_) {
+ return utils::StringUtils::join(", ", matching_headers);
+ }
+ throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "\"Duplicate
Header Handling\" property not recognized.");
+}
+
+std::vector<std::string> ConsumeKafka::get_matching_headers(const
rd_kafka_message_t& message, const std::string& header_name) const {
+ // Headers fetched this way are freed when rd_kafka_message_destroy is called
+ // Detaching them using rd_kafka_message_detach_headers does not seem to work
+ rd_kafka_headers_t* headers_raw;
+ const rd_kafka_resp_err_t get_header_response =
rd_kafka_message_headers(&message, &headers_raw);
+ if (RD_KAFKA_RESP_ERR__NOENT == get_header_response) {
+ return {};
+ }
+ if (RD_KAFKA_RESP_ERR_NO_ERROR != get_header_response) {
+ logger_->log_error("Failed to fetch message headers: %d: %s",
rd_kafka_last_error(), rd_kafka_err2str(rd_kafka_last_error()));
+ }
+ std::vector<std::string> matching_headers;
+ for (std::size_t header_idx = 0;; ++header_idx) {
+ const char* value; // Not to be freed
+ std::size_t size;
+ if (RD_KAFKA_RESP_ERR_NO_ERROR != rd_kafka_header_get(headers_raw,
header_idx, header_name.c_str(), (const void**)(&value), &size)) {
+ break;
+ }
+ if (size < 200) {
+ logger_->log_debug("%.*s", static_cast<int>(size), value);
+ } else {
+ logger_->log_debug("%.*s...", 200, value);
+ }
+ matching_headers.emplace_back(value, size);
+ }
+ return matching_headers;
+}
+
+std::vector<std::pair<std::string, std::string>>
ConsumeKafka::get_flowfile_attributes_from_message_header(const
rd_kafka_message_t& message) const {
+ std::vector<std::pair<std::string, std::string>> attributes_from_headers;
+ for (const std::string& header_name : headers_to_add_as_attributes_) {
+ const std::vector<std::string> matching_headers =
get_matching_headers(message, header_name);
+ if (matching_headers.size()) {
+ attributes_from_headers.emplace_back(header_name,
utils::get_encoded_string(resolve_duplicate_headers(matching_headers),
message_header_encoding_attr_to_enum()));
+ }
+ }
+ return attributes_from_headers;
+}
+
+void
ConsumeKafka::add_kafka_attributes_to_flowfile(std::shared_ptr<FlowFileRecord>&
flow_file, const rd_kafka_message_t& message) const {
+ // We do not currently support batching messages into a single flowfile
+ flow_file->setAttribute(KAFKA_COUNT_ATTR, "1");
+ const utils::optional<std::string> message_key =
utils::get_encoded_message_key(message, key_attr_encoding_attr_to_enum());
+ if (message_key) {
+ flow_file->setAttribute(KAFKA_MESSAGE_KEY_ATTR, message_key.value());
+ }
+ flow_file->setAttribute(KAFKA_OFFSET_ATTR, std::to_string(message.offset));
+ flow_file->setAttribute(KAFKA_PARTITION_ATTR,
std::to_string(message.partition));
+ flow_file->setAttribute(KAFKA_TOPIC_ATTR, rd_kafka_topic_name(message.rkt));
+}
+
// Converts every message in pending_messages_ into one or more flowfiles:
// the message payload is optionally split on the configured demarcator, each
// piece becomes a flowfile carrying the header-derived and kafka.* attributes.
// Returns an empty optional if any flowfile creation fails, so that the caller
// can retry the whole batch later (all-or-none semantics).
utils::optional<std::vector<std::shared_ptr<FlowFileRecord>>> ConsumeKafka::transform_pending_messages_into_flowfiles(core::ProcessSession& session) const {
  std::vector<std::shared_ptr<FlowFileRecord>> flow_files_created;
  for (const auto& message : pending_messages_) {
    std::string message_content = extract_message(*message);
    std::vector<std::pair<std::string, std::string>> attributes_from_headers = get_flowfile_attributes_from_message_header(*message);
    // An empty demarcator means "one flowfile per message"; otherwise the
    // payload is split into one flowfile per demarcated segment.
    std::vector<std::string> split_message{ message_demarcator_.size() ?
        utils::StringUtils::split(message_content, message_demarcator_) :
        std::vector<std::string>{ message_content }};
    for (auto& flowfile_content : split_message) {
      std::shared_ptr<FlowFileRecord> flow_file = std::static_pointer_cast<FlowFileRecord>(session.create());
      if (flow_file == nullptr) {
        logger_->log_error("Failed to create flowfile.");
        // Either transform all flowfiles or none
        return {};
      }
      // flowfile content is consumed here; the callback only needs to stay
      // alive for the duration of the synchronous session.write() call
      WriteCallback stream_writer_callback(&flowfile_content[0], flowfile_content.size());
      session.write(flow_file, &stream_writer_callback);
      for (const auto& kv : attributes_from_headers) {
        flow_file->setAttribute(kv.first, kv.second);
      }
      add_kafka_attributes_to_flowfile(flow_file, *message);
      flow_files_created.emplace_back(std::move(flow_file));
    }
  }
  return { flow_files_created };
}
+
+
+void ConsumeKafka::process_pending_messages(core::ProcessSession& session) {
+ utils::optional<std::vector<std::shared_ptr<FlowFileRecord>>>
flow_files_created = transform_pending_messages_into_flowfiles(session);
+ if (!flow_files_created) {
+ return;
+ }
+ for (const auto& flow_file : flow_files_created.value()) {
+ session.transfer(flow_file, Success);
+ }
+ session.commit();
+ // Commit the offset from the latest message only
+ if (RD_KAFKA_RESP_ERR_NO_ERROR != rd_kafka_commit_message(consumer_.get(),
pending_messages_.back().get(), /* async = */ 0)) {
+ logger_->log_error("Committing offset failed.");
+ }
+ pending_messages_.clear();
+}
+
+void ConsumeKafka::onTrigger(core::ProcessContext* /* context */,
core::ProcessSession* session) {
+ std::unique_lock<std::mutex> lock(do_not_call_on_trigger_concurrently_);
+ logger_->log_debug("ConsumeKafka onTrigger");
+
+ if (pending_messages_.size()) {
+ process_pending_messages(*session);
+ return;
+ }
+
+ pending_messages_ = poll_kafka_messages();
+ if (pending_messages_.empty()) {
+ return;
+ }
+ process_pending_messages(*session);
+}
+
+int64_t ConsumeKafka::WriteCallback::process(const
std::shared_ptr<io::BaseStream>& stream) {
+ int64_t ret = 0;
+ if (data_) {
+ ret = stream->write(data_, gsl::narrow<int>(dataSize_));
+ }
+ return ret;
+}
+
+} // namespace processors
+} // namespace minifi
+} // namespace nifi
+} // namespace apache
+} // namespace org
diff --git a/extensions/librdkafka/ConsumeKafka.h
b/extensions/librdkafka/ConsumeKafka.h
new file mode 100644
index 0000000..868f81d
--- /dev/null
+++ b/extensions/librdkafka/ConsumeKafka.h
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "core/Processor.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "rdkafka.h"
+#include "rdkafka_utils.h"
+#include "KafkaConnection.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
// Processor that consumes messages from Apache Kafka and turns each message
// (or each demarcated segment of a message) into a MiNiFi flowfile routed to
// the Success relationship.
class ConsumeKafka : public core::Processor {
 public:
  static constexpr char const* ProcessorName = "ConsumeKafka";

  // Supported Properties
  static core::Property KafkaBrokers;
  static core::Property SecurityProtocol;
  static core::Property TopicNames;
  static core::Property TopicNameFormat;
  static core::Property HonorTransactions;
  static core::Property GroupID;
  static core::Property OffsetReset;
  static core::Property KeyAttributeEncoding;
  static core::Property MessageDemarcator;
  static core::Property MessageHeaderEncoding;
  static core::Property HeadersToAddAsAttributes;
  static core::Property DuplicateHeaderHandling;
  static core::Property MaxPollRecords;
  static core::Property MaxPollTime;
  static core::Property SessionTimeout;

  // Supported Relationships
  static const core::Relationship Success;

  // Security Protocol allowable values
  static constexpr char const* SECURITY_PROTOCOL_PLAINTEXT = "PLAINTEXT";
  static constexpr char const* SECURITY_PROTOCOL_SSL = "SSL";
  static constexpr char const* SECURITY_PROTOCOL_SASL_PLAINTEXT = "SASL_PLAINTEXT";
  static constexpr char const* SECURITY_PROTOCOL_SASL_SSL = "SASL_SSL";

  // Topic Name Format allowable values
  static constexpr char const* TOPIC_FORMAT_NAMES = "Names";
  static constexpr char const* TOPIC_FORMAT_PATTERNS = "Patterns";

  // Offset Reset allowable values
  static constexpr char const* OFFSET_RESET_EARLIEST = "earliest";
  static constexpr char const* OFFSET_RESET_LATEST = "latest";
  static constexpr char const* OFFSET_RESET_NONE = "none";

  // Key Attribute Encoding allowable values
  static constexpr char const* KEY_ATTR_ENCODING_UTF_8 = "UTF-8";
  static constexpr char const* KEY_ATTR_ENCODING_HEX = "Hex";

  // Message Header Encoding allowable values
  static constexpr char const* MSG_HEADER_ENCODING_UTF_8 = "UTF-8";
  static constexpr char const* MSG_HEADER_ENCODING_HEX = "Hex";

  // Duplicate Header Handling allowable values
  static constexpr char const* MSG_HEADER_KEEP_FIRST = "Keep First";
  static constexpr char const* MSG_HEADER_KEEP_LATEST = "Keep Latest";
  static constexpr char const* MSG_HEADER_COMMA_SEPARATED_MERGE = "Comma-separated Merge";

  // Flowfile attributes written
  static constexpr char const* KAFKA_COUNT_ATTR = "kafka.count";  // Always 1 until we start supporting merging from batches
  static constexpr char const* KAFKA_MESSAGE_KEY_ATTR = "kafka.key";
  static constexpr char const* KAFKA_OFFSET_ATTR = "kafka.offset";
  static constexpr char const* KAFKA_PARTITION_ATTR = "kafka.partition";
  static constexpr char const* KAFKA_TOPIC_ATTR = "kafka.topic";

  static constexpr const std::size_t DEFAULT_MAX_POLL_RECORDS{ 10000 };
  static constexpr char const* DEFAULT_MAX_POLL_TIME = "4 seconds";
  static constexpr const std::size_t METADATA_COMMUNICATIONS_TIMEOUT_MS{ 60000 };

  explicit ConsumeKafka(const std::string& name, const utils::Identifier& uuid = utils::Identifier()) :
      Processor(name, uuid),
      logger_(logging::LoggerFactory<ConsumeKafka>::getLogger()) {}

  virtual ~ConsumeKafka() = default;

 public:
  bool supportsDynamicProperties() override {
    return true;
  }
  /**
   * Function that's executed when the processor is scheduled.
   * @param context process context.
   * @param sessionFactory process session factory that is used when creating
   * ProcessSession objects.
   */
  void onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /* sessionFactory */) override;
  /**
   * Execution trigger for the ConsumeKafka Processor
   * @param context processor context
   * @param session processor session reference.
   */
  void onTrigger(core::ProcessContext* context, core::ProcessSession* session) override;

  // Registers the supported properties and relationships of the processor.
  void initialize() override;

 private:
  void create_topic_partition_list();
  void extend_config_from_dynamic_properties(const core::ProcessContext& context);
  void configure_new_connection(const core::ProcessContext& context);
  std::string extract_message(const rd_kafka_message_t& rkmessage) const;
  std::vector<std::unique_ptr<rd_kafka_message_t, utils::rd_kafka_message_deleter>> poll_kafka_messages();
  utils::KafkaEncoding key_attr_encoding_attr_to_enum() const;
  utils::KafkaEncoding message_header_encoding_attr_to_enum() const;
  std::string resolve_duplicate_headers(const std::vector<std::string>& matching_headers) const;
  std::vector<std::string> get_matching_headers(const rd_kafka_message_t& message, const std::string& header_name) const;
  std::vector<std::pair<std::string, std::string>> get_flowfile_attributes_from_message_header(const rd_kafka_message_t& message) const;
  void add_kafka_attributes_to_flowfile(std::shared_ptr<FlowFileRecord>& flow_file, const rd_kafka_message_t& message) const;
  utils::optional<std::vector<std::shared_ptr<FlowFileRecord>>> transform_pending_messages_into_flowfiles(core::ProcessSession& session) const;
  void process_pending_messages(core::ProcessSession& session);

 private:
  // Writes a raw (data, size) buffer into a flowfile content stream.
  class WriteCallback : public OutputStreamCallback {
   public:
    WriteCallback(char *data, uint64_t size) :
        data_(reinterpret_cast<uint8_t*>(data)),
        dataSize_(size) {}
    int64_t process(const std::shared_ptr<io::BaseStream>& stream);
   private:
    uint8_t* data_;
    uint64_t dataSize_;
  };

  // Values of the processor properties, captured in onSchedule.
  std::string kafka_brokers_;
  std::string security_protocol_;
  std::vector<std::string> topic_names_;
  std::string topic_name_format_;
  bool honor_transactions_;
  std::string group_id_;
  std::string offset_reset_;
  std::string key_attribute_encoding_;
  std::string message_demarcator_;
  std::string message_header_encoding_;
  std::string duplicate_header_handling_;
  std::vector<std::string> headers_to_add_as_attributes_;
  std::size_t max_poll_records_;
  std::chrono::milliseconds max_poll_time_milliseconds_;
  std::chrono::milliseconds communications_timeout_milliseconds_;
  std::chrono::milliseconds session_timeout_milliseconds_;

  // RAII wrappers around the librdkafka consumer, its configuration and the
  // subscribed topic/partition list.
  std::unique_ptr<rd_kafka_t, utils::rd_kafka_consumer_deleter> consumer_;
  std::unique_ptr<rd_kafka_conf_t, utils::rd_kafka_conf_deleter> conf_;
  std::unique_ptr<rd_kafka_topic_partition_list_t, utils::rd_kafka_topic_partition_list_deleter> kf_topic_partition_list_;

  // Intermediate container type for messages that have been processed, but are
  // not yet persisted (eg. in case of I/O error)
  std::vector<std::unique_ptr<rd_kafka_message_t, utils::rd_kafka_message_deleter>> pending_messages_;

  std::mutex do_not_call_on_trigger_concurrently_;

  std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<ConsumeKafka>::getLogger()};
};

REGISTER_RESOURCE(ConsumeKafka, "Consumes messages from Apache Kafka and transform them into MiNiFi FlowFiles. "
    "The application should make sure that the processor is triggered at regular intervals, even if no messages are expected, "
    "to serve any queued callbacks waiting to be called. Rebalancing can also only happen on trigger.");  // NOLINT
+
+} // namespace processors
+} // namespace minifi
+} // namespace nifi
+} // namespace apache
+} // namespace org
diff --git a/extensions/librdkafka/tests/CMakeLists.txt
b/extensions/librdkafka/docker_tests/CMakeLists.txt
similarity index 91%
copy from extensions/librdkafka/tests/CMakeLists.txt
copy to extensions/librdkafka/docker_tests/CMakeLists.txt
index 056c7f1..2b1bdd8 100644
--- a/extensions/librdkafka/tests/CMakeLists.txt
+++ b/extensions/librdkafka/docker_tests/CMakeLists.txt
@@ -29,7 +29,7 @@ FOREACH(testfile ${KAFKA_TESTS})
createTests("${testfilename}")
MATH(EXPR KAFKA_TEST_COUNT "${KAFKA_TEST_COUNT}+1")
# The line below handles integration test
- add_test(NAME "${testfilename}" COMMAND "${testfilename}"
"${TEST_RESOURCES}/TestKafkaOnSchedule.yml" "${TEST_RESOURCES}/")
+ add_test(NAME "${testfilename}" COMMAND "${testfilename}")
target_link_libraries(${testfilename} ${CATCH_MAIN_LIB})
ENDFOREACH()
diff --git a/extensions/librdkafka/rdkafka_utils.cpp
b/extensions/librdkafka/rdkafka_utils.cpp
new file mode 100644
index 0000000..6020cb6
--- /dev/null
+++ b/extensions/librdkafka/rdkafka_utils.cpp
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <array>
+
+#include "rdkafka_utils.h"
+
+#include "Exception.h"
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+void setKafkaConfigurationField(rd_kafka_conf_t& configuration, const
std::string& field_name, const std::string& value) {
+ static std::array<char, 512U> errstr{};
+ rd_kafka_conf_res_t result;
+ result = rd_kafka_conf_set(&configuration, field_name.c_str(),
value.c_str(), errstr.data(), errstr.size());
+ if (RD_KAFKA_CONF_OK != result) {
+ const std::string error_msg { errstr.data() };
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "rd_kafka configuration error:
" + error_msg);
+ }
+}
+
+void print_topics_list(logging::Logger& logger,
rd_kafka_topic_partition_list_t& kf_topic_partition_list) {
+ for (std::size_t i = 0; i < kf_topic_partition_list.cnt; ++i) {
+ logger.log_debug("kf_topic_partition_list: topic: %s, partition: %d,
offset: %" PRId64 ".",
+ kf_topic_partition_list.elems[i].topic,
kf_topic_partition_list.elems[i].partition,
kf_topic_partition_list.elems[i].offset);
+ }
+}
+
+std::string get_human_readable_kafka_message_timestamp(const
rd_kafka_message_t& rkmessage) {
+ rd_kafka_timestamp_type_t tstype;
+ int64_t timestamp;
+ timestamp = rd_kafka_message_timestamp(&rkmessage, &tstype);
+ const char *tsname = "?";
+ if (tstype == RD_KAFKA_TIMESTAMP_CREATE_TIME) {
+ tsname = "create time";
+ } else if (tstype == RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME) {
+ tsname = "log append time";
+ }
+ const int64_t seconds_since_timestamp = timestamp == -1 ? 0 :
static_cast<int64_t>(time(NULL)) - static_cast<int64_t>(timestamp / 1000);
+ return {"[Timestamp](" + std::string(tsname) + " " +
std::to_string(timestamp) + " (" + std::to_string(seconds_since_timestamp) + "
s ago)"};
+}
+
+std::string get_human_readable_kafka_message_headers(const rd_kafka_message_t&
rkmessage, logging::Logger& logger) {
+ rd_kafka_headers_t* hdrs;
+ const rd_kafka_resp_err_t get_header_response =
rd_kafka_message_headers(&rkmessage, &hdrs);
+ if (RD_KAFKA_RESP_ERR_NO_ERROR == get_header_response) {
+ std::vector<std::string> header_list;
+ kafka_headers_for_each(*hdrs, [&] (const std::string& key, gsl::span<const
char> val) { header_list.emplace_back(key + ": " + std::string{ val.data(),
val.size() }); });
+ return StringUtils::join(", ", header_list);
+ }
+ if (RD_KAFKA_RESP_ERR__NOENT == get_header_response) {
+ return "[None]";
+ }
+ logger.log_error("Failed to fetch message headers: %d: %s",
rd_kafka_last_error(), rd_kafka_err2str(rd_kafka_last_error()));
+ return "[Error]";
+}
+
+void print_kafka_message(const rd_kafka_message_t& rkmessage, logging::Logger&
logger) {
+ if (RD_KAFKA_RESP_ERR_NO_ERROR != rkmessage.err) {
+ const std::string error_msg = "ConsumeKafka: received error message from
broker. Librdkafka error msg: " + std::string(rd_kafka_err2str(rkmessage.err));
+ throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, error_msg);
+ }
+ std::string topicName = rd_kafka_topic_name(rkmessage.rkt);
+ std::string message(reinterpret_cast<char*>(rkmessage.payload),
rkmessage.len);
+ const char* key = reinterpret_cast<const char*>(rkmessage.key);
+ const std::size_t key_len = rkmessage.key_len;
+
+ std::string message_as_string;
+ message_as_string += "[Topic](" + topicName + "), ";
+ message_as_string += "[Key](" + (key != nullptr ? std::string(key, key_len)
: std::string("[None]")) + "), ";
+ message_as_string += "[Offset](" + std::to_string(rkmessage.offset) + "), ";
+ message_as_string += "[Message Length](" + std::to_string(rkmessage.len) +
"), ";
+ message_as_string += get_human_readable_kafka_message_timestamp(rkmessage) +
"), ";
+ message_as_string += "[Headers](";
+ message_as_string += get_human_readable_kafka_message_headers(rkmessage,
logger) + ")";
+ message_as_string += "[Payload](" + message + ")";
+
+ logger.log_debug("Message: %s", message_as_string.c_str());
+}
+
+std::string get_encoded_string(const std::string& input, KafkaEncoding
encoding) {
+ switch (encoding) {
+ case KafkaEncoding::UTF8:
+ return input;
+ case KafkaEncoding::HEX:
+ return StringUtils::to_hex(input, /* uppercase = */ true);
+ }
+ throw std::runtime_error("Invalid encoding selected: " + input);
+}
+
+optional<std::string> get_encoded_message_key(const rd_kafka_message_t&
message, KafkaEncoding encoding) {
+ if (nullptr == message.key) {
+ return {};
+ }
+ return get_encoded_string({reinterpret_cast<const char*>(message.key),
message.key_len}, encoding);
+}
+
+} // namespace utils
+} // namespace minifi
+} // namespace nifi
+} // namespace apache
+} // namespace org
diff --git a/extensions/librdkafka/rdkafka_utils.h
b/extensions/librdkafka/rdkafka_utils.h
new file mode 100644
index 0000000..4ea8f47
--- /dev/null
+++ b/extensions/librdkafka/rdkafka_utils.h
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <algorithm>
+#include <chrono>
+#include <memory>
+#include <string>
+#include <thread>
+#include <utility>
+#include <vector>
+
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/OptionalUtils.h"
+#include "utils/gsl.h"
+#include "rdkafka.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
// Encoding applied to Kafka message keys / header values before they are
// written into flowfile attributes.
enum class KafkaEncoding {
  UTF8,
  HEX
};

// Deleter for rd_kafka_conf_t, for use with std::unique_ptr.
struct rd_kafka_conf_deleter {
  void operator()(rd_kafka_conf_t* ptr) const noexcept { rd_kafka_conf_destroy(ptr); }
};
+
+struct rd_kafka_producer_deleter {
+ void operator()(rd_kafka_t* ptr) const noexcept {
+ rd_kafka_resp_err_t flush_ret = rd_kafka_flush(ptr, 10000 /* ms */); //
Matching the wait time of KafkaConnection.cpp
+ // If concerned, we could log potential errors here:
+ // if (RD_KAFKA_RESP_ERR__TIMED_OUT == flush_ret) {
+ // std::cerr << "Deleting producer failed: time-out while trying to
flush" << std::endl;
+ // }
+ rd_kafka_destroy(ptr);
+ }
+};
+
// Deleter for a consumer rd_kafka_t handle: leaves the consumer group cleanly
// before destroying the handle.
struct rd_kafka_consumer_deleter {
  void operator()(rd_kafka_t* ptr) const noexcept {
    rd_kafka_consumer_close(ptr);
    rd_kafka_destroy(ptr);
  }
};

// Deleter for rd_kafka_topic_partition_list_t, for use with std::unique_ptr.
struct rd_kafka_topic_partition_list_deleter {
  void operator()(rd_kafka_topic_partition_list_t* ptr) const noexcept { rd_kafka_topic_partition_list_destroy(ptr); }
};

// Deleter for rd_kafka_topic_conf_t, for use with std::unique_ptr.
struct rd_kafka_topic_conf_deleter {
  void operator()(rd_kafka_topic_conf_t* ptr) const noexcept { rd_kafka_topic_conf_destroy(ptr); }
};
// Deleter for rd_kafka_topic_t, for use with std::unique_ptr.
struct rd_kafka_topic_deleter {
  void operator()(rd_kafka_topic_t* ptr) const noexcept { rd_kafka_topic_destroy(ptr); }
};

// Deleter for rd_kafka_message_t, for use with std::unique_ptr.
struct rd_kafka_message_deleter {
  void operator()(rd_kafka_message_t* ptr) const noexcept { rd_kafka_message_destroy(ptr); }
};

// Deleter for rd_kafka_headers_t, for use with std::unique_ptr.
struct rd_kafka_headers_deleter {
  void operator()(rd_kafka_headers_t* ptr) const noexcept { rd_kafka_headers_destroy(ptr); }
};
+
+template <typename T>
+void kafka_headers_for_each(const rd_kafka_headers_t& headers, T
key_value_handle) {
+ const char *key; // Null terminated, not to be freed
+ const void *value;
+ std::size_t size;
+ for (std::size_t i = 0; RD_KAFKA_RESP_ERR_NO_ERROR ==
rd_kafka_header_get_all(&headers, i, &key, &value, &size); ++i) {
+ key_value_handle(std::string(key), gsl::span<const char>(static_cast<const
char*>(value), size));
+ }
+}
+
// Sets a single librdkafka configuration entry; throws Exception(PROCESS_SCHEDULE_EXCEPTION) on failure.
void setKafkaConfigurationField(rd_kafka_conf_t& configuration, const std::string& field_name, const std::string& value);
// Logs every (topic, partition, offset) triple of the list at debug level.
void print_topics_list(logging::Logger& logger, rd_kafka_topic_partition_list_t& kf_topic_partition_list);
// Logs a human readable summary of the message; throws if the message carries a broker error.
void print_kafka_message(const rd_kafka_message_t& rkmessage, logging::Logger& logger);
// Returns the input unchanged (UTF8) or hex-encoded (HEX); throws std::runtime_error otherwise.
std::string get_encoded_string(const std::string& input, KafkaEncoding encoding);
// Returns the encoded message key, or an empty optional if the message has no key.
optional<std::string> get_encoded_message_key(const rd_kafka_message_t& message, KafkaEncoding encoding);
+
+} // namespace utils
+} // namespace minifi
+} // namespace nifi
+} // namespace apache
+} // namespace org
diff --git a/extensions/librdkafka/tests/CMakeLists.txt
b/extensions/librdkafka/tests/CMakeLists.txt
index 056c7f1..eb5713c 100644
--- a/extensions/librdkafka/tests/CMakeLists.txt
+++ b/extensions/librdkafka/tests/CMakeLists.txt
@@ -29,8 +29,12 @@ FOREACH(testfile ${KAFKA_TESTS})
createTests("${testfilename}")
MATH(EXPR KAFKA_TEST_COUNT "${KAFKA_TEST_COUNT}+1")
# The line below handles integration test
- add_test(NAME "${testfilename}" COMMAND "${testfilename}"
"${TEST_RESOURCES}/TestKafkaOnSchedule.yml" "${TEST_RESOURCES}/")
+ target_include_directories(${testfilename} BEFORE PRIVATE
"../../standard-processors/processors")
+ target_include_directories(${testfilename} BEFORE PRIVATE
"${CMAKE_SOURCE_DIR}/thirdparty/catch")
+ target_include_directories(${testfilename} BEFORE PRIVATE
"${CMAKE_SOURCE_DIR}/libminifi/test")
+ target_wholearchive_library(${testfilename} minifi-standard-processors)
target_link_libraries(${testfilename} ${CATCH_MAIN_LIB})
+ add_test(NAME "${testfilename}" COMMAND "${testfilename}"
"${TEST_RESOURCES}/TestKafkaOnSchedule.yml" "${TEST_RESOURCES}/")
ENDFOREACH()
message("-- Finished building ${KAFKA_TEST_COUNT} Kafka related test
file(s)...")
diff --git a/extensions/librdkafka/tests/ConsumeKafkaTests.cpp
b/extensions/librdkafka/tests/ConsumeKafkaTests.cpp
new file mode 100644
index 0000000..bca26a9
--- /dev/null
+++ b/extensions/librdkafka/tests/ConsumeKafkaTests.cpp
@@ -0,0 +1,590 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define CATCH_CONFIG_MAIN
+
+#include <algorithm>
+#include <memory>
+#include <string>
+#include <set>
+
+#include "TestBase.h"
+
+#include "../ConsumeKafka.h"
+#include "../rdkafka_utils.h"
+#include "../../standard-processors/processors/ExtractText.h"
+#include "utils/file/FileUtils.h"
+#include "utils/OptionalUtils.h"
+#include "utils/RegexUtils.h"
+#include "utils/StringUtils.h"
+#include "utils/TestUtils.h"
+
+#include "utils/IntegrationTestUtils.h"
+
+namespace {
+using org::apache::nifi::minifi::utils::optional;
+
+class KafkaTestProducer {
+ public:
  // Producer-side actions a test scenario can request, executed in sequence
  // by publish_messages_to_topic().
  enum class PublishEvent {
    PUBLISH,             // publish the next message from the supplied list
    TRANSACTION_START,   // rd_kafka_begin_transaction
    TRANSACTION_COMMIT,  // rd_kafka_commit_transaction
    CANCEL               // rd_kafka_abort_transaction
  };
  // Creates a producer connected to kafka_brokers that publishes to the given
  // topic; when `transactional` is set, a transactional.id is configured and
  // transactions are initialized.
  KafkaTestProducer(const std::string& kafka_brokers, const std::string& topic, const bool transactional) :
      logger_(logging::LoggerFactory<KafkaTestProducer>::getLogger()) {
    using utils::setKafkaConfigurationField;

    std::unique_ptr<rd_kafka_conf_t, utils::rd_kafka_conf_deleter> conf = { rd_kafka_conf_new(), utils::rd_kafka_conf_deleter() };

    setKafkaConfigurationField(*conf, "bootstrap.servers", kafka_brokers);
    setKafkaConfigurationField(*conf, "compression.codec", "snappy");
    setKafkaConfigurationField(*conf, "batch.num.messages", "1");

    if (transactional) {
      setKafkaConfigurationField(*conf, "transactional.id", "ConsumeKafkaTest_transaction_id");
    }

    // NOTE(review): static error buffer is shared between producer instances;
    // fine for single-threaded tests, but not thread-safe — consider a local.
    static std::array<char, 512U> errstr{};
    // conf.release(): ownership of the configuration passes to rd_kafka_new.
    producer_ = { rd_kafka_new(RD_KAFKA_PRODUCER, conf.release(), errstr.data(), errstr.size()), utils::rd_kafka_producer_deleter() };
    if (producer_ == nullptr) {
      auto error_msg = "Failed to create Kafka producer" + std::string{ errstr.data() };
      throw std::runtime_error(error_msg);
    }

    // The last argument is a config here, but it is already owned by the producer. I assume that this would mean an override on the original config if used
    topic_ = { rd_kafka_topic_new(producer_.get(), topic.c_str(), nullptr), utils::rd_kafka_topic_deleter() };

    if (transactional) {
      rd_kafka_init_transactions(producer_.get(), TRANSACTIONS_TIMEOUT_MS.count());
    }
  }
+
  // Uses all the headers for every published message
  // Replays the `events` sequence: each PUBLISH consumes the next entry of
  // messages_on_topic (the test REQUIREs that enough messages were supplied),
  // while the transaction events call directly into librdkafka.
  void publish_messages_to_topic(
      const std::vector<std::string>& messages_on_topic, const std::string& message_key, std::vector<PublishEvent> events,
      const std::vector<std::pair<std::string, std::string>>& message_headers, const optional<std::string>& message_header_encoding) {
    auto next_message = messages_on_topic.cbegin();
    for (const PublishEvent event : events) {
      switch (event) {
        case PublishEvent::PUBLISH:
          REQUIRE(messages_on_topic.cend() != next_message);
          publish_message(*next_message, message_key, message_headers, message_header_encoding);
          std::advance(next_message, 1);
          break;
        case PublishEvent::TRANSACTION_START:
          logger_->log_debug("Starting new transaction...");
          rd_kafka_begin_transaction(producer_.get());
          break;
        case PublishEvent::TRANSACTION_COMMIT:
          logger_->log_debug("Committing transaction...");
          rd_kafka_commit_transaction(producer_.get(), TRANSACTIONS_TIMEOUT_MS.count());
          break;
        case PublishEvent::CANCEL:
          logger_->log_debug("Cancelling transaction...");
          rd_kafka_abort_transaction(producer_.get(), TRANSACTIONS_TIMEOUT_MS.count());
      }
    }
  }
+
+ private:
+ void publish_message(
+ const std::string& message, const std::string& message_key, const
std::vector<std::pair<std::string, std::string>>& message_headers, const
optional<std::string>& message_header_encoding) {
+ logger_->log_debug("Producing: %s", message.c_str());
+ std::unique_ptr<rd_kafka_headers_t, utils::rd_kafka_headers_deleter>
headers(rd_kafka_headers_new(message_headers.size()),
utils::rd_kafka_headers_deleter());
+ if (!headers) {
+ throw std::runtime_error("Generating message headers failed.");
+ }
+ for (const std::pair<std::string, std::string>& message_header :
message_headers) {
+ rd_kafka_header_add(headers.get(),
+ const_cast<char*>(message_header.first.c_str()),
message_header.first.size(),
+ const_cast<char*>(message_header.second.c_str()),
message_header.second.size());
+ }
+
+ if (RD_KAFKA_RESP_ERR_NO_ERROR != rd_kafka_producev(
+ producer_.get(),
+ RD_KAFKA_V_RKT(topic_.get()),
+ RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA),
+ RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
+ RD_KAFKA_V_VALUE(const_cast<char*>(&message[0]), message.size()),
+ RD_KAFKA_V_HEADERS(headers.release()),
+ RD_KAFKA_V_KEY(message_key.c_str(), message_key.size()),
+ RD_KAFKA_V_END)) {
+ logger_->log_error("Producer failure: %d: %s", rd_kafka_last_error(),
rd_kafka_err2str(rd_kafka_last_error()));
+ }
+ }
+
+ static const std::chrono::milliseconds TRANSACTIONS_TIMEOUT_MS;
+
+ std::unique_ptr<rd_kafka_t, utils::rd_kafka_producer_deleter> producer_;
+ std::unique_ptr<rd_kafka_topic_t, utils::rd_kafka_topic_deleter> topic_;
+
+ std::shared_ptr<logging::Logger> logger_;
+};
+
+const std::chrono::milliseconds KafkaTestProducer::TRANSACTIONS_TIMEOUT_MS{
2000 };
+
+class ConsumeKafkaTest {
+ public:
+ using Processor = org::apache::nifi::minifi::core::Processor;
+ using ConsumeKafka = org::apache::nifi::minifi::processors::ConsumeKafka;
+ using ExtractText = org::apache::nifi::minifi::processors::ExtractText;
+
+ const KafkaTestProducer::PublishEvent PUBLISH =
KafkaTestProducer::PublishEvent::PUBLISH;
+ const KafkaTestProducer::PublishEvent TRANSACTION_START =
KafkaTestProducer::PublishEvent::TRANSACTION_START;
+ const KafkaTestProducer::PublishEvent TRANSACTION_COMMIT =
KafkaTestProducer::PublishEvent::TRANSACTION_COMMIT;
+ const KafkaTestProducer::PublishEvent CANCEL =
KafkaTestProducer::PublishEvent::CANCEL;
+
+ const std::vector<KafkaTestProducer::PublishEvent>
NON_TRANSACTIONAL_MESSAGES { PUBLISH, PUBLISH };
+ const std::vector<KafkaTestProducer::PublishEvent>
SINGLE_COMMITTED_TRANSACTION { TRANSACTION_START, PUBLISH, PUBLISH,
TRANSACTION_COMMIT };
+ const std::vector<KafkaTestProducer::PublishEvent> TWO_SEPARATE_TRANSACTIONS
{ TRANSACTION_START, PUBLISH, TRANSACTION_COMMIT, TRANSACTION_START,
PUBLISH, TRANSACTION_COMMIT };
+ const std::vector<KafkaTestProducer::PublishEvent> NON_COMMITTED_TRANSACTION
{ TRANSACTION_START, PUBLISH, PUBLISH };
+ const std::vector<KafkaTestProducer::PublishEvent> CANCELLED_TRANSACTION
{ TRANSACTION_START, PUBLISH, CANCEL };
+
+ const std::string KEEP_FIRST =
ConsumeKafka::MSG_HEADER_KEEP_FIRST;
+ const std::string KEEP_LATEST =
ConsumeKafka::MSG_HEADER_KEEP_LATEST;
+ const std::string COMMA_SEPARATED_MERGE =
ConsumeKafka::MSG_HEADER_COMMA_SEPARATED_MERGE;
+
+ static const std::string PRODUCER_TOPIC;
+ static const std::string TEST_MESSAGE_KEY;
+
+ // Relationships
+ const core::Relationship success {"success", "description"};
+ const core::Relationship failure {"failure", "description"};
+
+ ConsumeKafkaTest() :
+ logTestController_(LogTestController::getInstance()),
+ logger_(logging::LoggerFactory<ConsumeKafkaTest>::getLogger()) {
+ reInitialize();
+ }
+
+ virtual ~ConsumeKafkaTest() {
+ logTestController_.reset();
+ }
+
+ protected:
+ void reInitialize() {
+ testController_.reset(new TestController());
+ plan_ = testController_->createPlan();
+ logTestController_.setError<LogTestController>();
+ logTestController_.setError<TestPlan>();
+ logTestController_.setTrace<ConsumeKafka>();
+ logTestController_.setTrace<ConsumeKafkaTest>();
+ logTestController_.setTrace<KafkaTestProducer>();
+ logTestController_.setDebug<ExtractText>();
+ logTestController_.setDebug<core::ProcessContext>();
+ }
+
+ void optional_set_property(const std::shared_ptr<core::Processor>&
processor, const std::string& property_name, const optional<std::string>&
opt_value) {
+ if (opt_value) {
+ plan_->setProperty(processor, property_name, opt_value.value());
+ }
+ }
+
+ std::string decode_key(const std::string& key, const optional<std::string>&
key_attribute_encoding) {
+ if (!key_attribute_encoding ||
utils::StringUtils::equalsIgnoreCase(ConsumeKafka::KEY_ATTR_ENCODING_UTF_8,
key_attribute_encoding.value())) {
+ return key;
+ }
+ if
(utils::StringUtils::equalsIgnoreCase(ConsumeKafka::ConsumeKafka::KEY_ATTR_ENCODING_HEX,
key_attribute_encoding.value())) {
+ return utils::StringUtils::from_hex(key);
+ }
+ throw std::runtime_error("Message Header Encoding does not match any of
the presets in the test.");
+ }
+
+ std::vector<std::string> sort_and_split_messages(const
std::vector<std::string>& messages_on_topic, const optional<std::string>&
message_demarcator) {
+ if (message_demarcator) {
+ std::vector<std::string> sorted_split_messages;
+ for (const auto& message : messages_on_topic) {
+ std::vector<std::string> split_message =
utils::StringUtils::split(message, message_demarcator.value());
+ std::move(split_message.begin(), split_message.end(),
std::back_inserter(sorted_split_messages));
+ }
+ std::sort(sorted_split_messages.begin(), sorted_split_messages.end());
+ return sorted_split_messages;
+ }
+ std::vector<std::string> sorted_messages{ messages_on_topic.cbegin(),
messages_on_topic.cend() };
+ std::sort(sorted_messages.begin(), sorted_messages.end());
+ return sorted_messages;
+ }
+
+ static const std::chrono::seconds MAX_CONSUMEKAFKA_POLL_TIME_SECONDS;
+ static const std::string ATTRIBUTE_FOR_CAPTURING_CONTENT;
+ static const std::string TEST_FILE_NAME_POSTFIX;
+
+ std::unique_ptr<TestController> testController_;
+ std::shared_ptr<TestPlan> plan_;
+ LogTestController& logTestController_;
+ std::shared_ptr<logging::Logger> logger_;
+};
+
+class ConsumeKafkaPropertiesTest : public ConsumeKafkaTest {
+ public:
+ ConsumeKafkaPropertiesTest() : ConsumeKafkaTest() {}
+ virtual ~ConsumeKafkaPropertiesTest() {
+ logTestController_.reset();
+ }
+
+ void single_consumer_with_plain_text_test(
+ bool expect_config_valid,
+ bool expect_fixed_message_order,
+ const std::vector<std::pair<std::string, std::string>>&
expect_header_attributes,
+ const std::vector<std::string>& messages_on_topic,
+ const std::vector<KafkaTestProducer::PublishEvent>& transaction_events,
+ const std::vector<std::pair<std::string, std::string>>& message_headers,
+ const std::string& kafka_brokers,
+ const std::string& security_protocol,
+ const std::string& topic_names,
+ const optional<std::string>& topic_name_format,
+ const optional<bool>& honor_transactions,
+ const optional<std::string>& group_id,
+ const optional<std::string>& offset_reset,
+ const optional<std::string>& key_attribute_encoding,
+ const optional<std::string>& message_demarcator,
+ const optional<std::string>& message_header_encoding,
+ const optional<std::string>& headers_to_add_as_attributes,
+ const optional<std::string>& duplicate_header_handling,
+ const optional<std::string>& max_poll_records,
+ const optional<std::string>& max_poll_time,
+ const optional<std::string>& session_timeout) {
+ reInitialize();
+
+ // Consumer chain
+ std::shared_ptr<core::Processor> consume_kafka =
plan_->addProcessor("ConsumeKafka", "consume_kafka", {success}, false);
+ std::shared_ptr<core::Processor> extract_text =
plan_->addProcessor("ExtractText", "extract_text", {success}, false);
+
+ // Set up connections
+ plan_->addConnection(consume_kafka, success, extract_text);
+ extract_text->setAutoTerminatedRelationships({success});
+
+ const auto bool_to_string = [] (const bool b) -> std::string { return b ?
"true" : "false"; };
+
+ plan_->setProperty(consume_kafka, ConsumeKafka::KafkaBrokers.getName(),
kafka_brokers);
+ plan_->setProperty(consume_kafka,
ConsumeKafka::SecurityProtocol.getName(), security_protocol);
+ plan_->setProperty(consume_kafka, ConsumeKafka::TopicNames.getName(),
topic_names);
+
+ optional_set_property(consume_kafka,
ConsumeKafka::TopicNameFormat.getName(), topic_name_format);
+ optional_set_property(consume_kafka,
ConsumeKafka::HonorTransactions.getName(), honor_transactions |
utils::map(bool_to_string));
+ optional_set_property(consume_kafka, ConsumeKafka::GroupID.getName(),
group_id);
+ optional_set_property(consume_kafka, ConsumeKafka::OffsetReset.getName(),
offset_reset);
+ optional_set_property(consume_kafka,
ConsumeKafka::KeyAttributeEncoding.getName(), key_attribute_encoding);
+ optional_set_property(consume_kafka,
ConsumeKafka::MessageDemarcator.getName(), message_demarcator);
+ optional_set_property(consume_kafka,
ConsumeKafka::MessageHeaderEncoding.getName(), message_header_encoding);
+ optional_set_property(consume_kafka,
ConsumeKafka::HeadersToAddAsAttributes.getName(), headers_to_add_as_attributes);
+ optional_set_property(consume_kafka,
ConsumeKafka::DuplicateHeaderHandling.getName(), duplicate_header_handling);
+ optional_set_property(consume_kafka,
ConsumeKafka::MaxPollRecords.getName(), max_poll_records);
+ optional_set_property(consume_kafka, ConsumeKafka::MaxPollTime.getName(),
max_poll_time);
+ optional_set_property(consume_kafka,
ConsumeKafka::SessionTimeout.getName(), session_timeout);
+
+ plan_->setProperty(extract_text, ExtractText::Attribute.getName(),
ATTRIBUTE_FOR_CAPTURING_CONTENT);
+
+ if (!expect_config_valid) {
+ REQUIRE_THROWS(plan_->scheduleProcessor(consume_kafka));
+ return;
+ } else {
+ plan_->scheduleProcessors();
+ }
+
+ std::unique_ptr<rd_kafka_conf_t, utils::rd_kafka_conf_deleter> conf_;
+ std::unique_ptr<rd_kafka_t, utils::rd_kafka_consumer_deleter> consumer_;
+
+ const bool is_transactional = std::count(transaction_events.cbegin(),
transaction_events.cend(), KafkaTestProducer::PublishEvent::TRANSACTION_START);
+ const bool transactions_committed = transaction_events.back() ==
KafkaTestProducer::PublishEvent::TRANSACTION_COMMIT;
+
+ KafkaTestProducer producer(kafka_brokers, PRODUCER_TOPIC,
is_transactional);
+ producer.publish_messages_to_topic(messages_on_topic, TEST_MESSAGE_KEY,
transaction_events, message_headers, message_header_encoding);
+
+
+ std::vector<std::shared_ptr<core::FlowFile>> flow_files_produced;
+ for (std::size_t num_expected_messages_processed = 0;
num_expected_messages_processed < messages_on_topic.size();
num_expected_messages_processed += std::stoi(max_poll_records.value_or("1"))) {
+ plan_->increment_location();
+ if ((honor_transactions && false == honor_transactions.value()) ||
(is_transactional && !transactions_committed)) {
+ INFO("Non-committed messages received.");
+ REQUIRE(false ==
plan_->runCurrentProcessorUntilFlowfileIsProduced(MAX_CONSUMEKAFKA_POLL_TIME_SECONDS));
+ return;
+ }
+ {
+ SCOPED_INFO("ConsumeKafka timed out when waiting to receive the
message published to the kafka broker.");
+
REQUIRE(plan_->runCurrentProcessorUntilFlowfileIsProduced(MAX_CONSUMEKAFKA_POLL_TIME_SECONDS));
+ }
+ std::size_t num_flow_files_produced =
plan_->getNumFlowFileProducedByCurrentProcessor();
+ plan_->increment_location();
+ for (std::size_t times_extract_text_run = 0; times_extract_text_run <
num_flow_files_produced; ++times_extract_text_run) {
+ plan_->runCurrentProcessor(); // ExtractText
+ std::shared_ptr<core::FlowFile> flow_file =
plan_->getFlowFileProducedByCurrentProcessor();
+ for (const auto& exp_header : expect_header_attributes) {
+ SCOPED_INFO("ConsumeKafka did not produce the expected flowfile
attribute from message header: " << exp_header.first << ".");
+ const auto header_attr_opt =
flow_file->getAttribute(exp_header.first);
+ REQUIRE(header_attr_opt);
+ REQUIRE(exp_header.second == header_attr_opt.value());
+ }
+ {
+ SCOPED_INFO("Message key is missing or incorrect (potential encoding
mismatch).");
+ REQUIRE(TEST_MESSAGE_KEY ==
decode_key(flow_file->getAttribute(ConsumeKafka::KAFKA_MESSAGE_KEY_ATTR).value(),
key_attribute_encoding));
+ REQUIRE("1" ==
flow_file->getAttribute(ConsumeKafka::KAFKA_COUNT_ATTR).value());
+ REQUIRE(flow_file->getAttribute(ConsumeKafka::KAFKA_OFFSET_ATTR));
+ REQUIRE(flow_file->getAttribute(ConsumeKafka::KAFKA_PARTITION_ATTR));
+ REQUIRE(PRODUCER_TOPIC ==
flow_file->getAttribute(ConsumeKafka::KAFKA_TOPIC_ATTR).value());
+ }
+ flow_files_produced.emplace_back(std::move(flow_file));
+ }
+ plan_->reset_location();
+ }
+
+ const auto contentOrderOfFlowFile = [&] (const
std::shared_ptr<core::FlowFile>& lhs, const std::shared_ptr<core::FlowFile>&
rhs) {
+ return lhs->getAttribute(ATTRIBUTE_FOR_CAPTURING_CONTENT).value() <
rhs->getAttribute(ATTRIBUTE_FOR_CAPTURING_CONTENT).value();
+ };
+ {
+ SCOPED_INFO("The flowfiles generated by ConsumeKafka are invalid
(probably nullptr).");
+ REQUIRE_NOTHROW(std::sort(flow_files_produced.begin(),
flow_files_produced.end(), contentOrderOfFlowFile));
+ }
+ std::vector<std::string> sorted_split_messages =
sort_and_split_messages(messages_on_topic, message_demarcator);
+ const auto flow_file_content_matches_message = [&] (const
std::shared_ptr<core::FlowFile>& flowfile, const std::string message) {
+ return flowfile->getAttribute(ATTRIBUTE_FOR_CAPTURING_CONTENT).value()
== message;
+ };
+
+ logger_->log_debug("************");
+ std::string expected = "Expected: ";
+ for (int i = 0; i < sorted_split_messages.size(); ++i) {
+ expected += sorted_split_messages[i] + ", ";
+ }
+ std::string actual = " Actual: ";
+ for (int i = 0; i < sorted_split_messages.size(); ++i) {
+ actual +=
flow_files_produced[i]->getAttribute(ATTRIBUTE_FOR_CAPTURING_CONTENT).value() +
", ";
+ }
+ logger_->log_debug("%s", expected.c_str());
+ logger_->log_debug("%s", actual.c_str());
+ logger_->log_debug("************");
+
+ INFO("The messages received by ConsumeKafka do not match those published");
+ REQUIRE(std::equal(flow_files_produced.begin(), flow_files_produced.end(),
sorted_split_messages.begin(), flow_file_content_matches_message));
+ }
+};
+
+class ConsumeKafkaContinuousPublishingTest : public ConsumeKafkaTest {
+ public:
+ ConsumeKafkaContinuousPublishingTest() : ConsumeKafkaTest() {}
+ virtual ~ConsumeKafkaContinuousPublishingTest() {
+ logTestController_.reset();
+ }
+
+ void single_consumer_with_continuous_message_producing(
+ const uint64_t msg_periodicity_ms,
+ const std::string& kafka_brokers,
+ const optional<std::string>& group_id,
+ const optional<std::string>& max_poll_records,
+ const optional<std::string>& max_poll_time,
+ const optional<std::string>& session_timeout) {
+ reInitialize();
+
+ std::shared_ptr<core::Processor> consume_kafka =
plan_->addProcessor("ConsumeKafka", "consume_kafka", {success}, false);
+
+ plan_->setProperty(consume_kafka, "allow.auto.create.topics", "true",
true); // Seems like the topic tests work without this
+
+ plan_->setProperty(consume_kafka, ConsumeKafka::KafkaBrokers.getName(),
kafka_brokers);
+ plan_->setProperty(consume_kafka, ConsumeKafka::TopicNames.getName(),
PRODUCER_TOPIC);
+ optional_set_property(consume_kafka, ConsumeKafka::GroupID.getName(),
group_id);
+
+ optional_set_property(consume_kafka,
ConsumeKafka::MaxPollRecords.getName(), max_poll_records);
+ optional_set_property(consume_kafka, ConsumeKafka::MaxPollTime.getName(),
max_poll_time);
+ optional_set_property(consume_kafka,
ConsumeKafka::SessionTimeout.getName(), session_timeout);
+
+ consume_kafka->setAutoTerminatedRelationships({success});
+
+ KafkaTestProducer producer("localhost:9092", PRODUCER_TOPIC, /*
transactional = */ false);
+
+ std::atomic_bool producer_loop_stop{ false };
+ const auto producer_loop = [&] {
+ std::size_t num_messages_sent = 0;
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ while (!producer_loop_stop) {
+ producer.publish_messages_to_topic({ "Message after " +
std::to_string(msg_periodicity_ms * ++num_messages_sent) + " ms"},
TEST_MESSAGE_KEY, { PUBLISH }, {}, {});
+
std::this_thread::sleep_for(std::chrono::milliseconds(msg_periodicity_ms));
+ }
+ };
+
+ plan_->scheduleProcessors();
+
+ const auto get_time_property_ms = [] (const std::string& property_string) {
+ int64_t value;
+ org::apache::nifi::minifi::core::TimeUnit unit;
+
REQUIRE(org::apache::nifi::minifi::core::Property::StringToTime(property_string,
value, unit));
+ int64_t value_as_ms;
+
REQUIRE(org::apache::nifi::minifi::core::Property::ConvertTimeUnitToMS(value,
unit, value_as_ms));
+ return value_as_ms;
+ };
+
+ std::thread producer_thread(producer_loop);
+ CHECK_NOTHROW(plan_->runNextProcessor());
+ producer_loop_stop = true;
+ producer_thread.join();
+
+ std::size_t num_flow_files_produced =
plan_->getNumFlowFileProducedByCurrentProcessor();
+
+ const uint64_t max_poll_time_ms =
get_time_property_ms(max_poll_time.value_or(ConsumeKafka::DEFAULT_MAX_POLL_TIME));
+ const uint64_t max_poll_records_value = max_poll_records ?
std::stoi(max_poll_records.value()) : ConsumeKafka::DEFAULT_MAX_POLL_RECORDS;
+ const uint64_t exp_lower_bound = std::min(max_poll_time_ms /
msg_periodicity_ms - 2, max_poll_records_value);
+ const uint64_t exp_upper_bound = std::min(max_poll_time_ms /
msg_periodicity_ms + 2, max_poll_records_value);
+ logger_->log_debug("Max poll time: %d, Max poll records: %d, Exp.
flowfiles produced: (min: %d, max: %d), actual: %d",
+ max_poll_time_ms, max_poll_records_value, exp_lower_bound,
exp_upper_bound, num_flow_files_produced);
+
+ REQUIRE(exp_lower_bound <= num_flow_files_produced);
+ REQUIRE(num_flow_files_produced <= exp_upper_bound);
+ }
+};
+
+const std::string ConsumeKafkaTest::TEST_FILE_NAME_POSTFIX{
"target_kafka_message.txt" };
+const std::string ConsumeKafkaTest::TEST_MESSAGE_KEY{ "consume_kafka_test_key"
};
+const std::string ConsumeKafkaTest::PRODUCER_TOPIC{ "ConsumeKafkaTest" };
+const std::string ConsumeKafkaTest::ATTRIBUTE_FOR_CAPTURING_CONTENT{
"flowfile_content" };
+const std::chrono::seconds
ConsumeKafkaTest::MAX_CONSUMEKAFKA_POLL_TIME_SECONDS{ 5 };
+
+TEST_CASE_METHOD(ConsumeKafkaPropertiesTest, "ConsumeKafka parses and uses
kafka topics.", "[ConsumeKafka][Kafka][Topic]") {
+ auto run_tests = [&] (const std::vector<std::string>& messages_on_topic,
const std::string& topic_names, const optional<std::string>& topic_name_format)
{
+ single_consumer_with_plain_text_test(true, false, {}, messages_on_topic,
NON_TRANSACTIONAL_MESSAGES, {}, "localhost:9092", "PLAINTEXT", topic_names,
topic_name_format, {}, "test_group_id", {}, {}, {}, {}, {}, {}, "1", "2 sec",
"60 sec"); // NOLINT
+ };
+ run_tests({ "Ulysses", "James Joyce" },
"ConsumeKafkaTest", {});
+ run_tests({ "The Great Gatsby", "F. Scott Fitzgerald" },
"ConsumeKafkaTest", ConsumeKafka::TOPIC_FORMAT_NAMES);
+ run_tests({ "War and Peace", "Lev Tolstoy" },
"a,b,c,ConsumeKafkaTest,d", ConsumeKafka::TOPIC_FORMAT_NAMES);
+ run_tests({ "Nineteen Eighty Four", "George Orwell" },
"ConsumeKafkaTest", ConsumeKafka::TOPIC_FORMAT_PATTERNS);
+ run_tests({ "Hamlet", "William Shakespeare" },
"Cons[emu]*KafkaTest", ConsumeKafka::TOPIC_FORMAT_PATTERNS);
+}
+
+TEST_CASE_METHOD(ConsumeKafkaPropertiesTest, "Offsets are reset to the latest
when a consumer starts with non-processed messages.",
"[ConsumeKafka][Kafka][OffsetReset]") {
+ auto run_tests = [&] (
+ const std::vector<std::string>& messages_on_topic,
+ const std::vector<KafkaTestProducer::PublishEvent>& transaction_events) {
+ single_consumer_with_plain_text_test(true, false, {}, messages_on_topic,
transaction_events, {}, "localhost:9092", "PLAINTEXT", "ConsumeKafkaTest", {},
{}, "test_group_id", {}, {}, {}, {}, {}, {}, "1", "2 sec", "60 sec"); // NOLINT
+ };
+ KafkaTestProducer producer("localhost:9092", PRODUCER_TOPIC, false);
+ producer.publish_messages_to_topic({"Dummy messages", "that should be
ignored", "due to offset reset on ConsumeKafka startup"}, TEST_MESSAGE_KEY,
{PUBLISH, PUBLISH, PUBLISH}, {}, {});
+ run_tests({"Brave New World", "Aldous Huxley"}, NON_TRANSACTIONAL_MESSAGES);
+ producer.publish_messages_to_topic({"Dummy messages", "that should be
ignored", "due to offset reset on ConsumeKafka startup"}, TEST_MESSAGE_KEY,
{PUBLISH, PUBLISH, PUBLISH}, {}, {});
+ run_tests({"Call of the Wild", "Jack London"}, NON_TRANSACTIONAL_MESSAGES);
+}
+
+TEST_CASE_METHOD(ConsumeKafkaPropertiesTest, "Key attribute is encoded
according to the \"Key Attribute Encoding\" property.",
"[ConsumeKafka][Kafka][KeyAttributeEncoding]") {
+ auto run_tests = [&] (const std::vector<std::string>& messages_on_topic,
const optional<std::string>& key_attribute_encoding) {
+ single_consumer_with_plain_text_test(true, false, {}, messages_on_topic,
NON_TRANSACTIONAL_MESSAGES, {}, "localhost:9092", "PLAINTEXT",
"ConsumeKafkaTest", {}, {}, "test_group_id", {}, key_attribute_encoding, {},
{}, {}, {}, "1", "2 sec", "60 sec"); // NOLINT
+ };
+
+ run_tests({ "The Odyssey", "Ὅμηρος" }, {});
+ run_tests({ "Lolita", "Владимир Владимирович Набоков" },
"utf-8");
+ run_tests({ "Crime and Punishment", "Фёдор Михайлович Достоевский" },
"hex");
+ run_tests({ "Paradise Lost", "John Milton" },
"hEX");
+}
+
+TEST_CASE_METHOD(ConsumeKafkaPropertiesTest, "Transactional behaviour is
supported.", "[ConsumeKafka][Kafka][Transaction]") {
+ auto run_tests = [&] (const std::vector<std::string>& messages_on_topic,
const std::vector<KafkaTestProducer::PublishEvent>& transaction_events, const
optional<bool>& honor_transactions) {
+ single_consumer_with_plain_text_test(true, false, {}, messages_on_topic,
transaction_events, {}, "localhost:9092", "PLAINTEXT", "ConsumeKafkaTest", {},
{}, "test_group_id", {}, {}, {}, {}, {}, {}, "1", "2 sec", "60 sec"); // NOLINT
+ };
+ run_tests({ "Pride and Prejudice", "Jane Austen" },
SINGLE_COMMITTED_TRANSACTION, {});
+ run_tests({ "Dune", "Frank Herbert" },
TWO_SEPARATE_TRANSACTIONS, {});
+ run_tests({ "The Black Sheep", "Honore De Balzac" },
NON_COMMITTED_TRANSACTION, {});
+ run_tests({ "Gospel of Thomas" },
CANCELLED_TRANSACTION, {});
+ run_tests({ "Operation Dark Heart" },
CANCELLED_TRANSACTION, true);
+ run_tests({ "Brexit" },
CANCELLED_TRANSACTION, false);
+}
+
+TEST_CASE_METHOD(ConsumeKafkaPropertiesTest, "Headers on consumed Kafka
messages are extracted into attributes if requested on ConsumeKafka.",
"[ConsumeKafka][Kafka][Headers]") {
+ auto run_tests = [&] (
+ const std::vector<std::string>& messages_on_topic,
+ const std::vector<std::pair<std::string, std::string>>&
expect_header_attributes,
+ const std::vector<std::pair<std::string, std::string>>& message_headers,
+ const optional<std::string>& headers_to_add_as_attributes,
+ const optional<std::string>& duplicate_header_handling) {
+ single_consumer_with_plain_text_test(true, false,
expect_header_attributes, messages_on_topic, NON_TRANSACTIONAL_MESSAGES,
message_headers, "localhost:9092", "PLAINTEXT", "ConsumeKafkaTest", {}, {},
"test_group_id", {}, {}, {}, {}, headers_to_add_as_attributes,
duplicate_header_handling, "1", "2 sec", "60 sec"); // NOLINT
+ };
+ run_tests({ "Homeland", "R. A. Salvatore"},
{}, {{{"Contains dark elves"}, {"Yes"}}},
{}, {});
+ run_tests({ "Magician", "Raymond E. Feist"},
{{{"Rating"}, {"10/10"}}}, {{{"Rating"}, {"10/10"}}},
{"Rating"}, {});
+ run_tests({ "Mistborn", "Brandon Sanderson"},
{{{"Metal"}, {"Copper"}}}, {{{"Metal"}, {"Copper"}}, {{"Metal"}, {"Iron"}}},
{"Metal"}, KEEP_FIRST);
+ run_tests({ "Mistborn", "Brandon Sanderson"},
{{{"Metal"}, {"Iron"}}}, {{{"Metal"}, {"Copper"}}, {{"Metal"}, {"Iron"}}},
{"Metal"}, KEEP_LATEST);
+ run_tests({ "Mistborn", "Brandon Sanderson"},
{{{"Metal"}, {"Copper, Iron"}}}, {{{"Metal"}, {"Copper"}}, {{"Metal"},
{"Iron"}}}, {"Metal"}, COMMA_SEPARATED_MERGE);
+ run_tests({"The Lord of the Rings", "J. R. R. Tolkien"}, {{{"Parts"},
{"First, second, third"}}}, {{{"Parts"}, {"First, second, third"}}},
{"Parts"}, {});
+}
+
+TEST_CASE_METHOD(ConsumeKafkaPropertiesTest, "Messages are separated into
multiple flowfiles if the message demarcator is present in the message.",
"[ConsumeKafka][Kafka][MessageDemarcator]") {
+ auto run_tests = [&] (
+ const std::vector<std::string>& messages_on_topic,
+ const optional<std::string>& message_demarcator) {
+ single_consumer_with_plain_text_test(true, false, {}, messages_on_topic,
NON_TRANSACTIONAL_MESSAGES, {}, "localhost:9092", "PLAINTEXT",
"ConsumeKafkaTest", {}, {}, "test_group_id", {}, {}, message_demarcator, {},
{}, {}, "1", "2 sec", "60 sec"); // NOLINT
+ };
+ run_tests({"Barbapapa", "Anette Tison and Talus Taylor"}, "a");
+}
+
+TEST_CASE_METHOD(ConsumeKafkaPropertiesTest, "The maximum poll records allows
ConsumeKafka to combine multiple messages into a single flowfile.",
"[ConsumeKafka][Kafka][Batching][MaxPollRecords]") {
+ auto run_tests = [&] (
+ const std::vector<std::string>& messages_on_topic,
+ const std::vector<KafkaTestProducer::PublishEvent>& transaction_events,
+ const optional<std::string>& max_poll_records) {
+ single_consumer_with_plain_text_test(true, false, {}, messages_on_topic,
transaction_events, {}, "localhost:9092", "PLAINTEXT", "ConsumeKafkaTest", {},
{}, "test_group_id", {}, {}, {}, {}, {}, {}, max_poll_records, "2 sec", "60
sec"); // NOLINT
+ };
+ run_tests({"The Count of Monte Cristo", "Alexandre Dumas"},
NON_TRANSACTIONAL_MESSAGES, "2");
+
+ const std::vector<std::string> content {
+ "Make const member functions thread safe",
+ "Understand special member function generation",
+ "Use std::unique_ptr for exclusive-ownership resource management",
+ "Use std::shared_ptr for shared-ownership resource management",
+ "Use std::weak_ptr for std::shared_ptr-like pointers that can dangle",
+ "Prefer std::make_unique and std::make_shared to direct use of new",
+ "When using the Pimpl Idiom, define special member functions inthe
implementation file",
+ "Understand std::move and std::forward",
+ "Distinguish universal references from rvalue references",
+ "Use std::move on rvalue references, std::forward on universal
references",
+ "Avoid overloading on universal references",
+ "Familiarize yourself with alternatives to overloading on universal
references",
+ "Understand reference collapsing",
+ "Assume that move operations are not present, not cheap, and not used",
+ "Familiarize yourself with perfect forwarding failure cases",
+ "Avoid default capture modes",
+ "Use init capture to move objects into closures",
+ "Use decltype on auto&& parameters to std::forward them",
+ "Prefer lambdas to std::bind",
+ "Prefer task-based programming to thread-based" };
+ const std::vector<KafkaTestProducer::PublishEvent>
transaction_events(content.size(), PUBLISH);
+ run_tests(content, transaction_events, "5");
+}
+
+TEST_CASE_METHOD(ConsumeKafkaPropertiesTest, "Non-plain text security context
throws scheduling exceptions.", "[ConsumeKafka][Kafka][SecurityProtocol]") {
+ single_consumer_with_plain_text_test(false, false, {}, { "Miyamoto Musashi",
"Eiji Yoshikawa" }, NON_TRANSACTIONAL_MESSAGES, {}, "localhost:9092",
ConsumeKafka::SECURITY_PROTOCOL_SSL, "ConsumeKafkaTest", {}, {},
"test_group_id", {}, {}, {}, {}, {}, {}, "1", "2 sec", "60 sec"); // NOLINT
+}
+
+TEST_CASE_METHOD(ConsumeKafkaPropertiesTest, "Acceptable values for message
header and key attribute encoding are \"UTF-8\" and \"hex\".",
"[ConsumeKafka][Kafka][InvalidEncoding]") {
+ single_consumer_with_plain_text_test(false, false, {}, {
"Shogun", "James Clavell" }, NON_TRANSACTIONAL_MESSAGES, {},
"localhost:9092", "PLAINTEXT", "ConsumeKafkaTest", {}, {}, "test_group_id", {},
"UTF-32", {}, {}, {}, {}, "1", "2 sec", "60 sec"); // NOLINT
+ single_consumer_with_plain_text_test(false, false, {}, { "Alice's Adventures
in Wonderland", "Lewis Carroll" }, NON_TRANSACTIONAL_MESSAGES, {},
"localhost:9092", "PLAINTEXT", "ConsumeKafkaTest", {}, {}, "test_group_id", {},
{}, {}, "UTF-32", {}, {}, "1", "2 sec", "60 sec"); // NOLINT
+}
+
+TEST_CASE_METHOD(ConsumeKafkaContinuousPublishingTest, "ConsumeKafka can spend
no more time polling than allowed in the maximum poll time property.",
"[ConsumeKafka][Kafka][Batching][MaxPollTime]") {
+ auto run_tests = [&] (
+ const uint64_t msg_periodicity_ms,
+ const optional<std::string>& max_poll_records,
+ const optional<std::string>& max_poll_time,
+ const optional<std::string>& session_timeout) {
+ single_consumer_with_continuous_message_producing(msg_periodicity_ms,
"localhost:9092", "test_group_id", max_poll_records, max_poll_time,
session_timeout);
+ };
+ // For some reason, a session time-out of a few seconds does not work at
all, 10 seconds seems to be stable
+ run_tests(300, "20", "3 seconds", "10000 ms");
+ // Running multiple tests does not work properly here. For some reason,
producing messages
+ // while a rebalance is triggered causes this error, and a blocked poll when
new
+ // messages are produced:
+ // Group "test_group_id" heartbeat error response in state up (join
state wait-revoke-rebalance_cb, 1 partition(s) assigned): Broker: Group
rebalance in progress
+ //
+ // I tried adding a wait time for more than "session.timeout.ms" inbetween
tests, but it was not sufficient
+}
+
+} // namespace
diff --git a/libminifi/include/Connection.h b/libminifi/include/Connection.h
index f54844a..3084d2e 100644
--- a/libminifi/include/Connection.h
+++ b/libminifi/include/Connection.h
@@ -141,7 +141,7 @@ class Connection : public core::Connectable, public
std::enable_shared_from_this
}
// Check whether the queue is empty
- bool isEmpty();
+ bool isEmpty() const;
// Check whether the queue is full to apply back pressure
bool isFull();
// Get queue size
@@ -199,7 +199,7 @@ class Connection : public core::Connectable, public
std::enable_shared_from_this
private:
bool drop_empty_;
// Mutex for protection
- std::mutex mutex_;
+ mutable std::mutex mutex_;
// Queued data size
std::atomic<uint64_t> queued_data_size_;
// Queue for the Flow File
diff --git a/libminifi/include/core/FlowFile.h
b/libminifi/include/core/FlowFile.h
index 4ced9cd..d2924bc 100644
--- a/libminifi/include/core/FlowFile.h
+++ b/libminifi/include/core/FlowFile.h
@@ -25,6 +25,7 @@
#include <utility>
#include <vector>
+#include "utils/OptionalUtils.h"
#include "utils/TimeUtil.h"
#include "ResourceClaim.h"
#include "Connectable.h"
@@ -134,7 +135,9 @@ class FlowFile : public CoreComponent, public
ReferenceContainer {
* @param value value to set
* @return result of finding key
*/
- bool getAttribute(std::string key, std::string& value) const;
+ bool getAttribute(const std::string& key, std::string& value) const;
+
+ utils::optional<std::string> getAttribute(const std::string& key) const;
/**
* Updates the value in the attribute map that corresponds
diff --git a/libminifi/include/utils/GeneralUtils.h
b/libminifi/include/utils/GeneralUtils.h
index c132650..4b11c67 100644
--- a/libminifi/include/utils/GeneralUtils.h
+++ b/libminifi/include/utils/GeneralUtils.h
@@ -49,6 +49,18 @@ constexpr T intdiv_ceil(T numerator, T denominator) {
: numerator / denominator + (numerator % denominator != 0));
}
+#if __cplusplus > 201703L
+using std::identity;
+#else
+// from https://stackoverflow.com/questions/15202474
+struct identity {
+ template<typename U>
+ constexpr auto operator()(U&& v) const noexcept ->
decltype(std::forward<U>(v)) {
+ return std::forward<U>(v);
+ }
+};
+#endif /* < C++20 */
+
using gsl::owner;
#if __cplusplus < 201402L
diff --git a/libminifi/include/utils/ProcessorConfigUtils.h
b/libminifi/include/utils/ProcessorConfigUtils.h
new file mode 100644
index 0000000..cb75bfc
--- /dev/null
+++ b/libminifi/include/utils/ProcessorConfigUtils.h
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <chrono>
+#include <cstdint>
+#include <stdexcept>
+#include <string>
+#include <vector>
+
+#include "core/ProcessContext.h"
+#include "core/Property.h"
+#include "utils/OptionalUtils.h"
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+// NOTE(review): these helpers are defined in a header, so they must be
+// declared inline to avoid ODR violations once the header is included from
+// more than one translation unit.
+
+/**
+ * Returns the value of a required property, throwing std::runtime_error
+ * when the property is missing or invalid.
+ */
+inline std::string getRequiredPropertyOrThrow(const core::ProcessContext* context, const std::string& property_name) {
+  std::string value;
+  if (!context->getProperty(property_name, value)) {
+    throw std::runtime_error(property_name + " property missing or invalid");
+  }
+  return value;
+}
+
+/**
+ * Splits an optional comma-separated property into a list of trimmed tokens.
+ * An unset property yields an empty list.
+ */
+inline std::vector<std::string> listFromCommaSeparatedProperty(const core::ProcessContext* context, const std::string& property_name) {
+  std::string property_string;
+  context->getProperty(property_name, property_string);
+  return utils::StringUtils::splitAndTrim(property_string, ",");
+}
+
+/**
+ * Same as listFromCommaSeparatedProperty, but throws when the property is
+ * missing or invalid.
+ */
+inline std::vector<std::string> listFromRequiredCommaSeparatedProperty(const core::ProcessContext* context, const std::string& property_name) {
+  return utils::StringUtils::splitAndTrim(getRequiredPropertyOrThrow(context, property_name), ",");
+}
+
+/**
+ * Parses a required boolean property; throws std::runtime_error when the
+ * property is missing or not a valid boolean string.
+ */
+inline bool parseBooleanPropertyOrThrow(core::ProcessContext* context, const std::string& property_name) {
+  const std::string value_str = getRequiredPropertyOrThrow(context, property_name);
+  utils::optional<bool> maybe_value = utils::StringUtils::toBool(value_str);
+  if (!maybe_value) {
+    throw std::runtime_error(property_name + " property is invalid: value is " + value_str);
+  }
+  return maybe_value.value();
+}
+
+/**
+ * Parses a required time period property (e.g. "5 sec") into milliseconds;
+ * throws std::runtime_error when the property is missing or unparsable.
+ */
+inline std::chrono::milliseconds parseTimePropertyMSOrThrow(core::ProcessContext* context, const std::string& property_name) {
+  core::TimeUnit unit;
+  uint64_t time_value_ms;
+  const std::string value_str = getRequiredPropertyOrThrow(context, property_name);
+  if (!core::Property::StringToTime(value_str, time_value_ms, unit) || !core::Property::ConvertTimeUnitToMS(time_value_ms, unit, time_value_ms)) {
+    throw std::runtime_error(property_name + " property is invalid: value is " + value_str);
+  }
+  return std::chrono::milliseconds(time_value_ms);
+}
+
+/**
+ * Returns the unsigned integer value of a property if it is set, or
+ * utils::nullopt otherwise. Never throws on a missing property.
+ */
+inline utils::optional<uint64_t> getOptionalUintProperty(const core::ProcessContext& context, const std::string& property_name) {
+  uint64_t value;
+  if (context.getProperty(property_name, value)) {
+    return { value };
+  }
+  return utils::nullopt;
+}
+
+}  // namespace utils
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/libminifi/include/utils/StringUtils.h
b/libminifi/include/utils/StringUtils.h
index 54bab6c..492dc7b 100644
--- a/libminifi/include/utils/StringUtils.h
+++ b/libminifi/include/utils/StringUtils.h
@@ -117,15 +117,15 @@ class StringUtils {
/**
* Compares strings by lower casing them.
*/
- static inline bool equalsIgnoreCase(const std::string &left, const
std::string right) {
- if (left.length() == right.length()) {
- return std::equal(right.begin(), right.end(), left.begin(), [](unsigned
char lc, unsigned char rc) {return tolower(lc) == tolower(rc);});
- } else {
+ static inline bool equalsIgnoreCase(const std::string& left, const
std::string& right) {
+ if (left.length() != right.length()) {
return false;
}
+ return std::equal(right.cbegin(), right.cend(), left.cbegin(), [](unsigned
char lc, unsigned char rc) { return std::tolower(lc) == std::tolower(rc); });
}
static std::vector<std::string> split(const std::string &str, const
std::string &delimiter);
+ static std::vector<std::string> splitAndTrim(const std::string &str, const
std::string &delimiter);
/**
* Converts a string to a float
diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp
index 75f8d57..ccae760 100644
--- a/libminifi/src/Connection.cpp
+++ b/libminifi/src/Connection.cpp
@@ -111,7 +111,7 @@ Connection::Connection(const
std::shared_ptr<core::Repository> &flow_repository,
logger_->log_debug("Connection %s created", name_);
}
-bool Connection::isEmpty() {
+bool Connection::isEmpty() const {
std::lock_guard<std::mutex> lock(mutex_);
return queue_.empty();
diff --git a/libminifi/src/core/FlowFile.cpp b/libminifi/src/core/FlowFile.cpp
index 322f5a5..4492101 100644
--- a/libminifi/src/core/FlowFile.cpp
+++ b/libminifi/src/core/FlowFile.cpp
@@ -142,14 +142,21 @@ std::vector<utils::Identifier>
&FlowFile::getlineageIdentifiers() {
return lineage_Identifiers_;
}
-bool FlowFile::getAttribute(std::string key, std::string& value) const {
+bool FlowFile::getAttribute(const std::string& key, std::string& value) const {
+ const auto attribute = getAttribute(key);
+ if (!attribute) {
+ return false;
+ }
+ value = attribute.value();
+ return true;
+}
+
+utils::optional<std::string> FlowFile::getAttribute(const std::string& key)
const {
auto it = attributes_.find(key);
if (it != attributes_.end()) {
- value = it->second;
- return true;
- } else {
- return false;
+ return it->second;
}
+ return utils::nullopt;
}
// Get Size
diff --git a/libminifi/src/utils/StringUtils.cpp
b/libminifi/src/utils/StringUtils.cpp
index 7a02720..c76f229 100644
--- a/libminifi/src/utils/StringUtils.cpp
+++ b/libminifi/src/utils/StringUtils.cpp
@@ -46,8 +46,13 @@ std::string StringUtils::trim(const std::string& s) {
return trimRight(trimLeft(s));
}
-std::vector<std::string> StringUtils::split(const std::string &str, const
std::string &delimiter) {
+template<typename Fun>
+std::vector<std::string> split_transformed(const std::string& str, const
std::string& delimiter, Fun transformation) {
std::vector<std::string> result;
+ if (delimiter.empty()) {
+ std::transform(str.begin(), str.end(), std::back_inserter(result), [&]
(const char c) { return transformation(std::string{c}); });
+ return result;
+ }
auto curr = str.begin();
auto end = str.end();
auto is_func = [delimiter](int s) {
@@ -59,13 +64,21 @@ std::vector<std::string> StringUtils::split(const
std::string &str, const std::s
break;
}
auto next = std::find_if(curr, end, is_func);
- result.push_back(std::string(curr, next));
+ result.push_back(transformation(std::string(curr, next)));
curr = next;
}
return result;
}
+// Splits the string on the delimiter, keeping each token verbatim.
+// An empty delimiter splits the string into individual characters.
+std::vector<std::string> StringUtils::split(const std::string& str, const
std::string& delimiter) {
+  return split_transformed(str, delimiter, identity{});
+}
+
+// Splits the string on the delimiter and trims surrounding whitespace from
+// each resulting token.
+std::vector<std::string> StringUtils::splitAndTrim(const std::string& str,
const std::string& delimiter) {
+  return split_transformed(str, delimiter, trim);
+}
+
bool StringUtils::StringToFloat(std::string input, float &output,
FailurePolicy cp /*= RETURN*/) {
try {
output = std::stof(input);
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index 7d81d44..0caf275 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -17,6 +17,7 @@
*/
#include "./TestBase.h"
+#include "utils/IntegrationTestUtils.h"
#include "spdlog/spdlog.h"
@@ -61,44 +62,42 @@ TestPlan::~TestPlan() {
for (auto& processor : configured_processors_) {
processor->setScheduledState(core::ScheduledState::STOPPED);
}
+ for (auto& connection : relationships_) {
+ // This is a patch solving circular references between processors and
connections
+ connection->setSource(nullptr);
+ connection->setDestination(nullptr);
+ }
controller_services_provider_->clearControllerServices();
}
std::shared_ptr<core::Processor> TestPlan::addProcessor(const
std::shared_ptr<core::Processor> &processor, const std::string& /*name*/, const
std::initializer_list<core::Relationship>& relationships,
- bool linkToPrevious) {
+ bool linkToPrevious) {
if (finalized) {
return nullptr;
}
std::lock_guard<std::recursive_mutex> guard(mutex);
-
utils::Identifier uuid = utils::IdGenerator::getIdGenerator()->generate();
-
processor->setStreamFactory(stream_factory);
// initialize the processor
processor->initialize();
processor->setFlowIdentifier(flow_version_->getFlowIdentifier());
-
processor_mapping_[processor->getUUID()] = processor;
-
if (!linkToPrevious) {
termination_ = *(relationships.begin());
} else {
std::shared_ptr<core::Processor> last = processor_queue_.back();
-
if (last == nullptr) {
last = processor;
termination_ = *(relationships.begin());
}
-
std::stringstream connection_name;
connection_name << last->getUUIDStr() << "-to-" << processor->getUUIDStr();
- logger_->log_info("Creating %s connection for proc %d",
connection_name.str(), processor_queue_.size() + 1);
std::shared_ptr<minifi::Connection> connection =
std::make_shared<minifi::Connection>(flow_repo_, content_repo_,
connection_name.str());
+ logger_->log_info("Creating %s connection for proc %d",
connection_name.str(), processor_queue_.size() + 1);
for (const auto& relationship : relationships) {
connection->addRelationship(relationship);
}
-
// link the connections so that we can test results at the end for this
connection->setSource(last);
connection->setDestination(processor);
@@ -111,28 +110,19 @@ std::shared_ptr<core::Processor>
TestPlan::addProcessor(const std::shared_ptr<co
}
relationships_.push_back(connection);
}
-
std::shared_ptr<core::ProcessorNode> node =
std::make_shared<core::ProcessorNode>(processor);
-
processor_nodes_.push_back(node);
-
// std::shared_ptr<core::ProcessContext> context =
std::make_shared<core::ProcessContext>(node, controller_services_provider_,
prov_repo_, flow_repo_, configuration_, content_repo_);
-
auto contextBuilder =
core::ClassLoader::getDefaultClassLoader().instantiate<core::ProcessContextBuilder>("ProcessContextBuilder");
-
contextBuilder =
contextBuilder->withContentRepository(content_repo_)->withFlowFileRepository(flow_repo_)->withProvider(controller_services_provider_.get())->withProvenanceRepository(prov_repo_)->withConfiguration(configuration_);
-
auto context = contextBuilder->build(node);
-
processor_contexts_.push_back(context);
-
processor_queue_.push_back(processor);
-
return processor;
}
std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::string
&processor_name, const utils::Identifier& uuid, const std::string &name,
- const
std::initializer_list<core::Relationship>& relationships, bool linkToPrevious) {
+ const std::initializer_list<core::Relationship>& relationships, bool
linkToPrevious) {
if (finalized) {
return nullptr;
}
@@ -150,7 +140,7 @@ std::shared_ptr<core::Processor>
TestPlan::addProcessor(const std::string &proce
}
std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::string
&processor_name, const std::string &name, const
std::initializer_list<core::Relationship>& relationships,
- bool linkToPrevious) {
+ bool linkToPrevious) {
if (finalized) {
return nullptr;
}
@@ -247,45 +237,65 @@ void TestPlan::reset(bool reschedule) {
}
}
-bool TestPlan::runNextProcessor(std::function<void(const
std::shared_ptr<core::ProcessContext>, const
std::shared_ptr<core::ProcessSession>)> verify) {
- if (!finalized) {
- finalize();
+std::vector<std::shared_ptr<core::Processor>>::iterator
TestPlan::getProcessorItByUuid(const std::string& uuid) {
+ const auto processor_node_matches_processor = [&uuid] (const
std::shared_ptr<core::Processor>& processor) {
+ return processor->getUUIDStr() == uuid;
+ };
+ auto processor_found_at = std::find_if(processor_queue_.begin(),
processor_queue_.end(), processor_node_matches_processor);
+ if (processor_found_at == processor_queue_.end()) {
+ throw std::runtime_error("Processor not found in test plan.");
}
- logger_->log_info("Running next processor %d, processor_queue_.size %d,
processor_contexts_.size %d", location, processor_queue_.size(),
processor_contexts_.size());
- std::lock_guard<std::recursive_mutex> guard(mutex);
- location++;
- std::shared_ptr<core::Processor> processor = processor_queue_.at(location);
- std::shared_ptr<core::ProcessContext> context =
processor_contexts_.at(location);
- std::shared_ptr<core::ProcessSessionFactory> factory =
std::make_shared<core::ProcessSessionFactory>(context);
- factories_.push_back(factory);
+ return processor_found_at;
+}
+
+// Looks up the ProcessContext belonging to the given processor by matching
+// UUIDs (contexts and processors are stored in parallel containers).
+// Throws std::runtime_error if the processor has no context in this plan.
+std::shared_ptr<core::ProcessContext>
TestPlan::getProcessContextForProcessor(const std::shared_ptr<core::Processor>&
processor) {
+  const auto contextMatchesProcessor = [&processor] (const
std::shared_ptr<core::ProcessContext>& context) {
+    return context->getProcessorNode()->getUUIDStr() ==
processor->getUUIDStr();
+  };
+  const auto context_found_at = std::find_if(processor_contexts_.begin(),
processor_contexts_.end(), contextMatchesProcessor);
+  if (context_found_at == processor_contexts_.end()) {
+    throw std::runtime_error("Context not found in test plan.");
+  }
+  return *context_found_at;
+}
+
+void TestPlan::scheduleProcessor(const std::shared_ptr<core::Processor>&
processor, const std::shared_ptr<core::ProcessContext>& context) {
if (std::find(configured_processors_.begin(), configured_processors_.end(),
processor) == configured_processors_.end()) {
+ // Ordering on factories and list of configured processors do not matter
+ std::shared_ptr<core::ProcessSessionFactory> factory =
std::make_shared<core::ProcessSessionFactory>(context);
+ factories_.push_back(factory);
processor->onSchedule(context, factory);
configured_processors_.push_back(processor);
}
- std::shared_ptr<core::ProcessSession> current_session =
std::make_shared<core::ProcessSession>(context);
- process_sessions_.push_back(current_session);
- current_flowfile_ = nullptr;
- processor->incrementActiveTasks();
- processor->setScheduledState(core::ScheduledState::RUNNING);
- if (verify != nullptr) {
- verify(context, current_session);
- } else {
- logger_->log_info("Running %s", processor->getName());
- processor->onTrigger(context, current_session);
+}
+
+void TestPlan::scheduleProcessor(const std::shared_ptr<core::Processor>&
processor) {
+ scheduleProcessor(processor, getProcessContextForProcessor(processor));
+}
+
+void TestPlan::scheduleProcessors() {
+ for(std::size_t target_location = 0; target_location <
processor_queue_.size(); ++target_location) {
+ std::shared_ptr<core::Processor> processor =
processor_queue_.at(target_location);
+ std::shared_ptr<core::ProcessContext> context =
processor_contexts_.at(target_location);
+ scheduleProcessor(processor, context);
}
- current_session->commit();
- return gsl::narrow<size_t>(location + 1) < processor_queue_.size();
}
-bool TestPlan::runCurrentProcessor(std::function<void(const
std::shared_ptr<core::ProcessContext>, const
std::shared_ptr<core::ProcessSession>)> verify) {
+// Runs the given processor (scheduling it first if needed) by resolving its
+// position in the processor queue via UUID. The verify callback, when given,
+// replaces the normal onTrigger call. Throws if the processor is not part of
+// this plan. Returns true while more processors remain after this one.
+bool TestPlan::runProcessor(const std::shared_ptr<core::Processor>& processor,
std::function<void(const std::shared_ptr<core::ProcessContext>, const
std::shared_ptr<core::ProcessSession>)> verify) {
+  const std::size_t processor_location =
std::distance(processor_queue_.begin(),
getProcessorItByUuid(processor->getUUIDStr()));
+  return runProcessor(gsl::narrow<int>(processor_location), verify);
+}
+
+bool TestPlan::runProcessor(int target_location, std::function<void(const
std::shared_ptr<core::ProcessContext>, const
std::shared_ptr<core::ProcessSession>)> verify) {
if (!finalized) {
finalize();
}
- logger_->log_info("Rerunning current processor %d, processor_queue_.size %d,
processor_contexts_.size %d", location, processor_queue_.size(),
processor_contexts_.size());
+ logger_->log_info("Running next processor %d, processor_queue_.size %d,
processor_contexts_.size %d", target_location, processor_queue_.size(),
processor_contexts_.size());
std::lock_guard<std::recursive_mutex> guard(mutex);
- std::shared_ptr<core::Processor> processor = processor_queue_.at(location);
- std::shared_ptr<core::ProcessContext> context =
processor_contexts_.at(location);
+ std::shared_ptr<core::Processor> processor =
processor_queue_.at(target_location);
+ std::shared_ptr<core::ProcessContext> context =
processor_contexts_.at(target_location);
+ scheduleProcessor(processor, context);
std::shared_ptr<core::ProcessSession> current_session =
std::make_shared<core::ProcessSession>(context);
process_sessions_.push_back(current_session);
current_flowfile_ = nullptr;
@@ -298,7 +308,58 @@ bool
TestPlan::runCurrentProcessor(std::function<void(const std::shared_ptr<core
processor->onTrigger(context, current_session);
}
current_session->commit();
- return gsl::narrow<size_t>(location + 1) < processor_queue_.size();
+ return gsl::narrow<size_t>(target_location + 1) < processor_queue_.size();
+}
+
+// Advances the plan's cursor to the next processor and runs it, forwarding
+// the optional verify callback. Returns true while more processors remain.
+bool TestPlan::runNextProcessor(std::function<void(const
std::shared_ptr<core::ProcessContext>, const
std::shared_ptr<core::ProcessSession>)> verify) {
+  std::lock_guard<std::recursive_mutex> guard(mutex);
+  ++location;
+  return runProcessor(location, verify);
+}
+
+// Re-runs the processor at the current cursor position.
+// The optional verify callback replaces the normal onTrigger call.
+bool TestPlan::runCurrentProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify) {
+  std::lock_guard<std::recursive_mutex> guard(mutex);
+  // Forward verify instead of dropping it: the pre-refactor implementation
+  // honored the callback, and runNextProcessor still forwards it.
+  return runProcessor(location, verify);
+}
+
+// Repeatedly triggers the current processor until one of its outbound
+// connections holds a flow file, or wait_duration elapses. Returns whether a
+// flow file appeared within the allotted time.
+bool TestPlan::runCurrentProcessorUntilFlowfileIsProduced(const
std::chrono::seconds& wait_duration) {
+  using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime;
+  const auto isFlowFileProduced = [&] {
+    runCurrentProcessor();
+    const std::vector<minifi::Connection*> connections =
getProcessorOutboundConnections(processor_queue_.at(location));
+    return std::any_of(connections.cbegin(), connections.cend(), [] (const
minifi::Connection* conn) { return !conn->isEmpty(); });
+  };
+  return verifyEventHappenedInPollTime(wait_duration, isFlowFileProduced);
+}
+
+// Counts the flow files queued on every connection leaving the processor at
+// the current cursor position.
+std::size_t TestPlan::getNumFlowFileProducedByCurrentProcessor() {
+  const std::shared_ptr<core::Processor>& current_processor = processor_queue_.at(location);
+  std::size_t total_flow_files = 0;
+  for (minifi::Connection* outbound_connection : getProcessorOutboundConnections(current_processor)) {
+    total_flow_files += outbound_connection->getQueueSize();
+  }
+  return total_flow_files;
+}
+
+// Returns the first flow file polled from the current processor's outbound
+// connections. When a connection yields no live flow file but exactly one
+// expired record, that expired record is returned instead; more than one
+// expired record in a single connection is treated as an error. Returns
+// nullptr when every outbound connection is empty.
+std::shared_ptr<core::FlowFile>
TestPlan::getFlowFileProducedByCurrentProcessor() {
+  const std::shared_ptr<core::Processor>& processor =
processor_queue_.at(location);
+  std::vector<minifi::Connection*> connections =
getProcessorOutboundConnections(processor);
+  for (auto connection : connections) {
+    std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+    std::shared_ptr<core::FlowFile> flowfile =
connection->poll(expiredFlowRecords);
+    if (flowfile) {
+      return flowfile;
+    }
+    // poll() produced nothing live; fall back to expired records, if any
+    if (expiredFlowRecords.empty()) {
+      continue;
+    }
+    if (expiredFlowRecords.size() != 1) {
+      throw std::runtime_error("Multiple expired flowfiles present in a single
connection.");
+    }
+    return *expiredFlowRecords.begin();
+  }
+  return nullptr;
}
std::set<std::shared_ptr<provenance::ProvenanceEventRecord>>
TestPlan::getProvenanceRecords() {
@@ -312,6 +373,20 @@ std::shared_ptr<core::FlowFile>
TestPlan::getCurrentFlowFile() {
return current_flowfile_;
}
+// Collects non-owning pointers to every connection whose source is the given
+// processor. The plan retains ownership of the connections.
+std::vector<minifi::Connection*> TestPlan::getProcessorOutboundConnections(const std::shared_ptr<core::Processor>& processor) {
+  const auto is_processor_outbound_connection = [&processor] (const
std::shared_ptr<minifi::Connection>& connection) {
+    // A connection is outbound from a processor if its source uuid matches
the processor
+    return connection->getSource()->getUUIDStr() == processor->getUUIDStr();
+  };
+  std::vector<minifi::Connection*> connections;
+  // const& avoids copying the shared_ptr (an atomic refcount bump) per element
+  for (const auto& relationship : relationships_) {
+    if (is_processor_outbound_connection(relationship)) {
+      connections.emplace_back(relationship.get());
+    }
+  }
+  return connections;
+}
+
std::shared_ptr<core::ProcessContext> TestPlan::getCurrentContext() {
return processor_contexts_.at(location);
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index 1a7d960..fd9bd36 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -20,13 +20,14 @@
#define LIBMINIFI_TEST_TESTBASE_H_
#include <cstdio>
#include <cstdlib>
+#include <map>
+#include <set>
#include <sstream>
+#include <utility>
+#include <vector>
#include "ResourceClaim.h"
#include "utils/file/FileUtils.h"
#include "catch.hpp"
-#include <vector>
-#include <set>
-#include <map>
#include "core/logging/Logger.h"
#include "core/Core.h"
#include "properties/Configure.h"
@@ -39,7 +40,6 @@
#include "spdlog/sinks/ostream_sink.h"
#include "spdlog/sinks/dist_sink.h"
#include "unit/ProvenanceTestHelper.h"
-#include "core/Core.h"
#include "core/FlowFile.h"
#include "core/Processor.h"
#include "core/ProcessContext.h"
@@ -49,7 +49,6 @@
#include "core/controller/ControllerServiceNode.h"
#include "core/reporting/SiteToSiteProvenanceReportingTask.h"
#include "core/state/nodes/FlowInformation.h"
-#include "properties/Configure.h"
#include "utils/ClassUtils.h"
class LogTestController {
@@ -196,7 +195,7 @@ class LogTestController {
}
my_properties_->set("logger.root", "ERROR,ostream");
my_properties_->set("logger." + core::getClassName<LogTestController>(),
"INFO");
- my_properties_->set("logger." +
core::getClassName<logging::LoggerConfiguration>(), "DEBUG");
+ my_properties_->set("logger." +
core::getClassName<logging::LoggerConfiguration>(), "INFO");
std::shared_ptr<spdlog::sinks::dist_sink_mt> dist_sink =
std::make_shared<spdlog::sinks::dist_sink_mt>();
dist_sink->add_sink(std::make_shared<spdlog::sinks::ostream_sink_mt>(log_output,
true));
dist_sink->add_sink(std::make_shared<spdlog::sinks::stderr_sink_mt>());
@@ -223,7 +222,6 @@ class LogTestController {
class TestPlan {
public:
-
explicit TestPlan(std::shared_ptr<core::ContentRepository> content_repo,
std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository>
prov_repo,
const
std::shared_ptr<minifi::state::response::FlowVersion> &flow_version, const
std::shared_ptr<minifi::Configure> &configuration, const char* state_dir);
@@ -233,22 +231,15 @@ class TestPlan {
core::Relationship
relationship = core::Relationship("success", "description"), bool
linkToPrevious = false) {
return addProcessor(processor, name, { relationship }, linkToPrevious);
}
-
std::shared_ptr<core::Processor> addProcessor(const std::string
&processor_name, const std::string &name, core::Relationship relationship =
core::Relationship("success", "description"),
bool linkToPrevious = false) {
return addProcessor(processor_name, name, { relationship },
linkToPrevious);
}
+ std::shared_ptr<core::Processor> addProcessor(const
std::shared_ptr<core::Processor> &processor, const std::string &name, const
std::initializer_list<core::Relationship>& relationships, bool linkToPrevious =
false); // NOLINT
+ std::shared_ptr<core::Processor> addProcessor(const std::string
&processor_name, const std::string &name, const
std::initializer_list<core::Relationship>& relationships, bool linkToPrevious =
false); // NOLINT
+ std::shared_ptr<core::Processor> addProcessor(const std::string
&processor_name, const utils::Identifier& uuid, const std::string &name, const
std::initializer_list<core::Relationship>& relationships, bool linkToPrevious =
false); // NOLINT
- std::shared_ptr<core::Processor> addProcessor(const
std::shared_ptr<core::Processor> &processor, const std::string &name, const
std::initializer_list<core::Relationship>& relationships,
- bool linkToPrevious = false);
-
- std::shared_ptr<core::Processor> addProcessor(const std::string
&processor_name, const std::string &name, const
std::initializer_list<core::Relationship>& relationships,
- bool linkToPrevious = false);
-
- std::shared_ptr<core::Processor> addProcessor(const std::string
&processor_name, const utils::Identifier& uuid, const std::string &name, const
std::initializer_list<core::Relationship>& relationships,
- bool linkToPrevious = false);
-
- std::shared_ptr<minifi::Connection> addConnection(const
std::shared_ptr<core::Processor>& source_proc, const core::Relationship&
source_relationship, const std::shared_ptr<core::Processor>& destination_proc);
+ std::shared_ptr<minifi::Connection> addConnection(const
std::shared_ptr<core::Processor>& source_proc, const core::Relationship&
source_relationship, const std::shared_ptr<core::Processor>& destination_proc);
// NOLINT
std::shared_ptr<core::controller::ControllerServiceNode> addController(const
std::string &controller_name, const std::string &name);
@@ -257,14 +248,30 @@ class TestPlan {
bool setProperty(const
std::shared_ptr<core::controller::ControllerServiceNode>
controller_service_node, const std::string &prop, const std::string &value,
bool dynamic = false);
void reset(bool reschedule = false);
+ void increment_location() { ++location; }
+ void reset_location() { location = -1; }
- bool runNextProcessor(std::function<void(const
std::shared_ptr<core::ProcessContext>, const
std::shared_ptr<core::ProcessSession>)> verify = nullptr);
+ std::vector<std::shared_ptr<core::Processor>>::iterator
getProcessorItByUuid(const std::string& uuid);
+ std::shared_ptr<core::ProcessContext> getProcessContextForProcessor(const
std::shared_ptr<core::Processor>& processor);
+ void scheduleProcessor(const std::shared_ptr<core::Processor>& processor,
const std::shared_ptr<core::ProcessContext>& context);
+ void scheduleProcessor(const std::shared_ptr<core::Processor>& processor);
+ void scheduleProcessors();
+
+ // Note: all this verify logic is only used in TensorFlow tests as a
replacement for UpdateAttribute
+ // It should probably not be the part of the standard way of running
processors
+ bool runProcessor(const std::shared_ptr<core::Processor>& processor,
std::function<void(const std::shared_ptr<core::ProcessContext>, const
std::shared_ptr<core::ProcessSession>)> verify = nullptr);
+ bool runProcessor(int target_location, std::function<void(const
std::shared_ptr<core::ProcessContext>, const
std::shared_ptr<core::ProcessSession>)> verify = nullptr);
+ bool runNextProcessor(std::function<void(const
std::shared_ptr<core::ProcessContext>, const
std::shared_ptr<core::ProcessSession>)> verify = nullptr);
bool runCurrentProcessor(std::function<void(const
std::shared_ptr<core::ProcessContext>, const
std::shared_ptr<core::ProcessSession>)> verify = nullptr);
+ bool runCurrentProcessorUntilFlowfileIsProduced(const std::chrono::seconds&
wait_duration);
std::set<std::shared_ptr<provenance::ProvenanceEventRecord>>
getProvenanceRecords();
std::shared_ptr<core::FlowFile> getCurrentFlowFile();
+ std::vector<minifi::Connection*> getProcessorOutboundConnections(const
std::shared_ptr<core::Processor>& processor);
+ std::size_t getNumFlowFileProducedByCurrentProcessor();
+ std::shared_ptr<core::FlowFile> getFlowFileProducedByCurrentProcessor();
std::shared_ptr<core::ProcessContext> getCurrentContext();
@@ -349,16 +356,15 @@ class TestPlan {
std::vector<std::shared_ptr<core::controller::ControllerServiceNode>>
controller_service_nodes_;
std::map<utils::Identifier, std::shared_ptr<core::Processor>>
processor_mapping_;
std::vector<std::shared_ptr<core::Processor>> processor_queue_;
- std::vector<std::shared_ptr<core::Processor>> configured_processors_;
+ std::vector<std::shared_ptr<core::Processor>> configured_processors_; // Do
not assume ordering
std::vector<std::shared_ptr<core::ProcessorNode>> processor_nodes_;
std::vector<std::shared_ptr<core::ProcessContext>> processor_contexts_;
std::vector<std::shared_ptr<core::ProcessSession>> process_sessions_;
- std::vector<std::shared_ptr<core::ProcessSessionFactory>> factories_;
+ std::vector<std::shared_ptr<core::ProcessSessionFactory>> factories_; // Do
not assume ordering
std::vector<std::shared_ptr<minifi::Connection>> relationships_;
core::Relationship termination_;
private:
-
std::shared_ptr<logging::Logger> logger_;
};
@@ -423,7 +429,6 @@ class TestController {
std::shared_ptr<minifi::state::response::FlowVersion> flow_version_;
LogTestController &log;
std::vector<std::string> directories;
-
};
#endif /* LIBMINIFI_TEST_TESTBASE_H_ */
diff --git a/libminifi/test/unit/StringUtilsTests.cpp
b/libminifi/test/unit/StringUtilsTests.cpp
index 2ecc8de..c48df4a 100644
--- a/libminifi/test/unit/StringUtilsTests.cpp
+++ b/libminifi/test/unit/StringUtilsTests.cpp
@@ -50,6 +50,16 @@ TEST_CASE("TestStringUtils::split4", "[test split
classname]") {
REQUIRE(expected ==
StringUtils::split(org::apache::nifi::minifi::core::getClassName<org::apache::nifi::minifi::utils::StringUtils>(),
"::"));
}
+// An empty delimiter splits the input into its individual characters.
+TEST_CASE("TestStringUtils::split5", "[test split with delimiter set to empty
string]") {
+  std::vector<std::string> expected{ "h", "e", "l", "l", "o", " ", "w", "o",
"r", "l", "d" };
+  REQUIRE(expected == StringUtils::split("hello world", ""));
+}
+
+// splitAndTrim splits on the delimiter and strips whitespace around tokens
+// (note the space after the comma in the input is trimmed away).
+TEST_CASE("TestStringUtils::splitTransformed", "[test split with trim]") {
+  std::vector<std::string> expected{ "hello", "world peace" };
+  REQUIRE(expected == StringUtils::splitAndTrim("hello, world peace", ","));
+}
+
TEST_CASE("StringUtils::replaceEnvironmentVariables works correctly",
"[replaceEnvironmentVariables]") {
utils::Environment::setEnvironmentVariable("blahblahnamenamenotexist",
"computer", 0);