MINIFICPP-418: Add build time information, ability to deploy, and run time information to c2 response and build output MINIFICPP-395: Add transfer capability that runs a rollback command on failure MINIFICPP-417: Resolve issue with RapidJson changes.
MINIFICPP-417: Resolve linter issues MINIFICPP-418: Update package name MINIFICPP-418: Add flow URI introspection and update flow version MINIFICPP-468: Add configurable agent information and update readme This closes #314. Signed-off-by: Aldrin Piri <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/a330c57a Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/a330c57a Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/a330c57a Branch: refs/heads/master Commit: a330c57a59fc3dd1c12948f8aabf1854463bee8f Parents: eb9128c Author: Marc Parisi <[email protected]> Authored: Sat Jan 27 21:20:10 2018 -0500 Committer: Aldrin Piri <[email protected]> Committed: Fri May 4 09:19:55 2018 -0400 ---------------------------------------------------------------------- .gitignore | 1 + C2.md | 129 ++++ CMakeLists.txt | 26 +- README.md | 29 +- bin/minifi.sh | 32 + bootstrap.sh | 105 ++-- centos.sh | 2 + controller/Controller.h | 56 ++ controller/MiNiFiController.cpp | 11 +- darwin.sh | 4 +- deploy.sh | 27 + extensions/http-curl/HTTPCurlLoader.h | 1 - extensions/http-curl/client/HTTPClient.cpp | 7 +- extensions/http-curl/protocols/AgentPrinter.cpp | 141 +++++ extensions/http-curl/protocols/AgentPrinter.h | 80 +++ extensions/http-curl/protocols/RESTSender.cpp | 34 +- .../http-curl/tests/C2FailedUpdateTest.cpp | 186 ++++++ .../http-curl/tests/C2UpdateAgentTest.cpp | 184 ++++++ extensions/http-curl/tests/C2UpdateTest.cpp | 6 +- extensions/http-curl/tests/CMakeLists.txt | 2 + generateVersion.sh | 83 +++ libminifi/include/FlowController.h | 47 +- libminifi/include/agent/build_description.h | 101 ++++ libminifi/include/c2/C2Agent.h | 37 +- libminifi/include/c2/C2Payload.h | 67 ++- libminifi/include/c2/protocols/RESTProtocol.h | 12 +- libminifi/include/core/ClassLoader.h | 22 + libminifi/include/core/ConfigurableComponent.h | 17 +- libminifi/include/core/Core.h | 2 +- libminifi/include/core/ProcessGroup.h | 4 +- libminifi/include/core/Property.h | 2 +- .../core/repository/VolatileRepository.h | 14 +- libminifi/include/core/state/StateManager.h | 29 +- libminifi/include/core/state/UpdateController.h | 4 +- libminifi/include/core/state/Value.h | 214 +++++++ .../core/state/metrics/DeviceInformation.h | 313 ---------- .../include/core/state/metrics/MetricsBase.h | 161 ----- .../core/state/metrics/MetricsListener.h | 128 ---- .../include/core/state/metrics/ProcessMetrics.h | 102 ---- .../include/core/state/metrics/QueueMetrics.h | 106 ---- .../core/state/metrics/RepositoryMetrics.h | 101 ---- .../include/core/state/metrics/SystemMetrics.h | 108 ---- .../include/core/state/nodes/AgentInformation.h | 586 +++++++++++++++++++ .../include/core/state/nodes/BuildInformation.h | 135 +++++ .../core/state/nodes/DeviceInformation.h | 355 +++++++++++ .../include/core/state/nodes/FlowInformation.h | 273 +++++++++ .../include/core/state/nodes/MetricsBase.h | 264 +++++++++ .../include/core/state/nodes/ProcessMetrics.h | 103 ++++ .../include/core/state/nodes/QueueMetrics.h | 106 ++++ .../core/state/nodes/RepositoryMetrics.h | 101 ++++ .../include/core/state/nodes/StateMonitor.h | 87 +++ .../include/core/state/nodes/SystemMetrics.h | 117 ++++ .../core/state/nodes/TreeUpdateListener.h | 128 ++++ libminifi/include/processors/GetFile.h | 29 +- libminifi/include/processors/GetTCP.h | 30 +- libminifi/include/properties/Configure.h | 1 + libminifi/include/utils/ByteArrayCallback.h | 8 +- libminifi/include/utils/FileOutputCallback.h | 77 +++ libminifi/include/utils/HTTPClient.h | 1 + libminifi/include/utils/file/FileUtils.h | 9 + libminifi/src/Configure.cpp | 1 + libminifi/src/FlowController.cpp | 333 +++++++++-- libminifi/src/c2/C2Agent.cpp | 276 +++++++-- libminifi/src/c2/C2Payload.cpp | 23 +- libminifi/src/c2/ControllerSocketProtocol.cpp | 8 +- libminifi/src/c2/protocols/RESTProtocol.cpp | 234 +++++--- libminifi/src/core/ConfigurableComponent.cpp | 27 +- libminifi/src/core/ProcessGroup.cpp | 2 +- libminifi/src/core/Property.cpp | 3 +- libminifi/src/core/state/StateManager.cpp | 17 +- libminifi/src/core/yaml/YamlConfiguration.cpp | 7 +- libminifi/src/processors/GetFile.cpp | 2 +- libminifi/src/processors/GetTCP.cpp | 2 +- libminifi/src/utils/ByteArrayCallback.cpp | 2 - libminifi/src/utils/FileOutputCallback.cpp | 61 ++ libminifi/src/utils/HTTPClient.cpp | 46 ++ libminifi/test/resources/TestBad.yml | 74 +++ libminifi/test/unit/C2MetricsTests.cpp | 96 +-- libminifi/test/unit/ControllerTests.cpp | 4 +- 79 files changed, 4937 insertions(+), 1428 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index c687dee..d89d4b6 100644 --- a/.gitignore +++ b/.gitignore @@ -47,6 +47,7 @@ target thirdparty/**/*.o thirdparty/**/*.a libminifi/test/**/*.a +libminifi/include/agent/agent_version.h docs/generated thirdparty/apache-rat/apache-rat* http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/C2.md ---------------------------------------------------------------------- diff --git a/C2.md b/C2.md new file mode 100644 index 0000000..194467c --- /dev/null +++ b/C2.md @@ -0,0 +1,129 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +# Apache NiFi - MiNiFi - C++ C2 Readme. + + +This readme defines the Command and control configuration options that work with Apache NiFi. All +options defined are located in minifi.properties. + +## Table of Contents + +- [Description](#description) +- [Configuration](#configuration) + - [Base Options](#base-options) + - [Metrics](#metrics) + +## Description + +Apache NiFi MiNiFI C++ can communicates with a C2 Server via a number of protocols. These protocols +deliver a C2 response the server, expecting requests in a hearbeat response. The protocol transforms +the C2 messages into a protocol specific representation. The internal representation is an AST therefore +you must define the root classes, which configure the classes that branch from the root. You can define +arbitrary nodes and sub-trees, but this isn't necessary and only advantageous for custom C2 servers. That +will be explained in greater detail in the metrics section. + +## Configuration + +### Base Options +For more more insight into the API used within the C2 agent, please visit: +https://cwiki.apache.org/confluence/display/MINIFI/C2+Design+Proposal + + + in minifi.properties + + # Disable/Enable C2 + nifi.c2.enable=true + + # specify classes for the AST response + nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation + + # specify C2 protocol -- default is RESTSender if this is not specified + c2.agent.protocol.class=RESTSender + + # control c2 heartbeat interval in millisecocnds + c2.agent.heartbeat.period=3000 + + # enable reporter classes + c2.agent.heartbeat.reporter.class=RESTReciver + + # specify the rest URIs if using RESTSender + c2.rest.url=http://localhost:10080/minifi-c2-api/c2-protocol/heartbeat + c2.rest.url.ack=http://localhost:10080/minifi-c2-api/c2-protocol/acknowledge + + # c2 agent identifier + nifi.c2.agent.identifier=<your identifier> + + # c2 agent class + nifi.c2.agent.class=<your agent class> + + # configure SSL Context service for REST Protocol + c2.rest.ssl.context.service + + +### Metrics + +Command and Control metrics can be used to send metrics through the heartbeat or via the DESCRIBE +operation. Since responses are formed in an AST, metrics can be formed as a sub tree. Metrics classes +are defined apriori and may reference a metrics class specific to a processor. The following example describes +a configuration of an agent + + # in minifi.properties + + + nifi.c2.root.class.definitions=metrics + nifi.c2.root.class.definitions.metrics.name=metrics + nifi.c2.root.class.definitions.metrics.metrics=typedmetrics,processorMetrics + nifi.c2.root.class.definitions.metrics.metrics.typedmetrics.name=RuntimeMetrics + nifi.c2.root.class.definitions.metrics.metrics.typedmetrics.classes=ProcessMetrics,SystemInformation + nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.name=ProcessorMetrics + nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.classes=GetFileMetrics + +This example shows a metrics sub tree defined by the option 'nifi.c2.root.class.definitions'. + +This is a comma separated list of all sub trees. In the example, above, only one sub tree exists: metrics. + +The options below metrics define the sub-trees within metrics: typedmetrics and processorMetrics. Each of these has a name. +The classes sub option will define the metrics classes that are placed within this sub-tree. For the RESTProtocol, the above +configuration produces the following JSON: + + "metrics": { + "ProcessorMetrics": { + "GetFileMetrics": { + "AcceptedFiles": 22, + "InputBytes": 61755, + "OnTriggerInvocations": 1 + } + }, + "RuntimeMetrics": { + "ProcessMetrics": { + "CpuMetrics": { + "involcs": 1 + }, + "MemoryMetrics": { + "maxrss": 145804 + } + }, + "systeminfo": { + "systemInfo": { + "machinearch": "x86_64", + "physicalMem": 67361411072, + "vCores": 12 + }, + "identifier": "identifier" + } + } + } + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index 163e637..8594eaf 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -21,7 +21,7 @@ cmake_minimum_required(VERSION 3.0) project(nifi-minifi-cpp) set(PROJECT_NAME "nifi-minifi-cpp") set(PROJECT_VERSION_MAJOR 0) -set(PROJECT_VERSION_MINOR 4) +set(PROJECT_VERSION_MINOR 5) set(PROJECT_VERSION_PATCH 0) option(SKIP_TESTS "Skips building all tests." OFF) option(PORTABLE "Instructs the compiler to remove architecture specific optimizations" ON) @@ -292,6 +292,30 @@ if (NOT DISABLE_CURL) endif() +get_property(selected_extensions GLOBAL PROPERTY EXTENSION-OPTIONS) + +if (NOT BUILD_IDENTIFIER) + message("empty identifier ${BUILD_IDENTIFIER}") + if (NOT BUILD_IDENTIFIER) + message("empty identifier ${BUILD_IDENTIFIER}") + string(RANDOM LENGTH 24 BUILD_IDENTIFIER) + set(BUILD_IDENTIFIER "${BUILD_IDENTIFIER}" CACHE STRING "Build identifier") + endif() +endif() + +message("BUILD_IDENTIFIER is ${BUILD_IDENTIFIER}") + +execute_process(COMMAND + "${CMAKE_CURRENT_SOURCE_DIR}/generateVersion.sh" + "${PROJECT_VERSION_MAJOR}.${PROJECT_VERSION_MINOR}.${PROJECT_VERSION_PATCH}" + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/libminifi/include/agent/ + "${CMAKE_CXX_COMPILER}" + "${CMAKE_CXX_COMPILER_VERSION}" + "${CMAKE_CXX_FLAGS}" + "${selected_extensions}" + "${BUILD_IDENTIFIER}") + # Generate source assembly set(ASSEMBLY_BASE_NAME "${CMAKE_PROJECT_NAME}-${PROJECT_VERSION_MAJOR}.${PROJECT_VERSION_MINOR}.${PROJECT_VERSION_PATCH}") set(CPACK_SOURCE_GENERATOR "TGZ") http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index f102faf..91c41c6 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,7 @@ MiNiFi is a child project effort of Apache NiFi. This repository is for a nativ - [Cleaning](#cleaning) - [Configuring](#configuring) - [Running](#running) + - [Deploying](#deploying) - [Documentation](#documentation) - [License](#license) @@ -572,25 +573,7 @@ To enable HTTP Proxy for a remote process group. proxy password: ### Command and Control Configuration -For more more insight into the API used within the C2 agent, please visit: -https://cwiki.apache.org/confluence/display/MINIFI/C2+Design+Proposal - - in minifi.properties - - #Disable/Enable C2 - nifi.c2.enable=true - - #specify metrics classes - nifi.flow.metrics.classes=DeviceInformation,SystemInformation,ProcessMetrics - - #specify C2 protocol - c2.agent.protocol.class=RESTSender - - #control c2 heartbeat interval in millisecocnds - c2.agent.heartbeat.period=3000 - - # enable reporter classes - c2.agent.heartbeat.reporter.class=RESTReciver +Please see the [C2 readme](C2.md) for more informatoin ### Configuring Repository storage locations @@ -738,6 +721,14 @@ MiNiFi can also be installed as a system service using minifi.sh with an optiona $ ./bin/minifi.sh install [service name] +### Deploying +MiNiFi C++ comes with a deployment script. This will build and package minifi. Additionally, a file named build_output will be +created within the build directory that contains a manifest of build artifacts. + + $ deploy.sh <build identifier> + +The build identifier will be carried with the deployed binary for the configuration you specify. By default all extensions will be built. + ### Managing MiNFI C++ through the MiNiFi Controller The MiNiFi controller is an executable in the bin directory that can be used to control the MiNFi C++ agent while it runs. Currently the controller will let you stop subcomponents within a running instance, clear queues, get the status of queues, and update the flow for a warm re-deploy. http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/bin/minifi.sh ---------------------------------------------------------------------- diff --git a/bin/minifi.sh b/bin/minifi.sh index dca94f3..4fc7123 100755 --- a/bin/minifi.sh +++ b/bin/minifi.sh @@ -225,6 +225,22 @@ case "\$1" in exit 3; fi ;; + update) + if [ -f \${bin_dir}/minifi.update ]; then + \${bin_dir}/minifi.sh stop + cp \${bin_dir}/minifi \${bin_dir}/minifi.bak + cp \${bin_dir}/minifi.update \${bin_dir}/minifi + # ensure that the command is now running + \${bin_dir}/minifi.sh start + saved_pid=\$(get_pid) + if [ "\${saved_pid}" -gt 0 ]; then + if [ \$(active_pid \${saved_pid}) -ne 0 ]; then + cp \${bin_dir}/minifi.bak \${bin_dir}/minifi + \${bin_dir}/minifi.sh start + fi + fi + fi + ;; restart) echo Restarting MiNiFi service \${bin_dir}/minifi.sh stop @@ -324,6 +340,22 @@ case "$1" in exit 3; fi ;; + update) + if [ -f ${bin_dir}/minifi.update ]; then + ${bin_dir}/minifi.sh stop + cp ${bin_dir}/minifi ${bin_dir}/minifi.bak + cp ${bin_dir}/minifi.update ${bin_dir}/minifi + # ensure that the command is now running + ${bin_dir}/minifi.sh start + saved_pid=$(get_pid) + if [ "${saved_pid}" -gt 0 ]; then + if [ $(active_pid ${saved_pid}) -ne 0 ]; then + cp ${bin_dir}/minifi.bak ${bin_dir}/minifi + ${bin_dir}/minifi.sh start + fi + fi + fi + ;; restart) echo Restarting MiNiFi service ${bin_dir}/minifi.sh stop http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/bootstrap.sh ---------------------------------------------------------------------- diff --git a/bootstrap.sh b/bootstrap.sh index a23b0d5..7706a58 100755 --- a/bootstrap.sh +++ b/bootstrap.sh @@ -24,17 +24,19 @@ NO_COLOR='\033[0;0;39m' CORES=1 BUILD="false" PACKAGE="false" - +BUILD_IDENTIFIER="" TRUE="Enabled" FALSE="Disabled" FEATURES_SELECTED="false" AUTO_REMOVE_EXTENSIONS="true" export NO_PROMPT="false" +DEPLOY="false" OPTIONS=() CMAKE_OPTIONS_ENABLED=() CMAKE_OPTIONS_DISABLED=() CMAKE_MIN_VERSION=() +DEPLOY_LIMITS=() DEPENDENCIES=() @@ -61,6 +63,12 @@ add_disabled_option(){ if [ ! -z "$4" ]; then CMAKE_MIN_VERSION+=("$1:$4") fi + + if [ ! -z "$5" ]; then + if [ "$5" = "true" ]; then + DEPLOY_LIMITS+=("$1") + fi + fi } add_dependency(){ @@ -69,11 +77,37 @@ add_dependency(){ ### parse the command line arguments + +EnableAllFeatures(){ + for option in "${OPTIONS[@]}" ; do + feature_status=${!option} + if [ "$feature_status" = "${FALSE}" ]; then + ToggleFeature $option + fi + # eval "$option=${TRUE}" + done +} + while :; do case $1 in -n|--noprompt) NO_PROMPT="true" ;; + -e|--enableall) + NO_PROMPT="true" + FEATURES_SELECTED="true" + EnableAllFeatures + ;; + -d|--deploy) + NO_PROMPT="true" + DEPLOY="true" + FEATURES_SELECTED="true" + EnableAllFeatures + ;; + -t|--travis) + NO_PROMPT="true" + FEATURES_SELECTED="true" + ;; -p|--package) CORES=$(grep -c ^processor /proc/cpuinfo 2>/dev/null || sysctl -n hw.ncpu) BUILD="true" @@ -83,6 +117,9 @@ while :; do CORES=$(grep -c ^processor /proc/cpuinfo 2>/dev/null || sysctl -n hw.ncpu) BUILD="true" ;; + "--build_identifier="* ) + BUILD_IDENTIFIER="${1#*=}" + ;; *) break esac shift @@ -107,14 +144,7 @@ if [ "$NO_PROMPT" = "true" ]; then agree="N" echo "****************************************" echo "Welcome, this boostrap script will update your system to install MiNIFi C++" - echo "You have opted to skip prompts. Do you agree to this script installing" - echo "System packages without prompting you?" - read -p "Enter Y to continue, N to exit [ Y/N ] " agree - if [ "$agree" = "Y" ] || [ "$agree" = "y" ]; then - echo "Continuing..." - else - exit - fi + echo "You have opted to skip prompts. " fi @@ -239,18 +269,29 @@ add_disabled_option KAFKA_ENABLED ${FALSE} "ENABLE_LIBRDKAFKA" "3.4.0" add_disabled_option MQTT_ENABLED ${FALSE} "ENABLE_MQTT" -#add_disabled_option BUSTACHE_ENABLED ${FALSE} "ENABLE_BUSTACHE" -#add_dependency BUSTACHE_ENABLED "boost" +# Since the following extensions have limitations on + +add_disabled_option BUSTACHE_ENABLED ${FALSE} "ENABLE_BUSTACHE" "2.6" ${TRUE} +add_dependency BUSTACHE_ENABLED "boost" ## currently need to limit on certain platforms -#add_disabled_option TENSORFLOW_ENABLED ${FALSE} "ENABLE_TENSORFLOW" +add_disabled_option TENSORFLOW_ENABLED ${FALSE} "ENABLE_TENSORFLOW" "2.6" ${TRUE} +add_dependency TENSORFLOW_ENABLED "tensorflow" pause(){ read -p "Press [Enter] key to continue..." fackEnterKey } - +can_deploy(){ + for option in "${DEPLOY_LIMITS[@]}" ; do + OPT=${option%%:*} + if [ "${OPT}" = "$1" ]; then + echo "false" + fi + done + echo "true" +} ToggleFeature(){ VARIABLE_VALUE=${!1} @@ -278,21 +319,15 @@ ToggleFeature(){ fi done CAN_ENABLE=$(verify_enable $1) + CAN_DEPLOY=$(can_deploy $1) if [ "$CAN_ENABLE" = "true" ]; then - eval "$1=${TRUE}" + if [[ "$DEPLOY" = "true" && "$CAN_DEPLOY" = "true" ]] || [[ "$DEPLOY" = "false" ]]; then + eval "$1=${TRUE}" + fi fi fi } -EnableAllFeatures(){ - for option in "${OPTIONS[@]}" ; do - feature_status=${!option} - if [ "$feature_status" = "${FALSE}" ]; then - ToggleFeature $option - fi - # eval "$option=${TRUE}" - done -} print_feature_status(){ feature="$1" @@ -334,24 +369,6 @@ print_feature_status(){ } -### parse the command line arguments - -while :; do - case $1 in - -e|--enableall) - NO_PROMPT="true" - FEATURES_SELECTED="true" - EnableAllFeatures - ;; - -t|--travis) - NO_PROMPT="true" - FEATURES_SELECTED="true" - ;; - *) break - esac - shift -done - if [ ! -d "build" ]; then mkdir build/ else @@ -371,7 +388,7 @@ fi ## change to the directory -cd build +pushd build show_supported_features() { @@ -495,6 +512,8 @@ build_cmake_command(){ CMAKE_BUILD_COMMAND="${CMAKE_BUILD_COMMAND} -DPORTABLE=OFF " fi + CMAKE_BUILD_COMMAND="${CMAKE_BUILD_COMMAND} -DBUILD_IDENTIFIER=${BUILD_IDENTIFIER}" + add_os_flags curl -V | grep OpenSSL &> /dev/null @@ -530,4 +549,4 @@ if [ "$PACKAGE" = "true" ]; then make package fi - +popd http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/centos.sh ---------------------------------------------------------------------- diff --git a/centos.sh b/centos.sh index 03d3b56..2794576 100644 --- a/centos.sh +++ b/centos.sh @@ -24,6 +24,8 @@ verify_enable() { echo "false" elif [ "$feature" = "USB_ENABLED" ]; then echo "false" + elif [ "$feature" = "TENSORFLOW_ENABLED" ]; then + echo "false" else echo "true" fi http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/controller/Controller.h ---------------------------------------------------------------------- diff --git a/controller/Controller.h b/controller/Controller.h index 95e05b1..0a2b292 100644 --- a/controller/Controller.h +++ b/controller/Controller.h @@ -244,4 +244,60 @@ std::shared_ptr<core::controller::ControllerService> getControllerService(const return service; } + void printManifest(const std::shared_ptr<minifi::Configure> &configuration) { + + std::string prov_repo_class = "volatileprovenancerepository"; + std::string flow_repo_class = "volatileflowfilerepository"; + std::string nifi_configuration_class_name = "yamlconfiguration"; + std::string content_repo_class = "volatilecontentrepository"; + + std::shared_ptr<logging::LoggerProperties> log_properties = std::make_shared<logging::LoggerProperties>(); + log_properties->setHome("./"); + log_properties->set("appender.stdout","stdout"); + log_properties->set("logger.org::apache::nifi::minifi","OFF,stdout"); + logging::LoggerConfiguration::getConfiguration().initialize(log_properties); + + configuration->set(minifi::Configure::nifi_flow_configuration_file,"../conf/config.yml"); + configuration->get(minifi::Configure::nifi_provenance_repository_class_name, prov_repo_class); + // Create repos for flow record and provenance + std::shared_ptr<core::Repository> prov_repo = core::createRepository(prov_repo_class, true, "provenance"); + prov_repo->initialize(configuration); + + configuration->get(minifi::Configure::nifi_flow_repository_class_name, flow_repo_class); + + std::shared_ptr<core::Repository> flow_repo = core::createRepository(flow_repo_class, true, "flowfile"); + + flow_repo->initialize(configuration); + + configuration->get(minifi::Configure::nifi_content_repository_class_name, content_repo_class); + + std::shared_ptr<core::ContentRepository> content_repo = core::createContentRepository(content_repo_class, true, "content"); + + content_repo->initialize(configuration); + + std::string content_repo_path; + if (configuration->get(minifi::Configure::nifi_dbcontent_repository_directory_default, content_repo_path)) { + minifi::setDefaultDirectory(content_repo_path); + } + + configuration->set("c2.agent.heartbeat.period","25"); + configuration->set("nifi.c2.root.classes","AgentInformation"); + configuration->set("nifi.c2.enable","true"); + configuration->set("c2.agent.listen","true"); + configuration->set("c2.agent.heartbeat.reporter.classes","AgentPrinter"); + + configuration->get(minifi::Configure::nifi_configuration_class_name, nifi_configuration_class_name); + + std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration); + + std::unique_ptr<core::FlowConfiguration> flow_configuration = core::createFlowConfiguration(prov_repo, flow_repo, content_repo, configuration, stream_factory, nifi_configuration_class_name); + + std::shared_ptr<minifi::FlowController> controller = std::unique_ptr<minifi::FlowController>( + new minifi::FlowController(prov_repo, flow_repo, configuration, std::move(flow_configuration), content_repo,"manifest",false)); + controller->load(); + controller->start(); + std::this_thread::sleep_for(std::chrono::milliseconds(10000)); + controller->stop(true); +} + #endif /* CONTROLLER_CONTROLLER_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/controller/MiNiFiController.cpp ---------------------------------------------------------------------- diff --git a/controller/MiNiFiController.cpp b/controller/MiNiFiController.cpp index 98acd15..dc12306 100644 --- a/controller/MiNiFiController.cpp +++ b/controller/MiNiFiController.cpp @@ -75,7 +75,7 @@ int main(int argc, char **argv) { if (!validHome(minifiHome)) { logger->log_error("No valid MINIFI_HOME could be inferred. " "Please set MINIFI_HOME or run minifi from a valid location."); - return -1; + //return -1; } std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); @@ -138,6 +138,7 @@ int main(int argc, char **argv) { ("getsize", "Reports the size of the associated connection queue", cxxopts::value<std::vector<std::string>>()) //NOLINT ("updateflow", "Updates the flow of the agent using the provided flow file", cxxopts::value<std::string>()) //NOLINT ("getfull", "Reports a list of full connections") //NOLINT + ("manifest", "Generates a manifest for the current binary") //NOLINT ("noheaders", "Removes headers from output streams"); bool show_headers = true; @@ -245,6 +246,14 @@ int main(int argc, char **argv) { if (updateFlow(std::move(socket), std::cout, flow_file) < 0) std::cout << "Could not connect to remote host " << host << ":" << port << std::endl; } + + if (result.count("manifest") > 0) { + printManifest(configuration); + } + }catch (const std::exception &exc) + { + // catch anything thrown within try block that derives from std::exception + std::cerr << exc.what() << std::endl; } catch (...) { std::cout << options.help( { "", "Group" }) << std::endl; exit(0); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/darwin.sh ---------------------------------------------------------------------- diff --git a/darwin.sh b/darwin.sh index e68c237..2283d3b 100644 --- a/darwin.sh +++ b/darwin.sh @@ -20,7 +20,7 @@ verify_enable() { feature="$1" feature_status=${!1} if [ "$feature" = "BUSTACHE_ENABLED" ]; then - BUSTACHE_MAX="9" + BUSTACHE_MAX="9" ## we should check the xcode version CLANG_VERSION=`clang --version | head -n 1 | awk '{print $4}'` CLANG_MAJOR=`echo $CLANG_VERSION | cut -d. -f1` @@ -73,6 +73,8 @@ build_deps(){ INSTALLED+=("flex") elif [ "$FOUND_VALUE" = "python" ]; then INSTALLED+=("python") + elif [ "$FOUND_VALUE" = "boost" ]; then + INSTALLED+=("boost") elif [ "$FOUND_VALUE" = "lua" ]; then INSTALLED+=("lua") elif [ "$FOUND_VALUE" = "gpsd" ]; then http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/deploy.sh ---------------------------------------------------------------------- diff --git a/deploy.sh b/deploy.sh new file mode 100755 index 0000000..63d58db --- /dev/null +++ b/deploy.sh @@ -0,0 +1,27 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +#!/bin/bash + +build_identifier=$1 + +echo "${build_identifier}" > build_identifier + +./bootstrap.sh -d -p --build_identifier=${build_identifier} + +pushd build + ./controller/minificontroller --manifest >> build_output +popd http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/extensions/http-curl/HTTPCurlLoader.h ---------------------------------------------------------------------- diff --git a/extensions/http-curl/HTTPCurlLoader.h b/extensions/http-curl/HTTPCurlLoader.h index ec90e99..23dcb44 100644 --- a/extensions/http-curl/HTTPCurlLoader.h +++ b/extensions/http-curl/HTTPCurlLoader.h @@ -49,7 +49,6 @@ class __attribute__((visibility("default"))) HttpCurlObjectFactory : public core */ virtual std::vector<std::string> getClassNames() override{ std::vector<std::string> class_names; - class_names.push_back("RESTProtocol"); class_names.push_back("HttpProtocol"); class_names.push_back("RESTSender"); class_names.push_back("InvokeHTTP"); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/extensions/http-curl/client/HTTPClient.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/client/HTTPClient.cpp b/extensions/http-curl/client/HTTPClient.cpp index eae8e1d..8f7ca42 100644 --- a/extensions/http-curl/client/HTTPClient.cpp +++ b/extensions/http-curl/client/HTTPClient.cpp @@ -297,8 +297,13 @@ const char *HTTPClient::getContentType() { } const std::vector<char> &HTTPClient::getResponseBody() { - if (response_body_.size() == 0) + if (response_body_.size() == 0){ + if (callback && callback->ptr){ + response_body_ = callback->ptr->to_string(); + }else{ response_body_ = read_callback_.to_string(); + } + } return response_body_; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/extensions/http-curl/protocols/AgentPrinter.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/protocols/AgentPrinter.cpp b/extensions/http-curl/protocols/AgentPrinter.cpp new file mode 100644 index 0000000..efcf195 --- /dev/null +++ b/extensions/http-curl/protocols/AgentPrinter.cpp @@ -0,0 +1,141 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "AgentPrinter.h" +#include <algorithm> +#include <memory> +#include <utility> +#include <map> +#include <list> +#include <string> +#include <vector> + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +AgentPrinter::AgentPrinter(std::string name, uuid_t uuid) + : HeartBeatReporter(name, uuid), + logger_(logging::LoggerFactory<AgentPrinter>::getLogger()) { +} + +void AgentPrinter::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink, + const std::shared_ptr<Configure> &configure) { + HeartBeatReporter::initialize(controller, updateSink, configure); +} +int16_t AgentPrinter::heartbeat(const C2Payload &payload) { + std::string outputConfig = serializeJsonRootPayload(payload); + return 0; +} + +std::string AgentPrinter::serializeJsonRootPayload(const C2Payload& payload) { + rapidjson::Document json_payload(payload.isContainer() ? rapidjson::kArrayType : rapidjson::kObjectType); + rapidjson::Document::AllocatorType &alloc = json_payload.GetAllocator(); + + rapidjson::Value opReqStrVal; + std::string operation_request_str = getOperation(payload); + opReqStrVal.SetString(operation_request_str.c_str(), operation_request_str.length(), alloc); + json_payload.AddMember("operation", opReqStrVal, alloc); + + std::string operationid = payload.getIdentifier(); + if (operationid.length() > 0) { + rapidjson::Value operationIdVal = getStringValue(operationid, alloc); + json_payload.AddMember("operationid", operationIdVal, alloc); + } + + mergePayloadContent(json_payload, payload, alloc); + + for (const auto &nested_payload : payload.getNestedPayloads()) { + rapidjson::Value np_key = getStringValue(nested_payload.getLabel(), alloc); + rapidjson::Value np_value = serializeJsonPayload(nested_payload, alloc); + json_payload.AddMember(np_key, np_value, alloc); + } + + rapidjson::StringBuffer buffer; + rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(buffer); + json_payload.Accept(writer); + return buffer.GetString(); +} + +rapidjson::Value AgentPrinter::serializeJsonPayload(const C2Payload &payload, rapidjson::Document::AllocatorType &alloc) { + // get the name from the content + rapidjson::Value json_payload(payload.isContainer() ? rapidjson::kArrayType : rapidjson::kObjectType); + + std::map<std::string, std::list<rapidjson::Value*>> children; + bool print = payload.getLabel() == "agentManifest"; + for (const auto &nested_payload : payload.getNestedPayloads()) { + + rapidjson::Value* child_payload = new rapidjson::Value(serializeJsonPayload(nested_payload, alloc)); + children[nested_payload.getLabel()].push_back(child_payload); + + } + + // child_vector is Pair<string, vector<Value*>> + for (auto child_vector : children) { + rapidjson::Value children_json; + rapidjson::Value newMemberKey = getStringValue(child_vector.first, alloc); + if (child_vector.second.size() > 1) { + children_json.SetArray(); + for (auto child : child_vector.second) + children_json.PushBack(child->Move(), alloc); + if (json_payload.IsArray()) + json_payload.PushBack(children_json, alloc); + else + json_payload.AddMember(newMemberKey, children_json, alloc); + } else if (child_vector.second.size() == 1) { + rapidjson::Value* first = child_vector.second.front(); + + if (first->IsObject() && first->HasMember(newMemberKey)) { + if (json_payload.IsArray()) + json_payload.PushBack((*first)[newMemberKey].Move(), alloc); + else + json_payload.AddMember(newMemberKey, (*first)[newMemberKey].Move(), alloc); + } else { + if (json_payload.IsArray()) { + json_payload.PushBack(first->Move(), alloc); + } else { + json_payload.AddMember(newMemberKey, first->Move(), alloc); + } + } + } + + for (rapidjson::Value* child : child_vector.second) + delete child; + } + + mergePayloadContent(json_payload, payload, alloc); + + if (print) { + + rapidjson::StringBuffer buffer; + rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(buffer); + json_payload.Accept(writer); + std::cout << buffer.GetString() << std::endl; + std::exit(1); + } + + return json_payload; +} + +} /* namespace c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/extensions/http-curl/protocols/AgentPrinter.h ---------------------------------------------------------------------- diff --git a/extensions/http-curl/protocols/AgentPrinter.h b/extensions/http-curl/protocols/AgentPrinter.h new file mode 100644 index 0000000..e1f7480 --- /dev/null +++ b/extensions/http-curl/protocols/AgentPrinter.h @@ -0,0 +1,80 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_C2_AGENTPRINTER_H_ +#define LIBMINIFI_INCLUDE_C2_AGENTPRINTER_H_ + +#include <string> +#include <mutex> +#include "core/Resource.h" +#include "c2/protocols/RESTProtocol.h" +#include "CivetServer.h" +#include "c2/C2Protocol.h" +#include "controllers/SSLContextService.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +/** + * Purpose and Justification: Encapsulates printing agent information. + * + * Will be used to print agent information from the C2 response to stdout, selecting the agent's manifest + * + */ +class AgentPrinter : public RESTProtocol, public HeartBeatReporter { + public: + AgentPrinter(std::string name, uuid_t uuid = nullptr); + + /** + * Initialize agent printer. + */ + virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink, + const std::shared_ptr<Configure> &configure) override; + + /** + * Accepts the heartbeat, only extracting AgentInformation. + */ + virtual int16_t heartbeat(const C2Payload &heartbeat) override; + + /** + * Overrides extracting the agent information from the root. + */ + virtual std::string serializeJsonRootPayload(const C2Payload& payload) override; + + /** + * Overrides extracting the agent information from the payload. + */ + virtual rapidjson::Value serializeJsonPayload(const C2Payload &payload, rapidjson::Document::AllocatorType &alloc) override; + + protected: + + private: + std::shared_ptr<logging::Logger> logger_; +}; + +REGISTER_RESOURCE(AgentPrinter); + +} /* namesapce c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_C2_AGENTPRINTER_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/extensions/http-curl/protocols/RESTSender.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/protocols/RESTSender.cpp b/extensions/http-curl/protocols/RESTSender.cpp index a98ba03..c2fd56e 100644 --- a/extensions/http-curl/protocols/RESTSender.cpp +++ b/extensions/http-curl/protocols/RESTSender.cpp @@ -19,11 +19,16 @@ #include "RESTSender.h" #include <algorithm> +#include <iostream> #include <memory> #include <utility> #include <map> #include <string> #include <vector> +#include "utils/file/FileUtils.h" +#include "utils/StringUtils.h" +#include "utils/file/FileManager.h" +#include "utils/FileOutputCallback.h" namespace org { namespace apache { @@ -40,8 +45,17 @@ void RESTSender::initialize(const std::shared_ptr<core::controller::ControllerSe C2Protocol::initialize(controller, configure); // base URL when one is not specified. if (nullptr != configure) { + std::string update_str, ssl_context_service_str; configure->get("c2.rest.url", rest_uri_); configure->get("c2.rest.url.ack", ack_uri_); + if (configure->get("c2.rest.ssl.context.service", ssl_context_service_str)) { + auto service = controller->getControllerService(ssl_context_service_str); + if (nullptr != service) { + ssl_context_service_ = std::static_pointer_cast<minifi::controllers::SSLContextService>(service); + } + } + configure->get("c2.rest.heartbeat.minimize.updates", update_str); + utils::StringUtils::StringToBool(update_str, minimize_updates_); } logger_->log_debug("Submitting to %s", rest_uri_); } @@ -52,7 +66,6 @@ C2Payload RESTSender::consumePayload(const std::string &url, const C2Payload &pa if (direction == Direction::TRANSMIT) { outputConfig = serializeJsonRootPayload(payload); } - return sendPayload(url, direction, payload, outputConfig); } @@ -72,7 +85,6 @@ void RESTSender::update(const std::shared_ptr<Configure> &configure) { const C2Payload RESTSender::sendPayload(const std::string url, const Direction direction, const C2Payload &payload, const std::string outputConfig) { utils::HTTPClient client(url, ssl_context_service_); client.setConnectionTimeout(2); - std::unique_ptr<utils::ByteInputCallBack> input = nullptr; std::unique_ptr<utils::HTTPUploadCallback> callback = nullptr; if (direction == Direction::TRANSMIT) { @@ -89,15 +101,25 @@ const C2Payload RESTSender::sendPayload(const std::string url, const Direction d // since we are not uploading anything on a get client.set_request_method("GET"); } - client.appendHeader("Accept: application/json"); - client.setContentType("application/json"); + + std::unique_ptr<utils::FileOutputCallback> file_callback = nullptr; + utils::HTTPReadCallback read; + if (payload.getOperation() == TRANSFER) { + utils::file::FileManager file_man; + auto file = file_man.unique_file(true); + file_callback = std::unique_ptr<utils::FileOutputCallback>(new utils::FileOutputCallback(file)); + read.pos = 0; + read.ptr = file_callback.get(); + client.setReadCallback(&read); + } else { + client.appendHeader("Accept: application/json"); + client.setContentType("application/json"); + } bool isOkay = client.submit(); int64_t respCode = client.getResponseCode(); - if (isOkay && respCode) { if (payload.isRaw()) { C2Payload response_payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true, true); - response_payload.setRawData(client.getResponseBody()); return response_payload; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/extensions/http-curl/tests/C2FailedUpdateTest.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/tests/C2FailedUpdateTest.cpp b/extensions/http-curl/tests/C2FailedUpdateTest.cpp new file mode 100644 index 0000000..2aed1f4 --- /dev/null +++ b/extensions/http-curl/tests/C2FailedUpdateTest.cpp @@ -0,0 +1,186 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <sys/stat.h> +#undef NDEBUG +#include <cassert> +#include <utility> +#include <chrono> +#include <fstream> +#include <memory> +#include <string> +#include <thread> +#include <type_traits> +#include <vector> +#include <iostream> +#include <sstream> +#include "HTTPClient.h" +#include "InvokeHTTP.h" +#include "TestBase.h" +#include "utils/StringUtils.h" +#include "core/Core.h" +#include "core/logging/Logger.h" +#include "core/ProcessGroup.h" +#include "core/yaml/YamlConfiguration.h" +#include "FlowController.h" +#include "properties/Configure.h" +#include "unit/ProvenanceTestHelper.h" +#include "io/StreamFactory.h" +#include "c2/C2Agent.h" +#include "CivetServer.h" +#include <cstring> +#include "protocols/RESTSender.h" + +void waitToVerifyProcessor() { + std::this_thread::sleep_for(std::chrono::seconds(10)); +} + +static std::vector<std::string> responses; + +class ConfigHandler : public CivetHandler { + public: + ConfigHandler() { + calls_ = 0; + } + bool handlePost(CivetServer *server, struct mg_connection *conn) { + calls_++; + if (responses.size() > 0) { + std::string top_str = responses.back(); + responses.pop_back(); + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + top_str.length()); + mg_printf(conn, "%s", top_str.c_str()); + } else { + mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n"); + } + + return true; + } + + bool handleGet(CivetServer *server, struct mg_connection *conn) { + std::ifstream myfile(test_file_location_.c_str()); + + if (myfile.is_open()) { + std::stringstream buffer; + buffer << myfile.rdbuf(); + std::string str = buffer.str(); + myfile.close(); + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + str.length()); + mg_printf(conn, "%s", str.c_str()); + } else { + mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n"); + } + + return true; + } + std::string test_file_location_; + std::string base_location_; + std::atomic<size_t> calls_; +}; + +int main(int argc, char **argv) { + mg_init_library(0); + LogTestController::getInstance().setInfo<minifi::FlowController>(); + LogTestController::getInstance().setDebug<minifi::utils::HTTPClient>(); + LogTestController::getInstance().setDebug<minifi::c2::RESTSender>(); + LogTestController::getInstance().setDebug<minifi::c2::C2Agent>(); + + const char *options[] = { "document_root", ".", "listening_ports", "7071", 0 }; + std::vector<std::string> cpp_options; + for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { + cpp_options.push_back(options[i]); + } + + CivetServer server(cpp_options); + ConfigHandler h_ex; + server.addHandler("/update", h_ex); + std::string key_dir, test_file_location; + if (argc > 1) { + h_ex.base_location_ = test_file_location = argv[1]; + h_ex.test_file_location_ = argv[2]; + key_dir = argv[3]; + } + std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {" + "\"operation\" : \"update\", " + "\"operationid\" : \"8675309\", " + "\"name\": \"configuration\"" + "}]}"; + + responses.push_back(heartbeat_response); + + std::ifstream myfile(test_file_location.c_str()); + + if (myfile.is_open()) { + std::stringstream buffer; + buffer << myfile.rdbuf(); + std::string str = buffer.str(); + myfile.close(); + std::string response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {" + "\"operation\" : \"update\", " + "\"operationid\" : \"8675309\", " + "\"name\": \"configuration\", \"content\": { \"location\": \"http://localhost:7071/update\"}}]}"; + responses.push_back(response); + } + + std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); + + configuration->set("c2.rest.url", "http://localhost:7071/update"); + configuration->set("c2.agent.heartbeat.period", "1000"); + mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); + + std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>(); + std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>(); + + configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location); + + std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration); + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>( + new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location)); + std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo); + + std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME, + true); + + core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location); + + std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location); + std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get()); + ptr.release(); + auto start = std::chrono::system_clock::now(); + + controller->load(); + controller->start(); + waitToVerifyProcessor(); + + controller->waitUnload(60000); + auto then = std::chrono::system_clock::now(); + + auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(then - start).count(); + std::string logs = LogTestController::getInstance().log_output.str(); + assert(logs.find("Invalid configuration payload") != std::string::npos); + assert(logs.find("update failed.") != std::string::npos); + LogTestController::getInstance().reset(); + rmdir("./content_repository"); + assert(h_ex.calls_ <= (milliseconds / 1000) + 1); + + return 0; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/extensions/http-curl/tests/C2UpdateAgentTest.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/tests/C2UpdateAgentTest.cpp b/extensions/http-curl/tests/C2UpdateAgentTest.cpp new file mode 100644 index 0000000..26a3e86 --- /dev/null +++ b/extensions/http-curl/tests/C2UpdateAgentTest.cpp @@ -0,0 +1,184 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <sys/stat.h> +#undef NDEBUG +#include <cassert> +#include <utility> +#include <chrono> +#include <fstream> +#include <memory> +#include <string> +#include <thread> +#include <type_traits> +#include <vector> +#include <iostream> +#include <sstream> +#include "HTTPClient.h" +#include "InvokeHTTP.h" +#include "TestBase.h" +#include "utils/StringUtils.h" +#include "core/Core.h" +#include "core/logging/Logger.h" +#include "core/ProcessGroup.h" +#include "core/yaml/YamlConfiguration.h" +#include "FlowController.h" +#include "properties/Configure.h" +#include "unit/ProvenanceTestHelper.h" +#include "io/StreamFactory.h" +#include "c2/C2Agent.h" +#include "CivetServer.h" +#include <cstring> +#include "protocols/RESTSender.h" + +void waitToVerifyProcessor() { + std::this_thread::sleep_for(std::chrono::seconds(10)); +} + +static std::vector<std::string> responses; + +class ConfigHandler : public CivetHandler { + public: + ConfigHandler() { + calls_ = 0; + } + bool handlePost(CivetServer *server, struct mg_connection *conn) { + calls_++; + if (responses.size() > 0) { + std::string top_str = responses.back(); + responses.pop_back(); + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + top_str.length()); + mg_printf(conn, "%s", top_str.c_str()); + } else { + mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n"); + } + + return true; + } + + bool handleGet(CivetServer *server, struct mg_connection *conn) { + std::ifstream myfile(test_file_location_.c_str()); + + if (myfile.is_open()) { + std::stringstream buffer; + buffer << myfile.rdbuf(); + std::string str = buffer.str(); + myfile.close(); + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + str.length()); + mg_printf(conn, "%s", str.c_str()); + } else { + mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n"); + } + + return true; + } + std::string test_file_location_; + std::atomic<size_t> calls_; +}; + +int main(int argc, char **argv) { + mg_init_library(0); + LogTestController::getInstance().setInfo<minifi::FlowController>(); + LogTestController::getInstance().setDebug<minifi::utils::HTTPClient>(); + LogTestController::getInstance().setDebug<minifi::c2::RESTSender>(); + LogTestController::getInstance().setTrace<minifi::c2::C2Agent>(); + + const char *options[] = { "document_root", ".", "listening_ports", "7072", 0 }; + std::vector<std::string> cpp_options; + for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { + cpp_options.push_back(options[i]); + } + + CivetServer server(cpp_options); + ConfigHandler h_ex; + server.addHandler("/update", h_ex); + std::string key_dir, test_file_location; + if (argc > 1) { + h_ex.test_file_location_ = test_file_location = argv[1]; + key_dir = argv[2]; + } + std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {" + "\"operation\" : \"update\", " + "\"operationid\" : \"8675309\", " + "\"name\": \"agent\"" + "}]}"; + + responses.push_back(heartbeat_response); + + std::ifstream myfile(test_file_location.c_str()); + + if (myfile.is_open()) { + std::stringstream buffer; + buffer << myfile.rdbuf(); + std::string str = buffer.str(); + myfile.close(); + std::string response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {" + "\"operation\" : \"update\", " + "\"operationid\" : \"8675309\", " + "\"name\": \"agent\", \"content\": { \"location\": \"http://localhost:7072/update\"}}]}"; + responses.push_back(response); + } + + std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); + + configuration->set("c2.rest.url", "http://localhost:7072/update"); + configuration->set("c2.agent.heartbeat.period", "1000"); + mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); + + std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>(); + std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>(); + + configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location); + configuration->set("c2.agent.update.command", "echo \"verification command\""); + + std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration); + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>( + new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location)); + std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo); + + std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME, + true); + + core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location); + + std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location); + std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get()); + ptr.release(); + auto start = std::chrono::system_clock::now(); + + controller->load(); + controller->start(); + waitToVerifyProcessor(); + + controller->waitUnload(60000); + auto then = std::chrono::system_clock::now(); + + auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(then - start).count(); + std::string logs = LogTestController::getInstance().log_output.str(); + assert(logs.find("removing command") != std::string::npos); + LogTestController::getInstance().reset(); + rmdir("./content_repository"); + assert(h_ex.calls_ <= (milliseconds / 1000) + 1); + + return 0; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/extensions/http-curl/tests/C2UpdateTest.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/tests/C2UpdateTest.cpp b/extensions/http-curl/tests/C2UpdateTest.cpp index f21084b..0799ae5 100644 --- a/extensions/http-curl/tests/C2UpdateTest.cpp +++ b/extensions/http-curl/tests/C2UpdateTest.cpp @@ -102,7 +102,7 @@ int main(int argc, char **argv) { LogTestController::getInstance().setDebug<minifi::c2::RESTSender>(); LogTestController::getInstance().setDebug<minifi::c2::C2Agent>(); - const char *options[] = { "document_root", ".", "listening_ports", "9090", 0 }; + const char *options[] = { "document_root", ".", "listening_ports", "7070", 0 }; std::vector<std::string> cpp_options; for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { cpp_options.push_back(options[i]); @@ -134,13 +134,13 @@ int main(int argc, char **argv) { std::string response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {" "\"operation\" : \"update\", " "\"operationid\" : \"8675309\", " - "\"name\": \"configuration\", \"content\": { \"location\": \"http://localhost:9090/update\"}}]}"; + "\"name\": \"configuration\", \"content\": { \"location\": \"http://localhost:7070/update\"}}]}"; responses.push_back(response); } std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); - configuration->set("c2.rest.url", "http://localhost:9090/update"); + configuration->set("c2.rest.url", "http://localhost:7070/update"); configuration->set("c2.agent.heartbeat.period", "1000"); mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/extensions/http-curl/tests/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/extensions/http-curl/tests/CMakeLists.txt b/extensions/http-curl/tests/CMakeLists.txt index 994177b..721c221 100644 --- a/extensions/http-curl/tests/CMakeLists.txt +++ b/extensions/http-curl/tests/CMakeLists.txt @@ -71,6 +71,8 @@ message("-- Finished building ${CURL_INT_TEST_COUNT} libcURL integration test fi add_test(NAME HttpGetIntegrationTest COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/") add_test(NAME C2UpdateTest COMMAND C2UpdateTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/") +add_test(NAME C2UpdateAgentTest COMMAND C2UpdateAgentTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/") +add_test(NAME C2FailedUpdateTest COMMAND C2FailedUpdateTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/TestBad.yml" "${TEST_RESOURCES}/") add_test(NAME C2NullConfiguration COMMAND C2NullConfiguration "${TEST_RESOURCES}/TestNull.yml" "${TEST_RESOURCES}/") #add_test(NAME HttpGetIntegrationTestSecure COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGetSecure.yml" "${TEST_RESOURCES}/") add_test(NAME HttpPostIntegrationTest COMMAND HttpPostIntegrationTest "${TEST_RESOURCES}/TestHTTPPost.yml" "${TEST_RESOURCES}/") http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/generateVersion.sh ---------------------------------------------------------------------- diff --git a/generateVersion.sh b/generateVersion.sh new file mode 100755 index 0000000..a0796cb --- /dev/null +++ b/generateVersion.sh @@ -0,0 +1,83 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +version=$1 +src_dir=$2 +out_dir=$3 +compiler=$4 +compiler_version=$5 +flags=$6 +extensions=$7 +buildident=$8 + +date=`date +%s` + +if [ -d ${src_dir}/.git ]; then + buildrev=`git log -1 --pretty=format:"%H"` + hostname=`hostname` +else + buildrev="Unknown" +fi + +IFS=';' read -r -a extensions_array <<< "$extensions" + +extension_list="${extension_list} } " + +cat >"$out_dir/agent_version.h" <<EOF +#ifndef AGENT_BUILD_H +#define AGENT_BUILD_H + +#include <vector> + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { + +class AgentBuild { + public: + static constexpr const char* VERSION = "$version"; + static constexpr const char* BUILD_IDENTIFIER = "$buildident"; + static constexpr const char* BUILD_REV = "$buildrev"; + static constexpr const char* BUILD_DATE = "$date"; + static constexpr const char* COMPILER = "$compiler"; + static constexpr const char* COMPILER_VERSION = "$compiler_version"; + static constexpr const char* COMPILER_FLAGS = "$flags"; + static std::vector<std::string> getExtensions() { + static std::vector<std::string> extensions; + if (extensions.empty()){ +EOF + +for EXTENSION in "${extensions_array[@]}" +do +cat <<EOF >> "$out_dir/agent_version.h" + extensions.push_back("${EXTENSION}"); +EOF +done + +cat <<EOF >> "$out_dir/agent_version.h" + } + return extensions; + } +}; + +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* AGENT_BUILD_H */ +EOF http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/FlowController.h ---------------------------------------------------------------------- diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h index 2087f81..71d22cb 100644 --- a/libminifi/include/FlowController.h +++ b/libminifi/include/FlowController.h @@ -45,10 +45,10 @@ #include "EventDrivenSchedulingAgent.h" #include "FlowControlProtocol.h" #include "core/Property.h" +#include "core/state/nodes/MetricsBase.h" #include "utils/Id.h" -#include "core/state/metrics/MetricsBase.h" #include "core/state/StateManager.h" - +#include "core/state/nodes/FlowInformation.h" namespace org { namespace apache { namespace nifi { @@ -117,7 +117,7 @@ class FlowController : public core::controller::ControllerServiceProvider, publi // Whether the Flow Controller is start running virtual bool isRunning() { - return running_.load(); + return running_.load() || updating_.load(); } // Whether the Flow Controller has already been initialized (loaded flow XML) @@ -131,7 +131,7 @@ class FlowController : public core::controller::ControllerServiceProvider, publi } // Unload the current flow YAML, clean the root process group and all its children virtual int16_t stop(bool force, uint64_t timeToWait = 0); - virtual int16_t applyUpdate(const std::string &configuration); + virtual int16_t applyUpdate(const std::string &source, const std::string &configuration); virtual int16_t drainRepositories() { return -1; @@ -143,7 +143,7 @@ class FlowController : public core::controller::ControllerServiceProvider, publi virtual int16_t clearConnection(const std::string &connection); - virtual int16_t applyUpdate(const std::shared_ptr<state::Update> &updateController) { + virtual int16_t applyUpdate(const std::string &source, const std::shared_ptr<state::Update> &updateController) { return -1; } // Asynchronous function trigger unloading and wait for a period of time @@ -187,11 +187,11 @@ class FlowController : public core::controller::ControllerServiceProvider, publi } // get version - int getVersion() { + virtual std::string getVersion() { if (root_ != nullptr) - return root_->getVersion(); + return std::to_string( root_->getVersion() ); else - return 0; + return "0"; } /** @@ -299,6 +299,15 @@ class FlowController : public core::controller::ControllerServiceProvider, publi virtual void enableAllControllerServices(); /** + * Retrieves all root response nodes from this source. + * @param metric_vector -- metrics will be placed in this vector. + * @return result of the get operation. + * 0 Success + * 1 No error condition, but cannot obtain lock in timely manner. + * -1 failure + */ + virtual int16_t getResponseNodes(std::vector<std::shared_ptr<state::response::ResponseNode>> &metric_vector, uint16_t metricsClass); + /** * Retrieves all metrics from this source. * @param metric_vector -- metrics will be placed in this vector. * @return result of the get operation. @@ -306,17 +315,23 @@ class FlowController : public core::controller::ControllerServiceProvider, publi * 1 No error condition, but cannot obtain lock in timely manner. * -1 failure */ - virtual int16_t getMetrics(std::vector<std::shared_ptr<state::metrics::Metrics>> &metric_vector, uint16_t metricsClass); + virtual int16_t getMetricsNodes(std::vector<std::shared_ptr<state::response::ResponseNode>> &metric_vector, uint16_t metricsClass); virtual uint64_t getUptime(); + void initializeC2(); + protected: + void loadC2ResponseConfiguration(); + + void loadC2ResponseConfiguration(const std::string &prefix); + + std::shared_ptr<state::response::ResponseNode> loadC2ResponseConfiguration(const std::string &prefix, std::shared_ptr<state::response::ResponseNode>); + // function to load the flow file repo. void loadFlowRepo(); - void initializeC2(); - /** * Initializes flow controller paths. */ @@ -338,6 +353,7 @@ class FlowController : public core::controller::ControllerServiceProvider, publi // FlowFile Repo // Whether it is running std::atomic<bool> running_; + std::atomic<bool> updating_; // conifiguration filename std::string configuration_filename_; @@ -379,17 +395,20 @@ class FlowController : public core::controller::ControllerServiceProvider, publi std::chrono::steady_clock::time_point start_time_; std::mutex metrics_mutex_; + // root_nodes cache + std::map<std::string, std::shared_ptr<state::response::ResponseNode>> root_response_nodes_; // metrics cache - std::map<std::string, std::shared_ptr<state::metrics::Metrics>> metrics_; + std::map<std::string, std::shared_ptr<state::response::ResponseNode>> device_information_; // metrics cache - std::map<std::string, std::shared_ptr<state::metrics::Metrics>> component_metrics_; + std::map<std::string, std::shared_ptr<state::response::ResponseNode>> component_metrics_; - std::map<uint8_t, std::vector<std::shared_ptr<state::metrics::Metrics>>>component_metrics_by_id_; + std::map<uint8_t, std::vector<std::shared_ptr<state::response::ResponseNode>>>component_metrics_by_id_; // metrics last run std::chrono::steady_clock::time_point last_metrics_capture_; private: + std::shared_ptr<state::response::FlowVersion> flow_version_; std::shared_ptr<logging::Logger> logger_; std::string serial_number_; static std::shared_ptr<utils::IdGenerator> id_generator_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/agent/build_description.h ---------------------------------------------------------------------- diff --git a/libminifi/include/agent/build_description.h b/libminifi/include/agent/build_description.h new file mode 100644 index 0000000..314282c --- /dev/null +++ b/libminifi/include/agent/build_description.h @@ -0,0 +1,101 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef BUILD_DESCRPTION_H +#define BUILD_DESCRPTION_H + +#include <vector> +#include "capi/expect.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { + + + +struct ClassDescription { + explicit ClassDescription(std::string name) + : class_name_(name), + support_dynamic_(false) { + } + explicit ClassDescription(std::string name, std::map<std::string, std::string> props, bool dyn) + : class_name_(name), + class_properties_(props), + support_dynamic_(dyn) { + + } + std::string class_name_; + std::map<std::string, std::string> class_properties_; + bool support_dynamic_; +}; + +struct Components { + std::vector<ClassDescription> processors_; + std::vector<ClassDescription> controller_services_; + std::vector<ClassDescription> other_components_; +}; + +class BuildDescription { + public: + + static struct Components getClassDescriptions() { + static struct Components classes; + if (UNLIKELY(IsNullOrEmpty(classes.processors_) && IsNullOrEmpty(classes.controller_services_))) { + for (auto clazz : core::ClassLoader::getDefaultClassLoader().getClasses()) { + + auto lastOfIdx = clazz.find_last_of("::"); + if (lastOfIdx != std::string::npos) { + lastOfIdx++; // if a value is found, increment to move beyond the . + int nameLength = clazz.length() - lastOfIdx; + std::string class_name = clazz.substr(lastOfIdx, nameLength); + + auto obj = core::ClassLoader::getDefaultClassLoader().instantiate(class_name, class_name); + + std::shared_ptr<core::ConfigurableComponent> component = std::dynamic_pointer_cast<core::ConfigurableComponent>(obj); + + ClassDescription description(clazz); + if (nullptr != component) { + + bool is_processor = std::dynamic_pointer_cast<core::Processor>(obj) != nullptr; + bool is_controller_service = LIKELY(is_processor == true) ? false : std::dynamic_pointer_cast<core::controller::ControllerService>(obj) != nullptr; + + component->initialize(); + description.class_properties_ = component->getProperties(); + description.support_dynamic_ = component->supportsDynamicProperties(); + if (is_processor) { + classes.processors_.emplace_back(description); + } else if (is_controller_service) { + classes.controller_services_.emplace_back(description); + } else { + classes.other_components_.emplace_back(description); + } + } + } + } + } + return classes; + } + +}; + +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* BUILD_DESCRPTION_H */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/c2/C2Agent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h index 810eede..b902c04 100644 --- a/libminifi/include/c2/C2Agent.h +++ b/libminifi/include/c2/C2Agent.h @@ -24,8 +24,10 @@ #include <memory> #include <mutex> #include <thread> + +#include "../core/state/nodes/MetricsBase.h" #include "core/state/UpdateController.h" -#include "core/state/metrics/MetricsBase.h" +#include "core/state/Value.h" #include "C2Payload.h" #include "C2Protocol.h" #include "io/validation.h" @@ -47,13 +49,12 @@ namespace c2 { * 0 HeartBeat -- RESERVED * 1-255 Defined by the configuration file. */ -class C2Agent : public state::UpdateController, public state::metrics::MetricsSink, public std::enable_shared_from_this<C2Agent> { +class C2Agent : public state::UpdateController, public state::response::ResponseNodeSink, public std::enable_shared_from_this<C2Agent> { public: C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink, const std::shared_ptr<Configure> &configure); virtual ~C2Agent() { - } /** @@ -71,7 +72,14 @@ class C2Agent : public state::UpdateController, public state::metrics::MetricsSi * @param metric metric to set * @param return 0 on success, -1 on failure. */ - virtual int16_t setMetrics(const std::shared_ptr<state::metrics::Metrics> &metric); + virtual int16_t setResponseNodes(const std::shared_ptr<state::response::ResponseNode> &metric); + + /** + * Sets the metric within this sink + * @param metric metric to set + * @param return 0 on success, -1 on failure. + */ + virtual int16_t setMetricsNodes(const std::shared_ptr<state::response::ResponseNode> &metric); int64_t getHeartBeatDelay(){ std::lock_guard<std::mutex> lock(heartbeat_mutex); @@ -80,6 +88,10 @@ class C2Agent : public state::UpdateController, public state::metrics::MetricsSi protected: + void restart_agent(); + + void update_agent(); + /** * Configure the C2 agent */ @@ -91,7 +103,7 @@ class C2Agent : public state::UpdateController, public state::metrics::MetricsSi * @param name name of this metric * @param metrics metrics to include. */ - void serializeMetrics(C2Payload &parent_payload, const std::string &name, const std::vector<state::metrics::MetricResponse> &metrics); + void serializeMetrics(C2Payload &parent_payload, const std::string &name, const std::vector<state::response::SerializedResponseNode> &metrics, bool is_container = false); /** * Extract the payload @@ -139,12 +151,17 @@ class C2Agent : public state::UpdateController, public state::metrics::MetricsSi void handle_describe(const C2ContentResponse &resp); std::timed_mutex metrics_mutex_; - std::map<std::string, std::shared_ptr<state::metrics::Metrics>> metrics_map_; + std::map<std::string, std::shared_ptr<state::response::ResponseNode>> metrics_map_; + + /** + * Device information stored in the metrics format + */ + std::map<std::string, std::shared_ptr<state::response::ResponseNode>> root_response_nodes_; /** * Device information stored in the metrics format */ - std::map<std::string, std::shared_ptr<state::metrics::Metrics>> device_information_; + std::map<std::string, std::shared_ptr<state::response::ResponseNode>> device_information_; // queue mutex std::timed_mutex queue_mutex; @@ -191,6 +208,12 @@ class C2Agent : public state::UpdateController, public state::metrics::MetricsSi std::atomic<C2Protocol*> protocol_; + bool allow_updates_; + + std::string update_command_; + + std::string update_location_; + std::shared_ptr<logging::Logger> logger_; } ; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/c2/C2Payload.h ---------------------------------------------------------------------- diff --git a/libminifi/include/c2/C2Payload.h b/libminifi/include/c2/C2Payload.h index ca14584..4d4b0ad 100644 --- a/libminifi/include/c2/C2Payload.h +++ b/libminifi/include/c2/C2Payload.h @@ -22,6 +22,7 @@ #include <string> #include <map> #include "core/state/UpdateController.h" +#include "core/state/Value.h" namespace org { namespace apache { @@ -38,7 +39,8 @@ enum Operation { HEARTBEAT, UPDATE, VALIDATE, - CLEAR + CLEAR, + TRANSFER }; enum Direction { @@ -58,6 +60,24 @@ class C2ContentResponse { C2ContentResponse & operator=(const C2ContentResponse &other); + inline bool operator==(const C2ContentResponse &rhs) const { + if (op != rhs.op) + return false; + if (required != rhs.required) + return false; + if (ident != rhs.ident) + return false; + if (name != rhs.name) + return false; + if (operation_arguments != rhs.operation_arguments) + return false; + return true; + } + + inline bool operator!=(const C2ContentResponse &rhs) const { + return !(*this == rhs); + } + Operation op; // determines if the operation is required bool required; @@ -70,7 +90,7 @@ class C2ContentResponse { // name applied to commands std::string name; // commands that correspond with the operation. - std::map<std::string, std::string> operation_arguments; + std::map<std::string, state::response::ValueNode> operation_arguments; // std::vector<std::string> content; }; @@ -147,13 +167,21 @@ class C2Payload : public state::Update { /** * Returns raw data. */ - std::string getRawData() const; + std::vector<char> getRawData() const; /** * Add a nested payload. * @param payload payload to move into this object. */ void addPayload(const C2Payload &&payload); + + bool isContainer() const { + return is_container_; + } + + void setContainer(bool is_container) { + is_container_ = is_container; + } /** * Get nested payloads. */ @@ -162,6 +190,35 @@ class C2Payload : public state::Update { C2Payload &operator=(const C2Payload &&other); C2Payload &operator=(const C2Payload &other); + inline bool operator==(const C2Payload &rhs) const { + if (op_ != rhs.op_) { + return false; + } + if (ident_ != rhs.ident_) { + return false; + } + if (label_ != rhs.label_) { + return false; + } + if (payloads_ != rhs.payloads_) { + return false; + } + if (content_ != rhs.content_) { + return false; + } + if (raw_ != rhs.raw_) { + return false; + } + if (raw_data_ != rhs.raw_data_) { + return false; + } + return true; + } + + inline bool operator!=(const C2Payload &rhs) const { + return !(*this == rhs); + } + protected: // identifier for this payload. @@ -177,10 +234,12 @@ class C2Payload : public state::Update { bool raw_; - std::string raw_data_; + std::vector<char> raw_data_; bool isResponse; + bool is_container_; + }; } /* namesapce c2 */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/c2/protocols/RESTProtocol.h ---------------------------------------------------------------------- diff --git a/libminifi/include/c2/protocols/RESTProtocol.h b/libminifi/include/c2/protocols/RESTProtocol.h index ed6d3eb..53e3f4f 100644 --- a/libminifi/include/c2/protocols/RESTProtocol.h +++ b/libminifi/include/c2/protocols/RESTProtocol.h @@ -49,7 +49,8 @@ namespace c2 { */ class RESTProtocol { public: - RESTProtocol() { + RESTProtocol() + : minimize_updates_(false) { } @@ -59,10 +60,12 @@ class RESTProtocol { protected: + virtual rapidjson::Value getStringValue(const std::string& value, rapidjson::Document::AllocatorType& alloc); + virtual rapidjson::Value serializeJsonPayload(const C2Payload &payload, rapidjson::Document::AllocatorType &alloc); virtual std::string serializeJsonRootPayload(const C2Payload& payload); - + virtual void mergePayloadContent(rapidjson::Value &target, const C2Payload &payload, rapidjson::Document::AllocatorType &alloc); virtual const C2Payload parseJsonResponse(const C2Payload &payload, const std::vector<char> &response); @@ -71,6 +74,11 @@ class RESTProtocol { virtual Operation stringToOperation(const std::string str); + bool containsPayload(const C2Payload &o); + + std::mutex update_mutex_; + bool minimize_updates_; + std::map<std::string, C2Payload> nested_payloads_; }; } /* namesapce c2 */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/core/ClassLoader.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ClassLoader.h b/libminifi/include/core/ClassLoader.h index 9297eef..6c17493 100644 --- a/libminifi/include/core/ClassLoader.h +++ b/libminifi/include/core/ClassLoader.h @@ -226,6 +226,28 @@ class ClassLoader { loaded_factories_.insert(std::make_pair(name, std::move(factory))); } + std::vector<std::string> getGroups() { + std::vector<std::string> groups; + std::lock_guard<std::mutex> lock(internal_mutex_); + for (auto & resource : loaded_factories_) { + groups.push_back(resource.first); + } + return groups; + } + + std::vector<std::string> getClasses() { + std::vector<std::string> groups; + std::lock_guard<std::mutex> lock(internal_mutex_); + for (auto & resource : loaded_factories_) { + if (nullptr != resource.second) { + auto classes = resource.second->getClassNames(); + groups.insert(groups.end(), classes.begin(), classes.end()); + }else{ + } + } + return groups; + } + /** * Instantiate object based on class_name * @param class_name class to create http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/core/ConfigurableComponent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ConfigurableComponent.h b/libminifi/include/core/ConfigurableComponent.h index 630ba59..e83e30e 100644 --- a/libminifi/include/core/ConfigurableComponent.h +++ b/libminifi/include/core/ConfigurableComponent.h @@ -129,12 +129,14 @@ class __attribute__((visibility("default"))) ConfigurableComponent { /** * Invoked anytime a static property is modified */ - virtual void onPropertyModified(const Property &old_property, const Property &new_property) {} + virtual void onPropertyModified(const Property &old_property, const Property &new_property) { + } /** * Invoked anytime a dynamic property is modified. */ - virtual void onDynamicPropertyModified(const Property &old_property, const Property &new_property) {} + virtual void onDynamicPropertyModified(const Property &old_property, const Property &new_property) { + } /** * Provides all dynamic property keys that have been set. @@ -143,8 +145,19 @@ class __attribute__((visibility("default"))) ConfigurableComponent { */ std::vector<std::string> getDynamicPropertyKeys(); + /** + * Returns a vector all properties + * + * @return map of property keys to their descriptions. + */ + std::map<std::string, std::string> getProperties(); + virtual ~ConfigurableComponent(); + virtual void initialize() { + + } + protected: /** http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/core/Core.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/Core.h b/libminifi/include/core/Core.h index 3ba051d..8109cd1 100644 --- a/libminifi/include/core/Core.h +++ b/libminifi/include/core/Core.h @@ -153,7 +153,7 @@ class CoreComponent { * Return the UUID string * @param constant reference to the UUID str */ - const std::string & getUUIDStr() { + const std::string & getUUIDStr() const { return uuidStr_; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/core/ProcessGroup.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h index 737d6de..b5d437a 100644 --- a/libminifi/include/core/ProcessGroup.h +++ b/libminifi/include/core/ProcessGroup.h @@ -153,7 +153,7 @@ class ProcessGroup { } // getVersion int getVersion() { - return version_; + return config_version_; } // Start Processing void startProcessing(TimerDrivenSchedulingAgent *timeScheduler, EventDrivenSchedulingAgent *eventScheduler); @@ -216,7 +216,7 @@ class ProcessGroup { // Processor Group Name std::string name_; // version - int version_; + int config_version_; // Process Group Type ProcessGroupType type_; // Processors (ProcessNode) inside this process group which include Input/Output Port, Remote Process Group input/Output port http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a330c57a/libminifi/include/core/Property.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/Property.h b/libminifi/include/core/Property.h index 788e452..ec228ec 100644 --- a/libminifi/include/core/Property.h +++ b/libminifi/include/core/Property.h @@ -82,7 +82,7 @@ class Property { // Get Name for the property std::string getName() const; // Get Description for the property - std::string getDescription(); + std::string getDescription() const; // Get value for the property std::string getValue() const; std::vector<std::string> &getValues();
