http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/Provenance.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Provenance.h b/libminifi/include/Provenance.h
deleted file mode 100644
index 3ba9792..0000000
--- a/libminifi/include/Provenance.h
+++ /dev/null
@@ -1,604 +0,0 @@
-/**
- * @file Provenance.h
- * Flow file record class declaration
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __PROVENANCE_H__
-#define __PROVENANCE_H__
-
-#include <ftw.h>
-#include <uuid/uuid.h>
-#include <atomic>
-#include <cstdint>
-#include <cstring>
-#include <iostream>
-#include <map>
-#include <set>
-#include <string>
-#include <thread>
-#include <vector>
-
-#include "Configure.h"
-#include "Connection.h"
-#include "FlowFileRecord.h"
-#include "Logger.h"
-#include "Property.h"
-#include "ResourceClaim.h"
-#include "io/Serializable.h"
-#include "utils/TimeUtil.h"
-#include "Repository.h"
-
-class ProvenanceRepository;
-
-//! Provenance Event Record
-class ProvenanceEventRecord : protected Serializable
-{
-public:
-       enum ProvenanceEventType {
-
-           /**
-            * A CREATE event is used when a FlowFile is generated from data 
that was
-            * not received from a remote system or external process
-            */
-           CREATE,
-
-           /**
-            * Indicates a provenance event for receiving data from an external 
process. This Event Type
-            * is expected to be the first event for a FlowFile. As such, a 
Processor that receives data
-            * from an external source and uses that data to replace the 
content of an existing FlowFile
-            * should use the {@link #FETCH} event type, rather than the 
RECEIVE event type.
-            */
-           RECEIVE,
-
-           /**
-            * Indicates that the contents of a FlowFile were overwritten using 
the contents of some
-            * external resource. This is similar to the {@link #RECEIVE} event 
but varies in that
-            * RECEIVE events are intended to be used as the event that 
introduces the FlowFile into
-            * the system, whereas FETCH is used to indicate that the contents 
of an existing FlowFile
-            * were overwritten.
-            */
-           FETCH,
-
-           /**
-            * Indicates a provenance event for sending data to an external 
process
-            */
-           SEND,
-
-           /**
-            * Indicates that the contents of a FlowFile were downloaded by a 
user or external entity.
-            */
-           DOWNLOAD,
-
-           /**
-            * Indicates a provenance event for the conclusion of an object's 
life for
-            * some reason other than object expiration
-            */
-           DROP,
-
-           /**
-            * Indicates a provenance event for the conclusion of an object's 
life due
-            * to the fact that the object could not be processed in a timely 
manner
-            */
-           EXPIRE,
-
-           /**
-            * FORK is used to indicate that one or more FlowFile was derived 
from a
-            * parent FlowFile.
-            */
-           FORK,
-
-           /**
-            * JOIN is used to indicate that a single FlowFile is derived from 
joining
-            * together multiple parent FlowFiles.
-            */
-           JOIN,
-
-           /**
-            * CLONE is used to indicate that a FlowFile is an exact duplicate 
of its
-            * parent FlowFile.
-            */
-           CLONE,
-
-           /**
-            * CONTENT_MODIFIED is used to indicate that a FlowFile's content 
was
-            * modified in some way. When using this Event Type, it is 
advisable to
-            * provide details about how the content is modified.
-            */
-           CONTENT_MODIFIED,
-
-           /**
-            * ATTRIBUTES_MODIFIED is used to indicate that a FlowFile's 
attributes were
-            * modified in some way. This event is not needed when another 
event is
-            * reported at the same time, as the other event will already 
contain all
-            * FlowFile attributes.
-            */
-           ATTRIBUTES_MODIFIED,
-
-           /**
-            * ROUTE is used to show that a FlowFile was routed to a specified
-            * {@link org.apache.nifi.processor.Relationship Relationship} and 
should provide
-            * information about why the FlowFile was routed to this 
relationship.
-            */
-           ROUTE,
-
-           /**
-            * Indicates a provenance event for adding additional information 
such as a
-            * new linkage to a new URI or UUID
-            */
-           ADDINFO,
-
-           /**
-            * Indicates a provenance event for replaying a FlowFile. The UUID 
of the
-            * event will indicate the UUID of the original FlowFile that is 
being
-            * replayed. The event will contain exactly one Parent UUID that is 
also the
-            * UUID of the FlowFile that is being replayed and exactly one 
Child UUID
-            * that is the UUID of the a newly created FlowFile that will be 
re-queued
-            * for processing.
-            */
-           REPLAY
-       };
-       friend class ProcessSession;
-public:
-       //! Constructor
-       /*!
-        * Create a new provenance event record
-        */
-       ProvenanceEventRecord(ProvenanceEventType event, std::string 
componentId, std::string componentType) {
-               _eventType = event;
-               _componentId = componentId;
-               _componentType = componentType;
-               _eventTime = getTimeMillis();
-               char eventIdStr[37];
-               // Generate the global UUID for th event
-               uuid_generate(_eventId);
-               uuid_unparse_lower(_eventId, eventIdStr);
-               _eventIdStr = eventIdStr;
-               logger_ = Logger::getLogger();
-       }
-
-       ProvenanceEventRecord() {
-                       _eventTime = getTimeMillis();
-                       logger_ = Logger::getLogger();
-       }
-
-       //! Destructor
-       virtual ~ProvenanceEventRecord() {
-       }
-       //! Get the Event ID
-       std::string getEventId() {
-               return _eventIdStr;
-       }
-       //! Get Attributes
-       std::map<std::string, std::string> getAttributes() {
-               return _attributes;
-       }
-       //! Get Size
-       uint64_t getFileSize() {
-               return _size;
-       }
-       // ! Get Offset
-       uint64_t getFileOffset() {
-               return _offset;
-       }
-       // ! Get Entry Date
-       uint64_t getFlowFileEntryDate() {
-               return _entryDate;
-       }
-       // ! Get Lineage Start Date
-       uint64_t getlineageStartDate() {
-               return _lineageStartDate;
-       }
-       // ! Get Event Time
-       uint64_t getEventTime() {
-               return _eventTime;
-       }
-       // ! Get Event Duration
-       uint64_t getEventDuration() {
-               return _eventDuration;
-       }
-       //! Set Event Duration
-       void setEventDuration(uint64_t duration)
-       {
-               _eventDuration = duration;
-       }
-       // ! Get Event Type
-       ProvenanceEventType getEventType() {
-               return _eventType;
-       }
-       //! Get Component ID
-       std::string getComponentId()
-       {
-               return _componentId;
-       }
-       //! Get Component Type
-       std::string getComponentType()
-       {
-               return _componentType;
-       }
-       //! Get FlowFileUuid
-       std::string getFlowFileUuid()
-       {
-               return _uuid;
-       }
-       //! Get content full path
-       std::string getContentFullPath()
-       {
-               return _contentFullPath;
-       }
-       //! Get LineageIdentifiers
-       std::set<std::string> getLineageIdentifiers()
-       {
-               return _lineageIdentifiers;
-       }
-       //! Get Details
-       std::string getDetails()
-       {
-               return _details;
-       }
-       //! Set Details
-       void setDetails(std::string details)
-       {
-               _details = details;
-       }
-       //! Get TransitUri
-       std::string getTransitUri()
-       {
-               return _transitUri;
-       }
-       //! Set TransitUri
-       void setTransitUri(std::string uri)
-       {
-               _transitUri = uri;
-       }
-       //! Get SourceSystemFlowFileIdentifier
-       std::string getSourceSystemFlowFileIdentifier()
-       {
-               return _sourceSystemFlowFileIdentifier;
-       }
-       //! Set SourceSystemFlowFileIdentifier
-       void setSourceSystemFlowFileIdentifier(std::string identifier)
-       {
-               _sourceSystemFlowFileIdentifier = identifier;
-       }
-       //! Get Parent UUIDs
-       std::vector<std::string> getParentUuids()
-       {
-               return _parentUuids;
-       }
-       //! Add Parent UUID
-       void addParentUuid(std::string uuid)
-       {
-               if (std::find(_parentUuids.begin(), _parentUuids.end(), uuid) 
!= _parentUuids.end())
-                       return;
-               else
-                       _parentUuids.push_back(uuid);
-       }
-       //! Add Parent Flow File
-       void addParentFlowFile(FlowFileRecord *flow)
-       {
-               addParentUuid(flow->getUUIDStr());
-               return;
-       }
-       //! Remove Parent UUID
-       void removeParentUuid(std::string uuid)
-       {
-               _parentUuids.erase(std::remove(_parentUuids.begin(), 
_parentUuids.end(), uuid), _parentUuids.end());
-       }
-       //! Remove Parent Flow File
-       void removeParentFlowFile(FlowFileRecord *flow)
-       {
-               removeParentUuid(flow->getUUIDStr());
-               return;
-       }
-       //! Get Children UUIDs
-       std::vector<std::string> getChildrenUuids()
-       {
-               return _childrenUuids;
-       }
-       //! Add Child UUID
-       void addChildUuid(std::string uuid)
-       {
-               if (std::find(_childrenUuids.begin(), _childrenUuids.end(), 
uuid) != _childrenUuids.end())
-                       return;
-               else
-                       _childrenUuids.push_back(uuid);
-       }
-       //! Add Child Flow File
-       void addChildFlowFile(FlowFileRecord *flow)
-       {
-               addChildUuid(flow->getUUIDStr());
-               return;
-       }
-       //! Remove Child UUID
-       void removeChildUuid(std::string uuid)
-       {
-               _childrenUuids.erase(std::remove(_childrenUuids.begin(), 
_childrenUuids.end(), uuid), _childrenUuids.end());
-       }
-       //! Remove Child Flow File
-       void removeChildFlowFile(FlowFileRecord *flow)
-       {
-               removeChildUuid(flow->getUUIDStr());
-               return;
-       }
-       //! Get AlternateIdentifierUri
-       std::string getAlternateIdentifierUri()
-       {
-               return _alternateIdentifierUri;
-       }
-       //! Set AlternateIdentifierUri
-       void setAlternateIdentifierUri(std::string uri)
-       {
-               _alternateIdentifierUri = uri;
-       }
-       //! Get Relationship
-       std::string getRelationship()
-       {
-               return _relationship;
-       }
-       //! Set Relationship
-       void setRelationship(std::string relation)
-       {
-               _relationship = relation;
-       }
-       //! Get sourceQueueIdentifier
-       std::string getSourceQueueIdentifier()
-       {
-               return _sourceQueueIdentifier;
-       }
-       //! Set sourceQueueIdentifier
-       void setSourceQueueIdentifier(std::string identifier)
-       {
-               _sourceQueueIdentifier = identifier;
-       }
-       //! fromFlowFile
-       void fromFlowFile(FlowFileRecord *flow)
-       {
-               _entryDate = flow->getEntryDate();
-               _lineageStartDate = flow->getlineageStartDate();
-               _lineageIdentifiers = flow->getlineageIdentifiers();
-               _uuid = flow->getUUIDStr();
-               _attributes = flow->getAttributes();
-               _size = flow->getSize();
-               _offset = flow->getOffset();
-               if (flow->getOriginalConnection())
-                       _sourceQueueIdentifier = 
flow->getOriginalConnection()->getName();
-               if (flow->getResourceClaim())
-               {
-                       _contentFullPath = 
flow->getResourceClaim()->getContentFullPath();
-               }
-       }
-       //! Serialize and Persistent to the repository
-       bool Serialize(ProvenanceRepository *repo);
-       //! DeSerialize
-       bool DeSerialize(const uint8_t *buffer, const int bufferSize);
-       //! DeSerialize
-       bool DeSerialize(DataStream &stream)
-       {
-               return DeSerialize(stream.getBuffer(),stream.getSize());
-       }
-       //! DeSerialize
-       bool DeSerialize(ProvenanceRepository *repo, std::string key);
-
-protected:
-
-       //! Event type
-       ProvenanceEventType _eventType;
-       //! Date at which the event was created
-       uint64_t _eventTime;
-       //! Date at which the flow file entered the flow
-       uint64_t _entryDate;
-       //! Date at which the origin of this flow file entered the flow
-       uint64_t _lineageStartDate;
-       //! Event Duration
-       uint64_t _eventDuration;
-       //! Component ID
-       std::string _componentId;
-       //! Component Type
-       std::string _componentType;
-       //! Size in bytes of the data corresponding to this flow file
-       uint64_t _size;
-       //! flow uuid
-       std::string _uuid;
-       //! Offset to the content
-       uint64_t _offset;
-       //! Full path to the content
-       std::string _contentFullPath;
-       //! Attributes key/values pairs for the flow record
-       std::map<std::string, std::string> _attributes;
-       //! provenance ID
-       uuid_t _eventId;
-       //! UUID string for all parents
-       std::set<std::string> _lineageIdentifiers;
-       //! transitUri
-       std::string _transitUri;
-       //! sourceSystemFlowFileIdentifier
-       std::string _sourceSystemFlowFileIdentifier;
-       //! parent UUID
-       std::vector<std::string> _parentUuids;
-       //! child UUID
-       std::vector<std::string> _childrenUuids;
-       //! detail
-       std::string _details;
-       //! sourceQueueIdentifier
-       std::string _sourceQueueIdentifier;
-       //! event ID Str
-       std::string _eventIdStr;
-       //! relationship
-       std::string _relationship;
-       //! alternateIdentifierUri;
-       std::string _alternateIdentifierUri;
-
-private:
-
-       //! Logger
-       std::shared_ptr<Logger> logger_;
-       
-       // Prevent default copy constructor and assignment operation
-       // Only support pass by reference or pointer
-       ProvenanceEventRecord(const ProvenanceEventRecord &parent);
-       ProvenanceEventRecord &operator=(const ProvenanceEventRecord &parent);
-
-};
-
-//! Provenance Reporter
-class ProvenanceReporter
-{
-       friend class ProcessSession;
-public:
-       //! Constructor
-       /*!
-        * Create a new provenance reporter associated with the process session
-        */
-       ProvenanceReporter(std::string componentId, std::string componentType) {
-               logger_ = Logger::getLogger();
-               _componentId = componentId;
-               _componentType = componentType;
-       }
-
-       //! Destructor
-       virtual ~ProvenanceReporter() {
-               clear();
-       }
-       //! Get events
-       std::set<ProvenanceEventRecord *> getEvents()
-       {
-               return _events;
-       }
-       //! Add event
-       void add(ProvenanceEventRecord *event)
-       {
-               _events.insert(event);
-       }
-       //! Remove event
-       void remove(ProvenanceEventRecord *event)
-       {
-               if (_events.find(event) != _events.end())
-               {
-                       _events.erase(event);
-               }
-       }
-       //!
-       //! clear
-       void clear()
-       {
-               for (auto it : _events)
-               {
-                       delete it;
-               }
-               _events.clear();
-       }
-       //! allocate
-       ProvenanceEventRecord 
*allocate(ProvenanceEventRecord::ProvenanceEventType eventType, FlowFileRecord 
*flow)
-       {
-               ProvenanceEventRecord *event = new 
ProvenanceEventRecord(eventType, _componentId, _componentType);
-               if (event)
-                       event->fromFlowFile(flow);
-
-               return event;
-       }
-       //! commit
-       void commit();
-       //! create
-       void create(FlowFileRecord *flow, std::string detail);
-       //! route
-       void route(FlowFileRecord *flow, Relationship relation, std::string 
detail, uint64_t processingDuration);
-       //! modifyAttributes
-       void modifyAttributes(FlowFileRecord *flow, std::string detail);
-       //! modifyContent
-       void modifyContent(FlowFileRecord *flow, std::string detail, uint64_t 
processingDuration);
-       //! clone
-       void clone(FlowFileRecord *parent, FlowFileRecord *child);
-       //! join
-       void join(std::vector<FlowFileRecord *> parents, FlowFileRecord *child, 
std::string detail, uint64_t processingDuration);
-       //! fork
-       void fork(std::vector<FlowFileRecord *> child, FlowFileRecord *parent, 
std::string detail, uint64_t processingDuration);
-       //! expire
-       void expire(FlowFileRecord *flow, std::string detail);
-       //! drop
-       void drop(FlowFileRecord *flow, std::string reason);
-       //! send
-       void send(FlowFileRecord *flow, std::string transitUri, std::string 
detail, uint64_t processingDuration, bool force);
-       //! fetch
-       void fetch(FlowFileRecord *flow, std::string transitUri, std::string 
detail, uint64_t processingDuration);
-       //! receive
-       void receive(FlowFileRecord *flow, std::string transitUri, std::string 
sourceSystemFlowFileIdentifier, std::string detail, uint64_t 
processingDuration);
-
-protected:
-
-       //! Component ID
-       std::string _componentId;
-       //! Component Type
-       std::string _componentType;
-
-private:
-
-       //! Incoming connection Iterator
-       std::set<ProvenanceEventRecord *> _events;
-       //! Logger
-       std::shared_ptr<Logger> logger_;
-
-       // Prevent default copy constructor and assignment operation
-       // Only support pass by reference or pointer
-       ProvenanceReporter(const ProvenanceReporter &parent);
-       ProvenanceReporter &operator=(const ProvenanceReporter &parent);
-};
-
-#define PROVENANCE_DIRECTORY "./provenance_repository"
-#define MAX_PROVENANCE_STORAGE_SIZE (10*1024*1024) // 10M
-#define MAX_PROVENANCE_ENTRY_LIFE_TIME (60000) // 1 minute
-#define PROVENANCE_PURGE_PERIOD (2500) // 2500 msec
-
-//! Provenance Repository
-class ProvenanceRepository : public Repository
-{
-public:
-       //! Constructor
-       /*!
-        * Create a new provenance repository
-        */
-       ProvenanceRepository()
-        : Repository(Repository::PROVENANCE, PROVENANCE_DIRECTORY,
-                       MAX_PROVENANCE_ENTRY_LIFE_TIME, 
MAX_PROVENANCE_STORAGE_SIZE, PROVENANCE_PURGE_PERIOD)
-       {
-       }
-
-       //! Destructor
-       virtual ~ProvenanceRepository() {
-       }
-
-       //! Persistent event
-       void registerEvent(ProvenanceEventRecord *event)
-       {
-               event->Serialize(this);
-       }
-       //! Remove event
-       void removeEvent(ProvenanceEventRecord *event)
-       {
-               Delete(event->getEventId());
-       }
-
-protected:
-
-private:
-
-       // Prevent default copy constructor and assignment operation
-       // Only support pass by reference or pointer
-       ProvenanceRepository(const ProvenanceRepository &parent);
-       ProvenanceRepository &operator=(const ProvenanceRepository &parent);
-};
-
-#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/PutFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/PutFile.h b/libminifi/include/PutFile.h
deleted file mode 100644
index 015605e..0000000
--- a/libminifi/include/PutFile.h
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * @file PutFile.h
- * PutFile class declaration
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __PUT_FILE_H__
-#define __PUT_FILE_H__
-
-#include "FlowFileRecord.h"
-#include "Processor.h"
-#include "ProcessSession.h"
-
-//! PutFile Class
-class PutFile : public Processor
-{
-public:
-
-       static const std::string CONFLICT_RESOLUTION_STRATEGY_REPLACE;
-       static const std::string CONFLICT_RESOLUTION_STRATEGY_IGNORE;
-       static const std::string CONFLICT_RESOLUTION_STRATEGY_FAIL;
-
-       //! Constructor
-       /*!
-        * Create a new processor
-        */
-       PutFile(std::string name, uuid_t uuid = NULL)
-       : Processor(name, uuid)
-       {
-               logger_ = Logger::getLogger();
-       }
-       //! Destructor
-       virtual ~PutFile()
-       {
-       }
-       //! Processor Name
-       static const std::string ProcessorName;
-       //! Supported Properties
-       static Property Directory;
-       static Property ConflictResolution;
-       //! Supported Relationships
-       static Relationship Success;
-       static Relationship Failure;
-
-       //! OnTrigger method, implemented by NiFi PutFile
-       virtual void onTrigger(ProcessContext *context, ProcessSession 
*session);
-       //! Initialize, over write by NiFi PutFile
-       virtual void initialize(void);
-
-       class ReadCallback : public InputStreamCallback
-       {
-       public:
-               ReadCallback(const std::string &tmpFile, const std::string 
&destFile);
-               ~ReadCallback();
-               virtual void process(std::ifstream *stream);
-               bool commit();
-
-       private:
-               std::shared_ptr<Logger> logger_;
-               std::ofstream _tmpFileOs;
-               bool _writeSucceeded = false;
-               std::string _tmpFile;
-               std::string _destFile;
-       };
-
-protected:
-
-private:
-       //! Logger
-       std::shared_ptr<Logger> logger_;
-
-       bool putFile(ProcessSession *session, FlowFileRecord *flowFile, const 
std::string &tmpFile, const std::string &destFile);
-};
-
-#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/RealTimeDataCollector.h
----------------------------------------------------------------------
diff --git a/libminifi/include/RealTimeDataCollector.h 
b/libminifi/include/RealTimeDataCollector.h
deleted file mode 100644
index 3b6d05f..0000000
--- a/libminifi/include/RealTimeDataCollector.h
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * @file RealTimeDataCollector.h
- * RealTimeDataCollector class declaration
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __REAL_TIME_DATA_COLLECTOR_H__
-#define __REAL_TIME_DATA_COLLECTOR_H__
-
-#include <stdio.h>
-#include <unistd.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
-#include <fcntl.h>
-#include <netdb.h>
-#include <string>
-#include <errno.h>
-#include "FlowFileRecord.h"
-#include "Processor.h"
-#include "ProcessSession.h"
-
-//! RealTimeDataCollector Class
-class RealTimeDataCollector : public Processor
-{
-public:
-       //! Constructor
-       /*!
-        * Create a new processor
-        */
-       RealTimeDataCollector(std::string name, uuid_t uuid = NULL)
-       : Processor(name, uuid)
-       {
-               _realTimeSocket = 0;
-               _batchSocket = 0;
-               logger_ = Logger::getLogger();
-               _firstInvoking = false;
-               _realTimeAccumulated = 0;
-               _batchAcccumulated = 0;
-               _queuedDataSize = 0;
-       }
-       //! Destructor
-       virtual ~RealTimeDataCollector()
-       {
-               if (_realTimeSocket)
-                       close(_realTimeSocket);
-               if (_batchSocket)
-                       close(_batchSocket);
-               if (_fileStream.is_open())
-                       _fileStream.close();
-       }
-       //! Processor Name
-       static const std::string ProcessorName;
-       //! Supported Properties
-       static Property REALTIMESERVERNAME;
-       static Property REALTIMESERVERPORT;
-       static Property BATCHSERVERNAME;
-       static Property BATCHSERVERPORT;
-       static Property FILENAME;
-       static Property ITERATION;
-       static Property REALTIMEMSGID;
-       static Property BATCHMSGID;
-       static Property REALTIMEINTERVAL;
-       static Property BATCHINTERVAL;
-       static Property BATCHMAXBUFFERSIZE;
-       //! Supported Relationships
-       static Relationship Success;
-       //! Connect to the socket
-       int connectServer(const char *host, uint16_t port);
-       int sendData(int socket, const char *buf, int buflen);
-       void onTriggerRealTime(ProcessContext *context, ProcessSession 
*session);
-       void onTriggerBatch(ProcessContext *context, ProcessSession *session);
-
-public:
-       //! OnTrigger method, implemented by NiFi RealTimeDataCollector
-       virtual void onTrigger(ProcessContext *context, ProcessSession 
*session);
-       //! Initialize, over write by NiFi RealTimeDataCollector
-       virtual void initialize(void);
-
-protected:
-
-private:
-       //! realtime server Name
-       std::string _realTimeServerName;
-       int64_t _realTimeServerPort;
-       std::string _batchServerName;
-       int64_t _batchServerPort;
-       int64_t _realTimeInterval;
-       int64_t _batchInterval;
-       int64_t _batchMaxBufferSize;
-       //! Match pattern for Real time Message ID
-       std::vector<std::string> _realTimeMsgID;
-       //! Match pattern for Batch Message ID
-       std::vector<std::string> _batchMsgID;
-       //! file for which the realTime collector will tail
-       std::string _fileName;
-       //! Whether we need to iterate from the beginning for demo
-       bool _iteration;
-       int _realTimeSocket;
-       int _batchSocket;
-       //! Logger
-       std::shared_ptr<Logger> logger_;
-       //! Mutex for protection
-       std::mutex _mtx;
-       //! Queued data size
-       uint64_t _queuedDataSize;
-       //! Queue for the batch process
-       std::queue<std::string> _queue;
-       std::thread::id _realTimeThreadId;
-       std::thread::id _batchThreadId;
-       std::atomic<bool> _firstInvoking;
-       int64_t _realTimeAccumulated;
-       int64_t _batchAcccumulated;
-       std::ifstream _fileStream;
-};
-
-#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/Relationship.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Relationship.h b/libminifi/include/Relationship.h
deleted file mode 100644
index 3454ee5..0000000
--- a/libminifi/include/Relationship.h
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * @file Relationship.h
- * Relationship class declaration
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __RELATIONSHIP_H__
-#define __RELATIONSHIP_H__
-
-#include <string>
-#include <uuid/uuid.h>
-#include <vector>
-#include <queue>
-#include <map>
-#include <mutex>
-#include <atomic>
-
-//! undefined relationship for remote process group outgoing port and root 
process group incoming port
-#define UNDEFINED_RELATIONSHIP "undefined"
-
-inline bool isRelationshipNameUndefined(std::string name)
-{
-       if (name == UNDEFINED_RELATIONSHIP)
-               return true;
-       else
-               return false;
-}
-
-//! Relationship Class
-class Relationship {
-
-public:
-       //! Constructor
-       /*!
-        * Create a new relationship 
-        */
-       Relationship(const std::string name, const std::string description)
-               : _name(name), _description(description) {
-       }
-       Relationship()
-               : _name(UNDEFINED_RELATIONSHIP) {
-       }
-       //! Destructor
-       virtual ~Relationship() {
-       }
-       //! Get Name for the relationship
-       std::string getName() {
-               return _name;
-       }
-       //! Get Description for the relationship
-       std::string getDescription() {
-               return _description;
-       }
-       //! Compare
-       bool operator < (const Relationship & right) const {
-               return _name < right._name;
-       }
-       //! Whether it is a undefined relationship
-       bool isRelationshipUndefined()
-       {
-               return isRelationshipNameUndefined(_name);
-       }
-
-protected:
-
-       //! Name
-       std::string _name;
-       //! Description
-       std::string _description;
-
-private:
-};
-
-#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/RemoteProcessorGroupPort.h
----------------------------------------------------------------------
diff --git a/libminifi/include/RemoteProcessorGroupPort.h 
b/libminifi/include/RemoteProcessorGroupPort.h
index 05ecd17..e9a4228 100644
--- a/libminifi/include/RemoteProcessorGroupPort.h
+++ b/libminifi/include/RemoteProcessorGroupPort.h
@@ -20,71 +20,91 @@
 #ifndef __REMOTE_PROCESSOR_GROUP_PORT_H__
 #define __REMOTE_PROCESSOR_GROUP_PORT_H__
 
