Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master ed8221b14 -> a82ac26a1


MINIFICPP-638 - C API: add unit tests

This closes #417.

Signed-off-by: Marc Parisi <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/a82ac26a
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/a82ac26a
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/a82ac26a

Branch: refs/heads/master
Commit: a82ac26a13226468382789cf0f20b87afb451ac5
Parents: ed8221b
Author: Arpad Boda <[email protected]>
Authored: Tue Oct 9 17:56:38 2018 +0200
Committer: Marc Parisi <[email protected]>
Committed: Wed Oct 17 14:26:25 2018 -0400

----------------------------------------------------------------------
 cmake/BuildTests.cmake            |  12 ++
 libminifi/include/capi/api.h      |  24 ++--
 libminifi/include/capi/cstructs.h |   7 +-
 libminifi/src/capi/Plan.cpp       |   7 +-
 libminifi/src/capi/api.cpp        |  76 ++++++++++---
 libminifi/test/capi/CAPITests.cpp | 201 +++++++++++++++++++++++++++++++++
 6 files changed, 297 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a82ac26a/cmake/BuildTests.cmake
----------------------------------------------------------------------
diff --git a/cmake/BuildTests.cmake b/cmake/BuildTests.cmake
index 3896dd9..d14c19b 100644
--- a/cmake/BuildTests.cmake
+++ b/cmake/BuildTests.cmake
@@ -105,6 +105,7 @@ target_include_directories(${CATCH_MAIN_LIB} BEFORE PRIVATE 
"${CMAKE_SOURCE_DIR}
 SET(TEST_RESOURCES ${TEST_DIR}/resources)
 
 GETSOURCEFILES(UNIT_TESTS "${TEST_DIR}/unit/")
+GETSOURCEFILES(CAPI_UNIT_TESTS "${TEST_DIR}/capi/")
 GETSOURCEFILES(INTEGRATION_TESTS "${TEST_DIR}/integration/")
 
 SET(UNIT_TEST_COUNT 0)
@@ -118,6 +119,17 @@ FOREACH(testfile ${UNIT_TESTS})
 ENDFOREACH()
 message("-- Finished building ${UNIT_TEST_COUNT} unit test file(s)...")
 
+SET(UNIT_TEST_COUNT 0)
+FOREACH(testfile ${CAPI_UNIT_TESTS})
+    get_filename_component(testfilename "${testfile}" NAME_WE)
+    add_executable("${testfilename}" "${TEST_DIR}/capi/${testfile}")
+    createTests("${testfilename}")
+    target_link_libraries(${testfilename} ${CATCH_MAIN_LIB} capi)
+    MATH(EXPR UNIT_TEST_COUNT "${UNIT_TEST_COUNT}+1")
+    add_test(NAME "${testfilename}" COMMAND "${testfilename}" 
WORKING_DIRECTORY ${TEST_DIR})
+ENDFOREACH()
+message("-- Finished building ${UNIT_TEST_COUNT} capi unit test file(s)...")
+
 SET(INT_TEST_COUNT 0)
 FOREACH(testfile ${INTEGRATION_TESTS})
   get_filename_component(testfilename "${testfile}" NAME_WE)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a82ac26a/libminifi/include/capi/api.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/api.h b/libminifi/include/capi/api.h
index 9a4404b..dff29ed 100644
--- a/libminifi/include/capi/api.h
+++ b/libminifi/include/capi/api.h
@@ -39,15 +39,13 @@ void enable_logging();
  *  BASE NIFI OPERATIONS
  * ##################################################################
  */
-
-
-nifi_port *create_port(char *port);
+nifi_port *create_port(const char *port);
 
 int free_port(nifi_port *port);
 
 
 
-nifi_instance *create_instance(char *url, nifi_port *port);
+nifi_instance *create_instance(const char *url, nifi_port *port);
 
 void initialize_instance(nifi_instance *);
 
@@ -79,15 +77,13 @@ flow *create_getfile(nifi_instance *instance, flow *parent, 
GetFileConfig *c);
 
 processor *add_processor(flow *, const char *);
 
+processor *add_processor_with_linkage(flow *flow, const char *processor_name);
+
 processor *add_python_processor(flow *, void 
(*ontrigger_callback)(processor_session *session));
 
 int set_property(processor *, const char *, const char *);
 
-int set_instance_property(nifi_instance *instance, char *key, char *value);
-
-processor *add_processor(flow *parent_flow, const char *processor_name);
-
-int set_property(processor *proc, const char *name, const char *value);
+int set_instance_property(nifi_instance *instance, const char*, const char *);
 
 int free_flow(flow *);
 
@@ -111,11 +107,15 @@ flow_file_record* create_ff_object_na(const char *file, 
const size_t len, const
 
 void free_flowfile(flow_file_record*);
 
-uint8_t add_attribute(flow_file_record*, char *key, void *value, size_t size);
+uint8_t add_attribute(flow_file_record*, const char *key, void *value, size_t 
size);
+
+void update_attribute(flow_file_record*, const char *key, void *value, size_t 
size);
+
+uint8_t get_attribute(flow_file_record *ff, attribute *caller_attribute);
 
-void update_attribute(flow_file_record*, char *key, void *value, size_t size);
+int get_attribute_qty(const flow_file_record* ff);
 
-void *get_attribute(flow_file_record*, char *key);
+int get_all_attributes(const flow_file_record* ff, attribute_set *target);
 
 uint8_t remove_attribute(flow_file_record*, char *key);
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a82ac26a/libminifi/include/capi/cstructs.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/cstructs.h 
b/libminifi/include/capi/cstructs.h
index 4807db3..731fd55 100644
--- a/libminifi/include/capi/cstructs.h
+++ b/libminifi/include/capi/cstructs.h
@@ -86,11 +86,16 @@ typedef struct {
  */
 
 typedef struct {
-  char *key;
+  const char *key;
   void *value;
   size_t value_size;
 } attribute;
 
+typedef struct {
+  attribute * attributes;
+  size_t  size;
+} attribute_set;
+
 /**
  * State of a flow file
  *

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a82ac26a/libminifi/src/capi/Plan.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/capi/Plan.cpp b/libminifi/src/capi/Plan.cpp
index 19a62e7..691be41 100644
--- a/libminifi/src/capi/Plan.cpp
+++ b/libminifi/src/capi/Plan.cpp
@@ -136,9 +136,10 @@ std::shared_ptr<core::Processor> 
ExecutionPlan::addProcessor(const std::string &
   if (finalized) {
     return nullptr;
   }
-
   auto processor = ExecutionPlan::createProcessor(processor_name, name);
-
+  if (!processor) {
+    return nullptr;
+  }
   return addProcessor(processor, name, relationship, linkToPrevious);
 }
 
@@ -241,7 +242,7 @@ std::shared_ptr<core::Processor> 
ExecutionPlan::createProcessor(const std::strin
 
   auto ptr = 
core::ClassLoader::getDefaultClassLoader().instantiate(processor_name, uuid);
   if (nullptr == ptr) {
-    throw std::exception();
+    return nullptr;
   }
   std::shared_ptr<core::Processor> processor = 
std::static_pointer_cast<core::Processor>(ptr);
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a82ac26a/libminifi/src/capi/api.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/capi/api.cpp b/libminifi/src/capi/api.cpp
index fa3a6c8..592fb6b 100644
--- a/libminifi/src/capi/api.cpp
+++ b/libminifi/src/capi/api.cpp
@@ -57,7 +57,7 @@ class DirectoryConfiguration {
   }
 };
 
-nifi_port *create_port(char *port) {
+nifi_port *create_port(const char *port) {
   if (nullptr == port)
     return nullptr;
   nifi_port *p = new nifi_port();
@@ -80,7 +80,7 @@ int free_port(nifi_port *port) {
  * @param url http URL for NiFi instance
  * @param port Remote output port.
  */
-nifi_instance *create_instance(char *url, nifi_port *port) {
+nifi_instance *create_instance(const char *url, nifi_port *port) {
   // make sure that we have a thread safe way of initializing the content 
directory
   DirectoryConfiguration::initialize();
 
@@ -119,7 +119,7 @@ void enable_async_c2(nifi_instance *instance, C2_Server 
*server, c2_stop_callbac
  * @param value
  * @return -1 when instance or key are null
  */
-int set_instance_property(nifi_instance *instance, char *key, char *value) {
+int set_instance_property(nifi_instance *instance, const char *key, const char 
*value) {
   if (nullptr == instance || nullptr == instance->instance_ptr || nullptr == 
key) {
     return -1;
   }
@@ -196,10 +196,11 @@ void free_flowfile(flow_file_record *ff) {
       std::shared_ptr<minifi::ResourceClaim> claim = 
std::make_shared<minifi::ResourceClaim>(ff->contentLocation, content_repo);
       content_repo->remove(claim);
     }
-    auto map = static_cast<string_map*>(ff->attributes);
-    delete[] ff->contentLocation;
-    if (ff->ffp != nullptr)  // don't delete map since it's a ref ptr
+    if (ff->ffp == nullptr) {
+      auto map = static_cast<string_map*>(ff->attributes);
       delete map;
+    }
+    delete[] ff->contentLocation;
     delete ff;
   }
 }
@@ -212,7 +213,7 @@ void free_flowfile(flow_file_record *ff) {
  * @param size size of value
  * @return 0 or -1 based on whether the attributed existed previously (-1) or 
not (0)
  */
-uint8_t add_attribute(flow_file_record *ff, char *key, void *value, size_t 
size) {
+uint8_t add_attribute(flow_file_record *ff, const char *key, void *value, 
size_t size) {
   auto attribute_map = static_cast<string_map*>(ff->attributes);
   const auto& ret = attribute_map->insert(std::pair<std::string, 
std::string>(key, std::string(static_cast<char*>(value), size)));
   return ret.second ? 0 : -1;
@@ -225,7 +226,7 @@ uint8_t add_attribute(flow_file_record *ff, char *key, void 
*value, size_t size)
  * @param value value to add
  * @param size size of value
  */
-void update_attribute(flow_file_record *ff, char *key, void *value, size_t 
size) {
+void update_attribute(flow_file_record *ff, const char *key, void *value, 
size_t size) {
   auto attribute_map = static_cast<string_map*>(ff->attributes);
   (*attribute_map)[key] = std::string(static_cast<char*>(value), size);
 }
@@ -237,11 +238,16 @@ void update_attribute(flow_file_record *ff, char *key, 
void *value, size_t size)
  * @param caller_attribute caller supplied object in which we will copy the 
data ptr
  * @return 0 if successful, -1 if the key does not exist
  */
-uint8_t get_attribute(flow_file_record *ff, char *key, attribute 
*caller_attribute) {
+uint8_t get_attribute(flow_file_record *ff, attribute *caller_attribute) {
+  if (ff == nullptr) {
+    return -1;
+  }
   auto attribute_map = static_cast<string_map*>(ff->attributes);
-  auto find = attribute_map->find(key);
+  if (!attribute_map) {
+    return -1;
+  }
+  auto find = attribute_map->find(caller_attribute->key);
   if (find != attribute_map->end()) {
-    caller_attribute->key = key;
     caller_attribute->value = 
static_cast<void*>(const_cast<char*>(find->second.data()));
     caller_attribute->value_size = find->second.size();
     return 0;
@@ -249,13 +255,42 @@ uint8_t get_attribute(flow_file_record *ff, char *key, 
attribute *caller_attribu
   return -1;
 }
 
+int get_attribute_qty(const flow_file_record* ff) {
+  if (ff == nullptr) {
+    return 0;
+  }
+  auto attribute_map = static_cast<string_map*>(ff->attributes);
+  return attribute_map ? attribute_map->size() : 0;
+}
+
+int get_all_attributes(const flow_file_record* ff, attribute_set *target) {
+  if (ff == nullptr) {
+    return 0;
+  }
+  auto attribute_map = static_cast<string_map*>(ff->attributes);
+  if (!attribute_map || attribute_map->empty()) {
+    return 0;
+  }
+  int i = 0;
+  for (const auto& kv : *attribute_map) {
+    if (i >= target->size) {
+      break;
+    }
+    target->attributes[i].key = kv.first.data();
+    target->attributes[i].value = 
static_cast<void*>(const_cast<char*>(kv.second.data()));
+    target->attributes[i].value_size = kv.second.size();
+    ++i;
+  }
+  return i;
+}
+
 /**
  * Removes a key from the attribute chain
  * @param ff flow file record
  * @param key key to remove
  * @return 0 if removed, -1 otherwise
  */
-uint8_t remove_attribute(flow_file_record *ff, char *key) {
+uint8_t remove_attribute(flow_file_record *ff, const char *key) {
   auto attribute_map = static_cast<string_map*>(ff->attributes);
   return attribute_map->erase(key) - 1;  // erase by key returns the number of 
elements removed (0 or 1)
 }
@@ -361,6 +396,18 @@ processor *add_processor(flow *flow, const char 
*processor_name) {
   }
   return nullptr;
 }
+
+processor *add_processor_with_linkage(flow *flow, const char *processor_name) {
+  ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan);
+  auto proc = plan->addProcessor(processor_name, processor_name, 
core::Relationship("success", "description"), true);
+  if (proc) {
+    processor *new_processor = new processor();
+    new_processor->processor_ptr = proc.get();
+    return new_processor;
+  }
+  return nullptr;
+}
+
 int set_property(processor *proc, const char *name, const char *value) {
   if (name != nullptr && value != nullptr && proc != nullptr) {
     core::Processor *p = static_cast<core::Processor*>(proc->processor_ptr);
@@ -386,8 +433,9 @@ flow_file_record *get_next_flow_file(nifi_instance 
*instance, flow *flow) {
   while (execution_plan->runNextProcessor()) {
   }
   auto ff = execution_plan->getCurrentFlowFile();
-  if (ff == nullptr)
+  if (ff == nullptr) {
     return nullptr;
+  }
   auto claim = ff->getResourceClaim();
 
   if (claim != nullptr) {
@@ -395,8 +443,8 @@ flow_file_record *get_next_flow_file(nifi_instance 
*instance, flow *flow) {
     claim->increaseFlowFileRecordOwnedCount();
     auto path = claim->getContentFullPath();
     auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize());
-    ffr->attributes = ff->getAttributesPtr();
     ffr->ffp = ff.get();
+    ffr->attributes = ff->getAttributesPtr();
     ffr->in = instance;
     return ffr;
   } else {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a82ac26a/libminifi/test/capi/CAPITests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/capi/CAPITests.cpp 
b/libminifi/test/capi/CAPITests.cpp
new file mode 100644
index 0000000..d07b75b
--- /dev/null
+++ b/libminifi/test/capi/CAPITests.cpp
@@ -0,0 +1,201 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <uuid/uuid.h>
+#include <sys/stat.h>
+#include <utility>
+#include <memory>
+#include <string>
+#include <vector>
+#include <set>
+#include <fstream>
+
+
+#include "utils/file/FileUtils.h"
+#include "../TestBase.h"
+
+#include "capi/api.h"
+
+#include <chrono>
+#include <thread>
+
+TEST_CASE("Test Creation of instance, one processor", 
"[createInstanceAndFlow]") {
+  nifi_instance *instance = create_instance("random_instance", 
create_port("12345"));
+  REQUIRE(instance != nullptr);
+  flow *test_flow = create_flow(instance, nullptr);
+  REQUIRE(test_flow != nullptr);
+  processor *test_proc = add_processor(test_flow, "GenerateFlowFile");
+  REQUIRE(test_proc != nullptr);
+  free_flow(test_flow);
+  free_instance(instance);
+}
+
+TEST_CASE("Invalid processor returns null", "[addInvalidProcessor]") {
+  nifi_instance *instance = create_instance("random_instance", 
create_port("12345"));
+  REQUIRE(instance != nullptr);
+  flow *test_flow = create_flow(instance, nullptr);
+  processor *test_proc = add_processor(test_flow, "NeverExisted");
+  REQUIRE(test_proc == nullptr);
+  processor *no_proc = add_processor(test_flow, "");
+  REQUIRE(no_proc == nullptr);
+  free_flow(test_flow);
+  free_instance(instance);
+}
+
+TEST_CASE("Set valid and invalid properties", "[setProcesssorProperties]") {
+  nifi_instance *instance = create_instance("random_instance", 
create_port("12345"));
+  REQUIRE(instance != nullptr);
+  flow *test_flow = create_flow(instance, nullptr);
+  REQUIRE(test_flow != nullptr);
+  processor *test_proc = add_processor(test_flow, "GenerateFlowFile");
+  REQUIRE(test_proc != nullptr);
+  REQUIRE(set_property(test_proc, "Data Format", "Text") == 0);  // Valid value
+  // TODO(aboda): add this two below when property handling is made strictly 
typed
+  // REQUIRE(set_property(test_proc, "Data Format", "InvalidFormat") != 0); // 
Invalid value
+  // REQUIRE(set_property(test_proc, "Invalid Attribute", "Blah") != 0); // 
Invalid attribute
+  REQUIRE(set_property(test_proc, "Data Format", nullptr) != 0);  // Empty 
value
+  REQUIRE(set_property(test_proc, nullptr, "Blah") != 0);  // Empty attribute
+  REQUIRE(set_property(nullptr, "Invalid Attribute", "Blah") != 0);  // 
Invalid processor
+  free_flow(test_flow);
+  free_instance(instance);
+}
+
+TEST_CASE("get file and put file", "[getAndPutFile]") {
+  TestController testController;
+
+  char src_format[] = "/tmp/gt.XXXXXX";
+  char put_format[] = "/tmp/pt.XXXXXX";
+  const char *sourcedir = testController.createTempDirectory(src_format);
+  const char *putfiledir = testController.createTempDirectory(put_format);
+  std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!";
+
+  nifi_instance *instance = create_instance("random_instance", 
create_port("12345"));
+  REQUIRE(instance != nullptr);
+  flow *test_flow = create_flow(instance, nullptr);
+  REQUIRE(test_flow != nullptr);
+  processor *get_proc = add_processor(test_flow, "GetFile");
+  REQUIRE(get_proc != nullptr);
+  processor *put_proc = add_processor_with_linkage(test_flow, "PutFile");
+  REQUIRE(put_proc != nullptr);
+  REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0);
+  REQUIRE(set_property(put_proc, "Directory", putfiledir) == 0);
+
+  std::fstream file;
+  std::stringstream ss;
+  ss << sourcedir << "/" << "tstFile.ext";
+  file.open(ss.str(), std::ios::out);
+  file << test_file_content;
+  file.close();
+
+  flow_file_record *record = get_next_flow_file(instance, test_flow);
+  REQUIRE(record != nullptr);
+
+  ss.str("");
+
+  ss << putfiledir << "/" << "tstFile.ext";
+  std::ifstream t(ss.str());
+  std::string put_data((std::istreambuf_iterator<char>(t)), 
std::istreambuf_iterator<char>());
+
+  REQUIRE(test_file_content == put_data);
+
+  free_flowfile(record);
+
+  free_flow(test_flow);
+
+  free_instance(instance);
+}
+
+TEST_CASE("Test manipulation of attributes", "[testAttributes]") {
+  TestController testController;
+
+  enable_logging();
+
+  char src_format[] = "/tmp/gt.XXXXXX";
+  const char *sourcedir = testController.createTempDirectory(src_format);
+
+  std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!";
+
+  std::fstream file;
+  std::stringstream ss;
+  ss << sourcedir << "/" << "tstFile.ext";
+  file.open(ss.str(), std::ios::out);
+  file << test_file_content;
+  file.close();
+
+  nifi_instance *instance = create_instance("random_instance", 
create_port("12345"));
+  REQUIRE(instance != nullptr);
+  flow *test_flow = create_flow(instance, nullptr);
+  REQUIRE(test_flow != nullptr);
+
+  processor *get_proc = add_processor(test_flow, "GetFile");
+  REQUIRE(get_proc != nullptr);
+  REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0);
+  processor *extract_test  = add_processor_with_linkage(test_flow, 
"ExtractText");
+  REQUIRE(extract_test != nullptr);
+  REQUIRE(set_property(extract_test, "Attribute", "TestAttr") == 0);
+  // TODO(aboda): enable this after decision made in MINIFICPP-640
+  /*processor *update_attribute = add_processor_with_linkage(test_flow, 
"UpdateAttribute");
+  REQUIRE(update_attribute != nullptr);
+
+  REQUIRE(set_property(update_attribute, "TestAttribute", "TestValue") == 0);*/
+
+  flow_file_record *record = get_next_flow_file(instance, test_flow);
+
+  REQUIRE(record != nullptr);
+
+  attribute test_attr;
+  test_attr.key = "TestAttr";
+  REQUIRE(get_attribute(record, &test_attr) == 0);
+
+  REQUIRE(test_attr.value_size != 0);
+  REQUIRE(test_attr.value != nullptr);
+
+  std::string attr_value(static_cast<char*>(test_attr.value), 
test_attr.value_size);
+
+  REQUIRE(attr_value == test_file_content);
+
+  const char * new_testattr_value = "S0me t3st t3xt";
+
+  // Attribute already exist, should fail
+  REQUIRE(add_attribute(record, test_attr.key, (void*)new_testattr_value, 
strlen(new_testattr_value)) != 0); // NOLINT
+
+  // Update overwrites values
+  update_attribute(record, test_attr.key, (void*)new_testattr_value, 
strlen(new_testattr_value)); // NOLINT
+
+  int attr_size = get_attribute_qty(record);
+  REQUIRE(attr_size > 0);
+
+  attribute_set attr_set;
+  attr_set.size = attr_size;
+  attr_set.attributes = (attribute*)malloc(attr_set.size * sizeof(attribute)); 
// NOLINT
+
+  REQUIRE(get_all_attributes(record, &attr_set) == attr_set.size);
+
+  bool test_attr_found = false;
+  for (int i = 0; i < attr_set.size; ++i) {
+    if (strcmp(attr_set.attributes[i].key, test_attr.key) == 0) {
+      test_attr_found = true;
+      REQUIRE(std::string(static_cast<char*>(attr_set.attributes[i].value), 
attr_set.attributes[i].value_size) == new_testattr_value);
+    }
+  }
+  REQUIRE(test_attr_found == true);
+
+  free_flowfile(record);
+
+  free_flow(test_flow);
+  free_instance(instance);
+}

Reply via email to