bakaid commented on a change in pull request #743: Minificpp 1169 - Simplify C2 
metrics collection and reporting
URL: https://github.com/apache/nifi-minifi-cpp/pull/743#discussion_r395845086
 
 

 ##########
 File path: extensions/http-curl/tests/C2JstackTest.cpp
 ##########
 @@ -16,152 +16,63 @@
  * limitations under the License.
  */
 
-#include <sys/stat.h>
 #undef NDEBUG
-#include <cassert>
-#include <utility>
-#include <chrono>
-#include <fstream>
-#include <memory>
 #include <string>
-#include <thread>
-#include <type_traits>
-#include <vector>
-#include <iostream>
-#include <sstream>
-#include "HTTPClient.h"
-#include "InvokeHTTP.h"
 #include "TestBase.h"
-#include "utils/StringUtils.h"
-#include "core/Core.h"
-#include "core/logging/Logger.h"
-#include "core/ProcessGroup.h"
-#include "core/yaml/YamlConfiguration.h"
-#include "FlowController.h"
-#include "properties/Configure.h"
-#include "unit/ProvenanceTestHelper.h"
-#include "io/StreamFactory.h"
-#include "c2/C2Agent.h"
-#include "CivetServer.h"
-#include <cstring>
-#include "protocols/RESTSender.h"
+#include "HTTPIntegrationBase.h"
+#include "HTTPHandlers.h"
 
