Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 1f2e60061 -> 4d20f840b
MINIFICPP-648 - add processor and add processor with linkage nomenclature is confusing Speed up C API by eliminating some allocations and indirections C API: add a method to extract flow file content Provide API to invoke with user data or file This closes #432. Signed-off-by: Marc Parisi <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/4d20f840 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/4d20f840 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/4d20f840 Branch: refs/heads/master Commit: 4d20f840bb4ffb595c303a5c85af88cbf96d797d Parents: 1f2e600 Author: Arpad Boda <[email protected]> Authored: Fri Oct 26 14:50:55 2018 +0200 Committer: Marc Parisi <[email protected]> Committed: Tue Nov 13 11:22:21 2018 -0500 ---------------------------------------------------------------------- libminifi/CMakeLists.txt | 9 +- nanofi/examples/terminate_handler.c | 2 +- nanofi/include/api/nanofi.h | 24 ++- nanofi/include/core/cstructs.h | 17 +- nanofi/include/core/cxxstructs.h | 41 ++++ nanofi/include/cxx/CallbackProcessor.h | 4 +- nanofi/include/cxx/Plan.h | 37 +++- nanofi/src/api/nanofi.cpp | 290 ++++++++++++++++++---------- nanofi/src/cxx/CallbackProcessor.cpp | 5 +- nanofi/src/cxx/Plan.cpp | 28 +-- nanofi/tests/CAPITests.cpp | 184 ++++++++++++++---- 11 files changed, 467 insertions(+), 174 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/4d20f840/libminifi/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt index f6cc302..41c8063 100644 --- a/libminifi/CMakeLists.txt +++ b/libminifi/CMakeLists.txt @@ -141,11 +141,14 @@ endif() SET (LIBMINIFI core-minifi PARENT_SCOPE) if (ENABLE_PYTHON) -if (NOT APPLE) #### shared add_library(core-minifi-shared 
SHARED ${SOURCES}) -target_link_libraries(core-minifi-shared ${CMAKE_DL_LIBS} uuid-shared yaml-cpp) +if (APPLE) + target_link_libraries(core-minifi-shared ${CMAKE_DL_LIBS} yaml-cpp) +else() + target_link_libraries(core-minifi-shared ${CMAKE_DL_LIBS} uuid-shared yaml-cpp) +endif() find_package(ZLIB REQUIRED) include_directories(${ZLIB_INCLUDE_DIRS}) @@ -175,5 +178,5 @@ endif() set_property(TARGET core-minifi-shared PROPERTY POSITION_INDEPENDENT_CODE ON) set_property(TARGET minifi-shared PROPERTY POSITION_INDEPENDENT_CODE ON) -endif() +#endif() endif(ENABLE_PYTHON) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/4d20f840/nanofi/examples/terminate_handler.c ---------------------------------------------------------------------- diff --git a/nanofi/examples/terminate_handler.c b/nanofi/examples/terminate_handler.c index d5443f0..1d5150d 100644 --- a/nanofi/examples/terminate_handler.c +++ b/nanofi/examples/terminate_handler.c @@ -42,7 +42,7 @@ int main(int argc, char **argv) { flow *new_flow = create_flow(instance, "GenerateFlowFile"); - processor *put_proc = add_processor_with_linkage(new_flow, "PutFile"); + processor *put_proc = add_processor(new_flow, "PutFile"); // Target directory for PutFile is missing, it's not allowed to create, so tries to transmit to failure relationship // As it doesn't exist, an exception is thrown http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/4d20f840/nanofi/include/api/nanofi.h ---------------------------------------------------------------------- diff --git a/nanofi/include/api/nanofi.h b/nanofi/include/api/nanofi.h index 31a4829..e25a3a0 100644 --- a/nanofi/include/api/nanofi.h +++ b/nanofi/include/api/nanofi.h @@ -79,10 +79,12 @@ flow *create_getfile(nifi_instance *instance, flow *parent, GetFileConfig *c); processor *add_processor(flow *, const char *); -processor *add_processor_with_linkage(flow *flow, const char *processor_name); - processor *add_python_processor(flow *, void 
(*ontrigger_callback)(processor_session *session)); +standalone_processor *create_processor(const char *); + +void free_standalone_processor(standalone_processor*); + /** * Register your callback to received flow files that the flow failed to process * The flow file ownership is transferred to the caller! @@ -101,6 +103,8 @@ int set_failure_strategy(flow *flow, FailureStrategy strategy); int set_property(processor *, const char *, const char *); +int set_standalone_property(standalone_processor*, const char*, const char *); + int set_instance_property(nifi_instance *instance, const char*, const char *); int free_flow(flow *); @@ -111,6 +115,14 @@ size_t get_flow_files(nifi_instance *, flow *, flow_file_record **, size_t); flow_file_record *get(nifi_instance *,flow *, processor_session *); +flow_file_record *invoke(standalone_processor* proc); + +flow_file_record *invoke_ff(standalone_processor* proc, const flow_file_record *input_ff); + +flow_file_record *invoke_file(standalone_processor* proc, const char* path); + +flow_file_record *invoke_chunck(standalone_processor* proc, uint8_t* buf, uint64_t); + int transfer(processor_session* session, flow *flow, const char *rel); /** @@ -135,6 +147,14 @@ int get_attribute_qty(const flow_file_record* ff); int get_all_attributes(const flow_file_record* ff, attribute_set *target); +/** + * reads the content of a flow file + * @param target reference in which will set the result + * @param size max number of bytes to read (use flow_file_record->size to get the whole content) + * @return resulting read size (<=size) + **/ +int get_content(const flow_file_record* ff, uint8_t* target, int size); + uint8_t remove_attribute(flow_file_record*, char *key); /**** http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/4d20f840/nanofi/include/core/cstructs.h ---------------------------------------------------------------------- diff --git a/nanofi/include/core/cstructs.h b/nanofi/include/core/cstructs.h index bc9700e..e493166 100644 
--- a/nanofi/include/core/cstructs.h +++ b/nanofi/include/core/cstructs.h @@ -19,6 +19,9 @@ #ifndef LIBMINIFI_SRC_CAPI_CSTRUCTS_H_ #define LIBMINIFI_SRC_CAPI_CSTRUCTS_H_ +#include <stddef.h> +#include <stdint.h> + /** * NiFi Port struct */ @@ -62,13 +65,11 @@ typedef struct { * ################################################################## */ -typedef struct { - void *processor_ptr; -} processor; +typedef struct processor processor; -typedef struct { - void *session; -} processor_session; +typedef struct standalone_processor standalone_processor; + +typedef struct processor_session processor_session; /**** * ################################################################## @@ -106,9 +107,7 @@ typedef struct { } flow_file_record; -typedef struct { - void *plan; -} flow; +typedef struct flow flow; typedef enum FS { AS_IS, http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/4d20f840/nanofi/include/core/cxxstructs.h ---------------------------------------------------------------------- diff --git a/nanofi/include/core/cxxstructs.h b/nanofi/include/core/cxxstructs.h new file mode 100644 index 0000000..dfa327c --- /dev/null +++ b/nanofi/include/core/cxxstructs.h @@ -0,0 +1,41 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef NIFI_MINIFI_CPP_CXXSTRUCTS_H +#define NIFI_MINIFI_CPP_CXXSTRUCTS_H + +#include "cstructs.h" +#include "cxx/Plan.h" + +struct flow : public ExecutionPlan { + using ExecutionPlan::ExecutionPlan; +}; + +struct standalone_processor : public core::Processor { + using core::Processor::Processor; +}; + +struct processor : public core::Processor { + using core::Processor::Processor; +}; + +struct processor_session : public core::ProcessSession { + using core::ProcessSession::ProcessSession; +}; + +#endif //NIFI_MINIFI_CPP_CXXSTRUCTS_H http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/4d20f840/nanofi/include/cxx/CallbackProcessor.h ---------------------------------------------------------------------- diff --git a/nanofi/include/cxx/CallbackProcessor.h b/nanofi/include/cxx/CallbackProcessor.h index 61e824f..7cfcaf2 100644 --- a/nanofi/include/cxx/CallbackProcessor.h +++ b/nanofi/include/cxx/CallbackProcessor.h @@ -65,7 +65,7 @@ class CallbackProcessor : public core::Processor { public: - void setCallback(void *obj,std::function<void(processor_session*)> ontrigger_callback) { + void setCallback(void *obj,std::function<void(core::ProcessSession*)> ontrigger_callback) { objref_ = obj; callback_ = ontrigger_callback; } @@ -82,7 +82,7 @@ class CallbackProcessor : public core::Processor { protected: void *objref_; - std::function<void(processor_session*)> callback_; + std::function<void(core::ProcessSession*)> callback_; private: // Logger std::shared_ptr<logging::Logger> logger_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/4d20f840/nanofi/include/cxx/Plan.h ---------------------------------------------------------------------- diff --git a/nanofi/include/cxx/Plan.h b/nanofi/include/cxx/Plan.h index 6171242..e2cb827 100644 --- a/nanofi/include/cxx/Plan.h +++ b/nanofi/include/cxx/Plan.h @@ -44,12 +44,16 @@ #include 
"core/ProcessSession.h" #include "core/ProcessorNode.h" #include "core/reporting/SiteToSiteProvenanceReportingTask.h" -#include "core/cstructs.h" #include "api/nanofi.h" using failure_callback_type = std::function<void(flow_file_record*)>; using content_repo_sptr = std::shared_ptr<core::ContentRepository>; +struct flowfile_input_params { + std::shared_ptr<minifi::io::DataStream> content_stream; + std::map<std::string, std::string> attributes; +}; + namespace { void failureStrategyAsIs(core::ProcessSession *session, failure_callback_type user_callback, content_repo_sptr cr_ptr) { @@ -88,7 +92,7 @@ class ExecutionPlan { explicit ExecutionPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo); - std::shared_ptr<core::Processor> addCallback(void *, std::function<void(processor_session*)>); + std::shared_ptr<core::Processor> addCallback(void *, std::function<void(core::ProcessSession*)>); std::shared_ptr<core::Processor> addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name, core::Relationship relationship = core::Relationship("success", "description"), @@ -101,7 +105,7 @@ class ExecutionPlan { void reset(); - bool runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify = nullptr); + bool runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify = nullptr, std::shared_ptr<flowfile_input_params> = nullptr); bool setFailureCallback(failure_callback_type onerror_callback); @@ -133,8 +137,29 @@ class ExecutionPlan { next_ff_ = ptr; } + bool hasProcessor() { + return !processor_queue_.empty(); + } + static std::shared_ptr<core::Processor> createProcessor(const std::string &processor_name, const std::string &name); + static std::shared_ptr<ExecutionPlan> getPlan(const std::string& uuid) { + auto it 
= proc_plan_map_.find(uuid); + return it != proc_plan_map_.end() ? it->second : nullptr; + } + + static void addProcessorWithPlan(const std::string &uuid, std::shared_ptr<ExecutionPlan> plan) { + proc_plan_map_[uuid] = plan; + } + + static bool removeProcWithPlan(const std::string& uuid) { + return proc_plan_map_.erase(uuid) > 0; + } + + static size_t getProcWithPlanQty() { + return proc_plan_map_.size(); + } + protected: class FailureHandler { public: @@ -149,9 +174,8 @@ class ExecutionPlan { void setStrategy(FailureStrategy strat) { strategy_ = strat; } - void operator()(const processor_session* ps) { - auto ses = static_cast<core::ProcessSession*>(ps->session); - FailureStrategies.at(strategy_)(ses, callback_, content_repo_); + void operator()(core::ProcessSession* ps) { + FailureStrategies.at(strategy_)(ps, callback_, content_repo_); } private: failure_callback_type callback_; @@ -199,6 +223,7 @@ class ExecutionPlan { static std::shared_ptr<utils::IdGenerator> id_generator_; std::shared_ptr<logging::Logger> logger_; std::shared_ptr<FailureHandler> failure_handler_; + static std::unordered_map<std::string, std::shared_ptr<ExecutionPlan>> proc_plan_map_; }; #endif /* LIBMINIFI_CAPI_PLAN_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/4d20f840/nanofi/src/api/nanofi.cpp ---------------------------------------------------------------------- diff --git a/nanofi/src/api/nanofi.cpp b/nanofi/src/api/nanofi.cpp index ee33c6b..7a06bfc 100644 --- a/nanofi/src/api/nanofi.cpp +++ b/nanofi/src/api/nanofi.cpp @@ -20,6 +20,7 @@ #include <memory> #include <utility> #include <exception> +#include <stdio.h> #include "api/nanofi.h" #include "core/Core.h" @@ -30,6 +31,8 @@ #include "processors/GetFile.h" #include "core/logging/LoggerConfiguration.h" #include "utils/StringUtils.h" +#include "io/DataStream.h" +#include "core/cxxstructs.h" using string_map = std::map<std::string, std::string>; @@ -40,6 +43,8 @@ class API_INITIALIZER { int API_INITIALIZER::initialized 
= initialize_api(); +static nifi_instance* standalone_instance = nullptr; + int initialize_api() { logging::LoggerConfiguration::getConfiguration().disableLogging(); return 1; @@ -89,6 +94,39 @@ nifi_instance *create_instance(const char *url, nifi_port *port) { return instance; } +standalone_processor *create_processor(const char *name) { + static int proc_counter = 0; + auto ptr = ExecutionPlan::createProcessor(name, name); + if (!ptr) { + return nullptr; + } + if (standalone_instance == nullptr) { + nifi_port port; + char portnum[] = "98765"; + port.port_id = portnum; + standalone_instance = create_instance("internal_standalone", &port); + } + std::string flow_name = std::to_string(proc_counter++); + auto flow = create_flow(standalone_instance, flow_name.c_str()); + std::shared_ptr<ExecutionPlan> plan(flow); + plan->addProcessor(ptr, name); + ExecutionPlan::addProcessorWithPlan(ptr->getUUIDStr(), plan); + return static_cast<standalone_processor*>(ptr.get()); +} + +void free_standalone_processor(standalone_processor* proc) { + if (proc == nullptr) { + return; + } + ExecutionPlan::removeProcWithPlan(proc->getUUIDStr()); + + if (ExecutionPlan::getProcWithPlanQty() == 0) { + // The instance is not needed any more as there are no standalone processors in the system + free_instance(standalone_instance); + standalone_instance = nullptr; + } +} + /** * Initializes the instance */ @@ -288,6 +326,16 @@ uint8_t remove_attribute(flow_file_record *ff, const char *key) { return attribute_map->erase(key) - 1; // erase by key returns the number of elements removed (0 or 1) } +int get_content(const flow_file_record* ff, uint8_t* target, int size) { + if (ff == nullptr || target == nullptr || size == 0) { + return 0; + } + auto content_repo = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ff->crp); + std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, *content_repo); + auto stream = 
(*content_repo)->read(claim); + return stream->read(target, size); +} + /** * Transmits the flowfile * @param ff flow file record @@ -323,13 +371,11 @@ int transmit_flowfile(flow_file_record *ff, nifi_instance *instance) { flow * create_new_flow(nifi_instance * instance) { auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr); - flow *new_flow = (flow*) malloc(sizeof(flow)); - - auto execution_plan = new ExecutionPlan(minifi_instance_ref->getContentRepository(), minifi_instance_ref->getNoOpRepository(), minifi_instance_ref->getNoOpRepository()); - - new_flow->plan = execution_plan; - - return new_flow; + flow * area = static_cast<flow*>(malloc(1*sizeof(flow))); + if(area == nullptr) { + return nullptr; + } + return new(area) flow(minifi_instance_ref->getContentRepository(), minifi_instance_ref->getNoOpRepository(), minifi_instance_ref->getNoOpRepository()); } flow *create_flow(nifi_instance *instance, const char *first_processor) { @@ -337,41 +383,40 @@ flow *create_flow(nifi_instance *instance, const char *first_processor) { return nullptr; } auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr); - flow *new_flow = (flow*) malloc(sizeof(flow)); - - auto execution_plan = new ExecutionPlan(minifi_instance_ref->getContentRepository(), minifi_instance_ref->getNoOpRepository(), minifi_instance_ref->getNoOpRepository()); - - new_flow->plan = execution_plan; + flow * area = static_cast<flow*>(malloc(1*sizeof(flow))); + if(area == nullptr) { + return nullptr; + } + flow *new_flow = new(area) flow(minifi_instance_ref->getContentRepository(), minifi_instance_ref->getNoOpRepository(), minifi_instance_ref->getNoOpRepository()); if (first_processor != nullptr && strlen(first_processor) > 0) { // automatically adds it with success - execution_plan->addProcessor(first_processor, first_processor); + new_flow->addProcessor(first_processor, first_processor); } return new_flow; } processor *add_python_processor(flow *flow, void 
(*ontrigger_callback)(processor_session *)) { - if (nullptr == flow || nullptr == flow->plan || nullptr == ontrigger_callback) { + if (nullptr == flow || nullptr == ontrigger_callback) { return nullptr; } - ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan); - auto proc = plan->addCallback(nullptr, std::bind(ontrigger_callback, std::placeholders::_1)); - processor *new_processor = (processor*) malloc(sizeof(processor)); - new_processor->processor_ptr = proc.get(); - return new_processor; + auto lambda = [ontrigger_callback](core::ProcessSession *ps) { + ontrigger_callback(static_cast<processor_session*>(ps)); //Meh, sorry for this + }; + auto proc = flow->addCallback(nullptr, lambda); + return static_cast<processor*>(proc.get()); } flow * create_getfile(nifi_instance * instance, flow * parent_flow, GetFileConfig * c) { static const std::string first_processor = "GetFile"; flow *new_flow = parent_flow == nullptr ? create_flow(instance, nullptr) : parent_flow; - ExecutionPlan *plan = static_cast<ExecutionPlan*>(new_flow->plan); // automatically adds it with success - auto getFile = plan->addProcessor(first_processor, first_processor); + auto getFile = new_flow->addProcessor(first_processor, first_processor); - plan->setProperty(getFile, processors::GetFile::Directory.getName(), c->directory); - plan->setProperty(getFile, processors::GetFile::KeepSourceFile.getName(), c->keep_source ? "true" : "false"); - plan->setProperty(getFile, processors::GetFile::Recurse.getName(), c->recurse ? "true" : "false"); + new_flow->setProperty(getFile, processors::GetFile::Directory.getName(), c->directory); + new_flow->setProperty(getFile, processors::GetFile::KeepSourceFile.getName(), c->keep_source ? "true" : "false"); + new_flow->setProperty(getFile, processors::GetFile::Recurse.getName(), c->recurse ? 
"true" : "false"); return new_flow; } @@ -380,89 +425,83 @@ processor *add_processor(flow *flow, const char *processor_name) { if (nullptr == flow || nullptr == processor_name) { return nullptr; } - ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan); - auto proc = plan->addProcessor(processor_name, processor_name); - if (proc) { - processor *new_processor = (processor*) malloc(sizeof(processor)); - new_processor->processor_ptr = proc.get(); - return new_processor; - } - return nullptr; -} -processor *add_processor_with_linkage(flow *flow, const char *processor_name) { - ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan); - auto proc = plan->addProcessor(processor_name, processor_name, core::Relationship("success", "description"), true); - if (proc) { - processor *new_processor = (processor*) malloc(sizeof(processor)); - new_processor->processor_ptr = proc.get(); - return new_processor; - } - return nullptr; + auto proc = flow->addProcessor(processor_name, processor_name, core::Relationship("success", "description"), flow->hasProcessor()); + return static_cast<processor*>(proc.get()); } int add_failure_callback(flow *flow, void (*onerror_callback)(flow_file_record*)) { - ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan); - return plan->setFailureCallback(onerror_callback) ? 0 : 1; + return flow->setFailureCallback(onerror_callback) ? 0 : 1; } int set_failure_strategy(flow *flow, FailureStrategy strategy) { - return static_cast<ExecutionPlan*>(flow->plan)->setFailureStrategy(strategy) ? 0 : -1; + return flow->setFailureStrategy(strategy) ? 
0 : -1; } -int set_property(processor *proc, const char *name, const char *value) { - if (name != nullptr && value != nullptr && proc != nullptr) { - core::Processor *p = static_cast<core::Processor*>(proc->processor_ptr); - bool success = p->setProperty(name, value) || (p->supportsDynamicProperties() && p->setDynamicProperty(name, value)); +int set_propery_internal(core::Processor* proc, const char *name, const char *value) { + if (name != nullptr && value != nullptr) { + bool success = proc->setProperty(name, value) || (proc->supportsDynamicProperties() && proc->setDynamicProperty(name, value)); return success ? 0 : -2; } return -1; } +int set_property(processor *proc, const char *name, const char *value) { + if (proc != nullptr) { + return set_propery_internal(proc, name, value); + } + return -1; +} + +int set_standalone_property(standalone_processor *proc, const char *name, const char *value) { + if (proc != nullptr) { + return set_propery_internal(proc, name, value); + } + return -1; +} + int free_flow(flow *flow) { - if (flow == nullptr || nullptr == flow->plan) + if (flow == nullptr) return -1; - auto execution_plan = static_cast<ExecutionPlan*>(flow->plan); - delete execution_plan; - free(flow); + delete flow; return 0; } -flow_file_record * get_next_flow_file(nifi_instance * instance, flow * flow) { - if (instance == nullptr || nullptr == flow || nullptr == flow->plan) - return nullptr; - auto execution_plan = static_cast<ExecutionPlan*>(flow->plan); - execution_plan->reset(); - while (execution_plan->runNextProcessor()) { - } - auto ff = execution_plan->getCurrentFlowFile(); +flow_file_record* flowfile_to_record(std::shared_ptr<core::FlowFile> ff, ExecutionPlan* plan) { if (ff == nullptr) { return nullptr; } auto claim = ff->getResourceClaim(); + if(claim == nullptr) { + return nullptr; + } + + // create a flow file. 
+ claim->increaseFlowFileRecordOwnedCount(); + auto path = claim->getContentFullPath(); + auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize()); + ffr->ffp = ff.get(); + ffr->attributes = ff->getAttributesPtr(); + auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp); + *content_repo_ptr = plan->getContentRepo(); + return ffr; +} - if (claim != nullptr) { - // create a flow file. - claim->increaseFlowFileRecordOwnedCount(); - auto path = claim->getContentFullPath(); - auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize()); - ffr->ffp = ff.get(); - ffr->attributes = ff->getAttributesPtr(); - auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp); - *content_repo_ptr = execution_plan->getContentRepo(); - return ffr; - } else { +flow_file_record * get_next_flow_file(nifi_instance * instance, flow * flow) { + if (instance == nullptr || nullptr == flow) return nullptr; + flow->reset(); + while (flow->runNextProcessor()) { } + return flowfile_to_record(flow->getCurrentFlowFile(), flow); } size_t get_flow_files(nifi_instance *instance, flow *flow, flow_file_record **ff_r, size_t size) { if (nullptr == instance || nullptr == flow || nullptr == ff_r) return 0; - auto execution_plan = static_cast<ExecutionPlan*>(flow->plan); size_t i = 0; for (; i < size; i++) { - execution_plan->reset(); + flow->reset(); auto ffr = get_next_flow_file(instance, flow); if (ffr == nullptr) { break; @@ -475,44 +514,99 @@ size_t get_flow_files(nifi_instance *instance, flow *flow, flow_file_record **ff flow_file_record * get(nifi_instance * instance, flow * flow, processor_session * session) { if (nullptr == instance || nullptr == flow || nullptr == session) return nullptr; - auto sesh = static_cast<core::ProcessSession*>(session->session); - auto execution_plan = static_cast<ExecutionPlan*>(flow->plan); - auto ff = sesh->get(); - execution_plan->setNextFlowFile(ff); - if 
(ff == nullptr) { + auto ff = session->get(); + flow->setNextFlowFile(ff); + return flowfile_to_record(ff, flow); +} + +flow_file_record *invoke(standalone_processor* proc) { + return invoke_ff(proc, nullptr); +} + + +flow_file_record *invoke_ff(standalone_processor* proc, const flow_file_record *input_ff) { + if (proc == nullptr) { return nullptr; } - auto claim = ff->getResourceClaim(); + auto plan = ExecutionPlan::getPlan(proc->getUUIDStr()); + if (!plan) { + // This is not a standalone processor, shouldn't be used with invoke! + return nullptr; + } + + plan->reset(); + + if (input_ff) { + auto ff_data = std::make_shared<flowfile_input_params>(); + auto content_repo = static_cast<std::shared_ptr<minifi::core::ContentRepository> *>(input_ff->crp); + std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(input_ff->contentLocation, + *content_repo); + ff_data->content_stream = (*content_repo)->read(claim); + ff_data->attributes = *static_cast<std::map<std::string, std::string> *>(input_ff->attributes); + + plan->runNextProcessor(nullptr, ff_data); + } + while (plan->runNextProcessor()) { + } + return flowfile_to_record(plan->getCurrentFlowFile(), plan.get()); +} + +flow_file_record *invoke_chunk(standalone_processor* proc, uint8_t* buf, uint64_t size) { + if (proc == nullptr || buf == nullptr || size == 0) { + return nullptr; + } + + auto plan = ExecutionPlan::getPlan(proc->getUUIDStr()); + if (!plan) { + // This is not a standalone processor, shouldn't be used with invoke! + return nullptr; + } + + plan->reset(); + + auto ff_data = std::make_shared<flowfile_input_params>(); + ff_data->content_stream = std::make_shared<minifi::io::DataStream>(); + ff_data->content_stream->writeData(buf, size); - if (claim != nullptr) { - // create a flow file. 
- claim->increaseFlowFileRecordOwnedCount(); - auto path = claim->getContentFullPath(); - auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize()); - ffr->attributes = ff->getAttributesPtr(); - ffr->ffp = ff.get(); - auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp); - *content_repo_ptr = execution_plan->getContentRepo(); - return ffr; - } else { + plan->runNextProcessor(nullptr, ff_data); + while (plan->runNextProcessor()) { + } + + return flowfile_to_record(plan->getCurrentFlowFile(), plan.get()); +} + +flow_file_record *invoke_file(standalone_processor* proc, const char* path) { + FILE *fileptr; + uint8_t *buffer; + uint64_t filelen; + + fileptr = fopen(path, "rb"); + if (fileptr == nullptr) { return nullptr; } + fseek(fileptr, 0, SEEK_END); + filelen = ftell(fileptr); + rewind(fileptr); + + buffer = (uint8_t *)malloc((filelen+1)*sizeof(uint8_t)); // Enough memory for file + \0 + fread(buffer, filelen, 1, fileptr); + fclose(fileptr); + + flow_file_record* ffr = invoke_chunk(proc, buffer, filelen); + free(buffer); + return ffr; } int transfer(processor_session* session, flow *flow, const char *rel) { if (nullptr == session || nullptr == flow || rel == nullptr) { return -1; } - auto sesh = static_cast<core::ProcessSession*>(session->session); - auto execution_plan = static_cast<ExecutionPlan*>(flow->plan); - if (nullptr == sesh || nullptr == execution_plan) { - return -1; - } + core::Relationship relationship(rel, rel); - auto ff = execution_plan->getNextFlowFile(); + auto ff = flow->getNextFlowFile(); if (nullptr == ff) { return -2; } - sesh->transfer(ff, relationship); + session->transfer(ff, relationship); return 0; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/4d20f840/nanofi/src/cxx/CallbackProcessor.cpp ---------------------------------------------------------------------- diff --git a/nanofi/src/cxx/CallbackProcessor.cpp b/nanofi/src/cxx/CallbackProcessor.cpp index 
5294a1b..013ec47 100644 --- a/nanofi/src/cxx/CallbackProcessor.cpp +++ b/nanofi/src/cxx/CallbackProcessor.cpp @@ -16,6 +16,7 @@ * limitations under the License. */ #include "cxx/CallbackProcessor.h" +#include "core/cxxstructs.h" namespace org { namespace apache { namespace nifi { @@ -24,9 +25,7 @@ namespace processors { void CallbackProcessor::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { if (callback_ != nullptr) { - processor_session sesh; - sesh.session = session; - callback_(&sesh); + callback_(session); } } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/4d20f840/nanofi/src/cxx/Plan.cpp ---------------------------------------------------------------------- diff --git a/nanofi/src/cxx/Plan.cpp b/nanofi/src/cxx/Plan.cpp index f892aa9..b2b4690 100644 --- a/nanofi/src/cxx/Plan.cpp +++ b/nanofi/src/cxx/Plan.cpp @@ -23,19 +23,8 @@ #include <set> #include <string> -bool intToFailureStragey(int in, FailureStrategy *out) { - auto tmp = static_cast<FailureStrategy>(in); - switch (tmp) { - case AS_IS: - case ROLLBACK: - *out = tmp; - return true; - default: - return false; - } -} - std::shared_ptr<utils::IdGenerator> ExecutionPlan::id_generator_ = utils::IdGenerator::getIdGenerator(); +std::unordered_map<std::string, std::shared_ptr<ExecutionPlan>> ExecutionPlan::proc_plan_map_ = {}; ExecutionPlan::ExecutionPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo) : content_repo_(content_repo), @@ -52,7 +41,7 @@ ExecutionPlan::ExecutionPlan(std::shared_ptr<core::ContentRepository> content_re * Add a callback to obtain and pass processor session to a generated processor * */ -std::shared_ptr<core::Processor> ExecutionPlan::addCallback(void *obj, std::function<void(processor_session*)> fp) { +std::shared_ptr<core::Processor> ExecutionPlan::addCallback(void *obj, std::function<void(core::ProcessSession*)> fp) { if (finalized) { return nullptr; 
} @@ -144,7 +133,8 @@ void ExecutionPlan::reset() { } } -bool ExecutionPlan::runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify) { +bool ExecutionPlan::runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify, + std::shared_ptr<flowfile_input_params> input_ff_params) { if (!finalized) { finalize(); } @@ -152,6 +142,7 @@ bool ExecutionPlan::runNextProcessor(std::function<void(const std::shared_ptr<co if (location >= processor_queue_.size()) { return false; } + std::shared_ptr<core::Processor> processor = processor_queue_[location]; std::shared_ptr<core::ProcessContext> context = processor_contexts_[location]; std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context); @@ -162,6 +153,15 @@ bool ExecutionPlan::runNextProcessor(std::function<void(const std::shared_ptr<co } std::shared_ptr<core::ProcessSession> current_session = std::make_shared<core::ProcessSession>(context); process_sessions_.push_back(current_session); + if (input_ff_params) { + std::shared_ptr<minifi::FlowFileRecord> flowFile = std::static_pointer_cast<minifi::FlowFileRecord>(current_session->create()); + for(const auto& kv : input_ff_params->attributes) { + flowFile->setAttribute(kv.first, kv.second); + } + current_session->importFrom(*(input_ff_params->content_stream.get()), flowFile); + current_session->transfer(flowFile, core::Relationship("success", "success")); + relationships_[relationships_.size()-1]->put(std::static_pointer_cast<core::FlowFile>(flowFile)); + } processor->incrementActiveTasks(); processor->setScheduledState(core::ScheduledState::RUNNING); if (verify != nullptr) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/4d20f840/nanofi/tests/CAPITests.cpp ---------------------------------------------------------------------- diff --git a/nanofi/tests/CAPITests.cpp 
b/nanofi/tests/CAPITests.cpp index 65c52e1..54eae0e 100644 --- a/nanofi/tests/CAPITests.cpp +++ b/nanofi/tests/CAPITests.cpp @@ -31,6 +31,11 @@ #include <thread> #include "api/nanofi.h" +char src_format[] = "/tmp/gt.XXXXXX"; +char put_format[] = "/tmp/pt.XXXXXX"; +std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!"; +std::string test_file_name = "tstFile.ext"; + static nifi_instance *create_instance_obj(const char *name = "random_instance") { nifi_port port; char port_str[] = "12345"; @@ -51,6 +56,16 @@ void big_failure_counter(flow_file_record * fr) { free_flowfile(fr); } +std::string create_testfile_for_getfile(const char* sourcedir, const std::string& filename = test_file_name) { + std::fstream file; + std::stringstream ss; + ss << sourcedir << "/" << filename; + file.open(ss.str(), std::ios::out); + file << test_file_content; + file.close(); + return ss.str(); +} + TEST_CASE("Test Creation of instance, one processor", "[createInstanceAndFlow]") { auto instance = create_instance_obj(); REQUIRE(instance != nullptr); @@ -95,35 +110,27 @@ TEST_CASE("Set valid and invalid properties", "[setProcesssorProperties]") { TEST_CASE("get file and put file", "[getAndPutFile]") { TestController testController; - char src_format[] = "/tmp/gt.XXXXXX"; - char put_format[] = "/tmp/pt.XXXXXX"; const char *sourcedir = testController.createTempDirectory(src_format); const char *putfiledir = testController.createTempDirectory(put_format); - std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!"; auto instance = create_instance_obj(); REQUIRE(instance != nullptr); flow *test_flow = create_flow(instance, nullptr); REQUIRE(test_flow != nullptr); processor *get_proc = add_processor(test_flow, "GetFile"); REQUIRE(get_proc != nullptr); - processor *put_proc = add_processor_with_linkage(test_flow, "PutFile"); + processor *put_proc = add_processor(test_flow, "PutFile"); REQUIRE(put_proc != nullptr); REQUIRE(set_property(get_proc, "Input Directory", sourcedir) 
== 0); REQUIRE(set_property(put_proc, "Directory", putfiledir) == 0); - std::fstream file; - std::stringstream ss; - ss << sourcedir << "/" << "tstFile.ext"; - file.open(ss.str(), std::ios::out); - file << test_file_content; - file.close(); + create_testfile_for_getfile(sourcedir); flow_file_record *record = get_next_flow_file(instance, test_flow); REQUIRE(record != nullptr); - ss.str(""); + std::stringstream ss; - ss << putfiledir << "/" << "tstFile.ext"; + ss << putfiledir << "/" << test_file_name; std::ifstream t(ss.str()); std::string put_data((std::istreambuf_iterator<char>(t)), std::istreambuf_iterator<char>()); @@ -132,6 +139,14 @@ TEST_CASE("get file and put file", "[getAndPutFile]") { // No failure handler can be added after the flow is finalized REQUIRE(add_failure_callback(test_flow, failure_counter) == 1); + uint8_t* content = (uint8_t*)malloc(record->size* sizeof(uint8_t)); + + REQUIRE(get_content(record, content, record->size) == record->size); + + REQUIRE(test_file_content == std::string(reinterpret_cast<char*>(content), record->size)); + + free(content); + free_flowfile(record); free_flow(test_flow); @@ -142,17 +157,10 @@ TEST_CASE("get file and put file", "[getAndPutFile]") { TEST_CASE("Test manipulation of attributes", "[testAttributes]") { TestController testController; - char src_format[] = "/tmp/gt.XXXXXX"; const char *sourcedir = testController.createTempDirectory(src_format); - std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!"; + create_testfile_for_getfile(sourcedir); - std::fstream file; - std::stringstream ss; - ss << sourcedir << "/" << "tstFile.ext"; - file.open(ss.str(), std::ios::out); - file << test_file_content; - file.close(); auto instance = create_instance_obj(); REQUIRE(instance != nullptr); flow *test_flow = create_flow(instance, nullptr); @@ -161,10 +169,10 @@ TEST_CASE("Test manipulation of attributes", "[testAttributes]") { processor *get_proc = add_processor(test_flow, "GetFile"); REQUIRE(get_proc != 
nullptr); REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0); - processor *extract_test = add_processor_with_linkage(test_flow, "ExtractText"); + processor *extract_test = add_processor(test_flow, "ExtractText"); REQUIRE(extract_test != nullptr); REQUIRE(set_property(extract_test, "Attribute", "TestAttr") == 0); - processor *update_attr = add_processor_with_linkage(test_flow, "UpdateAttribute"); + processor *update_attr = add_processor(test_flow, "UpdateAttribute"); REQUIRE(update_attr != nullptr); REQUIRE(set_property(update_attr, "UpdatedAttribute", "UpdatedValue") == 0); @@ -224,9 +232,7 @@ TEST_CASE("Test manipulation of attributes", "[testAttributes]") { TEST_CASE("Test error handling callback", "[errorHandling]") { TestController testController; - char src_format[] = "/tmp/gt.XXXXXX"; const char *sourcedir = testController.createTempDirectory(src_format); - std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!"; auto instance = create_instance_obj(); REQUIRE(instance != nullptr); @@ -239,19 +245,13 @@ TEST_CASE("Test error handling callback", "[errorHandling]") { processor *get_proc = add_processor(test_flow, "GetFile"); REQUIRE(get_proc != nullptr); - processor *put_proc = add_processor_with_linkage(test_flow, "PutFile"); + processor *put_proc = add_processor(test_flow, "PutFile"); REQUIRE(put_proc != nullptr); REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0); REQUIRE(set_property(put_proc, "Directory", "/tmp/never_existed") == 0); REQUIRE(set_property(put_proc, "Create Missing Directories", "false") == 0); - std::fstream file; - std::stringstream ss; - - ss << sourcedir << "/" << "tstFile.ext"; - file.open(ss.str(), std::ios::out); - file << test_file_content; - file.close(); + create_testfile_for_getfile(sourcedir); REQUIRE(get_next_flow_file(instance, test_flow) == nullptr); @@ -263,10 +263,7 @@ TEST_CASE("Test error handling callback", "[errorHandling]") { REQUIRE(set_failure_strategy(test_flow, 
FailureStrategy::ROLLBACK) == 0); // Create new testfile to trigger failure again - ss << "2"; - file.open(ss.str(), std::ios::out); - file << test_file_content; - file.close(); + create_testfile_for_getfile(sourcedir, test_file_name + "2"); REQUIRE(get_next_flow_file(instance, test_flow) == nullptr); REQUIRE(failure_count > 100); @@ -276,3 +273,118 @@ TEST_CASE("Test error handling callback", "[errorHandling]") { free_flow(test_flow); free_instance(instance); } + +TEST_CASE("Test standalone processors", "[testStandalone]") { + TestController testController; + + const char *sourcedir = testController.createTempDirectory(src_format); + + create_testfile_for_getfile(sourcedir); + + standalone_processor* getfile_proc = create_processor("GetFile"); + REQUIRE(set_standalone_property(getfile_proc, "Input Directory", sourcedir) == 0); + + flow_file_record* ffr = invoke(getfile_proc); + + REQUIRE(ffr != nullptr); + REQUIRE(get_attribute_qty(ffr) > 0); + + standalone_processor* extract_test = create_processor("ExtractText"); + REQUIRE(extract_test != nullptr); + REQUIRE(set_standalone_property(extract_test, "Attribute", "TestAttr") == 0); + + flow_file_record* ffr2 = invoke_ff(extract_test, ffr); + + free_flowfile(ffr); + + // Verify the transfer of attributes + REQUIRE(ffr2 != nullptr); + REQUIRE(get_attribute_qty(ffr2) > 0); + + char filename_key[] = "filename"; + attribute attr; + attr.key = filename_key; + attr.value_size = 0; + + REQUIRE(get_attribute(ffr2, &attr) == 0); + REQUIRE(attr.value_size > 0); + + // Verify extracttext behavior + char test_attr[] = "TestAttr"; + attr.key = test_attr; + attr.value_size = 0; + REQUIRE(get_attribute(ffr2, &attr) == 0); + REQUIRE(std::string(static_cast<char*>(attr.value), attr.value_size) == test_file_content); + + free_flowfile(ffr2); + free_standalone_processor(getfile_proc); +} + +TEST_CASE("Test interaction of flow and standlone processors", "[testStandaloneWithFlow]") { + TestController testController; + + const char 
*sourcedir = testController.createTempDirectory(src_format); + const char *putfiledir = testController.createTempDirectory(put_format); + + create_testfile_for_getfile(sourcedir); + + auto instance = create_instance_obj(); + REQUIRE(instance != nullptr); + flow *test_flow = create_flow(instance, nullptr); + REQUIRE(test_flow != nullptr); + + processor *get_proc = add_processor(test_flow, "GetFile"); + REQUIRE(get_proc != nullptr); + REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0); + + flow_file_record *record = get_next_flow_file(instance, test_flow); + REQUIRE(record != nullptr); + + standalone_processor* putfile_proc = create_processor("PutFile"); + REQUIRE(set_standalone_property(putfile_proc, "Directory", putfiledir) == 0); + + flow_file_record* put_record = invoke_ff(putfile_proc, record); + REQUIRE(put_record != nullptr); + + free_flowfile(record); + free_flowfile(put_record); + + std::stringstream ss; + + ss << putfiledir << "/" << test_file_name; + std::ifstream t(ss.str()); + std::string put_data((std::istreambuf_iterator<char>(t)), std::istreambuf_iterator<char>()); + + REQUIRE(test_file_content == put_data); + + free_flow(test_flow); + free_instance(instance); + free_standalone_processor(putfile_proc); +} + +TEST_CASE("Test standalone processors with file input", "[testStandaloneWithFile]") { + TestController testController; + + enable_logging(); + + const char *sourcedir = testController.createTempDirectory(src_format); + std::string path = create_testfile_for_getfile(sourcedir); + + standalone_processor* extract_test = create_processor("ExtractText"); + REQUIRE(extract_test != nullptr); + REQUIRE(set_standalone_property(extract_test, "Attribute", "TestAttr") == 0); + + flow_file_record* ffr = invoke_file(extract_test, path.c_str()); + + REQUIRE(ffr != nullptr); + + attribute attr; + char test_attr[] = "TestAttr"; + attr.key = test_attr; + attr.value_size = 0; + REQUIRE(get_attribute(ffr, &attr) == 0); + 
REQUIRE(std::string(static_cast<char*>(attr.value), attr.value_size) == test_file_content); + + free_flowfile(ffr); + free_standalone_processor(extract_test); +}
