Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 8e258f545 -> 6a7f98904
MINIFICPP-623: Add trace capabilities to controller and agent MINIFICPP-623: Change test port and controller sizes for OSX MINIFICPP-623: avoid ifdef checks that may or may not exist on platforms -- use cmake check This closes #424. Signed-off-by: Aldrin Piri <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/6a7f9890 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/6a7f9890 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/6a7f9890 Branch: refs/heads/master Commit: 6a7f98904bc09b8b34eadfacaf4a0088f0d5bb22 Parents: 8e258f5 Author: Marc Parisi <[email protected]> Authored: Thu Oct 18 20:13:15 2018 -0400 Committer: Aldrin Piri <[email protected]> Committed: Mon Oct 22 10:33:58 2018 -0400 ---------------------------------------------------------------------- CMakeLists.txt | 8 + OPS.md | 97 +++++++++++ README.md | 62 +------ controller/Controller.h | 60 +++++-- controller/MiNiFiController.cpp | 21 ++- extensions/http-curl/tests/C2JstackTest.cpp | 164 ++++++++++++++++++ extensions/http-curl/tests/CMakeLists.txt | 1 + libminifi/include/FlowController.h | 2 + libminifi/include/SchedulingAgent.h | 13 +- libminifi/include/core/state/UpdateController.h | 14 +- libminifi/include/utils/BackTrace.h | 169 +++++++++++++++++++ libminifi/include/utils/ThreadPool.h | 47 +++++- libminifi/src/FlowController.cpp | 9 + libminifi/src/Properties.cpp | 6 +- libminifi/src/c2/C2Agent.cpp | 58 +++++-- libminifi/src/c2/ControllerSocketProtocol.cpp | 16 ++ libminifi/src/utils/BackTrace.cpp | 132 +++++++++++++++ libminifi/test/unit/BackTraceTests.cpp | 116 +++++++++++++ libminifi/test/unit/ControllerTests.cpp | 4 + 19 files changed, 893 insertions(+), 106 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a7f9890/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index 4b0f7a3..1c92446 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -27,6 +27,7 @@ option(SKIP_TESTS "Skips building all tests." OFF) option(PORTABLE "Instructs the compiler to remove architecture specific optimizations" ON) option(USE_SYSTEM_OPENSSL "Instructs the build system to search for and use an SSL library available in the host system" ON) option(OPENSSL_OFF "Disables OpenSSL" OFF) +option(ENABLE_OPS "Enable Operations Tools" ON) option(USE_SYSTEM_UUID "Instructs the build system to search for and use an UUID library available in the host system" OFF) option(USE_SYSTEM_CURL "Instructs the build system to search for and use a cURL library available in the host system" ON) if (WIN32) @@ -39,6 +40,7 @@ option(USE_SYSTEM_BZIP2 "Instructs the build system to search for and use a bzip option(BUILD_ROCKSDB "Instructs the build system to use RocksDB from the third party directory" ON) option(FORCE_WINDOWS "Instructs the build system to force Windows builds when WIN32 is specified" OFF) +include(CheckIncludeFile) include(FeatureSummary) include(ExternalProject) @@ -73,6 +75,12 @@ if(CCACHE_FOUND) message("-- Found ccache: ${CCACHE_FOUND}") endif(CCACHE_FOUND) +# check for exec info before we enable the backtrace features. +CHECK_INCLUDE_FILE("execinfo.h" HAS_EXECINFO) +if (ENABLE_OPS AND HAS_EXECINFO AND NOT WIN32) + add_definitions("-DHAS_EXECINFO=1") +endif() + #### Establish Project Configuration #### # Enable usage of the VERSION specifier include(CheckCXXCompilerFlag) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a7f9890/OPS.md ---------------------------------------------------------------------- diff --git a/OPS.md b/OPS.md new file mode 100644 index 0000000..c993785 --- /dev/null +++ b/OPS.md @@ -0,0 +1,97 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +# Apache NiFi - MiNiFi - Operations Readme. + + +This readme defines operational commands for managing instances. + +## Table of Contents + +- [Description](#description) +- [Managing](#managing-minifi) + - [Commands](#commands) + +## Description + +Apache NiFi MiNiFi C++ can be managed through our [C2 protocol](https://cwiki.apache.org/confluence/display/MINIFI/C2+Design+Proposal) +or through a local interface called the MiNiFi Controller + +## Managing MiNiFi + +The MiNiFi controller is an executable in the bin directory that can be used to control the MiNiFi C++ agent while it runs -- utilizing the [Command and Control Protocol](https://cwiki.apache.org/confluence/display/MINIFI/C2+Design+Proposal). Currently the controller will let you stop subcomponents within a running instance, clear queues, get the status of queues, and update the flow for a warm re-deploy. + +The minificontroller can track a single MiNiFi C++ agent through the use of three options. Port is required. +The hostname is not and will default to localhost. Additionally, controller.socket.local.any.interface allows +you to bind to any address when using localhost. Otherwise, we will bind only to the loopback adapter so only +minificontroller on the local host can control the agent: + + $ controller.socket.host=localhost + $ controller.socket.port=9998 + $ controller.socket.local.any.interface=true/false (default: false) + +These are defined by default to the above values. If the port option is left undefined, the MiNiFi controller +will be disabled in your deployment. + + The executable is stored in the bin directory and is titled minificontroller. Available commands are listed below. + Note that with all commands an immediate response by the agent isn't guaranteed. In all cases the agent assumes the role of validating that a response was received, but execution of said command may take some time depending on a number of factors to include persistent storage type, size of queues, and speed of hardware. + +### Debug + + Agents have the ability to return a list of stacks of currently running threads. The Jstack command provides a list of call stacks + for threads within the agent. This may allow users and maintainers to view stacks of running threads to diagnose issues. The name + is an homage to the jstack command used by Java developers. The design is fundamentally the same as that of Java -- signal handlers + notify signals to interrupt and provide traces. This feature is currently not built into Windows builds. + +### Commands + #### Specifying connecting information + + ./minificontroller --host "host name" --port "port" + + * By default these options use those defined in minifi.properties and are not required + + #### Start Command + + ./minificontroller --start "component name" + + #### Stack command + ./minificontroller --jstack + + #### Stop command + ./minificontroller --stop "component name" + + #### List connections command + ./minificontroller --list connections + + #### List components command + ./minificontroller --list components + + #### Clear connection command + ./minificontroller --clear "connection name" + + #### GetSize command + ./minificontroller --getsize "connection name" + + * Returns the size of the connection. The current size along with the max will be reported + + #### Update flow + ./minificontroller --updateflow "config yml" + + * Updates the flow file reference and performs a warm re-deploy. + + #### Get full connection command + ./minificontroller --getfull + + * Provides a list of full connections, if any. http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a7f9890/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index ead8dba..542dd6c 100644 --- a/README.md +++ b/README.md @@ -23,11 +23,12 @@ MiNiFi is a child project effort of Apache NiFi. This repository is for a nativ - [Getting Started](#getting-started) - [System Requirements](#system-requirements) - [Bootstrapping](#bootstrapping) - - [Building](#building) - [Cleaning](#cleaning) - [Configuring](#configuring) - [Running](#running) - [Deploying](#deploying) + - [Extensions](#extensions) +- [Operations](#operations) - [Issue Tracking](#issue-tracking) - [Documentation](#documentation) - [License](#license) @@ -801,66 +802,13 @@ created within the build directory that contains a manifest of build artifacts. The build identifier will be carried with the deployed binary for the configuration you specify. By default all extensions will be built. -### Managing MiNiFi C++ through the MiNiFi Controller - -The MiNiFi controller is an executable in the bin directory that can be used to control the MiNiFi C++ agent while it runs -- utilizing the [Command and Control Protocol](https://cwiki.apache.org/confluence/display/MINIFI/C2+Design+Proposal). Currently the controller will let you stop subcomponents within a running instance, clear queues, get the status of queues, and update the flow for a warm re-deploy. - -The minificontroller can track a single MiNiFi C++ agent through the use of three options. Port is required. -The hostname is not and will default to localhost. Additionally, controller.socket.local.any.interface allows -you to bind to any address when using localhost. Otherwise, we will bind only to the loopback adapter so only -minificontroller on the local host can control the agent: - - $ controller.socket.host=localhost - $ controller.socket.port=9998 - $ controller.socket.local.any.interface=true/false ( default false) - -These are defined by default to the above values. If the port option is left undefined, the MiNiFi controller -will be disabled in your deployment. - - The executable is stored in the bin directory and is titled minificontroller. Available commands are listed below. - Note that with all commands an immediate response by the agent isn't guaranteed. In all cases the agent assumes the role of validating that a response was received, but execution of said command may take some time depending on a number of factors to include persistent storage type, size of queues, and speed of hardware. - - #### Specifying connecting information - - ./minificontroller --host "host name" --port "port" - - * By default these options use those defined in minifi.properties and are not required - - #### Start Command - - ./minificontroller --start "component name" - - #### Stop command - ./minificontroller --stop "component name" - - #### List connections command - ./minificontroller --list connections - - #### List components command - ./minificontroller --list components - - #### Clear connection command - ./minificontroller --clear "connection name" - - #### GetSize command - ./minificontroller --getsize "connection name" - - * Returns the size of the connection. The current size along with the max will be reported - - #### Update flow - ./minificontroller --updateflow "config yml" - - *Updates the flow file reference and performs a warm re-deploy. - - #### Get full connection command - ./minificontroller --getfull - - *Provides a list of full connections, if any. - ### Extensions Please see [Extensions.md](Extensions.md) on how to build and run conditionally built dependencies and extensions. +## Operations +See our [operations documentation for additional inforomation on how to manage instances](OPS.md) + ## Issue Tracking See https://issues.apache.org/jira/projects/MINIFICPP/issues for the issue tracker. http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a7f9890/controller/Controller.h ---------------------------------------------------------------------- diff --git a/controller/Controller.h b/controller/Controller.h index 312b922..bbd099b 100644 --- a/controller/Controller.h +++ b/controller/Controller.h @@ -125,6 +125,40 @@ int getFullConnections(std::unique_ptr<minifi::io::Socket> socket, std::ostream return 0; } +int getJstacks(std::unique_ptr<minifi::io::Socket> socket, std::ostream &out) { + socket->initialize(); + std::vector<uint8_t> data; + uint8_t op = minifi::c2::Operation::DESCRIBE; + minifi::io::BaseStream stream; + stream.writeData(&op, 1); + stream.writeUTF("jstack"); + if (socket->writeData(const_cast<uint8_t*>(stream.getBuffer()), stream.getSize()) < 0) { + return -1; + } + // read the response + uint8_t resp = 0; + socket->readData(&resp, 1); + if (resp == minifi::c2::Operation::DESCRIBE) { + + uint64_t size = 0; + socket->read(size); + + for (int i = 0; i < size; i++) { + std::string name; + uint64_t lines; + socket->readUTF(name); + socket->read(lines); + for (int j = 0; j < lines; j++) { + std::string line; + socket->readUTF(line); + out << name << " -- " << line << std::endl; + } + + } + } + return 0; +} + /** * Prints the connection size for the provided connection. * @param socket socket ptr @@ -168,7 +202,7 @@ int listComponents(std::unique_ptr<minifi::io::Socket> socket, std::ostream &out out << "Components:" << std::endl; for (int i = 0; i < responses; i++) { - std::string name,status; + std::string name, status; socket->readUTF(name, false); socket->readUTF(status, false); out << name << ", running: " << status << std::endl; @@ -244,7 +278,7 @@ std::shared_ptr<core::controller::ControllerService> getControllerService(const return service; } - void printManifest(const std::shared_ptr<minifi::Configure> &configuration) { +void printManifest(const std::shared_ptr<minifi::Configure> &configuration) { std::string prov_repo_class = "volatileprovenancerepository"; std::string flow_repo_class = "volatileflowfilerepository"; @@ -252,12 +286,12 @@ std::shared_ptr<core::controller::ControllerService> getControllerService(const std::string content_repo_class = "volatilecontentrepository"; std::shared_ptr<logging::LoggerProperties> log_properties = std::make_shared<logging::LoggerProperties>(); - log_properties->setHome("./"); - log_properties->set("appender.stdout","stdout"); - log_properties->set("logger.org::apache::nifi::minifi","OFF,stdout"); - logging::LoggerConfiguration::getConfiguration().initialize(log_properties); + log_properties->setHome("./"); + log_properties->set("appender.stdout", "stdout"); + log_properties->set("logger.org::apache::nifi::minifi", "OFF,stdout"); + logging::LoggerConfiguration::getConfiguration().initialize(log_properties); - configuration->set(minifi::Configure::nifi_flow_configuration_file,"../conf/config.yml"); + configuration->set(minifi::Configure::nifi_flow_configuration_file, "../conf/config.yml"); configuration->get(minifi::Configure::nifi_provenance_repository_class_name, prov_repo_class); // Create repos for flow record and provenance std::shared_ptr<core::Repository> prov_repo = core::createRepository(prov_repo_class, true, "provenance"); @@ -280,11 +314,11 @@ std::shared_ptr<core::controller::ControllerService> getControllerService(const minifi::setDefaultDirectory(content_repo_path); } - configuration->set("c2.agent.heartbeat.period","25"); - configuration->set("nifi.c2.root.classes","AgentInformation"); - configuration->set("nifi.c2.enable","true"); - configuration->set("c2.agent.listen","true"); - configuration->set("c2.agent.heartbeat.reporter.classes","AgentPrinter"); + configuration->set("c2.agent.heartbeat.period", "25"); + configuration->set("nifi.c2.root.classes", "AgentInformation"); + configuration->set("nifi.c2.enable", "true"); + configuration->set("c2.agent.listen", "true"); + configuration->set("c2.agent.heartbeat.reporter.classes", "AgentPrinter"); configuration->get(minifi::Configure::nifi_configuration_class_name, nifi_configuration_class_name); @@ -293,7 +327,7 @@ std::shared_ptr<core::controller::ControllerService> getControllerService(const std::unique_ptr<core::FlowConfiguration> flow_configuration = core::createFlowConfiguration(prov_repo, flow_repo, content_repo, configuration, stream_factory, nifi_configuration_class_name); std::shared_ptr<minifi::FlowController> controller = std::unique_ptr<minifi::FlowController>( - new minifi::FlowController(prov_repo, flow_repo, configuration, std::move(flow_configuration), content_repo,"manifest",false)); + new minifi::FlowController(prov_repo, flow_repo, configuration, std::move(flow_configuration), content_repo, "manifest", false)); controller->load(); controller->start(); std::this_thread::sleep_for(std::chrono::milliseconds(10000)); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a7f9890/controller/MiNiFiController.cpp ---------------------------------------------------------------------- diff --git a/controller/MiNiFiController.cpp b/controller/MiNiFiController.cpp index b385685..06bba2c 100644 --- a/controller/MiNiFiController.cpp +++ b/controller/MiNiFiController.cpp @@ -129,6 +129,7 @@ int main(int argc, char **argv) { ("getsize", "Reports the size of the associated connection queue", cxxopts::value<std::vector<std::string>>()) //NOLINT ("updateflow", "Updates the flow of the agent using the provided flow file", cxxopts::value<std::string>()) //NOLINT ("getfull", "Reports a list of full connections") //NOLINT + ("jstack", "Returns backtraces from the agent") //NOLINT ("manifest", "Generates a manifest for the current binary") //NOLINT ("noheaders", "Removes headers from output streams"); @@ -191,13 +192,12 @@ int main(int argc, char **argv) { auto& components = result["c"].as<std::vector<std::string>>(); for (const auto& connection : components) { auto socket = secure_context != nullptr ? stream_factory_->createSecureSocket(host, port, secure_context) : stream_factory_->createSocket(host, port); - if (clearConnection(std::move(socket), connection)){ + if (clearConnection(std::move(socket), connection)) { std::cout << "Sent clear command to " << connection << ". Size before clear operation sent: " << std::endl; socket = secure_context != nullptr ? stream_factory_->createSecureSocket(host, port, secure_context) : stream_factory_->createSocket(host, port); if (getConnectionSize(std::move(socket), std::cout, connection) < 0) - std::cout << "Could not connect to remote host " << host << ":" << port << std::endl; - } - else + std::cout << "Could not connect to remote host " << host << ":" << port << std::endl; + } else std::cout << "Could not connect to remote host " << host << ":" << port << std::endl; } } @@ -231,6 +231,12 @@ int main(int argc, char **argv) { std::cout << "Could not connect to remote host " << host << ":" << port << std::endl; } + if (result.count("jstack") > 0) { + auto socket = secure_context != nullptr ? stream_factory_->createSecureSocket(host, port, secure_context) : stream_factory_->createSocket(host, port); + if (getJstacks(std::move(socket), std::cout) < 0) + std::cout << "Could not connect to remote host " << host << ":" << port << std::endl; + } + if (result.count("updateflow") > 0) { auto& flow_file = result["updateflow"].as<std::string>(); auto socket = secure_context != nullptr ? stream_factory_->createSecureSocket(host, port, secure_context) : stream_factory_->createSocket(host, port); @@ -241,10 +247,9 @@ int main(int argc, char **argv) { if (result.count("manifest") > 0) { printManifest(configuration); } - }catch (const std::exception &exc) - { - // catch anything thrown within try block that derives from std::exception - std::cerr << exc.what() << std::endl; + } catch (const std::exception &exc) { + // catch anything thrown within try block that derives from std::exception + std::cerr << exc.what() << std::endl; } catch (...) { std::cout << options.help( { "", "Group" }) << std::endl; exit(0); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a7f9890/extensions/http-curl/tests/C2JstackTest.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/tests/C2JstackTest.cpp b/extensions/http-curl/tests/C2JstackTest.cpp new file mode 100644 index 0000000..4141312 --- /dev/null +++ b/extensions/http-curl/tests/C2JstackTest.cpp @@ -0,0 +1,164 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <sys/stat.h> +#undef NDEBUG +#include <cassert> +#include <utility> +#include <chrono> +#include <fstream> +#include <memory> +#include <string> +#include <thread> +#include <type_traits> +#include <vector> +#include <iostream> +#include <sstream> +#include "HTTPClient.h" +#include "InvokeHTTP.h" +#include "TestBase.h" +#include "utils/StringUtils.h" +#include "core/Core.h" +#include "core/logging/Logger.h" +#include "core/ProcessGroup.h" +#include "core/yaml/YamlConfiguration.h" +#include "FlowController.h" +#include "properties/Configure.h" +#include "unit/ProvenanceTestHelper.h" +#include "io/StreamFactory.h" +#include "c2/C2Agent.h" +#include "CivetServer.h" +#include <cstring> +#include "protocols/RESTSender.h" + +void waitToVerifyProcessor() { + std::this_thread::sleep_for(std::chrono::seconds(10)); +} + + +class ConfigHandler : public CivetHandler { + public: + ConfigHandler() { + calls_ = 0; + } + bool handlePost(CivetServer *server, struct mg_connection *conn) { + calls_++; + std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {" + "\"operation\" : \"describe\", " + "\"operationid\" : \"8675309\", " + "\"name\": \"jstack\"" + "}]}"; + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + heartbeat_response.length()); + mg_printf(conn, "%s", heartbeat_response.c_str()); + + + return true; + } + + bool handleGet(CivetServer *server, struct mg_connection *conn) { + std::ifstream myfile(test_file_location_.c_str()); + + if (myfile.is_open()) { + std::stringstream buffer; + buffer << myfile.rdbuf(); + std::string str = buffer.str(); + myfile.close(); + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + str.length()); + mg_printf(conn, "%s", str.c_str()); + } else { + mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n"); + } + + return true; + } + std::string test_file_location_; + std::atomic<size_t> calls_; +}; + +int main(int argc, char **argv) { + mg_init_library(0); + LogTestController::getInstance().setInfo<minifi::FlowController>(); + LogTestController::getInstance().setDebug<minifi::utils::HTTPClient>(); + LogTestController::getInstance().setDebug<minifi::c2::RESTSender>(); + LogTestController::getInstance().setTrace<minifi::c2::C2Agent>(); + + const char *options[] = { "document_root", ".", "listening_ports", "8727", 0 }; + std::vector<std::string> cpp_options; + for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { + cpp_options.push_back(options[i]); + } + + CivetServer server(cpp_options); + ConfigHandler h_ex; + server.addHandler("/update", h_ex); + std::string key_dir, test_file_location; + if (argc > 1) { + h_ex.test_file_location_ = test_file_location = argv[1]; + key_dir = argv[2]; + } + + + std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); + + configuration->set("c2.rest.url", "http://localhost:8727/update"); + configuration->set("c2.agent.heartbeat.period", "1000"); + mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); + + std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>(); + std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>(); + + configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location); + + std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration); + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>( + new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location)); + std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo); + + std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME, + true); + + core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location); + + std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location); + std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get()); + ptr.release(); + auto start = std::chrono::system_clock::now(); + + controller->load(); + controller->start(); + waitToVerifyProcessor(); + + controller->waitUnload(60000); + auto then = std::chrono::system_clock::now(); + + auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(then - start).count(); + std::string logs = LogTestController::getInstance().log_output.str(); + #ifndef WIN32 + assert(logs.find("SchedulingAgent") != std::string::npos); + #endif + LogTestController::getInstance().reset(); + rmdir("./content_repository"); + assert(h_ex.calls_ <= (milliseconds / 1000) + 1); + + return 0; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a7f9890/extensions/http-curl/tests/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/extensions/http-curl/tests/CMakeLists.txt b/extensions/http-curl/tests/CMakeLists.txt index 721c221..b8d6c69 100644 --- a/extensions/http-curl/tests/CMakeLists.txt +++ b/extensions/http-curl/tests/CMakeLists.txt @@ -71,6 +71,7 @@ message("-- Finished building ${CURL_INT_TEST_COUNT} libcURL integration test fi add_test(NAME HttpGetIntegrationTest COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/") add_test(NAME C2UpdateTest COMMAND C2UpdateTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/") +add_test(NAME C2JstackTest COMMAND C2JstackTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/") add_test(NAME C2UpdateAgentTest COMMAND C2UpdateAgentTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/") add_test(NAME C2FailedUpdateTest COMMAND C2FailedUpdateTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/TestBad.yml" "${TEST_RESOURCES}/") add_test(NAME C2NullConfiguration COMMAND C2NullConfiguration "${TEST_RESOURCES}/TestNull.yml" "${TEST_RESOURCES}/") http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a7f9890/libminifi/include/FlowController.h ---------------------------------------------------------------------- diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h index 0466546..9309b4f 100644 --- a/libminifi/include/FlowController.h +++ b/libminifi/include/FlowController.h @@ -319,6 +319,8 @@ class FlowController : public core::controller::ControllerServiceProvider, publi virtual uint64_t getUptime(); + virtual std::vector<BackTrace> getTraces(); + void initializeC2(); protected: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a7f9890/libminifi/include/SchedulingAgent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h index 682f6ec..925efdb 100644 --- a/libminifi/include/SchedulingAgent.h +++ b/libminifi/include/SchedulingAgent.h @@ -29,6 +29,7 @@ #include <thread> #include "utils/TimeUtil.h" #include "utils/ThreadPool.h" +#include "utils/BackTrace.h" #include "core/Core.h" #include "core/logging/LoggerConfiguration.h" #include "properties/Configure.h" @@ -90,7 +91,7 @@ class SingleRunMonitor : public TimerAwareMonitor { : TimerAwareMonitor(run_monitor) { } explicit SingleRunMonitor(TimerAwareMonitor &&other) - : TimerAwareMonitor(std::move(other)){ + : TimerAwareMonitor(std::move(other)) { } virtual bool isFinished(const uint64_t &result) { if (result == 0) { @@ -123,7 +124,11 @@ class SchedulingAgent { running_ = false; repo_ = repo; flow_repo_ = flow_repo; - auto pool = utils::ThreadPool<uint64_t>(configure_->getInt(Configure::nifi_flow_engine_threads, 2), true, controller_service_provider); + /** + * To facilitate traces we cannot use daemon threads -- this could potentially cause blocking on I/O; however, it's a better path + * to be able to debug why an agent doesn't work and still allow a restart via updates in these cases. + */ + auto pool = utils::ThreadPool<uint64_t>(configure_->getInt(Configure::nifi_flow_engine_threads, 2), false, controller_service_provider, "SchedulingAgent"); thread_pool_ = std::move(pool); thread_pool_.start(); } @@ -148,6 +153,10 @@ class SchedulingAgent { thread_pool_.shutdown(); } + std::vector<BackTrace> getTraces() { + return thread_pool_.getTraces(); + } + public: virtual std::future<uint64_t> enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); virtual std::future<uint64_t> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a7f9890/libminifi/include/core/state/UpdateController.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/UpdateController.h b/libminifi/include/core/state/UpdateController.h index fec3cc1..7cd61c4 100644 --- a/libminifi/include/core/state/UpdateController.h +++ b/libminifi/include/core/state/UpdateController.h @@ -20,6 +20,7 @@ #include <string> #include "utils/ThreadPool.h" +#include "utils/BackTrace.h" namespace org { namespace apache { @@ -69,9 +70,9 @@ class UpdateStatus { class Update { public: - Update() - : status_(UpdateStatus(UpdateState::INITIATE, 0)) { - } + Update() + : status_(UpdateStatus(UpdateState::INITIATE, 0)) { + } Update(UpdateStatus status) : status_(status) { @@ -235,6 +236,13 @@ class StateMonitor : public StateController { */ virtual uint64_t getUptime() = 0; + /** + * Returns a vector of backtraces + * @return backtraces from the state monitor. + */ + virtual std::vector<BackTrace> getTraces() = 0; + + protected: std::atomic<bool> controller_running_; }; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a7f9890/libminifi/include/utils/BackTrace.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/BackTrace.h b/libminifi/include/utils/BackTrace.h new file mode 100644 index 0000000..5c7bb80 --- /dev/null +++ b/libminifi/include/utils/BackTrace.h @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_UTILS_BACKTRACE_H_ +#define LIBMINIFI_INCLUDE_UTILS_BACKTRACE_H_ + +#ifdef HAS_EXECINFO +#include <execinfo.h> +#include <signal.h> +#endif +#include <thread> +#include <vector> +#include <mutex> +#include <iostream> +#include <sstream> + +#define TRACE_BUFFER_SIZE 128 + +/** + * Forward declaration allows us to tightly couple TraceResolver + * with BackTrace. + */ +class TraceResolver; + +/** + * Purpose: Backtrace is a movable vector of trace lines. + * + */ +class BackTrace { + public: + BackTrace() { + } + BackTrace(const std::string &name) + : name_(name) { + } + BackTrace(BackTrace &&) = default; + BackTrace(BackTrace &) = delete; + + std::vector<std::string> getTraces() const { + return trace_; + } + + BackTrace &operator=(BackTrace &&other) = default; + + /** + * Return thread name of f this caller + * @returns name ; + */ + std::string getName() const { + return name_; + } + + protected: + void addLine(const std::string &symbol_line) { + trace_.emplace_back(symbol_line); + } + + private: + std::string name_; + std::vector<std::string> trace_; + friend class TraceResolver; +}; + +/** + * Pulls the trace and places it onto the TraceResolver instance. + */ +void pull_trace(const uint8_t frames_to_skip = 1); + +#ifdef HAS_EXECINFO +/** + * Signal handler that will run via TraceResolver + */ +void handler(int signr, siginfo_t *info, void *secret); +#endif +/** + * Emplaces a signal handler for SIGUSR2 + */ +void emplace_handler(); + +/** + * Purpose: Provides a singular instance to grab the call stack for thread(s). + * Design: is a singleton to avoid multiple signal handlers. + */ +class TraceResolver { + public: + + /** + * Retrieves the backtrace for the provided thread reference + * @return BackTrace instance + */ + BackTrace &&getBackTrace(const std::string &thread_name, std::thread::native_handle_type thread); + + /** + * Retrieves the backtrace for the calling thread + * @returns BackTrace instance + */ + BackTrace &&getBackTrace(const std::string &thread_name) { +#ifdef WIN32 + // currrently not supported in windows + return BackTrace(thread_name); +#else + return std::move(getBackTrace(thread_name, pthread_self())); +#endif + } + + /** + * Returns a static instance of the thread resolver. + */ + static TraceResolver &getResolver() { + static TraceResolver resolver; + return resolver; + } + + /** + * Adds a trace line with an optional function + * @param symbol_line symbol line that was produced + * @param func function name + */ + void addTraceLine(const char *symbol_line, const char *func = nullptr) { + std::stringstream line; + line << symbol_line; + if (nullptr != func) { + line << " @" << func; + } + trace_.addLine(line.str()); + } + + /** + * Returns the thread handle reference in the native format. + */ + const std::thread::native_handle_type getThreadHandle() { + return thread_handle_; + } + + /** + * Returns the caller handle refernce in the native format. + */ + const std::thread::native_handle_type getCallerHandle() { + return caller_handle_; + } + + private: + TraceResolver() // can't use = default due to handle_types not defaulting. + : thread_handle_(0), + caller_handle_(0) { + ; + } + + BackTrace trace_; + std::thread::native_handle_type thread_handle_; + std::thread::native_handle_type caller_handle_; + std::mutex mutex_; +}; + +#endif /* LIBMINIFI_INCLUDE_UTILS_BACKTRACE_H_ */ + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a7f9890/libminifi/include/utils/ThreadPool.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h index 9fc47f5..ffb28fe 100644 --- a/libminifi/include/utils/ThreadPool.h +++ b/libminifi/include/utils/ThreadPool.h @@ -18,6 +18,7 @@ #define LIBMINIFI_INCLUDE_THREAD_POOL_H #include <chrono> +#include <sstream> #include <iostream> #include <atomic> #include <mutex> @@ -28,6 +29,7 @@ #include <thread> #include <functional> +#include "BackTrace.h" #include "capi/expect.h" #include "controllers/ThreadManagementService.h" #include "concurrentqueue.h" @@ -189,17 +191,20 @@ std::shared_ptr<std::promise<T>> Worker<T>::getPromise() { class WorkerThread { public: - explicit WorkerThread(std::thread thread) + explicit WorkerThread(std::thread thread, const std::string &name = "NamelessWorker") : is_running_(false), - thread_(std::move(thread)) { + thread_(std::move(thread)), + name_(name) { } - WorkerThread() - : is_running_(false) { + WorkerThread(const std::string &name = "NamelessWorker") + : is_running_(false), + name_(name) { } std::atomic<bool> is_running_; std::thread thread_; + std::string name_; }; /** @@ -212,13 +217,15 @@ template<typename T> class ThreadPool { public: - ThreadPool(int max_worker_threads = 2, bool daemon_threads = false, const std::shared_ptr<core::controller::ControllerServiceProvider> &controller_service_provider = nullptr) + ThreadPool(int max_worker_threads = 2, bool daemon_threads = false, const std::shared_ptr<core::controller::ControllerServiceProvider> &controller_service_provider = nullptr, + const std::string &name = "NamelessPool") : daemon_threads_(daemon_threads), thread_reduction_count_(0), max_worker_threads_(max_worker_threads), adjust_threads_(false), running_(false), - controller_service_provider_(controller_service_provider) { + controller_service_provider_(controller_service_provider), + name_(name) { current_workers_ = 0; task_count_ = 0; thread_manager_ = nullptr; @@ -231,7 +238,8 @@ class ThreadPool { adjust_threads_(false), running_(false), controller_service_provider_(std::move(other.controller_service_provider_)), - thread_manager_(std::move(other.thread_manager_)) { + thread_manager_(std::move(other.thread_manager_)), + name_(std::move(other.name_)) { current_workers_ = 0; task_count_ = 0; } @@ -264,6 +272,22 @@ class ThreadPool { return task_status_[identifier] == true; } + std::vector<BackTrace> getTraces() { + std::vector<BackTrace> traces; + std::lock_guard<std::recursive_mutex> lock(manager_mutex_); + std::unique_lock<std::mutex> wlock(worker_queue_mutex_); + // while we may be checking if running, we don't want to + // use the threads outside of the manager mutex's lock -- therefore we will + // obtain a lock so we can keep the threads in memory + if (running_) { + for (const auto &worker : thread_queue_) { + if (worker->is_running_) + traces.emplace_back(TraceResolver::getResolver().getBackTrace(worker->name_, worker->thread_.native_handle())); + } + } + return traces; + } + /** * Starts the Thread Pool */ @@ -315,6 +339,8 @@ class ThreadPool { if (!running_) { start(); } + + name_ = other.name_; return *this; } @@ -367,6 +393,8 @@ class ThreadPool { std::recursive_mutex manager_mutex_; // work queue mutex std::mutex worker_queue_mutex_; + // thread pool name + std::string name_; /** * Call for the manager to start worker threads @@ -404,7 +432,9 @@ bool ThreadPool<T>::execute(Worker<T> &&task, std::future<T> &future) { template<typename T> void ThreadPool<T>::manageWorkers() { for (int i = 0; i < max_worker_threads_; i++) { - auto worker_thread = std::make_shared<WorkerThread>(); + std::stringstream thread_name; + thread_name << name_ << " #" << i; + auto worker_thread = std::make_shared<WorkerThread>(thread_name.str()); worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread)); thread_queue_.push_back(worker_thread); current_workers_++; @@ -461,6 +491,7 @@ void ThreadPool<T>::manageWorkers() { template<typename T> void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> thread) { auto waitperiod = std::chrono::milliseconds(1) * 100; + thread->is_running_ = true; uint64_t wait_decay_ = 0; uint64_t yield_backoff = 10; // start at 10 ms while (running_.load()) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a7f9890/libminifi/src/FlowController.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index 9206f41..25b4fc3 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -910,6 +910,15 @@ uint64_t FlowController::getUptime() { return time_since; } +std::vector<BackTrace> FlowController::getTraces() { + std::vector<BackTrace> traces; + auto timer_driven = timer_scheduler_->getTraces(); + traces.insert(traces.end(), std::make_move_iterator(timer_driven.begin()), std::make_move_iterator(timer_driven.end())); + auto event_driven = event_scheduler_->getTraces(); + traces.insert(traces.end(), std::make_move_iterator(event_driven.begin()), std::make_move_iterator(event_driven.end())); + return traces; +} + } /* namespace minifi */ } /* namespace nifi */ } /* namespace apache */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a7f9890/libminifi/src/Properties.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Properties.cpp b/libminifi/src/Properties.cpp index e64a92f..c8cb341 100644 --- a/libminifi/src/Properties.cpp +++ b/libminifi/src/Properties.cpp @@ -26,7 +26,7 @@ namespace apache { namespace nifi { namespace minifi { -#define BUFFER_SIZE 512 +#define TRACE_BUFFER_SIZE 512 Properties::Properties() : logger_(logging::LoggerFactory<Properties>::getLogger()) { @@ -138,8 +138,8 @@ void Properties::loadConfigureFile(const char *fileName) { } this->clear(); - char buf[BUFFER_SIZE]; - for (file.getline(buf, BUFFER_SIZE); file.good(); file.getline(buf, BUFFER_SIZE)) { + char buf[TRACE_BUFFER_SIZE]; + for (file.getline(buf, TRACE_BUFFER_SIZE); file.good(); file.getline(buf, TRACE_BUFFER_SIZE)) { parseConfigureFileLine(buf); } } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a7f9890/libminifi/src/c2/C2Agent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp index ebb4549..8168c77 100644 --- a/libminifi/src/c2/C2Agent.cpp +++ b/libminifi/src/c2/C2Agent.cpp @@ -173,21 +173,25 @@ void C2Agent::configure(const std::shared_ptr<Configure> &configure, bool reconf if (allow_updates_) { if (!configure->get("nifi.c2.agent.update.command", "c2.agent.update.command", update_command_)) { char cwd[1024]; - getcwd(cwd, sizeof(cwd)); + if (getcwd(cwd, sizeof(cwd)) == nullptr) { + logger_->log_error("Could not set update command, reason %s", std::strerror(errno)); - std::stringstream command; - command << cwd << "/minifi.sh update"; - update_command_ = command.str(); + } else { + std::stringstream command; + command << cwd << "/minifi.sh update"; + update_command_ = command.str(); + } } if (!configure->get("nifi.c2.agent.update.temp.location", "c2.agent.update.temp.location", update_location_)) { char cwd[1024]; - getcwd(cwd, sizeof(cwd)); - - std::stringstream copy_path; - std::stringstream command; - - copy_path << cwd << "/minifi.update"; + if (getcwd(cwd, sizeof(cwd)) == nullptr) { + logger_->log_error("Could not set copy path, reason %s", std::strerror(errno)); + } else { + std::stringstream copy_path; + std::stringstream command; + copy_path << cwd << "/minifi.update"; + } } // if not defined we won't beable to update @@ -536,6 +540,31 @@ void C2Agent::handle_describe(const C2ContentResponse &resp) { enqueue_c2_response(std::move(response)); return; + } else if (resp.name == "jstack") { + if (update_sink_->isRunning()) { + const std::vector<BackTrace> traces = update_sink_->getTraces(); + for (const auto &trace : traces) { + for (const auto & line : trace.getTraces()) { + logger_->log_trace("%s -- %s", trace.getName(), line); + } + } + auto keys = configuration_->getConfiguredKeys(); + C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true); + response.setLabel("configuration_options"); + for (const auto &trace : traces) { + C2Payload options(Operation::ACKNOWLEDGE, resp.ident, false, true); + options.setLabel(trace.getName()); + std::string value; + for (const auto &line : trace.getTraces()) { + C2ContentResponse option(Operation::ACKNOWLEDGE); + option.name = line; + option.operation_arguments[line] = line; + options.addContent(std::move(option)); + } + response.addPayload(std::move(options)); + } + enqueue_c2_response(std::move(response)); + } } C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true); enqueue_c2_response(std::move(response)); @@ -720,14 +749,19 @@ void C2Agent::handle_update(const C2ContentResponse &resp) { void C2Agent::restart_agent() { char cwd[1024]; - getcwd(cwd, sizeof(cwd)); + if (getcwd(cwd, sizeof(cwd)) == nullptr) { + logger_->log_error("Could not restart agent, reason %s", std::strerror(errno)); + return; + } std::stringstream command; command << cwd << "/minifi.sh restart"; } void C2Agent::update_agent() { - system(update_command_.c_str()); + if (!system(update_command_.c_str())) { + logger_->log_warn("May not have command processor"); + } } int16_t C2Agent::setResponseNodes(const std::shared_ptr<state::response::ResponseNode> &metric) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a7f9890/libminifi/src/c2/ControllerSocketProtocol.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/c2/ControllerSocketProtocol.cpp b/libminifi/src/c2/ControllerSocketProtocol.cpp index 2bd6d4d..d4f3970 100644 --- a/libminifi/src/c2/ControllerSocketProtocol.cpp +++ b/libminifi/src/c2/ControllerSocketProtocol.cpp @@ -187,6 +187,22 @@ void ControllerSocketProtocol::initialize(const std::shared_ptr<core::controller resp.writeUTF(component->isRunning() ? "true" : "false"); } stream->writeData(const_cast<uint8_t*>(resp.getBuffer()), resp.getSize()); + } else if (what == "jstack") { + io::BaseStream resp; + resp.writeData(&head, 1); + auto traces = update_sink_->getTraces(); + uint64_t trace_size = traces.size(); + resp.write(trace_size); + for (const auto &trace : traces) { + const auto &lines = trace.getTraces(); + resp.writeUTF(trace.getName()); + uint64_t lsize = lines.size(); + resp.write(lsize); + for (const auto &line : lines) { + resp.writeUTF(line); + } + } + stream->writeData(const_cast<uint8_t*>(resp.getBuffer()), resp.getSize()); } else if (what == "connections") { io::BaseStream resp; resp.writeData(&head, 1); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a7f9890/libminifi/src/utils/BackTrace.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/utils/BackTrace.cpp b/libminifi/src/utils/BackTrace.cpp new file mode 100644 index 0000000..160a070 --- /dev/null +++ b/libminifi/src/utils/BackTrace.cpp @@ -0,0 +1,132 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "utils/BackTrace.h" +#ifdef HAS_EXECINFO +#include <execinfo.h> +#include <iostream> +#include <cxxabi.h> +#endif +#define NAME_SIZE 256 + +void pull_trace(const uint8_t frames_to_skip) { +#ifdef HAS_EXECINFO + void *stackBuffer[TRACE_BUFFER_SIZE + 1]; + + // retrieve current stack addresses + int trace_size = backtrace(stackBuffer, TRACE_BUFFER_SIZE); + + char **symboltable = backtrace_symbols(stackBuffer, trace_size); + /** + * we can skip the signal handler, call to pull_trace, and the first entry for backtrace_symbols + */ + for (int i = frames_to_skip; i < trace_size; i++) { + char *start_parenthetical = 0; + char *functor = 0; + char *stop_parenthetical = 0; + + for (char *p = symboltable[i]; *p; ++p) { + if (*p == '(') { + start_parenthetical = p; + } else if (*p == '+') { + functor = p; + } else if (*p == ')' && functor) { + stop_parenthetical = p; + break; + } + } + bool hasFunc = start_parenthetical && functor && stop_parenthetical; + if (hasFunc && start_parenthetical < functor) { + *start_parenthetical++ = '\0'; + *functor++ = '\0'; + *stop_parenthetical = '\0'; + + /** + * Demangle the names -- this requires calling cxx api to demangle the function name. + * not sending an allocated buffer, so we'll deallocate if status is zero. + */ + + int status; + + auto demangled = abi::__cxa_demangle(start_parenthetical, nullptr, nullptr, &status); + if (status == 0) { + TraceResolver::getResolver().addTraceLine(symboltable[i], demangled); + free(demangled); + } else { + TraceResolver::getResolver().addTraceLine(symboltable[i], start_parenthetical); + } + } else { + TraceResolver::getResolver().addTraceLine(symboltable[i], ""); + } + } + + free(symboltable); +#endif +} + +BackTrace &&TraceResolver::getBackTrace(const std::string &thread_name, std::thread::native_handle_type thread_handle) { + // lock so that we only perform one backtrace at a time. +#ifdef HAS_EXECINFO + std::lock_guard<std::mutex> lock(mutex_); + + caller_handle_ = pthread_self(); + thread_handle_ = thread_handle; + trace_ = BackTrace(thread_name); + + if (0 == thread_handle_ || pthread_equal(caller_handle_, thread_handle)) { + pull_trace(); + } else { + if (thread_handle_ == 0) { + return std::move(trace_); + } + emplace_handler(); + if (pthread_kill(thread_handle_, SIGUSR2) != 0) { + return std::move(trace_); + } + sigset_t mask; + sigfillset(&mask); + sigdelset(&mask, SIGUSR2); + sigsuspend(&mask); + } +#else + // even if tracing is disabled, include thread name into the trace object + trace_ = BackTrace(thread_name); +#endif + return std::move(trace_); +} +#ifdef HAS_EXECINFO +void handler(int signr, siginfo_t *info, void *secret) { + auto curThread = pthread_self(); + + // not the intended thread + if (!pthread_equal(curThread, TraceResolver::getResolver().getThreadHandle())) { + return; + } + + pull_trace(); + + pthread_kill(TraceResolver::getResolver().getCallerHandle(), SIGUSR2); +} +#endif + +void emplace_handler() { +#ifdef HAS_EXECINFO + struct sigaction sa; + sigfillset(&sa.sa_mask); + sa.sa_flags = SA_SIGINFO; + sa.sa_sigaction = handler; + sigaction(SIGUSR2, &sa, NULL); +#endif +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a7f9890/libminifi/test/unit/BackTraceTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/BackTraceTests.cpp b/libminifi/test/unit/BackTraceTests.cpp new file mode 100644 index 0000000..816ff63 --- /dev/null +++ b/libminifi/test/unit/BackTraceTests.cpp @@ -0,0 +1,116 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <utility> +#include <future> +#include <memory> +#include "../TestBase.h" +#include "utils/BackTrace.h" + +bool function() { + return true; +} + +class WorkerNumberExecutions : public utils::AfterExecute<int> { + public: + explicit WorkerNumberExecutions(int tasks) + : runs(0), + tasks(tasks) { + } + + explicit WorkerNumberExecutions(WorkerNumberExecutions && other) + : runs(std::move(other.runs)), + tasks(std::move(other.tasks)) { + } + + ~WorkerNumberExecutions() { + } + + virtual bool isFinished(const int &result) { + if (result > 0 && ++runs < tasks) { + return false; + } else { + return true; + } + } + virtual bool isCancelled(const int &result) { + return false; + } + + int getRuns() { + return runs; + } + + virtual int64_t wait_time() { + // wait 50ms + return 50; + } + + protected: + int runs; + int tasks; +}; + +TEST_CASE("BT1", "[TPT1]") { + const BackTrace trace = TraceResolver::getResolver().getBackTrace("BT1"); +#ifdef HAS_EXECINFO + REQUIRE(!trace.getTraces().empty()); +#endif +} + +std::atomic<int> counter; + +int counterFunction() { + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + return ++counter; +} + +TEST_CASE("BT2", "[TPT2]") { + counter = 0; + utils::ThreadPool<int> pool(4); + pool.start(); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + for (int i = 0; i < 3; i++) { + std::function<int()> f_ex = counterFunction; + std::unique_ptr<utils::AfterExecute<int>> after_execute = std::unique_ptr<utils::AfterExecute<int>>(new WorkerNumberExecutions(5)); + utils::Worker<int> functor(f_ex, "id", std::move(after_execute)); + + std::future<int> fut; + REQUIRE(true == pool.execute(std::move(functor), fut)); + } + + std::function<int()> f_ex = counterFunction; + std::unique_ptr<utils::AfterExecute<int>> after_execute = std::unique_ptr<utils::AfterExecute<int>>(new WorkerNumberExecutions(5)); + utils::Worker<int> functor(f_ex, "id", std::move(after_execute)); + + std::future<int> fut; + REQUIRE(true == pool.execute(std::move(functor), fut)); + + std::vector<BackTrace> traces = pool.getTraces(); + for (const auto &trace : traces) { + const auto &trace_strings = trace.getTraces(); +#ifdef HAS_EXECINFO + REQUIRE(trace_strings.size() > 2); + if (trace_strings.at(0).find("sleep_for") != std::string::npos) { + REQUIRE(trace_strings.at(1).find("counterFunction") != std::string::npos); + } +#endif + } + fut.wait(); +} + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6a7f9890/libminifi/test/unit/ControllerTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ControllerTests.cpp b/libminifi/test/unit/ControllerTests.cpp index c5268ab..0e75566 100644 --- a/libminifi/test/unit/ControllerTests.cpp +++ b/libminifi/test/unit/ControllerTests.cpp @@ -110,6 +110,10 @@ class TestUpdateSink : public minifi::state::StateMonitor { virtual int16_t pause() { return 0; } + virtual std::vector<BackTrace> getTraces() { + std::vector<BackTrace> traces; + return traces; + } /** * Operational controllers
