http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/include/ProcessContext.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ProcessContext.h 
b/libminifi/include/ProcessContext.h
new file mode 100644
index 0000000..2a88b93
--- /dev/null
+++ b/libminifi/include/ProcessContext.h
@@ -0,0 +1,99 @@
+/**
+ * @file ProcessContext.h
+ * ProcessContext class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PROCESS_CONTEXT_H__
+#define __PROCESS_CONTEXT_H__
+
+#include <uuid/uuid.h>
+#include <vector>
+#include <queue>
+#include <map>
+#include <mutex>
+#include <atomic>
+#include <algorithm>
+
+#include "Logger.h"
+#include "Processor.h"
+
+//! ProcessContext Class
+class ProcessContext
+{
+public:
+       //! Constructor
+       /*!
+        * Create a new process context associated with the 
processor/controller service/state manager
+        */
+       ProcessContext(Processor *processor = NULL) : _processor(processor) {
+               _logger = Logger::getLogger();
+       }
+       //! Destructor
+       virtual ~ProcessContext() {}
+       //! Get Processor associated with the Process Context
+       Processor *getProcessor() {
+               return _processor;
+       }
+       bool getProperty(std::string name, std::string &value) {
+               if (_processor)
+                       return _processor->getProperty(name, value);
+               else
+                       return false;
+       }
+       //! Whether the relationship is supported
+       bool isSupportedRelationship(Relationship relationship) {
+               if (_processor)
+                       return 
_processor->isSupportedRelationship(relationship);
+               else
+                       return false;
+       }
+       //! Check whether the relationship is auto terminated
+       bool isAutoTerminated(Relationship relationship) {
+               if (_processor)
+                       return _processor->isAutoTerminated(relationship);
+               else
+                       return false;
+       }
+       //! Get ProcessContext Maximum Concurrent Tasks
+       uint8_t getMaxConcurrentTasks(void) {
+               if (_processor)
+                       return _processor->getMaxConcurrentTasks();
+               else
+                       return 0;
+       }
+       //! Yield based on the yield period
+       void yield() {
+               if (_processor)
+                       _processor->yield();
+       }
+
+protected:
+
+private:
+
+       //! Processor
+       Processor *_processor;
+       // Prevent default copy constructor and assignment operation
+       // Only support pass by reference or pointer
+       ProcessContext(const ProcessContext &parent);
+       ProcessContext &operator=(const ProcessContext &parent);
+       //! Logger
+       Logger *_logger;
+
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/include/ProcessGroup.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ProcessGroup.h b/libminifi/include/ProcessGroup.h
new file mode 100644
index 0000000..4dd26f8
--- /dev/null
+++ b/libminifi/include/ProcessGroup.h
@@ -0,0 +1,182 @@
+/**
+ * @file ProcessGroup.h
+ * ProcessGroup class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PROCESS_GROUP_H__
+#define __PROCESS_GROUP_H__
+
+#include <uuid/uuid.h>
+#include <vector>
+#include <queue>
+#include <map>
+#include <mutex>
+#include <atomic>
+#include <algorithm>
+#include <set>
+
+#include "Logger.h"
+#include "Processor.h"
+#include "Exception.h"
+#include "TimerDrivenSchedulingAgent.h"
+
+//! Process Group Type
+enum ProcessGroupType
+{
+       ROOT_PROCESS_GROUP = 0,
+       REMOTE_PROCESS_GROUP,
+       MAX_PROCESS_GROUP_TYPE
+};
+
+//! ProcessGroup Class
+class ProcessGroup
+{
+public:
+       //! Constructor
+       /*!
+        * Create a new process group
+        */
+       ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid = 
NULL, ProcessGroup *parent = NULL);
+       //! Destructor
+       virtual ~ProcessGroup();
+       //! Set Processor Name
+       void setName(std::string name) {
+               _name = name;
+       }
+       //! Get Process Name
+       std::string getName(void) {
+               return (_name);
+       }
+       //! Set URL
+       void setURL(std::string url) {
+               _url = url;
+       }
+       //! Get URL
+       std::string getURL(void) {
+               return (_url);
+       }
+       //! SetTransmitting
+       void setTransmitting(bool val)
+       {
+               _transmitting = val;
+       }
+       //! Get Transmitting
+       bool getTransmitting()
+       {
+               return _transmitting;
+       }
+       //! setTimeOut
+       void setTimeOut(uint64_t time)
+       {
+               _timeOut = time;
+       }
+       uint64_t getTimeOut()
+       {
+               return _timeOut;
+       }
+       //! Set Processor yield period in MilliSecond
+       void setYieldPeriodMsec(uint64_t period) {
+               _yieldPeriodMsec = period;
+       }
+       //! Get Processor yield period in MilliSecond
+       uint64_t getYieldPeriodMsec(void) {
+               return(_yieldPeriodMsec);
+       }
+       //! Set UUID
+       void setUUID(uuid_t uuid) {
+               uuid_copy(_uuid, uuid);
+       }
+       //! Get UUID
+       bool getUUID(uuid_t uuid) {
+               if (uuid)
+               {
+                       uuid_copy(uuid, _uuid);
+                       return true;
+               }
+               else
+                       return false;
+       }
+       //! Start Processing
+       void startProcessing(TimerDrivenSchedulingAgent *timeScheduler);
+       //! Stop Processing
+       void stopProcessing(TimerDrivenSchedulingAgent *timeScheduler);
+       //! Whether it is root process group
+       bool isRootProcessGroup();
+       //! set parent process group
+       void setParent(ProcessGroup *parent) {
+               std::lock_guard<std::mutex> lock(_mtx);
+               _parentProcessGroup = parent;
+       }
+       //! get parent process group
+       ProcessGroup *getParent(void) {
+               std::lock_guard<std::mutex> lock(_mtx);
+               return _parentProcessGroup;
+       }
+       //! Add processor
+       void addProcessor(Processor *processor);
+       //! Remove processor
+       void removeProcessor(Processor *processor);
+       //! Add child processor group
+       void addProcessGroup(ProcessGroup *child);
+       //! Remove child processor group
+       void removeProcessGroup(ProcessGroup *child);
+       // ! Add connections
+       void addConnection(Connection *connection);
+       //! findProcessor based on UUID
+       Processor *findProcessor(uuid_t uuid);
+       //! findProcessor based on name
+       Processor *findProcessor(std::string processorName);
+       //! removeConnection
+       void removeConnection(Connection *connection);
+       //! update property value
+       void updatePropertyValue(std::string processorName, std::string 
propertyName, std::string propertyValue);
+
+protected:
+       //! A global unique identifier
+       uuid_t _uuid;
+       //! Processor Group Name
+       std::string _name;
+       //! Process Group Type
+       ProcessGroupType _type;
+       //! Processors (ProcessNode) inside this process group which include 
Input/Output Port, Remote Process Group input/Output port
+       std::set<Processor *> _processors;
+       std::set<ProcessGroup *> _childProcessGroups;
+       //! Connections between the processor inside the group;
+       std::set<Connection *> _connections;
+       //! Parent Process Group
+       ProcessGroup* _parentProcessGroup;
+       //! Yield Period in Milliseconds
+       std::atomic<uint64_t> _yieldPeriodMsec;
+       std::atomic<uint64_t> _timeOut;
+       //! URL
+       std::string _url;
+       //! Transmitting
+       std::atomic<bool> _transmitting;
+
+private:
+
+       //! Mutex for protection
+       std::mutex _mtx;
+       //! Logger
+       Logger *_logger;
+       // Prevent default copy constructor and assignment operation
+       // Only support pass by reference or pointer
+       ProcessGroup(const ProcessGroup &parent);
+       ProcessGroup &operator=(const ProcessGroup &parent);
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/include/ProcessSession.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ProcessSession.h 
b/libminifi/include/ProcessSession.h
new file mode 100644
index 0000000..c8ec3a5
--- /dev/null
+++ b/libminifi/include/ProcessSession.h
@@ -0,0 +1,116 @@
+/**
+ * @file ProcessSession.h
+ * ProcessSession class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PROCESS_SESSION_H__
+#define __PROCESS_SESSION_H__
+
+#include <uuid/uuid.h>
+#include <vector>
+#include <queue>
+#include <map>
+#include <mutex>
+#include <atomic>
+#include <algorithm>
+#include <set>
+
+#include "Logger.h"
+#include "Processor.h"
+#include "ProcessContext.h"
+#include "FlowFileRecord.h"
+#include "Exception.h"
+
+//! ProcessSession Class
+class ProcessSession
+{
+public:
+       //! Constructor
+       /*!
+        * Create a new process session
+        */
+       ProcessSession(ProcessContext *processContext = NULL) : 
_processContext(processContext) {
+               _logger = Logger::getLogger();
+               _logger->log_trace("ProcessSession created for %s", 
_processContext->getProcessor()->getName().c_str());
+       }
+       //! Destructor
+       virtual ~ProcessSession() {}
+       //! Commit the session
+       void commit();
+       //! Roll Back the session
+       void rollback();
+       //!
+       //! Get the FlowFile from the highest priority queue
+       FlowFileRecord *get();
+       //! Create a new UUID FlowFile with no content resource claim and 
without parent
+       FlowFileRecord *create();
+       //! Create a new UUID FlowFile with no content resource claim and 
inherit all attributes from parent
+       FlowFileRecord *create(FlowFileRecord *parent);
+       //! Clone a new UUID FlowFile from parent both for content resource 
claim and attributes
+       FlowFileRecord *clone(FlowFileRecord *parent);
+       //! Clone a new UUID FlowFile from parent for attributes and sub set of 
parent content resource claim
+       FlowFileRecord *clone(FlowFileRecord *parent, long offset, long size);
+       //! Duplicate a FlowFile with the same UUID and all attributes and 
content resource claim for the roll back of the session
+       FlowFileRecord *duplicate(FlowFileRecord *orignal);
+       //! Transfer the FlowFile to the relationship
+       void transfer(FlowFileRecord *flow, Relationship relationship);
+       //! Put Attribute
+       void putAttribute(FlowFileRecord *flow, std::string key, std::string 
value);
+       //! Remove Attribute
+       void removeAttribute(FlowFileRecord *flow, std::string key);
+       //! Remove Flow File
+       void remove(FlowFileRecord *flow);
+       //! Execute the given read callback against the content
+       void read(FlowFileRecord *flow, InputStreamCallback *callback);
+       //! Execute the given write callback against the content
+       void write(FlowFileRecord *flow, OutputStreamCallback *callback);
+       //! Execute the given write/append callback against the content
+       void append(FlowFileRecord *flow, OutputStreamCallback *callback);
+       //! Penalize the flow
+       void penalize(FlowFileRecord *flow);
+       //! Import the existed file into the flow
+       void import(std::string source, FlowFileRecord *flow, bool keepSource = 
true, uint64_t offset = 0);
+
+protected:
+       //! FlowFiles being modified by current process session
+       std::map<std::string, FlowFileRecord *> _updatedFlowFiles;
+       //! Copy of the original FlowFiles being modified by current process 
session as above
+       std::map<std::string, FlowFileRecord *> _originalFlowFiles;
+       //! FlowFiles being added by current process session
+       std::map<std::string, FlowFileRecord *> _addedFlowFiles;
+       //! FlowFiles being deleted by current process session
+       std::map<std::string, FlowFileRecord *> _deletedFlowFiles;
+       //! FlowFiles being transfered to the relationship
+       std::map<std::string, Relationship> _transferRelationship;
+       //! FlowFiles being cloned for multiple connections per relationship
+       std::map<std::string, FlowFileRecord *> _clonedFlowFiles;
+
+private:
+       // Clone the flow file during transfer to multiple connections for a 
relationship
+       FlowFileRecord* cloneDuringTransfer(FlowFileRecord *parent);
+       //! ProcessContext
+       ProcessContext *_processContext;
+       // Prevent default copy constructor and assignment operation
+       // Only support pass by reference or pointer
+       ProcessSession(const ProcessSession &parent);
+       ProcessSession &operator=(const ProcessSession &parent);
+       //! Logger
+       Logger *_logger;
+
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/include/Processor.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Processor.h b/libminifi/include/Processor.h
new file mode 100644
index 0000000..db26ad0
--- /dev/null
+++ b/libminifi/include/Processor.h
@@ -0,0 +1,346 @@
+/**
+ * @file Processor.h
+ * Processor class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PROCESSOR_H__
+#define __PROCESSOR_H__
+
+#include <uuid/uuid.h>
+#include <vector>
+#include <queue>
+#include <map>
+#include <mutex>
+#include <atomic>
+#include <algorithm>
+#include <set>
+
+#include "TimeUtil.h"
+#include "Property.h"
+#include "Relationship.h"
+#include "Connection.h"
+
+//! Forwarder declaration
+class ProcessContext;
+class ProcessSession;
+
+//! Minimum scheduling period in Nano Second
+#define MINIMUM_SCHEDULING_NANOS 30000
+
+//! Default yield period in second
+#define DEFAULT_YIELD_PERIOD_SECONDS 1
+
+//! Default penalization period in second
+#define DEFAULT_PENALIZATION_PERIOD_SECONDS 30
+
+/*!
+ * Indicates the valid values for the state of a entity
+ * with respect to scheduling the entity to run.
+ */
+enum ScheduledState {
+
+    /**
+     * Entity cannot be scheduled to run
+     */
+    DISABLED,
+    /**
+     * Entity can be scheduled to run but currently is not
+     */
+    STOPPED,
+    /**
+     * Entity is currently scheduled to run
+     */
+    RUNNING
+};
+
+/*!
+ * Scheduling Strategy
+ */
+enum SchedulingStrategy {
+       //! Event driven
+       EVENT_DRIVEN,
+       //! Timer driven
+       TIMER_DRIVEN,
+       //! Cron Driven
+       CRON_DRIVEN
+};
+
+//! Processor Class
+class Processor
+{
+       friend class ProcessContext;
+public:
+       //! Constructor
+       /*!
+        * Create a new processor
+        */
+       Processor(std::string name, uuid_t uuid = NULL);
+       //! Destructor
+       virtual ~Processor();
+       //! Set Processor Name
+       void setName(std::string name) {
+               _name = name;
+       }
+       //! Get Process Name
+       std::string getName(void) {
+               return (_name);
+       }
+       //! Set UUID
+       void setUUID(uuid_t uuid) {
+               uuid_copy(_uuid, uuid);
+               char uuidStr[37];
+               uuid_unparse(_uuid, uuidStr);
+               _uuidStr = uuidStr;
+       }
+       //! Get UUID
+       bool getUUID(uuid_t uuid) {
+               if (uuid)
+               {
+                       uuid_copy(uuid, _uuid);
+                       return true;
+               }
+               else
+                       return false;
+       }
+       //! Set the supported processor properties while the process is not 
running
+       bool setSupportedProperties(std::set<Property> properties);
+       //! Set the supported relationships while the process is not running
+       bool setSupportedRelationships(std::set<Relationship> relationships);
+       //! Get the supported property value by name
+       bool getProperty(std::string name, std::string &value);
+       //! Set the supported property value by name wile the process is not 
running
+       bool setProperty(std::string name, std::string value);
+       //! Whether the relationship is supported
+       bool isSupportedRelationship(Relationship relationship);
+       //! Set the auto terminated relationships while the process is not 
running
+       bool setAutoTerminatedRelationships(std::set<Relationship> 
relationships);
+       //! Check whether the relationship is auto terminated
+       bool isAutoTerminated(Relationship relationship);
+       //! Check whether the processor is running
+       bool isRunning();
+       //! Set Processor Scheduled State
+       void setScheduledState(ScheduledState state) {
+               _state = state;
+       }
+       //! Get Processor Scheduled State
+       ScheduledState getScheduledState(void) {
+               return _state;
+       }
+       //! Set Processor Scheduling Strategy
+       void setSchedulingStrategy(SchedulingStrategy strategy) {
+               _strategy = strategy;
+       }
+       //! Get Processor Scheduling Strategy
+       SchedulingStrategy getSchedulingStrategy(void) {
+               return _strategy;
+       }
+       //! Set Processor Loss Tolerant
+       void setlossTolerant(bool lossTolerant) {
+               _lossTolerant = lossTolerant;
+       }
+       //! Get Processor Loss Tolerant
+       bool getlossTolerant(void) {
+               return _lossTolerant;
+       }
+       //! Set Processor Scheduling Period in Nano Second
+       void setSchedulingPeriodNano(uint64_t period) {
+               uint64_t minPeriod = MINIMUM_SCHEDULING_NANOS;
+               _schedulingPeriodNano = std::max(period, minPeriod);
+       }
+       //! Get Processor Scheduling Period in Nano Second
+       uint64_t getSchedulingPeriodNano(void) {
+               return _schedulingPeriodNano;
+       }
+       //! Set Processor Run Duration in Nano Second
+       void setRunDurationNano(uint64_t period) {
+               _runDurantionNano = period;
+       }
+       //! Get Processor Run Duration in Nano Second
+       uint64_t getRunDurationNano(void) {
+               return(_runDurantionNano);
+       }
+       //! Set Processor yield period in MilliSecond
+       void setYieldPeriodMsec(uint64_t period) {
+               _yieldPeriodMsec = period;
+       }
+       //! Get Processor yield period in MilliSecond
+       uint64_t getYieldPeriodMsec(void) {
+               return(_yieldPeriodMsec);
+       }
+       //! Set Processor penalization period in MilliSecond
+       void setPenalizationPeriodMsec(uint64_t period) {
+               _penalizationPeriodMsec = period;
+       }
+       //! Get Processor penalization period in MilliSecond
+       uint64_t getPenalizationPeriodMsec(void) {
+               return(_penalizationPeriodMsec);
+       }
+       //! Set Processor Maximum Concurrent Tasks
+       void setMaxConcurrentTasks(uint8_t tasks) {
+               _maxConcurrentTasks = tasks;
+       }
+       //! Get Processor Maximum Concurrent Tasks
+       uint8_t getMaxConcurrentTasks(void) {
+               return(_maxConcurrentTasks);
+       }
+       //! Set Trigger when empty
+       void setTriggerWhenEmpty(bool value) {
+               _triggerWhenEmpty = value;
+       }
+       //! Get Trigger when empty
+       bool getTriggerWhenEmpty(void) {
+               return(_triggerWhenEmpty);
+       }
+       //! Get Active Task Counts
+       uint8_t getActiveTasks(void) {
+               return(_activeTasks);
+       }
+       //! Increment Active Task Counts
+       void incrementActiveTasks(void) {
+               _activeTasks++;
+       }
+       //! decrement Active Task Counts
+       void decrementActiveTask(void) {
+               _activeTasks--;
+       }
+       void clearActiveTask(void) {
+               _activeTasks = 0;
+       }
+       //! Yield based on the yield period
+       void yield()
+       {
+               _yieldExpiration = (getTimeMillis() + _yieldPeriodMsec);
+       }
+       //! Yield based on the input time
+       void yield(uint64_t time)
+       {
+               _yieldExpiration = (getTimeMillis() + time);
+       }
+       //! whether need be to yield
+       bool isYield()
+       {
+               if (_yieldExpiration > 0)
+                       return (_yieldExpiration >= getTimeMillis());
+               else
+                       return false;
+       }
+       // clear yield expiration
+       void clearYield()
+       {
+               _yieldExpiration = 0;
+       }
+       // get yield time
+       uint64_t getYieldTime()
+       {
+               uint64_t curTime = getTimeMillis();
+               if (_yieldExpiration > curTime)
+                       return (_yieldExpiration - curTime);
+               else
+                       return 0;;
+       }
+       //! Whether flow file queued in incoming connection
+       bool flowFilesQueued();
+       //! Whether flow file queue full in any of the outgoin connection
+       bool flowFilesOutGoingFull();
+       //! Get incoming connections
+       std::set<Connection *> getIncomingConnections() {
+               return _incomingConnections;
+       }
+       //! Has Incoming Connection
+       bool hasIncomingConnections() {
+               return (_incomingConnections.size() > 0);
+       }
+       //! Get outgoing connections based on relationship name
+       std::set<Connection *> getOutGoingConnections(std::string relationship);
+       //! Add connection
+       bool addConnection(Connection *connection);
+       //! Remove connection
+       void removeConnection(Connection *connection);
+       //! Get the UUID as string
+       std::string getUUIDStr() {
+               return _uuidStr;
+       }
+       //! Get the Next RoundRobin incoming connection
+       Connection *getNextIncomingConnection();
+       //! On Trigger
+       void onTrigger();
+
+public:
+       //! OnTrigger method, implemented by NiFi Processor Designer
+       virtual void onTrigger(ProcessContext *context, ProcessSession 
*session) = 0;
+       //! Initialize, over write by NiFi Process Designer
+       virtual void initialize(void) {
+               return;
+       }
+
+protected:
+
+       //! A global unique identifier
+       uuid_t _uuid;
+       //! Processor Name
+       std::string _name;
+       //! Supported properties
+       std::map<std::string, Property> _properties;
+       //! Supported relationships
+       std::map<std::string, Relationship> _relationships;
+       //! Autoterminated relationships
+       std::map<std::string, Relationship> _autoTerminatedRelationships;
+       //! Processor state
+       std::atomic<ScheduledState> _state;
+       //! Scheduling Strategy
+       std::atomic<SchedulingStrategy> _strategy;
+       //! lossTolerant
+       std::atomic<bool> _lossTolerant;
+       //! SchedulePeriod in Nano Seconds
+       std::atomic<uint64_t> _schedulingPeriodNano;
+       //! Run Duration in Nano Seconds
+       std::atomic<uint64_t> _runDurantionNano;
+       //! Yield Period in Milliseconds
+       std::atomic<uint64_t> _yieldPeriodMsec;
+       //! Penalization Period in MilliSecond
+       std::atomic<uint64_t> _penalizationPeriodMsec;
+       //! Maximum Concurrent Tasks
+       std::atomic<uint8_t> _maxConcurrentTasks;
+       //! Active Tasks
+       std::atomic<uint8_t> _activeTasks;
+       //! Trigger the Processor even if the incoming connection is empty
+       std::atomic<bool> _triggerWhenEmpty;
+       //! Incoming connections
+       std::set<Connection *> _incomingConnections;
+       //! Outgoing connections map based on Relationship name
+       std::map<std::string, std::set<Connection *>> _outGoingConnections;
+       //! UUID string
+       std::string _uuidStr;
+
+private:
+
+       //! Mutex for protection
+       std::mutex _mtx;
+       //! Yield Expiration
+       std::atomic<uint64_t> _yieldExpiration;
+       //! Incoming connection Iterator
+       std::set<Connection *>::iterator _incomingConnectionsIter;
+       //! Logger
+       Logger *_logger;
+       // Prevent default copy constructor and assignment operation
+       // Only support pass by reference or pointer
+       Processor(const Processor &parent);
+       Processor &operator=(const Processor &parent);
+
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/include/Property.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Property.h b/libminifi/include/Property.h
new file mode 100644
index 0000000..a724394
--- /dev/null
+++ b/libminifi/include/Property.h
@@ -0,0 +1,344 @@
+/**
+ * @file Property.h
+ * Processor Property class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PROPERTY_H__
+#define __PROPERTY_H__
+
+#include <string>
+#include <vector>
+#include <queue>
+#include <map>
+#include <mutex>
+#include <atomic>
+#include <set>
+#include <stdlib.h>
+#include <math.h>
+
+//! Time Unit
+enum TimeUnit {
+       DAY,
+       HOUR,
+       MINUTE,
+       SECOND,
+       MILLISECOND,
+       NANOSECOND
+};
+
+//! Property Class
+class Property {
+
+public:
+       //! Constructor
+       /*!
+        * Create a new property
+        */
+       Property(const std::string name, const std::string description, const 
std::string value)
+               : _name(name), _description(description), _value(value) {
+       }
+       Property() {}
+       //! Destructor
+       virtual ~Property() {}
+       //! Get Name for the property
+       std::string getName() {
+               return _name;
+       }
+       //! Get Description for the property
+       std::string getDescription() {
+               return _description;
+       }
+       //! Get value for the property
+       std::string getValue() {
+               return _value;
+       }
+       //! Set value for the property
+       void setValue(std::string value) {
+               _value = value;
+       }
+       //! Compare
+       bool operator < (const Property & right) const {
+               return _name < right._name;
+       }
+
+       //! Convert TimeUnit to MilliSecond
+       static bool ConvertTimeUnitToMS(int64_t input, TimeUnit unit, int64_t 
&out)
+       {
+               if (unit == MILLISECOND)
+               {
+                       out = input;
+                       return true;
+               }
+               else if (unit == SECOND)
+               {
+                       out = input * 1000;
+                       return true;
+               }
+               else if (unit == MINUTE)
+               {
+                       out = input * 60 * 1000;
+                       return true;
+               }
+               else if (unit == HOUR)
+               {
+                       out = input * 60 * 60 * 1000;
+                       return true;
+               }
+               else if (unit == DAY)
+               {
+                       out = 24 * 60 * 60 * 1000;
+                       return true;
+               }
+               else if (unit == NANOSECOND)
+               {
+                       out = input/1000/1000;
+                       return true;
+               }
+               else
+               {
+                       return false;
+               }
+       }
+       //! Convert TimeUnit to NanoSecond
+       static bool ConvertTimeUnitToNS(int64_t input, TimeUnit unit, int64_t 
&out)
+       {
+               if (unit == MILLISECOND)
+               {
+                       out = input * 1000 * 1000;
+                       return true;
+               }
+               else if (unit == SECOND)
+               {
+                       out = input * 1000 * 1000 * 1000;
+                       return true;
+               }
+               else if (unit == MINUTE)
+               {
+                       out = input * 60 * 1000 * 1000 * 1000;
+                       return true;
+               }
+               else if (unit == HOUR)
+               {
+                       out = input * 60 * 60 * 1000 * 1000 * 1000;
+                       return true;
+               }
+               else if (unit == NANOSECOND)
+               {
+                       out = input;
+                       return true;
+               }
+               else
+               {
+                       return false;
+               }
+       }
+       //! Convert String
+       static bool StringToTime(std::string input, int64_t &output, TimeUnit 
&timeunit)
+       {
+               if (input.size() == 0) {
+                       return false;
+               }
+
+               const char *cvalue = input.c_str();
+               char *pEnd;
+               long int ival = strtol(cvalue, &pEnd, 0);
+
+               if (pEnd[0] == '\0')
+               {
+                       return false;
+               }
+
+               while (*pEnd == ' ')
+               {
+                       // Skip the space
+                       pEnd++;
+               }
+
+               std::string unit(pEnd);
+
+               if (unit == "sec" || unit == "s" || unit == "second" || unit == 
"seconds" || unit == "secs")
+               {
+                       timeunit = SECOND;
+                       output = ival;
+                       return true;
+               }
+               else if (unit == "min" || unit == "m" || unit == "mins" || unit 
== "minute" || unit == "minutes")
+               {
+                       timeunit = MINUTE;
+                       output = ival;
+                       return true;
+               }
+               else if (unit == "ns" || unit == "nano" || unit == "nanos" || 
unit == "nanoseconds")
+               {
+                       timeunit = NANOSECOND;
+                       output = ival;
+                       return true;
+               }
+               else if (unit == "ms" || unit == "milli" || unit == "millis" || 
unit == "milliseconds")
+               {
+                       timeunit = MILLISECOND;
+                       output = ival;
+                       return true;
+               }
+               else if (unit == "h" || unit == "hr" || unit == "hour" || unit 
== "hrs" || unit == "hours")
+               {
+                       timeunit = HOUR;
+                       output = ival;
+                       return true;
+               }
+               else if (unit == "d" || unit == "day" || unit == "days")
+               {
+                       timeunit = DAY;
+                       output = ival;
+                       return true;
+               }
+               else
+                       return false;
+       }
+
+       //! Convert String to Integer
+       static bool StringToInt(std::string input, int64_t &output)
+       {
+               if (input.size() == 0) {
+                       return false;
+               }
+
+               const char *cvalue = input.c_str();
+               char *pEnd;
+               long int ival = strtol(cvalue, &pEnd, 0);
+
+               if (pEnd[0] == '\0')
+               {
+                       output = ival;
+                       return true;
+               }
+
+               while (*pEnd == ' ')
+               {
+                       // Skip the space
+                       pEnd++;
+               }
+
+               char end0 = toupper(pEnd[0]);
+               if ( (end0 == 'K') || (end0 == 'M') || (end0 == 'G') || (end0 
== 'T') || (end0 == 'P') )
+               {
+                       if (pEnd[1] == '\0')
+                       {
+                               unsigned long int multiplier = 1000;
+
+                               if ( (end0 != 'K') ) {
+                                       multiplier *= 1000;
+                                       if (end0 != 'M') {
+                                               multiplier *= 1000;
+                                               if (end0 != 'G') {
+                                                       multiplier *= 1000;
+                                                       if (end0 != 'T') {
+                                                               multiplier *= 
1000;
+                                                       }
+                                               }
+                                       }
+                               }
+                               output = ival * multiplier;
+                               return true;
+
+                       } else if ((pEnd[1] == 'b' || pEnd[1] == 'B') && 
(pEnd[2] == '\0')) {
+
+                               unsigned long int multiplier = 1024;
+
+                               if ( (end0 != 'K') ) {
+                                       multiplier *= 1024;
+                                       if (end0 != 'M') {
+                                               multiplier *= 1024;
+                                               if (end0 != 'G') {
+                                                       multiplier *= 1024;
+                                                       if (end0 != 'T') {
+                                                               multiplier *= 
1024;
+                                                       }
+                                               }
+                                       }
+                               }
+                               output = ival * multiplier;
+                               return true;
+                       }
+               }
+
+               return false;
+       }
+       //! Convert String to Float
+       static bool StringToFloat(std::string input, float &output)
+       {
+               const char *cvalue = input.c_str();
+               char *pEnd;
+               float val = strtof(cvalue, &pEnd);
+
+               if (pEnd[0] == '\0')
+               {
+                       output = val;
+                       return true;
+               }
+               else
+                       return false;
+       }
+       //! Convert String to Bool
+       static bool StringToBool(std::string input, bool &output)
+       {
+               if (input == "true" || input == "True" || input == "TRUE")
+               {
+                       output = true;
+                       return true;
+               }
+               if (input == "false" || input == "False" || input == "FALSE")
+               {
+                       output = false;
+                       return true;
+               }
+               return false;
+       }
+
+       // Trim String utils
+       static std::string trim(const std::string& s)
+       {
+           return trimRight(trimLeft(s));
+       }
+
+       static std::string trimLeft(const std::string& s)
+       {
+               const char *WHITESPACE = " \n\r\t";
+           size_t startpos = s.find_first_not_of(WHITESPACE);
+           return (startpos == std::string::npos) ? "" : s.substr(startpos);
+       }
+
+       static std::string trimRight(const std::string& s)
+       {
+               const char *WHITESPACE = " \n\r\t";
+           size_t endpos = s.find_last_not_of(WHITESPACE);
+           return (endpos == std::string::npos) ? "" : s.substr(0, endpos+1);
+       }
+
+protected:
+       //! Name
+       std::string _name;
+       //! Description
+       std::string _description;
+       //! Value
+       std::string _value;
+
+private:
+
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/include/RealTimeDataCollector.h
----------------------------------------------------------------------
diff --git a/libminifi/include/RealTimeDataCollector.h 
b/libminifi/include/RealTimeDataCollector.h
new file mode 100644
index 0000000..760b566
--- /dev/null
+++ b/libminifi/include/RealTimeDataCollector.h
@@ -0,0 +1,131 @@
+/**
+ * @file RealTimeDataCollector.h
+ * RealTimeDataCollector class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __REAL_TIME_DATA_COLLECTOR_H__
+#define __REAL_TIME_DATA_COLLECTOR_H__
+
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <fcntl.h>
+#include <netdb.h>
+#include <string>
+#include <errno.h>
+#include "FlowFileRecord.h"
+#include "Processor.h"
+#include "ProcessSession.h"
+
+//! RealTimeDataCollector Class
+class RealTimeDataCollector : public Processor
+{
+public:
+       //! Constructor
+       /*!
+        * Create a new processor
+        */
+       RealTimeDataCollector(std::string name, uuid_t uuid = NULL)
+       : Processor(name, uuid)
+       {
+               _realTimeSocket = 0;
+               _batchSocket = 0;
+               _logger = Logger::getLogger();
+               _firstInvoking = false;
+               _realTimeAccumulated = 0;
+               _batchAcccumulated = 0;
+               _queuedDataSize = 0;
+       }
+       //! Destructor
+       virtual ~RealTimeDataCollector()
+       {
+               if (_realTimeSocket)
+                       close(_realTimeSocket);
+               if (_batchSocket)
+                       close(_batchSocket);
+               if (_fileStream.is_open())
+                       _fileStream.close();
+       }
+       //! Processor Name
+       static const std::string ProcessorName;
+       //! Supported Properties
+       static Property REALTIMESERVERNAME;
+       static Property REALTIMESERVERPORT;
+       static Property BATCHSERVERNAME;
+       static Property BATCHSERVERPORT;
+       static Property FILENAME;
+       static Property ITERATION;
+       static Property REALTIMEMSGID;
+       static Property BATCHMSGID;
+       static Property REALTIMEINTERVAL;
+       static Property BATCHINTERVAL;
+       static Property BATCHMAXBUFFERSIZE;
+       //! Supported Relationships
+       static Relationship Success;
+       //! Connect to the socket
+       int connectServer(const char *host, uint16_t port);
+       int sendData(int socket, const char *buf, int buflen);
+       void onTriggerRealTime(ProcessContext *context, ProcessSession 
*session);
+       void onTriggerBatch(ProcessContext *context, ProcessSession *session);
+
+public:
+       //! OnTrigger method, implemented by NiFi RealTimeDataCollector
+       virtual void onTrigger(ProcessContext *context, ProcessSession 
*session);
+       //! Initialize, over write by NiFi RealTimeDataCollector
+       virtual void initialize(void);
+
+protected:
+
+private:
+       //! realtime server Name
+       std::string _realTimeServerName;
+       int64_t _realTimeServerPort;
+       std::string _batchServerName;
+       int64_t _batchServerPort;
+       int64_t _realTimeInterval;
+       int64_t _batchInterval;
+       int64_t _batchMaxBufferSize;
+       //! Match pattern for Real time Message ID
+       std::vector<std::string> _realTimeMsgID;
+       //! Match pattern for Batch Message ID
+       std::vector<std::string> _batchMsgID;
+       //! file for which the realTime collector will tail
+       std::string _fileName;
+       //! Whether we need to iterate from the beginning for demo
+       bool _iteration;
+       int _realTimeSocket;
+       int _batchSocket;
+       //! Logger
+       Logger *_logger;
+       //! Mutex for protection
+       std::mutex _mtx;
+       //! Queued data size
+       uint64_t _queuedDataSize;
+       //! Queue for the batch process
+       std::queue<std::string> _queue;
+       std::thread::id _realTimeThreadId;
+       std::thread::id _batchThreadId;
+       std::atomic<bool> _firstInvoking;
+       int64_t _realTimeAccumulated;
+       int64_t _batchAcccumulated;
+       std::ifstream _fileStream;
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/include/Relationship.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Relationship.h b/libminifi/include/Relationship.h
new file mode 100644
index 0000000..3454ee5
--- /dev/null
+++ b/libminifi/include/Relationship.h
@@ -0,0 +1,87 @@
+/**
+ * @file Relationship.h
+ * Relationship class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __RELATIONSHIP_H__
+#define __RELATIONSHIP_H__
+
+#include <string>
+#include <uuid/uuid.h>
+#include <vector>
+#include <queue>
+#include <map>
+#include <mutex>
+#include <atomic>
+
+//! undefined relationship for remote process group outgoing port and root 
process group incoming port
+#define UNDEFINED_RELATIONSHIP "undefined"
+
+inline bool isRelationshipNameUndefined(std::string name)
+{
+       if (name == UNDEFINED_RELATIONSHIP)
+               return true;
+       else
+               return false;
+}
+
+//! Relationship Class
+class Relationship {
+
+public:
+       //! Constructor
+       /*!
+        * Create a new relationship 
+        */
+       Relationship(const std::string name, const std::string description)
+               : _name(name), _description(description) {
+       }
+       Relationship()
+               : _name(UNDEFINED_RELATIONSHIP) {
+       }
+       //! Destructor
+       virtual ~Relationship() {
+       }
+       //! Get Name for the relationship
+       std::string getName() {
+               return _name;
+       }
+       //! Get Description for the relationship
+       std::string getDescription() {
+               return _description;
+       }
+       //! Compare
+       bool operator < (const Relationship & right) const {
+               return _name < right._name;
+       }
+       //! Whether it is a undefined relationship
+       bool isRelationshipUndefined()
+       {
+               return isRelationshipNameUndefined(_name);
+       }
+
+protected:
+
+       //! Name
+       std::string _name;
+       //! Description
+       std::string _description;
+
+private:
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/include/RemoteProcessorGroupPort.h
----------------------------------------------------------------------
diff --git a/libminifi/include/RemoteProcessorGroupPort.h 
b/libminifi/include/RemoteProcessorGroupPort.h
new file mode 100644
index 0000000..cd99e50
--- /dev/null
+++ b/libminifi/include/RemoteProcessorGroupPort.h
@@ -0,0 +1,96 @@
+/**
+ * @file RemoteProcessorGroupPort.h
+ * RemoteProcessorGroupPort class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __REMOTE_PROCESSOR_GROUP_PORT_H__
+#define __REMOTE_PROCESSOR_GROUP_PORT_H__
+
+#include "FlowFileRecord.h"
+#include "Processor.h"
+#include "ProcessSession.h"
+#include "Site2SiteClientProtocol.h"
+
+//! RemoteProcessorGroupPort Class
+class RemoteProcessorGroupPort : public Processor
+{
+public:
+       //! Constructor
+       /*!
+        * Create a new processor
+        */
+       RemoteProcessorGroupPort(std::string name, uuid_t uuid = NULL)
+       : Processor(name, uuid)
+       {
+               _logger = Logger::getLogger();
+               _peer = new Site2SitePeer("", 9999);
+               _protocol = new Site2SiteClientProtocol(_peer);
+               _protocol->setPortId(uuid);
+       }
+       //! Destructor
+       virtual ~RemoteProcessorGroupPort()
+       {
+               delete _protocol;
+               delete _peer;
+       }
+       //! Processor Name
+       static const std::string ProcessorName;
+       //! Supported Properties
+       static Property hostName;
+       static Property port;
+       //! Supported Relationships
+       static Relationship relation;
+public:
+       //! OnTrigger method, implemented by NiFi RemoteProcessorGroupPort
+       virtual void onTrigger(ProcessContext *context, ProcessSession 
*session);
+       //! Initialize, over write by NiFi RemoteProcessorGroupPort
+       virtual void initialize(void);
+       //! Set Direction
+       void setDirection(TransferDirection direction)
+       {
+               _direction = direction;
+               if (_direction == RECEIVE)
+                       this->setTriggerWhenEmpty(true);
+       }
+       //! Set Timeout
+       void setTimeOut(uint64_t timeout)
+       {
+               _protocol->setTimeOut(timeout);
+       }
+       //! SetTransmitting
+       void setTransmitting(bool val)
+       {
+               _transmitting = val;
+       }
+
+protected:
+
+private:
+       //! Logger
+       Logger *_logger;
+       //! Peer Connection
+       Site2SitePeer *_peer;
+       //! Peer Protocol
+       Site2SiteClientProtocol *_protocol;
+       //! Transaction Direction
+       TransferDirection _direction;
+       //! Transmitting
+       bool _transmitting;
+
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/include/ResourceClaim.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ResourceClaim.h 
b/libminifi/include/ResourceClaim.h
new file mode 100644
index 0000000..d8f9979
--- /dev/null
+++ b/libminifi/include/ResourceClaim.h
@@ -0,0 +1,92 @@
+/**
+ * @file ResourceClaim.h
+ * Resource Claim class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __RESOURCE_CLAIM_H__
+#define __RESOURCE_CLAIM_H__
+
+#include <string>
+#include <uuid/uuid.h>
+#include <vector>
+#include <queue>
+#include <map>
+#include <mutex>
+#include <atomic>
+#include "Configure.h"
+
+//! Default content directory
+#define DEFAULT_CONTENT_DIRECTORY "."
+
+//! ResourceClaim Class
+class ResourceClaim {
+
+public:
+       //! Constructor
+       /*!
+        * Create a new resource claim
+        */
+       ResourceClaim(const std::string contentDirectory);
+       //! Destructor
+       virtual ~ResourceClaim() {}
+       //! increaseFlowFileRecordOwnedCount
+       void increaseFlowFileRecordOwnedCount()
+       {
+               ++_flowFileRecordOwnedCount;
+       }
+       //! decreaseFlowFileRecordOwenedCount
+       void decreaseFlowFileRecordOwnedCount()
+       {
+               --_flowFileRecordOwnedCount;
+       }
+       //! getFlowFileRecordOwenedCount
+       uint64_t getFlowFileRecordOwnedCount()
+       {
+               return _flowFileRecordOwnedCount;
+       }
+       //! Get the content full path
+       std::string getContentFullPath()
+       {
+               return _contentFullPath;
+       }
+
+protected:
+       //! A global unique identifier
+       uuid_t _uuid;
+       //! A local unique identifier
+       uint64_t _id;
+       //! Full path to the content
+       std::string _contentFullPath;
+
+       //! How many FlowFileRecord Own this cliam
+       std::atomic<uint64_t> _flowFileRecordOwnedCount;
+
+private:
+       //! Configure
+       Configure *_configure;
+       //! Logger
+       Logger *_logger;
+       // Prevent default copy constructor and assignment operation
+       // Only support pass by reference or pointer
+       ResourceClaim(const ResourceClaim &parent);
+       ResourceClaim &operator=(const ResourceClaim &parent);
+
+       //! Local resource claim number
+       static std::atomic<uint64_t> _localResourceClaimNumber;
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/include/SchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/SchedulingAgent.h 
b/libminifi/include/SchedulingAgent.h
new file mode 100644
index 0000000..2e3f6b8
--- /dev/null
+++ b/libminifi/include/SchedulingAgent.h
@@ -0,0 +1,98 @@
+/**
+ * @file SchedulingAgent.h
+ * SchedulingAgent class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __SCHEDULING_AGENT_H__
+#define __SCHEDULING_AGENT_H__
+
+#include <uuid/uuid.h>
+#include <vector>
+#include <map>
+#include <mutex>
+#include <atomic>
+#include <algorithm>
+#include <thread>
+#include "TimeUtil.h"
+#include "Logger.h"
+#include "Configure.h"
+#include "FlowFileRecord.h"
+#include "Processor.h"
+#include "ProcessContext.h"
+
+//! SchedulingAgent Class
+class SchedulingAgent
+{
+public:
+       //! Constructor
+       /*!
+        * Create a new processor
+        */
+       SchedulingAgent() {
+               _configure = Configure::getConfigure();
+               _logger = Logger::getLogger();
+               _running = false;
+       }
+       //! Destructor
+       virtual ~SchedulingAgent()
+       {
+
+       }
+       //! onTrigger, return whether the yield is need
+       bool onTrigger(Processor *processor);
+       //! Whether agent has work to do
+       bool hasWorkToDo(Processor *processor);
+       //! Whether the outgoing need to be backpressure
+       bool hasTooMuchOutGoing(Processor *processor);
+       //! start
+       void start() {
+               _running = true;
+       }
+       //! stop
+       void stop() {
+               _running = false;
+       }
+
+public:
+       //! schedule, overwritten by different DrivenSchedulingAgent
+       virtual void schedule(Processor *processor) = 0;
+       //! unschedule, overwritten by different DrivenSchedulingAgent
+       virtual void unschedule(Processor *processor) = 0;
+
+protected:
+       //! Logger
+       Logger *_logger;
+       //! Configure
+       Configure *_configure;
+       //! Mutex for protection
+       std::mutex _mtx;
+       //! Whether it is running
+       std::atomic<bool> _running;
+       //! AdministrativeYieldDuration
+       int64_t _administrativeYieldDuration;
+       //! BoredYieldDuration
+       int64_t _boredYieldDuration;
+
+private:
+       // Prevent default copy constructor and assignment operation
+       // Only support pass by reference or pointer
+       SchedulingAgent(const SchedulingAgent &parent);
+       SchedulingAgent &operator=(const SchedulingAgent &parent);
+
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/include/Site2SiteClientProtocol.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Site2SiteClientProtocol.h 
b/libminifi/include/Site2SiteClientProtocol.h
new file mode 100644
index 0000000..5b72b11
--- /dev/null
+++ b/libminifi/include/Site2SiteClientProtocol.h
@@ -0,0 +1,638 @@
+/**
+ * @file Site2SiteClientProtocol.h
+ * Site2SiteClientProtocol class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __SITE2SITE_CLIENT_PROTOCOL_H__
+#define __SITE2SITE_CLIENT_PROTOCOL_H__
+
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <fcntl.h>
+#include <netdb.h>
+#include <string>
+#include <errno.h>
+#include <chrono>
+#include <thread>
+#include <algorithm>
+#include <uuid/uuid.h>
+#include "Logger.h"
+#include "Configure.h"
+#include "Property.h"
+#include "Site2SitePeer.h"
+#include "FlowFileRecord.h"
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+
+//! Resource Negotiated Status Code
+#define RESOURCE_OK 20
+#define DIFFERENT_RESOURCE_VERSION 21
+#define NEGOTIATED_ABORT 255
+// ! Max attributes
+#define MAX_NUM_ATTRIBUTES 25000
+
+/**
+ * An enumeration for specifying the direction in which data should be
+ * transferred between a client and a remote NiFi instance.
+ */
+typedef enum {
+       /**
+        * * The client is to send data to the remote instance.
+        * */
+       SEND,
+       /**
+        * * The client is to receive data from the remote instance.
+        * */
+       RECEIVE
+} TransferDirection;
+
+
+//! Peer State
+typedef enum {
+       /**
+        * * IDLE
+        * */
+       IDLE = 0,
+       /**
+        * * Socket Established
+        * */
+       ESTABLISHED,
+       /**
+        * * HandShake Done
+        * */
+       HANDSHAKED,
+       /**
+        * * After CodeDec Completion
+        * */
+       READY
+} PeerState;
+
+//! Transaction State
+typedef enum {
+       /**
+        * * Transaction has been started but no data has been sent or received.
+        * */
+       TRANSACTION_STARTED,
+       /**
+        * * Transaction has been started and data has been sent or received.
+        * */
+       DATA_EXCHANGED,
+       /**
+        * * Data that has been transferred has been confirmed via its CRC.
+        * * Transaction is ready to be completed.
+        * */
+       TRANSACTION_CONFIRMED,
+       /**
+        * * Transaction has been successfully completed.
+        * */
+       TRANSACTION_COMPLETED,
+       /**
+        * * The Transaction has been canceled.
+        * */
+       TRANSACTION_CANCELED,
+       /**
+        * * The Transaction ended in an error.
+        * */
+       TRANSACTION_ERROR
+} TransactionState;
+
+//! Request Type
+typedef enum {
+    NEGOTIATE_FLOWFILE_CODEC = 0,
+    REQUEST_PEER_LIST,
+    SEND_FLOWFILES,
+    RECEIVE_FLOWFILES,
+    SHUTDOWN,
+       MAX_REQUEST_TYPE
+} RequestType;
+
+//! Request Type Str
+static const char *RequestTypeStr[MAX_REQUEST_TYPE] =
+{
+               "NEGOTIATE_FLOWFILE_CODEC",
+               "REQUEST_PEER_LIST",
+               "SEND_FLOWFILES",
+               "RECEIVE_FLOWFILES",
+               "SHUTDOWN"
+};
+
+//! Respond Code
+typedef enum {
+               RESERVED = 0,
+               // ResponseCode, so that we can indicate a 0 followed by some 
other bytes
+
+               // handshaking properties
+               PROPERTIES_OK = 1,
+               UNKNOWN_PROPERTY_NAME = 230,
+               ILLEGAL_PROPERTY_VALUE = 231,
+               MISSING_PROPERTY = 232,
+               // transaction indicators
+               CONTINUE_TRANSACTION = 10,
+               FINISH_TRANSACTION = 11,
+               CONFIRM_TRANSACTION = 12, // "Explanation" of this code is the 
checksum
+               TRANSACTION_FINISHED = 13,
+               TRANSACTION_FINISHED_BUT_DESTINATION_FULL = 14,
+               CANCEL_TRANSACTION = 15,
+               BAD_CHECKSUM = 19,
+               // data availability indicators
+               MORE_DATA = 20,
+               NO_MORE_DATA = 21,
+               // port state indicators
+               UNKNOWN_PORT = 200,
+               PORT_NOT_IN_VALID_STATE = 201,
+               PORTS_DESTINATION_FULL = 202,
+               // authorization
+               UNAUTHORIZED = 240,
+               // error indicators
+               ABORT = 250,
+               UNRECOGNIZED_RESPONSE_CODE = 254,
+               END_OF_STREAM = 255
+} RespondCode;
+
+//! Respond Code Class
+typedef struct {
+       RespondCode code;
+       const char *description;
+       bool hasDescription;
+} RespondCodeContext;
+
+//! Respond Code Context
+static RespondCodeContext respondCodeContext[]  =
+{
+               {RESERVED, "Reserved for Future Use", false},
+               {PROPERTIES_OK, "Properties OK", false},
+               {UNKNOWN_PROPERTY_NAME, "Unknown Property Name", true},
+               {ILLEGAL_PROPERTY_VALUE, "Illegal Property Value", true},
+               {MISSING_PROPERTY, "Missing Property", true},
+               {CONTINUE_TRANSACTION, "Continue Transaction", false},
+               {FINISH_TRANSACTION, "Finish Transaction", false},
+               {CONFIRM_TRANSACTION, "Confirm Transaction", true},
+               {TRANSACTION_FINISHED, "Transaction Finished", false},
+               {TRANSACTION_FINISHED_BUT_DESTINATION_FULL, "Transaction 
Finished But Destination is Full", false},
+               {CANCEL_TRANSACTION, "Cancel Transaction", true},
+               {BAD_CHECKSUM, "Bad Checksum", false},
+               {MORE_DATA, "More Data Exists", false},
+               {NO_MORE_DATA, "No More Data Exists", false},
+               {UNKNOWN_PORT, "Unknown Port", false},
+           {PORT_NOT_IN_VALID_STATE, "Port Not in a Valid State", true},
+               {PORTS_DESTINATION_FULL, "Port's Destination is Full", false},
+               {UNAUTHORIZED, "User Not Authorized", true},
+               {ABORT, "Abort", true},
+               {UNRECOGNIZED_RESPONSE_CODE, "Unrecognized Response Code", 
false},
+               {END_OF_STREAM, "End of Stream", false}
+};
+
+//! Respond Code Sequence Pattern
+static const uint8_t CODE_SEQUENCE_VALUE_1 = (uint8_t) 'R';
+static const uint8_t CODE_SEQUENCE_VALUE_2 = (uint8_t) 'C';
+
+/**
+ * Enumeration of Properties that can be used for the Site-to-Site Socket
+ * Protocol.
+ */
+typedef enum {
+               /**
+                * Boolean value indicating whether or not the contents of a 
FlowFile should
+                * be GZipped when transferred.
+                */
+               GZIP,
+               /**
+                * The unique identifier of the port to communicate with
+                */
+               PORT_IDENTIFIER,
+               /**
+                * Indicates the number of milliseconds after the request was 
made that the
+                * client will wait for a response. If no response has been 
received by the
+                * time this value expires, the server can move on without 
attempting to
+                * service the request because the client will have already 
disconnected.
+                */
+               REQUEST_EXPIRATION_MILLIS,
+               /**
+                * The preferred number of FlowFiles that the server should 
send to the
+                * client when pulling data. This property was introduced in 
version 5 of
+                * the protocol.
+                */
+               BATCH_COUNT,
+               /**
+                * The preferred number of bytes that the server should send to 
the client
+                * when pulling data. This property was introduced in version 5 
of the
+                * protocol.
+                */
+               BATCH_SIZE,
+               /**
+                * The preferred amount of time that the server should send 
data to the
+                * client when pulling data. This property was introduced in 
version 5 of
+                * the protocol. Value is in milliseconds.
+                */
+               BATCH_DURATION,
+               MAX_HANDSHAKE_PROPERTY
+} HandshakeProperty;
+
+//! HandShakeProperty Str
+static const char *HandShakePropertyStr[MAX_HANDSHAKE_PROPERTY] =
+{
+               /**
+                * Boolean value indicating whether or not the contents of a 
FlowFile should
+                * be GZipped when transferred.
+                */
+               "GZIP",
+               /**
+                * The unique identifier of the port to communicate with
+                */
+               "PORT_IDENTIFIER",
+               /**
+                * Indicates the number of milliseconds after the request was 
made that the
+                * client will wait for a response. If no response has been 
received by the
+                * time this value expires, the server can move on without 
attempting to
+                * service the request because the client will have already 
disconnected.
+                */
+               "REQUEST_EXPIRATION_MILLIS",
+               /**
+                * The preferred number of FlowFiles that the server should 
send to the
+                * client when pulling data. This property was introduced in 
version 5 of
+                * the protocol.
+                */
+               "BATCH_COUNT",
+               /**
+                * The preferred number of bytes that the server should send to 
the client
+                * when pulling data. This property was introduced in version 5 
of the
+                * protocol.
+                */
+               "BATCH_SIZE",
+               /**
+                * The preferred amount of time that the server should send 
data to the
+                * client when pulling data. This property was introduced in 
version 5 of
+                * the protocol. Value is in milliseconds.
+                */
+               "BATCH_DURATION"
+};
+
+class Site2SiteClientProtocol;
+
+//! Transaction Class
+class Transaction
+{
+       friend class Site2SiteClientProtocol;
+public:
+       //! Constructor
+       /*!
+        * Create a new transaction
+        */
+       Transaction(TransferDirection direction) {
+               _state = TRANSACTION_STARTED;
+               _direction = direction;
+               _dataAvailable = false;
+               _transfers = 0;
+               _bytes = 0;
+
+               char uuidStr[37];
+
+               // Generate the global UUID for the transaction
+               uuid_generate(_uuid);
+               uuid_unparse(_uuid, uuidStr);
+               _uuidStr = uuidStr;
+       }
+       //! Destructor
+       virtual ~Transaction()
+       {
+       }
+       //! getUUIDStr
+       std::string getUUIDStr()
+       {
+               return _uuidStr;
+       }
+       //! getState
+       TransactionState getState()
+       {
+               return _state;
+       }
+       //! isDataAvailable
+       bool isDataAvailable()
+       {
+               return _dataAvailable;
+       }
+       //! setDataAvailable()
+       void setDataAvailable(bool value)
+       {
+               _dataAvailable = value;
+       }
+       //! getDirection
+       TransferDirection getDirection()
+       {
+               return _direction;
+       }
+       //! getCRC
+       long getCRC()
+       {
+               return _crc.getCRC();
+       }
+       //! updateCRC
+       void updateCRC(uint8_t *buffer, uint32_t length)
+       {
+               _crc.update(buffer, length);
+       }
+
+protected:
+
+private:
+       //! Transaction State
+       TransactionState _state;
+       //! Transaction Direction
+       TransferDirection _direction;
+       //! Whether received data is available
+       bool _dataAvailable;
+       //! A global unique identifier
+       uuid_t _uuid;
+       //! UUID string
+       std::string _uuidStr;
+       //! Number of transfer
+       int _transfers;
+       //! Number of content bytes
+       uint64_t _bytes;
+       //! CRC32
+       CRC32 _crc;
+
+       // Prevent default copy constructor and assignment operation
+       // Only support pass by reference or pointer
+       Transaction(const Transaction &parent);
+       Transaction &operator=(const Transaction &parent);
+};
+
+/**
+ * Represents a piece of data that is to be sent to or that was received from a
+ * NiFi instance.
+ */
+class DataPacket
+{
+public:
+       DataPacket(Site2SiteClientProtocol *protocol, Transaction *transaction,
+                       std::map<std::string, std::string> attributes) {
+               _protocol = protocol;
+               _size = 0;
+               _transaction = transaction;
+               _attributes = attributes;
+       }
+       std::map<std::string, std::string> _attributes;
+       uint64_t _size;
+       Site2SiteClientProtocol *_protocol;
+       Transaction *_transaction;
+};
+
+//! Site2SiteClientProtocol Class
+class Site2SiteClientProtocol
+{
+public:
+       //! Constructor
+       /*!
+        * Create a new control protocol
+        */
+       Site2SiteClientProtocol(Site2SitePeer *peer) {
+               _logger = Logger::getLogger();
+               _configure = Configure::getConfigure();
+               _peer = peer;
+               _batchSize = 0;
+               _batchCount = 0;
+               _batchDuration = 0;
+               _batchSendNanos = 5000000000; // 5 seconds
+               _timeOut = 30000; // 30 seconds
+               _peerState = IDLE;
+               _supportedVersion[0] = 5;
+               _supportedVersion[1] = 4;
+               _supportedVersion[2] = 3;
+               _supportedVersion[3] = 2;
+               _supportedVersion[4] = 1;
+               _currentVersion = _supportedVersion[0];
+               _currentVersionIndex = 0;
+               _supportedCodecVersion[0] = 1;
+               _currentCodecVersion = _supportedCodecVersion[0];
+               _currentCodecVersionIndex = 0;
+       }
+       //! Destructor
+       virtual ~Site2SiteClientProtocol()
+       {
+       }
+
+public:
+       //! setBatchSize
+       void setBatchSize(uint64_t size)
+       {
+               _batchSize = size;
+       }
+       //! setBatchCount
+       void setBatchCount(uint64_t count)
+       {
+               _batchCount = count;
+       }
+       //! setBatchDuration
+       void setBatchDuration(uint64_t duration)
+       {
+               _batchDuration = duration;
+       }
+       //! setTimeOut
+       void setTimeOut(uint64_t time)
+       {
+               _timeOut = time;
+               if (_peer)
+                       _peer->setTimeOut(time);
+
+       }
+       //! getTimeout
+       uint64_t getTimeOut()
+       {
+               return _timeOut;
+       }
+       //! setPortId
+       void setPortId(uuid_t id)
+       {
+               uuid_copy(_portId, id);
+               char idStr[37];
+               uuid_unparse(id, idStr);
+               _portIdStr = idStr;
+       }
+       //! getResourceName
+       std::string getResourceName()
+       {
+               return "SocketFlowFileProtocol";
+       }
+       //! getCodecResourceName
+       std::string getCodecResourceName()
+       {
+               return "StandardFlowFileCodec";
+       }
+       //! bootstrap the protocol to the ready for transaction state by going 
through the state machine
+       bool bootstrap();
+       //! establish
+       bool establish();
+       //! handShake
+       bool handShake();
+       //! negotiateCodec
+       bool negotiateCodec();
+       //! initiateResourceNegotiation
+       bool initiateResourceNegotiation();
+       //! initiateCodecResourceNegotiation
+       bool initiateCodecResourceNegotiation();
+       //! tearDown
+       void tearDown();
+       //! write Request Type
+       int writeRequestType(RequestType type);
+       //! read Request Type
+       int readRequestType(RequestType &type);
+       //! read Respond
+       int readRespond(RespondCode &code, std::string &message);
+       //! write respond
+       int writeRespond(RespondCode code, std::string message);
+       //! getRespondCodeContext
+       RespondCodeContext *getRespondCodeContext(RespondCode code)
+       {
+               for (unsigned int i = 0; i < 
sizeof(respondCodeContext)/sizeof(RespondCodeContext); i++)
+               {
+                       if (respondCodeContext[i].code == code)
+                       {
+                               return &respondCodeContext[i];
+                       }
+               }
+               return NULL;
+       }
+       //! getPeer
+       Site2SitePeer *getPeer()
+       {
+               return _peer;
+       }
+       //! Creation of a new transaction, return the transaction ID if success,
+       //! Return NULL when any error occurs
+       Transaction *createTransaction(std::string &transactionID, 
TransferDirection direction);
+       //! Receive the data packet from the transaction
+       //! Return false when any error occurs
+       bool receive(std::string transactionID, DataPacket *packet, bool &eof);
+       //! Send the data packet from the transaction
+       //! Return false when any error occurs
+       bool send(std::string transactionID, DataPacket *packet, FlowFileRecord 
*flowFile, ProcessSession *session);
+       //! Confirm the data that was sent or received by comparing CRC32's of 
the data sent and the data received.
+       bool confirm(std::string transactionID);
+       //! Cancel the transaction
+       void cancel(std::string transactionID);
+       //! Complete the transaction
+       bool complete(std::string transactionID);
+       //! Error the transaction
+       void error(std::string transactionID);
+       //! Receive flow files for the process session
+       void receiveFlowFiles(ProcessContext *context, ProcessSession *session);
+       //! Transfer flow files for the process session
+       void transferFlowFiles(ProcessContext *context, ProcessSession 
*session);
+       //! deleteTransaction
+       void deleteTransaction(std::string transactionID);
+       //! Nest Callback Class for write stream
+       class WriteCallback : public OutputStreamCallback
+       {
+               public:
+               WriteCallback(DataPacket *packet)
+               : _packet(packet) {}
+               DataPacket *_packet;
+               void process(std::ofstream *stream) {
+                       uint8_t buffer[8192];
+                       int len = _packet->_size;
+                       while (len > 0)
+                       {
+                               int size = std::min(len, (int) sizeof(buffer));
+                               int ret = 
_packet->_protocol->_peer->readData(buffer, size, &_packet->_transaction->_crc);
+                               if (ret != size)
+                               {
+                                       
_packet->_protocol->_logger->log_error("Site2Site Receive Flow Size %d Failed 
%d", size, ret);
+                                       break;
+                               }
+                               stream->write((const char *) buffer, size);
+                               len -= size;
+                       }
+               }
+       };
+       //! Nest Callback Class for read stream
+       class ReadCallback : public InputStreamCallback
+       {
+               public:
+               ReadCallback(DataPacket *packet)
+               : _packet(packet) {}
+               DataPacket *_packet;
+               void process(std::ifstream *stream) {
+                       _packet->_size = 0;
+                       uint8_t buffer[8192];
+                       int readSize;
+                       while (stream->good())
+                       {
+                               if (!stream->read((char *)buffer, 8192))
+                                       readSize = stream->gcount();
+                               else
+                                       readSize = 8192;
+                               int ret = 
_packet->_protocol->_peer->write(buffer, readSize, 
&_packet->_transaction->_crc);
+                               if (ret != readSize)
+                               {
+                                       
_packet->_protocol->_logger->log_error("Site2Site Send Flow Size %d Failed %d", 
readSize, ret);
+                                       break;
+                               }
+                               _packet->_size += readSize;
+                       }
+               }
+       };
+
+protected:
+
+private:
+       //! Mutex for protection
+       std::mutex _mtx;
+       //! Logger
+       Logger *_logger;
+       //! Configure
+       Configure *_configure;
+       //! Batch Count
+       std::atomic<uint64_t> _batchCount;
+       //! Batch Size
+       std::atomic<uint64_t> _batchSize;
+       //! Batch Duration in msec
+       std::atomic<uint64_t> _batchDuration;
+       //! Timeout in msec
+       std::atomic<uint64_t> _timeOut;
+       //! Peer Connection
+       Site2SitePeer *_peer;
+       //! portId
+       uuid_t _portId;
+       //! portIDStr
+       std::string _portIdStr;
+       //! BATCH_SEND_NANOS
+       uint64_t _batchSendNanos;
+       //! Peer State
+       PeerState _peerState;
+       uint32_t _supportedVersion[5];
+       uint32_t _currentVersion;
+       int _currentVersionIndex;
+       uint32_t _supportedCodecVersion[1];
+       uint32_t _currentCodecVersion;
+       int _currentCodecVersionIndex;
+       //! commsIdentifier
+       std::string _commsIdentifier;
+       //! transaction map
+       std::map<std::string, Transaction *> _transactionMap;
+
+       // Prevent default copy constructor and assignment operation
+       // Only support pass by reference or pointer
+       Site2SiteClientProtocol(const Site2SiteClientProtocol &parent);
+       Site2SiteClientProtocol &operator=(const Site2SiteClientProtocol 
&parent);
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/include/Site2SitePeer.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Site2SitePeer.h 
b/libminifi/include/Site2SitePeer.h
new file mode 100644
index 0000000..ff11637
--- /dev/null
+++ b/libminifi/include/Site2SitePeer.h
@@ -0,0 +1,364 @@
+/**
+ * @file Site2SitePeer.h
+ * Site2SitePeer class declaration for site to site peer  
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __SITE2SITE_PEER_H__
+#define __SITE2SITE_PEER_H__
+
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <fcntl.h>
+#include <netdb.h>
+#include <string>
+#include <errno.h>
+#include <mutex>
+#include <atomic>
+#include "TimeUtil.h"
+#include "Logger.h"
+#include "Configure.h"
+#include "Property.h"
+
+class CRC32
+{
+public:
+    CRC32() {
+       crc = 0;
+
+       if (tableInit)
+               return;
+
+       tableInit = true;
+        unsigned int poly = 0xedb88320;
+        unsigned int temp = 0;
+        for(unsigned int i = 0; i < 256; ++i) {
+            temp = i;
+            for(int j = 8; j > 0; --j) {
+                if((temp & 1) == 1) {
+                    temp = (unsigned int)((temp >> 1) ^ poly);
+                }else {
+                    temp >>= 1;
+                }
+            }
+            table[i] = temp;
+        }
+    }
+
+    unsigned int update(uint8_t * bytes, size_t size) {
+       crc = crc ^ ~0U;
+        for(unsigned int i = 0; i < size; ++i) {
+            uint8_t index = (uint8_t)(((crc) & 0xff) ^ bytes[i]);
+            crc = (unsigned int)((crc >> 8) ^ table[index]);
+        }
+        crc = crc ^ ~0U;
+        return crc;
+    }
+
+    long getCRC()
+    {
+       return crc;
+    }
+
+private:
+    static unsigned int table[256];
+    static std::atomic<bool> tableInit;
+    unsigned int crc;
+};
+
+static const char MAGIC_BYTES[] = {'N', 'i', 'F', 'i'};
+
+//! Site2SitePeer Class
+class Site2SitePeer
+{
+public:
+       //! Constructor
+       /*!
+        * Create a new site2site peer
+        */
+       Site2SitePeer(std::string host, uint16_t port) {
+               _logger = Logger::getLogger();
+               _configure = Configure::getConfigure();
+               _socket = 0;
+               _host = host;
+               _port = port;
+               _yieldExpiration = 0;
+               _timeOut = 30000; // 30 seconds
+               _url = "nifi://" + _host + ":" + std::to_string(_port);
+       }
+       //! Destructor
+       virtual ~Site2SitePeer() { Close();}
+       //! Set Processor yield period in MilliSecond
+       void setYieldPeriodMsec(uint64_t period) {
+               _yieldPeriodMsec = period;
+       }
+       //! get URL
+       std::string getURL() {
+               return _url;
+       }
+       //! Get Processor yield period in MilliSecond
+       uint64_t getYieldPeriodMsec(void) {
+               return(_yieldPeriodMsec);
+       }
+       //! Yield based on the yield period
+       void yield()
+       {
+               _yieldExpiration = (getTimeMillis() + _yieldPeriodMsec);
+       }
+       //! setHostName
+       void setHostName(std::string host)
+       {
+               _host = host;
+               _url = "nifi://" + _host + ":" + std::to_string(_port);
+       }
+       //! setPort
+       void setPort(uint16_t port)
+       {
+               _port = port;
+               _url = "nifi://" + _host + ":" + std::to_string(_port);
+       }
+       //! getHostName
+       std::string getHostName()
+       {
+               return _host;
+       }
+       //! getPort
+       uint16_t getPort()
+       {
+               return _port;
+       }
+       //! Yield based on the input time
+       void yield(uint64_t time)
+       {
+               _yieldExpiration = (getTimeMillis() + time);
+       }
+       //! whether need be to yield
+       bool isYield()
+       {
+               if (_yieldExpiration > 0)
+                       return (_yieldExpiration >= getTimeMillis());
+               else
+                       return false;
+       }
+       // clear yield expiration
+       void clearYield()
+       {
+               _yieldExpiration = 0;
+       }
+       //! Yield based on the yield period
+       void yield(std::string portId)
+       {
+               std::lock_guard<std::mutex> lock(_mtx);
+               uint64_t yieldExpiration = (getTimeMillis() + _yieldPeriodMsec);
+               _yieldExpirationPortIdMap[portId] = yieldExpiration;
+       }
+       //! Yield based on the input time
+       void yield(std::string portId, uint64_t time)
+       {
+               std::lock_guard<std::mutex> lock(_mtx);
+               uint64_t yieldExpiration = (getTimeMillis() + time);
+               _yieldExpirationPortIdMap[portId] = yieldExpiration;
+       }
+       //! whether need be to yield
+       bool isYield(std::string portId)
+       {
+               std::lock_guard<std::mutex> lock(_mtx);
+               std::map<std::string, uint64_t>::iterator it = 
this->_yieldExpirationPortIdMap.find(portId);
+               if (it != _yieldExpirationPortIdMap.end())
+               {
+                       uint64_t yieldExpiration = it->second;
+                       return (yieldExpiration >= getTimeMillis());
+               }
+               else
+               {
+                       return false;
+               }
+       }
+       //! clear yield expiration
+       void clearYield(std::string portId)
+       {
+               std::lock_guard<std::mutex> lock(_mtx);
+               std::map<std::string, uint64_t>::iterator it = 
this->_yieldExpirationPortIdMap.find(portId);
+               if (it != _yieldExpirationPortIdMap.end())
+               {
+                       _yieldExpirationPortIdMap.erase(portId);
+               }
+       }
+       //! setTimeOut
+       void setTimeOut(uint64_t time)
+       {
+               _timeOut = time;
+       }
+       //! getTimeOut
+       uint64_t getTimeOut()
+       {
+               return _timeOut;
+       }
+       int write(uint8_t value, CRC32 *crc = NULL)
+       {
+               return sendData(&value, 1, crc);
+       }
+       int write(char value, CRC32 *crc = NULL)
+       {
+               return sendData((uint8_t *)&value, 1, crc);
+       }
+       int write(uint32_t value, CRC32 *crc = NULL)
+       {
+               uint8_t temp[4];
+
+               temp[0] = (value & 0xFF000000) >> 24;
+               temp[1] = (value & 0x00FF0000) >> 16;
+               temp[2] = (value & 0x0000FF00) >> 8;
+               temp[3] = (value & 0x000000FF);
+               return sendData(temp, 4, crc);
+       }
+       int write(uint16_t value, CRC32 *crc = NULL)
+       {
+               uint8_t temp[2];
+               temp[0] = (value & 0xFF00) >> 8;
+               temp[1] = (value & 0xFF);
+               return sendData(temp, 2, crc);
+       }
+       int write(uint8_t *value, int len, CRC32 *crc = NULL)
+       {
+               return sendData(value, len, crc);
+       }
+       int write(uint64_t value, CRC32 *crc = NULL)
+       {
+               uint8_t temp[8];
+
+               temp[0] = (value >> 56) & 0xFF;
+               temp[1] = (value >> 48) & 0xFF;
+               temp[2] = (value >> 40) & 0xFF;
+               temp[3] = (value >> 32) & 0xFF;
+               temp[4] = (value >> 24) & 0xFF;
+               temp[5] = (value >> 16) & 0xFF;
+               temp[6] = (value >>  8) & 0xFF;
+               temp[7] = (value >>  0) & 0xFF;
+               return sendData(temp, 8, crc);
+       }
+       int write(bool value, CRC32 *crc = NULL)
+       {
+               uint8_t temp = value;
+               return write(temp, crc);
+       }
+       int writeUTF(std::string str, bool widen = false, CRC32 *crc = NULL);
+       int read(uint8_t &value, CRC32 *crc = NULL)
+       {
+               uint8_t buf;
+
+               int ret = readData(&buf, 1, crc);
+               if (ret == 1)
+                       value = buf;
+               return ret;
+       }
+       int read(uint16_t &value, CRC32 *crc = NULL)
+       {
+               uint8_t buf[2];
+
+               int ret = readData(buf, 2, crc);
+               if (ret == 2)
+                       value = (buf[0] << 8) | buf[1];
+               return ret;
+       }
+       int read(char &value, CRC32 *crc = NULL)
+       {
+               uint8_t buf;
+
+               int ret = readData(&buf, 1, crc);
+               if (ret == 1)
+                       value = (char) buf;
+               return ret;
+       }
+       int read(uint8_t *value, int len, CRC32 *crc = NULL)
+       {
+               return readData(value, len, crc);
+       }
+       int read(uint32_t &value, CRC32 *crc = NULL)
+       {
+               uint8_t buf[4];
+
+               int ret = readData(buf, 4, crc);
+               if (ret == 4)
+                       value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) 
| buf[3];
+               return ret;
+       }
+       int read(uint64_t &value, CRC32 *crc = NULL)
+       {
+               uint8_t buf[8];
+
+               int ret = readData(buf, 8, crc);
+               if (ret == 8)
+               {
+                       value = ((uint64_t) buf[0] << 56) |
+                                       ((uint64_t) (buf[1] & 255) << 48) |
+                                       ((uint64_t) (buf[2] & 255) << 40) |
+                                       ((uint64_t) (buf[3] & 255) << 32) |
+                                       ((uint64_t) (buf[4] & 255) << 24) |
+                                       ((uint64_t) (buf[5] & 255) << 16) |
+                                       ((uint64_t) (buf[6] & 255) <<  8) |
+                                       ((uint64_t) (buf[7] & 255) <<  0);
+               }
+               return ret;
+       }
+       int readUTF(std::string &str, bool widen = false, CRC32 *crc = NULL);
+       //! open connection to the peer
+       bool Open();
+       //! close connection to the peer
+       void Close();
+       //! Send Data via the socket, return -1 for failure
+       int sendData(uint8_t *buf, int buflen, CRC32 *crc = NULL);
+       //! Read length into buf, return -1 for failure and 0 for EOF
+       int readData(uint8_t *buf, int buflen, CRC32 *crc = NULL);
+       //! Select on the socket
+       int Select(int msec);
+
+protected:
+
+private:
+       //! Mutex for protection
+       std::mutex _mtx;
+       //! S2S server Name
+       std::string _host;
+       //! S2S server port
+       uint16_t _port;
+       //! socket to server
+       int _socket;
+       //! URL
+       std::string _url;
+       //! socket timeout;
+       std::atomic<uint64_t> _timeOut;
+       //! Logger
+       Logger *_logger;
+       //! Configure
+       Configure *_configure;
+       //! Yield Period in Milliseconds
+       std::atomic<uint64_t> _yieldPeriodMsec;
+       //! Yield Expiration
+       std::atomic<uint64_t> _yieldExpiration;
+       //! Yield Expiration per destination PortID
+       std::map<std::string, uint64_t> _yieldExpirationPortIdMap;
+       // Prevent default copy constructor and assignment operation
+       // Only support pass by reference or pointer
+       Site2SitePeer(const Site2SitePeer &parent);
+       Site2SitePeer &operator=(const Site2SitePeer &parent);
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/include/TailFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/TailFile.h b/libminifi/include/TailFile.h
new file mode 100644
index 0000000..5c4ba09
--- /dev/null
+++ b/libminifi/include/TailFile.h
@@ -0,0 +1,93 @@
+/**
+ * @file TailFile.h
+ * TailFile class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __TAIL_FILE_H__
+#define __TAIL_FILE_H__
+
+#include "FlowFileRecord.h"
+#include "Processor.h"
+#include "ProcessSession.h"
+
+//! TailFile Class
+class TailFile : public Processor
+{
+public:
+       //! Constructor
+       /*!
+        * Create a new processor
+        */
+       TailFile(std::string name, uuid_t uuid = NULL)
+       : Processor(name, uuid)
+       {
+               _logger = Logger::getLogger();
+               _stateRecovered = false;
+       }
+       //! Destructor
+       virtual ~TailFile()
+       {
+               storeState();
+       }
+       //! Processor Name
+       static const std::string ProcessorName;
+       //! Supported Properties
+       static Property FileName;
+       static Property StateFile;
+       //! Supported Relationships
+       static Relationship Success;
+
+public:
+       //! OnTrigger method, implemented by NiFi TailFile
+       virtual void onTrigger(ProcessContext *context, ProcessSession 
*session);
+       //! Initialize, over write by NiFi TailFile
+       virtual void initialize(void);
+       //! recoverState
+       void recoverState();
+       //! storeState
+       void storeState();
+
+protected:
+
+private:
+       //! Logger
+       Logger *_logger;
+       std::string _fileLocation;
+       //! Property Specified Tailed File Name
+       std::string _fileName;
+       //! File to save state
+       std::string _stateFile;
+       //! State related to the tailed file
+       std::string _currentTailFileName;
+       uint64_t _currentTailFilePosition;
+       bool _stateRecovered;
+       uint64_t _currentTailFileCreatedTime;
+       //! Utils functions for parse state file
+       std::string trimLeft(const std::string& s);
+       std::string trimRight(const std::string& s);
+       void parseStateFileLine(char *buf);
+       void checkRollOver();
+
+};
+
+//! Matched File Item for Roll over check
+typedef struct {
+       std::string fileName;
+       uint64_t modifiedTime;
+} TailMatchedFileItem;
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/include/TimeUtil.h
----------------------------------------------------------------------
diff --git a/libminifi/include/TimeUtil.h b/libminifi/include/TimeUtil.h
new file mode 100644
index 0000000..b024245
--- /dev/null
+++ b/libminifi/include/TimeUtil.h
@@ -0,0 +1,82 @@
+/**
+ * @file TimeUtil.h
+ * Basic Time Utility 
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __TIME_UTIL_H__
+#define __TIME_UTIL_H__
+
+#include <time.h>
+#include <sys/time.h>
+#include <string.h>
+#include <unistd.h>
+#include <string.h>
+#include <iostream>
+
+#ifdef __MACH__
+#include <mach/clock.h>
+#include <mach/mach.h>
+#endif
+
+inline uint64_t getTimeMillis()
+{
+       uint64_t value;
+
+       timeval time;
+       gettimeofday(&time, NULL);
+       value = ((uint64_t) (time.tv_sec) * 1000) + (time.tv_usec / 1000);
+
+       return value;
+}
+
+inline uint64_t getTimeNano()
+{
+       struct timespec ts;
+       
+#ifdef __MACH__ // OS X does not have clock_gettime, use clock_get_time
+       clock_serv_t cclock;
+       mach_timespec_t mts;
+       host_get_clock_service(mach_host_self(), CALENDAR_CLOCK, &cclock);
+       clock_get_time(cclock, &mts);
+       mach_port_deallocate(mach_task_self(), cclock);
+       ts.tv_sec = mts.tv_sec;
+       ts.tv_nsec = mts.tv_nsec;
+#else
+       clock_gettime(CLOCK_REALTIME, &ts);
+#endif
+
+       return ((uint64_t) (ts.tv_sec) * 1000000000 + ts.tv_nsec);
+}
+
+//! Convert millisecond since UTC to a time display string
+inline std::string getTimeStr(uint64_t msec)
+{
+       char date[120];
+       time_t second = (time_t) (msec/1000);
+       msec = msec % 1000;
+       strftime(date, sizeof(date) / sizeof(*date), "%Y-%m-%d %H:%M:%S",
+                    localtime(&second));
+
+       std::string ret = date;
+       date[0] = '\0';
+       sprintf(date, ".%03llu", (unsigned long long) msec);
+
+       ret += date;
+       return ret;
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/include/TimerDrivenSchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/TimerDrivenSchedulingAgent.h 
b/libminifi/include/TimerDrivenSchedulingAgent.h
new file mode 100644
index 0000000..9195745
--- /dev/null
+++ b/libminifi/include/TimerDrivenSchedulingAgent.h
@@ -0,0 +1,66 @@
+/**
+ * @file TimerDrivenSchedulingAgent.h
+ * TimerDrivenSchedulingAgent class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __TIMER_DRIVEN_SCHEDULING_AGENT_H__
+#define __TIMER_DRIVEN_SCHEDULING_AGENT_H__
+
+#include "Logger.h"
+#include "Configure.h"
+#include "Processor.h"
+#include "ProcessContext.h"
+#include "SchedulingAgent.h"
+
+//! TimerDrivenSchedulingAgent Class
+class TimerDrivenSchedulingAgent : public SchedulingAgent
+{
+public:
+       //! Constructor
+       /*!
+        * Create a new processor
+        */
+       TimerDrivenSchedulingAgent()
+       : SchedulingAgent()
+       {
+       }
+       //! Destructor
+       virtual ~TimerDrivenSchedulingAgent()
+       {
+       }
+       //! Run function for the thread
+       static void run(TimerDrivenSchedulingAgent *agent, Processor 
*processor);
+
+public:
+       //! schedule, overwritten by different DrivenTimerDrivenSchedulingAgent
+       virtual void schedule(Processor *processor);
+       //! unschedule, overwritten by different 
DrivenTimerDrivenSchedulingAgent
+       virtual void unschedule(Processor *processor);
+
+protected:
+
+private:
+       //! Threads
+       std::map<std::string, std::vector<std::thread *>> _threads;
+       // Prevent default copy constructor and assignment operation
+       // Only support pass by reference or pointer
+       TimerDrivenSchedulingAgent(const TimerDrivenSchedulingAgent &parent);
+       TimerDrivenSchedulingAgent &operator=(const TimerDrivenSchedulingAgent 
&parent);
+
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/Configure.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp
new file mode 100644
index 0000000..d7fd95b
--- /dev/null
+++ b/libminifi/src/Configure.cpp
@@ -0,0 +1,167 @@
+/**
+ * @file Configure.cpp
+ * Configure class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "Configure.h"
+
+Configure *Configure::_configure(NULL);
+const char *Configure::nifi_flow_configuration_file = 
"nifi.flow.configuration.file";
+const char *Configure::nifi_administrative_yield_duration = 
"nifi.administrative.yield.duration";
+const char *Configure::nifi_bored_yield_duration = "nifi.bored.yield.duration";
+const char *Configure::nifi_server_name = "nifi.server.name";
+const char *Configure::nifi_server_port = "nifi.server.port";
+const char *Configure::nifi_server_report_interval= 
"nifi.server.report.interval";
+
+
+//! Get the config value
+bool Configure::get(std::string key, std::string &value)
+{
+       std::lock_guard<std::mutex> lock(_mtx);
+       std::map<std::string,std::string>::iterator it = _properties.find(key);
+
+       if (it != _properties.end())
+       {
+               value = it->second;
+               return true;
+       }
+       else
+       {
+               return false;
+       }
+}
+
+// Trim String utils
+std::string Configure::trim(const std::string& s)
+{
+    return trimRight(trimLeft(s));
+}
+
+std::string Configure::trimLeft(const std::string& s)
+{
+       const char *WHITESPACE = " \n\r\t";
+    size_t startpos = s.find_first_not_of(WHITESPACE);
+    return (startpos == std::string::npos) ? "" : s.substr(startpos);
+}
+
+std::string Configure::trimRight(const std::string& s)
+{
+       const char *WHITESPACE = " \n\r\t";
+    size_t endpos = s.find_last_not_of(WHITESPACE);
+    return (endpos == std::string::npos) ? "" : s.substr(0, endpos+1);
+}
+
+//! Parse one line in configure file like key=value
+void Configure::parseConfigureFileLine(char *buf)
+{
+       char *line = buf;
+
+    while ((line[0] == ' ') || (line[0] =='\t'))
+       ++line;
+
+    char first = line[0];
+    if ((first == '\0') || (first == '#')  || (first == '\r') || (first == 
'\n') || (first == '='))
+    {
+       return;
+    }
+
+    char *equal = strchr(line, '=');
+    if (equal == NULL)
+    {
+       return;
+    }
+
+    equal[0] = '\0';
+    std::string key = line;
+
+    equal++;
+    while ((equal[0] == ' ') || (equal[0] == '\t'))
+       ++equal;
+
+    first = equal[0];
+    if ((first == '\0') || (first == '\r') || (first== '\n'))
+    {
+       return;
+    }
+
+    std::string value = equal;
+    key = trimRight(key);
+    value = trimRight(value);
+    set(key, value);
+}
+
+//! Load Configure File
+void Configure::loadConfigureFile(const char *fileName)
+{
+
+    std::string adjustedFilename;
+    if (fileName)
+    {
+        // perform a naive determination if this is a relative path
+        if (fileName[0] != '/')
+        {
+            adjustedFilename = adjustedFilename + _configure->getHome() + "/" 
+ fileName;
+        }
+        else
+        {
+            adjustedFilename += fileName;
+        }
+    }
+    char *path = NULL;
+    char full_path[PATH_MAX];
+    path = realpath(adjustedFilename.c_str(), full_path);
+    _logger->log_info("Using configuration file located at %s", path);
+
+    std::ifstream file(path, std::ifstream::in);
+    if (!file.good())
+    {
+        _logger->log_error("load configure file failed %s", path);
+        return;
+    }
+    this->clear();
+    const unsigned int bufSize = 512;
+    char buf[bufSize];
+    for (file.getline(buf, bufSize); file.good(); file.getline(buf, bufSize))
+    {
+        parseConfigureFileLine(buf);
+    }
+}
+
+//! Parse Command Line
+void Configure::parseCommandLine(int argc, char **argv)
+{
+       int i;
+       bool keyFound = false;
+       std::string key, value;
+
+       for (i = 1; i < argc; i++)
+       {
+               if (argv[i][0] == '-' && argv[i][1] != '\0')
+               {
+                       keyFound = true;
+                       key = &argv[i][1];
+                       continue;
+               }
+               if (keyFound)
+               {
+                       value = argv[i];
+                       set(key,value);
+                       keyFound = false;
+               }
+       }
+       return;
+}

Reply via email to