This is an automated email from the ASF dual-hosted git repository.

phrocker pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 217be558805c10b95c0e40dd886515dec23111e6 Author: Arpad Boda <[email protected]> AuthorDate: Tue Dec 18 16:27:56 2018 +0100 MINIFICPP-695 - NanoFi Examples appear to be broken This closes #462. Signed-off-by: Marc Parisi <[email protected]> --- nanofi/examples/transmit_flow.c | 11 ++ nanofi/include/api/nanofi.h | 34 +++++- nanofi/include/core/cstructs.h | 5 + nanofi/include/cxx/Instance.h | 6 +- nanofi/src/api/nanofi.cpp | 254 ++++++++++++++++++++++------------------ nanofi/tests/CAPITests.cpp | 90 +++++++++++++- 6 files changed, 276 insertions(+), 124 deletions(-) diff --git a/nanofi/examples/transmit_flow.c b/nanofi/examples/transmit_flow.c index e132c8b..66bedde 100644 --- a/nanofi/examples/transmit_flow.c +++ b/nanofi/examples/transmit_flow.c @@ -89,5 +89,16 @@ int main(int argc, char **argv) { //initialize_instance(instance); transfer_file_or_directory(instance,file); + //Create flowfile without content (just a set of attributes) + flow_file_record * record = create_ff_object_nc(); + + const char * custom_value = "transmitted value"; + + add_attribute(record, "transmitted attribute", (void*)custom_value, strlen(custom_value)); + + transmit_flowfile(record, instance); + + free_flowfile(record); + free_instance(instance); } diff --git a/nanofi/include/api/nanofi.h b/nanofi/include/api/nanofi.h index 04f43e5..7b2cb18 100644 --- a/nanofi/include/api/nanofi.h +++ b/nanofi/include/api/nanofi.h @@ -265,16 +265,40 @@ flow_file_record *invoke_chunk(standalone_processor *proc, uint8_t *buf, uint64_ int transfer(processor_session* session, flow *flow, const char *rel); /** - * Creates a flow file object. 
- * Will obtain the size of file + * Creates a flow file record based on a file + * @param file source file + * @param len length of the file name + * @return a flow file record or nullptr in case no flowfile was generated */ flow_file_record* create_flowfile(const char *file, const size_t len); +/** + * Creates a flow file record based on a file + * @param file source file + * @param len length of the file name + * @param size size of the file + * @return a flow file record or nullptr in case no flowfile was generated + */ flow_file_record* create_ff_object(const char *file, const size_t len, const uint64_t size); +/** + * Creates a flow file record based on a file, without attributes + * @attention attributes cannot be added later! + * @param file source file + * @param len length of the file name + * @param size size of the file + * @return a flow file record or nullptr in case no flowfile was generated + */ flow_file_record* create_ff_object_na(const char *file, const size_t len, const uint64_t size); /** + * Creates a flow file record without content. Only attributes can be added. + * @attention content cannot be added later! + * @return a flow file record or nullptr in case no flowfile was generated + */ +flow_file_record* create_ff_object_nc(); + +/** * Get incoming flow file. To be used in processor logic callbacks. 
* @param session current processor session * @param context current processor context @@ -297,7 +321,7 @@ void free_flowfile(flow_file_record* ff); * @size size size of the data pointed by "value" * @return 0 in case of success, -1 otherwise (already existed) **/ -uint8_t add_attribute(flow_file_record* ff, const char *key, void *value, size_t size); +int8_t add_attribute(flow_file_record*, const char *key, void *value, size_t size); /** * Updates an attribute (adds if it hasn't existed before) @@ -314,7 +338,7 @@ void update_attribute(flow_file_record* ff, const char *key, void *value, size_t * @param caller_attribute attribute structure to provide name and get value, size * @return 0 in case of success, -1 otherwise (no such attribute) **/ -uint8_t get_attribute(const flow_file_record *ff, attribute *caller_attribute); +int8_t get_attribute(const flow_file_record *ff, attribute *caller_attribute); /** * Get the quantity of attributes @@ -345,7 +369,7 @@ int get_content(const flow_file_record* ff, uint8_t* target, int size); * @param name name of the attribute * @return 0 on success, -1 otherwise (doesn't exist) **/ -uint8_t remove_attribute(flow_file_record*, char *key); +int8_t remove_attribute(flow_file_record*, const char * key); /**** * ################################################################## diff --git a/nanofi/include/core/cstructs.h b/nanofi/include/core/cstructs.h index d85c856..9a78544 100644 --- a/nanofi/include/core/cstructs.h +++ b/nanofi/include/core/cstructs.h @@ -128,4 +128,9 @@ typedef enum FS { typedef void (processor_logic)(processor_session*, processor_context *); +typedef struct file_buffer { + uint8_t * buffer; + uint64_t file_len; +} file_buffer; + #endif /* LIBMINIFI_SRC_CAPI_CSTRUCTS_H_ */ diff --git a/nanofi/include/cxx/Instance.h b/nanofi/include/cxx/Instance.h index 982f6d4..1370710 100644 --- a/nanofi/include/cxx/Instance.h +++ b/nanofi/include/cxx/Instance.h @@ -122,7 +122,7 @@ class Instance { return content_repo_; } - void 
transfer(const std::shared_ptr<FlowFileRecord> &ff) { + void transfer(const std::shared_ptr<FlowFileRecord> &ff, const std::shared_ptr<minifi::io::DataStream> &stream = nullptr) { std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider = nullptr; auto processContext = std::make_shared<core::ProcessContext>(proc_node_, controller_service_provider, no_op_repo_, no_op_repo_, content_repo_); auto sessionFactory = std::make_shared<core::ProcessSessionFactory>(processContext); @@ -132,7 +132,9 @@ class Instance { auto session = std::make_shared<core::ReflexiveSession>(processContext); session->add(ff); - + if(stream) { + session->importFrom(*stream.get(), ff); + } rpg_->onTrigger(processContext, session); } diff --git a/nanofi/src/api/nanofi.cpp b/nanofi/src/api/nanofi.cpp index a58f325..756544e 100644 --- a/nanofi/src/api/nanofi.cpp +++ b/nanofi/src/api/nanofi.cpp @@ -35,6 +35,17 @@ #include "io/DataStream.h" #include "core/cxxstructs.h" +#define NULL_CHECK(ret_val, ...) \ + do { \ + const void *_p[] = { __VA_ARGS__ }; \ + int _i; \ + for (_i = 0; _i < sizeof(_p)/sizeof(*_p); _i++) { \ + if (_p[_i] == NULL) { \ + return ret_val; \ + } \ + } \ + } while(0) + using string_map = std::map<std::string, std::string>; class API_INITIALIZER { @@ -42,6 +53,33 @@ class API_INITIALIZER { static int initialized; }; +//Just an internal utility func., not to be published via API! 
+file_buffer file_to_buffer(const char *path) { + file_buffer fb; + fb.buffer = nullptr; + fb.file_len = 0; + + NULL_CHECK(fb, path); + FILE *fileptr; + uint8_t *buffer; + uint64_t filelen; + + fileptr = fopen(path, "rb"); + NULL_CHECK(fb, fileptr); + + fseek(fileptr, 0, SEEK_END); + filelen = ftell(fileptr); + rewind(fileptr); + + buffer = (uint8_t *)malloc((filelen+1)*sizeof(uint8_t)); // Enough memory for file + \0 + fread(buffer, filelen, 1, fileptr); + fclose(fileptr); + + fb.buffer = buffer; + fb.file_len = filelen; + return fb; +} + int API_INITIALIZER::initialized = initialize_api(); static nifi_instance* standalone_instance = nullptr; @@ -88,6 +126,9 @@ nifi_instance *create_instance(const char *url, nifi_port *port) { * Since minifi::Instance is currently being used, then we need to use new in that case. */ instance->instance_ptr = new minifi::Instance(url, port->port_id); + + NULL_CHECK(nullptr, instance->instance_ptr); + // may have to translate port ID here in the future // need reinterpret cast until we move to C for this module. 
instance->port.port_id = reinterpret_cast<char*>(malloc(strlen(port->port_id) + 1)); @@ -96,6 +137,7 @@ nifi_instance *create_instance(const char *url, nifi_port *port) { } standalone_processor *create_processor(const char *name) { + NULL_CHECK(nullptr, name); auto ptr = ExecutionPlan::createProcessor(name, name); if (!ptr) { return nullptr; @@ -114,9 +156,7 @@ standalone_processor *create_processor(const char *name) { } void free_standalone_processor(standalone_processor* proc) { - if (proc == nullptr) { - return; - } + NULL_CHECK(, proc); ExecutionPlan::removeProcWithPlan(proc->getUUIDStr()); if (ExecutionPlan::getProcWithPlanQty() == 0) { @@ -154,9 +194,7 @@ void enable_async_c2(nifi_instance *instance, C2_Server *server, c2_stop_callbac * @return -1 when instance or key are null */ int set_instance_property(nifi_instance *instance, const char *key, const char *value) { - if (nullptr == instance || nullptr == instance->instance_ptr || nullptr == key) { - return -1; - } + NULL_CHECK(-1, instance, key); auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr); minifi_instance_ref->getConfiguration()->set(key, value); return 0; @@ -167,11 +205,10 @@ int set_instance_property(nifi_instance *instance, const char *key, const char * * @param instance nifi instance. */ void free_instance(nifi_instance* instance) { - if (instance != nullptr) { - delete ((minifi::Instance*) instance->instance_ptr); - free(instance->port.port_id); - free(instance); - } + NULL_CHECK(, instance); + delete ((minifi::Instance*) instance->instance_ptr); + free(instance->port.port_id); + free(instance); } /** @@ -179,15 +216,10 @@ void free_instance(nifi_instance* instance) { * @param file file to place into the flow file. 
*/ flow_file_record* create_flowfile(const char *file, const size_t len) { - flow_file_record *new_ff = (flow_file_record*) malloc(sizeof(flow_file_record)); - new_ff->attributes = new string_map(); - new_ff->contentLocation = (char*) malloc(sizeof(char) * (len + 1)); - snprintf(new_ff->contentLocation, len + 1, "%s", file); + NULL_CHECK(nullptr, file); std::ifstream in(file, std::ifstream::ate | std::ifstream::binary); - // set the size of the flow file. - new_ff->size = in.tellg(); - new_ff->keepContent = 0; - return new_ff; + uint64_t file_size = in.tellg(); + return create_ff_object(file, len, file_size); } /** @@ -195,38 +227,48 @@ flow_file_record* create_flowfile(const char *file, const size_t len) { * @param file file to place into the flow file. */ flow_file_record* create_ff_object(const char *file, const size_t len, const uint64_t size) { - if (nullptr == file) { - return nullptr; - } + NULL_CHECK(nullptr, file); flow_file_record *new_ff = create_ff_object_na(file, len, size); new_ff->attributes = new string_map(); - new_ff->ffp = 0; return new_ff; } flow_file_record* create_ff_object_na(const char *file, const size_t len, const uint64_t size) { - flow_file_record *new_ff = (flow_file_record*) malloc(sizeof(flow_file_record)); + flow_file_record *new_ff = (flow_file_record *) malloc(sizeof(flow_file_record)); new_ff->attributes = nullptr; - new_ff->contentLocation = (char*) malloc(sizeof(char) * (len + 1)); - snprintf(new_ff->contentLocation, len + 1, "%s", file); - // set the size of the flow file. - new_ff->size = size; + if (file != nullptr) { + new_ff->contentLocation = (char *) malloc(sizeof(char) * (len + 1)); + snprintf(new_ff->contentLocation, len + 1, "%s", file); + // set the size of the flow file. 
+ new_ff->size = size; + } else { + new_ff->contentLocation = nullptr; + new_ff->size = 0; + } new_ff->crp = static_cast<void*>(new std::shared_ptr<minifi::core::ContentRepository>); + new_ff->ffp = nullptr; new_ff->keepContent = 0; return new_ff; } + +flow_file_record* create_ff_object_nc() { + flow_file_record* new_ff = create_ff_object_na(nullptr, 0, 0); + new_ff->attributes = new string_map(); + return new_ff; +} /** * Reclaims memory associated with a flow file object * @param ff flow file record. */ void free_flowfile(flow_file_record *ff) { - if (ff == nullptr) { - return; - } - auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ff->crp); - if (content_repo_ptr->get() && (ff->keepContent == 0)) { - std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, *content_repo_ptr); - (*content_repo_ptr)->remove(claim); + NULL_CHECK(, ff); + if (ff->crp){ + auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ff->crp); + if((*content_repo_ptr) && (ff->keepContent == 0)) { + auto claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, *content_repo_ptr); + (*content_repo_ptr)->remove(claim); + } + delete content_repo_ptr; } if (ff->ffp == nullptr) { auto map = static_cast<string_map*>(ff->attributes); @@ -237,7 +279,6 @@ void free_flowfile(flow_file_record *ff) { } free(ff->contentLocation); free(ff); - delete content_repo_ptr; } /** @@ -248,7 +289,9 @@ void free_flowfile(flow_file_record *ff) { * @param size size of value * @return 0 or -1 based on whether the attributed existed previously (-1) or not (0) */ -uint8_t add_attribute(flow_file_record *ff, const char *key, void *value, size_t size) { +int8_t add_attribute(flow_file_record *ff, const char *key, void *value, size_t size) { + NULL_CHECK(-1, ff, key, value); + NULL_CHECK(-1, ff->attributes); auto attribute_map = static_cast<string_map*>(ff->attributes); const auto& ret = 
attribute_map->insert(std::pair<std::string, std::string>(key, std::string(static_cast<char*>(value), size))); return ret.second ? 0 : -1; @@ -262,6 +305,8 @@ uint8_t add_attribute(flow_file_record *ff, const char *key, void *value, size_t * @param size size of value */ void update_attribute(flow_file_record *ff, const char *key, void *value, size_t size) { + NULL_CHECK(, ff, key); + NULL_CHECK(, ff->attributes); auto attribute_map = static_cast<string_map*>(ff->attributes); (*attribute_map)[key] = std::string(static_cast<char*>(value), size); } @@ -273,14 +318,10 @@ void update_attribute(flow_file_record *ff, const char *key, void *value, size_t * @param caller_attribute caller supplied object in which we will copy the data ptr * @return 0 if successful, -1 if the key does not exist */ -uint8_t get_attribute(const flow_file_record * ff, attribute * caller_attribute) { - if (ff == nullptr) { - return -1; - } +int8_t get_attribute(const flow_file_record * ff, attribute * caller_attribute) { + NULL_CHECK(-1, ff, caller_attribute); + NULL_CHECK(-1, ff->attributes, caller_attribute->key); auto attribute_map = static_cast<string_map*>(ff->attributes); - if (!attribute_map) { - return -1; - } auto find = attribute_map->find(caller_attribute->key); if (find != attribute_map->end()) { caller_attribute->value = static_cast<void*>(const_cast<char*>(find->second.data())); @@ -291,21 +332,16 @@ uint8_t get_attribute(const flow_file_record * ff, attribute * caller_attribute) } int get_attribute_quantity(const flow_file_record *ff) { - if (ff == nullptr) { - return 0; - } + NULL_CHECK(0, ff); + NULL_CHECK(0, ff->attributes); auto attribute_map = static_cast<string_map*>(ff->attributes); return attribute_map ? 
attribute_map->size() : 0; } int get_all_attributes(const flow_file_record* ff, attribute_set *target) { - if (ff == nullptr) { - return 0; - } + NULL_CHECK(0, ff, target); + NULL_CHECK(0, ff->attributes, target->attributes); auto attribute_map = static_cast<string_map*>(ff->attributes); - if (!attribute_map || attribute_map->empty()) { - return 0; - } int i = 0; for (const auto& kv : *attribute_map) { if (i >= target->size) { @@ -325,15 +361,15 @@ int get_all_attributes(const flow_file_record* ff, attribute_set *target) { * @param key key to remove * @return 0 if removed, -1 otherwise */ -uint8_t remove_attribute(flow_file_record *ff, const char *key) { +int8_t remove_attribute(flow_file_record *ff, const char *key) { + NULL_CHECK(-1, ff, key); + NULL_CHECK(-1, ff->attributes); auto attribute_map = static_cast<string_map*>(ff->attributes); return attribute_map->erase(key) - 1; // erase by key returns the number of elements removed (0 or 1) } int get_content(const flow_file_record* ff, uint8_t* target, int size) { - if (ff == nullptr || target == nullptr || size == 0) { - return 0; - } + NULL_CHECK(0, ff, target); auto content_repo = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ff->crp); std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, *content_repo); auto stream = (*content_repo)->read(claim); @@ -346,48 +382,67 @@ int get_content(const flow_file_record* ff, uint8_t* target, int size) { * @param instance nifi instance structure */ int transmit_flowfile(flow_file_record *ff, nifi_instance *instance) { + static string_map empty_attribute_map; + + NULL_CHECK(-1, ff, instance); auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr); // in the unlikely event the user forgot to initialize the instance, we shall do it for them. 
if (UNLIKELY(minifi_instance_ref->isRPGConfigured() == false)) { minifi_instance_ref->setRemotePort(instance->port.port_id); } - auto attribute_map = static_cast<string_map*>(ff->attributes); + string_map& attribute_map = empty_attribute_map; + if(ff->attributes) { + attribute_map = *(static_cast<string_map *>(ff->attributes)); + } auto no_op = minifi_instance_ref->getNoOpRepository(); auto content_repo = minifi_instance_ref->getContentRepository(); - std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, content_repo); - claim->increaseFlowFileRecordOwnedCount(); - claim->increaseFlowFileRecordOwnedCount(); + std::shared_ptr<minifi::ResourceClaim> claim = nullptr; + std::shared_ptr<minifi::io::DataStream> stream = nullptr; //Used when content is not in content repo + + if(ff->contentLocation) { + auto ff_content_repo_ptr = (static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ff->crp)); + if (ff->crp && (*ff_content_repo_ptr)) { + content_repo = *ff_content_repo_ptr; + claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, content_repo); + claim->increaseFlowFileRecordOwnedCount(); + claim->increaseFlowFileRecordOwnedCount(); + } else { + file_buffer fb = file_to_buffer(ff->contentLocation); + stream = std::make_shared<minifi::io::DataStream>(); + stream->writeData(fb.buffer, fb.file_len); + } + } else { + //The flowfile has no content - create an empty stream to create empty content + stream = std::make_shared<minifi::io::DataStream>(); + } - auto ffr = std::make_shared<minifi::FlowFileRecord>(no_op, content_repo, *attribute_map, claim); + auto ffr = std::make_shared<minifi::FlowFileRecord>(no_op, content_repo, attribute_map, claim); ffr->addAttribute("nanofi.version", API_VERSION); ffr->setSize(ff->size); std::string port_uuid = instance->port.port_id; - minifi_instance_ref->transfer(ffr); + minifi_instance_ref->transfer(ffr, stream); return 0; } flow * create_new_flow(nifi_instance * 
instance) { - if (nullptr == instance || nullptr == instance->instance_ptr) { - return nullptr; - } + NULL_CHECK(nullptr, instance); auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr); flow * area = static_cast<flow*>(malloc(1*sizeof(flow))); - if(area == nullptr) { - return nullptr; - } + NULL_CHECK(nullptr, area); flow *new_flow = new(area) flow(minifi_instance_ref->getContentRepository(), minifi_instance_ref->getNoOpRepository(), minifi_instance_ref->getNoOpRepository()); return new_flow; } flow *create_flow(nifi_instance *instance, const char *first_processor) { + NULL_CHECK(nullptr, instance); auto new_flow = create_new_flow(instance); if(new_flow != nullptr && first_processor != nullptr && strlen(first_processor) > 0) { @@ -423,9 +478,7 @@ flow * create_getfile(nifi_instance * instance, flow * parent_flow, GetFileConfi } processor *add_processor(flow *flow, const char *processor_name) { - if (nullptr == flow || nullptr == processor_name) { - return nullptr; - } + NULL_CHECK(nullptr, flow, processor_name); auto proc = flow->addProcessor(processor_name, processor_name, core::Relationship("success", "description"), flow->hasProcessor()); return static_cast<processor*>(proc.get()); @@ -440,25 +493,19 @@ int set_failure_strategy(flow *flow, FailureStrategy strategy) { } int set_propery_internal(core::Processor* proc, const char *name, const char *value) { - if (name != nullptr && value != nullptr) { - bool success = proc->setProperty(name, value) || (proc->supportsDynamicProperties() && proc->setDynamicProperty(name, value)); - return success ? 0 : -2; - } - return -1; + NULL_CHECK(-1, proc, name, value); + bool success = proc->setProperty(name, value) || (proc->supportsDynamicProperties() && proc->setDynamicProperty(name, value)); + return success ? 
0 : -2; } int set_property(processor *proc, const char *name, const char *value) { - if (proc != nullptr) { - return set_propery_internal(proc, name, value); - } - return -1; + NULL_CHECK(-1, proc); + return set_propery_internal(proc, name, value); } int set_standalone_property(standalone_processor *proc, const char *name, const char *value) { - if (proc != nullptr) { - return set_propery_internal(proc, name, value); - } - return -1; + NULL_CHECK(-1, proc); + return set_propery_internal(proc, name, value); } uint8_t get_property(const processor_context * context, const char * name, char * buffer, size_t size) { @@ -473,21 +520,8 @@ uint8_t get_property(const processor_context * context, const char * name, char return 0; } -char * get_property(const processor_context * context, const char * name) { - std::string value; - if(!context->getDynamicProperty(name, value)) { - return nullptr; - } - size_t len = value.length(); - char * ret_val = (char*)malloc((len +1) * sizeof(char)); - strncpy(ret_val, value.data(), len); - ret_val[len] = '\0'; - return ret_val; -} - int free_flow(flow *flow) { - if (flow == nullptr) - return -1; + NULL_CHECK(-1, flow); flow->~flow(); free(flow); return 0; @@ -611,24 +645,12 @@ flow_file_record *invoke_chunk(standalone_processor* proc, uint8_t* buf, uint64_ } flow_file_record *invoke_file(standalone_processor* proc, const char* path) { - FILE *fileptr; - uint8_t *buffer; - uint64_t filelen; - - fileptr = fopen(path, "rb"); - if (fileptr == nullptr) { - return nullptr; - } - fseek(fileptr, 0, SEEK_END); - filelen = ftell(fileptr); - rewind(fileptr); - - buffer = (uint8_t *)malloc((filelen+1)*sizeof(uint8_t)); // Enough memory for file + \0 - fread(buffer, filelen, 1, fileptr); - fclose(fileptr); + NULL_CHECK(nullptr, path, proc); + auto fb = file_to_buffer(path); + NULL_CHECK(nullptr, fb.buffer); - flow_file_record* ffr = invoke_chunk(proc, buffer, filelen); - free(buffer); + flow_file_record* ffr = invoke_chunk(proc, fb.buffer, 
fb.file_len); + free(fb.buffer); return ffr; } diff --git a/nanofi/tests/CAPITests.cpp b/nanofi/tests/CAPITests.cpp index 35cbfc2..52a793c 100644 --- a/nanofi/tests/CAPITests.cpp +++ b/nanofi/tests/CAPITests.cpp @@ -450,4 +450,92 @@ TEST_CASE("Test custom processor", "[TestCutomProcessor]") { flow_file_record *record = get_next_flow_file(instance, test_flow); REQUIRE(record != nullptr); -} \ No newline at end of file +} + +TEST_CASE("C API robustness test", "[TestRobustness]") { + free_flow(nullptr); + free_standalone_processor(nullptr); + free_instance(nullptr); + + REQUIRE(create_processor(nullptr) == nullptr); + + standalone_processor *standalone_proc = create_processor("GetFile"); + REQUIRE(standalone_proc != nullptr); + + REQUIRE(set_property(nullptr, "prop_name", "prop_value") == -1); + + REQUIRE(set_standalone_property(standalone_proc, nullptr, "prop_value") == -1); + + REQUIRE(set_standalone_property(standalone_proc, "prop_name", nullptr) == -1); + + free_standalone_processor(standalone_proc); + + const char *file_path = "path/to/file"; + + flow_file_record *ffr = create_ff_object_na(file_path, strlen(file_path), 0); + + const char *custom_value = "custom value"; + + REQUIRE(add_attribute(nullptr, "custom attribute", (void *) custom_value, strlen(custom_value)) == -1); + + REQUIRE(add_attribute(ffr, "custom attribute", (void *) custom_value, strlen(custom_value)) == -1); + + REQUIRE(add_attribute(ffr, nullptr, (void *) custom_value, strlen(custom_value)) == -1); + + REQUIRE(add_attribute(ffr, "custom attribute", nullptr, 0) == -1); + + update_attribute(nullptr, "custom attribute", (void *) custom_value, strlen(custom_value)); + + update_attribute(ffr, nullptr, (void *) custom_value, strlen(custom_value)); + + attribute attr; + attr.key = "filename"; + attr.value_size = 0; + REQUIRE(get_attribute(ffr, &attr) == -1); + + REQUIRE(get_attribute(nullptr, &attr) == -1); + + REQUIRE(get_attribute(ffr, nullptr) == -1); + + REQUIRE(get_attribute_quantity(nullptr) == 
0); + + REQUIRE(get_attribute_quantity(ffr) == 0); + + attribute_set attr_set; + attr_set.size = 3; + attr_set.attributes = (attribute *) malloc(attr_set.size * sizeof(attribute)); // NOLINT + + REQUIRE(get_all_attributes(nullptr, &attr_set) == 0); + + REQUIRE(get_all_attributes(ffr, &attr_set) == 0); + + REQUIRE(get_all_attributes(ffr, nullptr) == 0); + + free(attr_set.attributes); + + REQUIRE(remove_attribute(nullptr, "key") == -1); + + REQUIRE(remove_attribute(ffr, "key") == -1); + + REQUIRE(remove_attribute(ffr, nullptr) == -1); + + auto instance = create_instance_obj(); + + REQUIRE(transmit_flowfile(nullptr, instance) == -1); + + REQUIRE(transmit_flowfile(ffr, nullptr) == -1); + + REQUIRE(create_new_flow(nullptr) == nullptr); + + flow *test_flow = create_new_flow(instance); + + REQUIRE(test_flow != nullptr); + + REQUIRE(add_processor(nullptr, "GetFile") == nullptr); + + REQUIRE(add_processor(test_flow, nullptr) == nullptr); + + free_flow(test_flow); + + free_instance(instance); +}
