This is an automated email from the ASF dual-hosted git repository. fgerlits pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit e0257d82291d0e0e92e0733ca22a81f416d64239 Author: Gabor Gyimesi <[email protected]> AuthorDate: Fri Feb 4 17:20:18 2022 +0100 MINIFICPP-1750 Fix C2 clear corecomponent state command Signed-off-by: Ferenc Gerlits <[email protected]> This closes #1255 --- .../tests/C2ClearCoreComponentStateTest.cpp | 152 +++++++++++++++++++++ extensions/http-curl/tests/CMakeLists.txt | 1 + extensions/http-curl/tests/HTTPHandlers.h | 20 ++- libminifi/include/c2/C2Agent.h | 2 + libminifi/include/utils/TestUtils.h | 2 +- libminifi/src/c2/C2Agent.cpp | 76 +++++------ 6 files changed, 212 insertions(+), 41 deletions(-) diff --git a/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp b/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp new file mode 100644 index 0000000..c0f4006 --- /dev/null +++ b/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp @@ -0,0 +1,152 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#undef NDEBUG +#include <string> +#include "TestBase.h" +#include "HTTPIntegrationBase.h" +#include "HTTPHandlers.h" +#include "processors/TailFile.h" +#include "state/ProcessorController.h" +#include "utils/file/FileUtils.h" +#include "utils/TestUtils.h" + +using namespace std::literals::chrono_literals; + +class VerifyC2ClearCoreComponentState : public VerifyC2Base { + public: + explicit VerifyC2ClearCoreComponentState(const std::atomic_bool& component_cleared_successfully) : component_cleared_successfully_(component_cleared_successfully) { + auto temp_dir = testController.createTempDirectory(); + test_file_1_ = minifi::utils::putFileToDir(temp_dir, "test1.txt", "foo\n"); + test_file_2_ = minifi::utils::putFileToDir(temp_dir, "test2.txt", "foobar\n"); + } + + void testSetup() override { + LogTestController::getInstance().setTrace<minifi::c2::C2Agent>(); + LogTestController::getInstance().setDebug<minifi::c2::RESTSender>(); + LogTestController::getInstance().setDebug<minifi::FlowController>(); + LogTestController::getInstance().setDebug<minifi::core::ProcessContext>(); + VerifyC2Base::testSetup(); + } + + void runAssertions() override { + using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime; + assert(verifyEventHappenedInPollTime(40s, [&] { return component_cleared_successfully_.load(); })); + } + + protected: + void updateProperties(std::shared_ptr<minifi::FlowController> flow_controller) override { + std::dynamic_pointer_cast<minifi::state::ProcessorController>(flow_controller->getComponents("TailFile1")[0]) + ->getProcessor()->setProperty(minifi::processors::TailFile::FileName, test_file_1_); + std::dynamic_pointer_cast<minifi::state::ProcessorController>(flow_controller->getComponents("TailFile2")[0]) + ->getProcessor()->setProperty(minifi::processors::TailFile::FileName, test_file_2_); + } + + TestController testController; + std::string test_file_1_; + std::string test_file_2_; + const std::atomic_bool& component_cleared_successfully_; +}; + +class ClearCoreComponentStateHandler: public HeartbeatHandler { + public: + explicit ClearCoreComponentStateHandler(std::atomic_bool& component_cleared_successfully) : component_cleared_successfully_(component_cleared_successfully) { + } + + void handleHeartbeat(const rapidjson::Document&, struct mg_connection * conn) override { + switch (flow_state_) { + case FlowState::STARTED: + sendHeartbeatResponse("DESCRIBE", "corecomponentstate", "889345", conn); + flow_state_ = FlowState::FIRST_DESCRIBE_SENT; + break; + case FlowState::FIRST_DESCRIBE_SENT: { + sendHeartbeatResponse("CLEAR", "corecomponentstate", "889346", conn, { {"corecomponent1", "TailFile1"} }); + flow_state_ = FlowState::CLEAR_SENT; + break; + } + default: + sendHeartbeatResponse("DESCRIBE", "corecomponentstate", "889347", conn); + flow_state_ = FlowState::SECOND_DESCRIBE_SENT; + } + } + + void handleAcknowledge(const rapidjson::Document& root) override { + switch (flow_state_) { + case FlowState::FIRST_DESCRIBE_SENT: { + assert(root.HasMember("corecomponentstate")); + + auto assertExpectedTailFileState = [&](const char* uuid, const char* name, const char* position) { + assert(root["corecomponentstate"].HasMember(uuid)); + const auto& tf = root["corecomponentstate"][uuid]; + assert(tf.HasMember("file.0.name")); + assert(std::string(tf["file.0.name"].GetString()) == name); + assert(tf.HasMember("file.0.position")); + assert(std::string(tf["file.0.position"].GetString()) == position); + assert(tf.HasMember("file.0.current")); + assert(strlen(tf["file.0.current"].GetString()) > 0U); + }; + + assertExpectedTailFileState("2438e3c8-015a-1000-79ca-83af40ec1993", "test1.txt", "4"); + assertExpectedTailFileState("2438e3c8-015a-1000-79ca-83af40ec1994", "test2.txt", "7"); + + last_read_time_1_ = std::string(root["corecomponentstate"]["2438e3c8-015a-1000-79ca-83af40ec1993"]["file.0.last_read_time"].GetString()); + last_read_time_2_ = std::string(root["corecomponentstate"]["2438e3c8-015a-1000-79ca-83af40ec1994"]["file.0.last_read_time"].GetString()); + break; + } + case FlowState::CLEAR_SENT: + break; + case FlowState::SECOND_DESCRIBE_SENT: { + auto clearedStateFound = [this, &root]() { + return root.HasMember("corecomponentstate") && + root["corecomponentstate"].HasMember("2438e3c8-015a-1000-79ca-83af40ec1993") && + root["corecomponentstate"].HasMember("2438e3c8-015a-1000-79ca-83af40ec1994") && + std::string(root["corecomponentstate"]["2438e3c8-015a-1000-79ca-83af40ec1994"]["file.0.last_read_time"].GetString()) == last_read_time_2_ && + std::string(root["corecomponentstate"]["2438e3c8-015a-1000-79ca-83af40ec1993"]["file.0.last_read_time"].GetString()) != last_read_time_1_; + }; + component_cleared_successfully_ = clearedStateFound(); + break; + } + default: + throw std::runtime_error("Invalid flow state state when handling acknowledge message!"); + } + } + + private: + enum class FlowState { + STARTED, + FIRST_DESCRIBE_SENT, + CLEAR_SENT, + SECOND_DESCRIBE_SENT + }; + + std::atomic<FlowState> flow_state_{FlowState::STARTED}; + std::atomic_bool& component_cleared_successfully_; + std::string last_read_time_1_; + std::string last_read_time_2_; +}; + +int main(int argc, char **argv) { + std::atomic_bool component_cleared_successfully{false}; + const cmd_args args = parse_cmdline_args(argc, argv, "api/heartbeat"); + VerifyC2ClearCoreComponentState harness(component_cleared_successfully); + harness.setKeyDir(args.key_dir); + ClearCoreComponentStateHandler handler(component_cleared_successfully); + harness.setUrl(args.url, &handler); + harness.run(args.test_file); + return 0; +} diff --git a/extensions/http-curl/tests/CMakeLists.txt b/extensions/http-curl/tests/CMakeLists.txt index e5410b5..3fed4bd 100644 --- a/extensions/http-curl/tests/CMakeLists.txt +++ b/extensions/http-curl/tests/CMakeLists.txt @@ -98,3 +98,4 @@ add_test(NAME C2PauseResumeTest COMMAND C2PauseResumeTest "${TEST_RESOURCES}/C2P add_test(NAME C2LogHeartbeatTest COMMAND C2LogHeartbeatTest) add_test(NAME C2DebugBundleTest COMMAND C2DebugBundleTest) add_test(NAME C2PropertiesUpdateTests COMMAND C2PropertiesUpdateTests) +add_test(NAME C2ClearCoreComponentStateTest COMMAND C2ClearCoreComponentStateTest "${TEST_RESOURCES}/TestC2DescribeCoreComponentState.yml" "${TEST_RESOURCES}/") diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h index c769bd3..b198677 100644 --- a/extensions/http-curl/tests/HTTPHandlers.h +++ b/extensions/http-curl/tests/HTTPHandlers.h @@ -26,6 +26,7 @@ #include <utility> #include <vector> #include <set> +#include <unordered_map> #include "civetweb.h" #include "CivetServer.h" @@ -393,11 +394,26 @@ class HeartbeatHandler : public ServerAwareHandler { mg_printf(conn, "%s", resp.c_str()); } - void sendHeartbeatResponse(const std::string& operation, const std::string& operand, const std::string& operationId, struct mg_connection * conn) { + void sendHeartbeatResponse(const std::string& operation, const std::string& operand, const std::string& operationId, struct mg_connection * conn, + const std::unordered_map<std::string, std::string>& args = {}) { + std::string resp_args; + if (!args.empty()) { + resp_args = ", \"args\": {"; + auto it = args.begin(); + while (it != args.end()) { + resp_args += "\"" + it->first + "\": \"" + it->second + "\""; + ++it; + if (it != args.end()) { + resp_args += ", "; + } + } + resp_args += "}"; + } std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {" "\"operation\" : \"" + operation + "\"," "\"operationid\" : \"" + operationId + "\"," - "\"operand\": \"" + operand + "\"}]}"; + "\"operand\": \"" + operand + "\"" + + resp_args + "}]}"; mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h index a62adc1..a25c280 100644 --- a/libminifi/include/c2/C2Agent.h +++ b/libminifi/include/c2/C2Agent.h @@ -137,6 +137,8 @@ class C2Agent : public state::UpdateController { */ virtual void handle_c2_server_response(const C2ContentResponse &resp); + void handle_clear(const C2ContentResponse &resp); + /** * Handles an update request * @param C2ContentResponse response diff --git a/libminifi/include/utils/TestUtils.h b/libminifi/include/utils/TestUtils.h index 5f6e3d4..4a82a35 100644 --- a/libminifi/include/utils/TestUtils.h +++ b/libminifi/include/utils/TestUtils.h @@ -46,7 +46,7 @@ std::string putFileToDir(const std::string& dir_path, const std::string& file_na std::string getFileContent(const std::string& file_name) { std::ifstream file_handle(file_name, std::ios::binary | std::ios::in); - REQUIRE(file_handle.is_open()); + assert(file_handle.is_open()); std::string file_content{ (std::istreambuf_iterator<char>(file_handle)), (std::istreambuf_iterator<char>()) }; return file_content; } diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp index 50ed9b6..e7d7d3c 100644 --- a/libminifi/src/c2/C2Agent.cpp +++ b/libminifi/src/c2/C2Agent.cpp @@ -334,44 +334,7 @@ struct C2DebugBundleError : public C2TransferError { void C2Agent::handle_c2_server_response(const C2ContentResponse &resp) { switch (resp.op.value()) { case Operation::CLEAR: - // we've been told to clear something - if (resp.name == "connection") { - for (const auto& connection : resp.operation_arguments) { - logger_->log_debug("Clearing connection %s", connection.second.to_string()); - update_sink_->clearConnection(connection.second.to_string()); - } - C2Payload response(Operation::ACKNOWLEDGE, resp.ident, true); - enqueue_c2_response(std::move(response)); - } else if (resp.name == "repositories") { - update_sink_->drainRepositories(); - C2Payload response(Operation::ACKNOWLEDGE, resp.ident, true); - enqueue_c2_response(std::move(response)); - } else if (resp.name == "corecomponentstate") { - // TODO(bakaid): untested - std::vector<std::shared_ptr<state::StateController>> components = update_sink_->getComponents(resp.name); - auto state_manager_provider = core::ProcessContext::getStateManagerProvider(logger_, controller_, configuration_); - if (state_manager_provider != nullptr) { - for (auto &component : components) { - logger_->log_debug("Clearing state for component %s", component->getComponentName()); - auto state_manager = state_manager_provider->getCoreComponentStateManager(component->getComponentUUID()); - if (state_manager != nullptr) { - component->stop(); - state_manager->clear(); - state_manager->persist(); - component->start(); - } else { - logger_->log_warn("Failed to get StateManager for component %s", component->getComponentUUID().to_string()); - } - } - } else { - logger_->log_error("Failed to get StateManagerProvider"); - } - C2Payload response(Operation::ACKNOWLEDGE, resp.ident, true); - enqueue_c2_response(std::move(response)); - } else { - logger_->log_debug("Clearing unknown %s", resp.name); - } - + handle_clear(resp); break; case Operation::UPDATE: handle_update(resp); @@ -556,6 +519,43 @@ void C2Agent::handle_describe(const C2ContentResponse &resp) { enqueue_c2_response(std::move(response)); } +void C2Agent::handle_clear(const C2ContentResponse &resp) { + if (resp.name == "connection") { + for (const auto& connection : resp.operation_arguments) { + logger_->log_debug("Clearing connection %s", connection.second.to_string()); + update_sink_->clearConnection(connection.second.to_string()); + } + } else if (resp.name == "repositories") { + update_sink_->drainRepositories(); + } else if (resp.name == "corecomponentstate") { + for (const auto& corecomponent : resp.operation_arguments) { + std::vector<std::shared_ptr<state::StateController>> components = update_sink_->getComponents(corecomponent.second.to_string()); + auto state_manager_provider = core::ProcessContext::getStateManagerProvider(logger_, controller_, configuration_); + if (state_manager_provider != nullptr) { + for (auto &component : components) { + logger_->log_debug("Clearing state for component %s", component->getComponentName()); + auto state_manager = state_manager_provider->getCoreComponentStateManager(component->getComponentUUID()); + if (state_manager != nullptr) { + component->stop(); + state_manager->clear(); + state_manager->persist(); + component->start(); + } else { + logger_->log_warn("Failed to get StateManager for component %s", component->getComponentUUID().to_string()); + } + } + } else { + logger_->log_error("Failed to get StateManagerProvider"); + } + } + } else { + logger_->log_error("Unknown clear operand %s", resp.name); + } + + C2Payload response(Operation::ACKNOWLEDGE, resp.ident, true); + enqueue_c2_response(std::move(response)); +} + void C2Agent::handle_update(const C2ContentResponse &resp) { // we've been told to update something if (resp.name == "configuration") {
