Repository: nifi-minifi-cpp Updated Branches: refs/heads/master fc1074a04 -> 556794b15
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/src/api/nanofi.cpp ---------------------------------------------------------------------- diff --git a/nanofi/src/api/nanofi.cpp b/nanofi/src/api/nanofi.cpp new file mode 100644 index 0000000..ee33c6b --- /dev/null +++ b/nanofi/src/api/nanofi.cpp @@ -0,0 +1,518 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <string> +#include <map> +#include <memory> +#include <utility> +#include <exception> + +#include "api/nanofi.h" +#include "core/Core.h" +#include "core/expect.h" +#include "cxx/Instance.h" +#include "cxx/Plan.h" +#include "ResourceClaim.h" +#include "processors/GetFile.h" +#include "core/logging/LoggerConfiguration.h" +#include "utils/StringUtils.h" + +using string_map = std::map<std::string, std::string>; + +class API_INITIALIZER { + public: + static int initialized; +}; + +int API_INITIALIZER::initialized = initialize_api(); + +int initialize_api() { + logging::LoggerConfiguration::getConfiguration().disableLogging(); + return 1; +} + +void enable_logging() { + logging::LoggerConfiguration::getConfiguration().enableLogging(); +} + +void set_terminate_callback(void (*terminate_callback)()) { + std::set_terminate(terminate_callback); +} + +class DirectoryConfiguration { + protected: + DirectoryConfiguration() { + minifi::setDefaultDirectory(DEFAULT_CONTENT_DIRECTORY); + } + public: + static void initialize() { + static DirectoryConfiguration configure; + } +}; + +/** + * Creates a NiFi Instance from the url and output port. + * @param url http URL for NiFi instance + * @param port Remote output port. + * @Deprecated for API version 0.2 in favor of the following prototype + * nifi_instance *create_instance(nifi_port const *port) { + */ +nifi_instance *create_instance(const char *url, nifi_port *port) { + // make sure that we have a thread safe way of initializing the content directory + DirectoryConfiguration::initialize(); + + // need reinterpret cast until we move to C for this module. + nifi_instance *instance = reinterpret_cast<nifi_instance*>(malloc(sizeof(nifi_instance))); + /** + * This API will gradually move away from C++, hence malloc is used for nifi_instance + * Since minifi::Instance is currently being used, then we need to use new in that case. + */ + instance->instance_ptr = new minifi::Instance(url, port->port_id); + // may have to translate port ID here in the future + // need reinterpret cast until we move to C for this module. + instance->port.port_id = reinterpret_cast<char*>(malloc(strlen(port->port_id) + 1)); + snprintf(instance->port.port_id, strlen(port->port_id) + 1, "%s", port->port_id); + return instance; +} + +/** + * Initializes the instance + */ +void initialize_instance(nifi_instance *instance) { + auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr); + minifi_instance_ref->setRemotePort(instance->port.port_id); +} +/* + typedef int c2_update_callback(char *); + + typedef int c2_stop_callback(char *); + + typedef int c2_start_callback(char *); + + */ +void enable_async_c2(nifi_instance *instance, C2_Server *server, c2_stop_callback *c1, c2_start_callback *c2, c2_update_callback *c3) { + auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr); + minifi_instance_ref->enableAsyncC2(server, c1, c2, c3); +} + +/** + * Sets a property within the nifi instance + * @param instance nifi instance + * @param key key in which we will set the valiue + * @param value + * @return -1 when instance or key are null + */ +int set_instance_property(nifi_instance *instance, const char *key, const char *value) { + if (nullptr == instance || nullptr == instance->instance_ptr || nullptr == key) { + return -1; + } + auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr); + minifi_instance_ref->getConfiguration()->set(key, value); + return 0; +} + +/** + * Reclaims memory associated with a nifi instance structure. + * @param instance nifi instance. + */ +void free_instance(nifi_instance* instance) { + if (instance != nullptr) { + delete ((minifi::Instance*) instance->instance_ptr); + free(instance->port.port_id); + free(instance); + } +} + +/** + * Creates a flow file record + * @param file file to place into the flow file. + */ +flow_file_record* create_flowfile(const char *file, const size_t len) { + flow_file_record *new_ff = (flow_file_record*) malloc(sizeof(flow_file_record)); + new_ff->attributes = new string_map(); + new_ff->contentLocation = (char*) malloc(sizeof(char) * (len + 1)); + snprintf(new_ff->contentLocation, len + 1, "%s", file); + std::ifstream in(file, std::ifstream::ate | std::ifstream::binary); + // set the size of the flow file. + new_ff->size = in.tellg(); + return new_ff; +} + +/** + * Creates a flow file record + * @param file file to place into the flow file. + */ +flow_file_record* create_ff_object(const char *file, const size_t len, const uint64_t size) { + if (nullptr == file) { + return nullptr; + } + flow_file_record *new_ff = create_ff_object_na(file, len, size); + new_ff->attributes = new string_map(); + new_ff->ffp = 0; + return new_ff; +} + +flow_file_record* create_ff_object_na(const char *file, const size_t len, const uint64_t size) { + flow_file_record *new_ff = new flow_file_record; + new_ff->attributes = nullptr; + new_ff->contentLocation = (char*) malloc(sizeof(char) * (len + 1)); + snprintf(new_ff->contentLocation, len + 1, "%s", file); + // set the size of the flow file. + new_ff->size = size; + new_ff->crp = static_cast<void*>(new std::shared_ptr<minifi::core::ContentRepository>); + return new_ff; +} +/** + * Reclaims memory associated with a flow file object + * @param ff flow file record. + */ +void free_flowfile(flow_file_record *ff) { + if (ff == nullptr) { + return; + } + auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ff->crp); + if (content_repo_ptr->get()) { + std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, *content_repo_ptr); + (*content_repo_ptr)->remove(claim); + } + if (ff->ffp == nullptr) { + auto map = static_cast<string_map*>(ff->attributes); + delete map; + } + free(ff->contentLocation); + free(ff); + delete content_repo_ptr; +} + +/** + * Adds an attribute + * @param ff flow file record + * @param key key + * @param value value to add + * @param size size of value + * @return 0 or -1 based on whether the attributed existed previously (-1) or not (0) + */ +uint8_t add_attribute(flow_file_record *ff, const char *key, void *value, size_t size) { + auto attribute_map = static_cast<string_map*>(ff->attributes); + const auto& ret = attribute_map->insert(std::pair<std::string, std::string>(key, std::string(static_cast<char*>(value), size))); + return ret.second ? 0 : -1; +} + +/** + * Updates (or adds) an attribute + * @param ff flow file record + * @param key key + * @param value value to add + * @param size size of value + */ +void update_attribute(flow_file_record *ff, const char *key, void *value, size_t size) { + auto attribute_map = static_cast<string_map*>(ff->attributes); + (*attribute_map)[key] = std::string(static_cast<char*>(value), size); +} + +/* + * Obtains the attribute. + * @param ff flow file record + * @param key key + * @param caller_attribute caller supplied object in which we will copy the data ptr + * @return 0 if successful, -1 if the key does not exist + */ +uint8_t get_attribute(flow_file_record * ff, attribute * caller_attribute) { + if (ff == nullptr) { + return -1; + } + auto attribute_map = static_cast<string_map*>(ff->attributes); + if (!attribute_map) { + return -1; + } + auto find = attribute_map->find(caller_attribute->key); + if (find != attribute_map->end()) { + caller_attribute->value = static_cast<void*>(const_cast<char*>(find->second.data())); + caller_attribute->value_size = find->second.size(); + return 0; + } + return -1; +} + +int get_attribute_qty(const flow_file_record* ff) { + if (ff == nullptr) { + return 0; + } + auto attribute_map = static_cast<string_map*>(ff->attributes); + return attribute_map ? attribute_map->size() : 0; +} + +int get_all_attributes(const flow_file_record* ff, attribute_set *target) { + if (ff == nullptr) { + return 0; + } + auto attribute_map = static_cast<string_map*>(ff->attributes); + if (!attribute_map || attribute_map->empty()) { + return 0; + } + int i = 0; + for (const auto& kv : *attribute_map) { + if (i >= target->size) { + break; + } + target->attributes[i].key = kv.first.data(); + target->attributes[i].value = static_cast<void*>(const_cast<char*>(kv.second.data())); + target->attributes[i].value_size = kv.second.size(); + ++i; + } + return i; +} + +/** + * Removes a key from the attribute chain + * @param ff flow file record + * @param key key to remove + * @return 0 if removed, -1 otherwise + */ +uint8_t remove_attribute(flow_file_record *ff, const char *key) { + auto attribute_map = static_cast<string_map*>(ff->attributes); + return attribute_map->erase(key) - 1; // erase by key returns the number of elements removed (0 or 1) +} + +/** + * Transmits the flowfile + * @param ff flow file record + * @param instance nifi instance structure + */ +int transmit_flowfile(flow_file_record *ff, nifi_instance *instance) { + auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr); + // in the unlikely event the user forgot to initialize the instance, we shall do it for them. + if (UNLIKELY(minifi_instance_ref->isRPGConfigured() == false)) { + minifi_instance_ref->setRemotePort(instance->port.port_id); + } + + auto attribute_map = static_cast<string_map*>(ff->attributes); + + auto no_op = minifi_instance_ref->getNoOpRepository(); + + auto content_repo = minifi_instance_ref->getContentRepository(); + + std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, content_repo); + claim->increaseFlowFileRecordOwnedCount(); + claim->increaseFlowFileRecordOwnedCount(); + + auto ffr = std::make_shared<minifi::FlowFileRecord>(no_op, content_repo, *attribute_map, claim); + ffr->addAttribute("nanofi.version", API_VERSION); + ffr->setSize(ff->size); + + std::string port_uuid = instance->port.port_id; + + minifi_instance_ref->transfer(ffr); + + return 0; +} + +flow * create_new_flow(nifi_instance * instance) { + auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr); + flow *new_flow = (flow*) malloc(sizeof(flow)); + + auto execution_plan = new ExecutionPlan(minifi_instance_ref->getContentRepository(), minifi_instance_ref->getNoOpRepository(), minifi_instance_ref->getNoOpRepository()); + + new_flow->plan = execution_plan; + + return new_flow; +} + +flow *create_flow(nifi_instance *instance, const char *first_processor) { + if (nullptr == instance || nullptr == instance->instance_ptr) { + return nullptr; + } + auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr); + flow *new_flow = (flow*) malloc(sizeof(flow)); + + auto execution_plan = new ExecutionPlan(minifi_instance_ref->getContentRepository(), minifi_instance_ref->getNoOpRepository(), minifi_instance_ref->getNoOpRepository()); + + new_flow->plan = execution_plan; + + if (first_processor != nullptr && strlen(first_processor) > 0) { + // automatically adds it with success + execution_plan->addProcessor(first_processor, first_processor); + } + return new_flow; +} + +processor *add_python_processor(flow *flow, void (*ontrigger_callback)(processor_session *)) { + if (nullptr == flow || nullptr == flow->plan || nullptr == ontrigger_callback) { + return nullptr; + } + ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan); + auto proc = plan->addCallback(nullptr, std::bind(ontrigger_callback, std::placeholders::_1)); + processor *new_processor = (processor*) malloc(sizeof(processor)); + new_processor->processor_ptr = proc.get(); + return new_processor; +} + +flow * create_getfile(nifi_instance * instance, flow * parent_flow, GetFileConfig * c) { + static const std::string first_processor = "GetFile"; + flow *new_flow = parent_flow == nullptr ? create_flow(instance, nullptr) : parent_flow; + + ExecutionPlan *plan = static_cast<ExecutionPlan*>(new_flow->plan); + // automatically adds it with success + auto getFile = plan->addProcessor(first_processor, first_processor); + + plan->setProperty(getFile, processors::GetFile::Directory.getName(), c->directory); + plan->setProperty(getFile, processors::GetFile::KeepSourceFile.getName(), c->keep_source ? "true" : "false"); + plan->setProperty(getFile, processors::GetFile::Recurse.getName(), c->recurse ? "true" : "false"); + + return new_flow; +} + +processor *add_processor(flow *flow, const char *processor_name) { + if (nullptr == flow || nullptr == processor_name) { + return nullptr; + } + ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan); + auto proc = plan->addProcessor(processor_name, processor_name); + if (proc) { + processor *new_processor = (processor*) malloc(sizeof(processor)); + new_processor->processor_ptr = proc.get(); + return new_processor; + } + return nullptr; +} + +processor *add_processor_with_linkage(flow *flow, const char *processor_name) { + ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan); + auto proc = plan->addProcessor(processor_name, processor_name, core::Relationship("success", "description"), true); + if (proc) { + processor *new_processor = (processor*) malloc(sizeof(processor)); + new_processor->processor_ptr = proc.get(); + return new_processor; + } + return nullptr; +} + +int add_failure_callback(flow *flow, void (*onerror_callback)(flow_file_record*)) { + ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan); + return plan->setFailureCallback(onerror_callback) ? 0 : 1; +} + +int set_failure_strategy(flow *flow, FailureStrategy strategy) { + return static_cast<ExecutionPlan*>(flow->plan)->setFailureStrategy(strategy) ? 0 : -1; +} + +int set_property(processor *proc, const char *name, const char *value) { + if (name != nullptr && value != nullptr && proc != nullptr) { + core::Processor *p = static_cast<core::Processor*>(proc->processor_ptr); + bool success = p->setProperty(name, value) || (p->supportsDynamicProperties() && p->setDynamicProperty(name, value)); + return success ? 0 : -2; + } + return -1; +} + +int free_flow(flow *flow) { + if (flow == nullptr || nullptr == flow->plan) + return -1; + auto execution_plan = static_cast<ExecutionPlan*>(flow->plan); + delete execution_plan; + free(flow); + return 0; +} + +flow_file_record * get_next_flow_file(nifi_instance * instance, flow * flow) { + if (instance == nullptr || nullptr == flow || nullptr == flow->plan) + return nullptr; + auto execution_plan = static_cast<ExecutionPlan*>(flow->plan); + execution_plan->reset(); + while (execution_plan->runNextProcessor()) { + } + auto ff = execution_plan->getCurrentFlowFile(); + if (ff == nullptr) { + return nullptr; + } + auto claim = ff->getResourceClaim(); + + if (claim != nullptr) { + // create a flow file. + claim->increaseFlowFileRecordOwnedCount(); + auto path = claim->getContentFullPath(); + auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize()); + ffr->ffp = ff.get(); + ffr->attributes = ff->getAttributesPtr(); + auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp); + *content_repo_ptr = execution_plan->getContentRepo(); + return ffr; + } else { + return nullptr; + } +} + +size_t get_flow_files(nifi_instance *instance, flow *flow, flow_file_record **ff_r, size_t size) { + if (nullptr == instance || nullptr == flow || nullptr == ff_r) + return 0; + auto execution_plan = static_cast<ExecutionPlan*>(flow->plan); + size_t i = 0; + for (; i < size; i++) { + execution_plan->reset(); + auto ffr = get_next_flow_file(instance, flow); + if (ffr == nullptr) { + break; + } + ff_r[i] = ffr; + } + return i; +} + +flow_file_record * get(nifi_instance * instance, flow * flow, processor_session * session) { + if (nullptr == instance || nullptr == flow || nullptr == session) + return nullptr; + auto sesh = static_cast<core::ProcessSession*>(session->session); + auto execution_plan = static_cast<ExecutionPlan*>(flow->plan); + auto ff = sesh->get(); + execution_plan->setNextFlowFile(ff); + if (ff == nullptr) { + return nullptr; + } + auto claim = ff->getResourceClaim(); + + if (claim != nullptr) { + // create a flow file. + claim->increaseFlowFileRecordOwnedCount(); + auto path = claim->getContentFullPath(); + auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize()); + ffr->attributes = ff->getAttributesPtr(); + ffr->ffp = ff.get(); + auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp); + *content_repo_ptr = execution_plan->getContentRepo(); + return ffr; + } else { + return nullptr; + } +} + +int transfer(processor_session* session, flow *flow, const char *rel) { + if (nullptr == session || nullptr == flow || rel == nullptr) { + return -1; + } + auto sesh = static_cast<core::ProcessSession*>(session->session); + auto execution_plan = static_cast<ExecutionPlan*>(flow->plan); + if (nullptr == sesh || nullptr == execution_plan) { + return -1; + } + core::Relationship relationship(rel, rel); + auto ff = execution_plan->getNextFlowFile(); + if (nullptr == ff) { + return -2; + } + sesh->transfer(ff, relationship); + return 0; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/src/cxx/C2CallbackAgent.cpp ---------------------------------------------------------------------- diff --git a/nanofi/src/cxx/C2CallbackAgent.cpp b/nanofi/src/cxx/C2CallbackAgent.cpp new file mode 100644 index 0000000..3a2b0d1 --- /dev/null +++ b/nanofi/src/cxx/C2CallbackAgent.cpp @@ -0,0 +1,79 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "cxx/C2CallbackAgent.h" +#include <csignal> +#include <utility> +#include <vector> +#include <map> +#include <string> +#include <memory> +#include "c2/ControllerSocketProtocol.h" +#include "core/logging/Logger.h" +#include "core/logging/LoggerConfiguration.h" +#include "utils/file/FileUtils.h" +#include "utils/file/FileManager.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +C2CallbackAgent::C2CallbackAgent(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink, + const std::shared_ptr<Configure> &configuration) + : C2Agent(controller, updateSink, configuration), + stop(nullptr), + logger_(logging::LoggerFactory<C2CallbackAgent>::getLogger()) { +} + +void C2CallbackAgent::handle_c2_server_response(const C2ContentResponse &resp) { + switch (resp.op) { + case Operation::CLEAR: + break; + case Operation::UPDATE: + break; + case Operation::DESCRIBE: + break; + case Operation::RESTART: + break; + case Operation::START: + case Operation::STOP: { + if (resp.name == "C2" || resp.name == "c2") { + raise(SIGTERM); + } + + auto str = resp.name.c_str(); + + if (nullptr != stop) + stop(const_cast<char*>(str)); + + break; + } + // + break; + default: + break; + // do nothing + } +} + +} /* namespace c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/src/cxx/CallbackProcessor.cpp ---------------------------------------------------------------------- diff --git a/nanofi/src/cxx/CallbackProcessor.cpp b/nanofi/src/cxx/CallbackProcessor.cpp new file mode 100644 index 0000000..5294a1b --- /dev/null +++ b/nanofi/src/cxx/CallbackProcessor.cpp @@ -0,0 +1,37 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "cxx/CallbackProcessor.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +void CallbackProcessor::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { + if (callback_ != nullptr) { + processor_session sesh; + sesh.session = session; + callback_(&sesh); + } +} + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/src/cxx/Plan.cpp ---------------------------------------------------------------------- diff --git a/nanofi/src/cxx/Plan.cpp b/nanofi/src/cxx/Plan.cpp new file mode 100644 index 0000000..f892aa9 --- /dev/null +++ b/nanofi/src/cxx/Plan.cpp @@ -0,0 +1,294 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "cxx/Plan.h" +#include "cxx/CallbackProcessor.h" +#include <memory> +#include <vector> +#include <set> +#include <string> + +bool intToFailureStragey(int in, FailureStrategy *out) { + auto tmp = static_cast<FailureStrategy>(in); + switch (tmp) { + case AS_IS: + case ROLLBACK: + *out = tmp; + return true; + default: + return false; + } +} + +std::shared_ptr<utils::IdGenerator> ExecutionPlan::id_generator_ = utils::IdGenerator::getIdGenerator(); + +ExecutionPlan::ExecutionPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo) + : content_repo_(content_repo), + flow_repo_(flow_repo), + prov_repo_(prov_repo), + finalized(false), + location(-1), + current_flowfile_(nullptr), + logger_(logging::LoggerFactory<ExecutionPlan>::getLogger()) { + stream_factory = org::apache::nifi::minifi::io::StreamFactory::getInstance(std::make_shared<minifi::Configure>()); +} + +/** + * Add a callback to obtain and pass processor session to a generated processor + * + */ +std::shared_ptr<core::Processor> ExecutionPlan::addCallback(void *obj, std::function<void(processor_session*)> fp) { + if (finalized) { + return nullptr; + } + + auto ptr = createProcessor("CallbackProcessor", "CallbackProcessor"); + if (!ptr) + return nullptr; + + std::shared_ptr<processors::CallbackProcessor> processor = std::static_pointer_cast<processors::CallbackProcessor>(ptr); + processor->setCallback(obj, fp); + + return addProcessor(processor, "CallbackProcessor", core::Relationship("success", "description"), true); +} + +bool ExecutionPlan::setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value) { + uint32_t i = 0; + logger_->log_debug("Attempting to set property %s %s for %s", prop, value, proc->getName()); + for (i = 0; i < processor_queue_.size(); i++) { + if (processor_queue_.at(i) == proc) { + break; + } + } + + if (i >= processor_queue_.size() || i >= processor_contexts_.size()) { + return false; + } + + return processor_contexts_.at(i)->setProperty(prop, value); +} + +std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name, core::Relationship relationship, bool linkToPrevious) { + if (finalized) { + return nullptr; + } + + utils::Identifier uuid; + id_generator_->generate(uuid); + + processor->setStreamFactory(stream_factory); + // initialize the processor + processor->initialize(); + + processor_mapping_[processor->getUUIDStr()] = processor; + + if (!linkToPrevious) { + termination_ = relationship; + } else { + std::shared_ptr<core::Processor> last = processor_queue_.back(); + + if (last == nullptr) { + last = processor; + termination_ = relationship; + } + + relationships_.push_back(connectProcessors(last, processor, relationship, true)); + } + + std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor); + + processor_nodes_.push_back(node); + + std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider_, prov_repo_, flow_repo_, content_repo_); + processor_contexts_.push_back(context); + + processor_queue_.push_back(processor); + + return processor; +} + +std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship, bool linkToPrevious) { + if (finalized) { + return nullptr; + } + auto processor = ExecutionPlan::createProcessor(processor_name, name); + if (!processor) { + return nullptr; + } + return addProcessor(processor, name, relationship, linkToPrevious); +} + +void ExecutionPlan::reset() { + process_sessions_.clear(); + factories_.clear(); + location = -1; + for (auto proc : processor_queue_) { + while (proc->getActiveTasks() > 0) { + proc->decrementActiveTask(); + } + } +} + +bool ExecutionPlan::runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify) { + if (!finalized) { + finalize(); + } + location++; + if (location >= processor_queue_.size()) { + return false; + } + std::shared_ptr<core::Processor> processor = processor_queue_[location]; + std::shared_ptr<core::ProcessContext> context = processor_contexts_[location]; + std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context); + factories_.push_back(factory); + if (std::find(configured_processors_.begin(), configured_processors_.end(), processor) == configured_processors_.end()) { + processor->onSchedule(context, factory); + configured_processors_.push_back(processor); + } + std::shared_ptr<core::ProcessSession> current_session = std::make_shared<core::ProcessSession>(context); + process_sessions_.push_back(current_session); + processor->incrementActiveTasks(); + processor->setScheduledState(core::ScheduledState::RUNNING); + if (verify != nullptr) { + verify(context, current_session); + } else { + logger_->log_debug("Running %s", processor->getName()); + processor->onTrigger(context, current_session); + } + current_session->commit(); + current_flowfile_ = current_session->get(); + auto hasMore = location + 1 < processor_queue_.size(); + if (!hasMore && !current_flowfile_) { + std::set<std::shared_ptr<core::FlowFile>> expired; + current_flowfile_ = relationships_.back()->poll(expired); + } + return hasMore; +} + +std::set<provenance::ProvenanceEventRecord*> ExecutionPlan::getProvenanceRecords() { + return process_sessions_.at(location)->getProvenanceReporter()->getEvents(); +} + +std::shared_ptr<core::FlowFile> ExecutionPlan::getCurrentFlowFile() { + return current_flowfile_; +} + +std::shared_ptr<core::ProcessSession> ExecutionPlan::getCurrentSession() { + return current_session_; +} + +std::shared_ptr<minifi::Connection> ExecutionPlan::buildFinalConnection(std::shared_ptr<core::Processor> processor, bool set_dst) { + return connectProcessors(processor, processor, termination_, set_dst); +} + +void ExecutionPlan::finalize() { + if (failure_handler_) { + auto failure_proc = createProcessor("CallbackProcessor", "CallbackProcessor"); + + std::shared_ptr<processors::CallbackProcessor> callback_proc = std::static_pointer_cast<processors::CallbackProcessor>(failure_proc); + callback_proc->setCallback(nullptr, std::bind(&FailureHandler::operator(), failure_handler_, std::placeholders::_1)); + + for (const auto& proc : processor_queue_) { + for (const auto& rel : proc->getSupportedRelationships()) { + if (rel.getName() == "failure") { + relationships_.push_back(connectProcessors(proc, failure_proc, core::Relationship("failure", "failure collector"), true)); + break; + } + } + } + + std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(failure_proc); + + processor_nodes_.push_back(node); + + std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider_, prov_repo_, flow_repo_, content_repo_); + processor_contexts_.push_back(context); + + processor_queue_.push_back(failure_proc); + } + + if (relationships_.size() > 0) { + relationships_.push_back(buildFinalConnection(processor_queue_.back())); + } else { + for (auto processor : processor_queue_) { + relationships_.push_back(buildFinalConnection(processor, true)); + } + } + + finalized = true; +} + +std::shared_ptr<core::Processor> ExecutionPlan::createProcessor(const std::string &processor_name, const std::string &name) { + utils::Identifier uuid; + id_generator_->generate(uuid); + + auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(processor_name, uuid); + if (nullptr == ptr) { + return nullptr; + } + std::shared_ptr<core::Processor> processor = std::static_pointer_cast<core::Processor>(ptr); + + processor->setName(name); + return processor; +} + +std::shared_ptr<minifi::Connection> ExecutionPlan::connectProcessors(std::shared_ptr<core::Processor> src_proc, std::shared_ptr<core::Processor> dst_proc, core::Relationship relationship, + bool set_dst) { + std::stringstream connection_name; + connection_name << src_proc->getUUIDStr() << "-to-" << dst_proc->getUUIDStr(); + std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(flow_repo_, content_repo_, connection_name.str()); + connection->setRelationship(relationship); + + // link the connections so that we can test results at the end for this + connection->setSource(src_proc); + + utils::Identifier uuid_copy, uuid_copy_next; + src_proc->getUUID(uuid_copy); + connection->setSourceUUID(uuid_copy); + if (set_dst) { + connection->setDestination(dst_proc); + dst_proc->getUUID(uuid_copy_next); + connection->setDestinationUUID(uuid_copy_next); + if (src_proc != dst_proc) { + dst_proc->addConnection(connection); + } + } + src_proc->addConnection(connection); + + return connection; +} + +bool ExecutionPlan::setFailureCallback(std::function<void(flow_file_record*)> onerror_callback) { + if (finalized && !failure_handler_) { + return false; // Already finalized the flow without failure handler processor + } + if (!failure_handler_) { + failure_handler_ = std::make_shared<FailureHandler>(getContentRepo()); + } + failure_handler_->setCallback(onerror_callback); + return true; +} + +bool ExecutionPlan::setFailureStrategy(FailureStrategy start) { + if (!failure_handler_) { + return false; + } + failure_handler_->setStrategy(start); + return true; +} + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/tests/CAPITests.cpp ---------------------------------------------------------------------- diff --git a/nanofi/tests/CAPITests.cpp b/nanofi/tests/CAPITests.cpp new file mode 100644 index 0000000..65c52e1 --- /dev/null +++ b/nanofi/tests/CAPITests.cpp @@ -0,0 +1,278 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <uuid/uuid.h> +#include <sys/stat.h> +#include <utility> +#include <memory> +#include <string> +#include <vector> +#include <set> +#include <fstream> + +#include "utils/file/FileUtils.h" +#include "TestBase.h" + +#include <chrono> +#include <thread> +#include "api/nanofi.h" + +static nifi_instance *create_instance_obj(const char *name = "random_instance") { + nifi_port port; + char port_str[] = "12345"; + port.port_id = port_str; + return create_instance("random_instance", &port); +} + +static int failure_count = 0; + +void failure_counter(flow_file_record * fr) { + failure_count++; + REQUIRE(get_attribute_qty(fr) > 0); + free_flowfile(fr); +} + +void big_failure_counter(flow_file_record * fr) { + failure_count += 100; + free_flowfile(fr); +} + +TEST_CASE("Test Creation of instance, one processor", "[createInstanceAndFlow]") { + auto instance = create_instance_obj(); + REQUIRE(instance != nullptr); + flow *test_flow = create_flow(instance, nullptr); + REQUIRE(test_flow != nullptr); + processor *test_proc = add_processor(test_flow, "GenerateFlowFile"); + REQUIRE(test_proc != nullptr); + free_flow(test_flow); + free_instance(instance); +} + +TEST_CASE("Invalid processor returns null", "[addInvalidProcessor]") { + auto instance = create_instance_obj(); + REQUIRE(instance != nullptr); + flow *test_flow = create_flow(instance, nullptr); + processor *test_proc = add_processor(test_flow, "NeverExisted"); + REQUIRE(test_proc == nullptr); + processor *no_proc = add_processor(test_flow, ""); + REQUIRE(no_proc == nullptr); + free_flow(test_flow); + free_instance(instance); +} + +TEST_CASE("Set valid and invalid properties", "[setProcesssorProperties]") { + auto instance = create_instance_obj(); + REQUIRE(instance != nullptr); + flow *test_flow = create_flow(instance, nullptr); + REQUIRE(test_flow != nullptr); + processor *test_proc = add_processor(test_flow, "GenerateFlowFile"); + REQUIRE(test_proc != nullptr); + REQUIRE(set_property(test_proc, "Data Format", "Text") == 0); // Valid value + // TODO(aboda): add this two below when property handling is made strictly typed + // REQUIRE(set_property(test_proc, "Data Format", "InvalidFormat") != 0); // Invalid value + // REQUIRE(set_property(test_proc, "Invalid Attribute", "Blah") != 0); // Invalid attribute + REQUIRE(set_property(test_proc, "Data Format", nullptr) != 0); // Empty value + REQUIRE(set_property(test_proc, nullptr, "Blah") != 0); // Empty attribute + REQUIRE(set_property(nullptr, "Invalid Attribute", "Blah") != 0); // Invalid processor + free_flow(test_flow); + free_instance(instance); +} + +TEST_CASE("get file and put file", "[getAndPutFile]") { + TestController testController; + + char src_format[] = "/tmp/gt.XXXXXX"; + char put_format[] = "/tmp/pt.XXXXXX"; + const char *sourcedir = testController.createTempDirectory(src_format); + const char *putfiledir = testController.createTempDirectory(put_format); + std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!"; + auto instance = create_instance_obj(); + REQUIRE(instance != nullptr); + flow *test_flow = create_flow(instance, nullptr); + REQUIRE(test_flow != nullptr); + processor *get_proc = add_processor(test_flow, "GetFile"); + REQUIRE(get_proc != nullptr); + processor *put_proc = add_processor_with_linkage(test_flow, "PutFile"); + REQUIRE(put_proc != nullptr); + REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0); + REQUIRE(set_property(put_proc, "Directory", putfiledir) == 0); + + std::fstream file; + std::stringstream ss; + ss << sourcedir << "/" << "tstFile.ext"; + file.open(ss.str(), std::ios::out); + file << test_file_content; + file.close(); + + flow_file_record *record = get_next_flow_file(instance, test_flow); + REQUIRE(record != nullptr); + + ss.str(""); + + ss << putfiledir << "/" << "tstFile.ext"; + std::ifstream t(ss.str()); + std::string put_data((std::istreambuf_iterator<char>(t)), std::istreambuf_iterator<char>()); + + REQUIRE(test_file_content == put_data); + + // No failure handler can be added after the flow is finalized + REQUIRE(add_failure_callback(test_flow, failure_counter) == 1); + + free_flowfile(record); + + free_flow(test_flow); + + free_instance(instance); +} + +TEST_CASE("Test manipulation of attributes", "[testAttributes]") { + TestController testController; + + char src_format[] = "/tmp/gt.XXXXXX"; + const char *sourcedir = testController.createTempDirectory(src_format); + + std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!"; + + std::fstream file; + std::stringstream ss; + ss << sourcedir << "/" << "tstFile.ext"; + file.open(ss.str(), std::ios::out); + file << test_file_content; + file.close(); + auto instance = create_instance_obj(); + REQUIRE(instance != nullptr); + flow *test_flow = create_flow(instance, nullptr); + REQUIRE(test_flow != nullptr); + + processor *get_proc = add_processor(test_flow, "GetFile"); + REQUIRE(get_proc != nullptr); + REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0); + processor *extract_test = add_processor_with_linkage(test_flow, "ExtractText"); + REQUIRE(extract_test != nullptr); + REQUIRE(set_property(extract_test, "Attribute", "TestAttr") == 0); + processor *update_attr = add_processor_with_linkage(test_flow, "UpdateAttribute"); + REQUIRE(update_attr != nullptr); + + REQUIRE(set_property(update_attr, "UpdatedAttribute", "UpdatedValue") == 0); + + flow_file_record *record = get_next_flow_file(instance, test_flow); + + REQUIRE(record != nullptr); + + attribute test_attr; + test_attr.key = "TestAttr"; + REQUIRE(get_attribute(record, &test_attr) == 0); + + REQUIRE(test_attr.value_size != 0); + REQUIRE(test_attr.value != nullptr); + + std::string attr_value(static_cast<char*>(test_attr.value), test_attr.value_size); + + REQUIRE(attr_value == test_file_content); + + const char * new_testattr_value = "S0me t3st t3xt"; + + // Attribute already exist, should fail + REQUIRE(add_attribute(record, test_attr.key, (void*) new_testattr_value, strlen(new_testattr_value)) != 0); // NOLINT + + // Update overwrites values + update_attribute(record, test_attr.key, (void*) new_testattr_value, strlen(new_testattr_value)); // NOLINT + + int attr_size = get_attribute_qty(record); + REQUIRE(attr_size > 0); + + attribute_set attr_set; + attr_set.size = attr_size; + attr_set.attributes = (attribute*) malloc(attr_set.size * sizeof(attribute)); // NOLINT + + REQUIRE(get_all_attributes(record, &attr_set) == attr_set.size); + + bool test_attr_found = false; + bool updated_attr_found = false; + for (int i = 0; i < attr_set.size; ++i) { + if (strcmp(attr_set.attributes[i].key, test_attr.key) == 0) { + test_attr_found = true; + REQUIRE(std::string(static_cast<char*>(attr_set.attributes[i].value), attr_set.attributes[i].value_size) == new_testattr_value); + } else if (strcmp(attr_set.attributes[i].key, "UpdatedAttribute") == 0) { + updated_attr_found = true; + REQUIRE(std::string(static_cast<char*>(attr_set.attributes[i].value), attr_set.attributes[i].value_size) == "UpdatedValue"); + } + } + REQUIRE(updated_attr_found == true); + REQUIRE(test_attr_found == true); + + free_flowfile(record); + + free_flow(test_flow); + free_instance(instance); +} + +TEST_CASE("Test error handling callback", "[errorHandling]") { + TestController testController; + + char src_format[] = "/tmp/gt.XXXXXX"; + const char *sourcedir = testController.createTempDirectory(src_format); + std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!"; + + auto instance = create_instance_obj(); + REQUIRE(instance != nullptr); + flow *test_flow = create_flow(instance, nullptr); + REQUIRE(test_flow != nullptr); + + // Failure strategy cannot be set before a valid callback is added + REQUIRE(set_failure_strategy(test_flow, FailureStrategy::AS_IS) != 0); + REQUIRE(add_failure_callback(test_flow, failure_counter) == 0); + + processor *get_proc = add_processor(test_flow, "GetFile"); + REQUIRE(get_proc != nullptr); + processor *put_proc = add_processor_with_linkage(test_flow, "PutFile"); + REQUIRE(put_proc != nullptr); + REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0); + REQUIRE(set_property(put_proc, "Directory", "/tmp/never_existed") == 0); + REQUIRE(set_property(put_proc, "Create Missing Directories", "false") == 0); + + std::fstream file; + std::stringstream ss; + + ss << sourcedir << "/" << "tstFile.ext"; + file.open(ss.str(), std::ios::out); + file << test_file_content; + file.close(); + + + REQUIRE(get_next_flow_file(instance, test_flow) == nullptr); + + REQUIRE(failure_count == 1); + + // Failure handler function can be replaced runtime + REQUIRE(add_failure_callback(test_flow, big_failure_counter) == 0); + REQUIRE(set_failure_strategy(test_flow, FailureStrategy::ROLLBACK) == 0); + + // Create new testfile to trigger failure again + ss << "2"; + file.open(ss.str(), std::ios::out); + file << test_file_content; + file.close(); + + REQUIRE(get_next_flow_file(instance, test_flow) == nullptr); + REQUIRE(failure_count > 100); + + failure_count = 0; + + free_flow(test_flow); + free_instance(instance); +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/python/library/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/python/library/CMakeLists.txt b/python/library/CMakeLists.txt index 5d309a1..684cf22 100644 --- a/python/library/CMakeLists.txt +++ b/python/library/CMakeLists.txt @@ -23,7 +23,7 @@ IF(POLICY CMP0048) CMAKE_POLICY(SET CMP0048 OLD) ENDIF(POLICY CMP0048) -include_directories(../../blocks/ ../../libminifi/include ../../libminifi/include/c2 ../../libminifi/include/c2/protocols/ ../../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics ../../libminifi/include/core/yaml ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-20171024/include ../../thirdparty/civetweb-1.9.1/include ../../thirdparty/) +include_directories(../../nanofi/include/ ../../libminifi/include ../../libminifi/include/c2 ../../libminifi/include/c2/protocols/ ../../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics ../../libminifi/include/core/yaml ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-20171024/include ../../thirdparty/civetweb-1.9.1/include ../../thirdparty/) if(WIN32) include_directories(../../libminifi/opsys/win) else() @@ -33,9 +33,9 @@ include_directories(../../extensions/http-curl/ ../../extensions/http-curl/clien add_library(python-lib SHARED python_lib.cpp) if (APPLE) - target_link_libraries(python-lib capi core-minifi minifi) + target_link_libraries(python-lib nanofi core-minifi minifi) else() - target_link_libraries(python-lib capi-shared core-minifi-shared minifi-shared) + target_link_libraries(python-lib nanofi-shared core-minifi-shared minifi-shared) endif(APPLE) if (WIN32) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/python/library/python_lib.cpp ---------------------------------------------------------------------- diff --git a/python/library/python_lib.cpp b/python/library/python_lib.cpp index eb81f79..79241df 100644 --- a/python/library/python_lib.cpp +++ b/python/library/python_lib.cpp @@ -22,11 +22,10 @@ #include <sys/stat.h> #include <unistd.h> #include <dirent.h> -#include "capi/api.h" -#include "file_blocks.h" -#include "comms.h" -#include "capi/api.h" -#include "capi/processors.h" +#include "api/nanofi.h" +#include "blocks/file_blocks.h" +#include "blocks/comms.h" +#include "core/processors.h" #include "HTTPCurlLoader.h" #include "python_lib.h" http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/thirdparty/rocksdb/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/CMakeLists.txt b/thirdparty/rocksdb/CMakeLists.txt index 837f0bc..f20d09a 100644 --- a/thirdparty/rocksdb/CMakeLists.txt +++ b/thirdparty/rocksdb/CMakeLists.txt @@ -584,6 +584,7 @@ else() endif() set(ROCKSDB_STATIC_LIB rocksdb${ARTIFACT_SUFFIX}) +# commented out to avoid building the shared lib #set(ROCKSDB_SHARED_LIB rocksdb-shared${ARTIFACT_SUFFIX}) set(ROCKSDB_IMPORT_LIB ${ROCKSDB_SHARED_LIB}) if(WIN32) @@ -592,16 +593,19 @@ if(WIN32) set(LIBS ${ROCKSDB_STATIC_LIB} ${THIRDPARTY_LIBS} ${SYSTEM_LIBS}) else() set(SYSTEM_LIBS ${CMAKE_THREAD_LIBS_INIT}) - set(LIBS ${ROCKSDB_SHARED_LIB} ${THIRDPARTY_LIBS} ${SYSTEM_LIBS}) - add_library(${ROCKSDB_SHARED_LIB} SHARED ${SOURCES}) - # target_link_libraries(${ROCKSDB_SHARED_LIB} + set(LIBS ${ROCKSDB_STATIC_LIB} ${THIRDPARTY_LIBS} ${SYSTEM_LIBS}) +# commented out to avoid building the shared lib +# as there is no reason +#add_library(${ROCKSDB_SHARED_LIB} SHARED ${SOURCES}) + +# target_link_libraries(${ROCKSDB_SHARED_LIB} # ${THIRDPARTY_LIBS} ${SYSTEM_LIBS}) - set_target_properties(${ROCKSDB_SHARED_LIB} PROPERTIES - LINKER_LANGUAGE CXX - VERSION ${ROCKSDB_VERSION} - SOVERSION ${ROCKSDB_VERSION_MAJOR} - CXX_STANDARD 11 - OUTPUT_NAME "rocksdb") +# set_target_properties(${ROCKSDB_SHARED_LIB} PROPERTIES +# LINKER_LANGUAGE CXX +# VERSION ${ROCKSDB_VERSION} +# SOVERSION ${ROCKSDB_VERSION_MAJOR} +# CXX_STANDARD 11 +# OUTPUT_NAME "rocksdb") endif() option(WITH_LIBRADOS "Build with librados" OFF) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/thirdparty/yaml-cpp-yaml-cpp-20171024/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/thirdparty/yaml-cpp-yaml-cpp-20171024/CMakeLists.txt b/thirdparty/yaml-cpp-yaml-cpp-20171024/CMakeLists.txt index f4239a4..6716638 100644 --- a/thirdparty/yaml-cpp-yaml-cpp-20171024/CMakeLists.txt +++ b/thirdparty/yaml-cpp-yaml-cpp-20171024/CMakeLists.txt @@ -44,7 +44,7 @@ option(YAML_CPP_BUILD_CONTRIB "Enable contrib stuff in library" ON) # --> General # see http://www.cmake.org/cmake/help/cmake2.6docs.html#variable:BUILD_SHARED_LIBS # http://www.cmake.org/cmake/help/cmake2.6docs.html#command:add_library -option(BUILD_SHARED_LIBS "Build Shared Libraries" OFF) +#option(BUILD_SHARED_LIBS "Build Shared Libraries" OFF)
