Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master fc1074a04 -> 556794b15


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/src/api/nanofi.cpp
----------------------------------------------------------------------
diff --git a/nanofi/src/api/nanofi.cpp b/nanofi/src/api/nanofi.cpp
new file mode 100644
index 0000000..ee33c6b
--- /dev/null
+++ b/nanofi/src/api/nanofi.cpp
@@ -0,0 +1,518 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <exception>
+#include <map>
+#include <memory>
+#include <new>
+#include <string>
+#include <utility>
+
+#include "api/nanofi.h"
+#include "core/Core.h"
+#include "core/expect.h"
+#include "cxx/Instance.h"
+#include "cxx/Plan.h"
+#include "ResourceClaim.h"
+#include "processors/GetFile.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/StringUtils.h"
+
+using string_map = std::map<std::string, std::string>;
+
+/**
+ * Holder for a static flag whose initializer calls initialize_api() during
+ * static initialization of this translation unit.
+ */
+struct API_INITIALIZER {
+  static int initialized;
+};
+
+// the initializer expression is what triggers initialize_api()
+int API_INITIALIZER::initialized = initialize_api();
+
+/**
+ * Disables logging through the logger configuration.
+ * @return always 1
+ */
+int initialize_api() {
+  auto &&logger_configuration = logging::LoggerConfiguration::getConfiguration();
+  logger_configuration.disableLogging();
+  return 1;
+}
+
+/**
+ * Enables logging through the logger configuration.
+ */
+void enable_logging() {
+  auto &&logger_configuration = logging::LoggerConfiguration::getConfiguration();
+  logger_configuration.enableLogging();
+}
+
+/**
+ * Installs the supplied function as the std::terminate handler.
+ * @param terminate_callback handler passed to std::set_terminate
+ */
+void set_terminate_callback(void (*terminate_callback)()) {
+  // the previously installed handler is intentionally discarded
+  static_cast<void>(std::set_terminate(terminate_callback));
+}
+
+/**
+ * Sets the default content directory exactly once: initialize() owns a
+ * function-local static instance whose constructor performs the call.
+ */
+class DirectoryConfiguration {
+ public:
+  static void initialize() {
+    // constructed on the first call only; later calls are no-ops
+    static DirectoryConfiguration one_time_setup;
+  }
+
+ protected:
+  DirectoryConfiguration() {
+    minifi::setDefaultDirectory(DEFAULT_CONTENT_DIRECTORY);
+  }
+};
+
+/**
+ * Creates a NiFi Instance from the url and output port.
+ * @param url http URL for NiFi instance
+ * @param port Remote output port; its port_id string is copied.
+ * @return newly allocated instance (release with free_instance), or nullptr
+ * when url, port or port->port_id is null or memory could not be allocated.
+ * @Deprecated for API version 0.2 in favor of the following prototype
+ * nifi_instance *create_instance(nifi_port const *port) {
+ */
+nifi_instance *create_instance(const char *url, nifi_port *port) {
+  if (nullptr == url || nullptr == port || nullptr == port->port_id) {
+    return nullptr;
+  }
+  // make sure that we have a thread safe way of initializing the content directory
+  DirectoryConfiguration::initialize();
+
+  /**
+   * This API will gradually move away from C++, hence malloc is used for nifi_instance
+   * Since minifi::Instance is currently being used, then we need to use new in that case.
+   */
+  const size_t port_id_size = strlen(port->port_id) + 1;
+  nifi_instance *instance = static_cast<nifi_instance*>(malloc(sizeof(nifi_instance)));
+  char *port_id_copy = static_cast<char*>(malloc(port_id_size));
+  if (nullptr == instance || nullptr == port_id_copy) {
+    // free(nullptr) is a no-op, so both can be released unconditionally
+    free(port_id_copy);
+    free(instance);
+    return nullptr;
+  }
+  // may have to translate port ID here in the future
+  snprintf(port_id_copy, port_id_size, "%s", port->port_id);
+  instance->port.port_id = port_id_copy;
+  instance->instance_ptr = new minifi::Instance(url, port->port_id);
+  return instance;
+}
+
+/**
+ * Initializes the instance by setting its remote port to the stored port id.
+ * Does nothing when instance or its instance_ptr is null.
+ * @param instance nifi instance
+ */
+void initialize_instance(nifi_instance *instance) {
+  if (nullptr == instance || nullptr == instance->instance_ptr) {
+    return;
+  }
+  auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
+  minifi_instance_ref->setRemotePort(instance->port.port_id);
+}
+/*
+ typedef int c2_update_callback(char *);
+
+ typedef int c2_stop_callback(char *);
+
+ typedef int c2_start_callback(char *);
+
+ */
+/**
+ * Forwards the C2 server description and the stop/start/update callbacks to
+ * the underlying minifi instance. Does nothing when instance or its
+ * instance_ptr is null.
+ * @param instance nifi instance
+ * @param server C2 server passed through unchanged
+ * @param c1 stop callback
+ * @param c2 start callback
+ * @param c3 update callback
+ */
+void enable_async_c2(nifi_instance *instance, C2_Server *server, c2_stop_callback *c1, c2_start_callback *c2, c2_update_callback *c3) {
+  if (nullptr == instance || nullptr == instance->instance_ptr) {
+    return;
+  }
+  auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
+  minifi_instance_ref->enableAsyncC2(server, c1, c2, c3);
+}
+
+/**
+ * Sets a property within the nifi instance
+ * @param instance nifi instance
+ * @param key key in which we will set the value
+ * @param value value to store under key
+ * @return 0 on success, -1 when instance, key or value are null
+ */
+int set_instance_property(nifi_instance *instance, const char *key, const char *value) {
+  if (nullptr == instance || nullptr == instance->instance_ptr || nullptr == key || nullptr == value) {
+    // a null value must not be forwarded to the configuration setter
+    return -1;
+  }
+  auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
+  minifi_instance_ref->getConfiguration()->set(key, value);
+  return 0;
+}
+
+/**
+ * Releases a nifi instance structure: the wrapped minifi::Instance, the
+ * copied port id and the structure itself. A null instance is ignored.
+ * @param instance nifi instance.
+ */
+void free_instance(nifi_instance* instance) {
+  if (nullptr == instance) {
+    return;
+  }
+  auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
+  delete minifi_instance_ref;
+  free(instance->port.port_id);
+  free(instance);
+}
+
+/**
+ * Creates a flow file record that refers to a file on disk.
+ * @param file path of the file to place into the flow file.
+ * @param len length of file, excluding the terminating null character.
+ * @return newly allocated record (release with free_flowfile), or nullptr
+ * when file is null or memory could not be allocated. The record size is the
+ * size of the file, or 0 when the file cannot be opened.
+ */
+flow_file_record* create_flowfile(const char *file, const size_t len) {
+  if (nullptr == file) {
+    return nullptr;
+  }
+  flow_file_record *new_ff = static_cast<flow_file_record*>(malloc(sizeof(flow_file_record)));
+  if (nullptr == new_ff) {
+    return nullptr;
+  }
+  new_ff->contentLocation = static_cast<char*>(malloc(sizeof(char) * (len + 1)));
+  if (nullptr == new_ff->contentLocation) {
+    free(new_ff);
+    return nullptr;
+  }
+  snprintf(new_ff->contentLocation, len + 1, "%s", file);
+  new_ff->attributes = new string_map();
+  // free_flowfile reads both ffp and crp, so neither may be left uninitialized
+  new_ff->ffp = nullptr;
+  new_ff->crp = static_cast<void*>(new std::shared_ptr<minifi::core::ContentRepository>);
+  // set the size of the flow file; tellg() reports -1 when the open failed
+  std::ifstream in(file, std::ifstream::ate | std::ifstream::binary);
+  const std::streamoff end_offset = in.tellg();
+  new_ff->size = end_offset > 0 ? end_offset : 0;
+  return new_ff;
+}
+
+/**
+ * Creates a flow file record with its own, initially empty, attribute map.
+ * @param file content location to copy into the record.
+ * @param len length of file, excluding the terminating null character.
+ * @param size size to record for the flow file.
+ * @return the new record, or nullptr when file is null.
+ */
+flow_file_record* create_ff_object(const char *file, const size_t len, const uint64_t size) {
+  if (file == nullptr) {
+    return nullptr;
+  }
+  flow_file_record *record = create_ff_object_na(file, len, size);
+  // no backing flow file; free_flowfile deletes the map only while ffp stays null
+  record->ffp = 0;
+  record->attributes = new string_map();
+  return record;
+}
+
+/**
+ * Creates a flow file record without an attribute map ("na" = no attributes).
+ * The record is allocated with malloc because free_flowfile releases it with
+ * free(); allocation failure is reported by throwing std::bad_alloc.
+ * @param file content location to copy into the record.
+ * @param len length of file, excluding the terminating null character.
+ * @param size size to record for the flow file.
+ * @return the new record; attributes and ffp are null, crp points to an empty
+ * content repository shared pointer.
+ */
+flow_file_record* create_ff_object_na(const char *file, const size_t len, const uint64_t size) {
+  flow_file_record *new_ff = static_cast<flow_file_record*>(malloc(sizeof(flow_file_record)));
+  if (nullptr == new_ff) {
+    throw std::bad_alloc();
+  }
+  new_ff->contentLocation = static_cast<char*>(malloc(sizeof(char) * (len + 1)));
+  if (nullptr == new_ff->contentLocation) {
+    free(new_ff);
+    throw std::bad_alloc();
+  }
+  snprintf(new_ff->contentLocation, len + 1, "%s", file);
+  new_ff->attributes = nullptr;
+  // callers may overwrite ffp; initialize it so free_flowfile never reads garbage
+  new_ff->ffp = nullptr;
+  // set the size of the flow file.
+  new_ff->size = size;
+  new_ff->crp = static_cast<void*>(new std::shared_ptr<minifi::core::ContentRepository>);
+  return new_ff;
+}
+/**
+ * Reclaims memory associated with a flow file object. When the record holds a
+ * content repository, the claim for its content location is removed from it.
+ * The attribute map is deleted only when the record has no backing flow file
+ * (ffp is null). A null record is ignored.
+ * @param ff flow file record.
+ */
+void free_flowfile(flow_file_record *ff) {
+  if (ff == nullptr) {
+    return;
+  }
+  auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ff->crp);
+  // crp may be null for records that were not given a repository holder
+  if (content_repo_ptr != nullptr && content_repo_ptr->get()) {
+    std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, *content_repo_ptr);
+    (*content_repo_ptr)->remove(claim);
+  }
+  if (ff->ffp == nullptr) {
+    auto map = static_cast<string_map*>(ff->attributes);
+    delete map;
+  }
+  free(ff->contentLocation);
+  free(ff);
+  delete content_repo_ptr;
+}
+
+/**
+ * Adds an attribute
+ * @param ff flow file record
+ * @param key key
+ * @param value value to add
+ * @param size size of value
+ * @return 0 when the attribute was added; -1 (255 once converted to uint8_t)
+ * when the key already existed or when ff, its attribute map, key or value is
+ * null
+ */
+uint8_t add_attribute(flow_file_record *ff, const char *key, void *value, size_t size) {
+  const uint8_t failure = static_cast<uint8_t>(-1);
+  if (nullptr == ff || nullptr == ff->attributes || nullptr == key || nullptr == value) {
+    return failure;
+  }
+  auto attribute_map = static_cast<string_map*>(ff->attributes);
+  const auto inserted = attribute_map->insert(std::pair<std::string, std::string>(key, std::string(static_cast<char*>(value), size)));
+  return inserted.second ? 0 : failure;
+}
+
+/**
+ * Updates (or adds) an attribute. Does nothing when ff, its attribute map,
+ * key or value is null.
+ * @param ff flow file record
+ * @param key key
+ * @param value value to add
+ * @param size size of value
+ */
+void update_attribute(flow_file_record *ff, const char *key, void *value, size_t size) {
+  if (nullptr == ff || nullptr == ff->attributes || nullptr == key || nullptr == value) {
+    return;
+  }
+  auto attribute_map = static_cast<string_map*>(ff->attributes);
+  (*attribute_map)[key] = std::string(static_cast<char*>(value), size);
+}
+
+/**
+ * Obtains the attribute named by caller_attribute->key.
+ * @param ff flow file record
+ * @param caller_attribute caller supplied object; on success its value and
+ * value_size are pointed at the data stored in the attribute map (no copy is
+ * made)
+ * @return 0 if successful; -1 (255 once converted to uint8_t) if the key does
+ * not exist or ff, its attribute map, caller_attribute or its key is null
+ */
+uint8_t get_attribute(flow_file_record * ff, attribute * caller_attribute) {
+  const uint8_t failure = static_cast<uint8_t>(-1);
+  if (ff == nullptr || caller_attribute == nullptr || caller_attribute->key == nullptr) {
+    return failure;
+  }
+  auto attribute_map = static_cast<string_map*>(ff->attributes);
+  if (!attribute_map) {
+    return failure;
+  }
+  auto find = attribute_map->find(caller_attribute->key);
+  if (find == attribute_map->end()) {
+    return failure;
+  }
+  caller_attribute->value = static_cast<void*>(const_cast<char*>(find->second.data()));
+  caller_attribute->value_size = find->second.size();
+  return 0;
+}
+
+/**
+ * Reports how many attributes a flow file record carries.
+ * @param ff flow file record
+ * @return number of attributes, or 0 when ff or its attribute map is null
+ */
+int get_attribute_qty(const flow_file_record* ff) {
+  const auto attribute_map = (nullptr != ff) ? static_cast<string_map*>(ff->attributes) : nullptr;
+  if (nullptr == attribute_map) {
+    return 0;
+  }
+  return attribute_map->size();
+}
+
+/**
+ * Fills target with pointers to the attributes of a flow file record. At most
+ * target->size entries are written; keys and values point into the attribute
+ * map and are not copied.
+ * @param ff flow file record
+ * @param target caller supplied attribute set to fill
+ * @return number of entries written; 0 when ff, its attribute map, target or
+ * target->attributes is null or the map is empty
+ */
+int get_all_attributes(const flow_file_record* ff, attribute_set *target) {
+  if (ff == nullptr || target == nullptr || target->attributes == nullptr) {
+    return 0;
+  }
+  auto attribute_map = static_cast<string_map*>(ff->attributes);
+  if (!attribute_map || attribute_map->empty()) {
+    return 0;
+  }
+  int i = 0;
+  for (const auto& kv : *attribute_map) {
+    if (i >= target->size) {
+      break;
+    }
+    target->attributes[i].key = kv.first.data();
+    target->attributes[i].value = static_cast<void*>(const_cast<char*>(kv.second.data()));
+    target->attributes[i].value_size = kv.second.size();
+    ++i;
+  }
+  return i;
+}
+
+/**
+ * Removes a key from the attribute chain
+ * @param ff flow file record
+ * @param key key to remove
+ * @return 0 if removed; -1 (255 once converted to uint8_t) when the key was
+ * not present or ff, its attribute map or key is null
+ */
+uint8_t remove_attribute(flow_file_record *ff, const char *key) {
+  const uint8_t failure = static_cast<uint8_t>(-1);
+  if (nullptr == ff || nullptr == ff->attributes || nullptr == key) {
+    return failure;
+  }
+  auto attribute_map = static_cast<string_map*>(ff->attributes);
+  // erase by key returns the number of elements removed (0 or 1)
+  return attribute_map->erase(key) == 1 ? 0 : failure;
+}
+
+/**
+ * Transmits the flowfile
+ * @param ff flow file record
+ * @param instance nifi instance structure
+ * @return 0 once the flow file has been handed to the instance, -1 when ff,
+ * its content location, instance or its instance_ptr is null
+ */
+int transmit_flowfile(flow_file_record *ff, nifi_instance *instance) {
+  if (nullptr == ff || nullptr == ff->contentLocation || nullptr == instance || nullptr == instance->instance_ptr) {
+    return -1;
+  }
+  auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
+  // in the unlikely event the user forgot to initialize the instance, we shall do it for them.
+  if (UNLIKELY(minifi_instance_ref->isRPGConfigured() == false)) {
+    minifi_instance_ref->setRemotePort(instance->port.port_id);
+  }
+
+  // records created without attributes carry a null map; transmit an empty one instead
+  string_map no_attributes;
+  auto attribute_map = static_cast<string_map*>(ff->attributes);
+  string_map &attributes = (nullptr != attribute_map) ? *attribute_map : no_attributes;
+
+  auto no_op = minifi_instance_ref->getNoOpRepository();
+  auto content_repo = minifi_instance_ref->getContentRepository();
+
+  std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, content_repo);
+  // NOTE(review): the owned count is raised twice, as in the original code — confirm both increments are required
+  claim->increaseFlowFileRecordOwnedCount();
+  claim->increaseFlowFileRecordOwnedCount();
+
+  auto ffr = std::make_shared<minifi::FlowFileRecord>(no_op, content_repo, attributes, claim);
+  ffr->addAttribute("nanofi.version", API_VERSION);
+  ffr->setSize(ff->size);
+
+  minifi_instance_ref->transfer(ffr);
+
+  return 0;
+}
+
+/**
+ * Creates an empty flow whose execution plan uses the instance's content
+ * repository and no-op repositories.
+ * @param instance nifi instance
+ * @return newly allocated flow (release with free_flow), or nullptr when
+ * instance or its instance_ptr is null or memory could not be allocated
+ */
+flow * create_new_flow(nifi_instance * instance) {
+  if (nullptr == instance || nullptr == instance->instance_ptr) {
+    return nullptr;
+  }
+  auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
+  flow *new_flow = static_cast<flow*>(malloc(sizeof(flow)));
+  if (nullptr == new_flow) {
+    return nullptr;
+  }
+
+  auto execution_plan = new ExecutionPlan(minifi_instance_ref->getContentRepository(), minifi_instance_ref->getNoOpRepository(), minifi_instance_ref->getNoOpRepository());
+
+  new_flow->plan = execution_plan;
+
+  return new_flow;
+}
+
+/**
+ * Creates a flow and, when a non-empty processor name is supplied, adds that
+ * processor to the new execution plan.
+ * @param instance nifi instance
+ * @param first_processor name of the first processor; may be null or empty
+ * @return newly allocated flow, or nullptr when instance or its instance_ptr
+ * is null
+ */
+flow *create_flow(nifi_instance *instance, const char *first_processor) {
+  auto minifi_instance_ref = (nullptr != instance) ? static_cast<minifi::Instance*>(instance->instance_ptr) : nullptr;
+  if (nullptr == minifi_instance_ref) {
+    return nullptr;
+  }
+  flow *created_flow = static_cast<flow*>(malloc(sizeof(flow)));
+
+  auto plan = new ExecutionPlan(minifi_instance_ref->getContentRepository(), minifi_instance_ref->getNoOpRepository(), minifi_instance_ref->getNoOpRepository());
+  created_flow->plan = plan;
+
+  const bool has_first_processor = first_processor != nullptr && first_processor[0] != '\0';
+  if (has_first_processor) {
+    // automatically adds it with success
+    plan->addProcessor(first_processor, first_processor);
+  }
+  return created_flow;
+}
+
+/**
+ * Adds a callback-driven processor to the flow.
+ * @param flow flow whose plan receives the processor
+ * @param ontrigger_callback function invoked with the processor session
+ * @return newly allocated processor handle, or nullptr when an argument is
+ * null, the plan refuses the callback, or memory could not be allocated
+ */
+processor *add_python_processor(flow *flow, void (*ontrigger_callback)(processor_session *)) {
+  if (nullptr == flow || nullptr == flow->plan || nullptr == ontrigger_callback) {
+    return nullptr;
+  }
+  ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan);
+  auto proc = plan->addCallback(nullptr, std::bind(ontrigger_callback, std::placeholders::_1));
+  if (!proc) {
+    // addCallback returns null once the plan is finalized or the processor cannot be created
+    return nullptr;
+  }
+  processor *new_processor = static_cast<processor*>(malloc(sizeof(processor)));
+  if (nullptr == new_processor) {
+    return nullptr;
+  }
+  new_processor->processor_ptr = proc.get();
+  return new_processor;
+}
+
+/**
+ * Adds a GetFile processor configured from c to parent_flow, or to a newly
+ * created flow when parent_flow is null.
+ * @param instance nifi instance used when a new flow has to be created
+ * @param parent_flow existing flow to extend; may be null
+ * @param c GetFile configuration (directory, keep_source, recurse)
+ * @return the flow that received the processor, or nullptr when c or its
+ * directory is null, no usable flow is available, or the processor could not
+ * be added. A flow created by this call is released again on failure.
+ */
+flow * create_getfile(nifi_instance * instance, flow * parent_flow, GetFileConfig * c) {
+  static const std::string first_processor = "GetFile";
+  if (nullptr == c || nullptr == c->directory) {
+    return nullptr;
+  }
+  const bool owns_flow = (nullptr == parent_flow);
+  flow *new_flow = owns_flow ? create_flow(instance, nullptr) : parent_flow;
+  if (nullptr == new_flow || nullptr == new_flow->plan) {
+    return nullptr;
+  }
+
+  ExecutionPlan *plan = static_cast<ExecutionPlan*>(new_flow->plan);
+  // automatically adds it with success
+  auto getFile = plan->addProcessor(first_processor, first_processor);
+  if (!getFile) {
+    if (owns_flow) {
+      delete plan;
+      free(new_flow);
+    }
+    return nullptr;
+  }
+
+  plan->setProperty(getFile, processors::GetFile::Directory.getName(), c->directory);
+  plan->setProperty(getFile, processors::GetFile::KeepSourceFile.getName(), c->keep_source ? "true" : "false");
+  plan->setProperty(getFile, processors::GetFile::Recurse.getName(), c->recurse ? "true" : "false");
+
+  return new_flow;
+}
+
+/**
+ * Adds the named processor to the flow.
+ * @param flow flow whose plan receives the processor
+ * @param processor_name processor class name, also used as its name
+ * @return newly allocated processor handle, or nullptr when an argument or
+ * the flow's plan is null, the processor could not be added, or memory could
+ * not be allocated
+ */
+processor *add_processor(flow *flow, const char *processor_name) {
+  if (nullptr == flow || nullptr == flow->plan || nullptr == processor_name) {
+    return nullptr;
+  }
+  ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan);
+  auto proc = plan->addProcessor(processor_name, processor_name);
+  if (!proc) {
+    return nullptr;
+  }
+  processor *new_processor = static_cast<processor*>(malloc(sizeof(processor)));
+  if (nullptr == new_processor) {
+    return nullptr;
+  }
+  new_processor->processor_ptr = proc.get();
+  return new_processor;
+}
+
+/**
+ * Adds the named processor to the flow and links it to the previously added
+ * processor over the "success" relationship.
+ * @param flow flow whose plan receives the processor
+ * @param processor_name processor class name, also used as its name
+ * @return newly allocated processor handle, or nullptr when an argument or
+ * the flow's plan is null, the processor could not be added, or memory could
+ * not be allocated
+ */
+processor *add_processor_with_linkage(flow *flow, const char *processor_name) {
+  if (nullptr == flow || nullptr == flow->plan || nullptr == processor_name) {
+    return nullptr;
+  }
+  ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan);
+  auto proc = plan->addProcessor(processor_name, processor_name, core::Relationship("success", "description"), true);
+  if (!proc) {
+    return nullptr;
+  }
+  processor *new_processor = static_cast<processor*>(malloc(sizeof(processor)));
+  if (nullptr == new_processor) {
+    return nullptr;
+  }
+  new_processor->processor_ptr = proc.get();
+  return new_processor;
+}
+
+/**
+ * Registers a failure callback on the flow's execution plan.
+ * @param flow flow whose plan receives the callback
+ * @param onerror_callback function invoked with the failed flow file record
+ * @return 0 when the plan accepted the callback, 1 otherwise
+ */
+int add_failure_callback(flow *flow, void (*onerror_callback)(flow_file_record*)) {
+  auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
+  if (execution_plan->setFailureCallback(onerror_callback)) {
+    return 0;
+  }
+  return 1;
+}
+
+/**
+ * Sets the failure strategy on the flow's execution plan.
+ * @param flow flow whose plan is updated
+ * @param strategy strategy to apply
+ * @return 0 when the plan accepted the strategy, -1 otherwise
+ */
+int set_failure_strategy(flow *flow, FailureStrategy strategy) {
+  auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
+  if (execution_plan->setFailureStrategy(strategy)) {
+    return 0;
+  }
+  return -1;
+}
+
+/**
+ * Sets a property on a processor, falling back to a dynamic property when the
+ * regular property is rejected and the processor supports dynamic properties.
+ * @param proc processor handle
+ * @param name property name
+ * @param value property value
+ * @return 0 on success, -1 when an argument is null, -2 when the property
+ * could not be set
+ */
+int set_property(processor *proc, const char *name, const char *value) {
+  if (nullptr == name || nullptr == value || nullptr == proc) {
+    return -1;
+  }
+  core::Processor *target = static_cast<core::Processor*>(proc->processor_ptr);
+  if (target->setProperty(name, value)) {
+    return 0;
+  }
+  if (target->supportsDynamicProperties() && target->setDynamicProperty(name, value)) {
+    return 0;
+  }
+  return -2;
+}
+
+/**
+ * Releases a flow together with its execution plan.
+ * @param flow flow to release
+ * @return 0 when the flow was released, -1 when flow or its plan is null (in
+ * which case nothing is released)
+ */
+int free_flow(flow *flow) {
+  auto execution_plan = (nullptr != flow) ? static_cast<ExecutionPlan*>(flow->plan) : nullptr;
+  if (nullptr == execution_plan) {
+    return -1;
+  }
+  delete execution_plan;
+  free(flow);
+  return 0;
+}
+
+/**
+ * Resets the plan, runs every processor in it and wraps the resulting flow
+ * file in a flow file record.
+ * @param instance nifi instance (only checked for null)
+ * @param flow flow to run
+ * @return record referring to the plan's current flow file, or nullptr when
+ * an argument or the plan is null, no flow file was produced, or the flow
+ * file has no resource claim
+ */
+flow_file_record * get_next_flow_file(nifi_instance * instance, flow * flow) {
+  if (nullptr == instance || nullptr == flow || nullptr == flow->plan) {
+    return nullptr;
+  }
+  auto plan = static_cast<ExecutionPlan*>(flow->plan);
+  plan->reset();
+  // runNextProcessor() reports whether more processors remain
+  bool more_processors = plan->runNextProcessor();
+  while (more_processors) {
+    more_processors = plan->runNextProcessor();
+  }
+  auto flow_file = plan->getCurrentFlowFile();
+  if (flow_file == nullptr) {
+    return nullptr;
+  }
+  auto claim = flow_file->getResourceClaim();
+  if (claim == nullptr) {
+    return nullptr;
+  }
+
+  // create a flow file.
+  claim->increaseFlowFileRecordOwnedCount();
+  auto path = claim->getContentFullPath();
+  auto record = create_ff_object_na(path.c_str(), path.length(), flow_file->getSize());
+  record->ffp = flow_file.get();
+  record->attributes = flow_file->getAttributesPtr();
+  auto repo_holder = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(record->crp);
+  *repo_holder = plan->getContentRepo();
+  return record;
+}
+
+/**
+ * Collects up to size flow file records by repeatedly calling
+ * get_next_flow_file, stopping at the first call that yields nothing.
+ * @param instance nifi instance
+ * @param flow flow to run
+ * @param ff_r caller supplied array with room for size record pointers
+ * @param size capacity of ff_r
+ * @return number of records stored in ff_r; 0 when instance, flow, its plan
+ * or ff_r is null
+ */
+size_t get_flow_files(nifi_instance *instance, flow *flow, flow_file_record **ff_r, size_t size) {
+  if (nullptr == instance || nullptr == flow || nullptr == flow->plan || nullptr == ff_r)
+    return 0;
+  size_t i = 0;
+  for (; i < size; i++) {
+    // get_next_flow_file resets the plan itself before running it
+    auto ffr = get_next_flow_file(instance, flow);
+    if (ffr == nullptr) {
+      break;
+    }
+    ff_r[i] = ffr;
+  }
+  return i;
+}
+
+/**
+ * Obtains the next flow file from the session, records it on the plan as the
+ * next flow file and wraps it in a flow file record.
+ * @param instance nifi instance (only checked for null)
+ * @param flow flow whose plan tracks the flow file
+ * @param session processor session wrapping a core::ProcessSession
+ * @return record referring to the flow file, or nullptr when an argument, the
+ * wrapped session or the plan is null, the session has no flow file, or the
+ * flow file has no resource claim
+ */
+flow_file_record * get(nifi_instance * instance, flow * flow, processor_session * session) {
+  if (nullptr == instance || nullptr == flow || nullptr == session)
+    return nullptr;
+  auto sesh = static_cast<core::ProcessSession*>(session->session);
+  auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
+  if (nullptr == sesh || nullptr == execution_plan) {
+    return nullptr;
+  }
+  auto ff = sesh->get();
+  execution_plan->setNextFlowFile(ff);
+  if (ff == nullptr) {
+    return nullptr;
+  }
+  auto claim = ff->getResourceClaim();
+  if (claim == nullptr) {
+    return nullptr;
+  }
+
+  // create a flow file.
+  claim->increaseFlowFileRecordOwnedCount();
+  auto path = claim->getContentFullPath();
+  auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize());
+  ffr->attributes = ff->getAttributesPtr();
+  ffr->ffp = ff.get();
+  auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp);
+  *content_repo_ptr = execution_plan->getContentRepo();
+  return ffr;
+}
+
+/**
+ * Transfers the plan's next flow file to the named relationship through the
+ * session.
+ * @param session processor session wrapping a core::ProcessSession
+ * @param flow flow whose plan holds the next flow file
+ * @param rel relationship name
+ * @return 0 on success, -1 when an argument, the wrapped session or the plan
+ * is null, -2 when the plan has no next flow file
+ */
+int transfer(processor_session* session, flow *flow, const char *rel) {
+  const bool missing_argument = (nullptr == session) || (nullptr == flow) || (nullptr == rel);
+  if (missing_argument) {
+    return -1;
+  }
+  auto process_session = static_cast<core::ProcessSession*>(session->session);
+  auto plan = static_cast<ExecutionPlan*>(flow->plan);
+  if (nullptr == process_session || nullptr == plan) {
+    return -1;
+  }
+  core::Relationship target(rel, rel);
+  auto next_flow_file = plan->getNextFlowFile();
+  if (nullptr == next_flow_file) {
+    return -2;
+  }
+  process_session->transfer(next_flow_file, target);
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/src/cxx/C2CallbackAgent.cpp
----------------------------------------------------------------------
diff --git a/nanofi/src/cxx/C2CallbackAgent.cpp 
b/nanofi/src/cxx/C2CallbackAgent.cpp
new file mode 100644
index 0000000..3a2b0d1
--- /dev/null
+++ b/nanofi/src/cxx/C2CallbackAgent.cpp
@@ -0,0 +1,79 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "cxx/C2CallbackAgent.h"
+#include <csignal>
+#include <utility>
+#include <vector>
+#include <map>
+#include <string>
+#include <memory>
+#include "c2/ControllerSocketProtocol.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/file/FileUtils.h"
+#include "utils/file/FileManager.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+/**
+ * Builds the agent on top of C2Agent, starting with no stop callback
+ * installed and a logger obtained from the logger factory.
+ */
+C2CallbackAgent::C2CallbackAgent(
+    const std::shared_ptr<core::controller::ControllerServiceProvider> &controller,
+    const std::shared_ptr<state::StateMonitor> &updateSink,
+    const std::shared_ptr<Configure> &configuration)
+    : C2Agent(controller, updateSink, configuration),
+      stop(nullptr),
+      logger_(logging::LoggerFactory<C2CallbackAgent>::getLogger()) {
+}
+
+/**
+ * Reacts to a C2 server response. Only START and STOP do anything: when the
+ * response names "C2" or "c2", SIGTERM is raised; afterwards the stop
+ * callback, if one is installed, is invoked with the response name. All other
+ * operations are ignored.
+ * @param resp response received from the C2 server
+ */
+void C2CallbackAgent::handle_c2_server_response(const C2ContentResponse &resp) {
+  switch (resp.op) {
+    // NOTE(review): START shares the STOP handling, so it also invokes the stop callback — confirm this is intended
+    case Operation::START:
+    case Operation::STOP: {
+      if (resp.name == "C2" || resp.name == "c2") {
+        raise(SIGTERM);
+      }
+
+      if (nullptr != stop) {
+        // the callback takes a mutable char*, hence the const_cast
+        stop(const_cast<char*>(resp.name.c_str()));
+      }
+      break;
+    }
+    case Operation::CLEAR:
+    case Operation::UPDATE:
+    case Operation::DESCRIBE:
+    case Operation::RESTART:
+    default:
+      // do nothing
+      break;
+  }
+}
+
+} /* namespace c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/src/cxx/CallbackProcessor.cpp
----------------------------------------------------------------------
diff --git a/nanofi/src/cxx/CallbackProcessor.cpp 
b/nanofi/src/cxx/CallbackProcessor.cpp
new file mode 100644
index 0000000..5294a1b
--- /dev/null
+++ b/nanofi/src/cxx/CallbackProcessor.cpp
@@ -0,0 +1,37 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "cxx/CallbackProcessor.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+/**
+ * Wraps the process session in a processor_session and hands it to the
+ * registered callback. Does nothing when no callback is set.
+ */
+void CallbackProcessor::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+  if (callback_ == nullptr) {
+    return;
+  }
+  // the wrapper lives only for the duration of the callback
+  processor_session wrapped_session;
+  wrapped_session.session = session;
+  callback_(&wrapped_session);
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/src/cxx/Plan.cpp
----------------------------------------------------------------------
diff --git a/nanofi/src/cxx/Plan.cpp b/nanofi/src/cxx/Plan.cpp
new file mode 100644
index 0000000..f892aa9
--- /dev/null
+++ b/nanofi/src/cxx/Plan.cpp
@@ -0,0 +1,294 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "cxx/Plan.h"
+#include "cxx/CallbackProcessor.h"
+#include <memory>
+#include <vector>
+#include <set>
+#include <string>
+
+/**
+ * Converts an integer to a FailureStrategy when it matches AS_IS or ROLLBACK.
+ * @param in integer value to convert
+ * @param out receives the strategy on success; left untouched otherwise
+ * @return true when in maps to AS_IS or ROLLBACK, false otherwise
+ */
+bool intToFailureStragey(int in, FailureStrategy *out) {
+  const auto candidate = static_cast<FailureStrategy>(in);
+  if (candidate == AS_IS || candidate == ROLLBACK) {
+    *out = candidate;
+    return true;
+  }
+  return false;
+}
+
+// identifier generator shared by every ExecutionPlan
+std::shared_ptr<utils::IdGenerator> ExecutionPlan::id_generator_ = utils::IdGenerator::getIdGenerator();
+
+/**
+ * Creates an empty, not yet finalized plan over the given repositories and
+ * obtains a stream factory built from a default-constructed configuration.
+ * @param content_repo content repository
+ * @param flow_repo flow file repository
+ * @param prov_repo provenance repository
+ */
+ExecutionPlan::ExecutionPlan(
+    std::shared_ptr<core::ContentRepository> content_repo,
+    std::shared_ptr<core::Repository> flow_repo,
+    std::shared_ptr<core::Repository> prov_repo)
+    : content_repo_(content_repo),
+      flow_repo_(flow_repo),
+      prov_repo_(prov_repo),
+      finalized(false),
+      location(-1),
+      current_flowfile_(nullptr),
+      logger_(logging::LoggerFactory<ExecutionPlan>::getLogger()) {
+  auto default_configuration = std::make_shared<minifi::Configure>();
+  stream_factory = org::apache::nifi::minifi::io::StreamFactory::getInstance(default_configuration);
+}
+
+/**
+ * Creates a CallbackProcessor, installs the callback on it and appends it to
+ * the plan, linked to the previous processor over "success".
+ * @param obj opaque object handed to the processor along with the callback
+ * @param fp callback receiving the processor session
+ * @return the added processor, or nullptr when the plan is already finalized
+ * or the processor could not be created
+ */
+std::shared_ptr<core::Processor> ExecutionPlan::addCallback(void *obj, std::function<void(processor_session*)> fp) {
+  if (finalized) {
+    return nullptr;
+  }
+
+  const auto created = createProcessor("CallbackProcessor", "CallbackProcessor");
+  if (nullptr == created) {
+    return nullptr;
+  }
+
+  auto callback_processor = std::static_pointer_cast<processors::CallbackProcessor>(created);
+  callback_processor->setCallback(obj, fp);
+
+  const core::Relationship success("success", "description");
+  return addProcessor(callback_processor, "CallbackProcessor", success, true);
+}
+
+/**
+ * Sets a property on the process context that belongs to proc.
+ * @param proc processor previously added to this plan
+ * @param prop property name
+ * @param value property value
+ * @return result of the context's setProperty, or false when proc is not in
+ * the queue or has no matching context
+ */
+bool ExecutionPlan::setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value) {
+  logger_->log_debug("Attempting to set property %s %s for %s", prop, value, proc->getName());
+
+  // contexts are stored at the same index as their processors
+  const auto queue_size = processor_queue_.size();
+  size_t index = 0;
+  while (index < queue_size && processor_queue_.at(index) != proc) {
+    ++index;
+  }
+
+  const bool found = index < queue_size && index < processor_contexts_.size();
+  if (!found) {
+    return false;
+  }
+  return processor_contexts_.at(index)->setProperty(prop, value);
+}
+
+/**
+ * Appends an already constructed processor to the plan, creating its node and
+ * process context.
+ * @param processor processor to add
+ * @param name processor name (not used by this overload)
+ * @param relationship relationship used for the link or for termination
+ * @param linkToPrevious when true, connect the previously added processor to
+ * this one over relationship; when the plan is still empty the processor is
+ * connected to itself and relationship becomes the termination relationship
+ * @return the processor, or nullptr when it is null or the plan is finalized
+ */
+std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name, core::Relationship relationship, bool linkToPrevious) {
+  if (finalized || !processor) {
+    return nullptr;
+  }
+
+  // NOTE(review): the generated identifier is not used below; the call is kept in case generating has side effects — confirm
+  utils::Identifier uuid;
+  id_generator_->generate(uuid);
+
+  processor->setStreamFactory(stream_factory);
+  // initialize the processor
+  processor->initialize();
+
+  processor_mapping_[processor->getUUIDStr()] = processor;
+
+  if (!linkToPrevious) {
+    termination_ = relationship;
+  } else {
+    // back() on an empty vector is undefined, so only read it when a previous processor exists
+    std::shared_ptr<core::Processor> last;
+    if (!processor_queue_.empty()) {
+      last = processor_queue_.back();
+    }
+
+    if (last == nullptr) {
+      last = processor;
+      termination_ = relationship;
+    }
+
+    relationships_.push_back(connectProcessors(last, processor, relationship, true));
+  }
+
+  std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
+
+  processor_nodes_.push_back(node);
+
+  std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider_, prov_repo_, flow_repo_, content_repo_);
+  processor_contexts_.push_back(context);
+
+  processor_queue_.push_back(processor);
+
+  return processor;
+}
+
+/**
+ * Creates a processor by class name and appends it to the plan.
+ * @param processor_name class name used to instantiate the processor
+ * @param name name given to the processor
+ * @param relationship relationship used for the link or for termination
+ * @param linkToPrevious whether to link to the previously added processor
+ * @return the added processor, or nullptr when the plan is finalized or the
+ * processor could not be created
+ */
+std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship, bool linkToPrevious) {
+  if (finalized) {
+    return nullptr;
+  }
+  const auto created = createProcessor(processor_name, name);
+  if (nullptr == created) {
+    return nullptr;
+  }
+  return addProcessor(created, name, relationship, linkToPrevious);
+}
+
+/**
+ * Rewinds the plan: drops the recorded sessions and factories, moves the
+ * position back before the first processor and brings every processor's
+ * active task count down to zero.
+ */
+void ExecutionPlan::reset() {
+  process_sessions_.clear();
+  factories_.clear();
+  location = -1;
+  for (const auto &queued : processor_queue_) {
+    for (; queued->getActiveTasks() > 0; ) {
+      queued->decrementActiveTask();
+    }
+  }
+}
+
+/**
+ * Advances to the next processor in the queue and runs it once, finalizing
+ * the plan first if that has not happened yet. The processor is scheduled the
+ * first time it is reached. After the session is committed, the current flow
+ * file is taken from the session, or from the last relationship when this was
+ * the final processor and the session had none.
+ * @param verify optional replacement for the processor's onTrigger; when set
+ * it is called with the context and session instead
+ * @return true when more processors remain after this one, false when this
+ * was the last one or the queue was already exhausted
+ */
+bool ExecutionPlan::runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify) {
+  if (!finalized) {
+    finalize();
+  }
+  ++location;
+  if (location >= processor_queue_.size()) {
+    return false;
+  }
+  std::shared_ptr<core::Processor> active_processor = processor_queue_[location];
+  std::shared_ptr<core::ProcessContext> active_context = processor_contexts_[location];
+  auto session_factory = std::make_shared<core::ProcessSessionFactory>(active_context);
+  factories_.push_back(session_factory);
+
+  // onSchedule runs only the first time a processor is reached
+  const bool already_scheduled = std::find(configured_processors_.begin(), configured_processors_.end(), active_processor) != configured_processors_.end();
+  if (!already_scheduled) {
+    active_processor->onSchedule(active_context, session_factory);
+    configured_processors_.push_back(active_processor);
+  }
+
+  auto session = std::make_shared<core::ProcessSession>(active_context);
+  process_sessions_.push_back(session);
+  active_processor->incrementActiveTasks();
+  active_processor->setScheduledState(core::ScheduledState::RUNNING);
+  if (verify == nullptr) {
+    logger_->log_debug("Running %s", active_processor->getName());
+    active_processor->onTrigger(active_context, session);
+  } else {
+    verify(active_context, session);
+  }
+  session->commit();
+  current_flowfile_ = session->get();
+
+  auto hasMore = location + 1 < processor_queue_.size();
+  if (!hasMore && !current_flowfile_) {
+    // last processor and nothing in the session: poll the final connection
+    std::set<std::shared_ptr<core::FlowFile>> expired;
+    current_flowfile_ = relationships_.back()->poll(expired);
+  }
+  return hasMore;
+}
+
+/**
+ * Returns the provenance events reported by the session at the current
+ * position. Uses at(), so an invalid position raises std::out_of_range.
+ */
+std::set<provenance::ProvenanceEventRecord*> ExecutionPlan::getProvenanceRecords() {
+  auto reporter = process_sessions_.at(location)->getProvenanceReporter();
+  return reporter->getEvents();
+}
+
+std::shared_ptr<core::FlowFile> ExecutionPlan::getCurrentFlowFile() {
+  return current_flowfile_;  // set by runNextProcessor(); null until then (the constructor initializes it to nullptr)
+}
+
+std::shared_ptr<core::ProcessSession> ExecutionPlan::getCurrentSession() {
+  return current_session_;  // NOTE(review): nothing in this chunk assigns current_session_ (runNextProcessor uses a local) — confirm it is set elsewhere
+}
+
+/**
+ * Builds the terminating connection for a processor over the termination
+ * relationship.
+ * @param processor processor passed as both endpoints of the connection
+ * @param set_dst forwarded unchanged to connectProcessors
+ * @return the connection created by connectProcessors
+ */
+std::shared_ptr<minifi::Connection> ExecutionPlan::buildFinalConnection(std::shared_ptr<core::Processor> processor, bool set_dst) {
+  auto final_connection = connectProcessors(processor, processor, termination_, set_dst);
+  return final_connection;
+}
+
+/**
+ * Completes the wiring of the plan and marks it as finalized.
+ *
+ * When a failure handler is registered, a CallbackProcessor bound to that
+ * handler is created, connected to the "failure" relationship of every queued
+ * processor that supports one, and appended to the plan together with its own
+ * node and context.
+ *
+ * Afterwards the terminating connections are built: if any relationship
+ * exists, only the last queued processor is terminated; otherwise every
+ * queued processor gets its own terminating connection.
+ */
+void ExecutionPlan::finalize() {
+  if (failure_handler_) {
+    auto failure_proc = createProcessor("CallbackProcessor", "CallbackProcessor");
+
+    if (failure_proc == nullptr) {
+      // createProcessor() returns nullptr when the class loader cannot
+      // instantiate the processor. Skip the failure wiring rather than
+      // dereferencing a null processor below.
+      logger_->log_error("Unable to create CallbackProcessor, failure handling is disabled");
+    } else {
+      std::shared_ptr<processors::CallbackProcessor> callback_proc = std::static_pointer_cast<processors::CallbackProcessor>(failure_proc);
+      callback_proc->setCallback(nullptr, std::bind(&FailureHandler::operator(), failure_handler_, std::placeholders::_1));
+
+      // Route the "failure" output of each processor that has one to the collector.
+      for (const auto& proc : processor_queue_) {
+        for (const auto& rel : proc->getSupportedRelationships()) {
+          if (rel.getName() == "failure") {
+            relationships_.push_back(connectProcessors(proc, failure_proc, core::Relationship("failure", "failure collector"), true));
+            break;
+          }
+        }
+      }
+
+      std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(failure_proc);
+      processor_nodes_.push_back(node);
+
+      std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider_, prov_repo_, flow_repo_, content_repo_);
+      processor_contexts_.push_back(context);
+
+      // Queued last so that it runs after the regular processors.
+      processor_queue_.push_back(failure_proc);
+    }
+  }
+
+  if (!relationships_.empty()) {
+    relationships_.push_back(buildFinalConnection(processor_queue_.back()));
+  } else {
+    for (auto processor : processor_queue_) {
+      relationships_.push_back(buildFinalConnection(processor, true));
+    }
+  }
+
+  finalized = true;
+}
+
+/**
+ * Instantiates a processor through the default class loader.
+ *
+ * @param processor_name class name handed to the class loader
+ * @param name name assigned to the created processor
+ * @return the new processor, or nullptr when the class loader could not
+ *         instantiate processor_name
+ */
+std::shared_ptr<core::Processor> ExecutionPlan::createProcessor(const std::string &processor_name, const std::string &name) {
+  utils::Identifier processor_id;
+  id_generator_->generate(processor_id);
+
+  auto instance = core::ClassLoader::getDefaultClassLoader().instantiate(processor_name, processor_id);
+  if (!instance) {
+    return nullptr;
+  }
+
+  auto created = std::static_pointer_cast<core::Processor>(instance);
+  created->setName(name);
+  return created;
+}
+
+/**
+ * Creates a connection named "<source uuid>-to-<destination uuid>" that
+ * carries the given relationship and registers it with the processors.
+ *
+ * @param src_proc processor set as the source of the connection
+ * @param dst_proc processor set as the destination when set_dst is true
+ * @param relationship relationship assigned to the connection
+ * @param set_dst when false only the source side of the connection is set
+ * @return the newly created connection
+ */
+std::shared_ptr<minifi::Connection> ExecutionPlan::connectProcessors(std::shared_ptr<core::Processor> src_proc, std::shared_ptr<core::Processor> dst_proc, core::Relationship relationship, bool set_dst) {
+  std::stringstream name_builder;
+  name_builder << src_proc->getUUIDStr() << "-to-" << dst_proc->getUUIDStr();
+
+  auto link = std::make_shared<minifi::Connection>(flow_repo_, content_repo_, name_builder.str());
+  link->setRelationship(relationship);
+
+  // The source side is always linked so that results can be inspected at the end.
+  link->setSource(src_proc);
+  utils::Identifier source_id;
+  src_proc->getUUID(source_id);
+  link->setSourceUUID(source_id);
+
+  if (set_dst) {
+    link->setDestination(dst_proc);
+    utils::Identifier destination_id;
+    dst_proc->getUUID(destination_id);
+    link->setDestinationUUID(destination_id);
+    // A self-loop is registered only once, by the source registration below.
+    if (src_proc != dst_proc) {
+      dst_proc->addConnection(link);
+    }
+  }
+
+  src_proc->addConnection(link);
+  return link;
+}
+
+/**
+ * Installs or replaces the callback invoked with failed flow files.
+ *
+ * The failure handler is created on first use. That is only possible before
+ * the plan has been finalized, because the failure handler processor is wired
+ * in during finalize().
+ *
+ * @param onerror_callback function to call for each failed flow file record
+ * @return false when the plan was already finalized without a failure
+ *         handler, true otherwise
+ */
+bool ExecutionPlan::setFailureCallback(std::function<void(flow_file_record*)> onerror_callback) {
+  if (!failure_handler_) {
+    if (finalized) {
+      // Already finalized the flow without failure handler processor
+      return false;
+    }
+    failure_handler_ = std::make_shared<FailureHandler>(getContentRepo());
+  }
+
+  failure_handler_->setCallback(onerror_callback);
+  return true;
+}
+
+/**
+ * Forwards the failure strategy to the registered failure handler.
+ *
+ * @param start strategy to apply
+ * @return true when a failure handler exists and received the strategy,
+ *         false when no failure handler has been created yet
+ */
+bool ExecutionPlan::setFailureStrategy(FailureStrategy start) {
+  if (failure_handler_) {
+    failure_handler_->setStrategy(start);
+    return true;
+  }
+  return false;
+}
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/nanofi/tests/CAPITests.cpp
----------------------------------------------------------------------
diff --git a/nanofi/tests/CAPITests.cpp b/nanofi/tests/CAPITests.cpp
new file mode 100644
index 0000000..65c52e1
--- /dev/null
+++ b/nanofi/tests/CAPITests.cpp
@@ -0,0 +1,278 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <uuid/uuid.h>
+#include <sys/stat.h>
+#include <utility>
+#include <memory>
+#include <string>
+#include <vector>
+#include <set>
+#include <fstream>
+
+#include "utils/file/FileUtils.h"
+#include "TestBase.h"
+
+#include <chrono>
+#include <thread>
+#include "api/nanofi.h"
+
+/**
+ * Creates a nifi_instance for the tests, using a fixed dummy port id.
+ *
+ * @param name value handed to create_instance() as its first argument;
+ *             defaults to "random_instance"
+ * @return the instance returned by create_instance()
+ */
+static nifi_instance *create_instance_obj(const char *name = "random_instance") {
+  nifi_port port;
+  char port_str[] = "12345";
+  port.port_id = port_str;
+  // Honour the caller-supplied name instead of a hard-coded literal.
+  return create_instance(name, &port);
+}
+
+// Running total maintained by the failure callbacks in this file:
+// failure_counter adds 1 per call, big_failure_counter adds 100.
+static int failure_count = 0;
+
+// Failure callback: counts one failure, checks that the record carries at
+// least one attribute, then releases the record.
+void failure_counter(flow_file_record * fr) {
+  ++failure_count;
+  REQUIRE(get_attribute_qty(fr) > 0);
+  free_flowfile(fr);
+}
+
+// Failure callback that adds 100 per call, so that tests can tell it apart
+// from failure_counter; the record is released afterwards.
+void big_failure_counter(flow_file_record * fr) {
+  failure_count = failure_count + 100;
+  free_flowfile(fr);
+}
+
+// Smoke test: an instance, a flow and a single processor can be created and
+// released again.
+TEST_CASE("Test Creation of instance, one processor", "[createInstanceAndFlow]") {
+  nifi_instance *instance = create_instance_obj();
+  REQUIRE(instance != nullptr);
+
+  flow *new_flow = create_flow(instance, nullptr);
+  REQUIRE(new_flow != nullptr);
+
+  processor *generator = add_processor(new_flow, "GenerateFlowFile");
+  REQUIRE(generator != nullptr);
+
+  free_flow(new_flow);
+  free_instance(instance);
+}
+
+// add_processor() must return nullptr for an unknown and for an empty
+// processor name.
+TEST_CASE("Invalid processor returns null", "[addInvalidProcessor]") {
+  nifi_instance *instance = create_instance_obj();
+  REQUIRE(instance != nullptr);
+
+  flow *new_flow = create_flow(instance, nullptr);
+
+  processor *unknown_proc = add_processor(new_flow, "NeverExisted");
+  REQUIRE(unknown_proc == nullptr);
+
+  processor *unnamed_proc = add_processor(new_flow, "");
+  REQUIRE(unnamed_proc == nullptr);
+
+  free_flow(new_flow);
+  free_instance(instance);
+}
+
+// set_property() returns 0 for a valid property/value pair and a non-zero
+// value when the processor, the property name or the value is missing.
+TEST_CASE("Set valid and invalid properties", "[setProcesssorProperties]") {
+  nifi_instance *instance = create_instance_obj();
+  REQUIRE(instance != nullptr);
+
+  flow *new_flow = create_flow(instance, nullptr);
+  REQUIRE(new_flow != nullptr);
+
+  processor *generator = add_processor(new_flow, "GenerateFlowFile");
+  REQUIRE(generator != nullptr);
+
+  // Valid value
+  REQUIRE(set_property(generator, "Data Format", "Text") == 0);
+
+  // TODO(aboda): add these two below when property handling is made strictly typed
+  // REQUIRE(set_property(test_proc, "Data Format", "InvalidFormat") != 0); // Invalid value
+  // REQUIRE(set_property(test_proc, "Invalid Attribute", "Blah") != 0); // Invalid attribute
+
+  // Empty value
+  REQUIRE(set_property(generator, "Data Format", nullptr) != 0);
+  // Empty attribute
+  REQUIRE(set_property(generator, nullptr, "Blah") != 0);
+  // Invalid processor
+  REQUIRE(set_property(nullptr, "Invalid Attribute", "Blah") != 0);
+
+  free_flow(new_flow);
+  free_instance(instance);
+}
+
+// End-to-end check: a file dropped into the GetFile input directory is
+// written by PutFile into the target directory with identical content.
+TEST_CASE("get file and put file", "[getAndPutFile]") {
+  TestController testController;
+
+  char src_format[] = "/tmp/gt.XXXXXX";
+  char put_format[] = "/tmp/pt.XXXXXX";
+  const char *sourcedir = testController.createTempDirectory(src_format);
+  const char *putfiledir = testController.createTempDirectory(put_format);
+  const std::string expected_content = "C API raNdOMcaSe test d4t4 th1s is!";
+
+  nifi_instance *instance = create_instance_obj();
+  REQUIRE(instance != nullptr);
+  flow *new_flow = create_flow(instance, nullptr);
+  REQUIRE(new_flow != nullptr);
+
+  // GetFile -> PutFile
+  processor *reader = add_processor(new_flow, "GetFile");
+  REQUIRE(reader != nullptr);
+  processor *writer = add_processor_with_linkage(new_flow, "PutFile");
+  REQUIRE(writer != nullptr);
+  REQUIRE(set_property(reader, "Input Directory", sourcedir) == 0);
+  REQUIRE(set_property(writer, "Directory", putfiledir) == 0);
+
+  // Create the input file.
+  std::stringstream path;
+  path << sourcedir << "/" << "tstFile.ext";
+  std::ofstream input_file(path.str());
+  input_file << expected_content;
+  input_file.close();
+
+  flow_file_record *record = get_next_flow_file(instance, new_flow);
+  REQUIRE(record != nullptr);
+
+  // Read back what PutFile produced.
+  path.str("");
+  path << putfiledir << "/" << "tstFile.ext";
+  std::ifstream output_file(path.str());
+  std::string written_content((std::istreambuf_iterator<char>(output_file)), std::istreambuf_iterator<char>());
+
+  REQUIRE(expected_content == written_content);
+
+  // No failure handler can be added after the flow is finalized
+  REQUIRE(add_failure_callback(new_flow, failure_counter) == 1);
+
+  free_flowfile(record);
+  free_flow(new_flow);
+  free_instance(instance);
+}
+
+// Runs GetFile -> ExtractText -> UpdateAttribute and verifies that attributes
+// of the resulting record can be read, that adding an existing key is
+// rejected, and that an update is visible through get_all_attributes().
+TEST_CASE("Test manipulation of attributes", "[testAttributes]") {
+  TestController testController;
+
+  char src_format[] = "/tmp/gt.XXXXXX";
+  const char *sourcedir = testController.createTempDirectory(src_format);
+
+  std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!";
+
+  // Create the input file picked up by GetFile.
+  std::fstream file;
+  std::stringstream ss;
+  ss << sourcedir << "/" << "tstFile.ext";
+  file.open(ss.str(), std::ios::out);
+  file << test_file_content;
+  file.close();
+  auto instance = create_instance_obj();
+  REQUIRE(instance != nullptr);
+  flow *test_flow = create_flow(instance, nullptr);
+  REQUIRE(test_flow != nullptr);
+
+  processor *get_proc = add_processor(test_flow, "GetFile");
+  REQUIRE(get_proc != nullptr);
+  REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0);
+  processor *extract_test = add_processor_with_linkage(test_flow, "ExtractText");
+  REQUIRE(extract_test != nullptr);
+  REQUIRE(set_property(extract_test, "Attribute", "TestAttr") == 0);
+  processor *update_attr = add_processor_with_linkage(test_flow, "UpdateAttribute");
+  REQUIRE(update_attr != nullptr);
+
+  REQUIRE(set_property(update_attr, "UpdatedAttribute", "UpdatedValue") == 0);
+
+  flow_file_record *record = get_next_flow_file(instance, test_flow);
+
+  REQUIRE(record != nullptr);
+
+  // ExtractText stores the file content in "TestAttr".
+  attribute test_attr;
+  test_attr.key = "TestAttr";
+  REQUIRE(get_attribute(record, &test_attr) == 0);
+
+  REQUIRE(test_attr.value_size != 0);
+  REQUIRE(test_attr.value != nullptr);
+
+  std::string attr_value(static_cast<char*>(test_attr.value), test_attr.value_size);
+
+  REQUIRE(attr_value == test_file_content);
+
+  const char * new_testattr_value = "S0me t3st t3xt";
+
+  // Attribute already exist, should fail
+  REQUIRE(add_attribute(record, test_attr.key, (void*) new_testattr_value, strlen(new_testattr_value)) != 0);  // NOLINT
+
+  // Update overwrites values
+  update_attribute(record, test_attr.key, (void*) new_testattr_value, strlen(new_testattr_value));  // NOLINT
+
+  int attr_size = get_attribute_qty(record);
+  REQUIRE(attr_size > 0);
+
+  attribute_set attr_set;
+  attr_set.size = attr_size;
+  attr_set.attributes = (attribute*) malloc(attr_set.size * sizeof(attribute));  // NOLINT
+  REQUIRE(attr_set.attributes != nullptr);
+
+  REQUIRE(get_all_attributes(record, &attr_set) == attr_set.size);
+
+  bool test_attr_found = false;
+  bool updated_attr_found = false;
+  for (int i = 0; i < attr_set.size; ++i) {
+    if (strcmp(attr_set.attributes[i].key, test_attr.key) == 0) {
+      test_attr_found = true;
+      REQUIRE(std::string(static_cast<char*>(attr_set.attributes[i].value), attr_set.attributes[i].value_size) == new_testattr_value);
+    } else if (strcmp(attr_set.attributes[i].key, "UpdatedAttribute") == 0) {
+      updated_attr_found = true;
+      REQUIRE(std::string(static_cast<char*>(attr_set.attributes[i].value), attr_set.attributes[i].value_size) == "UpdatedValue");
+    }
+  }
+  REQUIRE(updated_attr_found == true);
+  REQUIRE(test_attr_found == true);
+
+  // The attribute array was allocated by this test, so release it here.
+  free(attr_set.attributes);
+
+  free_flowfile(record);
+
+  free_flow(test_flow);
+  free_instance(instance);
+}
+
+// PutFile is pointed at a directory that does not exist and may not be
+// created, so each input file is expected to reach the registered failure
+// callback. Also checks that the callback can be swapped at runtime.
+TEST_CASE("Test error handling callback", "[errorHandling]") {
+  TestController testController;
+
+  // The counter is global; start from a known value so that the assertions
+  // below do not depend on an earlier run having reached its final reset.
+  failure_count = 0;
+
+  char src_format[] = "/tmp/gt.XXXXXX";
+  const char *sourcedir = testController.createTempDirectory(src_format);
+  std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!";
+
+  auto instance = create_instance_obj();
+  REQUIRE(instance != nullptr);
+  flow *test_flow = create_flow(instance, nullptr);
+  REQUIRE(test_flow != nullptr);
+
+  // Failure strategy cannot be set before a valid callback is added
+  REQUIRE(set_failure_strategy(test_flow, FailureStrategy::AS_IS) != 0);
+  REQUIRE(add_failure_callback(test_flow, failure_counter) == 0);
+
+  processor *get_proc = add_processor(test_flow, "GetFile");
+  REQUIRE(get_proc != nullptr);
+  processor *put_proc = add_processor_with_linkage(test_flow, "PutFile");
+  REQUIRE(put_proc != nullptr);
+  REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0);
+  REQUIRE(set_property(put_proc, "Directory", "/tmp/never_existed") == 0);
+  REQUIRE(set_property(put_proc, "Create Missing Directories", "false") == 0);
+
+  std::fstream file;
+  std::stringstream ss;
+
+  ss << sourcedir << "/" << "tstFile.ext";
+  file.open(ss.str(), std::ios::out);
+  file << test_file_content;
+  file.close();
+
+  REQUIRE(get_next_flow_file(instance, test_flow) == nullptr);
+
+  REQUIRE(failure_count == 1);
+
+  // Failure handler function can be replaced runtime
+  REQUIRE(add_failure_callback(test_flow, big_failure_counter) == 0);
+  REQUIRE(set_failure_strategy(test_flow, FailureStrategy::ROLLBACK) == 0);
+
+  // Create new testfile to trigger failure again
+  ss << "2";
+  file.open(ss.str(), std::ios::out);
+  file << test_file_content;
+  file.close();
+
+  REQUIRE(get_next_flow_file(instance, test_flow) == nullptr);
+  REQUIRE(failure_count > 100);
+
+  failure_count = 0;
+
+  free_flow(test_flow);
+  free_instance(instance);
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/python/library/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/python/library/CMakeLists.txt b/python/library/CMakeLists.txt
index 5d309a1..684cf22 100644
--- a/python/library/CMakeLists.txt
+++ b/python/library/CMakeLists.txt
@@ -23,7 +23,7 @@ IF(POLICY CMP0048)
   CMAKE_POLICY(SET CMP0048 OLD)
 ENDIF(POLICY CMP0048)
 
