http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/provenance/Provenance.h
----------------------------------------------------------------------
diff --git a/libminifi/include/provenance/Provenance.h b/libminifi/include/provenance/Provenance.h
new file mode 100644
index 0000000..3d5d19e
--- /dev/null
+++ b/libminifi/include/provenance/Provenance.h
@@ -0,0 +1,560 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PROVENANCE_H__
+#define __PROVENANCE_H__
+
+#include <ftw.h>
+#include <uuid/uuid.h>
+#include <atomic>
+#include <cstdint>
+#include <cstring>
+#include <iostream>
+#include <map>
+#include <set>
+#include <string>
+#include <thread>
+#include <vector>
+
+
+#include "core/Repository.h"
+#include "core/Property.h"
+#include "properties/Configure.h"
+#include "Connection.h"
+#include "FlowFileRecord.h"
+#include "core/logging/Logger.h"
+#include "ResourceClaim.h"
+#include "io/Serializable.h"
+#include "utils/TimeUtil.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace provenance {
+// Provenance Event Record Serialization Seg Size
+#define PROVENANCE_EVENT_RECORD_SEG_SIZE 2048
+
+// Provenance Event Record
+class ProvenanceEventRecord :
+    protected org::apache::nifi::minifi::io::Serializable {
+ public:
+  enum ProvenanceEventType {
+
+    /**
+     * A CREATE event is used when a FlowFile is generated from data that was
+     * not received from a remote system or external process
+     */
+    CREATE,
+
+    /**
+     * Indicates a provenance event for receiving data from an external process. This Event Type
+     * is expected to be the first event for a FlowFile. As such, a Processor that receives data
+     * from an external source and uses that data to replace the content of an existing FlowFile
+     * should use the {@link #FETCH} event type, rather than the RECEIVE event type.
+     */
+    RECEIVE,
+
+    /**
+     * Indicates that the contents of a FlowFile were overwritten using the contents of some
+     * external resource. This is similar to the {@link #RECEIVE} event but varies in that
+     * RECEIVE events are intended to be used as the event that introduces the FlowFile into
+     * the system, whereas FETCH is used to indicate that the contents of an existing FlowFile
+     * were overwritten.
+     */
+    FETCH,
+
+    /**
+     * Indicates a provenance event for sending data to an external process
+     */
+    SEND,
+
+    /**
+     * Indicates that the contents of a FlowFile were downloaded by a user or external entity.
+     */
+    DOWNLOAD,
+
+    /**
+     * Indicates a provenance event for the conclusion of an object's life for
+     * some reason other than object expiration
+     */
+    DROP,
+
+    /**
+     * Indicates a provenance event for the conclusion of an object's life due
+     * to the fact that the object could not be processed in a timely manner
+     */
+    EXPIRE,
+
+    /**
+     * FORK is used to indicate that one or more FlowFile was derived from a
+     * parent FlowFile.
+     */
+    FORK,
+
+    /**
+     * JOIN is used to indicate that a single FlowFile is derived from joining
+     * together multiple parent FlowFiles.
+     */
+    JOIN,
+
+    /**
+     * CLONE is used to indicate that a FlowFile is an exact duplicate of its
+     * parent FlowFile.
+     */
+    CLONE,
+
+    /**
+     * CONTENT_MODIFIED is used to indicate that a FlowFile's content was
+     * modified in some way. When using this Event Type, it is advisable to
+     * provide details about how the content is modified.
+     */
+    CONTENT_MODIFIED,
+
+    /**
+     * ATTRIBUTES_MODIFIED is used to indicate that a FlowFile's attributes were
+     * modified in some way. This event is not needed when another event is
+     * reported at the same time, as the other event will already contain all
+     * FlowFile attributes.
+     */
+    ATTRIBUTES_MODIFIED,
+
+    /**
+     * ROUTE is used to show that a FlowFile was routed to a specified
+     * {@link org.apache.nifi.processor.Relationship Relationship} and should provide
+     * information about why the FlowFile was routed to this relationship.
+     */
+    ROUTE,
+
+    /**
+     * Indicates a provenance event for adding additional information such as a
+     * new linkage to a new URI or UUID
+     */
+    ADDINFO,
+
+    /**
+     * Indicates a provenance event for replaying a FlowFile. The UUID of the
+     * event will indicate the UUID of the original FlowFile that is being
+     * replayed. The event will contain exactly one Parent UUID that is also the
+     * UUID of the FlowFile that is being replayed and exactly one Child UUID
+     * that is the UUID of the a newly created FlowFile that will be re-queued
+     * for processing.
+     */
+    REPLAY
+  };
+ public:
+  // Constructor
+  /*!
+   * Create a new provenance event record
+   */
+  ProvenanceEventRecord(ProvenanceEventType event, std::string componentId,
+                        std::string componentType) {
+    _eventType = event;
+    _componentId = componentId;
+    _componentType = componentType;
+    _eventTime = getTimeMillis();
+    char eventIdStr[37];
+    // Generate the global UUID for th event
+    uuid_generate(_eventId);
+    uuid_unparse_lower(_eventId, eventIdStr);
+    _eventIdStr = eventIdStr;
+    logger_ = logging::Logger::getLogger();
+  }
+
+  ProvenanceEventRecord() {
+    _eventTime = getTimeMillis();
+    logger_ = logging::Logger::getLogger();
+  }
+
+  // Destructor
+  virtual ~ProvenanceEventRecord() {
+  }
+  // Get the Event ID
+  std::string getEventId() {
+    return _eventIdStr;
+  }
+  // Get Attributes
+  std::map<std::string, std::string> getAttributes() {
+    return _attributes;
+  }
+  // Get Size
+  uint64_t getFileSize() {
+    return _size;
+  }
+  // ! Get Offset
+  uint64_t getFileOffset() {
+    return _offset;
+  }
+  // ! Get Entry Date
+  uint64_t getFlowFileEntryDate() {
+    return _entryDate;
+  }
+  // ! Get Lineage Start Date
+  uint64_t getlineageStartDate() {
+    return _lineageStartDate;
+  }
+  // ! Get Event Time
+  uint64_t getEventTime() {
+    return _eventTime;
+  }
+  // ! Get Event Duration
+  uint64_t getEventDuration() {
+    return _eventDuration;
+  }
+  // Set Event Duration
+  void setEventDuration(uint64_t duration) {
+    _eventDuration = duration;
+  }
+  // ! Get Event Type
+  ProvenanceEventType getEventType() {
+    return _eventType;
+  }
+  // Get Component ID
+  std::string getComponentId() {
+    return _componentId;
+  }
+  // Get Component Type
+  std::string getComponentType() {
+    return _componentType;
+  }
+  // Get FlowFileUuid
+  std::string getFlowFileUuid() {
+    return uuid_;
+  }
+  // Get content full path
+  std::string getContentFullPath() {
+    return _contentFullPath;
+  }
+  // Get LineageIdentifiers
+  std::set<std::string> getLineageIdentifiers() {
+    return _lineageIdentifiers;
+  }
+  // Get Details
+  std::string getDetails() {
+    return _details;
+  }
+  // Set Details
+  void setDetails(std::string details) {
+    _details = details;
+  }
+  // Get TransitUri
+  std::string getTransitUri() {
+    return _transitUri;
+  }
+  // Set TransitUri
+  void setTransitUri(std::string uri) {
+    _transitUri = uri;
+  }
+  // Get SourceSystemFlowFileIdentifier
+  std::string getSourceSystemFlowFileIdentifier() {
+    return _sourceSystemFlowFileIdentifier;
+  }
+  // Set SourceSystemFlowFileIdentifier
+  void setSourceSystemFlowFileIdentifier(std::string identifier) {
+    _sourceSystemFlowFileIdentifier = identifier;
+  }
+  // Get Parent UUIDs
+  std::vector<std::string> getParentUuids() {
+    return _parentUuids;
+  }
+  // Add Parent UUID
+  void addParentUuid(std::string uuid) {
+    if (std::find(_parentUuids.begin(), _parentUuids.end(), uuid)
+        != _parentUuids.end())
+      return;
+    else
+      _parentUuids.push_back(uuid);
+  }
+  // Add Parent Flow File
+  void addParentFlowFile(std::shared_ptr<core::FlowFile> flow) {
+    addParentUuid(flow->getUUIDStr());
+    return;
+  }
+  // Remove Parent UUID
+  void removeParentUuid(std::string uuid) {
+    _parentUuids.erase(
+        std::remove(_parentUuids.begin(), _parentUuids.end(), uuid),
+        _parentUuids.end());
+  }
+  // Remove Parent Flow File
+  void removeParentFlowFile(std::shared_ptr<core::FlowFile> flow) {
+    removeParentUuid(flow->getUUIDStr());
+    return;
+  }
+  // Get Children UUIDs
+  std::vector<std::string> getChildrenUuids() {
+    return _childrenUuids;
+  }
+  // Add Child UUID
+  void addChildUuid(std::string uuid) {
+    if (std::find(_childrenUuids.begin(), _childrenUuids.end(), uuid)
+        != _childrenUuids.end())
+      return;
+    else
+      _childrenUuids.push_back(uuid);
+  }
+  // Add Child Flow File
+  void addChildFlowFile(std::shared_ptr<core::FlowFile> flow) {
+    addChildUuid(flow->getUUIDStr());
+    return;
+  }
+  // Remove Child UUID
+  void removeChildUuid(std::string uuid) {
+    _childrenUuids.erase(
+        std::remove(_childrenUuids.begin(), _childrenUuids.end(), uuid),
+        _childrenUuids.end());
+  }
+  // Remove Child Flow File
+  void removeChildFlowFile(std::shared_ptr<core::FlowFile> flow) {
+    removeChildUuid(flow->getUUIDStr());
+    return;
+  }
+  // Get AlternateIdentifierUri
+  std::string getAlternateIdentifierUri() {
+    return _alternateIdentifierUri;
+  }
+  // Set AlternateIdentifierUri
+  void setAlternateIdentifierUri(std::string uri) {
+    _alternateIdentifierUri = uri;
+  }
+  // Get Relationship
+  std::string getRelationship() {
+    return _relationship;
+  }
+  // Set Relationship
+  void setRelationship(std::string relation) {
+    _relationship = relation;
+  }
+  // Get sourceQueueIdentifier
+  std::string getSourceQueueIdentifier() {
+    return _sourceQueueIdentifier;
+  }
+  // Set sourceQueueIdentifier
+  void setSourceQueueIdentifier(std::string identifier) {
+    _sourceQueueIdentifier = identifier;
+  }
+  // fromFlowFile
+  void fromFlowFile(std::shared_ptr<core::FlowFile> &flow) {
+    _entryDate = flow->getEntryDate();
+    _lineageStartDate = flow->getlineageStartDate();
+    _lineageIdentifiers = flow->getlineageIdentifiers();
+    uuid_ = flow->getUUIDStr();
+    _attributes = flow->getAttributes();
+    _size = flow->getSize();
+    _offset = flow->getOffset();
+    if (flow->getOriginalConnection())
+      _sourceQueueIdentifier = flow->getOriginalConnection()->getName();
+    if (flow->getResourceClaim()) {
+      _contentFullPath = flow->getResourceClaim()->getContentFullPath();
+    }
+  }
+  // Serialize and Persistent to the repository
+  bool Serialize(const std::shared_ptr<core::Repository> &repo);
+  // DeSerialize
+  bool DeSerialize(const uint8_t *buffer, const int bufferSize);
+  // DeSerialize
+  bool DeSerialize(org::apache::nifi::minifi::io::DataStream &stream) {
+    return DeSerialize(stream.getBuffer(), stream.getSize());
+  }
+  // DeSerialize
+  bool DeSerialize(const std::shared_ptr<core::Repository> &repo,
+                   std::string key);
+
+ protected:
+
+  // Event type
+  ProvenanceEventType _eventType;
+  // Date at which the event was created
+  uint64_t _eventTime;
+  // Date at which the flow file entered the flow
+  uint64_t _entryDate;
+  // Date at which the origin of this flow file entered the flow
+  uint64_t _lineageStartDate;
+  // Event Duration
+  uint64_t _eventDuration;
+  // Component ID
+  std::string _componentId;
+  // Component Type
+  std::string _componentType;
+  // Size in bytes of the data corresponding to this flow file
+  uint64_t _size;
+  // flow uuid
+  std::string uuid_;
+  // Offset to the content
+  uint64_t _offset;
+  // Full path to the content
+  std::string _contentFullPath;
+  // Attributes key/values pairs for the flow record
+  std::map<std::string, std::string> _attributes;
+  // provenance ID
+  uuid_t _eventId;
+  // UUID string for all parents
+  std::set<std::string> _lineageIdentifiers;
+  // transitUri
+  std::string _transitUri;
+  // sourceSystemFlowFileIdentifier
+  std::string _sourceSystemFlowFileIdentifier;
+  // parent UUID
+  std::vector<std::string> _parentUuids;
+  // child UUID
+  std::vector<std::string> _childrenUuids;
+  // detail
+  std::string _details;
+  // sourceQueueIdentifier
+  std::string _sourceQueueIdentifier;
+  // event ID Str
+  std::string _eventIdStr;
+  // relationship
+  std::string _relationship;
+  // alternateIdentifierUri;
+  std::string _alternateIdentifierUri;
+
+ private:
+
+  // Logger
+  std::shared_ptr<logging::Logger> logger_;
+
+  // Prevent default copy constructor and assignment operation
+  // Only support pass by reference or pointer
+  ProvenanceEventRecord(const ProvenanceEventRecord &parent);
+  ProvenanceEventRecord &operator=(const ProvenanceEventRecord &parent);
+
+};
+
+// Provenance Reporter
+class ProvenanceReporter {
+ public:
+  // Constructor
+  /*!
+   * Create a new provenance reporter associated with the process session
+   */
+  ProvenanceReporter(std::shared_ptr<core::Repository> repo,
+                     std::string componentId, std::string componentType) {
+    logger_ = logging::Logger::getLogger();
+    _componentId = componentId;
+    _componentType = componentType;
+    repo_ = repo;
+  }
+
+  // Destructor
+  virtual ~ProvenanceReporter() {
+    clear();
+  }
+  // Get events
+  std::set<ProvenanceEventRecord *> getEvents() {
+    return _events;
+  }
+  // Add event
+  void add(ProvenanceEventRecord *event) {
+    _events.insert(event);
+  }
+  // Remove event
+  void remove(ProvenanceEventRecord *event) {
+    if (_events.find(event) != _events.end()) {
+      _events.erase(event);
+    }
+  }
+  //
+  // clear
+  void clear() {
+    for (auto it : _events) {
+      delete it;
+    }
+    _events.clear();
+  }
+  // allocate
+  ProvenanceEventRecord *allocate(
+      ProvenanceEventRecord::ProvenanceEventType eventType,
+      std::shared_ptr<core::FlowFile> flow) {
+    ProvenanceEventRecord *event = new ProvenanceEventRecord(eventType,
+                                                             _componentId,
+                                                             _componentType);
+    if (event)
+      event->fromFlowFile(flow);
+
+    return event;
+  }
+  // commit
+  void commit();
+  // create
+  void create(std::shared_ptr<core::FlowFile> flow, std::string detail);
+  // route
+  void route(std::shared_ptr<core::FlowFile> flow, core::Relationship relation,
+             std::string detail, uint64_t processingDuration);
+  // modifyAttributes
+  void modifyAttributes(std::shared_ptr<core::FlowFile> flow,
+                        std::string detail);
+  // modifyContent
+  void modifyContent(std::shared_ptr<core::FlowFile> flow, std::string detail,
+                     uint64_t processingDuration);
+  // clone
+  void clone(std::shared_ptr<core::FlowFile> parent,
+             std::shared_ptr<core::FlowFile> child);
+  // join
+  void join(std::vector<std::shared_ptr<core::FlowFile> > parents,
+            std::shared_ptr<core::FlowFile> child, std::string detail,
+            uint64_t processingDuration);
+  // fork
+  void fork(std::vector<std::shared_ptr<core::FlowFile> > child,
+            std::shared_ptr<core::FlowFile> parent, std::string detail,
+            uint64_t processingDuration);
+  // expire
+  void expire(std::shared_ptr<core::FlowFile> flow, std::string detail);
+  // drop
+  void drop(std::shared_ptr<core::FlowFile> flow, std::string reason);
+  // send
+  void send(std::shared_ptr<core::FlowFile> flow, std::string transitUri,
+            std::string detail, uint64_t processingDuration, bool force);
+  // fetch
+  void fetch(std::shared_ptr<core::FlowFile> flow, std::string transitUri,
+             std::string detail, uint64_t processingDuration);
+  // receive
+  void receive(std::shared_ptr<core::FlowFile> flow, std::string transitUri,
+               std::string sourceSystemFlowFileIdentifier, std::string detail,
+               uint64_t processingDuration);
+
+ protected:
+
+  // Component ID
+  std::string _componentId;
+  // Component Type
+  std::string _componentType;
+
+ private:
+
+  // Incoming connection Iterator
+  std::set<ProvenanceEventRecord *> _events;
+  // Logger
+  std::shared_ptr<logging::Logger> logger_;
+  // provenance repository.
+  std::shared_ptr<core::Repository> repo_;
+
+  // Prevent default copy constructor and assignment operation
+  // Only support pass by reference or pointer
+  ProvenanceReporter(const ProvenanceReporter &parent);
+  ProvenanceReporter &operator=(const ProvenanceReporter &parent);
+};
+
+// Provenance Repository
+
+} /* namespace provenance */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/provenance/ProvenanceRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/provenance/ProvenanceRepository.h b/libminifi/include/provenance/ProvenanceRepository.h
new file mode 100644
index 0000000..0f8ee5d
--- /dev/null
+++ b/libminifi/include/provenance/ProvenanceRepository.h
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_PROVENANCE_PROVENANCEREPOSITORY_H_
+#define LIBMINIFI_INCLUDE_PROVENANCE_PROVENANCEREPOSITORY_H_
+
+#include "leveldb/db.h"
+#include "leveldb/options.h"
+#include "leveldb/slice.h"
+#include "leveldb/status.h"
+#include "core/Repository.h"
+#include "core/core.h"
+#include "provenance/Provenance.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace provenance {
+
+#define PROVENANCE_DIRECTORY "./provenance_repository"
+#define MAX_PROVENANCE_STORAGE_SIZE (10*1024*1024) // 10M
+#define MAX_PROVENANCE_ENTRY_LIFE_TIME (60000) // 1 minute
+#define PROVENANCE_PURGE_PERIOD (2500) // 2500 msec
+
+class ProvenanceRepository : public core::Repository,
+    public std::enable_shared_from_this<ProvenanceRepository> {
+ public:
+  // Constructor
+  /*!
+   * Create a new provenance repository
+   */
+  ProvenanceRepository(std::string directory = PROVENANCE_DIRECTORY,
+                       int64_t maxPartitionMillis =
+                       MAX_PROVENANCE_ENTRY_LIFE_TIME,
+                       int64_t maxPartitionBytes = MAX_PROVENANCE_STORAGE_SIZE,
+                       uint64_t purgePeriod = PROVENANCE_PURGE_PERIOD)
+      : Repository(core::getClassName<ProvenanceRepository>(), directory,
+                   maxPartitionMillis, maxPartitionBytes, purgePeriod) {
+
+    db_ = NULL;
+  }
+
+  // Destructor
+  virtual ~ProvenanceRepository() {
+    if (db_)
+      delete db_;
+  }
+
+  // initialize
+  virtual bool initialize() {
+    std::string value;
+    if (configure_->get(Configure::nifi_provenance_repository_directory_default,
+                        value)) {
+      directory_ = value;
+    }
+    logger_->log_info("NiFi Provenance Repository Directory %s",
+                      directory_.c_str());
+    if (configure_->get(Configure::nifi_provenance_repository_max_storage_size,
+                        value)) {
+      core::Property::StringToInt(value, max_partition_bytes_);
+    }
+    logger_->log_info("NiFi Provenance Max Partition Bytes %d",
+                      max_partition_bytes_);
+    if (configure_->get(Configure::nifi_provenance_repository_max_storage_time,
+                        value)) {
+      core::TimeUnit unit;
+      if (core::Property::StringToTime(value, max_partition_millis_, unit)
+          && core::Property::ConvertTimeUnitToMS(max_partition_millis_, unit,
+                                                 max_partition_millis_)) {
+      }
+    }
+    logger_->log_info("NiFi Provenance Max Storage Time: [%d] ms",
+                      max_partition_millis_);
+    leveldb::Options options;
+    options.create_if_missing = true;
+    leveldb::Status status = leveldb::DB::Open(options, directory_.c_str(),
+                                               &db_);
+    if (status.ok()) {
+      logger_->log_info("NiFi Provenance Repository database open %s success",
+                        directory_.c_str());
+    } else {
+      logger_->log_error("NiFi Provenance Repository database open %s fail",
+                         directory_.c_str());
+      return false;
+    }
+
+    return true;
+  }
+  // Put
+  virtual bool Put(std::string key, uint8_t *buf, int bufLen) {
+
+    // persistent to the DB
+    leveldb::Slice value((const char *) buf, bufLen);
+    leveldb::Status status;
+    status = db_->Put(leveldb::WriteOptions(), key, value);
+    if (status.ok())
+      return true;
+    else
+      return false;
+  }
+  // Delete
+  virtual bool Delete(std::string key) {
+    leveldb::Status status;
+    status = db_->Delete(leveldb::WriteOptions(), key);
+    if (status.ok())
+      return true;
+    else
+      return false;
+  }
+  // Get
+  virtual bool Get(std::string key, std::string &value) {
+    leveldb::Status status;
+    status = db_->Get(leveldb::ReadOptions(), key, &value);
+    if (status.ok())
+      return true;
+    else
+      return false;
+  }
+  // Persistent event
+  void registerEvent(std::shared_ptr<ProvenanceEventRecord> &event) {
+    event->Serialize(
+        std::static_pointer_cast<core::Repository>(shared_from_this()));
+  }
+  // Remove event
+  void removeEvent(ProvenanceEventRecord *event) {
+    Delete(event->getEventId());
+  }
+  // destroy
+  void destroy() {
+    if (db_) {
+      delete db_;
+      db_ = NULL;
+    }
+  }
+  // Run function for the thread
+   void run();
+
+  // Prevent default copy constructor and assignment operation
+  // Only support pass by reference or pointer
+  ProvenanceRepository(const ProvenanceRepository &parent) = delete;
+  ProvenanceRepository &operator=(const ProvenanceRepository &parent) = delete;
+
+ private:
+  leveldb::DB* db_;
+
+};
+
+} /* namespace provenance */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif /* LIBMINIFI_INCLUDE_PROVENANCE_PROVENANCEREPOSITORY_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/utils/FailurePolicy.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/FailurePolicy.h b/libminifi/include/utils/FailurePolicy.h
index a4a7f9e..98ec18a 100644
--- a/libminifi/include/utils/FailurePolicy.h
+++ b/libminifi/include/utils/FailurePolicy.h
@@ -17,6 +17,12 @@
 #ifndef LIBMINIFI_INCLUDE_UTILS_FAILUREPOLICY_H_
 #define LIBMINIFI_INCLUDE_UTILS_FAILUREPOLICY_H_
 
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
 /**
  * Basic failure policy enumeration
  *
@@ -42,4 +48,11 @@ enum FailurePolicy {
        EXIT
 };
 
+} /* namespace utils */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+
 #endif /* LIBMINIFI_INCLUDE_UTILS_FAILUREPOLICY_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/utils/StringUtils.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/StringUtils.h b/libminifi/include/utils/StringUtils.h
