http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/src/RealTimeDataCollector.cpp
----------------------------------------------------------------------
diff --git a/src/RealTimeDataCollector.cpp b/src/RealTimeDataCollector.cpp
new file mode 100644
index 0000000..c7118ff
--- /dev/null
+++ b/src/RealTimeDataCollector.cpp
@@ -0,0 +1,482 @@
+/**
+ * @file RealTimeDataCollector.cpp
+ * RealTimeDataCollector class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <time.h>
+#include <chrono>
+#include <thread>
+#include <random>
+#include <netinet/tcp.h>
+
+#include "RealTimeDataCollector.h"
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+
+//! Name under which this processor is registered
+const std::string 
RealTimeDataCollector::ProcessorName("RealTimeDataCollector");
+//! Supported properties. Each is constructed from three strings; the third
+//! presumably is the default value (it matches the fallbacks that onTrigger()
+//! applies, e.g. "data.osp" and port 10000) -- confirm against Property.
+//! Input file that is read line by line
+Property RealTimeDataCollector::FILENAME("File Name", "File Name for the real 
time processor to process", "data.osp");
+//! Destination host/port for the real time traffic
+Property RealTimeDataCollector::REALTIMESERVERNAME("Real Time Server Name", 
"Real Time Server Name", "localhost");
+Property RealTimeDataCollector::REALTIMESERVERPORT("Real Time Server Port", 
"Real Time Server Port", "10000");
+//! Destination host/port for the batch traffic
+Property RealTimeDataCollector::BATCHSERVERNAME("Batch Server Name", "Batch 
Server Name", "localhost");
+Property RealTimeDataCollector::BATCHSERVERPORT("Batch Server Port", "Batch 
Server Port", "10001");
+Property RealTimeDataCollector::ITERATION("Iteration",
+               "If true, sample osp file will be iterated", "true");
+//! Comma separated message IDs; compared with the first comma separated field of each input line
+Property RealTimeDataCollector::REALTIMEMSGID("Real Time Message ID", "Real 
Time Message ID", "41");
+Property RealTimeDataCollector::BATCHMSGID("Batch Message ID", "Batch Message 
ID", "172, 30, 48");
+//! Intervals are parsed with Property::StringToTime and converted to nanoseconds in onTrigger()
+Property RealTimeDataCollector::REALTIMEINTERVAL("Real Time Interval", "Real 
Time Data Collection Interval in msec", "10 ms");
+Property RealTimeDataCollector::BATCHINTERVAL("Batch Time Interval", "Batch 
Processing Interval in msec", "100 ms");
+//! Upper bound for the bytes held in the batch queue
+Property RealTimeDataCollector::BATCHMAXBUFFERSIZE("Batch Max Buffer Size", 
"Batch Buffer Maximum size in bytes", "262144");
+//! The only relationship this processor declares
+Relationship RealTimeDataCollector::Success("success", "success operational on 
the flow record");
+
+//! Register the properties and relationships this processor supports
+void RealTimeDataCollector::initialize()
+{
+       //! Every configurable property, handed over in one set
+       std::set<Property> supportedProperties = {
+               FILENAME,
+               REALTIMESERVERNAME,
+               REALTIMESERVERPORT,
+               BATCHSERVERNAME,
+               BATCHSERVERPORT,
+               ITERATION,
+               REALTIMEMSGID,
+               BATCHMSGID,
+               REALTIMEINTERVAL,
+               BATCHINTERVAL,
+               BATCHMAXBUFFERSIZE
+       };
+       setSupportedProperties(supportedProperties);
+
+       //! Success is the only relationship
+       std::set<Relationship> supportedRelationships = { Success };
+       setSupportedRelationships(supportedRelationships);
+}
+
+/**
+ * Resolve host, create a TCP socket and connect it to host:port.
+ *
+ * On non-__MACH__ builds the socket additionally gets TCP_NODELAY,
+ * SO_REUSEADDR and a 256 KiB send buffer before it is connected.
+ *
+ * @param host name of the server to resolve (IPv4 only)
+ * @param port server TCP port in host byte order
+ * @return the connected socket descriptor, or 0 on any failure
+ *         (callers in this file treat a value <= 0 as "not connected")
+ */
+int RealTimeDataCollector::connectServer(const char *host, uint16_t port)
+{
+       in_addr_t addr;
+       int sock = 0;
+       struct hostent *h = nullptr;
+#ifdef __MACH__
+       h = gethostbyname(host);
+#else
+       char buf[1024];
+       struct hostent he;
+       int hh_errno;
+       // gethostbyname_r reports failure through its return value and/or a NULL result
+       if (gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno) != 0)
+               h = nullptr;
+#endif
+       // Never dereference a failed lookup, and only copy an address that
+       // really is an IPv4 address of the size addr can hold
+       if (h == nullptr || h->h_addrtype != AF_INET || h->h_addr_list[0] == nullptr
+                       || static_cast<size_t>(h->h_length) != sizeof(addr))
+       {
+               _logger->log_error("Could not resolve hostName %s", host);
+               return 0;
+       }
+       memcpy((char *) &addr, h->h_addr_list[0], sizeof(addr));
+
+       sock = socket(AF_INET, SOCK_STREAM, 0);
+       if (sock < 0)
+       {
+               _logger->log_error("Could not create socket to hostName %s", host);
+               return 0;
+       }
+
+#ifndef __MACH__
+       int opt = 1;
+
+       // Disable Nagle so small real time records are sent immediately
+       if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *)&opt, sizeof(opt)) < 0)
+       {
+               _logger->log_error("setsockopt() TCP_NODELAY failed");
+               close(sock);
+               return 0;
+       }
+       if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt)) < 0)
+       {
+               _logger->log_error("setsockopt() SO_REUSEADDR failed");
+               close(sock);
+               return 0;
+       }
+
+       int sndsize = 256*1024;
+       if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndsize, (int)sizeof(sndsize)) < 0)
+       {
+               _logger->log_error("setsockopt() SO_SNDBUF failed");
+               close(sock);
+               return 0;
+       }
+#endif
+
+       struct sockaddr_in sa;
+       socklen_t socklen;
+       int status;
+
+       //TODO bind socket to the interface
+       // For now bind to any local address and an ephemeral port
+       memset(&sa, 0, sizeof(sa));
+       sa.sin_family = AF_INET;
+       sa.sin_addr.s_addr = htonl(INADDR_ANY);
+       sa.sin_port = htons(0);
+       socklen = sizeof(sa);
+       if (bind(sock, (struct sockaddr *)&sa, socklen) < 0)
+       {
+               _logger->log_error("socket bind failed");
+               close(sock);
+               return 0;
+       }
+
+       // addr is already in network byte order as delivered by the resolver
+       memset(&sa, 0, sizeof(sa));
+       sa.sin_family = AF_INET;
+       sa.sin_addr.s_addr = addr;
+       sa.sin_port = htons(port);
+       socklen = sizeof(sa);
+
+       status = connect(sock, (struct sockaddr *)&sa, socklen);
+
+       if (status < 0)
+       {
+               _logger->log_error("socket connect failed to %s %d", host, port);
+               close(sock);
+               return 0;
+       }
+
+       _logger->log_info("socket %d connect to server %s port %d success", sock, host, port);
+
+       return sock;
+}
+
+/**
+ * Write buflen bytes from buf to socket, calling send() repeatedly until
+ * everything has been handed over.
+ *
+ * @return -1 as soon as a send() call fails; otherwise the value returned by
+ *         the last send() call (0 when buflen is not positive)
+ */
+int RealTimeDataCollector::sendData(int socket, const char *buf, int buflen)
+{
+       int lastSent = 0;
+
+       // offset advances by whatever the previous send() accepted
+       for (int offset = 0; offset < buflen; offset += lastSent)
+       {
+               lastSent = send(socket, buf + offset, buflen - offset, 0);
+               if (lastSent == -1)
+               {
+                       // give up on the first error
+                       return lastSent;
+               }
+       }
+
+       if (lastSent != 0)
+               _logger->log_debug("Send data size %d over socket %d", buflen, socket);
+
+       return lastSent;
+}
+
+/**
+ * Real time half of the processor.
+ *
+ * Once the accumulated scheduling time reaches _realTimeInterval this
+ *  - re-reads the real time and batch message ID lists from the properties,
+ *  - (re)opens the data file if needed and reads it line by line,
+ *  - queues every line whose first comma separated field is a batch ID,
+ *  - sends every line whose first field is a real time ID over the real time
+ *    socket, falling back to the batch queue when that is not possible,
+ *  - stops reading after the first real time line, and closes the file at EOF
+ *    so that the next run reopens it.
+ *
+ * @param context used to obtain the scheduling period that is accumulated
+ * @param session not used here
+ */
+void RealTimeDataCollector::onTriggerRealTime(ProcessContext *context, ProcessSession *session)
+{
+       if (_realTimeAccumulated >= this->_realTimeInterval)
+       {
+               std::string value;
+               if (this->getProperty(REALTIMEMSGID.getName(), value))
+               {
+                       this->_realTimeMsgID.clear();
+                       this->_logger->log_info("Real Time Msg IDs %s", value.c_str());
+                       std::stringstream lineStream(value);
+                       std::string cell;
+
+                       while (std::getline(lineStream, cell, ','))
+                       {
+                               // Trim exactly like the batch IDs, otherwise "41, 42"
+                               // yields " 42" which never equals the trimmed field
+                               cell = Property::trim(cell);
+                               this->_realTimeMsgID.push_back(cell);
+                       }
+               }
+               if (this->getProperty(BATCHMSGID.getName(), value))
+               {
+                       this->_batchMsgID.clear();
+                       this->_logger->log_info("Batch Msg IDs %s", value.c_str());
+                       std::stringstream lineStream(value);
+                       std::string cell;
+
+                       while (std::getline(lineStream, cell, ','))
+                       {
+                               cell = Property::trim(cell);
+                               this->_batchMsgID.push_back(cell);
+                       }
+               }
+
+               // Append one record to the batch queue under _mtx, evicting the
+               // oldest records while the size limit would be exceeded.
+               auto enqueueForBatch = [this](const std::string &data, const char *kind, const std::string &msgId)
+               {
+                       std::lock_guard<std::mutex> lock(_mtx);
+                       // The empty() check keeps front()/pop() off an empty queue
+                       // when a single record is larger than the whole buffer;
+                       // such a record is still queued rather than lost
+                       while (!_queue.empty() && (_queuedDataSize + data.size()) > _batchMaxBufferSize)
+                       {
+                               const std::string::size_type evicted = _queue.front().size();
+                               _queuedDataSize -= evicted;
+                               // casts: the format uses %d, sizes are not int
+                               _logger->log_debug("Pop item size %d from batch queue, queue buffer size %d",
+                                               static_cast<int>(evicted), static_cast<int>(_queuedDataSize));
+                               _queue.pop();
+                       }
+                       _queue.push(data);
+                       _queuedDataSize += data.size();
+                       _logger->log_debug("Push %s msg ID %s into batch queue, queue buffer size %d",
+                                       kind, msgId.c_str(), static_cast<int>(_queuedDataSize));
+               };
+
+               // Open the file
+               if (!this->_fileStream.is_open())
+               {
+                       _fileStream.open(this->_fileName.c_str(), std::ifstream::in);
+                       if (this->_fileStream.is_open())
+                               _logger->log_debug("open %s", _fileName.c_str());
+               }
+               if (!_fileStream.good())
+               {
+                       // Note: returns without touching _realTimeAccumulated, so
+                       // the open is retried on the very next trigger
+                       _logger->log_error("load data file failed %s", _fileName.c_str());
+                       return;
+               }
+               if (this->_fileStream.is_open())
+               {
+                       std::string line;
+
+                       while (std::getline(_fileStream, line))
+                       {
+                               line += "\n";
+                               std::stringstream lineStream(line);
+                               std::string cell;
+                               if (!std::getline(lineStream, cell, ','))
+                                       continue;
+
+                               // First field of the line is the message ID
+                               cell = Property::trim(cell);
+
+                               // Check whether it match to the batch traffic
+                               for (const std::string &batchId : _batchMsgID)
+                               {
+                                       if (cell == batchId)
+                                               enqueueForBatch(line, "batch", cell);
+                               }
+
+                               bool findRealTime = false;
+                               // Check whether it match to the real time traffic
+                               for (const std::string &realTimeId : _realTimeMsgID)
+                               {
+                                       if (cell != realTimeId)
+                                               continue;
+
+                                       int status = 0;
+                                       if (this->_realTimeSocket <= 0)
+                                       {
+                                               // Connect the LTE socket
+                                               uint16_t port = _realTimeServerPort;
+                                               this->_realTimeSocket = connectServer(_realTimeServerName.c_str(), port);
+                                       }
+                                       if (this->_realTimeSocket > 0)
+                                       {
+                                               // try to send the data
+                                               status = sendData(_realTimeSocket, line.data(), static_cast<int>(line.size()));
+                                               if (status < 0)
+                                               {
+                                                       close(_realTimeSocket);
+                                                       _realTimeSocket = 0;
+                                               }
+                                       }
+                                       if (this->_realTimeSocket <= 0 || status < 0)
+                                       {
+                                               // no usable real time link: fall back to the batch queue
+                                               enqueueForBatch(line, "real time", cell);
+                                       }
+                                       findRealTime = true;
+                               }
+                               if (findRealTime)
+                                       // we break the while once we find the first real time
+                                       break;
+                       } // while
+                       if (_fileStream.eof())
+                       {
+                               // closing makes the next run reopen the file from the start
+                               _fileStream.close();
+                       }
+               } // if open
+               _realTimeAccumulated = 0;
+       }
+       _realTimeAccumulated += context->getProcessor()->getSchedulingPeriodNano();
+}
+
+/**
+ * Batch half of the processor.
+ *
+ * Once the accumulated scheduling time reaches _batchInterval, (re)connects
+ * the batch socket if necessary and sends the queued records in FIFO order.
+ * A record leaves the queue only after it was sent successfully; when a send
+ * fails the socket is closed and the remaining records (including the one
+ * that failed) stay queued for the next run.
+ *
+ * _mtx is held while the queue is drained, i.e. also during the sends.
+ *
+ * @param context used to obtain the scheduling period that is accumulated
+ * @param session not used here
+ */
+void RealTimeDataCollector::onTriggerBatch(ProcessContext *context, ProcessSession *session)
+{
+       if (_batchAcccumulated >= this->_batchInterval)
+       {
+               // dequeue the batch and send over WIFI
+               int status = 0;
+               if (this->_batchSocket <= 0)
+               {
+                       // Connect the WIFI socket
+                       uint16_t port = _batchServerPort;
+                       this->_batchSocket = connectServer(_batchServerName.c_str(), port);
+               }
+               if (this->_batchSocket > 0)
+               {
+                       std::lock_guard<std::mutex> lock(_mtx);
+
+                       while (!_queue.empty())
+                       {
+                               // Peek only; the record is removed after a successful send
+                               const std::string &line = _queue.front();
+                               status = sendData(_batchSocket, line.data(), static_cast<int>(line.size()));
+                               if (status < 0)
+                               {
+                                       // Keep the record queued so it is not lost
+                                       close(_batchSocket);
+                                       _batchSocket = 0;
+                                       break;
+                               }
+                               // line refers into the queue: account for it before pop()
+                               _queuedDataSize -= line.size();
+                               _queue.pop();
+                       }
+               }
+               _batchAcccumulated = 0;
+       }
+       _batchAcccumulated += context->getProcessor()->getSchedulingPeriodNano();
+}
+
+/**
+ * Entry point called by the scheduler; dispatches on the calling thread.
+ *
+ *  - The thread registered as real time thread runs onTriggerRealTime().
+ *  - The thread registered as batch thread runs onTriggerBatch().
+ *  - Any other thread, under _mtx:
+ *      - while _firstInvoking is false: loads the configuration, connects
+ *        both sockets, opens the data file and registers itself as the real
+ *        time thread (if the file cannot be opened it returns before
+ *        registering, so the setup is repeated on a later call);
+ *      - otherwise: registers itself as the batch thread and resets
+ *        _firstInvoking.
+ *
+ * NOTE(review): a third distinct thread would repeat the setup and take over
+ * the real time role; this looks like it assumes exactly two scheduler
+ * threads -- confirm.
+ * NOTE(review): when the setup is repeated, the sockets connected by the
+ * previous attempt are overwritten without being closed.
+ */
+void RealTimeDataCollector::onTrigger(ProcessContext *context, ProcessSession *session)
+{
+       std::thread::id id = std::this_thread::get_id();
+
+       if (id == _realTimeThreadId)
+               return onTriggerRealTime(context, session);
+       else if (id == _batchThreadId)
+               return onTriggerBatch(context, session);
+       else
+       {
+               std::lock_guard<std::mutex> lock(_mtx);
+               if (!this->_firstInvoking)
+               {
+                       std::string value;
+
+                       // Each setting starts from its fallback and is overridden
+                       // by the configured property when one is available
+                       this->_fileName = "data.osp";
+                       if (this->getProperty(FILENAME.getName(), value))
+                       {
+                               this->_fileName = value;
+                               this->_logger->log_info("Data Collector File Name %s", _fileName.c_str());
+                       }
+                       this->_realTimeServerName = "localhost";
+                       if (this->getProperty(REALTIMESERVERNAME.getName(), value))
+                       {
+                               this->_realTimeServerName = value;
+                               this->_logger->log_info("Real Time Server Name %s", this->_realTimeServerName.c_str());
+                       }
+                       this->_realTimeServerPort = 10000;
+                       if (this->getProperty(REALTIMESERVERPORT.getName(), value))
+                       {
+                               Property::StringToInt(value, _realTimeServerPort);
+                               this->_logger->log_info("Real Time Server Port %d", _realTimeServerPort);
+                       }
+                       // Same fallback as the real time server, so the name is never left empty
+                       this->_batchServerName = "localhost";
+                       if (this->getProperty(BATCHSERVERNAME.getName(), value))
+                       {
+                               this->_batchServerName = value;
+                               this->_logger->log_info("Batch Server Name %s", this->_batchServerName.c_str());
+                       }
+                       this->_batchServerPort = 10001;
+                       if (this->getProperty(BATCHSERVERPORT.getName(), value))
+                       {
+                               Property::StringToInt(value, _batchServerPort);
+                               this->_logger->log_info("Batch Server Port %d", _batchServerPort);
+                       }
+                       if (this->getProperty(ITERATION.getName(), value))
+                       {
+                               Property::StringToBool(value, this->_iteration);
+                               _logger->log_info("Iteration %d", _iteration);
+                       }
+                       this->_realTimeInterval = 10000000; //10 msec
+                       if (this->getProperty(REALTIMEINTERVAL.getName(), value))
+                       {
+                               TimeUnit unit;
+                               if (Property::StringToTime(value, _realTimeInterval, unit) &&
+                                               Property::ConvertTimeUnitToNS(_realTimeInterval, unit, _realTimeInterval))
+                               {
+                                       _logger->log_info("Real Time Interval: [%d] ns", _realTimeInterval);
+                               }
+                       }
+                       this->_batchInterval = 100000000; //100 msec
+                       if (this->getProperty(BATCHINTERVAL.getName(), value))
+                       {
+                               TimeUnit unit;
+                               if (Property::StringToTime(value, _batchInterval, unit) &&
+                                               Property::ConvertTimeUnitToNS(_batchInterval, unit, _batchInterval))
+                               {
+                                       _logger->log_info("Batch Time Interval: [%d] ns", _batchInterval);
+                               }
+                       }
+                       this->_batchMaxBufferSize = 256*1024;
+                       if (this->getProperty(BATCHMAXBUFFERSIZE.getName(), value))
+                       {
+                               Property::StringToInt(value, _batchMaxBufferSize);
+                               this->_logger->log_info("Batch Max Buffer Size %d", _batchMaxBufferSize);
+                       }
+                       if (this->getProperty(REALTIMEMSGID.getName(), value))
+                       {
+                               // This branch can run more than once; start from an
+                               // empty list so IDs are not duplicated
+                               this->_realTimeMsgID.clear();
+                               this->_logger->log_info("Real Time Msg IDs %s", value.c_str());
+                               std::stringstream lineStream(value);
+                               std::string cell;
+
+                               while (std::getline(lineStream, cell, ','))
+                               {
+                                       // Trim like the batch IDs so "41, 42" matches "42"
+                                       cell = Property::trim(cell);
+                                       this->_realTimeMsgID.push_back(cell);
+                                       this->_logger->log_info("Real Time Msg ID %s", cell.c_str());
+                               }
+                       }
+                       if (this->getProperty(BATCHMSGID.getName(), value))
+                       {
+                               this->_batchMsgID.clear();
+                               this->_logger->log_info("Batch Msg IDs %s", value.c_str());
+                               std::stringstream lineStream(value);
+                               std::string cell;
+
+                               while (std::getline(lineStream, cell, ','))
+                               {
+                                       cell = Property::trim(cell);
+                                       this->_batchMsgID.push_back(cell);
+                                       this->_logger->log_info("Batch Msg ID %s", cell.c_str());
+                               }
+                       }
+                       // Connect the LTE socket
+                       uint16_t port = _realTimeServerPort;
+
+                       this->_realTimeSocket = connectServer(_realTimeServerName.c_str(), port);
+
+                       // Connect the WIFI socket
+                       port = _batchServerPort;
+
+                       this->_batchSocket = connectServer(_batchServerName.c_str(), port);
+
+                       // Open the file
+                       _fileStream.open(this->_fileName.c_str(), std::ifstream::in);
+                       if (!_fileStream.good())
+                       {
+                               // Returns before the thread is registered: the whole
+                               // setup is attempted again on a later call
+                               _logger->log_error("load data file failed %s", _fileName.c_str());
+                               return;
+                       }
+                       else
+                       {
+                               _logger->log_debug("open %s", _fileName.c_str());
+                       }
+                       // Setup done: this thread is the real time thread from now on
+                       _realTimeThreadId = id;
+                       this->_firstInvoking = true;
+               }
+               else
+               {
+                       // Second unregistered thread becomes the batch thread
+                       if (id != _realTimeThreadId)
+                               _batchThreadId = id;
+                       this->_firstInvoking = false;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/src/RemoteProcessorGroupPort.cpp
----------------------------------------------------------------------
diff --git a/src/RemoteProcessorGroupPort.cpp b/src/RemoteProcessorGroupPort.cpp
new file mode 100644
index 0000000..711e846
--- /dev/null
+++ b/src/RemoteProcessorGroupPort.cpp
@@ -0,0 +1,99 @@
+/**
+ * @file RemoteProcessorGroupPort.cpp
+ * RemoteProcessorGroupPort class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <time.h>
+#include <sstream>
+#include <string.h>
+#include <iostream>
+
+#include "TimeUtil.h"
+#include "RemoteProcessorGroupPort.h"
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+
+//! Name under which this processor is registered
+const std::string 
RemoteProcessorGroupPort::ProcessorName("RemoteProcessorGroupPort");
+//! Remote peer host; onTrigger() applies a configured value to the peer
+Property RemoteProcessorGroupPort::hostName("Host Name", "Remote Host Name.", 
"localhost");
+//! Remote peer port; onTrigger() parses it with Property::StringToInt
+Property RemoteProcessorGroupPort::port("Port", "Remote Port", "9999");
+//! Default constructed relationship; the only one registered in initialize()
+Relationship RemoteProcessorGroupPort::relation;
+
+//! Register the properties and the relationship this port supports
+void RemoteProcessorGroupPort::initialize()
+{
+       //! Host name and port are the only configurable properties
+       std::set<Property> supportedProperties = { hostName, port };
+       setSupportedProperties(supportedProperties);
+
+       //! A single relationship is exposed
+       std::set<Relationship> supportedRelationships = { relation };
+       setSupportedRelationships(supportedRelationships);
+}
+
+/**
+ * Does nothing unless the port is transmitting. Otherwise applies the
+ * configured host name and port to the peer, tears the protocol down when
+ * either of them changed, bootstraps the protocol (yielding when that fails)
+ * and finally receives or transfers flow files depending on _direction.
+ */
+void RemoteProcessorGroupPort::onTrigger(ProcessContext *context, ProcessSession *session)
+{
+       if (!_transmitting)
+               return;
+
+       // Start from what the peer currently uses ...
+       std::string targetHost = _peer->getHostName();
+       uint16_t targetPort = _peer->getPort();
+
+       // ... and let the configured properties override it
+       std::string configured;
+       int64_t parsedPort;
+       if (context->getProperty(hostName.getName(), configured))
+               targetHost = configured;
+       if (context->getProperty(port.getName(), configured) && Property::StringToInt(configured, parsedPort))
+               targetPort = (uint16_t) parsedPort;
+
+       // Push changes to the peer and remember whether anything moved
+       const bool hostChanged = (targetHost != _peer->getHostName());
+       if (hostChanged)
+               _peer->setHostName(targetHost);
+
+       const bool portChanged = (targetPort != _peer->getPort());
+       if (portChanged)
+               _peer->setPort(targetPort);
+
+       // A changed endpoint invalidates the current protocol state
+       if (hostChanged || portChanged)
+               _protocol->tearDown();
+
+       // bootstrap the client protocol if needed
+       if (!_protocol->bootstrap())
+       {
+               context->yield();
+               return;
+       }
+
+       if (_direction == RECEIVE)
+       {
+               _protocol->receiveFlowFiles(context, session);
+       }
+       else
+       {
+               _protocol->transferFlowFiles(context, session);
+       }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/src/Site2SiteClientProtocol.cpp
----------------------------------------------------------------------
diff --git a/src/Site2SiteClientProtocol.cpp b/src/Site2SiteClientProtocol.cpp
new file mode 100644
index 0000000..88ea78a
--- /dev/null
+++ b/src/Site2SiteClientProtocol.cpp
@@ -0,0 +1,1313 @@
+/**
+ * @file Site2SiteClientProtocol.cpp
+ * Site2SiteClientProtocol class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <sys/time.h>
+#include <stdio.h>
+#include <time.h>
+#include <chrono>
+#include <thread>
+#include <random>
+#include <netinet/tcp.h>
+#include <iostream>
+#include "Site2SitePeer.h"
+#include "Site2SiteClientProtocol.h"
+
+/**
+ * Open the peer socket and negotiate the Site2Site protocol version.
+ *
+ * Only valid while the peer state is IDLE. On success the state advances to
+ * ESTABLISHED. On failure the state is left untouched and nothing is torn down
+ * here.
+ *
+ * @return true when the socket is open and the version negotiation succeeded, false otherwise
+ */
+bool Site2SiteClientProtocol::establish()
+{
+       if (_peerState != IDLE)
+       {
+               _logger->log_error("Site2Site peer state is not idle while try to establish");
+               return false;
+       }
+
+       bool opened = _peer->Open();
+       if (!opened)
+       {
+               _logger->log_error("Site2Site peer socket open failed");
+               return false;
+       }
+
+       // agree on a protocol version before anything else is exchanged
+       bool negotiated = initiateResourceNegotiation();
+       if (!negotiated)
+       {
+               _logger->log_error("Site2Site Protocol Version Negotiation failed");
+               return false;
+       }
+
+       _logger->log_info("Site2Site socket established");
+       _peerState = ESTABLISHED;
+       return true;
+}
+
+/**
+ * Negotiate the Site2Site protocol version with the peer.
+ *
+ * Sends the resource name and the currently selected version, then reads a
+ * one-byte status. When the peer answers DIFFERENT_RESOURCE_VERSION, the first
+ * entry of _supportedVersion after the current index that does not exceed the
+ * peer's version is selected and the negotiation is retried recursively.
+ *
+ * Only valid while the peer state is IDLE; the state is not modified here.
+ *
+ * @return false on any read/write failure, on NEGOTIATED_ABORT, or when no remaining
+ *         supported version is acceptable; true on RESOURCE_OK and for an unrecognised status
+ */
+bool Site2SiteClientProtocol::initiateResourceNegotiation()
+{
+       if (_peerState != IDLE)
+       {
+               _logger->log_error("Site2Site peer state is not idle while initiateResourceNegotiation");
+               return false;
+       }
+
+       _logger->log_info("Negotiate protocol version with destination port %s current version %d", _portIdStr.c_str(), _currentVersion);
+
+       // request: resource name followed by the version we would like to use
+       int status = _peer->writeUTF(this->getResourceName());
+       if (status > 0)
+               status = _peer->write(_currentVersion);
+       if (status <= 0)
+               return false;
+
+       uint8_t statusCode;
+       status = _peer->read(statusCode);
+       if (status <= 0)
+               return false;
+
+       if (statusCode == RESOURCE_OK)
+       {
+               _logger->log_info("Site2Site Protocol Negotiate protocol version OK");
+               return true;
+       }
+       if (statusCode == NEGOTIATED_ABORT)
+       {
+               _logger->log_info("Site2Site Negotiate protocol response ABORT");
+               return false;
+       }
+       if (statusCode != DIFFERENT_RESOURCE_VERSION)
+       {
+               // NOTE(review): an unknown status is reported as success -- confirm this is intended
+               _logger->log_info("Negotiate protocol response unknown code %d", statusCode);
+               return true;
+       }
+
+       // the peer wants another version: read it and fall back to one we both support
+       uint32_t serverVersion;
+       status = _peer->read(serverVersion);
+       if (status <= 0)
+               return false;
+
+       _logger->log_info("Site2Site Server Response asked for a different protocol version %d", serverVersion);
+       for (unsigned int next = (_currentVersionIndex + 1); next < sizeof(_supportedVersion)/sizeof(uint32_t); next++)
+       {
+               if (serverVersion < _supportedVersion[next])
+                       continue;
+
+               _currentVersion = _supportedVersion[next];
+               _currentVersionIndex = next;
+               return initiateResourceNegotiation();
+       }
+
+       // nothing left in our list that the peer would accept
+       return false;
+}
+
+/**
+ * Negotiate the flow file codec version with the peer.
+ *
+ * Sends the codec resource name and the currently selected codec version, then
+ * reads a one-byte status. When the peer answers DIFFERENT_RESOURCE_VERSION, the
+ * first entry of _supportedCodecVersion after the current index that does not
+ * exceed the peer's version is selected and the negotiation is retried
+ * recursively.
+ *
+ * Only valid while the peer state is HANDSHAKED; the state is not modified here.
+ *
+ * @return false on any read/write failure, on NEGOTIATED_ABORT, or when no remaining
+ *         supported codec version is acceptable; true on RESOURCE_OK and for an
+ *         unrecognised status
+ */
+bool Site2SiteClientProtocol::initiateCodecResourceNegotiation()
+{
+       if (_peerState != HANDSHAKED)
+       {
+               _logger->log_error("Site2Site peer state is not handshaked while initiateCodecResourceNegotiation");
+               return false;
+       }
+
+       _logger->log_info("Negotiate Codec version with destination port %s current version %d", _portIdStr.c_str(), _currentCodecVersion);
+
+       // request: codec resource name followed by the codec version we would like to use
+       int status = _peer->writeUTF(this->getCodecResourceName());
+       if (status > 0)
+               status = _peer->write(_currentCodecVersion);
+       if (status <= 0)
+               return false;
+
+       uint8_t statusCode;
+       status = _peer->read(statusCode);
+       if (status <= 0)
+               return false;
+
+       if (statusCode == RESOURCE_OK)
+       {
+               _logger->log_info("Site2Site Codec Negotiate version OK");
+               return true;
+       }
+       if (statusCode == NEGOTIATED_ABORT)
+       {
+               _logger->log_info("Site2Site Codec Negotiate response ABORT");
+               return false;
+       }
+       if (statusCode != DIFFERENT_RESOURCE_VERSION)
+       {
+               // NOTE(review): an unknown status is reported as success -- confirm this is intended
+               _logger->log_info("Negotiate Codec response unknown code %d", statusCode);
+               return true;
+       }
+
+       // the peer wants another codec version: read it and fall back to one we both support
+       uint32_t serverVersion;
+       status = _peer->read(serverVersion);
+       if (status <= 0)
+               return false;
+
+       _logger->log_info("Site2Site Server Response asked for a different codec version %d", serverVersion);
+       for (unsigned int next = (_currentCodecVersionIndex + 1); next < sizeof(_supportedCodecVersion)/sizeof(uint32_t); next++)
+       {
+               if (serverVersion < _supportedCodecVersion[next])
+                       continue;
+
+               _currentCodecVersion = _supportedCodecVersion[next];
+               _currentCodecVersionIndex = next;
+               return initiateCodecResourceNegotiation();
+       }
+
+       // nothing left in our list that the peer would accept
+       return false;
+}
+
+/**
+ * Perform the Site2Site handshake on an established connection.
+ *
+ * A fresh UUID is generated and sent as the communications identifier. For
+ * protocol version 3 and above the peer URL is sent next. Then the handshake
+ * properties are sent as a count followed by key/value pairs: GZIP (always
+ * "false"), the port identifier, the request expiration and, for protocol
+ * version 5 and above, whichever batch count/size/duration settings are
+ * positive. Finally the peer's response code is read.
+ *
+ * Only valid while the peer state is ESTABLISHED. The state advances to
+ * HANDSHAKED when the peer answers PROPERTIES_OK.
+ *
+ * @return true when the peer accepted the properties, false on any read/write
+ *         failure or any other response code
+ */
+bool Site2SiteClientProtocol::handShake()
+{
+       if (_peerState != ESTABLISHED)
+       {
+               _logger->log_error("Site2Site peer state is not established while handshake");
+               return false;
+       }
+       _logger->log_info("Site2Site Protocol Perform hand shake with destination port %s", _portIdStr.c_str());
+
+       // generate the UUID that identifies this communication
+       uuid_t commsUuid;
+       char commsUuidText[37];
+       uuid_generate(commsUuid);
+       uuid_unparse(commsUuid, commsUuidText);
+       _commsIdentifier = commsUuidText;
+
+       int status = _peer->writeUTF(_commsIdentifier);
+       if (status <= 0)
+               return false;
+
+       // collect the properties to announce to the peer
+       std::map<std::string, std::string> properties;
+       properties[HandShakePropertyStr[GZIP]] = "false";
+       properties[HandShakePropertyStr[PORT_IDENTIFIER]] = _portIdStr;
+       properties[HandShakePropertyStr[REQUEST_EXPIRATION_MILLIS]] = std::to_string(this->_timeOut);
+       if (this->_currentVersion >= 5)
+       {
+               // batch settings are only sent when they are set to a positive value
+               if (this->_batchCount > 0)
+                       properties[HandShakePropertyStr[BATCH_COUNT]] = std::to_string(this->_batchCount);
+               if (this->_batchSize > 0)
+                       properties[HandShakePropertyStr[BATCH_SIZE]] = std::to_string(this->_batchSize);
+               if (this->_batchDuration > 0)
+                       properties[HandShakePropertyStr[BATCH_DURATION]] = std::to_string(this->_batchDuration);
+       }
+
+       if (_currentVersion >= 3)
+       {
+               status = _peer->writeUTF(_peer->getURL());
+               if (status <= 0)
+                       return false;
+       }
+
+       // property count first, then each key/value pair
+       uint32_t size = properties.size();
+       status = _peer->write(size);
+       if (status <= 0)
+               return false;
+
+       for (std::map<std::string, std::string>::iterator entry = properties.begin(); entry != properties.end(); entry++)
+       {
+               status = _peer->writeUTF(entry->first);
+               if (status > 0)
+                       status = _peer->writeUTF(entry->second);
+               if (status <= 0)
+                       return false;
+               _logger->log_info("Site2Site Protocol Send handshake properties %s %s", entry->first.c_str(), entry->second.c_str());
+       }
+
+       RespondCode code;
+       std::string message;
+       status = this->readRespond(code, message);
+       if (status <= 0)
+               return false;
+
+       if (code == PROPERTIES_OK)
+       {
+               _logger->log_info("Site2Site HandShake Completed");
+               _peerState = HANDSHAKED;
+               return true;
+       }
+
+       if (code == PORT_NOT_IN_VALID_STATE || code == UNKNOWN_PORT || code == PORTS_DESTINATION_FULL)
+               _logger->log_error("Site2Site HandShake Failed because destination port is either invalid or full");
+       else
+               _logger->log_info("HandShake Failed because of unknown respond code %d", code);
+
+       return false;
+}
+
+/**
+ * Shut the connection down and return to the IDLE state.
+ *
+ * When the peer state is ESTABLISHED or beyond, a SHUTDOWN request is written
+ * first; its result is not checked. Every transaction still held in
+ * _transactionMap is deleted, the map is emptied and the peer is closed.
+ */
+void Site2SiteClientProtocol::tearDown()
+{
+       if (_peerState >= ESTABLISHED)
+       {
+               _logger->log_info("Site2Site Protocol tearDown");
+               // tell the peer we are going away; the outcome is deliberately ignored
+               writeRequestType(SHUTDOWN);
+       }
+
+       // the map holds the transactions through raw pointers, so release them here
+       for (auto &entry : _transactionMap)
+               delete entry.second;
+       _transactionMap.clear();
+
+       _peer->Close();
+       _peerState = IDLE;
+}
+
+/**
+ * Send a request type to the peer as its string form from RequestTypeStr.
+ *
+ * @param type request to send; values at or above MAX_REQUEST_TYPE are rejected
+ * @return the result of the peer's writeUTF, or -1 when the type is rejected
+ */
+int Site2SiteClientProtocol::writeRequestType(RequestType type)
+{
+       if (type < MAX_REQUEST_TYPE)
+               return _peer->writeUTF(RequestTypeStr[type]);
+
+       return -1;
+}
+
+/**
+ * Read a request type string from the peer and map it back to its enum value.
+ *
+ * The string is compared against the RequestTypeStr entries from
+ * NEGOTIATE_FLOWFILE_CODEC through SHUTDOWN.
+ *
+ * @param type receives the matching request type; untouched when nothing matches
+ * @return the result of the peer's readUTF when a match is found or the read
+ *         failed (value <= 0), -1 when the string is not a known request type
+ */
+int Site2SiteClientProtocol::readRequestType(RequestType &type)
+{
+       std::string requestTypeStr;
+
+       int status = _peer->readUTF(requestTypeStr);
+       if (status <= 0)
+               return status;
+
+       int candidate = (int) NEGOTIATE_FLOWFILE_CODEC;
+       while (candidate <= (int) SHUTDOWN)
+       {
+               if (RequestTypeStr[candidate] == requestTypeStr)
+               {
+                       type = (RequestType) candidate;
+                       return status;
+               }
+               candidate++;
+       }
+
+       // the peer sent something that is not in the table
+       return -1;
+}
+
+/**
+ * Read a response from the peer.
+ *
+ * A response is two marker bytes (CODE_SEQUENCE_VALUE_1, CODE_SEQUENCE_VALUE_2)
+ * followed by one code byte. When the context of that code says it carries a
+ * description, a UTF string follows and is stored in message.
+ *
+ * @param code    receives the response code
+ * @param message receives the description when the code has one
+ * @return 3 plus the size of message on success; -1 when a marker byte is
+ *         missing or wrong, the code is not known or the description cannot be
+ *         read; the read result (<= 0) when the code byte itself cannot be read
+ */
+int Site2SiteClientProtocol::readRespond(RespondCode &code, std::string &message)
+{
+       int status;
+
+       // two fixed marker bytes open every response
+       uint8_t marker1;
+       status = _peer->read(marker1);
+       if (status <= 0 || marker1 != CODE_SEQUENCE_VALUE_1)
+               return -1;
+
+       uint8_t marker2;
+       status = _peer->read(marker2);
+       if (status <= 0 || marker2 != CODE_SEQUENCE_VALUE_2)
+               return -1;
+
+       // the third byte is the response code itself
+       uint8_t rawCode;
+       status = _peer->read(rawCode);
+       if (status <= 0)
+               return status;
+
+       code = (RespondCode) rawCode;
+
+       RespondCodeContext *codeContext = this->getRespondCodeContext(code);
+       if (codeContext == NULL)
+       {
+               // Not a valid respond code
+               return -1;
+       }
+
+       if (codeContext->hasDescription)
+       {
+               status = _peer->readUTF(message);
+               if (status <= 0)
+                       return -1;
+       }
+
+       return 3 + message.size();
+}
+
+/**
+ * Write a response to the peer.
+ *
+ * The two marker bytes and the code byte are written in one call. When the
+ * context of the code says it carries a description, message is written after
+ * them as a UTF string.
+ *
+ * @param code    response code to send
+ * @param message description to send when the code carries one
+ * @return 3 for a code without description; 3 plus the writeUTF result when the
+ *         description was written; the writeUTF result (<= 0) when that write
+ *         failed; -1 for an unknown code or when the three bytes were not written
+ */
+int Site2SiteClientProtocol::writeRespond(RespondCode code, std::string message)
+{
+       RespondCodeContext *codeContext = this->getRespondCodeContext(code);
+       if (codeContext == NULL)
+       {
+               // Not a valid respond code
+               return -1;
+       }
+
+       uint8_t codeSeq[3];
+       codeSeq[0] = CODE_SEQUENCE_VALUE_1;
+       codeSeq[1] = CODE_SEQUENCE_VALUE_2;
+       codeSeq[2] = (uint8_t) code;
+
+       int written = _peer->write(codeSeq, 3);
+       if (written != 3)
+               return -1;
+
+       if (!codeContext->hasDescription)
+               return 3;
+
+       written = _peer->writeUTF(message);
+       if (written <= 0)
+               return written;
+
+       return (3 + written);
+}
+
+/**
+ * Request flow file codec negotiation and run it.
+ *
+ * Only valid while the peer state is HANDSHAKED. On success the state advances
+ * to READY; on failure the state is left untouched and nothing is torn down
+ * here.
+ *
+ * @return true when the codec version was negotiated, false otherwise
+ */
+bool Site2SiteClientProtocol::negotiateCodec()
+{
+       if (_peerState != HANDSHAKED)
+       {
+               _logger->log_error("Site2Site peer state is not handshaked while negotiate codec");
+               return false;
+       }
+
+       _logger->log_info("Site2Site Protocol Negotiate Codec with destination port %s", _portIdStr.c_str());
+
+       // announce the request before the actual version exchange
+       if (this->writeRequestType(NEGOTIATE_FLOWFILE_CODEC) <= 0)
+               return false;
+
+       // Negotiate the codec version
+       if (!initiateCodecResourceNegotiation())
+       {
+               _logger->log_error("Site2Site Codec Version Negotiation failed");
+               return false;
+       }
+
+       _logger->log_info("Site2Site Codec Completed and move to READY state for data transfer");
+       _peerState = READY;
+       return true;
+}
+
+/**
+ * Bring the connection to the READY state if it is not there already.
+ *
+ * Any existing connection state is torn down first, then establish(),
+ * handShake() and negotiateCodec() are run in that order, stopping at the first
+ * one that fails. On failure the peer is yielded and the connection is torn
+ * down again.
+ *
+ * @return true when the peer state is READY on return, false otherwise
+ */
+bool Site2SiteClientProtocol::bootstrap()
+{
+       if (_peerState == READY)
+               return true;
+
+       // start from a clean slate
+       tearDown();
+
+       const bool ready = establish() && handShake() && negotiateCodec();
+       if (!ready)
+       {
+               _peer->yield();
+               tearDown();
+               return false;
+       }
+
+       _logger->log_info("Site2Site Ready For data transaction");
+       return true;
+}
+
+/**
+ * Start a new transaction with the peer and register it in _transactionMap.
+ *
+ * The connection is bootstrapped first when it is not READY. For RECEIVE a
+ * RECEIVE_FLOWFILES request is sent and the peer's answer (MORE_DATA or
+ * NO_MORE_DATA) is recorded on the transaction as its data-available flag; any
+ * other answer fails. For every other direction a SEND_FLOWFILES request is
+ * sent.
+ *
+ * @param transactionID receives the UUID string of the new transaction
+ * @param direction     transfer direction of the transaction
+ * @return the new transaction, which stays owned by _transactionMap, or NULL
+ *         when the connection is not ready, a read/write fails or the peer
+ *         answers with an unexpected code
+ */
+Transaction* Site2SiteClientProtocol::createTransaction(std::string &transactionID, TransferDirection direction)
+{
+       // make sure the connection is usable, trying once to bring it up if it is not
+       if (_peerState != READY)
+       {
+               bootstrap();
+               if (_peerState != READY)
+                       return NULL;
+       }
+
+       if (direction != RECEIVE)
+       {
+               // sending side: announcing the request is all it takes to start
+               if (writeRequestType(SEND_FLOWFILES) <= 0)
+                       return NULL;
+
+               Transaction *outgoing = new Transaction(direction);
+               _transactionMap[outgoing->getUUIDStr()] = outgoing;
+               transactionID = outgoing->getUUIDStr();
+               _logger->log_info("Site2Site create transaction %s", outgoing->getUUIDStr().c_str());
+               return outgoing;
+       }
+
+       // receiving side: ask for data and see what the peer has to offer
+       if (writeRequestType(RECEIVE_FLOWFILES) <= 0)
+               return NULL;
+
+       RespondCode code;
+       std::string message;
+       if (readRespond(code, message) <= 0)
+               return NULL;
+
+       bool dataAvailable;
+       if (code == MORE_DATA)
+       {
+               dataAvailable = true;
+               _logger->log_info("Site2Site peer indicates that data is available");
+       }
+       else if (code == NO_MORE_DATA)
+       {
+               dataAvailable = false;
+               _logger->log_info("Site2Site peer indicates that no data is available");
+       }
+       else
+       {
+               _logger->log_info("Site2Site got unexpected response %d when asking for data", code);
+               return NULL;
+       }
+
+       Transaction *incoming = new Transaction(direction);
+       _transactionMap[incoming->getUUIDStr()] = incoming;
+       transactionID = incoming->getUUIDStr();
+       incoming->setDataAvailable(dataAvailable);
+       _logger->log_info("Site2Site create transaction %s", incoming->getUUIDStr().c_str());
+       return incoming;
+}
+
+/**
+ * Receive the header of the next data packet of a RECEIVE transaction.
+ *
+ * The transaction must exist, be in TRANSACTION_STARTED or DATA_EXCHANGED state
+ * and have direction RECEIVE. After the first transfer the peer's response
+ * (CONTINUE_TRANSACTION or FINISH_TRANSACTION) is read first to find out whether
+ * another packet follows. When no data is available eof is set and nothing else
+ * is read. Otherwise the attribute count, the attributes and the content length
+ * are read (all fed into the transaction CRC) and stored in the packet, and the
+ * transaction counters are updated. The content itself is not read here.
+ *
+ * @param transactionID UUID string of the transaction
+ * @param packet        receives the attributes and the content size
+ * @param eof           set to true when the peer has no more data; otherwise left untouched
+ * @return true when a packet header was read or eof was reached, false on any failure
+ */
+bool Site2SiteClientProtocol::receive(std::string transactionID, DataPacket *packet, bool &eof)
+{
+       // make sure the connection is usable, trying once to bring it up if it is not
+       if (_peerState != READY)
+       {
+               bootstrap();
+               if (_peerState != READY)
+                       return false;
+       }
+
+       std::map<std::string, Transaction *>::iterator found = this->_transactionMap.find(transactionID);
+       if (found == _transactionMap.end())
+               return false;
+
+       Transaction *transaction = found->second;
+
+       if (!(transaction->getState() == TRANSACTION_STARTED || transaction->getState() == DATA_EXCHANGED))
+       {
+               _logger->log_info("Site2Site transaction %s is not at started or exchanged state", transactionID.c_str());
+               return false;
+       }
+
+       if (transaction->getDirection() != RECEIVE)
+       {
+               _logger->log_info("Site2Site transaction %s direction is wrong", transactionID.c_str());
+               return false;
+       }
+
+       if (!transaction->isDataAvailable())
+       {
+               eof = true;
+               return true;
+       }
+
+       int status;
+
+       if (transaction->_transfers > 0)
+       {
+               // something was transferred before: the peer now tells us whether more follows
+               RespondCode code;
+               std::string message;
+
+               status = readRespond(code, message);
+               if (status <= 0)
+                       return false;
+
+               switch (code)
+               {
+               case CONTINUE_TRANSACTION:
+                       _logger->log_info("Site2Site transaction %s peer indicate continue transaction", transactionID.c_str());
+                       transaction->_dataAvailable = true;
+                       break;
+               case FINISH_TRANSACTION:
+                       _logger->log_info("Site2Site transaction %s peer indicate finish transaction", transactionID.c_str());
+                       transaction->_dataAvailable = false;
+                       break;
+               default:
+                       _logger->log_info("Site2Site transaction %s peer indicate wrong respond code %d", transactionID.c_str(), code);
+                       return false;
+               }
+       }
+
+       // the flag may just have been cleared by a FINISH_TRANSACTION
+       if (!transaction->isDataAvailable())
+       {
+               eof = true;
+               return true;
+       }
+
+       // start to read the packet: attribute count first
+       uint32_t numAttributes;
+       status = _peer->read(numAttributes, &transaction->_crc);
+       if (status <= 0 || numAttributes > MAX_NUM_ATTRIBUTES)
+               return false;
+
+       // then each attribute as a key/value pair
+       for (unsigned int index = 0; index < numAttributes; index++)
+       {
+               std::string key;
+               std::string value;
+
+               status = _peer->readUTF(key, true, &transaction->_crc);
+               if (status <= 0)
+                       return false;
+
+               status = _peer->readUTF(value, true, &transaction->_crc);
+               if (status <= 0)
+                       return false;
+
+               packet->_attributes[key] = value;
+               _logger->log_info("Site2Site transaction %s receives attribute key %s value %s", transactionID.c_str(), key.c_str(), value.c_str());
+       }
+
+       // and finally the length of the content that follows
+       uint64_t len;
+       status = _peer->read(len, &transaction->_crc);
+       if (status <= 0)
+               return false;
+
+       packet->_size = len;
+       transaction->_transfers++;
+       transaction->_state = DATA_EXCHANGED;
+       transaction->_bytes += len;
+       // NOTE(review): %d is used for _transfers and _bytes; confirm both are int-sized
+       _logger->log_info("Site2Site transaction %s receives flow record %d, total length %d", transactionID.c_str(),
+                       transaction->_transfers, transaction->_bytes);
+
+       return true;
+}
+
+/**
+ * Send one data packet of a SEND transaction to the peer.
+ *
+ * The transaction must exist, be in TRANSACTION_STARTED or DATA_EXCHANGED state
+ * and have direction SEND. After the first transfer a CONTINUE_TRANSACTION
+ * response is written first. Then the attribute count, the attributes and the
+ * content length are written (all fed into the transaction CRC). When the flow
+ * file has content, it is read from the session through a ReadCallback built on
+ * the packet, and the packet size is checked against the flow file size
+ * afterwards. Finally the transaction counters are updated.
+ *
+ * @param transactionID UUID string of the transaction
+ * @param packet        packet whose attributes are sent and which the read callback works on
+ * @param flowFile      flow file providing the content size and the content
+ * @param session       session used to read the flow file content
+ * @return true when the packet was sent, false on any failure
+ */
+bool Site2SiteClientProtocol::send(std::string transactionID, DataPacket *packet, FlowFileRecord *flowFile, ProcessSession *session)
+{
+       // make sure the connection is usable, trying once to bring it up if it is not
+       if (_peerState != READY)
+       {
+               bootstrap();
+               if (_peerState != READY)
+                       return false;
+       }
+
+       std::map<std::string, Transaction *>::iterator found = this->_transactionMap.find(transactionID);
+       if (found == _transactionMap.end())
+               return false;
+
+       Transaction *transaction = found->second;
+
+       if (!(transaction->getState() == TRANSACTION_STARTED || transaction->getState() == DATA_EXCHANGED))
+       {
+               _logger->log_info("Site2Site transaction %s is not at started or exchanged state", transactionID.c_str());
+               return false;
+       }
+
+       if (transaction->getDirection() != SEND)
+       {
+               _logger->log_info("Site2Site transaction %s direction is wrong", transactionID.c_str());
+               return false;
+       }
+
+       int status;
+
+       if (transaction->_transfers > 0)
+       {
+               // not the first packet: tell the peer that more is coming
+               status = writeRespond(CONTINUE_TRANSACTION, "CONTINUE_TRANSACTION");
+               if (status <= 0)
+                       return false;
+       }
+
+       // start to write the packet: attribute count first (a 4 byte value)
+       uint32_t numAttributes = packet->_attributes.size();
+       status = _peer->write(numAttributes, &transaction->_crc);
+       if (status != 4)
+               return false;
+
+       // then each attribute as a key/value pair
+       for (std::map<std::string, std::string>::iterator attribute = packet->_attributes.begin();
+                       attribute != packet->_attributes.end(); attribute++)
+       {
+               status = _peer->writeUTF(attribute->first, true, &transaction->_crc);
+               if (status <= 0)
+                       return false;
+
+               status = _peer->writeUTF(attribute->second, true, &transaction->_crc);
+               if (status <= 0)
+                       return false;
+
+               _logger->log_info("Site2Site transaction %s send attribute key %s value %s", transactionID.c_str(),
+                               attribute->first.c_str(), attribute->second.c_str());
+       }
+
+       // then the content length (an 8 byte value)
+       uint64_t len = flowFile->getSize();
+       status = _peer->write(len, &transaction->_crc);
+       if (status != 8)
+               return false;
+
+       if (flowFile->getSize())
+       {
+               // the content is handled by the callback while the session reads the flow file
+               Site2SiteClientProtocol::ReadCallback callback(packet);
+               session->read(flowFile, &callback);
+               if (flowFile->getSize() != packet->_size)
+                       return false;
+       }
+
+       transaction->_transfers++;
+       transaction->_state = DATA_EXCHANGED;
+       transaction->_bytes += len;
+       // NOTE(review): %d is used for _transfers and _bytes; confirm both are int-sized
+       _logger->log_info("Site2Site transaction %s send flow record %d, total length %d", transactionID.c_str(),
+                               transaction->_transfers, transaction->_bytes);
+
+       return true;
+}
+
+/**
+ * Receive flow files from the peer in one transaction.
+ *
+ * The connection is bootstrapped when it is not READY and a RECEIVE transaction
+ * is created. Packets are then received until the peer signals the end: for
+ * each one a flow file is created in the session, the packet attributes are
+ * added to it, the content (if any) is written through a WriteCallback and the
+ * flow file is transferred to a default-constructed relationship. Afterwards the
+ * transaction is confirmed and completed, and finally deleted. The context is
+ * yielded when nothing was received.
+ *
+ * On any exception inside the transfer the transaction is deleted, the context
+ * is yielded, the connection is torn down and the exception is rethrown.
+ *
+ * @param context process context, used to yield
+ * @param session process session used to create, write and transfer the flow files
+ * @throws Exception (SITE2SITE_EXCEPTION) when the connection cannot be brought
+ *         to READY, the transaction cannot be created, a receive fails, a flow
+ *         file cannot be created, the written size does not match the packet
+ *         size, or confirming/completing the transaction fails
+ */
+void Site2SiteClientProtocol::receiveFlowFiles(ProcessContext *context, ProcessSession *session)
+{
+       uint64_t bytes = 0;
+       int transfers = 0;
+       Transaction *transaction = NULL;
+
+       if (_peerState != READY)
+       {
+               bootstrap();
+       }
+
+       if (_peerState != READY)
+       {
+               context->yield();
+               tearDown();
+               throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer");
+       }
+
+       // Create the transaction
+       std::string transactionID;
+       transaction = createTransaction(transactionID, RECEIVE);
+
+       if (transaction == NULL)
+       {
+               context->yield();
+               tearDown();
+               throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction");
+       }
+
+       try
+       {
+               while (true)
+               {
+                       std::map<std::string, std::string> empty;
+                       DataPacket packet(this, transaction, empty);
+                       bool eof = false;
+
+                       if (!receive(transactionID, &packet, eof))
+                       {
+                               throw Exception(SITE2SITE_EXCEPTION, "Receive Failed");
+                       }
+                       if (eof)
+                       {
+                               // transaction done
+                               break;
+                       }
+                       FlowFileRecord *flowFile = session->create();
+                       if (!flowFile)
+                       {
+                               throw Exception(SITE2SITE_EXCEPTION, "Flow File Creation Failed");
+                       }
+                       std::map<std::string, std::string>::iterator it;
+                       for (it = packet._attributes.begin(); it != packet._attributes.end(); it++)
+                       {
+                               flowFile->addAttribute(it->first, it->second);
+                       }
+
+                       if (packet._size > 0)
+                       {
+                               // the content is handled by the callback while the session writes the flow file
+                               Site2SiteClientProtocol::WriteCallback callback(&packet);
+                               session->write(flowFile, &callback);
+                               if (flowFile->getSize() != packet._size)
+                               {
+                                       throw Exception(SITE2SITE_EXCEPTION, "Receive Size Not Right");
+                               }
+                       }
+                       Relationship relation; // undefined relationship
+                       session->transfer(flowFile, relation);
+                       // receive the transfer for the flow record
+                       bytes += packet._size;
+                       transfers++;
+               } // while true
+
+               if (!confirm(transactionID))
+               {
+                       throw Exception(SITE2SITE_EXCEPTION, "Confirm Transaction Failed");
+               }
+               if (!complete(transactionID))
+               {
+                       throw Exception(SITE2SITE_EXCEPTION, "Complete Transaction Failed");
+               }
+               // bytes is 64 bit wide: it needs a matching length modifier and an explicit
+               // cast, passing it for a plain %d is a format/argument mismatch
+               _logger->log_info("Site2Site transaction %s successfully receive flow record %d, content bytes %llu",
+                               transactionID.c_str(), transfers, (unsigned long long) bytes);
+               // we yield the receive if we did not get anything
+               if (transfers == 0)
+                       context->yield();
+       }
+       catch (std::exception &exception)
+       {
+               if (transaction)
+                       deleteTransaction(transactionID);
+               context->yield();
+               tearDown();
+               _logger->log_debug("Caught Exception %s", exception.what());
+               throw;
+       }
+       catch (...)
+       {
+               if (transaction)
+                       deleteTransaction(transactionID);
+               context->yield();
+               tearDown();
+               _logger->log_debug("Caught Exception during Site2SiteClientProtocol::receiveFlowFiles");
+               throw;
+       }
+
+       deleteTransaction(transactionID);
+
+       return;
+}
+
+bool Site2SiteClientProtocol::confirm(std::string transactionID)
+{
+       int ret;
+       Transaction *transaction = NULL;
+
+       if (_peerState != READY)
+       {
+               bootstrap();
+       }
+
+       if (_peerState != READY)
+       {
+               return false;
+       }
+
+       std::map<std::string, Transaction *>::iterator it = 
this->_transactionMap.find(transactionID);
+
+       if (it == _transactionMap.end())
+       {
+               return false;
+       }
+       else
+       {
+               transaction = it->second;
+       }
+
+       if (transaction->getState() == TRANSACTION_STARTED && 
!transaction->isDataAvailable() &&
+                       transaction->getDirection() == RECEIVE)
+       {
+               transaction->_state = TRANSACTION_CONFIRMED;
+               return true;
+       }
+
+       if (transaction->getState() != DATA_EXCHANGED)
+               return false;
+
+       if (transaction->getDirection() == RECEIVE)
+       {
+               if (transaction->isDataAvailable())
+                       return false;
+               // we received a FINISH_TRANSACTION indicator. Send back a 
CONFIRM_TRANSACTION message
+               // to peer so that we can verify that the connection is still 
open. This is a two-phase commit,
+               // which helps to prevent the chances of data duplication. 
Without doing this, we may commit the
+               // session and then when we send the response back to the peer, 
the peer may have timed out and may not
+               // be listening. As a result, it will re-send the data. By 
doing this two-phase commit, we narrow the
+               // Critical Section involved in this transaction so that rather 
than the Critical Section being the
+               // time window involved in the entire transaction, it is 
reduced to a simple round-trip conversation.
+               long crcValue = transaction->getCRC();
+               std::string crc = std::to_string(crcValue);
+               _logger->log_info("Site2Site Send confirm with CRC %d to 
transaction %s", transaction->getCRC(),
+                                               transactionID.c_str());
+               ret = writeRespond(CONFIRM_TRANSACTION, crc);
+               if (ret <= 0)
+                       return false;
+               RespondCode code;
+               std::string message;
+               readRespond(code, message);
+               if (ret <= 0)
+                       return false;
+
+               if (code == CONFIRM_TRANSACTION)
+               {
+                       _logger->log_info("Site2Site transaction %s peer 
confirm transaction", transactionID.c_str());
+                       transaction->_state = TRANSACTION_CONFIRMED;
+                       return true;
+               }
+               else if (code == BAD_CHECKSUM)
+               {
+                       _logger->log_info("Site2Site transaction %s peer 
indicate bad checksum", transactionID.c_str());
+                       /*
+                       transaction->_state = TRANSACTION_CONFIRMED;
+                       return true; */
+                       return false;
+               }
+               else
+               {
+                       _logger->log_info("Site2Site transaction %s peer 
unknown respond code %d",
+                                       transactionID.c_str(), code);
+                       return false;
+               }
+       }
+       else
+       {
+               _logger->log_info("Site2Site Send FINISH TRANSACTION for 
transaction %s",
+                                                               
transactionID.c_str());
+               ret = writeRespond(FINISH_TRANSACTION, "FINISH_TRANSACTION");
+               if (ret <= 0)
+                       return false;
+               RespondCode code;
+               std::string message;
+               readRespond(code, message);
+               if (ret <= 0)
+                       return false;
+
+               // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer 
to send a 'Confirm Transaction' response
+               if (code == CONFIRM_TRANSACTION)
+               {
+                       _logger->log_info("Site2Site transaction %s peer 
confirm transaction with CRC %s", transactionID.c_str(), message.c_str());
+                       if (this->_currentVersion > 3)
+                       {
+                               long crcValue = transaction->getCRC();
+                               std::string crc = std::to_string(crcValue);
+                               if (message == crc)
+                               {
+                                       _logger->log_info("Site2Site 
transaction %s CRC matched", transactionID.c_str());
+                                       ret = writeRespond(CONFIRM_TRANSACTION, 
"CONFIRM_TRANSACTION");
+                                       if (ret <= 0)
+                                               return false;
+                                       transaction->_state = 
TRANSACTION_CONFIRMED;
+                                       return true;
+                               }
+                               else
+                               {
+                                       _logger->log_info("Site2Site 
transaction %s CRC not matched %s", transactionID.c_str(), crc.c_str());
+                                       ret = writeRespond(BAD_CHECKSUM, 
"BAD_CHECKSUM");
+                                       /*
+                                       ret = writeRespond(CONFIRM_TRANSACTION, 
"CONFIRM_TRANSACTION");
+                                                                               
if (ret <= 0)
+                                                                               
        return false;
+                                                                               
transaction->_state = TRANSACTION_CONFIRMED;
+                                       return true; */
+                                       return false;
+                               }
+                       }
+                       ret = writeRespond(CONFIRM_TRANSACTION, 
"CONFIRM_TRANSACTION");
+                       if (ret <= 0)
+                               return false;
+                       transaction->_state = TRANSACTION_CONFIRMED;
+                       return true;
+               }
+               else
+               {
+                       _logger->log_info("Site2Site transaction %s peer 
unknown respond code %d",
+                                       transactionID.c_str(), code);
+                       return false;
+               }
+               return false;
+       }
+}
+
+//! Cancel a transaction
+//! Does nothing when the protocol is not READY, when transactionID is not in
+//! the transaction map, or when the transaction is already canceled,
+//! completed or in error. Otherwise a CANCEL_TRANSACTION response is written,
+//! the transaction is marked TRANSACTION_CANCELED and the protocol is torn down.
+void Site2SiteClientProtocol::cancel(std::string transactionID)
+{
+       if (_peerState != READY)
+               return;
+
+       std::map<std::string, Transaction *>::iterator entry = this->_transactionMap.find(transactionID);
+       if (entry == _transactionMap.end())
+               return;
+
+       Transaction *txn = entry->second;
+
+       // A transaction that already reached one of these states is left alone.
+       const bool alreadyEnded = txn->getState() == TRANSACTION_CANCELED
+                       || txn->getState() == TRANSACTION_COMPLETED
+                       || txn->getState() == TRANSACTION_ERROR;
+       if (alreadyEnded)
+               return;
+
+       // The result of the write is deliberately not checked; the state is
+       // updated and the protocol torn down regardless.
+       this->writeRespond(CANCEL_TRANSACTION, "Cancel");
+       txn->_state = TRANSACTION_CANCELED;
+
+       tearDown();
+}
+
+//! Delete a transaction
+//! Looks transactionID up in the transaction map; when found, logs the
+//! transaction UUID, frees the Transaction object and removes its map entry.
+//! Unknown IDs are ignored.
+void Site2SiteClientProtocol::deleteTransaction(std::string transactionID)
+{
+       std::map<std::string, Transaction *>::iterator entry = this->_transactionMap.find(transactionID);
+       if (entry == _transactionMap.end())
+               return;
+
+       Transaction *doomed = entry->second;
+
+       _logger->log_info("Site2Site delete transaction %s", doomed->getUUIDStr().c_str());
+       delete doomed;
+       // Erase by key after the object is gone, exactly one entry is removed.
+       _transactionMap.erase(transactionID);
+}
+
+void Site2SiteClientProtocol::error(std::string transactionID)
+{
+       Transaction *transaction = NULL;
+
+       std::map<std::string, Transaction *>::iterator it = 
this->_transactionMap.find(transactionID);
+
+       if (it == _transactionMap.end())
+       {
+               return;
+       }
+       else
+       {
+               transaction = it->second;
+       }
+
+       transaction->_state = TRANSACTION_ERROR;
+       tearDown();
+       return;
+}
+
+//! Complete the transaction
+//! Only a transaction in state TRANSACTION_CONFIRMED can be completed.
+//!  - RECEIVE direction: when at least one transfer happened, a
+//!    TRANSACTION_FINISHED response is written to the peer first.
+//!  - otherwise (sending side): a response is read from the peer and must be
+//!    TRANSACTION_FINISHED.
+//! Returns true when the transaction was moved to TRANSACTION_COMPLETED,
+//! false on any failure (protocol not ready, unknown ID, wrong state,
+//! read/write failure, unexpected response code).
+bool Site2SiteClientProtocol::complete(std::string transactionID)
+{
+       // Give the protocol one chance to come up before giving up.
+       if (_peerState != READY)
+               bootstrap();
+       if (_peerState != READY)
+               return false;
+
+       std::map<std::string, Transaction *>::iterator entry = this->_transactionMap.find(transactionID);
+       if (entry == _transactionMap.end())
+               return false;
+
+       Transaction *txn = entry->second;
+       if (txn->getState() != TRANSACTION_CONFIRMED)
+               return false;
+
+       if (txn->getDirection() != RECEIVE)
+       {
+               // Sending side: wait for the peer to tell us it is done.
+               RespondCode peerCode;
+               std::string peerMessage;
+
+               if (readRespond(peerCode, peerMessage) <= 0)
+                       return false;
+
+               if (peerCode != TRANSACTION_FINISHED)
+               {
+                       _logger->log_info("Site2Site transaction %s peer unknown respond code %d",
+                                       transactionID.c_str(), peerCode);
+                       return false;
+               }
+
+               _logger->log_info("Site2Site transaction %s peer finished transaction", transactionID.c_str());
+               txn->_state = TRANSACTION_COMPLETED;
+               return true;
+       }
+
+       // Receiving side: the peer is only notified when something was transferred.
+       if (txn->_transfers != 0)
+       {
+               _logger->log_info("Site2Site transaction %s send finished", transactionID.c_str());
+               if (this->writeRespond(TRANSACTION_FINISHED, "Finished") <= 0)
+                       return false;
+       }
+
+       txn->_state = TRANSACTION_COMPLETED;
+       return true;
+}
+
+//! Transfer flow files from the session to the peer
+//! Pulls flow files from the session and sends them within a single SEND
+//! transaction until the session has no more flow files or the elapsed
+//! sending time exceeds _batchSendNanos, then confirms and completes the
+//! transaction. Returns silently when the session has no flow file.
+//! Throws Exception(SITE2SITE_EXCEPTION, ...) when the handshake, transaction
+//! creation, send, confirm or complete step fails; in every failure case the
+//! context is yielded and the protocol is torn down before the exception
+//! leaves this function.
+void Site2SiteClientProtocol::transferFlowFiles(ProcessContext *context, ProcessSession *session)
+{
+       FlowFileRecord *current = session->get();
+       if (!current)
+               return;
+
+       if (_peerState != READY)
+               bootstrap();
+
+       if (_peerState != READY)
+       {
+               context->yield();
+               tearDown();
+               throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer");
+       }
+
+       // Create the transaction
+       std::string transactionID;
+       Transaction *txn = createTransaction(transactionID, SEND);
+       if (txn == NULL)
+       {
+               context->yield();
+               tearDown();
+               throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction");
+       }
+
+       const uint64_t startSendingNanos = getTimeNano();
+
+       try
+       {
+               for (;;)
+               {
+                       DataPacket packet(this, txn, current->getAttributes());
+
+                       if (!send(transactionID, &packet, current, session))
+                               throw Exception(SITE2SITE_EXCEPTION, "Send Failed");
+
+                       _logger->log_info("Site2Site transaction %s send flow record %s",
+                                       transactionID.c_str(), current->getUUIDStr().c_str());
+                       session->remove(current);
+
+                       // Stop batching once the time budget for one transaction is used up.
+                       const uint64_t elapsedNanos = getTimeNano() - startSendingNanos;
+                       if (elapsedNanos > _batchSendNanos)
+                               break;
+
+                       // Stop when the session has nothing more to offer.
+                       current = session->get();
+                       if (!current)
+                               break;
+               }
+
+               if (!confirm(transactionID))
+                       throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed");
+
+               if (!complete(transactionID))
+                       throw Exception(SITE2SITE_EXCEPTION, "Complete Failed");
+
+               _logger->log_info("Site2Site transaction %s successfully send flow record %d, content bytes %d",
+                               transactionID.c_str(), txn->_transfers, txn->_bytes);
+       }
+       catch (std::exception &exception)
+       {
+               // txn is never NULL here: a failed createTransaction threw above.
+               deleteTransaction(transactionID);
+               context->yield();
+               tearDown();
+               _logger->log_debug("Caught Exception %s", exception.what());
+               throw;
+       }
+       catch (...)
+       {
+               deleteTransaction(transactionID);
+               context->yield();
+               tearDown();
+               _logger->log_debug("Caught Exception during Site2SiteClientProtocol::transferFlowFiles");
+               throw;
+       }
+
+       deleteTransaction(transactionID);
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/src/Site2SitePeer.cpp
----------------------------------------------------------------------
diff --git a/src/Site2SitePeer.cpp b/src/Site2SitePeer.cpp
new file mode 100644
index 0000000..c844aa5
--- /dev/null
+++ b/src/Site2SitePeer.cpp
@@ -0,0 +1,434 @@
+/**
+ * @file Site2SitePeer.cpp
+ * Site2SitePeer class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <sys/time.h>
+#include <stdio.h>
+#include <time.h>
+#include <chrono>
+#include <thread>
+#include <random>
+#include <netinet/tcp.h>
+#include <iostream>
+#include "Site2SitePeer.h"
+
+//! CRC tables
+//! Out-of-class definitions of the CRC32 static members:
+//!  - tableInit starts out false (presumably flipped once the lookup table
+//!    has been filled in -- confirm against the CRC32 class in Site2SitePeer.h)
+//!  - table holds 256 unsigned int entries
+std::atomic<bool> CRC32::tableInit(false);
+unsigned int CRC32::table[256];
+
+/**
+ * Open a TCP connection to the configured peer and send the protocol magic bytes.
+ *
+ * Steps: resolve _host, create an IPv4 stream socket, apply socket options
+ * (TCP_NODELAY, SO_REUSEADDR, 256 KiB send/receive buffers -- skipped when
+ * __MACH__ is defined), bind to an ephemeral local port, connect to
+ * _host:_port and finally send MAGIC_BYTES over the new socket.
+ *
+ * Every failure after the empty-host check yields the peer (or closes it via
+ * Close() when sending the magic bytes fails) and releases the socket.
+ *
+ * @return true when the connection is up and the magic bytes were sent,
+ *         false on an empty host name, a resolution failure or any socket error
+ */
+bool Site2SitePeer::Open()
+{
+       in_addr_t addr;
+       int sock = 0;
+       struct hostent *h = NULL;
+       const char *host;
+       uint16_t port;
+
+       host = this->_host.c_str();
+       port = this->_port;
+
+       if (strlen(host) == 0)
+               return false;
+
+#ifdef __MACH__
+       h = gethostbyname(host);
+#else
+       char buf[1024];
+       struct hostent he;
+       int hh_errno;
+       // gethostbyname_r reports failure through its return value and by
+       // leaving the result pointer NULL; normalise both to h == NULL.
+       if (gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno) != 0)
+               h = NULL;
+#endif
+       // The lookup can fail (unknown host, resolver error). Dereferencing the
+       // result without this check would crash, and copying an address that is
+       // not a 4-byte IPv4 address would overrun addr.
+       if (h == NULL || h->h_addrtype != AF_INET || h->h_addr_list[0] == NULL
+                       || h->h_length != (int) sizeof(addr))
+       {
+               _logger->log_error("Could not resolve hostName %s", host);
+               this->yield();
+               return false;
+       }
+       memcpy((char *) &addr, h->h_addr_list[0], h->h_length);
+
+       sock = socket(AF_INET, SOCK_STREAM, 0);
+       if (sock < 0)
+       {
+               _logger->log_error("Could not create socket to hostName %s", host);
+               this->yield();
+               return false;
+       }
+
+#ifndef __MACH__
+       int opt = 1;
+       bool nagle_off = true;
+
+       if (nagle_off)
+       {
+               if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *)&opt, sizeof(opt)) < 0)
+               {
+                       _logger->log_error("setsockopt() TCP_NODELAY failed");
+                       close(sock);
+                       this->yield();
+                       return false;
+               }
+               if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
+                               (char *)&opt, sizeof(opt)) < 0)
+               {
+                       _logger->log_error("setsockopt() SO_REUSEADDR failed");
+                       close(sock);
+                       this->yield();
+                       return false;
+               }
+       }
+
+       int sndsize = 256*1024;
+       if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndsize, (int)sizeof(sndsize)) < 0)
+       {
+               _logger->log_error("setsockopt() SO_SNDBUF failed");
+               close(sock);
+               this->yield();
+               return false;
+       }
+       int rcvsize = 256*1024;
+       if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char *)&rcvsize, (int)sizeof(rcvsize)) < 0)
+       {
+               _logger->log_error("setsockopt() SO_RCVBUF failed");
+               close(sock);
+               this->yield();
+               return false;
+       }
+#endif
+
+       struct sockaddr_in sa;
+       socklen_t socklen;
+       int status;
+
+       // Bind the local end to any interface and an ephemeral port.
+       memset(&sa, 0, sizeof(sa));
+       sa.sin_family = AF_INET;
+       sa.sin_addr.s_addr = htonl(INADDR_ANY);
+       sa.sin_port = htons(0);
+       socklen = sizeof(sa);
+       if (bind(sock, (struct sockaddr *)&sa, socklen) < 0)
+       {
+               _logger->log_error("socket bind failed");
+               close(sock);
+               this->yield();
+               return false;
+       }
+
+       // Connect to the resolved peer address (addr is already in network byte order).
+       memset(&sa, 0, sizeof(sa));
+       sa.sin_family = AF_INET;
+       sa.sin_addr.s_addr = addr;
+       sa.sin_port = htons(port);
+       socklen = sizeof(sa);
+
+       status = connect(sock, (struct sockaddr *)&sa, socklen);
+
+       if (status < 0)
+       {
+               _logger->log_error("socket connect failed to %s %d", host, port);
+               close(sock);
+               this->yield();
+               return false;
+       }
+
+       _logger->log_info("Site2Site Peer socket %d connect to server %s port %d success", sock, host, port);
+
+       _socket = sock;
+
+       // Announce the protocol to the peer; sendData needs _socket to be set.
+       status = sendData((uint8_t *) MAGIC_BYTES, sizeof(MAGIC_BYTES));
+
+       if (status <= 0)
+       {
+               Close();
+               return false;
+       }
+
+       return true;
+}
+
+//! Close the peer socket
+//! When _socket is non-zero it is logged, closed and reset to 0;
+//! otherwise nothing happens.
+void Site2SitePeer::Close()
+{
+       if (!_socket)
+               return;
+
+       _logger->log_info("Site2Site Peer socket %d close", _socket);
+       close(_socket);
+       _socket = 0;
+}
+
+//! Send a whole buffer to the peer
+//! Keeps calling send() until buflen bytes have been written. When crc is
+//! given it is updated with the complete buffer after a successful send.
+//! Returns the number of bytes sent, or -1 when there is no open socket or
+//! send() fails (the socket is closed in the latter case).
+int Site2SitePeer::sendData(uint8_t *buf, int buflen, CRC32 *crc)
+{
+       if (_socket <= 0)
+       {
+               // this->yield();
+               return -1;
+       }
+
+       int sent = 0;
+       while (sent < buflen)
+       {
+               const int written = send(_socket, buf + sent, buflen - sent, 0);
+               if (written == -1)
+               {
+                       // send failed: drop the connection and report the error
+                       Close();
+                       // this->yield();
+                       return written;
+               }
+               sent += written;
+       }
+
+       if (crc)
+               crc->update(buf, buflen);
+
+       return sent;
+}
+
+//! Wait until the peer socket is readable
+//! msec > 0 waits at most msec milliseconds; any other value blocks without
+//! a timeout. Returns the select() result when it is <= 0 (timeout or
+//! error) or when the socket is flagged readable, and 0 otherwise.
+int Site2SitePeer::Select(int msec)
+{
+       const int fd = _socket;
+
+       fd_set readable;
+       FD_ZERO(&readable);
+       FD_SET(fd, &readable);
+
+       struct timeval timeout;
+       timeout.tv_sec = msec/1000;
+       timeout.tv_usec = (msec % 1000) * 1000;
+
+       // A NULL timeout makes select() block until the socket becomes readable.
+       struct timeval *wait = NULL;
+       if (msec > 0)
+               wait = &timeout;
+
+       const int ready = select(fd+1, &readable, NULL, NULL, wait);
+       if (ready <= 0)
+               return ready;
+
+       return FD_ISSET(fd, &readable) ? ready : 0;
+}
+
+//! Read exactly buflen bytes from the peer
+//! Each round waits for readability via Select(_timeOut) and then calls
+//! recv(). When crc is given it is updated with the complete buffer after
+//! all bytes arrived. Returns buflen on success, -1 when there is no open
+//! socket, or the non-positive Select()/recv() result on timeout, error or
+//! peer shutdown (the socket is closed in those cases).
+int Site2SitePeer::readData(uint8_t *buf, int buflen, CRC32 *crc)
+{
+       if (_socket <= 0)
+       {
+               // this->yield();
+               return -1;
+       }
+
+       uint8_t *cursor = buf;
+       int remaining = buflen;
+
+       while (remaining)
+       {
+               // Only attempt the recv once Select reports the socket readable.
+               int status = Select((int) _timeOut);
+               if (status > 0)
+                       status = recv(_socket, cursor, remaining, 0);
+
+               if (status <= 0)
+               {
+                       Close();
+                       // this->yield();
+                       return status;
+               }
+
+               remaining -= status;
+               cursor += status;
+       }
+
+       if (crc)
+               crc->update(buf, buflen);
+
+       return buflen;
+}
+
+/**
+ * Write a length-prefixed string to the peer.
+ *
+ * Every byte of str is treated as one character with a value in 0..255:
+ * values 0x01..0x7F are written as a single byte, everything else (0x00 and
+ * 0x80..0xFF) as the two-byte sequence 110xxxxx 10xxxxxx. The encoded data is
+ * preceded by its length in big-endian order: 2 bytes normally, 4 bytes when
+ * widen is true.
+ *
+ * Bytes are read as unsigned char. Reading them as plain (possibly signed)
+ * char sign-extends values >= 0x80 and produces a wrong lead byte
+ * (e.g. 0xC3 would be emitted as DF 83 instead of C3 83).
+ *
+ * @param str   string to send
+ * @param widen use a 4-byte instead of a 2-byte length prefix
+ * @param crc   optional CRC that sendData updates with the bytes written
+ * @return the sendData() result (bytes written including the prefix, or -1 on
+ *         failure); -1 without sending anything when the encoded length
+ *         exceeds 65535
+ */
+int Site2SitePeer::writeUTF(std::string str, bool widen, CRC32 *crc)
+{
+       const int len = str.length();
+       int utflen = 0;
+
+       // First pass: size of the encoded payload.
+       for (int i = 0; i < len; i++)
+       {
+               const int c = (unsigned char) str[i];
+               utflen += ((c >= 0x0001) && (c <= 0x007F)) ? 1 : 2;
+       }
+
+       if (utflen > 65535)
+               return -1;
+
+       const int headerLen = widen ? 4 : 2;
+       uint8_t *bytearr = new uint8_t[utflen + headerLen];
+       int count = 0;
+
+       // Big-endian length prefix; the two high bytes exist only when widened.
+       if (widen)
+       {
+               bytearr[count++] = (uint8_t) ((utflen >> 24) & 0xFF);
+               bytearr[count++] = (uint8_t) ((utflen >> 16) & 0xFF);
+       }
+       bytearr[count++] = (uint8_t) ((utflen >> 8) & 0xFF);
+       bytearr[count++] = (uint8_t) ((utflen >> 0) & 0xFF);
+
+       // Second pass: encode the payload.
+       for (int i = 0; i < len; i++)
+       {
+               const int c = (unsigned char) str[i];
+               if ((c >= 0x0001) && (c <= 0x007F))
+               {
+                       bytearr[count++] = (uint8_t) c;
+               }
+               else
+               {
+                       bytearr[count++] = (uint8_t) (0xC0 | ((c >>  6) & 0x1F));
+                       bytearr[count++] = (uint8_t) (0x80 | ((c >>  0) & 0x3F));
+               }
+       }
+
+       const int ret = sendData(bytearr, utflen + headerLen, crc);
+       delete[] bytearr;
+       return ret;
+}
+
+/**
+ * Read a length-prefixed string from the peer (counterpart of writeUTF).
+ *
+ * The length prefix is 2 bytes, or 4 bytes when widen is true. The payload
+ * is decoded from 1-, 2- and 3-byte sequences; each decoded value is
+ * truncated to a single char of the result.
+ *
+ * The length is kept in 32 bits. Storing a widened length in a uint16_t
+ * silently drops the upper bits, so a payload larger than 65535 bytes would
+ * be read only partially and leave the rest of it in the stream.
+ *
+ * @param str   receives the decoded string; only assigned on success
+ * @param widen the length prefix is 4 bytes instead of 2
+ * @param crc   optional CRC passed on to the underlying reads
+ * @return prefix size plus payload size on success; the non-positive read
+ *         result when reading the prefix or payload fails; -1 when the
+ *         payload is malformed or its length does not fit an int
+ */
+int Site2SitePeer::readUTF(std::string &str, bool widen, CRC32 *crc)
+{
+       uint32_t utflen = 0;
+       int ret;
+
+       if (!widen)
+       {
+               uint16_t shortLen;
+               ret = read(shortLen, crc);
+               if (ret <= 0)
+                       return ret;
+               utflen = shortLen;
+       }
+       else
+       {
+               ret = read(utflen, crc);
+               if (ret <= 0)
+                       return ret;
+       }
+
+       const int headerLen = widen ? 4 : 2;
+
+       // The payload length is handed to the reader as an int and is part of
+       // the int return value, so refuse anything that would not fit.
+       if (utflen > 0x7FFFFFFFu - 4)
+               return -1;
+
+       // An empty string has no payload to read. NOTE(review): a zero-length
+       // read presumably returns 0, which would be mistaken for a failure
+       // below -- confirm against read(uint8_t *, int, CRC32 *).
+       if (utflen == 0)
+       {
+               str.clear();
+               return headerLen;
+       }
+
+       uint8_t *bytearr = new uint8_t[utflen];
+
+       ret = read(bytearr, (int) utflen, crc);
+       if (ret <= 0)
+       {
+               delete[] bytearr;
+               return ret;
+       }
+
+       // The number of chars produced may be less than utflen
+       std::string decoded;
+       decoded.reserve(utflen);
+
+       int c, char2, char3;
+       uint32_t count = 0;
+       bool malformed = false;
+
+       while (count < utflen && !malformed)
+       {
+               c = (int) bytearr[count] & 0xff;
+               switch (c >> 4)
+               {
+               case 0: case 1: case 2: case 3: case 4: case 5: case 6: case 7:
+                       /* 0xxxxxxx */
+                       count++;
+                       decoded.push_back((char) c);
+                       break;
+               case 12: case 13:
+                       /* 110x xxxx   10xx xxxx */
+                       count += 2;
+                       if (count > utflen)
+                       {
+                               // sequence is cut off by the end of the payload
+                               malformed = true;
+                               break;
+                       }
+                       char2 = (int) bytearr[count-1];
+                       if ((char2 & 0xC0) != 0x80)
+                       {
+                               malformed = true;
+                               break;
+                       }
+                       decoded.push_back((char) (((c & 0x1F) << 6) | (char2 & 0x3F)));
+                       break;
+               case 14:
+                       /* 1110 xxxx  10xx xxxx  10xx xxxx */
+                       count += 3;
+                       if (count > utflen)
+                       {
+                               malformed = true;
+                               break;
+                       }
+                       char2 = (int) bytearr[count-2];
+                       char3 = (int) bytearr[count-1];
+                       if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80))
+                       {
+                               malformed = true;
+                               break;
+                       }
+                       decoded.push_back((char) (((c     & 0x0F) << 12) |
+                                                 ((char2 & 0x3F) << 6)  |
+                                                 ((char3 & 0x3F) << 0)));
+                       break;
+               default:
+                       /* 10xx xxxx and 1111 xxxx cannot start a sequence */
+                       malformed = true;
+                       break;
+               }
+       }
+
+       delete[] bytearr;
+
+       if (malformed)
+               return -1;
+
+       str = decoded;
+       return headerLen + (int) utflen;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/target/conf/flow.xml
----------------------------------------------------------------------
diff --git a/target/conf/flow.xml b/target/conf/flow.xml
index 56125f8..51b74e8 100644
--- a/target/conf/flow.xml
+++ b/target/conf/flow.xml
@@ -8,6 +8,41 @@
     <position x="0.0" y="0.0"/>
     <comment/>
     <processor>
