Repository: nifi-minifi-cpp Updated Branches: refs/heads/master cb17947d7 -> 5d5a56227
MINIFICPP-617: Create simple python example This closes #404. Signed-off-by: Aldrin Piri <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/5d5a5622 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/5d5a5622 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/5d5a5622 Branch: refs/heads/master Commit: 5d5a562273fa576452126a2e32f9a718d95787db Parents: cb17947 Author: Marc Parisi <[email protected]> Authored: Wed Sep 12 11:05:25 2018 -0400 Committer: Aldrin Piri <[email protected]> Committed: Thu Oct 11 15:43:36 2018 -0400 ---------------------------------------------------------------------- C2.md | 2 +- CMakeLists.txt | 14 +- PYTHON.md | 57 ++++++ extensions/ExtensionHeader.txt | 3 - extensions/bustache/CMakeLists.txt | 3 - extensions/expression-language/CMakeLists.txt | 4 - .../expression-language/noop/CMakeLists.txt | 1 + extensions/gps/CMakeLists.txt | 4 - extensions/tensorflow/CMakeLists.txt | 3 - libminifi/CMakeLists.txt | 44 +++- libminifi/include/agent/agent_docs.h | 3 +- libminifi/include/c2/C2Callback.h | 35 ++++ libminifi/include/capi/Plan.h | 16 +- libminifi/include/capi/api.h | 86 ++------ libminifi/include/capi/cstructs.h | 118 +++++++++++ libminifi/include/capi/processors.h | 2 + libminifi/include/core/Core.h | 1 + libminifi/include/core/FlowFile.h | 46 +++-- .../include/processors/CallbackProcessor.h | 100 +++++++++ libminifi/src/capi/C2CallbackAgent.cpp | 2 +- libminifi/src/capi/Plan.cpp | 70 +++++-- libminifi/src/capi/api.cpp | 135 +++++++++++- libminifi/src/processors/CallbackProcessor.cpp | 37 ++++ python/getFile.py | 70 +++++++ python/library/CMakeLists.txt | 50 +++++ python/library/python_lib.cpp | 47 +++++ python/library/python_lib.h | 33 +++ python/minifi/__init__.py | 205 +++++++++++++++++++ thirdparty/civetweb-1.10/src/CMakeLists.txt | 13 +- thirdparty/date/CMakeLists.txt | 2 +- 
thirdparty/uuid/CMakeLists.txt | 17 ++ .../yaml-cpp-yaml-cpp-20171024/CMakeLists.txt | 2 + 32 files changed, 1083 insertions(+), 142 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d5a5622/C2.md ---------------------------------------------------------------------- diff --git a/C2.md b/C2.md index 4a3daa4..b41c29c 100644 --- a/C2.md +++ b/C2.md @@ -138,7 +138,7 @@ requires a single ingress/egress through a gateway. In these classes of devices, or RESTSender can be used for C2 operations. As defined, above, MQTTC2Protocol can be used for the agent protocol class. If you wish to communicate with a RESTFul C2 server, -you may use the ConvertBase, ConvertHeartBeat, ConvertJSONAack, and ConvertUpdate classes on an agent to perform the transation. +you may use the ConvertBase, ConvertHeartBeat, ConvertJSONAack, and ConvertUpdate classes on an agent to perform the translation. State is not kept with an intermediate agent other than the broker. The broker is not embedded with the agent to simplify the agent. An example configuration, below, defines an agent that receives and forward MQTT C2 requests to a C2 server. 
Additionally, this agent http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d5a5622/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index 1325fbc..bb07922 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -34,7 +34,7 @@ option(USE_SYSTEM_ZLIB "Instructs the build system to search for and use a zlib else() option(USE_SYSTEM_ZLIB "Instructs the build system to search for and use a zlib library available in the host system" ON) endif() - +option(ENABLE_PYTHON "Instructs the build system to enable building shared objects for the python lib" OFF) option(USE_SYSTEM_BZIP2 "Instructs the build system to search for and use a bzip2 library available in the host system" ON) option(BUILD_ROCKSDB "Instructs the build system to use RocksDB from the third party directory" ON) @@ -303,6 +303,9 @@ set(CIVETWEB_ENABLE_CXX ON CACHE BOOL "Enable civet C++ library") set(CIVETWEB_ENABLE_SSL OFF CACHE BOOL "DISABLE SSL") SET(WITH_TOOLS OFF CACHE BOOL "Do not build RocksDB tools") +if (NOT APPLE) +SET(BUILD_SHARED_LIBS ON CACHE BOOL "build yaml cpp shared lib") +endif() SET(WITH_TESTS OFF CACHE BOOL "Build RocksDB library (not repo) tests") set(CIVET_THIRDPARTY_ROOT "${CMAKE_SOURCE_DIR}/thirdparty/civetweb-1.10/" CACHE STRING "Path to CivetWeb root") set(CIVET_BINARY_ROOT "${CMAKE_BINARY_DIR}/thirdparty/civetweb-1.10/" CACHE STRING "Path to CivetWeb binary output") @@ -447,7 +450,14 @@ endif() ## NOW WE CAN ADD LIBRARIES AND EXTENSIONS TO MAIN add_subdirectory(main) -add_subdirectory(LibExample) +if (NOT DISABLE_CURL) + add_subdirectory(LibExample) + if (ENABLE_PYTHON) + if (NOT WIN32) + add_subdirectory(python/library) + endif() + endif(ENABLE_PYTHON) +endif() get_property(selected_extensions GLOBAL PROPERTY EXTENSION-OPTIONS) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d5a5622/PYTHON.md ---------------------------------------------------------------------- diff --git 
a/PYTHON.md b/PYTHON.md new file mode 100644 index 0000000..aad31ef --- /dev/null +++ b/PYTHON.md @@ -0,0 +1,57 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +# Apache NiFi - MiNiFi - C++ Python Access. + + +This readme provides a how-to guide on using the Python bindings for MiNiFi C++. + +## Table of Contents + +- [Description](#description) +- [Enabling](#enabling) +- [Example](#example) +- [Limitations](#limitations) + +## Description + +Apache NiFi MiNiFi C++ can communicate using Python bindings. These bindings connect +to the C API that exists. In doing so they can utilize the building blocks within the CAPI. + +The design is predicated upon a MiNiFi instance. There is a getFile example that shows +the usage of this API. A processor can be created and then a flowfile will be output if one +is routed to success. Custom routes can be defined in later implementations of the Python API. + +An RPG is currently required to define a MiNiFi instance. As per the example, a flow file may +be transmitted via HTTP site to site. Presently, raw socket site to site is supported via +the CAPI but not the Python library. + +To run the getfile example you will need to provide the full path to the python-lib shared object +created at build time.
+ +## Enabling + At build time you must specify -DENABLE_PYTHON=ON to enable python bindings to be built. + + Further, you may have to install CFFI for python bindings. + + This can typically be performed through pip, with `pip install cffi` + +## Example + The python directory contains an example where we use a MiNiFi C++ processor along with + a python processor. The implementation of the python processor requires that a callback + method be defined for ontrigger. + +## Limitations + Python bindings currently don't build on WIN32. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d5a5622/extensions/ExtensionHeader.txt ---------------------------------------------------------------------- diff --git a/extensions/ExtensionHeader.txt b/extensions/ExtensionHeader.txt index aed1d0e..920200c 100644 --- a/extensions/ExtensionHeader.txt +++ b/extensions/ExtensionHeader.txt @@ -20,9 +20,6 @@ cmake_minimum_required(VERSION 2.6) -set(CMAKE_EXE_LINKER_FLAGS "-Wl,--export-all-symbols") -set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--export-symbols") - include_directories(../../libminifi/include ../../libminifi/include/core/yaml ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ${CIVET_THIRDPARTY_ROOT}/include ../../thirdparty/) if(WIN32) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d5a5622/extensions/bustache/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/extensions/bustache/CMakeLists.txt b/extensions/bustache/CMakeLists.txt index b8cb0ce..36814f4 100644 --- a/extensions/bustache/CMakeLists.txt +++ b/extensions/bustache/CMakeLists.txt @@ -19,9 +19,6 @@ cmake_minimum_required(VERSION 2.6) -set(CMAKE_EXE_LINKER_FLAGS "-Wl,--export-all-symbols") -set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--export-symbols") - include_directories(../../libminifi/include ../../libminifi/include/core
../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/civetweb-1.9.1/include ../../thirdparty/) find_package(Boost COMPONENTS system filesystem iostreams REQUIRED) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d5a5622/extensions/expression-language/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/extensions/expression-language/CMakeLists.txt b/extensions/expression-language/CMakeLists.txt index c5789a8..314f503 100644 --- a/extensions/expression-language/CMakeLists.txt +++ b/extensions/expression-language/CMakeLists.txt @@ -17,10 +17,6 @@ # under the License. # -set(CMAKE_EXE_LINKER_FLAGS "-Wl,--export-all-symbols") -set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--export-symbols") - - find_package(BISON REQUIRED) find_package(FLEX REQUIRED) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d5a5622/extensions/expression-language/noop/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/extensions/expression-language/noop/CMakeLists.txt b/extensions/expression-language/noop/CMakeLists.txt index abf5e64..6424235 100644 --- a/extensions/expression-language/noop/CMakeLists.txt +++ b/extensions/expression-language/noop/CMakeLists.txt @@ -21,3 +21,4 @@ message(STATUS "Expression language is disabled; using NoOp implementation") file(GLOB SOURCES "*.cpp") include_directories(../../../libminifi/include ../../../libminifi/include/core ../../../thirdparty/spdlog-20170710/include ../../../thirdparty/concurrentqueue ../../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ${CIVET_THIRDPARTY_ROOT}/include ../../../thirdparty/) add_library(minifi-expression-language-extensions STATIC ${SOURCES}) +set_property(TARGET minifi-expression-language-extensions PROPERTY POSITION_INDEPENDENT_CODE ON) \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d5a5622/extensions/gps/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/extensions/gps/CMakeLists.txt b/extensions/gps/CMakeLists.txt index 6b28d05..8ef013d 100644 --- a/extensions/gps/CMakeLists.txt +++ b/extensions/gps/CMakeLists.txt @@ -19,10 +19,6 @@ cmake_minimum_required(VERSION 2.6) - -set(CMAKE_EXE_LINKER_FLAGS "-Wl,--export-all-symbols") -set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--export-symbols") - include_directories(../../libminifi/include ../../libminifi/include/core/yaml ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ${CIVET_THIRDPARTY_ROOT}/include ../../thirdparty/) find_package(LibGPS REQUIRED) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d5a5622/extensions/tensorflow/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/extensions/tensorflow/CMakeLists.txt b/extensions/tensorflow/CMakeLists.txt index 0efc93d..970f6cc 100644 --- a/extensions/tensorflow/CMakeLists.txt +++ b/extensions/tensorflow/CMakeLists.txt @@ -20,9 +20,6 @@ set(CMAKE_CXX_STANDARD 14) set(CMAKE_CXX_STANDARD_REQUIRED ON) -set(CMAKE_EXE_LINKER_FLAGS "-Wl,--export-all-symbols") -set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--export-symbols") - find_package(TensorFlow REQUIRED) message("-- Found TensorFlow: ${TENSORFLOW_INCLUDE_DIRS}") http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d5a5622/libminifi/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt index 20119ba..1f556d0 100644 --- a/libminifi/CMakeLists.txt +++ b/libminifi/CMakeLists.txt @@ -140,4 +140,46 @@ set_target_properties(minifi PROPERTIES LINK_FLAGS "${LINK_FLAGS} /WHOLEARCHIVE: endif() -SET (LIBMINIFI core-minifi PARENT_SCOPE) \ No newline at end of file 
+SET (LIBMINIFI core-minifi PARENT_SCOPE) + +if (ENABLE_PYTHON) +if (NOT APPLE) +#### shared + +add_library(core-minifi-shared SHARED ${SOURCES}) +target_link_libraries(core-minifi-shared ${CMAKE_DL_LIBS} uuid-shared yaml-cpp) + +find_package(ZLIB REQUIRED) +include_directories(${ZLIB_INCLUDE_DIRS}) + +target_link_libraries(core-minifi-shared minifi-expression-language-extensions) +target_link_libraries (core-minifi-shared ${ZLIB_LIBRARIES}) +if (WIN32) +set_target_properties(core-minifi-shared PROPERTIES LINK_FLAGS "/WHOLEARCHIVE") +endif() + + +# Include OpenSSL + +if (OPENSSL_FOUND) + include_directories(${OPENSSL_INCLUDE_DIR}) + target_link_libraries (core-minifi-shared ${OPENSSL_LIBRARIES}) +endif (OPENSSL_FOUND) + +add_library(minifi-shared SHARED ${PROCESSOR_SOURCES}) +add_library(capi-shared SHARED ${CAPI_SOURCES}) + + +target_link_libraries(minifi-shared core-minifi-shared) +if (WIN32) +set_target_properties(minifi-shared PROPERTIES WINDOWS_EXPORT_ALL_SYMBOLS TRUE) +set_target_properties(minifi-shared PROPERTIES LINK_FLAGS "${LINK_FLAGS} /WHOLEARCHIVE:core-minifi-shared") +endif() + + + +set_property(TARGET core-minifi-shared PROPERTY POSITION_INDEPENDENT_CODE ON) +set_property(TARGET minifi-shared PROPERTY POSITION_INDEPENDENT_CODE ON) +set_property(TARGET capi-shared PROPERTY POSITION_INDEPENDENT_CODE ON) +endif() +endif(ENABLE_PYTHON) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d5a5622/libminifi/include/agent/agent_docs.h ---------------------------------------------------------------------- diff --git a/libminifi/include/agent/agent_docs.h b/libminifi/include/agent/agent_docs.h index 4b68d08..7d4455d 100644 --- a/libminifi/include/agent/agent_docs.h +++ b/libminifi/include/agent/agent_docs.h @@ -22,7 +22,6 @@ namespace org { namespace apache { namespace nifi { namespace minifi { - class AgentDocs{ public: static std::string getDescription(const std::string &feature){ @@ -57,4 +56,4 @@ class AgentDocs{ } } } 
-#endif +#endif \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d5a5622/libminifi/include/c2/C2Callback.h ---------------------------------------------------------------------- diff --git a/libminifi/include/c2/C2Callback.h b/libminifi/include/c2/C2Callback.h new file mode 100644 index 0000000..a8241cf --- /dev/null +++ b/libminifi/include/c2/C2Callback.h @@ -0,0 +1,35 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef LIBMINIFI_INCLUDE_C2_C2CALLBACK_H_ +#define LIBMINIFI_INCLUDE_C2_C2CALLBACK_H_ + + +#include "C2Agent.h" + +/** + * Purpose: Provides a callback with very specific accesses + */ +class C2Callback { + + public: + C2Callback(const std::shared_ptr<C2>) + +}; + + +#endif /* LIBMINIFI_INCLUDE_C2_C2CALLBACK_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d5a5622/libminifi/include/capi/Plan.h ---------------------------------------------------------------------- diff --git a/libminifi/include/capi/Plan.h b/libminifi/include/capi/Plan.h index e4235a8..4afcd18 100644 --- a/libminifi/include/capi/Plan.h +++ b/libminifi/include/capi/Plan.h @@ -43,13 +43,15 @@ #include "core/ProcessSession.h" #include "core/ProcessorNode.h" #include "core/reporting/SiteToSiteProvenanceReportingTask.h" - +#include "capi/cstructs.h" class ExecutionPlan { public: explicit ExecutionPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo); + std::shared_ptr<core::Processor> addCallback(void *, void (*fp)(processor_session *)); + std::shared_ptr<core::Processor> addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name, core::Relationship relationship = core::Relationship("success", "description"), bool linkToPrevious = false); @@ -67,6 +69,8 @@ class ExecutionPlan { std::shared_ptr<core::FlowFile> getCurrentFlowFile(); + std::shared_ptr<core::ProcessSession> getCurrentSession(); + std::shared_ptr<core::Repository> getFlowRepo() { return flow_repo_; } @@ -81,6 +85,14 @@ class ExecutionPlan { static std::shared_ptr<core::Processor> createProcessor(const std::string &processor_name, const std::string &name); + std::shared_ptr<core::FlowFile> getNextFlowFile(){ + return next_ff_; + } + + void setNextFlowFile(std::shared_ptr<core::FlowFile> ptr){ + next_ff_ = ptr; + } + protected: void finalize(); @@ -113,6 +125,8 @@ class ExecutionPlan { 
std::vector<std::shared_ptr<minifi::Connection>> relationships_; core::Relationship termination_; + std::shared_ptr<core::FlowFile> next_ff_; + private: static std::shared_ptr<utils::IdGenerator> id_generator_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d5a5622/libminifi/include/capi/api.h ---------------------------------------------------------------------- diff --git a/libminifi/include/capi/api.h b/libminifi/include/capi/api.h index c7d281e..9a4404b 100644 --- a/libminifi/include/capi/api.h +++ b/libminifi/include/capi/api.h @@ -20,6 +20,8 @@ #include <stddef.h> #include <stdint.h> + +#include "cstructs.h" #include "processors.h" int initialize_api(); @@ -38,28 +40,15 @@ void enable_logging(); * ################################################################## */ -/** - * NiFi Port struct - */ -typedef struct { - char *port_id; -} nifi_port; -/** - * Nifi instance struct - */ -typedef struct { +nifi_port *create_port(char *port); - void *instance_ptr; +int free_port(nifi_port *port); - nifi_port port; -} nifi_instance; nifi_instance *create_instance(char *url, nifi_port *port); -void set_instance_property(nifi_instance *, char *, char *); - void initialize_instance(nifi_instance *); void free_instance(nifi_instance*); @@ -70,18 +59,6 @@ void free_instance(nifi_instance*); * ################################################################## */ -enum C2_Server_Type{ - REST, - MQTT -}; - -typedef struct { - char *url; - char *ack_url; - char *identifier; - char *topic; - enum C2_Server_Type type; -} C2_Server; typedef int c2_update_callback(char *); @@ -91,63 +68,37 @@ typedef int c2_start_callback(char *); void enable_async_c2(nifi_instance *, C2_Server *, c2_stop_callback *, c2_start_callback *, c2_update_callback *); -/**** - * ################################################################## - * Processor OPERATIONS - * ################################################################## - */ - -typedef struct { - void *processor_ptr; -} 
processor; uint8_t run_processor(const processor *processor); -/**** - * ################################################################## - * FLOWFILE OPERATIONS - * ################################################################## - */ - -typedef struct { - char *key; - void *value; - size_t value_size; -} attribute; - -/** - * State of a flow file - * - */ -typedef struct { - uint64_t size; /**< Size in bytes of the data corresponding to this flow file */ - - void * in; +flow *create_new_flow(nifi_instance *); - char * contentLocation; /**< Filesystem location of this object */ +flow *create_flow(nifi_instance *, const char *); - void *attributes; /**< Hash map of attributes */ +flow *create_getfile(nifi_instance *instance, flow *parent, GetFileConfig *c); -} flow_file_record; +processor *add_processor(flow *, const char *); -typedef struct { - void *plan; -} flow; +processor *add_python_processor(flow *, void (*ontrigger_callback)(processor_session *session)); -flow *create_flow(nifi_instance *, const char *); +int set_property(processor *, const char *, const char *); -flow *create_getfile(nifi_instance *instance, flow *parent, GetFileConfig *c); +int set_instance_property(nifi_instance *instance, char *key, char *value); processor *add_processor(flow *parent_flow, const char *processor_name); int set_property(processor *proc, const char *name, const char *value); -void free_flow(flow *); +int free_flow(flow *); flow_file_record *get_next_flow_file(nifi_instance *, flow *); size_t get_flow_files(nifi_instance *, flow *, flow_file_record **, size_t); +flow_file_record *get(nifi_instance *,flow *, processor_session *); + +int transfer(processor_session* session, flow *flow, const char *rel); + /** * Creates a flow file object. 
* Will obtain the size of file @@ -156,6 +107,8 @@ flow_file_record* create_flowfile(const char *file, const size_t len); flow_file_record* create_ff_object(const char *file, const size_t len, const uint64_t size); +flow_file_record* create_ff_object_na(const char *file, const size_t len, const uint64_t size); + void free_flowfile(flow_file_record*); uint8_t add_attribute(flow_file_record*, char *key, void *value, size_t size); @@ -172,7 +125,7 @@ uint8_t remove_attribute(flow_file_record*, char *key); * ################################################################## */ -void transmit_flowfile(flow_file_record *, nifi_instance *); +int transmit_flowfile(flow_file_record *, nifi_instance *); /**** * ################################################################## @@ -180,7 +133,6 @@ void transmit_flowfile(flow_file_record *, nifi_instance *); * ################################################################## */ -void transmit_flowfile(flow_file_record *, nifi_instance *); #ifdef __cplusplus } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d5a5622/libminifi/include/capi/cstructs.h ---------------------------------------------------------------------- diff --git a/libminifi/include/capi/cstructs.h b/libminifi/include/capi/cstructs.h new file mode 100644 index 0000000..4807db3 --- /dev/null +++ b/libminifi/include/capi/cstructs.h @@ -0,0 +1,118 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef LIBMINIFI_SRC_CAPI_CSTRUCTS_H_ +#define LIBMINIFI_SRC_CAPI_CSTRUCTS_H_ + + +/** + * NiFi Port struct + */ +typedef struct { + char *port_id; +} nifi_port; + + +/** + * Nifi instance struct + */ +typedef struct { + + void *instance_ptr; + + nifi_port port; + +} nifi_instance; + + +/**** + * ################################################################## + * C2 OPERATIONS + * ################################################################## + */ + +enum C2_Server_Type{ + REST, + MQTT +}; + +typedef struct { + char *url; + char *ack_url; + char *identifier; + char *topic; + enum C2_Server_Type type; +} C2_Server; + + +/**** + * ################################################################## + * Processor OPERATIONS + * ################################################################## + */ + +typedef struct { + void *processor_ptr; +} processor; + + +typedef struct { + + void *session; + +} processor_session; + + + +/**** + * ################################################################## + * FLOWFILE OPERATIONS + * ################################################################## + */ + +typedef struct { + char *key; + void *value; + size_t value_size; +} attribute; + +/** + * State of a flow file + * + */ +typedef struct { + uint64_t size; /**< Size in bytes of the data corresponding to this flow file */ + + void * in; + + char * contentLocation; /**< Filesystem location of this object */ + + void *attributes; /**< Hash map of attributes */ + + void *ffp; + +} flow_file_record; + + +typedef struct { + void *plan; +} flow; + + 
+ +#endif /* LIBMINIFI_SRC_CAPI_CSTRUCTS_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d5a5622/libminifi/include/capi/processors.h ---------------------------------------------------------------------- diff --git a/libminifi/include/capi/processors.h b/libminifi/include/capi/processors.h index 0d395e5..7fe357d 100644 --- a/libminifi/include/capi/processors.h +++ b/libminifi/include/capi/processors.h @@ -18,6 +18,8 @@ #ifndef LIBMINIFI_INCLUDE_CAPI_PROCESSORS_H_ #define LIBMINIFI_INCLUDE_CAPI_PROCESSORS_H_ + + #ifdef __cplusplus extern "C" { #endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d5a5622/libminifi/include/core/Core.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/Core.h b/libminifi/include/core/Core.h index a379056..0a3c97d 100644 --- a/libminifi/include/core/Core.h +++ b/libminifi/include/core/Core.h @@ -24,6 +24,7 @@ #include <string> #include <uuid/uuid.h> + #ifdef WIN32 #pragma comment(lib, "shlwapi.lib") #endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d5a5622/libminifi/include/core/FlowFile.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/FlowFile.h b/libminifi/include/core/FlowFile.h index 18d5034..b194649 100644 --- a/libminifi/include/core/FlowFile.h +++ b/libminifi/include/core/FlowFile.h @@ -162,6 +162,14 @@ class FlowFile : public core::Connectable { } /** + * Returns the map of attributes + * @return attributes. 
+ */ + std::map<std::string, std::string> *getAttributesPtr() { + return &attributes_; + } + + /** * adds an attribute if it does not exist * */ @@ -217,25 +225,25 @@ class FlowFile : public core::Connectable { } /** - * Yield - */ - virtual void yield(){ - - } - /** - * Determines if we are connected and operating - */ - virtual bool isRunning(){ - return true; - } - - /** - * Determines if work is available by this connectable - * @return boolean if work is available. - */ - virtual bool isWorkAvailable(){ - return true; - } + * Yield + */ + virtual void yield() { + + } + /** + * Determines if we are connected and operating + */ + virtual bool isRunning() { + return true; + } + + /** + * Determines if work is available by this connectable + * @return boolean if work is available. + */ + virtual bool isWorkAvailable() { + return true; + } /** * Sets the original connection with a shared pointer. http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d5a5622/libminifi/include/processors/CallbackProcessor.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/CallbackProcessor.h b/libminifi/include/processors/CallbackProcessor.h new file mode 100644 index 0000000..879f49c --- /dev/null +++ b/libminifi/include/processors/CallbackProcessor.h @@ -0,0 +1,100 @@ +/** + * @file CallbackProcessor.h + * CallbackProcessor class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __CALLBACK_PROCESSOR_H__ +#define __CALLBACK_PROCESSOR_H__ + +#include <stdio.h> +#include <string> +#include <errno.h> +#include <chrono> +#include <thread> +#include <functional> +#include <iostream> +#include <sys/types.h> +#include "capi/cstructs.h" +#include "io/BaseStream.h" +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/Core.h" +#include "core/Resource.h" +#include "core/logging/LoggerConfiguration.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +// CallbackProcessor Class +class CallbackProcessor : public core::Processor { + public: + // Constructor + /*! 
+ * Create a new processor + */ + CallbackProcessor(std::string name, utils::Identifier uuid = utils::Identifier()) + : Processor(name, uuid), + callback_(nullptr), + objref_(nullptr), + logger_(logging::LoggerFactory<CallbackProcessor>::getLogger()) { + } + // Destructor + virtual ~CallbackProcessor() { + + } + // Processor Name + static constexpr char const* ProcessorName = "CallbackProcessor"; + + public: + + void setCallback(void *obj, void (*ontrigger_callback)(processor_session *)) { + objref_ = obj; + callback_ = ontrigger_callback; + } + + // OnTrigger method, implemented by NiFi CallbackProcessor + virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session); + // Initialize, over write by NiFi CallbackProcessor + virtual void initialize() { + std::set<core::Relationship> relationships; + core::Relationship Success("success", "description"); + relationships.insert(Success); + setSupportedRelationships(relationships); + } + + protected: + void *objref_; + void (*callback_)(processor_session*); + private: + // Logger + std::shared_ptr<logging::Logger> logger_; + +}; + +REGISTER_RESOURCE(CallbackProcessor); + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d5a5622/libminifi/src/capi/C2CallbackAgent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/capi/C2CallbackAgent.cpp b/libminifi/src/capi/C2CallbackAgent.cpp index dd04466..4b7f5fb 100644 --- a/libminifi/src/capi/C2CallbackAgent.cpp +++ b/libminifi/src/capi/C2CallbackAgent.cpp @@ -61,7 +61,7 @@ void C2CallbackAgent::handle_c2_server_response(const C2ContentResponse &resp) { auto str = resp.name.c_str(); if (nullptr != stop) - stop(const_cast<char*>(str)); + stop(const_cast<char*>(str)); break; } 
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d5a5622/libminifi/src/capi/Plan.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/capi/Plan.cpp b/libminifi/src/capi/Plan.cpp index 43ef6e6..19a62e7 100644 --- a/libminifi/src/capi/Plan.cpp +++ b/libminifi/src/capi/Plan.cpp @@ -17,6 +17,7 @@ */ #include "capi/Plan.h" +#include "processors/CallbackProcessor.h" #include <memory> #include <vector> #include <set> @@ -35,6 +36,45 @@ ExecutionPlan::ExecutionPlan(std::shared_ptr<core::ContentRepository> content_re stream_factory = org::apache::nifi::minifi::io::StreamFactory::getInstance(std::make_shared<minifi::Configure>()); } +/** + * Add a callback to obtain and pass processor session to a generated processor + * + */ +std::shared_ptr<core::Processor> ExecutionPlan::addCallback(void *obj, void (*fp)(processor_session *)) { + if (finalized) { + return nullptr; + } + + utils::Identifier uuid; + id_generator_->generate(uuid); + + auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate("CallbackProcessor", uuid); + if (nullptr == ptr) { + throw std::exception(); + } + std::shared_ptr<processors::CallbackProcessor> processor = std::static_pointer_cast<processors::CallbackProcessor>(ptr); + processor->setCallback(obj, fp); + processor->setName("CallbackProcessor"); + + return addProcessor(processor, "CallbackProcessor", core::Relationship("success", "description"), true); +} + +bool ExecutionPlan::setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value) { + uint32_t i = 0; + logger_->log_debug("Attempting to set property %s %s for %s", prop, value, proc->getName()); + for (i = 0; i < processor_queue_.size(); i++) { + if (processor_queue_.at(i) == proc) { + break; + } + } + + if (i >= processor_queue_.size() || i >= processor_contexts_.size()) { + return false; + } + + return processor_contexts_.at(i)->setProperty(prop, value); +} + 
std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name, core::Relationship relationship, bool linkToPrevious) { if (finalized) { return nullptr; @@ -102,23 +142,6 @@ std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::string & return addProcessor(processor, name, relationship, linkToPrevious); } -bool ExecutionPlan::setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value) { - uint32_t i = 0; - logger_->log_debug("Attempting to set property %s %s for %s", prop, value, proc->getName()); - for (i = 0; i < processor_queue_.size(); i++) { - if (processor_queue_.at(i) == proc) { - break; - } - } - - if (i >= processor_queue_.size() || i >= processor_contexts_.size()) { - return false; - } - - - return processor_contexts_.at(i)->setProperty(prop, value); -} - void ExecutionPlan::reset() { process_sessions_.clear(); factories_.clear(); @@ -134,7 +157,6 @@ bool ExecutionPlan::runNextProcessor(std::function<void(const std::shared_ptr<co if (!finalized) { finalize(); } - location++; if (location >= processor_queue_.size()) { return false; @@ -159,7 +181,12 @@ bool ExecutionPlan::runNextProcessor(std::function<void(const std::shared_ptr<co } current_session->commit(); current_flowfile_ = current_session->get(); - return true; + auto hasMore = location + 1 < processor_queue_.size(); + if (!hasMore && !current_flowfile_) { + std::set<std::shared_ptr<core::FlowFile>> expired; + current_flowfile_ = relationships_.back()->poll(expired); + } + return hasMore; } std::set<provenance::ProvenanceEventRecord*> ExecutionPlan::getProvenanceRecords() { @@ -170,6 +197,10 @@ std::shared_ptr<core::FlowFile> ExecutionPlan::getCurrentFlowFile() { return current_flowfile_; } +std::shared_ptr<core::ProcessSession> ExecutionPlan::getCurrentSession() { + return current_session_; +} + std::shared_ptr<minifi::Connection> 
ExecutionPlan::buildFinalConnection(std::shared_ptr<core::Processor> processor, bool setDest) { std::stringstream connection_name; std::shared_ptr<core::Processor> last = processor; @@ -218,4 +249,3 @@ std::shared_ptr<core::Processor> ExecutionPlan::createProcessor(const std::strin return processor; } - http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d5a5622/libminifi/src/capi/api.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/capi/api.cpp b/libminifi/src/capi/api.cpp index 5429873..fa3a6c8 100644 --- a/libminifi/src/capi/api.cpp +++ b/libminifi/src/capi/api.cpp @@ -57,6 +57,24 @@ class DirectoryConfiguration { } }; +nifi_port *create_port(char *port) { + if (nullptr == port) + return nullptr; + nifi_port *p = new nifi_port(); + p->port_id = new char[strlen(port) + 1]; + memset(p->port_id, 0x00, strlen(port) + 1); + strncpy(p->port_id, port, strlen(port)); + return p; +} + +int free_port(nifi_port *port) { + if (port == nullptr) + return -1; + delete[] port->port_id; + delete port; + return 0; +} + /** * Creates a NiFi Instance from the url and output port. * @param url http URL for NiFi instance @@ -99,10 +117,15 @@ void enable_async_c2(nifi_instance *instance, C2_Server *server, c2_stop_callbac * @param instance nifi instance * @param key key in which we will set the valiue * @param value + * @return -1 when instance or key are null */ -void set_instance_property(nifi_instance *instance, char *key, char *value) { +int set_instance_property(nifi_instance *instance, char *key, char *value) { + if (nullptr == instance || nullptr == instance->instance_ptr || nullptr == key) { + return -1; + } auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr); minifi_instance_ref->getConfiguration()->set(key, value); + return 0; } /** @@ -136,8 +159,23 @@ flow_file_record* create_flowfile(const char *file, const size_t len) { * @param file file to place into the flow file. 
*/ flow_file_record* create_ff_object(const char *file, const size_t len, const uint64_t size) { + if (nullptr == file) { + return nullptr; + } flow_file_record *new_ff = new flow_file_record; new_ff->attributes = new string_map(); + new_ff->ffp = 0; + new_ff->contentLocation = new char[len + 1]; + snprintf(new_ff->contentLocation, len + 1, "%s", file); + std::ifstream in(file, std::ifstream::ate | std::ifstream::binary); + // set the size of the flow file. + new_ff->size = size; + return new_ff; +} + +flow_file_record* create_ff_object_na(const char *file, const size_t len, const uint64_t size) { + flow_file_record *new_ff = new flow_file_record; + new_ff->attributes = nullptr; new_ff->contentLocation = new char[len + 1]; snprintf(new_ff->contentLocation, len + 1, "%s", file); std::ifstream in(file, std::ifstream::ate | std::ifstream::binary); @@ -160,7 +198,8 @@ void free_flowfile(flow_file_record *ff) { } auto map = static_cast<string_map*>(ff->attributes); delete[] ff->contentLocation; - delete map; + if (ff->ffp != nullptr) // don't delete map since it's a ref ptr + delete map; delete ff; } } @@ -226,7 +265,7 @@ uint8_t remove_attribute(flow_file_record *ff, char *key) { * @param ff flow file record * @param instance nifi instance structure */ -void transmit_flowfile(flow_file_record *ff, nifi_instance *instance) { +int transmit_flowfile(flow_file_record *ff, nifi_instance *instance) { auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr); // in the unlikely event the user forgot to initialize the instance, we shall do it for them. 
if (UNLIKELY(minifi_instance_ref->isRPGConfigured() == false)) { @@ -250,9 +289,25 @@ void transmit_flowfile(flow_file_record *ff, nifi_instance *instance) { std::string port_uuid = instance->port.port_id; minifi_instance_ref->transfer(ffr); + + return 0; +} + +flow *create_new_flow(nifi_instance *instance) { + auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr); + flow *new_flow = new flow; + + auto execution_plan = new ExecutionPlan(minifi_instance_ref->getContentRepository(), minifi_instance_ref->getNoOpRepository(), minifi_instance_ref->getNoOpRepository()); + + new_flow->plan = execution_plan; + + return new_flow; } flow *create_flow(nifi_instance *instance, const char *first_processor) { + if (nullptr == instance || nullptr == instance->instance_ptr) { + return nullptr; + } auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr); flow *new_flow = new flow; @@ -267,6 +322,17 @@ flow *create_flow(nifi_instance *instance, const char *first_processor) { return new_flow; } +processor *add_python_processor(flow *flow, void (*ontrigger_callback)(processor_session *)) { + if (nullptr == flow || nullptr == flow->plan || nullptr == ontrigger_callback) { + return nullptr; + } + ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan); + auto proc = plan->addCallback(nullptr, ontrigger_callback); + processor *new_processor = new processor(); + new_processor->processor_ptr = proc.get(); + return new_processor; +} + flow *create_getfile(nifi_instance *instance, flow *parent_flow, GetFileConfig *c) { static const std::string first_processor = "GetFile"; flow *new_flow = parent_flow == nullptr ? 
create_flow(instance, nullptr) : parent_flow; @@ -283,6 +349,9 @@ flow *create_getfile(nifi_instance *instance, flow *parent_flow, GetFileConfig * } processor *add_processor(flow *flow, const char *processor_name) { + if (nullptr == flow || nullptr == processor_name) { + return nullptr; + } ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan); auto proc = plan->addProcessor(processor_name, processor_name); if (proc) { @@ -300,17 +369,19 @@ int set_property(processor *proc, const char *name, const char *value) { return -1; } -void free_flow(flow *flow) { - if (flow == nullptr) - return; +int free_flow(flow *flow) { + if (flow == nullptr || nullptr == flow->plan) + return -1; auto execution_plan = static_cast<ExecutionPlan*>(flow->plan); delete execution_plan; delete flow; + return 0; } flow_file_record *get_next_flow_file(nifi_instance *instance, flow *flow) { + if (instance == nullptr || nullptr == flow || nullptr == flow->plan) + return nullptr; auto execution_plan = static_cast<ExecutionPlan*>(flow->plan); - execution_plan->reset(); while (execution_plan->runNextProcessor()) { } @@ -323,7 +394,9 @@ flow_file_record *get_next_flow_file(nifi_instance *instance, flow *flow) { // create a flow file. 
claim->increaseFlowFileRecordOwnedCount(); auto path = claim->getContentFullPath(); - auto ffr = create_ff_object(path.c_str(), path.length(), ff->getSize()); + auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize()); + ffr->attributes = ff->getAttributesPtr(); + ffr->ffp = ff.get(); ffr->in = instance; return ffr; } else { @@ -332,6 +405,8 @@ flow_file_record *get_next_flow_file(nifi_instance *instance, flow *flow) { } size_t get_flow_files(nifi_instance *instance, flow *flow, flow_file_record **ff_r, size_t size) { + if (nullptr == instance || nullptr == flow || nullptr == ff_r) + return 0; auto execution_plan = static_cast<ExecutionPlan*>(flow->plan); size_t i = 0; for (; i < size; i++) { @@ -344,3 +419,47 @@ size_t get_flow_files(nifi_instance *instance, flow *flow, flow_file_record **ff } return i; } + +flow_file_record *get(nifi_instance *instance, flow *flow, processor_session *session) { + if (nullptr == instance || nullptr == flow || nullptr == session) + return nullptr; + auto sesh = static_cast<core::ProcessSession*>(session->session); + auto execution_plan = static_cast<ExecutionPlan*>(flow->plan); + auto ff = sesh->get(); + execution_plan->setNextFlowFile(ff); + if (ff == nullptr) { + return nullptr; + } + auto claim = ff->getResourceClaim(); + + if (claim != nullptr) { + // create a flow file. 
+ claim->increaseFlowFileRecordOwnedCount(); + auto path = claim->getContentFullPath(); + auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize()); + ffr->attributes = ff->getAttributesPtr(); + ffr->ffp = ff.get(); + ffr->in = instance; + return ffr; + } else { + return nullptr; + } +} + +int transfer(processor_session* session, flow *flow, const char *rel) { + if (nullptr == session || nullptr == flow || rel == nullptr) { + return -1; + } + auto sesh = static_cast<core::ProcessSession*>(session->session); + auto execution_plan = static_cast<ExecutionPlan*>(flow->plan); + if (nullptr == sesh || nullptr == execution_plan) { + return -1; + } + core::Relationship relationship(rel, rel); + auto ff = execution_plan->getNextFlowFile(); + if (nullptr == ff) { + return -2; + } + sesh->transfer(ff, relationship); + return 0; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d5a5622/libminifi/src/processors/CallbackProcessor.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/CallbackProcessor.cpp b/libminifi/src/processors/CallbackProcessor.cpp new file mode 100644 index 0000000..5524d92 --- /dev/null +++ b/libminifi/src/processors/CallbackProcessor.cpp @@ -0,0 +1,37 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "processors/CallbackProcessor.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +void CallbackProcessor::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { + if (callback_ != nullptr) { + processor_session sesh; + sesh.session = session; + callback_(&sesh); + } +} + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d5a5622/python/getFile.py ---------------------------------------------------------------------- diff --git a/python/getFile.py b/python/getFile.py new file mode 100644 index 0000000..3d2b9d0 --- /dev/null +++ b/python/getFile.py @@ -0,0 +1,70 @@ +#!/usr/bin/env python +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+from minifi import * + +from argparse import ArgumentParser +from ctypes import cdll +import ctypes +import sys +from _cffi_backend import callback + + +class GetFilePrinterProcessor(PyProcessor): + def __init__(self,instance, minifi, flow): + PyProcessor.__init__(self,instance,minifi,flow) + self._callback = None + + def _onTriggerCallback(self): + def onTrigger(session): + flow_file = self.get(session) + if flow_file: + flow_file.add_attribute("python_test","value") + self.transfer(session,flow_file, "success") + return CALLBACK(onTrigger) + + +parser = ArgumentParser() +parser.add_argument("-s", "--dll", dest="dll_file", + help="DLL filename", metavar="FILE") + +parser.add_argument("-n", "--nifi", dest="nifi_instance", + help="NiFi Instance") + +parser.add_argument("-i", "--input", dest="input_port", + help="NiFi Input Port") + +parser.add_argument("-d", "--dir", dest="dir", + help="GetFile Dir to monitor", metavar="FILE") + +args = parser.parse_args() + +""" dll_file is the path to the shared object """ +minifi = MiNiFi(dll_file=args.dll_file,url = args.nifi_instance.encode('utf-8'), port=args.input_port.encode('utf-8')) + +minifi.set_property("nifi.remote.input.http.enabled","true") + +processor = minifi.add_processor( GetFile() ) + +processor.set_property("Input Directory", args.dir) +processor.set_property("Keep Source File", "true") + +current_module = sys.modules[__name__] + +processor = minifi.create_python_processor(current_module,"GetFilePrinterProcessor") + +ff = minifi.get_next_flowfile() +if ff: + minifi.transmit_flowfile(ff) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d5a5622/python/library/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/python/library/CMakeLists.txt b/python/library/CMakeLists.txt new file mode 100644 index 0000000..5d309a1 --- /dev/null +++ b/python/library/CMakeLists.txt @@ -0,0 +1,50 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more 
contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +cmake_minimum_required(VERSION 2.6) + +IF(POLICY CMP0048) + CMAKE_POLICY(SET CMP0048 OLD) +ENDIF(POLICY CMP0048) + +include_directories(../../blocks/ ../../libminifi/include ../../libminifi/include/c2 ../../libminifi/include/c2/protocols/ ../../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics ../../libminifi/include/core/yaml ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-20171024/include ../../thirdparty/civetweb-1.9.1/include ../../thirdparty/) +if(WIN32) + include_directories(../../libminifi/opsys/win) +else() + include_directories(../../libminifi/opsys/posix) +endif() +include_directories(../../extensions/http-curl/ ../../extensions/http-curl/client ../../extensions/http-curl/sitetosite ../../extensions/http-curl/protocols ../../extensions/http-curl/processors) + +add_library(python-lib SHARED python_lib.cpp) +if (APPLE) + target_link_libraries(python-lib capi core-minifi minifi) +else() + target_link_libraries(python-lib capi-shared core-minifi-shared minifi-shared) +endif(APPLE) + +if (WIN32) +target_link_libraries(python-lib ${CURL_LIBRARY} minifi-http-curl) + set_target_properties(python-lib PROPERTIES + LINK_FLAGS 
"/WHOLEARCHIVE:minifi-http-curl" + ) +elseif (APPLE) + target_link_libraries(python-lib ${CURL_LIBRARY} -Wl,-all_load minifi-http-curl) +else () + target_link_libraries(python-lib -Wl,--whole-archive minifi-http-curl -Wl,--no-whole-archive ${CURL_LIBRARY} ) +endif () \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d5a5622/python/library/python_lib.cpp ---------------------------------------------------------------------- diff --git a/python/library/python_lib.cpp b/python/library/python_lib.cpp new file mode 100644 index 0000000..eb81f79 --- /dev/null +++ b/python/library/python_lib.cpp @@ -0,0 +1,47 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <unistd.h> +#include <dirent.h> +#include "capi/api.h" +#include "file_blocks.h" +#include "comms.h" +#include "capi/api.h" +#include "capi/processors.h" +#include "HTTPCurlLoader.h" +#include "python_lib.h" + + + +#ifdef __cplusplus +extern "C" { +#endif + + +int init_api(const char *resource){ + core::ClassLoader::getDefaultClassLoader().registerResource(resource, "createHttpCurlFactory"); + return 0; +} + +#ifdef __cplusplus +} +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d5a5622/python/library/python_lib.h ---------------------------------------------------------------------- diff --git a/python/library/python_lib.h b/python/library/python_lib.h new file mode 100644 index 0000000..ac95190 --- /dev/null +++ b/python/library/python_lib.h @@ -0,0 +1,33 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef PYTHON_LIBRARY_PYTHON_LIB_H_ +#define PYTHON_LIBRARY_PYTHON_LIB_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +int init_api(const char *); + +#ifdef __cplusplus +} +#endif + + + +#endif /* PYTHON_LIBRARY_PYTHON_LIB_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d5a5622/python/minifi/__init__.py ---------------------------------------------------------------------- diff --git a/python/minifi/__init__.py b/python/minifi/__init__.py new file mode 100644 index 0000000..01fb705 --- /dev/null +++ b/python/minifi/__init__.py @@ -0,0 +1,205 @@ +#!/usr/bin/env python +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+from ctypes import cdll +import ctypes +from abc import abstractmethod + + + +class RPG_PORT(ctypes.Structure): + _fields_ = [('port_id', ctypes.c_char_p)] + +class NIFI_STRUCT(ctypes.Structure): + _fields_ = [('instancePtr', ctypes.c_void_p), + ('port', RPG_PORT)] + +class CFlow(ctypes.Structure): + _fields_ = [('plan', ctypes.c_void_p)] + +class CFlowFile(ctypes.Structure): + _fields_ = [('size', ctypes.c_int), + ('in', ctypes.c_void_p), + ('contentLocation', ctypes.c_char_p), + ('attributes', ctypes.c_void_p), + ('ffp', ctypes.c_void_p)] + +class CProcessor(ctypes.Structure): + _fields_ = [('processor_ptr', ctypes.c_void_p)] + +class CProcessSession(ctypes.Structure): + _fields_ = [('process_session', ctypes.c_void_p)] + +CALLBACK = ctypes.CFUNCTYPE(None, ctypes.POINTER(CProcessSession)) + +class Processor(object): + def __init__(self, cprocessor, minifi): + super(Processor, self).__init__() + self._proc = cprocessor + self._minifi = minifi + + def set_property(self, name, value): + self._minifi.set_property( self._proc, name.encode("UTF-8"), value.encode("UTF-8")) + +class PyProcessor(object): + def __init__(self, instance, minifi, flow): + super(PyProcessor, self).__init__() + self._instance = instance + self._minifi = minifi + self._flow = flow + + def setBase(self, proc): + self._proc = proc + + def get(self, session): + ff = self._minifi.get(self._instance.get_instance(),self._flow, session) + if ff: + return FlowFile(self._minifi, ff) + else: + return None + + def transfer(self, session, ff, rel): + self._minifi.transfer(session, self._flow, rel.encode("UTF-8")) + + @abstractmethod + def _onTriggerCallback(self): + pass + + def getTriggerCallback(self): + if self._callback is None: + print("creating ptr") + self._callback = self._onTriggerCallback() + return self._callback + + @abstractmethod + def onSchedule(self): + pass + + +class RPG(object): + def __init__(self, nifi_struct): + super(RPG, self).__init__() + self._nifi = nifi_struct + + def 
get_instance(self): + return self._nifi + +class FlowFile(object): + def __init__(self, minifi, ff): + super(FlowFile, self).__init__() + self._minifi = minifi + self._ff = ff + + def add_attribute(self, name, value): + vallen = len(value) + self._minifi.add_attribute(self._ff, name.encode("UTF-8"), value.encode("UTF-8"), vallen) + + def get_instance(self): + return self._ff + + + +class MiNiFi(object): + """ Proxy Connector """ + def __init__(self, dll_file, url, port): + super(MiNiFi, self).__init__() + self._minifi= cdll.LoadLibrary(dll_file) + """ create instance """ + self._minifi.create_instance.argtypes = [ctypes.c_char_p , ctypes.POINTER(RPG_PORT)] + self._minifi.create_instance.restype = ctypes.POINTER(NIFI_STRUCT) + """ create port """ + self._minifi.create_port.argtype = ctypes.c_char_p + self._minifi.create_port.restype = ctypes.POINTER(RPG_PORT) + """ free port """ + self._minifi.free_port.argtype = ctypes.POINTER(RPG_PORT) + self._minifi.free_port.restype = ctypes.c_int + """ create new flow """ + self._minifi.create_new_flow.argtype = ctypes.POINTER(NIFI_STRUCT) + self._minifi.create_new_flow.restype = ctypes.POINTER(CFlow) + """ add processor """ + self._minifi.add_processor.argtypes = [ctypes.POINTER(CFlow) , ctypes.c_char_p ] + self._minifi.add_processor.restype = ctypes.POINTER(CProcessor) + """ set processor property""" + self._minifi.set_property.argtypes = [ctypes.POINTER(CProcessor) , ctypes.c_char_p , ctypes.c_char_p ] + self._minifi.set_property.restype = ctypes.c_int + """ set instance property""" + self._minifi.set_instance_property.argtypes = [ctypes.POINTER(NIFI_STRUCT) , ctypes.c_char_p , ctypes.c_char_p ] + self._minifi.set_instance_property.restype = ctypes.c_int + """ get next flow file """ + self._minifi.get_next_flow_file.argtypes = [ctypes.POINTER(NIFI_STRUCT) , ctypes.POINTER(CFlow) ] + self._minifi.get_next_flow_file.restype = ctypes.POINTER(CFlowFile) + """ transmit flow file """ + self._minifi.transmit_flowfile.argtypes = 
[ctypes.POINTER(CFlowFile) , ctypes.POINTER(NIFI_STRUCT) ] + self._minifi.transmit_flowfile.restype = ctypes.c_int + """ get ff """ + self._minifi.get.argtypes = [ctypes.POINTER(NIFI_STRUCT) , ctypes.POINTER(CFlow), ctypes.POINTER(CProcessSession) ] + self._minifi.get.restype = ctypes.POINTER(CFlowFile) + """ add python processor """ + self._minifi.add_python_processor.argtypes = [ctypes.POINTER(CFlow) , ctypes.c_void_p ] + self._minifi.add_python_processor.restype = ctypes.POINTER(CProcessor) + """ transfer ff """ + self._minifi.transfer.argtypes = [ctypes.POINTER(CProcessSession), ctypes.POINTER(CFlow) , ctypes.c_char_p ] + self._minifi.transfer.restype = ctypes.c_int + """ add attribute to ff """ + self._minifi.add_attribute.argtypes = [ctypes.POINTER(CFlowFile), ctypes.c_char_p, ctypes.c_char_p, ctypes.c_int ] + self._minifi.add_attribute.restype = ctypes.c_int + + self._minifi.init_api.argtype = ctypes.c_char_p + self._minifi.init_api.restype = ctypes.c_int + self._minifi.init_api(dll_file.encode("UTF-8")) + + self._instance = self.__open_rpg(url,port) + self._flow = self._minifi.create_new_flow( self._instance.get_instance() ) + self._minifi.enable_logging() + + + + def __open_rpg(self, url, port): + rpgPort = self._minifi.create_port(port) + rpg = self._minifi.create_instance(url, rpgPort) + ret = RPG(rpg) + return ret + + def get_c_lib(self): + return self._minifi + + def set_property(self, name, value): + self._minifi.set_instance_property(self._instance.get_instance(), name.encode("UTF-8"), value.encode("UTF-8")) + + + def add_processor(self, processor): + proc = self._minifi.add_processor(self._flow, processor.get_name().encode("UTF-8")) + return Processor(proc,self._minifi) + + def create_python_processor(self, module, processor): + m = getattr(module,processor)(self._instance,self._minifi,self._flow) + proc = self._minifi.add_python_processor(self._flow, m.getTriggerCallback()) + m.setBase(proc) + return m + + def get_next_flowfile(self): + ff = 
self._minifi.get_next_flow_file(self._instance.get_instance(), self._flow) + return FlowFile(self._minifi, ff) + + def transmit_flowfile(self, ff): + if ff.get_instance(): + self._minifi.transmit_flowfile(ff.get_instance(),self._instance.get_instance()) + +class GetFile(object): + def __init__(self): + super(GetFile, self).__init__() + + def get_name(self): + return "GetFile" http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d5a5622/thirdparty/civetweb-1.10/src/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/thirdparty/civetweb-1.10/src/CMakeLists.txt b/thirdparty/civetweb-1.10/src/CMakeLists.txt index a0528b2..f01f66c 100644 --- a/thirdparty/civetweb-1.10/src/CMakeLists.txt +++ b/thirdparty/civetweb-1.10/src/CMakeLists.txt @@ -5,8 +5,13 @@ set_target_properties(c-library PROPERTIES VERSION ${CIVETWEB_VERSION} SOVERSION ${CIVETWEB_VERSION} ) + +set_property(TARGET c-library PROPERTY POSITION_INDEPENDENT_CODE ON) + if (BUILD_SHARED_LIBS) +if (APPLE) target_compile_definitions(c-library PRIVATE CIVETWEB_DLL_EXPORTS) + endif() endif() target_include_directories( c-library PUBLIC @@ -273,14 +278,18 @@ endif() # The C++ API library if (CIVETWEB_ENABLE_CXX) add_library(civetweb-cpp CivetServer.cpp) + + set_property(TARGET civetweb-cpp PROPERTY POSITION_INDEPENDENT_CODE ON) set_target_properties(civetweb-cpp PROPERTIES OUTPUT_NAME "civetweb-cpp" VERSION ${CIVETWEB_VERSION} SOVERSION ${CIVETWEB_VERSION} ) if (BUILD_SHARED_LIBS) - target_compile_definitions(civetweb-cpp PRIVATE CIVETWEB_DLL_EXPORTS) - endif() +if (APPLE) + target_compile_definitions(civetweb-cpp PRIVATE CIVETWEB_DLL_EXPORTS) + endif() + endif() target_include_directories( civetweb-cpp PUBLIC ${PROJECT_SOURCE_DIR}/include) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d5a5622/thirdparty/date/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/thirdparty/date/CMakeLists.txt 
b/thirdparty/date/CMakeLists.txt index 683ddce..54382a0 100644 --- a/thirdparty/date/CMakeLists.txt +++ b/thirdparty/date/CMakeLists.txt @@ -11,7 +11,7 @@ set( CMAKE_CXX_STANDARD 14 CACHE STRING "The C++ standard whose features are req option( USE_SYSTEM_TZ_DB "Use the operating system's timezone database" OFF ) option( USE_TZ_DB_IN_DOT "Save the timezone database in the current folder" OFF ) -option( BUILD_SHARED_LIBS "Build a shared version of library" OFF ) +option( BUILD_SHARED_LIBS "Build a shared version of library" ON ) option( ENABLE_DATE_TESTING "Enable unit tests" ON ) function( print_option OPT ) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d5a5622/thirdparty/uuid/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/thirdparty/uuid/CMakeLists.txt b/thirdparty/uuid/CMakeLists.txt index 323c1b9..2efaf42 100644 --- a/thirdparty/uuid/CMakeLists.txt +++ b/thirdparty/uuid/CMakeLists.txt @@ -36,3 +36,20 @@ add_library( unparse.c uuid_time.c ) + + +add_library( + uuid-shared + clear.c + compare.c + copy.c + gen_uuid.c + isnull.c + pack.c + parse.c + unpack.c + unparse.c + uuid_time.c +) + +set_property(TARGET uuid-shared PROPERTY POSITION_INDEPENDENT_CODE ON) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5d5a5622/thirdparty/yaml-cpp-yaml-cpp-20171024/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/thirdparty/yaml-cpp-yaml-cpp-20171024/CMakeLists.txt b/thirdparty/yaml-cpp-yaml-cpp-20171024/CMakeLists.txt index 11f9182..f4239a4 100644 --- a/thirdparty/yaml-cpp-yaml-cpp-20171024/CMakeLists.txt +++ b/thirdparty/yaml-cpp-yaml-cpp-20171024/CMakeLists.txt @@ -46,6 +46,8 @@ option(YAML_CPP_BUILD_CONTRIB "Enable contrib stuff in library" ON) # http://www.cmake.org/cmake/help/cmake2.6docs.html#command:add_library option(BUILD_SHARED_LIBS "Build Shared Libraries" OFF) + + # --> Apple option(APPLE_UNIVERSAL_BIN "Apple: 
Build universal binary" OFF)
