http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/src/capi/api.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/capi/api.cpp b/libminifi/src/capi/api.cpp deleted file mode 100644 index e135fe1..0000000 --- a/libminifi/src/capi/api.cpp +++ /dev/null @@ -1,517 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#include <string> -#include <map> -#include <memory> -#include <utility> -#include <exception> -#include "core/Core.h" -#include "capi/api.h" -#include "capi/expect.h" -#include "capi/Instance.h" -#include "capi/Plan.h" -#include "ResourceClaim.h" -#include "processors/GetFile.h" -#include "core/logging/LoggerConfiguration.h" -#include "utils/StringUtils.h" - -using string_map = std::map<std::string, std::string>; - -class API_INITIALIZER { - public: - static int initialized; -}; - -int API_INITIALIZER::initialized = initialize_api(); - -int initialize_api() { - logging::LoggerConfiguration::getConfiguration().disableLogging(); - return 1; -} - -void enable_logging() { - logging::LoggerConfiguration::getConfiguration().enableLogging(); -} - -void set_terminate_callback(void (*terminate_callback)()) { - std::set_terminate(terminate_callback); -} - -class DirectoryConfiguration { - protected: - DirectoryConfiguration() { - minifi::setDefaultDirectory(DEFAULT_CONTENT_DIRECTORY); - } - public: - static void initialize() { - static DirectoryConfiguration configure; - } -}; - -/** - * Creates a NiFi Instance from the url and output port. - * @param url http URL for NiFi instance - * @param port Remote output port. - * @Deprecated for API version 0.2 in favor of the following prototype - * nifi_instance *create_instance(nifi_port const *port) { - */ -nifi_instance *create_instance(const char *url, nifi_port *port) { - // make sure that we have a thread safe way of initializing the content directory - DirectoryConfiguration::initialize(); - - // need reinterpret cast until we move to C for this module. - nifi_instance *instance = reinterpret_cast<nifi_instance*>( malloc(sizeof(nifi_instance)) ); - /** - * This API will gradually move away from C++, hence malloc is used for nifi_instance - * Since minifi::Instance is currently being used, then we need to use new in that case. 
- */ - instance->instance_ptr = new minifi::Instance(url, port->port_id); - // may have to translate port ID here in the future - // need reinterpret cast until we move to C for this module. - instance->port.port_id = reinterpret_cast<char*>(malloc(strlen(port->port_id) + 1)); - snprintf(instance->port.port_id, strlen(port->port_id) + 1, "%s", port->port_id); - return instance; -} - -/** - * Initializes the instance - */ -void initialize_instance(nifi_instance *instance) { - auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr); - minifi_instance_ref->setRemotePort(instance->port.port_id); -} -/* - typedef int c2_update_callback(char *); - - typedef int c2_stop_callback(char *); - - typedef int c2_start_callback(char *); - - */ -void enable_async_c2(nifi_instance *instance, C2_Server *server, c2_stop_callback *c1, c2_start_callback *c2, c2_update_callback *c3) { - auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr); - minifi_instance_ref->enableAsyncC2(server, c1, c2, c3); -} - -/** - * Sets a property within the nifi instance - * @param instance nifi instance - * @param key key in which we will set the value - * @param value - * @return -1 when instance or key are null - */ -int set_instance_property(nifi_instance *instance, const char *key, const char *value) { - if (nullptr == instance || nullptr == instance->instance_ptr || nullptr == key) { - return -1; - } - auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr); - minifi_instance_ref->getConfiguration()->set(key, value); - return 0; -} - -/** - * Reclaims memory associated with a nifi instance structure. - * @param instance nifi instance. - */ -void free_instance(nifi_instance* instance) { - if (instance != nullptr) { - delete ((minifi::Instance*) instance->instance_ptr); - free(instance->port.port_id); - free(instance); - } -} - -/** - * Creates a flow file record - * @param file file to place into the flow file. 
- */ -flow_file_record* create_flowfile(const char *file, const size_t len) { - flow_file_record *new_ff = new flow_file_record; - new_ff->attributes = new string_map(); - new_ff->contentLocation = new char[len + 1]; - snprintf(new_ff->contentLocation, len + 1, "%s", file); - std::ifstream in(file, std::ifstream::ate | std::ifstream::binary); - // set the size of the flow file. - new_ff->size = in.tellg(); - return new_ff; -} - -/** - * Creates a flow file record - * @param file file to place into the flow file. - */ -flow_file_record* create_ff_object(const char *file, const size_t len, const uint64_t size) { - if (nullptr == file) { - return nullptr; - } - flow_file_record *new_ff = create_ff_object_na(file, len, size); - new_ff->attributes = new string_map(); - new_ff->ffp = 0; - return new_ff; -} - -flow_file_record* create_ff_object_na(const char *file, const size_t len, const uint64_t size) { - flow_file_record *new_ff = new flow_file_record; - new_ff->attributes = nullptr; - new_ff->contentLocation = new char[len + 1]; - snprintf(new_ff->contentLocation, len + 1, "%s", file); - // set the size of the flow file. - new_ff->size = size; - new_ff->crp = static_cast<void*>(new std::shared_ptr<minifi::core::ContentRepository>); - return new_ff; -} -/** - * Reclaims memory associated with a flow file object - * @param ff flow file record. 
- */ -void free_flowfile(flow_file_record *ff) { - if (ff == nullptr) { - return; - } - auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ff->crp); - if (content_repo_ptr->get()) { - std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, *content_repo_ptr); - (*content_repo_ptr)->remove(claim); - } - if (ff->ffp == nullptr) { - auto map = static_cast<string_map*>(ff->attributes); - delete map; - } - delete[] ff->contentLocation; - delete ff; - delete content_repo_ptr; -} - -/** - * Adds an attribute - * @param ff flow file record - * @param key key - * @param value value to add - * @param size size of value - * @return 0 or -1 based on whether the attribute existed previously (-1) or not (0) - */ -uint8_t add_attribute(flow_file_record *ff, const char *key, void *value, size_t size) { - auto attribute_map = static_cast<string_map*>(ff->attributes); - const auto& ret = attribute_map->insert(std::pair<std::string, std::string>(key, std::string(static_cast<char*>(value), size))); - return ret.second ? 0 : -1; -} - -/** - * Updates (or adds) an attribute - * @param ff flow file record - * @param key key - * @param value value to add - * @param size size of value - */ -void update_attribute(flow_file_record *ff, const char *key, void *value, size_t size) { - auto attribute_map = static_cast<string_map*>(ff->attributes); - (*attribute_map)[key] = std::string(static_cast<char*>(value), size); -} - -/* - * Obtains the attribute. 
- * @param ff flow file record - * @param key key - * @param caller_attribute caller supplied object in which we will copy the data ptr - * @return 0 if successful, -1 if the key does not exist - */ -uint8_t get_attribute(flow_file_record *ff, attribute *caller_attribute) { - if (ff == nullptr) { - return -1; - } - auto attribute_map = static_cast<string_map*>(ff->attributes); - if (!attribute_map) { - return -1; - } - auto find = attribute_map->find(caller_attribute->key); - if (find != attribute_map->end()) { - caller_attribute->value = static_cast<void*>(const_cast<char*>(find->second.data())); - caller_attribute->value_size = find->second.size(); - return 0; - } - return -1; -} - -int get_attribute_qty(const flow_file_record* ff) { - if (ff == nullptr) { - return 0; - } - auto attribute_map = static_cast<string_map*>(ff->attributes); - return attribute_map ? attribute_map->size() : 0; -} - -int get_all_attributes(const flow_file_record* ff, attribute_set *target) { - if (ff == nullptr) { - return 0; - } - auto attribute_map = static_cast<string_map*>(ff->attributes); - if (!attribute_map || attribute_map->empty()) { - return 0; - } - int i = 0; - for (const auto& kv : *attribute_map) { - if (i >= target->size) { - break; - } - target->attributes[i].key = kv.first.data(); - target->attributes[i].value = static_cast<void*>(const_cast<char*>(kv.second.data())); - target->attributes[i].value_size = kv.second.size(); - ++i; - } - return i; -} - -/** - * Removes a key from the attribute chain - * @param ff flow file record - * @param key key to remove - * @return 0 if removed, -1 otherwise - */ -uint8_t remove_attribute(flow_file_record *ff, const char *key) { - auto attribute_map = static_cast<string_map*>(ff->attributes); - return attribute_map->erase(key) - 1; // erase by key returns the number of elements removed (0 or 1) -} - -/** - * Transmits the flowfile - * @param ff flow file record - * @param instance nifi instance structure - */ -int 
transmit_flowfile(flow_file_record *ff, nifi_instance *instance) { - auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr); - // in the unlikely event the user forgot to initialize the instance, we shall do it for them. - if (UNLIKELY(minifi_instance_ref->isRPGConfigured() == false)) { - minifi_instance_ref->setRemotePort(instance->port.port_id); - } - - auto attribute_map = static_cast<string_map*>(ff->attributes); - - auto no_op = minifi_instance_ref->getNoOpRepository(); - - auto content_repo = minifi_instance_ref->getContentRepository(); - - std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, content_repo); - claim->increaseFlowFileRecordOwnedCount(); - claim->increaseFlowFileRecordOwnedCount(); - - auto ffr = std::make_shared<minifi::FlowFileRecord>(no_op, content_repo, *attribute_map, claim); - ffr->addAttribute("nanofi.version", API_VERSION); - ffr->setSize(ff->size); - - std::string port_uuid = instance->port.port_id; - - minifi_instance_ref->transfer(ffr); - - return 0; -} - -flow *create_new_flow(nifi_instance *instance) { - auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr); - flow *new_flow = new flow; - - auto execution_plan = new ExecutionPlan(minifi_instance_ref->getContentRepository(), minifi_instance_ref->getNoOpRepository(), minifi_instance_ref->getNoOpRepository()); - - new_flow->plan = execution_plan; - - return new_flow; -} - -flow *create_flow(nifi_instance *instance, const char *first_processor) { - if (nullptr == instance || nullptr == instance->instance_ptr) { - return nullptr; - } - auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr); - flow *new_flow = new flow; - - auto execution_plan = new ExecutionPlan(minifi_instance_ref->getContentRepository(), minifi_instance_ref->getNoOpRepository(), minifi_instance_ref->getNoOpRepository()); - - new_flow->plan = execution_plan; - - if (first_processor != 
nullptr && strlen(first_processor) > 0) { - // automatically adds it with success - execution_plan->addProcessor(first_processor, first_processor); - } - return new_flow; -} - -processor *add_python_processor(flow *flow, void (*ontrigger_callback)(processor_session *)) { - if (nullptr == flow || nullptr == flow->plan || nullptr == ontrigger_callback) { - return nullptr; - } - ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan); - auto proc = plan->addCallback(nullptr, std::bind(ontrigger_callback, std::placeholders::_1)); - processor *new_processor = new processor(); - new_processor->processor_ptr = proc.get(); - return new_processor; -} - -flow *create_getfile(nifi_instance *instance, flow *parent_flow, GetFileConfig *c) { - static const std::string first_processor = "GetFile"; - flow *new_flow = parent_flow == nullptr ? create_flow(instance, nullptr) : parent_flow; - - ExecutionPlan *plan = static_cast<ExecutionPlan*>(new_flow->plan); - // automatically adds it with success - auto getFile = plan->addProcessor(first_processor, first_processor); - - plan->setProperty(getFile, processors::GetFile::Directory.getName(), c->directory); - plan->setProperty(getFile, processors::GetFile::KeepSourceFile.getName(), c->keep_source ? "true" : "false"); - plan->setProperty(getFile, processors::GetFile::Recurse.getName(), c->recurse ? 
"true" : "false"); - - return new_flow; -} - -processor *add_processor(flow *flow, const char *processor_name) { - if (nullptr == flow || nullptr == processor_name) { - return nullptr; - } - ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan); - auto proc = plan->addProcessor(processor_name, processor_name); - if (proc) { - processor *new_processor = new processor(); - new_processor->processor_ptr = proc.get(); - return new_processor; - } - return nullptr; -} - -processor *add_processor_with_linkage(flow *flow, const char *processor_name) { - ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan); - auto proc = plan->addProcessor(processor_name, processor_name, core::Relationship("success", "description"), true); - if (proc) { - processor *new_processor = new processor(); - new_processor->processor_ptr = proc.get(); - return new_processor; - } - return nullptr; -} - -int add_failure_callback(flow *flow, void (*onerror_callback)(flow_file_record*)) { - ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan); - return plan->setFailureCallback(onerror_callback) ? 0 : 1; -} - -int set_failure_strategy(flow *flow, FailureStrategy strategy) { - return static_cast<ExecutionPlan*>(flow->plan)->setFailureStrategy(strategy) ? 0 : -1; -} - -int set_property(processor *proc, const char *name, const char *value) { - if (name != nullptr && value != nullptr && proc != nullptr) { - core::Processor *p = static_cast<core::Processor*>(proc->processor_ptr); - bool success = p->setProperty(name, value) || (p->supportsDynamicProperties() && p->setDynamicProperty(name, value)); - return success ? 
0 : -2; - } - return -1; -} - -int free_flow(flow *flow) { - if (flow == nullptr || nullptr == flow->plan) - return -1; - auto execution_plan = static_cast<ExecutionPlan*>(flow->plan); - delete execution_plan; - delete flow; - return 0; -} - -flow_file_record *get_next_flow_file(nifi_instance *instance, flow *flow) { - if (instance == nullptr || nullptr == flow || nullptr == flow->plan) - return nullptr; - auto execution_plan = static_cast<ExecutionPlan*>(flow->plan); - execution_plan->reset(); - while (execution_plan->runNextProcessor()) { - } - auto ff = execution_plan->getCurrentFlowFile(); - if (ff == nullptr) { - return nullptr; - } - auto claim = ff->getResourceClaim(); - - if (claim != nullptr) { - // create a flow file. - claim->increaseFlowFileRecordOwnedCount(); - auto path = claim->getContentFullPath(); - auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize()); - ffr->ffp = ff.get(); - ffr->attributes = ff->getAttributesPtr(); - auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp); - *content_repo_ptr = execution_plan->getContentRepo(); - return ffr; - } else { - return nullptr; - } -} - -size_t get_flow_files(nifi_instance *instance, flow *flow, flow_file_record **ff_r, size_t size) { - if (nullptr == instance || nullptr == flow || nullptr == ff_r) - return 0; - auto execution_plan = static_cast<ExecutionPlan*>(flow->plan); - size_t i = 0; - for (; i < size; i++) { - execution_plan->reset(); - auto ffr = get_next_flow_file(instance, flow); - if (ffr == nullptr) { - break; - } - ff_r[i] = ffr; - } - return i; -} - -flow_file_record *get(nifi_instance *instance, flow *flow, processor_session *session) { - if (nullptr == instance || nullptr == flow || nullptr == session) - return nullptr; - auto sesh = static_cast<core::ProcessSession*>(session->session); - auto execution_plan = static_cast<ExecutionPlan*>(flow->plan); - auto ff = sesh->get(); - execution_plan->setNextFlowFile(ff); - if (ff == 
nullptr) { - return nullptr; - } - auto claim = ff->getResourceClaim(); - - if (claim != nullptr) { - // create a flow file. - claim->increaseFlowFileRecordOwnedCount(); - auto path = claim->getContentFullPath(); - auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize()); - ffr->attributes = ff->getAttributesPtr(); - ffr->ffp = ff.get(); - auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp); - *content_repo_ptr = execution_plan->getContentRepo(); - return ffr; - } else { - return nullptr; - } -} - -int transfer(processor_session* session, flow *flow, const char *rel) { - if (nullptr == session || nullptr == flow || rel == nullptr) { - return -1; - } - auto sesh = static_cast<core::ProcessSession*>(session->session); - auto execution_plan = static_cast<ExecutionPlan*>(flow->plan); - if (nullptr == sesh || nullptr == execution_plan) { - return -1; - } - core::Relationship relationship(rel, rel); - auto ff = execution_plan->getNextFlowFile(); - if (nullptr == ff) { - return -2; - } - sesh->transfer(ff, relationship); - return 0; -}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/src/core/repository/VolatileContentRepository.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/repository/VolatileContentRepository.cpp b/libminifi/src/core/repository/VolatileContentRepository.cpp index 47c7ba6..674566b 100644 --- a/libminifi/src/core/repository/VolatileContentRepository.cpp +++ b/libminifi/src/core/repository/VolatileContentRepository.cpp @@ -17,7 +17,7 @@ */ #include "core/repository/VolatileContentRepository.h" -#include "capi/expect.h" +#include "core/expect.h" #include <cstdio> #include <string> #include <memory> http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/src/processors/CallbackProcessor.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/CallbackProcessor.cpp b/libminifi/src/processors/CallbackProcessor.cpp deleted file mode 100644 index 5524d92..0000000 --- a/libminifi/src/processors/CallbackProcessor.cpp +++ /dev/null @@ -1,37 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#include "processors/CallbackProcessor.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace processors { - -void CallbackProcessor::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { - if (callback_ != nullptr) { - processor_session sesh; - sesh.session = session; - callback_(&sesh); - } -} - -} /* namespace processors */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/libminifi/test/capi/CAPITests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/capi/CAPITests.cpp b/libminifi/test/capi/CAPITests.cpp deleted file mode 100644 index b7bf784..0000000 --- a/libminifi/test/capi/CAPITests.cpp +++ /dev/null @@ -1,279 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#include <uuid/uuid.h> -#include <sys/stat.h> -#include <utility> -#include <memory> -#include <string> -#include <vector> -#include <set> -#include <fstream> - -#include "utils/file/FileUtils.h" -#include "../TestBase.h" - -#include "capi/api.h" - -#include <chrono> -#include <thread> - -static nifi_instance *create_instance_obj(const char *name = "random_instance") { - nifi_port port; - char port_str[] = "12345"; - port.port_id = port_str; - return create_instance("random_instance", &port); -} - -static int failure_count = 0; - -void failure_counter(flow_file_record * fr) { - failure_count++; - REQUIRE(get_attribute_qty(fr) > 0); - free_flowfile(fr); -} - -void big_failure_counter(flow_file_record * fr) { - failure_count += 100; - free_flowfile(fr); -} - -TEST_CASE("Test Creation of instance, one processor", "[createInstanceAndFlow]") { - auto instance = create_instance_obj(); - REQUIRE(instance != nullptr); - flow *test_flow = create_flow(instance, nullptr); - REQUIRE(test_flow != nullptr); - processor *test_proc = add_processor(test_flow, "GenerateFlowFile"); - REQUIRE(test_proc != nullptr); - free_flow(test_flow); - free_instance(instance); -} - -TEST_CASE("Invalid processor returns null", "[addInvalidProcessor]") { - auto instance = create_instance_obj(); - REQUIRE(instance != nullptr); - flow *test_flow = create_flow(instance, nullptr); - processor *test_proc = add_processor(test_flow, "NeverExisted"); - REQUIRE(test_proc == nullptr); - processor *no_proc = add_processor(test_flow, ""); - REQUIRE(no_proc == nullptr); - free_flow(test_flow); - free_instance(instance); -} - -TEST_CASE("Set valid and invalid properties", "[setProcesssorProperties]") { - auto instance = create_instance_obj(); - REQUIRE(instance != nullptr); - flow *test_flow = create_flow(instance, nullptr); - REQUIRE(test_flow != nullptr); - processor *test_proc = add_processor(test_flow, "GenerateFlowFile"); - REQUIRE(test_proc != nullptr); - REQUIRE(set_property(test_proc, "Data 
Format", "Text") == 0); // Valid value - // TODO(aboda): add these two below when property handling is made strictly typed - // REQUIRE(set_property(test_proc, "Data Format", "InvalidFormat") != 0); // Invalid value - // REQUIRE(set_property(test_proc, "Invalid Attribute", "Blah") != 0); // Invalid attribute - REQUIRE(set_property(test_proc, "Data Format", nullptr) != 0); // Empty value - REQUIRE(set_property(test_proc, nullptr, "Blah") != 0); // Empty attribute - REQUIRE(set_property(nullptr, "Invalid Attribute", "Blah") != 0); // Invalid processor - free_flow(test_flow); - free_instance(instance); -} - -TEST_CASE("get file and put file", "[getAndPutFile]") { - TestController testController; - - char src_format[] = "/tmp/gt.XXXXXX"; - char put_format[] = "/tmp/pt.XXXXXX"; - const char *sourcedir = testController.createTempDirectory(src_format); - const char *putfiledir = testController.createTempDirectory(put_format); - std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!"; - auto instance = create_instance_obj(); - REQUIRE(instance != nullptr); - flow *test_flow = create_flow(instance, nullptr); - REQUIRE(test_flow != nullptr); - processor *get_proc = add_processor(test_flow, "GetFile"); - REQUIRE(get_proc != nullptr); - processor *put_proc = add_processor_with_linkage(test_flow, "PutFile"); - REQUIRE(put_proc != nullptr); - REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0); - REQUIRE(set_property(put_proc, "Directory", putfiledir) == 0); - - std::fstream file; - std::stringstream ss; - ss << sourcedir << "/" << "tstFile.ext"; - file.open(ss.str(), std::ios::out); - file << test_file_content; - file.close(); - - flow_file_record *record = get_next_flow_file(instance, test_flow); - REQUIRE(record != nullptr); - - ss.str(""); - - ss << putfiledir << "/" << "tstFile.ext"; - std::ifstream t(ss.str()); - std::string put_data((std::istreambuf_iterator<char>(t)), std::istreambuf_iterator<char>()); - - REQUIRE(test_file_content == 
put_data); - - // No failure handler can be added after the flow is finalized - REQUIRE(add_failure_callback(test_flow, failure_counter) == 1); - - free_flowfile(record); - - free_flow(test_flow); - - free_instance(instance); -} - -TEST_CASE("Test manipulation of attributes", "[testAttributes]") { - TestController testController; - - char src_format[] = "/tmp/gt.XXXXXX"; - const char *sourcedir = testController.createTempDirectory(src_format); - - std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!"; - - std::fstream file; - std::stringstream ss; - ss << sourcedir << "/" << "tstFile.ext"; - file.open(ss.str(), std::ios::out); - file << test_file_content; - file.close(); - auto instance = create_instance_obj(); - REQUIRE(instance != nullptr); - flow *test_flow = create_flow(instance, nullptr); - REQUIRE(test_flow != nullptr); - - processor *get_proc = add_processor(test_flow, "GetFile"); - REQUIRE(get_proc != nullptr); - REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0); - processor *extract_test = add_processor_with_linkage(test_flow, "ExtractText"); - REQUIRE(extract_test != nullptr); - REQUIRE(set_property(extract_test, "Attribute", "TestAttr") == 0); - processor *update_attr = add_processor_with_linkage(test_flow, "UpdateAttribute"); - REQUIRE(update_attr != nullptr); - - REQUIRE(set_property(update_attr, "UpdatedAttribute", "UpdatedValue") == 0); - - flow_file_record *record = get_next_flow_file(instance, test_flow); - - REQUIRE(record != nullptr); - - attribute test_attr; - test_attr.key = "TestAttr"; - REQUIRE(get_attribute(record, &test_attr) == 0); - - REQUIRE(test_attr.value_size != 0); - REQUIRE(test_attr.value != nullptr); - - std::string attr_value(static_cast<char*>(test_attr.value), test_attr.value_size); - - REQUIRE(attr_value == test_file_content); - - const char * new_testattr_value = "S0me t3st t3xt"; - - // Attribute already exists, should fail - REQUIRE(add_attribute(record, test_attr.key, (void*) 
new_testattr_value, strlen(new_testattr_value)) != 0); // NOLINT - - // Update overwrites values - update_attribute(record, test_attr.key, (void*) new_testattr_value, strlen(new_testattr_value)); // NOLINT - - int attr_size = get_attribute_qty(record); - REQUIRE(attr_size > 0); - - attribute_set attr_set; - attr_set.size = attr_size; - attr_set.attributes = (attribute*) malloc(attr_set.size * sizeof(attribute)); // NOLINT - - REQUIRE(get_all_attributes(record, &attr_set) == attr_set.size); - - bool test_attr_found = false; - bool updated_attr_found = false; - for (int i = 0; i < attr_set.size; ++i) { - if (strcmp(attr_set.attributes[i].key, test_attr.key) == 0) { - test_attr_found = true; - REQUIRE(std::string(static_cast<char*>(attr_set.attributes[i].value), attr_set.attributes[i].value_size) == new_testattr_value); - } else if (strcmp(attr_set.attributes[i].key, "UpdatedAttribute") == 0) { - updated_attr_found = true; - REQUIRE(std::string(static_cast<char*>(attr_set.attributes[i].value), attr_set.attributes[i].value_size) == "UpdatedValue"); - } - } - REQUIRE(updated_attr_found == true); - REQUIRE(test_attr_found == true); - - free_flowfile(record); - - free_flow(test_flow); - free_instance(instance); -} - -TEST_CASE("Test error handling callback", "[errorHandling]") { - TestController testController; - - char src_format[] = "/tmp/gt.XXXXXX"; - const char *sourcedir = testController.createTempDirectory(src_format); - std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!"; - - auto instance = create_instance_obj(); - REQUIRE(instance != nullptr); - flow *test_flow = create_flow(instance, nullptr); - REQUIRE(test_flow != nullptr); - - // Failure strategy cannot be set before a valid callback is added - REQUIRE(set_failure_strategy(test_flow, FailureStrategy::AS_IS) != 0); - REQUIRE(add_failure_callback(test_flow, failure_counter) == 0); - - processor *get_proc = add_processor(test_flow, "GetFile"); - REQUIRE(get_proc != nullptr); - processor 
*put_proc = add_processor_with_linkage(test_flow, "PutFile"); - REQUIRE(put_proc != nullptr); - REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0); - REQUIRE(set_property(put_proc, "Directory", "/tmp/never_existed") == 0); - REQUIRE(set_property(put_proc, "Create Missing Directories", "false") == 0); - - std::fstream file; - std::stringstream ss; - - ss << sourcedir << "/" << "tstFile.ext"; - file.open(ss.str(), std::ios::out); - file << test_file_content; - file.close(); - - - REQUIRE(get_next_flow_file(instance, test_flow) == nullptr); - - REQUIRE(failure_count == 1); - - // Failure handler function can be replaced at runtime - REQUIRE(add_failure_callback(test_flow, big_failure_counter) == 0); - REQUIRE(set_failure_strategy(test_flow, FailureStrategy::ROLLBACK) == 0); - - // Create new testfile to trigger failure again - ss << "2"; - file.open(ss.str(), std::ios::out); - file << test_file_content; - file.close(); - - REQUIRE(get_next_flow_file(instance, test_flow) == nullptr); - REQUIRE(failure_count > 100); - - failure_count = 0; - - free_flow(test_flow); - free_instance(instance); -} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/nanofi/CMakeLists.txt b/nanofi/CMakeLists.txt new file mode 100644 index 0000000..bd2d952 --- /dev/null +++ b/nanofi/CMakeLists.txt @@ -0,0 +1,100 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +cmake_minimum_required(VERSION 2.6) + +IF(POLICY CMP0048) + CMAKE_POLICY(SET CMP0048 OLD) +ENDIF(POLICY CMP0048) + +include_directories(include) +include_directories(../libminifi/include ../thirdparty/spdlog-20170710/include) + +if(WIN32) +include_directories(../libminifi/opsys/win) +else() +include_directories(../libminifi/opsys/posix) +endif() + +file(GLOB NANOFI_SOURCES "src/api/*.cpp" "src/cxx/*.cpp" ) + +file(GLOB NANOFI_EXAMPLES_SOURCES "examples/*.c" ) + +include(CheckCXXCompilerFlag) +if (WIN32) + if ((MSVC_VERSION GREATER "1900") OR (MSVC_VERSION EQUAL "1900")) + CHECK_CXX_COMPILER_FLAG("/std:c++14" _cpp_latest_flag_supported) + if (_cpp_latest_flag_supported) + add_compile_options("/std:c++14") + endif() + endif() +else() + +CHECK_CXX_COMPILER_FLAG("-std=c++11" COMPILER_SUPPORTS_CXX11) +CHECK_CXX_COMPILER_FLAG("-std=c++0x" COMPILER_SUPPORTS_CXX0X) +if(COMPILER_SUPPORTS_CXX11) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") +elseif(COMPILER_SUPPORTS_CXX0X) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++0x") +else() + message(STATUS "The compiler ${CMAKE_CXX_COMPILER} has no C++11 support. 
Please use a different C++ compiler.") +endif() + +endif() + +add_library(nanofi STATIC ${NANOFI_SOURCES}) + +if (APPLE) + target_link_libraries (nanofi -Wl,-all_load core-minifi minifi) +elseif(NOT WIN32) + target_link_libraries (nanofi -Wl,--whole-archive core-minifi minifi -Wl,--no-whole-archive) +else() + set(WIN32_ARCHIVES "${WIN32_ARCHIVES} /WHOLEARCHIVE:core-minifi") + set(WIN32_ARCHIVES "${WIN32_ARCHIVES} /WHOLEARCHIVE:minifi") +endif () + +if(WIN32) + set_target_properties(nanofi PROPERTIES LINK_FLAGS "${WIN32_ARCHIVES}") +endif() + +if (ENABLE_PYTHON) + +add_library(nanofi-shared SHARED ${NANOFI_SOURCES}) + +if (APPLE) + target_link_libraries (nanofi-shared -Wl,-all_load core-minifi-shared minifi-shared) +elseif(NOT WIN32) + target_link_libraries (nanofi-shared -Wl,--whole-archive core-minifi-shared minifi-shared -Wl,--no-whole-archive) +else() + set(WIN32_ARCHIVES "${WIN32_ARCHIVES} /WHOLEARCHIVE:core-minifi-shared") + set(WIN32_ARCHIVES "${WIN32_ARCHIVES} /WHOLEARCHIVE:minifi-shared") +endif () + + +if(WIN32) + set_target_properties(nanofi-shared PROPERTIES LINK_FLAGS "${WIN32_ARCHIVES}") +endif() + +set_property(TARGET nanofi-shared PROPERTY POSITION_INDEPENDENT_CODE ON) + +endif(ENABLE_PYTHON) + +if (NOT DISABLE_CURL) +add_subdirectory(examples) +endif() \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/examples/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/nanofi/examples/CMakeLists.txt b/nanofi/examples/CMakeLists.txt new file mode 100644 index 0000000..f5f660f --- /dev/null +++ b/nanofi/examples/CMakeLists.txt @@ -0,0 +1,78 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +cmake_minimum_required(VERSION 2.6) + +IF(POLICY CMP0048) + CMAKE_POLICY(SET CMP0048 OLD) +ENDIF(POLICY CMP0048) + +include_directories(/include) + +include(CheckCXXCompilerFlag) +if (WIN32) + if ((MSVC_VERSION GREATER "1900") OR (MSVC_VERSION EQUAL "1900")) + CHECK_CXX_COMPILER_FLAG("/std:c++14" _cpp_latest_flag_supported) + if (_cpp_latest_flag_supported) + add_compile_options("/std:c++14") + endif() + endif() +else() +CHECK_CXX_COMPILER_FLAG("-std=c++11" COMPILER_SUPPORTS_CXX11) +CHECK_CXX_COMPILER_FLAG("-std=c++0x" COMPILER_SUPPORTS_CXX0X) +if(COMPILER_SUPPORTS_CXX11) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") +elseif(COMPILER_SUPPORTS_CXX0X) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++0x") +else() + message(STATUS "The compiler ${CMAKE_CXX_COMPILER} has no C++11 support. 
Please use a different C++ compiler.") +endif() + +endif() + +if (WIN32) + set(LINK_FLAGS "/WHOLEARCHIVE") + set(LINK_END_FLAGS "") +elseif (APPLE) + set(LINK_FLAGS "-Wl,-all_load") + set(LINK_END_FLAGS "") +else () + set(LINK_FLAGS "-Wl,--whole-archive") + set(LINK_END_FLAGS "-Wl,--no-whole-archive") +endif () + +add_executable(generate_flow generate_flow.c) + +add_executable(terminate_handler terminate_handler.c) + +target_link_libraries(generate_flow nanofi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS}) + +target_link_libraries(terminate_handler nanofi ${CMAKE_THREAD_LIBS_INIT} ) + +if (NOT WIN32) + +add_executable(transmit_flow transmit_flow.c) + +target_link_libraries(transmit_flow nanofi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS}) + +add_executable(monitor_directory monitor_directory.c) + +target_link_libraries(monitor_directory nanofi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS}) + +endif() \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/examples/generate_flow.c ---------------------------------------------------------------------- diff --git a/nanofi/examples/generate_flow.c b/nanofi/examples/generate_flow.c new file mode 100644 index 0000000..707de11 --- /dev/null +++ b/nanofi/examples/generate_flow.c @@ -0,0 +1,65 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/types.h> +#include <sys/stat.h> + +#include "api/nanofi.h" + +/** + * This is an example of the C API that transmits a flow file to a remote instance. + */ +int main(int argc, char **argv) { + + if (argc < 3) { + printf("Error: must run ./generate_flow <instance> <remote port> \n"); + exit(1); + } + + char *instance_str = argv[1]; + char *portStr = argv[2]; + + nifi_port port; + + port.port_id = portStr; + + nifi_instance *instance = create_instance(instance_str, &port); + + flow *new_flow = create_flow(instance, "GenerateFlowFile"); + + flow_file_record *record = get_next_flow_file(instance, new_flow); + + if (record == 0) { + printf("Could not create flow file"); + exit(1); + } + + transmit_flowfile(record, instance); + + free_flowfile(record); + + // initializing will make the transmission slightly more efficient. + //initialize_instance(instance); + //transfer_file_or_directory(instance,file); + + free_flow(new_flow); + + free_instance(instance); +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/examples/monitor_directory.c ---------------------------------------------------------------------- diff --git a/nanofi/examples/monitor_directory.c b/nanofi/examples/monitor_directory.c new file mode 100644 index 0000000..2283b35 --- /dev/null +++ b/nanofi/examples/monitor_directory.c @@ -0,0 +1,95 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <unistd.h> +#include <dirent.h> +#include <pthread.h> + +#include "api/nanofi.h" +#include "blocks/file_blocks.h" +#include "blocks/comms.h" +#include "core/processors.h" +int is_dir(const char *path) { + struct stat stat_struct; + if (stat(path, &stat_struct) != 0) + return 0; + return S_ISDIR(stat_struct.st_mode); +} + +pthread_mutex_t mutex; +pthread_cond_t condition; + +int stopped; + +int stop_callback(char *c) { + pthread_mutex_lock(&mutex); + stopped = 1; + pthread_cond_signal(&condition); + pthread_mutex_unlock(&mutex); + return 0; +} + +int is_stopped(void *ptr) { + int is_stop = 0; + pthread_mutex_lock(&mutex); + is_stop = stopped; + pthread_mutex_unlock(&mutex); + return is_stop; +} + +/** + * This is an example of the C API that transmits a flow file to a remote instance. 
+ */ +int main(int argc, char **argv) { + if (argc < 5) { + printf("Error: must run ./monitor_directory <instance> <remote port> <directory to monitor>\n"); + exit(1); + } + + stopped = 0x00; + + char *instance_str = argv[1]; + char *portStr = argv[2]; + char *directory = argv[3]; + + nifi_port port; + + port.port_id = portStr; + + C2_Server server; + server.url = argv[4]; + server.ack_url = argv[5]; + server.identifier = "monitor_directory"; + server.type = REST; + + nifi_instance *instance = create_instance(instance_str, &port); + + // enable_async_c2(instance, &server, stop_callback, NULL, NULL); + + flow *new_flow = monitor_directory(instance, directory, 0x00, KEEP_SOURCE); + + transmit_to_nifi(instance, new_flow, is_stopped); + + free_flow(new_flow); + + free_instance(instance); +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/examples/terminate_handler.c ---------------------------------------------------------------------- diff --git a/nanofi/examples/terminate_handler.c b/nanofi/examples/terminate_handler.c new file mode 100644 index 0000000..d5443f0 --- /dev/null +++ b/nanofi/examples/terminate_handler.c @@ -0,0 +1,59 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/types.h> +#include <sys/stat.h> + +#include "api/nanofi.h" + +/** + * This is an example of the C API that registers terminate handler and generates an exception. + */ + +void example_terminate_handler() { + fprintf(stderr, "Unhandled exception! Let's pretend that this is normal!"); + exit(0); +} + +int main(int argc, char **argv) { + + nifi_port port; + + port.port_id = "12345"; + + set_terminate_callback(example_terminate_handler); + + nifi_instance *instance = create_instance("random instance", &port); + + flow *new_flow = create_flow(instance, "GenerateFlowFile"); + + processor *put_proc = add_processor_with_linkage(new_flow, "PutFile"); + + // Target directory for PutFile is missing, it's not allowed to create, so tries to transmit to failure relationship + // As it doesn't exist, an exception is thrown + set_property(put_proc, "Directory", "/tmp/never_existed"); + set_property(put_proc, "Create Missing Directories", "false"); + + flow_file_record *record = get_next_flow_file(instance, new_flow ); + + // Here be dragons - nothing below this line gets executed + fprintf(stderr, "Dragons!!!"); + free_flowfile(record); + free_flow(new_flow); + free_instance(instance); +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/examples/transmit_flow.c ---------------------------------------------------------------------- diff --git a/nanofi/examples/transmit_flow.c b/nanofi/examples/transmit_flow.c new file mode 100644 index 0000000..e132c8b --- /dev/null +++ b/nanofi/examples/transmit_flow.c @@ -0,0 +1,93 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <unistd.h> +#include <dirent.h> + +#include "api/nanofi.h" + +int is_dir(const char *path) { + struct stat stat_struct; + if (stat(path, &stat_struct) != 0) + return 0; + return S_ISDIR(stat_struct.st_mode); +} + +void transfer_file_or_directory(nifi_instance *instance, char *file_or_dir) { + int size = 1; + + if (is_dir(file_or_dir)) { + DIR *d; + + struct dirent *dir; + d = opendir(file_or_dir); + if (d) { + while ((dir = readdir(d)) != NULL) { + if (!memcmp(dir->d_name,".",1) ) + continue; + char *file_path = malloc(strlen(file_or_dir) + strlen(dir->d_name) + 2); + sprintf(file_path,"%s/%s",file_or_dir,dir->d_name); + transfer_file_or_directory(instance,file_path); + free(file_path); + } + closedir(d); + } + printf("%s is a directory", file_or_dir); + } else { + printf("Transferring %s\n",file_or_dir); + + flow_file_record *record = create_flowfile(file_or_dir, strlen(file_or_dir)); + + add_attribute(record, "addedattribute", "1", 2); + + transmit_flowfile(record, instance); + + free_flowfile(record); + } +} + +/** + * This is an example of the C API that transmits a flow file to a remote instance. 
+ */ +int main(int argc, char **argv) { + + if (argc < 4) { + printf("Error: must run ./transmit_flow <instance> <remote port> <file or directory>\n"); + exit(1); + } + + char *instance_str = argv[1]; + char *portStr = argv[2]; + char *file = argv[3]; + + nifi_port port; + + port.port_id = portStr; + + nifi_instance *instance = create_instance(instance_str, &port); + + // initializing will make the transmission slightly more efficient. + //initialize_instance(instance); + transfer_file_or_directory(instance,file); + + free_instance(instance); +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/api/nanofi.h ---------------------------------------------------------------------- diff --git a/nanofi/include/api/nanofi.h b/nanofi/include/api/nanofi.h new file mode 100644 index 0000000..31a4829 --- /dev/null +++ b/nanofi/include/api/nanofi.h @@ -0,0 +1,159 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef LIBMINIFI_INCLUDE_CAPI_NANOFI_H_ +#define LIBMINIFI_INCLUDE_CAPI_NANOFI_H_ + +#include <stddef.h> +#include <stdint.h> + +#include "core/cstructs.h" +#include "core/processors.h" + +int initialize_api(); + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * Updates with every release. Functions used here constitute the public API of NanoFi. + * + * Changes here will follow semver + */ +#define API_VERSION "0.02" + +void enable_logging(); + +void set_terminate_callback(void (*terminate_callback)()); + +/**** + * ################################################################## + * BASE NIFI OPERATIONS + * ################################################################## + */ + +nifi_instance *create_instance(const char *url, nifi_port *port); + +void initialize_instance(nifi_instance *); + +void free_instance(nifi_instance*); + +/**** + * ################################################################## + * C2 OPERATIONS + * ################################################################## + */ + + +typedef int c2_update_callback(char *); + +typedef int c2_stop_callback(char *); + +typedef int c2_start_callback(char *); + +void enable_async_c2(nifi_instance *, C2_Server *, c2_stop_callback *, c2_start_callback *, c2_update_callback *); + + +uint8_t run_processor(const processor *processor); + +flow *create_new_flow(nifi_instance *); + +flow *create_flow(nifi_instance *, const char *); + +flow *create_getfile(nifi_instance *instance, flow *parent, GetFileConfig *c); + +processor *add_processor(flow *, const char *); + +processor *add_processor_with_linkage(flow *flow, const char *processor_name); + +processor *add_python_processor(flow *, void (*ontrigger_callback)(processor_session *session)); + +/** +* Register your callback to received flow files that the flow failed to process +* The flow file ownership is transferred to the caller! +* The first callback should be registered before the flow is used. Can be changed later during runtime. 
+*/ +int add_failure_callback(flow *flow, void (*onerror_callback)(flow_file_record*)); + + +/** +* Set failure strategy. Please use the enum defined in cstructs.h +* Return values: 0 (success), -1 (strategy cannot be set - no failure callback added?) +* Can be changed at runtime. +* The default strategy is AS_IS. +*/ +int set_failure_strategy(flow *flow, FailureStrategy strategy); + +int set_property(processor *, const char *, const char *); + +int set_instance_property(nifi_instance *instance, const char*, const char *); + +int free_flow(flow *); + +flow_file_record *get_next_flow_file(nifi_instance *, flow *); + +size_t get_flow_files(nifi_instance *, flow *, flow_file_record **, size_t); + +flow_file_record *get(nifi_instance *,flow *, processor_session *); + +int transfer(processor_session* session, flow *flow, const char *rel); + +/** + * Creates a flow file object. + * Will obtain the size of file + */ +flow_file_record* create_flowfile(const char *file, const size_t len); + +flow_file_record* create_ff_object(const char *file, const size_t len, const uint64_t size); + +flow_file_record* create_ff_object_na(const char *file, const size_t len, const uint64_t size); + +void free_flowfile(flow_file_record*); + +uint8_t add_attribute(flow_file_record*, const char *key, void *value, size_t size); + +void update_attribute(flow_file_record*, const char *key, void *value, size_t size); + +uint8_t get_attribute(flow_file_record *ff, attribute *caller_attribute); + +int get_attribute_qty(const flow_file_record* ff); + +int get_all_attributes(const flow_file_record* ff, attribute_set *target); + +uint8_t remove_attribute(flow_file_record*, char *key); + +/**** + * ################################################################## + * Remote NIFI OPERATIONS + * ################################################################## + */ + +int transmit_flowfile(flow_file_record *, nifi_instance *); + +/**** + * ##################################################################
+ * Persistence Operations + * ################################################################## + */ + + +#ifdef __cplusplus +} +#endif + +#endif /* LIBMINIFI_INCLUDE_CAPI_NANOFI_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/blocks/comms.h ---------------------------------------------------------------------- diff --git a/nanofi/include/blocks/comms.h b/nanofi/include/blocks/comms.h new file mode 100644 index 0000000..bfacc3d --- /dev/null +++ b/nanofi/include/blocks/comms.h @@ -0,0 +1,48 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef BLOCKS_COMMS_H_ +#define BLOCKS_COMMS_H_ + +#include <stdio.h> + +#include "../api/nanofi.h" +#include "core/processors.h" + +#define SUCCESS 0x00 +#define FINISHED_EARLY 0x01 +#define FAIL 0x02 + +typedef int transmission_stop(void *); + +uint8_t transmit_to_nifi(nifi_instance *instance, flow *flow, transmission_stop *stop_callback) { + + flow_file_record *record = 0x00; + do { + record = get_next_flow_file(instance, flow); + + if (record == 0) { + return FINISHED_EARLY; + } + transmit_flowfile(record, instance); + + free_flowfile(record); + } while (!(stop_callback != 0x00 && stop_callback(0x00))); + return SUCCESS; +} + +#endif /* BLOCKS_COMMS_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/blocks/file_blocks.h ---------------------------------------------------------------------- diff --git a/nanofi/include/blocks/file_blocks.h b/nanofi/include/blocks/file_blocks.h new file mode 100644 index 0000000..d84ccbe --- /dev/null +++ b/nanofi/include/blocks/file_blocks.h @@ -0,0 +1,38 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +#ifndef BLOCKS_FILE_BLOCKS_H_ +#define BLOCKS_FILE_BLOCKS_H_ + +#include "../api/nanofi.h" +#include "core/processors.h" + +#define KEEP_SOURCE 0x01 +#define RECURSE 0x02 + +/** + * Monitor directory can be combined into a current flow to create an execution plan. + */ +flow *monitor_directory(nifi_instance *instance, char *directory, flow *parent_flow, char flags) { + GetFileConfig config; + config.directory = directory; + config.keep_source = (flags & KEEP_SOURCE) != 0; + config.recurse = (flags & RECURSE) != 0; + return create_getfile(instance, parent_flow, &config); +} + +#endif /* BLOCKS_FILE_BLOCKS_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/core/cstructs.h ---------------------------------------------------------------------- diff --git a/nanofi/include/core/cstructs.h b/nanofi/include/core/cstructs.h new file mode 100644 index 0000000..bc9700e --- /dev/null +++ b/nanofi/include/core/cstructs.h @@ -0,0 +1,118 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +#ifndef LIBMINIFI_SRC_CAPI_CSTRUCTS_H_ +#define LIBMINIFI_SRC_CAPI_CSTRUCTS_H_ + +/** + * NiFi Port struct + */ +typedef struct { + char *port_id; +} nifi_port; + +/** + * Nifi instance struct + */ +typedef struct { + + void *instance_ptr; + + nifi_port port; + +} nifi_instance; + +/**** + * ################################################################## + * C2 OPERATIONS + * ################################################################## + */ + +enum C2_Server_Type { + REST, + MQTT +}; + +typedef struct { + char *url; + char *ack_url; + char *identifier; + char *topic; + enum C2_Server_Type type; +} C2_Server; + +/**** + * ################################################################## + * Processor OPERATIONS + * ################################################################## + */ + +typedef struct { + void *processor_ptr; +} processor; + +typedef struct { + void *session; +} processor_session; + +/**** + * ################################################################## + * FLOWFILE OPERATIONS + * ################################################################## + */ + +typedef struct { + const char *key; + void *value; + size_t value_size; +} attribute; + +typedef struct { + attribute * attributes; + size_t size; +} attribute_set; + +/** + * State of a flow file + * + */ +typedef struct { + uint64_t size; /**< Size in bytes of the data corresponding to this flow file */ + + void * in; + + void * crp; + + char * contentLocation; /**< Filesystem location of this object */ + + void *attributes; /**< Hash map of attributes */ + + void *ffp; + +} flow_file_record; + +typedef struct { + void *plan; +} flow; + +typedef enum FS { + AS_IS, + ROLLBACK +} FailureStrategy; + +#endif /* LIBMINIFI_SRC_CAPI_CSTRUCTS_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/core/processors.h ---------------------------------------------------------------------- diff --git a/nanofi/include/core/processors.h 
b/nanofi/include/core/processors.h new file mode 100644 index 0000000..7fe357d --- /dev/null +++ b/nanofi/include/core/processors.h @@ -0,0 +1,37 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CAPI_PROCESSORS_H_ +#define LIBMINIFI_INCLUDE_CAPI_PROCESSORS_H_ + + + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct { + char *directory; + unsigned keep_source :1; + unsigned recurse :1; +} GetFileConfig; + +#ifdef __cplusplus +} +#endif + +#endif /* LIBMINIFI_INCLUDE_CAPI_PROCESSORS_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/cxx/C2CallbackAgent.h ---------------------------------------------------------------------- diff --git a/nanofi/include/cxx/C2CallbackAgent.h b/nanofi/include/cxx/C2CallbackAgent.h new file mode 100644 index 0000000..f620baa --- /dev/null +++ b/nanofi/include/cxx/C2CallbackAgent.h @@ -0,0 +1,81 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_C2_C2CALLBACKAGENT_H_ +#define LIBMINIFI_INCLUDE_C2_C2CALLBACKAGENT_H_ + +#include <utility> +#include <functional> +#include <future> +#include <memory> +#include <mutex> +#include <thread> + +#include "c2/C2Agent.h" +#include "core/state/Value.h" +#include "c2/C2Payload.h" +#include "c2/C2Protocol.h" +#include "io/validation.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +typedef int c2_ag_update_callback(char *); + +typedef int c2_ag_stop_callback(char *); + +typedef int c2_ag_start_callback(char *); + +class C2CallbackAgent : public c2::C2Agent { + + public: + + explicit C2CallbackAgent(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink, const std::shared_ptr<Configure> &configure); + + virtual ~C2CallbackAgent() { + } + + void setStopCallback(c2_ag_stop_callback *st){ + stop = st; + } + + + protected: + /** + * Handles a C2 event requested by the server. + * @param resp c2 server response. 
+ */ + virtual void handle_c2_server_response(const C2ContentResponse &resp); + + c2_ag_stop_callback *stop; + + private: + std::shared_ptr<logging::Logger> logger_; + +}; + +} /* namespace c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + + +#endif /* LIBMINIFI_INCLUDE_C2_C2CALLBACKAGENT_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/cxx/CallbackProcessor.h ---------------------------------------------------------------------- diff --git a/nanofi/include/cxx/CallbackProcessor.h b/nanofi/include/cxx/CallbackProcessor.h new file mode 100644 index 0000000..61e824f --- /dev/null +++ b/nanofi/include/cxx/CallbackProcessor.h @@ -0,0 +1,100 @@ +/** + * @file CallbackProcessor.h + * CallbackProcessor class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +#ifndef __CALLBACK_PROCESSOR_H__ +#define __CALLBACK_PROCESSOR_H__ + +#include <stdio.h> +#include <string> +#include <errno.h> +#include <chrono> +#include <thread> +#include <functional> +#include <iostream> +#include <sys/types.h> +#include "core/cstructs.h" +#include "io/BaseStream.h" +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/Core.h" +#include "core/Resource.h" +#include "core/logging/LoggerConfiguration.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +// CallbackProcessor Class +class CallbackProcessor : public core::Processor { + public: + // Constructor + /*! + * Create a new processor + */ + CallbackProcessor(std::string name, utils::Identifier uuid = utils::Identifier()) + : Processor(name, uuid), + callback_(nullptr), + objref_(nullptr), + logger_(logging::LoggerFactory<CallbackProcessor>::getLogger()) { + } + // Destructor + virtual ~CallbackProcessor() { + + } + // Processor Name + static constexpr char const* ProcessorName = "CallbackProcessor"; + + public: + + void setCallback(void *obj,std::function<void(processor_session*)> ontrigger_callback) { + objref_ = obj; + callback_ = ontrigger_callback; + } + + // OnTrigger method, implemented by NiFi CallbackProcessor + virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session); + // Initialize, over write by NiFi CallbackProcessor + virtual void initialize() { + std::set<core::Relationship> relationships; + core::Relationship Success("success", "description"); + relationships.insert(Success); + setSupportedRelationships(relationships); + } + + protected: + void *objref_; + std::function<void(processor_session*)> callback_; + private: + // Logger + std::shared_ptr<logging::Logger> logger_; + +}; + +REGISTER_RESOURCE(CallbackProcessor); + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* 
namespace org */ + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/cxx/Instance.h ---------------------------------------------------------------------- diff --git a/nanofi/include/cxx/Instance.h b/nanofi/include/cxx/Instance.h new file mode 100644 index 0000000..982f6d4 --- /dev/null +++ b/nanofi/include/cxx/Instance.h @@ -0,0 +1,180 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef LIBMINIFI_INCLUDE_CAPI_INSTANCE_H_ +#define LIBMINIFI_INCLUDE_CAPI_INSTANCE_H_ + +#include <memory> +#include <type_traits> +#include <string> +#include "core/Property.h" +#include "properties/Configure.h" +#include "io/StreamFactory.h" +#include "RemoteProcessorGroupPort.h" +#include "core/ContentRepository.h" +#include "core/repository/VolatileContentRepository.h" +#include "core/Repository.h" + +#include "C2CallbackAgent.h" +#include "core/Connectable.h" +#include "core/ProcessorNode.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/ProcessSessionFactory.h" +#include "core/controller/ControllerServiceProvider.h" +#include "core/FlowConfiguration.h" +#include "ReflexiveSession.h" +#include "utils/ThreadPool.h" +#include "core/state/UpdateController.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { + +class ProcessorLink { + public: + explicit ProcessorLink(const std::shared_ptr<core::Processor> &processor) + : processor_(processor) { + + } + + const std::shared_ptr<core::Processor> &getProcessor() { + return processor_; + } + + protected: + std::shared_ptr<core::Processor> processor_; +}; + +class Instance { + public: + + explicit Instance(const std::string &url, const std::string &port) + : configure_(std::make_shared<Configure>()), + url_(url), + agent_(nullptr), + rpgInitialized_(false), + listener_thread_pool_(1), + content_repo_(std::make_shared<minifi::core::repository::VolatileContentRepository>()), + no_op_repo_(std::make_shared<minifi::core::Repository>()) { + running_ = false; + stream_factory_ = minifi::io::StreamFactory::getInstance(configure_); + utils::Identifier uuid; + uuid = port; + rpg_ = std::make_shared<minifi::RemoteProcessorGroupPort>(stream_factory_, url, url, configure_, uuid); + proc_node_ = std::make_shared<core::ProcessorNode>(rpg_); + core::FlowConfiguration::initialize_static_functions(); + content_repo_->initialize(configure_); + } + + ~Instance() { + 
running_ = false; + listener_thread_pool_.shutdown(); + } + + bool isRPGConfigured() { + return rpgInitialized_; + } + + void enableAsyncC2(C2_Server *server,c2_stop_callback *c1, c2_start_callback *c2, c2_update_callback *c3) { + std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider = nullptr; + running_ = true; + if (server->type != C2_Server_Type::MQTT){ + configure_->set("c2.rest.url",server->url); + configure_->set("c2.rest.url.ack",server->ack_url); + } + agent_ = std::make_shared<c2::C2CallbackAgent>(controller_service_provider, nullptr, configure_); + listener_thread_pool_.start(); + registerUpdateListener(agent_, 1000); + agent_->setStopCallback(c1); + } + + void setRemotePort(std::string remote_port) { + rpg_->setProperty(minifi::RemoteProcessorGroupPort::portUUID, remote_port); + rpg_->initialize(); + rpg_->setTransmitting(true); + rpgInitialized_ = true; + } + + std::shared_ptr<Configure> getConfiguration() { + return configure_; + } + + std::shared_ptr<minifi::core::Repository> getNoOpRepository() { + return no_op_repo_; + } + + std::shared_ptr<minifi::core::ContentRepository> getContentRepository() { + return content_repo_; + } + + void transfer(const std::shared_ptr<FlowFileRecord> &ff) { + std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider = nullptr; + auto processContext = std::make_shared<core::ProcessContext>(proc_node_, controller_service_provider, no_op_repo_, no_op_repo_, content_repo_); + auto sessionFactory = std::make_shared<core::ProcessSessionFactory>(processContext); + + rpg_->onSchedule(processContext, sessionFactory); + + auto session = std::make_shared<core::ReflexiveSession>(processContext); + + session->add(ff); + + rpg_->onTrigger(processContext, session); + } + + protected: + + bool registerUpdateListener(const std::shared_ptr<state::UpdateController> &updateController, const int64_t &delay) { + auto functions = updateController->getFunctions(); + // run all 
functions independently + + for (auto function : functions) { + std::unique_ptr<utils::AfterExecute<state::Update>> after_execute = std::unique_ptr<utils::AfterExecute<state::Update>>(new state::UpdateRunner(running_, delay)); + utils::Worker<state::Update> functor(function, "listeners", std::move(after_execute)); + std::future<state::Update> future; + if (!listener_thread_pool_.execute(std::move(functor), future)) { + // denote failure + return false; + } + } + return true; + } + + std::shared_ptr<c2::C2CallbackAgent> agent_; + + std::atomic<bool> running_; + + bool rpgInitialized_; + + std::shared_ptr<minifi::core::Repository> no_op_repo_; + + std::shared_ptr<minifi::core::ContentRepository> content_repo_; + + std::shared_ptr<core::ProcessorNode> proc_node_; + std::shared_ptr<minifi::RemoteProcessorGroupPort> rpg_; + std::shared_ptr<io::StreamFactory> stream_factory_; + std::string url_; + std::shared_ptr<Configure> configure_; + + utils::ThreadPool<state::Update> listener_thread_pool_; +}; + +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ +#endif /* LIBMINIFI_INCLUDE_CAPI_INSTANCE_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/cxx/Plan.h ---------------------------------------------------------------------- diff --git a/nanofi/include/cxx/Plan.h b/nanofi/include/cxx/Plan.h new file mode 100644 index 0000000..6171242 --- /dev/null +++ b/nanofi/include/cxx/Plan.h @@ -0,0 +1,204 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef LIBMINIFI_CAPI_PLAN_H_ +#define LIBMINIFI_CAPI_PLAN_H_ + +#ifndef WIN32 + #include <dirent.h> +#endif +#include <cstdio> +#include <cstdlib> +#include <sstream> +#include "ResourceClaim.h" +#include <vector> +#include <set> +#include <map> +#include "core/logging/Logger.h" +#include "core/Core.h" +#include "properties/Configure.h" +#include "properties/Properties.h" +#include "core/logging/LoggerConfiguration.h" +#include "utils/Id.h" +#include "spdlog/sinks/ostream_sink.h" +#include "spdlog/sinks/dist_sink.h" +#include "core/Core.h" +#include "core/FlowFile.h" +#include "core/Processor.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/ProcessorNode.h" +#include "core/reporting/SiteToSiteProvenanceReportingTask.h" +#include "core/cstructs.h" +#include "api/nanofi.h" + +using failure_callback_type = std::function<void(flow_file_record*)>; +using content_repo_sptr = std::shared_ptr<core::ContentRepository>; + +namespace { + + void failureStrategyAsIs(core::ProcessSession *session, failure_callback_type user_callback, content_repo_sptr cr_ptr) { + auto ff = session->get(); + if (ff == nullptr) { + return; + } + + auto claim = ff->getResourceClaim(); + + if (claim != nullptr && user_callback != nullptr) { + claim->increaseFlowFileRecordOwnedCount(); + // create a flow file. 
+ auto path = claim->getContentFullPath(); + auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize()); + ffr->attributes = ff->getAttributesPtr(); + ffr->ffp = ff.get(); + auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp); + *content_repo_ptr = cr_ptr; + user_callback(ffr); + } + session->remove(ff); + } + + void failureStrategyRollback(core::ProcessSession *session, failure_callback_type user_callback, content_repo_sptr cr_ptr) { + session->rollback(); + failureStrategyAsIs(session, user_callback, cr_ptr); + } +} + +static const std::map<FailureStrategy, const std::function<void(core::ProcessSession*, failure_callback_type, content_repo_sptr)>> FailureStrategies = + { { FailureStrategy::AS_IS, failureStrategyAsIs }, {FailureStrategy::ROLLBACK, failureStrategyRollback } }; + +class ExecutionPlan { + public: + + explicit ExecutionPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo); + + std::shared_ptr<core::Processor> addCallback(void *, std::function<void(processor_session*)>); + + std::shared_ptr<core::Processor> addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name, + core::Relationship relationship = core::Relationship("success", "description"), + bool linkToPrevious = false); + + std::shared_ptr<core::Processor> addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship = core::Relationship("success", "description"), + bool linkToPrevious = false); + + bool setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value); + + void reset(); + + bool runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify = nullptr); + + bool setFailureCallback(failure_callback_type onerror_callback); + + bool 
setFailureStrategy(FailureStrategy start); + + std::set<provenance::ProvenanceEventRecord*> getProvenanceRecords(); + + std::shared_ptr<core::FlowFile> getCurrentFlowFile(); + + std::shared_ptr<core::ProcessSession> getCurrentSession(); + + std::shared_ptr<core::Repository> getFlowRepo() { + return flow_repo_; + } + + std::shared_ptr<core::Repository> getProvenanceRepo() { + return prov_repo_; + } + + std::shared_ptr<core::ContentRepository> getContentRepo() { + return content_repo_; + } + + std::shared_ptr<core::FlowFile> getNextFlowFile(){ + return next_ff_; + } + + void setNextFlowFile(std::shared_ptr<core::FlowFile> ptr){ + next_ff_ = ptr; + } + + static std::shared_ptr<core::Processor> createProcessor(const std::string &processor_name, const std::string &name); + + protected: + class FailureHandler { + public: + FailureHandler(content_repo_sptr cr_ptr) { + callback_ = nullptr; + strategy_ = FailureStrategy::AS_IS; + content_repo_ = cr_ptr; + } + void setCallback(failure_callback_type onerror_callback) { + callback_=onerror_callback; + } + void setStrategy(FailureStrategy strat) { + strategy_ = strat; + } + void operator()(const processor_session* ps) { + auto ses = static_cast<core::ProcessSession*>(ps->session); + FailureStrategies.at(strategy_)(ses, callback_, content_repo_); + } + private: + failure_callback_type callback_; + FailureStrategy strategy_; + content_repo_sptr content_repo_; + }; + + void finalize(); + + std::shared_ptr<minifi::Connection> buildFinalConnection(std::shared_ptr<core::Processor> processor, bool set_dst = false); + + std::shared_ptr<minifi::Connection> connectProcessors(std::shared_ptr<core::Processor> src_proc, std::shared_ptr<core::Processor> dst_proc, + core::Relationship relationship = core::Relationship("success", "description"), bool set_dst = false); + + std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory; + + content_repo_sptr content_repo_; + + std::shared_ptr<core::Repository> flow_repo_; + 
std::shared_ptr<core::Repository> prov_repo_; + + std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider_; + + std::atomic<bool> finalized; + + uint32_t location; + + std::shared_ptr<core::ProcessSession> current_session_; + std::shared_ptr<core::FlowFile> current_flowfile_; + + std::map<std::string, std::shared_ptr<core::Processor>> processor_mapping_; + std::vector<std::shared_ptr<core::Processor>> processor_queue_; + std::vector<std::shared_ptr<core::Processor>> configured_processors_; + std::vector<std::shared_ptr<core::ProcessorNode>> processor_nodes_; + std::vector<std::shared_ptr<core::ProcessContext>> processor_contexts_; + std::vector<std::shared_ptr<core::ProcessSession>> process_sessions_; + std::vector<std::shared_ptr<core::ProcessSessionFactory>> factories_; + std::vector<std::shared_ptr<minifi::Connection>> relationships_; + core::Relationship termination_; + + std::shared_ptr<core::FlowFile> next_ff_; + + private: + + static std::shared_ptr<utils::IdGenerator> id_generator_; + std::shared_ptr<logging::Logger> logger_; + std::shared_ptr<FailureHandler> failure_handler_; +}; + +#endif /* LIBMINIFI_CAPI_PLAN_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/include/cxx/ReflexiveSession.h ---------------------------------------------------------------------- diff --git a/nanofi/include/cxx/ReflexiveSession.h b/nanofi/include/cxx/ReflexiveSession.h new file mode 100644 index 0000000..ebf6cbe --- /dev/null +++ b/nanofi/include/cxx/ReflexiveSession.h @@ -0,0 +1,77 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __REFLEXIVE_SESSION_H__ +#define __REFLEXIVE_SESSION_H__ + +#include <uuid/uuid.h> +#include <vector> +#include <queue> +#include <map> +#include <mutex> +#include <atomic> +#include <algorithm> +#include <set> + +#include "core/ProcessSession.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + +// ReflexiveSession Class +class ReflexiveSession : public ProcessSession{ + public: + // Constructor + /*! + * Create a new process session + */ + ReflexiveSession(std::shared_ptr<ProcessContext> processContext = nullptr) + : ProcessSession(processContext){ + } + +// Destructor + virtual ~ReflexiveSession() { + } + + virtual std::shared_ptr<core::FlowFile> get(){ + auto prevff = ff; + ff = nullptr; + return prevff; + } + + virtual void add(const std::shared_ptr<core::FlowFile> &flow){ + ff = flow; + } + virtual void transfer(const std::shared_ptr<core::FlowFile> &flow, Relationship relationship){ + // no op + } + protected: + // + // Get the FlowFile from the highest priority queue + std::shared_ptr<core::FlowFile> ff; + +}; + +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ +#endif