Repository: nifi-minifi-cpp Updated Branches: refs/heads/master ed8221b14 -> a82ac26a1
MINIFICPP-638 - C API: add unit tests This closes #417. Signed-off-by: Marc Parisi <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/a82ac26a Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/a82ac26a Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/a82ac26a Branch: refs/heads/master Commit: a82ac26a13226468382789cf0f20b87afb451ac5 Parents: ed8221b Author: Arpad Boda <[email protected]> Authored: Tue Oct 9 17:56:38 2018 +0200 Committer: Marc Parisi <[email protected]> Committed: Wed Oct 17 14:26:25 2018 -0400 ---------------------------------------------------------------------- cmake/BuildTests.cmake | 12 ++ libminifi/include/capi/api.h | 24 ++-- libminifi/include/capi/cstructs.h | 7 +- libminifi/src/capi/Plan.cpp | 7 +- libminifi/src/capi/api.cpp | 76 ++++++++++--- libminifi/test/capi/CAPITests.cpp | 201 +++++++++++++++++++++++++++++++++ 6 files changed, 297 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a82ac26a/cmake/BuildTests.cmake ---------------------------------------------------------------------- diff --git a/cmake/BuildTests.cmake b/cmake/BuildTests.cmake index 3896dd9..d14c19b 100644 --- a/cmake/BuildTests.cmake +++ b/cmake/BuildTests.cmake @@ -105,6 +105,7 @@ target_include_directories(${CATCH_MAIN_LIB} BEFORE PRIVATE "${CMAKE_SOURCE_DIR} SET(TEST_RESOURCES ${TEST_DIR}/resources) GETSOURCEFILES(UNIT_TESTS "${TEST_DIR}/unit/") +GETSOURCEFILES(CAPI_UNIT_TESTS "${TEST_DIR}/capi/") GETSOURCEFILES(INTEGRATION_TESTS "${TEST_DIR}/integration/") SET(UNIT_TEST_COUNT 0) @@ -118,6 +119,17 @@ FOREACH(testfile ${UNIT_TESTS}) ENDFOREACH() message("-- Finished building ${UNIT_TEST_COUNT} unit test file(s)...") +SET(UNIT_TEST_COUNT 0) +FOREACH(testfile ${CAPI_UNIT_TESTS}) + get_filename_component(testfilename "${testfile}" NAME_WE) + add_executable("${testfilename}" "${TEST_DIR}/capi/${testfile}") + createTests("${testfilename}") + target_link_libraries(${testfilename} ${CATCH_MAIN_LIB} capi) + MATH(EXPR UNIT_TEST_COUNT "${UNIT_TEST_COUNT}+1") + add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${TEST_DIR}) +ENDFOREACH() +message("-- Finished building ${UNIT_TEST_COUNT} capi unit test file(s)...") + SET(INT_TEST_COUNT 0) FOREACH(testfile ${INTEGRATION_TESTS}) get_filename_component(testfilename "${testfile}" NAME_WE) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a82ac26a/libminifi/include/capi/api.h ---------------------------------------------------------------------- diff --git a/libminifi/include/capi/api.h b/libminifi/include/capi/api.h index 9a4404b..dff29ed 100644 --- a/libminifi/include/capi/api.h +++ b/libminifi/include/capi/api.h @@ -39,15 +39,13 @@ void enable_logging(); * BASE NIFI OPERATIONS * ################################################################## */ - - -nifi_port *create_port(char *port); +nifi_port *create_port(const char *port); int free_port(nifi_port *port); -nifi_instance *create_instance(char *url, nifi_port *port); +nifi_instance *create_instance(const char *url, nifi_port *port); void initialize_instance(nifi_instance *); @@ -79,15 +77,13 @@ flow *create_getfile(nifi_instance *instance, flow *parent, GetFileConfig *c); processor *add_processor(flow *, const char *); +processor *add_processor_with_linkage(flow *flow, const char *processor_name); + processor *add_python_processor(flow *, void (*ontrigger_callback)(processor_session *session)); int set_property(processor *, const char *, const char *); -int set_instance_property(nifi_instance *instance, char *key, char *value); - -processor *add_processor(flow *parent_flow, const char *processor_name); - -int set_property(processor *proc, const char *name, const char *value); +int set_instance_property(nifi_instance *instance, const char*, const char *); int free_flow(flow *); @@ -111,11 +107,15 @@ flow_file_record* create_ff_object_na(const char *file, const size_t len, const void free_flowfile(flow_file_record*); -uint8_t add_attribute(flow_file_record*, char *key, void *value, size_t size); +uint8_t add_attribute(flow_file_record*, const char *key, void *value, size_t size); + +void update_attribute(flow_file_record*, const char *key, void *value, size_t size); + +uint8_t get_attribute(flow_file_record *ff, attribute *caller_attribute); -void update_attribute(flow_file_record*, char *key, void *value, size_t size); +int get_attribute_qty(const flow_file_record* ff); -void *get_attribute(flow_file_record*, char *key); +int get_all_attributes(const flow_file_record* ff, attribute_set *target); uint8_t remove_attribute(flow_file_record*, char *key); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a82ac26a/libminifi/include/capi/cstructs.h ---------------------------------------------------------------------- diff --git a/libminifi/include/capi/cstructs.h b/libminifi/include/capi/cstructs.h index 4807db3..731fd55 100644 --- a/libminifi/include/capi/cstructs.h +++ b/libminifi/include/capi/cstructs.h @@ -86,11 +86,16 @@ typedef struct { */ typedef struct { - char *key; + const char *key; void *value; size_t value_size; } attribute; +typedef struct { + attribute * attributes; + size_t size; +} attribute_set; + /** * State of a flow file * http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a82ac26a/libminifi/src/capi/Plan.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/capi/Plan.cpp b/libminifi/src/capi/Plan.cpp index 19a62e7..691be41 100644 --- a/libminifi/src/capi/Plan.cpp +++ b/libminifi/src/capi/Plan.cpp @@ -136,9 +136,10 @@ std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::string & if (finalized) { return nullptr; } - auto processor = ExecutionPlan::createProcessor(processor_name, name); - + if (!processor) { + return nullptr; + } return addProcessor(processor, name, relationship, linkToPrevious); } @@ -241,7 +242,7 @@ std::shared_ptr<core::Processor> ExecutionPlan::createProcessor(const std::strin auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(processor_name, uuid); if (nullptr == ptr) { - throw std::exception(); + return nullptr; } std::shared_ptr<core::Processor> processor = std::static_pointer_cast<core::Processor>(ptr); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a82ac26a/libminifi/src/capi/api.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/capi/api.cpp b/libminifi/src/capi/api.cpp index fa3a6c8..592fb6b 100644 --- a/libminifi/src/capi/api.cpp +++ b/libminifi/src/capi/api.cpp @@ -57,7 +57,7 @@ class DirectoryConfiguration { } }; -nifi_port *create_port(char *port) { +nifi_port *create_port(const char *port) { if (nullptr == port) return nullptr; nifi_port *p = new nifi_port(); @@ -80,7 +80,7 @@ int free_port(nifi_port *port) { * @param url http URL for NiFi instance * @param port Remote output port. */ -nifi_instance *create_instance(char *url, nifi_port *port) { +nifi_instance *create_instance(const char *url, nifi_port *port) { // make sure that we have a thread safe way of initializing the content directory DirectoryConfiguration::initialize(); @@ -119,7 +119,7 @@ void enable_async_c2(nifi_instance *instance, C2_Server *server, c2_stop_callbac * @param value * @return -1 when instance or key are null */ -int set_instance_property(nifi_instance *instance, char *key, char *value) { +int set_instance_property(nifi_instance *instance, const char *key, const char *value) { if (nullptr == instance || nullptr == instance->instance_ptr || nullptr == key) { return -1; } @@ -196,10 +196,11 @@ void free_flowfile(flow_file_record *ff) { std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, content_repo); content_repo->remove(claim); } - auto map = static_cast<string_map*>(ff->attributes); - delete[] ff->contentLocation; - if (ff->ffp != nullptr) // don't delete map since it's a ref ptr + if (ff->ffp == nullptr) { + auto map = static_cast<string_map*>(ff->attributes); delete map; + } + delete[] ff->contentLocation; delete ff; } } @@ -212,7 +213,7 @@ void free_flowfile(flow_file_record *ff) { * @param size size of value * @return 0 or -1 based on whether the attributed existed previously (-1) or not (0) */ -uint8_t add_attribute(flow_file_record *ff, char *key, void *value, size_t size) { +uint8_t add_attribute(flow_file_record *ff, const char *key, void *value, size_t size) { auto attribute_map = static_cast<string_map*>(ff->attributes); const auto& ret = attribute_map->insert(std::pair<std::string, std::string>(key, std::string(static_cast<char*>(value), size))); return ret.second ? 0 : -1; @@ -225,7 +226,7 @@ uint8_t add_attribute(flow_file_record *ff, char *key, void *value, size_t size) * @param value value to add * @param size size of value */ -void update_attribute(flow_file_record *ff, char *key, void *value, size_t size) { +void update_attribute(flow_file_record *ff, const char *key, void *value, size_t size) { auto attribute_map = static_cast<string_map*>(ff->attributes); (*attribute_map)[key] = std::string(static_cast<char*>(value), size); } @@ -237,11 +238,16 @@ void update_attribute(flow_file_record *ff, char *key, void *value, size_t size) * @param caller_attribute caller supplied object in which we will copy the data ptr * @return 0 if successful, -1 if the key does not exist */ -uint8_t get_attribute(flow_file_record *ff, char *key, attribute *caller_attribute) { +uint8_t get_attribute(flow_file_record *ff, attribute *caller_attribute) { + if (ff == nullptr) { + return -1; + } auto attribute_map = static_cast<string_map*>(ff->attributes); - auto find = attribute_map->find(key); + if (!attribute_map) { + return -1; + } + auto find = attribute_map->find(caller_attribute->key); if (find != attribute_map->end()) { - caller_attribute->key = key; caller_attribute->value = static_cast<void*>(const_cast<char*>(find->second.data())); caller_attribute->value_size = find->second.size(); return 0; @@ -249,13 +255,42 @@ uint8_t get_attribute(flow_file_record *ff, char *key, attribute *caller_attribu return -1; } +int get_attribute_qty(const flow_file_record* ff) { + if (ff == nullptr) { + return 0; + } + auto attribute_map = static_cast<string_map*>(ff->attributes); + return attribute_map ? attribute_map->size() : 0; +} + +int get_all_attributes(const flow_file_record* ff, attribute_set *target) { + if (ff == nullptr) { + return 0; + } + auto attribute_map = static_cast<string_map*>(ff->attributes); + if (!attribute_map || attribute_map->empty()) { + return 0; + } + int i = 0; + for (const auto& kv : *attribute_map) { + if (i >= target->size) { + break; + } + target->attributes[i].key = kv.first.data(); + target->attributes[i].value = static_cast<void*>(const_cast<char*>(kv.second.data())); + target->attributes[i].value_size = kv.second.size(); + ++i; + } + return i; +} + /** * Removes a key from the attribute chain * @param ff flow file record * @param key key to remove * @return 0 if removed, -1 otherwise */ -uint8_t remove_attribute(flow_file_record *ff, char *key) { +uint8_t remove_attribute(flow_file_record *ff, const char *key) { auto attribute_map = static_cast<string_map*>(ff->attributes); return attribute_map->erase(key) - 1; // erase by key returns the number of elements removed (0 or 1) } @@ -361,6 +396,18 @@ processor *add_processor(flow *flow, const char *processor_name) { } return nullptr; } + +processor *add_processor_with_linkage(flow *flow, const char *processor_name) { + ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan); + auto proc = plan->addProcessor(processor_name, processor_name, core::Relationship("success", "description"), true); + if (proc) { + processor *new_processor = new processor(); + new_processor->processor_ptr = proc.get(); + return new_processor; + } + return nullptr; +} + int set_property(processor *proc, const char *name, const char *value) { if (name != nullptr && value != nullptr && proc != nullptr) { core::Processor *p = static_cast<core::Processor*>(proc->processor_ptr); @@ -386,8 +433,9 @@ flow_file_record *get_next_flow_file(nifi_instance *instance, flow *flow) { while (execution_plan->runNextProcessor()) { } auto ff = execution_plan->getCurrentFlowFile(); - if (ff == nullptr) + if (ff == nullptr) { return nullptr; + } auto claim = ff->getResourceClaim(); if (claim != nullptr) { @@ -395,8 +443,8 @@ flow_file_record *get_next_flow_file(nifi_instance *instance, flow *flow) { claim->increaseFlowFileRecordOwnedCount(); auto path = claim->getContentFullPath(); auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize()); - ffr->attributes = ff->getAttributesPtr(); ffr->ffp = ff.get(); + ffr->attributes = ff->getAttributesPtr(); ffr->in = instance; return ffr; } else { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a82ac26a/libminifi/test/capi/CAPITests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/capi/CAPITests.cpp b/libminifi/test/capi/CAPITests.cpp new file mode 100644 index 0000000..d07b75b --- /dev/null +++ b/libminifi/test/capi/CAPITests.cpp @@ -0,0 +1,201 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <uuid/uuid.h> +#include <sys/stat.h> +#include <utility> +#include <memory> +#include <string> +#include <vector> +#include <set> +#include <fstream> + + +#include "utils/file/FileUtils.h" +#include "../TestBase.h" + +#include "capi/api.h" + +#include <chrono> +#include <thread> + +TEST_CASE("Test Creation of instance, one processor", "[createInstanceAndFlow]") { + nifi_instance *instance = create_instance("random_instance", create_port("12345")); + REQUIRE(instance != nullptr); + flow *test_flow = create_flow(instance, nullptr); + REQUIRE(test_flow != nullptr); + processor *test_proc = add_processor(test_flow, "GenerateFlowFile"); + REQUIRE(test_proc != nullptr); + free_flow(test_flow); + free_instance(instance); +} + +TEST_CASE("Invalid processor returns null", "[addInvalidProcessor]") { + nifi_instance *instance = create_instance("random_instance", create_port("12345")); + REQUIRE(instance != nullptr); + flow *test_flow = create_flow(instance, nullptr); + processor *test_proc = add_processor(test_flow, "NeverExisted"); + REQUIRE(test_proc == nullptr); + processor *no_proc = add_processor(test_flow, ""); + REQUIRE(no_proc == nullptr); + free_flow(test_flow); + free_instance(instance); +} + +TEST_CASE("Set valid and invalid properties", "[setProcesssorProperties]") { + nifi_instance *instance = create_instance("random_instance", create_port("12345")); + REQUIRE(instance != nullptr); + flow *test_flow = create_flow(instance, nullptr); + REQUIRE(test_flow != nullptr); + processor *test_proc = add_processor(test_flow, "GenerateFlowFile"); + REQUIRE(test_proc != nullptr); + REQUIRE(set_property(test_proc, "Data Format", "Text") == 0); // Valid value + // TODO(aboda): add this two below when property handling is made strictly typed + // REQUIRE(set_property(test_proc, "Data Format", "InvalidFormat") != 0); // Invalid value + // REQUIRE(set_property(test_proc, "Invalid Attribute", "Blah") != 0); // Invalid attribute + REQUIRE(set_property(test_proc, "Data Format", nullptr) != 0); // Empty value + REQUIRE(set_property(test_proc, nullptr, "Blah") != 0); // Empty attribute + REQUIRE(set_property(nullptr, "Invalid Attribute", "Blah") != 0); // Invalid processor + free_flow(test_flow); + free_instance(instance); +} + +TEST_CASE("get file and put file", "[getAndPutFile]") { + TestController testController; + + char src_format[] = "/tmp/gt.XXXXXX"; + char put_format[] = "/tmp/pt.XXXXXX"; + const char *sourcedir = testController.createTempDirectory(src_format); + const char *putfiledir = testController.createTempDirectory(put_format); + std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!"; + + nifi_instance *instance = create_instance("random_instance", create_port("12345")); + REQUIRE(instance != nullptr); + flow *test_flow = create_flow(instance, nullptr); + REQUIRE(test_flow != nullptr); + processor *get_proc = add_processor(test_flow, "GetFile"); + REQUIRE(get_proc != nullptr); + processor *put_proc = add_processor_with_linkage(test_flow, "PutFile"); + REQUIRE(put_proc != nullptr); + REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0); + REQUIRE(set_property(put_proc, "Directory", putfiledir) == 0); + + std::fstream file; + std::stringstream ss; + ss << sourcedir << "/" << "tstFile.ext"; + file.open(ss.str(), std::ios::out); + file << test_file_content; + file.close(); + + flow_file_record *record = get_next_flow_file(instance, test_flow); + REQUIRE(record != nullptr); + + ss.str(""); + + ss << putfiledir << "/" << "tstFile.ext"; + std::ifstream t(ss.str()); + std::string put_data((std::istreambuf_iterator<char>(t)), std::istreambuf_iterator<char>()); + + REQUIRE(test_file_content == put_data); + + free_flowfile(record); + + free_flow(test_flow); + + free_instance(instance); +} + +TEST_CASE("Test manipulation of attributes", "[testAttributes]") { + TestController testController; + + enable_logging(); + + char src_format[] = "/tmp/gt.XXXXXX"; + const char *sourcedir = testController.createTempDirectory(src_format); + + std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!"; + + std::fstream file; + std::stringstream ss; + ss << sourcedir << "/" << "tstFile.ext"; + file.open(ss.str(), std::ios::out); + file << test_file_content; + file.close(); + + nifi_instance *instance = create_instance("random_instance", create_port("12345")); + REQUIRE(instance != nullptr); + flow *test_flow = create_flow(instance, nullptr); + REQUIRE(test_flow != nullptr); + + processor *get_proc = add_processor(test_flow, "GetFile"); + REQUIRE(get_proc != nullptr); + REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0); + processor *extract_test = add_processor_with_linkage(test_flow, "ExtractText"); + REQUIRE(extract_test != nullptr); + REQUIRE(set_property(extract_test, "Attribute", "TestAttr") == 0); + // TODO(aboda): enable this after decision made in MINIFICPP-640 + /*processor *update_attribute = add_processor_with_linkage(test_flow, "UpdateAttribute"); + REQUIRE(update_attribute != nullptr); + + REQUIRE(set_property(update_attribute, "TestAttribute", "TestValue") == 0);*/ + + flow_file_record *record = get_next_flow_file(instance, test_flow); + + REQUIRE(record != nullptr); + + attribute test_attr; + test_attr.key = "TestAttr"; + REQUIRE(get_attribute(record, &test_attr) == 0); + + REQUIRE(test_attr.value_size != 0); + REQUIRE(test_attr.value != nullptr); + + std::string attr_value(static_cast<char*>(test_attr.value), test_attr.value_size); + + REQUIRE(attr_value == test_file_content); + + const char * new_testattr_value = "S0me t3st t3xt"; + + // Attribute already exist, should fail + REQUIRE(add_attribute(record, test_attr.key, (void*)new_testattr_value, strlen(new_testattr_value)) != 0); // NOLINT + + // Update overwrites values + update_attribute(record, test_attr.key, (void*)new_testattr_value, strlen(new_testattr_value)); // NOLINT + + int attr_size = get_attribute_qty(record); + REQUIRE(attr_size > 0); + + attribute_set attr_set; + attr_set.size = attr_size; + attr_set.attributes = (attribute*)malloc(attr_set.size * sizeof(attribute)); // NOLINT + + REQUIRE(get_all_attributes(record, &attr_set) == attr_set.size); + + bool test_attr_found = false; + for (int i = 0; i < attr_set.size; ++i) { + if (strcmp(attr_set.attributes[i].key, test_attr.key) == 0) { + test_attr_found = true; + REQUIRE(std::string(static_cast<char*>(attr_set.attributes[i].value), attr_set.attributes[i].value_size) == new_testattr_value); + } + } + REQUIRE(test_attr_found == true); + + free_flowfile(record); + + free_flow(test_flow); + free_instance(instance); +}
