http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/RealTimeDataCollector.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/RealTimeDataCollector.cpp 
b/libminifi/src/RealTimeDataCollector.cpp
deleted file mode 100644
index 7dd6469..0000000
--- a/libminifi/src/RealTimeDataCollector.cpp
+++ /dev/null
@@ -1,481 +0,0 @@
-/**
- * @file RealTimeDataCollector.cpp
- * RealTimeDataCollector class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <vector>
-#include <queue>
-#include <map>
-#include <set>
-#include <sys/time.h>
-#include <time.h>
-#include <chrono>
-#include <thread>
-#include <random>
-#include <netinet/tcp.h>
-
-#include "utils/StringUtils.h"
-#include "RealTimeDataCollector.h"
-#include "ProcessContext.h"
-#include "ProcessSession.h"
-
-const std::string 
RealTimeDataCollector::ProcessorName("RealTimeDataCollector");
-Property RealTimeDataCollector::FILENAME("File Name", "File Name for the real 
time processor to process", "data.osp");
-Property RealTimeDataCollector::REALTIMESERVERNAME("Real Time Server Name", 
"Real Time Server Name", "localhost");
-Property RealTimeDataCollector::REALTIMESERVERPORT("Real Time Server Port", 
"Real Time Server Port", "10000");
-Property RealTimeDataCollector::BATCHSERVERNAME("Batch Server Name", "Batch 
Server Name", "localhost");
-Property RealTimeDataCollector::BATCHSERVERPORT("Batch Server Port", "Batch 
Server Port", "10001");
-Property RealTimeDataCollector::ITERATION("Iteration",
-               "If true, sample osp file will be iterated", "true");
-Property RealTimeDataCollector::REALTIMEMSGID("Real Time Message ID", "Real 
Time Message ID", "41");
-Property RealTimeDataCollector::BATCHMSGID("Batch Message ID", "Batch Message 
ID", "172, 30, 48");
-Property RealTimeDataCollector::REALTIMEINTERVAL("Real Time Interval", "Real 
Time Data Collection Interval in msec", "10 ms");
-Property RealTimeDataCollector::BATCHINTERVAL("Batch Time Interval", "Batch 
Processing Interval in msec", "100 ms");
-Property RealTimeDataCollector::BATCHMAXBUFFERSIZE("Batch Max Buffer Size", 
"Batch Buffer Maximum size in bytes", "262144");
-Relationship RealTimeDataCollector::Success("success", "success operational on 
the flow record");
-
-void RealTimeDataCollector::initialize()
-{
-       //! Set the supported properties
-       std::set<Property> properties;
-       properties.insert(FILENAME);
-       properties.insert(REALTIMESERVERNAME);
-       properties.insert(REALTIMESERVERPORT);
-       properties.insert(BATCHSERVERNAME);
-       properties.insert(BATCHSERVERPORT);
-       properties.insert(ITERATION);
-       properties.insert(REALTIMEMSGID);
-       properties.insert(BATCHMSGID);
-       properties.insert(REALTIMEINTERVAL);
-       properties.insert(BATCHINTERVAL);
-       properties.insert(BATCHMAXBUFFERSIZE);
-
-       setSupportedProperties(properties);
-       //! Set the supported relationships
-       std::set<Relationship> relationships;
-       relationships.insert(Success);
-       setSupportedRelationships(relationships);
-
-}
-
-int RealTimeDataCollector::connectServer(const char *host, uint16_t port)
-{
-       in_addr_t addr;
-       int sock = 0;
-       struct hostent *h;
-#ifdef __MACH__
-       h = gethostbyname(host);
-#else
-       char buf[1024];
-       struct hostent he;
-       int hh_errno;
-       gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno);
-#endif
-       memcpy((char *) &addr, h->h_addr_list[0], h->h_length);
-       sock = socket(AF_INET, SOCK_STREAM, 0);
-       if (sock < 0)
-       {
-               logger_->log_error("Could not create socket to hostName %s", 
host);
-               return 0;
-       }
-
-#ifndef __MACH__
-       int opt = 1;
-       bool nagle_off = true;
-
-       if (nagle_off)
-       {
-               if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *)&opt, 
sizeof(opt)) < 0)
-               {
-                       logger_->log_error("setsockopt() TCP_NODELAY failed");
-                       close(sock);
-                       return 0;
-               }
-               if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
-                               (char *)&opt, sizeof(opt)) < 0)
-               {
-                       logger_->log_error("setsockopt() SO_REUSEADDR failed");
-                       close(sock);
-                       return 0;
-               }
-       }
-
-       int sndsize = 256*1024;
-       if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndsize, 
(int)sizeof(sndsize)) < 0)
-       {
-               logger_->log_error("setsockopt() SO_SNDBUF failed");
-               close(sock);
-               return 0;
-       }
-#endif
-
-       struct sockaddr_in sa;
-       socklen_t socklen;
-       int status;
-
-       //TODO bind socket to the interface
-       memset(&sa, 0, sizeof(sa));
-       sa.sin_family = AF_INET;
-       sa.sin_addr.s_addr = htonl(INADDR_ANY);
-       sa.sin_port = htons(0);
-       socklen = sizeof(sa);
-       if (bind(sock, (struct sockaddr *)&sa, socklen) < 0)
-       {
-               logger_->log_error("socket bind failed");
-               close(sock);
-               return 0;
-       }
-
-       memset(&sa, 0, sizeof(sa));
-       sa.sin_family = AF_INET;
-       sa.sin_addr.s_addr = addr;
-       sa.sin_port = htons(port);
-       socklen = sizeof(sa);
-
-       status = connect(sock, (struct sockaddr *)&sa, socklen);
-
-       if (status < 0)
-       {
-               logger_->log_error("socket connect failed to %s %d", host, 
port);
-               close(sock);
-               return 0;
-       }
-
-       logger_->log_info("socket %d connect to server %s port %d success", 
sock, host, port);
-
-       return sock;
-}
-
-int RealTimeDataCollector::sendData(int socket, const char *buf, int buflen)
-{
-       int ret = 0, bytes = 0;
-
-       while (bytes < buflen)
-       {
-               ret = send(socket, buf+bytes, buflen-bytes, 0);
-               //check for errors
-               if (ret == -1)
-               {
-                       return ret;
-               }
-               bytes+=ret;
-       }
-
-       if (ret)
-               logger_->log_debug("Send data size %d over socket %d", buflen, 
socket);
-
-       return ret;
-}
-
-void RealTimeDataCollector::onTriggerRealTime(ProcessContext *context, 
ProcessSession *session)
-{
-       if (_realTimeAccumulated >= this->_realTimeInterval)
-       {
-               std::string value;
-               if (this->getProperty(REALTIMEMSGID.getName(), value))
-               {
-                       this->_realTimeMsgID.clear();
-                       this->logger_->log_info("Real Time Msg IDs %s", 
value.c_str());
-                       std::stringstream lineStream(value);
-                       std::string cell;
-
-                       while(std::getline(lineStream, cell, ','))
-                   {
-                       this->_realTimeMsgID.push_back(cell);
-                       // this->logger_->log_debug("Real Time Msg ID %s", 
cell.c_str());
-                   }
-               }
-               if (this->getProperty(BATCHMSGID.getName(), value))
-               {
-                       this->_batchMsgID.clear();
-                       this->logger_->log_info("Batch Msg IDs %s", 
value.c_str());
-                       std::stringstream lineStream(value);
-                       std::string cell;
-
-                       while(std::getline(lineStream, cell, ','))
-                   {
-                               cell = StringUtils::trim(cell);
-                       this->_batchMsgID.push_back(cell);
-                   }
-               }
-               // Open the file
-               if (!this->_fileStream.is_open())
-               {
-                       _fileStream.open(this->_fileName.c_str(), 
std::ifstream::in);
-                       if (this->_fileStream.is_open())
-                               logger_->log_debug("open %s", 
_fileName.c_str());
-               }
-               if (!_fileStream.good())
-               {
-                       logger_->log_error("load data file failed %s", 
_fileName.c_str());
-                       return;
-               }
-               if (this->_fileStream.is_open())
-               {
-                       std::string line;
-
-                       while (std::getline(_fileStream, line))
-                       {
-                               line += "\n";
-                               std::stringstream lineStream(line);
-                               std::string cell;
-                               if (std::getline(lineStream, cell, ','))
-                               {
-                                       cell = StringUtils::trim(cell);
-                                       // Check whether it match to the batch 
traffic
-                                       for (std::vector<std::string>::iterator 
it = _batchMsgID.begin(); it != _batchMsgID.end(); ++it)
-                                       {
-                                               if (cell == *it)
-                                               {
-                                                       // push the batch data 
to the queue
-                                                       
std::lock_guard<std::mutex> lock(_mtx);
-                                                       while ((_queuedDataSize 
+ line.size()) > _batchMaxBufferSize)
-                                                       {
-                                                               std::string 
item = _queue.front();
-                                                               _queuedDataSize 
-= item.size();
-                                                               
logger_->log_debug("Pop item size %d from batch queue, queue buffer size %d", 
item.size(), _queuedDataSize);
-                                                               _queue.pop();
-                                                       }
-                                                       _queue.push(line);
-                                                       _queuedDataSize += 
line.size();
-                                                       
logger_->log_debug("Push batch msg ID %s into batch queue, queue buffer size 
%d", cell.c_str(), _queuedDataSize);
-                                               }
-                                       }
-                                       bool findRealTime = false;
-                                       // Check whether it match to the real 
time traffic
-                                       for (std::vector<std::string>::iterator 
it = _realTimeMsgID.begin(); it != _realTimeMsgID.end(); ++it)
-                                       {
-                                               if (cell == *it)
-                                               {
-                                                       int status = 0;
-                                                       if 
(this->_realTimeSocket <= 0)
-                                                       {
-                                                               // Connect the 
LTE socket
-                                                               uint16_t port = 
_realTimeServerPort;
-                                                               
this->_realTimeSocket = connectServer(_realTimeServerName.c_str(), port);
-                                                       }
-                                                       if 
(this->_realTimeSocket)
-                                                       {
-                                                               // try to send 
the data
-                                                               status = 
sendData(_realTimeSocket, line.data(), line.size());
-                                                               if (status < 0)
-                                                               {
-                                                                       
close(_realTimeSocket);
-                                                                       
_realTimeSocket = 0;
-                                                               }
-                                                       }
-                                                       if 
(this->_realTimeSocket <= 0 || status < 0)
-                                                       {
-                                                               // push the 
batch data to the queue
-                                                               
std::lock_guard<std::mutex> lock(_mtx);
-                                                               while 
((_queuedDataSize + line.size()) > _batchMaxBufferSize)
-                                                               {
-                                                                       
std::string item = _queue.front();
-                                                                       
_queuedDataSize -= item.size();
-                                                                       
logger_->log_debug("Pop item size %d from batch queue, queue buffer size %d", 
item.size(), _queuedDataSize);
-                                                                       
_queue.pop();
-                                                               }
-                                                               
_queue.push(line);
-                                                               _queuedDataSize 
+= line.size();
-                                                               
logger_->log_debug("Push real time msg ID %s into batch queue, queue buffer 
size %d", cell.c_str(), _queuedDataSize);
-                                                       }
-                                                       // find real time
-                                                       findRealTime = true;
-                                               } // cell
-                                       } // for real time pattern
-                                       if (findRealTime)
-                                               // we break the while once we 
find the first real time
-                                               break;
-                               }  // if get line
-                       } // while
-                       if (_fileStream.eof())
-                       {
-                               _fileStream.close();
-                       }
-               } // if open
-               _realTimeAccumulated = 0;
-       }
-       _realTimeAccumulated += 
context->getProcessor()->getSchedulingPeriodNano();
-}
-
-void RealTimeDataCollector::onTriggerBatch(ProcessContext *context, 
ProcessSession *session)
-{
-       if (_batchAcccumulated >= this->_batchInterval)
-       {
-               // logger_->log_info("onTriggerBatch");
-               // dequeue the batch and send over WIFI
-               int status = 0;
-               if (this->_batchSocket <= 0)
-               {
-                       // Connect the WIFI socket
-                       uint16_t port = _batchServerPort;
-                       this->_batchSocket = 
connectServer(_batchServerName.c_str(), port);
-               }
-               if (this->_batchSocket)
-               {
-                       std::lock_guard<std::mutex> lock(_mtx);
-
-                       while (!_queue.empty())
-                       {
-                               std::string line = _queue.front();
-                               status = sendData(_batchSocket, line.data(), 
line.size());
-                               _queue.pop();
-                               _queuedDataSize -= line.size();
-                               if (status < 0)
-                               {
-                                       close(_batchSocket);
-                                       _batchSocket = 0;
-                                       break;
-                               }
-                       }
-               }
-               _batchAcccumulated = 0;
-       }
-       _batchAcccumulated += 
context->getProcessor()->getSchedulingPeriodNano();
-}
-
-void RealTimeDataCollector::onTrigger(ProcessContext *context, ProcessSession 
*session)
-{
-       std::thread::id id = std::this_thread::get_id();
-
-       if (id == _realTimeThreadId)
-               return onTriggerRealTime(context, session);
-       else if (id == _batchThreadId)
-               return onTriggerBatch(context, session);
-       else
-       {
-               std::lock_guard<std::mutex> lock(_mtx);
-               if (!this->_firstInvoking)
-               {
-                       this->_fileName = "data.osp";
-                       std::string value;
-                       if (this->getProperty(FILENAME.getName(), value))
-                       {
-                               this->_fileName = value;
-                               this->logger_->log_info("Data Collector File 
Name %s", _fileName.c_str());
-                       }
-                       this->_realTimeServerName = "localhost";
-                       if (this->getProperty(REALTIMESERVERNAME.getName(), 
value))
-                       {
-                               this->_realTimeServerName = value;
-                               this->logger_->log_info("Real Time Server Name 
%s", this->_realTimeServerName.c_str());
-                       }
-                       this->_realTimeServerPort = 10000;
-                       if (this->getProperty(REALTIMESERVERPORT.getName(), 
value))
-                       {
-                               Property::StringToInt(value, 
_realTimeServerPort);
-                               this->logger_->log_info("Real Time Server Port 
%d", _realTimeServerPort);
-                       }
-                       if (this->getProperty(BATCHSERVERNAME.getName(), value))
-                       {
-                               this->_batchServerName = value;
-                               this->logger_->log_info("Batch Server Name %s", 
this->_batchServerName.c_str());
-                       }
-                       this->_batchServerPort = 10001;
-                       if (this->getProperty(BATCHSERVERPORT.getName(), value))
-                       {
-                               Property::StringToInt(value, _batchServerPort);
-                               this->logger_->log_info("Batch Server Port %d", 
_batchServerPort);
-                       }
-                       if (this->getProperty(ITERATION.getName(), value))
-                       {
-                               StringUtils::StringToBool(value, 
this->_iteration);
-                               logger_->log_info("Iteration %d", _iteration);
-                       }
-                       this->_realTimeInterval = 10000000; //10 msec
-                       if (this->getProperty(REALTIMEINTERVAL.getName(), 
value))
-                       {
-                               TimeUnit unit;
-                               if (Property::StringToTime(value, 
_realTimeInterval, unit) &&
-                                                               
Property::ConvertTimeUnitToNS(_realTimeInterval, unit, _realTimeInterval))
-                               {
-                                       logger_->log_info("Real Time Interval: 
[%d] ns", _realTimeInterval);
-                               }
-                       }
-                       this->_batchInterval = 100000000; //100 msec
-                       if (this->getProperty(BATCHINTERVAL.getName(), value))
-                       {
-                               TimeUnit unit;
-                               if (Property::StringToTime(value, 
_batchInterval, unit) &&
-                                                               
Property::ConvertTimeUnitToNS(_batchInterval, unit, _batchInterval))
-                               {
-                                       logger_->log_info("Batch Time Interval: 
[%d] ns", _batchInterval);
-                               }
-                       }
-                       this->_batchMaxBufferSize = 256*1024;
-                       if (this->getProperty(BATCHMAXBUFFERSIZE.getName(), 
value))
-                       {
-                               Property::StringToInt(value, 
_batchMaxBufferSize);
-                               this->logger_->log_info("Batch Max Buffer Size 
%d", _batchMaxBufferSize);
-                       }
-                       if (this->getProperty(REALTIMEMSGID.getName(), value))
-                       {
-                               this->logger_->log_info("Real Time Msg IDs %s", 
value.c_str());
-                               std::stringstream lineStream(value);
-                               std::string cell;
-
-                               while(std::getline(lineStream, cell, ','))
-                           {
-                               this->_realTimeMsgID.push_back(cell);
-                               this->logger_->log_info("Real Time Msg ID %s", 
cell.c_str());
-                           }
-                       }
-                       if (this->getProperty(BATCHMSGID.getName(), value))
-                       {
-                               this->logger_->log_info("Batch Msg IDs %s", 
value.c_str());
-                               std::stringstream lineStream(value);
-                               std::string cell;
-
-                               while(std::getline(lineStream, cell, ','))
-                           {
-                                       cell = StringUtils::trim(cell);
-                               this->_batchMsgID.push_back(cell);
-                               this->logger_->log_info("Batch Msg ID %s", 
cell.c_str());
-                           }
-                       }
-                       // Connect the LTE socket
-                       uint16_t port = _realTimeServerPort;
-
-                       this->_realTimeSocket = 
connectServer(_realTimeServerName.c_str(), port);
-
-                       // Connect the WIFI socket
-                       port = _batchServerPort;
-
-                       this->_batchSocket = 
connectServer(_batchServerName.c_str(), port);
-
-                       // Open the file
-                       _fileStream.open(this->_fileName.c_str(), 
std::ifstream::in);
-                       if (!_fileStream.good())
-                       {
-                               logger_->log_error("load data file failed %s", 
_fileName.c_str());
-                               return;
-                       }
-                       else
-                       {
-                               logger_->log_debug("open %s", 
_fileName.c_str());
-                       }
-                       _realTimeThreadId = id;
-                       this->_firstInvoking = true;
-               }
-               else
-               {
-                       if (id != _realTimeThreadId)
-                               _batchThreadId = id;
-                       this->_firstInvoking = false;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/RemoteProcessorGroupPort.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp
index dd1d035..9790256 100644
--- a/libminifi/src/RemoteProcessorGroupPort.cpp
+++ b/libminifi/src/RemoteProcessorGroupPort.cpp
@@ -29,93 +29,116 @@
 
 #include "RemoteProcessorGroupPort.h"
 
+#include "../include/io/StreamFactory.h"
 #include "io/ClientSocket.h"
-#include "io/SocketFactory.h"
-
 #include "utils/TimeUtil.h"
-#include "ProcessContext.h"
-#include "ProcessSession.h"
-
-const std::string RemoteProcessorGroupPort::ProcessorName("RemoteProcessorGroupPort");
-Property RemoteProcessorGroupPort::hostName("Host Name", "Remote Host Name.", "localhost");
-Property RemoteProcessorGroupPort::port("Port", "Remote Port", "9999");
-Relationship RemoteProcessorGroupPort::relation;
-
-void RemoteProcessorGroupPort::initialize()
-{
-
-       //! Set the supported properties
-       std::set<Property> properties;
-       properties.insert(hostName);
-       properties.insert(port);
-       setSupportedProperties(properties);
-       //! Set the supported relationships
-       std::set<Relationship> relationships;
-       relationships.insert(relation);
-       setSupportedRelationships(relationships);
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+const std::string RemoteProcessorGroupPort::ProcessorName(
+    "RemoteProcessorGroupPort");
+core::Property RemoteProcessorGroupPort::hostName("Host Name",
+                                                  "Remote Host Name.",
+                                                  "localhost");
+core::Property RemoteProcessorGroupPort::port("Port", "Remote Port", "9999");
+core::Relationship RemoteProcessorGroupPort::relation;
+
+
+std::unique_ptr<Site2SiteClientProtocol> RemoteProcessorGroupPort::getNextProtocol() {
+  std::lock_guard<std::mutex> protocol_lock_(protocol_mutex_);
+  if (available_protocols_.empty())
+    return nullptr;
+
+  std::unique_ptr<Site2SiteClientProtocol> return_pointer = std::move(available_protocols_.top());
+  available_protocols_.pop();
+  return std::move(return_pointer);
+}
+
+void RemoteProcessorGroupPort::returnProtocol(
+    std::unique_ptr<Site2SiteClientProtocol> return_protocol) {
+  std::lock_guard<std::mutex> protocol_lock_(protocol_mutex_);
+  available_protocols_.push(std::move(return_protocol));
+}
+
+void RemoteProcessorGroupPort::initialize() {
+
+  // Set the supported properties
+  std::set<core::Property> properties;
+  properties.insert(hostName);
+  properties.insert(port);
+  setSupportedProperties(properties);
+  // Set the supported relationships
+  std::set<core::Relationship> relationships;
+  relationships.insert(relation);
+  setSupportedRelationships(relationships);
 
 }
 
-void RemoteProcessorGroupPort::onTrigger(ProcessContext *context, ProcessSession *session)
-{
-       std::string value;
-
-       if (!transmitting_)
-               return;
-       
-       std::string host = peer_.getHostName();
-       uint16_t sport = peer_.getPort();
-       int64_t lvalue;
-       
-       if (context->getProperty(hostName.getName(), value))
-       {
-               host = value;
-       }
-       if (context->getProperty(port.getName(), value) && Property::StringToInt(value, lvalue))
-       {
-               sport = (uint16_t) lvalue;
-       }
-       
-       if (host != peer_.getHostName() || sport != peer_.getPort())
-       
-       {
-         
-             std::unique_ptr<DataStream> str = std::unique_ptr<DataStream>(SocketFactory::getInstance()->createSocket(host,sport));
-             peer_ = std::move(Site2SitePeer (std::move(str), host, sport));
-             protocol_->setPeer(&peer_);
-         
-       }
-               
-       
-       
-       bool needReset = false;
-
-       
-       if (host != peer_.getHostName())
-       {
-               peer_.setHostName(host);
-               needReset= true;
-       }
-       if (sport != peer_.getPort())
-       {
-               peer_.setPort(sport);
-               needReset = true;
-       }
-       if (needReset)
-               protocol_->tearDown();
-
-       if (!protocol_->bootstrap())
-       {
-               // bootstrap the client protocol if needeed
-               context->yield();
-               logger_->log_error("Site2Site bootstrap failed yield period %d peer ", context->getProcessor()->getYieldPeriodMsec());
-               return;
-       }
-
-       if (direction_ == RECEIVE)
-               protocol_->receiveFlowFiles(context, session);
-       else
-               protocol_->transferFlowFiles(context, session);
-
-       return;
+void RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context,
+                                         core::ProcessSession *session) {
+  std::string value;
+
+  if (!transmitting_)
+    return;
+
+  std::unique_ptr<Site2SiteClientProtocol> protocol_ = getNextProtocol();
+
+  // Peer Connection
+  if (protocol_ == nullptr) {
+
+    protocol_ = std::unique_ptr<Site2SiteClientProtocol>(
+        new Site2SiteClientProtocol(0));
+    protocol_->setPortId(protocol_uuid_);
+    protocol_->setTimeOut(timeout_);
+
+    std::string host = "";
+    uint16_t sport = 0;
+    int64_t lvalue;
+
+    if (context->getProperty(hostName.getName(), value)) {
+      host = value;
+    }
+    if (context->getProperty(port.getName(), value)
+        && core::Property::StringToInt(value, lvalue)) {
+      sport = (uint16_t) lvalue;
+    }
+    std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str =
+        std::unique_ptr<org::apache::nifi::minifi::io::DataStream>(
+            org::apache::nifi::minifi::io::StreamFactory::getInstance()
+                ->createSocket(host, sport));
+
+    std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr<Site2SitePeer>(
+        new Site2SitePeer(std::move(str), host, sport));
+
+    protocol_->setPeer(std::move(peer_));
+  }
+
+  if (!protocol_->bootstrap()) {
+    // bootstrap the client protocol if needed
+    context->yield();
+    std::shared_ptr<Processor> processor = std::static_pointer_cast<Processor>(
+        context->getProcessorNode().getProcessor());
+    logger_->log_error("Site2Site bootstrap failed yield period %d peer ",
+                       processor->getYieldPeriodMsec());
+    return;
+  }
+
+  if (direction_ == RECEIVE)
+    protocol_->receiveFlowFiles(context, session);
+  else
+    protocol_->transferFlowFiles(context, session);
+
+  returnProtocol(std::move(protocol_));
+
+  return;
 }
+
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/Repository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Repository.cpp b/libminifi/src/Repository.cpp
deleted file mode 100644
index 1a3c7b0..0000000
--- a/libminifi/src/Repository.cpp
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * @file Repository.cpp
- * Repository implemenatation 
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <cstdint>
-#include <vector>
-#include <arpa/inet.h>
-#include "io/DataStream.h"
-#include "io/Serializable.h"
-#include "Relationship.h"
-#include "Logger.h"
-#include "FlowController.h"
-#include "Repository.h"
-#include "Provenance.h"
-#include "FlowFileRepository.h"
-
-const char *Repository::RepositoryTypeStr[MAX_REPO_TYPE] = {"Provenace Repository", "FlowFile Repository"};
-uint64_t Repository::_repoSize[MAX_REPO_TYPE] = {0, 0}; 
-
-void Repository::start() {
-       if (!_enable)
-               return;
-       if (this->_purgePeriod <= 0)
-               return;
-       if (_running)
-               return;
-       _running = true;
-       logger_->log_info("%s Repository Monitor Thread Start", RepositoryTypeStr[_type]);
-       _thread = new std::thread(run, this);
-       _thread->detach();
-}
-
-void Repository::stop() {
-       if (!_running)
-               return;
-       _running = false;
-       logger_->log_info("%s Repository Monitor Thread Stop", RepositoryTypeStr[_type]);
-}
-
-void Repository::run(Repository *repo) {
-#ifdef LEVELDB_SUPPORT
-       // threshold for purge
-       uint64_t purgeThreshold = repo->_maxPartitionBytes * 3 / 4;
-       while (repo->_running) {
-               std::this_thread::sleep_for(
-                               std::chrono::milliseconds(repo->_purgePeriod));
-               uint64_t curTime = getTimeMillis();
-               uint64_t size = repo->repoSize();
-               if (size >= purgeThreshold) {
-                       std::vector<std::string> purgeList;
-                       leveldb::Iterator* it = repo->_db->NewIterator(
-                                       leveldb::ReadOptions());
-                       if (repo->_type == PROVENANCE)
-                       {
-                               for (it->SeekToFirst(); it->Valid(); it->Next()) {
-                                       ProvenanceEventRecord eventRead;
-                                       std::string key = it->key().ToString();
-                                       if (eventRead.DeSerialize((uint8_t *) it->value().data(),
-                                               (int) it->value().size())) {
-                                               if ((curTime - eventRead.getEventTime())
-                                                       > repo->_maxPartitionMillis)
-                                                       purgeList.push_back(key);
-                                       } else {
-                                               repo->logger_->log_debug(
-                                                       "NiFi %s retrieve event %s fail",
-                                                       RepositoryTypeStr[repo->_type],
-                                                       key.c_str());
-                                               purgeList.push_back(key);
-                                       }
-                               }
-                       }
-                       if (repo->_type == FLOWFILE)
-                       {
-                               for (it->SeekToFirst(); it->Valid(); it->Next()) {
-                                       FlowFileEventRecord eventRead;
-                                       std::string key = it->key().ToString();
-                                       if (eventRead.DeSerialize((uint8_t *) it->value().data(),
-                                               (int) it->value().size())) {
-                                               if ((curTime - eventRead.getEventTime())
-                                                       > repo->_maxPartitionMillis)
-                                                       purgeList.push_back(key);
-                                       } else {
-                                               repo->logger_->log_debug(
-                                                       "NiFi %s retrieve event %s fail",
-                                                       RepositoryTypeStr[repo->_type],
-                                                       key.c_str());
-                                               purgeList.push_back(key);
-                                       }
-                               }
-                       }
-                       delete it;
-                       for (auto eventId : purgeList)
-                       {
-                               repo->logger_->log_info("Repository Repo %s Purge %s",
-                                               RepositoryTypeStr[repo->_type],
-                                               eventId.c_str());
-                               repo->Delete(eventId);
-                       }
-               }
-               if (size > repo->_maxPartitionBytes)
-                       repo->_repoFull = true;
-               else
-                       repo->_repoFull = false;
-       }
-#endif
-       return;
-}
-
-//! repoSize
-uint64_t Repository::repoSize()
-{
-       _repoSize[_type] = 0;
-       if (_type == PROVENANCE)
-        {
-               if (ftw(_directory.c_str(), repoSumProvenance, 1) != 0)
-                       _repoSize[_type] = 0;
-       }
-       if (_type == FLOWFILE)
-        {
-               if (ftw(_directory.c_str(), repoSumFlowFile, 1) != 0)
-                       _repoSize[_type] = 0;
-       }
-       return _repoSize[_type];
-}
-

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/ResourceClaim.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ResourceClaim.cpp b/libminifi/src/ResourceClaim.cpp
index be52b49..826ca1d 100644
--- a/libminifi/src/ResourceClaim.cpp
+++ b/libminifi/src/ResourceClaim.cpp
@@ -23,27 +23,36 @@
 
 #include "ResourceClaim.h"
 
-std::atomic<uint64_t> ResourceClaim::_localResourceClaimNumber(0);
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
 
+std::atomic<uint64_t> ResourceClaim::_localResourceClaimNumber(0);
 
-std::string ResourceClaim::default_directory_path=DEFAULT_CONTENT_DIRECTORY;
+std::string ResourceClaim::default_directory_path = DEFAULT_CONTENT_DIRECTORY;
 
 ResourceClaim::ResourceClaim(const std::string contentDirectory)
-: _id(_localResourceClaimNumber.load()),
-  _flowFileRecordOwnedCount(0)
-{
-  
-       char uuidStr[37];
-
-       // Generate the global UUID for the resource claim
-       uuid_generate(_uuid);
-       // Increase the local ID for the resource claim
-       ++_localResourceClaimNumber;
-       uuid_unparse_lower(_uuid, uuidStr);
-       // Create the full content path for the content
-       _contentFullPath = contentDirectory + "/" + uuidStr;
-
-       configure_ = Configure::getConfigure();
-       logger_ = Logger::getLogger();
-       logger_->log_debug("Resource Claim created %s", uuidStr);
+    : _id(_localResourceClaimNumber.load()),
+      _flowFileRecordOwnedCount(0) {
+
+  char uuidStr[37];
+
+  // Generate the global UUID for the resource claim
+  uuid_generate(_uuid);
+  // Increase the local ID for the resource claim
+  ++_localResourceClaimNumber;
+  uuid_unparse_lower(_uuid, uuidStr);
+  // Create the full content path for the content
+  _contentFullPath = contentDirectory + "/" + uuidStr;
+
+  configure_ = Configure::getConfigure();
+
+  logger_ = logging::Logger::getLogger();
+  logger_->log_debug("Resource Claim created %s", _contentFullPath.c_str());
 }
+
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/SchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp
index 984abdc..8cb88e0 100644
--- a/libminifi/src/SchedulingAgent.cpp
+++ b/libminifi/src/SchedulingAgent.cpp
@@ -22,64 +22,71 @@
 #include <iostream>
 #include "Exception.h"
 #include "SchedulingAgent.h"
+#include "core/Processor.h"
 
-bool SchedulingAgent::hasWorkToDo(Processor *processor)
-{
-       // Whether it has work to do
-       if (processor->getTriggerWhenEmpty() || !processor->hasIncomingConnections() ||
-                       processor->flowFilesQueued())
-               return true;
-       else
-               return false;
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+bool SchedulingAgent::hasWorkToDo(
+    std::shared_ptr<core::Processor> processor) {
+  // Whether it has work to do
+  if (processor->getTriggerWhenEmpty() || !processor->hasIncomingConnections()
+      || processor->flowFilesQueued())
+    return true;
+  else
+    return false;
 }
 
-bool SchedulingAgent::hasTooMuchOutGoing(Processor *processor)
-{
-       return processor->flowFilesOutGoingFull();
+bool SchedulingAgent::hasTooMuchOutGoing(
+    std::shared_ptr<core::Processor> processor) {
+  return processor->flowFilesOutGoingFull();
 }
 
-bool SchedulingAgent::onTrigger(Processor *processor, ProcessContext *processContext, ProcessSessionFactory *sessionFactory)
-{
-       if (processor->isYield())
-               return false;
+bool SchedulingAgent::onTrigger(
+    std::shared_ptr<core::Processor> processor,
+    core::ProcessContext *processContext,
+    core::ProcessSessionFactory *sessionFactory) {
+  if (processor->isYield())
+    return false;
+
+  // No need to yield, reset yield expiration to 0
+  processor->clearYield();
 
-       // No need to yield, reset yield expiration to 0
-       processor->clearYield();
+  if (!hasWorkToDo(processor))
+    // No work to do, yield
+    return true;
 
-       if (!hasWorkToDo(processor))
-               // No work to do, yield
-               return true;
+  if (hasTooMuchOutGoing(processor))
+    // need to apply backpressure
+    return true;
 
-       if(hasTooMuchOutGoing(processor))
-               // need to apply backpressure
-               return true;
+  //TODO runDuration
 
-       //TODO runDuration
+  processor->incrementActiveTasks();
+  try {
+    processor->onTrigger(processContext, sessionFactory);
+    processor->decrementActiveTask();
+  } catch (Exception &exception) {
+    // Normal exception
+    logger_->log_debug("Caught Exception %s", exception.what());
+    processor->decrementActiveTask();
+  } catch (std::exception &exception) {
+    logger_->log_debug("Caught Exception %s", exception.what());
+    processor->yield(_administrativeYieldDuration);
+    processor->decrementActiveTask();
+  } catch (...) {
+    logger_->log_debug("Caught Exception during SchedulingAgent::onTrigger");
+    processor->yield(_administrativeYieldDuration);
+    processor->decrementActiveTask();
+  }
+
+  return false;
+}
 
-       processor->incrementActiveTasks();
-       try
-       {
-               processor->onTrigger(processContext, sessionFactory);
-               processor->decrementActiveTask();
-       }
-       catch (Exception &exception)
-       {
-               // Normal exception
-               logger_->log_debug("Caught Exception %s", exception.what());
-               processor->decrementActiveTask();
-       }
-       catch (std::exception &exception)
-       {
-               logger_->log_debug("Caught Exception %s", exception.what());
-               processor->yield(_administrativeYieldDuration);
-               processor->decrementActiveTask();
-       }
-       catch (...)
-       {
-               logger_->log_debug("Caught Exception during SchedulingAgent::onTrigger");
-               processor->yield(_administrativeYieldDuration);
-               processor->decrementActiveTask();
-       }
 
-       return false;
-}
\ No newline at end of file
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

Reply via email to