+      <id>e01275ae-ac38-48f9-ac53-1a44df1be88e</id>
+      <name>LogAttribute</name>
+      <position x="3950.0958625440016" y="1355.8949219185629"/>
+      <styles/>
+      <comment/>
+      <class>org.apache.nifi.processors.standard.LogAttribute</class>
+      <maxConcurrentTasks>1</maxConcurrentTasks>
+      <schedulingPeriod>0 sec</schedulingPeriod>
+      <penalizationPeriod>30 sec</penalizationPeriod>
+      <yieldPeriod>1 sec</yieldPeriod>
+      <bulletinLevel>WARN</bulletinLevel>
+      <lossTolerant>false</lossTolerant>
+      <scheduledState>RUNNING</scheduledState>
+      <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
+      <runDurationNanos>0</runDurationNanos>
+      <property>
+        <name>Log Level</name>
+        <value>info</value>
+      </property>
+      <property>
+        <name>Log Payload</name>
+        <value>false</value>
+      </property>
+      <property>
+        <name>Attributes to Log</name>
+      </property>
+      <property>
+        <name>Attributes to Ignore</name>
+      </property>
+      <property>
+        <name>Log prefix</name>
+      </property>
+      <autoTerminatedRelationship>success</autoTerminatedRelationship>
+    </processor>
+    <processor>
       <id>572aa3f3-6288-4ca1-ae43-5e492cb0ea23</id>
       <name>LogAttribute</name>
       <position x="3259.732177734375" y="1739.991943359375"/>
