Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 4daa833fd -> 1a50a4ab0
MINIFICPP-641 - C API: add support to register failure callback This closes #421. Signed-off-by: Marc Parisi <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/1a50a4ab Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/1a50a4ab Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/1a50a4ab Branch: refs/heads/master Commit: 1a50a4ab03d522352a953f2e7ce9cb872ccf0738 Parents: 4daa833 Author: Arpad Boda <[email protected]> Authored: Tue Oct 9 17:56:38 2018 +0200 Committer: Marc Parisi <[email protected]> Committed: Wed Oct 24 08:01:09 2018 -0400 ---------------------------------------------------------------------- libminifi/include/capi/Plan.h | 50 ++++++++- libminifi/include/capi/api.h | 7 ++ .../include/processors/CallbackProcessor.h | 4 +- libminifi/src/capi/Plan.cpp | 109 +++++++++++-------- libminifi/src/capi/api.cpp | 8 +- libminifi/src/core/ProcessSession.cpp | 2 +- libminifi/test/CPPLINT.cfg | 2 +- libminifi/test/capi/CAPITests.cpp | 69 +++++++++++- 8 files changed, 194 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a50a4ab/libminifi/include/capi/Plan.h ---------------------------------------------------------------------- diff --git a/libminifi/include/capi/Plan.h b/libminifi/include/capi/Plan.h index 4afcd18..08ad68a 100644 --- a/libminifi/include/capi/Plan.h +++ b/libminifi/include/capi/Plan.h @@ -44,13 +44,15 @@ #include "core/ProcessorNode.h" #include "core/reporting/SiteToSiteProvenanceReportingTask.h" #include "capi/cstructs.h" +#include "capi/api.h" + class ExecutionPlan { public: explicit ExecutionPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo); - std::shared_ptr<core::Processor> addCallback(void *, void 
(*fp)(processor_session *)); + std::shared_ptr<core::Processor> addCallback(void *, std::function<void(processor_session*)>); std::shared_ptr<core::Processor> addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name, core::Relationship relationship = core::Relationship("success", "description"), @@ -65,6 +67,8 @@ class ExecutionPlan { bool runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify = nullptr); + bool setFailureCallback(void (*onerror_callback)(const flow_file_record*)); + std::set<provenance::ProvenanceEventRecord*> getProvenanceRecords(); std::shared_ptr<core::FlowFile> getCurrentFlowFile(); @@ -83,8 +87,6 @@ class ExecutionPlan { return content_repo_; } - static std::shared_ptr<core::Processor> createProcessor(const std::string &processor_name, const std::string &name); - std::shared_ptr<core::FlowFile> getNextFlowFile(){ return next_ff_; } @@ -93,11 +95,50 @@ class ExecutionPlan { next_ff_ = ptr; } + static std::shared_ptr<core::Processor> createProcessor(const std::string &processor_name, const std::string &name); + protected: + class FailureHandler { + public: + FailureHandler() { + callback_ = nullptr; + } + void setCallback(void (*onerror_callback)(const flow_file_record*)) { + callback_ = onerror_callback; + } + void operator()(const processor_session* ps) + { + auto ses = static_cast<core::ProcessSession*>(ps->session); + + auto ff = ses->get(); + if (ff == nullptr) { + return; + } + auto claim = ff->getResourceClaim(); + + if (claim != nullptr && callback_ != nullptr) { + // create a flow file. NOTE(review): the flow_file_record returned by create_ff_object_na below is never released after callback_ returns - confirm ownership, this looks like a leak per failed flow file.
+ auto path = claim->getContentFullPath(); + auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize()); + ffr->attributes = ff->getAttributesPtr(); + ffr->ffp = ff.get(); + callback_(ffr); + } + // This deletes the content of the flowfile as ff gets out of scope + // It's the user's responsibility to copy all the data + ses->remove(ff); + + } + private: + void (*callback_)(const flow_file_record*); + }; void finalize(); - std::shared_ptr<minifi::Connection> buildFinalConnection(std::shared_ptr<core::Processor> processor, bool setDest = false); + std::shared_ptr<minifi::Connection> buildFinalConnection(std::shared_ptr<core::Processor> processor, bool set_dst = false); + + std::shared_ptr<minifi::Connection> connectProcessors(std::shared_ptr<core::Processor> src_proc, std::shared_ptr<core::Processor> dst_proc, + core::Relationship relationship = core::Relationship("success", "description"), bool set_dst = false); std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory; @@ -131,6 +172,7 @@ class ExecutionPlan { static std::shared_ptr<utils::IdGenerator> id_generator_; std::shared_ptr<logging::Logger> logger_; + std::shared_ptr<FailureHandler> failure_handler_; }; #endif /* LIBMINIFI_CAPI_PLAN_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a50a4ab/libminifi/include/capi/api.h ---------------------------------------------------------------------- diff --git a/libminifi/include/capi/api.h b/libminifi/include/capi/api.h index 94b05fa..2b3622d 100644 --- a/libminifi/include/capi/api.h +++ b/libminifi/include/capi/api.h @@ -79,6 +79,13 @@ processor *add_processor_with_linkage(flow *flow, const char *processor_name); processor *add_python_processor(flow *, void (*ontrigger_callback)(processor_session *session)); +/** +* Register a callback to receive flow files that the flow failed to process +* The flow file is deleted after the callback is executed; make sure to copy all the data you need!
+* The first callback must be registered before the flow is used; it can be replaced later at runtime. Returns 0 on success, 1 if flow is null or was already finalized without a failure callback. +*/ +int add_failure_callback(flow *flow, void (*onerror_callback)(const flow_file_record*)); + int set_property(processor *, const char *, const char *); int set_instance_property(nifi_instance *instance, const char*, const char *); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a50a4ab/libminifi/include/processors/CallbackProcessor.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/CallbackProcessor.h b/libminifi/include/processors/CallbackProcessor.h index 879f49c..801c99c 100644 --- a/libminifi/include/processors/CallbackProcessor.h +++ b/libminifi/include/processors/CallbackProcessor.h @@ -65,7 +65,7 @@ class CallbackProcessor : public core::Processor { public: - void setCallback(void *obj, void (*ontrigger_callback)(processor_session *)) { + void setCallback(void *obj, std::function<void(processor_session*)> ontrigger_callback) { objref_ = obj; callback_ = ontrigger_callback; } @@ -82,7 +82,7 @@ class CallbackProcessor : public core::Processor { protected: void *objref_; - void (*callback_)(processor_session*); + std::function<void(processor_session*)> callback_; private: // Logger std::shared_ptr<logging::Logger> logger_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a50a4ab/libminifi/src/capi/Plan.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/capi/Plan.cpp b/libminifi/src/capi/Plan.cpp index 691be41..0abd63b 100644 --- a/libminifi/src/capi/Plan.cpp +++ b/libminifi/src/capi/Plan.cpp @@ -40,21 +40,17 @@ ExecutionPlan::ExecutionPlan(std::shared_ptr<core::ContentRepository> content_re * Add a callback to obtain and pass processor session to a generated processor * */ -std::shared_ptr<core::Processor> ExecutionPlan::addCallback(void *obj, void (*fp)(processor_session *)) { 
+std::shared_ptr<core::Processor> 
ExecutionPlan::addCallback(void *obj, std::function<void(processor_session*)> fp) { if (finalized) { return nullptr; } - utils::Identifier uuid; - id_generator_->generate(uuid); + auto ptr = createProcessor("CallbackProcessor", "CallbackProcessor"); + if (!ptr) + return nullptr; - auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate("CallbackProcessor", uuid); - if (nullptr == ptr) { - throw std::exception(); - } std::shared_ptr<processors::CallbackProcessor> processor = std::static_pointer_cast<processors::CallbackProcessor>(ptr); processor->setCallback(obj, fp); - processor->setName("CallbackProcessor"); return addProcessor(processor, "CallbackProcessor", core::Relationship("success", "description"), true); } @@ -99,25 +95,7 @@ std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::shared_p termination_ = relationship; } - std::stringstream connection_name; - connection_name << last->getUUIDStr() << "-to-" << processor->getUUIDStr(); - std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(flow_repo_, content_repo_, connection_name.str()); - connection->setRelationship(relationship); - - // link the connections so that we can test results at the end for this - connection->setSource(last); - connection->setDestination(processor); - - utils::Identifier uuid_copy, uuid_copy_next; - last->getUUID(uuid_copy); - connection->setSourceUUID(uuid_copy); - processor->getUUID(uuid_copy_next); - connection->setDestinationUUID(uuid_copy_next); - last->addConnection(connection); - if (last != processor) { - processor->addConnection(connection); - } - relationships_.push_back(connection); + relationships_.push_back(connectProcessors(last, processor, relationship, true)); } std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor); @@ -202,29 +180,31 @@ std::shared_ptr<core::ProcessSession> ExecutionPlan::getCurrentSession() { return current_session_; } 
-std::shared_ptr<minifi::Connection> ExecutionPlan::buildFinalConnection(std::shared_ptr<core::Processor> processor, bool setDest) { - std::stringstream connection_name; - std::shared_ptr<core::Processor> last = processor; - connection_name << last->getUUIDStr() << "-to-" << processor->getUUIDStr(); - std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(flow_repo_, content_repo_, connection_name.str()); - connection->setRelationship(termination_); +std::shared_ptr<minifi::Connection> ExecutionPlan::buildFinalConnection(std::shared_ptr<core::Processor> processor, bool set_dst) { + return connectProcessors(processor, processor, termination_, set_dst); +} - // link the connections so that we can test results at the end for this - connection->setSource(last); - if (setDest) - connection->setDestination(processor); +void ExecutionPlan::finalize() { + if (failure_handler_) { + auto failure_proc = createProcessor("CallbackProcessor", "CallbackProcessor");  // NOTE(review): createProcessor may return nullptr (addCallback checks for it); failure_proc is dereferenced below without a check - utils::Identifier uuid_copy; - last->getUUID(uuid_copy); - connection->setSourceUUID(uuid_copy); - if (setDest) - connection->setDestinationUUID(uuid_copy); + std::shared_ptr<processors::CallbackProcessor> callback_proc = std::static_pointer_cast<processors::CallbackProcessor>(failure_proc); + callback_proc->setCallback(nullptr, std::bind(&FailureHandler::operator(), failure_handler_, std::placeholders::_1)); - processor->addConnection(connection); - return connection; -} + for (const auto& proc : processor_queue_) { + relationships_.push_back(connectProcessors(proc, failure_proc, core::Relationship("failure", "failure collector"), true)); + } + + std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(failure_proc); + + processor_nodes_.push_back(node); + + std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, 
controller_services_provider_, prov_repo_, flow_repo_, content_repo_); + processor_contexts_.push_back(context); 
+ + processor_queue_.push_back(failure_proc); + } -void ExecutionPlan::finalize() { if (relationships_.size() > 0) { relationships_.push_back(buildFinalConnection(processor_queue_.back())); } else { @@ -250,3 +230,40 @@ std::shared_ptr<core::Processor> ExecutionPlan::createProcessor(const std::strin return processor; } +std::shared_ptr<minifi::Connection> ExecutionPlan::connectProcessors(std::shared_ptr<core::Processor> src_proc, std::shared_ptr<core::Processor> dst_proc, + core::Relationship relationship, bool set_dst) { + std::stringstream connection_name; + connection_name << src_proc->getUUIDStr() << "-to-" << dst_proc->getUUIDStr(); + std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(flow_repo_, content_repo_, connection_name.str()); + connection->setRelationship(relationship); + + // link the connections so that we can test results at the end for this + connection->setSource(src_proc); + + utils::Identifier uuid_copy, uuid_copy_next; + src_proc->getUUID(uuid_copy); + connection->setSourceUUID(uuid_copy); + if (set_dst) { + connection->setDestination(dst_proc); + dst_proc->getUUID(uuid_copy_next); + connection->setDestinationUUID(uuid_copy_next); + if (src_proc != dst_proc) { + dst_proc->addConnection(connection); + } + } + src_proc->addConnection(connection); + + return connection; +} + +bool ExecutionPlan::setFailureCallback(void (*onerror_callback)(const flow_file_record*)) { + if (finalized && !failure_handler_) { + return false; // Already finalized the flow without failure handler processor + } + if (!failure_handler_) { + failure_handler_ = std::make_shared<FailureHandler>(); + } + failure_handler_->setCallback(onerror_callback); + return true; +} + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a50a4ab/libminifi/src/capi/api.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/capi/api.cpp b/libminifi/src/capi/api.cpp index 6bec57c..66ed19b 100644 --- 
a/libminifi/src/capi/api.cpp +++ b/libminifi/src/capi/api.cpp @@ -32,6 +32,7 @@ using string_map = std::map<std::string, std::string>; class API_INITIALIZER { + public: static int initialized; }; @@ -353,7 +354,7 @@ processor *add_python_processor(flow *flow, void (*ontrigger_callback)(processor return nullptr; } ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan); - auto proc = plan->addCallback(nullptr, ontrigger_callback); + auto proc = plan->addCallback(nullptr, std::bind(ontrigger_callback, std::placeholders::_1)); processor *new_processor = new processor(); new_processor->processor_ptr = proc.get(); return new_processor; @@ -399,6 +400,11 @@ processor *add_processor_with_linkage(flow *flow, const char *processor_name) { return nullptr; } +int add_failure_callback(flow *flow, void (*onerror_callback)(const flow_file_record*)) { + ExecutionPlan *plan = (flow != nullptr) ? static_cast<ExecutionPlan*>(flow->plan) : nullptr; + return (plan != nullptr && plan->setFailureCallback(onerror_callback)) ? 0 : 1; +} + int set_property(processor *proc, const char *name, const char *value) { if (name != nullptr && value != nullptr && proc != nullptr) { core::Processor *p = static_cast<core::Processor*>(proc->processor_ptr); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a50a4ab/libminifi/src/core/ProcessSession.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp index dc45446..6981bce 100644 --- a/libminifi/src/core/ProcessSession.cpp +++ b/libminifi/src/core/ProcessSession.cpp @@ -703,7 +703,7 @@ void ProcessSession::commit() { // No connection if (!process_context_->getProcessorNode()->isAutoTerminated(relationship)) { // Not autoterminate, we should have the connect - std::string message = "Connect empty for non auto terminated relationship" + relationship.getName(); + std::string message = "Connect empty for non auto terminated relationship " + relationship.getName(); 
throw 
Exception(PROCESS_SESSION_EXCEPTION, message.c_str()); } else { // Autoterminated http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a50a4ab/libminifi/test/CPPLINT.cfg ---------------------------------------------------------------------- diff --git a/libminifi/test/CPPLINT.cfg b/libminifi/test/CPPLINT.cfg index a1e22ad..7df4c76 100644 --- a/libminifi/test/CPPLINT.cfg +++ b/libminifi/test/CPPLINT.cfg @@ -1,4 +1,4 @@ set noparent filter=-build/include_order,-build/include_alpha exclude_files=Server.cpp -exclude_files=TestBase.cpp \ No newline at end of file +exclude_files=TestBase.cpp http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/1a50a4ab/libminifi/test/capi/CAPITests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/capi/CAPITests.cpp b/libminifi/test/capi/CAPITests.cpp index 1739e1b..08214ac 100644 --- a/libminifi/test/capi/CAPITests.cpp +++ b/libminifi/test/capi/CAPITests.cpp @@ -38,6 +38,17 @@ static nifi_instance *create_instance_obj(const char *name = "random_instance") return create_instance("random_instance", &port); } +static int failure_count = 0; + +void failure_counter(const flow_file_record * fr) { + failure_count++; + REQUIRE(get_attribute_qty(fr) > 0); +} + +void big_failure_counter(const flow_file_record * fr) { + failure_count += 100; +} + TEST_CASE("Test Creation of instance, one processor", "[createInstanceAndFlow]") { auto instance = create_instance_obj(); REQUIRE(instance != nullptr); @@ -116,6 +127,9 @@ TEST_CASE("get file and put file", "[getAndPutFile]") { REQUIRE(test_file_content == put_data); + // No failure handler can be added after the flow is finalized + REQUIRE(add_failure_callback(test_flow, failure_counter) == 1); + free_flowfile(record); free_flow(test_flow); @@ -126,8 +140,6 @@ TEST_CASE("Test manipulation of attributes", "[testAttributes]") { TestController testController; - enable_logging(); - char 
src_format[] = "/tmp/gt.XXXXXX"; const char *sourcedir = testController.createTempDirectory(src_format); @@ -201,3 +213,56 @@ TEST_CASE("Test manipulation of attributes", "[testAttributes]") { free_flow(test_flow); free_instance(instance); } + +TEST_CASE("Test error handling callback", "[errorHandling]") { + TestController testController; + + char src_format[] = "/tmp/gt.XXXXXX"; + const char *sourcedir = testController.createTempDirectory(src_format); + std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!"; + + auto instance = create_instance_obj(); + REQUIRE(instance != nullptr); + flow *test_flow = create_flow(instance, nullptr); + REQUIRE(test_flow != nullptr); + + REQUIRE(add_failure_callback(test_flow, failure_counter) == 0); + REQUIRE(add_failure_callback(nullptr, failure_counter) == 1); + processor *get_proc = add_processor(test_flow, "GetFile"); + REQUIRE(get_proc != nullptr); + processor *put_proc = add_processor_with_linkage(test_flow, "PutFile"); + REQUIRE(put_proc != nullptr); + REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0); + REQUIRE(set_property(put_proc, "Directory", "/tmp/never_existed") == 0); + REQUIRE(set_property(put_proc, "Create Missing Directories", "false") == 0); + + std::fstream file; + std::stringstream ss; + + ss << sourcedir << "/" << "tstFile.ext"; + file.open(ss.str(), std::ios::out); + file << test_file_content; + file.close(); + + + REQUIRE(get_next_flow_file(instance, test_flow) == nullptr); + + REQUIRE(failure_count == 1); + + // Failure handler function can be replaced at runtime + REQUIRE(add_failure_callback(test_flow, big_failure_counter) == 0); + + // Create new testfile to trigger failure again + ss << "2"; + file.open(ss.str(), std::ios::out); + file << test_file_content; + file.close(); + + REQUIRE(get_next_flow_file(instance, test_flow) == nullptr); + REQUIRE(failure_count > 100); + + failure_count = 0; + + free_flow(test_flow); + free_instance(instance); +}