-void waitToVerifyProcessor() {
-  std::this_thread::sleep_for(std::chrono::seconds(10));
-}
-
-
-class ConfigHandler : public CivetHandler {
+class VerifyC2DescribeJstack : public VerifyC2Describe {
  public:
-  ConfigHandler() {
-    calls_ = 0;
-  }
-  bool handlePost(CivetServer *server, struct mg_connection *conn) {
-    calls_++;
-    std::string heartbeat_response = "{\"operation\" : 
\"heartbeat\",\"requested_operations\": [  {"
-          "\"operation\" : \"describe\", "
-          "\"operationid\" : \"8675309\", "
-          "\"name\": \"jstack\""
-          "}]}";
-      mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
-                "text/plain\r\nContent-Length: %lu\r\nConnection: 
close\r\n\r\n",
-                heartbeat_response.length());
-      mg_printf(conn, "%s", heartbeat_response.c_str());
-
-
-    return true;
+  explicit VerifyC2DescribeJstack(bool isSecure)
+      : VerifyC2Describe(isSecure) {
   }
 
-  bool handleGet(CivetServer *server, struct mg_connection *conn) {
-    std::ifstream myfile(test_file_location_.c_str());
-
-    if (myfile.is_open()) {
-      std::stringstream buffer;
-      buffer << myfile.rdbuf();
-      std::string str = buffer.str();
-      myfile.close();
-      mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
-                "text/plain\r\nContent-Length: %lu\r\nConnection: 
close\r\n\r\n",
-                str.length());
-      mg_printf(conn, "%s", str.c_str());
-    } else {
-      mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n");
-    }
-
-    return true;
+  virtual void runAssertions() {
+    assert(LogTestController::getInstance().contains("SchedulingAgent") == 
true);
   }
-  std::string test_file_location_;
-  std::atomic<size_t> calls_;
 };
 
-int main(int argc, char **argv) {
-  mg_init_library(0);
-  LogTestController::getInstance().setInfo<minifi::FlowController>();
-  LogTestController::getInstance().setDebug<minifi::utils::HTTPClient>();
-  LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
-  LogTestController::getInstance().setTrace<minifi::c2::C2Agent>();
+class DescribeJstackHandler : public HeartbeatHandler {
+ public:
+  explicit DescribeJstackHandler(bool isSecure)
+     : HeartbeatHandler(isSecure) {
+  }
 
-  const char *options[] = { "document_root", ".", "listening_ports", "0", 0 };
-  std::vector<std::string> cpp_options;
-  for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
-    cpp_options.push_back(options[i]);
+  virtual void handleHeartbeat(const rapidjson::Document& root, struct 
mg_connection * conn) {
+    sendHeartbeatResponse("DESCRIBE", "jstack", "889398", conn);
   }
 
-  CivetServer server(cpp_options);
+  virtual void handleAcknowledge(const rapidjson::Document& root) {
+    assert(root.HasMember("Flowcontroller threadpool #0") == true);
+  }
 
-  std::string port_str = std::to_string(server.getListeningPorts()[0]);
+};
 
-  ConfigHandler h_ex;
-  server.addHandler("/update", h_ex);
-  std::string key_dir, test_file_location;
+int main(int argc, char **argv) {
+  std::string key_dir, test_file_location, url;
+  url = "http://localhost:0/api/heartbeat";;
   if (argc > 1) {
-    h_ex.test_file_location_ = test_file_location = argv[1];
-    key_dir = argv[2];
+    test_file_location = argv[1];
+    if (argc > 2) {
+      url = "https://localhost:0/api/heartbeat";;
+      key_dir = argv[2];
+    }
   }
 
+  bool isSecure = false;
+  if (url.find("https") != std::string::npos) {
+    isSecure = true;
+  }
 
-  std::shared_ptr<minifi::Configure> configuration = 
std::make_shared<minifi::Configure>();
-
-  std::string c2_rest_url = "http://localhost:"; + port_str + "/update";
-
-  configuration->set("c2.rest.url", c2_rest_url);
-  configuration->set("c2.agent.heartbeat.period", "1000");
-
-  std::shared_ptr<core::Repository> test_repo = 
std::make_shared<TestRepository>();
-  std::shared_ptr<core::Repository> test_flow_repo = 
std::make_shared<TestFlowRepository>();
-
-  configuration->set(minifi::Configure::nifi_flow_configuration_file, 
test_file_location);
-
-  std::shared_ptr<minifi::io::StreamFactory> stream_factory = 
minifi::io::StreamFactory::getInstance(configuration);
-  std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
-  std::unique_ptr<core::FlowConfiguration> yaml_ptr = 
std::unique_ptr<core::YamlConfiguration>(
-      new core::YamlConfiguration(test_repo, test_repo, content_repo, 
stream_factory, configuration, test_file_location));
-  std::shared_ptr<TestRepository> repo = 
std::static_pointer_cast<TestRepository>(test_repo);
-
-  std::shared_ptr<minifi::FlowController> controller = 
std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, 
configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME,
-  true);
-
-  core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, 
stream_factory, configuration, test_file_location);
+  VerifyC2DescribeJstack harness(isSecure);
 
-  std::unique_ptr<core::ProcessGroup> ptr = 
yaml_config.getRoot(test_file_location);
-  std::shared_ptr<core::ProcessGroup> pg = 
std::shared_ptr<core::ProcessGroup>(ptr.get());
-  ptr.release();
-  auto start = std::chrono::system_clock::now();
+  harness.setKeyDir(key_dir);
 
-  controller->load();
-  controller->start();
-  waitToVerifyProcessor();
+  DescribeJstackHandler responder(isSecure);
 
-  controller->waitUnload(60000);
-  auto then = std::chrono::system_clock::now();
+  harness.setUrl(url, &responder);
 
-  auto milliseconds = 
std::chrono::duration_cast<std::chrono::milliseconds>(then - start).count();
-  std::string logs = LogTestController::getInstance().log_output.str();
-  #ifndef WIN32
-  assert(logs.find("SchedulingAgent") != std::string::npos);
-  #endif
-  LogTestController::getInstance().reset();
-  assert(h_ex.calls_ <= (milliseconds / 1000) + 1);
+  harness.run(test_file_location);
 
-  return 0;
 }
 
 Review comment:
   @msharee9 you are right, the proper calls to the IdGenerator were added in 
this PR. Even with the previous version, though, `CoreComponent::id_generator_` 
was statically initialized (see the original `Core.cpp`), so an issue with it 
not yet being initialized could only have occurred if some other static 
initialization tried to use it. Even then, it should have failed much sooner, 
not at the underlying uuid_impl_ point: the `id_generator_` shared_ptr itself 
would have been nullptr.
   
   Anyway, unfortunately I was able to reproduce the issue on Linux with your 