@@ -60,7 +95,7 @@
       <runDurationNanos>0</runDurationNanos>
       <property>
         <name>File Size</name>
-        <value>1 kB</value>
+        <value>1024 kB</value>
       </property>
       <property>
         <name>Batch Size</name>
@@ -75,6 +110,69 @@
         <value>false</value>
       </property>
     </processor>
+    <label>
+      <id>809d63d9-6feb-496a-9dc3-d23c217e52fd</id>
+      <position x="3635.581271381991" y="1309.9918825902428"/>
+      <size height="193.5023651123047" width="641.0671997070312"/>
+      <styles>
+        <style name="background-color">#9a91ff</style>
+        <style name="font-size">16px</style>
+      </styles>
+      <value>Pull From Node B</value>
+    </label>
+    <label>
+      <id>d95ce8d3-c005-4d0b-8fcc-b2f6fae7172f</id>
+      <position x="2601.7320892530847" y="1413.1875613011803"/>
+      <size height="193.5023651123047" width="641.0671997070312"/>
+      <styles>
+        <style name="font-size">16px</style>
+      </styles>
+      <value>Push to Node B</value>
+    </label>
+    <remoteProcessGroup>
+      <id>8f3b248f-d493-4269-b317-36f85719f480</id>
+      <name>NiFi Flow</name>
+      <position x="3254.3356850982673" y="1432.3274284388426"/>
+      <comment/>
+      <url>http://localhost:8081/nifi</url>
+      <timeout>30 sec</timeout>
+      <yieldPeriod>1 sec</yieldPeriod>
+      <transmitting>true</transmitting>
+      <inputPort>
+        <id>471deef6-2a6e-4a7d-912a-81cc17e3a204</id>
+        <name> From Node A</name>
+        <position x="0.0" y="0.0"/>
+        <comments/>
+        <scheduledState>RUNNING</scheduledState>
+        <maxConcurrentTasks>1</maxConcurrentTasks>
+        <useCompression>false</useCompression>
+        <property>
+            <name>Host Name</name>
+               <value>localhost</value>
+        </property>
+        <property>
+            <name>Port</name>
+            <value>10001</value>
+        </property>
+      </inputPort>
+      <outputPort>
+        <id>75f88005-0a87-4fef-8320-6219cdbcf18b</id>
+        <name>To A</name>
+        <position x="0.0" y="0.0"/>
+        <comments/>
+        <scheduledState>RUNNING</scheduledState>
+        <maxConcurrentTasks>1</maxConcurrentTasks>
+        <useCompression>false</useCompression>
+        <property>
+            <name>Host Name</name>
+            <value>localhost</value>
+        </property>
+        <property>
+            <name>Port</name>
+            <value>10001</value>
+        </property>
+      </outputPort>
+    </remoteProcessGroup>
     <connection>
       <id>c4cf70d8-be05-4c3d-b926-465f330d6503</id>
       <name/>
