msharee9 commented on a change in pull request #743: Minificpp 1169 - Simplify 
C2 metrics collection and reporting
URL: https://github.com/apache/nifi-minifi-cpp/pull/743#discussion_r397588367
 
 

 ##########
 File path: extensions/http-curl/tests/C2JstackTest.cpp
 ##########
 @@ -16,152 +16,63 @@
  * limitations under the License.
  */
 
-#include <sys/stat.h>
 #undef NDEBUG
-#include <cassert>
-#include <utility>
-#include <chrono>
-#include <fstream>
-#include <memory>
 #include <string>
-#include <thread>
-#include <type_traits>
-#include <vector>
-#include <iostream>
-#include <sstream>
-#include "HTTPClient.h"
-#include "InvokeHTTP.h"
 #include "TestBase.h"
-#include "utils/StringUtils.h"
-#include "core/Core.h"
-#include "core/logging/Logger.h"
-#include "core/ProcessGroup.h"
-#include "core/yaml/YamlConfiguration.h"
-#include "FlowController.h"
-#include "properties/Configure.h"
-#include "unit/ProvenanceTestHelper.h"
-#include "io/StreamFactory.h"
-#include "c2/C2Agent.h"
-#include "CivetServer.h"
-#include <cstring>
-#include "protocols/RESTSender.h"
+#include "HTTPIntegrationBase.h"
+#include "HTTPHandlers.h"
 
