Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 54be9d403 -> 2821d71d6
MINIFICPP-491: Disable logging for C api MINIFICPP-492: Resolve issues with resource claims and volatile repos in C API MINIFICPP-486: Begin forming async control functions MINIFICPP-494: Resolve warning in valgrind to help debug failing test MINIFICPP-486: Rename block directory MINIFICPP-486: Update to async API This closes #327. Signed-off-by: Aldrin Piri <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/2821d71d Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/2821d71d Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/2821d71d Branch: refs/heads/master Commit: 2821d71d6ee965c7390def385bcd3b5ae0d4c376 Parents: 54be9d4 Author: Marc Parisi <[email protected]> Authored: Tue May 8 12:43:30 2018 -0400 Committer: Aldrin Piri <[email protected]> Committed: Thu May 24 14:25:19 2018 -0400 ---------------------------------------------------------------------- CMakeLists.txt | 1 + LibExample/CMakeLists.txt | 14 ++- LibExample/monitor_directory.c | 94 ++++++++++++++++++++ LibExample/transmit_flow.c | 2 +- blocks/comms.h | 47 ++++++++++ blocks/file_blocks.h | 38 ++++++++ extensions/rocksdb-repos/ProvenanceRepository.h | 1 - libminifi/include/c2/C2Agent.h | 2 +- libminifi/include/capi/C2CallbackAgent.h | 82 +++++++++++++++++ libminifi/include/capi/Instance.h | 48 +++++++++- libminifi/include/capi/api.h | 63 +++++++++---- libminifi/include/capi/processors.h | 35 ++++++++ libminifi/include/core/logging/Logger.h | 33 ++++++- .../include/core/logging/LoggerConfiguration.h | 13 ++- libminifi/src/capi/C2CallbackAgent.cpp | 81 +++++++++++++++++ libminifi/src/capi/Plan.cpp | 3 +- libminifi/src/capi/api.cpp | 85 ++++++++++++++++-- libminifi/src/core/FlowFile.cpp | 1 - .../src/core/logging/LoggerConfiguration.cpp | 6 +- .../repository/VolatileContentRepository.cpp | 20 +++-- libminifi/src/utils/Id.cpp | 2 +- 21 files changed, 623 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index 8594eaf..7699d1f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -292,6 +292,7 @@ if (NOT DISABLE_CURL) endif() + get_property(selected_extensions GLOBAL PROPERTY EXTENSION-OPTIONS) if (NOT BUILD_IDENTIFIER) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/LibExample/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/LibExample/CMakeLists.txt b/LibExample/CMakeLists.txt index ddf1e20..0421da7 100644 --- a/LibExample/CMakeLists.txt +++ b/LibExample/CMakeLists.txt @@ -23,7 +23,7 @@ IF(POLICY CMP0048) CMAKE_POLICY(SET CMP0048 OLD) ENDIF(POLICY CMP0048) -include_directories(../libminifi/include ../libminifi/include/c2 ../libminifi/include/c2/protocols/ ../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics ../libminifi/include/core/yaml ../libminifi/include/core ../thirdparty/spdlog-20170710/include ../thirdparty/concurrentqueue ../thirdparty/yaml-cpp-yaml-cpp-20171024/include ../thirdparty/civetweb-1.9.1/include ../thirdparty/) +include_directories(../blocks/ ../libminifi/include ../libminifi/include/c2 ../libminifi/include/c2/protocols/ ../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics ../libminifi/include/core/yaml ../libminifi/include/core ../thirdparty/spdlog-20170710/include ../thirdparty/concurrentqueue ../thirdparty/yaml-cpp-yaml-cpp-20171024/include ../thirdparty/civetweb-1.9.1/include ../thirdparty/) include(CheckCXXCompilerFlag) CHECK_CXX_COMPILER_FLAG("-std=c++11" COMPILER_SUPPORTS_CXX11) @@ -38,7 +38,6 @@ endif() add_executable(transmit_flow transmit_flow.c) -# Link against minifi, yaml-cpp, civetweb-cpp, uuid, openssl and rocksdb target_link_libraries(transmit_flow capi core-minifi minifi) if (APPLE) @@ -50,7 +49,6 @@ endif () add_executable(generate_flow generate_flow.c) -# Link against minifi, yaml-cpp, civetweb-cpp, uuid, openssl and rocksdb target_link_libraries(generate_flow capi core-minifi minifi) if (APPLE) @@ -59,3 +57,13 @@ else () target_link_libraries (generate_flow -Wl,--whole-archive minifi-http-curl -Wl,--no-whole-archive) endif () + +add_executable(monitor_directory monitor_directory.c) + +target_link_libraries(monitor_directory capi core-minifi minifi) + +if (APPLE) + target_link_libraries (monitor_directory -Wl,-all_load minifi-http-curl) +else () + target_link_libraries (monitor_directory -Wl,--whole-archive minifi-http-curl -Wl,--no-whole-archive) +endif () http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/LibExample/monitor_directory.c ---------------------------------------------------------------------- diff --git a/LibExample/monitor_directory.c b/LibExample/monitor_directory.c new file mode 100644 index 0000000..465106d --- /dev/null +++ b/LibExample/monitor_directory.c @@ -0,0 +1,94 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <unistd.h> +#include <dirent.h> + +#include "file_blocks.h" +#include "comms.h" +#include "capi/api.h" +#include "capi/processors.h" +int is_dir(const char *path) { + struct stat stat_struct; + if (stat(path, &stat_struct) != 0) + return 0; + return S_ISDIR(stat_struct.st_mode); +} + +pthread_mutex_t mutex; +pthread_cond_t condition; + +int stopped; + +int stop_callback(char *c) { + pthread_mutex_lock(&mutex); + stopped = 1; + pthread_cond_signal(&condition); + pthread_mutex_unlock(&mutex); + return 0; +} + +int is_stopped(void *ptr) { + int is_stop = 0; + pthread_mutex_lock(&mutex); + is_stop = stopped; + pthread_mutex_unlock(&mutex); + return is_stop; +} + +/** + * This is an example of the C API that transmits a flow file to a remote instance. + */ +int main(int argc, char **argv) { + if (argc < 5) { + printf("Error: must run ./monitor_directory <instance> <remote port> <directory to monitor>\n"); + exit(1); + } + + stopped = 0x00; + + char *instance_str = argv[1]; + char *portStr = argv[2]; + char *directory = argv[3]; + + nifi_port port; + + port.pord_id = portStr; + + C2_Server server; + server.url = argv[4]; + server.ack_url = argv[5]; + server.identifier = "monitor_directory"; + server.type = REST; + + nifi_instance *instance = create_instance(instance_str, &port); + + // enable_async_c2(instance, &server, stop_callback, NULL, NULL); + + flow *new_flow = monitor_directory(instance, directory, 0x00, KEEP_SOURCE); + + transmit_to_nifi(instance, new_flow, is_stopped); + + free_flow(new_flow); + + free_instance(instance); +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/LibExample/transmit_flow.c ---------------------------------------------------------------------- diff --git a/LibExample/transmit_flow.c b/LibExample/transmit_flow.c index 64e2beb..7fdb3d8 100644 --- a/LibExample/transmit_flow.c +++ b/LibExample/transmit_flow.c @@ -54,7 +54,7 @@ void transfer_file_or_directory(nifi_instance *instance, char *file_or_dir) { } else { printf("Transferring %s\n",file_or_dir); - flow_file_record *record = create_flowfile(file_or_dir); + flow_file_record *record = create_flowfile(file_or_dir, strlen(file_or_dir)); add_attribute(record, "addedattribute", "1", 2); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/blocks/comms.h ---------------------------------------------------------------------- diff --git a/blocks/comms.h b/blocks/comms.h new file mode 100644 index 0000000..3f5fda1 --- /dev/null +++ b/blocks/comms.h @@ -0,0 +1,47 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef BLOCKS_COMMS_H_ +#define BLOCKS_COMMS_H_ + +#include <stdio.h> +#include "capi/api.h" +#include "capi/processors.h" + +#define SUCCESS 0x00 +#define FINISHED_EARLY 0x01 +#define FAIL 0x02 + +typedef int transmission_stop(void *); + +uint8_t transmit_to_nifi(nifi_instance *instance, flow *flow, transmission_stop *stop_callback) { + + flow_file_record *record = 0x00; + do { + record = get_next_flow_file(instance, flow); + + if (record == 0) { + return FINISHED_EARLY; + } + transmit_flowfile(record, instance); + + free_flowfile(record); + } while (record != 0x00 && !( stop_callback != 0x00 && stop_callback(0x00))); + return SUCCESS; +} + +#endif /* BLOCKS_COMMS_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/blocks/file_blocks.h ---------------------------------------------------------------------- diff --git a/blocks/file_blocks.h b/blocks/file_blocks.h new file mode 100644 index 0000000..3f1d6f7 --- /dev/null +++ b/blocks/file_blocks.h @@ -0,0 +1,38 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef BLOCKS_FILE_BLOCKS_H_ +#define BLOCKS_FILE_BLOCKS_H_ + +#include "capi/api.h" +#include "capi/processors.h" + +#define KEEP_SOURCE 0x01 +#define RECURSE 0x02 + +/** + * Monitor directory can be combined into a current flow. to create an execution plan + */ +flow *monitor_directory(nifi_instance *instance, char *directory, flow *parent_flow, char flags) { + GetFileConfig config; + config.directory = directory; + config.keep_source = flags & KEEP_SOURCE; + config.recurse = flags & RECURSE; + return create_getfile(instance, parent_flow, &config); +} + +#endif /* BLOCKS_FILE_BLOCKS_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/extensions/rocksdb-repos/ProvenanceRepository.h ---------------------------------------------------------------------- diff --git a/extensions/rocksdb-repos/ProvenanceRepository.h b/extensions/rocksdb-repos/ProvenanceRepository.h index d325c24..d4ff3a5 100644 --- a/extensions/rocksdb-repos/ProvenanceRepository.h +++ b/extensions/rocksdb-repos/ProvenanceRepository.h @@ -107,7 +107,6 @@ class ProvenanceRepository : public core::Repository, public std::enable_shared_ } // Put virtual bool Put(std::string key, const uint8_t *buf, size_t bufLen) { - if (repo_full_) { return false; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/libminifi/include/c2/C2Agent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h index b902c04..aa67035 100644 --- a/libminifi/include/c2/C2Agent.h +++ b/libminifi/include/c2/C2Agent.h @@ -137,7 +137,7 @@ class C2Agent : public state::UpdateController, public state::response::Response * Handles a C2 event requested by the server. * @param resp c2 server response. */ - void handle_c2_server_response(const C2ContentResponse &resp); + virtual void handle_c2_server_response(const C2ContentResponse &resp); /** * Handles an update request http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/libminifi/include/capi/C2CallbackAgent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/capi/C2CallbackAgent.h b/libminifi/include/capi/C2CallbackAgent.h new file mode 100644 index 0000000..c7ee5a9 --- /dev/null +++ b/libminifi/include/capi/C2CallbackAgent.h @@ -0,0 +1,82 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_C2_C2CALLBACKAGENT_H_ +#define LIBMINIFI_INCLUDE_C2_C2CALLBACKAGENT_H_ + +#include <utility> +#include <functional> +#include <future> +#include <memory> +#include <mutex> +#include <thread> + +#include "c2/C2Agent.h" +#include "core/state/UpdateController.h" +#include "core/state/Value.h" +#include "c2/C2Payload.h" +#include "c2/C2Protocol.h" +#include "io/validation.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +typedef int c2_ag_update_callback(char *); + +typedef int c2_ag_stop_callback(char *); + +typedef int c2_ag_start_callback(char *); + +class C2CallbackAgent : public c2::C2Agent { + + public: + + explicit C2CallbackAgent(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink, const std::shared_ptr<Configure> &configure); + + virtual ~C2CallbackAgent() { + } + + void setStopCallback(c2_ag_stop_callback *st){ + stop = st; + } + + + protected: + /** + * Handles a C2 event requested by the server. + * @param resp c2 server response. + */ + virtual void handle_c2_server_response(const C2ContentResponse &resp); + + c2_ag_stop_callback *stop; + + private: + std::shared_ptr<logging::Logger> logger_; + +}; + +} /* namesapce c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + + +#endif /* LIBMINIFI_INCLUDE_C2_C2CALLBACKAGENT_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/libminifi/include/capi/Instance.h ---------------------------------------------------------------------- diff --git a/libminifi/include/capi/Instance.h b/libminifi/include/capi/Instance.h index 5d1c4c1..bbe0f4c 100644 --- a/libminifi/include/capi/Instance.h +++ b/libminifi/include/capi/Instance.h @@ -29,6 +29,7 @@ #include "core/repository/VolatileContentRepository.h" #include "core/Repository.h" +#include "C2CallbackAgent.h" #include "core/Connectable.h" #include "core/ProcessorNode.h" #include "core/ProcessContext.h" @@ -37,6 +38,8 @@ #include "core/controller/ControllerServiceProvider.h" #include "core/FlowConfiguration.h" #include "ReflexiveSession.h" +#include "utils/ThreadPool.h" +#include "core/state/UpdateController.h" namespace org { namespace apache { namespace nifi { @@ -63,10 +66,13 @@ class Instance { explicit Instance(const std::string &url, const std::string &port) : configure_(std::make_shared<Configure>()), url_(url), + agent_(nullptr), rpgInitialized_(false), - content_repo_(std::make_shared<minifi::core::repository::FileSystemRepository>()), + listener_thread_pool_(1), + content_repo_(std::make_shared<minifi::core::repository::VolatileContentRepository>()), no_op_repo_(std::make_shared<minifi::core::Repository>()) { stream_factory_ = std::make_shared<minifi::io::StreamFactory>(configure_); + running_ = false; uuid_t uuid; uuid_parse(port.c_str(), uuid); rpg_ = std::make_shared<minifi::RemoteProcessorGroupPort>(stream_factory_, url, url, configure_, uuid); @@ -75,10 +81,28 @@ class Instance { content_repo_->initialize(configure_); } + ~Instance() { + running_ = false; + listener_thread_pool_.shutdown(); + } + bool isRPGConfigured() { return rpgInitialized_; } + void enableAsyncC2(C2_Server *server,c2_stop_callback *c1, c2_start_callback *c2, c2_update_callback *c3) { + std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider = nullptr; + running_ = true; + if (server->type != C2_Server_Type::MQTT){ + configure_->set("c2.rest.url",server->url); + configure_->set("c2.rest.url.ack",server->ack_url); + } + agent_ = std::make_shared<c2::C2CallbackAgent>(controller_service_provider, nullptr, configure_); + listener_thread_pool_.start(); + registerUpdateListener(agent_, 1000); + agent_->setStopCallback(c1); + } + void setRemotePort(std::string remote_port) { rpg_->setProperty(minifi::RemoteProcessorGroupPort::portUUID, remote_port); rpg_->initialize(); @@ -114,6 +138,26 @@ class Instance { protected: + bool registerUpdateListener(const std::shared_ptr<state::UpdateController> &updateController, const int64_t &delay) { + auto functions = updateController->getFunctions(); + // run all functions independently + + for (auto function : functions) { + std::unique_ptr<utils::AfterExecute<state::Update>> after_execute = std::unique_ptr<utils::AfterExecute<state::Update>>(new state::UpdateRunner(running_, delay)); + utils::Worker<state::Update> functor(function, "listeners", std::move(after_execute)); + std::future<state::Update> future; + if (!listener_thread_pool_.execute(std::move(functor), future)) { + // denote failure + return false; + } + } + return true; + } + + std::shared_ptr<c2::C2CallbackAgent> agent_; + + std::atomic<bool> running_; + bool rpgInitialized_; std::shared_ptr<minifi::core::Repository> no_op_repo_; @@ -125,6 +169,8 @@ class Instance { std::shared_ptr<io::StreamFactory> stream_factory_; std::string url_; std::shared_ptr<Configure> configure_; + + utils::ThreadPool<state::Update> listener_thread_pool_; }; } /* namespace minifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/libminifi/include/capi/api.h ---------------------------------------------------------------------- diff --git a/libminifi/include/capi/api.h b/libminifi/include/capi/api.h index 440329e..d0b06b1 100644 --- a/libminifi/include/capi/api.h +++ b/libminifi/include/capi/api.h @@ -18,27 +18,32 @@ #ifndef LIBMINIFI_INCLUDE_CAPI_NANOFI_H_ #define LIBMINIFI_INCLUDE_CAPI_NANOFI_H_ +#include <stddef.h> #include <stdint.h> +#include "processors.h" + +int initialize_api(); #ifdef __cplusplus extern "C" { #endif #define API_VERSION "0.01" + +void enable_logging(); + /**** * ################################################################## * BASE NIFI OPERATIONS * ################################################################## */ - /** * NiFi Port struct */ typedef struct { - char *pord_id; -}nifi_port; - + char *port_id; +} nifi_port; /** * Nifi instance struct @@ -51,7 +56,6 @@ typedef struct { } nifi_instance; - nifi_instance *create_instance(char *url, nifi_port *port); void set_property(nifi_instance *, char *, char *); @@ -60,6 +64,32 @@ void initialize_instance(nifi_instance *); void free_instance(nifi_instance*); +/**** + * ################################################################## + * C2 OPERATIONS + * ################################################################## + */ + +enum C2_Server_Type{ + REST, + MQTT +}; + +typedef struct { + char *url; + char *ack_url; + char *identifier; + char *topic; + enum C2_Server_Type type; +} C2_Server; + +typedef int c2_update_callback(char *); + +typedef int c2_stop_callback(char *); + +typedef int c2_start_callback(char *); + +void enable_async_c2(nifi_instance *, C2_Server *, c2_stop_callback *, c2_start_callback *, c2_update_callback *); /**** * ################################################################## @@ -73,14 +103,13 @@ typedef struct { uint8_t run_processor(const processor *processor); - /**** * ################################################################## * FLOWFILE OPERATIONS * ################################################################## */ -typedef struct{ +typedef struct { char *key; void *value; size_t value_size; @@ -93,34 +122,35 @@ typedef struct{ typedef struct { uint64_t size; /**< Size in bytes of the data corresponding to this flow file */ + void * in; + char * contentLocation; /**< Filesystem location of this object */ void *attributes; /**< Hash map of attributes */ - } flow_file_record; - - -typedef struct { - void *plan; +typedef struct { + void *plan; } flow; - flow *create_flow(nifi_instance *, const char *); +flow *create_getfile(nifi_instance *instance, flow *parent, GetFileConfig *c); + void free_flow(flow *); flow_file_record *get_next_flow_file(nifi_instance *, flow *); -size_t get_flow_files(nifi_instance *, flow *, flow_file_record **, size_t ); - +size_t get_flow_files(nifi_instance *, flow *, flow_file_record **, size_t); /** * Creates a flow file object. * Will obtain the size of file */ -flow_file_record* create_flowfile(const char *file); +flow_file_record* create_flowfile(const char *file, const size_t len); + +flow_file_record* create_ff_object(const char *file, const size_t len, const uint64_t size); void free_flowfile(flow_file_record*); @@ -138,7 +168,6 @@ uint8_t remove_attribute(flow_file_record*, char *key); void transmit_flowfile(flow_file_record *, nifi_instance *); - /**** * ################################################################## * Persistence Operations http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/libminifi/include/capi/processors.h ---------------------------------------------------------------------- diff --git a/libminifi/include/capi/processors.h b/libminifi/include/capi/processors.h new file mode 100644 index 0000000..0d395e5 --- /dev/null +++ b/libminifi/include/capi/processors.h @@ -0,0 +1,35 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CAPI_PROCESSORS_H_ +#define LIBMINIFI_INCLUDE_CAPI_PROCESSORS_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct { + char *directory; + unsigned keep_source :1; + unsigned recurse :1; +} GetFileConfig; + +#ifdef __cplusplus +} +#endif + +#endif /* LIBMINIFI_INCLUDE_CAPI_PROCESSORS_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/libminifi/include/core/logging/Logger.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/logging/Logger.h b/libminifi/include/core/logging/Logger.h index a81e604..37ef390 100644 --- a/libminifi/include/core/logging/Logger.h +++ b/libminifi/include/core/logging/Logger.h @@ -21,6 +21,7 @@ #include <mutex> #include <memory> #include <sstream> +#include <iostream> #include "spdlog/spdlog.h" @@ -33,6 +34,24 @@ namespace logging { #define LOG_BUFFER_SIZE 1024 +class LoggerControl { + public: + LoggerControl() + : is_enabled_(true) { + + } + + bool is_enabled(){ + return is_enabled_; + } + + void setEnabled(bool status){ + is_enabled_ = status; + } + protected: + std::atomic<bool> is_enabled_; +}; + template<typename ... Args> inline std::string format_string(char const* format_str, Args&&... args) { char buf[LOG_BUFFER_SIZE]; @@ -171,6 +190,8 @@ class Logger : public BaseLogger { } bool should_log(const LOG_LEVEL &level) { + if (controller_ && !controller_->is_enabled()) + return false; spdlog::level::level_enum logger_level = spdlog::level::level_enum::info; switch (level) { case critical: @@ -228,16 +249,24 @@ class Logger : public BaseLogger { break; } } - Logger(std::shared_ptr<spdlog::logger> delegate) - : delegate_(delegate) { + Logger(std::shared_ptr<spdlog::logger> delegate, std::shared_ptr<LoggerControl> controller) + : delegate_(delegate), controller_(controller) { } + Logger(std::shared_ptr<spdlog::logger> delegate) + : delegate_(delegate), controller_(nullptr) { + } + + std::shared_ptr<spdlog::logger> delegate_; + std::shared_ptr<LoggerControl> controller_; std::mutex mutex_; private: template<typename ... Args> inline void log(spdlog::level::level_enum level, const char * const format, const Args& ... args) { + if (controller_ && !controller_->is_enabled()) + return; std::lock_guard<std::mutex> lock(mutex_); if (!delegate_->should_log(level)) { return; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/libminifi/include/core/logging/LoggerConfiguration.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/logging/LoggerConfiguration.h b/libminifi/include/core/logging/LoggerConfiguration.h index fbcdaad..0236410 100644 --- a/libminifi/include/core/logging/LoggerConfiguration.h +++ b/libminifi/include/core/logging/LoggerConfiguration.h @@ -91,6 +91,13 @@ class LoggerConfiguration { return logger_configuration; } + void disableLogging(){ + controller_->setEnabled(false); + } + + void enableLogging(){ + controller_->setEnabled(true); + } /** * (Re)initializes the logging configuation with the given logger properties. */ @@ -110,8 +117,8 @@ class LoggerConfiguration { class LoggerImpl : public Logger { public: - LoggerImpl(std::string name, std::shared_ptr<spdlog::logger> delegate) - : Logger(delegate), + LoggerImpl(std::string name, std::shared_ptr<LoggerControl> controller, std::shared_ptr<spdlog::logger> delegate) + : Logger(delegate,controller), name(name) { } void set_delegate(std::shared_ptr<spdlog::logger> delegate) { @@ -119,6 +126,7 @@ class LoggerConfiguration { delegate_ = delegate; } const std::string name; + }; LoggerConfiguration(); @@ -127,6 +135,7 @@ class LoggerConfiguration { std::shared_ptr<spdlog::formatter> formatter_; std::mutex mutex; std::shared_ptr<LoggerImpl> logger_ = nullptr; + std::shared_ptr<LoggerControl> controller_; }; template<typename T> http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/libminifi/src/capi/C2CallbackAgent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/capi/C2CallbackAgent.cpp b/libminifi/src/capi/C2CallbackAgent.cpp new file mode 100644 index 0000000..d8da5d6 --- /dev/null +++ b/libminifi/src/capi/C2CallbackAgent.cpp @@ -0,0 +1,81 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "capi/C2CallbackAgent.h" +#include <unistd.h> +#include <csignal> +#include <utility> +#include <vector> +#include <map> +#include <string> +#include <memory> +#include "c2/ControllerSocketProtocol.h" +#include "core/state/UpdateController.h" +#include "core/logging/Logger.h" +#include "core/logging/LoggerConfiguration.h" +#include "utils/file/FileUtils.h" +#include "utils/file/FileManager.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +C2CallbackAgent::C2CallbackAgent(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink, + const std::shared_ptr<Configure> &configuration) + : C2Agent(controller, updateSink, configuration), + stop(nullptr), + logger_(logging::LoggerFactory<C2CallbackAgent>::getLogger()) { +} + +void C2CallbackAgent::handle_c2_server_response(const C2ContentResponse &resp) { + switch (resp.op) { + case Operation::CLEAR: + break; + case Operation::UPDATE: + break; + case Operation::DESCRIBE: + break; + case Operation::RESTART: + break; + case Operation::START: + case Operation::STOP: { + if (resp.name == "C2" || resp.name == "c2") { + raise(SIGTERM); + } + + auto str = resp.name.c_str(); + + if (nullptr != stop) + stop(const_cast<char*>(str)); + + break; + } + // + break; + default: + break; + // do nothing + } +} + +} /* namespace c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/libminifi/src/capi/Plan.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/capi/Plan.cpp b/libminifi/src/capi/Plan.cpp index 9775c80..6e5c7f6 100644 --- a/libminifi/src/capi/Plan.cpp +++ b/libminifi/src/capi/Plan.cpp @@ -122,10 +122,11 @@ bool ExecutionPlan::setProperty(const std::shared_ptr<core::Processor> proc, con } } - if (i >= processor_queue_.size() || i == 0 || i >= processor_contexts_.size()) { + if (i >= processor_queue_.size() || i >= processor_contexts_.size()) { return false; } + return processor_contexts_.at(i)->setProperty(prop, value); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/libminifi/src/capi/api.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/capi/api.cpp b/libminifi/src/capi/api.cpp index 5e8f3d8..cb0d726 100644 --- a/libminifi/src/capi/api.cpp +++ b/libminifi/src/capi/api.cpp @@ -25,6 +25,23 @@ #include "capi/Instance.h" #include "capi/Plan.h" #include "ResourceClaim.h" +#include "processors/GetFile.h" +#include "core/logging/LoggerConfiguration.h" + +class API_INITIALIZER { + static int initialized; +}; + +int API_INITIALIZER::initialized = initialize_api(); + +int initialize_api() { + logging::LoggerConfiguration::getConfiguration().disableLogging(); + return 1; +} + +void enable_logging() { + logging::LoggerConfiguration::getConfiguration().enableLogging(); +} class DirectoryConfiguration { protected: @@ -61,6 +78,18 @@ void initialize_instance(nifi_instance *instance) { auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr); minifi_instance_ref->setRemotePort(instance->port.pord_id); } +/* + typedef int c2_update_callback(char *); + + typedef int c2_stop_callback(char *); + + typedef int c2_start_callback(char *); + + */ +void enable_async_c2(nifi_instance *instance, C2_Server *server, c2_stop_callback *c1, c2_start_callback *c2, c2_update_callback *c3) { + auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr); + minifi_instance_ref->enableAsyncC2(server, c1, c2, c3); +} /** * Sets a property within the nifi instance @@ -88,24 +117,44 @@ void free_instance(nifi_instance* instance) { * Creates a flow file record * @param file file to place into the flow file. */ -flow_file_record* create_flowfile(const char *file) { +flow_file_record* create_flowfile(const char *file, const size_t len) { flow_file_record *new_ff = new flow_file_record; new_ff->attributes = new std::map<std::string, std::string>(); - new_ff->contentLocation = new char[strlen(file)]; - snprintf(new_ff->contentLocation, strlen(file), "%s", file); + new_ff->contentLocation = new char[len + 1]; + snprintf(new_ff->contentLocation, len + 1, "%s", file); std::ifstream in(file, std::ifstream::ate | std::ifstream::binary); // set the size of the flow file. new_ff->size = in.tellg(); - return new_ff; } /** + * Creates a flow file record + * @param file file to place into the flow file. + */ +flow_file_record* create_ff_object(const char *file, const size_t len, const uint64_t size) { + flow_file_record *new_ff = new flow_file_record; + new_ff->attributes = new std::map<std::string, std::string>(); + new_ff->contentLocation = new char[len + 1]; + snprintf(new_ff->contentLocation, len + 1, "%s", file); + std::ifstream in(file, std::ifstream::ate | std::ifstream::binary); + // set the size of the flow file. + new_ff->size = size; + return new_ff; +} +/** * Reclaims memory associated with a flow file object * @param ff flow file record. */ void free_flowfile(flow_file_record *ff) { if (ff != nullptr) { + if (ff->in != nullptr) { + auto instance = static_cast<nifi_instance*>(ff->in); + auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr); + auto content_repo = minifi_instance_ref->getContentRepository(); + std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, content_repo); + content_repo->remove(claim); + } auto map = static_cast<std::map<std::string, std::string>*>(ff->attributes); delete[] ff->contentLocation; delete map; @@ -207,6 +256,28 @@ flow *create_flow(nifi_instance *instance, const char *first_processor) { return new_flow; } +flow *create_getfile(nifi_instance *instance, flow *parent_flow, GetFileConfig *c) { + std::string first_processor = "GetFile"; + auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr); + flow *new_flow = parent_flow == 0x00 ? new flow : parent_flow; + + if (parent_flow == 0x00) { + auto execution_plan = new ExecutionPlan(minifi_instance_ref->getContentRepository(), minifi_instance_ref->getNoOpRepository(), minifi_instance_ref->getNoOpRepository()); + + new_flow->plan = execution_plan; + } + + ExecutionPlan *plan = static_cast<ExecutionPlan*>(new_flow->plan); + // automatically adds it with success + auto getFile = plan->addProcessor(first_processor, first_processor); + + plan->setProperty(getFile, processors::GetFile::Directory.getName(), c->directory); + plan->setProperty(getFile, processors::GetFile::KeepSourceFile.getName(), c->keep_source ? "true" : "false"); + plan->setProperty(getFile, processors::GetFile::Recurse.getName(), c->recurse ? "true" : "false"); + + return new_flow; +} + void free_flow(flow *flow) { if (flow == nullptr) return; @@ -230,7 +301,8 @@ flow_file_record *get_next_flow_file(nifi_instance *instance, flow *flow) { // create a flow file. claim->increaseFlowFileRecordOwnedCount(); auto path = claim->getContentFullPath(); - auto ffr = create_flowfile(path.c_str()); + auto ffr = create_ff_object(path.c_str(), path.length(), ff->getSize()); + ffr->in = instance; return ffr; } else { return nullptr; @@ -254,7 +326,8 @@ size_t get_flow_files(nifi_instance *instance, flow *flow, flow_file_record **ff auto path = claim->getContentFullPath(); // create a flow file. - ff_r[i] = create_flowfile(path.c_str()); + ff_r[i] = create_ff_object(path.c_str(), path.length(), ff->getSize()); + ff_r[i]->in = instance; } else { break; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/libminifi/src/core/FlowFile.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/FlowFile.cpp b/libminifi/src/core/FlowFile.cpp index c9d0d83..3427559 100644 --- a/libminifi/src/core/FlowFile.cpp +++ b/libminifi/src/core/FlowFile.cpp @@ -51,7 +51,6 @@ FlowFile::FlowFile() } FlowFile::~FlowFile() { - logger_->log_debug("Deleteting %s", getUUIDStr()); } FlowFile& FlowFile::operator=(const FlowFile& other) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/libminifi/src/core/logging/LoggerConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/logging/LoggerConfiguration.cpp b/libminifi/src/core/logging/LoggerConfiguration.cpp index dac441b..a0a207a 100644 --- a/libminifi/src/core/logging/LoggerConfiguration.cpp +++ b/libminifi/src/core/logging/LoggerConfiguration.cpp @@ -59,7 +59,9 @@ LoggerConfiguration::LoggerConfiguration() : root_namespace_(create_default_root()), loggers(std::vector<std::shared_ptr<LoggerImpl>>()), formatter_(std::make_shared<spdlog::pattern_formatter>(spdlog_default_pattern)) { - logger_ = std::shared_ptr<LoggerImpl>(new LoggerImpl(core::getClassName<LoggerConfiguration>(), get_logger(nullptr, root_namespace_, core::getClassName<LoggerConfiguration>(), formatter_))); + controller_ = std::make_shared<LoggerControl>(); + logger_ = std::shared_ptr<LoggerImpl>( + new LoggerImpl(core::getClassName<LoggerConfiguration>(), controller_, get_logger(nullptr, root_namespace_, core::getClassName<LoggerConfiguration>(), formatter_))); loggers.push_back(logger_); } @@ -88,7 +90,7 @@ void LoggerConfiguration::initialize(const std::shared_ptr<LoggerProperties> &lo std::shared_ptr<Logger> LoggerConfiguration::getLogger(const std::string &name) { std::lock_guard<std::mutex> lock(mutex); - std::shared_ptr<LoggerImpl> result = std::make_shared<LoggerImpl>(name, get_logger(logger_, root_namespace_, name, formatter_)); + std::shared_ptr<LoggerImpl> result = std::make_shared<LoggerImpl>(name, controller_, get_logger(logger_, root_namespace_, name, formatter_)); loggers.push_back(result); return result; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/libminifi/src/core/repository/VolatileContentRepository.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/repository/VolatileContentRepository.cpp b/libminifi/src/core/repository/VolatileContentRepository.cpp index 60f538d..0e99311 100644 --- a/libminifi/src/core/repository/VolatileContentRepository.cpp +++ b/libminifi/src/core/repository/VolatileContentRepository.cpp @@ -83,16 +83,16 @@ void VolatileContentRepository::start() { thread_ = std::thread(&VolatileContentRepository::run, shared_from_parent<VolatileContentRepository>()); thread_.detach(); running_ = true; - logger_->log_debug("%s Repository Monitor Thread Start", getName()); + logger_->log_info("%s Repository Monitor Thread Start", getName()); } std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const std::shared_ptr<minifi::ResourceClaim> &claim) { - logger_->log_debug("enter write for %s", claim->getContentFullPath()); + logger_->log_info("enter write for %s", claim->getContentFullPath()); { std::lock_guard<std::mutex> lock(map_mutex_); auto claim_check = master_list_.find(claim->getContentFullPath()); if (claim_check != master_list_.end()) { - logger_->log_debug("Creating copy of atomic entry"); + logger_->log_info("Creating copy of atomic entry"); auto ent = claim_check->second->takeOwnership(); if (ent == nullptr) { return nullptr; @@ -107,7 +107,7 @@ std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const std::shar if (ent->testAndSetKey(claim, nullptr, nullptr, resource_claim_comparator_)) { std::lock_guard<std::mutex> lock(map_mutex_); master_list_[claim->getContentFullPath()] = ent; - logger_->log_debug("Minimize locking, return stream for %s", claim->getContentFullPath()); + logger_->log_info("Minimize locking, return stream for %s", claim->getContentFullPath()); return std::make_shared<io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim, ent); } size++; @@ -125,7 +125,7 @@ std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const std::shar } } } - logger_->log_debug("Cannot write %s %d, returning nullptr to roll back session. Repo is either full or locked", claim->getContentFullPath(), size); + logger_->log_info("Cannot write %s %d, returning nullptr to roll back session. Repo is either full or locked", claim->getContentFullPath(), size); return nullptr; } @@ -166,14 +166,16 @@ bool VolatileContentRepository::remove(const std::shared_ptr<minifi::ResourceCla // if we cannot remove the entry we will let the owner's destructor // decrement the reference count and free it master_list_.erase(claim->getContentFullPath()); + // because of the test and set we need to decrement ownership + ptr->decrementOwnership(); if (ptr->freeValue(claim)) { - logger_->log_debug("Removed %s", claim->getContentFullPath()); + logger_->log_info("Removed %s", claim->getContentFullPath()); return true; } else { - logger_->log_debug("free failed for %s", claim->getContentFullPath()); + logger_->log_info("free failed for %s", claim->getContentFullPath()); } } else { - logger_->log_debug("Could not remove %s", claim->getContentFullPath()); + logger_->log_info("Could not remove %s", claim->getContentFullPath()); } } else { std::lock_guard<std::mutex> lock(map_mutex_); @@ -187,7 +189,7 @@ bool VolatileContentRepository::remove(const std::shared_ptr<minifi::ResourceCla return true; } - logger_->log_debug("Could not remove %s, may not exist", claim->getContentFullPath()); + logger_->log_info("Could not remove %s, may not exist", claim->getContentFullPath()); return false; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/libminifi/src/utils/Id.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/utils/Id.cpp b/libminifi/src/utils/Id.cpp index 05e55a2..da0fea9 100644 --- a/libminifi/src/utils/Id.cpp +++ b/libminifi/src/utils/Id.cpp @@ -38,7 +38,7 @@ namespace utils { uint64_t timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count(); NonRepeatingStringGenerator::NonRepeatingStringGenerator() - : prefix_((std::to_string(timestamp) + "-")) { + : prefix_((std::to_string(timestamp) + "-")), incrementor_(0) { } IdGenerator::IdGenerator()