@@ -93,20 +191,35 @@
       <flowFileExpiration>0 sec</flowFileExpiration>
     </connection>
     <connection>
-      <id>673cf83c-d261-4b6b-8e5a-19052fe40025</id>
-      <name/>
-      <bendPoints>
-        <bendPoint x="3106.4228882570283" y="1573.7169700192542"/>
-      </bendPoints>
+      <id>c9573abe-937c-464b-b18d-48b29c42dce2</id>
+      <name>site2siteSEND</name>
+      <bendPoints/>
       <labelIndex>1</labelIndex>
       <zIndex>0</zIndex>
       <sourceId>a0e57bb2-5b89-438e-8869-0326bbdbbe43</sourceId>
       <sourceGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</sourceGroupId>
       <sourceType>PROCESSOR</sourceType>
-      <destinationId>572aa3f3-6288-4ca1-ae43-5e492cb0ea23</destinationId>
+      <destinationId>471deef6-2a6e-4a7d-912a-81cc17e3a204</destinationId>
+      
<destinationGroupId>8f3b248f-d493-4269-b317-36f85719f480</destinationGroupId>
+      <destinationType>REMOTE_INPUT_PORT</destinationType>
+      <relationship>success</relationship>
+      <maxWorkQueueSize>0</maxWorkQueueSize>
+      <maxWorkQueueDataSize>0 MB</maxWorkQueueDataSize>
+      <flowFileExpiration>0 sec</flowFileExpiration>
+    </connection>
+    <connection>
+      <id>2cb90b4c-d6cb-4fef-8f0f-b16459561af5</id>
+      <name>site2siteReceive</name>
+      <bendPoints/>
+      <labelIndex>1</labelIndex>
+      <zIndex>0</zIndex>
+      <sourceId>75f88005-0a87-4fef-8320-6219cdbcf18b</sourceId>
+      <sourceGroupId>8f3b248f-d493-4269-b317-36f85719f480</sourceGroupId>
+      <sourceType>REMOTE_OUTPUT_PORT</sourceType>
+      <destinationId>e01275ae-ac38-48f9-ac53-1a44df1be88e</destinationId>
       
<destinationGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</destinationGroupId>
       <destinationType>PROCESSOR</destinationType>
-      <relationship>success</relationship>
+      <relationship/>
       <maxWorkQueueSize>0</maxWorkQueueSize>
       <maxWorkQueueDataSize>0 MB</maxWorkQueueDataSize>
       <flowFileExpiration>0 sec</flowFileExpiration>
@@ -156,7 +269,7 @@
       </property>
       <property>
         <name>Truststore Password</name>
-        
<value>enc{9E2EE146023A0F31914706460EB177B357796CF0C768DECE09D10C4B40F344C8}</value>
+        
<value>enc{3A31531B76B6395A72FB8BEB4C93E2040877D07C04FDAB5A84499B918BECEB77}</value>
       </property>
       <property>
         <name>Truststore Type</name>

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/target/conf/flowServer.xml
----------------------------------------------------------------------
diff --git a/target/conf/flowServer.xml b/target/conf/flowServer.xml
new file mode 100644
index 0000000..caca3eb
--- /dev/null
+++ b/target/conf/flowServer.xml
@@ -0,0 +1,130 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<flowController>
+  <maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount>
+  <maxEventDrivenThreadCount>5</maxEventDrivenThreadCount>
+  <rootGroup>
+    <id>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</id>
+    <name>NiFi Flow</name>
+    <position x="0.0" y="0.0"/>
+    <comment/>
+    <processor>
+      <id>572aa3f3-6288-4ca1-ae43-5e492cb0ea23</id>
+      <name>RealTimeDataCollector</name>
+      <position x="3259.732177734375" y="1739.991943359375"/>
+      <styles/>
+      <comment/>
+      <class>org.apache.nifi.processors.standard.RealTimeDataCollector</class>
+      <maxConcurrentTasks>2</maxConcurrentTasks>
+      <schedulingPeriod>10 ms</schedulingPeriod>
+      <penalizationPeriod>30 sec</penalizationPeriod>
+      <yieldPeriod>1 sec</yieldPeriod>
+      <bulletinLevel>WARN</bulletinLevel>
+      <lossTolerant>false</lossTolerant>
+      <scheduledState>RUNNING</scheduledState>
+      <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
+      <runDurationNanos>0</runDurationNanos>
+      <property>
+        <name>File Name</name>
+        <value>data.osp</value>
+      </property>
+      <property>
+        <name>Real Time Server Name</name>
+        <value>localhost</value>
+      </property>
+      <property>
+        <name>Real Time Server Port</name>
+        <value>10000</value>
+      </property>
+      <property>
+        <name>Batch Server Name</name>
+        <value>localhost</value>
+      </property>
+      <property>
+        <name>Batch Server Port</name>
+        <value>10001</value>
+      </property>
+      <property>
+        <name>Iteration</name>
+        <value>true</value>
+      </property>
+      <property>
+        <name>Real Time Message ID</name>
+        <value>41</value>
+      </property>
+      <property>
+        <name>Batch Message ID</name>
+        <value>172,48</value>
+      </property>
+      <property>
+        <name>Real Time Interval</name>
+        <value>200 ms</value>
+      </property>
+      <property>
+        <name>Batch Time Interval</name>
+        <value>1 sec</value>
+      </property>
+      <property>
+        <name>Batch Max Buffer Size</name>
+        <value>262144</value>
+      </property>
+      <autoTerminatedRelationship>success</autoTerminatedRelationship>
+    </processor>
+  </rootGroup>
+  <controllerServices>
+    <controllerService>
+      <id>b2785fb0-e797-4c4d-8592-d2b2563504c4</id>
+      <name>DistributedMapCacheClientService</name>
+      <comment/>
+      
<class>org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService</class>
+      <enabled>true</enabled>
+      <property>
+        <name>Server Hostname</name>
+        <value>localhost</value>
+      </property>
+      <property>
+        <name>Server Port</name>
+        <value>4557</value>
+      </property>
+      <property>
+        <name>SSL Context Service</name>
+      </property>
+      <property>
+        <name>Communications Timeout</name>
+        <value>30 secs</value>
+      </property>
+    </controllerService>
+    <controllerService>
+      <id>2855f1e0-dc35-4955-9ae2-b2d7d1765d4e</id>
+      <name>StandardSSLContextService</name>
+      <comment/>
+      <class>org.apache.nifi.ssl.StandardSSLContextService</class>
+      <enabled>true</enabled>
+      <property>
+        <name>Keystore Filename</name>
+      </property>
+      <property>
+        <name>Keystore Password</name>
+      </property>
+      <property>
+        <name>Keystore Type</name>
+      </property>
+      <property>
+        <name>Truststore Filename</name>
+        
<value>/Library/Java/JavaVirtualMachines/jdk1.8.0_73.jdk/Contents/Home/jre/lib/security/cacerts</value>
+      </property>
+      <property>
+        <name>Truststore Password</name>
+        
<value>enc{9E2EE146023A0F31914706460EB177B357796CF0C768DECE09D10C4B40F344C8}</value>
+      </property>
+      <property>
+        <name>Truststore Type</name>
+        <value>JKS</value>
+      </property>
+      <property>
+        <name>SSL Protocol</name>
+        <value>TLS</value>
+      </property>
+    </controllerService>
+  </controllerServices>
+  <reportingTasks/>
+</flowController>

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/target/conf/flow_Site2SiteServer.xml
----------------------------------------------------------------------
diff --git a/target/conf/flow_Site2SiteServer.xml 
b/target/conf/flow_Site2SiteServer.xml
new file mode 100644
index 0000000..acd2c1e
--- /dev/null
+++ b/target/conf/flow_Site2SiteServer.xml
@@ -0,0 +1,140 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<flowController>
+  <maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount>
+  <maxEventDrivenThreadCount>5</maxEventDrivenThreadCount>
+  <rootGroup>
+    <id>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</id>
+    <name>NiFi Flow</name>
+    <position x="0.0" y="0.0"/>
+    <comment/>
+    <processor>
+      <id>cd274fef-168a-486b-b21a-04ed17f981b7</id>
+      <name>LogAttribute</name>
+      <position x="2823.8107761867964" y="623.2524160253959"/>
+      <styles/>
+      <comment/>
+      <class>org.apache.nifi.processors.standard.LogAttribute</class>
+      <maxConcurrentTasks>1</maxConcurrentTasks>
+      <schedulingPeriod>0 sec</schedulingPeriod>
+      <penalizationPeriod>30 sec</penalizationPeriod>
+      <yieldPeriod>1 sec</yieldPeriod>
+      <bulletinLevel>WARN</bulletinLevel>
+      <lossTolerant>false</lossTolerant>
+      <scheduledState>RUNNING</scheduledState>
+      <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
+      <runDurationNanos>0</runDurationNanos>
+      <property>
+        <name>Log Level</name>
+        <value>info</value>
+      </property>
+      <property>
+        <name>Log Payload</name>
+        <value>true</value>
+      </property>
+      <property>
+        <name>Attributes to Log</name>
+      </property>
+      <property>
+        <name>Attributes to Ignore</name>
+      </property>
+      <property>
+        <name>Log prefix</name>
+      </property>
+      <autoTerminatedRelationship>success</autoTerminatedRelationship>
+    </processor>
+    <processor>
+      <id>4fa35a7d-d1f0-44e4-87d7-7d69f0b78b7b</id>
+      <name>GenerateFlowFile</name>
+      <position x="2248.4411151522036" y="917.8589272756209"/>
+      <styles/>
+      <comment/>
+      <class>org.apache.nifi.processors.standard.GenerateFlowFile</class>
+      <maxConcurrentTasks>1</maxConcurrentTasks>
+      <schedulingPeriod>1 sec</schedulingPeriod>
+      <penalizationPeriod>30 sec</penalizationPeriod>
+      <yieldPeriod>1 sec</yieldPeriod>
+      <bulletinLevel>WARN</bulletinLevel>
+      <lossTolerant>false</lossTolerant>
+      <scheduledState>RUNNING</scheduledState>
+      <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
+      <runDurationNanos>0</runDurationNanos>
+      <property>
+        <name>File Size</name>
+        <value>1024 kB</value>
+      </property>
+      <property>
+        <name>Batch Size</name>
+        <value>1</value>
+      </property>
+      <property>
+        <name>Data Format</name>
+        <value>Text</value>
+      </property>
+      <property>
+        <name>Unique FlowFiles</name>
+        <value>false</value>
+      </property>
+    </processor>
+    <inputPort>
+      <id>471deef6-2a6e-4a7d-912a-81cc17e3a204</id>
+      <name> From Node A</name>
+      <position x="2305.369919163486" y="646.0466623031645"/>
+      <comments/>
+      <scheduledState>RUNNING</scheduledState>
+      <maxConcurrentTasks>1</maxConcurrentTasks>
+    </inputPort>
+    <outputPort>
+      <id>75f88005-0a87-4fef-8320-6219cdbcf18b</id>
+      <name>To A</name>
+      <position x="2915.739181824911" y="1057.8803860295386"/>
+      <comments/>
+      <scheduledState>RUNNING</scheduledState>
+      <maxConcurrentTasks>1</maxConcurrentTasks>
+    </outputPort>
+    <label>
+      <id>2f0db43e-1ce0-49ab-96a5-459c285aff09</id>
+      <position x="2197.3693058093504" y="849.4395700448451"/>
+      <size height="286.5726013183594" width="1012.2957763671875"/>
+      <styles>
+        <style name="font-size">18px</style>
+      </styles>
+      <value>Generate Data that is pushed to Node A and made available to be 
pulled</value>
+    </label>
+    <connection>
+      <id>7f869898-3a93-4e28-a60c-064789870574</id>
+      <name/>
+      <bendPoints/>
+      <labelIndex>1</labelIndex>
+      <zIndex>0</zIndex>
+      <sourceId>471deef6-2a6e-4a7d-912a-81cc17e3a204</sourceId>
+      <sourceGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</sourceGroupId>
+      <sourceType>INPUT_PORT</sourceType>
+      <destinationId>cd274fef-168a-486b-b21a-04ed17f981b7</destinationId>
+      
<destinationGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</destinationGroupId>
+      <destinationType>PROCESSOR</destinationType>
+      <relationship/>
+      <maxWorkQueueSize>0</maxWorkQueueSize>
+      <maxWorkQueueDataSize>0 MB</maxWorkQueueDataSize>
+      <flowFileExpiration>0 sec</flowFileExpiration>
+    </connection>
+    <connection>
+      <id>9dbc73f6-c827-4258-8bc7-06eb6a9b79d5</id>
+      <name/>
+      <bendPoints/>
+      <labelIndex>1</labelIndex>
+      <zIndex>0</zIndex>
+      <sourceId>4fa35a7d-d1f0-44e4-87d7-7d69f0b78b7b</sourceId>
+      <sourceGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</sourceGroupId>
+      <sourceType>PROCESSOR</sourceType>
+      <destinationId>75f88005-0a87-4fef-8320-6219cdbcf18b</destinationId>
+      
<destinationGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</destinationGroupId>
+      <destinationType>OUTPUT_PORT</destinationType>
+      <relationship>success</relationship>
+      <maxWorkQueueSize>0</maxWorkQueueSize>
+      <maxWorkQueueDataSize>0 MB</maxWorkQueueDataSize>
+      <flowFileExpiration>0 sec</flowFileExpiration>
+    </connection>
+  </rootGroup>
+  <controllerServices/>
+  <reportingTasks/>
+</flowController>

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/target/conf/nifi.properties
----------------------------------------------------------------------
diff --git a/target/conf/nifi.properties b/target/conf/nifi.properties
index b1902b1..627876f 100644
--- a/target/conf/nifi.properties
+++ b/target/conf/nifi.properties
@@ -184,7 +184,7 @@ nifi.cluster.manager.safemode.duration=0 sec
 # kerberos #
 nifi.kerberos.krb5.file=
 
-# Server
+# MiNiFi Server for Command Control
 nifi.server.name=localhost
 nifi.server.port=9000
 nifi.server.report.interval=1000 ms

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/thirdparty/uuid/tst_uuid
----------------------------------------------------------------------
diff --git a/thirdparty/uuid/tst_uuid b/thirdparty/uuid/tst_uuid
index cce0cbd..e067cb2 100755
Binary files a/thirdparty/uuid/tst_uuid and b/thirdparty/uuid/tst_uuid differ

Reply via email to