This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit e0257d82291d0e0e92e0733ca22a81f416d64239
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Fri Feb 4 17:20:18 2022 +0100

    MINIFICPP-1750 Fix C2 clear corecomponent state command
    
    Signed-off-by: Ferenc Gerlits <[email protected]>
    
    This closes #1255
---
 .../tests/C2ClearCoreComponentStateTest.cpp        | 152 +++++++++++++++++++++
 extensions/http-curl/tests/CMakeLists.txt          |   1 +
 extensions/http-curl/tests/HTTPHandlers.h          |  20 ++-
 libminifi/include/c2/C2Agent.h                     |   2 +
 libminifi/include/utils/TestUtils.h                |   2 +-
 libminifi/src/c2/C2Agent.cpp                       |  76 +++++------
 6 files changed, 212 insertions(+), 41 deletions(-)

diff --git a/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp b/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp
new file mode 100644
index 0000000..c0f4006
--- /dev/null
+++ b/extensions/http-curl/tests/C2ClearCoreComponentStateTest.cpp
@@ -0,0 +1,152 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+#include <string>
+#include "TestBase.h"
+#include "HTTPIntegrationBase.h"
+#include "HTTPHandlers.h"
+#include "processors/TailFile.h"
+#include "state/ProcessorController.h"
+#include "utils/file/FileUtils.h"
+#include "utils/TestUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+class VerifyC2ClearCoreComponentState : public VerifyC2Base {
+ public:
+  explicit VerifyC2ClearCoreComponentState(const std::atomic_bool& component_cleared_successfully) : component_cleared_successfully_(component_cleared_successfully) {
+    auto temp_dir = testController.createTempDirectory();
+    test_file_1_ = minifi::utils::putFileToDir(temp_dir, "test1.txt", "foo\n");
+    test_file_2_ = minifi::utils::putFileToDir(temp_dir, "test2.txt", "foobar\n");
+  }
+
+  void testSetup() override {
+    LogTestController::getInstance().setTrace<minifi::c2::C2Agent>();
+    LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
+    LogTestController::getInstance().setDebug<minifi::FlowController>();
+    LogTestController::getInstance().setDebug<minifi::core::ProcessContext>();
+    VerifyC2Base::testSetup();
+  }
+
+  void runAssertions() override {
+    using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime;
+    assert(verifyEventHappenedInPollTime(40s, [&] { return component_cleared_successfully_.load(); }));
+  }
+
+ protected:
+  void updateProperties(std::shared_ptr<minifi::FlowController> flow_controller) override {
+    std::dynamic_pointer_cast<minifi::state::ProcessorController>(flow_controller->getComponents("TailFile1")[0])
+        ->getProcessor()->setProperty(minifi::processors::TailFile::FileName, test_file_1_);
+    std::dynamic_pointer_cast<minifi::state::ProcessorController>(flow_controller->getComponents("TailFile2")[0])
+        ->getProcessor()->setProperty(minifi::processors::TailFile::FileName, test_file_2_);
+  }
+
+  TestController testController;
+  std::string test_file_1_;
+  std::string test_file_2_;
+  const std::atomic_bool& component_cleared_successfully_;
+};
+
+class ClearCoreComponentStateHandler: public HeartbeatHandler {
+ public:
+  explicit ClearCoreComponentStateHandler(std::atomic_bool& component_cleared_successfully) : component_cleared_successfully_(component_cleared_successfully) {
+  }
+
+  void handleHeartbeat(const rapidjson::Document&, struct mg_connection * conn) override {
+    switch (flow_state_) {
+      case FlowState::STARTED:
+        sendHeartbeatResponse("DESCRIBE", "corecomponentstate", "889345", conn);
+        flow_state_ = FlowState::FIRST_DESCRIBE_SENT;
+        break;
+      case FlowState::FIRST_DESCRIBE_SENT: {
+        sendHeartbeatResponse("CLEAR", "corecomponentstate", "889346", conn, { {"corecomponent1", "TailFile1"} });
+        flow_state_ = FlowState::CLEAR_SENT;
+        break;
+      }
+      default:
+        sendHeartbeatResponse("DESCRIBE", "corecomponentstate", "889347", conn);
+        flow_state_ = FlowState::SECOND_DESCRIBE_SENT;
+    }
+  }
+
+  void handleAcknowledge(const rapidjson::Document& root) override {
+    switch (flow_state_) {
+      case FlowState::FIRST_DESCRIBE_SENT: {
+        assert(root.HasMember("corecomponentstate"));
+
+        auto assertExpectedTailFileState = [&](const char* uuid, const char* name, const char* position) {
+          assert(root["corecomponentstate"].HasMember(uuid));
+          const auto& tf = root["corecomponentstate"][uuid];
+          assert(tf.HasMember("file.0.name"));
+          assert(std::string(tf["file.0.name"].GetString()) == name);
+          assert(tf.HasMember("file.0.position"));
+          assert(std::string(tf["file.0.position"].GetString()) == position);
+          assert(tf.HasMember("file.0.current"));
+          assert(strlen(tf["file.0.current"].GetString()) > 0U);
+        };
+
+        assertExpectedTailFileState("2438e3c8-015a-1000-79ca-83af40ec1993", "test1.txt", "4");
+        assertExpectedTailFileState("2438e3c8-015a-1000-79ca-83af40ec1994", "test2.txt", "7");
+
+        last_read_time_1_ = std::string(root["corecomponentstate"]["2438e3c8-015a-1000-79ca-83af40ec1993"]["file.0.last_read_time"].GetString());
+        last_read_time_2_ = std::string(root["corecomponentstate"]["2438e3c8-015a-1000-79ca-83af40ec1994"]["file.0.last_read_time"].GetString());
+        break;
+      }
+      case FlowState::CLEAR_SENT:
+        break;
+      case FlowState::SECOND_DESCRIBE_SENT: {
+        auto clearedStateFound = [this, &root]() {
+          return root.HasMember("corecomponentstate") &&
+            root["corecomponentstate"].HasMember("2438e3c8-015a-1000-79ca-83af40ec1993") &&
+            root["corecomponentstate"].HasMember("2438e3c8-015a-1000-79ca-83af40ec1994") &&
+            std::string(root["corecomponentstate"]["2438e3c8-015a-1000-79ca-83af40ec1994"]["file.0.last_read_time"].GetString()) == last_read_time_2_ &&
+            std::string(root["corecomponentstate"]["2438e3c8-015a-1000-79ca-83af40ec1993"]["file.0.last_read_time"].GetString()) != last_read_time_1_;
+        };
+        component_cleared_successfully_ = clearedStateFound();
+        break;
+      }
+      default:
+        throw std::runtime_error("Invalid flow state state when handling acknowledge message!");
+    }
+  }
+
+ private:
+  enum class FlowState {
+    STARTED,
+    FIRST_DESCRIBE_SENT,
+    CLEAR_SENT,
+    SECOND_DESCRIBE_SENT
+  };
+
+  std::atomic<FlowState> flow_state_{FlowState::STARTED};
+  std::atomic_bool& component_cleared_successfully_;
+  std::string last_read_time_1_;
+  std::string last_read_time_2_;
+};
+
+int main(int argc, char **argv) {
+  std::atomic_bool component_cleared_successfully{false};
+  const cmd_args args = parse_cmdline_args(argc, argv, "api/heartbeat");
+  VerifyC2ClearCoreComponentState harness(component_cleared_successfully);
+  harness.setKeyDir(args.key_dir);
+  ClearCoreComponentStateHandler handler(component_cleared_successfully);
+  harness.setUrl(args.url, &handler);
+  harness.run(args.test_file);
+  return 0;
+}
diff --git a/extensions/http-curl/tests/CMakeLists.txt b/extensions/http-curl/tests/CMakeLists.txt
index e5410b5..3fed4bd 100644
--- a/extensions/http-curl/tests/CMakeLists.txt
+++ b/extensions/http-curl/tests/CMakeLists.txt
@@ -98,3 +98,4 @@ add_test(NAME C2PauseResumeTest COMMAND C2PauseResumeTest "${TEST_RESOURCES}/C2P
 add_test(NAME C2LogHeartbeatTest COMMAND C2LogHeartbeatTest)
 add_test(NAME C2DebugBundleTest COMMAND C2DebugBundleTest)
 add_test(NAME C2PropertiesUpdateTests COMMAND C2PropertiesUpdateTests)
+add_test(NAME C2ClearCoreComponentStateTest COMMAND C2ClearCoreComponentStateTest "${TEST_RESOURCES}/TestC2DescribeCoreComponentState.yml"  "${TEST_RESOURCES}/")
diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h
index c769bd3..b198677 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -26,6 +26,7 @@
 #include <utility>
 #include <vector>
 #include <set>
+#include <unordered_map>
 
 #include "civetweb.h"
 #include "CivetServer.h"
@@ -393,11 +394,26 @@ class HeartbeatHandler : public ServerAwareHandler {
     mg_printf(conn, "%s", resp.c_str());
   }
 
-  void sendHeartbeatResponse(const std::string& operation, const std::string& operand, const std::string& operationId, struct mg_connection * conn) {
+  void sendHeartbeatResponse(const std::string& operation, const std::string& operand, const std::string& operationId, struct mg_connection * conn,
+      const std::unordered_map<std::string, std::string>& args = {}) {
+    std::string resp_args;
+    if (!args.empty()) {
+      resp_args = ", \"args\": {";
+      auto it = args.begin();
+      while (it != args.end()) {
+        resp_args += "\"" + it->first + "\": \"" + it->second + "\"";
+        ++it;
+        if (it != args.end()) {
+          resp_args += ", ";
+        }
+      }
+      resp_args += "}";
+    }
     std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [  {"
           "\"operation\" : \"" + operation + "\","
           "\"operationid\" : \"" + operationId + "\","
-          "\"operand\": \"" + operand + "\"}]}";
+          "\"operand\": \"" + operand + "\"" +
+          resp_args + "}]}";
 
       mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
                 "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h
index a62adc1..a25c280 100644
--- a/libminifi/include/c2/C2Agent.h
+++ b/libminifi/include/c2/C2Agent.h
@@ -137,6 +137,8 @@ class C2Agent : public state::UpdateController {
    */
   virtual void handle_c2_server_response(const C2ContentResponse &resp);
 
+  void handle_clear(const C2ContentResponse &resp);
+
   /**
    * Handles an update request
    * @param C2ContentResponse response
diff --git a/libminifi/include/utils/TestUtils.h b/libminifi/include/utils/TestUtils.h
index 5f6e3d4..4a82a35 100644
--- a/libminifi/include/utils/TestUtils.h
+++ b/libminifi/include/utils/TestUtils.h
@@ -46,7 +46,7 @@ std::string putFileToDir(const std::string& dir_path, const std::string& file_na
 
 std::string getFileContent(const std::string& file_name) {
   std::ifstream file_handle(file_name, std::ios::binary | std::ios::in);
-  REQUIRE(file_handle.is_open());
+  assert(file_handle.is_open());
   std::string file_content{ (std::istreambuf_iterator<char>(file_handle)), (std::istreambuf_iterator<char>()) };
   return file_content;
 }
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index 50ed9b6..e7d7d3c 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -334,44 +334,7 @@ struct C2DebugBundleError : public C2TransferError {
 void C2Agent::handle_c2_server_response(const C2ContentResponse &resp) {
   switch (resp.op.value()) {
     case Operation::CLEAR:
-      // we've been told to clear something
-      if (resp.name == "connection") {
-        for (const auto& connection : resp.operation_arguments) {
-          logger_->log_debug("Clearing connection %s", connection.second.to_string());
-          update_sink_->clearConnection(connection.second.to_string());
-        }
-        C2Payload response(Operation::ACKNOWLEDGE, resp.ident, true);
-        enqueue_c2_response(std::move(response));
-      } else if (resp.name == "repositories") {
-        update_sink_->drainRepositories();
-        C2Payload response(Operation::ACKNOWLEDGE, resp.ident, true);
-        enqueue_c2_response(std::move(response));
-      } else if (resp.name == "corecomponentstate") {
-        // TODO(bakaid): untested
-        std::vector<std::shared_ptr<state::StateController>> components = update_sink_->getComponents(resp.name);
-        auto state_manager_provider = core::ProcessContext::getStateManagerProvider(logger_, controller_, configuration_);
-        if (state_manager_provider != nullptr) {
-          for (auto &component : components) {
-            logger_->log_debug("Clearing state for component %s", component->getComponentName());
-            auto state_manager = state_manager_provider->getCoreComponentStateManager(component->getComponentUUID());
-            if (state_manager != nullptr) {
-              component->stop();
-              state_manager->clear();
-              state_manager->persist();
-              component->start();
-            } else {
-              logger_->log_warn("Failed to get StateManager for component %s", component->getComponentUUID().to_string());
-            }
-          }
-        } else {
-          logger_->log_error("Failed to get StateManagerProvider");
-        }
-        C2Payload response(Operation::ACKNOWLEDGE, resp.ident, true);
-        enqueue_c2_response(std::move(response));
-      } else {
-        logger_->log_debug("Clearing unknown %s", resp.name);
-      }
-
+      handle_clear(resp);
       break;
     case Operation::UPDATE:
       handle_update(resp);
@@ -556,6 +519,43 @@ void C2Agent::handle_describe(const C2ContentResponse &resp) {
   enqueue_c2_response(std::move(response));
 }
 
+void C2Agent::handle_clear(const C2ContentResponse &resp) {
+  if (resp.name == "connection") {
+    for (const auto& connection : resp.operation_arguments) {
+      logger_->log_debug("Clearing connection %s", connection.second.to_string());
+      update_sink_->clearConnection(connection.second.to_string());
+    }
+  } else if (resp.name == "repositories") {
+    update_sink_->drainRepositories();
+  } else if (resp.name == "corecomponentstate") {
+    for (const auto& corecomponent : resp.operation_arguments) {
+      std::vector<std::shared_ptr<state::StateController>> components = update_sink_->getComponents(corecomponent.second.to_string());
+      auto state_manager_provider = core::ProcessContext::getStateManagerProvider(logger_, controller_, configuration_);
+      if (state_manager_provider != nullptr) {
+        for (auto &component : components) {
+          logger_->log_debug("Clearing state for component %s", component->getComponentName());
+          auto state_manager = state_manager_provider->getCoreComponentStateManager(component->getComponentUUID());
+          if (state_manager != nullptr) {
+            component->stop();
+            state_manager->clear();
+            state_manager->persist();
+            component->start();
+          } else {
+            logger_->log_warn("Failed to get StateManager for component %s", component->getComponentUUID().to_string());
+          }
+        }
+      } else {
+        logger_->log_error("Failed to get StateManagerProvider");
+      }
+    }
+  } else {
+    logger_->log_error("Unknown clear operand %s", resp.name);
+  }
+
+  C2Payload response(Operation::ACKNOWLEDGE, resp.ident, true);
+  enqueue_c2_response(std::move(response));
+}
+
 void C2Agent::handle_update(const C2ContentResponse &resp) {
   // we've been told to update something
   if (resp.name == "configuration") {

Reply via email to