This is an automated email from the ASF dual-hosted git repository. aboda pushed a commit to branch MINIFICPP-1158 in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit e4a1732ab2239ce917f255939bad20274a83323a Author: Nghia Le <[email protected]> AuthorDate: Wed Jan 15 13:39:18 2020 +0100 MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and throw in onSchedule Signed-off-by: Arpad Boda <[email protected]> This closes #710 --- CMakeLists.txt | 2 +- extensions/librdkafka/PublishKafka.cpp | 211 +++++++++------------ extensions/librdkafka/PublishKafka.h | 6 + extensions/librdkafka/tests/CMakeLists.txt | 36 ++++ .../tests/PublishKafkaOnScheduleTests.cpp | 74 ++++++++ extensions/opc/include/opcbase.h | 2 - extensions/opc/src/fetchopc.cpp | 25 +-- extensions/opc/src/opcbase.cpp | 28 +-- extensions/opc/src/putopc.cpp | 26 +-- .../processors/LogAttribute.cpp | 22 +-- .../standard-processors/processors/LogAttribute.h | 6 +- .../standard-processors/tests/unit/GetTCPTests.cpp | 3 + libminifi/include/c2/PayloadParser.h | 7 + libminifi/include/core/ConfigurableComponent.h | 3 +- libminifi/include/core/PropertyValidation.h | 31 +++ libminifi/include/core/PropertyValue.h | 12 +- libminifi/include/core/state/Value.h | 136 +++++++++---- libminifi/src/c2/protocols/RESTProtocol.cpp | 4 + libminifi/src/core/PropertyValidation.cpp | 1 + libminifi/src/core/state/Value.cpp | 1 + libminifi/test/integration/IntegrationBase.h | 11 ++ .../integration/OnScheduleErrorHandlingTests.cpp | 36 ++-- libminifi/test/resources/TestKafkaOnSchedule.yml | 46 +++++ 23 files changed, 474 insertions(+), 255 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index c3850f8..a57e325 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -357,7 +357,7 @@ option(ENABLE_LIBRDKAFKA "Enables the librdkafka extension." OFF) if (ENABLE_ALL OR ENABLE_LIBRDKAFKA) include(BundledLibRdKafka) use_bundled_librdkafka(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR}) - createExtension(RDKAFKA-EXTENSIONS "RDKAFKA EXTENSIONS" "This Enables librdkafka functionality including PublishKafka" "extensions/librdkafka" "${TEST_DIR}/kafka-tests") + createExtension(RDKAFKA-EXTENSIONS "RDKAFKA EXTENSIONS" "This Enables librdkafka functionality including PublishKafka" "extensions/librdkafka" "extensions/librdkafka/tests") endif() ## Scripting extensions diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp index 69fc1a6..f14dd6c 100644 --- a/extensions/librdkafka/PublishKafka.cpp +++ b/extensions/librdkafka/PublishKafka.cpp @@ -149,6 +149,46 @@ void PublishKafka::initialize() { void PublishKafka::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) { interrupted_ = false; + + // Try to get a KafkaConnection + std::string client_id, brokers; + if (!context->getProperty(ClientName.getName(), client_id)) { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Client Name property missing or invalid"); + } + if (!context->getProperty(SeedBrokers.getName(), brokers)) { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Known Brokers property missing or invalid"); + } + + // Get some properties not (only) used directly to set up librdkafka + + // Batch Size + context->getProperty(BatchSize.getName(), batch_size_); + logger_->log_debug("PublishKafka: Batch Size [%lu]", batch_size_); + + // Target Batch Payload Size + context->getProperty(TargetBatchPayloadSize.getName(), target_batch_payload_size_); + logger_->log_debug("PublishKafka: Target Batch Payload Size [%llu]", target_batch_payload_size_); + + // Max Flow Segment Size + context->getProperty(MaxFlowSegSize.getName(), max_flow_seg_size_); + logger_->log_debug("PublishKafka: Max Flow Segment Size [%llu]", max_flow_seg_size_); + + // Attributes to Send as Headers + std::string value; + if (context->getProperty(AttributeNameRegex.getName(), value) && !value.empty()) { + attributeNameRegex_ = utils::Regex(value); + logger_->log_debug("PublishKafka: AttributeNameRegex [%s]", value); + } + + // Future Improvement: Get rid of key since we only need to store one connection with current design. + key_.brokers_ = brokers; + key_.client_id_ = client_id; + + std::unique_ptr<KafkaLease> lease = connection_pool_.getOrCreateConnection(key_); + std::shared_ptr<KafkaConnection> conn = lease->getConn(); + configureNewConnection(conn, context); + + logger_->log_debug("Successfully configured PublishKafka"); } void PublishKafka::notifyStop() { @@ -181,11 +221,11 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection> std::string valueConf; std::array<char, 512U> errstr{}; rd_kafka_conf_res_t result; + const std::string PREFIX_ERROR_MSG = "PublishKafka: configure error result: "; rd_kafka_conf_t* conf_ = rd_kafka_conf_new(); if (conf_ == nullptr) { - logger_->log_error("Failed to create rd_kafka_conf_t object"); - return false; + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed to create rd_kafka_conf_t object"); } utils::ScopeGuard confGuard([conf_](){ rd_kafka_conf_destroy(conf_); @@ -194,25 +234,23 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection> auto key = conn->getKey(); if (key->brokers_.empty()) { - logger_->log_error("There are no brokers"); - return false; + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "There are no brokers"); } result = rd_kafka_conf_set(conf_, "bootstrap.servers", key->brokers_.c_str(), errstr.data(), errstr.size()); logger_->log_debug("PublishKafka: bootstrap.servers [%s]", key->brokers_); if (result != RD_KAFKA_CONF_OK) { - logger_->log_error("PublishKafka: configure error result [%s]", errstr.data()); - return false; + auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data()); + throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); } if (key->client_id_.empty()) { - logger_->log_error("Client id is empty"); - return false; + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Client id is empty"); } result = rd_kafka_conf_set(conf_, "client.id", key->client_id_.c_str(), errstr.data(), errstr.size()); logger_->log_debug("PublishKafka: client.id [%s]", key->client_id_); if (result != RD_KAFKA_CONF_OK) { - logger_->log_error("PublishKafka: configure error result [%s]", errstr.data()); - return false; + auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data()); + throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); } value = ""; @@ -220,8 +258,8 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection> result = rd_kafka_conf_set(conf_, "debug", value.c_str(), errstr.data(), errstr.size()); logger_->log_debug("PublishKafka: debug [%s]", value); if (result != RD_KAFKA_CONF_OK) { - logger_->log_error("PublishKafka: configure debug error result [%s]", errstr.data()); - return false; + auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data()); + throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); } } value = ""; @@ -229,8 +267,8 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection> result = rd_kafka_conf_set(conf_, "sasl.kerberos.service.name", value.c_str(), errstr.data(), errstr.size()); logger_->log_debug("PublishKafka: sasl.kerberos.service.name [%s]", value); if (result != RD_KAFKA_CONF_OK) { - logger_->log_error("PublishKafka: configure error result [%s]", errstr.data()); - return false; + auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data()); + throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); } } value = ""; @@ -238,8 +276,8 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection> result = rd_kafka_conf_set(conf_, "sasl.kerberos.principal", value.c_str(), errstr.data(), errstr.size()); logger_->log_debug("PublishKafka: sasl.kerberos.principal [%s]", value); if (result != RD_KAFKA_CONF_OK) { - logger_->log_error("PublishKafka: configure error result [%s]", errstr.data()); - return false; + auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data()); + throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); } } value = ""; @@ -247,19 +285,17 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection> result = rd_kafka_conf_set(conf_, "sasl.kerberos.keytab", value.c_str(), errstr.data(), errstr.size()); logger_->log_debug("PublishKafka: sasl.kerberos.keytab [%s]", value); if (result != RD_KAFKA_CONF_OK) { - logger_->log_error("PublishKafka: configure error result [%s]", errstr.data()); - return false; + auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data()); + throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); } } - value = ""; - if (context->getProperty(MaxMessageSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) { - valueConf = std::to_string(valInt); - result = rd_kafka_conf_set(conf_, "message.max.bytes", valueConf.c_str(), errstr.data(), errstr.size()); - logger_->log_debug("PublishKafka: message.max.bytes [%s]", valueConf); + if (context->getProperty(MaxMessageSize.getName(), value) && !value.empty()) { + result = rd_kafka_conf_set(conf_, "message.max.bytes", value.c_str(), errstr.data(), errstr.size()); + logger_->log_debug("PublishKafka: message.max.bytes [%s]", value); if (result != RD_KAFKA_CONF_OK) { - logger_->log_error("PublishKafka: configure error result [%s]", errstr.data()); - return false; + auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data()); + throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); } } value = ""; @@ -267,8 +303,8 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection> result = rd_kafka_conf_set(conf_, "queue.buffering.max.messages", value.c_str(), errstr.data(), errstr.size()); logger_->log_debug("PublishKafka: queue.buffering.max.messages [%s]", value); if (result != RD_KAFKA_CONF_OK) { - logger_->log_error("PublishKafka: configure error result [%s]", errstr.data()); - return false; + auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data()); + throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); } } value = ""; @@ -278,8 +314,8 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection> result = rd_kafka_conf_set(conf_, "queue.buffering.max.kbytes", valueConf.c_str(), errstr.data(), errstr.size()); logger_->log_debug("PublishKafka: queue.buffering.max.kbytes [%s]", valueConf); if (result != RD_KAFKA_CONF_OK) { - logger_->log_error("PublishKafka: configure error result [%s]", errstr.data()); - return false; + auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data()); + throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); } } value = ""; @@ -290,8 +326,8 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection> result = rd_kafka_conf_set(conf_, "queue.buffering.max.ms", valueConf.c_str(), errstr.data(), errstr.size()); logger_->log_debug("PublishKafka: queue.buffering.max.ms [%s]", valueConf); if (result != RD_KAFKA_CONF_OK) { - logger_->log_error("PublishKafka: configure queue buffer error result [%s]", errstr.data()); - return false; + auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data()); + throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); } } } @@ -300,8 +336,8 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection> result = rd_kafka_conf_set(conf_, "batch.num.messages", value.c_str(), errstr.data(), errstr.size()); logger_->log_debug("PublishKafka: batch.num.messages [%s]", value); if (result != RD_KAFKA_CONF_OK) { - logger_->log_error("PublishKafka: configure batch size error result [%s]", errstr.data()); - return false; + auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data()); + throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); } } value = ""; @@ -309,8 +345,8 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection> result = rd_kafka_conf_set(conf_, "compression.codec", value.c_str(), errstr.data(), errstr.size()); logger_->log_debug("PublishKafka: compression.codec [%s]", value); if (result != RD_KAFKA_CONF_OK) { - logger_->log_error("PublishKafka: configure compression codec error result [%s]", errstr.data()); - return false; + auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data()); + throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); } } value = ""; @@ -319,16 +355,16 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection> result = rd_kafka_conf_set(conf_, "security.protocol", value.c_str(), errstr.data(), errstr.size()); logger_->log_debug("PublishKafka: security.protocol [%s]", value); if (result != RD_KAFKA_CONF_OK) { - logger_->log_error("PublishKafka: configure error result [%s]", errstr.data()); - return false; + auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data()); + throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); } value = ""; if (context->getProperty(SecurityCA.getName(), value) && !value.empty()) { result = rd_kafka_conf_set(conf_, "ssl.ca.location", value.c_str(), errstr.data(), errstr.size()); logger_->log_debug("PublishKafka: ssl.ca.location [%s]", value); if (result != RD_KAFKA_CONF_OK) { - logger_->log_error("PublishKafka: configure error result [%s]", errstr.data()); - return false; + auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data()); + throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); } } value = ""; @@ -336,8 +372,8 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection> result = rd_kafka_conf_set(conf_, "ssl.certificate.location", value.c_str(), errstr.data(), errstr.size()); logger_->log_debug("PublishKafka: ssl.certificate.location [%s]", value); if (result != RD_KAFKA_CONF_OK) { - logger_->log_error("PublishKafka: configure error result [%s]", errstr.data()); - return false; + auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data()); + throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); } } value = ""; @@ -345,8 +381,8 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection> result = rd_kafka_conf_set(conf_, "ssl.key.location", value.c_str(), errstr.data(), errstr.size()); logger_->log_debug("PublishKafka: ssl.key.location [%s]", value); if (result != RD_KAFKA_CONF_OK) { - logger_->log_error("PublishKafka: configure error result [%s]", errstr.data()); - return false; + auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data()); + throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); } } value = ""; @@ -354,13 +390,13 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection> result = rd_kafka_conf_set(conf_, "ssl.key.password", value.c_str(), errstr.data(), errstr.size()); logger_->log_debug("PublishKafka: ssl.key.password [%s]", value); if (result != RD_KAFKA_CONF_OK) { - logger_->log_error("PublishKafka: configure error result [%s]", errstr.data()); - return false; + auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data()); + throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); } } } else { - logger_->log_error("PublishKafka: unknown Security Protocol: %s", value); - return false; + auto error_msg = utils::StringUtils::join_pack("PublishKafka: unknown Security Protocol: ", value); + throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); } } @@ -374,8 +410,8 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection> logger_->log_debug("PublishKafka: DynamicProperty: [%s] -> [%s]", prop_key, value); result = rd_kafka_conf_set(conf_, prop_key.c_str(), value.c_str(), errstr.data(), errstr.size()); if (result != RD_KAFKA_CONF_OK) { - logger_->log_error("PublishKafka: configure error result [%s]", errstr.data()); - return false; + auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data()); + throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); } } else { logger_->log_warn("PublishKafka Dynamic Property '%s' is empty and therefore will not be configured", prop_key); @@ -391,8 +427,8 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection> rd_kafka_t* producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf_, errstr.data(), errstr.size()); if (producer == nullptr) { - logger_->log_error("Failed to create Kafka producer %s", errstr.data()); - return false; + auto error_msg = utils::StringUtils::join_pack("Failed to create Kafka producer ", errstr.data()); + throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); } // The producer took ownership of the configuration, we must not free it @@ -497,24 +533,9 @@ void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &contex return; } - // Try to get a KafkaConnection - std::string client_id, brokers; - if (!context->getProperty(ClientName.getName(), client_id)) { - logger_->log_error("Client Name property missing or invalid"); - context->yield(); - return; - } - if (!context->getProperty(SeedBrokers.getName(), brokers)) { - logger_->log_error("Knowb Brokers property missing or invalid"); - context->yield(); - return; - } - - KafkaConnectionKey key; - key.brokers_ = brokers; - key.client_id_ = client_id; + logger_->log_debug("PublishKafka onTrigger"); - std::unique_ptr<KafkaLease> lease = connection_pool_.getOrCreateConnection(key); + std::unique_ptr<KafkaLease> lease = connection_pool_.getOrCreateConnection(key_); if (lease == nullptr) { logger_->log_info("This connection is used by another thread."); context->yield(); @@ -522,64 +543,18 @@ void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &contex } std::shared_ptr<KafkaConnection> conn = lease->getConn(); - if (!conn->initialized()) { - logger_->log_trace("Connection not initialized to %s, %s", client_id, brokers); - if (!configureNewConnection(conn, context)) { - logger_->log_error("Could not configure Kafka Connection"); - context->yield(); - return; - } - } - - // Get some properties not (only) used directly to set up librdkafka - std::string value; - - // Batch Size - uint32_t batch_size; - value = ""; - if (context->getProperty(BatchSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, batch_size)) { - logger_->log_debug("PublishKafka: Batch Size [%lu]", batch_size); - } else { - batch_size = 10; - } - - // Target Batch Payload Size - uint64_t target_batch_payload_size; - value = ""; - if (context->getProperty(TargetBatchPayloadSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, target_batch_payload_size)) { - logger_->log_debug("PublishKafka: Target Batch Payload Size [%llu]", target_batch_payload_size); - } else { - target_batch_payload_size = 512 * 1024U; - } - - // Max Flow Segment Size - uint64_t max_flow_seg_size; - value = ""; - if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, max_flow_seg_size)) { - logger_->log_debug("PublishKafka: Max Flow Segment Size [%llu]", max_flow_seg_size); - } else { - max_flow_seg_size = 0U; - } - - // Attributes to Send as Headers - utils::Regex attributeNameRegex; - value = ""; - if (context->getProperty(AttributeNameRegex.getName(), value) && !value.empty()) { - attributeNameRegex = utils::Regex(value); - logger_->log_debug("PublishKafka: AttributeNameRegex [%s]", value); - } // Collect FlowFiles to process uint64_t actual_bytes = 0U; std::vector<std::shared_ptr<core::FlowFile>> flowFiles; - for (uint32_t i = 0; i < batch_size; i++) { + for (uint32_t i = 0; i < batch_size_; i++) { std::shared_ptr<core::FlowFile> flowFile = session->get(); if (flowFile == nullptr) { break; } actual_bytes += flowFile->getSize(); flowFiles.emplace_back(std::move(flowFile)); - if (target_batch_payload_size != 0U && actual_bytes >= target_batch_payload_size) { + if (target_batch_payload_size_ != 0U && actual_bytes >= target_batch_payload_size_) { break; } } @@ -646,8 +621,8 @@ void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &contex bool failEmptyFlowFiles = true; context->getProperty(FailEmptyFlowFiles.getName(), failEmptyFlowFiles); - PublishKafka::ReadCallback callback(max_flow_seg_size, kafkaKey, thisTopic->getTopic(), conn->getConnection(), *flowFile, - attributeNameRegex, messages, flow_file_index, failEmptyFlowFiles); + PublishKafka::ReadCallback callback(max_flow_seg_size_, kafkaKey, thisTopic->getTopic(), conn->getConnection(), *flowFile, + attributeNameRegex_, messages, flow_file_index, failEmptyFlowFiles); session->read(flowFile, &callback); if (!callback.called_) { diff --git a/extensions/librdkafka/PublishKafka.h b/extensions/librdkafka/PublishKafka.h index 9ee3296..c6ed19e 100644 --- a/extensions/librdkafka/PublishKafka.h +++ b/extensions/librdkafka/PublishKafka.h @@ -353,6 +353,12 @@ class PublishKafka : public core::Processor { std::shared_ptr<logging::Logger> logger_; KafkaPool connection_pool_; + KafkaConnectionKey key_; + + uint32_t batch_size_; + uint64_t target_batch_payload_size_; + uint64_t max_flow_seg_size_; + utils::Regex attributeNameRegex_; std::atomic<bool> interrupted_; std::mutex messages_mutex_; diff --git a/extensions/librdkafka/tests/CMakeLists.txt b/extensions/librdkafka/tests/CMakeLists.txt new file mode 100644 index 0000000..056c7f1 --- /dev/null +++ b/extensions/librdkafka/tests/CMakeLists.txt @@ -0,0 +1,36 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +file(GLOB KAFKA_TESTS "*.cpp") + +SET(KAFKA_TEST_COUNT 0) + +FOREACH(testfile ${KAFKA_TESTS}) + get_filename_component(testfilename "${testfile}" NAME_WE) + add_executable("${testfilename}" "${testfile}") + target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/librdkafka/") + target_wholearchive_library(${testfilename} minifi-rdkafka-extensions) + createTests("${testfilename}") + MATH(EXPR KAFKA_TEST_COUNT "${KAFKA_TEST_COUNT}+1") + # The line below handles integration test + add_test(NAME "${testfilename}" COMMAND "${testfilename}" "${TEST_RESOURCES}/TestKafkaOnSchedule.yml" "${TEST_RESOURCES}/") + target_link_libraries(${testfilename} ${CATCH_MAIN_LIB}) +ENDFOREACH() + +message("-- Finished building ${KAFKA_TEST_COUNT} Kafka related test file(s)...") diff --git a/extensions/librdkafka/tests/PublishKafkaOnScheduleTests.cpp b/extensions/librdkafka/tests/PublishKafkaOnScheduleTests.cpp new file mode 100644 index 0000000..8a835fa --- /dev/null +++ b/extensions/librdkafka/tests/PublishKafkaOnScheduleTests.cpp @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#undef NDEBUG + +#include <cassert> +#include "../../../libminifi/test/integration/IntegrationBase.h" +#include "core/logging/Logger.h" +#include "../../../libminifi/test/TestBase.h" +#include "../PublishKafka.h" + +class PublishKafkaOnScheduleTests : public IntegrationBase { + public: + virtual void runAssertions() { + std::string logs = LogTestController::getInstance().log_output.str(); + + auto result = countPatInStr(logs, "value 1 is outside allowed range 1000..1000000000"); + size_t last_pos = result.first; + int occurrences = result.second; + + assert(occurrences > 1); // Verify retry of onSchedule and onUnSchedule calls + + std::vector<std::string> must_appear_byorder_msgs = {"notifyStop called", + "Successfully configured PublishKafka", + "PublishKafka onTrigger"}; + + for (const auto &msg : must_appear_byorder_msgs) { + last_pos = logs.find(msg, last_pos); + assert(last_pos != std::string::npos); + } + } + + virtual void testSetup() { + LogTestController::getInstance().setDebug<core::ProcessGroup>(); + LogTestController::getInstance().setDebug<core::Processor>(); + LogTestController::getInstance().setDebug<core::ProcessSession>(); + LogTestController::getInstance().setDebug<minifi::processors::PublishKafka>(); + } + + virtual void waitToVerifyProcessor() { + std::this_thread::sleep_for(std::chrono::seconds(3)); + flowController_->updatePropertyValue("kafka", minifi::processors::PublishKafka::MaxMessageSize.getName(), "1999"); + std::this_thread::sleep_for(std::chrono::seconds(3)); + } + + virtual void cleanup() {} +}; + +int main(int argc, char **argv) { + std::string test_file_location, url; + if (argc > 1) { + test_file_location = argv[1]; + } + + PublishKafkaOnScheduleTests harness; + + harness.run(test_file_location); + + return 0; +} diff --git a/extensions/opc/include/opcbase.h b/extensions/opc/include/opcbase.h index 2ad4b03..897f75c 100644 --- a/extensions/opc/include/opcbase.h +++ b/extensions/opc/include/opcbase.h @@ -71,8 +71,6 @@ class BaseOPCProcessor : public core::Processor { std::vector<char> keyBuffer_; std::vector<std::vector<char>> trustBuffers_; - bool configOK_; - virtual std::set<core::Property> getSupportedProperties() const {return {OPCServerEndPoint, ApplicationURI, Username, Password, CertificatePath, KeyPath, TrustedPath};} }; diff --git a/extensions/opc/src/fetchopc.cpp b/extensions/opc/src/fetchopc.cpp index ce29fd7..866b57e 100644 --- a/extensions/opc/src/fetchopc.cpp +++ b/extensions/opc/src/fetchopc.cpp @@ -95,12 +95,6 @@ namespace processors { BaseOPCProcessor::onSchedule(context, factory); - if(!configOK_) { - return; - } - - configOK_ = false; - std::string value; context->getProperty(NodeID.getName(), nodeID_); context->getProperty(NodeIDType.getName(), value); @@ -116,37 +110,30 @@ namespace processors { idType_ = opc::OPCNodeIDType::Path; } else { // Where have our validators gone? - logger_->log_error("%s is not a valid node ID type!", value.c_str()); + auto error_msg = utils::StringUtils::join_pack(value, " is not a valid node ID type!"); + throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); } if(idType_ == opc::OPCNodeIDType::Int) { try { int t = std::stoi(nodeID_); } catch(...) { - logger_->log_error("%s cannot be used as an int type node ID", nodeID_.c_str()); - return; + auto error_msg = utils::StringUtils::join_pack(nodeID_, " cannot be used as an int type node ID"); + throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); } } if(idType_ != opc::OPCNodeIDType::Path) { if(!context->getProperty(NameSpaceIndex.getName(), nameSpaceIdx_)) { - logger_->log_error("%s is mandatory in case %s is not Path", NameSpaceIndex.getName().c_str(), NodeIDType.getName().c_str()); - return; + auto error_msg = utils::StringUtils::join_pack(NameSpaceIndex.getName(), " is mandatory in case ", NodeIDType.getName(), " is not Path"); + throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); } } context->getProperty(Lazy.getName(), value); lazy_mode_ = value == "On" ? true : false; - - configOK_ = true; } void FetchOPCProcessor::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session){ - if(!configOK_) { - logger_->log_error("This processor was not configured properly, yielding. Please check for previous errors in the logs!"); - yield(); - return; - } - logger_->log_trace("FetchOPCProcessor::onTrigger"); std::unique_lock<std::mutex> lock(onTriggerMutex_, std::try_to_lock); diff --git a/extensions/opc/src/opcbase.cpp b/extensions/opc/src/opcbase.cpp index 4fb42b3..e2fe67d 100644 --- a/extensions/opc/src/opcbase.cpp +++ b/extensions/opc/src/opcbase.cpp @@ -75,33 +75,27 @@ namespace processors { username_.clear(); trustBuffers_.clear(); - configOK_ = false; - context->getProperty(OPCServerEndPoint.getName(), endPointURL_); context->getProperty(ApplicationURI.getName(), applicationURI_); if (context->getProperty(Username.getName(), username_) != context->getProperty(Password.getName(), password_)) { - logger_->log_error("Both or neither of Username and Password should be provided!"); - return; + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Both or neither of Username and Password should be provided!"); } auto certificatePathRes = context->getProperty(CertificatePath.getName(), certpath_); auto keyPathRes = context->getProperty(KeyPath.getName(), keypath_); auto trustedPathRes = context->getProperty(TrustedPath.getName(), trustpath_); if (certificatePathRes != keyPathRes || keyPathRes != trustedPathRes) { - logger_->log_error("All or none of Certificate path, Key path and Trusted server certificate path should be provided!"); - return; + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "All or none of Certificate path, Key path and Trusted server certificate path should be provided!"); } if (!password_.empty() && (certpath_.empty() || keypath_.empty() || trustpath_.empty() || applicationURI_.empty())) { - logger_->log_error("Certificate path, Key path, Trusted server certificate path and Application URI must be provided in case Password is provided!"); - return; + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Certificate path, Key path, Trusted server certificate path and Application URI must be provided in case Password is provided!"); } if (!certpath_.empty()) { if (applicationURI_.empty()) { - logger_->log_error("Application URI must be provided if Certificate path is provided!"); - return; + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Application URI must be provided if Certificate path is provided!"); } std::ifstream input_cert(certpath_, std::ios::binary); @@ -120,21 +114,19 @@ namespace processors { } if (certBuffer_.empty()) { - logger_->log_error("Failed to load cert from path: %s", certpath_); - return; + auto error_msg = utils::StringUtils::join_pack("Failed to load cert from path: ", certpath_); + throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); } if (keyBuffer_.empty()) { - logger_->log_error("Failed to load key from path: %s", keypath_); - return; + auto error_msg = utils::StringUtils::join_pack("Failed to load key from path: ", keypath_); + throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); } if (trustBuffers_[0].empty()) { - logger_->log_error("Failed to load trusted server certs from path: %s", trustpath_); - return; + auto error_msg = utils::StringUtils::join_pack("Failed to load trusted server certs from path: ", trustpath_); + throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); } } - - configOK_ = true; } bool BaseOPCProcessor::reconnect() { diff --git a/extensions/opc/src/putopc.cpp b/extensions/opc/src/putopc.cpp index c5acf3f..acc1a63 100644 --- a/extensions/opc/src/putopc.cpp +++ b/extensions/opc/src/putopc.cpp @@ -116,13 +116,6 @@ namespace processors { BaseOPCProcessor::onSchedule(context, factory); - if(!configOK_) { - return; - } - - configOK_ = false; - - context->getProperty(OPCServerEndPoint.getName(), endPointURL_); std::string value; context->getProperty(ParentNodeID.getName(), nodeID_); context->getProperty(ParentNodeIDType.getName(), value); @@ -135,21 +128,22 @@ namespace processors { idType_ = opc::OPCNodeIDType::Path; } else { // Where have our validators gone? - logger_->log_error("%s is not a valid node ID type!", value.c_str()); + auto error_msg = utils::StringUtils::join_pack(value, " is not a valid node ID type!"); + throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); } if(idType_ == opc::OPCNodeIDType::Int) { try { int t = std::stoi(nodeID_); } catch(...) { - logger_->log_error("%s cannot be used as an int type node ID", nodeID_.c_str()); - return; + auto error_msg = utils::StringUtils::join_pack(nodeID_, " cannot be used as an int type node ID"); + throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); } } if(idType_ != opc::OPCNodeIDType::Path) { if(!context->getProperty(ParentNameSpaceIndex.getName(), nameSpaceIdx_)) { - logger_->log_error("%s is mandatory in case %s is not Path", ParentNameSpaceIndex.getName().c_str(), ParentNodeIDType.getName().c_str()); - return; + auto error_msg = utils::StringUtils::join_pack(ParentNameSpaceIndex.getName(), " is mandatory in case ", ParentNodeIDType.getName(), " is not Path"); + throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); } } @@ -157,17 +151,9 @@ namespace processors { context->getProperty(ValueType.getName(), typestr); nodeDataType_ = opc::StringToOPCDataTypeMap.at(typestr); // This throws, but allowed values are generated based on this map -> that's a really unexpected error - configOK_ = true; } void PutOPCProcessor::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) { - if (!configOK_) { - logger_->log_error( - "This processor was not configured properly, yielding. Please check for previous errors in the logs!"); - yield(); - return; - } - logger_->log_trace("PutOPCProcessor::onTrigger"); std::unique_lock<std::mutex> lock(onTriggerMutex_, std::try_to_lock); diff --git a/extensions/standard-processors/processors/LogAttribute.cpp b/extensions/standard-processors/processors/LogAttribute.cpp index 458b6e5..0021877 100644 --- a/extensions/standard-processors/processors/LogAttribute.cpp +++ b/extensions/standard-processors/processors/LogAttribute.cpp @@ -88,23 +88,13 @@ void LogAttribute::initialize() { } void LogAttribute::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) { - core::Property flowsToLog = FlowFilesToLog; - - if (getProperty(FlowFilesToLog.getName(), flowsToLog)) { - // we are going this route since to avoid breaking backwards compatibility the get property function doesn't perform validation ( That's done - // in configuration. In future releases we can add that exception handling there. - if (!flowsToLog.getValue().validate("Validating FlowFilesToLog").valid()) - throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid value for flowfiles to log: " + flowsToLog.getValue().to_string()); - flowfiles_to_log_ = flowsToLog.getValue(); - } + context->getProperty(FlowFilesToLog.getName(), flowfiles_to_log_); + logger_->log_debug("FlowFiles To Log: %llu", flowfiles_to_log_); - std::string value; - if (context->getProperty(HexencodePayload.getName(), value)) { - utils::StringUtils::StringToBool(value, hexencode_); - } - if (context->getProperty(MaxPayloadLineLength.getName(), value)) { - core::Property::StringToInt(value, max_line_length_); - } + context->getProperty(HexencodePayload.getName(), hexencode_); + + context->getProperty(MaxPayloadLineLength.getName(), max_line_length_); + logger_->log_debug("Maximum Payload Line Length: %u", max_line_length_); } // OnTrigger method, implemented by NiFi LogAttribute void LogAttribute::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) { diff --git a/extensions/standard-processors/processors/LogAttribute.h b/extensions/standard-processors/processors/LogAttribute.h index 99f53d4..36627a3 100644 --- a/extensions/standard-processors/processors/LogAttribute.h +++ b/extensions/standard-processors/processors/LogAttribute.h @@ -40,8 +40,8 @@ class LogAttribute : public core::Processor { /*! * Create a new processor */ - LogAttribute(std::string name, utils::Identifier uuid = utils::Identifier()) - : Processor(name, uuid), + explicit LogAttribute(std::string name, utils::Identifier uuid = utils::Identifier()) + : Processor(std::move(name), uuid), flowfiles_to_log_(1), hexencode_(false), max_line_length_(80U), @@ -71,7 +71,7 @@ class LogAttribute : public core::Processor { LogAttrLevelError }; // Convert log level from string to enum - bool logLevelStringToEnum(std::string logStr, LogAttrLevel &level) { + bool logLevelStringToEnum(const std::string &logStr, LogAttrLevel &level) { if (logStr == "trace") { level = LogAttrLevelTrace; return true; diff --git a/extensions/standard-processors/tests/unit/GetTCPTests.cpp b/extensions/standard-processors/tests/unit/GetTCPTests.cpp index 86a8221..10d5f77 100644 --- a/extensions/standard-processors/tests/unit/GetTCPTests.cpp +++ b/extensions/standard-processors/tests/unit/GetTCPTests.cpp @@ -114,6 +114,7 @@ TEST_CASE("GetTCPWithoutEOM", "[GetTCP1]") { server.writeData(buffer, buffer.size()); std::this_thread::sleep_for(std::chrono::seconds(2)); + logAttribute->initialize(); logAttribute->incrementActiveTasks(); logAttribute->setScheduledState(core::ScheduledState::RUNNING); std::shared_ptr<core::ProcessSessionFactory> factory2 = std::make_shared<core::ProcessSessionFactory>(context2); @@ -227,6 +228,7 @@ TEST_CASE("GetTCPWithOEM", "[GetTCP2]") { server.writeData(buffer, buffer.size()); std::this_thread::sleep_for(std::chrono::seconds(2)); + logAttribute->initialize(); logAttribute->incrementActiveTasks(); logAttribute->setScheduledState(core::ScheduledState::RUNNING); std::shared_ptr<core::ProcessSessionFactory> factory2 = std::make_shared<core::ProcessSessionFactory>(context2); @@ -349,6 +351,7 @@ TEST_CASE("GetTCPWithOnlyOEM", "[GetTCP3]") { server.writeData(buffer, buffer.size()); std::this_thread::sleep_for(std::chrono::seconds(2)); + logAttribute->initialize(); logAttribute->incrementActiveTasks(); logAttribute->setScheduledState(core::ScheduledState::RUNNING); std::shared_ptr<core::ProcessSessionFactory> factory2 = std::make_shared<core::ProcessSessionFactory>(context2); diff --git a/libminifi/include/c2/PayloadParser.h b/libminifi/include/c2/PayloadParser.h index a64eb09..9689c40 100644 --- a/libminifi/include/c2/PayloadParser.h +++ b/libminifi/include/c2/PayloadParser.h @@ -88,6 +88,13 @@ struct convert_if<int64_t> : public convert_if_base<int64_t, state::response::In }; template<> +struct convert_if<uint32_t > : public convert_if_base<uint32_t, state::response::UInt32Value> { + explicit convert_if(const std::shared_ptr<state::response::Value> &node) + : convert_if_base(node) { + } +}; + +template<> struct convert_if<int> : public convert_if_base<int, state::response::IntValue> { explicit convert_if(const std::shared_ptr<state::response::Value> &node) : convert_if_base(node) { diff --git a/libminifi/include/core/ConfigurableComponent.h b/libminifi/include/core/ConfigurableComponent.h index de0f047..4905001 100644 --- a/libminifi/include/core/ConfigurableComponent.h +++ b/libminifi/include/core/ConfigurableComponent.h @@ -221,10 +221,11 @@ bool ConfigurableComponent::getProperty(const std::string name, T &value) const{ return true; } else{ - logger_->log_debug("Component %s property name %s, empty value", name, item.getName()); + logger_->log_warn("Component %s property name %s, empty value", name, item.getName()); return false; } } else { + logger_->log_warn("Could not find property %s", name); return false; } } diff --git a/libminifi/include/core/PropertyValidation.h b/libminifi/include/core/PropertyValidation.h index b0e59e9..f221b48 100644 --- a/libminifi/include/core/PropertyValidation.h +++ b/libminifi/include/core/PropertyValidation.h @@ -180,6 +180,34 @@ class IntegerValidator : public PropertyValidator { } }; +class UnsignedIntValidator : public PropertyValidator { + public: + explicit UnsignedIntValidator(const std::string &name) + : PropertyValidator(name) { + } + virtual ~UnsignedIntValidator() { + + } + ValidationResult validate(const std::string &subject, const std::shared_ptr<minifi::state::response::Value> &input) const { + return PropertyValidator::_validate_internal<minifi::state::response::UInt32Value>(subject, input); + } + + ValidationResult validate(const std::string &subject, const std::string &input) const { + try { + auto negative = input.find_first_of('-') != std::string::npos; + if (negative){ + throw std::out_of_range("non negative expected"); + } + std::stoul(input); + return ValidationResult::Builder::createBuilder().withSubject(subject).withInput(input).isValid(true).build(); + } catch (...) { + + } + return ValidationResult::Builder::createBuilder().withSubject(subject).withInput(input).isValid(false).build(); + } + +}; + class LongValidator : public PropertyValidator { public: explicit LongValidator(const std::string &name, int64_t min = (std::numeric_limits<int64_t>::min)(), int64_t max = (std::numeric_limits<int64_t>::max)()) @@ -327,6 +355,8 @@ class StandardValidators { return init.BOOLEAN_VALIDATOR; } else if (std::dynamic_pointer_cast<minifi::state::response::IntValue>(input) != nullptr) { return init.INTEGER_VALIDATOR; + } else if (std::dynamic_pointer_cast<minifi::state::response::UInt32Value>(input) != nullptr) { + return init.UNSIGNED_INT_VALIDATOR;; } else if (std::dynamic_pointer_cast<minifi::state::response::Int64Value>(input) != nullptr) { return init.LONG_VALIDATOR; } else if (std::dynamic_pointer_cast<minifi::state::response::UInt64Value>(input) != nullptr) { @@ -349,6 +379,7 @@ class StandardValidators { private: std::shared_ptr<PropertyValidator> INVALID; std::shared_ptr<PropertyValidator> INTEGER_VALIDATOR; + std::shared_ptr<PropertyValidator> UNSIGNED_INT_VALIDATOR; std::shared_ptr<PropertyValidator> LONG_VALIDATOR; std::shared_ptr<PropertyValidator> UNSIGNED_LONG_VALIDATOR; std::shared_ptr<PropertyValidator> BOOLEAN_VALIDATOR; diff --git a/libminifi/include/core/PropertyValue.h b/libminifi/include/core/PropertyValue.h index 3c32c86..be62226 100644 --- a/libminifi/include/core/PropertyValue.h +++ b/libminifi/include/core/PropertyValue.h @@ -40,6 +40,8 @@ static inline std::shared_ptr<state::response::Value> convert(const std::shared_ } } else if (prior->getTypeIndex() == state::response::Value::INT64_TYPE) { return std::make_shared<state::response::Int64Value>(ref); + } else if (prior->getTypeIndex() == state::response::Value::UINT32_TYPE) { + return std::make_shared<state::response::UInt32Value>(ref); } else if (prior->getTypeIndex() == state::response::Value::INT_TYPE) { return std::make_shared<state::response::IntValue>(ref); } else if (prior->getTypeIndex() == state::response::Value::BOOL_TYPE) { @@ -67,7 +69,7 @@ class PropertyValue : public state::response::ValueNode { validator_(o.validator_), state::response::ValueNode(o) { } - PropertyValue(PropertyValue &&o) + PropertyValue(PropertyValue &&o) noexcept : type_id(o.type_id), validator_(std::move(o.validator_)), state::response::ValueNode(std::move(o)) { @@ -106,6 +108,14 @@ class PropertyValue : public state::response::ValueNode { throw std::runtime_error("Invalid conversion to int64_t"); } + operator uint32_t() const { + uint32_t res; + if (value_->convertValue(res)) { + return res; + } + throw std::runtime_error("Invalid conversion to uint32_t for" + value_->getStringValue()); + } + operator int() const { int res; if (value_->convertValue(res)) { diff --git a/libminifi/include/core/state/Value.h b/libminifi/include/core/state/Value.h index 17bbdae..6e3a644 100644 --- a/libminifi/include/core/state/Value.h +++ b/libminifi/include/core/state/Value.h @@ -71,6 +71,7 @@ class Value { static const std::type_index UINT64_TYPE; static const std::type_index INT64_TYPE; + static const std::type_index UINT32_TYPE; static const std::type_index INT_TYPE; static const std::type_index BOOL_TYPE; static const std::type_index STRING_TYPE; @@ -87,6 +88,15 @@ class Value { type_id = std::type_index(typeid(T)); } + virtual bool getValue(uint32_t &ref) { + const auto negative = string_value.find_first_of('-') != std::string::npos; + if (negative){ + return false; + } + ref = std::stoul(string_value); + return true; + } + virtual bool getValue(int &ref) { ref = std::stol(string_value); return true; @@ -115,6 +125,64 @@ class Value { std::type_index type_id; }; +class UInt32Value : public Value { + public: + explicit UInt32Value(uint32_t value) + : Value(std::to_string(value)), + value(value) { + setTypeId<uint32_t>(); + } + + explicit UInt32Value(const std::string &strvalue) + : Value(strvalue), + value(std::stoul(strvalue)) { + /** + * This is a fundamental change in that we would be changing where this error occurs. + * We should be prudent about breaking backwards compatibility, but since Uint32Value + * is only created with a validator and type, we **should** be okay. + */ + const auto negative = strvalue.find_first_of('-') != std::string::npos; + if (negative){ + throw std::out_of_range("negative value detected"); + } + setTypeId<uint32_t>(); + } + + uint32_t getValue() const { + return value; + } + protected: + + virtual bool getValue(uint32_t &ref) { + ref = value; + return true; + } + + virtual bool getValue(int &ref) { + if (value <= (std::numeric_limits<int>::max)()) { + ref = value; + return true; + } + return false; + } + + virtual bool getValue(int64_t &ref) { + ref = value; + return true; + } + + virtual bool getValue(uint64_t &ref) { + ref = value; + return true; + } + + virtual bool getValue(bool &ref) { + return false; + } + + uint32_t value; +}; + class IntValue : public Value { public: explicit IntValue(int value) @@ -139,6 +207,14 @@ class IntValue : public Value { return true; } + virtual bool getValue(uint32_t &ref) { + if (value >= 0) { + ref = value; + return true; + } + return false; + } + virtual bool getValue(int64_t &ref) { ref = value; return true; @@ -177,39 +253,19 @@ class BoolValue : public Value { protected: virtual bool getValue(int &ref) { - if (ref == 1) { - ref = true; - return true; - } else if (ref == 0) { - ref = false; - return true; - } else { - return false; - } + return PreventSwearingInFutureRefactor(ref); + } + + virtual bool getValue(uint32_t &ref) { + return PreventSwearingInFutureRefactor(ref); } virtual bool getValue(int64_t &ref) { - if (ref == 1) { - ref = true; - return true; - } else if (ref == 0) { - ref = false; - return true; - } else { - return false; - } + return PreventSwearingInFutureRefactor(ref); } virtual bool getValue(uint64_t &ref) { - if (ref == 1) { - ref = true; - return true; - } else if (ref == 0) { - ref = false; - return true; - } else { - return false; - } + return PreventSwearingInFutureRefactor(ref); } virtual bool getValue(bool &ref) { @@ -218,6 +274,16 @@ class BoolValue : public Value { } bool value; + + private: + template<typename T> + bool PreventSwearingInFutureRefactor(T &ref) { + if (value != 0 && value != 1) { + return false; + } + ref = value != 0; + return true; + } }; class UInt64Value : public Value { @@ -252,6 +318,10 @@ class UInt64Value : public Value { return false; } + virtual bool getValue(uint32_t &ref) { + return false; + } + virtual bool getValue(int64_t &ref) { if (value <= (std::numeric_limits<int64_t>::max)()) { ref = value; @@ -294,6 +364,10 @@ class Int64Value : public Value { return false; } + virtual bool getValue(uint32_t &ref) { + return false; + } + virtual bool getValue(int64_t &ref) { ref = value; return true; @@ -331,7 +405,7 @@ static inline std::shared_ptr<Value> createValue(const std::string &object) { } static inline std::shared_ptr<Value> createValue(const uint32_t &object) { - return std::make_shared<UInt64Value>(object); + return std::make_shared<UInt32Value>(object); } #if ( defined(__APPLE__) || defined(__MACH__) || defined(DARWIN) ) static inline std::shared_ptr<Value> createValue(const size_t &object) { @@ -370,6 +444,7 @@ class ValueNode { auto operator=(const T ref) -> typename std::enable_if<std::is_same<T, int >::value || std::is_same<T, uint32_t >::value || std::is_same<T, size_t >::value || + std::is_same<T, int64_t>::value || std::is_same<T, uint64_t >::value || std::is_same<T, bool >::value || std::is_same<T, char* >::value || @@ -379,10 +454,7 @@ class ValueNode { return *this; } - ValueNode &operator=(const ValueNode &ref) { - value_ = ref.value_; - return *this; - } + ValueNode &operator=(const ValueNode &ref) = default; inline bool operator==(const ValueNode &rhs) const { return to_string() == rhs.to_string(); diff --git a/libminifi/src/c2/protocols/RESTProtocol.cpp b/libminifi/src/c2/protocols/RESTProtocol.cpp index 37a53e6..1cdec27 100644 --- a/libminifi/src/c2/protocols/RESTProtocol.cpp +++ b/libminifi/src/c2/protocols/RESTProtocol.cpp @@ -165,6 +165,10 @@ void setJsonStr(const std::string& key, const state::response::ValueNode& value, int value = 0; base_type->convertValue(value); valueVal.SetInt(value); + } else if (type_index == state::response::Value::UINT32_TYPE) { + uint32_t value = 0; + base_type->convertValue(value); + valueVal.SetUint(value); } else if (type_index == state::response::Value::INT64_TYPE) { int64_t value = 0; base_type->convertValue(value); diff --git a/libminifi/src/core/PropertyValidation.cpp b/libminifi/src/core/PropertyValidation.cpp index ab1f53a..d476416 100644 --- a/libminifi/src/core/PropertyValidation.cpp +++ b/libminifi/src/core/PropertyValidation.cpp @@ -27,6 +27,7 @@ std::shared_ptr<PropertyValidator> StandardValidators::VALID = std::make_shared< StandardValidators::StandardValidators() { INVALID = std::make_shared<AlwaysValid>(false, "INVALID"); INTEGER_VALIDATOR = std::make_shared<IntegerValidator>("INTEGER_VALIDATOR"); + UNSIGNED_INT_VALIDATOR = std::make_shared<UnsignedIntValidator>("NON_NEGATIVE_INTEGER_VALIDATOR"); LONG_VALIDATOR = std::make_shared<LongValidator>("LONG_VALIDATOR"); // name is used by java nifi validators, so we should keep this LONG and not change to reflect // its internal use diff --git a/libminifi/src/core/state/Value.cpp b/libminifi/src/core/state/Value.cpp index 4c43c3e..ae1bae3 100644 --- a/libminifi/src/core/state/Value.cpp +++ b/libminifi/src/core/state/Value.cpp @@ -29,6 +29,7 @@ namespace response { const std::type_index Value::UINT64_TYPE = std::type_index(typeid(uint64_t)); const std::type_index Value::INT64_TYPE = std::type_index(typeid(int64_t)); +const std::type_index Value::UINT32_TYPE = std::type_index(typeid(uint32_t)); const std::type_index Value::INT_TYPE = std::type_index(typeid(int)); const std::type_index Value::BOOL_TYPE = std::type_index(typeid(bool)); const std::type_index Value::STRING_TYPE = std::type_index(typeid(std::string)); diff --git a/libminifi/test/integration/IntegrationBase.h b/libminifi/test/integration/IntegrationBase.h index c51dd2c..0cc4cc6 100644 --- a/libminifi/test/integration/IntegrationBase.h +++ b/libminifi/test/integration/IntegrationBase.h @@ -44,6 +44,17 @@ class IntegrationBase { configureSecurity(); } + // Return the last position and number of occurrences. + std::pair<size_t, int> countPatInStr(const std::string &str, const std::string &pattern) { + size_t last_pos = 0; + int occurrences = 0; + for(size_t pos = str.find(pattern); pos != std::string::npos; pos = str.find(pattern, pos + pattern.size())) { + last_pos = pos; + occurrences++; + } + return {last_pos, occurrences}; + } + virtual void testSetup() = 0; virtual void shutdownBeforeFlowController() { diff --git a/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp b/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp index 18b5803..4f5412e 100644 --- a/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp +++ b/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp @@ -1,6 +1,4 @@ /** - * @file GenerateFlowFile.h - * GenerateFlowFile class declaration * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -29,32 +27,22 @@ class OnScheduleErrorHandlingTests : public IntegrationBase { public: virtual void runAssertions() { std::string logs = LogTestController::getInstance().log_output.str(); - size_t pos = 0; - size_t last_pos = 0; - unsigned int occurances = 0; - do { - pos = logs.find(minifi::processors::KamikazeProcessor::OnScheduleExceptionStr, pos); - if (pos != std::string::npos) { - last_pos = pos; - pos = logs.find(minifi::processors::KamikazeProcessor::OnUnScheduleLogStr, pos); - if (pos != std::string::npos) { - last_pos = pos; - occurances++; - } - } - } while (pos != std::string::npos); - assert(occurances > 1); // Verify retry of onSchedule and onUnSchedule calls + auto result = countPatInStr(logs, minifi::processors::KamikazeProcessor::OnScheduleExceptionStr); + size_t last_pos = result.first; + int occurrences = result.second; - // Make sure onSchedule succeeded after property was set - assert(logs.find(minifi::processors::KamikazeProcessor::OnScheduleLogStr, last_pos) != std::string::npos); + assert(occurrences > 1); // Verify retry of onSchedule and onUnSchedule calls - // Make sure onTrigger was called after onshedule succeeded - pos = logs.find(minifi::processors::KamikazeProcessor::OnTriggerExceptionStr); - assert(pos != std::string::npos && pos > last_pos); + std::vector<std::string> must_appear_byorder_msgs = {minifi::processors::KamikazeProcessor::OnUnScheduleLogStr, + minifi::processors::KamikazeProcessor::OnScheduleLogStr, + minifi::processors::KamikazeProcessor::OnTriggerExceptionStr, + "[warning] ProcessSession rollback for kamikaze executed"}; - pos = logs.find("[warning] ProcessSession rollback for kamikaze executed"); // Check rollback - assert(pos != std::string::npos && pos > last_pos); + for (const auto &msg : must_appear_byorder_msgs) { + last_pos = logs.find(msg, last_pos); + assert(last_pos != std::string::npos); + } assert(logs.find(minifi::processors::KamikazeProcessor::OnTriggerLogStr) == std::string::npos); } diff --git a/libminifi/test/resources/TestKafkaOnSchedule.yml b/libminifi/test/resources/TestKafkaOnSchedule.yml new file mode 100644 index 0000000..eabb0e8 --- /dev/null +++ b/libminifi/test/resources/TestKafkaOnSchedule.yml @@ -0,0 +1,46 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +Flow Controller: + name: MiNiFi Flow + onschedule retry interval: 1000 ms +Processors: + - Properties: +# Batch Size: 1234 + Client Name: lmn +# Compress Codec: none +# Delivery Guarantee: '1' + Known Brokers: localhost:9092 + Max Request Size: '1' +# Message Timeout: 5 sec +# Request Timeout: 10 sec + Topic Name: test + class: org.apache.nifi.processors.standard.PublishKafka + id: 3744352b-6eb1-4677-98a6-353417a90496 + name: kafka + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 100 sec + penalization period: 30 sec + yield period: 1 sec + run duration nanos: 0 + auto-terminated relationships list: +Connections: +Remote Processing Groups: +
