This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 091a4623365a7e60f28f11aa7a86b2ecd6aceff1 Author: Gabor Gyimesi <[email protected]> AuthorDate: Wed Apr 20 18:01:01 2022 +0200 MINIFICPP-1322 PublishKafka queue size and batch size properties should be in sync Closes #1304 Signed-off-by: Marton Szasz <[email protected]> @szaszm: I fixed a small issue with the test after merging. --- CMakeLists.txt | 2 +- extensions/librdkafka/PublishKafka.cpp | 8 ++++- .../librdkafka/tests}/CMakeLists.txt | 3 +- extensions/librdkafka/tests/PublishKafkaTests.cpp | 38 ++++++++++++++++++++++ 4 files changed, 47 insertions(+), 4 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index b2f0352d5..ff9630eb6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -502,7 +502,7 @@ endif() if (ENABLE_ALL OR ENABLE_LIBRDKAFKA) include(BundledLibRdKafka) use_bundled_librdkafka(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR}) - createExtension(RDKAFKA-EXTENSIONS "RDKAFKA EXTENSIONS" "This Enables librdkafka functionality including PublishKafka" "extensions/librdkafka") + createExtension(RDKAFKA-EXTENSIONS "RDKAFKA EXTENSIONS" "This Enables librdkafka functionality including PublishKafka" "extensions/librdkafka" "extensions/librdkafka/tests") endif() ## Scripting extensions diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp index e5456c3d7..a45eba348 100644 --- a/extensions/librdkafka/PublishKafka.cpp +++ b/extensions/librdkafka/PublishKafka.cpp @@ -602,7 +602,13 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<core::ProcessCon } } value = ""; - if (context->getProperty(QueueBufferMaxMessage.getName(), value) && !value.empty()) { + uint32_t int_val; + if (context->getProperty(QueueBufferMaxMessage.getName(), int_val)) { + if (int_val < batch_size_) { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid configuration: Batch Size cannot be larger than Queue Max Message"); + } + + value = std::to_string(int_val); result = rd_kafka_conf_set(conf_.get(), 
"queue.buffering.max.messages", value.c_str(), errstr.data(), errstr.size()); logger_->log_debug("PublishKafka: queue.buffering.max.messages [%s]", value); if (result != RD_KAFKA_CONF_OK) { diff --git a/libminifi/test/kafka-tests/CMakeLists.txt b/extensions/librdkafka/tests/CMakeLists.txt similarity index 86% rename from libminifi/test/kafka-tests/CMakeLists.txt rename to extensions/librdkafka/tests/CMakeLists.txt index 1ef0e79b7..439fccd8d 100644 --- a/libminifi/test/kafka-tests/CMakeLists.txt +++ b/extensions/librdkafka/tests/CMakeLists.txt @@ -24,8 +24,7 @@ FOREACH(testfile ${KAFKA_INTEGRATION_TESTS}) get_filename_component(testfilename "${testfile}" NAME_WE) add_executable("${testfilename}" "${testfile}") target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/librdkafka") - target_include_directories(${testfilename} SYSTEM BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/librdkafka-0.11.4/src") - target_include_directories(${testfilename} SYSTEM BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/librdkafka-0.11.4/src-cpp") + target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/libminifi/test") createTests("${testfilename}") target_link_libraries(${testfilename} ${CATCH_MAIN_LIB}) target_link_libraries(${testfilename} minifi-rdkafka-extensions) diff --git a/extensions/librdkafka/tests/PublishKafkaTests.cpp b/extensions/librdkafka/tests/PublishKafkaTests.cpp new file mode 100644 index 000000000..b508ee02e --- /dev/null +++ b/extensions/librdkafka/tests/PublishKafkaTests.cpp @@ -0,0 +1,38 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "TestBase.h" +#include "Catch.h" +#include "PublishKafka.h" +#include "SingleProcessorTestController.h" + +namespace org::apache::nifi::minifi::test { + +TEST_CASE("Scheduling should fail when batch size is larger than the max queue message count", "[testPublishKafka]") { + LogTestController::getInstance().setTrace<TestPlan>(); + LogTestController::getInstance().setTrace<processors::PublishKafka>(); + const auto publish_kafka = std::make_shared<processors::PublishKafka>("PublishKafka"); + SingleProcessorTestController test_controller(publish_kafka); + publish_kafka->setProperty(processors::PublishKafka::ClientName, "test_client"); + publish_kafka->setProperty(processors::PublishKafka::SeedBrokers, "test_seedbroker"); + publish_kafka->setProperty(processors::PublishKafka::QueueBufferMaxMessage, "1000"); + publish_kafka->setProperty(processors::PublishKafka::BatchSize, "1500"); + test_controller.enqueueFlowFile(""); + REQUIRE_THROWS_WITH(test_controller.trigger(), "Process Schedule Operation: Invalid configuration: Batch Size cannot be larger than Queue Max Message"); +} + +} // namespace org::apache::nifi::minifi::test