index 30858c8..82459db 100644
--- a/libminifi/include/utils/StringUtils.h
+++ b/libminifi/include/utils/StringUtils.h
@@ -19,7 +19,13 @@
 
 #include <algorithm>
 #include <sstream>
-#include "../utils/FailurePolicy.h"
+#include "utils/FailurePolicy.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
 
 /**
  * Stateless String utility class.
@@ -122,4 +128,11 @@ public:
 
 };
 
+} /* namespace utils */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+
 #endif /* LIBMINIFI_INCLUDE_IO_STRINGUTILS_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/AppendHostInfo.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/AppendHostInfo.cpp b/libminifi/src/AppendHostInfo.cpp
deleted file mode 100644
index d0769c1..0000000
--- a/libminifi/src/AppendHostInfo.cpp
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * @file AppendHostInfo.cpp
- * AppendHostInfo class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <set>
-#include <sys/time.h>
-#include <string.h>
-#include "AppendHostInfo.h"
-#include "ProcessContext.h"
-#include "ProcessSession.h"
-
-#include <netdb.h>
-#include <netinet/in.h>
-#include <sys/socket.h>
-#include <sys/ioctl.h>
-#include <net/if.h>
-#include <arpa/inet.h>
-
-#include "io/ClientSocket.h"
-
-#define __USE_POSIX
-#include <limits.h>
-
-#ifndef HOST_NAME_MAX
-#define HOST_NAME_MAX 255
-#endif
-
-const std::string AppendHostInfo::ProcessorName("AppendHostInfo");
-Property AppendHostInfo::InterfaceName("Network Interface Name", "Network interface from which to read an IP v4 address", "eth0");
-Property AppendHostInfo::HostAttribute("Hostname Attribute", "Flowfile attribute to used to record the agent's hostname", "source.hostname");
-Property AppendHostInfo::IPAttribute("IP Attribute", "Flowfile attribute to used to record the agent's IP address", "source.ipv4");
-Relationship AppendHostInfo::Success("success", "success operational on the flow record");
-
-void AppendHostInfo::initialize()
-{
-       //! Set the supported properties
-       std::set<Property> properties;
-       properties.insert(InterfaceName);
-       properties.insert(HostAttribute);
-       properties.insert(IPAttribute);
-       setSupportedProperties(properties);
-
-       //! Set the supported relationships
-       std::set<Relationship> relationships;
-       relationships.insert(Success);
-       setSupportedRelationships(relationships);
-}
-
-void AppendHostInfo::onTrigger(ProcessContext *context, ProcessSession *session)
-{
-       FlowFileRecord *flow = session->get();
-       if (!flow)
-         return;
-
-       //Get Hostname
-
-       std::string hostAttribute = "";
-       context->getProperty(HostAttribute.getName(), hostAttribute);
-       flow->addAttribute(hostAttribute.c_str(), Socket::getMyHostName());
-
-       //Get IP address for the specified interface
-  std::string iface;
-       context->getProperty(InterfaceName.getName(), iface);
-  //Confirm the specified interface name exists on this device
-  if (if_nametoindex(iface.c_str()) != 0){
-    struct ifreq ifr;
-    int fd = socket(AF_INET, SOCK_DGRAM, 0);
-    //Type of address to retrieve - IPv4 IP address
-    ifr.ifr_addr.sa_family = AF_INET;
-    //Copy the interface name in the ifreq structure
-    strncpy(ifr.ifr_name , iface.c_str(), IFNAMSIZ-1);
-    ioctl(fd, SIOCGIFADDR, &ifr);
-    close(fd);
-
-    std::string ipAttribute;
-    context->getProperty(IPAttribute.getName(), ipAttribute);
-    flow->addAttribute(ipAttribute.c_str(), inet_ntoa(((struct sockaddr_in *)&ifr.ifr_addr)->sin_addr));
-  }
-
-       // Transfer to the relationship
-       session->transfer(flow, Success);
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/BaseLogger.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/BaseLogger.cpp b/libminifi/src/BaseLogger.cpp
deleted file mode 100644
index 1b3b2fd..0000000
--- a/libminifi/src/BaseLogger.cpp
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "BaseLogger.h"
-
-// Logger related configuration items.
-const char *BaseLogger::nifi_log_level = "nifi.log.level";
-const char *BaseLogger::nifi_log_appender = "nifi.log.appender";
-
-/**
- * @brief Log error message
- * @param format format string ('man printf' for syntax)
- * @warning does not check @p log or @p format for null. Caller must ensure 
parameters and format string lengths match
- */
- void BaseLogger::log_error(const char * const format, ...) {
-       if (logger_ == NULL || 
!logger_->should_log(spdlog::level::level_enum::err))
-               return;
-       FILL_BUFFER
-       log_str(err,buffer);
-}
-/**
- * @brief Log warn message
- * @param format format string ('man printf' for syntax)
- * @warning does not check @p log or @p format for null. Caller must ensure 
parameters and format string lengths match
- */
- void BaseLogger::log_warn(const char * const format, ...) {
-       if (logger_ == NULL
-                       || 
!logger_->should_log(spdlog::level::level_enum::warn))
-               return;
-       FILL_BUFFER
-       log_str(warn,buffer);
-}
-/**
- * @brief Log info message
- * @param format format string ('man printf' for syntax)
- * @warning does not check @p log or @p format for null. Caller must ensure 
parameters and format string lengths match
- */
- void BaseLogger::log_info(const char * const format, ...) {
-       if (logger_ == NULL
-                       || 
!logger_->should_log(spdlog::level::level_enum::info))
-               return;
-       FILL_BUFFER
-       log_str(info,buffer);
-}
-/**
- * @brief Log debug message
- * @param format format string ('man printf' for syntax)
- * @warning does not check @p log or @p format for null. Caller must ensure 
parameters and format string lengths match
- */
- void BaseLogger::log_debug(const char * const format, ...) {
-
-       if (logger_ == NULL
-                       || 
!logger_->should_log(spdlog::level::level_enum::debug))
-               return;
-       FILL_BUFFER
-       log_str(debug,buffer);
-}
-/**
- * @brief Log trace message
- * @param format format string ('man printf' for syntax)
- * @warning does not check @p log or @p format for null. Caller must ensure 
parameters and format string lengths match
- */
- void BaseLogger::log_trace(const char * const format, ...) {
-
-       if (logger_ == NULL
-                       || 
!logger_->should_log(spdlog::level::level_enum::trace))
-               return;
-       FILL_BUFFER
-       log_str(debug,buffer);
-}
-
-// overridables
-
-/**
- * @brief Log error message
- * @param format format string ('man printf' for syntax)
- * @warning does not check @p log or @p format for null. Caller must ensure 
parameters and format string lengths match
- */
-void BaseLogger::log_str(LOG_LEVEL_E level, const std::string &buffer) {
-       switch (level) {
-       case err:
-       case critical:
-               if (stderr_ != nullptr) {
-                       stderr_->error(buffer);
-               } else {
-                       logger_->error(buffer);
-               }
-               break;
-       case warn:
-               logger_->warn(buffer);
-               break;
-       case info:
-               logger_->info(buffer);
-               break;
-       case debug:
-               logger_->debug(buffer);
-               break;
-       case trace:
-               logger_->trace(buffer);
-               break;
-       case off:
-               break;
-       default:
-               logger_->info(buffer);
-               break;
-       }
-
-}
-
-void BaseLogger::setLogLevel(const std::string &level,
-               LOG_LEVEL_E defaultLevel) {
-       std::string logLevel = level;
-       std::transform(logLevel.begin(), logLevel.end(), logLevel.begin(),
-                       ::tolower);
-
-       if (logLevel == "trace") {
-               setLogLevel(trace);
-       } else if (logLevel == "debug") {
-               setLogLevel(debug);
-       } else if (logLevel == "info") {
-               setLogLevel(info);
-       } else if (logLevel == "warn") {
-               setLogLevel(warn);
-       } else if (logLevel == "error") {
-               setLogLevel(err);
-       } else if (logLevel == "critical") {
-               setLogLevel(critical);
-       } else if (logLevel == "off") {
-               setLogLevel(off);
-       } else {
-               setLogLevel(defaultLevel);
-       }
-}
-
-void BaseLogger::set_error_logger(std::shared_ptr<spdlog::logger> other) {
-       stderr_ = std::move(other);
-}
-

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/Configure.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp
index 6f5c08d..96ed7c7 100644
--- a/libminifi/src/Configure.cpp
+++ b/libminifi/src/Configure.cpp
@@ -1,6 +1,4 @@
 /**
- * @file Configure.cpp
- * Configure class implementation
  *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -17,26 +15,33 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include "Configure.h"
+#include "properties/Configure.h"
 #include "utils/StringUtils.h"
+#include "core/core.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
 
 Configure *Configure::configure_(NULL);
 const char *Configure::nifi_flow_configuration_file = 
"nifi.flow.configuration.file";
 const char *Configure::nifi_administrative_yield_duration = 
"nifi.administrative.yield.duration";
 const char *Configure::nifi_bored_yield_duration = "nifi.bored.yield.duration";
-const char *Configure::nifi_graceful_shutdown_seconds  = 
"nifi.graceful.shutdown.seconds";
+const char *Configure::nifi_graceful_shutdown_seconds  = 
"nifi.flowcontroller.graceful.shutdown.period";
 const char *Configure::nifi_log_level = "nifi.log.level";
 const char *Configure::nifi_server_name = "nifi.server.name";
+const char *Configure::nifi_configuration_class_name = 
"nifi.flow.configuration.class.name";
+const char *Configure::nifi_flow_repository_class_name = 
"nifi.flow.repository.class.name";
+const char *Configure::nifi_provenance_repository_class_name = 
"nifi.provenance.repository.class.name";
 const char *Configure::nifi_server_port = "nifi.server.port";
 const char *Configure::nifi_server_report_interval= 
"nifi.server.report.interval";
 const char *Configure::nifi_provenance_repository_max_storage_size = 
"nifi.provenance.repository.max.storage.size";
 const char *Configure::nifi_provenance_repository_max_storage_time = 
"nifi.provenance.repository.max.storage.time";
 const char *Configure::nifi_provenance_repository_directory_default = 
"nifi.provenance.repository.directory.default";
-const char *Configure::nifi_provenance_repository_enable = 
"nifi.provenance.repository.enable";
 const char *Configure::nifi_flowfile_repository_max_storage_size = 
"nifi.flowfile.repository.max.storage.size";
 const char *Configure::nifi_flowfile_repository_max_storage_time = 
"nifi.flowfile.repository.max.storage.time";
 const char *Configure::nifi_flowfile_repository_directory_default = 
"nifi.flowfile.repository.directory.default";
-const char *Configure::nifi_flowfile_repository_enable = 
"nifi.flowfile.repository.enable";
 const char *Configure::nifi_remote_input_secure = "nifi.remote.input.secure";
 const char *Configure::nifi_security_need_ClientAuth = 
"nifi.security.need.ClientAuth";
 const char *Configure::nifi_security_client_certificate = 
"nifi.security.client.certificate";
@@ -44,13 +49,13 @@ const char *Configure::nifi_security_client_private_key = 
"nifi.security.client.
 const char *Configure::nifi_security_client_pass_phrase = 
"nifi.security.client.pass.phrase";
 const char *Configure::nifi_security_client_ca_certificate = 
"nifi.security.client.ca.certificate";
 
-//! Get the config value
+// Get the config value
 bool Configure::get(std::string key, std::string &value)
 {
-       std::lock_guard<std::mutex> lock(_mtx);
-       auto it = _properties.find(key);
+       std::lock_guard<std::mutex> lock(mutex_);
+       auto it = properties_.find(key);
 
-       if (it != _properties.end())
+       if (it != properties_.end())
        {
                value = it->second;
                return true;
@@ -62,7 +67,7 @@ bool Configure::get(std::string key, std::string &value)
 }
 
 
-//! Parse one line in configure file like key=value
+// Parse one line in configure file like key=value
 void Configure::parseConfigureFileLine(char *buf)
 {
        char *line = buf;
@@ -96,12 +101,12 @@ void Configure::parseConfigureFileLine(char *buf)
     }
 
     std::string value = equal;
-    key = StringUtils::trimRight(key);
-    value = StringUtils::trimRight(value);
+    key = org::apache::nifi::minifi::utils::StringUtils::trimRight(key);
+    value = org::apache::nifi::minifi::utils::StringUtils::trimRight(value);
     set(key, value);
 }
 
-//! Load Configure File
+// Load Configure File
 void Configure::loadConfigureFile(const char *fileName)
 {
 
@@ -138,7 +143,7 @@ void Configure::loadConfigureFile(const char *fileName)
     }
 }
 
-//! Parse Command Line
+// Parse Command Line
 void Configure::parseCommandLine(int argc, char **argv)
 {
        int i;
@@ -162,3 +167,8 @@ void Configure::parseCommandLine(int argc, char **argv)
        }
        return;
 }
+
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/Connection.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp
index 42dbfe4..6f64ff3 100644
--- a/libminifi/src/Connection.cpp
+++ b/libminifi/src/Connection.cpp
@@ -27,185 +27,161 @@
 #include <thread>
 #include <iostream>
 
+#include "core/FlowFile.h"
 #include "Connection.h"
-#include "Processor.h"
-#include "FlowFileRepository.h"
-#include "FlowController.h"
-
-Connection::Connection(std::string name, uuid_t uuid, uuid_t srcUUID, uuid_t 
destUUID)
-: _name(name)
-{
-       if (!uuid)
-               // Generate the global UUID for the flow record
-               uuid_generate(_uuid);
-       else
-               uuid_copy(_uuid, uuid);
-
-       if (srcUUID)
-               uuid_copy(_srcUUID, srcUUID);
-       if (destUUID)
-               uuid_copy(_destUUID, destUUID);
-
-       _srcProcessor = NULL;
-       _destProcessor = NULL;
-       _maxQueueSize = 0;
-       _maxQueueDataSize = 0;
-       _expiredDuration = 0;
-       _queuedDataSize = 0;
-
-       logger_ = Logger::getLogger();
-
-       char uuidStr[37];
-       uuid_unparse_lower(_uuid, uuidStr);
-       _uuidStr = uuidStr;
-
-       logger_->log_info("Connection %s created", _name.c_str());
+#include "core/Processor.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+// Construct a connection.
+//
+// name and uuid are forwarded to the core::Connectable base; flow_repository
+// is kept in flow_repository_ and is later used by put() and poll().
+// srcUUID / destUUID are copied only when they are non-null.
+// NOTE(review): when srcUUID or destUUID is null, src_uuid_ / dest_uuid_ are
+// not written here -- confirm they are initialized elsewhere.
+Connection::Connection(std::shared_ptr<core::Repository> flow_repository,
+                       std::string name, uuid_t uuid, uuid_t srcUUID,
+                       uuid_t destUUID)
+    : core::Connectable(name, uuid),
+      flow_repository_(flow_repository) {
+
+  if (srcUUID)
+    uuid_copy(src_uuid_, srcUUID);
+  if (destUUID)
+    uuid_copy(dest_uuid_, destUUID);
+
+  // No endpoints are attached yet
+  source_connectable_ = nullptr;
+  dest_connectable_ = nullptr;
+  // Zero limits mean isFull() applies no back pressure; a zero
+  // expired_duration_ makes poll() skip the expiration check
+  max_queue_size_ = 0;
+  max_data_queue_size_ = 0;
+  expired_duration_ = 0;
+  queued_data_size_ = 0;
+
+  logger_ = logging::Logger::getLogger();
+
+  logger_->log_info("Connection %s created", name_.c_str());
 }
 
-bool Connection::isEmpty()
-{
-       std::lock_guard<std::mutex> lock(_mtx);
+// Returns true when no flow file is queued; the check is made while holding
+// mutex_.
+bool Connection::isEmpty() {
+  std::lock_guard<std::mutex> lock(mutex_);
 
-       return _queue.empty();
+  return queue_.empty();
 }
 
-bool Connection::isFull()
-{
-       std::lock_guard<std::mutex> lock(_mtx);
+// Reports whether back pressure applies to this connection.
+//
+// A limit that is <= 0 is treated as "not configured". Returns true when a
+// configured limit is reached: either the number of queued flow files
+// (max_queue_size_) or the accumulated queued_data_size_
+// (max_data_queue_size_). The check is made while holding mutex_.
+bool Connection::isFull() {
+  std::lock_guard<std::mutex> lock(mutex_);
 
-       if (_maxQueueSize <= 0 && _maxQueueDataSize <= 0)
-               // No back pressure setting
-               return false;
+  if (max_queue_size_ <= 0 && max_data_queue_size_ <= 0)
+    // No back pressure setting
+    return false;
 
-       if (_maxQueueSize > 0 && _queue.size() >= _maxQueueSize)
-               return true;
+  // Limit on the number of queued flow files
+  if (max_queue_size_ > 0 && queue_.size() >= max_queue_size_)
+    return true;
 
-       if (_maxQueueDataSize > 0 && _queuedDataSize >= _maxQueueDataSize)
-               return true;
+  // Limit on the accumulated size reported by the queued flow files
+  if (max_data_queue_size_ > 0 && queued_data_size_ >= max_data_queue_size_)
+    return true;
 
-       return false;
+  return false;
 }
 
-void Connection::put(FlowFileRecord *flow)
-{
-       {
-               std::lock_guard<std::mutex> lock(_mtx);
-       
-               _queue.push(flow);
-       
-               _queuedDataSize += flow->getSize();
-       
-               logger_->log_debug("Enqueue flow file UUID %s to connection %s",
-                               flow->getUUIDStr().c_str(), _name.c_str());
-       }
-
-
-       if (FlowControllerFactory::getFlowController()->getFlowFileRepository() 
&&
-                       
FlowControllerFactory::getFlowController()->getFlowFileRepository()->isEnable() 
&&
-                       !flow->isStoredToRepository())
-       {
-               // Save to the flowfile repo
-               FlowFileEventRecord event;
-               event.fromFlowFile(flow, this->_uuidStr);
-               if (event.Serialize(
-                               
FlowControllerFactory::getFlowController()->getFlowFileRepository()))
-               {
-                       flow->setStoredToRepository(true);
-               }
-       }
-
-       // Notify receiving processor that work may be available
-       if(_destProcessor)
-       {
-               _destProcessor->notifyWork();
-       }
+// Enqueue a flow file on this connection.
+//
+// The flow file is pushed onto queue_ and its getSize() is added to
+// queued_data_size_ while mutex_ is held. After the lock is released, a flow
+// file for which isStored() is false is serialized through a FlowFileRecord
+// built from flow_repository_ and this connection's uuidStr_, and the
+// destination connectable, if any, is notified.
+// NOTE(review): flow is dereferenced unconditionally -- callers are assumed
+// to pass a non-null pointer.
+void Connection::put(std::shared_ptr<core::FlowFile> flow) {
+  {
+    // Scope of the lock: only the queue bookkeeping and the log call
+    std::lock_guard<std::mutex> lock(mutex_);
+
+    queue_.push(flow);
+
+    queued_data_size_ += flow->getSize();
+
+    logger_->log_debug("Enqueue flow file UUID %s to connection %s",
+                       flow->getUUIDStr().c_str(), name_.c_str());
+  }
+
+  if (!flow->isStored()) {
+    // Save to the flowfile repo
+    FlowFileRecord event(flow_repository_,flow,this->uuidStr_);
+    // Only mark the flow file as stored when serialization reported success
+    if (event.Serialize()) {
+      flow->setStoredToRepository(true);
+    }
+  }
+
+  // Notify receiving processor that work may be available
+  if (dest_connectable_) {
+    dest_connectable_->notifyWork();
+  }
 }
 
-FlowFileRecord* Connection::poll(std::set<FlowFileRecord *> 
&expiredFlowRecords)
-{
-       std::lock_guard<std::mutex> lock(_mtx);
-
-       while (!_queue.empty())
-       {
-               FlowFileRecord *item = _queue.front();
-               _queue.pop();
-               _queuedDataSize -= item->getSize();
-
-               if (_expiredDuration > 0)
-               {
-                       // We need to check for flow expiration
-                       if (getTimeMillis() > (item->getEntryDate() + 
_expiredDuration))
-                       {
-                               // Flow record expired
-                               expiredFlowRecords.insert(item);
-                               if 
(FlowControllerFactory::getFlowController()->getFlowFileRepository() &&
-                                               
FlowControllerFactory::getFlowController()->getFlowFileRepository()->isEnable())
-                               {
-                                       // delete from the flowfile repo
-                                       
FlowControllerFactory::getFlowController()->getFlowFileRepository()->Delete(item->getUUIDStr());
-                                       item->setStoredToRepository(false);
-                               }
-                       }
-                       else
-                       {
-                               // Flow record not expired
-                               if (item->isPenalized())
-                               {
-                                       // Flow record was penalized
-                                       _queue.push(item);
-                                       _queuedDataSize += item->getSize();
-                                       break;
-                               }
-                               item->setOriginalConnection(this);
-                               logger_->log_debug("Dequeue flow file UUID %s 
from connection %s",
-                                               item->getUUIDStr().c_str(), 
_name.c_str());
-                               if 
(FlowControllerFactory::getFlowController()->getFlowFileRepository() &&
-                                               
FlowControllerFactory::getFlowController()->getFlowFileRepository()->isEnable())
-                               {
-                                       // delete from the flowfile repo
-                                       
FlowControllerFactory::getFlowController()->getFlowFileRepository()->Delete(item->getUUIDStr());
-                                       item->setStoredToRepository(false);
-                               }
-                               return item;
-                       }
-               }
-               else
-               {
-                       // Flow record not expired
-                       if (item->isPenalized())
-                       {
-                               // Flow record was penalized
-                               _queue.push(item);
-                               _queuedDataSize += item->getSize();
-                               break;
-                       }
-                       item->setOriginalConnection(this);
-                       logger_->log_debug("Dequeue flow file UUID %s from 
connection %s",
-                                       item->getUUIDStr().c_str(), 
_name.c_str());
-                       if 
(FlowControllerFactory::getFlowController()->getFlowFileRepository() &&
-                                       
FlowControllerFactory::getFlowController()->getFlowFileRepository()->isEnable())
-                       {
-                               // delete from the flowfile repo
-                               
FlowControllerFactory::getFlowController()->getFlowFileRepository()->Delete(item->getUUIDStr());
-                               item->setStoredToRepository(false);
-                       }
-                       return item;
-               }
-       }
-
-       return NULL;
+// Dequeue the next flow file that is ready to be handed out.
+//
+// While mutex_ is held, entries are taken from the front of queue_:
+//  - when expired_duration_ > 0 and the entry is older than that duration,
+//    it is added to expiredFlowRecords, deleted from the flow repository,
+//    and polling continues with the next entry;
+//  - when the entry is penalized, it is pushed back onto queue_ and polling
+//    stops;
+//  - otherwise this connection is recorded as the entry's original
+//    connection, the entry is deleted from the flow repository and returned.
+//
+// @param expiredFlowRecords receives every flow file found expired during
+//        this call
+// @return the dequeued flow file, or nullptr when the queue ran empty or a
+//         penalized flow file was encountered
+std::shared_ptr<core::FlowFile> Connection::poll(
+    std::set<std::shared_ptr<core::FlowFile>> &expiredFlowRecords) {
+  std::lock_guard<std::mutex> lock(mutex_);
+
+  while (!queue_.empty()) {
+    std::shared_ptr<core::FlowFile> item = queue_.front();
+    queue_.pop();
+    queued_data_size_ -= item->getSize();
+
+    // Expiration is only evaluated when an expiration period is configured
+    const bool expired = expired_duration_ > 0
+        && getTimeMillis() > (item->getEntryDate() + expired_duration_);
+    if (expired) {
+      // Flow record expired
+      expiredFlowRecords.insert(item);
+      if (flow_repository_->Delete(item->getUUIDStr())) {
+        item->setStoredToRepository(false);
+      }
+      continue;
+    }
+
+    // Flow record not expired
+    if (item->isPenalized()) {
+      // Flow record was penalized: undo the dequeue and stop polling
+      queue_.push(item);
+      queued_data_size_ += item->getSize();
+      break;
+    }
+
+    std::shared_ptr<Connectable> connectable = std::static_pointer_cast<
+        Connectable>(shared_from_this());
+    item->setOriginalConnection(connectable);
+    logger_->log_debug("Dequeue flow file UUID %s from connection %s",
+                       item->getUUIDStr().c_str(), name_.c_str());
+
+    // delete from the flowfile repo
+    if (flow_repository_->Delete(item->getUUIDStr())) {
+      item->setStoredToRepository(false);
+    }
+
+    return item;
+  }
+
+  return nullptr;
 }
 
-void Connection::drain()
-{
-       std::lock_guard<std::mutex> lock(_mtx);
+// Discard every queued flow file and reset the queued-data accounting.
+// The whole operation runs while mutex_ is held.
+void Connection::drain() {
+  std::lock_guard<std::mutex> lock(mutex_);
 
-       while (!_queue.empty())
-       {
-               FlowFileRecord *item = _queue.front();
-               _queue.pop();
-               delete item;
-       }
+  while (!queue_.empty()) {
+    queue_.pop();
+  }
+  // Nothing is queued any more; without this reset isFull() would keep
+  // comparing a stale total against max_data_queue_size_
+  queued_data_size_ = 0;
 
-       logger_->log_debug("Drain connection %s", _name.c_str());
+  logger_->log_debug("Drain connection %s", name_.c_str());
 }
+
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/EventDrivenSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/EventDrivenSchedulingAgent.cpp 
b/libminifi/src/EventDrivenSchedulingAgent.cpp
index 53bde4e..0484139 100644
--- a/libminifi/src/EventDrivenSchedulingAgent.cpp
+++ b/libminifi/src/EventDrivenSchedulingAgent.cpp
@@ -20,28 +20,43 @@
 #include <chrono>
 #include <thread>
 #include <iostream>
-#include "Property.h"
 #include "EventDrivenSchedulingAgent.h"
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSessionFactory.h"
+#include "core/Property.h"
 
-void EventDrivenSchedulingAgent::run(Processor *processor, ProcessContext 
*processContext, ProcessSessionFactory *sessionFactory)
-{
-       while (this->_running)
-       {
-               bool shouldYield = this->onTrigger(processor, processContext, 
sessionFactory);
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
 
-               if (processor->isYield())
-               {
-                       // Honor the yield
-                       
std::this_thread::sleep_for(std::chrono::milliseconds(processor->getYieldTime()));
-               }
-               else if (shouldYield && this->_boredYieldDuration > 0)
-               {
-                       // No work to do or need to apply back pressure
-                       
std::this_thread::sleep_for(std::chrono::milliseconds(this->_boredYieldDuration));
-               }
 
-               // Block until work is available
-               processor->waitForWork(1000);
-       }
-       return;
+// Scheduling loop for one event-driven processor.
+//
+// While running_ is set, the processor is triggered once per iteration.
+// After each trigger the loop sleeps for the processor's yield time when the
+// processor asked to yield, or for _boredYieldDuration when onTrigger()
+// reported that a yield is warranted and that duration is positive. It then
+// calls processor->waitForWork(1000) before the next iteration.
+void EventDrivenSchedulingAgent::run(
+    std::shared_ptr<core::Processor> processor,
+    core::ProcessContext *processContext,
+    core::ProcessSessionFactory *sessionFactory) {
+  while (running_) {
+    const bool wantsYield = onTrigger(processor, processContext,
+                                      sessionFactory);
+
+    if (processor->isYield()) {
+      // The processor requested a yield: honor its own yield time
+      const std::chrono::milliseconds yieldPause(processor->getYieldTime());
+      std::this_thread::sleep_for(yieldPause);
+    } else if (wantsYield && _boredYieldDuration > 0) {
+      // No work to do or need to apply back pressure
+      const std::chrono::milliseconds boredPause(_boredYieldDuration);
+      std::this_thread::sleep_for(boredPause);
+    }
+
+    // Block until work is available
+    processor->waitForWork(1000);
+  }
 }
+
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/ExecuteProcess.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ExecuteProcess.cpp b/libminifi/src/ExecuteProcess.cpp
deleted file mode 100644
index 61f96d5..0000000
--- a/libminifi/src/ExecuteProcess.cpp
+++ /dev/null
@@ -1,251 +0,0 @@
-/**
- * @file ExecuteProcess.cpp
- * ExecuteProcess class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "ExecuteProcess.h"
-#include "ProcessContext.h"
-#include "ProcessSession.h"
-#include <cstring>
-#include "utils/StringUtils.h"
-#include "utils/TimeUtil.h"
-
-const std::string ExecuteProcess::ProcessorName("ExecuteProcess");
-Property ExecuteProcess::Command("Command", "Specifies the command to be 
executed; if just the name of an executable is provided, it must be in the 
user's environment PATH.", "");
-Property ExecuteProcess::CommandArguments("Command Arguments",
-               "The arguments to supply to the executable delimited by white 
space. White space can be escaped by enclosing it in double-quotes.", "");
-Property ExecuteProcess::WorkingDir("Working Directory",
-               "The directory to use as the current working directory when 
executing the command", "");
-Property ExecuteProcess::BatchDuration("Batch Duration",
-               "If the process is expected to be long-running and produce 
textual output, a batch duration can be specified.", "0");
-Property ExecuteProcess::RedirectErrorStream("Redirect Error Stream",
-               "If true will redirect any error stream output of the process 
to the output stream.", "false");
-Relationship ExecuteProcess::Success("success", "All created FlowFiles are 
routed to this relationship.");
-
-void ExecuteProcess::initialize()
-{
-       //! Set the supported properties
-       std::set<Property> properties;
-       properties.insert(Command);
-       properties.insert(CommandArguments);
-       properties.insert(WorkingDir);
-       properties.insert(BatchDuration);
-       properties.insert(RedirectErrorStream);
-       setSupportedProperties(properties);
-       //! Set the supported relationships
-       std::set<Relationship> relationships;
-       relationships.insert(Success);
-       setSupportedRelationships(relationships);
-}
-
-
-void ExecuteProcess::onTrigger(ProcessContext *context, ProcessSession 
*session)
-{
-       std::string value;
-       if (context->getProperty(Command.getName(), value))
-       {
-               this->_command = value;
-       }
-       if (context->getProperty(CommandArguments.getName(), value))
-       {
-               this->_commandArgument = value;
-       }
-       if (context->getProperty(WorkingDir.getName(), value))
-       {
-               this->_workingDir = value;
-       }
-       if (context->getProperty(BatchDuration.getName(), value))
-       {
-               TimeUnit unit;
-               if (Property::StringToTime(value, _batchDuration, unit) &&
-                       Property::ConvertTimeUnitToMS(_batchDuration, unit, 
_batchDuration))
-               {
-
-               }
-       }
-       if (context->getProperty(RedirectErrorStream.getName(), value))
-       {
-               StringUtils::StringToBool(value, _redirectErrorStream);
-       }
-       this->_fullCommand = _command + " " + _commandArgument;
-       if (_fullCommand.length() == 0)
-       {
-               yield();
-               return;
-       }
-       if (_workingDir.length() > 0 && _workingDir != ".")
-       {
-               // change to working directory
-               if (chdir(_workingDir.c_str()) != 0)
-               {
-                       logger_->log_error("Execute Command can not chdir %s", 
_workingDir.c_str());
-                       yield();
-                       return;
-               }
-       }
-       logger_->log_info("Execute Command %s", _fullCommand.c_str());
-       // split the command into array
-       char cstr[_fullCommand.length()+1];
-       std::strcpy(cstr, _fullCommand.c_str());
-       char *p = std::strtok (cstr, " ");
-       int argc = 0;
-       char *argv[64];
-       while (p != 0 && argc < 64)
-       {
-               argv[argc] = p;
-               p = std::strtok(NULL, " ");
-               argc++;
-       }
-       argv[argc] = NULL;
-       int status, died;
-       if (!_processRunning)
-       {
-               _processRunning = true;
-               // if the process has not launched yet
-               // create the pipe
-               if (pipe(_pipefd) == -1)
-               {
-                       _processRunning = false;
-                       yield();
-                       return;
-               }
-               switch (_pid = fork())
-               {
-               case -1:
-                       logger_->log_error("Execute Process fork failed");
-                       _processRunning = false;
-                       close(_pipefd[0]);
-                       close(_pipefd[1]);
-                       yield();
-                       break;
-               case 0 : // this is the code the child runs
-                       close(1);      // close stdout
-                       dup(_pipefd[1]); // points pipefd at file descriptor
-                       if (_redirectErrorStream)
-                               // redirect stderr
-                               dup2(_pipefd[1], 2);
-                       close(_pipefd[0]);
-                       execvp(argv[0], argv);
-                       exit(1);
-                       break;
-               default: // this is the code the parent runs
-                       // the parent isn't going to write to the pipe
-                       close(_pipefd[1]);
-                       if (_batchDuration > 0)
-                       {
-                               while (1)
-                               {
-                                       std::this_thread::sleep_for(std::chrono::milliseconds(_batchDuration));
-                                       char buffer[4096];
-                                       int numRead = read(_pipefd[0], buffer, sizeof(buffer));
-                                       if (numRead <= 0)
-                                               break;
-                                       logger_->log_info("Execute Command Respond %d", numRead);
-                                       ExecuteProcess::WriteCallback callback(buffer, numRead);
-                                       FlowFileRecord *flowFile = session->create();
-                                       if (!flowFile)
-                                               continue;
-          flowFile->addAttribute("command", _command.c_str());
-          flowFile->addAttribute("command.arguments", _commandArgument.c_str());
-                                       session->write(flowFile, &callback);
-                                       session->transfer(flowFile, Success);
-                                       session->commit();
-                               }
-                       }
-                       else
-                       {
-                               char buffer[4096];
-                               char *bufPtr = buffer;
-                               int totalRead = 0;
-                               FlowFileRecord *flowFile = NULL;
-                               while (1)
-                               {
-                                       int numRead = read(_pipefd[0], bufPtr, (sizeof(buffer) - totalRead));
-                                       if (numRead <= 0)
-                                       {
-                                               if (totalRead > 0)
-                                               {
-                                                       logger_->log_info("Execute Command Respond %d", totalRead);
-                                                       // child exits and close the pipe
-                                                       ExecuteProcess::WriteCallback callback(buffer, totalRead);
-                                                       if (!flowFile)
-                                                       {
-                                                               flowFile = session->create();
-                                                               if (!flowFile)
-                                                                       break;
-                                                               flowFile->addAttribute("command", _command.c_str());
-                                                               flowFile->addAttribute("command.arguments", _commandArgument.c_str());
-                                                               session->write(flowFile, &callback);
-                                                       }
-                                                       else
-                                                       {
-                                                               session->append(flowFile, &callback);
-                                                       }
-                                                       session->transfer(flowFile, Success);
-                                               }
-                                               break;
-                                       }
-                                       else
-                                       {
-                                               if (numRead == (sizeof(buffer) - totalRead))
-                                               {
-                                                       // we reach the max buffer size
-                                                       logger_->log_info("Execute Command Max Respond %d", sizeof(buffer));
-                                                       ExecuteProcess::WriteCallback callback(buffer, sizeof(buffer));
-                                                       if (!flowFile)
-                                                       {
-                                                               flowFile = session->create();
-                                                               if (!flowFile)
-                                                                       continue;
-                                                               flowFile->addAttribute("command", _command.c_str());
-                                                               flowFile->addAttribute("command.arguments", _commandArgument.c_str());
-                                                               session->write(flowFile, &callback);
-                                                       }
-                                                       else
-                                                       {
-                                                               session->append(flowFile, &callback);
-                                                       }
-                                                       // Rewind
-                                                       totalRead = 0;
-                                                       bufPtr = buffer;
-                                               }
-                                               else
-                                               {
-                                                       totalRead += numRead;
-                                                       bufPtr += numRead;
-                                               }
-                                       }
-                               }
-                       }
-
-                       died= wait(&status);
-                       if (WIFEXITED(status))
-                       {
-                               logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand.c_str(), WEXITSTATUS(status), _pid);
-                       }
-                       else
-                       {
-                               logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand.c_str(), WTERMSIG(status), _pid);
-                       }
-
-                       close(_pipefd[0]);
-                       _processRunning = false;
-                       break;
-               }
-       }
-}
-

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/FlowControlProtocol.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowControlProtocol.cpp b/libminifi/src/FlowControlProtocol.cpp
index 22ef1f9..50fc0e2 100644
--- a/libminifi/src/FlowControlProtocol.cpp
+++ b/libminifi/src/FlowControlProtocol.cpp
@@ -27,493 +27,471 @@
 #include <iostream>
 #include "FlowController.h"
 #include "FlowControlProtocol.h"
-
-int FlowControlProtocol::connectServer(const char *host, uint16_t port)
-{
-       in_addr_t addr;
-       int sock = 0;
-       struct hostent *h;
+#include "core/core.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+int FlowControlProtocol::connectServer(const char *host, uint16_t port) {
+  in_addr_t addr;
+  int sock = 0;
+  struct hostent *h;
 #ifdef __MACH__
-       h = gethostbyname(host);
+  h = gethostbyname(host);
 #else
-       char buf[1024];
-       struct hostent he;
-       int hh_errno;
-       gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno);
+  char buf[1024];
+  struct hostent he;
+  int hh_errno;
+  gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno);
 #endif
-       memcpy((char *) &addr, h->h_addr_list[0], h->h_length);
-       sock = socket(AF_INET, SOCK_STREAM, 0);
-       if (sock < 0)
-       {
-               logger_->log_error("Could not create socket to hostName %s", host);
-               return 0;
-       }
+  memcpy((char *) &addr, h->h_addr_list[0], h->h_length);
+  sock = socket(AF_INET, SOCK_STREAM, 0);
+  if (sock < 0) {
+    logger_->log_error("Could not create socket to hostName %s", host);
+    return 0;
+  }
 
 #ifndef __MACH__
-       int opt = 1;
-       bool nagle_off = true;
-
-       if (nagle_off)
-       {
-               if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *)&opt, sizeof(opt)) < 0)
-               {
-                       logger_->log_error("setsockopt() TCP_NODELAY failed");
-                       close(sock);
-                       return 0;
-               }
-               if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
-                               (char *)&opt, sizeof(opt)) < 0)
-               {
-                       logger_->log_error("setsockopt() SO_REUSEADDR failed");
-                       close(sock);
-                       return 0;
-               }
-       }
-
-       int sndsize = 256*1024;
-       if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndsize, (int)sizeof(sndsize)) < 0)
-       {
-               logger_->log_error("setsockopt() SO_SNDBUF failed");
-               close(sock);
-               return 0;
-       }
+  int opt = 1;
+  bool nagle_off = true;
+
+  if (nagle_off)
+  {
+    if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *)&opt, sizeof(opt)) < 0)
+    {
+      logger_->log_error("setsockopt() TCP_NODELAY failed");
+      close(sock);
+      return 0;
+    }
+    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
+            (char *)&opt, sizeof(opt)) < 0)
+    {
+      logger_->log_error("setsockopt() SO_REUSEADDR failed");
+      close(sock);
+      return 0;
+    }
+  }
+
+  int sndsize = 256*1024;
+  if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndsize, (int)sizeof(sndsize)) < 0)
+  {
+    logger_->log_error("setsockopt() SO_SNDBUF failed");
+    close(sock);
+    return 0;
+  }
 #endif
 
-       struct sockaddr_in sa;
-       socklen_t socklen;
-       int status;
-
-       memset(&sa, 0, sizeof(sa));
-       sa.sin_family = AF_INET;
-       sa.sin_addr.s_addr = htonl(INADDR_ANY);
-       sa.sin_port = htons(0);
-       socklen = sizeof(sa);
-       if (bind(sock, (struct sockaddr *)&sa, socklen) < 0)
-       {
-               logger_->log_error("socket bind failed");
-               close(sock);
-               return 0;
-       }
-
-       memset(&sa, 0, sizeof(sa));
-       sa.sin_family = AF_INET;
-       sa.sin_addr.s_addr = addr;
-       sa.sin_port = htons(port);
-       socklen = sizeof(sa);
-
-       status = connect(sock, (struct sockaddr *)&sa, socklen);
-
-       if (status < 0)
-       {
-               logger_->log_error("socket connect failed to %s %d", host, port);
-               close(sock);
-               return 0;
-       }
-
-       logger_->log_info("Flow Control Protocol socket %d connect to server %s port %d success", sock, host, port);
-
-       return sock;
+  struct sockaddr_in sa;
+  socklen_t socklen;
+  int status;
+
+  memset(&sa, 0, sizeof(sa));
+  sa.sin_family = AF_INET;
+  sa.sin_addr.s_addr = htonl(INADDR_ANY);
+  sa.sin_port = htons(0);
+  socklen = sizeof(sa);
+  if (bind(sock, (struct sockaddr *) &sa, socklen) < 0) {
+    logger_->log_error("socket bind failed");
+    close(sock);
+    return 0;
+  }
+
+  memset(&sa, 0, sizeof(sa));
+  sa.sin_family = AF_INET;
+  sa.sin_addr.s_addr = addr;
+  sa.sin_port = htons(port);
+  socklen = sizeof(sa);
+
+  status = connect(sock, (struct sockaddr *) &sa, socklen);
+
+  if (status < 0) {
+    logger_->log_error("socket connect failed to %s %d", host, port);
+    close(sock);
+    return 0;
+  }
+
+  logger_->log_info(
+      "Flow Control Protocol socket %d connect to server %s port %d success",
+      sock, host, port);
+
+  return sock;
 }
 
-int FlowControlProtocol::sendData(uint8_t *buf, int buflen)
-{
-       int ret = 0, bytes = 0;
-
-       while (bytes < buflen)
-       {
-               ret = send(_socket, buf+bytes, buflen-bytes, 0);
-               //check for errors
-               if (ret == -1)
-               {
-                       return ret;
-               }
-               bytes+=ret;
-       }
-
-       return bytes;
+int FlowControlProtocol::sendData(uint8_t *buf, int buflen) {
+  int ret = 0, bytes = 0;
+
+  while (bytes < buflen) {
+    ret = send(_socket, buf + bytes, buflen - bytes, 0);
+    //check for errors
+    if (ret == -1) {
+      return ret;
+    }
+    bytes += ret;
+  }
+
+  return bytes;
 }
 
-int FlowControlProtocol::selectClient(int msec)
-{
-       fd_set fds;
-       struct timeval tv;
-    int retval;
-    int fd = _socket;
-
-    FD_ZERO(&fds);
-    FD_SET(fd, &fds);
-
-    tv.tv_sec = msec/1000;
-    tv.tv_usec = (msec % 1000) * 1000;
-
-    if (msec > 0)
-       retval = select(fd+1, &fds, NULL, NULL, &tv);
-    else
-       retval = select(fd+1, &fds, NULL, NULL, NULL);
-
-    if (retval <= 0)
-      return retval;
-    if (FD_ISSET(fd, &fds))
-      return retval;
-    else
-      return 0;
+int FlowControlProtocol::selectClient(int msec) {
+  fd_set fds;
+  struct timeval tv;
+  int retval;
+  int fd = _socket;
+
+  FD_ZERO(&fds);
+  FD_SET(fd, &fds);
+
+  tv.tv_sec = msec / 1000;
+  tv.tv_usec = (msec % 1000) * 1000;
+
+  if (msec > 0)
+    retval = select(fd + 1, &fds, NULL, NULL, &tv);
+  else
+    retval = select(fd + 1, &fds, NULL, NULL, NULL);
+
+  if (retval <= 0)
+    return retval;
+  if (FD_ISSET(fd, &fds))
+    return retval;
+  else
+    return 0;
 }
 
-int FlowControlProtocol::readData(uint8_t *buf, int buflen)
-{
-       int sendSize = buflen;
-
-       while (buflen)
-       {
-               int status;
-               status = selectClient(MAX_READ_TIMEOUT);
-               if (status <= 0)
-               {
-                       return status;
-               }
+int FlowControlProtocol::readData(uint8_t *buf, int buflen) {
+  int sendSize = buflen;
+
+  while (buflen) {
+    int status;
+    status = selectClient(MAX_READ_TIMEOUT);
+    if (status <= 0) {
+      return status;
+    }
 #ifndef __MACH__
-               status = read(_socket, buf, buflen);
+    status = read(_socket, buf, buflen);
 #else
-               status = recv(_socket, buf, buflen, 0);
+    status = recv(_socket, buf, buflen, 0);
 #endif
-               if (status <= 0)
-               {
-                       return status;
-               }
-               buflen -= status;
-               buf += status;
-       }
-
-       return sendSize;
+    if (status <= 0) {
+      return status;
+    }
+    buflen -= status;
+    buf += status;
+  }
+
+  return sendSize;
 }
 
-int FlowControlProtocol::readHdr(FlowControlProtocolHeader *hdr)
-{
-       uint8_t buffer[sizeof(FlowControlProtocolHeader)];
+int FlowControlProtocol::readHdr(FlowControlProtocolHeader *hdr) {
+  uint8_t buffer[sizeof(FlowControlProtocolHeader)];
 
-       uint8_t *data = buffer;
+  uint8_t *data = buffer;
 
-       int status = readData(buffer, sizeof(FlowControlProtocolHeader));
-       if (status <= 0)
-               return status;
+  int status = readData(buffer, sizeof(FlowControlProtocolHeader));
+  if (status <= 0)
+    return status;
 
-       uint32_t value;
-       data = this->decode(data, value);
-       hdr->msgType = value;
+  uint32_t value;
+  data = this->decode(data, value);
+  hdr->msgType = value;
 
-       data = this->decode(data, value);
-       hdr->seqNumber = value;
+  data = this->decode(data, value);
+  hdr->seqNumber = value;
 
-       data = this->decode(data, value);
-       hdr->status = value;
+  data = this->decode(data, value);
+  hdr->status = value;
 
-       data = this->decode(data, value);
-       hdr->payloadLen = value;
+  data = this->decode(data, value);
+  hdr->payloadLen = value;
 
-       return sizeof(FlowControlProtocolHeader);
+  return sizeof(FlowControlProtocolHeader);
 }
 
-void FlowControlProtocol::start()
-{
-       if (_reportInterval <= 0)
-               return;
-       if (_running)
-               return;
-       _running = true;
-       logger_->log_info("FlowControl Protocol Start");
-       _thread = new std::thread(run, this);
-       _thread->detach();
+void FlowControlProtocol::start() {
+  if (_reportInterval <= 0)
+    return;
+  if (running_)
+    return;
+  running_ = true;
+  logger_->log_info("FlowControl Protocol Start");
+  _thread = new std::thread(run, this);
+  _thread->detach();
 }
 
-void FlowControlProtocol::stop()
-{
-       if (!_running)
-               return;
-       _running = false;
-       logger_->log_info("FlowControl Protocol Stop");
+void FlowControlProtocol::stop() {
+  if (!running_)
+    return;
+  running_ = false;
+  logger_->log_info("FlowControl Protocol Stop");
 }
 
-void FlowControlProtocol::run(FlowControlProtocol *protocol)
-{
-       while (protocol->_running)
-       {
-               std::this_thread::sleep_for(std::chrono::milliseconds(protocol->_reportInterval));
-               if (!protocol->_registered)
-               {
-                       // if it is not register yet
-                       protocol->sendRegisterReq();
-               }
-               else
-                       protocol->sendReportReq();
-       }
-       return;
+void FlowControlProtocol::run(FlowControlProtocol *protocol) {
+  while (protocol->running_) {
+    std::this_thread::sleep_for(
+        std::chrono::milliseconds(protocol->_reportInterval));
+    if (!protocol->_registered) {
+      // if it is not register yet
+      protocol->sendRegisterReq();
+    } else
+      protocol->sendReportReq();
+  }
+  return;
 }
 
-int FlowControlProtocol::sendRegisterReq()
-{
-       if (_registered)
-       {
-               logger_->log_info("Already registered");
-               return -1;
-       }
-
-       uint16_t port = this->_serverPort;
-
-       if (this->_socket <= 0)
-               this->_socket = connectServer(_serverName.c_str(), port);
-
-       if (this->_socket <= 0)
-               return -1;
-
-       // Calculate the total payload msg size
-       uint32_t payloadSize = FlowControlMsgIDEncodingLen(FLOW_SERIAL_NUMBER, 0) +
-                       FlowControlMsgIDEncodingLen(FLOW_YML_NAME, this->_controller->getName().size()+1);
-       uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize;
-
-       uint8_t *data = new uint8_t[size];
-       uint8_t *start = data;
-
-       // encode the HDR
-       FlowControlProtocolHeader hdr;
-       hdr.msgType = REGISTER_REQ;
-       hdr.payloadLen = payloadSize;
-       hdr.seqNumber  = this->_seqNumber;
-       hdr.status = RESP_SUCCESS;
-       data = this->encode(data, hdr.msgType);
-       data = this->encode(data, hdr.seqNumber);
-       data = this->encode(data, hdr.status);
-       data = this->encode(data, hdr.payloadLen);
-
-       // encode the serial number
-       data = this->encode(data, FLOW_SERIAL_NUMBER);
-       data = this->encode(data, this->_serialNumber, 8);
-
-       // encode the YAML name
-       data = this->encode(data, FLOW_YML_NAME);
-       data = this->encode(data, this->_controller->getName());
-
-       // send it
-       int status = sendData(start, size);
-       delete[] start;
-       if (status <= 0)
-       {
-               close(_socket);
-               _socket = 0;
-               logger_->log_error("Flow Control Protocol Send Register Req failed");
-               return -1;
-       }
-
-       // Looking for register respond
-       status = readHdr(&hdr);
-
-       if (status <= 0)
-       {
-               close(_socket);
-               _socket = 0;
-               logger_->log_error("Flow Control Protocol Read Register Resp header failed");
-               return -1;
-       }
-       logger_->log_info("Flow Control Protocol receive MsgType %s", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType));
-       logger_->log_info("Flow Control Protocol receive Seq Num %d", hdr.seqNumber);
-       logger_->log_info("Flow Control Protocol receive Resp Code %s", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status));
-       logger_->log_info("Flow Control Protocol receive Payload len %d", hdr.payloadLen);
-
-       if (hdr.status == RESP_SUCCESS && hdr.seqNumber == this->_seqNumber)
-       {
-               this->_registered = true;
-               this->_seqNumber++;
-               logger_->log_info("Flow Control Protocol Register success");
-               uint8_t *payload = new uint8_t[hdr.payloadLen];
-               uint8_t *payloadPtr = payload;
-               status = readData(payload, hdr.payloadLen);
-               if (status <= 0)
-               {
-                       delete[] payload;
-                       logger_->log_info("Flow Control Protocol Register Read Payload fail");
-                       close(_socket);
-                       _socket = 0;
-                       return -1;
-               }
-               while (payloadPtr < (payload + hdr.payloadLen))
-               {
-                       uint32_t msgID;
-                       payloadPtr = this->decode(payloadPtr, msgID);
-                       if (((FlowControlMsgID) msgID) == REPORT_INTERVAL)
-                       {
-                               // Fixed 4 bytes
-                               uint32_t reportInterval;
-                               payloadPtr = this->decode(payloadPtr, reportInterval);
-                               logger_->log_info("Flow Control Protocol receive report interval %d ms", reportInterval);
-                               this->_reportInterval = reportInterval;
-                       }
-                       else
-                       {
-                               break;
-                       }
-               }
-               delete[] payload;
-               close(_socket);
-               _socket = 0;
-               return 0;
-       }
-       else
-       {
-               logger_->log_info("Flow Control Protocol Register fail");
-               close(_socket);
-               _socket = 0;
-               return -1;
-       }
+int FlowControlProtocol::sendRegisterReq() {
+  if (_registered) {
+    logger_->log_info("Already registered");
+    return -1;
+  }
+
+  uint16_t port = this->_serverPort;
+
+  if (this->_socket <= 0)
+    this->_socket = connectServer(_serverName.c_str(), port);
+
+  if (this->_socket <= 0)
+    return -1;
+
+  // Calculate the total payload msg size
+  uint32_t payloadSize = FlowControlMsgIDEncodingLen(FLOW_SERIAL_NUMBER, 0)
+      + FlowControlMsgIDEncodingLen(FLOW_YML_NAME,
+                                    this->_controller->getName().size() + 1);
+  uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize;
+
+  uint8_t *data = new uint8_t[size];
+  uint8_t *start = data;
+
+  // encode the HDR
+  FlowControlProtocolHeader hdr;
+  hdr.msgType = REGISTER_REQ;
+  hdr.payloadLen = payloadSize;
+  hdr.seqNumber = this->_seqNumber;
+  hdr.status = RESP_SUCCESS;
+  data = this->encode(data, hdr.msgType);
+  data = this->encode(data, hdr.seqNumber);
+  data = this->encode(data, hdr.status);
+  data = this->encode(data, hdr.payloadLen);
+
+  // encode the serial number
+  data = this->encode(data, FLOW_SERIAL_NUMBER);
+  data = this->encode(data, this->_serialNumber, 8);
+
+  // encode the YAML name
+  data = this->encode(data, FLOW_YML_NAME);
+  data = this->encode(data, this->_controller->getName());
+
+  // send it
+  int status = sendData(start, size);
+  delete[] start;
+  if (status <= 0) {
+    close(_socket);
+    _socket = 0;
+    logger_->log_error("Flow Control Protocol Send Register Req failed");
+    return -1;
+  }
+
+  // Looking for register respond
+  status = readHdr(&hdr);
+
+  if (status <= 0) {
+    close(_socket);
+    _socket = 0;
+    logger_->log_error(
+        "Flow Control Protocol Read Register Resp header failed");
+    return -1;
+  }
+  logger_->log_info("Flow Control Protocol receive MsgType %s",
+                    FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType));
+  logger_->log_info("Flow Control Protocol receive Seq Num %d", hdr.seqNumber);
+  logger_->log_info("Flow Control Protocol receive Resp Code %s",
+                    FlowControlRespCodeToStr((FlowControlRespCode) hdr.status));
+  logger_->log_info("Flow Control Protocol receive Payload len %d",
+                    hdr.payloadLen);
+
+  if (hdr.status == RESP_SUCCESS && hdr.seqNumber == this->_seqNumber) {
+    this->_registered = true;
+    this->_seqNumber++;
+    logger_->log_info("Flow Control Protocol Register success");
+    uint8_t *payload = new uint8_t[hdr.payloadLen];
+    uint8_t *payloadPtr = payload;
+    status = readData(payload, hdr.payloadLen);
+    if (status <= 0) {
+      delete[] payload;
+      logger_->log_info("Flow Control Protocol Register Read Payload fail");
+      close(_socket);
+      _socket = 0;
+      return -1;
+    }
+    while (payloadPtr < (payload + hdr.payloadLen)) {
+      uint32_t msgID;
+      payloadPtr = this->decode(payloadPtr, msgID);
+      if (((FlowControlMsgID) msgID) == REPORT_INTERVAL) {
+        // Fixed 4 bytes
+        uint32_t reportInterval;
+        payloadPtr = this->decode(payloadPtr, reportInterval);
+        logger_->log_info("Flow Control Protocol receive report interval %d ms",
+                          reportInterval);
+        this->_reportInterval = reportInterval;
+      } else {
+        break;
+      }
+    }
+    delete[] payload;
+    close(_socket);
+    _socket = 0;
+    return 0;
+  } else {
+    logger_->log_info("Flow Control Protocol Register fail");
+    close(_socket);
+    _socket = 0;
+    return -1;
+  }
 }
 
-
-int FlowControlProtocol::sendReportReq()
-{
-       uint16_t port = this->_serverPort;
-
-       if (this->_socket <= 0)
-               this->_socket = connectServer(_serverName.c_str(), port);
-
-       if (this->_socket <= 0)
-               return -1;
-
-       // Calculate the total payload msg size
-       uint32_t payloadSize =
-                       FlowControlMsgIDEncodingLen(FLOW_YML_NAME, this->_controller->getName().size()+1);
-       uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize;
-
-       uint8_t *data = new uint8_t[size];
-       uint8_t *start = data;
-
-       // encode the HDR
-       FlowControlProtocolHeader hdr;
-       hdr.msgType = REPORT_REQ;
-       hdr.payloadLen = payloadSize;
-       hdr.seqNumber  = this->_seqNumber;
-       hdr.status = RESP_SUCCESS;
-       data = this->encode(data, hdr.msgType);
-       data = this->encode(data, hdr.seqNumber);
-       data = this->encode(data, hdr.status);
-       data = this->encode(data, hdr.payloadLen);
-
-       // encode the YAML name
-       data = this->encode(data, FLOW_YML_NAME);
-       data = this->encode(data, this->_controller->getName());
-
-       // send it
-       int status = sendData(start, size);
-       delete[] start;
-       if (status <= 0)
-       {
-               close(_socket);
-               _socket = 0;
-               logger_->log_error("Flow Control Protocol Send Report Req failed");
-               return -1;
-       }
-
-       // Looking for report respond
-       status = readHdr(&hdr);
-
-       if (status <= 0)
-       {
-               close(_socket);
-               _socket = 0;
-               logger_->log_error("Flow Control Protocol Read Report Resp header failed");
-               return -1;
-       }
-       logger_->log_info("Flow Control Protocol receive MsgType %s", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType));
-       logger_->log_info("Flow Control Protocol receive Seq Num %d", hdr.seqNumber);
-       logger_->log_info("Flow Control Protocol receive Resp Code %s", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status));
-       logger_->log_info("Flow Control Protocol receive Payload len %d", hdr.payloadLen);
-
-       if (hdr.status == RESP_SUCCESS && hdr.seqNumber == this->_seqNumber)
-       {
-               this->_seqNumber++;
-               uint8_t *payload = new uint8_t[hdr.payloadLen];
-               uint8_t *payloadPtr = payload;
-               status = readData(payload, hdr.payloadLen);
-               if (status <= 0)
-               {
-                       delete[] payload;
-                       logger_->log_info("Flow Control Protocol Report Resp Read Payload fail");
-                       close(_socket);
-                       _socket = 0;
-                       return -1;
-               }
-               std::string processor;
-               std::string propertyName;
-               std::string propertyValue;
-               while (payloadPtr < (payload + hdr.payloadLen))
-               {
-                       uint32_t msgID;
-                       payloadPtr = this->decode(payloadPtr, msgID);
-                       if (((FlowControlMsgID) msgID) == PROCESSOR_NAME)
-                       {
-                               uint32_t len;
-                               payloadPtr = this->decode(payloadPtr, len);
-                               processor = (const char *) payloadPtr;
-                               payloadPtr += len;
-                               logger_->log_info("Flow Control Protocol receive report resp processor %s", processor.c_str());
-                       }
-                       else if (((FlowControlMsgID) msgID) == PROPERTY_NAME)
-                       {
-                               uint32_t len;
-                               payloadPtr = this->decode(payloadPtr, len);
-                               propertyName = (const char *) payloadPtr;
-                               payloadPtr += len;
-                               logger_->log_info("Flow Control Protocol receive report resp property name %s", propertyName.c_str());
-                       }
-                       else if (((FlowControlMsgID) msgID) == PROPERTY_VALUE)
-                       {
-                               uint32_t len;
-                               payloadPtr = this->decode(payloadPtr, len);
-                               propertyValue = (const char *) payloadPtr;
-                               payloadPtr += len;
-                               logger_->log_info("Flow Control Protocol receive report resp property value %s", propertyValue.c_str());
-                               this->_controller->updatePropertyValue(processor, propertyName, propertyValue);
-                       }
-                       else
-                       {
-                               break;
-                       }
-               }
-               delete[] payload;
-               close(_socket);
-               _socket = 0;
-               return 0;
-       }
-       else if (hdr.status == RESP_TRIGGER_REGISTER && hdr.seqNumber == this->_seqNumber)
-       {
-               logger_->log_info("Flow Control Protocol trigger reregister");
-               this->_registered = false;
-               this->_seqNumber++;
-               close(_socket);
-               _socket = 0;
-               return 0;
-       }
-       else if (hdr.status == RESP_STOP_FLOW_CONTROLLER && hdr.seqNumber == this->_seqNumber)
-       {
-               logger_->log_info("Flow Control Protocol stop flow controller");
-               this->_controller->stop(true);
-               this->_seqNumber++;
-               close(_socket);
-               _socket = 0;
-               return 0;
-       }
-       else if (hdr.status == RESP_START_FLOW_CONTROLLER && hdr.seqNumber == this->_seqNumber)
-       {
-               logger_->log_info("Flow Control Protocol start flow controller");
-               this->_controller->start();
-               this->_seqNumber++;
-               close(_socket);
-               _socket = 0;
-               return 0;
-       }
-       else
-       {
-               logger_->log_info("Flow Control Protocol Report fail");
-               close(_socket);
-               _socket = 0;
-               return -1;
-       }
+/**
+ * Sends a REPORT_REQ message to the flow control server and handles the
+ * response.
+ *
+ * Connects on demand when no socket is open, sends the protocol header
+ * followed by a FLOW_YML_NAME field carrying the controller name, then reads
+ * the response header and dispatches on its status code:
+ *  - RESP_SUCCESS: reads the payload and walks its fields; on every
+ *    PROPERTY_VALUE field it calls updatePropertyValue() with the most
+ *    recently received processor name and property name.
+ *  - RESP_TRIGGER_REGISTER: clears the registered flag.
+ *  - RESP_STOP_FLOW_CONTROLLER / RESP_START_FLOW_CONTROLLER: stops or starts
+ *    the controller.
+ * Each of these is honoured only when the response sequence number equals
+ * the local one, which is then incremented. The socket is closed and reset
+ * to 0 on every path that gets past the connect step.
+ *
+ * The payload comes from the network, so every field is bounds-checked
+ * against the received payload length before it is decoded. A truncated or
+ * oversized field stops parsing (same outcome as an unknown field ID)
+ * instead of reading past the end of the buffer.
+ *
+ * @return 0 when a response with a matching sequence number and a known
+ *         status was handled; -1 on connect, send or read failure, or on
+ *         any other response.
+ */
+int FlowControlProtocol::sendReportReq() {
+  uint16_t port = this->_serverPort;
+
+  if (this->_socket <= 0)
+    this->_socket = connectServer(_serverName.c_str(), port);
+
+  if (this->_socket <= 0)
+    return -1;
+
+  // Calculate the total payload msg size
+  // (the + 1 presumably accounts for the name's terminating NUL - confirm
+  // against FlowControlMsgIDEncodingLen)
+  uint32_t payloadSize = FlowControlMsgIDEncodingLen(
+      FLOW_YML_NAME, this->_controller->getName().size() + 1);
+  uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize;
+
+  // 'start' keeps the buffer origin; 'data' is the write cursor that each
+  // encode() call advances.
+  uint8_t *data = new uint8_t[size];
+  uint8_t *start = data;
+
+  // encode the HDR
+  FlowControlProtocolHeader hdr;
+  hdr.msgType = REPORT_REQ;
+  hdr.payloadLen = payloadSize;
+  hdr.seqNumber = this->_seqNumber;
+  hdr.status = RESP_SUCCESS;
+  data = this->encode(data, hdr.msgType);
+  data = this->encode(data, hdr.seqNumber);
+  data = this->encode(data, hdr.status);
+  data = this->encode(data, hdr.payloadLen);
+
+  // encode the YAML name
+  data = this->encode(data, FLOW_YML_NAME);
+  data = this->encode(data, this->_controller->getName());
+
+  // send it
+  int status = sendData(start, size);
+  delete[] start;
+  if (status <= 0) {
+    close(_socket);
+    _socket = 0;
+    logger_->log_error("Flow Control Protocol Send Report Req failed");
+    return -1;
+  }
+
+  // Looking for report respond
+  status = readHdr(&hdr);
+
+  if (status <= 0) {
+    close(_socket);
+    _socket = 0;
+    logger_->log_error("Flow Control Protocol Read Report Resp header failed");
+    return -1;
+  }
+  logger_->log_info(
+      "Flow Control Protocol receive MsgType %s",
+      FlowControlMsgTypeToStr(static_cast<FlowControlMsgType>(hdr.msgType)));
+  logger_->log_info("Flow Control Protocol receive Seq Num %d", hdr.seqNumber);
+  logger_->log_info(
+      "Flow Control Protocol receive Resp Code %s",
+      FlowControlRespCodeToStr(static_cast<FlowControlRespCode>(hdr.status)));
+  logger_->log_info("Flow Control Protocol receive Payload len %d",
+                    hdr.payloadLen);
+
+  if (hdr.status == RESP_SUCCESS && hdr.seqNumber == this->_seqNumber) {
+    this->_seqNumber++;
+    uint8_t *payload = new uint8_t[hdr.payloadLen];
+    status = readData(payload, hdr.payloadLen);
+    if (status <= 0) {
+      delete[] payload;
+      logger_->log_info("Flow Control Protocol Report Resp Read Payload fail");
+      close(_socket);
+      _socket = 0;
+      return -1;
+    }
+
+    uint8_t *payloadPtr = payload;
+    uint8_t * const payloadEnd = payload + hdr.payloadLen;
+    std::string processor;
+    std::string propertyName;
+    std::string propertyValue;
+    while (payloadPtr < payloadEnd) {
+      // Each field is <msgID><len><len bytes>. The ID and the length are
+      // assumed to be 4 encoded bytes each (decode() of a uint32_t) -
+      // confirm against decode(). Never decode unless those bytes exist.
+      if (static_cast<uint32_t>(payloadEnd - payloadPtr) < sizeof(uint32_t)) {
+        logger_->log_error(
+            "Flow Control Protocol Report Resp truncated message ID");
+        break;
+      }
+      uint32_t msgID;
+      payloadPtr = this->decode(payloadPtr, msgID);
+      const FlowControlMsgID fieldID = static_cast<FlowControlMsgID>(msgID);
+      if (fieldID != PROCESSOR_NAME && fieldID != PROPERTY_NAME
+          && fieldID != PROPERTY_VALUE) {
+        // Unknown field: stop parsing and keep what was applied so far.
+        break;
+      }
+
+      if (static_cast<uint32_t>(payloadEnd - payloadPtr) < sizeof(uint32_t)) {
+        logger_->log_error(
+            "Flow Control Protocol Report Resp truncated field length");
+        break;
+      }
+      uint32_t len;
+      payloadPtr = this->decode(payloadPtr, len);
+      if (len > static_cast<uint32_t>(payloadEnd - payloadPtr)) {
+        logger_->log_error(
+            "Flow Control Protocol Report Resp field exceeds payload");
+        break;
+      }
+
+      // Copy at most 'len' bytes, then cut at the first NUL so a well-formed
+      // (NUL-terminated) field yields exactly the C string it carries, while
+      // a missing terminator can no longer cause a read past the buffer.
+      std::string value(reinterpret_cast<const char *>(payloadPtr), len);
+      const std::string::size_type nul = value.find('\0');
+      if (nul != std::string::npos)
+        value.resize(nul);
+      payloadPtr += len;
+
+      if (fieldID == PROCESSOR_NAME) {
+        processor = value;
+        logger_->log_info(
+            "Flow Control Protocol receive report resp processor %s",
+            processor.c_str());
+      } else if (fieldID == PROPERTY_NAME) {
+        propertyName = value;
+        logger_->log_info(
+            "Flow Control Protocol receive report resp property name %s",
+            propertyName.c_str());
+      } else {
+        // PROPERTY_VALUE completes a triple: apply it with the names seen
+        // most recently.
+        propertyValue = value;
+        logger_->log_info(
+            "Flow Control Protocol receive report resp property value %s",
+            propertyValue.c_str());
+        this->_controller->updatePropertyValue(processor, propertyName,
+                                               propertyValue);
+      }
+    }
+    delete[] payload;
+    close(_socket);
+    _socket = 0;
+    return 0;
+  } else if (hdr.status == RESP_TRIGGER_REGISTER
+      && hdr.seqNumber == this->_seqNumber) {
+    logger_->log_info("Flow Control Protocol trigger reregister");
+    this->_registered = false;
+    this->_seqNumber++;
+    close(_socket);
+    _socket = 0;
+    return 0;
+  } else if (hdr.status == RESP_STOP_FLOW_CONTROLLER
+      && hdr.seqNumber == this->_seqNumber) {
+    logger_->log_info("Flow Control Protocol stop flow controller");
+    this->_controller->stop(true);
+    this->_seqNumber++;
+    close(_socket);
+    _socket = 0;
+    return 0;
+  } else if (hdr.status == RESP_START_FLOW_CONTROLLER
+      && hdr.seqNumber == this->_seqNumber) {
+    logger_->log_info("Flow Control Protocol start flow controller");
+    this->_controller->start();
+    this->_seqNumber++;
+    close(_socket);
+    _socket = 0;
+    return 0;
+  } else {
+    // Unexpected status or sequence-number mismatch.
+    logger_->log_info("Flow Control Protocol Report fail");
+    close(_socket);
+    _socket = 0;
+    return -1;
+  }
 }
 
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

Reply via email to