This is an automated email from the ASF dual-hosted git repository.

phrocker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 217be558805c10b95c0e40dd886515dec23111e6
Author: Arpad Boda <[email protected]>
AuthorDate: Tue Dec 18 16:27:56 2018 +0100

    MINIFICPP-695 - NanoFi Examples appear to be broken
    
    This closes #462.
    
    Signed-off-by: Marc Parisi <[email protected]>
---
 nanofi/examples/transmit_flow.c |  11 ++
 nanofi/include/api/nanofi.h     |  34 +++++-
 nanofi/include/core/cstructs.h  |   5 +
 nanofi/include/cxx/Instance.h   |   6 +-
 nanofi/src/api/nanofi.cpp       | 254 ++++++++++++++++++++++------------------
 nanofi/tests/CAPITests.cpp      |  90 +++++++++++++-
 6 files changed, 276 insertions(+), 124 deletions(-)

diff --git a/nanofi/examples/transmit_flow.c b/nanofi/examples/transmit_flow.c
index e132c8b..66bedde 100644
--- a/nanofi/examples/transmit_flow.c
+++ b/nanofi/examples/transmit_flow.c
@@ -89,5 +89,16 @@ int main(int argc, char **argv) {
   //initialize_instance(instance);
   transfer_file_or_directory(instance,file);
 
+  //Create flowfile without content (just a set of attributes)
+  flow_file_record * record = create_ff_object_nc();
+
+  const char * custom_value = "transmitted value";
+
+  add_attribute(record, "transmitted attribute", (void*)custom_value, 
strlen(custom_value));
+
+  transmit_flowfile(record, instance);
+
+  free_flowfile(record);
+
   free_instance(instance);
 }
