This is an automated email from the ASF dual-hosted git repository. aboda pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit ff785cea511c4266525a4f9b6f5eccdd90817aa4 Author: Adam Debreceni <[email protected]> AuthorDate: Tue Jun 30 11:27:09 2020 +0200 MINIFICPP-1273 - Drain connections on flow shutdown Signed-off-by: Arpad Boda <[email protected]> This closes #827 --- CMakeLists.txt | 2 + libminifi/include/Connection.h | 2 +- libminifi/include/core/ProcessGroup.h | 2 + libminifi/src/Connection.cpp | 10 +- libminifi/src/FlowController.cpp | 5 +- libminifi/src/core/ProcessGroup.cpp | 19 +++- libminifi/test/flow-tests/CMakeLists.txt | 31 ++++++ libminifi/test/flow-tests/FlowControllerTests.cpp | 125 ++++++++++++++++++++++ 8 files changed, 185 insertions(+), 11 deletions(-) [Review notes, not part of the patch: 1) Connection::drain(false) still logs "Delete flow file UUID %s from connection %s" although it skips flow_repository_->Delete(), so the debug log overstates what happens on shutdown; a distinct message for the non-permanent case would avoid confusion. 2) The Connection.h comment "// Drain the flow records" does not explain the new delete_permanently flag: with true each dequeued item is also Delete()d from flow_repository_ (and, if that succeeds, marked as not stored to the repository); with false only the in-memory queue is emptied and queued_data_size_ reset. 3) FlowController::stop() now calls root_->drainConnections(), i.e. drain(false) on every connection of the root group and, recursively, of its child groups, before flow_file_repo_->stop(), so queued flow files are kept in the flow repository (presumably so they can be restored on the next start - TODO confirm); clearConnection() keeps the old deleting behavior via drain(true). 4) ~ProcessGroup() changes from drain() to drain(false), so destroying a process group no longer deletes the flow repository entries of its queued flow files; verify that this is intended for every destruction path, not just shutdown. 5) FlowControllerTests.cpp uses std::ofstream but does not #include <fstream>; it relies on a transitive include. 6) The test sleeps a fixed 1000 ms and then requires getQueueSize() > 10, so it is timing dependent and may fail on a slow or loaded CI machine; polling the queue size against a deadline would be more robust.] diff --git a/CMakeLists.txt b/CMakeLists.txt index 700b95f..3479070 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -766,6 +766,8 @@ endif() ## Add KeyValueStorageService tests registerTest("${TEST_DIR}/keyvalue-tests") +registerTest("${TEST_DIR}/flow-tests") + include(BuildDocs) include(DockerConfig) diff --git a/libminifi/include/Connection.h b/libminifi/include/Connection.h index 64a805e..5a40d87 100644 --- a/libminifi/include/Connection.h +++ b/libminifi/include/Connection.h @@ -168,7 +168,7 @@ class Connection : public core::Connectable, public std::enable_shared_from_this // Poll the flow file from queue, the expired flow file record also being returned std::shared_ptr<core::FlowFile> poll(std::set<std::shared_ptr<core::FlowFile>> &expiredFlowRecords); // Drain the flow records - void drain(); + void drain(bool delete_permanently); void yield() override {} diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h index fcbb78a..9944658 100644 --- a/libminifi/include/core/ProcessGroup.h +++ b/libminifi/include/core/ProcessGroup.h @@ -228,6 +228,8 @@ class ProcessGroup { void getConnections(std::map<std::string, std::shared_ptr<Connectable>> &connectionMap); + void drainConnections(); + protected: void startProcessingProcessors(const 
std::shared_ptr<TimerDrivenSchedulingAgent> timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler, const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler); // NOLINT diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp index 9038e91..06cfcce 100644 --- a/libminifi/src/Connection.cpp +++ b/libminifi/src/Connection.cpp @@ -262,15 +262,17 @@ std::shared_ptr<core::FlowFile> Connection::poll(std::set<std::shared_ptr<core:: return NULL; } -void Connection::drain() { +void Connection::drain(bool delete_permanently) { std::lock_guard<std::mutex> lock(mutex_); while (!queue_.empty()) { std::shared_ptr<core::FlowFile> item = queue_.front(); queue_.pop(); - logger_->log_debug("Delete flow file UUID %s from connection %s, because it expired", item->getUUIDStr(), name_); - if (flow_repository_->Delete(item->getUUIDStr())) { - item->setStoredToRepository(false); + logger_->log_debug("Delete flow file UUID %s from connection %s", item->getUUIDStr(), name_); + if (delete_permanently) { + if (flow_repository_->Delete(item->getUUIDStr())) { + item->setStoredToRepository(false); + } } } queued_data_size_ = 0; diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index 3976936..669285b 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -250,6 +250,9 @@ int16_t FlowController::stop(bool force, uint64_t timeToWait) { * -Stopping the schedulers doesn't actually quit the onTrigger functions of processors * -They only guarantee that the processors are not scheduled any more * -After the threadpool is stopped we can make sure that processors don't need repos and controllers anymore */ + if (this->root_) { + this->root_->drainConnections(); + } this->flow_file_repo_->stop(); this->provenance_repo_->stop(); // stop the ControllerServices @@ -923,7 +926,7 @@ int16_t FlowController::clearConnection(const std::string &connection) { auto conn = connections.find(connection); if 
(conn != connections.end()) { logger_->log_info("Clearing connection %s", connection); - conn->second->drain(); + conn->second->drain(true); } } return -1; diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp index 9eec44c..ee88d79 100644 --- a/libminifi/src/core/ProcessGroup.cpp +++ b/libminifi/src/core/ProcessGroup.cpp @@ -92,13 +92,12 @@ ProcessGroup::~ProcessGroup() { onScheduleTimer_->stop(); } - for (auto &&connection : connections_) { - connection->drain(); + for (auto&& connection : connections_) { + connection->drain(false); } - for (std::set<ProcessGroup *>::iterator it = child_process_groups_.begin(); it != child_process_groups_.end(); ++it) { - ProcessGroup *processGroup(*it); - delete processGroup; + for (ProcessGroup* childGroup : child_process_groups_) { + delete childGroup; } } @@ -403,6 +402,16 @@ void ProcessGroup::removeConnection(std::shared_ptr<Connection> connection) { } } +void ProcessGroup::drainConnections() { + for (auto&& connection : connections_) { + connection->drain(false); + } + + for (ProcessGroup* childGroup : child_process_groups_) { + childGroup->drainConnections(); + } +} + } /* namespace core */ } /* namespace minifi */ } /* namespace nifi */ diff --git a/libminifi/test/flow-tests/CMakeLists.txt b/libminifi/test/flow-tests/CMakeLists.txt new file mode 100644 index 0000000..a17cf23 --- /dev/null +++ b/libminifi/test/flow-tests/CMakeLists.txt @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +file(GLOB FLOW_TESTS "*.cpp") +SET(FLOW_TEST_COUNT 0) +FOREACH(testfile ${FLOW_TESTS}) + get_filename_component(testfilename "${testfile}" NAME_WE) + add_executable("${testfilename}" "${testfile}") + createTests("${testfilename}") + target_link_libraries(${testfilename} ${CATCH_MAIN_LIB}) + target_wholearchive_library(${testfilename} minifi-standard-processors) + MATH(EXPR FLOW_TEST_COUNT "${FLOW_TEST_COUNT}+1") + add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${TEST_DIR}) +ENDFOREACH() +message("-- Finished building ${FLOW_TEST_COUNT} flow related test file(s)...") diff --git a/libminifi/test/flow-tests/FlowControllerTests.cpp b/libminifi/test/flow-tests/FlowControllerTests.cpp new file mode 100644 index 0000000..eb444f6 --- /dev/null +++ b/libminifi/test/flow-tests/FlowControllerTests.cpp @@ -0,0 +1,125 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#undef NDEBUG +#include <chrono> +#include <map> +#include <memory> +#include <string> +#include <thread> + +#include "core/Core.h" +#include "core/repository/AtomicRepoEntries.h" +#include "core/RepositoryFactory.h" +#include "FlowFileRecord.h" +#include "provenance/Provenance.h" +#include "properties/Configure.h" +#include "../unit/ProvenanceTestHelper.h" +#include "../TestBase.h" +#include "YamlConfiguration.h" + +const char* yamlConfig = +R"( +Flow Controller: + name: MiNiFi Flow + id: 2438e3c8-015a-1000-79ca-83af40ec1990 +Processors: + - name: Generator + id: 2438e3c8-015a-1000-79ca-83af40ec1991 + class: org.apache.nifi.processors.standard.GenerateFlowFile + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 100 ms + penalization period: 300 ms + yield period: 100 ms + run duration nanos: 0 + auto-terminated relationships list: + Properties: + Batch Size: 10 + - name: LogAttribute + id: 2438e3c8-015a-1000-79ca-83af40ec1992 + class: org.apache.nifi.processors.standard.LogAttribute + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 1000 sec + penalization period: 30 sec + yield period: 1 sec + run duration nanos: 0 + auto-terminated relationships list: + +Connections: + - name: Gen + id: 2438e3c8-015a-1000-79ca-83af40ec1997 + source name: Generator + source id: 2438e3c8-015a-1000-79ca-83af40ec1991 + source relationship name: success + destination name: LogAttribute + destination id: 2438e3c8-015a-1000-79ca-83af40ec1992 + max work queue size: 0 + max work queue data size: 1 MB + flowfile expiration: 60 sec + +Remote Processing Groups: + +)"; + +TEST_CASE("Flow shutdown drains connections", "[TestFlow1]") { + TestController testController; + char format[] = "/tmp/flowTest.XXXXXX"; + std::string dir = testController.createTempDirectory(format); + + std::string yamlPath = 
utils::file::FileUtils::concat_path(dir, "config.yml"); + std::ofstream{yamlPath} << yamlConfig; + + std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); + std::shared_ptr<core::Repository> prov_repo = std::make_shared<TestRepository>(); + std::shared_ptr<core::Repository> ff_repo = std::make_shared<TestFlowRepository>(); + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + + configuration->set(minifi::Configure::nifi_flow_configuration_file, yamlPath); + + REQUIRE(content_repo->initialize(configuration)); + std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration); + + std::map<std::string, std::shared_ptr<minifi::Connection>> connectionMap; + std::unique_ptr<core::FlowConfiguration> flow = utils::make_unique<core::YamlConfiguration>(prov_repo, ff_repo, content_repo, stream_factory, configuration, yamlPath); + std::shared_ptr<core::ProcessGroup> root = flow->getRoot(); + std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>( + prov_repo, ff_repo, configuration, + std::move(flow), + content_repo, DEFAULT_ROOT_GROUP_NAME, true); + + + root->getConnections(connectionMap); + // adds the single connection to the map both by name and id + REQUIRE(connectionMap.size() == 2); + controller->load(root); + controller->start(); + + std::this_thread::sleep_for(std::chrono::milliseconds{1000}); + + for (auto& it : connectionMap) { + REQUIRE(it.second->getQueueSize() > 10); + } + + controller->stop(true); + + for (auto& it : connectionMap) { + REQUIRE(it.second->isEmpty()); + } +}