-include_directories(../../blocks/ ../../libminifi/include  
../../libminifi/include/c2  ../../libminifi/include/c2/protocols/  
../../libminifi/include/core/state 
./libminifi/include/core/statemanagement/metrics  
../../libminifi/include/core/yaml  ../../libminifi/include/core  
../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue 
../../thirdparty/yaml-cpp-yaml-cpp-20171024/include 
../../thirdparty/civetweb-1.9.1/include ../../thirdparty/)
+include_directories(../../nanofi/include/ ../../libminifi/include  
../../libminifi/include/c2  ../../libminifi/include/c2/protocols/  
../../libminifi/include/core/state 
./libminifi/include/core/statemanagement/metrics  
../../libminifi/include/core/yaml  ../../libminifi/include/core  
../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue 
../../thirdparty/yaml-cpp-yaml-cpp-20171024/include 
../../thirdparty/civetweb-1.9.1/include ../../thirdparty/)
 if(WIN32)
        include_directories(../../libminifi/opsys/win)
 else()
@@ -33,9 +33,9 @@ include_directories(../../extensions/http-curl/ 
../../extensions/http-curl/clien
 
 add_library(python-lib SHARED python_lib.cpp)
 if (APPLE)
-       target_link_libraries(python-lib capi core-minifi minifi)
+       target_link_libraries(python-lib nanofi core-minifi minifi)
 else()
-       target_link_libraries(python-lib capi-shared core-minifi-shared 
minifi-shared)
+       target_link_libraries(python-lib nanofi-shared core-minifi-shared 
minifi-shared)
 endif(APPLE)
 
 if (WIN32)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/python/library/python_lib.cpp
----------------------------------------------------------------------
diff --git a/python/library/python_lib.cpp b/python/library/python_lib.cpp
index eb81f79..79241df 100644
--- a/python/library/python_lib.cpp
+++ b/python/library/python_lib.cpp
@@ -22,11 +22,10 @@
 #include <sys/stat.h>
 #include <unistd.h>
 #include <dirent.h>
-#include "capi/api.h"
-#include "file_blocks.h"
-#include "comms.h"
-#include "capi/api.h"
-#include "capi/processors.h"
+#include "api/nanofi.h"
+#include "blocks/file_blocks.h"
+#include "blocks/comms.h"
+#include "core/processors.h"
 #include "HTTPCurlLoader.h"
 #include "python_lib.h"
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/thirdparty/rocksdb/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/CMakeLists.txt 
b/thirdparty/rocksdb/CMakeLists.txt
index 837f0bc..f20d09a 100644
--- a/thirdparty/rocksdb/CMakeLists.txt
+++ b/thirdparty/rocksdb/CMakeLists.txt
@@ -584,6 +584,7 @@ else()
 endif()
 
 set(ROCKSDB_STATIC_LIB rocksdb${ARTIFACT_SUFFIX})
+# commented out to avoid building the shared lib
 #set(ROCKSDB_SHARED_LIB rocksdb-shared${ARTIFACT_SUFFIX})
 set(ROCKSDB_IMPORT_LIB ${ROCKSDB_SHARED_LIB})
 if(WIN32)
@@ -592,16 +593,19 @@ if(WIN32)
   set(LIBS ${ROCKSDB_STATIC_LIB} ${THIRDPARTY_LIBS} ${SYSTEM_LIBS})
 else()
   set(SYSTEM_LIBS ${CMAKE_THREAD_LIBS_INIT})
-  set(LIBS ${ROCKSDB_SHARED_LIB} ${THIRDPARTY_LIBS} ${SYSTEM_LIBS})
-  add_library(${ROCKSDB_SHARED_LIB} SHARED ${SOURCES})
- # target_link_libraries(${ROCKSDB_SHARED_LIB}
+  set(LIBS ${ROCKSDB_STATIC_LIB} ${THIRDPARTY_LIBS} ${SYSTEM_LIBS})
+# commented out to avoid building the shared lib
+# as there is no reason
+#add_library(${ROCKSDB_SHARED_LIB} SHARED ${SOURCES})
+
+# target_link_libraries(${ROCKSDB_SHARED_LIB}
 #    ${THIRDPARTY_LIBS} ${SYSTEM_LIBS})
-  set_target_properties(${ROCKSDB_SHARED_LIB} PROPERTIES
-                        LINKER_LANGUAGE CXX
-                        VERSION ${ROCKSDB_VERSION}
-                        SOVERSION ${ROCKSDB_VERSION_MAJOR}
-                        CXX_STANDARD 11
-                        OUTPUT_NAME "rocksdb")
+#  set_target_properties(${ROCKSDB_SHARED_LIB} PROPERTIES
+#                        LINKER_LANGUAGE CXX
+#                        VERSION ${ROCKSDB_VERSION}
+#                        SOVERSION ${ROCKSDB_VERSION_MAJOR}
+#                        CXX_STANDARD 11
+#                        OUTPUT_NAME "rocksdb")
 endif()
 
 option(WITH_LIBRADOS "Build with librados" OFF)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/556794b1/thirdparty/yaml-cpp-yaml-cpp-20171024/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/thirdparty/yaml-cpp-yaml-cpp-20171024/CMakeLists.txt 
b/thirdparty/yaml-cpp-yaml-cpp-20171024/CMakeLists.txt
index f4239a4..6716638 100644
--- a/thirdparty/yaml-cpp-yaml-cpp-20171024/CMakeLists.txt
+++ b/thirdparty/yaml-cpp-yaml-cpp-20171024/CMakeLists.txt
@@ -44,7 +44,7 @@ option(YAML_CPP_BUILD_CONTRIB "Enable contrib stuff in 
library" ON)
 # --> General
 # see 
http://www.cmake.org/cmake/help/cmake2.6docs.html#variable:BUILD_SHARED_LIBS
 #     http://www.cmake.org/cmake/help/cmake2.6docs.html#command:add_library
-option(BUILD_SHARED_LIBS "Build Shared Libraries" OFF)
+#option(BUILD_SHARED_LIBS "Build Shared Libraries" OFF)
 
 
 

Reply via email to