+#include <mutex>
 #include <memory>
+#include <stack>
 #include "FlowFileRecord.h"
-#include "Processor.h"
-#include "ProcessSession.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
 #include "Site2SiteClientProtocol.h"
 
-//! RemoteProcessorGroupPort Class
-class RemoteProcessorGroupPort: public Processor {
-public:
-       //! Constructor
-       /*!
-        * Create a new processor
-        */
-       RemoteProcessorGroupPort(std::string name, uuid_t uuid = NULL) :
-                       Processor(name, uuid), direction_(SEND), 
transmitting_(false), peer_() {
-               logger_ = Logger::getLogger();
-               protocol_ = std::unique_ptr<Site2SiteClientProtocol>(
-                               new Site2SiteClientProtocol(0));
-               protocol_->setPortId(uuid);
-       }
-       //! Destructor
-       virtual ~RemoteProcessorGroupPort() {
 
-       }
-       //! Processor Name
-       static const std::string ProcessorName;
-       //! Supported Properties
-       static Property hostName;
-       static Property port;
-       //! Supported Relationships
-       static Relationship relation;
-public:
-       //! OnTrigger method, implemented by NiFi RemoteProcessorGroupPort
-       virtual void onTrigger(ProcessContext *context, ProcessSession 
*session);
-       //! Initialize, over write by NiFi RemoteProcessorGroupPort
-       virtual void initialize(void);
-       //! Set Direction
-       void setDirection(TransferDirection direction) {
-               direction_ = direction;
-               if (direction_ == RECEIVE)
-                       this->setTriggerWhenEmpty(true);
-       }
-       //! Set Timeout
-       void setTimeOut(uint64_t timeout) {
-               protocol_->setTimeOut(timeout);
-       }
-       //! SetTransmitting
-       void setTransmitting(bool val) {
-               transmitting_ = val;
-       }
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+// RemoteProcessorGroupPort Class
+class RemoteProcessorGroupPort :
+    public core::Processor {
+ public:
+  // Constructor
+  /*!
+   * Create a new processor
+   */
+  RemoteProcessorGroupPort(std::string name, uuid_t uuid = NULL)
+      : core::Processor(name, uuid),
+        direction_(SEND),
+        transmitting_(false){
+    logger_ = logging::Logger::getLogger();
+    uuid_copy(protocol_uuid_,uuid);
+  }
+  // Destructor
+  virtual ~RemoteProcessorGroupPort() {
 
-protected:
+  }
+  // Processor Name
+  static const std::string ProcessorName;
+  // Supported Properties
+  static core::Property hostName;
+  static core::Property port;
+  // Supported Relationships
+  static core::Relationship relation;
+ public:
+  // OnTrigger method, implemented by NiFi RemoteProcessorGroupPort
+  virtual void onTrigger(
+      core::ProcessContext *context,
+      core::ProcessSession *session);
+  // Initialize, over write by NiFi RemoteProcessorGroupPort
+  virtual void initialize(void);
+  // Set Direction
+  void setDirection(TransferDirection direction) {
+    direction_ = direction;
+    if (direction_ == RECEIVE)
+      this->setTriggerWhenEmpty(true);
+  }
+  // Set Timeout
+  void setTimeOut(uint64_t timeout) {
+    timeout_ = timeout;
+  }
+  // SetTransmitting
+  void setTransmitting(bool val) {
+    transmitting_ = val;
+  }
 
-private:
-       //! Logger
-       std::shared_ptr<Logger> logger_;
-       //! Peer Connection
-       Site2SitePeer peer_;
-       //! Peer Protocol
-       std::unique_ptr<Site2SiteClientProtocol> protocol_;
-       //! Transaction Direction
-       TransferDirection direction_;
-       //! Transmitting
-       bool transmitting_;
+ protected:
 
+ private:
+   
+  std::unique_ptr<Site2SiteClientProtocol> getNextProtocol();
+  void returnProtocol(std::unique_ptr<Site2SiteClientProtocol> protocol);
+   
+  std::stack<std::unique_ptr<Site2SiteClientProtocol>> available_protocols_;
+  std::mutex protocol_mutex_;
+  // Logger
+  std::shared_ptr<logging::Logger> logger_;
+  // Transaction Direction
+  TransferDirection direction_;
+  // Transmitting
+  bool transmitting_;
+  // timeout
+  uint64_t timeout_;
+  
+  uuid_t protocol_uuid_;
+ 
 };
 
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
 #endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/Repository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Repository.h b/libminifi/include/Repository.h
deleted file mode 100644
index 55fb442..0000000
--- a/libminifi/include/Repository.h
+++ /dev/null
@@ -1,318 +0,0 @@
-/**
- * @file Repository 
- * Repository class declaration
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __REPOSITORY_H__
-#define __REPOSITORY_H__
-
-#include <ftw.h>
-#include <uuid/uuid.h>
-#include <atomic>
-#include <cstdint>
-#include <cstring>
-#include <iostream>
-#include <map>
-#include <set>
-#include <string>
-#include <thread>
-#include <vector>
-
-#ifdef LEVELDB_SUPPORT
-#include "leveldb/db.h"
-#include "leveldb/options.h"
-#include "leveldb/slice.h"
-#include "leveldb/status.h"
-#endif
-#include "Configure.h"
-#include "Connection.h"
-#include "FlowFileRecord.h"
-#include "Logger.h"
-#include "Property.h"
-#include "ResourceClaim.h"
-#include "io/Serializable.h"
-#include "utils/TimeUtil.h"
-#include "utils/StringUtils.h"
-
-//! Repository
-class Repository
-{
-public:
-       enum RepositoryType {
-               //! Provenance Repo Type
-               PROVENANCE,
-               //! FlowFile Repo Type
-               FLOWFILE,
-               MAX_REPO_TYPE
-       };
-       static const char *RepositoryTypeStr[MAX_REPO_TYPE];
-       //! Constructor
-       /*!
-        * Create a new provenance repository
-        */
-       Repository(RepositoryType type, std::string directory, 
-               int64_t maxPartitionMillis, int64_t maxPartitionBytes, uint64_t 
purgePeriod) {
-               _type = type;
-               _directory = directory;
-               _maxPartitionMillis = maxPartitionMillis;
-               _maxPartitionBytes = maxPartitionBytes;
-               _purgePeriod = purgePeriod;
-               logger_ = Logger::getLogger();
-               configure_ = Configure::getConfigure();
-#ifdef LEVELDB_SUPPORT
-               _db = NULL;
-#endif
-               _thread = NULL;
-               _running = false;
-               _repoFull = false;
-               _enable = true;
-       }
-
-       //! Destructor
-       virtual ~Repository() {
-               stop();
-               if (this->_thread)
-                       delete this->_thread;
-               destroy();
-       }
-
-       //! initialize
-       virtual bool initialize()
-       {
-               std::string value;
-
-#ifdef LEVELDB_SUPPORT
-               if (_type == PROVENANCE)
-               {
-                       if 
(!(configure_->get(Configure::nifi_provenance_repository_enable, value)
-                                       && StringUtils::StringToBool(value, 
_enable))) {
-                               _enable = true;
-                       }
-                       if (!_enable)
-                               return false;
-                       if 
(configure_->get(Configure::nifi_provenance_repository_directory_default, 
value))
-                       {
-                               _directory = value;
-                       }
-                       logger_->log_info("NiFi Provenance Repository Directory 
%s", _directory.c_str());
-                       if 
(configure_->get(Configure::nifi_provenance_repository_max_storage_size, value))
-                       {
-                               Property::StringToInt(value, 
_maxPartitionBytes);
-                       }
-                       logger_->log_info("NiFi Provenance Max Partition Bytes 
%d", _maxPartitionBytes);
-                       if 
(configure_->get(Configure::nifi_provenance_repository_max_storage_time, value))
-                       {
-                               TimeUnit unit;
-                               if (Property::StringToTime(value, 
_maxPartitionMillis, unit) &&
-                                                       
Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis))
-                               {
-                               }
-                       }
-                       logger_->log_info("NiFi Provenance Max Storage Time: 
[%d] ms", _maxPartitionMillis);
-                       leveldb::Options options;
-                       options.create_if_missing = true;
-                       leveldb::Status status = leveldb::DB::Open(options, 
_directory.c_str(), &_db);
-                       if (status.ok())
-                       {
-                               logger_->log_info("NiFi Provenance Repository 
database open %s success", _directory.c_str());
-                       }
-                       else
-                       {
-                               logger_->log_error("NiFi Provenance Repository 
database open %s fail", _directory.c_str());
-                               return false;
-                       }
-               }
-
-               if (_type == FLOWFILE)
-               {
-                       if 
(!(configure_->get(Configure::nifi_flowfile_repository_enable, value)
-                                       && StringUtils::StringToBool(value, 
_enable))) {
-                               _enable = true;
-                       }
-                       if (!_enable)
-                               return false;
-                       if 
(configure_->get(Configure::nifi_flowfile_repository_directory_default, value))
-                       {
-                               _directory = value;
-                       }
-                       logger_->log_info("NiFi FlowFile Repository Directory 
%s", _directory.c_str());
-                       if 
(configure_->get(Configure::nifi_flowfile_repository_max_storage_size, value))
-                       {
-                               Property::StringToInt(value, 
_maxPartitionBytes);
-                       }
-                       logger_->log_info("NiFi FlowFile Max Partition Bytes 
%d", _maxPartitionBytes);
-                       if 
(configure_->get(Configure::nifi_flowfile_repository_max_storage_time, value))
-                       {
-                               TimeUnit unit;
-                               if (Property::StringToTime(value, 
_maxPartitionMillis, unit) &&
-                                                       
Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis))
-                               {
-                               }
-                       }
-                       logger_->log_info("NiFi FlowFile Max Storage Time: [%d] 
ms", _maxPartitionMillis);
-                       leveldb::Options options;
-                       options.create_if_missing = true;
-                       leveldb::Status status = leveldb::DB::Open(options, 
_directory.c_str(), &_db);
-                       if (status.ok())
-                       {
-                               logger_->log_info("NiFi FlowFile Repository 
database open %s success", _directory.c_str());
-                       }
-                       else
-                       {
-                               logger_->log_error("NiFi FlowFile Repository 
database open %s fail", _directory.c_str());
-                               return false;
-                       }
-               }
-
-               return true;
-#else
-               return false;
-#endif
-       }
-       //! Put
-       virtual bool Put(std::string key, uint8_t *buf, int bufLen)
-       {
-#ifdef LEVELDB_SUPPORT
-               if (!_enable)
-                       return false;
-                       
-               // persistent to the DB
-               leveldb::Slice value((const char *) buf, bufLen);
-               leveldb::Status status;
-               status = _db->Put(leveldb::WriteOptions(), key, value);
-               if (status.ok())
-                       return true;
-               else
-                       return false;
-#else
-               return false;
-#endif
-       }
-       //! Delete
-       virtual bool Delete(std::string key)
-       {
-#ifdef LEVELDB_SUPPORT
-               if (!_enable)
-                       return false;
-               leveldb::Status status;
-               status = _db->Delete(leveldb::WriteOptions(), key);
-               if (status.ok())
-                       return true;
-               else
-                       return false;
-#else
-               return false;
-#endif
-       }
-       //! Get
-       virtual bool Get(std::string key, std::string &value)
-       {
-#ifdef LEVELDB_SUPPORT
-               if (!_enable)
-                       return false;
-               leveldb::Status status;
-               status = _db->Get(leveldb::ReadOptions(), key, &value);
-               if (status.ok())
-                       return true;
-               else
-                       return false;
-#else
-               return false;
-#endif
-       }
-       //! Run function for the thread
-       static void run(Repository *repo);
-       //! Start the repository monitor thread
-       virtual void start();
-       //! Stop the repository monitor thread
-       virtual void stop();
-       //! whether the repo is full
-       virtual bool isFull()
-       {
-               return _repoFull;
-       }
-       //! whether the repo is enable 
-       virtual bool isEnable()
-       {
-               return _enable;
-       }
-
-protected:
-       //! Repo Type
-       RepositoryType _type;
-       //! Mutex for protection
-       std::mutex _mtx;
-       //! repository directory
-       std::string _directory;
-       //! Logger
-       std::shared_ptr<Logger> logger_;
-       //! Configure
-       //! max db entry life time
-       Configure *configure_;
-       int64_t _maxPartitionMillis;
-       //! max db size
-       int64_t _maxPartitionBytes;
-       //! purge period
-       uint64_t _purgePeriod;
-#ifdef LEVELDB_SUPPORT
-       //! level DB database
-       leveldb::DB* _db;
-#endif
-       //! thread
-       std::thread *_thread;
-       //! whether the monitoring thread is running for the repo while it was 
enabled 
-       bool _running;
-       //! whether it is enabled by minfi property for the repo 
-       bool _enable;
-       //! whether stop accepting provenace event
-       std::atomic<bool> _repoFull;
-       //! repoSize
-       uint64_t repoSize();
-       //! size of the directory
-       static uint64_t _repoSize[MAX_REPO_TYPE];
-       //! call back for directory size
-       static int repoSumProvenance(const char *fpath, const struct stat *sb, 
int typeflag)
-       {
-               _repoSize[PROVENANCE] += sb->st_size;
-               return 0;
-       }
-       //! call back for directory size
-       static int repoSumFlowFile(const char *fpath, const struct stat *sb, 
int typeflag)
-       {
-               _repoSize[FLOWFILE] += sb->st_size;
-               return 0;
-       }
-
-private:
-       //! destroy
-       void destroy()
-       {
-#ifdef LEVELDB_SUPPORT
-               if (_db)
-               {
-                       delete _db;
-                       _db = NULL;
-               }
-#endif
-       }
-       // Prevent default copy constructor and assignment operation
-       // Only support pass by reference or pointer
-       Repository(const Repository &parent);
-       Repository &operator=(const Repository &parent);
-};
-
-#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/ResourceClaim.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ResourceClaim.h 
b/libminifi/include/ResourceClaim.h
index 7ca79a3..4c5438c 100644
--- a/libminifi/include/ResourceClaim.h
+++ b/libminifi/include/ResourceClaim.h
@@ -27,75 +27,85 @@
 #include <map>
 #include <mutex>
 #include <atomic>
-#include "Configure.h"
+#include "properties/Configure.h"
 
-//! Default content directory
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+
+// Default content directory
 #define DEFAULT_CONTENT_DIRECTORY "./content_repository"
 
 
 
-//! ResourceClaim Class
+// ResourceClaim Class
 class ResourceClaim {
 
 public:
   
        static std::string default_directory_path;
-       //! Constructor
+       // Constructor
        /*!
         * Create a new resource claim
         */
        ResourceClaim(const std::string contentDirectory = 
default_directory_path);
-       //! Destructor
+       // Destructor
        virtual ~ResourceClaim() {}
-       //! increaseFlowFileRecordOwnedCount
+       // increaseFlowFileRecordOwnedCount
        void increaseFlowFileRecordOwnedCount()
        {
                ++_flowFileRecordOwnedCount;
        }
-       //! decreaseFlowFileRecordOwenedCount
+       // decreaseFlowFileRecordOwenedCount
        void decreaseFlowFileRecordOwnedCount()
        {
                --_flowFileRecordOwnedCount;
        }
-       //! getFlowFileRecordOwenedCount
+       // getFlowFileRecordOwenedCount
        uint64_t getFlowFileRecordOwnedCount()
        {
                return _flowFileRecordOwnedCount;
        }
-       //! Get the content full path
+       // Get the content full path
        std::string getContentFullPath()
        {
                return _contentFullPath;
        }
-       //! Set the content full path
+       // Set the content full path
        void setContentFullPath(std::string path)
        {
                _contentFullPath = path;
        }
 
 protected:
-       //! A global unique identifier
+       // A global unique identifier
        uuid_t _uuid;
-       //! A local unique identifier
+       // A local unique identifier
        uint64_t _id;
-       //! Full path to the content
+       // Full path to the content
        std::string _contentFullPath;
 
-       //! How many FlowFileRecord Own this cliam
+       // How many FlowFileRecord Own this cliam
        std::atomic<uint64_t> _flowFileRecordOwnedCount;
 
 private:
-       //! Configure
+       // Configure
        Configure *configure_;
-       //! Logger
-       std::shared_ptr<Logger> logger_;
+       // Logger
+       std::shared_ptr<logging::Logger> logger_;
        // Prevent default copy constructor and assignment operation
        // Only support pass by reference or pointer
        ResourceClaim(const ResourceClaim &parent);
        ResourceClaim &operator=(const ResourceClaim &parent);
 
-       //! Local resource claim number
+       // Local resource claim number
        static std::atomic<uint64_t> _localResourceClaimNumber;
 };
 
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
 #endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/SchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/SchedulingAgent.h 
b/libminifi/include/SchedulingAgent.h
index f6d5a1c..0493640 100644
--- a/libminifi/include/SchedulingAgent.h
+++ b/libminifi/include/SchedulingAgent.h
@@ -28,72 +28,92 @@
 #include <algorithm>
 #include <thread>
 #include "utils/TimeUtil.h"
-#include "Logger.h"
-#include "Configure.h"
+#include "core/core.h"
+#include "core/logging/Logger.h"
+#include "properties/Configure.h"
 #include "FlowFileRecord.h"
-#include "Logger.h"
-#include "Processor.h"
-#include "ProcessContext.h"
+#include "core/logging/Logger.h"
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "provenance/ProvenanceRepository.h"
 
-//! SchedulingAgent Class
-class SchedulingAgent
-{
-public:
-       //! Constructor
-       /*!
-        * Create a new processor
-        */
-       SchedulingAgent() {
-               configure_ = Configure::getConfigure();
-               logger_ = Logger::getLogger();
-               _running = false;
-       }
-       //! Destructor
-       virtual ~SchedulingAgent()
-       {
 
-       }
-       //! onTrigger, return whether the yield is need
-       bool onTrigger(Processor *processor, ProcessContext *processContext, 
ProcessSessionFactory *sessionFactory);
-       //! Whether agent has work to do
-       bool hasWorkToDo(Processor *processor);
-       //! Whether the outgoing need to be backpressure
-       bool hasTooMuchOutGoing(Processor *processor);
-       //! start
-       void start() {
-               _running = true;
-       }
-       //! stop
-       void stop() {
-               _running = false;
-       }
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
 
-public:
-       //! schedule, overwritten by different DrivenSchedulingAgent
-       virtual void schedule(Processor *processor) = 0;
-       //! unschedule, overwritten by different DrivenSchedulingAgent
-       virtual void unschedule(Processor *processor) = 0;
 
-protected:
-       //! Logger
-       std::shared_ptr<Logger> logger_;
-       //! Configure
-       Configure *configure_;
-       //! Mutex for protection
-       std::mutex _mtx;
-       //! Whether it is running
-       std::atomic<bool> _running;
-       //! AdministrativeYieldDuration
-       int64_t _administrativeYieldDuration;
-       //! BoredYieldDuration
-       int64_t _boredYieldDuration;
+// SchedulingAgent Class
+class SchedulingAgent {
+ public:
+  // Constructor
+  /*!
+   * Create a new processor
+   */
+  SchedulingAgent(std::shared_ptr<core::Repository> repo) {
+    configure_ = Configure::getConfigure();
+    logger_ = logging::Logger::getLogger();
+    running_ = false;
+    repo_ = repo;
+  }
+  // Destructor
+  virtual ~SchedulingAgent() {
 
-private:
-       // Prevent default copy constructor and assignment operation
-       // Only support pass by reference or pointer
-       SchedulingAgent(const SchedulingAgent &parent);
-       SchedulingAgent &operator=(const SchedulingAgent &parent);
+  }
+  // onTrigger, return whether the yield is need
+  bool onTrigger(
+      std::shared_ptr<core::Processor> processor,
+      core::ProcessContext *processContext,
+      core::ProcessSessionFactory *sessionFactory);
+  // Whether agent has work to do
+  bool hasWorkToDo(std::shared_ptr<core::Processor> processor);
+  // Whether the outgoing need to be backpressure
+  bool hasTooMuchOutGoing(
+      std::shared_ptr<core::Processor> processor);
+  // start
+  void start() {
+    running_ = true;
+  }
+  // stop
+  void stop() {
+    running_ = false;
+  }
+
+ public:
+  // schedule, overwritten by different DrivenSchedulingAgent
+  virtual void schedule(
+      std::shared_ptr<core::Processor> processor) = 0;
+  // unschedule, overwritten by different DrivenSchedulingAgent
+  virtual void unschedule(
+      std::shared_ptr<core::Processor> processor) = 0;
+
+  SchedulingAgent(const SchedulingAgent &parent) = delete;
+  SchedulingAgent &operator=(const SchedulingAgent &parent) = delete;
+ protected:
+  // Logger
+  std::shared_ptr<logging::Logger> logger_;
+  // Configure
+  Configure *configure_;
+  // Mutex for protection
+  std::mutex mutex_;
+  // Whether it is running
+  std::atomic<bool> running_;
+  // AdministrativeYieldDuration
+  int64_t _administrativeYieldDuration;
+  // BoredYieldDuration
+  int64_t _boredYieldDuration;
+
+  std::shared_ptr<core::Repository> repo_;
+
+ private:
+  // Prevent default copy constructor and assignment operation
+  // Only support pass by reference or pointer
 
 };
 
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
 #endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/Site2SiteClientProtocol.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Site2SiteClientProtocol.h 
b/libminifi/include/Site2SiteClientProtocol.h
index 444eed5..6120e3e 100644
--- a/libminifi/include/Site2SiteClientProtocol.h
+++ b/libminifi/include/Site2SiteClientProtocol.h
@@ -34,16 +34,23 @@
 #include <thread>
 #include <algorithm>
 #include <uuid/uuid.h>
-#include "Configure.h"
-#include "Property.h"
+
+#include "core/Property.h"
+#include "properties/Configure.h"
 #include "Site2SitePeer.h"
 #include "FlowFileRecord.h"
-#include "Logger.h"
-#include "ProcessContext.h"
-#include "ProcessSession.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
 #include "io/CRCStream.h"
 
-//! Resource Negotiated Status Code
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+// Resource Negotiated Status Code
 #define RESOURCE_OK 20
 #define DIFFERENT_RESOURCE_VERSION 21
 #define NEGOTIATED_ABORT 255
@@ -55,143 +62,144 @@
  * transferred between a client and a remote NiFi instance.
  */
 typedef enum {
-       /**
-        * * The client is to send data to the remote instance.
-        * */
-       SEND,
-       /**
-        * * The client is to receive data from the remote instance.
-        * */
-       RECEIVE
+  /**
+   * * The client is to send data to the remote instance.
+   * */
+  SEND,
+  /**
+   * * The client is to receive data from the remote instance.
+   * */
+  RECEIVE
 } TransferDirection;
 
-//! Peer State
+// Peer State
 typedef enum {
-       /**
-        * * IDLE
-        * */
-       IDLE = 0,
-       /**
-        * * Socket Established
-        * */
-       ESTABLISHED,
-       /**
-        * * HandShake Done
-        * */
-       HANDSHAKED,
-       /**
-        * * After CodeDec Completion
-        * */
-       READY
+  /**
+   * * IDLE
+   * */
+  IDLE = 0,
+  /**
+   * * Socket Established
+   * */
+  ESTABLISHED,
+  /**
+   * * HandShake Done
+   * */
+  HANDSHAKED,
+  /**
+   * * After CodeDec Completion
+   * */
+  READY
 } PeerState;
 
-//! Transaction State
+// Transaction State
 typedef enum {
-       /**
-        * * Transaction has been started but no data has been sent or received.
-        * */
-       TRANSACTION_STARTED,
-       /**
-        * * Transaction has been started and data has been sent or received.
-        * */
-       DATA_EXCHANGED,
-       /**
-        * * Data that has been transferred has been confirmed via its CRC.
-        * * Transaction is ready to be completed.
-        * */
-       TRANSACTION_CONFIRMED,
-       /**
-        * * Transaction has been successfully completed.
-        * */
-       TRANSACTION_COMPLETED,
-       /**
-        * * The Transaction has been canceled.
-        * */
-       TRANSACTION_CANCELED,
-       /**
-        * * The Transaction ended in an error.
-        * */
-       TRANSACTION_ERROR
+  /**
+   * * Transaction has been started but no data has been sent or received.
+   * */
+  TRANSACTION_STARTED,
+  /**
+   * * Transaction has been started and data has been sent or received.
+   * */
+  DATA_EXCHANGED,
+  /**
+   * * Data that has been transferred has been confirmed via its CRC.
+   * * Transaction is ready to be completed.
+   * */
+  TRANSACTION_CONFIRMED,
+  /**
+   * * Transaction has been successfully completed.
+   * */
+  TRANSACTION_COMPLETED,
+  /**
+   * * The Transaction has been canceled.
+   * */
+  TRANSACTION_CANCELED,
+  /**
+   * * The Transaction ended in an error.
+   * */
+  TRANSACTION_ERROR
 } TransactionState;
 
-//! Request Type
+// Request Type
 typedef enum {
-       NEGOTIATE_FLOWFILE_CODEC = 0,
-       REQUEST_PEER_LIST,
-       SEND_FLOWFILES,
-       RECEIVE_FLOWFILES,
-       SHUTDOWN,
-       MAX_REQUEST_TYPE
+  NEGOTIATE_FLOWFILE_CODEC = 0,
+  REQUEST_PEER_LIST,
+  SEND_FLOWFILES,
+  RECEIVE_FLOWFILES,
+  SHUTDOWN,
+  MAX_REQUEST_TYPE
 } RequestType;
 
-//! Request Type Str
+// Request Type Str
 static const char *RequestTypeStr[MAX_REQUEST_TYPE] = {
-               "NEGOTIATE_FLOWFILE_CODEC", "REQUEST_PEER_LIST", 
"SEND_FLOWFILES",
-               "RECEIVE_FLOWFILES", "SHUTDOWN" };
+    "NEGOTIATE_FLOWFILE_CODEC", "REQUEST_PEER_LIST", "SEND_FLOWFILES",
+    "RECEIVE_FLOWFILES", "SHUTDOWN" };
 
-//! Respond Code
+// Respond Code
 typedef enum {
-       RESERVED = 0,
-       // ResponseCode, so that we can indicate a 0 followed by some other 
bytes
-
-       // handshaking properties
-       PROPERTIES_OK = 1,
-       UNKNOWN_PROPERTY_NAME = 230,
-       ILLEGAL_PROPERTY_VALUE = 231,
-       MISSING_PROPERTY = 232,
-       // transaction indicators
-       CONTINUE_TRANSACTION = 10,
-       FINISH_TRANSACTION = 11,
-       CONFIRM_TRANSACTION = 12, // "Explanation" of this code is the checksum
-       TRANSACTION_FINISHED = 13,
-       TRANSACTION_FINISHED_BUT_DESTINATION_FULL = 14,
-       CANCEL_TRANSACTION = 15,
-       BAD_CHECKSUM = 19,
-       // data availability indicators
-       MORE_DATA = 20,
-       NO_MORE_DATA = 21,
-       // port state indicators
-       UNKNOWN_PORT = 200,
-       PORT_NOT_IN_VALID_STATE = 201,
-       PORTS_DESTINATION_FULL = 202,
-       // authorization
-       UNAUTHORIZED = 240,
-       // error indicators
-       ABORT = 250,
-       UNRECOGNIZED_RESPONSE_CODE = 254,
-       END_OF_STREAM = 255
+  RESERVED = 0,
+  // ResponseCode, so that we can indicate a 0 followed by some other bytes
+
+  // handshaking properties
+  PROPERTIES_OK = 1,
+  UNKNOWN_PROPERTY_NAME = 230,
+  ILLEGAL_PROPERTY_VALUE = 231,
+  MISSING_PROPERTY = 232,
+  // transaction indicators
+  CONTINUE_TRANSACTION = 10,
+  FINISH_TRANSACTION = 11,
+  CONFIRM_TRANSACTION = 12,  // "Explanation" of this code is the checksum
+  TRANSACTION_FINISHED = 13,
+  TRANSACTION_FINISHED_BUT_DESTINATION_FULL = 14,
+  CANCEL_TRANSACTION = 15,
+  BAD_CHECKSUM = 19,
+  // data availability indicators
+  MORE_DATA = 20,
+  NO_MORE_DATA = 21,
+  // port state indicators
+  UNKNOWN_PORT = 200,
+  PORT_NOT_IN_VALID_STATE = 201,
+  PORTS_DESTINATION_FULL = 202,
+  // authorization
+  UNAUTHORIZED = 240,
+  // error indicators
+  ABORT = 250,
+  UNRECOGNIZED_RESPONSE_CODE = 254,
+  END_OF_STREAM = 255
 } RespondCode;
 
-//! Respond Code Class
+// Respond Code Class
 typedef struct {
-       RespondCode code;
-       const char *description;
-       bool hasDescription;
+  RespondCode code;
+  const char *description;
+  bool hasDescription;
 } RespondCodeContext;
 
-//! Respond Code Context
+// Respond Code Context
 static RespondCodeContext respondCodeContext[] = { { RESERVED,
-               "Reserved for Future Use", false }, { PROPERTIES_OK, 
"Properties OK",
-               false }, { UNKNOWN_PROPERTY_NAME, "Unknown Property Name", true 
}, {
-               ILLEGAL_PROPERTY_VALUE, "Illegal Property Value", true }, {
-               MISSING_PROPERTY, "Missing Property", true }, { 
CONTINUE_TRANSACTION,
-               "Continue Transaction", false }, { FINISH_TRANSACTION,
-               "Finish Transaction", false }, { CONFIRM_TRANSACTION,
-               "Confirm Transaction", true }, { TRANSACTION_FINISHED,
-               "Transaction Finished", false }, {
-               TRANSACTION_FINISHED_BUT_DESTINATION_FULL,
-               "Transaction Finished But Destination is Full", false }, {
-               CANCEL_TRANSACTION, "Cancel Transaction", true }, { 
BAD_CHECKSUM,
-               "Bad Checksum", false }, { MORE_DATA, "More Data Exists", false 
}, {
-               NO_MORE_DATA, "No More Data Exists", false }, { UNKNOWN_PORT,
-               "Unknown Port", false }, { PORT_NOT_IN_VALID_STATE,
-               "Port Not in a Valid State", true }, { PORTS_DESTINATION_FULL,
-               "Port's Destination is Full", false }, { UNAUTHORIZED,
-               "User Not Authorized", true }, { ABORT, "Abort", true }, {
-               UNRECOGNIZED_RESPONSE_CODE, "Unrecognized Response Code", false 
}, {
-               END_OF_STREAM, "End of Stream", false } };
-
-//! Respond Code Sequence Pattern
+    "Reserved for Future Use", false },
+    { PROPERTIES_OK, "Properties OK", false }, { UNKNOWN_PROPERTY_NAME,
+        "Unknown Property Name", true }, { ILLEGAL_PROPERTY_VALUE,
+        "Illegal Property Value", true }, { MISSING_PROPERTY,
+        "Missing Property", true }, { CONTINUE_TRANSACTION,
+        "Continue Transaction", false }, { FINISH_TRANSACTION,
+        "Finish Transaction", false }, { CONFIRM_TRANSACTION,
+        "Confirm Transaction", true }, { TRANSACTION_FINISHED,
+        "Transaction Finished", false }, {
+        TRANSACTION_FINISHED_BUT_DESTINATION_FULL,
+        "Transaction Finished But Destination is Full", false }, {
+        CANCEL_TRANSACTION, "Cancel Transaction", true }, { BAD_CHECKSUM,
+        "Bad Checksum", false }, { MORE_DATA, "More Data Exists", false }, {
+        NO_MORE_DATA, "No More Data Exists", false }, { UNKNOWN_PORT,
+        "Unknown Port", false }, { PORT_NOT_IN_VALID_STATE,
+        "Port Not in a Valid State", true }, { PORTS_DESTINATION_FULL,
+        "Port's Destination is Full", false }, { UNAUTHORIZED,
+        "User Not Authorized", true }, { ABORT, "Abort", true }, {
+        UNRECOGNIZED_RESPONSE_CODE, "Unrecognized Response Code", false }, {
+        END_OF_STREAM, "End of Stream", false } };
+
+// Respond Code Sequence Pattern
 static const uint8_t CODE_SEQUENCE_VALUE_1 = (uint8_t) 'R';
 static const uint8_t CODE_SEQUENCE_VALUE_2 = (uint8_t) 'C';
 
@@ -200,43 +208,44 @@ static const uint8_t CODE_SEQUENCE_VALUE_2 = (uint8_t) 
'C';
  * Protocol.
  */
 typedef enum {
-       /**
-        * Boolean value indicating whether or not the contents of a FlowFile 
should
-        * be GZipped when transferred.
-        */
-       GZIP,
-       /**
-        * The unique identifier of the port to communicate with
-        */
-       PORT_IDENTIFIER,
-       /**
-        * Indicates the number of milliseconds after the request was made that 
the
-        * client will wait for a response. If no response has been received by 
the
-        * time this value expires, the server can move on without attempting to
-        * service the request because the client will have already 
disconnected.
-        */
-       REQUEST_EXPIRATION_MILLIS,
-       /**
-        * The preferred number of FlowFiles that the server should send to the
-        * client when pulling data. This property was introduced in version 5 
of
-        * the protocol.
-        */
-       BATCH_COUNT,
-       /**
-        * The preferred number of bytes that the server should send to the 
client
-        * when pulling data. This property was introduced in version 5 of the
-        * protocol.
-        */
-       BATCH_SIZE,
-       /**
-        * The preferred amount of time that the server should send data to the
-        * client when pulling data. This property was introduced in version 5 
of
-        * the protocol. Value is in milliseconds.
-        */
-       BATCH_DURATION, MAX_HANDSHAKE_PROPERTY
+  /**
+   * Boolean value indicating whether or not the contents of a FlowFile should
+   * be GZipped when transferred.
+   */
+  GZIP,
+  /**
+   * The unique identifier of the port to communicate with
+   */
+  PORT_IDENTIFIER,
+  /**
+   * Indicates the number of milliseconds after the request was made that the
+   * client will wait for a response. If no response has been received by the
+   * time this value expires, the server can move on without attempting to
+   * service the request because the client will have already disconnected.
+   */
+  REQUEST_EXPIRATION_MILLIS,
+  /**
+   * The preferred number of FlowFiles that the server should send to the
+   * client when pulling data. This property was introduced in version 5 of
+   * the protocol.
+   */
+  BATCH_COUNT,
+  /**
+   * The preferred number of bytes that the server should send to the client
+   * when pulling data. This property was introduced in version 5 of the
+   * protocol.
+   */
+  BATCH_SIZE,
+  /**
+   * The preferred amount of time that the server should send data to the
+   * client when pulling data. This property was introduced in version 5 of
+   * the protocol. Value is in milliseconds.
+   */
+  BATCH_DURATION,
+  MAX_HANDSHAKE_PROPERTY
 } HandshakeProperty;
 
-//! HandShakeProperty Str
+// HandShakeProperty Str
 static const char *HandShakePropertyStr[MAX_HANDSHAKE_PROPERTY] = {
 /**
  * Boolean value indicating whether or not the contents of a FlowFile should
@@ -275,89 +284,89 @@ static const char 
*HandShakePropertyStr[MAX_HANDSHAKE_PROPERTY] = {
 
 class Site2SiteClientProtocol;
 
-//! Transaction Class
+// Transaction Class
 class Transaction {
-       friend class Site2SiteClientProtocol;
-public:
-       //! Constructor
-       /*!
-        * Create a new transaction
-        */
-       explicit Transaction(TransferDirection direction,
-                       CRCStream<Site2SitePeer> &stream) :
-                       crcStream(std::move(stream)) {
-               _state = TRANSACTION_STARTED;
-               _direction = direction;
-               _dataAvailable = false;
-               _transfers = 0;
-               _bytes = 0;
-
-               char uuidStr[37];
-
-               // Generate the global UUID for the transaction
-               uuid_generate(_uuid);
-               uuid_unparse_lower(_uuid, uuidStr);
-               _uuidStr = uuidStr;
-       }
-       //! Destructor
-       virtual ~Transaction() {
-       }
-       //! getUUIDStr
-       std::string getUUIDStr() {
-               return _uuidStr;
-       }
-       //! getState
-       TransactionState getState() {
-               return _state;
-       }
-       //! isDataAvailable
-       bool isDataAvailable() {
-               return _dataAvailable;
-       }
-       //! setDataAvailable()
-       void setDataAvailable(bool value) {
-               _dataAvailable = value;
-       }
-       //! getDirection
-       TransferDirection getDirection() {
-               return _direction;
-       }
-       //! getCRC
-       long getCRC() {
-               return crcStream.getCRC();
-       }
-       //! updateCRC
-       void updateCRC(uint8_t *buffer, uint32_t length) {
-               crcStream.updateCRC(buffer, length);
-       }
-
-       CRCStream<Site2SitePeer> &getStream() {
-               return crcStream;
-       }
-       
-       
-       Transaction(const Transaction &parent) = delete;
-       Transaction &operator=(const Transaction &parent) = delete;
-
-protected:
-
-private:
-
-       CRCStream<Site2SitePeer> crcStream;
-       //! Transaction State
-       TransactionState _state;
-       //! Transaction Direction
-       TransferDirection _direction;
-       //! Whether received data is available
-       bool _dataAvailable;
-       //! A global unique identifier
-       uuid_t _uuid;
-       //! UUID string
-       std::string _uuidStr;
-       //! Number of transfer
-       int _transfers;
-       //! Number of content bytes
-       uint64_t _bytes;
+  friend class Site2SiteClientProtocol;
+ public:
+  // Constructor
+  /*!
+   * Create a new transaction
+   */
+  explicit Transaction(
+      TransferDirection direction,
+      org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> &stream)
+      : crcStream(std::move(stream)) {
+    _state = TRANSACTION_STARTED;
+    _direction = direction;
+    _dataAvailable = false;
+    _transfers = 0;
+    _bytes = 0;
+
+    char uuidStr[37];
+
+    // Generate the global UUID for the transaction
+    uuid_generate(_uuid);
+    uuid_unparse_lower(_uuid, uuidStr);
+    _uuidStr = uuidStr;
+  }
+  // Destructor
+  virtual ~Transaction() {
+  }
+  // getUUIDStr
+  std::string getUUIDStr() {
+    return _uuidStr;
+  }
+  // getState
+  TransactionState getState() {
+    return _state;
+  }
+  // isDataAvailable
+  bool isDataAvailable() {
+    return _dataAvailable;
+  }
+  // setDataAvailable()
+  void setDataAvailable(bool value) {
+    _dataAvailable = value;
+  }
+  // getDirection
+  TransferDirection getDirection() {
+    return _direction;
+  }
+  // getCRC
+  long getCRC() {
+    return crcStream.getCRC();
+  }
+  // updateCRC
+  void updateCRC(uint8_t *buffer, uint32_t length) {
+    crcStream.updateCRC(buffer, length);
+  }
+
+  org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> &getStream() {
+    return crcStream;
+  }
+
+  Transaction(const Transaction &parent) = delete;
+  Transaction &operator=(const Transaction &parent) = delete;
+
+ protected:
+
+ private:
+
+  org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> crcStream;
+  // Transaction State
+  TransactionState _state;
+  // Transaction Direction
+  TransferDirection _direction;
+  // Whether received data is available
+  bool _dataAvailable;
+  // A global unique identifier
+  uuid_t _uuid;
+  // UUID string
+  std::string _uuidStr;
+  // Number of transfer
+  int _transfers;
+  // Number of content bytes
+  uint64_t _bytes;
 
 };
 
@@ -366,264 +375,268 @@ private:
  * NiFi instance.
  */
 class DataPacket {
-public:
-       DataPacket(Site2SiteClientProtocol *protocol, Transaction *transaction,
-                       std::map<std::string, std::string> attributes) {
-               _protocol = protocol;
-               _size = 0;
-               _transaction = transaction;
-               _attributes = attributes;
-       }
-       std::map<std::string, std::string> _attributes;
-       uint64_t _size;
-       Site2SiteClientProtocol *_protocol;
-       Transaction *_transaction;
+ public:
+  DataPacket(Site2SiteClientProtocol *protocol, Transaction *transaction,
+             std::map<std::string, std::string> attributes) {
+    _protocol = protocol;
+    _size = 0;
+    _transaction = transaction;
+    _attributes = attributes;
+  }
+  std::map<std::string, std::string> _attributes;
+  uint64_t _size;
+  Site2SiteClientProtocol *_protocol;
+  Transaction *_transaction;
 };
 
-//! Site2SiteClientProtocol Class
+// Site2SiteClientProtocol Class
 class Site2SiteClientProtocol {
-public:
-       //! Constructor
-       /*!
-        * Create a new control protocol
-        */
-       Site2SiteClientProtocol(Site2SitePeer *peer) {
-               logger_ = Logger::getLogger();
-               configure_ = Configure::getConfigure();
-               peer_ = peer;
-               _batchSize = 0;
-               _batchCount = 0;
-               _batchDuration = 0;
-               _batchSendNanos = 5000000000; // 5 seconds
-               _timeOut = 30000; // 30 seconds
-               _peerState = IDLE;
-               _supportedVersion[0] = 5;
-               _supportedVersion[1] = 4;
-               _supportedVersion[2] = 3;
-               _supportedVersion[3] = 2;
-               _supportedVersion[4] = 1;
-               _currentVersion = _supportedVersion[0];
-               _currentVersionIndex = 0;
-               _supportedCodecVersion[0] = 1;
-               _currentCodecVersion = _supportedCodecVersion[0];
-               _currentCodecVersionIndex = 0;
-       }
-       //! Destructor
-       virtual ~Site2SiteClientProtocol() {
-       }
-
-public:
-       //! setBatchSize
-       void setBatchSize(uint64_t size) {
-               _batchSize = size;
-       }
-       //! setBatchCount
-       void setBatchCount(uint64_t count) {
-               _batchCount = count;
-       }
-       //! setBatchDuration
-       void setBatchDuration(uint64_t duration) {
-               _batchDuration = duration;
-       }
-       //! setTimeOut
-       void setTimeOut(uint64_t time) {
-               _timeOut = time;
-               if (peer_)
-                       peer_->setTimeOut(time);
-
-       }
-       
-       void setPeer(Site2SitePeer *peer)
-       {
-         peer_ = peer;
-       }
-       /**
-        * Provides a reference to the time out
-        * @returns timeout
-        */
-       const uint64_t getTimeOut() const {
-               return _timeOut;
-       }
-
-       /**
-        * Provides a reference to the port identifier
-        * @returns port identifier
-        */
-       const std::string getPortId() const {
-               return _portIdStr;
-       }
-       //! setPortId
-       void setPortId(uuid_t id) {
-               uuid_copy(_portId, id);
-               char idStr[37];
-               uuid_unparse_lower(id, idStr);
-               _portIdStr = idStr;
-       }
-       //! getResourceName
-       std::string getResourceName() {
-               return "SocketFlowFileProtocol";
-       }
-       //! getCodecResourceName
-       std::string getCodecResourceName() {
-               return "StandardFlowFileCodec";
-       }
-       //! bootstrap the protocol to the ready for transaction state by going 
through the state machine
-       bool bootstrap();
-       //! establish
-       bool establish();
-       //! handShake
-       bool handShake();
-       //! negotiateCodec
-       bool negotiateCodec();
-       //! initiateResourceNegotiation
-       bool initiateResourceNegotiation();
-       //! initiateCodecResourceNegotiation
-       bool initiateCodecResourceNegotiation();
-       //! tearDown
-       void tearDown();
-       //! write Request Type
-       int writeRequestType(RequestType type);
-       //! read Request Type
-       int readRequestType(RequestType &type);
-       //! read Respond
-       int readRespond(RespondCode &code, std::string &message);
-       //! write respond
-       int writeRespond(RespondCode code, std::string message);
-       //! getRespondCodeContext
-       RespondCodeContext *getRespondCodeContext(RespondCode code) {
-               for (unsigned int i = 0;
-                               i < sizeof(respondCodeContext) / 
sizeof(RespondCodeContext);
-                               i++) {
-                       if (respondCodeContext[i].code == code) {
-                               return &respondCodeContext[i];
-                       }
-               }
-               return NULL;
-       }
-       //! getPeer
-       Site2SitePeer *getPeer() {
-               return peer_;
-       }
-       //! Creation of a new transaction, return the transaction ID if success,
-       //! Return NULL when any error occurs
-       Transaction *createTransaction(std::string &transactionID,
-                       TransferDirection direction);
-       //! Receive the data packet from the transaction
-       //! Return false when any error occurs
-       bool receive(std::string transactionID, DataPacket *packet, bool &eof);
-       //! Send the data packet from the transaction
-       //! Return false when any error occurs
-       bool send(std::string transactionID, DataPacket *packet,
-                       FlowFileRecord *flowFile, ProcessSession *session);
-       //! Confirm the data that was sent or received by comparing CRC32's of 
the data sent and the data received.
-       bool confirm(std::string transactionID);
-       //! Cancel the transaction
-       void cancel(std::string transactionID);
-       //! Complete the transaction
-       bool complete(std::string transactionID);
-       //! Error the transaction
-       void error(std::string transactionID);
-       //! Receive flow files for the process session
-       void receiveFlowFiles(ProcessContext *context, ProcessSession *session);
-       //! Transfer flow files for the process session
-       void transferFlowFiles(ProcessContext *context, ProcessSession 
*session);
-       //! deleteTransaction
-       void deleteTransaction(std::string transactionID);
-       //! Nest Callback Class for write stream
-       class WriteCallback: public OutputStreamCallback {
-       public:
-               WriteCallback(DataPacket *packet) :
-                               _packet(packet) {
-               }
-               DataPacket *_packet;
-               void process(std::ofstream *stream) {
-                       uint8_t buffer[8192];
-                       int len = _packet->_size;
-                       while (len > 0) {
-                               int size = std::min(len, (int) sizeof(buffer));
-                               int ret = 
_packet->_transaction->getStream().readData(buffer,size);
-                               if (ret != size) {
-                                       _packet->_protocol->logger_->log_error(
-                                                       "Site2Site Receive Flow 
Size %d Failed %d", size,
-                                                       ret);
-                                       break;
-                               }
-                               stream->write((const char *) buffer, size);
-                               len -= size;
-                       }
-               }
-       };
-       //! Nest Callback Class for read stream
-       class ReadCallback: public InputStreamCallback {
-       public:
-               ReadCallback(DataPacket *packet) :
-                               _packet(packet) {
-               }
-               DataPacket *_packet;
-               void process(std::ifstream *stream) {
-                       _packet->_size = 0;
-                       uint8_t buffer[8192];
-                       int readSize;
-                       while (stream->good()) {
-                               if (!stream->read((char *) buffer, 8192))
-                                       readSize = stream->gcount();
-                               else
-                                       readSize = 8192;
-                               int ret = 
_packet->_transaction->getStream().writeData(buffer,readSize);
-                               if (ret != readSize) {
-                                       _packet->_protocol->logger_->log_error(
-                                                       "Site2Site Send Flow 
Size %d Failed %d", readSize,
-                                                       ret);
-                                       break;
-                               }
-                               _packet->_size += readSize;
-                       }
-               }
-       };
-
-protected:
-
-private:
-
-       //! Mutex for protection
-       std::mutex _mtx;
-       //! Logger
-       std::shared_ptr<Logger> logger_;
-       //! Configure
-       Configure *configure_;
-       //! Batch Count
-       std::atomic<uint64_t> _batchCount;
-       //! Batch Size
-       std::atomic<uint64_t> _batchSize;
-       //! Batch Duration in msec
-       std::atomic<uint64_t> _batchDuration;
-       //! Timeout in msec
-       std::atomic<uint64_t> _timeOut;
-       //! Peer Connection
-       Site2SitePeer *peer_;
-       //! portId
-       uuid_t _portId;
-       //! portIDStr
-       std::string _portIdStr;
-       //! BATCH_SEND_NANOS
-       uint64_t _batchSendNanos;
-       //! Peer State
-       PeerState _peerState;
-       uint32_t _supportedVersion[5];
-       uint32_t _currentVersion;
-       int _currentVersionIndex;
-       uint32_t _supportedCodecVersion[1];
-       uint32_t _currentCodecVersion;
-       int _currentCodecVersionIndex;
-       //! commsIdentifier
-       std::string _commsIdentifier;
-       //! transaction map
-       std::map<std::string, Transaction *> _transactionMap;
-
-       // Prevent default copy constructor and assignment operation
-       // Only support pass by reference or pointer
-       Site2SiteClientProtocol(const Site2SiteClientProtocol &parent);
-       Site2SiteClientProtocol &operator=(const Site2SiteClientProtocol 
&parent);
+ public:
+  // Constructor
+  /*!
+   * Create a new control protocol
+   */
+  Site2SiteClientProtocol(std::unique_ptr<Site2SitePeer> peer) {
+    logger_ = logging::Logger::getLogger();
+    configure_ = Configure::getConfigure();
+    peer_ = std::move(peer);
+    _batchSize = 0;
+    _batchCount = 0;
+    _batchDuration = 0;
+    _batchSendNanos = 5000000000;  // 5 seconds
+    _timeOut = 30000;  // 30 seconds
+    _peerState = IDLE;
+    _supportedVersion[0] = 5;
+    _supportedVersion[1] = 4;
+    _supportedVersion[2] = 3;
+    _supportedVersion[3] = 2;
+    _supportedVersion[4] = 1;
+    _currentVersion = _supportedVersion[0];
+    _currentVersionIndex = 0;
+    _supportedCodecVersion[0] = 1;
+    _currentCodecVersion = _supportedCodecVersion[0];
+    _currentCodecVersionIndex = 0;
+  }
+  // Destructor
+  virtual ~Site2SiteClientProtocol() {
+    tearDown();
+  }
+
+ public:
+  // setBatchSize
+  void setBatchSize(uint64_t size) {
+    _batchSize = size;
+  }
+  // setBatchCount
+  void setBatchCount(uint64_t count) {
+    _batchCount = count;
+  }
+  // setBatchDuration
+  void setBatchDuration(uint64_t duration) {
+    _batchDuration = duration;
+  }
+  // setTimeOut
+  void setTimeOut(uint64_t time) {
+    _timeOut = time;
+    if (peer_)
+      peer_->setTimeOut(time);
+
+  }
+
+  void setPeer(std::unique_ptr<Site2SitePeer> peer) {
+    peer_ = std::move(peer);
+  }
+  /**
+   * Provides a reference to the time out
+   * @returns timeout
+   */
+  const uint64_t getTimeOut() const {
+    return _timeOut;
+  }
+
+  /**
+   * Provides a reference to the port identifier
+   * @returns port identifier
+   */
+  const std::string getPortId() const {
+    return _portIdStr;
+  }
+  // setPortId
+  void setPortId(uuid_t id) {
+    uuid_copy(_portId, id);
+    char idStr[37];
+    uuid_unparse_lower(id, idStr);
+    _portIdStr = idStr;
+  }
+  // getResourceName
+  std::string getResourceName() {
+    return "SocketFlowFileProtocol";
+  }
+  // getCodecResourceName
+  std::string getCodecResourceName() {
+    return "StandardFlowFileCodec";
+  }
+  // bootstrap the protocol to the ready for transaction state by going 
through the state machine
+  bool bootstrap();
+  // establish
+  bool establish();
+  // handShake
+  bool handShake();
+  // negotiateCodec
+  bool negotiateCodec();
+  // initiateResourceNegotiation
+  bool initiateResourceNegotiation();
+  // initiateCodecResourceNegotiation
+  bool initiateCodecResourceNegotiation();
+  // tearDown
+  void tearDown();
+  // write Request Type
+  int writeRequestType(RequestType type);
+  // read Request Type
+  int readRequestType(RequestType &type);
+  // read Respond
+  int readRespond(RespondCode &code, std::string &message);
+  // write respond
+  int writeRespond(RespondCode code, std::string message);
+  // getRespondCodeContext
+  RespondCodeContext *getRespondCodeContext(RespondCode code) {
+    for (unsigned int i = 0;
+        i < sizeof(respondCodeContext) / sizeof(RespondCodeContext); i++) {
+      if (respondCodeContext[i].code == code) {
+        return &respondCodeContext[i];
+      }
+    }
+    return NULL;
+  }
+
+  // Creation of a new transaction, return the transaction ID if success,
+  // Return NULL when any error occurs
+  Transaction *createTransaction(std::string &transactionID,
+                                 TransferDirection direction);
+  // Receive the data packet from the transaction
+  // Return false when any error occurs
+  bool receive(std::string transactionID, DataPacket *packet, bool &eof);
+  // Send the data packet from the transaction
+  // Return false when any error occurs
+  bool send(std::string transactionID, DataPacket *packet,
+            std::shared_ptr<FlowFileRecord> flowFile,
+            core::ProcessSession *session);
+  // Confirm the data that was sent or received by comparing CRC32's of the 
data sent and the data received.
+  bool confirm(std::string transactionID);
+  // Cancel the transaction
+  void cancel(std::string transactionID);
+  // Complete the transaction
+  bool complete(std::string transactionID);
+  // Error the transaction
+  void error(std::string transactionID);
+  // Receive flow files for the process session
+  void receiveFlowFiles(
+      core::ProcessContext *context,
+      core::ProcessSession *session);
+  // Transfer flow files for the process session
+  void transferFlowFiles(
+      core::ProcessContext *context,
+      core::ProcessSession *session);
+  // deleteTransaction
+  void deleteTransaction(std::string transactionID);
+  // Nest Callback Class for write stream
+  class WriteCallback : public OutputStreamCallback {
+   public:
+    WriteCallback(DataPacket *packet)
+        : _packet(packet) {
+    }
+    DataPacket *_packet;
+    void process(std::ofstream *stream) {
+      uint8_t buffer[8192];
+      int len = _packet->_size;
+      while (len > 0) {
+        int size = std::min(len, (int) sizeof(buffer));
+        int ret = _packet->_transaction->getStream().readData(buffer, size);
+        if (ret != size) {
+          _packet->_protocol->logger_->log_error(
+              "Site2Site Receive Flow Size %d Failed %d", size, ret);
+          break;
+        }
+        stream->write((const char *) buffer, size);
+        len -= size;
+      }
+    }
+  };
+  // Nest Callback Class for read stream
+  class ReadCallback : public InputStreamCallback {
+   public:
+    ReadCallback(DataPacket *packet)
+        : _packet(packet) {
+    }
+    DataPacket *_packet;
+    void process(std::ifstream *stream) {
+      _packet->_size = 0;
+      uint8_t buffer[8192];
+      int readSize;
+      while (stream->good()) {
+        if (!stream->read((char *) buffer, 8192))
+          readSize = stream->gcount();
+        else
+          readSize = 8192;
+        int ret = _packet->_transaction->getStream().writeData(buffer,
+                                                               readSize);
+        if (ret != readSize) {
+          _packet->_protocol->logger_->log_error(
+              "Site2Site Send Flow Size %d Failed %d", readSize, ret);
+          break;
+        }
+        _packet->_size += readSize;
+      }
+    }
+  };
+
+ protected:
+
+ private:
+
+  // Mutex for protection
+  std::mutex mutex_;
+  // Logger
+  std::shared_ptr<logging::Logger> logger_;
+  // Configure
+  Configure *configure_;
+  // Batch Count
+  std::atomic<uint64_t> _batchCount;
+  // Batch Size
+  std::atomic<uint64_t> _batchSize;
+  // Batch Duration in msec
+  std::atomic<uint64_t> _batchDuration;
+  // Timeout in msec
+  std::atomic<uint64_t> _timeOut;
+  // Peer Connection
+  std::unique_ptr<Site2SitePeer> peer_;
+  // portId
+  uuid_t _portId;
+  // portIDStr
+  std::string _portIdStr;
+  // BATCH_SEND_NANOS
+  uint64_t _batchSendNanos;
+  // Peer State
+  PeerState _peerState;
+  uint32_t _supportedVersion[5];
+  uint32_t _currentVersion;
+  int _currentVersionIndex;
+  uint32_t _supportedCodecVersion[1];
+  uint32_t _currentCodecVersion;
+  int _currentCodecVersionIndex;
+  // commsIdentifier
+  std::string _commsIdentifier;
+  // transaction map
+  std::map<std::string, Transaction *> _transactionMap;
+
+  // Prevent default copy constructor and assignment operation
+  // Only support pass by reference or pointer
+  Site2SiteClientProtocol(const Site2SiteClientProtocol &parent);
+  Site2SiteClientProtocol &operator=(const Site2SiteClientProtocol &parent);
 };
 
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
 #endif

Reply via email to