Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 1f2e60061 -> 4d20f840b


MINIFICPP-648 - add processor and add processor with linkage nomenclature is 
confusing

Speed up C API by eliminating some allocations and indirections

C API: add a method to extract flow file content

Provide API to invoke with user data or file

This closes #432.

Signed-off-by: Marc Parisi <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/4d20f840
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/4d20f840
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/4d20f840

Branch: refs/heads/master
Commit: 4d20f840bb4ffb595c303a5c85af88cbf96d797d
Parents: 1f2e600
Author: Arpad Boda <[email protected]>
Authored: Fri Oct 26 14:50:55 2018 +0200
Committer: Marc Parisi <[email protected]>
Committed: Tue Nov 13 11:22:21 2018 -0500

----------------------------------------------------------------------
 libminifi/CMakeLists.txt               |   9 +-
 nanofi/examples/terminate_handler.c    |   2 +-
 nanofi/include/api/nanofi.h            |  24 ++-
 nanofi/include/core/cstructs.h         |  17 +-
 nanofi/include/core/cxxstructs.h       |  41 ++++
 nanofi/include/cxx/CallbackProcessor.h |   4 +-
 nanofi/include/cxx/Plan.h              |  37 +++-
 nanofi/src/api/nanofi.cpp              | 290 ++++++++++++++++++----------
 nanofi/src/cxx/CallbackProcessor.cpp   |   5 +-
 nanofi/src/cxx/Plan.cpp                |  28 +--
 nanofi/tests/CAPITests.cpp             | 184 ++++++++++++++----
 11 files changed, 467 insertions(+), 174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/4d20f840/libminifi/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index f6cc302..41c8063 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -141,11 +141,14 @@ endif()
 SET (LIBMINIFI core-minifi PARENT_SCOPE)
 
 if (ENABLE_PYTHON)
-if (NOT APPLE)
 #### shared
 
 add_library(core-minifi-shared SHARED ${SOURCES})
-target_link_libraries(core-minifi-shared ${CMAKE_DL_LIBS} uuid-shared yaml-cpp)
+if (APPLE)
+       target_link_libraries(core-minifi-shared ${CMAKE_DL_LIBS} yaml-cpp)
+else()
+       target_link_libraries(core-minifi-shared ${CMAKE_DL_LIBS} uuid-shared 
yaml-cpp)
+endif()
 
 find_package(ZLIB REQUIRED)
 include_directories(${ZLIB_INCLUDE_DIRS})
@@ -175,5 +178,5 @@ endif()
 
 set_property(TARGET core-minifi-shared PROPERTY POSITION_INDEPENDENT_CODE ON)
 set_property(TARGET minifi-shared PROPERTY POSITION_INDEPENDENT_CODE ON)
-endif()
+#endif()
 endif(ENABLE_PYTHON)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/4d20f840/nanofi/examples/terminate_handler.c
