http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/Site2SiteClientProtocol.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Site2SiteClientProtocol.cpp 
b/libminifi/src/Site2SiteClientProtocol.cpp
new file mode 100644
index 0000000..88ea78a
--- /dev/null
+++ b/libminifi/src/Site2SiteClientProtocol.cpp
@@ -0,0 +1,1313 @@
+/**
+ * @file Site2SiteClientProtocol.cpp
+ * Site2SiteClientProtocol class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <sys/time.h>
+#include <stdio.h>
+#include <time.h>
+#include <chrono>
+#include <thread>
+#include <random>
+#include <netinet/tcp.h>
+#include <iostream>
+#include "Site2SitePeer.h"
+#include "Site2SiteClientProtocol.h"
+
+/**
+ * Bring the connection from IDLE to ESTABLISHED: open the peer socket and
+ * negotiate the site-to-site protocol version.
+ *
+ * No cleanup is performed here on failure; bootstrap() yields the peer and
+ * tears the connection down when one of its steps fails.
+ *
+ * @return true once the state has been set to ESTABLISHED; false if the state
+ *         was not IDLE, the socket could not be opened, or the version
+ *         negotiation failed
+ */
+bool Site2SiteClientProtocol::establish()
+{
+       if (_peerState != IDLE)
+       {
+               _logger->log_error("Site2Site peer state is not idle while try to establish");
+               return false;
+       }
+
+       const bool opened = _peer->Open();
+       if (!opened)
+       {
+               _logger->log_error("Site2Site peer socket open failed");
+               return false;
+       }
+
+       // Agree on a protocol version before anything else is exchanged
+       const bool negotiated = initiateResourceNegotiation();
+       if (!negotiated)
+       {
+               _logger->log_error("Site2Site Protocol Version Negotiation failed");
+               return false;
+       }
+
+       _logger->log_info("Site2Site socket established");
+       _peerState = ESTABLISHED;
+       return true;
+}
+
+/**
+ * Negotiate the site-to-site protocol version with the peer.
+ *
+ * Sends the resource name and the currently selected version, then reads a
+ * one byte status. When the peer reports a different version, the entries of
+ * _supportedVersion after the current index are scanned for the first one
+ * that does not exceed the peer's version, and the negotiation is retried
+ * with it.
+ *
+ * @return false if the state is not IDLE, if any read/write fails, if no
+ *         remaining supported version fits, or if the peer aborts;
+ *         true otherwise
+ */
+bool Site2SiteClientProtocol::initiateResourceNegotiation()
+{
+       if (_peerState != IDLE)
+       {
+               _logger->log_error("Site2Site peer state is not idle while initiateResourceNegotiation");
+               return false;
+       }
+
+       _logger->log_info("Negotiate protocol version with destination port %s current version %d", _portIdStr.c_str(), _currentVersion);
+
+       // Request: resource name followed by the version we are proposing
+       int status = _peer->writeUTF(this->getResourceName());
+       if (status > 0)
+               status = _peer->write(_currentVersion);
+       if (status <= 0)
+               return false;
+
+       uint8_t statusCode;
+       status = _peer->read(statusCode);
+       if (status <= 0)
+               return false;
+
+       if (statusCode == RESOURCE_OK)
+       {
+               _logger->log_info("Site2Site Protocol Negotiate protocol version OK");
+               return true;
+       }
+
+       if (statusCode == NEGOTIATED_ABORT)
+       {
+               _logger->log_info("Site2Site Negotiate protocol response ABORT");
+               return false;
+       }
+
+       if (statusCode != DIFFERENT_RESOURCE_VERSION)
+       {
+               // NOTE(review): an unrecognised status is reported as success - confirm this is intended
+               _logger->log_info("Negotiate protocol response unknown code %d", statusCode);
+               return true;
+       }
+
+       // The peer wants another version: it tells us which one it speaks
+       uint32_t serverVersion;
+       status = _peer->read(serverVersion);
+       if (status <= 0)
+               return false;
+
+       _logger->log_info("Site2Site Server Response asked for a different protocol version %d", serverVersion);
+
+       unsigned int candidate = _currentVersionIndex + 1;
+       while (candidate < sizeof(_supportedVersion) / sizeof(uint32_t))
+       {
+               if (serverVersion >= _supportedVersion[candidate])
+               {
+                       // Retry the whole negotiation with the fallback version
+                       _currentVersion = _supportedVersion[candidate];
+                       _currentVersionIndex = candidate;
+                       return initiateResourceNegotiation();
+               }
+               candidate++;
+       }
+
+       // None of the remaining versions is acceptable to the peer
+       return false;
+}
+
+/**
+ * Negotiate the flow file codec version with the peer.
+ *
+ * Sends the codec resource name and the currently selected codec version,
+ * then reads a one byte status. When the peer reports a different version,
+ * the entries of _supportedCodecVersion after the current index are scanned
+ * for the first one that does not exceed the peer's version, and the
+ * negotiation is retried with it.
+ *
+ * @return false if the state is not HANDSHAKED, if any read/write fails, if
+ *         no remaining supported codec version fits, or if the peer aborts;
+ *         true otherwise
+ */
+bool Site2SiteClientProtocol::initiateCodecResourceNegotiation()
+{
+       if (_peerState != HANDSHAKED)
+       {
+               _logger->log_error("Site2Site peer state is not handshaked while initiateCodecResourceNegotiation");
+               return false;
+       }
+
+       _logger->log_info("Negotiate Codec version with destination port %s current version %d", _portIdStr.c_str(), _currentCodecVersion);
+
+       // Request: codec resource name followed by the version we are proposing
+       int status = _peer->writeUTF(this->getCodecResourceName());
+       if (status > 0)
+               status = _peer->write(_currentCodecVersion);
+       if (status <= 0)
+               return false;
+
+       uint8_t statusCode;
+       status = _peer->read(statusCode);
+       if (status <= 0)
+               return false;
+
+       if (statusCode == RESOURCE_OK)
+       {
+               _logger->log_info("Site2Site Codec Negotiate version OK");
+               return true;
+       }
+
+       if (statusCode == NEGOTIATED_ABORT)
+       {
+               _logger->log_info("Site2Site Codec Negotiate response ABORT");
+               return false;
+       }
+
+       if (statusCode != DIFFERENT_RESOURCE_VERSION)
+       {
+               // NOTE(review): an unrecognised status is reported as success - confirm this is intended
+               _logger->log_info("Negotiate Codec response unknown code %d", statusCode);
+               return true;
+       }
+
+       // The peer wants another codec version: it tells us which one it speaks
+       uint32_t serverVersion;
+       status = _peer->read(serverVersion);
+       if (status <= 0)
+               return false;
+
+       _logger->log_info("Site2Site Server Response asked for a different codec version %d", serverVersion);
+
+       unsigned int candidate = _currentCodecVersionIndex + 1;
+       while (candidate < sizeof(_supportedCodecVersion) / sizeof(uint32_t))
+       {
+               if (serverVersion >= _supportedCodecVersion[candidate])
+               {
+                       // Retry the whole negotiation with the fallback version
+                       _currentCodecVersion = _supportedCodecVersion[candidate];
+                       _currentCodecVersionIndex = candidate;
+                       return initiateCodecResourceNegotiation();
+               }
+               candidate++;
+       }
+
+       // None of the remaining codec versions is acceptable to the peer
+       return false;
+}
+
+/**
+ * Perform the site-to-site handshake on an ESTABLISHED connection.
+ *
+ * Wire order: a freshly generated communications identifier (UUID string),
+ * the peer URL (protocol version 3 and later only), the number of handshake
+ * properties, then each property as a key/value pair. Batch settings are only
+ * included for protocol version 5 and later, and only when they are positive.
+ *
+ * @return true when the peer answers PROPERTIES_OK (state becomes
+ *         HANDSHAKED); false on a wrong state, any I/O failure, or any other
+ *         response code
+ */
+bool Site2SiteClientProtocol::handShake()
+{
+       if (_peerState != ESTABLISHED)
+       {
+               _logger->log_error("Site2Site peer state is not established while handshake");
+               return false;
+       }
+       _logger->log_info("Site2Site Protocol Perform hand shake with destination port %s", _portIdStr.c_str());
+
+       // A new UUID identifies this communication session
+       uuid_t sessionId;
+       uuid_generate(sessionId);
+       char sessionIdText[37]; // 36 characters plus the terminating NUL
+       uuid_unparse(sessionId, sessionIdText);
+       _commsIdentifier = sessionIdText;
+
+       int status = _peer->writeUTF(_commsIdentifier);
+       if (status <= 0)
+               return false;
+
+       // Collect the properties announced to the peer
+       std::map<std::string, std::string> properties;
+       properties[HandShakePropertyStr[GZIP]] = "false";
+       properties[HandShakePropertyStr[PORT_IDENTIFIER]] = _portIdStr;
+       properties[HandShakePropertyStr[REQUEST_EXPIRATION_MILLIS]] = std::to_string(this->_timeOut);
+       if (this->_currentVersion >= 5)
+       {
+               if (this->_batchCount > 0)
+               {
+                       properties[HandShakePropertyStr[BATCH_COUNT]] = std::to_string(this->_batchCount);
+               }
+               if (this->_batchSize > 0)
+               {
+                       properties[HandShakePropertyStr[BATCH_SIZE]] = std::to_string(this->_batchSize);
+               }
+               if (this->_batchDuration > 0)
+               {
+                       properties[HandShakePropertyStr[BATCH_DURATION]] = std::to_string(this->_batchDuration);
+               }
+       }
+
+       if (_currentVersion >= 3)
+       {
+               status = _peer->writeUTF(_peer->getURL());
+               if (status <= 0)
+                       return false;
+       }
+
+       uint32_t size = properties.size();
+       status = _peer->write(size);
+       if (status <= 0)
+               return false;
+
+       for (const auto &property : properties)
+       {
+               status = _peer->writeUTF(property.first);
+               if (status <= 0)
+                       return false;
+
+               status = _peer->writeUTF(property.second);
+               if (status <= 0)
+                       return false;
+
+               _logger->log_info("Site2Site Protocol Send handshake properties %s %s", property.first.c_str(), property.second.c_str());
+       }
+
+       RespondCode code;
+       std::string message;
+       status = this->readRespond(code, message);
+       if (status <= 0)
+               return false;
+
+       if (code == PROPERTIES_OK)
+       {
+               _logger->log_info("Site2Site HandShake Completed");
+               _peerState = HANDSHAKED;
+               return true;
+       }
+
+       if (code == PORT_NOT_IN_VALID_STATE || code == UNKNOWN_PORT || code == PORTS_DESTINATION_FULL)
+       {
+               _logger->log_error("Site2Site HandShake Failed because destination port is either invalid or full");
+       }
+       else
+       {
+               _logger->log_info("HandShake Failed because of unknown respond code %d", code);
+       }
+
+       // Cleanup (yield / tearDown) is not done here; bootstrap() does it on failure
+       return false;
+}
+
+/**
+ * Shut the connection down and return to the IDLE state.
+ *
+ * A SHUTDOWN request is sent first when the connection had reached at least
+ * ESTABLISHED (its result is ignored). Every transaction still held in
+ * _transactionMap is deleted, then the peer is closed.
+ */
+void Site2SiteClientProtocol::tearDown()
+{
+       const bool connected = (_peerState >= ESTABLISHED);
+       if (connected)
+       {
+               _logger->log_info("Site2Site Protocol tearDown");
+               // Tell the peer we are going away
+               writeRequestType(SHUTDOWN);
+       }
+
+       // The map holds raw pointers to the transactions; release them here
+       for (auto &entry : _transactionMap)
+       {
+               delete entry.second;
+       }
+       _transactionMap.clear();
+
+       _peer->Close();
+       _peerState = IDLE;
+}
+
+/**
+ * Send a request type to the peer as its string name from RequestTypeStr.
+ *
+ * @param type request to send
+ * @return the result of the peer's writeUTF, or -1 when type is not below
+ *         MAX_REQUEST_TYPE
+ */
+int Site2SiteClientProtocol::writeRequestType(RequestType type)
+{
+       if (type < MAX_REQUEST_TYPE)
+       {
+               return _peer->writeUTF(RequestTypeStr[type]);
+       }
+
+       return -1;
+}
+
+/**
+ * Read a request type name from the peer and map it back to its enum value.
+ *
+ * @param type receives the matching request type; left untouched on failure
+ * @return the result of the peer's readUTF when it failed (<= 0) or when the
+ *         name was recognised; -1 when the name matches no entry between
+ *         NEGOTIATE_FLOWFILE_CODEC and SHUTDOWN
+ */
+int Site2SiteClientProtocol::readRequestType(RequestType &type)
+{
+       std::string name;
+       const int bytesRead = _peer->readUTF(name);
+       if (bytesRead <= 0)
+       {
+               return bytesRead;
+       }
+
+       int candidate = static_cast<int>(NEGOTIATE_FLOWFILE_CODEC);
+       const int last = static_cast<int>(SHUTDOWN);
+       while (candidate <= last)
+       {
+               if (name == RequestTypeStr[candidate])
+               {
+                       type = static_cast<RequestType>(candidate);
+                       return bytesRead;
+               }
+               candidate++;
+       }
+
+       // Unknown request name
+       return -1;
+}
+
+/**
+ * Read a response from the peer.
+ *
+ * A response is three bytes: CODE_SEQUENCE_VALUE_1, CODE_SEQUENCE_VALUE_2 and
+ * the response code. If the code's context says it carries a description, a
+ * UTF string follows.
+ *
+ * @param code    receives the response code (set as soon as the third byte
+ *                has been read, even if the code turns out to be invalid)
+ * @param message receives the description when the code carries one
+ * @return 3 plus the size of message on success; -1 on a bad marker byte, an
+ *         unknown code or a failed description read; the read result (<= 0)
+ *         when reading the code byte itself fails
+ */
+int Site2SiteClientProtocol::readRespond(RespondCode &code, std::string &message)
+{
+       int status;
+
+       // The first two bytes are fixed markers
+       uint8_t marker1;
+       status = _peer->read(marker1);
+       if (status <= 0 || marker1 != CODE_SEQUENCE_VALUE_1)
+       {
+               return -1;
+       }
+
+       uint8_t marker2;
+       status = _peer->read(marker2);
+       if (status <= 0 || marker2 != CODE_SEQUENCE_VALUE_2)
+       {
+               return -1;
+       }
+
+       // The third byte is the response code itself
+       uint8_t rawCode;
+       status = _peer->read(rawCode);
+       if (status <= 0)
+       {
+               return status;
+       }
+       code = static_cast<RespondCode>(rawCode);
+
+       RespondCodeContext *context = this->getRespondCodeContext(code);
+       if (context == NULL)
+       {
+               // Not a valid respond code
+               return -1;
+       }
+
+       if (context->hasDescription)
+       {
+               status = _peer->readUTF(message);
+               if (status <= 0)
+               {
+                       return -1;
+               }
+       }
+
+       return 3 + message.size();
+}
+
+/**
+ * Send a response to the peer: the two marker bytes, the response code and,
+ * if the code's context says it carries a description, the message as a UTF
+ * string.
+ *
+ * @param code    response code to send
+ * @param message description; only written when the code carries one
+ * @return 3 when no description is needed; 3 plus the writeUTF result when
+ *         the description was written; the writeUTF result (<= 0) when that
+ *         write failed; -1 for an unknown code or a short header write
+ */
+int Site2SiteClientProtocol::writeRespond(RespondCode code, std::string message)
+{
+       RespondCodeContext *context = this->getRespondCodeContext(code);
+       if (context == NULL)
+       {
+               // Not a valid respond code
+               return -1;
+       }
+
+       uint8_t header[3];
+       header[0] = CODE_SEQUENCE_VALUE_1;
+       header[1] = CODE_SEQUENCE_VALUE_2;
+       header[2] = (uint8_t) code;
+
+       const int headerWritten = _peer->write(header, 3);
+       if (headerWritten != 3)
+       {
+               return -1;
+       }
+
+       if (!context->hasDescription)
+       {
+               return 3;
+       }
+
+       const int descriptionWritten = _peer->writeUTF(message);
+       return (descriptionWritten > 0) ? (3 + descriptionWritten) : descriptionWritten;
+}
+
+/**
+ * Negotiate the flow file codec on a HANDSHAKED connection.
+ *
+ * Sends the NEGOTIATE_FLOWFILE_CODEC request and then runs the codec version
+ * negotiation. No cleanup is performed here on failure; bootstrap() does it.
+ *
+ * @return true when the codec was agreed (state becomes READY); false on a
+ *         wrong state, a failed request write or a failed negotiation
+ */
+bool Site2SiteClientProtocol::negotiateCodec()
+{
+       if (_peerState != HANDSHAKED)
+       {
+               _logger->log_error("Site2Site peer state is not handshaked while negotiate codec");
+               return false;
+       }
+
+       _logger->log_info("Site2Site Protocol Negotiate Codec with destination port %s", _portIdStr.c_str());
+
+       const int requestStatus = this->writeRequestType(NEGOTIATE_FLOWFILE_CODEC);
+       if (requestStatus <= 0)
+       {
+               return false;
+       }
+
+       // Negotiate the codec version
+       const bool negotiated = initiateCodecResourceNegotiation();
+       if (!negotiated)
+       {
+               _logger->log_error("Site2Site Codec Version Negotiation failed");
+               return false;
+       }
+
+       _logger->log_info("Site2Site Codec Completed and move to READY state for data transfer");
+       _peerState = READY;
+       return true;
+}
+
+/**
+ * Make sure the connection is READY for data transfer.
+ *
+ * When it is not READY yet, the connection is torn down and rebuilt from
+ * scratch: establish(), handShake() and negotiateCodec() run in that order
+ * and stop at the first failure. On failure the peer is yielded and the
+ * connection is torn down again.
+ *
+ * @return true when the connection is READY, false otherwise
+ */
+bool Site2SiteClientProtocol::bootstrap()
+{
+       if (_peerState == READY)
+       {
+               return true;
+       }
+
+       // Always restart from a clean IDLE state
+       tearDown();
+
+       const bool ready = establish() && handShake() && negotiateCodec();
+       if (!ready)
+       {
+               _peer->yield();
+               tearDown();
+               return false;
+       }
+
+       _logger->log_info("Site2Site Ready For data transaction");
+       return true;
+}
+
+/**
+ * Start a new transaction with the peer and register it in _transactionMap.
+ *
+ * The connection is bootstrapped first when it is not READY. For RECEIVE the
+ * RECEIVE_FLOWFILES request is sent and the peer's answer (MORE_DATA or
+ * NO_MORE_DATA) is recorded on the transaction; for any other direction the
+ * SEND_FLOWFILES request is sent.
+ *
+ * @param transactionID receives the UUID string of the new transaction
+ * @param direction     transfer direction of the transaction
+ * @return the new transaction (also stored in _transactionMap), or NULL when
+ *         the connection is not READY, a request/response failed, or the peer
+ *         answered with an unexpected code
+ */
+Transaction* Site2SiteClientProtocol::createTransaction(std::string &transactionID, TransferDirection direction)
+{
+       if (_peerState != READY)
+       {
+               bootstrap();
+       }
+       if (_peerState != READY)
+       {
+               return NULL;
+       }
+
+       const bool receiving = (direction == RECEIVE);
+       bool dataAvailable = false;
+
+       if (receiving)
+       {
+               int status = writeRequestType(RECEIVE_FLOWFILES);
+               if (status <= 0)
+               {
+                       return NULL;
+               }
+
+               RespondCode code;
+               std::string message;
+               status = readRespond(code, message);
+               if (status <= 0)
+               {
+                       return NULL;
+               }
+
+               if (code == MORE_DATA)
+               {
+                       dataAvailable = true;
+                       _logger->log_info("Site2Site peer indicates that data is available");
+               }
+               else if (code == NO_MORE_DATA)
+               {
+                       dataAvailable = false;
+                       _logger->log_info("Site2Site peer indicates that no data is available");
+               }
+               else
+               {
+                       _logger->log_info("Site2Site got unexpected response %d when asking for data", code);
+                       return NULL;
+               }
+       }
+       else
+       {
+               const int status = writeRequestType(SEND_FLOWFILES);
+               if (status <= 0)
+               {
+                       return NULL;
+               }
+       }
+
+       // Common tail: create the transaction and register it under its UUID
+       Transaction *created = new Transaction(direction);
+       _transactionMap[created->getUUIDStr()] = created;
+       transactionID = created->getUUIDStr();
+       if (receiving)
+       {
+               created->setDataAvailable(dataAvailable);
+       }
+       _logger->log_info("Site2Site create transaction %s", created->getUUIDStr().c_str());
+       return created;
+}
+
+/**
+ * Receive the header of the next data packet of a RECEIVE transaction.
+ *
+ * Reads the attribute count, the attributes and the content length from the
+ * peer into packet; the values read are passed through the transaction's CRC.
+ * The content bytes themselves are not read here. After the first transfer,
+ * the peer is first asked whether the transaction continues
+ * (CONTINUE_TRANSACTION) or is finished (FINISH_TRANSACTION).
+ *
+ * @param transactionID id of a transaction previously created
+ * @param packet        receives the attributes and the content size
+ * @param eof           set to true when no (more) data is available; not
+ *                      modified otherwise
+ * @return true when a packet header was read or the end of data was reached;
+ *         false on an unknown transaction, wrong state/direction, an
+ *         unexpected response or an I/O failure
+ */
+bool Site2SiteClientProtocol::receive(std::string transactionID, DataPacket *packet, bool &eof)
+{
+       if (_peerState != READY)
+       {
+               bootstrap();
+       }
+       if (_peerState != READY)
+       {
+               return false;
+       }
+
+       std::map<std::string, Transaction *>::iterator found = this->_transactionMap.find(transactionID);
+       if (found == _transactionMap.end())
+       {
+               return false;
+       }
+       Transaction *transaction = found->second;
+
+       if (!(transaction->getState() == TRANSACTION_STARTED || transaction->getState() == DATA_EXCHANGED))
+       {
+               _logger->log_info("Site2Site transaction %s is not at started or exchanged state", transactionID.c_str());
+               return false;
+       }
+
+       if (transaction->getDirection() != RECEIVE)
+       {
+               _logger->log_info("Site2Site transaction %s direction is wrong", transactionID.c_str());
+               return false;
+       }
+
+       if (!transaction->isDataAvailable())
+       {
+               eof = true;
+               return true;
+       }
+
+       int status;
+
+       if (transaction->_transfers > 0)
+       {
+               // Not the first packet: the peer tells us whether another one follows
+               RespondCode code;
+               std::string message;
+
+               status = readRespond(code, message);
+               if (status <= 0)
+               {
+                       return false;
+               }
+
+               switch (code)
+               {
+               case CONTINUE_TRANSACTION:
+                       _logger->log_info("Site2Site transaction %s peer indicate continue transaction", transactionID.c_str());
+                       transaction->_dataAvailable = true;
+                       break;
+               case FINISH_TRANSACTION:
+                       _logger->log_info("Site2Site transaction %s peer indicate finish transaction", transactionID.c_str());
+                       transaction->_dataAvailable = false;
+                       break;
+               default:
+                       _logger->log_info("Site2Site transaction %s peer indicate wrong respond code %d", transactionID.c_str(), code);
+                       return false;
+               }
+       }
+
+       // The peer may just have told us that the transaction is finished
+       if (!transaction->isDataAvailable())
+       {
+               eof = true;
+               return true;
+       }
+
+       // Packet header, part 1: number of attributes
+       uint32_t numAttributes;
+       status = _peer->read(numAttributes, &transaction->_crc);
+       if (status <= 0 || numAttributes > MAX_NUM_ATTRIBUTES)
+       {
+               return false;
+       }
+
+       // Packet header, part 2: the attributes as key/value pairs
+       unsigned int remaining = numAttributes;
+       while (remaining > 0)
+       {
+               std::string key;
+               std::string value;
+
+               status = _peer->readUTF(key, true, &transaction->_crc);
+               if (status <= 0)
+               {
+                       return false;
+               }
+               status = _peer->readUTF(value, true, &transaction->_crc);
+               if (status <= 0)
+               {
+                       return false;
+               }
+
+               packet->_attributes[key] = value;
+               _logger->log_info("Site2Site transaction %s receives attribute key %s value %s", transactionID.c_str(), key.c_str(), value.c_str());
+               remaining--;
+       }
+
+       // Packet header, part 3: length of the content that follows
+       uint64_t len;
+       status = _peer->read(len, &transaction->_crc);
+       if (status <= 0)
+       {
+               return false;
+       }
+
+       packet->_size = len;
+       transaction->_transfers++;
+       transaction->_state = DATA_EXCHANGED;
+       transaction->_bytes += len;
+       _logger->log_info("Site2Site transaction %s receives flow record %d, total length %d", transactionID.c_str(),
+                       transaction->_transfers, transaction->_bytes);
+
+       return true;
+}
+
+/**
+ * Send one flow file to the peer as part of a SEND transaction.
+ *
+ * Writes the attribute count, the attributes of packet and the flow file
+ * size, passing them through the transaction's CRC. A non-empty flow file is
+ * then read from the session through a ReadCallback bound to packet
+ * (presumably this streams the content to the peer and records the size in
+ * packet->_size - the callback is defined elsewhere). From the second
+ * transfer on, a CONTINUE_TRANSACTION response is sent first.
+ *
+ * @param transactionID id of a transaction previously created
+ * @param packet        supplies the attributes to send
+ * @param flowFile      flow file whose size and content are sent
+ * @param session       session used to read the flow file content
+ * @return true when the flow file was sent; false on an unknown transaction,
+ *         wrong state/direction, an I/O failure or a size mismatch
+ */
+bool Site2SiteClientProtocol::send(std::string transactionID, DataPacket *packet, FlowFileRecord *flowFile, ProcessSession *session)
+{
+       if (_peerState != READY)
+       {
+               bootstrap();
+       }
+       if (_peerState != READY)
+       {
+               return false;
+       }
+
+       std::map<std::string, Transaction *>::iterator found = this->_transactionMap.find(transactionID);
+       if (found == _transactionMap.end())
+       {
+               return false;
+       }
+       Transaction *transaction = found->second;
+
+       if (!(transaction->getState() == TRANSACTION_STARTED || transaction->getState() == DATA_EXCHANGED))
+       {
+               _logger->log_info("Site2Site transaction %s is not at started or exchanged state", transactionID.c_str());
+               return false;
+       }
+
+       if (transaction->getDirection() != SEND)
+       {
+               _logger->log_info("Site2Site transaction %s direction is wrong", transactionID.c_str());
+               return false;
+       }
+
+       int status;
+
+       if (transaction->_transfers > 0)
+       {
+               // Not the first flow file: announce that the transaction continues
+               status = writeRespond(CONTINUE_TRANSACTION, "CONTINUE_TRANSACTION");
+               if (status <= 0)
+               {
+                       return false;
+               }
+       }
+
+       // Packet header, part 1: number of attributes (4 bytes expected on the wire)
+       uint32_t numAttributes = packet->_attributes.size();
+       status = _peer->write(numAttributes, &transaction->_crc);
+       if (status != 4)
+       {
+               return false;
+       }
+
+       // Packet header, part 2: the attributes as key/value pairs
+       for (auto &attribute : packet->_attributes)
+       {
+               status = _peer->writeUTF(attribute.first, true, &transaction->_crc);
+               if (status <= 0)
+               {
+                       return false;
+               }
+               status = _peer->writeUTF(attribute.second, true, &transaction->_crc);
+               if (status <= 0)
+               {
+                       return false;
+               }
+               _logger->log_info("Site2Site transaction %s send attribute key %s value %s", transactionID.c_str(),
+                               attribute.first.c_str(), attribute.second.c_str());
+       }
+
+       // Packet header, part 3: content length (8 bytes expected on the wire)
+       uint64_t len = flowFile->getSize();
+       status = _peer->write(len, &transaction->_crc);
+       if (status != 8)
+       {
+               return false;
+       }
+
+       if (flowFile->getSize())
+       {
+               Site2SiteClientProtocol::ReadCallback callback(packet);
+               session->read(flowFile, &callback);
+               // The callback must have handled exactly the announced number of bytes
+               if (flowFile->getSize() != packet->_size)
+               {
+                       return false;
+               }
+       }
+
+       transaction->_transfers++;
+       transaction->_state = DATA_EXCHANGED;
+       transaction->_bytes += len;
+       _logger->log_info("Site2Site transaction %s send flow record %d, total length %d", transactionID.c_str(),
+                               transaction->_transfers, transaction->_bytes);
+
+       return true;
+}
+
+/**
+ * Receive all flow files the peer has available in one RECEIVE transaction
+ * and transfer them into the session.
+ *
+ * For every packet a flow file is created, the received attributes are added
+ * to it, non-empty content is written through a WriteCallback bound to the
+ * packet, and the flow file is transferred on a default-constructed
+ * Relationship. When the peer signals the end of data, the transaction is
+ * confirmed and completed. The context is yielded when nothing was received.
+ *
+ * On any failure the transaction is deleted, the context is yielded, the
+ * connection is torn down and the exception is rethrown to the caller.
+ *
+ * @param context process context, yielded on failure or when idle
+ * @param session session used to create, write and transfer flow files
+ * @throws Exception (SITE2SITE_EXCEPTION) when the connection cannot be made
+ *         READY, the transaction cannot be created, or any step fails
+ */
+void Site2SiteClientProtocol::receiveFlowFiles(ProcessContext *context, ProcessSession *session)
+{
+       uint64_t bytes = 0;
+       int transfers = 0;
+       Transaction *transaction = NULL;
+
+       if (_peerState != READY)
+       {
+               bootstrap();
+       }
+
+       if (_peerState != READY)
+       {
+               context->yield();
+               tearDown();
+               throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer");
+       }
+
+       // Create the transaction
+       std::string transactionID;
+       transaction = createTransaction(transactionID, RECEIVE);
+
+       if (transaction == NULL)
+       {
+               context->yield();
+               tearDown();
+               throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction");
+       }
+
+       try
+       {
+               while (true)
+               {
+                       std::map<std::string, std::string> empty;
+                       DataPacket packet(this, transaction, empty);
+                       bool eof = false;
+
+                       if (!receive(transactionID, &packet, eof))
+                       {
+                               throw Exception(SITE2SITE_EXCEPTION, "Receive Failed");
+                       }
+                       if (eof)
+                       {
+                               // transaction done
+                               break;
+                       }
+                       FlowFileRecord *flowFile = session->create();
+                       if (!flowFile)
+                       {
+                               throw Exception(SITE2SITE_EXCEPTION, "Flow File Creation Failed");
+                       }
+                       std::map<std::string, std::string>::iterator it;
+                       for (it = packet._attributes.begin(); it != packet._attributes.end(); it++)
+                       {
+                               flowFile->addAttribute(it->first, it->second);
+                       }
+
+                       if (packet._size > 0)
+                       {
+                               Site2SiteClientProtocol::WriteCallback callback(&packet);
+                               session->write(flowFile, &callback);
+                               // The content written must match the length announced by the peer
+                               if (flowFile->getSize() != packet._size)
+                               {
+                                       throw Exception(SITE2SITE_EXCEPTION, "Receive Size Not Right");
+                               }
+                       }
+                       Relationship relation; // undefined relationship
+                       session->transfer(flowFile, relation);
+                       // account for the flow record just received
+                       bytes += packet._size;
+                       transfers++;
+               } // while true
+
+               if (!confirm(transactionID))
+               {
+                       throw Exception(SITE2SITE_EXCEPTION, "Confirm Transaction Failed");
+               }
+               if (!complete(transactionID))
+               {
+                       throw Exception(SITE2SITE_EXCEPTION, "Complete Transaction Failed");
+               }
+               // bytes is 64 bit wide: it needs a matching conversion, "%d" would
+               // read it as an int
+               _logger->log_info("Site2Site transaction %s successfully receive flow record %d, content bytes %llu",
+                               transactionID.c_str(), transfers, static_cast<unsigned long long>(bytes));
+               // we yield the receive if we did not get anything
+               if (transfers == 0)
+                       context->yield();
+       }
+       catch (std::exception &exception)
+       {
+               if (transaction)
+                       deleteTransaction(transactionID);
+               context->yield();
+               tearDown();
+               _logger->log_debug("Caught Exception %s", exception.what());
+               throw;
+       }
+       catch (...)
+       {
+               if (transaction)
+                       deleteTransaction(transactionID);
+               context->yield();
+               tearDown();
+               _logger->log_debug("Caught Exception during Site2SiteClientProtocol::receiveFlowFiles");
+               throw;
+       }
+
+       deleteTransaction(transactionID);
+
+       return;
+}
+
+//! Confirm the transaction with the peer
+//!
+//! Looks up transactionID in _transactionMap and runs the confirmation exchange
+//! that matches the transaction's direction:
+//!  - RECEIVE: send CONFIRM_TRANSACTION carrying our CRC and expect the peer to
+//!    answer CONFIRM_TRANSACTION; BAD_CHECKSUM or any other code is a failure.
+//!  - otherwise: send FINISH_TRANSACTION and expect CONFIRM_TRANSACTION whose
+//!    message is the peer's CRC. When _currentVersion > 3 that CRC is compared
+//!    with ours and BAD_CHECKSUM is sent back on mismatch.
+//! A RECEIVE transaction that is still TRANSACTION_STARTED with no data available
+//! is marked confirmed without any exchange with the peer.
+//!
+//! @param transactionID key of the transaction in _transactionMap
+//! @return true when the transaction was moved to TRANSACTION_CONFIRMED; false when
+//!         the peer is not READY (even after bootstrap()), the transaction is
+//!         unknown or in an unexpected state, a read/write failed, or the peer
+//!         answered with anything other than a matching confirmation
+bool Site2SiteClientProtocol::confirm(std::string transactionID)
+{
+    int ret;
+    Transaction *transaction = NULL;
+
+    if (_peerState != READY)
+    {
+        bootstrap();
+    }
+
+    if (_peerState != READY)
+    {
+        return false;
+    }
+
+    std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID);
+
+    if (it == _transactionMap.end())
+    {
+        return false;
+    }
+    transaction = it->second;
+
+    if (transaction->getState() == TRANSACTION_STARTED && !transaction->isDataAvailable() &&
+            transaction->getDirection() == RECEIVE)
+    {
+        transaction->_state = TRANSACTION_CONFIRMED;
+        return true;
+    }
+
+    if (transaction->getState() != DATA_EXCHANGED)
+        return false;
+
+    if (transaction->getDirection() == RECEIVE)
+    {
+        if (transaction->isDataAvailable())
+            return false;
+        // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message
+        // to peer so that we can verify that the connection is still open. This is a two-phase commit,
+        // which helps to prevent the chances of data duplication. Without doing this, we may commit the
+        // session and then when we send the response back to the peer, the peer may have timed out and may not
+        // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the
+        // Critical Section involved in this transaction so that rather than the Critical Section being the
+        // time window involved in the entire transaction, it is reduced to a simple round-trip conversation.
+        long crcValue = transaction->getCRC();
+        std::string crc = std::to_string(crcValue);
+        // Log the decimal string rather than passing a long to %d.
+        _logger->log_info("Site2Site Send confirm with CRC %s to transaction %s", crc.c_str(),
+                transactionID.c_str());
+        ret = writeRespond(CONFIRM_TRANSACTION, crc);
+        if (ret <= 0)
+            return false;
+        RespondCode code;
+        std::string message;
+        // Keep the read status: without it the check below would re-test the
+        // write result and 'code' could be inspected uninitialised.
+        ret = readRespond(code, message);
+        if (ret <= 0)
+            return false;
+
+        if (code == CONFIRM_TRANSACTION)
+        {
+            _logger->log_info("Site2Site transaction %s peer confirm transaction", transactionID.c_str());
+            transaction->_state = TRANSACTION_CONFIRMED;
+            return true;
+        }
+        else if (code == BAD_CHECKSUM)
+        {
+            // A checksum rejected by the peer is a failed confirmation.
+            _logger->log_info("Site2Site transaction %s peer indicate bad checksum", transactionID.c_str());
+            return false;
+        }
+        else
+        {
+            _logger->log_info("Site2Site transaction %s peer unknown respond code %d",
+                    transactionID.c_str(), code);
+            return false;
+        }
+    }
+    else
+    {
+        _logger->log_info("Site2Site Send FINISH TRANSACTION for transaction %s",
+                transactionID.c_str());
+        ret = writeRespond(FINISH_TRANSACTION, "FINISH_TRANSACTION");
+        if (ret <= 0)
+            return false;
+        RespondCode code;
+        std::string message;
+        // Same as above: the read status must be captured before it is tested.
+        ret = readRespond(code, message);
+        if (ret <= 0)
+            return false;
+
+        // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response
+        if (code == CONFIRM_TRANSACTION)
+        {
+            _logger->log_info("Site2Site transaction %s peer confirm transaction with CRC %s", transactionID.c_str(), message.c_str());
+            if (this->_currentVersion > 3)
+            {
+                long crcValue = transaction->getCRC();
+                std::string crc = std::to_string(crcValue);
+                if (message == crc)
+                {
+                    _logger->log_info("Site2Site transaction %s CRC matched", transactionID.c_str());
+                    ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION");
+                    if (ret <= 0)
+                        return false;
+                    transaction->_state = TRANSACTION_CONFIRMED;
+                    return true;
+                }
+                else
+                {
+                    _logger->log_info("Site2Site transaction %s CRC not matched %s", transactionID.c_str(), crc.c_str());
+                    // Tell the peer about the mismatch; the confirmation fails
+                    // whether or not this write succeeds.
+                    writeRespond(BAD_CHECKSUM, "BAD_CHECKSUM");
+                    return false;
+                }
+            }
+            // Protocol versions up to 3: confirm without comparing CRCs.
+            ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION");
+            if (ret <= 0)
+                return false;
+            transaction->_state = TRANSACTION_CONFIRMED;
+            return true;
+        }
+        else
+        {
+            _logger->log_info("Site2Site transaction %s peer unknown respond code %d",
+                    transactionID.c_str(), code);
+            return false;
+        }
+    }
+}
+
+//! Cancel the transaction
+//!
+//! Does nothing unless the peer is READY and transactionID names a transaction
+//! that is not already canceled, completed or in error. Otherwise a
+//! CANCEL_TRANSACTION response is written, the transaction is marked
+//! TRANSACTION_CANCELED and tearDown() is called.
+//!
+//! @param transactionID key of the transaction in _transactionMap
+void Site2SiteClientProtocol::cancel(std::string transactionID)
+{
+    if (_peerState != READY)
+        return;
+
+    std::map<std::string, Transaction *>::iterator entry = this->_transactionMap.find(transactionID);
+    if (entry == _transactionMap.end())
+        return;
+
+    Transaction *txn = entry->second;
+
+    // Transactions that already reached a final state are left untouched.
+    const bool alreadyEnded = txn->getState() == TRANSACTION_CANCELED
+            || txn->getState() == TRANSACTION_COMPLETED
+            || txn->getState() == TRANSACTION_ERROR;
+    if (alreadyEnded)
+        return;
+
+    // The write result is not inspected: the state changes and the teardown
+    // happens regardless.
+    this->writeRespond(CANCEL_TRANSACTION, "Cancel");
+    txn->_state = TRANSACTION_CANCELED;
+
+    tearDown();
+}
+
+//! Delete the transaction
+//!
+//! Looks up transactionID in _transactionMap; if present, logs the transaction's
+//! UUID, deletes the Transaction object and removes the map entry. Unknown ids
+//! are ignored.
+//!
+//! @param transactionID key of the transaction in _transactionMap
+void Site2SiteClientProtocol::deleteTransaction(std::string transactionID)
+{
+    std::map<std::string, Transaction *>::iterator entry = this->_transactionMap.find(transactionID);
+    if (entry == _transactionMap.end())
+        return;
+
+    Transaction *doomed = entry->second;
+
+    // Log before the delete: the UUID is read from the object about to go away.
+    _logger->log_info("Site2Site delete transaction %s", doomed->getUUIDStr().c_str());
+    delete doomed;
+    _transactionMap.erase(transactionID);
+}
+
+void Site2SiteClientProtocol::error(std::string transactionID)
+{
+       Transaction *transaction = NULL;
+
+       std::map<std::string, Transaction *>::iterator it = 
this->_transactionMap.find(transactionID);
+
+       if (it == _transactionMap.end())
+       {
+               return;
+       }
+       else
+       {
+               transaction = it->second;
+       }
+
+       transaction->_state = TRANSACTION_ERROR;
+       tearDown();
+       return;
+}
+
+//! Complete the transaction
+//!
+//! Only a transaction in TRANSACTION_CONFIRMED can be completed.
+//!  - RECEIVE direction: when at least one transfer happened, a
+//!    TRANSACTION_FINISHED response is written to the peer first; with zero
+//!    transfers nothing is sent.
+//!  - other direction: a response is read from the peer and must be
+//!    TRANSACTION_FINISHED.
+//!
+//! @param transactionID key of the transaction in _transactionMap
+//! @return true when the transaction was moved to TRANSACTION_COMPLETED; false when
+//!         the peer is not READY (even after bootstrap()), the transaction is
+//!         unknown or not confirmed, the read/write failed, or the peer sent a
+//!         different response code
+bool Site2SiteClientProtocol::complete(std::string transactionID)
+{
+    if (_peerState != READY)
+        bootstrap();
+
+    if (_peerState != READY)
+        return false;
+
+    std::map<std::string, Transaction *>::iterator entry = this->_transactionMap.find(transactionID);
+    if (entry == _transactionMap.end())
+        return false;
+
+    Transaction *txn = entry->second;
+    if (txn->getState() != TRANSACTION_CONFIRMED)
+        return false;
+
+    if (txn->getDirection() != RECEIVE)
+    {
+        // Sending side: wait for the peer's verdict.
+        RespondCode peerCode;
+        std::string peerMessage;
+        const int status = readRespond(peerCode, peerMessage);
+        if (status <= 0)
+            return false;
+
+        if (peerCode != TRANSACTION_FINISHED)
+        {
+            _logger->log_info("Site2Site transaction %s peer unknown respond code %d",
+                    transactionID.c_str(), peerCode);
+            return false;
+        }
+
+        _logger->log_info("Site2Site transaction %s peer finished transaction", transactionID.c_str());
+        txn->_state = TRANSACTION_COMPLETED;
+        return true;
+    }
+
+    // Receiving side: the peer is only notified when something was transferred.
+    if (txn->_transfers != 0)
+    {
+        _logger->log_info("Site2Site transaction %s send finished", transactionID.c_str());
+        const int status = this->writeRespond(TRANSACTION_FINISHED, "Finished");
+        if (status <= 0)
+            return false;
+    }
+
+    txn->_state = TRANSACTION_COMPLETED;
+    return true;
+}
+
+//! Transfer the flow files queued in the session to the peer
+//!
+//! Takes flow files from the session one at a time and sends each inside a single
+//! SEND transaction until the session has no more, or until the elapsed time
+//! exceeds _batchSendNanos. The transaction is then confirmed, completed and
+//! deleted. Returns immediately when the session has no flow file.
+//!
+//! On any failure the context is yielded, tearDown() is called and an exception
+//! leaves this function (SITE2SITE_EXCEPTION for handshake, transaction creation,
+//! send, confirm and complete failures; anything thrown from within the
+//! transfer is logged and re-thrown after the transaction has been deleted).
+//!
+//! @param context process context, used to yield on failure
+//! @param session process session supplying the flow files
+void Site2SiteClientProtocol::transferFlowFiles(ProcessContext *context, ProcessSession *session)
+{
+    FlowFileRecord *flow = session->get();
+    if (!flow)
+        return;
+
+    if (_peerState != READY)
+        bootstrap();
+
+    if (_peerState != READY)
+    {
+        context->yield();
+        tearDown();
+        throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer");
+    }
+
+    // Create the transaction
+    std::string transactionID;
+    Transaction *transaction = createTransaction(transactionID, SEND);
+    if (transaction == NULL)
+    {
+        context->yield();
+        tearDown();
+        throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction");
+    }
+
+    const uint64_t startSendingNanos = getTimeNano();
+
+    try
+    {
+        // 'flow' is non-null on entry; the loop ends when the session runs dry
+        // or the batch time budget is used up.
+        while (flow)
+        {
+            DataPacket packet(this, transaction, flow->getAttributes());
+
+            if (!send(transactionID, &packet, flow, session))
+                throw Exception(SITE2SITE_EXCEPTION, "Send Failed");
+
+            _logger->log_info("Site2Site transaction %s send flow record %s",
+                    transactionID.c_str(), flow->getUUIDStr().c_str());
+            session->remove(flow);
+
+            const uint64_t transferNanos = getTimeNano() - startSendingNanos;
+            if (transferNanos > _batchSendNanos)
+                break;
+
+            flow = session->get();
+        }
+
+        if (!confirm(transactionID))
+            throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed");
+
+        if (!complete(transactionID))
+            throw Exception(SITE2SITE_EXCEPTION, "Complete Failed");
+
+        _logger->log_info("Site2Site transaction %s successfully send flow record %d, content bytes %d",
+                transactionID.c_str(), transaction->_transfers, transaction->_bytes);
+    }
+    catch (std::exception &exception)
+    {
+        if (transaction)
+            deleteTransaction(transactionID);
+        context->yield();
+        tearDown();
+        _logger->log_debug("Caught Exception %s", exception.what());
+        throw;
+    }
+    catch (...)
+    {
+        if (transaction)
+            deleteTransaction(transactionID);
+        context->yield();
+        tearDown();
+        _logger->log_debug("Caught Exception during Site2SiteClientProtocol::transferFlowFiles");
+        throw;
+    }
+
+    deleteTransaction(transactionID);
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/Site2SitePeer.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Site2SitePeer.cpp b/libminifi/src/Site2SitePeer.cpp
new file mode 100644
index 0000000..48e19d0
--- /dev/null
+++ b/libminifi/src/Site2SitePeer.cpp
@@ -0,0 +1,435 @@
+/**
+ * @file Site2SitePeer.cpp
+ * Site2SitePeer class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <sys/time.h>
+#include <stdio.h>
+#include <time.h>
+#include <chrono>
+#include <thread>
+#include <random>
+#include <netinet/tcp.h>
+#include <iostream>
+#include "Site2SitePeer.h"
+
+//! CRC tables: out-of-class definitions of the CRC32 class's static members
+std::atomic<bool> CRC32::tableInit(false);  //!< atomic flag, starts out false; presumably set once the table has been filled - confirm in CRC32's header
+unsigned int CRC32::table[256];  //!< 256 entries; presumably the per-byte CRC lookup values - confirm in CRC32's header
+
+//! Open the connection to the peer
+//!
+//! Resolves _host, creates a TCP socket, applies socket options (skipped when
+//! __MACH__ is defined), binds to any local address and port, connects to
+//! _host:_port, stores the descriptor in _socket and finally sends MAGIC_BYTES.
+//!
+//! Every failure after the empty-host check logs an error (except a failed
+//! MAGIC_BYTES send, which closes the peer instead) and yields the peer; the
+//! socket, if already created, is closed.
+//!
+//! @return true when the connection is up and the magic bytes were sent; false for
+//!         an empty host name, a failed name lookup, or any socket-level failure
+bool Site2SitePeer::Open()
+{
+    in_addr_t addr;
+    int sock = 0;
+    struct hostent *h = NULL;
+    const char *host;
+    uint16_t port;
+
+    host = this->_host.c_str();
+    port = this->_port;
+
+    if (strlen(host) == 0)
+        return false;
+
+#ifdef __MACH__
+    h = gethostbyname(host);
+#else
+    char buf[1024];
+    struct hostent he;
+    int hh_errno;
+    if (gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno) != 0)
+        h = NULL;
+#endif
+    // A failed lookup leaves h NULL; dereferencing it would crash. Also make sure
+    // the address returned is exactly as large as the in_addr_t it is copied into.
+    if (h == NULL || h->h_addr_list[0] == NULL || h->h_length != (int) sizeof(addr))
+    {
+        _logger->log_error("Could not resolve hostName %s", host);
+        this->yield();
+        return false;
+    }
+    memcpy((char *) &addr, h->h_addr_list[0], h->h_length);
+    sock = socket(AF_INET, SOCK_STREAM, 0);
+    if (sock < 0)
+    {
+        _logger->log_error("Could not create socket to hostName %s", host);
+        this->yield();
+        return false;
+    }
+
+#ifndef __MACH__
+    int opt = 1;
+    bool nagle_off = true;
+
+    if (nagle_off)
+    {
+        if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *)&opt, sizeof(opt)) < 0)
+        {
+            _logger->log_error("setsockopt() TCP_NODELAY failed");
+            close(sock);
+            this->yield();
+            return false;
+        }
+        if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
+                (char *)&opt, sizeof(opt)) < 0)
+        {
+            _logger->log_error("setsockopt() SO_REUSEADDR failed");
+            close(sock);
+            this->yield();
+            return false;
+        }
+    }
+
+    int sndsize = 256*1024;
+    if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndsize, (int)sizeof(sndsize)) < 0)
+    {
+        _logger->log_error("setsockopt() SO_SNDBUF failed");
+        close(sock);
+        this->yield();
+        return false;
+    }
+    int rcvsize = 256*1024;
+    if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char *)&rcvsize, (int)sizeof(rcvsize)) < 0)
+    {
+        _logger->log_error("setsockopt() SO_RCVBUF failed");
+        close(sock);
+        this->yield();
+        return false;
+    }
+#endif
+
+    struct sockaddr_in sa;
+    socklen_t socklen;
+    int status;
+
+    // Bind the local end to any address and a system-chosen port.
+    memset(&sa, 0, sizeof(sa));
+    sa.sin_family = AF_INET;
+    sa.sin_addr.s_addr = htonl(INADDR_ANY);
+    sa.sin_port = htons(0);
+    socklen = sizeof(sa);
+    if (bind(sock, (struct sockaddr *)&sa, socklen) < 0)
+    {
+        _logger->log_error("socket bind failed");
+        close(sock);
+        this->yield();
+        return false;
+    }
+
+    // Connect to the resolved peer address.
+    memset(&sa, 0, sizeof(sa));
+    sa.sin_family = AF_INET;
+    sa.sin_addr.s_addr = addr;
+    sa.sin_port = htons(port);
+    socklen = sizeof(sa);
+
+    status = connect(sock, (struct sockaddr *)&sa, socklen);
+
+    if (status < 0)
+    {
+        _logger->log_error("socket connect failed to %s %d", host, port);
+        close(sock);
+        this->yield();
+        return false;
+    }
+
+    _logger->log_info("Site2Site Peer socket %d connect to server %s port %d success", sock, host, port);
+
+    _socket = sock;
+
+    status = sendData((uint8_t *) MAGIC_BYTES, sizeof(MAGIC_BYTES));
+
+    if (status <= 0)
+    {
+        Close();
+        return false;
+    }
+
+    return true;
+}
+
+//! Close the peer socket
+//!
+//! When _socket is non-zero it is logged, closed and reset to 0; otherwise this
+//! is a no-op.
+void Site2SitePeer::Close()
+{
+    if (!_socket)
+        return;
+
+    _logger->log_info("Site2Site Peer socket %d close", _socket);
+    close(_socket);
+    _socket = 0;
+}
+
+//! Send a buffer to the peer
+//!
+//! Calls send() repeatedly until buflen bytes have been handed over. When a
+//! send() returns -1 the error is logged and the socket is closed.
+//!
+//! @param buf    bytes to send
+//! @param buflen number of bytes in buf
+//! @param crc    when non-null, updated with the whole buffer after a full send
+//! @return number of bytes sent (buflen for a positive buflen), or -1 when the
+//!         socket is not open or send() failed
+int Site2SitePeer::sendData(uint8_t *buf, int buflen, CRC32 *crc)
+{
+    if (_socket <= 0)
+    {
+        // this->yield();
+        return -1;
+    }
+
+    int sent = 0;
+    for (int rc = 0; sent < buflen; sent += rc)
+    {
+        rc = send(_socket, buf + sent, buflen - sent, 0);
+        if (rc == -1)
+        {
+            _logger->log_error("Site2Site Peer socket %d send failed %s", _socket, strerror(errno));
+            Close();
+            // this->yield();
+            return rc;
+        }
+    }
+
+    if (crc)
+        crc->update(buf, buflen);
+
+    return sent;
+}
+
+//! Wait until the peer socket is readable
+//!
+//! @param msec timeout in milliseconds; a value <= 0 waits without a timeout
+//! @return the select() result when it is <= 0 (timeout or error) or when the
+//!         socket is flagged readable; 0 when select() succeeded but the socket
+//!         is not in the readable set
+int Site2SitePeer::Select(int msec)
+{
+    const int fd = _socket;
+
+    fd_set readable;
+    FD_ZERO(&readable);
+    FD_SET(fd, &readable);
+
+    struct timeval tv;
+    tv.tv_sec = msec/1000;
+    tv.tv_usec = (msec % 1000) * 1000;
+
+    // A NULL timeout makes select() block until the descriptor is ready.
+    struct timeval *timeout = (msec > 0) ? &tv : NULL;
+    const int retval = select(fd+1, &readable, NULL, NULL, timeout);
+
+    if (retval <= 0)
+        return retval;
+
+    return FD_ISSET(fd, &readable) ? retval : 0;
+}
+
+//! Read exactly buflen bytes from the peer
+//!
+//! Each round first waits for readability via Select(_timeOut) and then calls
+//! recv(). The socket is closed as soon as either of them returns <= 0.
+//!
+//! @param buf    destination buffer
+//! @param buflen number of bytes to read
+//! @param crc    when non-null, updated with the received bytes after a full read
+//! @return buflen once everything was read; -1 when the socket is not open;
+//!         otherwise the <= 0 value returned by Select() or recv()
+int Site2SitePeer::readData(uint8_t *buf, int buflen, CRC32 *crc)
+{
+    if (_socket <= 0)
+    {
+        // this->yield();
+        return -1;
+    }
+
+    const int wanted = buflen;
+    uint8_t *cursor = buf;
+    int remaining = buflen;
+
+    while (remaining != 0)
+    {
+        int status = Select((int) _timeOut);
+        if (status <= 0)
+        {
+            Close();
+            return status;
+        }
+
+        status = recv(_socket, cursor, remaining, 0);
+        if (status <= 0)
+        {
+            Close();
+            // this->yield();
+            return status;
+        }
+
+        remaining -= status;
+        cursor += status;
+    }
+
+    if (crc)
+        crc->update(buf, wanted);
+
+    return wanted;
+}
+
+//! Write a length-prefixed string to the peer
+//!
+//! Every byte of str is treated as one character code in 0..255: codes
+//! 0x01..0x7F are written as a single byte, everything else (0x00 and
+//! 0x80..0xFF) as a two-byte sequence. The encoded data is preceded by its
+//! length, big-endian, in 2 bytes (widen == false) or 4 bytes (widen == true).
+//!
+//! Each byte is converted through unsigned char first. Where plain char is
+//! signed, bytes >= 0x80 would otherwise become negative values and get a wrong
+//! lead byte in the two-byte form.
+//!
+//! @param str   string to send
+//! @param widen false for a 2-byte length prefix, true for a 4-byte one
+//! @param crc   passed through to sendData()
+//! @return the sendData() result for the whole frame, or -1 when the encoded
+//!         length exceeds 65535 (the limit applies to both prefix widths)
+int Site2SitePeer::writeUTF(std::string str, bool widen, CRC32 *crc)
+{
+    const int inputLen = str.length();
+
+    // First pass: size of the encoded payload.
+    int utflen = 0;
+    for (int i = 0; i < inputLen; i++)
+    {
+        const int c = static_cast<unsigned char>(str[i]);
+        utflen += ((c >= 0x0001) && (c <= 0x007F)) ? 1 : 2;
+    }
+
+    if (utflen > 65535)
+        return -1;
+
+    const int headerLen = widen ? 4 : 2;
+
+    // The frame is assembled in a std::string so the buffer is released on
+    // every path without manual delete[].
+    std::string frame;
+    frame.reserve(headerLen + utflen);
+    if (widen)
+    {
+        frame.push_back((char) ((utflen >> 24) & 0xFF));
+        frame.push_back((char) ((utflen >> 16) & 0xFF));
+    }
+    frame.push_back((char) ((utflen >> 8) & 0xFF));
+    frame.push_back((char) ((utflen >> 0) & 0xFF));
+
+    // Second pass: the payload itself.
+    for (int i = 0; i < inputLen; i++)
+    {
+        const int c = static_cast<unsigned char>(str[i]);
+        if ((c >= 0x0001) && (c <= 0x007F))
+        {
+            frame.push_back((char) c);
+        }
+        else
+        {
+            frame.push_back((char) (0xC0 | ((c >> 6) & 0x1F)));
+            frame.push_back((char) (0x80 | ((c >> 0) & 0x3F)));
+        }
+    }
+
+    // frame always holds at least the length prefix, so &frame[0] is valid.
+    return sendData(reinterpret_cast<uint8_t *>(&frame[0]), headerLen + utflen, crc);
+}
+
+//! Read a length-prefixed string from the peer
+//!
+//! Reads a 2-byte (widen == false) or 4-byte (widen == true) length, then that
+//! many bytes, and decodes one-, two- and three-byte sequences. Each decoded
+//! character is narrowed to a single char, so the result may be shorter than the
+//! number of bytes read.
+//!
+//! Differences from the earlier implementation:
+//!  - the 4-byte length is no longer squeezed into 16 bits, which left the
+//!    remainder of a long string unread on the connection;
+//!  - a zero length yields an empty string and success instead of being
+//!    reported like a failed read;
+//!  - buffers are std::string objects, so no exit path has to free anything.
+//! NOTE(review): the length comes from the peer and is now honoured up to 2 GiB;
+//! consider an upper bound if the peer is not trusted.
+//!
+//! @param str   receives the decoded string; left untouched on failure
+//! @param widen false for a 2-byte length prefix, true for a 4-byte one
+//! @param crc   passed through to the underlying read() calls
+//! @return prefix size plus payload size on success; the <= 0 result of a failed
+//!         read(); -1 for malformed data or a length that does not fit an int
+int Site2SitePeer::readUTF(std::string &str, bool widen, CRC32 *crc)
+{
+    uint32_t utflen = 0;
+    int headerLen;
+    int ret;
+
+    if (!widen)
+    {
+        uint16_t shortLen;
+        ret = read(shortLen, crc);
+        if (ret <= 0)
+            return ret;
+        utflen = shortLen;
+        headerLen = 2;
+    }
+    else
+    {
+        ret = read(utflen, crc);
+        if (ret <= 0)
+            return ret;
+        headerLen = 4;
+    }
+
+    // The payload size and the return value must both fit a signed int.
+    if (utflen > 0x7FFFFFFFu - 4u)
+        return -1;
+
+    if (utflen == 0)
+    {
+        // Nothing to read for an empty string.
+        str.clear();
+        return headerLen;
+    }
+
+    std::string raw(utflen, '\0');
+    ret = read(reinterpret_cast<uint8_t *>(&raw[0]), static_cast<int>(utflen), crc);
+    if (ret <= 0)
+        return ret;
+
+    const uint8_t *bytes = reinterpret_cast<const uint8_t *>(raw.data());
+    std::string decoded;
+    decoded.reserve(utflen);
+
+    int c, char2, char3;
+    uint32_t count = 0;
+    while (count < utflen)
+    {
+        c = (int) bytes[count] & 0xff;
+        switch (c >> 4)
+        {
+        case 0: case 1: case 2: case 3: case 4: case 5: case 6: case 7:
+            /* 0xxxxxxx*/
+            count++;
+            decoded.push_back((char) c);
+            break;
+        case 12: case 13:
+            /* 110x xxxx   10xx xxxx*/
+            count += 2;
+            if (count > utflen)
+                return -1;
+            char2 = (int) bytes[count-1];
+            if ((char2 & 0xC0) != 0x80)
+                return -1;
+            decoded.push_back((char) (((c & 0x1F) << 6) | (char2 & 0x3F)));
+            break;
+        case 14:
+            /* 1110 xxxx  10xx xxxx  10xx xxxx */
+            count += 3;
+            if (count > utflen)
+                return -1;
+            char2 = (int) bytes[count-2];
+            char3 = (int) bytes[count-1];
+            if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80))
+                return -1;
+            decoded.push_back((char) (((c     & 0x0F) << 12) |
+                                      ((char2 & 0x3F) << 6)  |
+                                      ((char3 & 0x3F) << 0)));
+            break;
+        default:
+            /* 10xx xxxx,  1111 xxxx */
+            return -1;
+        }
+    }
+
+    // The number of chars produced may be less than utflen
+    str = decoded;
+    return headerLen + static_cast<int>(utflen);
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/TailFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/TailFile.cpp b/libminifi/src/TailFile.cpp
new file mode 100644
index 0000000..445255b
--- /dev/null
+++ b/libminifi/src/TailFile.cpp
@@ -0,0 +1,272 @@
+/**
+ * @file TailFile.cpp
+ * TailFile class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <time.h>
+#include <sstream>
+#include <stdio.h>
+#include <string>
+#include <iostream>
+#include <dirent.h>
+#include <limits.h>
+#include <unistd.h>
+
+#include "TimeUtil.h"
+#include "TailFile.h"
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+
+const std::string TailFile::ProcessorName("TailFile");  //!< processor name string, "TailFile"
+Property TailFile::FileName("File to Tail", "Fully-qualified filename of the 
file that should be tailed", "");  //!< "File to Tail" property; the trailing "" is presumably the default value - confirm against Property's constructor
+Property TailFile::StateFile("State File",
+               "Specifies the file that should be used for storing state about 
what data has been ingested so that upon restart NiFi can resume from where it 
left off", "");  //!< "State File" property; the trailing "" is presumably the default value - confirm against Property's constructor
+Relationship TailFile::Success("success", "All files are routed to success");  //!< the only relationship registered by initialize()
+
+//! Register what this processor supports: the FileName and StateFile properties
+//! and the Success relationship.
+void TailFile::initialize()
+{
+    //! Supported properties
+    std::set<Property> supportedProps;
+    supportedProps.insert(FileName);
+    supportedProps.insert(StateFile);
+    setSupportedProperties(supportedProps);
+
+    //! Supported relationships
+    std::set<Relationship> supportedRels;
+    supportedRels.insert(Success);
+    setSupportedRelationships(supportedRels);
+}
+
+//! Strip leading whitespace (space, newline, carriage return, tab)
+//!
+//! @param s input string
+//! @return s without its leading whitespace; "" when s is empty or all whitespace
+std::string TailFile::trimLeft(const std::string& s)
+{
+    const size_t firstKept = s.find_first_not_of(" \n\r\t");
+    if (firstKept == std::string::npos)
+        return "";
+    return s.substr(firstKept);
+}
+
+//! Strip trailing whitespace (space, newline, carriage return, tab)
+//!
+//! @param s input string
+//! @return s without its trailing whitespace; "" when s is empty or all whitespace
+std::string TailFile::trimRight(const std::string& s)
+{
+    const size_t lastKept = s.find_last_not_of(" \n\r\t");
+    if (lastKept == std::string::npos)
+        return "";
+    return s.substr(0, lastKept+1);
+}
+
+//! Parse one "KEY=VALUE" line of the state file
+//!
+//! Leading spaces and tabs are skipped. Blank lines, lines starting with '#' or
+//! '=', lines without '=' and lines with an empty value are ignored. Trailing
+//! whitespace is trimmed from key and value. Recognised keys:
+//!  - FILENAME: stored in _currentTailFileName
+//!  - POSITION: parsed as a decimal number and stored in _currentTailFilePosition
+//!
+//! POSITION is parsed with std::stoll so offsets beyond the int range are
+//! accepted, and a value that cannot be parsed is logged and skipped instead of
+//! letting the conversion exception escape.
+//!
+//! @param buf NUL-terminated line; it is modified in place (the '=' is
+//!            overwritten with a terminator)
+void TailFile::parseStateFileLine(char *buf)
+{
+    char *line = buf;
+
+    while ((line[0] == ' ') || (line[0] == '\t'))
+        ++line;
+
+    char first = line[0];
+    if ((first == '\0') || (first == '#') || (first == '\r') || (first == '\n') || (first == '='))
+    {
+        return;
+    }
+
+    char *equal = strchr(line, '=');
+    if (equal == NULL)
+    {
+        return;
+    }
+
+    // Split the line at '=': everything before it is the key.
+    equal[0] = '\0';
+    std::string key = line;
+
+    equal++;
+    while ((equal[0] == ' ') || (equal[0] == '\t'))
+        ++equal;
+
+    first = equal[0];
+    if ((first == '\0') || (first == '\r') || (first == '\n'))
+    {
+        return;
+    }
+
+    std::string value = equal;
+    key = trimRight(key);
+    value = trimRight(value);
+
+    if (key == "FILENAME")
+        this->_currentTailFileName = value;
+    if (key == "POSITION")
+    {
+        try
+        {
+            this->_currentTailFilePosition = std::stoll(value);
+        }
+        catch (...)
+        {
+            // std::stoll throws for non-numeric or out-of-range text; keep the
+            // current position in that case.
+            _logger->log_error("invalid POSITION value %s in state file", value.c_str());
+        }
+    }
+
+    return;
+}
+
+//! Load the tail state from the file named by _stateFile
+//!
+//! Every line of the file is handed to parseStateFileLine(). If the file cannot
+//! be opened an error is logged and nothing else happens.
+//!
+//! Lines are read with std::getline into a std::string, so a line of any length
+//! is handled and a final line without a trailing newline is parsed too.
+void TailFile::recoverState()
+{
+    std::ifstream file(_stateFile.c_str(), std::ifstream::in);
+    if (!file.good())
+    {
+        _logger->log_error("load state file failed %s", _stateFile.c_str());
+        return;
+    }
+
+    std::string line;
+    while (std::getline(file, line))
+    {
+        // parseStateFileLine() edits its argument in place, so it gets the
+        // string's own writable, NUL-terminated storage.
+        parseStateFileLine(&line[0]);
+    }
+}
+
+//! Write the tail state to the file named by _stateFile
+//!
+//! The file receives two lines, "FILENAME=<_currentTailFileName>" and
+//! "POSITION=<_currentTailFilePosition>". If the file cannot be opened an error
+//! is logged and nothing is written.
+void TailFile::storeState()
+{
+    std::ofstream out(_stateFile.c_str());
+    if (!out.is_open())
+    {
+        _logger->log_error("store state file failed %s", _stateFile.c_str());
+        return;
+    }
+
+    out << "FILENAME=" << this->_currentTailFileName << "\n"
+        << "POSITION=" << this->_currentTailFilePosition << "\n";
+    out.close();
+}
+
+//! Ordering predicate for std::sort: true when i was modified before j
+static bool sortTailMatchedFileItem(TailMatchedFileItem i, TailMatchedFileItem j)
+{
+    const bool iIsOlder = j.modifiedTime > i.modifiedTime;
+    return iIsOlder;
+}
+/**
+ * Detect a roll over of the file being tailed and switch to the next file.
+ *
+ * Nothing is done while the current file cannot be stat'ed or still has
+ * unread data. Otherwise the tail directory is scanned for non-directory
+ * entries whose name contains the configured file name without its extension
+ * and whose modification time is not older than the current file. The
+ * candidates are sorted by modification time and the one following the
+ * current file, if any, becomes the new tail file: the position is reset to 0
+ * and the state is stored.
+ */
+void TailFile::checkRollOver()
+{
+       struct stat statbuf;
+       std::string fullPath = this->_fileLocation + "/" + _currentTailFileName;
+
+       if (stat(fullPath.c_str(), &statbuf) != 0)
+               return;
+
+       if (statbuf.st_size > this->_currentTailFilePosition)
+               // there is new input for the current tail file
+               return;
+
+       uint64_t modifiedTimeCurrentTailFile = ((uint64_t) (statbuf.st_mtime) * 1000);
+
+       // The roll over pattern is the configured file name without its extension
+       std::string pattern = _fileName;
+       std::size_t found = _fileName.find_last_of(".");
+       if (found != std::string::npos)
+               pattern = _fileName.substr(0,found);
+
+       DIR *d = opendir(this->_fileLocation.c_str());
+       if (!d)
+               return;
+
+       std::vector<TailMatchedFileItem> matchedFiles;
+       for (struct dirent *entry = readdir(d); entry; entry = readdir(d))
+       {
+               // d_type holds exactly one DT_* value, it is not a bit mask
+               if (entry->d_type == DT_DIR)
+                       continue;
+
+               // Match on the entry name only: searching the full path would also
+               // match a pattern that merely appears in the directory name
+               std::string fileName = entry->d_name;
+               if (fileName.find(pattern) == std::string::npos)
+                       continue;
+
+               std::string fileFullName = this->_fileLocation + "/" + fileName;
+               if (stat(fileFullName.c_str(), &statbuf) != 0)
+                       continue;
+
+               uint64_t modifiedTime = ((uint64_t) (statbuf.st_mtime) * 1000);
+               if (modifiedTime >= modifiedTimeCurrentTailFile)
+               {
+                       TailMatchedFileItem item;
+                       item.fileName = fileName;
+                       item.modifiedTime = modifiedTime;
+                       matchedFiles.push_back(item);
+               }
+       }
+       closedir(d);
+
+       // Sort the list based on modified time
+       std::sort(matchedFiles.begin(), matchedFiles.end(), sortTailMatchedFileItem);
+       for (std::vector<TailMatchedFileItem>::iterator it = matchedFiles.begin(); it != matchedFiles.end(); ++it)
+       {
+               if (it->fileName != _currentTailFileName)
+                       continue;
+
+               // The file right after the current one is the roll over target
+               std::vector<TailMatchedFileItem>::iterator next = it + 1;
+               if (next != matchedFiles.end())
+               {
+                       _logger->log_info("TailFile File Roll Over from %s to %s", _currentTailFileName.c_str(), next->fileName.c_str());
+                       _currentTailFileName = next->fileName;
+                       _currentTailFilePosition = 0;
+                       storeState();
+               }
+               break;
+       }
+}
+
+
+/**
+ * Emit the data appended to the tailed file since the previous trigger.
+ *
+ * Reads the FileName and StateFile properties, recovers the stored state on
+ * the first call, checks for a roll over and then, when the current tail file
+ * is larger than the stored position, imports the new bytes into a flow file
+ * that is transferred to Success. The flow file name is
+ * <base>.<start>-<end>[.<extension>]. The context yields when there is no new
+ * data.
+ *
+ * @param context process context used to read the properties and to yield
+ * @param session process session used to create, fill and transfer the flow file
+ */
+void TailFile::onTrigger(ProcessContext *context, ProcessSession *session)
+{
+       std::string value;
+       if (context->getProperty(FileName.getName(), value))
+       {
+               std::size_t found = value.find_last_of("/\\");
+               if (found == std::string::npos)
+               {
+                       // No directory component: substr() would otherwise copy the
+                       // whole value into both the location and the file name
+                       this->_fileLocation = ".";
+                       this->_fileName = value;
+               }
+               else
+               {
+                       this->_fileLocation = value.substr(0,found);
+                       this->_fileName = value.substr(found+1);
+               }
+       }
+       if (context->getProperty(StateFile.getName(), value))
+       {
+               _stateFile = value;
+       }
+       if (!this->_stateRecovered)
+       {
+               _stateRecovered = true;
+               this->_currentTailFileName = _fileName;
+               this->_currentTailFilePosition = 0;
+               // recover the state if we have not done so
+               this->recoverState();
+       }
+       checkRollOver();
+       std::string fullPath = this->_fileLocation + "/" + _currentTailFileName;
+       struct stat statbuf;
+       if (stat(fullPath.c_str(), &statbuf) != 0)
+               return;
+
+       if (statbuf.st_size <= this->_currentTailFilePosition)
+       {
+               // there is no new input for the current tail file
+               context->yield();
+               return;
+       }
+       FlowFileRecord *flowFile = session->create();
+       if (!flowFile)
+               return;
+
+       // Split the name at the last '.'; a name without '.' has no extension
+       // (find_last_of() returns npos and npos+1 would wrap around to 0)
+       std::size_t found = _currentTailFileName.find_last_of(".");
+       bool hasExtension = (found != std::string::npos);
+       std::string baseName = _currentTailFileName.substr(0,found);
+       std::string extension;
+       if (hasExtension)
+               extension = _currentTailFileName.substr(found+1);
+
+       flowFile->updateAttribute(PATH, _fileLocation);
+       flowFile->addAttribute(ABSOLUTE_PATH, fullPath);
+       session->import(fullPath, flowFile, true, this->_currentTailFilePosition);
+       session->transfer(flowFile, Success);
+       // NOTE(review): %d assumes getSize() is passed as an int - confirm against FlowFileRecord
+       _logger->log_info("TailFile %s for %d bytes", _currentTailFileName.c_str(), flowFile->getSize());
+       std::string logName = baseName + "." + std::to_string(_currentTailFilePosition) + "-" +
+                       std::to_string(_currentTailFilePosition + flowFile->getSize());
+       if (hasExtension)
+               logName += "." + extension;
+       flowFile->updateAttribute(FILENAME, logName);
+       this->_currentTailFilePosition += flowFile->getSize();
+       storeState();
+}
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/TimerDrivenSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/TimerDrivenSchedulingAgent.cpp 
b/libminifi/src/TimerDrivenSchedulingAgent.cpp
new file mode 100644
index 0000000..3ce57ae
--- /dev/null
+++ b/libminifi/src/TimerDrivenSchedulingAgent.cpp
@@ -0,0 +1,134 @@
+/**
+ * @file TimerDrivenSchedulingAgent.cpp
+ * TimerDrivenSchedulingAgent class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <chrono>
+#include <thread>
+#include <iostream>
+#include "Property.h"
+#include "TimerDrivenSchedulingAgent.h"
+
+/**
+ * Start the timer driven threads of a processor.
+ *
+ * Reloads the administrative and bored yield durations from the configuration
+ * (converted to milliseconds; 0 when not configured), then starts
+ * getMaxConcurrentTasks() detached threads executing run() and records them
+ * under the processor UUID. Nothing is started when the processor is not in
+ * the RUNNING state or already has threads recorded.
+ *
+ * @param processor processor to schedule
+ */
+void TimerDrivenSchedulingAgent::schedule(Processor *processor)
+{
+       std::lock_guard<std::mutex> lock(_mtx);
+
+       _administrativeYieldDuration = 0;
+       std::string yieldValue;
+
+       if (_configure->get(Configure::nifi_administrative_yield_duration, yieldValue))
+       {
+               TimeUnit unit;
+               if (Property::StringToTime(yieldValue, _administrativeYieldDuration, unit) &&
+                                       Property::ConvertTimeUnitToMS(_administrativeYieldDuration, unit, _administrativeYieldDuration))
+               {
+                       _logger->log_debug("nifi_administrative_yield_duration: [%d] ms", _administrativeYieldDuration);
+               }
+       }
+
+       _boredYieldDuration = 0;
+       if (_configure->get(Configure::nifi_bored_yield_duration, yieldValue))
+       {
+               TimeUnit unit;
+               if (Property::StringToTime(yieldValue, _boredYieldDuration, unit) &&
+                                       Property::ConvertTimeUnitToMS(_boredYieldDuration, unit, _boredYieldDuration))
+               {
+                       _logger->log_debug("nifi_bored_yield_duration: [%d] ms", _boredYieldDuration);
+               }
+       }
+
+       if (processor->getScheduledState() != RUNNING)
+       {
+               _logger->log_info("Can not schedule threads for processor %s because it is not running", processor->getName().c_str());
+               return;
+       }
+
+       std::map<std::string, std::vector<std::thread *>>::iterator it =
+                       _threads.find(processor->getUUIDStr());
+       if (it != _threads.end())
+       {
+               // The format expects the processor name: pass it, a missing
+               // argument for %s is undefined behavior
+               _logger->log_info("Can not schedule threads for processor %s because there are existed thread running", processor->getName().c_str());
+               return;
+       }
+
+       std::vector<std::thread *> threads;
+       for (int i = 0; i < processor->getMaxConcurrentTasks(); i++)
+       {
+               std::thread *thread = new std::thread(run, this, processor);
+               thread->detach();
+               threads.push_back(thread);
+               // NOTE(review): get_id() is read after detach(), when the object no
+               // longer identifies a thread - confirm this is the intended value
+               _logger->log_info("Scheduled Time Driven thread %d running for process %s", thread->get_id(),
+                               processor->getName().c_str());
+       }
+       _threads[processor->getUUIDStr()] = threads;
+
+       return;
+}
+
+/**
+ * Drop the timer driven threads recorded for a processor.
+ *
+ * Deletes the thread objects stored under the processor UUID, removes the
+ * entry and clears the processor's active task count. Nothing is done when
+ * the processor is not in the RUNNING state or has no threads recorded.
+ *
+ * @param processor processor to unschedule
+ */
+void TimerDrivenSchedulingAgent::unschedule(Processor *processor)
+{
+       std::lock_guard<std::mutex> lock(_mtx);
+
+       if (processor->getScheduledState() != RUNNING)
+       {
+               _logger->log_info("Can not unschedule threads for processor %s because it is not running", processor->getName().c_str());
+               return;
+       }
+
+       std::map<std::string, std::vector<std::thread *>>::iterator it =
+                       _threads.find(processor->getUUIDStr());
+
+       if (it == _threads.end())
+       {
+               // The format expects the processor name: pass it, a missing
+               // argument for %s is undefined behavior
+               _logger->log_info("Can not unschedule threads for processor %s because there are no existed thread running", processor->getName().c_str());
+               return;
+       }
+       for (std::vector<std::thread *>::iterator itThread = it->second.begin(); itThread != it->second.end(); ++itThread)
+       {
+               std::thread *thread = *itThread;
+               _logger->log_info("Scheduled Time Driven thread %d deleted for process %s", thread->get_id(),
+                               processor->getName().c_str());
+               delete thread;
+       }
+       _threads.erase(processor->getUUIDStr());
+       processor->clearActiveTask();
+
+       return;
+}
+
+/**
+ * Thread body of a timer driven processor.
+ *
+ * While the agent is running: trigger the processor, then sleep for the
+ * processor yield time when it yielded, or for the bored yield duration when
+ * the trigger asked to yield and that duration is positive, and finally
+ * sleep for the processor scheduling period.
+ *
+ * @param agent scheduling agent owning the thread
+ * @param processor processor to trigger
+ */
+void TimerDrivenSchedulingAgent::run(TimerDrivenSchedulingAgent *agent, Processor *processor)
+{
+       while (agent->_running)
+       {
+               const bool triggerAskedToYield = agent->onTrigger(processor);
+
+               if (processor->isYield())
+               {
+                       // Honor the yield
+                       const std::chrono::milliseconds yieldTime(processor->getYieldTime());
+                       std::this_thread::sleep_for(yieldTime);
+               }
+               else if (triggerAskedToYield && agent->_boredYieldDuration > 0)
+               {
+                       // No work to do or need to apply back pressure
+                       const std::chrono::milliseconds boredTime(agent->_boredYieldDuration);
+                       std::this_thread::sleep_for(boredTime);
+               }
+
+               const std::chrono::nanoseconds schedulingPeriod(processor->getSchedulingPeriodNano());
+               std::this_thread::sleep_for(schedulingPeriod);
+       }
+}
+
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/test/FlowFileRecordTest.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/FlowFileRecordTest.cpp 
b/libminifi/test/FlowFileRecordTest.cpp
new file mode 100644
index 0000000..09a3d33
--- /dev/null
+++ b/libminifi/test/FlowFileRecordTest.cpp
@@ -0,0 +1,28 @@
+/**
+ * @file FlowFileRecordTest.cpp
+ * FlowFileRecord test implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+
+#include "FlowFileRecord.h"
+
+//! Placeholder entry point: no test is implemented yet, it only reports success
+int main(int argc, char **argv)
+{
+       return 0;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/test/Server.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/Server.cpp b/libminifi/test/Server.cpp
new file mode 100644
index 0000000..e7b3452
--- /dev/null
+++ b/libminifi/test/Server.cpp
@@ -0,0 +1,607 @@
+/* A simple server in the internet domain using TCP
+   The port number is passed as an argument */
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <sys/types.h> 
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <errno.h>
+#include <arpa/inet.h>
+#include <fcntl.h>
+#include <netdb.h>
+#include <string>
+#include <errno.h>
+#include <chrono>
+#include <thread>
+#include <iostream>     // std::cout
+#include <fstream>      // std::ifstream
+#include <signal.h>
+
+#define DEFAULT_NIFI_SERVER_PORT 9000
+#define DEFAULT_REPORT_INTERVAL 1000 // 1 sec
+#define MAX_READ_TIMEOUT 30000 // 30 seconds
+
+//! FlowControl Protocol Msg Type
+typedef enum {
+       //! Device register request, device to server: contains the device
+       //! serial number and the current running flow xml version
+       REGISTER_REQ,
+       //! Device register response, server to device: may contain a new
+       //! flow.xml the server asks the device to apply, and the device report
+       //! interval
+       REGISTER_RESP,
+       //! Periodic device report, device to server: contains the device serial
+       //! number, the current running flow xml name/version and other periodic
+       //! report info
+       REPORT_REQ,
+       //! Report response, server to device: may ask the device to update the
+       //! flow xml or a processor property
+       REPORT_RESP,
+       MAX_FLOW_CONTROL_MSG_TYPE
+} FlowControlMsgType;
+
+//! FlowControl Protocol Msg Type String, indexed by FlowControlMsgType
+static const char *FlowControlMsgTypeStr[MAX_FLOW_CONTROL_MSG_TYPE] =
+{
+       "REGISTER_REQ", "REGISTER_RESP", "REPORT_REQ", "REPORT_RESP"
+};
+
+//! Flow Control Msg Type to String, NULL when the type has no name
+inline const char *FlowControlMsgTypeToStr(FlowControlMsgType type)
+{
+       return (type < MAX_FLOW_CONTROL_MSG_TYPE) ? FlowControlMsgTypeStr[type] : NULL;
+}
+
+//! FlowControl Protocol Msg ID (some messages are fixed length, some are variable length (TLV))
+typedef enum {
+       // Fixed length, 8 bytes: client to server in register request, required field
+       FLOW_SERIAL_NUMBER,
+       // Flow XML name, TLV: client to server in register request and report request, required field
+       FLOW_XML_NAME,
+       // Flow XML content, TLV: server to client in register response, optional field
+       // in case the server wants to ask the client to load the xml from the server
+       FLOW_XML_CONTENT,
+       // Fixed length, 4 bytes, report interval in msec: server to client in register response, optional field
+       REPORT_INTERVAL,
+       // Processor name, TLV: server to client in report response, optional field
+       // in case the server wants to ask the client to update a processor property
+       PROCESSOR_NAME,
+       // Processor property name, TLV: server to client in report response, optional field
+       // in case the server wants to ask the client to update a processor property
+       PROPERTY_NAME,
+       // Processor property value, TLV: server to client in report response, optional field
+       // in case the server wants to ask the client to update a processor property
+       PROPERTY_VALUE,
+       // Report blob, TLV: client to server in report request, optional field
+       // in case the client wants to piggyback the report blob in the report request
+       REPORT_BLOB,
+       MAX_FLOW_MSG_ID
+} FlowControlMsgID;
+
+//! FlowControl Protocol Msg ID String, indexed by FlowControlMsgID
+static const char *FlowControlMsgIDStr[MAX_FLOW_MSG_ID] =
+{
+               "FLOW_SERIAL_NUMBER",
+               "FLOW_XML_NAME",
+               "FLOW_XML_CONTENT",
+               "REPORT_INTERVAL",
+               // the comma below matters: without it this literal is concatenated
+               // with the next one and every following name moves up by one slot
+               "PROCESSOR_NAME",
+               "PROPERTY_NAME",
+               "PROPERTY_VALUE",
+               "REPORT_BLOB"
+};
+
+#define TYPE_HDR_LEN 4 // Fix Hdr Type
+#define TLV_HDR_LEN 8 // Type 4 bytes and Len 4 bytes
+
+//! FlowControl Protocol Msg Len: encoded size of a field for the given
+//! payload length (ignored for the fixed length fields), -1 for an unknown id
+inline int FlowControlMsgIDEncodingLen(FlowControlMsgID id, int payLoadLen)
+{
+       switch (id)
+       {
+       case FLOW_SERIAL_NUMBER:
+               return (TYPE_HDR_LEN + 8);
+       case REPORT_INTERVAL:
+               return (TYPE_HDR_LEN + 4);
+       default:
+               return (id < MAX_FLOW_MSG_ID) ? (TLV_HDR_LEN + payLoadLen) : -1;
+       }
+}
+
+//! Flow Control Msg Id to String, NULL when the id has no name
+inline const char *FlowControlMsgIdToStr(FlowControlMsgID id)
+{
+       return (id < MAX_FLOW_MSG_ID) ? FlowControlMsgIDStr[id] : NULL;
+}
+
+//! Flow Control Response status code
+typedef enum {
+       RESP_SUCCESS,
+       //! Server response to a client report asking to trigger the register again
+       RESP_TRIGGER_REGISTER,
+       //! Server response asking the client to start the flow controller
+       RESP_START_FLOW_CONTROLLER,
+       //! Server response asking the client to stop the flow controller
+       RESP_STOP_FLOW_CONTROLLER,
+       RESP_FAILURE,
+       MAX_RESP_CODE
+} FlowControlRespCode;
+
+//! FlowControl Resp Code str, indexed by FlowControlRespCode
+static const char *FlowControlRespCodeStr[MAX_RESP_CODE] =
+{
+       "RESP_SUCCESS",
+       "RESP_TRIGGER_REGISTER",
+       "RESP_START_FLOW_CONTROLLER",
+       "RESP_STOP_FLOW_CONTROLLER",
+       "RESP_FAILURE"
+};
+
+//! Flow Control Resp Code to String, NULL when the code has no name
+inline const char *FlowControlRespCodeToStr(FlowControlRespCode code)
+{
+       return (code < MAX_RESP_CODE) ? FlowControlRespCodeStr[code] : NULL;
+}
+
+//! Common FlowControlProtocol Header
+typedef struct {
+       //! Msg Type
+       uint32_t msgType;
+       //! Seq Number to match Req with Resp
+       uint32_t seqNumber;
+       //! Resp Code, see FlowControlRespCode
+       uint32_t status;
+       //! Msg Payload length
+       uint32_t payloadLen;
+} FlowControlProtocolHeader;
+
+
+//! encode uint32_t as 4 bytes, most significant byte first; returns the position after the bytes written
+uint8_t *encode(uint8_t *buf, uint32_t value)
+{
+       for (int shift = 24; shift >= 0; shift -= 8)
+               *buf++ = (value >> shift) & 0xFF;
+       return buf;
+}
+
+//! decode uint32_t from 4 bytes, most significant byte first; returns the position after the bytes read
+uint8_t *decode(uint8_t *buf, uint32_t &value)
+{
+               // Widen every byte to uint32_t before shifting: a uint8_t is promoted to
+               // a signed int, so shifting a byte >= 0x80 left by 24 overflows it
+               value = (static_cast<uint32_t>(buf[0]) << 24) |
+                               (static_cast<uint32_t>(buf[1]) << 16) |
+                               (static_cast<uint32_t>(buf[2]) << 8) |
+                               static_cast<uint32_t>(buf[3]);
+               return (buf + 4);
+}
+
+//! encode byte array: copy size bytes of bufArray into buf; returns the position after the bytes written
+uint8_t *encode(uint8_t *buf, uint8_t *bufArray, int size)
+{
+       uint8_t *written = static_cast<uint8_t *>(memcpy(buf, bufArray, size));
+       return written + size;
+}
+
+//! encode std::string: the length followed by the characters, both including the terminating \0
+uint8_t *encode(uint8_t *buf, std::string value)
+{
+       // add the \0 for size
+       const std::size_t sizeWithNul = value.size()+1;
+       uint8_t *afterLength = encode(buf, sizeWithNul);
+       return encode(afterLength, (uint8_t *) value.c_str(), sizeWithNul);
+}
+
+//! Send buflen bytes of buf on socket, looping over partial sends.
+//! Returns the number of bytes sent, or -1 as soon as send() fails.
+int sendData(int socket, uint8_t *buf, int buflen)
+{
+       int sent = 0;
+
+       while (sent < buflen)
+       {
+               const int status = send(socket, buf + sent, buflen - sent, 0);
+               if (status == -1)
+                       return status;
+               sent += status;
+       }
+
+       return sent;
+}
+
+//! Report a fatal error: print msg with the description of the current errno, then exit with status 1
+void error(const char *msg)
+{
+    perror(msg);
+    exit(1);
+}
+
+/* readline - read a '\n' terminated line from socket fd 
+              into buffer bufptr of size len. The line in the
+              buffer is terminated with '\0'.
+              On success it returns the number of characters stored
+              in the buffer, counting the '\n' but not the '\0'.
+              It returns -1 in case of error or if
+              the capacity of the buffer is exceeded.
+             It returns 0 if EOF is encountered before reading '\n'.
+              NOTE(review): len is unsigned, so len == 0 wraps around
+              in the loop condition; callers must pass len >= 1.
+ */
+int readline( int fd, char *bufptr, size_t len )
+{
+  /* Note that this function is very tricky.  It uses the
+     static variables bp, cnt, and b to establish a local buffer.
+     The recv call requests large chunks of data (the size of the buffer).
+     Then if the recv call reads more than one line, the overflow
+     remains in the buffer and it is made available to the next call
+     to readline. 
+     The local buffer is shared by every call whatever fd is, so
+     leftover bytes received from one socket are returned by the next
+     call even when a different fd is passed.
+     Notice also that this routine reads up to and including '\n' and
+     stores '\0' right after it, so the '\n' stays in the returned
+     line. Thus if the line is really terminated with
+     "\r\n", the '\r' will remain unchanged as well.
+  */
+  char *bufx = bufptr;
+  static char *bp;
+  static int cnt = 0;
+  static char b[ 4096 ];
+  char c;
+  
+  while ( --len > 0 )                  /* at most len - 1 characters: room is kept for '\0' */
+    {
+      if ( --cnt <= 0 )                        /* local buffer used up: refill it */
+       {
+         cnt = recv( fd, b, sizeof( b ), 0 );
+         if ( cnt < 0 )
+           {
+             if ( errno == EINTR )
+               {
+                 len++;                /* the while will decrement */
+                 continue;
+               }
+             return -1;
+           }
+         if ( cnt == 0 )
+           return 0;
+         bp = b;
+       }
+      c = *bp++;
+      *bufptr++ = c;
+      if ( c == '\n' )
+       {
+         *bufptr = '\0';
+         return bufptr - bufx;
+       }
+    }
+  return -1;                           /* no '\n' within the len - 1 characters allowed */
+}
+
+//! Read exactly buflen bytes from socket into buf, looping over partial reads.
+//! Returns buflen once everything is read, otherwise the first read result
+//! that is <= 0 (0 when the peer closed, negative on error).
+int readData(int socket, uint8_t *buf, int buflen)
+{
+       const int requested = buflen;
+       uint8_t *cursor = buf;
+
+       for (int remaining = buflen; remaining != 0; )
+       {
+               int received;
+#ifndef __MACH__
+               received = read(socket, cursor, remaining);
+#else
+               received = recv(socket, cursor, remaining, 0);
+#endif
+               if (received <= 0)
+                       return received;
+               remaining -= received;
+               cursor += received;
+       }
+
+       return requested;
+}
+
+//! Read a FlowControlProtocolHeader from socket and decode its four fields
+//! into hdr. Returns the header size on success, otherwise the readData()
+//! result (<= 0) and hdr is left untouched.
+int readHdr(int socket, FlowControlProtocolHeader *hdr)
+{
+       uint8_t raw[sizeof(FlowControlProtocolHeader)];
+
+       const int status = readData(socket, raw, sizeof(FlowControlProtocolHeader));
+       if (status <= 0)
+               return status;
+
+       // The fields are laid out on the wire in declaration order
+       uint8_t *cursor = raw;
+       cursor = decode(cursor, hdr->msgType);
+       cursor = decode(cursor, hdr->seqNumber);
+       cursor = decode(cursor, hdr->status);
+       cursor = decode(cursor, hdr->payloadLen);
+
+       return sizeof(FlowControlProtocolHeader);
+}
+
+//! Load conf/flowServer.xml into a newly allocated buffer.
+//! On success *xmlContent points to the buffer (not NUL terminated, to be
+//! released by the caller with delete[]) and the file length is returned.
+//! When the file is missing, empty or cannot be read completely, 0 is
+//! returned and *xmlContent is set to NULL.
+int readXML(char **xmlContent)
+{
+         const char *path = "conf/flowServer.xml";
+
+         // Never leave the caller with an indeterminate pointer
+         *xmlContent = NULL;
+
+         std::ifstream is (path, std::ifstream::binary);
+         if (!is)
+           return 0;
+
+         // get length of file:
+         is.seekg (0, is.end);
+         const int length = is.tellg();
+         is.seekg (0, is.beg);
+
+         // tellg() reports -1 on failure and an empty file has nothing to send
+         if (length <= 0)
+           return 0;
+
+         char * buffer = new char [length];
+
+         printf("Reading %s len %d\n", path, length);
+         // read data as a block:
+         is.read (buffer,length);
+         if (is.gcount() != length)
+           {
+             // short read: do not hand out a partially filled buffer
+             delete[] buffer;
+             return 0;
+           }
+
+         is.close();
+
+         // ...buffer contains the entire file...
+         *xmlContent = buffer;
+
+         return length;
+}
+
+//! Listening socket and socket of the connection being served, closed by sigHandler
+static int sockfd = 0, newsockfd = 0;
+//! SIGINT/SIGTERM handler: close both sockets and terminate with status 1.
+//! Any other signal is ignored.
+void sigHandler(int signal)
+{
+       if (signal == SIGINT || signal == SIGTERM)
+       {
+               close(newsockfd);
+               close(sockfd);
+               // _exit() instead of exit(): exit() is not async-signal-safe, it runs
+               // the atexit handlers and flushes stdio, which the interrupted code
+               // may be in the middle of using. Buffered output is not flushed.
+               _exit(1);
+       }
+}
+
+int main(int argc, char *argv[])
+{
+     int portno;
+     socklen_t clilen;
+     struct sockaddr_in serv_addr, cli_addr;
+     char buffer[4096];
+     int flag = 0;
+     int number = 0;
+     
+     int n;
+     if (argc < 2) {
+         fprintf(stderr,"ERROR, no port provided\n");
+         exit(1);
+     }
+
+        if (signal(SIGINT, sigHandler) == SIG_ERR || signal(SIGTERM, 
sigHandler) == SIG_ERR)
+        {
+
+               return -1;
+        }
+     sockfd = socket(AF_INET, SOCK_STREAM, 0);
+     if (sockfd < 0) 
+        error("ERROR opening socket");
+     bzero((char *) &serv_addr, sizeof(serv_addr));
+     portno = atoi(argv[1]);
+     serv_addr.sin_family = AF_INET;
+     serv_addr.sin_addr.s_addr = INADDR_ANY;
+     serv_addr.sin_port = htons(portno);
+     if (bind(sockfd, (struct sockaddr *) &serv_addr,
+              sizeof(serv_addr)) < 0) 
+              error("ERROR on binding");
+     listen(sockfd,5);
+     if (portno == DEFAULT_NIFI_SERVER_PORT)
+     {
+        while (true)
+        {
+                clilen = sizeof(cli_addr);
+                newsockfd = accept(sockfd,
+                 (struct sockaddr *) &cli_addr, 
+                 &clilen);
+                if (newsockfd < 0)
+                {
+                        error("ERROR on accept");
+                        break;
+                }
+                // process request
+                FlowControlProtocolHeader hdr;
+                int status = readHdr(newsockfd, &hdr);
+                if (status > 0)
+                {
+                        printf("Flow Control Protocol receive MsgType %s\n", 
FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType));
+                    printf("Flow Control Protocol receive Seq Num %d\n", 
hdr.seqNumber);
+                    printf("Flow Control Protocol receive Resp Code %s\n", 
FlowControlRespCodeToStr((FlowControlRespCode) hdr.status));
+                    printf("Flow Control Protocol receive Payload len %d\n", 
hdr.payloadLen);
+                        if (((FlowControlMsgType) hdr.msgType) == REGISTER_REQ)
+                        {
+                               printf("Flow Control Protocol Register Req 
receive\n");
+                               uint8_t *payload = new uint8_t[hdr.payloadLen];
+                               uint8_t *payloadPtr = payload;
+                               status = readData(newsockfd, payload, 
hdr.payloadLen);
+                               while (status > 0 && payloadPtr < (payload + 
hdr.payloadLen))
+                               {
+                                       uint32_t msgID = 0xFFFFFFFF;
+                                       payloadPtr = decode(payloadPtr, msgID);
+                                       if (((FlowControlMsgID) msgID) == 
FLOW_SERIAL_NUMBER)
+                                       {
+                                               // Fixed 8 bytes
+                                               uint8_t seqNum[8];
+                                               memcpy(seqNum, payloadPtr, 8);
+                                               printf("Flow Control Protocol 
Register Req receive serial num\n");
+                                               payloadPtr += 8;
+                                       }
+                                       else if (((FlowControlMsgID) msgID) == 
FLOW_XML_NAME)
+                                       {
+                                               uint32_t len;
+                                               payloadPtr = decode(payloadPtr, 
len);
+                                               printf("Flow Control Protocol 
receive XML name length %d\n", len);
+                                               std::string flowName = (const 
char *) payloadPtr;
+                                               payloadPtr += len;
+                                               printf("Flow Control Protocol 
receive XML name %s\n", flowName.c_str());
+                                       }
+                                       else
+                                       {
+                                               break;
+                                       }
+                               }
+                               delete[] payload;
+                               // Send Register Respond
+                               // Calculate the total payload msg size
+                               char *xmlContent;
+                               uint32_t xmlLen = readXML(&xmlContent);
+                               uint32_t payloadSize = 
FlowControlMsgIDEncodingLen(REPORT_INTERVAL, 0);
+                               if (xmlLen > 0)
+                                       payloadSize += 
FlowControlMsgIDEncodingLen(FLOW_XML_CONTENT, xmlLen);
+
+                               uint32_t size = 
sizeof(FlowControlProtocolHeader) + payloadSize;
+                               uint8_t *data = new uint8_t[size];
+                               uint8_t *start = data;
+
+                               // encode the HDR
+                               hdr.msgType = REGISTER_RESP;
+                               hdr.payloadLen = payloadSize;
+                               hdr.status = RESP_SUCCESS;
+                               data = encode(data, hdr.msgType);
+                               data = encode(data, hdr.seqNumber);
+                               data = encode(data, hdr.status);
+                               data = encode(data, hdr.payloadLen);
+
+                               // encode the report interval
+                               data = encode(data, REPORT_INTERVAL);
+                               data = encode(data, DEFAULT_REPORT_INTERVAL);
+
+                               // encode the XML content
+                               if (xmlLen > 0)
+                               {
+                                       data = encode(data, FLOW_XML_CONTENT);
+                                       data = encode(data, xmlLen);
+                                       data = encode(data, (uint8_t *) 
xmlContent, xmlLen);
+                                       delete[] xmlContent;
+                               }
+
+                               // send it
+                               status = sendData(newsockfd, start, size);
+                               delete[] start;
+                        }
+                        else if (((FlowControlMsgType) hdr.msgType) == 
REPORT_REQ)
+                        {
+                                       printf("Flow Control Protocol Report 
Req receive\n");
+                                       uint8_t *payload = new 
uint8_t[hdr.payloadLen];
+                                       uint8_t *payloadPtr = payload;
+                                       status = readData(newsockfd, payload, 
hdr.payloadLen);
+                                       while (status > 0 && payloadPtr < 
(payload + hdr.payloadLen))
+                                       {
+                                               uint32_t msgID = 0xFFFFFFFF;
+                                               payloadPtr = decode(payloadPtr, 
msgID);
+                                               if (((FlowControlMsgID) msgID) 
== FLOW_XML_NAME)
+                                               {
+                                                       uint32_t len;
+                                                       payloadPtr = 
decode(payloadPtr, len);
+                                                       printf("Flow Control 
Protocol receive XML name length %d\n", len);
+                                                       std::string flowName = 
(const char *) payloadPtr;
+                                                       payloadPtr += len;
+                                                       printf("Flow Control 
Protocol receive XML name %s\n", flowName.c_str());
+                                               }
+                                               else
+                                               {
+                                                       break;
+                                               }
+                                       }
+                                       delete[] payload;
+                                       // Send Register Respond
+                                       // Calculate the total payload msg size
+                                       std::string processor = 
"RealTimeDataCollector";
+                                       std::string propertyName1 = "real Time 
Message ID";
+                                       std::string propertyValue1 = "41";
+                                       std::string propertyName2 = "Batch 
Message ID";
+                                       std::string propertyValue2 = 
"172,30,48";
+                                       if (flag == 0)
+                                       {
+                                       propertyName1 = "Real Time Message ID";
+                                       propertyValue1 = "41";
+                                   propertyName2 = "Batch Message ID";
+                                   propertyValue2 = "172,48";
+                                               flag = 1;
+                                       }
+                                       else if (flag == 1)
+                                       {
+                                               propertyName1 = "Real Time 
Message ID";
+                                               propertyValue1 = "172,48";
+                                               propertyName2 = "Batch Message 
ID";
+                                               propertyValue2 = "41";
+                                               flag = 0;
+                                       }
+                                       uint32_t payloadSize = 
FlowControlMsgIDEncodingLen(PROCESSOR_NAME, processor.size()+1);
+                                       payloadSize += 
FlowControlMsgIDEncodingLen(PROPERTY_NAME, propertyName1.size()+1);
+                                       payloadSize += 
FlowControlMsgIDEncodingLen(PROPERTY_VALUE, propertyValue1.size()+1);
+                                       payloadSize += 
FlowControlMsgIDEncodingLen(PROPERTY_NAME, propertyName2.size()+1);
+                                       payloadSize += 
FlowControlMsgIDEncodingLen(PROPERTY_VALUE, propertyValue2.size()+1);
+
+                                       uint32_t size = 
sizeof(FlowControlProtocolHeader) + payloadSize;
+                                       uint8_t *data = new uint8_t[size];
+                                       uint8_t *start = data;
+
+                                       // encode the HDR
+                                       hdr.msgType = REPORT_RESP;
+                                       hdr.payloadLen = payloadSize;
+                                       hdr.status = RESP_SUCCESS;
+
+                                       if (number >= 10 && number < 20)
+                                   {
+                                       // After 10 second report, stop the 
flow controller for 10 second
+                                      hdr.status = RESP_STOP_FLOW_CONTROLLER;
+                                   }
+                                       else if (number == 20)
+                                       {
+                                               // restart the flow controller 
after 10 second
+                                               hdr.status = 
RESP_START_FLOW_CONTROLLER;
+                                       }
+                                       else if (number == 30)
+                                       {
+                                               // retrigger register
+                                               hdr.status = 
RESP_TRIGGER_REGISTER;
+                                               number = 0;
+                                       }
+
+                                   number++;
+
+                                       data = encode(data, hdr.msgType);
+                                       data = encode(data, hdr.seqNumber);
+                                       data = encode(data, hdr.status);
+                                       data = encode(data, hdr.payloadLen);
+
+                                       // encode the processorName
+                                       data = encode(data, PROCESSOR_NAME);
+                                       data = encode(data, processor);
+
+                                       // encode the propertyName and value TLV
+                                       data = encode(data, PROPERTY_NAME);
+                               data = encode(data, propertyName1);
+                               data = encode(data, PROPERTY_VALUE);
+                               data = encode(data, propertyValue1);
+                               data = encode(data, PROPERTY_NAME);
+                               data = encode(data, propertyName2);
+                               data = encode(data, PROPERTY_VALUE);
+                               data = encode(data, propertyValue2);
+                                       // send it
+                                       status = sendData(newsockfd, start, 
size);
+                                       delete[] start;
+                                }
+                }
+                close(newsockfd);
+        }
+        close(sockfd);
+     }
+     else
+     {
+        clilen = sizeof(cli_addr);
+        newsockfd = accept(sockfd,
+                       (struct sockaddr *) &cli_addr,
+                        &clilen);
+        if (newsockfd < 0)
+               error("ERROR on accept");
+        while (1)
+        {
+               bzero(buffer,4096);
+               n = readline(newsockfd,buffer,4095);
+               if (n <= 0 )
+               {
+                       close(newsockfd);
+                       newsockfd = accept(sockfd,
+                                               (struct sockaddr *) &cli_addr,
+                                                &clilen);
+                       continue;
+               }
+               printf("%s",buffer);
+         }
+         close(newsockfd);
+         close(sockfd);
+     }
+     return 0; 
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/main/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/main/CMakeLists.txt b/main/CMakeLists.txt
new file mode 100644
index 0000000..2f470a5
--- /dev/null
+++ b/main/CMakeLists.txt
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+cmake_minimum_required(VERSION 2.6)
+
+# CMP0048 was introduced in CMake 3.0. Setting a policy that the running
+# CMake does not know is a hard error, so only set it when it exists; this
+# keeps the declared 2.6 minimum usable.
+if (POLICY CMP0048)
+    cmake_policy(SET CMP0048 NEW)
+endif ()
+
+# Header search paths: top-level headers, libminifi and bundled yaml-cpp.
+include_directories(../include
+        ../libminifi/include
+        ../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include)
+
+# Include libxml2
+find_package(LibXml2)
+if (LIBXML2_FOUND)
+    include_directories(${LIBXML2_INCLUDE_DIR})
+else ()
+    # Build from our local version
+endif (LIBXML2_FOUND)
+
+# The target is named minifiexe; the produced binary is renamed to "minifi"
+# through OUTPUT_NAME below.
+add_executable(minifiexe MiNiFiMain.cpp)
+
+# Link against minifi and yaml-cpp
+target_link_libraries(minifiexe minifi yaml-cpp)
+set_target_properties(minifiexe
+        PROPERTIES OUTPUT_NAME minifi)
+
+# Install the executable into <prefix>/bin as part of the "bin" component.
+install(TARGETS minifiexe
+        RUNTIME
+        DESTINATION bin
+        COMPONENT bin)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/Configure.cpp
----------------------------------------------------------------------
diff --git a/src/Configure.cpp b/src/Configure.cpp
deleted file mode 100644
index d7fd95b..0000000
--- a/src/Configure.cpp
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
- * @file Configure.cpp
- * Configure class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "Configure.h"
-
-//! Static Configure instance pointer, initially NULL
-Configure *Configure::_configure = NULL;
-
-//! Property key names
-const char *Configure::nifi_flow_configuration_file =
-    "nifi.flow.configuration.file";
-const char *Configure::nifi_administrative_yield_duration =
-    "nifi.administrative.yield.duration";
-const char *Configure::nifi_bored_yield_duration =
-    "nifi.bored.yield.duration";
-const char *Configure::nifi_server_name =
-    "nifi.server.name";
-const char *Configure::nifi_server_port =
-    "nifi.server.port";
-const char *Configure::nifi_server_report_interval =
-    "nifi.server.report.interval";
-
-
-//! Get the config value
-/*!
- * Looks up key in _properties while holding _mtx.
- * @param key   property name to look up
- * @param value receives the stored value; left untouched when key is absent
- * @return true when key was found, false otherwise
- */
-bool Configure::get(std::string key, std::string &value)
-{
-    std::lock_guard<std::mutex> lock(_mtx);
-
-    const auto entry = _properties.find(key);
-    if (entry == _properties.end())
-    {
-        return false;
-    }
-
-    value = entry->second;
-    return true;
-}
-
-// Trim String utils
-//! Return a copy of s with whitespace removed from both ends
-std::string Configure::trim(const std::string& s)
-{
-    const std::string withoutLeading = trimLeft(s);
-    return trimRight(withoutLeading);
-}
-
-//! Return a copy of s without leading spaces, newlines, carriage returns
-//! or tabs; the result is empty when s holds nothing else
-std::string Configure::trimLeft(const std::string& s)
-{
-    static const char blanks[] = " \n\r\t";
-
-    const std::string::size_type firstKept = s.find_first_not_of(blanks);
-    if (firstKept == std::string::npos)
-    {
-        return "";
-    }
-    return s.substr(firstKept);
-}
-
-//! Return a copy of s without trailing spaces, newlines, carriage returns
-//! or tabs; the result is empty when s holds nothing else
-std::string Configure::trimRight(const std::string& s)
-{
-    static const char blanks[] = " \n\r\t";
-
-    const std::string::size_type lastKept = s.find_last_not_of(blanks);
-    if (lastKept == std::string::npos)
-    {
-        return "";
-    }
-    return s.substr(0, lastKept + 1);
-}
-
-//! Parse one line in configure file like key=value
-/*!
- * Leading blanks are skipped. Empty lines, comment lines (starting with
- * '#'), lines starting with '=' and lines without any '=' are ignored.
- * A line whose value part is empty is ignored as well.
- * @param buf NUL-terminated line; it is modified in place (the first '='
- *            is overwritten with a terminator to split key from value)
- */
-void Configure::parseConfigureFileLine(char *buf)
-{
-    char *cursor = buf;
-    while (*cursor == ' ' || *cursor == '\t')
-    {
-        cursor++;
-    }
-
-    // Nothing to store for blank lines, comments or a missing key
-    switch (*cursor)
-    {
-    case '\0':
-    case '#':
-    case '\r':
-    case '\n':
-    case '=':
-        return;
-    default:
-        break;
-    }
-
-    char *separator = strchr(cursor, '=');
-    if (!separator)
-    {
-        return;
-    }
-
-    // Terminate the key so that cursor now addresses the key text only
-    *separator = '\0';
-    std::string key = cursor;
-
-    char *rhs = separator + 1;
-    while (*rhs == ' ' || *rhs == '\t')
-    {
-        rhs++;
-    }
-    if (*rhs == '\0' || *rhs == '\r' || *rhs == '\n')
-    {
-        return;
-    }
-
-    std::string value = rhs;
-    key = trimRight(key);
-    value = trimRight(value);
-    set(key, value);
-}
-
-//! Load Configure File
-/*!
- * Replaces the current properties with the key=value pairs found in the
- * given file. A name that does not start with '/' is resolved relative to
- * _configure->getHome(). When the path cannot be resolved or the file
- * cannot be opened, an error is logged and the current properties are
- * left untouched.
- * @param fileName configuration file name; may be NULL, which is reported
- *                 as a load failure
- */
-void Configure::loadConfigureFile(const char *fileName)
-{
-    std::string adjustedFilename;
-    if (fileName)
-    {
-        // perform a naive determination if this is a relative path
-        if (fileName[0] != '/')
-        {
-            adjustedFilename = adjustedFilename + _configure->getHome()
-                    + "/" + fileName;
-        }
-        else
-        {
-            adjustedFilename += fileName;
-        }
-    }
-
-    char full_path[PATH_MAX];
-    const char *path = realpath(adjustedFilename.c_str(), full_path);
-    if (path == NULL)
-    {
-        // realpath() returns NULL when the path cannot be resolved; a NULL
-        // must not reach the "%s" formatter or the ifstream constructor
-        _logger->log_error("load configure file failed %s",
-                adjustedFilename.c_str());
-        return;
-    }
-    _logger->log_info("Using configuration file located at %s", path);
-
-    std::ifstream file(path, std::ifstream::in);
-    if (!file.good())
-    {
-        _logger->log_error("load configure file failed %s", path);
-        return;
-    }
-    this->clear();
-
-    // std::getline on a std::string has no line length limit and still
-    // yields a final line that is not terminated by a newline
-    std::string line;
-    while (std::getline(file, line))
-    {
-        // The parser edits its argument in place, so pass the string's own
-        // NUL-terminated storage rather than the read-only c_str() view
-        parseConfigureFileLine(&line[0]);
-    }
-}
-
-//! Parse Command Line
-/*!
- * Scans argv[1..argc-1] for "-key value" pairs and stores each pair with
- * set(). An argument starting with '-' followed by at least one character
- * is taken as a key (without the dash); the next argument that is not
- * itself a key becomes its value. Arguments that do not follow a key are
- * ignored.
- */
-void Configure::parseCommandLine(int argc, char **argv)
-{
-    bool awaitingValue = false;
-    std::string key, value;
-
-    for (int idx = 1; idx < argc; ++idx)
-    {
-        char *arg = argv[idx];
-
-        if (arg[0] == '-' && arg[1] != '\0')
-        {
-            key = arg + 1;
-            awaitingValue = true;
-        }
-        else if (awaitingValue)
-        {
-            value = arg;
-            set(key, value);
-            awaitingValue = false;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/Connection.cpp
----------------------------------------------------------------------
diff --git a/src/Connection.cpp b/src/Connection.cpp
deleted file mode 100644
index e036b89..0000000
--- a/src/Connection.cpp
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * @file Connection.cpp
- * Connection class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <vector>
-#include <queue>
-#include <map>
-#include <set>
-#include <sys/time.h>
-#include <time.h>
-#include <chrono>
-#include <thread>
-#include <iostream>
-
-#include "Connection.h"
-
-//! Create a connection
-/*!
- * @param name     connection name
- * @param uuid     connection UUID; a new one is generated when NULL
- * @param srcUUID  UUID copied into _srcUUID when not NULL
- * @param destUUID UUID copied into _destUUID when not NULL
- */
-Connection::Connection(std::string name,
-        uuid_t uuid,
-        uuid_t srcUUID,
-        uuid_t destUUID)
-: _name(name)
-{
-    if (uuid)
-    {
-        uuid_copy(_uuid, uuid);
-    }
-    else
-    {
-        // Generate the global UUID for the flow record
-        uuid_generate(_uuid);
-    }
-
-    if (srcUUID)
-    {
-        uuid_copy(_srcUUID, srcUUID);
-    }
-    if (destUUID)
-    {
-        uuid_copy(_destUUID, destUUID);
-    }
-
-    // No processors attached yet
-    _srcProcessor = NULL;
-    _destProcessor = NULL;
-
-    // Zero limits and expiration; the queue starts with no accounted data
-    _maxQueueSize = 0;
-    _maxQueueDataSize = 0;
-    _expiredDuration = 0;
-    _queuedDataSize = 0;
-
-    _logger = Logger::getLogger();
-    _logger->log_info("Connection %s created", _name.c_str());
-}
-
-//! Check, under _mtx, whether the queue holds no flow records
-bool Connection::isEmpty()
-{
-    std::lock_guard<std::mutex> lock(_mtx);
-
-    const bool nothingQueued = _queue.empty();
-    return nothingQueued;
-}
-
-//! Check whether a back pressure limit has been reached
-/*!
- * A limit only applies when it is greater than zero. The connection is
- * full when the number of queued records reaches _maxQueueSize or when
- * _queuedDataSize reaches _maxQueueDataSize.
- * @return true when an enabled limit is reached, false otherwise
- */
-bool Connection::isFull()
-{
-    std::lock_guard<std::mutex> lock(_mtx);
-
-    const bool countLimitHit =
-            (_maxQueueSize > 0 && _queue.size() >= _maxQueueSize);
-    const bool dataLimitHit =
-            (_maxQueueDataSize > 0 && _queuedDataSize >= _maxQueueDataSize);
-
-    // With both limits disabled neither flag can be set
-    return countLimitHit || dataLimitHit;
-}
-
-//! Append a flow record to the queue
-/*!
- * Under _mtx, pushes flow onto _queue and adds flow->getSize() to
- * _queuedDataSize.
- * @param flow record to enqueue
- */
-void Connection::put(FlowFileRecord *flow)
-{
-    std::lock_guard<std::mutex> lock(_mtx);
-
-    _queuedDataSize += flow->getSize();
-    _queue.push(flow);
-
-    _logger->log_debug("Enqueue flow file UUID %s to connection %s",
-            flow->getUUIDStr().c_str(),
-            _name.c_str());
-}
-
-//! Take the next deliverable flow record from the queue
-/*!
- * Records are removed from the front of the queue one by one:
- *  - when _expiredDuration is positive and the record's entry date plus
- *    that duration lies in the past, the record is added to
- *    expiredFlowRecords and the next record is examined;
- *  - a penalized record is pushed back onto the queue and polling stops;
- *  - otherwise the record is tagged with this connection and returned.
- * @param expiredFlowRecords receives every expired record encountered
- * @return the dequeued record, or NULL when none could be delivered
- */
-FlowFileRecord* Connection::poll(
-        std::set<FlowFileRecord *> &expiredFlowRecords)
-{
-    std::lock_guard<std::mutex> lock(_mtx);
-
-    while (!_queue.empty())
-    {
-        FlowFileRecord *candidate = _queue.front();
-        _queue.pop();
-        _queuedDataSize -= candidate->getSize();
-
-        // We need to check for flow expiration only when it is enabled
-        if (_expiredDuration > 0
-                && getTimeMillis() >
-                        (candidate->getEntryDate() + _expiredDuration))
-        {
-            // Flow record expired
-            expiredFlowRecords.insert(candidate);
-            continue;
-        }
-
-        // Flow record not expired
-        if (candidate->isPenalized())
-        {
-            // Flow record was penalized
-            _queue.push(candidate);
-            _queuedDataSize += candidate->getSize();
-            break;
-        }
-
-        candidate->setOriginalConnection(this);
-        _logger->log_debug("Dequeue flow file UUID %s from connection %s",
-                candidate->getUUIDStr().c_str(),
-                _name.c_str());
-        return candidate;
-    }
-
-    return NULL;
-}
-
-//! Delete every queued flow record and empty the queue
-/*!
- * Runs under _mtx. Each record still in the queue is deleted, and the
- * queued data size is reset so that isFull() does not keep reporting
- * back pressure for data that is no longer queued.
- */
-void Connection::drain()
-{
-    std::lock_guard<std::mutex> lock(_mtx);
-
-    while (!_queue.empty())
-    {
-        FlowFileRecord *item = _queue.front();
-        _queue.pop();
-        delete item;
-    }
-
-    // The queue is empty now, so no data is accounted for any more
-    _queuedDataSize = 0;
-
-    _logger->log_debug("Drain connection %s", _name.c_str());
-}

Reply via email to