-void waitToVerifyProcessor() {
-  std::this_thread::sleep_for(std::chrono::seconds(10));
-}
-
-
-class ConfigHandler : public CivetHandler {
+class VerifyC2DescribeJstack : public VerifyC2Describe {
  public:
-  ConfigHandler() {
-    calls_ = 0;
-  }
-  bool handlePost(CivetServer *server, struct mg_connection *conn) {
-    calls_++;
-    std::string heartbeat_response = "{\"operation\" : 
\"heartbeat\",\"requested_operations\": [  {"
-          "\"operation\" : \"describe\", "
-          "\"operationid\" : \"8675309\", "
-          "\"name\": \"jstack\""
-          "}]}";
-      mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
-                "text/plain\r\nContent-Length: %lu\r\nConnection: 
close\r\n\r\n",
-                heartbeat_response.length());
-      mg_printf(conn, "%s", heartbeat_response.c_str());
-
-
-    return true;
+  explicit VerifyC2DescribeJstack(bool isSecure)
+      : VerifyC2Describe(isSecure) {
   }
 
-  bool handleGet(CivetServer *server, struct mg_connection *conn) {
-    std::ifstream myfile(test_file_location_.c_str());
-
-    if (myfile.is_open()) {
-      std::stringstream buffer;
-      buffer << myfile.rdbuf();
-      std::string str = buffer.str();
-      myfile.close();
-      mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
-                "text/plain\r\nContent-Length: %lu\r\nConnection: 
close\r\n\r\n",
-                str.length());
-      mg_printf(conn, "%s", str.c_str());
-    } else {
-      mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n");
-    }
-
-    return true;
+  virtual void runAssertions() {
+    assert(LogTestController::getInstance().contains("SchedulingAgent") == 
true);
   }
-  std::string test_file_location_;
-  std::atomic<size_t> calls_;
 };
 
-int main(int argc, char **argv) {
-  mg_init_library(0);
-  LogTestController::getInstance().setInfo<minifi::FlowController>();
-  LogTestController::getInstance().setDebug<minifi::utils::HTTPClient>();
-  LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
-  LogTestController::getInstance().setTrace<minifi::c2::C2Agent>();
+class DescribeJstackHandler : public HeartbeatHandler {
+ public:
+  explicit DescribeJstackHandler(bool isSecure)
+     : HeartbeatHandler(isSecure) {
+  }
 
-  const char *options[] = { "document_root", ".", "listening_ports", "0", 0 };
-  std::vector<std::string> cpp_options;
-  for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
-    cpp_options.push_back(options[i]);
+  virtual void handleHeartbeat(const rapidjson::Document& root, struct 
mg_connection * conn) {
+    sendHeartbeatResponse("DESCRIBE", "jstack", "889398", conn);
   }
 
-  CivetServer server(cpp_options);
+  virtual void handleAcknowledge(const rapidjson::Document& root) {
+    assert(root.HasMember("Flowcontroller threadpool #0") == true);
+  }
 
-  std::string port_str = std::to_string(server.getListeningPorts()[0]);
+};
 
-  ConfigHandler h_ex;
-  server.addHandler("/update", h_ex);
-  std::string key_dir, test_file_location;
+int main(int argc, char **argv) {
+  std::string key_dir, test_file_location, url;
+  url = "http://localhost:0/api/heartbeat";;
   if (argc > 1) {
-    h_ex.test_file_location_ = test_file_location = argv[1];
-    key_dir = argv[2];
+    test_file_location = argv[1];
+    if (argc > 2) {
+      url = "https://localhost:0/api/heartbeat";;
+      key_dir = argv[2];
+    }
   }
 
+  bool isSecure = false;
+  if (url.find("https") != std::string::npos) {
+    isSecure = true;
+  }
 
-  std::shared_ptr<minifi::Configure> configuration = 
std::make_shared<minifi::Configure>();
-
-  std::string c2_rest_url = "http://localhost:"; + port_str + "/update";
-
-  configuration->set("c2.rest.url", c2_rest_url);
-  configuration->set("c2.agent.heartbeat.period", "1000");
-
-  std::shared_ptr<core::Repository> test_repo = 
std::make_shared<TestRepository>();
-  std::shared_ptr<core::Repository> test_flow_repo = 
std::make_shared<TestFlowRepository>();
-
-  configuration->set(minifi::Configure::nifi_flow_configuration_file, 
test_file_location);
-
-  std::shared_ptr<minifi::io::StreamFactory> stream_factory = 
minifi::io::StreamFactory::getInstance(configuration);
-  std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
-  std::unique_ptr<core::FlowConfiguration> yaml_ptr = 
std::unique_ptr<core::YamlConfiguration>(
-      new core::YamlConfiguration(test_repo, test_repo, content_repo, 
stream_factory, configuration, test_file_location));
-  std::shared_ptr<TestRepository> repo = 
std::static_pointer_cast<TestRepository>(test_repo);
-
-  std::shared_ptr<minifi::FlowController> controller = 
std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, 
configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME,
-  true);
-
-  core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, 
stream_factory, configuration, test_file_location);
+  VerifyC2DescribeJstack harness(isSecure);
 
-  std::unique_ptr<core::ProcessGroup> ptr = 
yaml_config.getRoot(test_file_location);
-  std::shared_ptr<core::ProcessGroup> pg = 
std::shared_ptr<core::ProcessGroup>(ptr.get());
-  ptr.release();
-  auto start = std::chrono::system_clock::now();
+  harness.setKeyDir(key_dir);
 
-  controller->load();
-  controller->start();
-  waitToVerifyProcessor();
+  DescribeJstackHandler responder(isSecure);
 
-  controller->waitUnload(60000);
-  auto then = std::chrono::system_clock::now();
+  harness.setUrl(url, &responder);
 
-  auto milliseconds = 
std::chrono::duration_cast<std::chrono::milliseconds>(then - start).count();
-  std::string logs = LogTestController::getInstance().log_output.str();
-  #ifndef WIN32
-  assert(logs.find("SchedulingAgent") != std::string::npos);
-  #endif
-  LogTestController::getInstance().reset();
-  assert(h_ex.calls_ <= (milliseconds / 1000) + 1);
+  harness.run(test_file_location);
 
-  return 0;
 }
 
 Review comment:
   @bakaid Your analysis of the crash is spot on. I could also reproduce the 
C2JstackTest crash on Linux and MacOSX.
   The reason behind the crash stems from the memory leak due to shared_ptr 
cycles. When the test completes the FlowController is never destroyed and that 
means the thread pool is not shutdown. Therefore, the C2Agent thread or any 
other thread that accesses the static memory that was released by the main 
thread causes a segfault.
   
   Solutions: There are different approaches to solve this problem.
   1. Do not share the threadpool with C2Agent. Run the C2Agent operations in a 
separate thread and when we stop FlowController we can optionally stop the 
C2Agent as well.
   
   2. Optionally stop the threadpool in FlowController::stop method. We accept 
boolean parameter to determine shutting down the threadpool.
    
   3. Expose a public member function FlowController::stopThreadPool that will 
just shutdown the threadpool.
   
   I personally like option 1, but I would like to create a separate Jira to do 
that as it will be more extensive work that well surpasses the scope of this PR.
   
   Option 2 is also acceptable but there is a caveat while trying to stop 
components other than FlowController. It requires more investigation finding 
its side effects.
   
   Option 1 seems to be a cleaner approach and so effort and time spent for 
option 2 makes little sense.
   In the scope of this PR, I will go with option 3 and here is a follow up 
Jira that will implement Option 1 and get rid of the extra method we have to 
add here.
   
   Follow up: https://issues.apache.org/jira/browse/MINIFICPP-1186
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to