http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/Site2SiteClientProtocol.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Site2SiteClientProtocol.cpp 
b/libminifi/src/Site2SiteClientProtocol.cpp
index bd4de97..e0265bb 100644
--- a/libminifi/src/Site2SiteClientProtocol.cpp
+++ b/libminifi/src/Site2SiteClientProtocol.cpp
@@ -29,1308 +29,1219 @@
 #include "Site2SitePeer.h"
 #include "Site2SiteClientProtocol.h"
 
-bool Site2SiteClientProtocol::establish()
-{
-       if (_peerState != IDLE)
-       {
-               logger_->log_error("Site2Site peer state is not idle while try 
to establish");
-               return false;
-       }
-
-       bool ret = peer_->Open();
-
-       if (!ret)
-       {
-               logger_->log_error("Site2Site peer socket open failed");
-               return false;
-       }
-
-       // Negotiate the version
-       ret = initiateResourceNegotiation();
-
-       if (!ret)
-       {
-               logger_->log_error("Site2Site Protocol Version Negotiation 
failed");
-               /*
-               peer_->yield();
-               tearDown(); */
-               return false;
-       }
-
-       logger_->log_info("Site2Site socket established");
-       _peerState = ESTABLISHED;
-
-       return true;
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+bool Site2SiteClientProtocol::establish() {
+  if (_peerState != IDLE) {
+    logger_->log_error(
+        "Site2Site peer state is not idle while try to establish");
+    return false;
+  }
+
+  bool ret = peer_->Open();
+
+  if (!ret) {
+    logger_->log_error("Site2Site peer socket open failed");
+    return false;
+  }
+
+  // Negotiate the version
+  ret = initiateResourceNegotiation();
+
+  if (!ret) {
+    logger_->log_error("Site2Site Protocol Version Negotiation failed");
+    /*
+     peer_->yield();
+     tearDown(); */
+    return false;
+  }
+
+  logger_->log_info("Site2Site socket established");
+  _peerState = ESTABLISHED;
+
+  return true;
 }
 
-bool Site2SiteClientProtocol::initiateResourceNegotiation()
-{
-       // Negotiate the version
-       if (_peerState != IDLE)
-       {
-               logger_->log_error("Site2Site peer state is not idle while 
initiateResourceNegotiation");
-               return false;
-       }
-
-       logger_->log_info("Negotiate protocol version with destination port %s 
current version %d", _portIdStr.c_str(), _currentVersion);
-
-       int ret = peer_->writeUTF(this->getResourceName());
-
-       logger_->log_info("result of writing resource name is %i",ret);
-       if (ret <= 0)
-       {
-               logger_->log_debug("result of writing resource name is %i",ret);
-               // tearDown();
-               return false;
-       }
-
-       ret = peer_->write(_currentVersion);
-
-       if (ret <= 0)
-       {
-         logger_->log_info("result of writing version is %i",ret);
-               // tearDown();
-               return false;
-       }
-
-       uint8_t statusCode;
-       ret = peer_->read(statusCode);
-
-       if (ret <= 0)
-       {
-         logger_->log_info("result of writing version status code  %i",ret);
-               // tearDown();
-               return false;
-       }
-      logger_->log_info("status code is %i",statusCode);
-       switch (statusCode)
-       {
-       case RESOURCE_OK:
-               logger_->log_info("Site2Site Protocol Negotiate protocol 
version OK");
-               return true;
-       case DIFFERENT_RESOURCE_VERSION:
-               uint32_t serverVersion;
-               ret = peer_->read(serverVersion);
-               if (ret <= 0)
-               {
-                       // tearDown();
-                       return false;
-               }
-               logger_->log_info("Site2Site Server Response asked for a 
different protocol version %d", serverVersion);
-               for (unsigned int i = (_currentVersionIndex + 1); i < 
sizeof(_supportedVersion)/sizeof(uint32_t); i++)
-               {
-                       if (serverVersion >= _supportedVersion[i])
-                       {
-                               _currentVersion = _supportedVersion[i];
-                               _currentVersionIndex = i;
-                               return initiateResourceNegotiation();
-                       }
-               }
-               ret = -1;
-               // tearDown();
-               return false;
-       case NEGOTIATED_ABORT:
-               logger_->log_info("Site2Site Negotiate protocol response 
ABORT");
-               ret = -1;
-               // tearDown();
-               return false;
-       default:
-               logger_->log_info("Negotiate protocol response unknown code 
%d", statusCode);
-               return true;
-       }
-
-       return true;
+bool Site2SiteClientProtocol::initiateResourceNegotiation() {
+  // Negotiate the version
+  if (_peerState != IDLE) {
+    logger_->log_error(
+        "Site2Site peer state is not idle while initiateResourceNegotiation");
+    return false;
+  }
+
+  logger_->log_info(
+      "Negotiate protocol version with destination port %s current version %d",
+      _portIdStr.c_str(), _currentVersion);
+
+  int ret = peer_->writeUTF(this->getResourceName());
+
+  logger_->log_info("result of writing resource name is %i", ret);
+  if (ret <= 0) {
+    logger_->log_debug("result of writing resource name is %i", ret);
+    // tearDown();
+    return false;
+  }
+
+  ret = peer_->write(_currentVersion);
+
+  if (ret <= 0) {
+    logger_->log_info("result of writing version is %i", ret);
+    // tearDown();
+    return false;
+  }
+
+  uint8_t statusCode;
+  ret = peer_->read(statusCode);
+
+  if (ret <= 0) {
+    logger_->log_info("result of writing version status code  %i", ret);
+    // tearDown();
+    return false;
+  }
+  logger_->log_info("status code is %i", statusCode);
+  switch (statusCode) {
+    case RESOURCE_OK:
+      logger_->log_info("Site2Site Protocol Negotiate protocol version OK");
+      return true;
+    case DIFFERENT_RESOURCE_VERSION:
+      uint32_t serverVersion;
+      ret = peer_->read(serverVersion);
+      if (ret <= 0) {
+        // tearDown();
+        return false;
+      }
+      logger_->log_info(
+          "Site2Site Server Response asked for a different protocol version 
%d",
+          serverVersion);
+      for (unsigned int i = (_currentVersionIndex + 1);
+          i < sizeof(_supportedVersion) / sizeof(uint32_t); i++) {
+        if (serverVersion >= _supportedVersion[i]) {
+          _currentVersion = _supportedVersion[i];
+          _currentVersionIndex = i;
+          return initiateResourceNegotiation();
+        }
+      }
+      ret = -1;
+      // tearDown();
+      return false;
+    case NEGOTIATED_ABORT:
+      logger_->log_info("Site2Site Negotiate protocol response ABORT");
+      ret = -1;
+      // tearDown();
+      return false;
+    default:
+      logger_->log_info("Negotiate protocol response unknown code %d",
+                        statusCode);
+      return true;
+  }
+
+  return true;
 }
 
-bool Site2SiteClientProtocol::initiateCodecResourceNegotiation()
-{
-       // Negotiate the version
-       if (_peerState != HANDSHAKED)
-       {
-               logger_->log_error("Site2Site peer state is not handshaked 
while initiateCodecResourceNegotiation");
-               return false;
-       }
-
-       logger_->log_info("Negotiate Codec version with destination port %s 
current version %d", _portIdStr.c_str(), _currentCodecVersion);
-
-       int ret = peer_->writeUTF(this->getCodecResourceName());
-
-       if (ret <= 0)
-       {
-         logger_->log_debug("result of getCodecResourceName is %i",ret);
-               // tearDown();
-               return false;
-       }
-
-       ret = peer_->write(_currentCodecVersion);
-
-       if (ret <= 0)
-       {
-         logger_->log_debug("result of _currentCodecVersion is %i",ret);
-               // tearDown();
-               return false;
-       }
-
-       uint8_t statusCode;
-       ret = peer_->read(statusCode);
-
-       if (ret <= 0)
-       {
-               // tearDown();
-               return false;
-       }
-
-       switch (statusCode)
-       {
-       case RESOURCE_OK:
-               logger_->log_info("Site2Site Codec Negotiate version OK");
-               return true;
-       case DIFFERENT_RESOURCE_VERSION:
-               uint32_t serverVersion;
-               ret = peer_->read(serverVersion);
-               if (ret <= 0)
-               {
-                       // tearDown();
-                       return false;
-               }
-               logger_->log_info("Site2Site Server Response asked for a 
different codec version %d", serverVersion);
-               for (unsigned int i = (_currentCodecVersionIndex + 1); i < 
sizeof(_supportedCodecVersion)/sizeof(uint32_t); i++)
-               {
-                       if (serverVersion >= _supportedCodecVersion[i])
-                       {
-                               _currentCodecVersion = 
_supportedCodecVersion[i];
-                               _currentCodecVersionIndex = i;
-                               return initiateCodecResourceNegotiation();
-                       }
-               }
-               ret = -1;
-               // tearDown();
-               return false;
-       case NEGOTIATED_ABORT:
-               logger_->log_info("Site2Site Codec Negotiate response ABORT");
-               ret = -1;
-               // tearDown();
-               return false;
-       default:
-               logger_->log_info("Negotiate Codec response unknown code %d", 
statusCode);
-               return true;
-       }
-
-       return true;
+bool Site2SiteClientProtocol::initiateCodecResourceNegotiation() {
+  // Negotiate the version
+  if (_peerState != HANDSHAKED) {
+    logger_->log_error(
+        "Site2Site peer state is not handshaked while 
initiateCodecResourceNegotiation");
+    return false;
+  }
+
+  logger_->log_info(
+      "Negotiate Codec version with destination port %s current version %d",
+      _portIdStr.c_str(), _currentCodecVersion);
+
+  int ret = peer_->writeUTF(this->getCodecResourceName());
+
+  if (ret <= 0) {
+    logger_->log_debug("result of getCodecResourceName is %i", ret);
+    // tearDown();
+    return false;
+  }
+
+  ret = peer_->write(_currentCodecVersion);
+
+  if (ret <= 0) {
+    logger_->log_debug("result of _currentCodecVersion is %i", ret);
+    // tearDown();
+    return false;
+  }
+
+  uint8_t statusCode;
+  ret = peer_->read(statusCode);
+
+  if (ret <= 0) {
+    // tearDown();
+    return false;
+  }
+
+  switch (statusCode) {
+    case RESOURCE_OK:
+      logger_->log_info("Site2Site Codec Negotiate version OK");
+      return true;
+    case DIFFERENT_RESOURCE_VERSION:
+      uint32_t serverVersion;
+      ret = peer_->read(serverVersion);
+      if (ret <= 0) {
+        // tearDown();
+        return false;
+      }
+      logger_->log_info(
+          "Site2Site Server Response asked for a different codec version %d",
+          serverVersion);
+      for (unsigned int i = (_currentCodecVersionIndex + 1);
+          i < sizeof(_supportedCodecVersion) / sizeof(uint32_t); i++) {
+        if (serverVersion >= _supportedCodecVersion[i]) {
+          _currentCodecVersion = _supportedCodecVersion[i];
+          _currentCodecVersionIndex = i;
+          return initiateCodecResourceNegotiation();
+        }
+      }
+      ret = -1;
+      // tearDown();
+      return false;
+    case NEGOTIATED_ABORT:
+      logger_->log_info("Site2Site Codec Negotiate response ABORT");
+      ret = -1;
+      // tearDown();
+      return false;
+    default:
+      logger_->log_info("Negotiate Codec response unknown code %d", 
statusCode);
+      return true;
+  }
+
+  return true;
 }
 
-bool Site2SiteClientProtocol::handShake()
-{
-       if (_peerState != ESTABLISHED)
-       {
-               logger_->log_error("Site2Site peer state is not established 
while handshake");
-               return false;
-       }
-       logger_->log_info("Site2Site Protocol Perform hand shake with 
destination port %s", _portIdStr.c_str());
-       uuid_t uuid;
-       // Generate the global UUID for the com identify
-       uuid_generate(uuid);
-       char uuidStr[37];
-       uuid_unparse_lower(uuid, uuidStr);
-       _commsIdentifier = uuidStr;
-
-       int ret = peer_->writeUTF(_commsIdentifier);
-
-       if (ret <= 0)
-       {
-               // tearDown();
-               return false;
-       }
-
-       std::map<std::string, std::string> properties;
-       properties[HandShakePropertyStr[GZIP]] = "false";
-       properties[HandShakePropertyStr[PORT_IDENTIFIER]] = _portIdStr;
-       properties[HandShakePropertyStr[REQUEST_EXPIRATION_MILLIS]] = 
std::to_string(this->_timeOut);
-       if (this->_currentVersion >= 5)
-       {
-               if (this->_batchCount > 0)
-                       properties[HandShakePropertyStr[BATCH_COUNT]] = 
std::to_string(this->_batchCount);
-               if (this->_batchSize > 0)
-                       properties[HandShakePropertyStr[BATCH_SIZE]] = 
std::to_string(this->_batchSize);
-               if (this->_batchDuration > 0)
-                       properties[HandShakePropertyStr[BATCH_DURATION]] = 
std::to_string(this->_batchDuration);
-       }
-
-       if (_currentVersion >= 3)
-       {
-               ret = peer_->writeUTF(peer_->getURL());
-               if (ret <= 0)
-               {
-                       // tearDown();
-                       return false;
-               }
-       }
-
-       uint32_t size = properties.size();
-       ret = peer_->write(size);
-       if (ret <= 0)
-       {
-               // tearDown();
-               return false;
-       }
-
-       std::map<std::string, std::string>::iterator it;
-       for (it = properties.begin(); it!= properties.end(); it++)
-       {
-               ret = peer_->writeUTF(it->first);
-               if (ret <= 0)
-               {
-                       // tearDown();
-                       return false;
-               }
-               ret = peer_->writeUTF(it->second);
-               if (ret <= 0)
-               {
-                       // tearDown();
-                       return false;
-               }
-               logger_->log_info("Site2Site Protocol Send handshake properties 
%s %s", it->first.c_str(), it->second.c_str());
-       }
-
-       RespondCode code;
-       std::string message;
-
-       ret = this->readRespond(code, message);
-
-       if (ret <= 0)
-       {
-               // tearDown();
-               return false;
-       }
-
-       switch (code)
-       {
-       case PROPERTIES_OK:
-               logger_->log_info("Site2Site HandShake Completed");
-               _peerState = HANDSHAKED;
-               return true;
-       case PORT_NOT_IN_VALID_STATE:
+bool Site2SiteClientProtocol::handShake() {
+  if (_peerState != ESTABLISHED) {
+    logger_->log_error(
+        "Site2Site peer state is not established while handshake");
+    return false;
+  }
+  logger_->log_info(
+      "Site2Site Protocol Perform hand shake with destination port %s",
+      _portIdStr.c_str());
+  uuid_t uuid;
+  // Generate the global UUID for the com identify
+  uuid_generate(uuid);
+  char uuidStr[37];
+  uuid_unparse_lower(uuid, uuidStr);
+  _commsIdentifier = uuidStr;
+
+  int ret = peer_->writeUTF(_commsIdentifier);
+
+  if (ret <= 0) {
+    // tearDown();
+    return false;
+  }
+
+  std::map<std::string, std::string> properties;
+  properties[HandShakePropertyStr[GZIP]] = "false";
+  properties[HandShakePropertyStr[PORT_IDENTIFIER]] = _portIdStr;
+  properties[HandShakePropertyStr[REQUEST_EXPIRATION_MILLIS]] = std::to_string(
+      this->_timeOut);
+  if (this->_currentVersion >= 5) {
+    if (this->_batchCount > 0)
+      properties[HandShakePropertyStr[BATCH_COUNT]] = std::to_string(
+          this->_batchCount);
+    if (this->_batchSize > 0)
+      properties[HandShakePropertyStr[BATCH_SIZE]] = std::to_string(
+          this->_batchSize);
+    if (this->_batchDuration > 0)
+      properties[HandShakePropertyStr[BATCH_DURATION]] = std::to_string(
+          this->_batchDuration);
+  }
+
+  if (_currentVersion >= 3) {
+    ret = peer_->writeUTF(peer_->getURL());
+    if (ret <= 0) {
+      // tearDown();
+      return false;
+    }
+  }
+
+  uint32_t size = properties.size();
+  ret = peer_->write(size);
+  if (ret <= 0) {
+    // tearDown();
+    return false;
+  }
+
+  std::map<std::string, std::string>::iterator it;
+  for (it = properties.begin(); it != properties.end(); it++) {
+    ret = peer_->writeUTF(it->first);
+    if (ret <= 0) {
+      // tearDown();
+      return false;
+    }
+    ret = peer_->writeUTF(it->second);
+    if (ret <= 0) {
+      // tearDown();
+      return false;
+    }
+    logger_->log_info("Site2Site Protocol Send handshake properties %s %s",
+                      it->first.c_str(), it->second.c_str());
+  }
+
+  RespondCode code;
+  std::string message;
+
+  ret = this->readRespond(code, message);
+
+  if (ret <= 0) {
+    // tearDown();
+    return false;
+  }
+
+  switch (code) {
+    case PROPERTIES_OK:
+      logger_->log_info("Site2Site HandShake Completed");
+      _peerState = HANDSHAKED;
+      return true;
+    case PORT_NOT_IN_VALID_STATE:
     case UNKNOWN_PORT:
     case PORTS_DESTINATION_FULL:
-       logger_->log_error("Site2Site HandShake Failed because destination port 
is either invalid or full");
-               ret = -1;
-               /*
-               peer_->yield();
-               tearDown(); */
-               return false;
-       default:
-               logger_->log_info("HandShake Failed because of unknown respond 
code %d", code);
-               ret = -1;
-               /*
-               peer_->yield();
-               tearDown(); */
-               return false;
-       }
-
-       return false;
+      logger_->log_error(
+          "Site2Site HandShake Failed because destination port is either 
invalid or full");
+      ret = -1;
+      /*
+       peer_->yield();
+       tearDown(); */
+      return false;
+    default:
+      logger_->log_info("HandShake Failed because of unknown respond code %d",
+                        code);
+      ret = -1;
+      /*
+       peer_->yield();
+       tearDown(); */
+      return false;
+  }
+
+  return false;
 }
 
-void Site2SiteClientProtocol::tearDown()
-{
-       if (_peerState >= ESTABLISHED)
-       {
-               logger_->log_info("Site2Site Protocol tearDown");
-               // need to write shutdown request
-               writeRequestType(SHUTDOWN);
-       }
-
-       std::map<std::string, Transaction *>::iterator it;
-       for (it = _transactionMap.begin(); it!= _transactionMap.end(); it++)
-       {
-               delete it->second;
-       }
-       _transactionMap.clear();
-       peer_->Close();
-       _peerState = IDLE;
+void Site2SiteClientProtocol::tearDown() {
+  if (_peerState >= ESTABLISHED) {
+    logger_->log_info("Site2Site Protocol tearDown");
+    // need to write shutdown request
+    writeRequestType(SHUTDOWN);
+  }
+
+  std::map<std::string, Transaction *>::iterator it;
+  for (it = _transactionMap.begin(); it != _transactionMap.end(); it++) {
+    delete it->second;
+  }
+  _transactionMap.clear();
+  peer_->Close();
+  _peerState = IDLE;
 }
 
-int Site2SiteClientProtocol::writeRequestType(RequestType type)
-{
-       if (type >= MAX_REQUEST_TYPE)
-               return -1;
+int Site2SiteClientProtocol::writeRequestType(RequestType type) {
+  if (type >= MAX_REQUEST_TYPE)
+    return -1;
 
-       return peer_->writeUTF(RequestTypeStr[type]);
+  return peer_->writeUTF(RequestTypeStr[type]);
 }
 
-int Site2SiteClientProtocol::readRequestType(RequestType &type)
-{
-       std::string requestTypeStr;
+int Site2SiteClientProtocol::readRequestType(RequestType &type) {
+  std::string requestTypeStr;
 
-       int ret = peer_->readUTF(requestTypeStr);
+  int ret = peer_->readUTF(requestTypeStr);
 
-       if (ret <= 0)
-               return ret;
+  if (ret <= 0)
+    return ret;
 
-       for (int i = (int) NEGOTIATE_FLOWFILE_CODEC; i <= (int) SHUTDOWN; i++)
-       {
-               if (RequestTypeStr[i] == requestTypeStr)
-               {
-                       type = (RequestType) i;
-                       return ret;
-               }
-       }
+  for (int i = (int) NEGOTIATE_FLOWFILE_CODEC; i <= (int) SHUTDOWN; i++) {
+    if (RequestTypeStr[i] == requestTypeStr) {
+      type = (RequestType) i;
+      return ret;
+    }
+  }
 
-       return -1;
+  return -1;
 }
 
-int Site2SiteClientProtocol::readRespond(RespondCode &code, std::string 
&message)
-{
-       uint8_t firstByte;
+int Site2SiteClientProtocol::readRespond(RespondCode &code,
+                                         std::string &message) {
+  uint8_t firstByte;
 
-       int ret = peer_->read(firstByte);
+  int ret = peer_->read(firstByte);
 
-       if (ret <= 0 || firstByte != CODE_SEQUENCE_VALUE_1)
-               return -1;
+  if (ret <= 0 || firstByte != CODE_SEQUENCE_VALUE_1)
+    return -1;
 
-       uint8_t secondByte;
+  uint8_t secondByte;
 
-       ret = peer_->read(secondByte);
+  ret = peer_->read(secondByte);
 
-       if (ret <= 0 || secondByte != CODE_SEQUENCE_VALUE_2)
-               return -1;
+  if (ret <= 0 || secondByte != CODE_SEQUENCE_VALUE_2)
+    return -1;
 
-       uint8_t thirdByte;
+  uint8_t thirdByte;
 
-       ret = peer_->read(thirdByte);
+  ret = peer_->read(thirdByte);
 
-       if (ret <= 0)
-               return ret;
+  if (ret <= 0)
+    return ret;
 
-       code = (RespondCode) thirdByte;
+  code = (RespondCode) thirdByte;
 
-       RespondCodeContext *resCode = this->getRespondCodeContext(code);
+  RespondCodeContext *resCode = this->getRespondCodeContext(code);
 
-       if ( resCode == NULL)
-       {
-               // Not a valid respond code
-               return -1;
-       }
-       if (resCode->hasDescription)
-       {
-               ret = peer_->readUTF(message);
-               if (ret <= 0)
-                       return -1;
-       }
-       return 3 + message.size();
+  if (resCode == NULL) {
+    // Not a valid respond code
+    return -1;
+  }
+  if (resCode->hasDescription) {
+    ret = peer_->readUTF(message);
+    if (ret <= 0)
+      return -1;
+  }
+  return 3 + message.size();
 }
 
-int Site2SiteClientProtocol::writeRespond(RespondCode code, std::string 
message)
-{
-       RespondCodeContext *resCode = this->getRespondCodeContext(code);
-
-       if (resCode == NULL)
-       {
-               // Not a valid respond code
-               return -1;
-       }
-
-       uint8_t codeSeq[3];
-       codeSeq[0] = CODE_SEQUENCE_VALUE_1;
-       codeSeq[1] = CODE_SEQUENCE_VALUE_2;
-       codeSeq[2] = (uint8_t) code;
-
-       int ret = peer_->write(codeSeq, 3);
-
-       if (ret != 3)
-               return -1;
-
-       if (resCode->hasDescription)
-       {
-               ret = peer_->writeUTF(message);
-               if (ret > 0)
-                       return (3 + ret);
-               else
-                       return ret;
-       }
-       else
-               return 3;
+int Site2SiteClientProtocol::writeRespond(RespondCode code,
+                                          std::string message) {
+  RespondCodeContext *resCode = this->getRespondCodeContext(code);
+
+  if (resCode == NULL) {
+    // Not a valid respond code
+    return -1;
+  }
+
+  uint8_t codeSeq[3];
+  codeSeq[0] = CODE_SEQUENCE_VALUE_1;
+  codeSeq[1] = CODE_SEQUENCE_VALUE_2;
+  codeSeq[2] = (uint8_t) code;
+
+  int ret = peer_->write(codeSeq, 3);
+
+  if (ret != 3)
+    return -1;
+
+  if (resCode->hasDescription) {
+    ret = peer_->writeUTF(message);
+    if (ret > 0)
+      return (3 + ret);
+    else
+      return ret;
+  } else
+    return 3;
 }
 
-bool Site2SiteClientProtocol::negotiateCodec()
-{
-       if (_peerState != HANDSHAKED)
-       {
-               logger_->log_error("Site2Site peer state is not handshaked 
while negotiate codec");
-               return false;
-       }
+bool Site2SiteClientProtocol::negotiateCodec() {
+  if (_peerState != HANDSHAKED) {
+    logger_->log_error(
+        "Site2Site peer state is not handshaked while negotiate codec");
+    return false;
+  }
 
-       logger_->log_info("Site2Site Protocol Negotiate Codec with destination 
port %s", _portIdStr.c_str());
+  logger_->log_info(
+      "Site2Site Protocol Negotiate Codec with destination port %s",
+      _portIdStr.c_str());
 
-       int status = this->writeRequestType(NEGOTIATE_FLOWFILE_CODEC);
+  int status = this->writeRequestType(NEGOTIATE_FLOWFILE_CODEC);
 
-       if (status <= 0)
-       {
-               // tearDown();
-               return false;
-       }
+  if (status <= 0) {
+    // tearDown();
+    return false;
+  }
 
-       // Negotiate the codec version
-       bool ret = initiateCodecResourceNegotiation();
+  // Negotiate the codec version
+  bool ret = initiateCodecResourceNegotiation();
 
-       if (!ret)
-       {
-               logger_->log_error("Site2Site Codec Version Negotiation 
failed");
-               /*
-               peer_->yield();
-               tearDown(); */
-               return false;
-       }
+  if (!ret) {
+    logger_->log_error("Site2Site Codec Version Negotiation failed");
+    /*
+     peer_->yield();
+     tearDown(); */
+    return false;
+  }
 
-       logger_->log_info("Site2Site Codec Completed and move to READY state 
for data transfer");
-       _peerState = READY;
+  logger_->log_info(
+      "Site2Site Codec Completed and move to READY state for data transfer");
+  _peerState = READY;
 
-       return true;
+  return true;
 }
 
-bool Site2SiteClientProtocol::bootstrap()
-{
-       if (_peerState == READY)
-               return true;
-
-       tearDown();
-
-       if (establish() && handShake() && negotiateCodec())
-       {
-               logger_->log_info("Site2Site Ready For data transaction");
-               return true;
-       }
-       else
-       {
-               peer_->yield();
-               tearDown();
-               return false;
-       }
+bool Site2SiteClientProtocol::bootstrap() {
+  if (_peerState == READY)
+    return true;
+
+  tearDown();
+
+  if (establish() && handShake() && negotiateCodec()) {
+    logger_->log_info("Site2Site Ready For data transaction");
+    return true;
+  } else {
+    peer_->yield();
+    tearDown();
+    return false;
+  }
 }
 
-Transaction* Site2SiteClientProtocol::createTransaction(std::string 
&transactionID, TransferDirection direction)
-{
-       int ret;
-       bool dataAvailable;
-       Transaction *transaction = NULL;
-
-       if (_peerState != READY)
-       {
-               bootstrap();
-       }
-
-       if (_peerState != READY)
-       {
-               return NULL;
-       }
-
-       if (direction == RECEIVE)
-       {
-               ret = writeRequestType(RECEIVE_FLOWFILES);
-
-               if (ret <= 0)
-               {
-                       // tearDown();
-                       return NULL;
-               }
-
-               RespondCode code;
-               std::string message;
-
-               ret = readRespond(code, message);
-
-               if (ret <= 0)
-               {
-                       // tearDown();
-                       return NULL;
-               }
-
-                CRCStream<Site2SitePeer> crcstream(peer_);
-               switch (code)
-               {
-               case MORE_DATA:
-                       dataAvailable = true;
-                       logger_->log_info("Site2Site peer indicates that data 
is available");
-                       transaction = new Transaction(direction,crcstream);
-                       _transactionMap[transaction->getUUIDStr()] = 
transaction;
-                       transactionID = transaction->getUUIDStr();
-                       transaction->setDataAvailable(dataAvailable);
-                       logger_->log_info("Site2Site create transaction %s", 
transaction->getUUIDStr().c_str());
-                       return transaction;
-               case NO_MORE_DATA:
-                       dataAvailable = false;
-                       logger_->log_info("Site2Site peer indicates that no 
data is available");
-                       transaction = new Transaction(direction,crcstream);
-                       _transactionMap[transaction->getUUIDStr()] = 
transaction;
-                       transactionID = transaction->getUUIDStr();
-                       transaction->setDataAvailable(dataAvailable);
-                       logger_->log_info("Site2Site create transaction %s", 
transaction->getUUIDStr().c_str());
-                       return transaction;
-               default:
-                       logger_->log_info("Site2Site got unexpected response %d 
when asking for data", code);
-                       // tearDown();
-                       return NULL;
-               }
-       }
-       else
-       {
-               ret = writeRequestType(SEND_FLOWFILES);
-
-               if (ret <= 0)
-               {
-                       // tearDown();
-                       return NULL;
-               }
-               else
-               {
-                       CRCStream<Site2SitePeer> crcstream(peer_);
-                       transaction = new Transaction(direction,crcstream);
-                       _transactionMap[transaction->getUUIDStr()] = 
transaction;
-                       transactionID = transaction->getUUIDStr();
-                       logger_->log_info("Site2Site create transaction %s", 
transaction->getUUIDStr().c_str());
-                       return transaction;
-               }
-       }
+Transaction* Site2SiteClientProtocol::createTransaction(
+    std::string &transactionID, TransferDirection direction) {
+  int ret;
+  bool dataAvailable;
+  Transaction *transaction = NULL;
+
+  if (_peerState != READY) {
+    bootstrap();
+  }
+
+  if (_peerState != READY) {
+    return NULL;
+  }
+
+  if (direction == RECEIVE) {
+    ret = writeRequestType(RECEIVE_FLOWFILES);
+
+    if (ret <= 0) {
+      // tearDown();
+      return NULL;
+    }
+
+    RespondCode code;
+    std::string message;
+
+    ret = readRespond(code, message);
+
+    if (ret <= 0) {
+      // tearDown();
+      return NULL;
+    }
+
+    org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> 
crcstream(peer_.get());
+    switch (code) {
+      case MORE_DATA:
+        dataAvailable = true;
+        logger_->log_info("Site2Site peer indicates that data is available");
+        transaction = new Transaction(direction, crcstream);
+        _transactionMap[transaction->getUUIDStr()] = transaction;
+        transactionID = transaction->getUUIDStr();
+        transaction->setDataAvailable(dataAvailable);
+        logger_->log_info("Site2Site create transaction %s",
+                          transaction->getUUIDStr().c_str());
+        return transaction;
+      case NO_MORE_DATA:
+        dataAvailable = false;
+        logger_->log_info("Site2Site peer indicates that no data is 
available");
+        transaction = new Transaction(direction, crcstream);
+        _transactionMap[transaction->getUUIDStr()] = transaction;
+        transactionID = transaction->getUUIDStr();
+        transaction->setDataAvailable(dataAvailable);
+        logger_->log_info("Site2Site create transaction %s",
+                          transaction->getUUIDStr().c_str());
+        return transaction;
+      default:
+        logger_->log_info(
+            "Site2Site got unexpected response %d when asking for data", code);
+        // tearDown();
+        return NULL;
+    }
+  } else {
+    ret = writeRequestType(SEND_FLOWFILES);
+
+    if (ret <= 0) {
+      // tearDown();
+      return NULL;
+    } else {
+      org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> 
crcstream(peer_.get());
+      transaction = new Transaction(direction, crcstream);
+      _transactionMap[transaction->getUUIDStr()] = transaction;
+      transactionID = transaction->getUUIDStr();
+      logger_->log_info("Site2Site create transaction %s",
+                        transaction->getUUIDStr().c_str());
+      return transaction;
+    }
+  }
 }
 
-bool Site2SiteClientProtocol::receive(std::string transactionID, DataPacket 
*packet, bool &eof)
-{
-       int ret;
-       Transaction *transaction = NULL;
-
-       if (_peerState != READY)
-       {
-               bootstrap();
-       }
-
-       if (_peerState != READY)
-       {
-               return false;
-       }
-
-       std::map<std::string, Transaction *>::iterator it = 
this->_transactionMap.find(transactionID);
-
-       if (it == _transactionMap.end())
-       {
-               return false;
-       }
-       else
-       {
-               transaction = it->second;
-       }
-
-       if (transaction->getState() != TRANSACTION_STARTED && 
transaction->getState() != DATA_EXCHANGED)
-       {
-               logger_->log_info("Site2Site transaction %s is not at started 
or exchanged state", transactionID.c_str());
-               return false;
-       }
-
-       if (transaction->getDirection() != RECEIVE)
-       {
-               logger_->log_info("Site2Site transaction %s direction is 
wrong", transactionID.c_str());
-               return false;
-       }
-
-       if (!transaction->isDataAvailable())
-       {
-               eof = true;
-               return true;
-       }
-
-       if (transaction->_transfers > 0)
-       {
-               // if we already has transfer before, check to see whether 
another one is available
-               RespondCode code;
-               std::string message;
-
-               ret = readRespond(code, message);
-
-               if (ret <= 0)
-               {
-                       return false;
-               }
-               if (code == CONTINUE_TRANSACTION)
-               {
-                       logger_->log_info("Site2Site transaction %s peer 
indicate continue transaction", transactionID.c_str());
-                       transaction->_dataAvailable = true;
-               }
-               else if (code == FINISH_TRANSACTION)
-               {
-                       logger_->log_info("Site2Site transaction %s peer 
indicate finish transaction", transactionID.c_str());
-                       transaction->_dataAvailable = false;
-               }
-               else
-               {
-                       logger_->log_info("Site2Site transaction %s peer 
indicate wrong respond code %d", transactionID.c_str(), code);
-                       return false;
-               }
-       }
-
-       if (!transaction->isDataAvailable())
-       {
-               eof = true;
-               return true;
-       }
-
-       // start to read the packet
-       uint32_t numAttributes;
-       ret = transaction->getStream().read(numAttributes);
-       if (ret <= 0 || numAttributes > MAX_NUM_ATTRIBUTES)
-       {
-               return false;
-       }
-
-       // read the attributes
-       for (unsigned int i = 0; i < numAttributes; i++)
-       {
-               std::string key;
-               std::string value;
-               ret = transaction->getStream().readUTF(key,true);
-               if (ret <= 0)
-               {
-                       return false;
-               }
-               ret = transaction->getStream().readUTF(value,true);
-               if (ret <= 0)
-               {
-                       return false;
-               }
-               packet->_attributes[key] = value;
-               logger_->log_info("Site2Site transaction %s receives attribute 
key %s value %s", transactionID.c_str(), key.c_str(), value.c_str());
-       }
-
-       uint64_t len;
-       ret = transaction->getStream().read(len);
-       if (ret <= 0)
-       {
-               return false;
-       }
-
-       packet->_size = len;
-       transaction->_transfers++;
-       transaction->_state = DATA_EXCHANGED;
-       transaction->_bytes += len;
-       logger_->log_info("Site2Site transaction %s receives flow record %d, 
total length %d", transactionID.c_str(),
-                       transaction->_transfers, transaction->_bytes);
-
-       return true;
+bool Site2SiteClientProtocol::receive(std::string transactionID,
+                                      DataPacket *packet, bool &eof) {
+  int ret;
+  Transaction *transaction = NULL;
+
+  if (_peerState != READY) {
+    bootstrap();
+  }
+
+  if (_peerState != READY) {
+    return false;
+  }
+
+  std::map<std::string, Transaction *>::iterator it =
+      this->_transactionMap.find(transactionID);
+
+  if (it == _transactionMap.end()) {
+    return false;
+  } else {
+    transaction = it->second;
+  }
+
+  if (transaction->getState() != TRANSACTION_STARTED
+      && transaction->getState() != DATA_EXCHANGED) {
+    logger_->log_info(
+        "Site2Site transaction %s is not at started or exchanged state",
+        transactionID.c_str());
+    return false;
+  }
+
+  if (transaction->getDirection() != RECEIVE) {
+    logger_->log_info("Site2Site transaction %s direction is wrong",
+                      transactionID.c_str());
+    return false;
+  }
+
+  if (!transaction->isDataAvailable()) {
+    eof = true;
+    return true;
+  }
+
+  if (transaction->_transfers > 0) {
+    // if we already has transfer before, check to see whether another one is 
available
+    RespondCode code;
+    std::string message;
+
+    ret = readRespond(code, message);
+
+    if (ret <= 0) {
+      return false;
+    }
+    if (code == CONTINUE_TRANSACTION) {
+      logger_->log_info(
+          "Site2Site transaction %s peer indicate continue transaction",
+          transactionID.c_str());
+      transaction->_dataAvailable = true;
+    } else if (code == FINISH_TRANSACTION) {
+      logger_->log_info(
+          "Site2Site transaction %s peer indicate finish transaction",
+          transactionID.c_str());
+      transaction->_dataAvailable = false;
+    } else {
+      logger_->log_info(
+          "Site2Site transaction %s peer indicate wrong respond code %d",
+          transactionID.c_str(), code);
+      return false;
+    }
+  }
+
+  if (!transaction->isDataAvailable()) {
+    eof = true;
+    return true;
+  }
+
+  // start to read the packet
+  uint32_t numAttributes;
+  ret = transaction->getStream().read(numAttributes);
+  if (ret <= 0 || numAttributes > MAX_NUM_ATTRIBUTES) {
+    return false;
+  }
+
+  // read the attributes
+  for (unsigned int i = 0; i < numAttributes; i++) {
+    std::string key;
+    std::string value;
+    ret = transaction->getStream().readUTF(key, true);
+    if (ret <= 0) {
+      return false;
+    }
+    ret = transaction->getStream().readUTF(value, true);
+    if (ret <= 0) {
+      return false;
+    }
+    packet->_attributes[key] = value;
+    logger_->log_info(
+        "Site2Site transaction %s receives attribute key %s value %s",
+        transactionID.c_str(), key.c_str(), value.c_str());
+  }
+
+  uint64_t len;
+  ret = transaction->getStream().read(len);
+  if (ret <= 0) {
+    return false;
+  }
+
+  packet->_size = len;
+  transaction->_transfers++;
+  transaction->_state = DATA_EXCHANGED;
+  transaction->_bytes += len;
+  logger_->log_info(
+      "Site2Site transaction %s receives flow record %d, total length %d",
+      transactionID.c_str(), transaction->_transfers, transaction->_bytes);
+
+  return true;
 }
 
-bool Site2SiteClientProtocol::send(std::string transactionID, DataPacket 
*packet, FlowFileRecord *flowFile, ProcessSession *session)
-{
-       int ret;
-       Transaction *transaction = NULL;
-
-       if (_peerState != READY)
-       {
-               bootstrap();
-       }
-
-       if (_peerState != READY)
-       {
-               return false;
-       }
-
-       std::map<std::string, Transaction *>::iterator it = 
this->_transactionMap.find(transactionID);
-
-       if (it == _transactionMap.end())
-       {
-               return false;
-       }
-       else
-       {
-               transaction = it->second;
-       }
-
-       if (transaction->getState() != TRANSACTION_STARTED && 
transaction->getState() != DATA_EXCHANGED)
-       {
-               logger_->log_info("Site2Site transaction %s is not at started 
or exchanged state", transactionID.c_str());
-               return false;
-       }
-
-       if (transaction->getDirection() != SEND)
-       {
-               logger_->log_info("Site2Site transaction %s direction is 
wrong", transactionID.c_str());
-               return false;
-       }
-
-       if (transaction->_transfers > 0)
-       {
-               ret = writeRespond(CONTINUE_TRANSACTION, 
"CONTINUE_TRANSACTION");
-               if (ret <= 0)
-               {
-                       return false;
-               }
-       }
-
-       // start to read the packet
-       uint32_t numAttributes = packet->_attributes.size();
-       ret = transaction->getStream().write(numAttributes);
-       if (ret != 4)
-       {
-               return false;
-       }
-
-       std::map<std::string, std::string>::iterator itAttribute;
-       for (itAttribute = packet->_attributes.begin(); itAttribute!= 
packet->_attributes.end(); itAttribute++)
-       {
-               ret = transaction->getStream().writeUTF(itAttribute->first, 
true);
-
-               if (ret <= 0)
-               {
-                       return false;
-               }
-               ret = transaction->getStream().writeUTF(itAttribute->second, 
true);
-               if (ret <= 0)
-               {
-                       return false;
-               }
-               logger_->log_info("Site2Site transaction %s send attribute key 
%s value %s", transactionID.c_str(),
-                               itAttribute->first.c_str(), 
itAttribute->second.c_str());
-       }
-
-       uint64_t len = flowFile->getSize() ;
-       ret = transaction->getStream().write(len);
-       if (ret != 8)
-       {
-               return false;
-       }
-
-       if (flowFile->getSize())
-       {
-               Site2SiteClientProtocol::ReadCallback callback(packet);
-               session->read(flowFile, &callback);
-               if (flowFile->getSize() != packet->_size)
-               {
-                       return false;
-               }
-       }
-
-       transaction->_transfers++;
-       transaction->_state = DATA_EXCHANGED;
-       transaction->_bytes += len;
-       logger_->log_info("Site2Site transaction %s send flow record %d, total 
length %d", transactionID.c_str(),
-                               transaction->_transfers, transaction->_bytes);
-
-       return true;
+bool Site2SiteClientProtocol::send(
+    std::string transactionID, DataPacket *packet, 
std::shared_ptr<FlowFileRecord> flowFile,
+    core::ProcessSession *session) {
+  int ret;
+  Transaction *transaction = NULL;
+
+  if (_peerState != READY) {
+    bootstrap();
+  }
+
+  if (_peerState != READY) {
+    return false;
+  }
+
+  std::map<std::string, Transaction *>::iterator it =
+      this->_transactionMap.find(transactionID);
+
+  if (it == _transactionMap.end()) {
+    return false;
+  } else {
+    transaction = it->second;
+  }
+
+  if (transaction->getState() != TRANSACTION_STARTED
+      && transaction->getState() != DATA_EXCHANGED) {
+    logger_->log_info(
+        "Site2Site transaction %s is not at started or exchanged state",
+        transactionID.c_str());
+    return false;
+  }
+
+  if (transaction->getDirection() != SEND) {
+    logger_->log_info("Site2Site transaction %s direction is wrong",
+                      transactionID.c_str());
+    return false;
+  }
+
+  if (transaction->_transfers > 0) {
+    ret = writeRespond(CONTINUE_TRANSACTION, "CONTINUE_TRANSACTION");
+    if (ret <= 0) {
+      return false;
+    }
+  }
+
+  // start to read the packet
+  uint32_t numAttributes = packet->_attributes.size();
+  ret = transaction->getStream().write(numAttributes);
+  if (ret != 4) {
+    return false;
+  }
+
+  std::map<std::string, std::string>::iterator itAttribute;
+  for (itAttribute = packet->_attributes.begin();
+      itAttribute != packet->_attributes.end(); itAttribute++) {
+    ret = transaction->getStream().writeUTF(itAttribute->first, true);
+
+    if (ret <= 0) {
+      return false;
+    }
+    ret = transaction->getStream().writeUTF(itAttribute->second, true);
+    if (ret <= 0) {
+      return false;
+    }
+    logger_->log_info("Site2Site transaction %s send attribute key %s value 
%s",
+                      transactionID.c_str(), itAttribute->first.c_str(),
+                      itAttribute->second.c_str());
+  }
+
+  uint64_t len = flowFile->getSize();
+  ret = transaction->getStream().write(len);
+  if (ret != 8) {
+    return false;
+  }
+
+  if (flowFile->getSize()) {
+    Site2SiteClientProtocol::ReadCallback callback(packet);
+    session->read(flowFile, &callback);
+    if (flowFile->getSize() != packet->_size) {
+      return false;
+    }
+  }
+
+  transaction->_transfers++;
+  transaction->_state = DATA_EXCHANGED;
+  transaction->_bytes += len;
+  logger_->log_info(
+      "Site2Site transaction %s send flow record %d, total length %d",
+      transactionID.c_str(), transaction->_transfers, transaction->_bytes);
+
+  return true;
 }
 
-void Site2SiteClientProtocol::receiveFlowFiles(ProcessContext *context, 
ProcessSession *session)
-{
-       uint64_t bytes = 0;
-       int transfers = 0;
-       Transaction *transaction = NULL;
-
-       if (_peerState != READY)
-       {
-               bootstrap();
-       }
-
-       if (_peerState != READY)
-       {
-               context->yield();
-               tearDown();
-               throw Exception(SITE2SITE_EXCEPTION, "Can not establish 
handshake with peer");
-               return;
-       }
-
-       // Create the transaction
-       std::string transactionID;
-       transaction = createTransaction(transactionID, RECEIVE);
-
-       if (transaction == NULL)
-       {
-               context->yield();
-               tearDown();
-               throw Exception(SITE2SITE_EXCEPTION, "Can not create 
transaction");
-               return;
-       }
-
-       try
-       {
-               while (true)
-               {
-                       std::map<std::string, std::string> empty;
-                       uint64_t startTime = getTimeMillis();
-                       DataPacket packet(this, transaction, empty);
-                       bool eof = false;
-
-                       if (!receive(transactionID, &packet, eof))
-                       {
-                               throw Exception(SITE2SITE_EXCEPTION, "Receive 
Failed");
-                               return;
-                       }
-                       if (eof)
-                       {
-                               // transaction done
-                               break;
-                       }
-                       FlowFileRecord *flowFile = session->create();
-                       if (!flowFile)
-                       {
-                               throw Exception(SITE2SITE_EXCEPTION, "Flow File 
Creation Failed");
-                               return;
-                       }
-                       std::map<std::string, std::string>::iterator it;
-                       std::string sourceIdentifier;
-                       for (it = packet._attributes.begin(); it!= 
packet._attributes.end(); it++)
-                       {
-                               if (it->first == FlowAttributeKey(UUID))
-                                       sourceIdentifier = it->second;
-                               flowFile->addAttribute(it->first, it->second);
-                       }
-
-                       if (packet._size > 0)
-                       {
-                               Site2SiteClientProtocol::WriteCallback 
callback(&packet);
-                               session->write(flowFile, &callback);
-                               if (flowFile->getSize() != packet._size)
-                               {
-                                       throw Exception(SITE2SITE_EXCEPTION, 
"Receive Size Not Right");
-                                       return;
-                               }
-                       }
-                       Relationship relation; // undefined relationship
-                       uint64_t endTime = getTimeMillis();
-                       std::string transitUri = peer_->getURL() + "/" + 
sourceIdentifier;
-                       std::string details = "urn:nifi:" + sourceIdentifier + 
"Remote Host=" + peer_->getHostName();
-                       session->getProvenanceReporter()->receive(flowFile, 
transitUri, sourceIdentifier, details, endTime - startTime);
-                       session->transfer(flowFile, relation);
-                       // receive the transfer for the flow record
-                       bytes += packet._size;
-                       transfers++;
-               } // while true
-
-               if (!confirm(transactionID))
-               {
-                       throw Exception(SITE2SITE_EXCEPTION, "Confirm 
Transaction Failed");
-                       return;
-               }
-               if (!complete(transactionID))
-               {
-                       throw Exception(SITE2SITE_EXCEPTION, "Complete 
Transaction Failed");
-                       return;
-               }
-               logger_->log_info("Site2Site transaction %s successfully 
receive flow record %d, content bytes %d",
-                               transactionID.c_str(), transfers, bytes);
-               // we yield the receive if we did not get anything
-               if (transfers == 0)
-                       context->yield();
-       }
-       catch (std::exception &exception)
-       {
-               if (transaction)
-                       deleteTransaction(transactionID);
-               context->yield();
-               tearDown();
-               logger_->log_debug("Caught Exception %s", exception.what());
-               throw;
-       }
-       catch (...)
-       {
-               if (transaction)
-                       deleteTransaction(transactionID);
-               context->yield();
-               tearDown();
-               logger_->log_debug("Caught Exception during 
Site2SiteClientProtocol::receiveFlowFiles");
-               throw;
-       }
-
-       deleteTransaction(transactionID);
-
-       return;
+void Site2SiteClientProtocol::receiveFlowFiles(
+    core::ProcessContext *context,
+    core::ProcessSession *session) {
+  uint64_t bytes = 0;
+  int transfers = 0;
+  Transaction *transaction = NULL;
+
+  if (_peerState != READY) {
+    bootstrap();
+  }
+
+  if (_peerState != READY) {
+    context->yield();
+    tearDown();
+    throw Exception(SITE2SITE_EXCEPTION,
+                    "Can not establish handshake with peer");
+    return;
+  }
+
+  // Create the transaction
+  std::string transactionID;
+  transaction = createTransaction(transactionID, RECEIVE);
+
+  if (transaction == NULL) {
+    context->yield();
+    tearDown();
+    throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction");
+    return;
+  }
+
+  try {
+    while (true) {
+      std::map<std::string, std::string> empty;
+      uint64_t startTime = getTimeMillis();
+      DataPacket packet(this, transaction, empty);
+      bool eof = false;
+
+      if (!receive(transactionID, &packet, eof)) {
+        throw Exception(SITE2SITE_EXCEPTION, "Receive Failed");
+        return;
+      }
+      if (eof) {
+        // transaction done
+        break;
+      }
+      std::shared_ptr<FlowFileRecord> flowFile = 
std::static_pointer_cast<FlowFileRecord>(session->create());;
+      if (!flowFile) {
+        throw Exception(SITE2SITE_EXCEPTION, "Flow File Creation Failed");
+        return;
+      }
+      std::map<std::string, std::string>::iterator it;
+      std::string sourceIdentifier;
+      for (it = packet._attributes.begin(); it != packet._attributes.end();
+          it++) {
+        if (it->first == FlowAttributeKey(UUID))
+          sourceIdentifier = it->second;
+        flowFile->addAttribute(it->first, it->second);
+      }
+
+      if (packet._size > 0) {
+        Site2SiteClientProtocol::WriteCallback callback(&packet);
+        session->write(flowFile, &callback);
+        if (flowFile->getSize() != packet._size) {
+          throw Exception(SITE2SITE_EXCEPTION, "Receive Size Not Right");
+          return;
+        }
+      }
+      core::Relationship relation;  // undefined relationship
+      uint64_t endTime = getTimeMillis();
+      std::string transitUri = peer_->getURL() + "/" + sourceIdentifier;
+      std::string details = "urn:nifi:" + sourceIdentifier + "Remote Host="
+          + peer_->getHostName();
+      session->getProvenanceReporter()->receive(flowFile, transitUri,
+                                                sourceIdentifier, details,
+                                                endTime - startTime);
+      session->transfer(flowFile, relation);
+      // receive the transfer for the flow record
+      bytes += packet._size;
+      transfers++;
+    }  // while true
+
+    if (!confirm(transactionID)) {
+      throw Exception(SITE2SITE_EXCEPTION, "Confirm Transaction Failed");
+      return;
+    }
+    if (!complete(transactionID)) {
+      throw Exception(SITE2SITE_EXCEPTION, "Complete Transaction Failed");
+      return;
+    }
+    logger_->log_info(
+        "Site2Site transaction %s successfully receive flow record %d, content 
bytes %d",
+        transactionID.c_str(), transfers, bytes);
+    // we yield the receive if we did not get anything
+    if (transfers == 0)
+      context->yield();
+  } catch (std::exception &exception) {
+    if (transaction)
+      deleteTransaction(transactionID);
+    context->yield();
+    tearDown();
+    logger_->log_debug("Caught Exception %s", exception.what());
+    throw;
+  } catch (...) {
+    if (transaction)
+      deleteTransaction(transactionID);
+    context->yield();
+    tearDown();
+    logger_->log_debug(
+        "Caught Exception during Site2SiteClientProtocol::receiveFlowFiles");
+    throw;
+  }
+
+  deleteTransaction(transactionID);
+
+  return;
 }
 
-bool Site2SiteClientProtocol::confirm(std::string transactionID)
-{
-       int ret;
-       Transaction *transaction = NULL;
-
-       if (_peerState != READY)
-       {
-               bootstrap();
-       }
-
-       if (_peerState != READY)
-       {
-               return false;
-       }
-
-       std::map<std::string, Transaction *>::iterator it = 
this->_transactionMap.find(transactionID);
-
-       if (it == _transactionMap.end())
-       {
-               return false;
-       }
-       else
-       {
-               transaction = it->second;
-       }
-
-       if (transaction->getState() == TRANSACTION_STARTED && 
!transaction->isDataAvailable() &&
-                       transaction->getDirection() == RECEIVE)
-       {
-               transaction->_state = TRANSACTION_CONFIRMED;
-               return true;
-       }
-
-       if (transaction->getState() != DATA_EXCHANGED)
-               return false;
-
-       if (transaction->getDirection() == RECEIVE)
-       {
-               if (transaction->isDataAvailable())
-                       return false;
-               // we received a FINISH_TRANSACTION indicator. Send back a 
CONFIRM_TRANSACTION message
-               // to peer so that we can verify that the connection is still 
open. This is a two-phase commit,
-               // which helps to prevent the chances of data duplication. 
Without doing this, we may commit the
-               // session and then when we send the response back to the peer, 
the peer may have timed out and may not
-               // be listening. As a result, it will re-send the data. By 
doing this two-phase commit, we narrow the
-               // Critical Section involved in this transaction so that rather 
than the Critical Section being the
-               // time window involved in the entire transaction, it is 
reduced to a simple round-trip conversation.
-               long crcValue = transaction->getCRC();
-               std::string crc = std::to_string(crcValue);
-               logger_->log_info("Site2Site Send confirm with CRC %d to 
transaction %s", transaction->getCRC(),
-                                               transactionID.c_str());
-               ret = writeRespond(CONFIRM_TRANSACTION, crc);
-               if (ret <= 0)
-                       return false;
-               RespondCode code;
-               std::string message;
-               readRespond(code, message);
-               if (ret <= 0)
-                       return false;
-
-               if (code == CONFIRM_TRANSACTION)
-               {
-                       logger_->log_info("Site2Site transaction %s peer 
confirm transaction", transactionID.c_str());
-                       transaction->_state = TRANSACTION_CONFIRMED;
-                       return true;
-               }
-               else if (code == BAD_CHECKSUM)
-               {
-                       logger_->log_info("Site2Site transaction %s peer 
indicate bad checksum", transactionID.c_str());
-                       /*
-                       transaction->_state = TRANSACTION_CONFIRMED;
-                       return true; */
-                       return false;
-               }
-               else
-               {
-                       logger_->log_info("Site2Site transaction %s peer 
unknown respond code %d",
-                                       transactionID.c_str(), code);
-                       return false;
-               }
-       }
-       else
-       {
-               logger_->log_info("Site2Site Send FINISH TRANSACTION for 
transaction %s",
-                                                               
transactionID.c_str());
-               ret = writeRespond(FINISH_TRANSACTION, "FINISH_TRANSACTION");
-               if (ret <= 0)
-                       return false;
-               RespondCode code;
-               std::string message;
-               readRespond(code, message);
-               if (ret <= 0)
-                       return false;
-
-               // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer 
to send a 'Confirm Transaction' response
-               if (code == CONFIRM_TRANSACTION)
-               {
-                       logger_->log_info("Site2Site transaction %s peer 
confirm transaction with CRC %s", transactionID.c_str(), message.c_str());
-                       if (this->_currentVersion > 3)
-                       {
-                               long crcValue = transaction->getCRC();
-                               std::string crc = std::to_string(crcValue);
-                               if (message == crc)
-                               {
-                                       logger_->log_info("Site2Site 
transaction %s CRC matched", transactionID.c_str());
-                                       ret = writeRespond(CONFIRM_TRANSACTION, 
"CONFIRM_TRANSACTION");
-                                       if (ret <= 0)
-                                               return false;
-                                       transaction->_state = 
TRANSACTION_CONFIRMED;
-                                       return true;
-                               }
-                               else
-                               {
-                                       logger_->log_info("Site2Site 
transaction %s CRC not matched %s", transactionID.c_str(), crc.c_str());
-                                       ret = writeRespond(BAD_CHECKSUM, 
"BAD_CHECKSUM");
-                                       /*
-                                       ret = writeRespond(CONFIRM_TRANSACTION, 
"CONFIRM_TRANSACTION");
-                                                                               
if (ret <= 0)
-                                                                               
        return false;
-                                                                               
transaction->_state = TRANSACTION_CONFIRMED;
-                                       return true; */
-                                       return false;
-                               }
-                       }
-                       ret = writeRespond(CONFIRM_TRANSACTION, 
"CONFIRM_TRANSACTION");
-                       if (ret <= 0)
-                               return false;
-                       transaction->_state = TRANSACTION_CONFIRMED;
-                       return true;
-               }
-               else
-               {
-                       logger_->log_info("Site2Site transaction %s peer 
unknown respond code %d",
-                                       transactionID.c_str(), code);
-                       return false;
-               }
-               return false;
-       }
+bool Site2SiteClientProtocol::confirm(std::string transactionID) {
+  int ret;
+  Transaction *transaction = NULL;
+
+  if (_peerState != READY) {
+    bootstrap();
+  }
+
+  if (_peerState != READY) {
+    return false;
+  }
+
+  std::map<std::string, Transaction *>::iterator it =
+      this->_transactionMap.find(transactionID);
+
+  if (it == _transactionMap.end()) {
+    return false;
+  } else {
+    transaction = it->second;
+  }
+
+  if (transaction->getState() == TRANSACTION_STARTED
+      && !transaction->isDataAvailable()
+      && transaction->getDirection() == RECEIVE) {
+    transaction->_state = TRANSACTION_CONFIRMED;
+    return true;
+  }
+
+  if (transaction->getState() != DATA_EXCHANGED)
+    return false;
+
+  if (transaction->getDirection() == RECEIVE) {
+    if (transaction->isDataAvailable())
+      return false;
+    // we received a FINISH_TRANSACTION indicator. Send back a 
CONFIRM_TRANSACTION message
+    // to peer so that we can verify that the connection is still open. This 
is a two-phase commit,
+    // which helps to prevent the chances of data duplication. Without doing 
this, we may commit the
+    // session and then when we send the response back to the peer, the peer 
may have timed out and may not
+    // be listening. As a result, it will re-send the data. By doing this 
two-phase commit, we narrow the
+    // Critical Section involved in this transaction so that rather than the 
Critical Section being the
+    // time window involved in the entire transaction, it is reduced to a 
simple round-trip conversation.
+    long crcValue = transaction->getCRC();
+    std::string crc = std::to_string(crcValue);
+    logger_->log_info("Site2Site Send confirm with CRC %d to transaction %s",
+                      transaction->getCRC(), transactionID.c_str());
+    ret = writeRespond(CONFIRM_TRANSACTION, crc);
+    if (ret <= 0)
+      return false;
+    RespondCode code;
+    std::string message;
+    readRespond(code, message);
+    if (ret <= 0)
+      return false;
+
+    if (code == CONFIRM_TRANSACTION) {
+      logger_->log_info("Site2Site transaction %s peer confirm transaction",
+                        transactionID.c_str());
+      transaction->_state = TRANSACTION_CONFIRMED;
+      return true;
+    } else if (code == BAD_CHECKSUM) {
+      logger_->log_info("Site2Site transaction %s peer indicate bad checksum",
+                        transactionID.c_str());
+      /*
+       transaction->_state = TRANSACTION_CONFIRMED;
+       return true; */
+      return false;
+    } else {
+      logger_->log_info("Site2Site transaction %s peer unknown respond code 
%d",
+                        transactionID.c_str(), code);
+      return false;
+    }
+  } else {
+    logger_->log_info("Site2Site Send FINISH TRANSACTION for transaction %s",
+                      transactionID.c_str());
+    ret = writeRespond(FINISH_TRANSACTION, "FINISH_TRANSACTION");
+    if (ret <= 0)
+      return false;
+    RespondCode code;
+    std::string message;
+    readRespond(code, message);
+    if (ret <= 0)
+      return false;
+
+    // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 
'Confirm Transaction' response
+    if (code == CONFIRM_TRANSACTION) {
+      logger_->log_info(
+          "Site2Site transaction %s peer confirm transaction with CRC %s",
+          transactionID.c_str(), message.c_str());
+      if (this->_currentVersion > 3) {
+        long crcValue = transaction->getCRC();
+        std::string crc = std::to_string(crcValue);
+        if (message == crc) {
+          logger_->log_info("Site2Site transaction %s CRC matched",
+                            transactionID.c_str());
+          ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION");
+          if (ret <= 0)
+            return false;
+          transaction->_state = TRANSACTION_CONFIRMED;
+          return true;
+        } else {
+          logger_->log_info("Site2Site transaction %s CRC not matched %s",
+                            transactionID.c_str(), crc.c_str());
+          ret = writeRespond(BAD_CHECKSUM, "BAD_CHECKSUM");
+          /*
+           ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION");
+           if (ret <= 0)
+           return false;
+           transaction->_state = TRANSACTION_CONFIRMED;
+           return true; */
+          return false;
+        }
+      }
+      ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION");
+      if (ret <= 0)
+        return false;
+      transaction->_state = TRANSACTION_CONFIRMED;
+      return true;
+    } else {
+      logger_->log_info("Site2Site transaction %s peer unknown respond code 
%d",
+                        transactionID.c_str(), code);
+      return false;
+    }
+    return false;
+  }
 }
 
-void Site2SiteClientProtocol::cancel(std::string transactionID)
-{
-       Transaction *transaction = NULL;
-
-       if (_peerState != READY)
-       {
-               return;
-       }
-
-       std::map<std::string, Transaction *>::iterator it = 
this->_transactionMap.find(transactionID);
-
-       if (it == _transactionMap.end())
-       {
-               return;
-       }
-       else
-       {
-               transaction = it->second;
-       }
-
-       if (transaction->getState() == TRANSACTION_CANCELED || 
transaction->getState() == TRANSACTION_COMPLETED
-                       || transaction->getState() == TRANSACTION_ERROR)
-       {
-               return;
-       }
-
-       this->writeRespond(CANCEL_TRANSACTION, "Cancel");
-       transaction->_state = TRANSACTION_CANCELED;
-
-       tearDown();
-       return;
+void Site2SiteClientProtocol::cancel(std::string transactionID) {
+  Transaction *transaction = NULL;
+
+  if (_peerState != READY) {
+    return;
+  }
+
+  std::map<std::string, Transaction *>::iterator it =
+      this->_transactionMap.find(transactionID);
+
+  if (it == _transactionMap.end()) {
+    return;
+  } else {
+    transaction = it->second;
+  }
+
+  if (transaction->getState() == TRANSACTION_CANCELED
+      || transaction->getState() == TRANSACTION_COMPLETED
+      || transaction->getState() == TRANSACTION_ERROR) {
+    return;
+  }
+
+  this->writeRespond(CANCEL_TRANSACTION, "Cancel");
+  transaction->_state = TRANSACTION_CANCELED;
+
+  tearDown();
+  return;
 }
 
-void Site2SiteClientProtocol::deleteTransaction(std::string transactionID)
-{
-       Transaction *transaction = NULL;
+void Site2SiteClientProtocol::deleteTransaction(std::string transactionID) {
+  Transaction *transaction = NULL;
 
-       std::map<std::string, Transaction *>::iterator it = 
this->_transactionMap.find(transactionID);
+  std::map<std::string, Transaction *>::iterator it =
+      this->_transactionMap.find(transactionID);
 
-       if (it == _transactionMap.end())
-       {
-               return;
-       }
-       else
-       {
-               transaction = it->second;
-       }
+  if (it == _transactionMap.end()) {
+    return;
+  } else {
+    transaction = it->second;
+  }
 
-       logger_->log_info("Site2Site delete transaction %s", 
transaction->getUUIDStr().c_str());
-       delete transaction;
-       _transactionMap.erase(transactionID);
+  logger_->log_info("Site2Site delete transaction %s",
+                    transaction->getUUIDStr().c_str());
+  delete transaction;
+  _transactionMap.erase(transactionID);
 }
 
-void Site2SiteClientProtocol::error(std::string transactionID)
-{
-       Transaction *transaction = NULL;
+void Site2SiteClientProtocol::error(std::string transactionID) {
+  Transaction *transaction = NULL;
 
-       std::map<std::string, Transaction *>::iterator it = 
this->_transactionMap.find(transactionID);
+  std::map<std::string, Transaction *>::iterator it =
+      this->_transactionMap.find(transactionID);
 
-       if (it == _transactionMap.end())
-       {
-               return;
-       }
-       else
-       {
-               transaction = it->second;
-       }
+  if (it == _transactionMap.end()) {
+    return;
+  } else {
+    transaction = it->second;
+  }
 
-       transaction->_state = TRANSACTION_ERROR;
-       tearDown();
-       return;
+  transaction->_state = TRANSACTION_ERROR;
+  tearDown();
+  return;
 }
 
-//! Complete the transaction
-bool Site2SiteClientProtocol::complete(std::string transactionID)
-{
-       int ret;
-       Transaction *transaction = NULL;
-
-       if (_peerState != READY)
-       {
-               bootstrap();
-       }
-
-       if (_peerState != READY)
-       {
-               return false;
-       }
-
-       std::map<std::string, Transaction *>::iterator it = 
this->_transactionMap.find(transactionID);
-
-       if (it == _transactionMap.end())
-       {
-               return false;
-       }
-       else
-       {
-               transaction = it->second;
-       }
-
-       if (transaction->getState() != TRANSACTION_CONFIRMED)
-       {
-               return false;
-       }
-
-       if (transaction->getDirection() == RECEIVE)
-       {
-               if (transaction->_transfers == 0)
-               {
-                       transaction->_state = TRANSACTION_COMPLETED;
-                       return true;
-               }
-               else
-               {
-                       logger_->log_info("Site2Site transaction %s send 
finished", transactionID.c_str());
-                       ret = this->writeRespond(TRANSACTION_FINISHED, 
"Finished");
-                       if (ret <= 0)
-                               return false;
-                       else
-                       {
-                               transaction->_state = TRANSACTION_COMPLETED;
-                               return true;
-                       }
-               }
-       }
-       else
-       {
-               RespondCode code;
-               std::string message;
-               int ret;
-
-               ret = readRespond(code, message);
-
-               if (ret <= 0)
-                       return false;
-
-               if (code == TRANSACTION_FINISHED)
-               {
-                       logger_->log_info("Site2Site transaction %s peer 
finished transaction", transactionID.c_str());
-                       transaction->_state = TRANSACTION_COMPLETED;
-                       return true;
-               }
-               else
-               {
-                       logger_->log_info("Site2Site transaction %s peer 
unknown respond code %d",
-                                       transactionID.c_str(), code);
-                       return false;
-               }
-       }
+// Complete the transaction
+bool Site2SiteClientProtocol::complete(std::string transactionID) {
+  int ret;
+  Transaction *transaction = NULL;
+
+  if (_peerState != READY) {
+    bootstrap();
+  }
+
+  if (_peerState != READY) {
+    return false;
+  }
+
+  std::map<std::string, Transaction *>::iterator it =
+      this->_transactionMap.find(transactionID);
+
+  if (it == _transactionMap.end()) {
+    return false;
+  } else {
+    transaction = it->second;
+  }
+
+  if (transaction->getState() != TRANSACTION_CONFIRMED) {
+    return false;
+  }
+
+  if (transaction->getDirection() == RECEIVE) {
+    if (transaction->_transfers == 0) {
+      transaction->_state = TRANSACTION_COMPLETED;
+      return true;
+    } else {
+      logger_->log_info("Site2Site transaction %s send finished",
+                        transactionID.c_str());
+      ret = this->writeRespond(TRANSACTION_FINISHED, "Finished");
+      if (ret <= 0)
+        return false;
+      else {
+        transaction->_state = TRANSACTION_COMPLETED;
+        return true;
+      }
+    }
+  } else {
+    RespondCode code;
+    std::string message;
+    int ret;
+
+    ret = readRespond(code, message);
+
+    if (ret <= 0)
+      return false;
+
+    if (code == TRANSACTION_FINISHED) {
+      logger_->log_info("Site2Site transaction %s peer finished transaction",
+                        transactionID.c_str());
+      transaction->_state = TRANSACTION_COMPLETED;
+      return true;
+    } else {
+      logger_->log_info("Site2Site transaction %s peer unknown respond code 
%d",
+                        transactionID.c_str(), code);
+      return false;
+    }
+  }
 }
 
-void Site2SiteClientProtocol::transferFlowFiles(ProcessContext *context, 
ProcessSession *session)
-{
-       FlowFileRecord *flow = session->get();
-       Transaction *transaction = NULL;
-
-       if (!flow)
-               return;
-
-       if (_peerState != READY)
-       {
-               bootstrap();
-       }
-
-       if (_peerState != READY)
-       {
-               context->yield();
-               tearDown();
-               throw Exception(SITE2SITE_EXCEPTION, "Can not establish 
handshake with peer");
-               return;
-       }
-
-       // Create the transaction
-       std::string transactionID;
-       transaction = createTransaction(transactionID, SEND);
-
-       if (transaction == NULL)
-       {
-               context->yield();
-               tearDown();
-               throw Exception(SITE2SITE_EXCEPTION, "Can not create 
transaction");
-               return;
-       }
-
-       bool continueTransaction = true;
-       uint64_t startSendingNanos = getTimeNano();
-
-       try
-       {
-               while (continueTransaction)
-               {
-                       uint64_t startTime = getTimeMillis();
-                       DataPacket packet(this, transaction, 
flow->getAttributes());
-
-                       if (!send(transactionID, &packet, flow, session))
-                       {
-                               throw Exception(SITE2SITE_EXCEPTION, "Send 
Failed");
-                               return;
-                       }
-                       logger_->log_info("Site2Site transaction %s send flow 
record %s",
-                                                       transactionID.c_str(), 
flow->getUUIDStr().c_str());
-                       uint64_t endTime = getTimeMillis();
-                       std::string transitUri = peer_->getURL() + "/" + 
flow->getUUIDStr();
-                       std::string details = "urn:nifi:" + flow->getUUIDStr() 
+ "Remote Host=" + peer_->getHostName();
-                       session->getProvenanceReporter()->send(flow, 
transitUri, details, endTime - startTime, false);
-                       session->remove(flow);
-
-                       uint64_t transferNanos = getTimeNano() - 
startSendingNanos;
-                       if (transferNanos > _batchSendNanos)
-                               break;
-
-                       flow = session->get();
-                       if (!flow)
-                       {
-                               continueTransaction = false;
-                       }
-               } // while true
-
-               if (!confirm(transactionID))
-               {
-                       throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed");
-                       return;
-               }
-               if (!complete(transactionID))
-               {
-                       throw Exception(SITE2SITE_EXCEPTION, "Complete Failed");
-                       return;
-               }
-               logger_->log_info("Site2Site transaction %s successfully send 
flow record %d, content bytes %d",
-                               transactionID.c_str(), transaction->_transfers, 
transaction->_bytes);
-       }
-       catch (std::exception &exception)
-       {
-               if (transaction)
-                       deleteTransaction(transactionID);
-               context->yield();
-               tearDown();
-               logger_->log_debug("Caught Exception %s", exception.what());
-               throw;
-       }
-       catch (...)
-       {
-               if (transaction)
-                       deleteTransaction(transactionID);
-               context->yield();
-               tearDown();
-               logger_->log_debug("Caught Exception during 
Site2SiteClientProtocol::transferFlowFiles");
-               throw;
-       }
-
-       deleteTransaction(transactionID);
-
-       return;
+void Site2SiteClientProtocol::transferFlowFiles(
+    core::ProcessContext *context,
+    core::ProcessSession *session) {
+  std::shared_ptr<FlowFileRecord> flow = 
std::static_pointer_cast<FlowFileRecord>(session->get());;
+  Transaction *transaction = NULL;
+
+  if (!flow)
+    return;
+
+  if (_peerState != READY) {
+    bootstrap();
+  }
+
+  if (_peerState != READY) {
+    context->yield();
+    tearDown();
+    throw Exception(SITE2SITE_EXCEPTION,
+                    "Can not establish handshake with peer");
+    return;
+  }
+
+  // Create the transaction
+  std::string transactionID;
+  transaction = createTransaction(transactionID, SEND);
+
+  if (transaction == NULL) {
+    context->yield();
+    tearDown();
+    throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction");
+    return;
+  }
+
+  bool continueTransaction = true;
+  uint64_t startSendingNanos = getTimeNano();
+
+  try {
+    while (continueTransaction) {
+      uint64_t startTime = getTimeMillis();
+      DataPacket packet(this, transaction, flow->getAttributes());
+
+      if (!send(transactionID, &packet, flow, session)) {
+        throw Exception(SITE2SITE_EXCEPTION, "Send Failed");
+        return;
+      }
+      logger_->log_info("Site2Site transaction %s send flow record %s",
+                        transactionID.c_str(), flow->getUUIDStr().c_str());
+      uint64_t endTime = getTimeMillis();
+      std::string transitUri = peer_->getURL() + "/" + flow->getUUIDStr();
+      std::string details = "urn:nifi:" + flow->getUUIDStr() + "Remote Host="
+          + peer_->getHostName();
+      session->getProvenanceReporter()->send(flow, transitUri, details,
+                                             endTime - startTime, false);
+      session->remove(flow);
+
+      uint64_t transferNanos = getTimeNano() - startSendingNanos;
+      if (transferNanos > _batchSendNanos)
+        break;
+
+      flow = std::static_pointer_cast<FlowFileRecord>(session->get());;
+      if (!flow) {
+        continueTransaction = false;
+      }
+    }  // while true
+
+    if (!confirm(transactionID)) {
+      throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed");
+      return;
+    }
+    if (!complete(transactionID)) {
+      throw Exception(SITE2SITE_EXCEPTION, "Complete Failed");
+      return;
+    }
+    logger_->log_info(
+        "Site2Site transaction %s successfully send flow record %d, content 
bytes %d",
+        transactionID.c_str(), transaction->_transfers, transaction->_bytes);
+  } catch (std::exception &exception) {
+    if (transaction)
+      deleteTransaction(transactionID);
+    context->yield();
+    tearDown();
+    logger_->log_debug("Caught Exception %s", exception.what());
+    throw;
+  } catch (...) {
+    if (transaction)
+      deleteTransaction(transactionID);
+    context->yield();
+    tearDown();
+    logger_->log_debug(
+        "Caught Exception during Site2SiteClientProtocol::transferFlowFiles");
+    throw;
+  }
+
+  deleteTransaction(transactionID);
+
+  return;
 }
+
+
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/Site2SitePeer.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Site2SitePeer.cpp b/libminifi/src/Site2SitePeer.cpp
index ae3cc2d..64732ac 100644
--- a/libminifi/src/Site2SitePeer.cpp
+++ b/libminifi/src/Site2SitePeer.cpp
@@ -31,6 +31,12 @@
 #include "Site2SitePeer.h"
 #include "FlowController.h"
 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
 bool Site2SitePeer::Open() {
 
        if (IsNullOrEmpty (host_))
@@ -53,3 +59,9 @@ void Site2SitePeer::Close() {
        stream_->closeStream();
 }
 
+
+
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/TailFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/TailFile.cpp b/libminifi/src/TailFile.cpp
deleted file mode 100644
index 36b5e53..0000000
--- a/libminifi/src/TailFile.cpp
+++ /dev/null
@@ -1,269 +0,0 @@
-/**
- * @file TailFile.cpp
- * TailFile class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <vector>
-#include <queue>
-#include <map>
-#include <set>
-#include <sys/time.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <time.h>
-#include <sstream>
-#include <stdio.h>
-#include <string>
-#include <iostream>
-#include <dirent.h>
-#include <limits.h>
-#include <unistd.h>
-
-#include "utils/TimeUtil.h"
-#include "utils/StringUtils.h"
-#include "TailFile.h"
-#include "ProcessContext.h"
-#include "ProcessSession.h"
-
-const std::string TailFile::ProcessorName("TailFile");
-Property TailFile::FileName("File to Tail", "Fully-qualified filename of the 
file that should be tailed", "");
-Property TailFile::StateFile("State File",
-               "Specifies the file that should be used for storing state about 
what data has been ingested so that upon restart NiFi can resume from where it 
left off", "TailFileState");
-Relationship TailFile::Success("success", "All files are routed to success");
-
-void TailFile::initialize()
-{
-       //! Set the supported properties
-       std::set<Property> properties;
-       properties.insert(FileName);
-       properties.insert(StateFile);
-       setSupportedProperties(properties);
-       //! Set the supported relationships
-       std::set<Relationship> relationships;
-       relationships.insert(Success);
-       setSupportedRelationships(relationships);
-}
-
-std::string TailFile::trimLeft(const std::string& s)
-{
-       return StringUtils::trimLeft(s);
-}
-
-std::string TailFile::trimRight(const std::string& s)
-{
-       return StringUtils::trimRight(s);
-}
-
-void TailFile::parseStateFileLine(char *buf)
-{
-       char *line = buf;
-
-    while ((line[0] == ' ') || (line[0] =='\t'))
-       ++line;
-
-    char first = line[0];
-    if ((first == '\0') || (first == '#')  || (first == '\r') || (first == 
'\n') || (first == '='))
-    {
-       return;
-    }
-
-    char *equal = strchr(line, '=');
-    if (equal == NULL)
-    {
-       return;
-    }
-
-    equal[0] = '\0';
-    std::string key = line;
-
-    equal++;
-    while ((equal[0] == ' ') || (equal[0] == '\t'))
-       ++equal;
-
-    first = equal[0];
-    if ((first == '\0') || (first == '\r') || (first== '\n'))
-    {
-       return;
-    }
-
-    std::string value = equal;
-    key = trimRight(key);
-    value = trimRight(value);
-
-    if (key == "FILENAME")
-       this->_currentTailFileName = value;
-    if (key == "POSITION")
-       this->_currentTailFilePosition = std::stoi(value);
-
-    return;
-}
-
-void TailFile::recoverState()
-{
-       std::ifstream file(_stateFile.c_str(), std::ifstream::in);
-       if (!file.good())
-       {
-               logger_->log_error("load state file failed %s", 
_stateFile.c_str());
-               return;
-       }
-       const unsigned int bufSize = 512;
-       char buf[bufSize];
-       for (file.getline(buf,bufSize); file.good(); file.getline(buf,bufSize))
-       {
-               parseStateFileLine(buf);
-       }
-}
-
-void TailFile::storeState()
-{
-       std::ofstream file(_stateFile.c_str());
-       if (!file.is_open())
-       {
-               logger_->log_error("store state file failed %s", 
_stateFile.c_str());
-               return;
-       }
-       file << "FILENAME=" << this->_currentTailFileName << "\n";
-       file << "POSITION=" << this->_currentTailFilePosition << "\n";
-       file.close();
-}
-
-static bool sortTailMatchedFileItem(TailMatchedFileItem i, TailMatchedFileItem 
j)
-{
-       return (i.modifiedTime < j.modifiedTime);
-}
-void TailFile::checkRollOver()
-{
-       struct stat statbuf;
-       std::vector<TailMatchedFileItem> matchedFiles;
-       std::string fullPath = this->_fileLocation + "/" + _currentTailFileName;
-
-       if (stat(fullPath.c_str(), &statbuf) == 0)
-       {
-               if (statbuf.st_size > this->_currentTailFilePosition)
-                       // there are new input for the current tail file
-                       return;
-
-               uint64_t modifiedTimeCurrentTailFile = ((uint64_t) 
(statbuf.st_mtime) * 1000);
-               std::string pattern = _fileName;
-               std::size_t found = _fileName.find_last_of(".");
-               if (found != std::string::npos)
-                       pattern = _fileName.substr(0,found);
-               DIR *d;
-               d = opendir(this->_fileLocation.c_str());
-               if (!d)
-                       return;
-               while (1)
-               {
-                       struct dirent *entry;
-                       entry = readdir(d);
-                       if (!entry)
-                               break;
-                       std::string d_name = entry->d_name;
-                       if (!(entry->d_type & DT_DIR))
-                       {
-                               std::string fileName = d_name;
-                               std::string fileFullName = this->_fileLocation 
+ "/" + d_name;
-                               if (fileFullName.find(pattern) != 
std::string::npos && stat(fileFullName.c_str(), &statbuf) == 0)
-                               {
-                                       if (((uint64_t) (statbuf.st_mtime) * 
1000) >= modifiedTimeCurrentTailFile)
-                                       {
-                                               TailMatchedFileItem item;
-                                               item.fileName = fileName;
-                                               item.modifiedTime = ((uint64_t) 
(statbuf.st_mtime) * 1000);
-                                               matchedFiles.push_back(item);
-                                       }
-                               }
-                       }
-               }
-               closedir(d);
-
-               // Sort the list based on modified time
-               std::sort(matchedFiles.begin(), matchedFiles.end(), 
sortTailMatchedFileItem);
-               for (std::vector<TailMatchedFileItem>::iterator it = 
matchedFiles.begin(); it!=matchedFiles.end(); ++it)
-               {
-                       TailMatchedFileItem item = *it;
-                       if (item.fileName == _currentTailFileName)
-                       {
-                               ++it;
-                               if (it!=matchedFiles.end())
-                               {
-                                       TailMatchedFileItem nextItem = *it;
-                                       logger_->log_info("TailFile File Roll 
Over from %s to %s", _currentTailFileName.c_str(), nextItem.fileName.c_str());
-                                       _currentTailFileName = 
nextItem.fileName;
-                                       _currentTailFilePosition = 0;
-                                       storeState();
-                               }
-                               break;
-                       }
-               }
-       }
-       else
-               return;
-}
-
-
-void TailFile::onTrigger(ProcessContext *context, ProcessSession *session)
-{
-       std::string value;
-       if (context->getProperty(FileName.getName(), value))
-       {
-               std::size_t found = value.find_last_of("/\\");
-               this->_fileLocation = value.substr(0,found);
-               this->_fileName = value.substr(found+1);
-       }
-       if (context->getProperty(StateFile.getName(), value))
-       {
-               _stateFile = value + "." + getUUIDStr();
-       }
-       if (!this->_stateRecovered)
-       {
-               _stateRecovered = true;
-               this->_currentTailFileName = _fileName;
-               this->_currentTailFilePosition = 0;
-               // recover the state if we have not done so
-               this->recoverState();
-       }
-       checkRollOver();
-       std::string fullPath = this->_fileLocation + "/" + _currentTailFileName;
-       struct stat statbuf;
-       if (stat(fullPath.c_str(), &statbuf) == 0)
-       {
-               if (statbuf.st_size <= this->_currentTailFilePosition)
-                       // there are no new input for the current tail file
-               {
-                       context->yield();
-                       return;
-               }
-               FlowFileRecord *flowFile = session->create();
-               if (!flowFile)
-                       return;
-               std::size_t found = _currentTailFileName.find_last_of(".");
-               std::string baseName = _currentTailFileName.substr(0,found);
-               std::string extension = _currentTailFileName.substr(found+1);
-               flowFile->updateAttribute(PATH, _fileLocation);
-               flowFile->addAttribute(ABSOLUTE_PATH, fullPath);
-               session->import(fullPath, flowFile, true, 
this->_currentTailFilePosition);
-               session->transfer(flowFile, Success);
-               logger_->log_info("TailFile %s for %d bytes", 
_currentTailFileName.c_str(), flowFile->getSize());
-               std::string logName = baseName + "." + 
std::to_string(_currentTailFilePosition) + "-" +
-                               std::to_string(_currentTailFilePosition + 
flowFile->getSize()) + "." + extension;
-               flowFile->updateAttribute(FILENAME, logName);
-               this->_currentTailFilePosition += flowFile->getSize();
-               storeState();
-       }
-}
-

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/ThreadedSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ThreadedSchedulingAgent.cpp 
b/libminifi/src/ThreadedSchedulingAgent.cpp
index 2008fec..6c04281 100644
--- a/libminifi/src/ThreadedSchedulingAgent.cpp
+++ b/libminifi/src/ThreadedSchedulingAgent.cpp
@@ -22,103 +22,125 @@
 
 #include "ThreadedSchedulingAgent.h"
 
-#include "ProcessContext.h"
-#include "ProcessSession.h"
-#include "ProcessSessionFactory.h"
-
-void ThreadedSchedulingAgent::schedule(Processor *processor)
-{
-       std::lock_guard<std::mutex> lock(_mtx);
-
-       _administrativeYieldDuration = 0;
-       std::string yieldValue;
-
-       if (configure_->get(Configure::nifi_administrative_yield_duration, 
yieldValue))
-       {
-               TimeUnit unit;
-               if (Property::StringToTime(yieldValue, 
_administrativeYieldDuration, unit) &&
-                                       
Property::ConvertTimeUnitToMS(_administrativeYieldDuration, unit, 
_administrativeYieldDuration))
-               {
-                       logger_->log_debug("nifi_administrative_yield_duration: 
[%d] ms", _administrativeYieldDuration);
-               }
-       }
-
-       _boredYieldDuration = 0;
-       if (configure_->get(Configure::nifi_bored_yield_duration, yieldValue))
-       {
-               TimeUnit unit;
-               if (Property::StringToTime(yieldValue, _boredYieldDuration, 
unit) &&
-                                       
Property::ConvertTimeUnitToMS(_boredYieldDuration, unit, _boredYieldDuration))
-               {
-                       logger_->log_debug("nifi_bored_yield_duration: [%d] 
ms", _boredYieldDuration);
-               }
-       }
-
-       if (processor->getScheduledState() != RUNNING)
-       {
-               logger_->log_info("Can not schedule threads for processor %s 
because it is not running", processor->getName().c_str());
-               return;
-       }
-
-       std::map<std::string, std::vector<std::thread *>>::iterator it =
-                       _threads.find(processor->getUUIDStr());
-       if (it != _threads.end())
-       {
-               logger_->log_info("Can not schedule threads for processor %s 
because there are existing threads running");
-               return;
-       }
-
-       auto processContext = std::make_shared<ProcessContext>(processor);
-       auto sessionFactory = 
std::make_shared<ProcessSessionFactory>(processContext.get());
-
-       processor->onSchedule(processContext.get(), sessionFactory.get());
-
-       std::vector<std::thread *> threads;
-       for (int i = 0; i < processor->getMaxConcurrentTasks(); i++)
-       {
-           ThreadedSchedulingAgent *agent = this;
-               std::thread *thread = new std::thread([agent, processor, 
processContext, sessionFactory] () {
-                       agent->run(processor, processContext.get(), 
sessionFactory.get());
-               });
-               thread->detach();
-               threads.push_back(thread);
-               logger_->log_info("Scheduled thread %d running for process %s", 
thread->get_id(),
-                               processor->getName().c_str());
-       }
-       _threads[processor->getUUIDStr().c_str()] = threads;
-
-       return;
+#include "core/Connectable.h"
+#include "core/ProcessorNode.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessSessionFactory.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+void ThreadedSchedulingAgent::schedule(
+    std::shared_ptr<core::Processor> processor) {
+  std::lock_guard < std::mutex > lock(mutex_);
+
+  _administrativeYieldDuration = 0;
+  std::string yieldValue;
+
+  if (configure_->get(Configure::nifi_administrative_yield_duration,
+                      yieldValue)) {
+    core::TimeUnit unit;
+    if (core::Property::StringToTime(
+        yieldValue, _administrativeYieldDuration, unit)
+        && core::Property::ConvertTimeUnitToMS(
+            _administrativeYieldDuration, unit, _administrativeYieldDuration)) 
{
+      logger_->log_debug("nifi_administrative_yield_duration: [%d] ms",
+                         _administrativeYieldDuration);
+    }
+  }
+
+  _boredYieldDuration = 0;
+  if (configure_->get(Configure::nifi_bored_yield_duration, yieldValue)) {
+    core::TimeUnit unit;
+    if (core::Property::StringToTime(
+        yieldValue, _boredYieldDuration, unit)
+        && core::Property::ConvertTimeUnitToMS(
+            _boredYieldDuration, unit, _boredYieldDuration)) {
+      logger_->log_debug("nifi_bored_yield_duration: [%d] ms",
+                         _boredYieldDuration);
+    }
+  }
+
+  if (processor->getScheduledState() != core::RUNNING) {
+    logger_->log_info(
+        "Can not schedule threads for processor %s because it is not running",
+        processor->getName().c_str());
+    return;
+  }
+
+  std::map<std::string, std::vector<std::thread *>>::iterator it =
+      _threads.find(processor->getUUIDStr());
+  if (it != _threads.end()) {
+    logger_->log_info(
+        "Can not schedule threads for processor %s because there are existing 
threads running");
+    return;
+  }
+
+  core::ProcessorNode processor_node(processor);
+  auto processContext = std::make_shared
+      < core::ProcessContext > (processor_node,repo_);
+  auto sessionFactory = std::make_shared
+      < core::ProcessSessionFactory
+      > (processContext.get());
+
+  processor->onSchedule(processContext.get(), sessionFactory.get());
+
+  std::vector<std::thread *> threads;
+  for (int i = 0; i < processor->getMaxConcurrentTasks(); i++) {
+    ThreadedSchedulingAgent *agent = this;
+    std::thread *thread = new std::thread(
+        [agent, processor, processContext, sessionFactory] () {
+          agent->run(processor, processContext.get(), sessionFactory.get());
+        });
+    thread->detach();
+    threads.push_back(thread);
+    logger_->log_info("Scheduled thread %d running for process %s",
+                      thread->get_id(), processor->getName().c_str());
+  }
+  _threads[processor->getUUIDStr().c_str()] = threads;
+
+  return;
 }
 
-void ThreadedSchedulingAgent::unschedule(Processor *processor)
-{
-       std::lock_guard<std::mutex> lock(_mtx);
-       
-       logger_->log_info("Shutting down threads for processor %s/%s",
-                       processor->getName().c_str(),
-                       processor->getUUIDStr().c_str());
-
-       if (processor->getScheduledState() != RUNNING)
-       {
-               logger_->log_info("Cannot unschedule threads for processor %s 
because it is not running", processor->getName().c_str());
-               return;
-       }
-
-       std::map<std::string, std::vector<std::thread *>>::iterator it =
-                       _threads.find(processor->getUUIDStr());
-
-       if (it == _threads.end())
-       {
-               logger_->log_info("Cannot unschedule threads for processor %s 
because there are no existing threads running", processor->getName().c_str());
-               return;
-       }
-       for (std::vector<std::thread *>::iterator itThread = 
it->second.begin(); itThread != it->second.end(); ++itThread)
-       {
-               std::thread *thread = *itThread;
-               logger_->log_info("Scheduled thread %d deleted for process %s", 
thread->get_id(),
-                               processor->getName().c_str());
-               delete thread;
-       }
-       _threads.erase(processor->getUUIDStr());
-       processor->clearActiveTask();
+void ThreadedSchedulingAgent::unschedule(std::shared_ptr<core::Processor> 
processor) {
+  std::lock_guard < std::mutex > lock(mutex_);
+
+  logger_->log_info("Shutting down threads for processor %s/%s",
+                    processor->getName().c_str(),
+                    processor->getUUIDStr().c_str());
+
+  if (processor->getScheduledState() != core::RUNNING) {
+    logger_->log_info(
+        "Cannot unschedule threads for processor %s because it is not running",
+        processor->getName().c_str());
+    return;
+  }
+
+  std::map<std::string, std::vector<std::thread *>>::iterator it =
+      _threads.find(processor->getUUIDStr());
+
+  if (it == _threads.end()) {
+    logger_->log_info(
+        "Cannot unschedule threads for processor %s because there are no 
existing threads running",
+        processor->getName().c_str());
+    return;
+  }
+  for (std::vector<std::thread *>::iterator itThread = it->second.begin();
+      itThread != it->second.end(); ++itThread) {
+    std::thread *thread = *itThread;
+    logger_->log_info("Scheduled thread %d deleted for process %s",
+                      thread->get_id(), processor->getName().c_str());
+    delete thread;
+  }
+  _threads.erase(processor->getUUIDStr());
+  processor->clearActiveTask();
 }
+
+
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/TimerDrivenSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/TimerDrivenSchedulingAgent.cpp 
b/libminifi/src/TimerDrivenSchedulingAgent.cpp
index 30dc96c..3895e81 100644
--- a/libminifi/src/TimerDrivenSchedulingAgent.cpp
+++ b/libminifi/src/TimerDrivenSchedulingAgent.cpp
@@ -20,26 +20,38 @@
 #include <chrono>
 #include <thread>
 #include <iostream>
-#include "Property.h"
 #include "TimerDrivenSchedulingAgent.h"
+#include "core/Property.h"
 
-void TimerDrivenSchedulingAgent::run(Processor *processor, ProcessContext 
*processContext, ProcessSessionFactory *sessionFactory)
-{
-       while (this->_running)
-       {
-               bool shouldYield = this->onTrigger(processor, processContext, 
sessionFactory);
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
 
-               if (processor->isYield())
-               {
-                       // Honor the yield
-                       
std::this_thread::sleep_for(std::chrono::milliseconds(processor->getYieldTime()));
-               }
-               else if (shouldYield && this->_boredYieldDuration > 0)
-               {
-                       // No work to do or need to apply back pressure
-                       
std::this_thread::sleep_for(std::chrono::milliseconds(this->_boredYieldDuration));
-               }
-               
std::this_thread::sleep_for(std::chrono::nanoseconds(processor->getSchedulingPeriodNano()));
-       }
-       return;
+void TimerDrivenSchedulingAgent::run(
+    std::shared_ptr<core::Processor> processor,
+    core::ProcessContext *processContext,
+    core::ProcessSessionFactory *sessionFactory) {
+  while (this->running_) {
+    bool shouldYield = this->onTrigger(processor, processContext,
+                                       sessionFactory);
+
+    if (processor->isYield()) {
+      // Honor the yield
+      std::this_thread::sleep_for(
+          std::chrono::milliseconds(processor->getYieldTime()));
+    } else if (shouldYield && this->_boredYieldDuration > 0) {
+      // No work to do or need to apply back pressure
+      std::this_thread::sleep_for(
+          std::chrono::milliseconds(this->_boredYieldDuration));
+    }
+    std::this_thread::sleep_for(
+        std::chrono::nanoseconds(processor->getSchedulingPeriodNano()));
+  }
+  return;
 }
+
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

Reply via email to