----------------------------------------------------------------------
diff --git a/nanofi/examples/terminate_handler.c 
b/nanofi/examples/terminate_handler.c
index d5443f0..1d5150d 100644
--- a/nanofi/examples/terminate_handler.c
+++ b/nanofi/examples/terminate_handler.c
@@ -42,7 +42,7 @@ int main(int argc, char **argv) {
 
   flow *new_flow = create_flow(instance, "GenerateFlowFile");
 
-  processor *put_proc = add_processor_with_linkage(new_flow, "PutFile");
+  processor *put_proc = add_processor(new_flow, "PutFile");
 
   // Target directory for PutFile is missing, it's not allowed to create, so 
tries to transmit to failure relationship
   // As it doesn't exist, an exception is thrown

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/4d20f840/nanofi/include/api/nanofi.h
----------------------------------------------------------------------
diff --git a/nanofi/include/api/nanofi.h b/nanofi/include/api/nanofi.h
index 31a4829..e25a3a0 100644
--- a/nanofi/include/api/nanofi.h
+++ b/nanofi/include/api/nanofi.h
@@ -79,10 +79,12 @@ flow *create_getfile(nifi_instance *instance, flow *parent, 
GetFileConfig *c);
 
 processor *add_processor(flow *, const char *);
 
-processor *add_processor_with_linkage(flow *flow, const char *processor_name);
-
 processor *add_python_processor(flow *, void 
(*ontrigger_callback)(processor_session *session));
 
+standalone_processor *create_processor(const char *);
+
+void free_standalone_processor(standalone_processor*);
+
 /**
 * Register your callback to received flow files that the flow failed to process
 * The flow file ownership is transferred to the caller!
@@ -101,6 +103,8 @@ int set_failure_strategy(flow *flow, FailureStrategy 
strategy);
 
 int set_property(processor *, const char *, const char *);
 
+int set_standalone_property(standalone_processor*, const char*, const char *);
+
 int set_instance_property(nifi_instance *instance, const char*, const char *);
 
 int free_flow(flow *);
@@ -111,6 +115,14 @@ size_t get_flow_files(nifi_instance *, flow *, 
flow_file_record **, size_t);
 
 flow_file_record *get(nifi_instance *,flow *, processor_session *);
 
+flow_file_record *invoke(standalone_processor* proc);
+
+flow_file_record *invoke_ff(standalone_processor* proc, const flow_file_record 
*input_ff);
+
+flow_file_record *invoke_file(standalone_processor* proc, const char* path);
+
+flow_file_record *invoke_chunck(standalone_processor* proc, uint8_t* buf, 
uint64_t);
+
 int transfer(processor_session* session, flow *flow, const char *rel);
 
 /**
@@ -135,6 +147,14 @@ int get_attribute_qty(const flow_file_record* ff);
 
 int get_all_attributes(const flow_file_record* ff, attribute_set *target);
 
+/**
+ * reads the content of a flow file
+ * @param target reference in which will set the result
+ * @param size max number of bytes to read (use flow_file_record->size to get 
the whole content)
+ * @return resulting read size (<=size)
+ **/
+int get_content(const flow_file_record* ff, uint8_t* target, int size);
+
 uint8_t remove_attribute(flow_file_record*, char *key);
 
 /****

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/4d20f840/nanofi/include/core/cstructs.h
----------------------------------------------------------------------
diff --git a/nanofi/include/core/cstructs.h b/nanofi/include/core/cstructs.h
index bc9700e..e493166 100644
--- a/nanofi/include/core/cstructs.h
+++ b/nanofi/include/core/cstructs.h
@@ -19,6 +19,9 @@
 #ifndef LIBMINIFI_SRC_CAPI_CSTRUCTS_H_
 #define LIBMINIFI_SRC_CAPI_CSTRUCTS_H_
 
+#include <stddef.h>
+#include <stdint.h>
+
 /**
  * NiFi Port struct
  */
@@ -62,13 +65,11 @@ typedef struct {
  * ##################################################################
  */
 
-typedef struct {
-  void *processor_ptr;
-} processor;
+typedef struct processor processor;
 
-typedef struct {
-  void *session;
-} processor_session;
+typedef struct standalone_processor standalone_processor;
+
+typedef struct processor_session processor_session;
 
 /****
  * ##################################################################
@@ -106,9 +107,7 @@ typedef struct {
 
 } flow_file_record;
 
-typedef struct {
-  void *plan;
-} flow;
+typedef struct flow flow;
 
 typedef enum FS {
   AS_IS,

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/4d20f840/nanofi/include/core/cxxstructs.h
----------------------------------------------------------------------
diff --git a/nanofi/include/core/cxxstructs.h b/nanofi/include/core/cxxstructs.h
new file mode 100644
index 0000000..dfa327c
--- /dev/null
+++ b/nanofi/include/core/cxxstructs.h
@@ -0,0 +1,41 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_CXXSTRUCTS_H
+#define NIFI_MINIFI_CPP_CXXSTRUCTS_H
+
+#include "cstructs.h"
+#include "cxx/Plan.h"
+
+struct flow : public ExecutionPlan {
+  using ExecutionPlan::ExecutionPlan;
+};
+
+struct standalone_processor : public core::Processor {
+  using core::Processor::Processor;
+};
+
+struct processor : public core::Processor {
+  using core::Processor::Processor;
+};
+
+struct processor_session : public core::ProcessSession {
+  using core::ProcessSession::ProcessSession;
+};
+
+#endif //NIFI_MINIFI_CPP_CXXSTRUCTS_H

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/4d20f840/nanofi/include/cxx/CallbackProcessor.h
----------------------------------------------------------------------
diff --git a/nanofi/include/cxx/CallbackProcessor.h 
b/nanofi/include/cxx/CallbackProcessor.h
index 61e824f..7cfcaf2 100644
--- a/nanofi/include/cxx/CallbackProcessor.h
+++ b/nanofi/include/cxx/CallbackProcessor.h
@@ -65,7 +65,7 @@ class CallbackProcessor : public core::Processor {
 
  public:
 
-  void setCallback(void *obj,std::function<void(processor_session*)> 
ontrigger_callback) {
+  void setCallback(void *obj,std::function<void(core::ProcessSession*)> 
ontrigger_callback) {
     objref_ = obj;
     callback_ = ontrigger_callback;
   }
@@ -82,7 +82,7 @@ class CallbackProcessor : public core::Processor {
 
  protected:
   void *objref_;
-  std::function<void(processor_session*)> callback_;
+  std::function<void(core::ProcessSession*)> callback_;
  private:
   // Logger
   std::shared_ptr<logging::Logger> logger_;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/4d20f840/nanofi/include/cxx/Plan.h
----------------------------------------------------------------------
diff --git a/nanofi/include/cxx/Plan.h b/nanofi/include/cxx/Plan.h
index 6171242..e2cb827 100644
--- a/nanofi/include/cxx/Plan.h
+++ b/nanofi/include/cxx/Plan.h
@@ -44,12 +44,16 @@
 #include "core/ProcessSession.h"
 #include "core/ProcessorNode.h"
 #include "core/reporting/SiteToSiteProvenanceReportingTask.h"
-#include "core/cstructs.h"
 #include "api/nanofi.h"
 
 using failure_callback_type = std::function<void(flow_file_record*)>;
 using content_repo_sptr = std::shared_ptr<core::ContentRepository>;
 
+struct flowfile_input_params {
+  std::shared_ptr<minifi::io::DataStream> content_stream;
+  std::map<std::string, std::string> attributes;
+};
+
 namespace {
 
   void failureStrategyAsIs(core::ProcessSession *session, 
failure_callback_type user_callback, content_repo_sptr cr_ptr) {
@@ -88,7 +92,7 @@ class ExecutionPlan {
 
   explicit ExecutionPlan(std::shared_ptr<core::ContentRepository> 
content_repo, std::shared_ptr<core::Repository> flow_repo, 
std::shared_ptr<core::Repository> prov_repo);
 
-  std::shared_ptr<core::Processor> addCallback(void *, 
std::function<void(processor_session*)>);
+  std::shared_ptr<core::Processor> addCallback(void *, 
std::function<void(core::ProcessSession*)>);
 
   std::shared_ptr<core::Processor> addProcessor(const 
std::shared_ptr<core::Processor> &processor, const std::string &name,
                                                 core::Relationship 
relationship = core::Relationship("success", "description"),
@@ -101,7 +105,7 @@ class ExecutionPlan {
 
   void reset();
 
-  bool runNextProcessor(std::function<void(const 
std::shared_ptr<core::ProcessContext>, const 
std::shared_ptr<core::ProcessSession>)> verify = nullptr);
+  bool runNextProcessor(std::function<void(const 
std::shared_ptr<core::ProcessContext>, const 
std::shared_ptr<core::ProcessSession>)> verify = nullptr, 
std::shared_ptr<flowfile_input_params> = nullptr);
 
   bool setFailureCallback(failure_callback_type onerror_callback);
 
@@ -133,8 +137,29 @@ class ExecutionPlan {
     next_ff_ = ptr;
   }
 
+  bool hasProcessor() {
+    return !processor_queue_.empty();
+  }
+
   static std::shared_ptr<core::Processor> createProcessor(const std::string 
&processor_name, const std::string &name);
 
+  static std::shared_ptr<ExecutionPlan> getPlan(const std::string& uuid) {
+    auto it = proc_plan_map_.find(uuid);
+    return it != proc_plan_map_.end() ? it->second : nullptr;
+  }
+
+  static void addProcessorWithPlan(const std::string &uuid, 
std::shared_ptr<ExecutionPlan> plan) {
+    proc_plan_map_[uuid] = plan;
+  }
+
+  static bool removeProcWithPlan(const std::string& uuid) {
+    return proc_plan_map_.erase(uuid) > 0;
+  }
+
+  static size_t getProcWithPlanQty() {
+    return proc_plan_map_.size();
+  }
+
  protected:
   class FailureHandler {
    public:
@@ -149,9 +174,8 @@ class ExecutionPlan {
     void setStrategy(FailureStrategy strat) {
       strategy_ = strat;
     }
-    void operator()(const processor_session* ps) {
-      auto ses = static_cast<core::ProcessSession*>(ps->session);
-      FailureStrategies.at(strategy_)(ses, callback_, content_repo_);
+    void operator()(core::ProcessSession* ps) {
+      FailureStrategies.at(strategy_)(ps, callback_, content_repo_);
     }
    private:
     failure_callback_type callback_;
@@ -199,6 +223,7 @@ class ExecutionPlan {
   static std::shared_ptr<utils::IdGenerator> id_generator_;
   std::shared_ptr<logging::Logger> logger_;
   std::shared_ptr<FailureHandler> failure_handler_;
+  static std::unordered_map<std::string, std::shared_ptr<ExecutionPlan>> 
proc_plan_map_;
 };
 
 #endif /* LIBMINIFI_CAPI_PLAN_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/4d20f840/nanofi/src/api/nanofi.cpp
----------------------------------------------------------------------
diff --git a/nanofi/src/api/nanofi.cpp b/nanofi/src/api/nanofi.cpp
index ee33c6b..7a06bfc 100644
--- a/nanofi/src/api/nanofi.cpp
+++ b/nanofi/src/api/nanofi.cpp
@@ -20,6 +20,7 @@
 #include <memory>
 #include <utility>
 #include <exception>
+#include <stdio.h>
 
 #include "api/nanofi.h"
 #include "core/Core.h"
@@ -30,6 +31,8 @@
 #include "processors/GetFile.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "utils/StringUtils.h"
+#include "io/DataStream.h"
+#include "core/cxxstructs.h"
 
 using string_map = std::map<std::string, std::string>;
 
@@ -40,6 +43,8 @@ class API_INITIALIZER {
 
 int API_INITIALIZER::initialized = initialize_api();
 
+static nifi_instance* standalone_instance = nullptr;
+
 int initialize_api() {
   logging::LoggerConfiguration::getConfiguration().disableLogging();
   return 1;
@@ -89,6 +94,39 @@ nifi_instance *create_instance(const char *url, nifi_port 
*port) {
   return instance;
 }
 
+standalone_processor *create_processor(const char *name) {
+  static int proc_counter = 0;
+  auto ptr = ExecutionPlan::createProcessor(name, name);
+  if (!ptr) {
+    return nullptr;
+  }
+  if (standalone_instance == nullptr) {
+    nifi_port port;
+    char portnum[] = "98765";
+    port.port_id = portnum;
+    standalone_instance = create_instance("internal_standalone", &port);
+  }
+  std::string flow_name = std::to_string(proc_counter++);
+  auto flow = create_flow(standalone_instance, flow_name.c_str());
+  std::shared_ptr<ExecutionPlan> plan(flow);
+  plan->addProcessor(ptr, name);
+  ExecutionPlan::addProcessorWithPlan(ptr->getUUIDStr(), plan);
+  return static_cast<standalone_processor*>(ptr.get());
+}
+
+void free_standalone_processor(standalone_processor* proc) {
+  if (proc == nullptr) {
+    return;
+  }
+  ExecutionPlan::removeProcWithPlan(proc->getUUIDStr());
+
+  if (ExecutionPlan::getProcWithPlanQty() == 0) {
+    // The instance is not needed any more as there are no standalone 
processors in the system
+    free_instance(standalone_instance);
+    standalone_instance = nullptr;
+  }
+}
+
 /**
  * Initializes the instance
  */
@@ -288,6 +326,16 @@ uint8_t remove_attribute(flow_file_record *ff, const char 
*key) {
   return attribute_map->erase(key) - 1;  // erase by key returns the number of 
elements removed (0 or 1)
 }
 
+int get_content(const flow_file_record* ff, uint8_t* target, int size) {
+  if (ff == nullptr || target == nullptr || size == 0) {
+    return 0;
+  }
+  auto content_repo = 
static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ff->crp);
+  std::shared_ptr<minifi::ResourceClaim> claim = 
std::make_shared<minifi::ResourceClaim>(ff->contentLocation, *content_repo);
+  auto stream = (*content_repo)->read(claim);
+  return stream->read(target, size);
+}
+
 /**
  * Transmits the flowfile
  * @param ff flow file record
@@ -323,13 +371,11 @@ int transmit_flowfile(flow_file_record *ff, nifi_instance 
*instance) {
 
 flow * create_new_flow(nifi_instance * instance) {
   auto minifi_instance_ref = 
static_cast<minifi::Instance*>(instance->instance_ptr);
-  flow *new_flow = (flow*) malloc(sizeof(flow));
-
-  auto execution_plan = new 
ExecutionPlan(minifi_instance_ref->getContentRepository(), 
minifi_instance_ref->getNoOpRepository(), 
minifi_instance_ref->getNoOpRepository());
-
-  new_flow->plan = execution_plan;
-
-  return new_flow;
+  flow * area = static_cast<flow*>(malloc(1*sizeof(flow)));
+  if(area == nullptr) {
+    return nullptr;
+  }
+  return new(area) flow(minifi_instance_ref->getContentRepository(), 
minifi_instance_ref->getNoOpRepository(), 
minifi_instance_ref->getNoOpRepository());
 }
 
 flow *create_flow(nifi_instance *instance, const char *first_processor) {
@@ -337,41 +383,40 @@ flow *create_flow(nifi_instance *instance, const char 
*first_processor) {
     return nullptr;
   }
   auto minifi_instance_ref = 
static_cast<minifi::Instance*>(instance->instance_ptr);
-  flow *new_flow = (flow*) malloc(sizeof(flow));
-
-  auto execution_plan = new 
ExecutionPlan(minifi_instance_ref->getContentRepository(), 
minifi_instance_ref->getNoOpRepository(), 
minifi_instance_ref->getNoOpRepository());
-
-  new_flow->plan = execution_plan;
+  flow * area = static_cast<flow*>(malloc(1*sizeof(flow)));
+  if(area == nullptr) {
+    return nullptr;
+  }
+  flow *new_flow = new(area) flow(minifi_instance_ref->getContentRepository(), 
minifi_instance_ref->getNoOpRepository(), 
minifi_instance_ref->getNoOpRepository());
 
   if (first_processor != nullptr && strlen(first_processor) > 0) {
     // automatically adds it with success
-    execution_plan->addProcessor(first_processor, first_processor);
+    new_flow->addProcessor(first_processor, first_processor);
   }
   return new_flow;
 }
 
 processor *add_python_processor(flow *flow, void 
(*ontrigger_callback)(processor_session *)) {
-  if (nullptr == flow || nullptr == flow->plan || nullptr == 
ontrigger_callback) {
+  if (nullptr == flow || nullptr == ontrigger_callback) {
     return nullptr;
   }
-  ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan);
-  auto proc = plan->addCallback(nullptr, std::bind(ontrigger_callback, 
std::placeholders::_1));
-  processor *new_processor = (processor*) malloc(sizeof(processor));
-  new_processor->processor_ptr = proc.get();
-  return new_processor;
+  auto lambda = [ontrigger_callback](core::ProcessSession *ps) {
+    ontrigger_callback(static_cast<processor_session*>(ps));  //Meh, sorry for 
this
+  };
+  auto proc = flow->addCallback(nullptr, lambda);
+  return static_cast<processor*>(proc.get());
 }
 
 flow * create_getfile(nifi_instance * instance, flow * parent_flow, 
GetFileConfig * c) {
   static const std::string first_processor = "GetFile";
   flow *new_flow = parent_flow == nullptr ? create_flow(instance, nullptr) : 
parent_flow;
 
-  ExecutionPlan *plan = static_cast<ExecutionPlan*>(new_flow->plan);
   // automatically adds it with success
-  auto getFile = plan->addProcessor(first_processor, first_processor);
+  auto getFile = new_flow->addProcessor(first_processor, first_processor);
 
-  plan->setProperty(getFile, processors::GetFile::Directory.getName(), 
c->directory);
-  plan->setProperty(getFile, processors::GetFile::KeepSourceFile.getName(), 
c->keep_source ? "true" : "false");
-  plan->setProperty(getFile, processors::GetFile::Recurse.getName(), 
c->recurse ? "true" : "false");
+  new_flow->setProperty(getFile, processors::GetFile::Directory.getName(), 
c->directory);
+  new_flow->setProperty(getFile, 
processors::GetFile::KeepSourceFile.getName(), c->keep_source ? "true" : 
"false");
+  new_flow->setProperty(getFile, processors::GetFile::Recurse.getName(), 
c->recurse ? "true" : "false");
 
   return new_flow;
 }
@@ -380,89 +425,83 @@ processor *add_processor(flow *flow, const char 
*processor_name) {
   if (nullptr == flow || nullptr == processor_name) {
     return nullptr;
   }
-  ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan);
-  auto proc = plan->addProcessor(processor_name, processor_name);
-  if (proc) {
-    processor *new_processor = (processor*) malloc(sizeof(processor));
-    new_processor->processor_ptr = proc.get();
-    return new_processor;
-  }
-  return nullptr;
-}
 
-processor *add_processor_with_linkage(flow *flow, const char *processor_name) {
-  ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan);
-  auto proc = plan->addProcessor(processor_name, processor_name, 
core::Relationship("success", "description"), true);
-  if (proc) {
-    processor *new_processor = (processor*) malloc(sizeof(processor));
-    new_processor->processor_ptr = proc.get();
-    return new_processor;
-  }
-  return nullptr;
+  auto proc = flow->addProcessor(processor_name, processor_name, 
core::Relationship("success", "description"), flow->hasProcessor());
+  return static_cast<processor*>(proc.get());
 }
 
 int add_failure_callback(flow *flow, void 
(*onerror_callback)(flow_file_record*)) {
-  ExecutionPlan *plan = static_cast<ExecutionPlan*>(flow->plan);
-  return plan->setFailureCallback(onerror_callback) ? 0 : 1;
+  return flow->setFailureCallback(onerror_callback) ? 0 : 1;
 }
 
 int set_failure_strategy(flow *flow, FailureStrategy strategy) {
-  return static_cast<ExecutionPlan*>(flow->plan)->setFailureStrategy(strategy) 
? 0 : -1;
+  return flow->setFailureStrategy(strategy) ? 0 : -1;
 }
 
-int set_property(processor *proc, const char *name, const char *value) {
-  if (name != nullptr && value != nullptr && proc != nullptr) {
-    core::Processor *p = static_cast<core::Processor*>(proc->processor_ptr);
-    bool success = p->setProperty(name, value) || 
(p->supportsDynamicProperties() && p->setDynamicProperty(name, value));
+int set_propery_internal(core::Processor* proc, const char *name, const char 
*value) {
+  if (name != nullptr && value != nullptr) {
+    bool success = proc->setProperty(name, value) || 
(proc->supportsDynamicProperties() && proc->setDynamicProperty(name, value));
     return success ? 0 : -2;
   }
   return -1;
 }
 
+int set_property(processor *proc, const char *name, const char *value) {
+  if (proc != nullptr) {
+    return set_propery_internal(proc, name, value);
+  }
+  return -1;
+}
+
+int set_standalone_property(standalone_processor *proc, const char *name, 
const char *value) {
+  if (proc != nullptr) {
+    return set_propery_internal(proc, name, value);
+  }
+  return -1;
+}
+
 int free_flow(flow *flow) {
-  if (flow == nullptr || nullptr == flow->plan)
+  if (flow == nullptr)
     return -1;
-  auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
-  delete execution_plan;
-  free(flow);
+  delete flow;
   return 0;
 }
 
-flow_file_record * get_next_flow_file(nifi_instance * instance, flow * flow) {
-  if (instance == nullptr || nullptr == flow || nullptr == flow->plan)
-    return nullptr;
-  auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
-  execution_plan->reset();
-  while (execution_plan->runNextProcessor()) {
-  }
-  auto ff = execution_plan->getCurrentFlowFile();
+flow_file_record* flowfile_to_record(std::shared_ptr<core::FlowFile> ff, 
ExecutionPlan* plan) {
   if (ff == nullptr) {
     return nullptr;
   }
   auto claim = ff->getResourceClaim();
+  if(claim == nullptr) {
+    return nullptr;
+  }
+
+  // create a flow file.
+  claim->increaseFlowFileRecordOwnedCount();
+  auto path = claim->getContentFullPath();
+  auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize());
+  ffr->ffp = ff.get();
+  ffr->attributes = ff->getAttributesPtr();
+  auto content_repo_ptr = 
static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp);
+  *content_repo_ptr = plan->getContentRepo();
+  return ffr;
+}
 
-  if (claim != nullptr) {
-    // create a flow file.
-    claim->increaseFlowFileRecordOwnedCount();
-    auto path = claim->getContentFullPath();
-    auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize());
-    ffr->ffp = ff.get();
-    ffr->attributes = ff->getAttributesPtr();
-    auto content_repo_ptr = 
static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp);
-    *content_repo_ptr = execution_plan->getContentRepo();
-    return ffr;
-  } else {
+flow_file_record * get_next_flow_file(nifi_instance * instance, flow * flow) {
+  if (instance == nullptr || nullptr == flow)
     return nullptr;
+  flow->reset();
+  while (flow->runNextProcessor()) {
   }
+  return flowfile_to_record(flow->getCurrentFlowFile(), flow);
 }
 
 size_t get_flow_files(nifi_instance *instance, flow *flow, flow_file_record 
**ff_r, size_t size) {
   if (nullptr == instance || nullptr == flow || nullptr == ff_r)
     return 0;
-  auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
   size_t i = 0;
   for (; i < size; i++) {
-    execution_plan->reset();
+    flow->reset();
     auto ffr = get_next_flow_file(instance, flow);
     if (ffr == nullptr) {
       break;
@@ -475,44 +514,99 @@ size_t get_flow_files(nifi_instance *instance, flow 
*flow, flow_file_record **ff
 flow_file_record * get(nifi_instance * instance, flow * flow, 
processor_session * session) {
   if (nullptr == instance || nullptr == flow || nullptr == session)
     return nullptr;
-  auto sesh = static_cast<core::ProcessSession*>(session->session);
-  auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
-  auto ff = sesh->get();
-  execution_plan->setNextFlowFile(ff);
-  if (ff == nullptr) {
+  auto ff = session->get();
+  flow->setNextFlowFile(ff);
+  return flowfile_to_record(ff, flow);
+}
+
+flow_file_record *invoke(standalone_processor* proc) {
+  return invoke_ff(proc, nullptr);
+}
+
+
+flow_file_record *invoke_ff(standalone_processor* proc, const flow_file_record 
*input_ff) {
+  if (proc == nullptr) {
     return nullptr;
   }
-  auto claim = ff->getResourceClaim();
+  auto plan = ExecutionPlan::getPlan(proc->getUUIDStr());
+  if (!plan) {
+    // This is not a standalone processor, shouldn't be used with invoke!
+    return nullptr;
+  }
+
+  plan->reset();
+
+  if (input_ff) {
+    auto ff_data = std::make_shared<flowfile_input_params>();
+    auto content_repo = 
static_cast<std::shared_ptr<minifi::core::ContentRepository> *>(input_ff->crp);
+    std::shared_ptr<minifi::ResourceClaim> claim = 
std::make_shared<minifi::ResourceClaim>(input_ff->contentLocation,
+                                                                               
            *content_repo);
+    ff_data->content_stream = (*content_repo)->read(claim);
+    ff_data->attributes = *static_cast<std::map<std::string, std::string> 
*>(input_ff->attributes);
+
+    plan->runNextProcessor(nullptr, ff_data);
+  }
+  while (plan->runNextProcessor()) {
+  }
+  return flowfile_to_record(plan->getCurrentFlowFile(), plan.get());
+}
+
+flow_file_record *invoke_chunk(standalone_processor* proc, uint8_t* buf, 
uint64_t size) {
+  if (proc == nullptr || buf == nullptr || size == 0) {
+    return nullptr;
+  }
+
+  auto plan = ExecutionPlan::getPlan(proc->getUUIDStr());
+  if (!plan) {
+    // This is not a standalone processor, shouldn't be used with invoke!
+    return nullptr;
+  }
+
+  plan->reset();
+
+  auto ff_data = std::make_shared<flowfile_input_params>();
+  ff_data->content_stream = std::make_shared<minifi::io::DataStream>();
+  ff_data->content_stream->writeData(buf, size);
 
-  if (claim != nullptr) {
-    // create a flow file.
-    claim->increaseFlowFileRecordOwnedCount();
-    auto path = claim->getContentFullPath();
-    auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize());
-    ffr->attributes = ff->getAttributesPtr();
-    ffr->ffp = ff.get();
-    auto content_repo_ptr = 
static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp);
-    *content_repo_ptr = execution_plan->getContentRepo();
-    return ffr;
-  } else {
+  plan->runNextProcessor(nullptr, ff_data);
+  while (plan->runNextProcessor()) {
+  }
+
+  return flowfile_to_record(plan->getCurrentFlowFile(), plan.get());
+}
+
+flow_file_record *invoke_file(standalone_processor* proc, const char* path) {
+  FILE *fileptr;
+  uint8_t *buffer;
+  uint64_t filelen;
+
+  fileptr = fopen(path, "rb");
+  if (fileptr == nullptr) {
     return nullptr;
   }
+  fseek(fileptr, 0, SEEK_END);
+  filelen = ftell(fileptr);
+  rewind(fileptr);
+
+  buffer = (uint8_t *)malloc((filelen+1)*sizeof(uint8_t)); // Enough memory 
for file + \0
+  fread(buffer, filelen, 1, fileptr);
+  fclose(fileptr);
+
+  flow_file_record* ffr = invoke_chunk(proc, buffer, filelen);
+  free(buffer);
+  return ffr;
 }
 
 int transfer(processor_session* session, flow *flow, const char *rel) {
   if (nullptr == session || nullptr == flow || rel == nullptr) {
     return -1;
   }
-  auto sesh = static_cast<core::ProcessSession*>(session->session);
-  auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
-  if (nullptr == sesh || nullptr == execution_plan) {
-    return -1;
-  }
+
   core::Relationship relationship(rel, rel);
-  auto ff = execution_plan->getNextFlowFile();
+  auto ff = flow->getNextFlowFile();
   if (nullptr == ff) {
     return -2;
   }
-  sesh->transfer(ff, relationship);
+  session->transfer(ff, relationship);
   return 0;
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/4d20f840/nanofi/src/cxx/CallbackProcessor.cpp
----------------------------------------------------------------------
diff --git a/nanofi/src/cxx/CallbackProcessor.cpp 
b/nanofi/src/cxx/CallbackProcessor.cpp
index 5294a1b..013ec47 100644
--- a/nanofi/src/cxx/CallbackProcessor.cpp
+++ b/nanofi/src/cxx/CallbackProcessor.cpp
@@ -16,6 +16,7 @@
  * limitations under the License.
  */
 #include "cxx/CallbackProcessor.h"
+#include "core/cxxstructs.h"
 namespace org {
 namespace apache {
 namespace nifi {
@@ -24,9 +25,7 @@ namespace processors {
 
 void CallbackProcessor::onTrigger(core::ProcessContext *context, 
core::ProcessSession *session) {
   if (callback_ != nullptr) {
-    processor_session sesh;
-    sesh.session = session;
-    callback_(&sesh);
+    callback_(session);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/4d20f840/nanofi/src/cxx/Plan.cpp
----------------------------------------------------------------------
diff --git a/nanofi/src/cxx/Plan.cpp b/nanofi/src/cxx/Plan.cpp
index f892aa9..b2b4690 100644
--- a/nanofi/src/cxx/Plan.cpp
+++ b/nanofi/src/cxx/Plan.cpp
@@ -23,19 +23,8 @@
 #include <set>
 #include <string>
 
-bool intToFailureStragey(int in, FailureStrategy *out) {
-  auto tmp = static_cast<FailureStrategy>(in);
-  switch (tmp) {
-    case AS_IS:
-    case ROLLBACK:
-      *out = tmp;
-      return true;
-    default:
-      return false;
-  }
-}
-
 std::shared_ptr<utils::IdGenerator> ExecutionPlan::id_generator_ = 
utils::IdGenerator::getIdGenerator();
+std::unordered_map<std::string, std::shared_ptr<ExecutionPlan>> 
ExecutionPlan::proc_plan_map_ = {};
 
 ExecutionPlan::ExecutionPlan(std::shared_ptr<core::ContentRepository> 
content_repo, std::shared_ptr<core::Repository> flow_repo, 
std::shared_ptr<core::Repository> prov_repo)
     : content_repo_(content_repo),
@@ -52,7 +41,7 @@ 
ExecutionPlan::ExecutionPlan(std::shared_ptr<core::ContentRepository> content_re
  * Add a callback to obtain and pass processor session to a generated processor
  *
  */
-std::shared_ptr<core::Processor> ExecutionPlan::addCallback(void *obj, 
std::function<void(processor_session*)> fp) {
+std::shared_ptr<core::Processor> ExecutionPlan::addCallback(void *obj, 
std::function<void(core::ProcessSession*)> fp) {
   if (finalized) {
     return nullptr;
   }
@@ -144,7 +133,8 @@ void ExecutionPlan::reset() {
   }
 }
 
-bool ExecutionPlan::runNextProcessor(std::function<void(const 
std::shared_ptr<core::ProcessContext>, const 
std::shared_ptr<core::ProcessSession>)> verify) {
+bool ExecutionPlan::runNextProcessor(std::function<void(const 
std::shared_ptr<core::ProcessContext>, const 
std::shared_ptr<core::ProcessSession>)> verify,
+                                     std::shared_ptr<flowfile_input_params> 
input_ff_params) {
   if (!finalized) {
     finalize();
   }
@@ -152,6 +142,7 @@ bool 
ExecutionPlan::runNextProcessor(std::function<void(const std::shared_ptr<co
   if (location >= processor_queue_.size()) {
     return false;
   }
+
   std::shared_ptr<core::Processor> processor = processor_queue_[location];
   std::shared_ptr<core::ProcessContext> context = 
processor_contexts_[location];
   std::shared_ptr<core::ProcessSessionFactory> factory = 
std::make_shared<core::ProcessSessionFactory>(context);
@@ -162,6 +153,15 @@ bool 
ExecutionPlan::runNextProcessor(std::function<void(const std::shared_ptr<co
   }
   std::shared_ptr<core::ProcessSession> current_session = 
std::make_shared<core::ProcessSession>(context);
   process_sessions_.push_back(current_session);
+  if (input_ff_params) {
+    std::shared_ptr<minifi::FlowFileRecord> flowFile = 
std::static_pointer_cast<minifi::FlowFileRecord>(current_session->create());
+    for(const auto& kv : input_ff_params->attributes) {
+      flowFile->setAttribute(kv.first, kv.second);
+    }
+    current_session->importFrom(*(input_ff_params->content_stream.get()), 
flowFile);
+    current_session->transfer(flowFile, core::Relationship("success", 
"success"));
+    
relationships_[relationships_.size()-1]->put(std::static_pointer_cast<core::FlowFile>(flowFile));
+  }
   processor->incrementActiveTasks();
   processor->setScheduledState(core::ScheduledState::RUNNING);
   if (verify != nullptr) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/4d20f840/nanofi/tests/CAPITests.cpp
----------------------------------------------------------------------
diff --git a/nanofi/tests/CAPITests.cpp b/nanofi/tests/CAPITests.cpp
index 65c52e1..54eae0e 100644
--- a/nanofi/tests/CAPITests.cpp
+++ b/nanofi/tests/CAPITests.cpp
@@ -31,6 +31,11 @@
 #include <thread>
 #include "api/nanofi.h"
 
+char src_format[] = "/tmp/gt.XXXXXX";
+char put_format[] = "/tmp/pt.XXXXXX";
+std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!";
+std::string test_file_name = "tstFile.ext";
+
 static nifi_instance *create_instance_obj(const char *name = 
"random_instance") {
   nifi_port port;
   char port_str[] = "12345";
@@ -51,6 +56,16 @@ void big_failure_counter(flow_file_record * fr) {
   free_flowfile(fr);
 }
 
+std::string create_testfile_for_getfile(const char* sourcedir, const 
std::string& filename = test_file_name) {
+  std::fstream file;
+  std::stringstream ss;
+  ss << sourcedir << "/" << filename;
+  file.open(ss.str(), std::ios::out);
+  file << test_file_content;
+  file.close();
+  return ss.str();
+}
+
 TEST_CASE("Test Creation of instance, one processor", 
"[createInstanceAndFlow]") {
   auto instance = create_instance_obj();
   REQUIRE(instance != nullptr);
@@ -95,35 +110,27 @@ TEST_CASE("Set valid and invalid properties", 
"[setProcesssorProperties]") {
 TEST_CASE("get file and put file", "[getAndPutFile]") {
   TestController testController;
 
-  char src_format[] = "/tmp/gt.XXXXXX";
-  char put_format[] = "/tmp/pt.XXXXXX";
   const char *sourcedir = testController.createTempDirectory(src_format);
   const char *putfiledir = testController.createTempDirectory(put_format);
-  std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!";
   auto instance = create_instance_obj();
   REQUIRE(instance != nullptr);
   flow *test_flow = create_flow(instance, nullptr);
   REQUIRE(test_flow != nullptr);
   processor *get_proc = add_processor(test_flow, "GetFile");
   REQUIRE(get_proc != nullptr);
-  processor *put_proc = add_processor_with_linkage(test_flow, "PutFile");
+  processor *put_proc = add_processor(test_flow, "PutFile");
   REQUIRE(put_proc != nullptr);
   REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0);
   REQUIRE(set_property(put_proc, "Directory", putfiledir) == 0);
 
-  std::fstream file;
-  std::stringstream ss;
-  ss << sourcedir << "/" << "tstFile.ext";
-  file.open(ss.str(), std::ios::out);
-  file << test_file_content;
-  file.close();
+  create_testfile_for_getfile(sourcedir);
 
   flow_file_record *record = get_next_flow_file(instance, test_flow);
   REQUIRE(record != nullptr);
 
-  ss.str("");
+  std::stringstream ss;
 
-  ss << putfiledir << "/" << "tstFile.ext";
+  ss << putfiledir << "/" << test_file_name;
   std::ifstream t(ss.str());
   std::string put_data((std::istreambuf_iterator<char>(t)), 
std::istreambuf_iterator<char>());
 
@@ -132,6 +139,14 @@ TEST_CASE("get file and put file", "[getAndPutFile]") {
   // No failure handler can be added after the flow is finalized
   REQUIRE(add_failure_callback(test_flow, failure_counter) == 1);
 
+  uint8_t* content = (uint8_t*)malloc(record->size* sizeof(uint8_t));
+
+  REQUIRE(get_content(record, content, record->size) == record->size);
+
+  REQUIRE(test_file_content == std::string(reinterpret_cast<char*>(content), 
record->size));
+
+  free(content);
+
   free_flowfile(record);
 
   free_flow(test_flow);
@@ -142,17 +157,10 @@ TEST_CASE("get file and put file", "[getAndPutFile]") {
 TEST_CASE("Test manipulation of attributes", "[testAttributes]") {
   TestController testController;
 
-  char src_format[] = "/tmp/gt.XXXXXX";
   const char *sourcedir = testController.createTempDirectory(src_format);
 
-  std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!";
+  create_testfile_for_getfile(sourcedir);
 
-  std::fstream file;
-  std::stringstream ss;
-  ss << sourcedir << "/" << "tstFile.ext";
-  file.open(ss.str(), std::ios::out);
-  file << test_file_content;
-  file.close();
   auto instance = create_instance_obj();
   REQUIRE(instance != nullptr);
   flow *test_flow = create_flow(instance, nullptr);
@@ -161,10 +169,10 @@ TEST_CASE("Test manipulation of attributes", 
"[testAttributes]") {
   processor *get_proc = add_processor(test_flow, "GetFile");
   REQUIRE(get_proc != nullptr);
   REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0);
-  processor *extract_test = add_processor_with_linkage(test_flow, 
"ExtractText");
+  processor *extract_test = add_processor(test_flow, "ExtractText");
   REQUIRE(extract_test != nullptr);
   REQUIRE(set_property(extract_test, "Attribute", "TestAttr") == 0);
-  processor *update_attr = add_processor_with_linkage(test_flow, 
"UpdateAttribute");
+  processor *update_attr = add_processor(test_flow, "UpdateAttribute");
   REQUIRE(update_attr != nullptr);
 
   REQUIRE(set_property(update_attr, "UpdatedAttribute", "UpdatedValue") == 0);
@@ -224,9 +232,7 @@ TEST_CASE("Test manipulation of attributes", 
"[testAttributes]") {
 TEST_CASE("Test error handling callback", "[errorHandling]") {
   TestController testController;
 
-  char src_format[] = "/tmp/gt.XXXXXX";
   const char *sourcedir = testController.createTempDirectory(src_format);
-  std::string test_file_content = "C API raNdOMcaSe test d4t4 th1s is!";
 
   auto instance = create_instance_obj();
   REQUIRE(instance != nullptr);
@@ -239,19 +245,13 @@ TEST_CASE("Test error handling callback", 
"[errorHandling]") {
 
   processor *get_proc = add_processor(test_flow, "GetFile");
   REQUIRE(get_proc != nullptr);
-  processor *put_proc = add_processor_with_linkage(test_flow, "PutFile");
+  processor *put_proc = add_processor(test_flow, "PutFile");
   REQUIRE(put_proc != nullptr);
   REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0);
   REQUIRE(set_property(put_proc, "Directory", "/tmp/never_existed") == 0);
   REQUIRE(set_property(put_proc, "Create Missing Directories", "false") == 0);
 
-  std::fstream file;
-  std::stringstream ss;
-
-  ss << sourcedir << "/" << "tstFile.ext";
-  file.open(ss.str(), std::ios::out);
-  file << test_file_content;
-  file.close();
+  create_testfile_for_getfile(sourcedir);
 
 
   REQUIRE(get_next_flow_file(instance, test_flow) == nullptr);
@@ -263,10 +263,7 @@ TEST_CASE("Test error handling callback", 
"[errorHandling]") {
   REQUIRE(set_failure_strategy(test_flow, FailureStrategy::ROLLBACK) == 0);
 
   // Create new testfile to trigger failure again
-  ss << "2";
-  file.open(ss.str(), std::ios::out);
-  file << test_file_content;
-  file.close();
+  create_testfile_for_getfile(sourcedir, test_file_name + "2");
 
   REQUIRE(get_next_flow_file(instance, test_flow) == nullptr);
   REQUIRE(failure_count > 100);
@@ -276,3 +273,118 @@ TEST_CASE("Test error handling callback", 
"[errorHandling]") {
   free_flow(test_flow);
   free_instance(instance);
 }
+
+TEST_CASE("Test standalone processors", "[testStandalone]") {
+  TestController testController;
+
+  const char *sourcedir = testController.createTempDirectory(src_format);
+
+  create_testfile_for_getfile(sourcedir);
+
+  standalone_processor* getfile_proc = create_processor("GetFile");
+  REQUIRE(set_standalone_property(getfile_proc, "Input Directory", sourcedir) 
== 0);
+
+  flow_file_record* ffr = invoke(getfile_proc);
+
+  REQUIRE(ffr != nullptr);
+  REQUIRE(get_attribute_qty(ffr) > 0);
+
+  standalone_processor* extract_test = create_processor("ExtractText");
+  REQUIRE(extract_test != nullptr);
+  REQUIRE(set_standalone_property(extract_test, "Attribute", "TestAttr") == 0);
+
+  flow_file_record* ffr2 = invoke_ff(extract_test, ffr);
+
+  free_flowfile(ffr);
+
+  // Verify the transfer of attributes
+  REQUIRE(ffr2 != nullptr);
+  REQUIRE(get_attribute_qty(ffr2) > 0);
+
+  char filename_key[] = "filename";
+  attribute attr;
+  attr.key = filename_key;
+  attr.value_size = 0;
+
+  REQUIRE(get_attribute(ffr2, &attr) == 0);
+  REQUIRE(attr.value_size > 0);
+
+  // Verify extracttext behavior
+  char test_attr[] = "TestAttr";
+  attr.key = test_attr;
+  attr.value_size = 0;
+  REQUIRE(get_attribute(ffr2, &attr) == 0);
+  REQUIRE(std::string(static_cast<char*>(attr.value), attr.value_size) == 
test_file_content);
+
+  free_flowfile(ffr2);
+  free_standalone_processor(getfile_proc);
+}
+
+TEST_CASE("Test interaction of flow and standlone processors", 
"[testStandaloneWithFlow]") {
+  TestController testController;
+
+  const char *sourcedir = testController.createTempDirectory(src_format);
+  const char *putfiledir = testController.createTempDirectory(put_format);
+
+  create_testfile_for_getfile(sourcedir);
+
+  auto instance = create_instance_obj();
+  REQUIRE(instance != nullptr);
+  flow *test_flow = create_flow(instance, nullptr);
+  REQUIRE(test_flow != nullptr);
+
+  processor *get_proc = add_processor(test_flow, "GetFile");
+  REQUIRE(get_proc != nullptr);
+  REQUIRE(set_property(get_proc, "Input Directory", sourcedir) == 0);
+
+  flow_file_record *record = get_next_flow_file(instance, test_flow);
+  REQUIRE(record != nullptr);
+
+  standalone_processor* putfile_proc = create_processor("PutFile");
+  REQUIRE(set_standalone_property(putfile_proc, "Directory", putfiledir) == 0);
+
+  flow_file_record* put_record = invoke_ff(putfile_proc, record);
+  REQUIRE(put_record != nullptr);
+
+  free_flowfile(record);
+  free_flowfile(put_record);
+
+  std::stringstream ss;
+
+  ss << putfiledir << "/" << test_file_name;
+  std::ifstream t(ss.str());
+  std::string put_data((std::istreambuf_iterator<char>(t)), 
std::istreambuf_iterator<char>());
+
+  REQUIRE(test_file_content == put_data);
+
+  free_flow(test_flow);
+  free_instance(instance);
+  free_standalone_processor(putfile_proc);
+}
+
+TEST_CASE("Test standalone processors with file input", 
"[testStandaloneWithFile]") {
+  TestController testController;
+
+  enable_logging();
+
+  const char *sourcedir = testController.createTempDirectory(src_format);
+  std::string path = create_testfile_for_getfile(sourcedir);
+
+  standalone_processor* extract_test = create_processor("ExtractText");
+  REQUIRE(extract_test != nullptr);
+  REQUIRE(set_standalone_property(extract_test, "Attribute", "TestAttr") == 0);
+
+  flow_file_record* ffr = invoke_file(extract_test, path.c_str());
+
+  REQUIRE(ffr != nullptr);
+
+  attribute attr;
+  char test_attr[] = "TestAttr";
+  attr.key = test_attr;
+  attr.value_size = 0;
+  REQUIRE(get_attribute(ffr, &attr) == 0);
+  REQUIRE(std::string(static_cast<char*>(attr.value), attr.value_size) == 
test_file_content);
+
+  free_flowfile(ffr);
+  free_standalone_processor(extract_test);
+}

Reply via email to