latest version. The IdGenerator (or the underlying uuid_impl_) is not nullptr; 
rather, a substructure of the uuid object, the md5 calculator, seems to be 
changed/overwritten at some point:
   ```
   Thread 58 "C2JstackTest" received signal SIGSEGV, Segmentation fault.
   [Switching to Thread 0x7ffff4f07700 (LWP 39351)]
   MD5Update (context=0x21, input=0x7ffff4f06040 "\354\004u^", inputLen=32) at 
uuid_md5.c:176
   176      idx = (unsigned int)((context->count[0] >> 3) & 0x3F);
   (gdb) bt
   #0  MD5Update (context=0x21, input=0x7ffff4f06040 "\354\004u^", inputLen=32) 
at uuid_md5.c:176
   #1  0x0000555555952abe in uuid_md5_update (md5=<optimized out>, 
data_ptr=data_ptr@entry=0x7ffff4f06040, data_len=data_len@entry=32) at 
uuid_md5.c:398
   #2  0x00005555559537a3 in uuid_prng_data (prng=0x555555df8c80, 
data_ptr=data_ptr@entry=0x7ffff4f060c8, data_len=data_len@entry=2) at 
uuid_prng.c:164
   #3  0x0000555555951e12 in uuid_make_v1 (ap=<optimized out>, mode=<optimized 
out>, uuid=<optimized out>) at uuid.c:952
   #4  uuid_make (uuid=0x555555df8030, mode=1) at uuid.c:1181
   #5  0x000055555595037a in uuid::make(unsigned int, ...) ()
   #6  0x00005555557dd0f1 in 
org::apache::nifi::minifi::utils::IdGenerator::generateWithUuidImpl 
(this=0x555555df5e70, mode=<optimized out>, output=0x7ffff4f06300 "\006") at 
/home/bakaid/nifi-minifi-cpp/libminifi/src/utils/Id.cpp:286
   #7  0x00005555557dd255 in 
org::apache::nifi::minifi::utils::IdGenerator::generate (this=<optimized out>, 
ident=...) at /home/bakaid/nifi-minifi-cpp/libminifi/src/utils/Id.cpp:329
   #8  0x0000555555770084 in 
org::apache::nifi::minifi::core::CoreComponent::CoreComponent (name=..., 
this=0x7ffff4f06430) at 
/home/bakaid/nifi-minifi-cpp/libminifi/include/core/Core.h:166
   #9  org::apache::nifi::minifi::core::Connectable::Connectable 
(this=0x7ffff4f06430, name=...) at 
/home/bakaid/nifi-minifi-cpp/libminifi/src/core/Connectable.cpp:43
   #10 0x0000555555816de8 in 
org::apache::nifi::minifi::state::response::ResponseNode::ResponseNode 
(name="status", this=0x7ffff4f06430) at 
/home/bakaid/nifi-minifi-cpp/libminifi/include/core/state/nodes/../nodes/MetricsBase.h:48
   #11 
org::apache::nifi::minifi::state::response::DeviceInformation::DeviceInformation
 (name="status", this=0x7ffff4f06430) at 
/home/bakaid/nifi-minifi-cpp/libminifi/include/core/state/nodes/../nodes/MetricsBase.h:97
   #12 
org::apache::nifi::minifi::state::response::StateMonitorNode::StateMonitorNode 
(name="status", this=0x7ffff4f06430) at 
/home/bakaid/nifi-minifi-cpp/libminifi/include/core/state/nodes/../nodes/StateMonitor.h:53
   #13 org::apache::nifi::minifi::state::response::AgentStatus::AgentStatus 
(name="status", this=0x7ffff4f06430) at 
/home/bakaid/nifi-minifi-cpp/libminifi/include/core/state/nodes/AgentInformation.h:418
   #14 org::apache::nifi::minifi::state::response::AgentNode::getAgentStatus 
(this=this@entry=0x555556019640) at 
/home/bakaid/nifi-minifi-cpp/libminifi/include/core/state/nodes/AgentInformation.h:687
   #15 0x000055555581d5bf in 
org::apache::nifi::minifi::state::response::AgentInformation::serialize 
(this=0x555556019640) at 
/home/bakaid/nifi-minifi-cpp/libminifi/include/core/state/nodes/AgentInformation.h:738
   #16 0x000055555573ed5e in 
org::apache::nifi::minifi::c2::C2Agent::performHeartBeat (this=0x5555560199c0) 
at /home/bakaid/nifi-minifi-cpp/libminifi/src/c2/C2Agent.cpp:314
   #17 0x000055555573f14f in 
org::apache::nifi::minifi::c2::C2Agent::<lambda()>::operator()(void) const 
(__closure=0x7ffff4f06d10) at 
/home/bakaid/nifi-minifi-cpp/libminifi/src/c2/C2Agent.cpp:100
   #18 0x000055555573f951 in 
std::function<org::apache::nifi::minifi::utils::TaskRescheduleInfo 
()>::operator()() const (this=0x7ffff4f06d10) at 
/usr/include/c++/7/bits/std_function.h:706
   #19 
org::apache::nifi::minifi::utils::Worker<org::apache::nifi::minifi::utils::TaskRescheduleInfo>::run
 (this=0x7ffff4f06ce0) at 
/home/bakaid/nifi-minifi-cpp/libminifi/include/utils/ThreadPool.h:97
   #20 0x00005555557f2ec9 in 
org::apache::nifi::minifi::utils::ThreadPool<org::apache::nifi::minifi::utils::TaskRescheduleInfo>::run_tasks
 (this=0x555556012aa8, thread=...)
       at /home/bakaid/nifi-minifi-cpp/libminifi/src/utils/ThreadPool.cpp:49
   #21 0x00005555557e3d49 in std::__invoke_impl<void, void 
(org::apache::nifi::minifi::utils::ThreadPool<org::apache::nifi::minifi::utils::TaskRescheduleInfo>::*&)(std::shared_ptr<org::apache::nifi::minifi::utils::WorkerThread>),
 
org::apache::nifi::minifi::utils::ThreadPool<org::apache::nifi::minifi::utils::TaskRescheduleInfo>*&,
 std::shared_ptr<org::apache::nifi::minifi::utils::WorkerThread>&> 
(__t=<optimized out>, __f=<optimized out>)
       at /usr/include/c++/7/bits/invoke.h:73
   #22 std::__invoke<void 
(org::apache::nifi::minifi::utils::ThreadPool<org::apache::nifi::minifi::utils::TaskRescheduleInfo>::*&)(std::shared_ptr<org::apache::nifi::minifi::utils::WorkerThread>),
 
org::apache::nifi::minifi::utils::ThreadPool<org::apache::nifi::minifi::utils::TaskRescheduleInfo>*&,
 std::shared_ptr<org::apache::nifi::minifi::utils::WorkerThread>&> 
(__fn=<optimized out>) at /usr/include/c++/7/bits/invoke.h:95
   #23 std::_Bind<void 
(org::apache::nifi::minifi::utils::ThreadPool<org::apache::nifi::minifi::utils::TaskRescheduleInfo>::*(org::apache::nifi::minifi::utils::ThreadPool<org::apache::nifi::minifi::utils::TaskRescheduleInfo>*,
 
std::shared_ptr<org::apache::nifi::minifi::utils::WorkerThread>))(std::shared_ptr<org::apache::nifi::minifi::utils::WorkerThread>)>::__call<void,
 , 0ul, 1ul>(std::tuple<>&&, std::_Index_tuple<0ul, 1ul>) (__args=..., 
this=<optimized out>)
       at /usr/include/c++/7/functional:467
   #24 std::_Bind<void 
(org::apache::nifi::minifi::utils::ThreadPool<org::apache::nifi::minifi::utils::TaskRescheduleInfo>::*(org::apache::nifi::minifi::utils::ThreadPool<org::apache::nifi::minifi::utils::TaskRescheduleInfo>*,
 
std::shared_ptr<org::apache::nifi::minifi::utils::WorkerThread>))(std::shared_ptr<org::apache::nifi::minifi::utils::WorkerThread>)>::operator()<,
 void>() (this=<optimized out>) at /usr/include/c++/7/functional:551
   #25 std::_Function_handler<void (), std::_Bind<void 
(org::apache::nifi::minifi::utils::ThreadPool<org::apache::nifi::minifi::utils::TaskRescheduleInfo>::*(org::apache::nifi::minifi::utils::ThreadPool<org::apache::nifi::minifi::utils::TaskRescheduleInfo>*,
 
std::shared_ptr<org::apache::nifi::minifi::utils::WorkerThread>))(std::shared_ptr<org::apache::nifi::minifi::utils::WorkerThread>)>
 >::_M_invoke(std::_Any_data const&) (__functor=...)
       at /usr/include/c++/7/bits/std_function.h:316
   #26 0x00007ffff74c96ef in execute_native_thread_routine () at 
../../../../../src/libstdc++-v3/src/c++11/thread.cc:80
   #27 0x00007ffff7bbd6db in start_thread (arg=0x7ffff4f07700) at 
pthread_create.c:463
   #28 0x00007ffff6f2488f in clone () at 
../sysdeps/unix/sysv/linux/x86_64/clone.S:95
   (gdb) f 6
   #6  0x00005555557dd0f1 in 
org::apache::nifi::minifi::utils::IdGenerator::generateWithUuidImpl 
(this=0x555555df5e70, mode=<optimized out>, output=0x7ffff4f06300 "\006") at 
/home/bakaid/nifi-minifi-cpp/libminifi/src/utils/Id.cpp:286
   286      uuid_impl_->make(mode);
   (gdb) p *uuid_impl_.get()->ctx
   $13 = {obj = {time_low = 3412410872, time_mid = 27348, time_hi_and_version = 
490, clock_seq_hi_and_reserved = 33 '!', clock_seq_low = 0 '\000', node = 
"\000\000\000\000\000"}, prng = 0x555555df8c80, md5 = 0x555555df7e70, sha1 = 
0x0,
     mac = "\241\000\000\000\000", time_last = {tv_sec = 93825001311728, 
tv_usec = 93825001663376}, time_seq = 0}
   (gdb) p *uuid_impl_.get()->ctx->prng
   $14 = {dev = 1330905088, md5 = 0x21, cnt = 93825001295265}
   (gdb) p *uuid_impl_.get()->ctx->prng->md5
   Cannot access memory at address 0x21
   ```
   It is nondeterministic, and there are also random crashes and aborts at 