diff --git a/nanofi/include/api/nanofi.h b/nanofi/include/api/nanofi.h
index 04f43e5..7b2cb18 100644
--- a/nanofi/include/api/nanofi.h
+++ b/nanofi/include/api/nanofi.h
@@ -265,16 +265,40 @@ flow_file_record *invoke_chunk(standalone_processor 
*proc, uint8_t *buf, uint64_
 int transfer(processor_session* session, flow *flow, const char *rel);
 
 /**
- * Creates a flow file object.
- * Will obtain the size of file
+ * Creates a flow file record based on a file
+ * @param file source file
+ * @param len length of the file name
+ * @return a flow file record or nullptr in case no flowfile was generated
  */
 flow_file_record* create_flowfile(const char *file, const size_t len);
 
+/**
+ * Creates a flow file record based on a file
+ * @param file source file
+ * @param len length of the file name
+ * @param size size of the file
+ * @return a flow file record or nullptr in case no flowfile was generated
+ */
 flow_file_record* create_ff_object(const char *file, const size_t len, const 
uint64_t size);
 
+/**
+ * Creates a flow file record based on a file, without attributes
+ * @attention attributes cannot be added later!
+ * @param file source file
+ * @param len length of the file name
+ * @param size size of the file
+ * @return a flow file record or nullptr in case no flowfile was generated
+ */
 flow_file_record* create_ff_object_na(const char *file, const size_t len, 
const uint64_t size);
 
 /**
+ * Creates a flow file record without content. Only attributes can be added.
+ * @attention content cannot be added later!
+ * @return a flow file record or nullptr in case no flowfile was generated
+ */
+flow_file_record* create_ff_object_nc();
+
+/**
  * Get incoming flow file. To be used in processor logic callbacks.
  * @param session current processor session
  * @param context current processor context
@@ -297,7 +321,7 @@ void free_flowfile(flow_file_record* ff);
  * @size size size of the data pointed by "value"
  * @return 0 in case of success, -1 otherwise (already existed)
  **/
-uint8_t add_attribute(flow_file_record* ff, const char *key, void *value, 
size_t size);
+int8_t add_attribute(flow_file_record*, const char *key, void *value, size_t 
size);
 
 /**
  * Updates an attribute (adds if it hasn't existed before)
@@ -314,7 +338,7 @@ void update_attribute(flow_file_record* ff, const char 
*key, void *value, size_t
  * @param caller_attribute attribute structure to provide name and get value, 
size
  * @return 0 in case of success, -1 otherwise (no such attribute)
  **/
-uint8_t get_attribute(const flow_file_record *ff, attribute *caller_attribute);
+int8_t get_attribute(const flow_file_record *ff, attribute *caller_attribute);
 
 /**
  * Get the quantity of attributes
@@ -345,7 +369,7 @@ int get_content(const flow_file_record* ff, uint8_t* 
target, int size);
  * @param name name of the attribute
  * @return 0 on success, -1 otherwise (doesn't exist)
  **/
-uint8_t remove_attribute(flow_file_record*, char *key);
+int8_t remove_attribute(flow_file_record*, const char * key);
 
 /****
  * ##################################################################
diff --git a/nanofi/include/core/cstructs.h b/nanofi/include/core/cstructs.h
index d85c856..9a78544 100644
--- a/nanofi/include/core/cstructs.h
+++ b/nanofi/include/core/cstructs.h
@@ -128,4 +128,9 @@ typedef enum FS {
 
 typedef void (processor_logic)(processor_session*, processor_context *);
 
+typedef struct file_buffer {
+  uint8_t * buffer;
+  uint64_t file_len;
+} file_buffer;
+
 #endif /* LIBMINIFI_SRC_CAPI_CSTRUCTS_H_ */
diff --git a/nanofi/include/cxx/Instance.h b/nanofi/include/cxx/Instance.h
index 982f6d4..1370710 100644
--- a/nanofi/include/cxx/Instance.h
+++ b/nanofi/include/cxx/Instance.h
@@ -122,7 +122,7 @@ class Instance {
     return content_repo_;
   }
 
-  void transfer(const std::shared_ptr<FlowFileRecord> &ff) {
+  void transfer(const std::shared_ptr<FlowFileRecord> &ff, const 
std::shared_ptr<minifi::io::DataStream> &stream = nullptr) {
     std::shared_ptr<core::controller::ControllerServiceProvider> 
controller_service_provider = nullptr;
     auto processContext = std::make_shared<core::ProcessContext>(proc_node_, 
controller_service_provider, no_op_repo_, no_op_repo_, content_repo_);
     auto sessionFactory = 
std::make_shared<core::ProcessSessionFactory>(processContext);
@@ -132,7 +132,9 @@ class Instance {
     auto session = std::make_shared<core::ReflexiveSession>(processContext);
 
     session->add(ff);
-
+    if(stream) {
+      session->importFrom(*stream.get(), ff);
+    }
     rpg_->onTrigger(processContext, session);
   }
 
diff --git a/nanofi/src/api/nanofi.cpp b/nanofi/src/api/nanofi.cpp
index a58f325..756544e 100644
--- a/nanofi/src/api/nanofi.cpp
+++ b/nanofi/src/api/nanofi.cpp
@@ -35,6 +35,17 @@
 #include "io/DataStream.h"
 #include "core/cxxstructs.h"
 
+#define NULL_CHECK(ret_val, ...)                        \
+  do {                                                  \
+    const void *_p[] = { __VA_ARGS__ };                 \
+    int _i;                                             \
+    for (_i = 0; _i < sizeof(_p)/sizeof(*_p); _i++) {   \
+      if (_p[_i] == NULL) {                             \
+        return ret_val;                                 \
+      }                                                 \
+    }                                                   \
+  } while(0)
+
 using string_map = std::map<std::string, std::string>;
 
 class API_INITIALIZER {
@@ -42,6 +53,33 @@ class API_INITIALIZER {
   static int initialized;
 };
 
+//Just an internal utility func., not to be published via API!
+file_buffer file_to_buffer(const char *path) {
+  file_buffer fb;
+  fb.buffer = nullptr;
+  fb.file_len = 0;
+
+  NULL_CHECK(fb, path);
+  FILE *fileptr;
+  uint8_t *buffer;
+  uint64_t filelen;
+
+  fileptr = fopen(path, "rb");
+  NULL_CHECK(fb, fileptr);
+
+  fseek(fileptr, 0, SEEK_END);
+  filelen = ftell(fileptr);
+  rewind(fileptr);
+
+  buffer = (uint8_t *)malloc((filelen+1)*sizeof(uint8_t)); // Enough memory 
for file + \0
+  fread(buffer, filelen, 1, fileptr);
+  fclose(fileptr);
+
+  fb.buffer = buffer;
+  fb.file_len = filelen;
+  return fb;
+}
+
 int API_INITIALIZER::initialized = initialize_api();
 
 static nifi_instance* standalone_instance = nullptr;
@@ -88,6 +126,9 @@ nifi_instance *create_instance(const char *url, nifi_port 
*port) {
    * Since minifi::Instance is currently being used, then we need to use new 
in that case.
    */
   instance->instance_ptr = new minifi::Instance(url, port->port_id);
+
+  NULL_CHECK(nullptr, instance->instance_ptr);
+
   // may have to translate port ID here in the future
   // need reinterpret cast until we move to C for this module.
   instance->port.port_id = 
reinterpret_cast<char*>(malloc(strlen(port->port_id) + 1));
@@ -96,6 +137,7 @@ nifi_instance *create_instance(const char *url, nifi_port 
*port) {
 }
 
 standalone_processor *create_processor(const char *name) {
+  NULL_CHECK(nullptr, name);
   auto ptr = ExecutionPlan::createProcessor(name, name);
   if (!ptr) {
     return nullptr;
@@ -114,9 +156,7 @@ standalone_processor *create_processor(const char *name) {
 }
 
 void free_standalone_processor(standalone_processor* proc) {
-  if (proc == nullptr) {
-    return;
-  }
+  NULL_CHECK(, proc);
   ExecutionPlan::removeProcWithPlan(proc->getUUIDStr());
 
   if (ExecutionPlan::getProcWithPlanQty() == 0) {
@@ -154,9 +194,7 @@ void enable_async_c2(nifi_instance *instance, C2_Server 
*server, c2_stop_callbac
  * @return -1 when instance or key are null
  */
 int set_instance_property(nifi_instance *instance, const char *key, const char 
*value) {
-  if (nullptr == instance || nullptr == instance->instance_ptr || nullptr == 
key) {
-    return -1;
-  }
+  NULL_CHECK(-1, instance, key);
   auto minifi_instance_ref = 
static_cast<minifi::Instance*>(instance->instance_ptr);
   minifi_instance_ref->getConfiguration()->set(key, value);
   return 0;
@@ -167,11 +205,10 @@ int set_instance_property(nifi_instance *instance, const 
char *key, const char *
  * @param instance nifi instance.
  */
 void free_instance(nifi_instance* instance) {
-  if (instance != nullptr) {
-    delete ((minifi::Instance*) instance->instance_ptr);
-    free(instance->port.port_id);
-    free(instance);
-  }
+  NULL_CHECK(, instance);
+  delete ((minifi::Instance*) instance->instance_ptr);
+  free(instance->port.port_id);
+  free(instance);
 }
 
 /**
@@ -179,15 +216,10 @@ void free_instance(nifi_instance* instance) {
  * @param file file to place into the flow file.
  */
 flow_file_record* create_flowfile(const char *file, const size_t len) {
-  flow_file_record *new_ff = (flow_file_record*) 
malloc(sizeof(flow_file_record));
-  new_ff->attributes = new string_map();
-  new_ff->contentLocation = (char*) malloc(sizeof(char) * (len + 1));
-  snprintf(new_ff->contentLocation, len + 1, "%s", file);
+  NULL_CHECK(nullptr, file);
   std::ifstream in(file, std::ifstream::ate | std::ifstream::binary);
-  // set the size of the flow file.
-  new_ff->size = in.tellg();
-  new_ff->keepContent = 0;
-  return new_ff;
+  uint64_t file_size = in.tellg();
+  return create_ff_object(file, len, file_size);
 }
 
 /**
@@ -195,38 +227,48 @@ flow_file_record* create_flowfile(const char *file, const 
size_t len) {
  * @param file file to place into the flow file.
  */
 flow_file_record* create_ff_object(const char *file, const size_t len, const 
uint64_t size) {
-  if (nullptr == file) {
-    return nullptr;
-  }
+  NULL_CHECK(nullptr, file);
   flow_file_record *new_ff = create_ff_object_na(file, len, size);
   new_ff->attributes = new string_map();
-  new_ff->ffp = 0;
   return new_ff;
 }
 
 flow_file_record* create_ff_object_na(const char *file, const size_t len, 
const uint64_t size) {
-  flow_file_record *new_ff = (flow_file_record*) 
malloc(sizeof(flow_file_record));
+  flow_file_record *new_ff = (flow_file_record *) 
malloc(sizeof(flow_file_record));
   new_ff->attributes = nullptr;
-  new_ff->contentLocation = (char*) malloc(sizeof(char) * (len + 1));
-  snprintf(new_ff->contentLocation, len + 1, "%s", file);
-  // set the size of the flow file.
-  new_ff->size = size;
+  if (file != nullptr) {
+    new_ff->contentLocation = (char *) malloc(sizeof(char) * (len + 1));
+    snprintf(new_ff->contentLocation, len + 1, "%s", file);
+    // set the size of the flow file.
+    new_ff->size = size;
+  } else {
+    new_ff->contentLocation = nullptr;
+    new_ff->size = 0;
+  }
   new_ff->crp = static_cast<void*>(new 
std::shared_ptr<minifi::core::ContentRepository>);
+  new_ff->ffp = nullptr;
   new_ff->keepContent = 0;
   return new_ff;
 }
+
+flow_file_record* create_ff_object_nc() {
+  flow_file_record* new_ff = create_ff_object_na(nullptr, 0, 0);
+  new_ff->attributes = new string_map();
+  return new_ff;
+}
 /**
  * Reclaims memory associated with a flow file object
  * @param ff flow file record.
  */
 void free_flowfile(flow_file_record *ff) {
-  if (ff == nullptr) {
-    return;
-  }
-  auto content_repo_ptr = 
static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ff->crp);
-  if (content_repo_ptr->get() && (ff->keepContent == 0)) {
-    std::shared_ptr<minifi::ResourceClaim> claim = 
std::make_shared<minifi::ResourceClaim>(ff->contentLocation, *content_repo_ptr);
-    (*content_repo_ptr)->remove(claim);
+  NULL_CHECK(, ff);
+  if (ff->crp){
+    auto content_repo_ptr = 
static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ff->crp);
+    if((*content_repo_ptr) && (ff->keepContent == 0)) {
+      auto claim = 
std::make_shared<minifi::ResourceClaim>(ff->contentLocation, *content_repo_ptr);
+      (*content_repo_ptr)->remove(claim);
+    }
+    delete content_repo_ptr;
   }
   if (ff->ffp == nullptr) {
     auto map = static_cast<string_map*>(ff->attributes);
@@ -237,7 +279,6 @@ void free_flowfile(flow_file_record *ff) {
   }
   free(ff->contentLocation);
   free(ff);
-  delete content_repo_ptr;
 }
 
 /**
@@ -248,7 +289,9 @@ void free_flowfile(flow_file_record *ff) {
  * @param size size of value
  * @return 0 or -1 based on whether the attributed existed previously (-1) or 
not (0)
  */
-uint8_t add_attribute(flow_file_record *ff, const char *key, void *value, 
size_t size) {
+int8_t add_attribute(flow_file_record *ff, const char *key, void *value, 
size_t size) {
+  NULL_CHECK(-1, ff, key, value);
+  NULL_CHECK(-1, ff->attributes);
   auto attribute_map = static_cast<string_map*>(ff->attributes);
   const auto& ret = attribute_map->insert(std::pair<std::string, 
std::string>(key, std::string(static_cast<char*>(value), size)));
   return ret.second ? 0 : -1;
@@ -262,6 +305,8 @@ uint8_t add_attribute(flow_file_record *ff, const char 
*key, void *value, size_t
  * @param size size of value
  */
 void update_attribute(flow_file_record *ff, const char *key, void *value, 
size_t size) {
+  NULL_CHECK(, ff, key);
+  NULL_CHECK(, ff->attributes);
   auto attribute_map = static_cast<string_map*>(ff->attributes);
   (*attribute_map)[key] = std::string(static_cast<char*>(value), size);
 }
@@ -273,14 +318,10 @@ void update_attribute(flow_file_record *ff, const char 
*key, void *value, size_t
  * @param caller_attribute caller supplied object in which we will copy the 
data ptr
  * @return 0 if successful, -1 if the key does not exist
  */
-uint8_t get_attribute(const flow_file_record * ff, attribute * 
caller_attribute) {
-  if (ff == nullptr) {
-    return -1;
-  }
+int8_t get_attribute(const flow_file_record * ff, attribute * 
caller_attribute) {
+  NULL_CHECK(-1, ff, caller_attribute);
+  NULL_CHECK(-1, ff->attributes, caller_attribute->key);
   auto attribute_map = static_cast<string_map*>(ff->attributes);
-  if (!attribute_map) {
-    return -1;
-  }
   auto find = attribute_map->find(caller_attribute->key);
   if (find != attribute_map->end()) {
     caller_attribute->value = 
static_cast<void*>(const_cast<char*>(find->second.data()));
@@ -291,21 +332,16 @@ uint8_t get_attribute(const flow_file_record * ff, 
attribute * caller_attribute)
 }
 
 int get_attribute_quantity(const flow_file_record *ff) {
-  if (ff == nullptr) {
-    return 0;
-  }
+  NULL_CHECK(0, ff);
+  NULL_CHECK(0, ff->attributes);
   auto attribute_map = static_cast<string_map*>(ff->attributes);
   return attribute_map ? attribute_map->size() : 0;
 }
 
 int get_all_attributes(const flow_file_record* ff, attribute_set *target) {
-  if (ff == nullptr) {
-    return 0;
-  }
+  NULL_CHECK(0, ff, target);
+  NULL_CHECK(0, ff->attributes, target->attributes);
   auto attribute_map = static_cast<string_map*>(ff->attributes);
-  if (!attribute_map || attribute_map->empty()) {
-    return 0;
-  }
   int i = 0;
   for (const auto& kv : *attribute_map) {
     if (i >= target->size) {
@@ -325,15 +361,15 @@ int get_all_attributes(const flow_file_record* ff, 
attribute_set *target) {
  * @param key key to remove
  * @return 0 if removed, -1 otherwise
  */
-uint8_t remove_attribute(flow_file_record *ff, const char *key) {
+int8_t remove_attribute(flow_file_record *ff, const char *key) {
+  NULL_CHECK(-1, ff, key);
+  NULL_CHECK(-1, ff->attributes);
   auto attribute_map = static_cast<string_map*>(ff->attributes);
   return attribute_map->erase(key) - 1;  // erase by key returns the number of 
elements removed (0 or 1)
 }
 
 int get_content(const flow_file_record* ff, uint8_t* target, int size) {
-  if (ff == nullptr || target == nullptr || size == 0) {
-    return 0;
-  }
+  NULL_CHECK(0, ff, target);
   auto content_repo = 
static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ff->crp);
   std::shared_ptr<minifi::ResourceClaim> claim = 
std::make_shared<minifi::ResourceClaim>(ff->contentLocation, *content_repo);
   auto stream = (*content_repo)->read(claim);
@@ -346,48 +382,67 @@ int get_content(const flow_file_record* ff, uint8_t* 
target, int size) {
  * @param instance nifi instance structure
  */
 int transmit_flowfile(flow_file_record *ff, nifi_instance *instance) {
+  static string_map empty_attribute_map;
+
+  NULL_CHECK(-1, ff, instance);
   auto minifi_instance_ref = 
static_cast<minifi::Instance*>(instance->instance_ptr);
   // in the unlikely event the user forgot to initialize the instance, we 
shall do it for them.
   if (UNLIKELY(minifi_instance_ref->isRPGConfigured() == false)) {
     minifi_instance_ref->setRemotePort(instance->port.port_id);
   }
 
-  auto attribute_map = static_cast<string_map*>(ff->attributes);
+  string_map& attribute_map = empty_attribute_map;
+  if(ff->attributes) {
+    attribute_map = *(static_cast<string_map *>(ff->attributes));
+  }
 
   auto no_op = minifi_instance_ref->getNoOpRepository();
 
   auto content_repo = minifi_instance_ref->getContentRepository();
 
-  std::shared_ptr<minifi::ResourceClaim> claim = 
std::make_shared<minifi::ResourceClaim>(ff->contentLocation, content_repo);
-  claim->increaseFlowFileRecordOwnedCount();
-  claim->increaseFlowFileRecordOwnedCount();
+  std::shared_ptr<minifi::ResourceClaim> claim = nullptr;
+  std::shared_ptr<minifi::io::DataStream> stream = nullptr; //Used when 
content is not in content repo
+
+  if(ff->contentLocation) {
+    auto ff_content_repo_ptr = 
(static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ff->crp));
+    if (ff->crp && (*ff_content_repo_ptr)) {
+      content_repo = *ff_content_repo_ptr;
+      claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, 
content_repo);
+      claim->increaseFlowFileRecordOwnedCount();
+      claim->increaseFlowFileRecordOwnedCount();
+    } else {
+      file_buffer fb = file_to_buffer(ff->contentLocation);
+      stream = std::make_shared<minifi::io::DataStream>();
+      stream->writeData(fb.buffer, fb.file_len);
+    }
+  } else {
+    //The flowfile has no content - create an empty stream to create empty 
content
+    stream = std::make_shared<minifi::io::DataStream>();
+  }
 
-  auto ffr = std::make_shared<minifi::FlowFileRecord>(no_op, content_repo, 
*attribute_map, claim);
+  auto ffr = std::make_shared<minifi::FlowFileRecord>(no_op, content_repo, 
attribute_map, claim);
   ffr->addAttribute("nanofi.version", API_VERSION);
   ffr->setSize(ff->size);
 
   std::string port_uuid = instance->port.port_id;
 
-  minifi_instance_ref->transfer(ffr);
+  minifi_instance_ref->transfer(ffr, stream);
 
   return 0;
 }
 
 flow * create_new_flow(nifi_instance * instance) {
-  if (nullptr == instance || nullptr == instance->instance_ptr) {
-    return nullptr;
-  }
+  NULL_CHECK(nullptr, instance);
   auto minifi_instance_ref = 
static_cast<minifi::Instance*>(instance->instance_ptr);
   flow * area = static_cast<flow*>(malloc(1*sizeof(flow)));
-  if(area == nullptr) {
-    return nullptr;
-  }
+  NULL_CHECK(nullptr, area);
 
   flow *new_flow = new(area) flow(minifi_instance_ref->getContentRepository(), 
minifi_instance_ref->getNoOpRepository(), 
minifi_instance_ref->getNoOpRepository());
   return new_flow;
 }
 
 flow *create_flow(nifi_instance *instance, const char *first_processor) {
+  NULL_CHECK(nullptr, instance);
   auto new_flow = create_new_flow(instance);
 
   if(new_flow != nullptr && first_processor != nullptr && 
strlen(first_processor) > 0) {
@@ -423,9 +478,7 @@ flow * create_getfile(nifi_instance * instance, flow * 
parent_flow, GetFileConfi
 }
 
 processor *add_processor(flow *flow, const char *processor_name) {
-  if (nullptr == flow || nullptr == processor_name) {
-    return nullptr;
-  }
+  NULL_CHECK(nullptr, flow, processor_name);
 
   auto proc = flow->addProcessor(processor_name, processor_name, 
core::Relationship("success", "description"), flow->hasProcessor());
   return static_cast<processor*>(proc.get());
@@ -440,25 +493,19 @@ int set_failure_strategy(flow *flow, FailureStrategy 
strategy) {
 }
 
 int set_propery_internal(core::Processor* proc, const char *name, const char 
*value) {
-  if (name != nullptr && value != nullptr) {
-    bool success = proc->setProperty(name, value) || 
(proc->supportsDynamicProperties() && proc->setDynamicProperty(name, value));
-    return success ? 0 : -2;
-  }
-  return -1;
+  NULL_CHECK(-1, proc, name, value);
+  bool success = proc->setProperty(name, value) || 
(proc->supportsDynamicProperties() && proc->setDynamicProperty(name, value));
+  return success ? 0 : -2;
 }
 
 int set_property(processor *proc, const char *name, const char *value) {
-  if (proc != nullptr) {
-    return set_propery_internal(proc, name, value);
-  }
-  return -1;
+  NULL_CHECK(-1, proc);
+  return set_propery_internal(proc, name, value);
 }
 
 int set_standalone_property(standalone_processor *proc, const char *name, 
const char *value) {
-  if (proc != nullptr) {
-    return set_propery_internal(proc, name, value);
-  }
-  return -1;
+  NULL_CHECK(-1, proc);
+  return set_propery_internal(proc, name, value);
 }
 
 uint8_t get_property(const processor_context * context, const char * name, 
char * buffer, size_t size) {
@@ -473,21 +520,8 @@ uint8_t get_property(const processor_context * context, 
const char * name, char
   return 0;
 }
 
-char * get_property(const processor_context *  context, const char * name) {
-  std::string value;
-  if(!context->getDynamicProperty(name, value)) {
-    return nullptr;
-  }
-  size_t len = value.length();
-  char * ret_val = (char*)malloc((len +1) * sizeof(char));
-  strncpy(ret_val, value.data(), len);
-  ret_val[len] = '\0';
-  return ret_val;
-}
-
 int free_flow(flow *flow) {
-  if (flow == nullptr)
-    return -1;
+  NULL_CHECK(-1, flow);
   flow->~flow();
   free(flow);
   return 0;
@@ -611,24 +645,12 @@ flow_file_record *invoke_chunk(standalone_processor* 
proc, uint8_t* buf, uint64_
 }
 
 flow_file_record *invoke_file(standalone_processor* proc, const char* path) {
-  FILE *fileptr;
-  uint8_t *buffer;
-  uint64_t filelen;
-
-  fileptr = fopen(path, "rb");
-  if (fileptr == nullptr) {
-    return nullptr;
-  }
-  fseek(fileptr, 0, SEEK_END);
-  filelen = ftell(fileptr);
-  rewind(fileptr);
-
-  buffer = (uint8_t *)malloc((filelen+1)*sizeof(uint8_t)); // Enough memory 
for file + \0
-  fread(buffer, filelen, 1, fileptr);
-  fclose(fileptr);
+  NULL_CHECK(nullptr, path, proc);
+  auto fb = file_to_buffer(path);
+  NULL_CHECK(nullptr, fb.buffer);
 
-  flow_file_record* ffr = invoke_chunk(proc, buffer, filelen);
-  free(buffer);
+  flow_file_record* ffr = invoke_chunk(proc, fb.buffer, fb.file_len);
+  free(fb.buffer);
   return ffr;
 }
 
diff --git a/nanofi/tests/CAPITests.cpp b/nanofi/tests/CAPITests.cpp
index 35cbfc2..52a793c 100644
--- a/nanofi/tests/CAPITests.cpp
+++ b/nanofi/tests/CAPITests.cpp
@@ -450,4 +450,92 @@ TEST_CASE("Test custom processor", "[TestCutomProcessor]") 
{
   flow_file_record *record = get_next_flow_file(instance, test_flow);
 
   REQUIRE(record != nullptr);
-}
\ No newline at end of file
+}
+
+TEST_CASE("C API robustness test", "[TestRobustness]") {
+  free_flow(nullptr);
+  free_standalone_processor(nullptr);
+  free_instance(nullptr);
+
+  REQUIRE(create_processor(nullptr) == nullptr);
+
+  standalone_processor *standalone_proc = create_processor("GetFile");
+  REQUIRE(standalone_proc != nullptr);
+
+  REQUIRE(set_property(nullptr, "prop_name", "prop_value") == -1);
+
+  REQUIRE(set_standalone_property(standalone_proc, nullptr, "prop_value") == 
-1);
+
+  REQUIRE(set_standalone_property(standalone_proc, "prop_name", nullptr) == 
-1);
+
+  free_standalone_processor(standalone_proc);
+
+  const char *file_path = "path/to/file";
+
+  flow_file_record *ffr = create_ff_object_na(file_path, strlen(file_path), 0);
+
+  const char *custom_value = "custom value";
+
+  REQUIRE(add_attribute(nullptr, "custom attribute", (void *) custom_value, 
strlen(custom_value)) == -1);
+
+  REQUIRE(add_attribute(ffr, "custom attribute", (void *) custom_value, 
strlen(custom_value)) == -1);
+
+  REQUIRE(add_attribute(ffr, nullptr, (void *) custom_value, 
strlen(custom_value)) == -1);
+
+  REQUIRE(add_attribute(ffr, "custom attribute", nullptr, 0) == -1);
+
+  update_attribute(nullptr, "custom attribute", (void *) custom_value, 
strlen(custom_value));
+
+  update_attribute(ffr, nullptr, (void *) custom_value, strlen(custom_value));
+
+  attribute attr;
+  attr.key = "filename";
+  attr.value_size = 0;
+  REQUIRE(get_attribute(ffr, &attr) == -1);
+
+  REQUIRE(get_attribute(nullptr, &attr) == -1);
+
+  REQUIRE(get_attribute(ffr, nullptr) == -1);
+
+  REQUIRE(get_attribute_quantity(nullptr) == 0);
+
+  REQUIRE(get_attribute_quantity(ffr) == 0);
+
+  attribute_set attr_set;
+  attr_set.size = 3;
+  attr_set.attributes = (attribute *) malloc(attr_set.size * 
sizeof(attribute));  // NOLINT
+
+  REQUIRE(get_all_attributes(nullptr, &attr_set) == 0);
+
+  REQUIRE(get_all_attributes(ffr, &attr_set) == 0);
+
+  REQUIRE(get_all_attributes(ffr, nullptr) == 0);
+
+  free(attr_set.attributes);
+
+  REQUIRE(remove_attribute(nullptr, "key") == -1);
+
+  REQUIRE(remove_attribute(ffr, "key") == -1);
+
+  REQUIRE(remove_attribute(ffr, nullptr) == -1);
+
+  auto instance = create_instance_obj();
+
+  REQUIRE(transmit_flowfile(nullptr, instance) == -1);
+
+  REQUIRE(transmit_flowfile(ffr, nullptr) == -1);
+
+  REQUIRE(create_new_flow(nullptr) == nullptr);
+
+  flow *test_flow = create_new_flow(instance);
+
+  REQUIRE(test_flow != nullptr);
+
+  REQUIRE(add_processor(nullptr, "GetFile") == nullptr);
+
+  REQUIRE(add_processor(test_flow, nullptr) == nullptr);
+
+  free_flow(test_flow);
+
+  free_instance(instance);
+}

Reply via email to