This is an automated email from the ASF dual-hosted git repository. adebreceni pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit bc46482985b5202479f83f47dc98e274cf434b6d Author: Gabor Gyimesi <[email protected]> AuthorDate: Fri Apr 1 12:51:22 2022 +0200 MINIFICPP-1779 Verify multiple C2 commands in HB response Signed-off-by: Adam Debreceni <[email protected]> This closes #1285 --- .../http-curl/tests/C2MultipleCommandsTest.cpp | 130 +++++++++++++++++++++ extensions/http-curl/tests/CMakeLists.txt | 1 + extensions/http-curl/tests/HTTPHandlers.h | 63 ++++++---- 3 files changed, 174 insertions(+), 20 deletions(-) diff --git a/extensions/http-curl/tests/C2MultipleCommandsTest.cpp b/extensions/http-curl/tests/C2MultipleCommandsTest.cpp new file mode 100644 index 0000000..06697c3 --- /dev/null +++ b/extensions/http-curl/tests/C2MultipleCommandsTest.cpp @@ -0,0 +1,130 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#undef NDEBUG +#include <string> +#include <vector> +#include <functional> + +#include "TestBase.h" +#include "Catch.h" +#include "HTTPIntegrationBase.h" +#include "HTTPHandlers.h" + +class AckAuditor { + public: + void addAck(const std::string& ack) { + std::lock_guard<std::mutex> guard(acknowledged_operations_mutex_); + acknowledged_operations_.insert(ack); + } + + bool isAcknowledged(const std::string& operation_id) const { + std::lock_guard<std::mutex> guard(acknowledged_operations_mutex_); + return acknowledged_operations_.count(operation_id) > 0; + } + + void addVerifier(std::function<void(const rapidjson::Document&)> verifier) { + std::lock_guard<std::mutex> guard(verify_ack_mutex_); + ack_verifiers_.push_back(std::move(verifier)); + } + + void verifyAck(const rapidjson::Document& root) { + std::lock_guard<std::mutex> guard(verify_ack_mutex_); + if (ack_verifiers_.empty()) { + assert(false); + } + + ack_verifiers_[next_verifier_index_](root); + ++next_verifier_index_; + if (next_verifier_index_ >= ack_verifiers_.size()) { + next_verifier_index_ = 0; + } + } + + private: + mutable std::mutex acknowledged_operations_mutex_; + mutable std::mutex verify_ack_mutex_; + std::unordered_set<std::string> acknowledged_operations_; + std::vector<std::function<void(const rapidjson::Document&)>> ack_verifiers_; + uint32_t next_verifier_index_ = 0; +}; + +class MultipleC2CommandHandler: public HeartbeatHandler { + public: + explicit MultipleC2CommandHandler(AckAuditor& ack_auditor, std::shared_ptr<minifi::Configure> configuration) + : HeartbeatHandler(std::move(configuration)), + ack_auditor_(ack_auditor) { + } + + void handleHeartbeat(const rapidjson::Document&, struct mg_connection * conn) override { + std::vector<C2Operation> operations{{"DESCRIBE", "manifest", "889345", {}}, {"DESCRIBE", "corecomponentstate", "889346", {}}}; + ack_auditor_.addVerifier([this](const rapidjson::Document& root) { + verifyJsonHasAgentManifest(root); + }); + ack_auditor_.addVerifier([](const rapidjson::Document& root) { + assert(root.HasMember("corecomponentstate")); + }); + sendHeartbeatResponse(operations, conn); + } + + void handleAcknowledge(const rapidjson::Document& root) override { + ack_auditor_.verifyAck(root); + if (root.IsObject() && root.HasMember("operationId")) { + ack_auditor_.addAck(root["operationId"].GetString()); + } + } + + private: + AckAuditor& ack_auditor_; +}; + +class VerifyC2MultipleCommands : public VerifyC2Base { + public: + explicit VerifyC2MultipleCommands(AckAuditor& auditor) + : ack_auditor_(auditor) { + } + + void testSetup() override { + LogTestController::getInstance().setTrace<minifi::c2::C2Agent>(); + LogTestController::getInstance().setDebug<minifi::c2::RESTSender>(); + LogTestController::getInstance().setInfo<minifi::FlowController>(); + VerifyC2Base::testSetup(); + } + + void configureFullHeartbeat() override { + configuration->set(org::apache::nifi::minifi::Configuration::nifi_c2_full_heartbeat, "false"); + } + + void runAssertions() override { + assert(utils::verifyEventHappenedInPollTime(3s, [&] {return ack_auditor_.isAcknowledged("889345");})); + assert(utils::verifyEventHappenedInPollTime(3s, [&] {return ack_auditor_.isAcknowledged("889346");})); + } + + private: + AckAuditor& ack_auditor_; +}; + +int main(int argc, char **argv) { + const cmd_args args = parse_cmdline_args(argc, argv, "heartbeat"); + AckAuditor ack_auditor; + VerifyC2MultipleCommands harness(ack_auditor); + harness.setKeyDir(args.key_dir); + MultipleC2CommandHandler responder(ack_auditor, harness.getConfiguration()); + harness.setUrl(args.url, &responder); + harness.run(args.test_file); +} diff --git a/extensions/http-curl/tests/CMakeLists.txt b/extensions/http-curl/tests/CMakeLists.txt index 0077008..975b6e9 100644 --- a/extensions/http-curl/tests/CMakeLists.txt +++ b/extensions/http-curl/tests/CMakeLists.txt @@ -101,3 +101,4 @@ add_test(NAME C2LogHeartbeatTest COMMAND C2LogHeartbeatTest) add_test(NAME C2DebugBundleTest COMMAND C2DebugBundleTest) add_test(NAME C2PropertiesUpdateTests COMMAND C2PropertiesUpdateTests) add_test(NAME C2ClearCoreComponentStateTest COMMAND C2ClearCoreComponentStateTest "${TEST_RESOURCES}/TestC2DescribeCoreComponentState.yml" "${TEST_RESOURCES}/") +add_test(NAME C2MultipleCommandsTest COMMAND C2MultipleCommandsTest "${TEST_RESOURCES}/TestC2DescribeCoreComponentState.yml" "${TEST_RESOURCES}/") diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h index c5b6c2e..a0acc6b 100644 --- a/extensions/http-curl/tests/HTTPHandlers.h +++ b/extensions/http-curl/tests/HTTPHandlers.h @@ -404,31 +404,54 @@ class HeartbeatHandler : public ServerAwareHandler { } protected: - void sendHeartbeatResponse(const std::string& operation, const std::string& operand, const std::string& operationId, struct mg_connection * conn, + struct C2Operation { + std::string operation; + std::string operand; + std::string operation_id; + std::unordered_map<std::string, std::string> args; + }; + + void sendHeartbeatResponse(const std::string& operation, const std::string& operand, const std::string& operation_id, struct mg_connection* conn, const std::unordered_map<std::string, std::string>& args = {}) { - std::string resp_args; - if (!args.empty()) { - resp_args = ", \"args\": {"; - auto it = args.begin(); - while (it != args.end()) { - resp_args += "\"" + it->first + "\": \"" + it->second + "\""; - ++it; - if (it != args.end()) { - resp_args += ", "; + sendHeartbeatResponse({{operation, operand, operation_id, args}}, conn); + } + + void sendHeartbeatResponse(const std::vector<C2Operation>& operations, struct mg_connection * conn) { + std::string operation_jsons; + for (const auto& c2_operation : operations) { + std::string resp_args; + if (!c2_operation.args.empty()) { + resp_args = ", \"args\": {"; + auto it = c2_operation.args.begin(); + while (it != c2_operation.args.end()) { + resp_args += "\"" + it->first + "\": \"" + it->second + "\""; + ++it; + if (it != c2_operation.args.end()) { + resp_args += ", "; + } } + resp_args += "}"; + } + + std::string operation_json = "{" + "\"operation\" : \"" + c2_operation.operation + "\"," + "\"operationid\" : \"" + c2_operation.operation_id + "\"," + "\"operand\": \"" + c2_operation.operand + "\"" + + resp_args + "}"; + + if (operation_jsons.empty()) { + operation_jsons += operation_json; + } else { + operation_jsons += ", " + operation_json; } - resp_args += "}"; } - std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {" - "\"operation\" : \"" + operation + "\"," - "\"operationid\" : \"" + operationId + "\"," - "\"operand\": \"" + operand + "\"" + - resp_args + "}]}"; - mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " - "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", - heartbeat_response.length()); - mg_printf(conn, "%s", heartbeat_response.c_str()); + std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ " + operation_jsons + " ]}"; + + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + heartbeat_response.length()); + mg_printf(conn, "%s", heartbeat_response.c_str()); } void verifyJsonHasAgentManifest(const rapidjson::Document& root, const std::vector<std::string>& verify_components = {}, const std::vector<std::string>& disallowed_properties = {}) {