other points.
   valgrind also sometimes finds errors:
   ```
   ==38778== HEAP SUMMARY:
   ==38778==     in use at exit: 1,887,773 bytes in 17,209 blocks
   ==38778==   total heap usage: 98,643 allocs, 81,434 frees, 16,173,074 bytes 
allocated
   ==38778==
   ==38778== Searching for pointers to 17,209 not-freed blocks
   ==38778== Checked 60,375,896 bytes
   ==38778==
   ==38778== LEAK SUMMARY:
   ==38778==    definitely lost: 1,088 bytes in 1 blocks
   ==38778==    indirectly lost: 19,129 bytes in 126 blocks
   ==38778==      possibly lost: 2,128 bytes in 7 blocks
   ==38778==    still reachable: 1,865,428 bytes in 17,075 blocks
   ==38778==                       of which reachable via heuristic:
   ==38778==                         multipleinheritance: 744 bytes in 1 blocks
   ==38778==         suppressed: 0 bytes in 0 blocks
   ==38778== Rerun with --leak-check=full to see details of leaked memory
   ==38778==
   ==38778== ERROR SUMMARY: 425 errors from 1 contexts (suppressed: 0 from 0)
   ==38778==
   ==38778== 425 errors in context 1 of 1:
   ==38778== Conditional jump or move depends on uninitialised value(s)
   ==38778==    at 0x4C32D08: strlen (in 
/usr/lib/valgrind/vgpreload_memcheck-amd64-linux.so)
   ==38778==    by 0x2ACC57: length (char_traits.h:320)
   ==38778==    by 0x2ACC57: std::__cxx11::basic_string<char, 
std::char_traits<char>, std::allocator<char> >::basic_string(char const*, 
std::allocator<char> const&) [clone .isra.565] (basic_string.h:511)
   ==38778==    by 0x2D5D2B: readPost (HTTPHandlers.h:360)
   ==38778==    by 0x2D5D2B: HeartbeatHandler::verify(mg_connection*) 
(HTTPHandlers.h:421)
   ==38778==    by 0x2D6640: HeartbeatHandler::handlePost(CivetServer*, 
mg_connection*) (HTTPHandlers.h:438)
   ==38778==    by 0x4AB8C8: CivetServer::requestHandler(mg_connection*, void*) 
(CivetServer.cpp:148)
   ==38778==    by 0x4B9E4C: handle_request (civetweb.c:12704)
   ==38778==    by 0x4BB4DC: process_new_connection (civetweb.c:15918)
   ==38778==    by 0x4BB82F: worker_thread_run (civetweb.c:16225)
   ==38778==    by 0x4BB82F: worker_thread (civetweb.c:16290)
   ==38778==    by 0x4E436DA: start_thread (pthread_create.c:463)
   ==38778==    by 0x5B3E88E: clone (clone.S:95)
   ==38778==  Uninitialised value was created by a stack allocation
   ==38778==    at 0x2D5CB0: HeartbeatHandler::verify(mg_connection*) 
(HTTPHandlers.h:420)
   ==38778==
   ==38778== ERROR SUMMARY: 425 errors from 1 contexts (suppressed: 0 from 0)
   ```
   
   The whole thing seems to me like a memory corruption/stack corruption issue. 
I am investigating it further, but I find it quite plausible that it is caused 
by the things we do in the SIGUSR2 signal handlers to get BackTraces.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to