http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index eca84be..e472a9a 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -30,765 +30,271 @@
 #include <unistd.h>
 #include <future>
 #include "FlowController.h"
-#include "ProcessContext.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessGroup.h"
 #include "utils/StringUtils.h"
+#include "core/core.h"
+#include "core/repository/FlowFileRepository.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+#define DEFAULT_CONFIG_NAME "conf/flow.yml"
+
+/**
+ * Constructs a flow controller around the given repositories and flow
+ * configuration.
+ *
+ * @param provenance_repo provenance repository; must not be null
+ * @param flow_file_repo flow file repository; must not be null
+ * @param flow_configuration flow configuration, moved into the controller;
+ *        when null no configuration path is read from it
+ * @param name not referenced in the constructor body
+ * @param headless_mode when true, the configuration file lookup and
+ *        initializePaths() are skipped
+ * @throws std::runtime_error if either repository is null, or if
+ *         initializePaths() cannot resolve the configuration path
+ */
+FlowController::FlowController(
+    std::shared_ptr<core::Repository> provenance_repo,
+    std::shared_ptr<core::Repository> flow_file_repo,
+    std::unique_ptr<core::FlowConfiguration> flow_configuration,
+    const std::string name, bool headless_mode)
+    : CoreComponent(core::getClassName<FlowController>()),
+      root_(nullptr),
+      max_timer_driven_threads_(0),
+      max_event_driven_threads_(0),
+      running_(false),
+      initialized_(false),
+      provenance_repo_(provenance_repo),
+      flow_file_repo_(flow_file_repo),
+      protocol_(0),
+      _timerScheduler(provenance_repo_),
+      _eventScheduler(provenance_repo_),
+      flow_configuration_(std::move(flow_configuration)) {
+  if (provenance_repo == nullptr)
+    throw std::runtime_error("Provenance Repo should not be null");
+  if (flow_file_repo == nullptr)
+    throw std::runtime_error("Flow Repo should not be null");
+
+  uuid_generate(uuid_);
+  setUUID(uuid_);
+
+  // Setup the default values; running_, initialized_ and root_ keep the
+  // values assigned in the initializer list.
+  if (flow_configuration_ != nullptr) {
+    configuration_filename_ = flow_configuration_->getConfigurationPath();
+  }
+  max_event_driven_threads_ = DEFAULT_MAX_EVENT_DRIVEN_THREAD;
+  max_timer_driven_threads_ = DEFAULT_MAX_TIMER_DRIVEN_THREAD;
+
+  protocol_ = new FlowControlProtocol(this);
+
+  // The destructor does not run when a constructor throws, so protocol_ has
+  // to be released here if anything below fails (initializePaths() throws
+  // when the configuration path cannot be resolved).
+  try {
+    // NiFi config properties
+    configure_ = Configure::getConfigure();
+
+    if (!headless_mode) {
+      std::string rawConfigFileString;
+      configure_->get(Configure::nifi_flow_configuration_file,
+                      rawConfigFileString);
+
+      // a configured flow file overrides the path from the flow configuration
+      if (!rawConfigFileString.empty()) {
+        configuration_filename_ = rawConfigFileString;
+      }
+
+      std::string adjustedFilename;
+      if (!configuration_filename_.empty()) {
+        // perform a naive determination if this is a relative path
+        if (configuration_filename_.c_str()[0] != '/') {
+          adjustedFilename = adjustedFilename + configure_->getHome() + "/"
+              + configuration_filename_;
+        } else {
+          adjustedFilename = configuration_filename_;
+        }
+      }
+
+      initializePaths(adjustedFilename);
+    }
+  } catch (...) {
+    delete protocol_;
+    protocol_ = nullptr;
+    throw;
+  }
 
-FlowController *FlowControllerFactory::_flowController(NULL);
-
-FlowControllerImpl::FlowControllerImpl(std::string name)  {
-       uuid_generate(_uuid);
-
-       _name = name;
-       // Setup the default values
-       _configurationFileName = DEFAULT_FLOW_YAML_FILE_NAME;
-       _maxEventDrivenThreads = DEFAULT_MAX_EVENT_DRIVEN_THREAD;
-       _maxTimerDrivenThreads = DEFAULT_MAX_TIMER_DRIVEN_THREAD;
-       _running = false;
-       _initialized = false;
-       _root = NULL;
-       logger_ = Logger::getLogger();
-       _protocol = new FlowControlProtocol(this);
-
-       // NiFi config properties
-       configure_ = Configure::getConfigure();
-
-       std::string rawConfigFileString;
-       configure_->get(Configure::nifi_flow_configuration_file,
-                       rawConfigFileString);
-
-       if (!rawConfigFileString.empty()) {
-               _configurationFileName = rawConfigFileString;
-       }
-
-       char *path = NULL;
-       char full_path[PATH_MAX];
-
-       std::string adjustedFilename;
-       if (!_configurationFileName.empty()) {
-               // perform a naive determination if this is a relative path
-               if (_configurationFileName.c_str()[0] != '/') {
-                       adjustedFilename = adjustedFilename + 
configure_->getHome() + "/"
-                                       + _configurationFileName;
-               } else {
-                       adjustedFilename = _configurationFileName;
-               }
-       }
-
-       path = realpath(adjustedFilename.c_str(), full_path);
-
-       std::string pathString(path);
-       _configurationFileName = pathString;
-       logger_->log_info("FlowController NiFi Configuration file %s", 
pathString.c_str());
-
-       // Create the content repo directory if needed
-       struct stat contentDirStat;
-
-       if (stat(ResourceClaim::default_directory_path.c_str(), 
&contentDirStat) != -1 && S_ISDIR(contentDirStat.st_mode))
-       {
-               path = realpath(ResourceClaim::default_directory_path.c_str(), 
full_path);
-               logger_->log_info("FlowController content directory %s", 
full_path);
-       }
-       else
-       {
-          if (mkdir(ResourceClaim::default_directory_path.c_str(), 0777) == -1)
-          {
-                  logger_->log_error("FlowController content directory 
creation failed");
-                  exit(1);
-          }
-       }
-
-       
-       std::string clientAuthStr;
-
-       if (!path) {
-               logger_->log_error(
-                               "Could not locate path from provided 
configuration file name (%s).  Exiting.",
-                               full_path);
-               exit(1);
-       }
-
-
-       // Create repos for flow record and provenance
-       _flowfileRepo = new FlowFileRepository();
-       _flowfileRepo->initialize();
-       _provenanceRepo = new ProvenanceRepository();
-       _provenanceRepo->initialize();
 }
 
-FlowControllerImpl::~FlowControllerImpl() {
+/**
+ * Resolves the flow configuration file to an absolute path and makes sure the
+ * content repository directory exists.
+ *
+ * On success configuration_filename_ holds the resolved configuration path.
+ * A failure to create or resolve the content directory is logged and ends the
+ * process through exit(1).
+ *
+ * @param adjustedFilename configuration file path handed to realpath()
+ * @throws std::runtime_error if realpath() cannot resolve adjustedFilename
+ */
+void FlowController::initializePaths(const std::string &adjustedFilename) {
+  char full_path[PATH_MAX];
+
+  if (realpath(adjustedFilename.c_str(), full_path) == NULL) {
+    throw std::runtime_error(
+        "Path is not specified. Either manually set MINIFI_HOME or ensure ../conf exists");
+  }
+  const std::string pathString(full_path);
+  configuration_filename_ = pathString;
+  logger_->log_info("FlowController NiFi Configuration file %s",
+                    pathString.c_str());
 
-       stop(true);
-       unload();
-       if (NULL != _protocol)
-               delete _protocol;
-       if (NULL != _provenanceRepo)
-               delete _provenanceRepo;
-       if (NULL != _flowfileRepo)
-               delete _flowfileRepo;
+  // Create the content repo directory if needed
+  const char *content_dir = ResourceClaim::default_directory_path.c_str();
+  struct stat contentDirStat;
 
-}
+  if (stat(content_dir, &contentDirStat) != -1
+      && S_ISDIR(contentDirStat.st_mode)) {
+    // Check realpath() right here: when it fails the buffer contents are
+    // undefined, so nothing may be logged from full_path.
+    if (realpath(content_dir, full_path) == NULL) {
+      logger_->log_error(
+          "Could not resolve content directory path (%s).  Exiting.",
+          content_dir);
+      exit(1);
+    }
+    logger_->log_info("FlowController content directory %s", full_path);
+  } else if (mkdir(content_dir, 0777) == -1) {
+    logger_->log_error("FlowController content directory creation failed");
+    exit(1);
+  }
 
-void FlowControllerImpl::stop(bool force) {
+}
 
-       if (_running) {
-               // immediately indicate that we are not running
-               _running = false;
+/**
+ * Shuts the controller down: calls stop(true) and unload(), then frees the
+ * protocol object allocated by the constructor.
+ */
+FlowController::~FlowController() {
+  stop(true);
+  unload();
+  // deleting a null pointer is a no-op, so no guard is required
+  delete protocol_;
 
-               logger_->log_info("Stop Flow Controller");
-               this->_timerScheduler.stop();
-               this->_eventScheduler.stop();
-               this->_flowfileRepo->stop();
-               this->_provenanceRepo->stop();
-               // Wait for sometime for thread stop
-               std::this_thread::sleep_for(std::chrono::milliseconds(1000));
-               if (this->_root)
-                       this->_root->stopProcessing(&this->_timerScheduler,
-                                       &this->_eventScheduler);
+}
 
-       }
+/**
+ * Halts the flow controller if it is currently running; otherwise does
+ * nothing. The whole sequence runs while holding mutex_.
+ *
+ * @param force not referenced in this function body
+ */
+void FlowController::stop(bool force) {
+  std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
+  if (!running_) {
+    return;
+  }
+
+  // immediately indicate that we are not running
+  running_ = false;
+
+  logger_->log_info("Stop Flow Controller");
+  _timerScheduler.stop();
+  _eventScheduler.stop();
+  flow_file_repo_->stop();
+  provenance_repo_->stop();
+
+  // Wait for sometime for thread stop
+  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+
+  // only a loaded flow has a root group whose processing must be stopped
+  if (root_ != nullptr) {
+    root_->stopProcessing(&_timerScheduler, &_eventScheduler);
+  }
 }
 
 /**
  * This function will attempt to unload yaml and stop running Processors.
  *
  * If the latter attempt fails or does not complete within the prescribed
- * period, _running will be set to false and we will return.
+ * period, running_ will be set to false and we will return.
  *
  * @param timeToWaitMs Maximum time to wait before manually
  * marking running as false.
  */
-void FlowControllerImpl::waitUnload(const uint64_t timeToWaitMs) {
-       if (_running) {
-               // use the current time and increment with the provided 
argument.
-               std::chrono::system_clock::time_point wait_time =
-                               std::chrono::system_clock::now()
-                                               + 
std::chrono::milliseconds(timeToWaitMs);
-
-               // create an asynchronous future.
-               std::future<void> unload_task = std::async(std::launch::async,
-                               [this]() {unload();});
-
-               if (std::future_status::ready == 
unload_task.wait_until(wait_time)) {
-                       _running = false;
-               }
-
-       }
-}
-
-
-void FlowControllerImpl::unload() {
-       if (_running) {
-               stop(true);
-       }
-       if (_initialized) {
-               logger_->log_info("Unload Flow Controller");
-               if (_root)
-                       delete _root;
-               _root = NULL;
-               _initialized = false;
-               _name = "";
-       }
-
-       return;
-}
-
-Processor *FlowControllerImpl::createProcessor(std::string name, uuid_t uuid) {
-       Processor *processor = NULL;
-       if (name == GenerateFlowFile::ProcessorName) {
-               processor = new GenerateFlowFile(name, uuid);
-       } else if (name == LogAttribute::ProcessorName) {
-               processor = new LogAttribute(name, uuid);
-       } else if (name == RealTimeDataCollector::ProcessorName) {
-               processor = new RealTimeDataCollector(name, uuid);
-       } else if (name == GetFile::ProcessorName) {
-               processor = new GetFile(name, uuid);
-       } else if (name == PutFile::ProcessorName) {
-               processor = new PutFile(name, uuid);
-       } else if (name == TailFile::ProcessorName) {
-               processor = new TailFile(name, uuid);
-       } else if (name == ListenSyslog::ProcessorName) {
-               processor = new ListenSyslog(name, uuid);
-       } else if (name == ListenHTTP::ProcessorName) {
-        processor = new ListenHTTP(name, uuid);
-       } else if (name == ExecuteProcess::ProcessorName) {
-               processor = new ExecuteProcess(name, uuid);
-       } else if (name == AppendHostInfo::ProcessorName) {
-               processor = new AppendHostInfo(name, uuid);
-       } else {
-               logger_->log_error("No Processor defined for %s", name.c_str());
-               return NULL;
-       }
-
-       //! initialize the processor
-       processor->initialize();
-
-       return processor;
-}
-
-ProcessGroup *FlowControllerImpl::createRootProcessGroup(std::string name,
-               uuid_t uuid) {
-       return new ProcessGroup(ROOT_PROCESS_GROUP, name, uuid);
-}
-
-ProcessGroup *FlowControllerImpl::createRemoteProcessGroup(std::string name,
-               uuid_t uuid) {
-       return new ProcessGroup(REMOTE_PROCESS_GROUP, name, uuid);
-}
-
-Connection *FlowControllerImpl::createConnection(std::string name,
-               uuid_t uuid) {
-       return new Connection(name, uuid);
-}
-
-#ifdef YAML_SUPPORT
-void FlowControllerImpl::parseRootProcessGroupYaml(YAML::Node rootFlowNode) {
-       uuid_t uuid;
-       ProcessGroup *group = NULL;
-
-       std::string flowName = rootFlowNode["name"].as<std::string>();
-       std::string id = rootFlowNode["id"].as<std::string>();
-
-       uuid_parse(id.c_str(), uuid);
-
-       logger_->log_debug("parseRootProcessGroup: id => [%s]", id.c_str());
-       logger_->log_debug("parseRootProcessGroup: name => [%s]", 
flowName.c_str());
-       group = this->createRootProcessGroup(flowName, uuid);
-       this->_root = group;
-       this->_name = flowName;
-}
-
-void FlowControllerImpl::parseProcessorNodeYaml(YAML::Node processorsNode,
-               ProcessGroup *parentGroup) {
-       int64_t schedulingPeriod = -1;
-       int64_t penalizationPeriod = -1;
-       int64_t yieldPeriod = -1;
-       int64_t runDurationNanos = -1;
-       uuid_t uuid;
-       Processor *processor = NULL;
-
-       if (!parentGroup) {
-               logger_->log_error("parseProcessNodeYaml: no parent group 
exists");
-               return;
-       }
-
-       if (processorsNode) {
-
-               if (processorsNode.IsSequence()) {
-                       // Evaluate sequence of processors
-                       int numProcessors = processorsNode.size();
-
-                       for (YAML::const_iterator iter = processorsNode.begin();
-                                       iter != processorsNode.end(); ++iter) {
-                               ProcessorConfig procCfg;
-                               YAML::Node procNode = iter->as<YAML::Node>();
-
-                               procCfg.name = 
procNode["name"].as<std::string>();
-                               procCfg.id = procNode["id"].as<std::string>();
-                               logger_->log_debug("parseProcessorNode: name => 
[%s] id => [%s]",
-                                               procCfg.name.c_str(), 
procCfg.id.c_str());
-                               procCfg.javaClass = 
procNode["class"].as<std::string>();
-                               logger_->log_debug("parseProcessorNode: class 
=> [%s]",
-                                               procCfg.javaClass.c_str());
-
-                               uuid_parse(procCfg.id.c_str(), uuid);
-
-                               // Determine the processor name only from the 
Java class
-                               int lastOfIdx = 
procCfg.javaClass.find_last_of(".");
-                               if (lastOfIdx != std::string::npos) {
-                                       lastOfIdx++; // if a value is found, 
increment to move beyond the .
-                                       int nameLength = 
procCfg.javaClass.length() - lastOfIdx;
-                                       std::string processorName = 
procCfg.javaClass.substr(
-                                                       lastOfIdx, nameLength);
-                                       processor = 
this->createProcessor(processorName, uuid);
-                               }
-
-                               if (!processor) {
-                                       logger_->log_error(
-                                                       "Could not create a 
processor %s with name %s",
-                                                       procCfg.name.c_str(), 
procCfg.id.c_str());
-                                       throw std::invalid_argument(
-                                                       "Could not create 
processor " + procCfg.name);
-                               }
-                               processor->setName(procCfg.name);
-
-                               procCfg.maxConcurrentTasks =
-                                               procNode["max concurrent 
tasks"].as<std::string>();
-                               logger_->log_debug(
-                                               "parseProcessorNode: max 
concurrent tasks => [%s]",
-                                               
procCfg.maxConcurrentTasks.c_str());
-                               procCfg.schedulingStrategy = 
procNode["scheduling strategy"].as<
-                                               std::string>();
-                               logger_->log_debug(
-                                               "parseProcessorNode: scheduling 
strategy => [%s]",
-                                               
procCfg.schedulingStrategy.c_str());
-                               procCfg.schedulingPeriod = procNode["scheduling 
period"].as<
-                                               std::string>();
-                               logger_->log_debug(
-                                               "parseProcessorNode: scheduling 
period => [%s]",
-                                               
procCfg.schedulingPeriod.c_str());
-                               procCfg.penalizationPeriod = 
procNode["penalization period"].as<
-                                               std::string>();
-                               logger_->log_debug(
-                                               "parseProcessorNode: 
penalization period => [%s]",
-                                               
procCfg.penalizationPeriod.c_str());
-                               procCfg.yieldPeriod =
-                                               procNode["yield 
period"].as<std::string>();
-                               logger_->log_debug("parseProcessorNode: yield 
period => [%s]",
-                                               procCfg.yieldPeriod.c_str());
-                               procCfg.yieldPeriod = procNode["run duration 
nanos"].as<
-                                               std::string>();
-                               logger_->log_debug(
-                                               "parseProcessorNode: run 
duration nanos => [%s]",
-                                               
procCfg.runDurationNanos.c_str());
-
-                               // handle auto-terminated relationships
-                               YAML::Node autoTerminatedSequence =
-                                               procNode["auto-terminated 
relationships list"];
-                               std::vector<std::string> 
rawAutoTerminatedRelationshipValues;
-                               if (autoTerminatedSequence.IsSequence()
-                                               && 
!autoTerminatedSequence.IsNull()
-                                               && 
autoTerminatedSequence.size() > 0) {
-                                       for (YAML::const_iterator relIter =
-                                                       
autoTerminatedSequence.begin();
-                                                       relIter != 
autoTerminatedSequence.end();
-                                                       ++relIter) {
-                                               std::string autoTerminatedRel =
-                                                               
relIter->as<std::string>();
-                                               
rawAutoTerminatedRelationshipValues.push_back(
-                                                               
autoTerminatedRel);
-                                       }
-                               }
-                               procCfg.autoTerminatedRelationships =
-                                               
rawAutoTerminatedRelationshipValues;
-
-                               // handle processor properties
-                               YAML::Node propertiesNode = 
procNode["Properties"];
-                               parsePropertiesNodeYaml(&propertiesNode, 
processor);
-
-                               // Take care of scheduling
-                               TimeUnit unit;
-                               if 
(Property::StringToTime(procCfg.schedulingPeriod,
-                                               schedulingPeriod, unit)
-                                               && 
Property::ConvertTimeUnitToNS(schedulingPeriod, unit,
-                                                               
schedulingPeriod)) {
-                                       logger_->log_debug(
-                                                       "convert: 
parseProcessorNode: schedulingPeriod => [%d] ns",
-                                                       schedulingPeriod);
-                                       
processor->setSchedulingPeriodNano(schedulingPeriod);
-                               }
-
-                               if 
(Property::StringToTime(procCfg.penalizationPeriod,
-                                               penalizationPeriod, unit)
-                                               && 
Property::ConvertTimeUnitToMS(penalizationPeriod,
-                                                               unit, 
penalizationPeriod)) {
-                                       logger_->log_debug(
-                                                       "convert: 
parseProcessorNode: penalizationPeriod => [%d] ms",
-                                                       penalizationPeriod);
-                                       
processor->setPenalizationPeriodMsec(penalizationPeriod);
-                               }
-
-                               if (Property::StringToTime(procCfg.yieldPeriod, 
yieldPeriod,
-                                               unit)
-                                               && 
Property::ConvertTimeUnitToMS(yieldPeriod, unit,
-                                                               yieldPeriod)) {
-                                       logger_->log_debug(
-                                                       "convert: 
parseProcessorNode: yieldPeriod => [%d] ms",
-                                                       yieldPeriod);
-                                       
processor->setYieldPeriodMsec(yieldPeriod);
-                               }
-
-                               // Default to running
-                               processor->setScheduledState(RUNNING);
-
-                               if (procCfg.schedulingStrategy == 
"TIMER_DRIVEN") {
-                                       
processor->setSchedulingStrategy(TIMER_DRIVEN);
-                                       logger_->log_debug("setting scheduling 
strategy as %s",
-                                                       
procCfg.schedulingStrategy.c_str());
-                               } else if (procCfg.schedulingStrategy == 
"EVENT_DRIVEN") {
-                                       
processor->setSchedulingStrategy(EVENT_DRIVEN);
-                                       logger_->log_debug("setting scheduling 
strategy as %s",
-                                                       
procCfg.schedulingStrategy.c_str());
-                               } else {
-                                       
processor->setSchedulingStrategy(CRON_DRIVEN);
-                                       logger_->log_debug("setting scheduling 
strategy as %s",
-                                                       
procCfg.schedulingStrategy.c_str());
-
-                               }
-
-                               int64_t maxConcurrentTasks;
-                               if 
(Property::StringToInt(procCfg.maxConcurrentTasks,
-                                               maxConcurrentTasks)) {
-                                       logger_->log_debug(
-                                                       "parseProcessorNode: 
maxConcurrentTasks => [%d]",
-                                                       maxConcurrentTasks);
-                                       
processor->setMaxConcurrentTasks(maxConcurrentTasks);
-                               }
-
-                               if 
(Property::StringToInt(procCfg.runDurationNanos,
-                                               runDurationNanos)) {
-                                       logger_->log_debug(
-                                                       "parseProcessorNode: 
runDurationNanos => [%d]",
-                                                       runDurationNanos);
-                                       
processor->setRunDurationNano(runDurationNanos);
-                               }
-
-                               std::set<Relationship> 
autoTerminatedRelationships;
-                               for (auto &&relString : 
procCfg.autoTerminatedRelationships) {
-                                       Relationship relationship(relString, 
"");
-                                       logger_->log_debug(
-                                                       "parseProcessorNode: 
autoTerminatedRelationship  => [%s]",
-                                                       relString.c_str());
-                                       
autoTerminatedRelationships.insert(relationship);
-                               }
-
-                               processor->setAutoTerminatedRelationships(
-                                               autoTerminatedRelationships);
-
-                               parentGroup->addProcessor(processor);
-                       }
-               }
-       } else {
-               throw new std::invalid_argument(
-                               "Cannot instantiate a MiNiFi instance without a 
defined Processors configuration node.");
-       }
-}
-
-void FlowControllerImpl::parseRemoteProcessGroupYaml(YAML::Node *rpgNode,
-               ProcessGroup *parentGroup) {
-       uuid_t uuid;
-
-       if (!parentGroup) {
-               logger_->log_error(
-                               "parseRemoteProcessGroupYaml: no parent group 
exists");
-               return;
-       }
-
-       if (rpgNode) {
-               if (rpgNode->IsSequence()) {
-                       for (YAML::const_iterator iter = rpgNode->begin();
-                                       iter != rpgNode->end(); ++iter) {
-                               YAML::Node rpgNode = iter->as<YAML::Node>();
-
-                               auto name = rpgNode["name"].as<std::string>();
-                               auto id = rpgNode["id"].as<std::string>();
-
-                               
logger_->log_debug("parseRemoteProcessGroupYaml: name => [%s], id => [%s]",
-                                               name.c_str(), id.c_str());
-
-                               std::string url = 
rpgNode["url"].as<std::string>();
-                               
logger_->log_debug("parseRemoteProcessGroupYaml: url => [%s]",
-                                               url.c_str());
-
-                               std::string timeout = 
rpgNode["timeout"].as<std::string>();
-                               logger_->log_debug(
-                                               "parseRemoteProcessGroupYaml: 
timeout => [%s]",
-                                               timeout.c_str());
-
-                               std::string yieldPeriod =
-                                               rpgNode["yield 
period"].as<std::string>();
-                               logger_->log_debug(
-                                               "parseRemoteProcessGroupYaml: 
yield period => [%s]",
-                                               yieldPeriod.c_str());
-
-                               YAML::Node inputPorts = rpgNode["Input 
Ports"].as<YAML::Node>();
-                               YAML::Node outputPorts =
-                                               rpgNode["Output 
Ports"].as<YAML::Node>();
-                               ProcessGroup *group = NULL;
-
-                               uuid_parse(id.c_str(), uuid);
-
-                               int64_t timeoutValue = -1;
-                               int64_t yieldPeriodValue = -1;
-
-                               group = 
this->createRemoteProcessGroup(name.c_str(), uuid);
-                               group->setParent(parentGroup);
-                               parentGroup->addProcessGroup(group);
-
-                               TimeUnit unit;
-
-                               if (Property::StringToTime(yieldPeriod, 
yieldPeriodValue, unit)
-                                               && 
Property::ConvertTimeUnitToMS(yieldPeriodValue, unit,
-                                                               
yieldPeriodValue) && group) {
-                                       logger_->log_debug(
-                                                       
"parseRemoteProcessGroupYaml: yieldPeriod => [%d] ms",
-                                                       yieldPeriodValue);
-                                       
group->setYieldPeriodMsec(yieldPeriodValue);
-                               }
-
-                               if (Property::StringToTime(timeout, 
timeoutValue, unit)
-                                               && 
Property::ConvertTimeUnitToMS(timeoutValue, unit,
-                                                               timeoutValue) 
&& group) {
-                                       logger_->log_debug(
-                                                       
"parseRemoteProcessGroupYaml: timeoutValue => [%d] ms",
-                                                       timeoutValue);
-                                       group->setTimeOut(timeoutValue);
-                               }
-
-                               group->setTransmitting(true);
-                               group->setURL(url);
-
-                               if (inputPorts && inputPorts.IsSequence()) {
-                                       for (YAML::const_iterator portIter = 
inputPorts.begin();
-                                                       portIter != 
inputPorts.end(); ++portIter) {
-                                               logger_->log_debug("Got a 
current port, iterating...");
-
-                                               YAML::Node currPort = 
portIter->as<YAML::Node>();
-
-                                               this->parsePortYaml(&currPort, 
group, SEND);
-                                       } // for node
-                               }
-                               if (outputPorts && outputPorts.IsSequence()) {
-                                       for (YAML::const_iterator portIter = 
outputPorts.begin();
-                                                       portIter != 
outputPorts.end(); ++portIter) {
-                                               logger_->log_debug("Got a 
current port, iterating...");
-
-                                               YAML::Node currPort = 
portIter->as<YAML::Node>();
-
-                                               this->parsePortYaml(&currPort, 
group, RECEIVE);
-                                       } // for node
-                               }
-
-                       }
-               }
-       }
-}
-
-void FlowControllerImpl::parseConnectionYaml(YAML::Node *connectionsNode,
-               ProcessGroup *parent) {
-       uuid_t uuid;
-       Connection *connection = NULL;
-
-       if (!parent) {
-               logger_->log_error("parseProcessNode: no parent group was 
provided");
-               return;
-       }
-
-       if (connectionsNode) {
-
-               if (connectionsNode->IsSequence()) {
-                       for (YAML::const_iterator iter = 
connectionsNode->begin();
-                                       iter != connectionsNode->end(); ++iter) 
{
-
-                               YAML::Node connectionNode = 
iter->as<YAML::Node>();
-
-                               std::string name = 
connectionNode["name"].as<std::string>();
-                               std::string id = 
connectionNode["id"].as<std::string>();
-                               std::string destId = 
connectionNode["destination id"].as<
-                                               std::string>();
-
-                               uuid_parse(id.c_str(), uuid);
-
-                               logger_->log_debug(
-                                               "Created connection with UUID 
%s and name %s", id.c_str(),
-                                               name.c_str());
-                               connection = this->createConnection(name, uuid);
-                               auto rawRelationship =
-                                               connectionNode["source 
relationship name"].as<
-                                                               std::string>();
-                               Relationship relationship(rawRelationship, "");
-                               logger_->log_debug("parseConnection: 
relationship => [%s]",
-                                               rawRelationship.c_str());
-                               if (connection)
-                                       
connection->setRelationship(relationship);
-                               std::string connectionSrcProcId =
-                                               connectionNode["source 
id"].as<std::string>();
-                               uuid_t srcUUID;
-                               uuid_parse(connectionSrcProcId.c_str(), 
srcUUID);
-
-                               Processor *srcProcessor = 
this->_root->findProcessor(
-                                               srcUUID);
-
-                               if (!srcProcessor) {
-                                       logger_->log_error(
-                                                       "Could not locate a 
source with id %s to create a connection",
-                                                       
connectionSrcProcId.c_str());
-                                       throw std::invalid_argument(
-                                                       "Could not locate a 
source with id %s to create a connection "
-                                                                       + 
connectionSrcProcId);
-                               }
-
-                               uuid_t destUUID;
-                               uuid_parse(destId.c_str(), destUUID);
-                               Processor *destProcessor = 
this->_root->findProcessor(destUUID);
-                               // If we could not find name, try by UUID
-                               if (!destProcessor) {
-                                       uuid_t destUuid;
-                                       uuid_parse(destId.c_str(), destUuid);
-                                       destProcessor = 
this->_root->findProcessor(destUuid);
-                               }
-                               if (destProcessor) {
-                                       std::string destUuid = 
destProcessor->getUUIDStr();
-                               }
-
-                               uuid_t srcUuid;
-                               uuid_t destUuid;
-                               srcProcessor->getUUID(srcUuid);
-                               connection->setSourceProcessorUUID(srcUuid);
-                               destProcessor->getUUID(destUuid);
-                               
connection->setDestinationProcessorUUID(destUuid);
-
-                               if (connection) {
-                                       parent->addConnection(connection);
-                               }
-                       }
-               }
-
-               if (connection)
-                       parent->addConnection(connection);
-
-               return;
-       }
-}
-
-void FlowControllerImpl::parsePortYaml(YAML::Node *portNode,
-               ProcessGroup *parent, TransferDirection direction) {
-       uuid_t uuid;
-       Processor *processor = NULL;
-       RemoteProcessorGroupPort *port = NULL;
-
-       if (!parent) {
-               logger_->log_error("parseProcessNode: no parent group existed");
-               return;
-       }
-
-       YAML::Node inputPortsObj = portNode->as<YAML::Node>();
-
-       // generate the random UIID
-       uuid_generate(uuid);
-
-       auto portId = inputPortsObj["id"].as<std::string>();
-       auto nameStr = inputPortsObj["name"].as<std::string>();
-       uuid_parse(portId.c_str(), uuid);
-
-       port = new RemoteProcessorGroupPort(nameStr.c_str(), uuid);
-
-       processor = (Processor *) port;
-       port->setDirection(direction);
-       port->setTimeOut(parent->getTimeOut());
-       port->setTransmitting(true);
-       processor->setYieldPeriodMsec(parent->getYieldPeriodMsec());
-       processor->initialize();
-
-       // handle port properties
-       YAML::Node nodeVal = portNode->as<YAML::Node>();
-       YAML::Node propertiesNode = nodeVal["Properties"];
-
-       parsePropertiesNodeYaml(&propertiesNode, processor);
-
-       // add processor to parent
-       parent->addProcessor(processor);
-       processor->setScheduledState(RUNNING);
-       auto rawMaxConcurrentTasks = inputPortsObj["max concurrent tasks"].as<
-                       std::string>();
-       int64_t maxConcurrentTasks;
-       if (Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) {
-               processor->setMaxConcurrentTasks(maxConcurrentTasks);
-       }
-       logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]",
-                       maxConcurrentTasks);
-       processor->setMaxConcurrentTasks(maxConcurrentTasks);
+/**
+ * Runs unload() on a background task and waits for it up to a deadline.
+ *
+ * Does nothing unless the controller is currently running. This function
+ * clears running_ itself only when the task finishes before the deadline.
+ *
+ * NOTE(review): a std::future obtained from std::async blocks in its
+ * destructor until the task completes, so this call still does not return
+ * before unload() has finished, even once the deadline has passed.
+ *
+ * @param timeToWaitMs maximum time, in milliseconds, to wait for the unload.
+ */
+void FlowController::waitUnload(const uint64_t timeToWaitMs) {
+  if (!running_) {
+    return;
+  }
+
+  // Deadline = current time plus the requested wait.
+  const std::chrono::system_clock::time_point deadline =
+      std::chrono::system_clock::now()
+          + std::chrono::milliseconds(timeToWaitMs);
+
+  // Launch unload() asynchronously.
+  std::future<void> pending_unload = std::async(std::launch::async,
+                                                [this]() {unload();});
+
+  const bool finished_in_time =
+      pending_unload.wait_until(deadline) == std::future_status::ready;
+  if (finished_in_time) {
+    running_ = false;
+  }
 
 }
 
-void FlowControllerImpl::parsePropertiesNodeYaml(YAML::Node *propertiesNode,
-               Processor *processor) {
-       // Treat generically as a YAML node so we can perform inspection on 
entries to ensure they are populated
-       for (YAML::const_iterator propsIter = propertiesNode->begin();
-                       propsIter != propertiesNode->end(); ++propsIter) {
-               std::string propertyName = propsIter->first.as<std::string>();
-               YAML::Node propertyValueNode = propsIter->second;
-               if (!propertyValueNode.IsNull() && 
propertyValueNode.IsDefined()) {
-                       std::string rawValueString = 
propertyValueNode.as<std::string>();
-                       if (!processor->setProperty(propertyName, 
rawValueString)) {
-                               logger_->log_warn(
-                                               "Received property %s with 
value %s but is not one of the properties for %s",
-                                               propertyName.c_str(), 
rawValueString.c_str(),
-                                               processor->getName().c_str());
-                       }
-               }
-       }
+/**
+ * Tears down the loaded flow.
+ *
+ * Stops the controller first if it is running. If a flow had been
+ * initialized, resets root_ to nullptr, clears the initialized flag and
+ * empties the name. Runs with the recursive flow mutex held.
+ */
+void FlowController::unload() {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
+
+  if (running_) {
+    stop(true);
+  }
+
+  if (!initialized_) {
+    // Nothing was loaded, so there is nothing to release.
+    return;
+  }
+
+  logger_->log_info("Unload Flow Controller");
+  root_ = nullptr;
+  initialized_ = false;
+  name_ = "";
 }
-#endif /* ifdef YAML_SUPPORT */
 
-void FlowControllerImpl::load() {
-    if (_running) {
-        stop(true);
-    }
-    if (!_initialized) {
-        logger_->log_info("Load Flow Controller from file %s", 
_configurationFileName.c_str());
-
-#ifdef YAML_SUPPORT
-               YAML::Node flow = YAML::LoadFile(_configurationFileName);
-
-               YAML::Node flowControllerNode = flow["Flow Controller"];
-               YAML::Node processorsNode = flow[CONFIG_YAML_PROCESSORS_KEY];
-               YAML::Node connectionsNode = flow["Connections"];
-               YAML::Node remoteProcessingGroupNode = flow["Remote Processing 
Groups"];
+/**
+ * Loads the flow described by configuration_filename_.
+ *
+ * Stops the controller first if it is running. When no flow has been
+ * initialized yet, asks flow_configuration_ for the root, calls
+ * loadFlowRepo() and marks the controller initialized. Without a flow
+ * configuration an error is logged and the controller stays uninitialized.
+ * Runs with the recursive flow mutex held.
+ */
+void FlowController::load() {
+  std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
+  if (running_) {
+    stop(true);
+  }
+  if (!initialized_) {
+    // The constructor tolerates a null flow configuration, so guard the
+    // dereference below instead of invoking undefined behaviour.
+    if (flow_configuration_ == nullptr) {
+      logger_->log_error(
+          "Can not load Flow Controller because no flow configuration was provided");
+      return;
+    }
+    logger_->log_info("Load Flow Controller from file %s",
+                      configuration_filename_.c_str());
 
-               // Create the root process group
-               parseRootProcessGroupYaml(flowControllerNode);
-               parseProcessorNodeYaml(processorsNode, this->_root);
-               parseRemoteProcessGroupYaml(&remoteProcessingGroupNode, 
this->_root);
-               parseConnectionYaml(&connectionsNode, this->_root);
+    this->root_ = flow_configuration_->getRoot(configuration_filename_);
 
-               // Load Flow File from Repo
-               loadFlowRepo();
-#endif
+    // Load Flow File from Repo
+    loadFlowRepo();
 
-               _initialized = true;
-    }
+    initialized_ = true;
+  }
 }
 
-void FlowControllerImpl::loadFlowRepo()
-{
-       if (this->_flowfileRepo && this->_flowfileRepo->isEnable())
-       {
-               std::map<std::string, Connection *> connectionMap;
-               this->_root->getConnections(&connectionMap);
-               this->_flowfileRepo->loadFlowFileToConnections(&connectionMap);
-       }
-}
-
-void FlowControllerImpl::reload(std::string yamlFile)
-{
-    logger_->log_info("Starting to reload Flow Controller with yaml %s", 
yamlFile.c_str());
+/**
+ * Replaces the running flow with the one described by yamlFile.
+ *
+ * Stops and unloads the current flow, then loads and starts the new one. If
+ * that leaves the controller without a root, the previous configuration file
+ * is restored, loaded and started again. Runs with the recursive flow mutex
+ * held.
+ *
+ * @param yamlFile path of the configuration to switch to.
+ */
+void FlowController::reload(std::string yamlFile) {
+  std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
+  logger_->log_info("Starting to reload Flow Controller with yaml %s",
+                    yamlFile.c_str());
+  stop(true);
+  unload();
+  std::string oldYamlFile = this->configuration_filename_;
+  this->configuration_filename_ = yamlFile;
+  load();
+  start();
+  // Roll back only when the new configuration produced no root; rolling back
+  // on a non-null root would discard every successfully loaded flow.
+  if (this->root_ == nullptr) {
+    this->configuration_filename_ = oldYamlFile;
+    logger_->log_info("Rollback Flow Controller to YAML %s",
+                      oldYamlFile.c_str());
     stop(true);
     unload();
-    std::string oldYamlFile = this->_configurationFileName;
-    this->_configurationFileName = yamlFile;
     load();
     start();
-       if (!this->_root)
-       {
-        this->_configurationFileName = oldYamlFile;
-        logger_->log_info("Rollback Flow Controller to YAML %s", 
oldYamlFile.c_str());
-        stop(true);
-        unload();
-        load();
-        start();
+  }
+}
+
+/**
+ * Passes the root's connections to the flow file repository.
+ *
+ * Skipped entirely when no flow file repository is set. The connection map
+ * is passed on empty when there is no root.
+ */
+void FlowController::loadFlowRepo() {
+  if (this->flow_file_repo_) {
+    std::map<std::string, std::shared_ptr<Connection>> connections;
+    if (this->root_ != nullptr) {
+      this->root_->getConnections(connections);
     }
+    // Unchecked downcast to the concrete repository type, which is the one
+    // that provides loadFlowFileToConnections().
+    std::static_pointer_cast<core::repository::FlowFileRepository>(
+        flow_file_repo_)->loadFlowFileToConnections(connections);
+  }
 }
 
-bool FlowControllerImpl::start() {
-       if (!_initialized) {
-               logger_->log_error(
-                               "Can not start Flow Controller because it has 
not been initialized");
-               return false;
-       } else {
-
-               if (!_running) {
-                       logger_->log_info("Starting Flow Controller");
-                       this->_timerScheduler.start();
-                       this->_eventScheduler.start();
-                       if (this->_root)
-                               
this->_root->startProcessing(&this->_timerScheduler,
-                                               &this->_eventScheduler);
-                       _running = true;
-                       this->_protocol->start();
-                       this->_provenanceRepo->start();
-                       this->_flowfileRepo->start();
-                       logger_->log_info("Started Flow Controller");
-               }
-               return true;
-       }
+/**
+ * Starts the schedulers, the flow, the protocol and both repositories.
+ *
+ * Runs with the recursive flow mutex held. Calling it while already running
+ * does nothing and still reports success.
+ *
+ * @return false if the controller has not been initialized, true otherwise.
+ */
+bool FlowController::start() {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
+
+  if (!initialized_) {
+    logger_->log_error(
+        "Can not start Flow Controller because it has not been initialized");
+    return false;
+  }
+
+  if (running_) {
+    // Already started.
+    return true;
+  }
+
+  logger_->log_info("Starting Flow Controller");
+  this->_timerScheduler.start();
+  this->_eventScheduler.start();
+  if (this->root_ != nullptr) {
+    this->root_->startProcessing(&this->_timerScheduler,
+                                 &this->_eventScheduler);
+  }
+  // running_ is raised before the protocol and the repositories are started.
+  running_ = true;
+  this->protocol_->start();
+  this->provenance_repo_->start();
+  this->flow_file_repo_->start();
+  logger_->log_info("Started Flow Controller");
+  return true;
 }
+
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/FlowFileRecord.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp
index a2f2323..7383574 100644
--- a/libminifi/src/FlowFileRecord.cpp
+++ b/libminifi/src/FlowFileRecord.cpp
@@ -27,253 +27,339 @@
 #include <cstdio>
 
 #include "FlowFileRecord.h"
-#include "Relationship.h"
-#include "Logger.h"
-#include "FlowController.h"
-#include "FlowFileRepository.h"
-
-std::atomic<uint64_t> FlowFileRecord::_localFlowSeqNumber(0);
-
-FlowFileRecord::FlowFileRecord(std::map<std::string, std::string> attributes, 
ResourceClaim *claim)
-: _size(0),
-  _id(_localFlowSeqNumber.load()),
-  _offset(0),
-  _penaltyExpirationMs(0),
-  _claim(claim),
-  _isStoredToRepo(false),
-  _markedDelete(false),
-  _connection(NULL),
-  _orginalConnection(NULL)
-{
-       _entryDate = getTimeMillis();
-       _lineageStartDate = _entryDate;
-
-       char uuidStr[37];
-
-       // Generate the global UUID for the flow record
-       uuid_generate(_uuid);
-       // Increase the local ID for the flow record
-       ++_localFlowSeqNumber;
-       uuid_unparse_lower(_uuid, uuidStr);
-       _uuidStr = uuidStr;
-
-       // Populate the default attributes
-    addAttribute(FILENAME, std::to_string(getTimeNano()));
-    addAttribute(PATH, DEFAULT_FLOWFILE_PATH);
-    addAttribute(UUID, uuidStr);
-       // Populate the attributes from the input
-    std::map<std::string, std::string>::iterator it;
-    for (it = attributes.begin(); it!= attributes.end(); it++)
-    {
-       addAttribute(it->first, it->second);
-    }
+#include "core/logging/Logger.h"
+#include "core/Relationship.h"
+#include "core/Repository.h"
 
-    _snapshot = false;
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
 
-       if (_claim)
-               // Increase the flow file record owned count for the resource 
claim
-               _claim->increaseFlowFileRecordOwnedCount();
-       logger_ = Logger::getLogger();
-}
+std::atomic<uint64_t> FlowFileRecord::local_flow_seq_number_(0);
 
-FlowFileRecord::FlowFileRecord(FlowFileEventRecord *event)
-: _size(0),
-  _id(_localFlowSeqNumber.load()),
-  _offset(0),
-  _penaltyExpirationMs(0),
-  _claim(NULL),
-  _isStoredToRepo(false),
-  _markedDelete(false),
-  _connection(NULL),
-  _orginalConnection(NULL)
-{
-       _entryDate = event->getFlowFileEntryDate();
-       _lineageStartDate = event->getlineageStartDate();
-       _size = event->getFileSize();
-       _offset = event->getFileOffset();
-       _lineageIdentifiers = event->getLineageIdentifiers();
-       _attributes = event->getAttributes();
-    _snapshot = false;
-    _uuidStr = event->getFlowFileUuid();
-    uuid_parse(_uuidStr.c_str(), _uuid);
-
-    if (_size > 0)
-    {
-       _claim = new ResourceClaim();
-    }
+/**
+ * Creates a new flow file record backed by the given repository.
+ *
+ * Takes the next local sequence number as its id, adds the default FILENAME,
+ * PATH and UUID attributes, then adds the caller's attributes. When a claim
+ * is supplied, its owned-record count is increased.
+ *
+ * @param flow_repository repository stored in flow_repository_.
+ * @param attributes attributes to add after the defaults.
+ * @param claim resource claim for the record; may be null.
+ */
+FlowFileRecord::FlowFileRecord(
+    std::shared_ptr<core::Repository> flow_repository,
+    std::map<std::string, std::string> attributes,
+    std::shared_ptr<ResourceClaim> claim)
+    : FlowFile(),
+      flow_repository_(flow_repository) {
+
+  // Read and advance the shared counter in one atomic step so two records
+  // constructed concurrently can never be handed the same id.
+  id_ = local_flow_seq_number_.fetch_add(1);
+  claim_ = claim;
+  // Populate the default attributes
+  addKeyedAttribute(FILENAME, std::to_string(getTimeNano()));
+  addKeyedAttribute(PATH, DEFAULT_FLOWFILE_PATH);
+  addKeyedAttribute(UUID, getUUIDStr());
+  // Populate the attributes from the input
+  for (auto &attribute : attributes) {
+    FlowFile::addAttribute(attribute.first, attribute.second);
+  }
+
+  snapshot_ = false;
 
-       if (_claim)
-       {
-               _claim->setContentFullPath(event->getContentFullPath());
-               // Increase the flow file record owned count for the resource 
claim
-               _claim->increaseFlowFileRecordOwnedCount();
+  if (claim_ != nullptr) {
+    // Increase the flow file record owned count for the resource claim
+    claim_->increaseFlowFileRecordOwnedCount();
+  }
+  logger_ = logging::Logger::getLogger();
+}
+
+/**
+ * Builds a record from an existing flow file for the given connection.
+ *
+ * Copies dates, lineage identifiers, UUID, attributes, size and offset from
+ * event, stores the connection UUID, and records the content path when the
+ * event carries a resource claim.
+ *
+ * @param flow_repository repository stored in flow_repository_.
+ * @param event flow file to copy from; dereferenced without a null check.
+ * @param uuidConnection value stored in uuid_connection_.
+ */
+FlowFileRecord::FlowFileRecord(
+    std::shared_ptr<core::Repository> flow_repository,
+    std::shared_ptr<core::FlowFile> &event, const std::string &uuidConnection)
+    : FlowFile(),
+      // false, not "": a string literal converts to true, which would flag
+      // every record built here as a snapshot.
+      snapshot_(false),
+      flow_repository_(flow_repository) {
+       entry_date_ = event->getEntryDate();
+       lineage_start_date_ = event->getlineageStartDate();
+       lineage_Identifiers_ = event->getlineageIdentifiers();
+       uuid_str_ = event->getUUIDStr();
+       attributes_ = event->getAttributes();
+       size_ = event->getSize();
+       offset_ = event->getOffset();
+       event->getUUID(uuid_);
+       uuid_connection_ = uuidConnection;
+       if (event->getResourceClaim()) {
+         content_full_fath_ = event->getResourceClaim()->getContentFullPath();
        }
-       logger_ = Logger::getLogger();
-       ++_localFlowSeqNumber;
+       // The destructor logs through logger_ unconditionally, so set it here
+       // as the attribute-based constructor does.
+       logger_ = logging::Logger::getLogger();
 }
 
+/**
+ * Builds an empty record bound to the given repository.
+ *
+ * The connection UUID starts empty and the snapshot flag starts cleared.
+ * event is accepted but not read by this constructor.
+ *
+ * @param flow_repository repository stored in flow_repository_.
+ * @param event unused here.
+ */
+FlowFileRecord::FlowFileRecord(
+    std::shared_ptr<core::Repository> flow_repository,
+    std::shared_ptr<core::FlowFile> &event)
+    : FlowFile(),
+      uuid_connection_(""),
+      // false, not "": a string literal converts to true, which would flag
+      // every record built here as a snapshot.
+      snapshot_(false),
+      flow_repository_(flow_repository) {
+  // The destructor logs through logger_ unconditionally, so set it here as
+  // the attribute-based constructor does.
+  logger_ = logging::Logger::getLogger();
 
-FlowFileRecord::~FlowFileRecord()
-{
-       if (!_snapshot)
-               logger_->log_debug("Delete FlowFile UUID %s", _uuidStr.c_str());
-       else
-               logger_->log_debug("Delete SnapShot FlowFile UUID %s", 
_uuidStr.c_str());
-       if (_claim)
-       {
-               // Decrease the flow file record owned count for the resource 
claim
-               _claim->decreaseFlowFileRecordOwnedCount();
-               if (_claim->getFlowFileRecordOwnedCount() <= 0)
-               {
-                       logger_->log_debug("Delete Resource Claim %s", 
_claim->getContentFullPath().c_str());
-                       std::string value;
-                       if 
(!FlowControllerFactory::getFlowController()->getFlowFileRepository() ||
-                                       
!FlowControllerFactory::getFlowController()->getFlowFileRepository()->isEnable()
 ||
-                                       !this->_isStoredToRepo ||
-                                       
!FlowControllerFactory::getFlowController()->getFlowFileRepository()->Get(_uuidStr,
 value))
-                       {
-                               // if it is persistent to DB already while it 
is in the queue, we keep the content
-                               
std::remove(_claim->getContentFullPath().c_str());
-                       }
-                       delete _claim;
-               }
-       }
 }
 
-bool FlowFileRecord::addAttribute(FlowAttribute key, std::string value)
-{
-       const char *keyStr = FlowAttributeKey(key);
-       if (keyStr)
-       {
-               std::string keyString = keyStr;
-               return addAttribute(keyString, value);
-       }
-       else
-       {
-               return false;
-       }
+/**
+ * Releases this record's share of its resource claim.
+ *
+ * Logs the deletion, decreases the claim's owned-record count and, once the
+ * count reaches zero, removes the claim's content file unless this record
+ * was stored and the flow repository still returns it.
+ */
+FlowFileRecord::~FlowFileRecord() {
+  if (!snapshot_)
+    logger_->log_debug("Delete FlowFile UUID %s", uuid_str_.c_str());
+  else
+    logger_->log_debug("Delete SnapShot FlowFile UUID %s", uuid_str_.c_str());
+  if (claim_) {
+    // Decrease the flow file record owned count for the resource claim
+    claim_->decreaseFlowFileRecordOwnedCount();
+    if (claim_->getFlowFileRecordOwnedCount() <= 0) {
+      logger_->log_debug("Delete Resource Claim %s",
+                         claim_->getContentFullPath().c_str());
+      // A missing repository cannot hold the record, and must not be
+      // dereferenced from a destructor; treat it like "not stored".
+      std::string value;
+      if (!this->stored || flow_repository_ == nullptr
+          || !flow_repository_->Get(uuid_str_, value)) {
+        std::remove(claim_->getContentFullPath().c_str());
+      }
+    }
+  }
 }
 
-bool FlowFileRecord::addAttribute(std::string key, std::string value)
-{
-       std::map<std::string, std::string>::iterator it = _attributes.find(key);
-       if (it != _attributes.end())
-       {
-               // attribute already there in the map
-               return false;
-       }
-       else
-       {
-               _attributes[key] = value;
-               return true;
-       }
+/**
+ * Adds an attribute identified by a FlowAttribute key.
+ *
+ * @param key attribute identifier, mapped to a name by FlowAttributeKey().
+ * @param value value to add.
+ * @return false when the key maps to no name; otherwise the result of
+ *         FlowFile::addAttribute().
+ */
+bool FlowFileRecord::addKeyedAttribute(FlowAttribute key, std::string value) {
+  const char *attribute_name = FlowAttributeKey(key);
+  if (!attribute_name) {
+    return false;
+  }
+  const std::string name = attribute_name;
+  return FlowFile::addAttribute(name, value);
 }
 
-bool FlowFileRecord::removeAttribute(FlowAttribute key)
-{
-       const char *keyStr = FlowAttributeKey(key);
-       if (keyStr)
-       {
-               std::string keyString = keyStr;
-               return removeAttribute(keyString);
-       }
-       else
-       {
-               return false;
-       }
+/**
+ * Removes the attribute identified by a FlowAttribute key.
+ *
+ * @param key attribute identifier, mapped to a name by FlowAttributeKey().
+ * @return false when the key maps to no name; otherwise the result of
+ *         FlowFile::removeAttribute().
+ */
+bool FlowFileRecord::removeKeyedAttribute(FlowAttribute key) {
+  const char *attribute_name = FlowAttributeKey(key);
+  if (!attribute_name) {
+    return false;
+  }
+  std::string name = attribute_name;
+  return FlowFile::removeAttribute(name);
 }
 
-bool FlowFileRecord::removeAttribute(std::string key)
-{
-       std::map<std::string, std::string>::iterator it = _attributes.find(key);
-       if (it != _attributes.end())
-       {
-               _attributes.erase(key);
-               return true;
-       }
-       else
-       {
-               return false;
-       }
+/**
+ * Updates the attribute identified by a FlowAttribute key.
+ *
+ * @param key attribute identifier, mapped to a name by FlowAttributeKey().
+ * @param value new value.
+ * @return false when the key maps to no name; otherwise the result of
+ *         FlowFile::updateAttribute().
+ */
+bool FlowFileRecord::updateKeyedAttribute(FlowAttribute key,
+                                          std::string value) {
+  const char *attribute_name = FlowAttributeKey(key);
+  if (!attribute_name) {
+    return false;
+  }
+  std::string name = attribute_name;
+  return FlowFile::updateAttribute(name, value);
 }
 
-bool FlowFileRecord::updateAttribute(FlowAttribute key, std::string value)
-{
-       const char *keyStr = FlowAttributeKey(key);
-       if (keyStr)
-       {
-               std::string keyString = keyStr;
-               return updateAttribute(keyString, value);
-       }
-       else
-       {
-               return false;
-       }
+/**
+ * Looks up the attribute identified by a FlowAttribute key.
+ *
+ * @param key attribute identifier, mapped to a name by FlowAttributeKey().
+ * @param value passed through to FlowFile::getAttribute() as its output.
+ * @return false when the key maps to no name; otherwise the result of
+ *         FlowFile::getAttribute().
+ */
+bool FlowFileRecord::getKeyedAttribute(FlowAttribute key, std::string &value) {
+  const char *attribute_name = FlowAttributeKey(key);
+  if (!attribute_name) {
+    return false;
+  }
+  std::string name = attribute_name;
+  return FlowFile::getAttribute(name, value);
 }
 
-bool FlowFileRecord::updateAttribute(std::string key, std::string value)
-{
-       std::map<std::string, std::string>::iterator it = _attributes.find(key);
-       if (it != _attributes.end())
-       {
-               _attributes[key] = value;
-               return true;
-       }
-       else
-       {
-               return false;
-       }
+/**
+ * Copy-assigns the base FlowFile state together with this record's
+ * connection UUID, content path and snapshot flag.
+ *
+ * flow_repository_ is not assigned here and keeps its current value.
+ *
+ * @param other record to copy from.
+ * @return *this.
+ */
+FlowFileRecord &FlowFileRecord::operator=(const FlowFileRecord &other) {
+  // Base-class state first, then the members this class adds.
+  core::FlowFile::operator=(other);
+  snapshot_ = other.snapshot_;
+  content_full_fath_ = other.content_full_fath_;
+  uuid_connection_ = other.uuid_connection_;
+  return *this;
 }
 
-bool FlowFileRecord::getAttribute(FlowAttribute key, std::string &value)
-{
-       const char *keyStr = FlowAttributeKey(key);
-       if (keyStr)
-       {
-               std::string keyString = keyStr;
-               return getAttribute(keyString, value);
-       }
-       else
-       {
-               return false;
-       }
+/**
+ * Loads this record from the flow repository entry stored under key.
+ *
+ * @param key repository key to look up.
+ * @return false when the repository has no entry for key or the stored
+ *         bytes fail to deserialize; true otherwise.
+ */
+bool FlowFileRecord::DeSerialize(std::string key) {
+  std::string value;
+
+  if (!flow_repository_->Get(key, value)) {
+    logger_->log_error("NiFi FlowFile Store event %s can not found",
+                       key.c_str());
+    return false;
+  }
+  // std::string::length() yields a size_t, which %zu prints correctly.
+  logger_->log_debug("NiFi FlowFile Read event %s length %zu", key.c_str(),
+                     value.length());
+
+  io::DataStream stream(reinterpret_cast<const uint8_t*>(value.data()),
+                        value.length());
+
+  const bool ret = DeSerialize(stream);
+
+  // NOTE(review): stream.getSize() is still printed with %d; confirm its
+  // return type matches.
+  if (ret) {
+    logger_->log_debug(
+        "NiFi FlowFile retrieve uuid %s size %d connection %s success",
+        uuid_str_.c_str(), stream.getSize(), uuid_connection_.c_str());
+  } else {
+    // The connection UUID is a C string, so it needs %s rather than %d.
+    logger_->log_debug(
+        "NiFi FlowFile retrieve uuid %s size %d connection %s fail",
+        uuid_str_.c_str(), stream.getSize(), uuid_connection_.c_str());
+  }
+
+  return ret;
 }
 
-bool FlowFileRecord::getAttribute(std::string key, std::string &value)
-{
-       std::map<std::string, std::string>::iterator it = _attributes.find(key);
-       if (it != _attributes.end())
-       {
-               value = it->second;
-               return true;
-       }
-       else
-       {
-               return false;
-       }
+/**
+ * Serializes this record and stores it in the flow repository under its
+ * UUID.
+ *
+ * Fields are written in this order: event time, entry date, lineage start
+ * date, UUID, connection UUID, attribute count, each attribute key and
+ * value, content path, size, offset.
+ *
+ * @return true when every write returned the expected value and the
+ *         repository accepted the buffer; false otherwise.
+ */
+bool FlowFileRecord::Serialize() {
+  io::DataStream outStream;
+  int ret;
+
+  ret = write(this->event_time_, &outStream);
+  if (ret != 8) {
+    return false;
+  }
+
+  ret = write(this->entry_date_, &outStream);
+  if (ret != 8) {
+    return false;
+  }
+
+  ret = write(this->lineage_start_date_, &outStream);
+  if (ret != 8) {
+    return false;
+  }
+
+  ret = writeUTF(this->uuid_str_, &outStream);
+  if (ret <= 0) {
+    return false;
+  }
+
+  ret = writeUTF(this->uuid_connection_, &outStream);
+  if (ret <= 0) {
+    return false;
+  }
+
+  // write flow attributes
+  uint32_t numAttributes = this->attributes_.size();
+  ret = write(numAttributes, &outStream);
+  if (ret != 4) {
+    return false;
+  }
+
+  // Iterate by reference; copying every key/value pair is wasted work.
+  for (const auto &attribute : attributes_) {
+    ret = writeUTF(attribute.first, &outStream, true);
+    if (ret <= 0) {
+      return false;
+    }
+    ret = writeUTF(attribute.second, &outStream, true);
+    if (ret <= 0) {
+      return false;
+    }
+  }
+
+  ret = writeUTF(this->content_full_fath_, &outStream);
+  if (ret <= 0) {
+    return false;
+  }
+
+  ret = write(this->size_, &outStream);
+  if (ret != 8) {
+    return false;
+  }
+
+  ret = write(this->offset_, &outStream);
+  if (ret != 8) {
+    return false;
+  }
+
+  // Persistent to the DB
+  if (!flow_repository_->Put(uuid_str_,
+                             const_cast<uint8_t*>(outStream.getBuffer()),
+                             outStream.getSize())) {
+    logger_->log_error("NiFi FlowFile Store event %s size %d fail",
+                       uuid_str_.c_str(), outStream.getSize());
+    return false;
+  }
+
+  logger_->log_debug("NiFi FlowFile Store event %s size %d success",
+                     uuid_str_.c_str(), outStream.getSize());
+  return true;
 }
 
-void FlowFileRecord::duplicate(FlowFileRecord *original)
-{
-       uuid_copy(this->_uuid, original->_uuid);
-       this->_attributes = original->_attributes;
-       this->_entryDate = original->_entryDate;
-       this->_id = original->_id;
-       this->_lastQueueDate = original->_lastQueueDate;
-       this->_lineageStartDate = original->_lineageStartDate;
-       this->_offset = original->_offset;
-       this->_penaltyExpirationMs = original->_penaltyExpirationMs;
-       this->_size = original->_size;
-       this->_lineageIdentifiers = original->_lineageIdentifiers;
-       this->_orginalConnection = original->_orginalConnection;
-       this->_uuidStr = original->_uuidStr;
-       this->_connection = original->_connection;
-       this->_markedDelete = original->_markedDelete;
-
-       this->_claim = original->_claim;
-       if (this->_claim)
-               this->_claim->increaseFlowFileRecordOwnedCount();
-
-       this->_snapshot = true;
+/**
+ * Fills this record from a serialized buffer.
+ *
+ * Fields are read in this order: event time, entry date, lineage start date,
+ * UUID, connection UUID, attribute count, each attribute key and value,
+ * content path, size, offset. Attributes read before a failure stay in
+ * attributes_.
+ *
+ * @param buffer serialized bytes.
+ * @param bufferSize number of bytes in buffer.
+ * @return true when every read returned the expected value; false at the
+ *         first read that did not.
+ */
+bool FlowFileRecord::DeSerialize(const uint8_t *buffer, const int bufferSize) {
+  io::DataStream input(buffer, bufferSize);
+
+  // A fixed-width read must return exactly its width; a string read must
+  // return a positive value.
+  const auto fixed_ok = [](int result, int width) { return result == width; };
+  const auto text_ok = [](int result) { return result > 0; };
+
+  if (!fixed_ok(read(this->event_time_, &input), 8)) {
+    return false;
+  }
+  if (!fixed_ok(read(this->entry_date_, &input), 8)) {
+    return false;
+  }
+  if (!fixed_ok(read(this->lineage_start_date_, &input), 8)) {
+    return false;
+  }
+  if (!text_ok(readUTF(this->uuid_str_, &input))) {
+    return false;
+  }
+  if (!text_ok(readUTF(this->uuid_connection_, &input))) {
+    return false;
+  }
+
+  // read flow attributes
+  uint32_t attribute_count = 0;
+  if (!fixed_ok(read(attribute_count, &input), 4)) {
+    return false;
+  }
+
+  for (uint32_t remaining = attribute_count; remaining > 0; --remaining) {
+    std::string name;
+    if (!text_ok(readUTF(name, &input, true))) {
+      return false;
+    }
+    std::string content;
+    if (!text_ok(readUTF(content, &input, true))) {
+      return false;
+    }
+    this->attributes_[name] = content;
+  }
+
+  if (!text_ok(readUTF(this->content_full_fath_, &input))) {
+    return false;
+  }
+  if (!fixed_ok(read(this->size_, &input), 8)) {
+    return false;
+  }
+  if (!fixed_ok(read(this->offset_, &input), 8)) {
+    return false;
+  }
+
+  return true;
 }
 
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/FlowFileRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowFileRepository.cpp 
b/libminifi/src/FlowFileRepository.cpp
deleted file mode 100644
index 3388738..0000000
--- a/libminifi/src/FlowFileRepository.cpp
+++ /dev/null
@@ -1,282 +0,0 @@
-/**
- * @file FlowFileRepository.cpp
- * FlowFile implemenatation 
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <cstdint>
-#include <vector>
-#include <arpa/inet.h>
-#include "io/DataStream.h"
-#include "io/Serializable.h"
-#include "FlowFileRecord.h"
-#include "Relationship.h"
-#include "Logger.h"
-#include "FlowController.h"
-#include "FlowFileRepository.h"
-
-//! DeSerialize
-bool FlowFileEventRecord::DeSerialize(FlowFileRepository *repo,
-               std::string key) {
-       std::string value;
-       bool ret;
-
-       ret = repo->Get(key, value);
-
-       if (!ret) {
-               logger_->log_error("NiFi FlowFile Store event %s can not found",
-                               key.c_str());
-               return false;
-       } else
-               logger_->log_debug("NiFi FlowFile Read event %s length %d",
-                               key.c_str(), value.length());
-
-
-       DataStream stream((const uint8_t*)value.data(),value.length());
-
-       ret = DeSerialize(stream);
-
-       if (ret) {
-               logger_->log_debug(
-                               "NiFi FlowFile retrieve uuid %s size %d connection %s success",
-                               _uuid.c_str(), stream.getSize(), _uuidConnection.c_str());
-       } else {
-               logger_->log_debug(
-                               "NiFi FlowFile retrieve uuid %s size %d connection %d fail",
-                               _uuid.c_str(), stream.getSize(), _uuidConnection.c_str());
-       }
-
-       return ret;
-}
-
-bool FlowFileEventRecord::Serialize(FlowFileRepository *repo) {
-
-       DataStream outStream;
-
-       int ret;
-
-       ret = write(this->_eventTime,&outStream);
-       if (ret != 8) {
-
-               return false;
-       }
-
-       ret = write(this->_entryDate,&outStream);
-       if (ret != 8) {
-               return false;
-       }
-
-       ret = write(this->_lineageStartDate,&outStream);
-       if (ret != 8) {
-
-               return false;
-       }
-
-       ret = writeUTF(this->_uuid,&outStream);
-       if (ret <= 0) {
-
-               return false;
-       }
-
-       ret = writeUTF(this->_uuidConnection,&outStream);
-       if (ret <= 0) {
-
-               return false;
-       }
-
-       // write flow attributes
-       uint32_t numAttributes = this->_attributes.size();
-       ret = write(numAttributes,&outStream);
-       if (ret != 4) {
-
-               return false;
-       }
-
-       for (auto itAttribute : _attributes) {
-               ret = writeUTF(itAttribute.first,&outStream, true);
-               if (ret <= 0) {
-
-                       return false;
-               }
-               ret = writeUTF(itAttribute.second,&outStream, true);
-               if (ret <= 0) {
-
-                       return false;
-               }
-       }
-
-       ret = writeUTF(this->_contentFullPath,&outStream);
-       if (ret <= 0) {
-
-               return false;
-       }
-
-       ret = write(this->_size,&outStream);
-       if (ret != 8) {
-
-               return false;
-       }
-
-       ret = write(this->_offset,&outStream);
-       if (ret != 8) {
-
-               return false;
-       }
-
-       // Persistent to the DB
-
-       if (repo->Put(_uuid, const_cast<uint8_t*>(outStream.getBuffer()), outStream.getSize())) {
-               logger_->log_debug("NiFi FlowFile Store event %s size %d success",
-                               _uuid.c_str(), outStream.getSize());
-               return true;
-       } else {
-               logger_->log_error("NiFi FlowFile Store event %s size %d fail",
-                               _uuid.c_str(), outStream.getSize());
-               return false;
-       }
-
-       // cleanup
-
-       return true;
-}
-
-bool FlowFileEventRecord::DeSerialize(const uint8_t *buffer, const int bufferSize) {
-
-       int ret;
-
-       DataStream outStream(buffer,bufferSize);
-
-       ret = read(this->_eventTime,&outStream);
-       if (ret != 8) {
-               return false;
-       }
-
-       ret = read(this->_entryDate,&outStream);
-       if (ret != 8) {
-               return false;
-       }
-
-       ret = read(this->_lineageStartDate,&outStream);
-       if (ret != 8) {
-               return false;
-       }
-
-       ret = readUTF(this->_uuid,&outStream);
-       if (ret <= 0) {
-               return false;
-       }
-
-       ret = readUTF(this->_uuidConnection,&outStream);
-       if (ret <= 0) {
-               return false;
-       }
-
-       // read flow attributes
-       uint32_t numAttributes = 0;
-       ret = read(numAttributes,&outStream);
-       if (ret != 4) {
-               return false;
-       }
-
-       for (uint32_t i = 0; i < numAttributes; i++) {
-               std::string key;
-               ret = readUTF(key,&outStream, true);
-               if (ret <= 0) {
-                       return false;
-               }
-               std::string value;
-               ret = readUTF(value,&outStream, true);
-               if (ret <= 0) {
-                       return false;
-               }
-               this->_attributes[key] = value;
-       }
-
-       ret = readUTF(this->_contentFullPath,&outStream);
-       if (ret <= 0) {
-               return false;
-       }
-
-       ret = read(this->_size,&outStream);
-       if (ret != 8) {
-               return false;
-       }
-
-       ret = read(this->_offset,&outStream);
-       if (ret != 8) {
-               return false;
-       }
-
-       return true;
-}
-
-void FlowFileRepository::loadFlowFileToConnections(std::map<std::string, Connection *> *connectionMap)
-{
-#ifdef LEVELDB_SUPPORT
-       if (!_enable)
-               return;
-
-       std::vector<std::string> purgeList;
-       leveldb::Iterator* it = _db->NewIterator(
-                                               leveldb::ReadOptions());
-
-       for (it->SeekToFirst(); it->Valid(); it->Next())
-       {
-               FlowFileEventRecord eventRead;
-               std::string key = it->key().ToString();
-               if (eventRead.DeSerialize((uint8_t *) it->value().data(),
-                               (int) it->value().size()))
-               {
-                       auto search = connectionMap->find(eventRead.getConnectionUuid());
-                       if (search != connectionMap->end())
-                       {
-                               // we find the connection for the persistent flowfile, create the flowfile and enqueue that
-                               FlowFileRecord *record = new FlowFileRecord(&eventRead);
-                               // set store to repo to true so that we do need to persistent again in enqueue
-                               record->setStoredToRepository(true);
-                               search->second->put(record);
-                       }
-                       else
-                       {
-                               if (eventRead.getContentFullPath().length() > 0)
-                               {
-                                       std::remove(eventRead.getContentFullPath().c_str());
-                               }
-                               purgeList.push_back(key);
-                       }
-               }
-               else
-               {
-                       purgeList.push_back(key);
-               }
-       }
-
-       delete it;
-       std::vector<std::string>::iterator itPurge;
-       for (itPurge = purgeList.begin(); itPurge != purgeList.end();
-                                               itPurge++)
-       {
-               std::string eventId = *itPurge;
-               logger_->log_info("Repository Repo %s Purge %s",
-                                                                               RepositoryTypeStr[_type],
-                                                                               eventId.c_str());
-               Delete(eventId);
-       }
-#endif
-
-       return;
-}
-

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/GenerateFlowFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/GenerateFlowFile.cpp b/libminifi/src/GenerateFlowFile.cpp
deleted file mode 100644
index 12d7f70..0000000
--- a/libminifi/src/GenerateFlowFile.cpp
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * @file GenerateFlowFile.cpp
- * GenerateFlowFile class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <vector>
-#include <queue>
-#include <map>
-#include <set>
-#include <sys/time.h>
-#include <time.h>
-#include <chrono>
-#include <thread>
-#include <random>
-#include "utils/StringUtils.h"
-
-#include "GenerateFlowFile.h"
-#include "ProcessContext.h"
-#include "ProcessSession.h"
-
-const char *GenerateFlowFile::DATA_FORMAT_BINARY = "Binary";
-const char *GenerateFlowFile::DATA_FORMAT_TEXT = "Text";
-const std::string GenerateFlowFile::ProcessorName("GenerateFlowFile");
-Property GenerateFlowFile::FileSize("File Size", "The size of the file that will be used", "1 kB");
-Property GenerateFlowFile::BatchSize("Batch Size", "The number of FlowFiles to be transferred in each invocation", "1");
-Property GenerateFlowFile::DataFormat("Data Format", "Specifies whether the data should be Text or Binary", GenerateFlowFile::DATA_FORMAT_BINARY);
-Property GenerateFlowFile::UniqueFlowFiles("Unique FlowFiles",
-               "If true, each FlowFile that is generated will be unique. If false, a random value will be generated and all FlowFiles", "true");
-Relationship GenerateFlowFile::Success("success", "success operational on the flow record");
-
-void GenerateFlowFile::initialize()
-{
-       //! Set the supported properties
-       std::set<Property> properties;
-       properties.insert(FileSize);
-       properties.insert(BatchSize);
-       properties.insert(DataFormat);
-       properties.insert(UniqueFlowFiles);
-       setSupportedProperties(properties);
-       //! Set the supported relationships
-       std::set<Relationship> relationships;
-       relationships.insert(Success);
-       setSupportedRelationships(relationships);
-}
-
-void GenerateFlowFile::onTrigger(ProcessContext *context, ProcessSession *session)
-{
-       int64_t batchSize = 1;
-       bool uniqueFlowFile = true;
-       int64_t fileSize = 1024;
-
-       std::string value;
-       if (context->getProperty(FileSize.getName(), value))
-       {
-               Property::StringToInt(value, fileSize);
-       }
-       if (context->getProperty(BatchSize.getName(), value))
-       {
-               Property::StringToInt(value, batchSize);
-       }
-       if (context->getProperty(UniqueFlowFiles.getName(), value))
-       {
-               StringUtils::StringToBool(value, uniqueFlowFile);
-       }
-
-       if (!uniqueFlowFile)
-       {
-               char *data;
-               data = new char[fileSize];
-               if (!data)
-                       return;
-               uint64_t dataSize = fileSize;
-               GenerateFlowFile::WriteCallback callback(data, dataSize);
-               char *current = data;
-               for (int i = 0; i < fileSize; i+= sizeof(int))
-               {
-                       int randValue = random();
-                       *((int *) current) = randValue;
-                       current += sizeof(int);
-               }
-               for (int i = 0; i < batchSize; i++)
-               {
-                       // For each batch
-                       FlowFileRecord *flowFile = session->create();
-                       if (!flowFile)
-                               return;
-                       if (fileSize > 0)
-                               session->write(flowFile, &callback);
-                       session->transfer(flowFile, Success);
-               }
-               delete[] data;
-       }
-       else
-       {
-               if (!_data)
-               {
-                       // We have not create the unique data yet
-                       _data = new char[fileSize];
-                       _dataSize = fileSize;
-                       char *current = _data;
-                       for (int i = 0; i < fileSize; i+= sizeof(int))
-                       {
-                               int randValue = random();
-                               *((int *) current) = randValue;
-                               // *((int *) current) = (0xFFFFFFFF & i);
-                               current += sizeof(int);
-                       }
-               }
-               GenerateFlowFile::WriteCallback callback(_data, _dataSize);
-               for (int i = 0; i < batchSize; i++)
-               {
-                       // For each batch
-                       FlowFileRecord *flowFile = session->create();
-                       if (!flowFile)
-                               return;
-                       if (fileSize > 0)
-                               session->write(flowFile, &callback);
-                       session->transfer(flowFile, Success);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/GetFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/GetFile.cpp b/libminifi/src/GetFile.cpp
deleted file mode 100644
index 40dd387..0000000
--- a/libminifi/src/GetFile.cpp
+++ /dev/null
@@ -1,329 +0,0 @@
-/**
- * @file GetFile.cpp
- * GetFile class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <vector>
-#include <queue>
-#include <map>
-#include <set>
-#include <sys/time.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <time.h>
-#include <sstream>
-#include <stdio.h>
-#include <string>
-#include <iostream>
-#include <dirent.h>
-#include <limits.h>
-#include <unistd.h>
-#if  (__GNUC__ >= 4) 
-       #if (__GNUC_MINOR__ < 9)
-               #include <regex.h>
-       #endif
-#endif
-#include "utils/StringUtils.h"
-#include <regex>
-#include "utils/TimeUtil.h"
-#include "GetFile.h"
-#include "ProcessContext.h"
-#include "ProcessSession.h"
-
-const std::string GetFile::ProcessorName("GetFile");
-Property GetFile::BatchSize("Batch Size", "The maximum number of files to pull in each iteration", "10");
-Property GetFile::Directory("Input Directory", "The input directory from which to pull files", ".");
-Property GetFile::IgnoreHiddenFile("Ignore Hidden Files", "Indicates whether or not hidden files should be ignored", "true");
-Property GetFile::KeepSourceFile("Keep Source File",
-               "If true, the file is not deleted after it has been copied to the Content Repository", "false");
-Property GetFile::MaxAge("Maximum File Age",
-               "The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored", "0 sec");
-Property GetFile::MinAge("Minimum File Age",
-               "The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored", "0 sec");
-Property GetFile::MaxSize("Maximum File Size", "The maximum size that a file can be in order to be pulled", "0 B");
-Property GetFile::MinSize("Minimum File Size", "The minimum size that a file must be in order to be pulled", "0 B");
-Property GetFile::PollInterval("Polling Interval", "Indicates how long to wait before performing a directory listing", "0 sec");
-Property GetFile::Recurse("Recurse Subdirectories", "Indicates whether or not to pull files from subdirectories", "true");
-Property GetFile::FileFilter("File Filter", "Only files whose names match the given regular expression will be picked up", "[^\\.].*");
-Relationship GetFile::Success("success", "All files are routed to success");
-
-void GetFile::initialize()
-{
-       //! Set the supported properties
-       std::set<Property> properties;
-       properties.insert(BatchSize);
-       properties.insert(Directory);
-       properties.insert(IgnoreHiddenFile);
-       properties.insert(KeepSourceFile);
-       properties.insert(MaxAge);
-       properties.insert(MinAge);
-       properties.insert(MaxSize);
-       properties.insert(MinSize);
-       properties.insert(PollInterval);
-       properties.insert(Recurse);
-       properties.insert(FileFilter);
-       setSupportedProperties(properties);
-       //! Set the supported relationships
-       std::set<Relationship> relationships;
-       relationships.insert(Success);
-       setSupportedRelationships(relationships);
-}
-
-void GetFile::onTrigger(ProcessContext *context, ProcessSession *session)
-{
-       std::string value;
-
-       logger_->log_info("onTrigger GetFile");
-       if (context->getProperty(Directory.getName(), value))
-       {
-               _directory = value;
-       }
-       if (context->getProperty(BatchSize.getName(), value))
-       {
-               Property::StringToInt(value, _batchSize);
-       }
-       if (context->getProperty(IgnoreHiddenFile.getName(), value))
-       {
-               StringUtils::StringToBool(value, _ignoreHiddenFile);
-       }
-       if (context->getProperty(KeepSourceFile.getName(), value))
-       {
-               StringUtils::StringToBool(value, _keepSourceFile);
-       }
-
-       logger_->log_info("onTrigger GetFile");
-       if (context->getProperty(MaxAge.getName(), value))
-       {
-               TimeUnit unit;
-               if (Property::StringToTime(value, _maxAge, unit) &&
-                       Property::ConvertTimeUnitToMS(_maxAge, unit, _maxAge))
-               {
-
-               }
-       }
-       if (context->getProperty(MinAge.getName(), value))
-       {
-               TimeUnit unit;
-               if (Property::StringToTime(value, _minAge, unit) &&
-                       Property::ConvertTimeUnitToMS(_minAge, unit, _minAge))
-               {
-
-               }
-       }
-       if (context->getProperty(MaxSize.getName(), value))
-       {
-               Property::StringToInt(value, _maxSize);
-       }
-       if (context->getProperty(MinSize.getName(), value))
-       {
-               Property::StringToInt(value, _minSize);
-       }
-       if (context->getProperty(PollInterval.getName(), value))
-       {
-               TimeUnit unit;
-               if (Property::StringToTime(value, _pollInterval, unit) &&
-                       Property::ConvertTimeUnitToMS(_pollInterval, unit, _pollInterval))
-               {
-
-               }
-       }
-       if (context->getProperty(Recurse.getName(), value))
-       {
-               StringUtils::StringToBool(value, _recursive);
-       }
-
-       if (context->getProperty(FileFilter.getName(), value))
-       {
-               _fileFilter = value;
-       }
-
-       // Perform directory list
-       logger_->log_info("Is listing empty %i",isListingEmpty());
-       if (isListingEmpty())
-       {
-               if (_pollInterval == 0 || (getTimeMillis() - _lastDirectoryListingTime) > _pollInterval)
-               {
-                       performListing(_directory);
-               }
-       }
-       logger_->log_info("Is listing empty %i",isListingEmpty());
-
-       if (!isListingEmpty())
-       {
-               try
-               {
-                       std::queue<std::string> list;
-                       pollListing(list, _batchSize);
-                       while (!list.empty())
-                       {
-
-                               std::string fileName = list.front();
-                               list.pop();
-                               logger_->log_info("GetFile process %s", fileName.c_str());
-                               FlowFileRecord *flowFile = session->create();
-                               if (!flowFile)
-                                       return;
-                               std::size_t found = fileName.find_last_of("/\\");
-                               std::string path = fileName.substr(0,found);
-                               std::string name = fileName.substr(found+1);
-                               flowFile->updateAttribute(FILENAME, name);
-                               flowFile->updateAttribute(PATH, path);
-                               flowFile->addAttribute(ABSOLUTE_PATH, fileName);
-                               session->import(fileName, flowFile, _keepSourceFile);
-                               session->transfer(flowFile, Success);
-                       }
-               }
-               catch (std::exception &exception)
-               {
-                       logger_->log_debug("GetFile Caught Exception %s", exception.what());
-                       throw;
-               }
-               catch (...)
-               {
-                       throw;
-               }
-       }
-
-}
-
-bool GetFile::isListingEmpty()
-{
-       std::lock_guard<std::mutex> lock(_mtx);
-
-       return _dirList.empty();
-}
-
-void GetFile::putListing(std::string fileName)
-{
-       std::lock_guard<std::mutex> lock(_mtx);
-
-       _dirList.push(fileName);
-}
-
-void GetFile::pollListing(std::queue<std::string> &list, int maxSize)
-{
-       std::lock_guard<std::mutex> lock(_mtx);
-
-       while (!_dirList.empty() && (maxSize == 0 || list.size() < maxSize))
-       {
-               std::string fileName = _dirList.front();
-               _dirList.pop();
-               list.push(fileName);
-       }
-
-       return;
-}
-
-bool GetFile::acceptFile(std::string fullName, std::string name)
-{
-       struct stat statbuf;
-
-       if (stat(fullName.c_str(), &statbuf) == 0)
-       {
-               if (_minSize > 0 && statbuf.st_size <_minSize)
-                       return false;
-
-               if (_maxSize > 0 && statbuf.st_size > _maxSize)
-                       return false;
-
-               uint64_t modifiedTime = ((uint64_t) (statbuf.st_mtime) * 1000);
-               uint64_t fileAge = getTimeMillis() - modifiedTime;
-               if (_minAge > 0 && fileAge < _minAge)
-                       return false;
-               if (_maxAge > 0 && fileAge > _maxAge)
-                       return false;
-
-               if (_ignoreHiddenFile && fullName.c_str()[0] == '.')
-                       return false;
-
-               if (access(fullName.c_str(), R_OK) != 0)
-                       return false;
-
-               if (_keepSourceFile == false && access(fullName.c_str(), W_OK) != 0)
-                       return false;
-
-
-       #ifdef __GNUC__ 
-               #if (__GNUC__ >= 4)
-                       #if (__GNUC_MINOR__ < 9)
-                               regex_t regex;
-                               int ret = regcomp(&regex, _fileFilter.c_str(),0);
-                               if (ret)
-                                       return false;
-                               ret = regexec(&regex,name.c_str(),(size_t)0,NULL,0);
-                               regfree(&regex);
-                               if (ret)
-                                       return false;   
-                       #else
-                               try{
-                                       std::regex re(_fileFilter);
-       
-                                       if (!std::regex_match(name, re)) {
-                                               return false;
-                                       }
-                               } catch (std::regex_error e) {
-                                       logger_->log_error("Invalid File Filter regex: %s.", e.what());
-                                       return false;
-                               }
-                       #endif
-               #endif
-               #else
-                       logger_->log_info("Cannot support regex filtering");
-               #endif
-               return true;
-       }
-
-       return false;
-}
-
-void GetFile::performListing(std::string dir)
-{
-       logger_->log_info("Performing file listing against %s",dir.c_str());
-       DIR *d;
-       d = opendir(dir.c_str());
-       if (!d)
-               return;
-       // only perform a listing while we are not empty
-       logger_->log_info("Performing file listing against %s",dir.c_str());
-       while (isRunning())
-       {
-               struct dirent *entry;
-               entry = readdir(d);
-               if (!entry)
-                       break;
-               std::string d_name = entry->d_name;
-               if ((entry->d_type & DT_DIR))
-               {
-                       // if this is a directory
-                       if (_recursive && strcmp(d_name.c_str(), "..") != 0 && strcmp(d_name.c_str(), ".") != 0)
-                       {
-                               std::string path = dir + "/" + d_name;
-                               performListing(path);
-                       }
-               }
-               else
-               {
-                       std::string fileName = dir + "/" + d_name;
-                       if (acceptFile(fileName, d_name))
-                       {
-                               // check whether we can take this file
-                               putListing(fileName);
-                       }
-               }
-       }
-       closedir(d);
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/ListenHTTP.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ListenHTTP.cpp b/libminifi/src/ListenHTTP.cpp
deleted file mode 100644
index 89ce1d2..0000000
--- a/libminifi/src/ListenHTTP.cpp
+++ /dev/null
@@ -1,395 +0,0 @@
-/**
- * @file ListenHTTP.cpp
- * ListenHTTP class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <sstream>
-#include <stdio.h>
-#include <string>
-#include <iostream>
-#include <fstream>
-#include <uuid/uuid.h>
-
-#include <CivetServer.h>
-
-#include "ListenHTTP.h"
-
-#include "utils/TimeUtil.h"
-#include "ProcessContext.h"
-#include "ProcessSession.h"
-#include "ProcessSessionFactory.h"
-
-const std::string ListenHTTP::ProcessorName("ListenHTTP");
-
-Property ListenHTTP::BasePath("Base Path", "Base path for incoming connections", "contentListener");
-Property ListenHTTP::Port("Listening Port", "The Port to listen on for incoming connections", "");
-Property ListenHTTP::AuthorizedDNPattern("Authorized DN Pattern", "A Regular Expression to apply against the Distinguished Name of incoming connections. If the Pattern does not match the DN, the connection will be refused.", ".*");
-Property ListenHTTP::SSLCertificate("SSL Certificate", "File containing PEM-formatted file including TLS/SSL certificate and key", "");
-Property ListenHTTP::SSLCertificateAuthority("SSL Certificate Authority", "File containing trusted PEM-formatted certificates", "");
-Property ListenHTTP::SSLVerifyPeer("SSL Verify Peer", "Whether or not to verify the client's certificate (yes/no)", "no");
-Property ListenHTTP::SSLMinimumVersion("SSL Minimum Version", "Minimum TLS/SSL version allowed (SSL2, SSL3, TLS1.0, TLS1.1, TLS1.2)", "SSL2");
-Property ListenHTTP::HeadersAsAttributesRegex("HTTP Headers to receive as Attributes (Regex)", "Specifies the Regular Expression that determines the names of HTTP Headers that should be passed along as FlowFile attributes", "");
-
-Relationship ListenHTTP::Success("success", "All files are routed to success");
-
-void ListenHTTP::initialize()
-{
-       _logger->log_info("Initializing ListenHTTP");
-
-       //! Set the supported properties
-       std::set<Property> properties;
-       properties.insert(BasePath);
-       properties.insert(Port);
-       properties.insert(AuthorizedDNPattern);
-       properties.insert(SSLCertificate);
-       properties.insert(SSLCertificateAuthority);
-       properties.insert(SSLVerifyPeer);
-       properties.insert(SSLMinimumVersion);
-       properties.insert(HeadersAsAttributesRegex);
-       setSupportedProperties(properties);
-       //! Set the supported relationships
-       std::set<Relationship> relationships;
-       relationships.insert(Success);
-       setSupportedRelationships(relationships);
-}
-
-void ListenHTTP::onSchedule(ProcessContext *context, ProcessSessionFactory *sessionFactory)
-{
-
-       std::string basePath;
-
-       if (!context->getProperty(BasePath.getName(), basePath))
-       {
-               _logger->log_info("%s attribute is missing, so default value of %s will be used",
-                               BasePath.getName().c_str(),
-                               BasePath.getValue().c_str());
-               basePath = BasePath.getValue();
-       }
-
-       basePath.insert(0, "/");
-
-       std::string listeningPort;
-
-       if (!context->getProperty(Port.getName(), listeningPort))
-       {
-               _logger->log_error("%s attribute is missing or invalid",
-                               Port.getName().c_str());
-               return;
-       }
-
-       std::string authDNPattern;
-
-       if (context->getProperty(AuthorizedDNPattern.getName(), authDNPattern) && !authDNPattern.empty())
-       {
-               _logger->log_info("ListenHTTP using %s: %s",
-                               AuthorizedDNPattern.getName().c_str(),
-                               authDNPattern.c_str());
-       }
-
-       std::string sslCertFile;
-
-       if (context->getProperty(SSLCertificate.getName(), sslCertFile) && !sslCertFile.empty())
-       {
-               _logger->log_info("ListenHTTP using %s: %s",
-                               SSLCertificate.getName().c_str(),
-                               sslCertFile.c_str());
-       }
-
-       // Read further TLS/SSL options only if TLS/SSL usage is implied by virtue of certificate value being set
-       std::string sslCertAuthorityFile;
-       std::string sslVerifyPeer;
-       std::string sslMinVer;
-
-       if (!sslCertFile.empty())
-       {
-               if (context->getProperty(SSLCertificateAuthority.getName(), sslCertAuthorityFile)
-                               && !sslCertAuthorityFile.empty())
-               {
-                       _logger->log_info("ListenHTTP using %s: %s",
-                                       SSLCertificateAuthority.getName().c_str(),
-                                       sslCertAuthorityFile.c_str());
-               }
-
-               if (context->getProperty(SSLVerifyPeer.getName(), sslVerifyPeer))
-               {
-                       if (sslVerifyPeer.empty() || sslVerifyPeer.compare("no") == 0)
-                       {
-                               _logger->log_info("ListenHTTP will not verify peers");
-                       }
-                       else
-                       {
-                               _logger->log_info("ListenHTTP will verify peers");
-                       }
-               }
-               else
-               {
-                       _logger->log_info("ListenHTTP will not verify peers");
-               }
-
-               if (context->getProperty(SSLMinimumVersion.getName(), sslMinVer))
-               {
-                       _logger->log_info("ListenHTTP using %s: %s",
-                                       SSLMinimumVersion.getName().c_str(),
-                                       sslMinVer.c_str());
-               }
-       }
-
-       std::string headersAsAttributesPattern;
-
-       if (context->getProperty(HeadersAsAttributesRegex.getName(),headersAsAttributesPattern)
-                       && !headersAsAttributesPattern.empty())
-       {
-               _logger->log_info("ListenHTTP using %s: %s",
-                               HeadersAsAttributesRegex.getName().c_str(),
-                               headersAsAttributesPattern.c_str());
-       }
-
-       auto numThreads = getMaxConcurrentTasks();
-
-       _logger->log_info("ListenHTTP starting HTTP server on port %s and path %s with %d threads",
-                       listeningPort.c_str(),
-                       basePath.c_str(),
-                       numThreads);
-
-       // Initialize web server
-       std::vector<std::string> options;
-       options.push_back("enable_keep_alive");
-       options.push_back("yes");
-       options.push_back("keep_alive_timeout_ms");
-       options.push_back("15000");
-       options.push_back("num_threads");
-       options.push_back(std::to_string(numThreads));
-
-       if (sslCertFile.empty())
-       {
-               options.push_back("listening_ports");
-               options.push_back(listeningPort);
-       }
-       else
-       {
-               listeningPort += "s";
-               options.push_back("listening_ports");
-               options.push_back(listeningPort);
-
-               options.push_back("ssl_certificate");
-               options.push_back(sslCertFile);
-
-               if (!sslCertAuthorityFile.empty())
-               {
-                       options.push_back("ssl_ca_file");
-                       options.push_back(sslCertAuthorityFile);
-               }
-
-               if (sslVerifyPeer.empty() || sslVerifyPeer.compare("no") == 0)
-               {
-                       options.push_back("ssl_verify_peer");
-                       options.push_back("no");
-               }
-               else
-               {
-                       options.push_back("ssl_verify_peer");
-                       options.push_back("yes");
-               }
-
-               if (sslMinVer.compare("SSL2") == 0)
-               {
-                       options.push_back("ssl_protocol_version");
-                       options.push_back(std::to_string(0));
-               }
-               else if (sslMinVer.compare("SSL3") == 0)
-               {
-                       options.push_back("ssl_protocol_version");
-                       options.push_back(std::to_string(1));
-               }
-               else if (sslMinVer.compare("TLS1.0") == 0)
-               {
-                       options.push_back("ssl_protocol_version");
-                       options.push_back(std::to_string(2));
-               }
-               else if (sslMinVer.compare("TLS1.1") == 0)
-               {
-                       options.push_back("ssl_protocol_version");
-                       options.push_back(std::to_string(3));
-               }
-               else
-               {
-                       options.push_back("ssl_protocol_version");
-                       options.push_back(std::to_string(4));
-               }
-       }
-
-       _server.reset(new CivetServer(options));
-       _handler.reset(new Handler(context,
-                       sessionFactory,
-                       std::move(authDNPattern),
-                       std::move(headersAsAttributesPattern)));
-       _server->addHandler(basePath, _handler.get());
-}
-
-void ListenHTTP::onTrigger(ProcessContext *context, ProcessSession *session)
-{
-
-       FlowFileRecord *flowFile = session->get();
-
-       // Do nothing if there are no incoming files
-       if (!flowFile)
-       {
-               return;
-       }
-}
-
-ListenHTTP::Handler::Handler(ProcessContext *context,
-               ProcessSessionFactory *sessionFactory,
-               std::string &&authDNPattern,
-               std::string &&headersAsAttributesPattern)
-: _authDNRegex(std::move(authDNPattern))
-, _headersAsAttributesRegex(std::move(headersAsAttributesPattern))
-{
-       _processContext = context;
-       _processSessionFactory = sessionFactory;
-}
-
-void ListenHTTP::Handler::sendErrorResponse(struct mg_connection *conn)
-{
-       mg_printf(conn,
-                       "HTTP/1.1 500 Internal Server Error\r\n"
-                       "Content-Type: text/html\r\n"
-                       "Content-Length: 0\r\n\r\n");
-}
-
-bool ListenHTTP::Handler::handlePost(CivetServer *server, struct mg_connection *conn)
-{
-       _logger = Logger::getLogger();
-
-       auto req_info = mg_get_request_info(conn);
-       _logger->log_info("ListenHTTP handling POST request of length %d", req_info->content_length);
-
-       // If this is a two-way TLS connection, authorize the peer against the configured pattern
-       if (req_info->is_ssl && req_info->client_cert != nullptr)
-       {
-               if (!std::regex_match(req_info->client_cert->subject, _authDNRegex))
-               {
-                       mg_printf(conn,
-                                       "HTTP/1.1 403 Forbidden\r\n"
-                                       "Content-Type: text/html\r\n"
-                                       "Content-Length: 0\r\n\r\n");
-                       _logger->log_warn("ListenHTTP client DN not authorized: %s", req_info->client_cert->subject);
-                       return true;
-               }
-       }
-
-       // Always send 100 Continue, as allowed per standard to minimize client delay (https://www.w3.org/Protocols/rfc2616/rfc2616-sec8.html)
-       mg_printf(conn, "HTTP/1.1 100 Continue\r\n\r\n");
-
-       auto session = _processSessionFactory->createSession();
-       ListenHTTP::WriteCallback callback(conn, req_info);
-       auto flowFile = session->create();
-
-       if (!flowFile)
-       {
-               sendErrorResponse(conn);
-               return true;
-       }
-
-       try
-       {
-               session->write(flowFile, &callback);
-
-               // Add filename from "filename" header value (and pattern headers)
-               for (int i = 0; i < req_info->num_headers; i++)
-               {
-                       auto header = &req_info->http_headers[i];
-
-                       if (strcmp("filename", header->name) == 0)
-                       {
-                               if (!flowFile->updateAttribute("filename", header->value))
-                               {
-                                       flowFile->addAttribute("filename", header->value);
-                               }
-                       }
-                       else if (std::regex_match(header->name, _headersAsAttributesRegex))
-                       {
-                               if (!flowFile->updateAttribute(header->name, header->value))
-                               {
-                                       flowFile->addAttribute(header->name, header->value);
-                               }
-                       }
-               }
-
-               session->transfer(flowFile, Success);
-               session->commit();
-       }
-       catch (std::exception &exception)
-       {
-               _logger->log_debug("ListenHTTP Caught Exception %s", exception.what());
-               sendErrorResponse(conn);
-               session->rollback();
-               throw;
-       }
-       catch (...)
-       {
-               _logger->log_debug("ListenHTTP Caught Exception Processor::onTrigger");
-               sendErrorResponse(conn);
-               session->rollback();
-               throw;
-       }
-
-       mg_printf(conn,
-                       "HTTP/1.1 200 OK\r\n"
-                       "Content-Type: text/html\r\n"
-                       "Content-Length: 0\r\n\r\n");
-
-       return true;
-}
-
-ListenHTTP::WriteCallback::WriteCallback(struct mg_connection *conn, const struct mg_request_info *reqInfo)
-{
-       _logger = Logger::getLogger();
-       _conn = conn;
-       _reqInfo = reqInfo;
-}
-
-void ListenHTTP::WriteCallback::process(std::ofstream *stream)
-{
-       long long rlen;
-       long long nlen = 0;
-       long long tlen = _reqInfo->content_length;
-       char buf[16384];
-
-       while (nlen < tlen)
-       {
-               rlen = tlen - nlen;
-
-               if (rlen > sizeof(buf))
-               {
-                       rlen = sizeof(buf);
-               }
-
-               // Read a buffer of data from client
-               rlen = mg_read(_conn, &buf[0], (size_t)rlen);
-
-               if (rlen <= 0)
-               {
-                       break;
-               }
-
-               // Transfer buffer data to the output stream
-               stream->write(&buf[0], rlen);
-
-               nlen += rlen;
-       }
-}

Reply via email to